This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 568e96c9 [GsoC][BanyanDB] Self-Observability: Write Metric Data to 
Measure In Standalone Mode (#467)
568e96c9 is described below

commit 568e96c97a778f299a26da2422fc867fbc602b31
Author: Sylvie-Wxr <[email protected]>
AuthorDate: Mon Jun 17 00:28:05 2024 -0700

    [GsoC][BanyanDB] Self-Observability: Write Metric Data to Measure In 
Standalone Mode (#467)
    
    
    * add native metric collection
    
    * add hash in set
    
    * remove redundant constructor
    
    ---------
    
    Co-authored-by: 吴晟 Wu Sheng <[email protected]>
    Co-authored-by: Huang Youliang <[email protected]>
---
 banyand/observability/instrument.go                |  38 ++++++--
 .../{meter_noop.go => meter_native.go}             |   7 ++
 banyand/observability/meter_prom.go                |   2 +
 banyand/observability/metrics_system.go            |  10 +-
 banyand/observability/service.go                   |  42 +++++---
 pkg/cmdsetup/data.go                               |   2 +-
 pkg/cmdsetup/liaison.go                            |   2 +-
 pkg/cmdsetup/standalone.go                         |   2 +-
 pkg/meter/native/collection.go                     | 108 +++++++++++++++++++++
 pkg/meter/native/instruments.go                    |  41 ++++++--
 pkg/meter/native/provider.go                       |  62 +++++++++---
 pkg/meter/native/vec.go                            |  97 ++++++++++++++++++
 12 files changed, 361 insertions(+), 52 deletions(-)

diff --git a/banyand/observability/instrument.go 
b/banyand/observability/instrument.go
index 077c79d3..74966d50 100644
--- a/banyand/observability/instrument.go
+++ b/banyand/observability/instrument.go
@@ -17,17 +17,25 @@
 
 package observability
 
-import "github.com/apache/skywalking-banyandb/pkg/meter"
+import (
+       "github.com/apache/skywalking-banyandb/pkg/meter"
+       "github.com/apache/skywalking-banyandb/pkg/meter/native"
+)
 
 type counterCollection struct {
        counters []meter.Counter
 }
 
 // NewCounter init and return the counterCollection.
-func NewCounter(providers []meter.Provider, name string, labelNames ...string) 
meter.Counter {
+func NewCounter(modes []string, name string, labelNames ...string) 
meter.Counter {
        var counters []meter.Counter
-       for _, provider := range providers {
-               counters = append(counters, provider.Counter(name, 
labelNames...))
+       if containsMode(modes, flagPromethusMode) {
+               counters = append(counters, PromMeterProvider.Counter(name, 
labelNames...))
+       }
+       if containsMode(modes, flagNativeMode) {
+               counter := NativeMeterProvider.Counter(name, labelNames...)
+               NativeMetricCollection.AddCollector(counter.(*native.Counter))
+               counters = append(counters, counter)
        }
        return &counterCollection{
                counters: counters,
@@ -53,10 +61,15 @@ type gaugeCollection struct {
 }
 
 // NewGauge init and return the gaugeCollection.
-func NewGauge(providers []meter.Provider, name string, labelNames ...string) 
meter.Gauge {
+func NewGauge(modes []string, name string, labelNames ...string) meter.Gauge {
        var gauges []meter.Gauge
-       for _, provider := range providers {
-               gauges = append(gauges, provider.Gauge(name, labelNames...))
+       if containsMode(modes, flagPromethusMode) {
+               gauges = append(gauges, PromMeterProvider.Gauge(name, 
labelNames...))
+       }
+       if containsMode(modes, flagNativeMode) {
+               gauge := NativeMeterProvider.Gauge(name, labelNames...)
+               NativeMetricCollection.AddCollector(gauge.(*native.Gauge))
+               gauges = append(gauges, gauge)
        }
        return &gaugeCollection{
                gauges: gauges,
@@ -88,10 +101,15 @@ type histogramCollection struct {
 }
 
 // NewHistogram init and return the histogramCollection.
-func NewHistogram(providers []meter.Provider, name string, buckets 
meter.Buckets, labelNames ...string) meter.Histogram {
+func NewHistogram(modes []string, name string, buckets meter.Buckets, 
labelNames ...string) meter.Histogram {
        var histograms []meter.Histogram
-       for _, provider := range providers {
-               histograms = append(histograms, provider.Histogram(name, 
buckets, labelNames...))
+       if containsMode(modes, flagPromethusMode) {
+               histograms = append(histograms, 
PromMeterProvider.Histogram(name, buckets, labelNames...))
+       }
+       if containsMode(modes, flagNativeMode) {
+               histogram := NativeMeterProvider.Histogram(name, buckets, 
labelNames...)
+               
NativeMetricCollection.AddCollector(histogram.(*native.Histogram))
+               histograms = append(histograms, histogram)
        }
        return &histogramCollection{
                histograms: histograms,
diff --git a/banyand/observability/meter_noop.go 
b/banyand/observability/meter_native.go
similarity index 87%
rename from banyand/observability/meter_noop.go
rename to banyand/observability/meter_native.go
index 7c1bfb00..0f86da94 100644
--- a/banyand/observability/meter_noop.go
+++ b/banyand/observability/meter_native.go
@@ -27,6 +27,13 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/meter/native"
 )
 
+var (
+       // NativeMetricCollection is a global native metrics collection.
+       NativeMetricCollection native.MetricCollection
+       // NativeMeterProvider is a global native meter provider.
+       NativeMeterProvider meter.Provider
+)
+
 // NewMeterProvider returns a meter.Provider based on the given scope.
 func newNativeMeterProvider(ctx context.Context, metadata metadata.Repo) 
meter.Provider {
        return native.NewProvider(ctx, SystemScope, metadata)
diff --git a/banyand/observability/meter_prom.go 
b/banyand/observability/meter_prom.go
index e81519ae..260b442f 100644
--- a/banyand/observability/meter_prom.go
+++ b/banyand/observability/meter_prom.go
@@ -35,6 +35,8 @@ var (
 
        once       = sync.Once{}
        srvMetrics *grpcprom.ServerMetrics
+       // PromMeterProvider is a global Prometheus meter provider.
+       PromMeterProvider = newPromMeterProvider()
 )
 
 func init() {
diff --git a/banyand/observability/metrics_system.go 
b/banyand/observability/metrics_system.go
index 4d9392ea..c638ab6c 100644
--- a/banyand/observability/metrics_system.go
+++ b/banyand/observability/metrics_system.go
@@ -57,12 +57,12 @@ func init() {
        MetricsCollector.Register("net", collectNet)
 }
 
-func initMetrics(providers []meter.Provider) {
+func initMetrics(modes []string) {
        initMetricsOnce.Do(func() {
-               cpuStateGauge = NewGauge(providers, "cpu_state", "kind")
-               cpuNumGauge = NewGauge(providers, "cpu_num")
-               memorySateGauge = NewGauge(providers, "memory_state", "kind")
-               netStateGauge = NewGauge(providers, "net_state", "kind", "name")
+               cpuStateGauge = NewGauge(modes, "cpu_state", "kind")
+               cpuNumGauge = NewGauge(modes, "cpu_num")
+               memorySateGauge = NewGauge(modes, "memory_state", "kind")
+               netStateGauge = NewGauge(modes, "net_state", "kind", "name")
        })
 }
 
diff --git a/banyand/observability/service.go b/banyand/observability/service.go
index b9b68ec2..ba46f2d2 100644
--- a/banyand/observability/service.go
+++ b/banyand/observability/service.go
@@ -27,8 +27,9 @@ import (
        "google.golang.org/grpc"
 
        "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/logger"
-       "github.com/apache/skywalking-banyandb/pkg/meter"
+       "github.com/apache/skywalking-banyandb/pkg/meter/native"
        "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
@@ -53,10 +54,11 @@ type Service interface {
 }
 
 // NewMetricService returns a metric service.
-func NewMetricService(metadata metadata.Repo) Service {
+func NewMetricService(metadata metadata.Repo, pipeline queue.Client) Service {
        return &metricService{
                closer:   run.NewCloser(1),
                metadata: metadata,
+               pipeline: pipeline,
        }
 }
 
@@ -66,6 +68,7 @@ type metricService struct {
        closer     *run.Closer
        scheduler  *timestamp.Scheduler
        metadata   metadata.Repo
+       pipeline   queue.Client
        listenAddr string
        modes      []string
        mutex      sync.Mutex
@@ -99,17 +102,14 @@ func (p *metricService) PreRun(ctx context.Context) error {
        p.l = logger.GetLogger(p.Name())
        p.mutex.Lock()
        defer p.mutex.Unlock()
-       var providers []meter.Provider
-       for _, mode := range p.modes {
-               switch mode {
-               case flagPromethusMode:
-                       MetricsServerInterceptor = promMetricsServerInterceptor
-                       providers = append(providers, newPromMeterProvider())
-               case flagNativeMode:
-                       providers = append(providers, 
newNativeMeterProvider(ctx, p.metadata))
-               }
+       if containsMode(p.modes, flagPromethusMode) {
+               MetricsServerInterceptor = promMetricsServerInterceptor
+       }
+       if containsMode(p.modes, flagNativeMode) {
+               NativeMetricCollection = native.NewMetricsCollection(p.pipeline)
+               NativeMeterProvider = newNativeMeterProvider(ctx, p.metadata)
        }
-       initMetrics(providers)
+       initMetrics(p.modes)
        return nil
 }
 
@@ -129,6 +129,15 @@ func (p *metricService) Serve() run.StopNotify {
        if err != nil {
                p.l.Fatal().Err(err).Msg("Failed to register metrics collector")
        }
+       if containsMode(p.modes, flagNativeMode) {
+               err = p.scheduler.Register("native-metric-collection", 
cron.Descriptor, "@every 5s", func(_ time.Time, _ *logger.Logger) bool {
+                       NativeMetricCollection.FlushMetrics()
+                       return true
+               })
+               if err != nil {
+                       p.l.Fatal().Err(err).Msg("Failed to register native 
metric collection")
+               }
+       }
        p.svr = &http.Server{
                Addr:              p.listenAddr,
                ReadHeaderTimeout: 3 * time.Second,
@@ -153,3 +162,12 @@ func (p *metricService) GracefulStop() {
        }
        p.closer.CloseThenWait()
 }
+
+func containsMode(modes []string, mode string) bool {
+       for _, item := range modes {
+               if item == mode {
+                       return true
+               }
+       }
+       return false
+}
diff --git a/pkg/cmdsetup/data.go b/pkg/cmdsetup/data.go
index 2e3f526f..991804ab 100644
--- a/pkg/cmdsetup/data.go
+++ b/pkg/cmdsetup/data.go
@@ -57,7 +57,7 @@ func newDataCmd(runners ...run.Unit) *cobra.Command {
                l.Fatal().Err(err).Msg("failed to initiate query processor")
        }
        profSvc := observability.NewProfService()
-       metricSvc := observability.NewMetricService(metaSvc)
+       metricSvc := observability.NewMetricService(metaSvc, nil)
 
        var units []run.Unit
        units = append(units, runners...)
diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go
index 25dd359f..152ba7a0 100644
--- a/pkg/cmdsetup/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -52,7 +52,7 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
        }
        grpcServer := grpc.NewServer(ctx, pipeline, localPipeline, metaSvc, 
grpc.NewClusterNodeRegistry(pipeline, nodeSel))
        profSvc := observability.NewProfService()
-       metricSvc := observability.NewMetricService(metaSvc)
+       metricSvc := observability.NewMetricService(metaSvc, nil)
        httpServer := http.NewServer()
        dQuery, err := dquery.NewService(metaSvc, localPipeline, pipeline)
        if err != nil {
diff --git a/pkg/cmdsetup/standalone.go b/pkg/cmdsetup/standalone.go
index e1b7f300..b98645d1 100644
--- a/pkg/cmdsetup/standalone.go
+++ b/pkg/cmdsetup/standalone.go
@@ -59,7 +59,7 @@ func newStandaloneCmd(runners ...run.Unit) *cobra.Command {
        }
        grpcServer := grpc.NewServer(ctx, pipeline, pipeline, metaSvc, 
grpc.NewLocalNodeRegistry())
        profSvc := observability.NewProfService()
-       metricSvc := observability.NewMetricService(metaSvc)
+       metricSvc := observability.NewMetricService(metaSvc, pipeline)
        httpServer := http.NewServer()
 
        var units []run.Unit
diff --git a/pkg/meter/native/collection.go b/pkg/meter/native/collection.go
new file mode 100644
index 00000000..82630fa9
--- /dev/null
+++ b/pkg/meter/native/collection.go
@@ -0,0 +1,108 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package native provides a simple meter system for metrics. The metrics are 
aggregated by the meter provider.
+package native
+
+import (
+       "time"
+
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       "github.com/apache/skywalking-banyandb/api/data"
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/pkg/bus"
+)
+
+type collector interface {
+       Collect() (string, []metricWithLabelValues)
+}
+
+// MetricCollection contains all the native implementations of metrics.
+type MetricCollection struct {
+       pipeline   queue.Client
+       collectors []collector
+}
+
+// NewMetricsCollection creates a new MetricCollection.
+func NewMetricsCollection(pipeline queue.Client) MetricCollection {
+       return MetricCollection{
+               pipeline: pipeline,
+       }
+}
+
+// AddCollector adds a native metric collector to the MetricCollection.
+func (m *MetricCollection) AddCollector(c collector) {
+       m.collectors = append(m.collectors, c)
+}
+
+// FlushMetrics collects all registered metrics and publishes them as measure writes.
+func (m *MetricCollection) FlushMetrics() {
+       if len(m.collectors) == 0 {
+               return
+       }
+       publisher := m.pipeline.NewBatchPublisher(writeTimeout)
+       defer publisher.Close()
+       var messages []bus.Message
+       for _, collector := range m.collectors {
+               name, metrics := collector.Collect()
+               for _, metric := range metrics {
+                       iwr := m.buildIWR(name, metric)
+                       messages = append(messages, 
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), "", iwr))
+               }
+       }
+       _, err := publisher.Publish(data.TopicMeasureWrite, messages...)
+       if err != nil {
+               log.Error().Err(err).Msg("Failed to publish messasges")
+       }
+}
+
+func (m *MetricCollection) buildIWR(metricName string, metric 
metricWithLabelValues) *measurev1.InternalWriteRequest {
+       writeRequest := &measurev1.WriteRequest{
+               MessageId: uint64(time.Now().UnixNano()),
+               Metadata: &commonv1.Metadata{
+                       Group: NativeObservabilityGroupName,
+                       Name:  metricName,
+               },
+               DataPoint: &measurev1.DataPointValue{
+                       Timestamp: 
timestamppb.New(time.Now().Truncate(time.Second)),
+                       TagFamilies: []*modelv1.TagFamilyForWrite{
+                               {
+                                       Tags: metric.labelValues,
+                               },
+                       },
+                       Fields: []*modelv1.FieldValue{
+                               {
+                                       Value: &modelv1.FieldValue_Float{
+                                               Float: &modelv1.Float{
+                                                       Value: 
metric.metricValue,
+                                               },
+                                       },
+                               },
+                       },
+               },
+       }
+       return &measurev1.InternalWriteRequest{
+               Request:      writeRequest,
+               ShardId:      uint32(0),
+               SeriesHash:   metric.seriesHash,
+               EntityValues: metric.labelValues,
+       }
+}
diff --git a/pkg/meter/native/instruments.go b/pkg/meter/native/instruments.go
index fd7cff44..d281a9a4 100644
--- a/pkg/meter/native/instruments.go
+++ b/pkg/meter/native/instruments.go
@@ -18,10 +18,39 @@
 // Package native provides a simple meter system for metrics. The metrics are 
aggregated by the meter provider.
 package native
 
-type nativeInstrument struct{}
+// Counter is the native implementation of meter.Counter.
+type Counter struct {
+       *metricVec
+}
 
-func (nativeInstrument) Inc(_ float64, _ ...string)     {}
-func (nativeInstrument) Set(_ float64, _ ...string)     {}
-func (nativeInstrument) Add(_ float64, _ ...string)     {}
-func (nativeInstrument) Observe(_ float64, _ ...string) {}
-func (nativeInstrument) Delete(_ ...string) bool        { return false }
+// Gauge is the native implementation of meter.Gauge.
+type Gauge struct {
+       *metricVec
+}
+
+// Add adds delta to the Gauge value identified by labelValues.
+func (g *Gauge) Add(delta float64, labelValues ...string) {
+       g.metricVec.Inc(delta, labelValues...)
+}
+
+// Set sets the Gauge value identified by labelValues.
+func (g *Gauge) Set(value float64, labelValues ...string) {
+       g.mutex.Lock()
+       defer g.mutex.Unlock()
+       tagValues := buildTagValues(g.scope, labelValues...)
+       hash := seriesHash(tagValues)
+       key := string(hash)
+       g.metrics[key] = metricWithLabelValues{
+               metricValue: value,
+               labelValues: tagValues,
+               seriesHash:  hash,
+       }
+}
+
+// Histogram is the native implementation of meter.Histogram.
+type Histogram struct {
+       *metricVec
+}
+
+// Observe to be implemented.
+func (h *Histogram) Observe(_ float64, _ ...string) {}
diff --git a/pkg/meter/native/provider.go b/pkg/meter/native/provider.go
index 470d626c..97c2747f 100644
--- a/pkg/meter/native/provider.go
+++ b/pkg/meter/native/provider.go
@@ -24,6 +24,7 @@ import (
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/pkg/logger"
@@ -36,6 +37,7 @@ const (
        defaultTagFamily             = "default"
        defaultFieldName             = "value"
        nodeNameTag                  = "node_name"
+       standaloneNodeName           = "standalone"
 )
 
 var log = logger.GetLogger("observability", "metrics", "system")
@@ -58,27 +60,33 @@ func NewProvider(ctx context.Context, scope meter.Scope, 
metadata metadata.Repo)
        return p
 }
 
-// Counter returns a no-op implementation of the Counter interface.
+// Counter returns a native implementation of the Counter interface.
 func (p *provider) Counter(name string, labelNames ...string) meter.Counter {
-       err := p.createMeasure(name, labelNames...)
+       name, err := p.createMeasure(name, labelNames...)
        if err != nil && !errors.Is(err, schema.ErrGRPCAlreadyExists) {
                log.Error().Err(err).Msgf("Failure to createMeasure for Counter 
%s, labels: %v", name, labelNames)
        }
-       return nativeInstrument{}
+       return &Counter{
+               newMetricVec(name, p.scope),
+       }
 }
 
-// Gauge returns a no-op implementation of the Gauge interface.
+// Gauge returns a native implementation of the Gauge interface.
 func (p *provider) Gauge(name string, labelNames ...string) meter.Gauge {
-       err := p.createMeasure(name, labelNames...)
+       name, err := p.createMeasure(name, labelNames...)
        if err != nil && !errors.Is(err, schema.ErrGRPCAlreadyExists) {
                log.Error().Err(err).Msgf("Failure to createMeasure for Gauge 
%s, labels: %v", name, labelNames)
        }
-       return nativeInstrument{}
+       return &Gauge{
+               newMetricVec(name, p.scope),
+       }
 }
 
-// Histogram returns a no-op implementation of the Histogram interface.
-func (p *provider) Histogram(_ string, _ meter.Buckets, _ ...string) 
meter.Histogram {
-       return nativeInstrument{}
+// Histogram returns a native implementation of the Histogram interface.
+func (p *provider) Histogram(name string, _ meter.Buckets, _ ...string) 
meter.Histogram {
+       return &Histogram{
+               newMetricVec(name, p.scope),
+       }
 }
 
 func (p *provider) createNativeObservabilityGroup(ctx context.Context) error {
@@ -102,8 +110,8 @@ func (p *provider) createNativeObservabilityGroup(ctx 
context.Context) error {
        return p.metadata.GroupRegistry().CreateGroup(ctx, g)
 }
 
-func (p *provider) createMeasure(metric string, labels ...string) error {
-       tags, entityTags := p.getTags(labels)
+func (p *provider) createMeasure(metric string, labels ...string) (string, 
error) {
+       tags, entityTags := buildTags(p.scope, labels)
        _, err := 
p.metadata.MeasureRegistry().CreateMeasure(context.Background(), 
&databasev1.Measure{
                Metadata: &commonv1.Metadata{
                        Name:  metric,
@@ -121,16 +129,16 @@ func (p *provider) createMeasure(metric string, labels 
...string) error {
                Fields: []*databasev1.FieldSpec{
                        {
                                Name:              defaultFieldName,
-                               FieldType:         
databasev1.FieldType_FIELD_TYPE_INT,
+                               FieldType:         
databasev1.FieldType_FIELD_TYPE_FLOAT,
                                EncodingMethod:    
databasev1.EncodingMethod_ENCODING_METHOD_GORILLA,
                                CompressionMethod: 
databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD,
                        },
                },
        })
-       return err
+       return metric, err
 }
 
-func (p *provider) getTags(labels []string) ([]*databasev1.TagSpec, []string) {
+func buildTags(scope meter.Scope, labels []string) ([]*databasev1.TagSpec, 
[]string) {
        var tags []*databasev1.TagSpec
        var entityTags []string
        addTags := func(labels ...string) {
@@ -144,9 +152,31 @@ func (p *provider) getTags(labels []string) 
([]*databasev1.TagSpec, []string) {
                }
        }
        addTags(nodeNameTag)
-       addTags(labels...)
-       for label := range p.scope.GetLabels() {
+       for label := range scope.GetLabels() {
                addTags(label)
        }
+       addTags(labels...)
        return tags, entityTags
 }
+
+func buildTagValues(scope meter.Scope, labelValues ...string) 
[]*modelv1.TagValue {
+       var tagValues []*modelv1.TagValue
+       addTagValues := func(labelValues ...string) {
+               for _, value := range labelValues {
+                       tagValue := &modelv1.TagValue{
+                               Value: &modelv1.TagValue_Str{
+                                       Str: &modelv1.Str{
+                                               Value: value,
+                                       },
+                               },
+                       }
+                       tagValues = append(tagValues, tagValue)
+               }
+       }
+       addTagValues(standaloneNodeName)
+       for _, labelValue := range scope.GetLabels() {
+               addTagValues(labelValue)
+       }
+       addTagValues(labelValues...)
+       return tagValues
+}
diff --git a/pkg/meter/native/vec.go b/pkg/meter/native/vec.go
new file mode 100644
index 00000000..3b93fcdb
--- /dev/null
+++ b/pkg/meter/native/vec.go
@@ -0,0 +1,97 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package native provides a simple meter system for metrics. The metrics are 
aggregated by the meter provider.
+package native
+
+import (
+       "sync"
+       "time"
+
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/meter"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+)
+
+const (
+       writeTimeout = 5 * time.Second
+)
+
+type metricWithLabelValues struct {
+       labelValues []*modelv1.TagValue
+       seriesHash  []byte
+       metricValue float64
+}
+
+type metricVec struct {
+       scope       meter.Scope
+       metrics     map[string]metricWithLabelValues
+       measureName string
+       mutex       sync.Mutex
+}
+
+func newMetricVec(measureName string, scope meter.Scope) *metricVec {
+       n := &metricVec{
+               scope:       scope,
+               measureName: measureName,
+               metrics:     map[string]metricWithLabelValues{},
+       }
+       return n
+}
+
+func (n *metricVec) Inc(delta float64, labelValues ...string) {
+       n.mutex.Lock()
+       defer n.mutex.Unlock()
+       tagValues := buildTagValues(n.scope, labelValues...)
+       hash := seriesHash(tagValues)
+       key := string(hash)
+       v, exist := n.metrics[key]
+       if !exist {
+               v = metricWithLabelValues{
+                       labelValues: tagValues,
+                       seriesHash:  hash,
+               }
+       }
+       v.metricValue += delta
+       n.metrics[key] = v
+}
+
+func (n *metricVec) Delete(labelValues ...string) bool {
+       n.mutex.Lock()
+       defer n.mutex.Unlock()
+       key := string(seriesHash(buildTagValues(n.scope, labelValues...)))
+       delete(n.metrics, key)
+       return true
+}
+
+func (n *metricVec) Collect() (string, []metricWithLabelValues) {
+       n.mutex.Lock()
+       defer n.mutex.Unlock()
+       var metrics []metricWithLabelValues
+       for _, metric := range n.metrics {
+               metrics = append(metrics, metric)
+       }
+       return n.measureName, metrics
+}
+
+func seriesHash(tagValues []*modelv1.TagValue) []byte {
+       entities, err := pbv1.EntityValues(tagValues).ToEntity()
+       if err != nil {
+               log.Error().Err(err).Msg("Failed to convert tagValues to 
Entity")
+       }
+       return pbv1.HashEntity(entities)
+}

Reply via email to