This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 568e96c9 [GsoC][BanyanDB] Self-Observability: Write Metric Data to
Measure In Standalone Mode (#467)
568e96c9 is described below
commit 568e96c97a778f299a26da2422fc867fbc602b31
Author: Sylvie-Wxr <[email protected]>
AuthorDate: Mon Jun 17 00:28:05 2024 -0700
[GsoC][BanyanDB] Self-Observability: Write Metric Data to Measure In
Standalone Mode (#467)
* add native metric collection
* add hash in set
* remove redundant constructor
---------
Co-authored-by: 吴晟 Wu Sheng <[email protected]>
Co-authored-by: Huang Youliang <[email protected]>
---
banyand/observability/instrument.go | 38 ++++++--
.../{meter_noop.go => meter_native.go} | 7 ++
banyand/observability/meter_prom.go | 2 +
banyand/observability/metrics_system.go | 10 +-
banyand/observability/service.go | 42 +++++---
pkg/cmdsetup/data.go | 2 +-
pkg/cmdsetup/liaison.go | 2 +-
pkg/cmdsetup/standalone.go | 2 +-
pkg/meter/native/collection.go | 108 +++++++++++++++++++++
pkg/meter/native/instruments.go | 41 ++++++--
pkg/meter/native/provider.go | 62 +++++++++---
pkg/meter/native/vec.go | 97 ++++++++++++++++++
12 files changed, 361 insertions(+), 52 deletions(-)
diff --git a/banyand/observability/instrument.go
b/banyand/observability/instrument.go
index 077c79d3..74966d50 100644
--- a/banyand/observability/instrument.go
+++ b/banyand/observability/instrument.go
@@ -17,17 +17,25 @@
package observability
-import "github.com/apache/skywalking-banyandb/pkg/meter"
+import (
+ "github.com/apache/skywalking-banyandb/pkg/meter"
+ "github.com/apache/skywalking-banyandb/pkg/meter/native"
+)
type counterCollection struct {
counters []meter.Counter
}
// NewCounter init and return the counterCollection.
-func NewCounter(providers []meter.Provider, name string, labelNames ...string)
meter.Counter {
+func NewCounter(modes []string, name string, labelNames ...string)
meter.Counter {
var counters []meter.Counter
- for _, provider := range providers {
- counters = append(counters, provider.Counter(name,
labelNames...))
+ if containsMode(modes, flagPromethusMode) {
+ counters = append(counters, PromMeterProvider.Counter(name,
labelNames...))
+ }
+ if containsMode(modes, flagNativeMode) {
+ counter := NativeMeterProvider.Counter(name, labelNames...)
+ NativeMetricCollection.AddCollector(counter.(*native.Counter))
+ counters = append(counters, counter)
}
return &counterCollection{
counters: counters,
@@ -53,10 +61,15 @@ type gaugeCollection struct {
}
// NewGauge init and return the gaugeCollection.
-func NewGauge(providers []meter.Provider, name string, labelNames ...string)
meter.Gauge {
+func NewGauge(modes []string, name string, labelNames ...string) meter.Gauge {
var gauges []meter.Gauge
- for _, provider := range providers {
- gauges = append(gauges, provider.Gauge(name, labelNames...))
+ if containsMode(modes, flagPromethusMode) {
+ gauges = append(gauges, PromMeterProvider.Gauge(name,
labelNames...))
+ }
+ if containsMode(modes, flagNativeMode) {
+ gauge := NativeMeterProvider.Gauge(name, labelNames...)
+ NativeMetricCollection.AddCollector(gauge.(*native.Gauge))
+ gauges = append(gauges, gauge)
}
return &gaugeCollection{
gauges: gauges,
@@ -88,10 +101,15 @@ type histogramCollection struct {
}
// NewHistogram init and return the histogramCollection.
-func NewHistogram(providers []meter.Provider, name string, buckets
meter.Buckets, labelNames ...string) meter.Histogram {
+func NewHistogram(modes []string, name string, buckets meter.Buckets,
labelNames ...string) meter.Histogram {
var histograms []meter.Histogram
- for _, provider := range providers {
- histograms = append(histograms, provider.Histogram(name,
buckets, labelNames...))
+ if containsMode(modes, flagPromethusMode) {
+ histograms = append(histograms,
PromMeterProvider.Histogram(name, buckets, labelNames...))
+ }
+ if containsMode(modes, flagNativeMode) {
+ histogram := NativeMeterProvider.Histogram(name, buckets,
labelNames...)
+
NativeMetricCollection.AddCollector(histogram.(*native.Histogram))
+ histograms = append(histograms, histogram)
}
return &histogramCollection{
histograms: histograms,
diff --git a/banyand/observability/meter_noop.go
b/banyand/observability/meter_native.go
similarity index 87%
rename from banyand/observability/meter_noop.go
rename to banyand/observability/meter_native.go
index 7c1bfb00..0f86da94 100644
--- a/banyand/observability/meter_noop.go
+++ b/banyand/observability/meter_native.go
@@ -27,6 +27,13 @@ import (
"github.com/apache/skywalking-banyandb/pkg/meter/native"
)
+var (
+ // NativeMetricCollection is a global native metrics collection.
+ NativeMetricCollection native.MetricCollection
+ // NativeMeterProvider is a global native meter provider.
+ NativeMeterProvider meter.Provider
+)
+
// NewMeterProvider returns a meter.Provider based on the given scope.
func newNativeMeterProvider(ctx context.Context, metadata metadata.Repo)
meter.Provider {
return native.NewProvider(ctx, SystemScope, metadata)
diff --git a/banyand/observability/meter_prom.go
b/banyand/observability/meter_prom.go
index e81519ae..260b442f 100644
--- a/banyand/observability/meter_prom.go
+++ b/banyand/observability/meter_prom.go
@@ -35,6 +35,8 @@ var (
once = sync.Once{}
srvMetrics *grpcprom.ServerMetrics
+ // PromMeterProvider is a global promethus meter collector.
+ PromMeterProvider = newPromMeterProvider()
)
func init() {
diff --git a/banyand/observability/metrics_system.go
b/banyand/observability/metrics_system.go
index 4d9392ea..c638ab6c 100644
--- a/banyand/observability/metrics_system.go
+++ b/banyand/observability/metrics_system.go
@@ -57,12 +57,12 @@ func init() {
MetricsCollector.Register("net", collectNet)
}
-func initMetrics(providers []meter.Provider) {
+func initMetrics(modes []string) {
initMetricsOnce.Do(func() {
- cpuStateGauge = NewGauge(providers, "cpu_state", "kind")
- cpuNumGauge = NewGauge(providers, "cpu_num")
- memorySateGauge = NewGauge(providers, "memory_state", "kind")
- netStateGauge = NewGauge(providers, "net_state", "kind", "name")
+ cpuStateGauge = NewGauge(modes, "cpu_state", "kind")
+ cpuNumGauge = NewGauge(modes, "cpu_num")
+ memorySateGauge = NewGauge(modes, "memory_state", "kind")
+ netStateGauge = NewGauge(modes, "net_state", "kind", "name")
})
}
diff --git a/banyand/observability/service.go b/banyand/observability/service.go
index b9b68ec2..ba46f2d2 100644
--- a/banyand/observability/service.go
+++ b/banyand/observability/service.go
@@ -27,8 +27,9 @@ import (
"google.golang.org/grpc"
"github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/logger"
- "github.com/apache/skywalking-banyandb/pkg/meter"
+ "github.com/apache/skywalking-banyandb/pkg/meter/native"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
@@ -53,10 +54,11 @@ type Service interface {
}
// NewMetricService returns a metric service.
-func NewMetricService(metadata metadata.Repo) Service {
+func NewMetricService(metadata metadata.Repo, pipeline queue.Client) Service {
return &metricService{
closer: run.NewCloser(1),
metadata: metadata,
+ pipeline: pipeline,
}
}
@@ -66,6 +68,7 @@ type metricService struct {
closer *run.Closer
scheduler *timestamp.Scheduler
metadata metadata.Repo
+ pipeline queue.Client
listenAddr string
modes []string
mutex sync.Mutex
@@ -99,17 +102,14 @@ func (p *metricService) PreRun(ctx context.Context) error {
p.l = logger.GetLogger(p.Name())
p.mutex.Lock()
defer p.mutex.Unlock()
- var providers []meter.Provider
- for _, mode := range p.modes {
- switch mode {
- case flagPromethusMode:
- MetricsServerInterceptor = promMetricsServerInterceptor
- providers = append(providers, newPromMeterProvider())
- case flagNativeMode:
- providers = append(providers,
newNativeMeterProvider(ctx, p.metadata))
- }
+ if containsMode(p.modes, flagPromethusMode) {
+ MetricsServerInterceptor = promMetricsServerInterceptor
+ }
+ if containsMode(p.modes, flagNativeMode) {
+ NativeMetricCollection = native.NewMetricsCollection(p.pipeline)
+ NativeMeterProvider = newNativeMeterProvider(ctx, p.metadata)
}
- initMetrics(providers)
+ initMetrics(p.modes)
return nil
}
@@ -129,6 +129,15 @@ func (p *metricService) Serve() run.StopNotify {
if err != nil {
p.l.Fatal().Err(err).Msg("Failed to register metrics collector")
}
+ if containsMode(p.modes, flagNativeMode) {
+ err = p.scheduler.Register("native-metric-collection",
cron.Descriptor, "@every 5s", func(_ time.Time, _ *logger.Logger) bool {
+ NativeMetricCollection.FlushMetrics()
+ return true
+ })
+ if err != nil {
+ p.l.Fatal().Err(err).Msg("Failed to register native
metric collection")
+ }
+ }
p.svr = &http.Server{
Addr: p.listenAddr,
ReadHeaderTimeout: 3 * time.Second,
@@ -153,3 +162,12 @@ func (p *metricService) GracefulStop() {
}
p.closer.CloseThenWait()
}
+
+func containsMode(modes []string, mode string) bool {
+ for _, item := range modes {
+ if item == mode {
+ return true
+ }
+ }
+ return false
+}
diff --git a/pkg/cmdsetup/data.go b/pkg/cmdsetup/data.go
index 2e3f526f..991804ab 100644
--- a/pkg/cmdsetup/data.go
+++ b/pkg/cmdsetup/data.go
@@ -57,7 +57,7 @@ func newDataCmd(runners ...run.Unit) *cobra.Command {
l.Fatal().Err(err).Msg("failed to initiate query processor")
}
profSvc := observability.NewProfService()
- metricSvc := observability.NewMetricService(metaSvc)
+ metricSvc := observability.NewMetricService(metaSvc, nil)
var units []run.Unit
units = append(units, runners...)
diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go
index 25dd359f..152ba7a0 100644
--- a/pkg/cmdsetup/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -52,7 +52,7 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
}
grpcServer := grpc.NewServer(ctx, pipeline, localPipeline, metaSvc,
grpc.NewClusterNodeRegistry(pipeline, nodeSel))
profSvc := observability.NewProfService()
- metricSvc := observability.NewMetricService(metaSvc)
+ metricSvc := observability.NewMetricService(metaSvc, nil)
httpServer := http.NewServer()
dQuery, err := dquery.NewService(metaSvc, localPipeline, pipeline)
if err != nil {
diff --git a/pkg/cmdsetup/standalone.go b/pkg/cmdsetup/standalone.go
index e1b7f300..b98645d1 100644
--- a/pkg/cmdsetup/standalone.go
+++ b/pkg/cmdsetup/standalone.go
@@ -59,7 +59,7 @@ func newStandaloneCmd(runners ...run.Unit) *cobra.Command {
}
grpcServer := grpc.NewServer(ctx, pipeline, pipeline, metaSvc,
grpc.NewLocalNodeRegistry())
profSvc := observability.NewProfService()
- metricSvc := observability.NewMetricService(metaSvc)
+ metricSvc := observability.NewMetricService(metaSvc, pipeline)
httpServer := http.NewServer()
var units []run.Unit
diff --git a/pkg/meter/native/collection.go b/pkg/meter/native/collection.go
new file mode 100644
index 00000000..82630fa9
--- /dev/null
+++ b/pkg/meter/native/collection.go
@@ -0,0 +1,108 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package native provides a simple meter system for metrics. The metrics are
aggregated by the meter provider.
+package native
+
+import (
+ "time"
+
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ "github.com/apache/skywalking-banyandb/api/data"
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
+)
+
+type collector interface {
+ Collect() (string, []metricWithLabelValues)
+}
+
+// MetricCollection contains all the native implementations of metrics.
+type MetricCollection struct {
+ pipeline queue.Client
+ collectors []collector
+}
+
+// NewMetricsCollection creates a new MetricCollection.
+func NewMetricsCollection(pipeline queue.Client) MetricCollection {
+ return MetricCollection{
+ pipeline: pipeline,
+ }
+}
+
+// AddCollector Add native metric to MetricCollection.
+func (m *MetricCollection) AddCollector(c collector) {
+ m.collectors = append(m.collectors, c)
+}
+
+// FlushMetrics write all the metrics by flushing.
+func (m *MetricCollection) FlushMetrics() {
+ if len(m.collectors) == 0 {
+ return
+ }
+ publisher := m.pipeline.NewBatchPublisher(writeTimeout)
+ defer publisher.Close()
+ var messages []bus.Message
+ for _, collector := range m.collectors {
+ name, metrics := collector.Collect()
+ for _, metric := range metrics {
+ iwr := m.buildIWR(name, metric)
+ messages = append(messages,
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), "", iwr))
+ }
+ }
+ _, err := publisher.Publish(data.TopicMeasureWrite, messages...)
+ if err != nil {
+ log.Error().Err(err).Msg("Failed to publish messasges")
+ }
+}
+
+func (m *MetricCollection) buildIWR(metricName string, metric
metricWithLabelValues) *measurev1.InternalWriteRequest {
+ writeRequest := &measurev1.WriteRequest{
+ MessageId: uint64(time.Now().UnixNano()),
+ Metadata: &commonv1.Metadata{
+ Group: NativeObservabilityGroupName,
+ Name: metricName,
+ },
+ DataPoint: &measurev1.DataPointValue{
+ Timestamp:
timestamppb.New(time.Now().Truncate(time.Second)),
+ TagFamilies: []*modelv1.TagFamilyForWrite{
+ {
+ Tags: metric.labelValues,
+ },
+ },
+ Fields: []*modelv1.FieldValue{
+ {
+ Value: &modelv1.FieldValue_Float{
+ Float: &modelv1.Float{
+ Value:
metric.metricValue,
+ },
+ },
+ },
+ },
+ },
+ }
+ return &measurev1.InternalWriteRequest{
+ Request: writeRequest,
+ ShardId: uint32(0),
+ SeriesHash: metric.seriesHash,
+ EntityValues: metric.labelValues,
+ }
+}
diff --git a/pkg/meter/native/instruments.go b/pkg/meter/native/instruments.go
index fd7cff44..d281a9a4 100644
--- a/pkg/meter/native/instruments.go
+++ b/pkg/meter/native/instruments.go
@@ -18,10 +18,39 @@
// Package native provides a simple meter system for metrics. The metrics are
aggregated by the meter provider.
package native
-type nativeInstrument struct{}
+// Counter is the native implementation of meter.Counter.
+type Counter struct {
+ *metricVec
+}
-func (nativeInstrument) Inc(_ float64, _ ...string) {}
-func (nativeInstrument) Set(_ float64, _ ...string) {}
-func (nativeInstrument) Add(_ float64, _ ...string) {}
-func (nativeInstrument) Observe(_ float64, _ ...string) {}
-func (nativeInstrument) Delete(_ ...string) bool { return false }
+// Gauge is the native implementation of meter.Gauge.
+type Gauge struct {
+ *metricVec
+}
+
+// Add Metric Value in Gauge.
+func (g *Gauge) Add(delta float64, labelValues ...string) {
+ g.metricVec.Inc(delta, labelValues...)
+}
+
+// Set Metric Value in Gauge.
+func (g *Gauge) Set(value float64, labelValues ...string) {
+ g.mutex.Lock()
+ defer g.mutex.Unlock()
+ tagValues := buildTagValues(g.scope, labelValues...)
+ hash := seriesHash(tagValues)
+ key := string(hash)
+ g.metrics[key] = metricWithLabelValues{
+ metricValue: value,
+ labelValues: tagValues,
+ seriesHash: hash,
+ }
+}
+
+// Histogram is the native implementation of meter.Histogram.
+type Histogram struct {
+ *metricVec
+}
+
+// Observe to be implemented.
+func (h *Histogram) Observe(_ float64, _ ...string) {}
diff --git a/pkg/meter/native/provider.go b/pkg/meter/native/provider.go
index 470d626c..97c2747f 100644
--- a/pkg/meter/native/provider.go
+++ b/pkg/meter/native/provider.go
@@ -24,6 +24,7 @@ import (
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -36,6 +37,7 @@ const (
defaultTagFamily = "default"
defaultFieldName = "value"
nodeNameTag = "node_name"
+ standaloneNodeName = "standalone"
)
var log = logger.GetLogger("observability", "metrics", "system")
@@ -58,27 +60,33 @@ func NewProvider(ctx context.Context, scope meter.Scope,
metadata metadata.Repo)
return p
}
-// Counter returns a no-op implementation of the Counter interface.
+// Counter returns a native implementation of the Counter interface.
func (p *provider) Counter(name string, labelNames ...string) meter.Counter {
- err := p.createMeasure(name, labelNames...)
+ name, err := p.createMeasure(name, labelNames...)
if err != nil && !errors.Is(err, schema.ErrGRPCAlreadyExists) {
log.Error().Err(err).Msgf("Failure to createMeasure for Counter
%s, labels: %v", name, labelNames)
}
- return nativeInstrument{}
+ return &Counter{
+ newMetricVec(name, p.scope),
+ }
}
-// Gauge returns a no-op implementation of the Gauge interface.
+// Gauge returns a nativeimplementation of the Gauge interface.
func (p *provider) Gauge(name string, labelNames ...string) meter.Gauge {
- err := p.createMeasure(name, labelNames...)
+ name, err := p.createMeasure(name, labelNames...)
if err != nil && !errors.Is(err, schema.ErrGRPCAlreadyExists) {
log.Error().Err(err).Msgf("Failure to createMeasure for Gauge
%s, labels: %v", name, labelNames)
}
- return nativeInstrument{}
+ return &Gauge{
+ newMetricVec(name, p.scope),
+ }
}
-// Histogram returns a no-op implementation of the Histogram interface.
-func (p *provider) Histogram(_ string, _ meter.Buckets, _ ...string)
meter.Histogram {
- return nativeInstrument{}
+// Histogram returns a native implementation of the Histogram interface.
+func (p *provider) Histogram(name string, _ meter.Buckets, _ ...string)
meter.Histogram {
+ return &Histogram{
+ newMetricVec(name, p.scope),
+ }
}
func (p *provider) createNativeObservabilityGroup(ctx context.Context) error {
@@ -102,8 +110,8 @@ func (p *provider) createNativeObservabilityGroup(ctx
context.Context) error {
return p.metadata.GroupRegistry().CreateGroup(ctx, g)
}
-func (p *provider) createMeasure(metric string, labels ...string) error {
- tags, entityTags := p.getTags(labels)
+func (p *provider) createMeasure(metric string, labels ...string) (string,
error) {
+ tags, entityTags := buildTags(p.scope, labels)
_, err :=
p.metadata.MeasureRegistry().CreateMeasure(context.Background(),
&databasev1.Measure{
Metadata: &commonv1.Metadata{
Name: metric,
@@ -121,16 +129,16 @@ func (p *provider) createMeasure(metric string, labels
...string) error {
Fields: []*databasev1.FieldSpec{
{
Name: defaultFieldName,
- FieldType:
databasev1.FieldType_FIELD_TYPE_INT,
+ FieldType:
databasev1.FieldType_FIELD_TYPE_FLOAT,
EncodingMethod:
databasev1.EncodingMethod_ENCODING_METHOD_GORILLA,
CompressionMethod:
databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD,
},
},
})
- return err
+ return metric, err
}
-func (p *provider) getTags(labels []string) ([]*databasev1.TagSpec, []string) {
+func buildTags(scope meter.Scope, labels []string) ([]*databasev1.TagSpec,
[]string) {
var tags []*databasev1.TagSpec
var entityTags []string
addTags := func(labels ...string) {
@@ -144,9 +152,31 @@ func (p *provider) getTags(labels []string)
([]*databasev1.TagSpec, []string) {
}
}
addTags(nodeNameTag)
- addTags(labels...)
- for label := range p.scope.GetLabels() {
+ for label := range scope.GetLabels() {
addTags(label)
}
+ addTags(labels...)
return tags, entityTags
}
+
+func buildTagValues(scope meter.Scope, labelValues ...string)
[]*modelv1.TagValue {
+ var tagValues []*modelv1.TagValue
+ addTagValues := func(labelValues ...string) {
+ for _, value := range labelValues {
+ tagValue := &modelv1.TagValue{
+ Value: &modelv1.TagValue_Str{
+ Str: &modelv1.Str{
+ Value: value,
+ },
+ },
+ }
+ tagValues = append(tagValues, tagValue)
+ }
+ }
+ addTagValues(standaloneNodeName)
+ for _, labelValue := range scope.GetLabels() {
+ addTagValues(labelValue)
+ }
+ addTagValues(labelValues...)
+ return tagValues
+}
diff --git a/pkg/meter/native/vec.go b/pkg/meter/native/vec.go
new file mode 100644
index 00000000..3b93fcdb
--- /dev/null
+++ b/pkg/meter/native/vec.go
@@ -0,0 +1,97 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package native provides a simple meter system for metrics. The metrics are
aggregated by the meter provider.
+package native
+
+import (
+ "sync"
+ "time"
+
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ "github.com/apache/skywalking-banyandb/pkg/meter"
+ pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+)
+
+const (
+ writeTimeout = 5 * time.Second
+)
+
+type metricWithLabelValues struct {
+ labelValues []*modelv1.TagValue
+ seriesHash []byte
+ metricValue float64
+}
+
+type metricVec struct {
+ scope meter.Scope
+ metrics map[string]metricWithLabelValues
+ measureName string
+ mutex sync.Mutex
+}
+
+func newMetricVec(measureName string, scope meter.Scope) *metricVec {
+ n := &metricVec{
+ scope: scope,
+ measureName: measureName,
+ metrics: map[string]metricWithLabelValues{},
+ }
+ return n
+}
+
+func (n *metricVec) Inc(delta float64, labelValues ...string) {
+ n.mutex.Lock()
+ defer n.mutex.Unlock()
+ tagValues := buildTagValues(n.scope, labelValues...)
+ hash := seriesHash(tagValues)
+ key := string(hash)
+ v, exist := n.metrics[key]
+ if !exist {
+ v = metricWithLabelValues{
+ labelValues: tagValues,
+ seriesHash: hash,
+ }
+ }
+ v.metricValue += delta
+ n.metrics[key] = v
+}
+
+func (n *metricVec) Delete(labelValues ...string) bool {
+ n.mutex.Lock()
+ defer n.mutex.Unlock()
+ key := string(seriesHash(buildTagValues(n.scope, labelValues...)))
+ delete(n.metrics, key)
+ return true
+}
+
+func (n *metricVec) Collect() (string, []metricWithLabelValues) {
+ n.mutex.Lock()
+ defer n.mutex.Unlock()
+ var metrics []metricWithLabelValues
+ for _, metric := range n.metrics {
+ metrics = append(metrics, metric)
+ }
+ return n.measureName, metrics
+}
+
+func seriesHash(tagValues []*modelv1.TagValue) []byte {
+ entities, err := pbv1.EntityValues(tagValues).ToEntity()
+ if err != nil {
+ log.Error().Err(err).Msg("Failed to convert tagValues to
Entity")
+ }
+ return pbv1.HashEntity(entities)
+}