This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch metrics
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/metrics by this push:
new fddd6499 Apply metrics to main components
fddd6499 is described below
commit fddd64995010c2e18c0d137594aadb580081c3f5
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Aug 22 10:17:16 2024 +0800
Apply metrics to main components
Signed-off-by: Gao Hongtao <[email protected]>
---
api/common/id.go | 26 ++-
banyand/internal/storage/index.go | 21 +-
banyand/internal/storage/index_test.go | 4 +-
banyand/internal/storage/metrics.go | 112 ++++++++++
banyand/internal/storage/rotation.go | 14 +-
banyand/internal/storage/rotation_test.go | 8 +-
banyand/internal/storage/segment.go | 58 +++--
banyand/internal/storage/storage.go | 4 +-
banyand/internal/storage/tsdb.go | 43 +++-
banyand/liaison/grpc/measure.go | 40 +++-
banyand/liaison/grpc/metrics.go | 66 ++++++
banyand/liaison/grpc/property.go | 58 +++++
banyand/liaison/grpc/registry.go | 338 ++++++++++++++++++++++++++++--
banyand/liaison/grpc/registry_test.go | 4 +-
banyand/liaison/grpc/server.go | 33 ++-
banyand/liaison/grpc/stream.go | 40 +++-
banyand/measure/merger_test.go | 16 +-
banyand/measure/metadata.go | 14 +-
banyand/measure/metrics.go | 8 +-
banyand/measure/part.go | 5 +-
banyand/queue/sub/server.go | 39 +++-
banyand/queue/sub/sub.go | 24 +++
banyand/stream/flusher.go | 82 +++++---
banyand/stream/index.go | 7 +-
banyand/stream/introducer.go | 6 +
banyand/stream/merger.go | 46 ++--
banyand/stream/merger_test.go | 10 +-
banyand/stream/metadata.go | 15 +-
banyand/stream/metrics.go | 322 +++++++++++++++++++++++++++-
banyand/stream/part.go | 5 +-
banyand/stream/query_test.go | 2 +-
banyand/stream/tstable.go | 19 +-
banyand/stream/tstable_test.go | 4 +-
go.mod | 1 +
go.sum | 2 +
pkg/cmdsetup/data.go | 4 +-
pkg/cmdsetup/liaison.go | 8 +-
pkg/cmdsetup/standalone.go | 6 +-
pkg/index/index.go | 2 +-
pkg/index/inverted/inverted.go | 20 +-
pkg/index/inverted/metrics.go | 152 ++++++++++++++
pkg/meter/meter.go | 9 +
42 files changed, 1491 insertions(+), 206 deletions(-)
diff --git a/api/common/id.go b/api/common/id.go
index 4bb0d3ec..8ed29316 100644
--- a/api/common/id.go
+++ b/api/common/id.go
@@ -64,24 +64,34 @@ type Position struct {
Segment string
}
-// LabelNames returns the label names of Position.
-func LabelNames() []string {
- return []string{"module", "database", "shard", "seg"}
+// DBLabelNames returns the label names of Position in the database level.
+func DBLabelNames() []string {
+ return []string{"group"}
+}
+
+// SegLabelNames returns the label names of Position in the segment level.
+func SegLabelNames() []string {
+ return []string{"seg"}
}
// ShardLabelNames returns the label names of Position. It is used for shard level metrics.
func ShardLabelNames() []string {
- return []string{"module", "database", "shard"}
+ return []string{"seg", "shard"}
+}
+
+// DBLabelValues returns the label values of Position in the database level.
+func (p Position) DBLabelValues() []string {
+ return []string{p.Database}
}
-// LabelValues returns the label values of Position.
-func (p Position) LabelValues() []string {
- return []string{p.Module, p.Database, p.Shard, p.Segment}
+// SegLabelValues returns the label values of Position in the segment level.
+func (p Position) SegLabelValues() []string {
+ return []string{p.Segment}
}
// ShardLabelValues returns the label values of Position. It is used for shard level metrics.
func (p Position) ShardLabelValues() []string {
- return []string{p.Module, p.Database, p.Shard}
+ return []string{p.Segment, p.Shard}
}
// SetPosition sets a position returned from fn to attach it to ctx, then returns a new context.
diff --git a/banyand/internal/storage/index.go
b/banyand/internal/storage/index.go
index d3b059ae..5a8e4900 100644
--- a/banyand/internal/storage/index.go
+++ b/banyand/internal/storage/index.go
@@ -43,20 +43,28 @@ func (s *segment[T, O]) Lookup(ctx context.Context, series
[]*pbv1.Series) (pbv1
}
type seriesIndex struct {
- store index.SeriesStore
- l *logger.Logger
+ store index.SeriesStore
+ l *logger.Logger
+ metrics *inverted.Metrics
+ p common.Position
}
-func newSeriesIndex(ctx context.Context, root string, flushTimeoutSeconds
int64) (*seriesIndex, error) {
+func newSeriesIndex(ctx context.Context, root string, flushTimeoutSeconds
int64, metrics *inverted.Metrics) (*seriesIndex, error) {
si := &seriesIndex{
l: logger.Fetch(ctx, "series_index"),
+ p: common.GetPosition(ctx),
}
- var err error
- if si.store, err = inverted.NewStore(inverted.StoreOpts{
+ opts := inverted.StoreOpts{
Path: path.Join(root, "sidx"),
Logger: si.l,
BatchWaitSec: flushTimeoutSeconds,
- }); err != nil {
+ }
+ if metrics != nil {
+ opts.Metrics = metrics
+ si.metrics = opts.Metrics
+ }
+ var err error
+ if si.store, err = inverted.NewStore(opts); err != nil {
return nil, err
}
return si, nil
@@ -270,5 +278,6 @@ func (s *seriesIndex) Search(ctx context.Context, series
[]*pbv1.Series, opts In
}
func (s *seriesIndex) Close() error {
+ s.metrics.DeleteAll(s.p.SegLabelValues()...)
return s.store.Close()
}
diff --git a/banyand/internal/storage/index_test.go
b/banyand/internal/storage/index_test.go
index 2b196ec7..304bcff9 100644
--- a/banyand/internal/storage/index_test.go
+++ b/banyand/internal/storage/index_test.go
@@ -36,7 +36,7 @@ import (
func TestSeriesIndex_Primary(t *testing.T) {
ctx := context.Background()
path, fn := setUp(require.New(t))
- si, err := newSeriesIndex(ctx, path, 0)
+ si, err := newSeriesIndex(ctx, path, 0, nil)
require.NoError(t, err)
defer func() {
require.NoError(t, si.Close())
@@ -66,7 +66,7 @@ func TestSeriesIndex_Primary(t *testing.T) {
require.NoError(t, si.Write(docs))
// Restart the index
require.NoError(t, si.Close())
- si, err = newSeriesIndex(ctx, path, 0)
+ si, err = newSeriesIndex(ctx, path, 0, nil)
require.NoError(t, err)
tests := []struct {
name string
diff --git a/banyand/internal/storage/metrics.go
b/banyand/internal/storage/metrics.go
new file mode 100644
index 00000000..54f9d81a
--- /dev/null
+++ b/banyand/internal/storage/metrics.go
@@ -0,0 +1,112 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package storage
+
+import (
+ "github.com/apache/skywalking-banyandb/banyand/observability"
+ "github.com/apache/skywalking-banyandb/pkg/meter"
+)
+
+type metrics struct {
+ lastTickTime meter.Gauge
+ totalSegRefs meter.Gauge
+
+ totalRotationStarted meter.Counter
+ totalRotationFinished meter.Counter
+ totalRotationErr meter.Counter
+
+ totalRetentionStarted meter.Counter
+ totalRetentionFinished meter.Counter
+ totalRetentionHasData meter.Counter
+ totalRetentionErr meter.Counter
+ totalRetentionHasDataLatency meter.Counter
+}
+
+func newMetrics(factory *observability.Factory) *metrics {
+ if factory == nil {
+ return nil
+ }
+ return &metrics{
+ lastTickTime:
factory.NewGauge("last_tick_time"),
+ totalSegRefs:
factory.NewGauge("total_segment_refs"),
+ totalRotationStarted:
factory.NewCounter("total_rotation_started"),
+ totalRotationFinished:
factory.NewCounter("total_rotation_finished"),
+ totalRotationErr:
factory.NewCounter("total_rotation_err"),
+ totalRetentionStarted:
factory.NewCounter("total_retention_started"),
+ totalRetentionFinished:
factory.NewCounter("total_retention_finished"),
+ totalRetentionErr:
factory.NewCounter("total_retention_err"),
+ totalRetentionHasDataLatency:
factory.NewCounter("total_retention_has_data_latency"),
+ totalRetentionHasData:
factory.NewCounter("total_retention_has_data"),
+ }
+}
+
+func (d *database[T, O]) incTotalRotationStarted(delta int) {
+ if d.metrics == nil {
+ return
+ }
+ d.metrics.totalRotationStarted.Inc(float64(delta))
+}
+
+func (d *database[T, O]) incTotalRotationFinished(delta int) {
+ if d.metrics == nil {
+ return
+ }
+ d.metrics.totalRotationFinished.Inc(float64(delta))
+}
+
+func (d *database[T, O]) incTotalRotationErr(delta int) {
+ if d.metrics == nil {
+ return
+ }
+ d.metrics.totalRotationErr.Inc(float64(delta))
+}
+
+func (d *database[T, O]) incTotalRetentionStarted(delta int) {
+ if d.metrics == nil {
+ return
+ }
+ d.metrics.totalRetentionStarted.Inc(float64(delta))
+}
+
+func (d *database[T, O]) incTotalRetentionFinished(delta int) {
+ if d.metrics == nil {
+ return
+ }
+ d.metrics.totalRetentionFinished.Inc(float64(delta))
+}
+
+func (d *database[T, O]) incTotalRetentionHasData(delta int) {
+ if d.metrics == nil {
+ return
+ }
+ d.metrics.totalRetentionHasData.Inc(float64(delta))
+}
+
+func (d *database[T, O]) incTotalRetentionErr(delta int) {
+ if d.metrics == nil {
+ return
+ }
+ d.metrics.totalRetentionErr.Inc(float64(delta))
+}
+
+func (d *database[T, O]) incTotalRetentionHasDataLatency(delta float64) {
+ if d.metrics == nil {
+ return
+ }
+ d.metrics.totalRetentionHasDataLatency.Inc(delta)
+}
diff --git a/banyand/internal/storage/rotation.go
b/banyand/internal/storage/rotation.go
index 9850dc62..ab18daec 100644
--- a/banyand/internal/storage/rotation.go
+++ b/banyand/internal/storage/rotation.go
@@ -68,11 +68,14 @@ func (d *database[T, O]) startRotationTask() error {
if gap <= 0 || gap > newSegmentTimeGap {
return
}
+ d.incTotalRotationStarted(1)
+ defer d.incTotalRotationFinished(1)
start :=
d.segmentController.opts.SegmentInterval.nextTime(t)
d.logger.Info().Time("segment_start",
start).Time("event_time", t).Msg("create new segment")
_, err :=
d.segmentController.create(start)
if err != nil {
d.logger.Error().Err(err).Msgf("failed to create new segment.")
+ d.incTotalRotationErr(1)
}
}()
}(ts)
@@ -110,9 +113,18 @@ func (rc *retentionTask[T, O]) run(now time.Time, l
*logger.Logger) bool {
<-rc.running
}()
+ rc.database.incTotalRetentionStarted(1)
+ defer rc.database.incTotalRetentionFinished(1)
deadline := now.Add(-rc.duration)
- if err := rc.database.segmentController.remove(deadline); err != nil {
+ start := time.Now()
+ hasData, err := rc.database.segmentController.remove(deadline)
+ if hasData {
+ rc.database.incTotalRetentionHasData(1)
+
rc.database.incTotalRetentionHasDataLatency(time.Since(start).Seconds())
+ }
+ if err != nil {
l.Error().Err(err)
+ rc.database.incTotalRetentionErr(1)
}
return true
}
diff --git a/banyand/internal/storage/rotation_test.go
b/banyand/internal/storage/rotation_test.go
index 69fdfab2..e9b1c9da 100644
--- a/banyand/internal/storage/rotation_test.go
+++ b/banyand/internal/storage/rotation_test.go
@@ -26,6 +26,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/test"
@@ -140,7 +141,6 @@ func setUpDB(t *testing.T) (*database[*MockTSTable, any],
timestamp.MockClock, *
TTL: IntervalRule{Unit: DAY, Num: 3},
ShardNum: 1,
TSTableCreator: MockTSTableCreator,
- MetricsCreator: MockMetricsCreator,
}
ctx := context.Background()
mc := timestamp.NewMockClock()
@@ -169,6 +169,8 @@ func (m *MockTSTable) Close() error {
return nil
}
+func (m *MockTSTable) Collect(_ Metrics) {}
+
var MockTSTableCreator = func(_ fs.FileSystem, _ string, _ common.Position,
_ *logger.Logger, _ timestamp.TimeRange, _, _ any,
) (*MockTSTable, error) {
@@ -179,4 +181,8 @@ type MockMetrics struct{}
func (m *MockMetrics) DeleteAll() {}
+func (m *MockMetrics) Factory() *observability.Factory {
+ return nil
+}
+
var MockMetricsCreator = func(_ common.Position) Metrics { return
&MockMetrics{} }
diff --git a/banyand/internal/storage/segment.go
b/banyand/internal/storage/segment.go
index 1b184794..d3e0e214 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -33,6 +33,7 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/convert"
+ "github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
@@ -57,14 +58,19 @@ type segment[T TSTable, O any] struct {
}
func (sc *segmentController[T, O]) openSegment(ctx context.Context, startTime,
endTime time.Time, path, suffix string,
- p common.Position, opts TSDBOpts[T, O],
+ opts TSDBOpts[T, O],
) (s *segment[T, O], err error) {
suffixInteger, err := strconv.Atoi(suffix)
if err != nil {
return nil, err
}
+ p := common.GetPosition(ctx)
+ p.Segment = suffix
+ ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
+ return p
+ })
id := generateSegID(sc.opts.SegmentInterval.Unit, suffixInteger)
- sir, err := newSeriesIndex(ctx, path,
sc.opts.SeriesIndexFlushTimeoutSeconds)
+ sir, err := newSeriesIndex(ctx, path,
sc.opts.SeriesIndexFlushTimeoutSeconds, sc.indexMetrics)
if err != nil {
return nil, errors.Wrap(errOpenDatabase,
errors.WithMessage(err, "create series index controller failed").Error())
}
@@ -195,28 +201,30 @@ func (s *segment[T, O]) String() string {
}
type segmentController[T TSTable, O any] struct {
- clock timestamp.Clock
- metrics Metrics
- l *logger.Logger
- position common.Position
- location string
- lst []*segment[T, O]
- opts TSDBOpts[T, O]
- deadline atomic.Int64
+ clock timestamp.Clock
+ metrics Metrics
+ l *logger.Logger
+ indexMetrics *inverted.Metrics
+ position common.Position
+ location string
+ lst []*segment[T, O]
+ opts TSDBOpts[T, O]
+ deadline atomic.Int64
sync.RWMutex
}
func newSegmentController[T TSTable, O any](ctx context.Context, location
string,
- l *logger.Logger, opts TSDBOpts[T, O], metrics Metrics,
+ l *logger.Logger, opts TSDBOpts[T, O], indexMetrics *inverted.Metrics,
metrics Metrics,
) *segmentController[T, O] {
clock, _ := timestamp.GetClock(ctx)
return &segmentController[T, O]{
- location: location,
- opts: opts,
- l: l,
- clock: clock,
- position: common.GetPosition(ctx),
- metrics: metrics,
+ location: location,
+ opts: opts,
+ l: l,
+ clock: clock,
+ position: common.GetPosition(ctx),
+ metrics: metrics,
+ indexMetrics: indexMetrics,
}
}
@@ -367,9 +375,10 @@ func (sc *segmentController[T, O]) sortLst() {
func (sc *segmentController[T, O]) load(start, end time.Time, root string)
(seg *segment[T, O], err error) {
suffix := sc.format(start)
segPath := path.Join(root, fmt.Sprintf(segTemplate, suffix))
- p := sc.position
- p.Segment = suffix
- seg, err = sc.openSegment(context.WithValue(context.Background(),
logger.ContextKey, sc.l), start, end, segPath, suffix, p, sc.opts)
+ ctx := common.SetPosition(context.WithValue(context.Background(),
logger.ContextKey, sc.l), func(_ common.Position) common.Position {
+ return sc.position
+ })
+ seg, err = sc.openSegment(ctx, start, end, segPath, suffix, sc.opts)
if err != nil {
return nil, err
}
@@ -378,9 +387,10 @@ func (sc *segmentController[T, O]) load(start, end
time.Time, root string) (seg
return seg, nil
}
-func (sc *segmentController[T, O]) remove(deadline time.Time) (err error) {
+func (sc *segmentController[T, O]) remove(deadline time.Time) (hasSegment
bool, err error) {
for _, s := range sc.segments() {
if s.Before(deadline) {
+ hasSegment = true
s.delete()
sc.Lock()
sc.removeSeg(s.id)
@@ -389,7 +399,7 @@ func (sc *segmentController[T, O]) remove(deadline
time.Time) (err error) {
}
s.DecRef()
}
- return err
+ return hasSegment, err
}
func (sc *segmentController[T, O]) removeSeg(segID segmentID) {
@@ -413,7 +423,9 @@ func (sc *segmentController[T, O]) close() {
s.DecRef()
}
sc.lst = sc.lst[:0]
- sc.metrics.DeleteAll()
+ if sc.metrics != nil {
+ sc.metrics.DeleteAll()
+ }
}
func loadSegments[T TSTable, O any](root, prefix string, parser
*segmentController[T, O], intervalRule IntervalRule, loadFn func(start, end
time.Time) error) error {
diff --git a/banyand/internal/storage/storage.go
b/banyand/internal/storage/storage.go
index d1bb070c..4bcc43d8 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -105,6 +105,7 @@ type Segment[T TSTable, O any] interface {
// TSTable is time series table.
type TSTable interface {
io.Closer
+ Collect(Metrics)
}
// TSTableCreator creates a TSTable.
@@ -117,9 +118,6 @@ type Metrics interface {
DeleteAll()
}
-// MetricsCreator creates a Metrics.
-type MetricsCreator func(position common.Position) Metrics
-
// IntervalUnit denotes the unit of a time point.
type IntervalUnit int
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index 03d8b062..d1539d18 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -28,7 +28,9 @@ import (
"github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
@@ -50,13 +52,14 @@ const (
// TSDBOpts wraps options to create a tsdb.
type TSDBOpts[T TSTable, O any] struct {
Option O
+ TableMetrics Metrics
TSTableCreator TSTableCreator[T, O]
- MetricsCreator MetricsCreator
+ StorageMetricsFactory *observability.Factory
Location string
SegmentInterval IntervalRule
TTL IntervalRule
- ShardNum uint32
SeriesIndexFlushTimeoutSeconds int64
+ ShardNum uint32
}
type (
@@ -73,10 +76,11 @@ type database[T TSTable, O any] struct {
scheduler *timestamp.Scheduler
tsEventCh chan int64
segmentController *segmentController[T, O]
- p common.Position
- location string
- opts TSDBOpts[T, O]
- latestTickTime atomic.Int64
+ *metrics
+ p common.Position
+ location string
+ opts TSDBOpts[T, O]
+ latestTickTime atomic.Int64
sync.RWMutex
rotationProcessOn atomic.Bool
}
@@ -109,6 +113,11 @@ func OpenTSDB[T TSTable, O any](ctx context.Context, opts
TSDBOpts[T, O]) (TSDB[
l := logger.Fetch(ctx, p.Database)
clock, _ := timestamp.GetClock(ctx)
scheduler := timestamp.NewScheduler(l, clock)
+
+ var indexMetrics *inverted.Metrics
+ if opts.StorageMetricsFactory != nil {
+ indexMetrics = inverted.NewMetrics(opts.StorageMetricsFactory,
common.SegLabelNames()...)
+ }
db := &database[T, O]{
location: location,
scheduler: scheduler,
@@ -117,7 +126,8 @@ func OpenTSDB[T TSTable, O any](ctx context.Context, opts
TSDBOpts[T, O]) (TSDB[
tsEventCh: make(chan int64),
p: p,
segmentController: newSegmentController[T](ctx, location,
- l, opts, opts.MetricsCreator(p)),
+ l, opts, indexMetrics, opts.TableMetrics),
+ metrics: newMetrics(opts.StorageMetricsFactory),
}
db.logger.Info().Str("path", opts.Location).Msg("initialized")
lockPath := filepath.Join(opts.Location, lockFilename)
@@ -129,6 +139,7 @@ func OpenTSDB[T TSTable, O any](ctx context.Context, opts
TSDBOpts[T, O]) (TSDB[
if err := db.segmentController.open(); err != nil {
return nil, err
}
+ observability.MetricsCollector.Register(location, db.collect)
return db, db.startRotationTask()
}
@@ -140,6 +151,24 @@ func (d *database[T, O]) SelectSegments(timeRange
timestamp.TimeRange) []Segment
return d.segmentController.selectSegments(timeRange)
}
+func (d *database[T, O]) collect() {
+ if d.metrics == nil {
+ return
+ }
+ d.metrics.lastTickTime.Set(float64(d.latestTickTime.Load()))
+ refCount := int32(0)
+ ss := d.segmentController.segments()
+ for _, s := range ss {
+ for _, t := range s.Tables() {
+ t.Collect(d.segmentController.metrics)
+ }
+ s.index.store.CollectMetrics(s.index.p.SegLabelValues()...)
+ s.DecRef()
+ refCount += atomic.LoadInt32(&s.refCount)
+ }
+ d.totalSegRefs.Set(float64(refCount))
+}
+
type walkFn func(suffix string) error
func walkDir(root, prefix string, wf walkFn) error {
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index 392fedca..de1a860d 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -43,12 +43,13 @@ import (
type measureService struct {
measurev1.UnimplementedMeasureServiceServer
- *discoveryService
- sampled *logger.Logger
ingestionAccessLog accesslog.Log
pipeline queue.Client
broadcaster queue.Client
- writeTimeout time.Duration
+ *discoveryService
+ sampled *logger.Logger
+ metrics *metrics
+ writeTimeout time.Duration
}
func (ms *measureService) setLogger(log *logger.Logger) {
@@ -65,13 +66,24 @@ func (ms *measureService) activeIngestionAccessLog(root
string) (err error) {
func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer)
error {
reply := func(metadata *commonv1.Metadata, status modelv1.Status,
messageId uint64, measure measurev1.MeasureService_WriteServer, logger
*logger.Logger) {
+ if status != modelv1.Status_STATUS_SUCCEED {
+ ms.metrics.totalStreamMsgReceivedErr.Inc(1,
metadata.Group, "measure", "write")
+ }
+ ms.metrics.totalStreamMsgReceived.Inc(1, metadata.Group,
"measure", "write")
if errResp := measure.Send(&measurev1.WriteResponse{Metadata:
metadata, Status: status, MessageId: messageId}); errResp != nil {
logger.Debug().Err(errResp).Msg("failed to send
response")
+ ms.metrics.totalStreamMsgSentErr.Inc(1, metadata.Group,
"measure", "write")
}
}
ctx := measure.Context()
publisher := ms.pipeline.NewBatchPublisher(ms.writeTimeout)
- defer publisher.Close()
+ ms.metrics.totalStreamStarted.Inc(1, "measure", "write")
+ start := time.Now()
+ defer func() {
+ publisher.Close()
+ ms.metrics.totalStreamFinished.Inc(1, "measure", "write")
+ ms.metrics.totalStreamLatency.Inc(time.Since(start).Seconds(),
"measure", "write")
+ }()
for {
select {
case <-ctx.Done():
@@ -83,9 +95,12 @@ func (ms *measureService) Write(measure
measurev1.MeasureService_WriteServer) er
return nil
}
if err != nil {
- ms.sampled.Error().Err(err).Stringer("written",
writeRequest).Msg("failed to receive message")
+ if !errors.Is(err, context.DeadlineExceeded) &&
!errors.Is(err, context.Canceled) {
+ ms.sampled.Error().Err(err).Stringer("written",
writeRequest).Msg("failed to receive message")
+ }
return err
}
+ ms.metrics.totalStreamMsgReceived.Inc(1,
writeRequest.Metadata.Group, "measure", "write")
if errTime :=
timestamp.CheckPb(writeRequest.DataPoint.Timestamp); errTime != nil {
ms.sampled.Error().Err(errTime).Stringer("written",
writeRequest).Msg("the data point time is invalid")
reply(writeRequest.GetMetadata(),
modelv1.Status_STATUS_INVALID_TIMESTAMP, writeRequest.GetMessageId(), measure,
ms.sampled)
@@ -140,13 +155,26 @@ func (ms *measureService) Write(measure
measurev1.MeasureService_WriteServer) er
reply(writeRequest.GetMetadata(),
modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure,
ms.sampled)
continue
}
- reply(nil, modelv1.Status_STATUS_SUCCEED,
writeRequest.GetMessageId(), measure, ms.sampled)
+ reply(writeRequest.GetMetadata(),
modelv1.Status_STATUS_SUCCEED, writeRequest.GetMessageId(), measure, ms.sampled)
}
}
var emptyMeasureQueryResponse = &measurev1.QueryResponse{DataPoints:
make([]*measurev1.DataPoint, 0)}
func (ms *measureService) Query(_ context.Context, req
*measurev1.QueryRequest) (resp *measurev1.QueryResponse, err error) {
+ for _, g := range req.Groups {
+ ms.metrics.totalStarted.Inc(1, g, "measure", "query")
+ }
+ start := time.Now()
+ defer func() {
+ for _, g := range req.Groups {
+ ms.metrics.totalFinished.Inc(1, g, "measure", "query")
+ if err != nil {
+ ms.metrics.totalErr.Inc(1, g, "measure",
"query")
+ }
+
ms.metrics.totalLatency.Inc(time.Since(start).Seconds(), g, "measure", "query")
+ }
+ }()
if err = timestamp.CheckTimeRange(req.GetTimeRange()); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "%v is invalid
:%s", req.GetTimeRange(), err)
}
diff --git a/banyand/liaison/grpc/metrics.go b/banyand/liaison/grpc/metrics.go
new file mode 100644
index 00000000..dee5a378
--- /dev/null
+++ b/banyand/liaison/grpc/metrics.go
@@ -0,0 +1,66 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+ "github.com/apache/skywalking-banyandb/banyand/observability"
+ "github.com/apache/skywalking-banyandb/pkg/meter"
+)
+
+type metrics struct {
+ totalStarted meter.Counter
+ totalFinished meter.Counter
+ totalErr meter.Counter
+ totalLatency meter.Counter
+
+ totalStreamStarted meter.Counter
+ totalStreamFinished meter.Counter
+ totalStreamErr meter.Counter
+ totalStreamLatency meter.Counter
+
+ totalStreamMsgReceived meter.Counter
+ totalStreamMsgReceivedErr meter.Counter
+ totalStreamMsgSent meter.Counter
+ totalStreamMsgSentErr meter.Counter
+
+ totalRegistryStarted meter.Counter
+ totalRegistryFinished meter.Counter
+ totalRegistryErr meter.Counter
+ totalRegistryLatency meter.Counter
+}
+
+func newMetrics(factory *observability.Factory) *metrics {
+ return &metrics{
+ totalStarted: factory.NewCounter("total_started",
"group", "service", "method"),
+ totalFinished: factory.NewCounter("total_finished",
"group", "service", "method"),
+ totalErr: factory.NewCounter("total_err",
"group", "service", "method"),
+ totalLatency: factory.NewCounter("total_latency",
"group", "service", "method"),
+ totalStreamStarted:
factory.NewCounter("total_stream_started", "service", "method"),
+ totalStreamFinished:
factory.NewCounter("total_stream_finished", "service", "method"),
+ totalStreamErr:
factory.NewCounter("total_stream_err", "service", "method"),
+ totalStreamLatency:
factory.NewCounter("total_stream_latency", "service", "method"),
+ totalStreamMsgReceived:
factory.NewCounter("total_stream_msg_received", "group", "service", "method"),
+ totalStreamMsgReceivedErr:
factory.NewCounter("total_stream_msg_received_err", "group", "service",
"method"),
+ totalStreamMsgSent:
factory.NewCounter("total_stream_msg_sent", "group", "service", "method"),
+ totalStreamMsgSentErr:
factory.NewCounter("total_stream_msg_sent_err", "group", "service", "method"),
+ totalRegistryStarted:
factory.NewCounter("total_registry_started", "group", "service", "method"),
+ totalRegistryFinished:
factory.NewCounter("total_registry_finished", "group", "service", "method"),
+ totalRegistryErr:
factory.NewCounter("total_registry_err", "group", "service", "method"),
+ totalRegistryLatency:
factory.NewCounter("total_registry_latency", "group", "service", "method"),
+ }
+}
diff --git a/banyand/liaison/grpc/property.go b/banyand/liaison/grpc/property.go
index 6c0bcbc5..ca4fa61f 100644
--- a/banyand/liaison/grpc/property.go
+++ b/banyand/liaison/grpc/property.go
@@ -15,10 +15,28 @@
// specific language governing permissions and limitations
// under the License.
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
package grpc
import (
"context"
+ "time"
propertyv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata"
@@ -27,19 +45,36 @@ import (
type propertyServer struct {
propertyv1.UnimplementedPropertyServiceServer
schemaRegistry metadata.Repo
+ metrics *metrics
}
func (ps *propertyServer) Apply(ctx context.Context, req
*propertyv1.ApplyRequest) (*propertyv1.ApplyResponse, error) {
+ g := req.Property.Metadata.Container.Group
+ ps.metrics.totalStarted.Inc(1, g, "property", "apply")
+ start := time.Now()
+ defer func() {
+ ps.metrics.totalFinished.Inc(1, g, "property", "apply")
+ ps.metrics.totalLatency.Inc(time.Since(start).Seconds(), g,
"property", "apply")
+ }()
created, tagsNum, leaseID, err :=
ps.schemaRegistry.PropertyRegistry().ApplyProperty(ctx, req.Property,
req.Strategy)
if err != nil {
+ ps.metrics.totalErr.Inc(1, g, "property", "apply")
return nil, err
}
return &propertyv1.ApplyResponse{Created: created, TagsNum: tagsNum,
LeaseId: leaseID}, nil
}
func (ps *propertyServer) Delete(ctx context.Context, req
*propertyv1.DeleteRequest) (*propertyv1.DeleteResponse, error) {
+ g := req.Metadata.Container.Group
+ ps.metrics.totalStarted.Inc(1, g, "property", "delete")
+ start := time.Now()
+ defer func() {
+ ps.metrics.totalFinished.Inc(1, g, "property", "delete")
+ ps.metrics.totalLatency.Inc(time.Since(start).Seconds(), g,
"property", "delete")
+ }()
ok, tagsNum, err :=
ps.schemaRegistry.PropertyRegistry().DeleteProperty(ctx, req.GetMetadata(),
req.Tags)
if err != nil {
+ ps.metrics.totalErr.Inc(1, g, "property", "delete")
return nil, err
}
return &propertyv1.DeleteResponse{
@@ -49,8 +84,16 @@ func (ps *propertyServer) Delete(ctx context.Context, req
*propertyv1.DeleteRequ
}
func (ps *propertyServer) Get(ctx context.Context, req *propertyv1.GetRequest)
(*propertyv1.GetResponse, error) {
+ g := req.Metadata.Container.Group
+ ps.metrics.totalStarted.Inc(1, g, "property", "get")
+ start := time.Now()
+ defer func() {
+ ps.metrics.totalFinished.Inc(1, g, "property", "get")
+ ps.metrics.totalLatency.Inc(time.Since(start).Seconds(), g,
"property", "get")
+ }()
entity, err := ps.schemaRegistry.PropertyRegistry().GetProperty(ctx,
req.GetMetadata(), req.GetTags())
if err != nil {
+ ps.metrics.totalErr.Inc(1, g, "property", "get")
return nil, err
}
return &propertyv1.GetResponse{
@@ -59,8 +102,16 @@ func (ps *propertyServer) Get(ctx context.Context, req
*propertyv1.GetRequest) (
}
func (ps *propertyServer) List(ctx context.Context, req
*propertyv1.ListRequest) (*propertyv1.ListResponse, error) {
+ g := req.Container.Group
+ ps.metrics.totalStarted.Inc(1, g, "property", "list")
+ start := time.Now()
+ defer func() {
+ ps.metrics.totalFinished.Inc(1, g, "property", "list")
+ ps.metrics.totalLatency.Inc(time.Since(start).Seconds(), g,
"property", "list")
+ }()
entities, err := ps.schemaRegistry.PropertyRegistry().ListProperty(ctx,
req.GetContainer(), req.Ids, req.Tags)
if err != nil {
+ ps.metrics.totalErr.Inc(1, g, "property", "list")
return nil, err
}
return &propertyv1.ListResponse{
@@ -69,8 +120,15 @@ func (ps *propertyServer) List(ctx context.Context, req
*propertyv1.ListRequest)
}
func (ps *propertyServer) KeepAlive(ctx context.Context, req
*propertyv1.KeepAliveRequest) (*propertyv1.KeepAliveResponse, error) {
+ ps.metrics.totalStarted.Inc(1, "", "property", "keep_alive")
+ start := time.Now()
+ defer func() {
+ ps.metrics.totalFinished.Inc(1, "", "property", "keep_alive")
+ ps.metrics.totalLatency.Inc(time.Since(start).Seconds(), "",
"property", "keep_alive")
+ }()
err := ps.schemaRegistry.PropertyRegistry().KeepAlive(ctx,
req.GetLeaseId())
if err != nil {
+ ps.metrics.totalErr.Inc(1, "", "property", "keep_alive")
return nil, err
}
return &propertyv1.KeepAliveResponse{}, nil
diff --git a/banyand/liaison/grpc/registry.go b/banyand/liaison/grpc/registry.go
index 511d4e9b..ef3a59e7 100644
--- a/banyand/liaison/grpc/registry.go
+++ b/banyand/liaison/grpc/registry.go
@@ -20,6 +20,7 @@ package grpc
import (
"context"
"errors"
+ "time"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -33,13 +34,22 @@ import (
type streamRegistryServer struct {
databasev1.UnimplementedStreamRegistryServiceServer
schemaRegistry metadata.Repo
+ metrics *metrics
}
func (rs *streamRegistryServer) Create(ctx context.Context,
req *databasev1.StreamRegistryServiceCreateRequest,
) (*databasev1.StreamRegistryServiceCreateResponse, error) {
+ g := req.Stream.Metadata.Group
+ rs.metrics.totalRegistryStarted.Inc(1, g, "stream", "create")
+ start := time.Now()
+ defer func() {
+ rs.metrics.totalRegistryFinished.Inc(1, g, "stream", "create")
+
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "stream",
"create")
+ }()
modRevision, err :=
rs.schemaRegistry.StreamRegistry().CreateStream(ctx, req.GetStream())
if err != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "stream", "create")
return nil, err
}
return &databasev1.StreamRegistryServiceCreateResponse{
@@ -50,8 +60,16 @@ func (rs *streamRegistryServer) Create(ctx context.Context,
func (rs *streamRegistryServer) Update(ctx context.Context,
req *databasev1.StreamRegistryServiceUpdateRequest,
) (*databasev1.StreamRegistryServiceUpdateResponse, error) {
+ g := req.Stream.Metadata.Group
+ rs.metrics.totalRegistryStarted.Inc(1, g, "stream", "update")
+ start := time.Now()
+ defer func() {
+ rs.metrics.totalRegistryFinished.Inc(1, g, "stream", "update")
+
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "stream",
"update")
+ }()
modRevision, err :=
rs.schemaRegistry.StreamRegistry().UpdateStream(ctx, req.GetStream())
if err != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "stream", "update")
return nil, err
}
return &databasev1.StreamRegistryServiceUpdateResponse{
@@ -62,8 +80,16 @@ func (rs *streamRegistryServer) Update(ctx context.Context,
func (rs *streamRegistryServer) Delete(ctx context.Context,
req *databasev1.StreamRegistryServiceDeleteRequest,
) (*databasev1.StreamRegistryServiceDeleteResponse, error) {
+ g := req.Metadata.Group
+ rs.metrics.totalRegistryStarted.Inc(1, g, "stream", "delete")
+ start := time.Now()
+ defer func() {
+ rs.metrics.totalRegistryFinished.Inc(1, g, "stream", "delete")
+
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "stream",
"delete")
+ }()
ok, err := rs.schemaRegistry.StreamRegistry().DeleteStream(ctx,
req.GetMetadata())
if err != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "stream", "delete")
return nil, err
}
return &databasev1.StreamRegistryServiceDeleteResponse{
@@ -74,8 +100,16 @@ func (rs *streamRegistryServer) Delete(ctx context.Context,
func (rs *streamRegistryServer) Get(ctx context.Context,
req *databasev1.StreamRegistryServiceGetRequest,
) (*databasev1.StreamRegistryServiceGetResponse, error) {
+ g := req.Metadata.Group
+ rs.metrics.totalRegistryStarted.Inc(1, g, "stream", "get")
+ start := time.Now()
+ defer func() {
+ rs.metrics.totalRegistryFinished.Inc(1, g, "stream", "get")
+
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "stream",
"get")
+ }()
entity, err := rs.schemaRegistry.StreamRegistry().GetStream(ctx,
req.GetMetadata())
if err != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "stream", "get")
return nil, err
}
return &databasev1.StreamRegistryServiceGetResponse{
@@ -86,8 +120,16 @@ func (rs *streamRegistryServer) Get(ctx context.Context,
func (rs *streamRegistryServer) List(ctx context.Context,
req *databasev1.StreamRegistryServiceListRequest,
) (*databasev1.StreamRegistryServiceListResponse, error) {
+ g := req.Group
+ rs.metrics.totalRegistryStarted.Inc(1, g, "stream", "list")
+ start := time.Now()
+ defer func() {
+ rs.metrics.totalRegistryFinished.Inc(1, g, "stream", "list")
+
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "stream",
"list")
+ }()
entities, err := rs.schemaRegistry.StreamRegistry().ListStream(ctx,
schema.ListOpt{Group: req.GetGroup()})
if err != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "stream", "list")
return nil, err
}
return &databasev1.StreamRegistryServiceListResponse{
@@ -96,6 +138,13 @@ func (rs *streamRegistryServer) List(ctx context.Context,
}
func (rs *streamRegistryServer) Exist(ctx context.Context, req
*databasev1.StreamRegistryServiceExistRequest)
(*databasev1.StreamRegistryServiceExistResponse, error) {
+ g := req.Metadata.Group
+ rs.metrics.totalRegistryStarted.Inc(1, g, "stream", "exist")
+ start := time.Now()
+ defer func() {
+ rs.metrics.totalRegistryFinished.Inc(1, g, "stream", "exist")
+
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "stream",
"exist")
+ }()
_, err := rs.Get(ctx,
&databasev1.StreamRegistryServiceGetRequest{Metadata: req.Metadata})
if err == nil {
return &databasev1.StreamRegistryServiceExistResponse{
@@ -105,6 +154,7 @@ func (rs *streamRegistryServer) Exist(ctx context.Context,
req *databasev1.Strea
}
exist, errGroup := groupExist(ctx, err, req.Metadata,
rs.schemaRegistry.GroupRegistry())
if errGroup != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "stream", "exist")
return nil, errGroup
}
return &databasev1.StreamRegistryServiceExistResponse{HasGroup: exist,
HasStream: false}, nil
@@ -130,13 +180,22 @@ func groupExist(ctx context.Context, errResource error,
metadata *commonv1.Metad
type indexRuleBindingRegistryServer struct {
databasev1.UnimplementedIndexRuleBindingRegistryServiceServer
schemaRegistry metadata.Repo
+ metrics *metrics
}
func (rs *indexRuleBindingRegistryServer) Create(ctx context.Context,
req *databasev1.IndexRuleBindingRegistryServiceCreateRequest) (
*databasev1.IndexRuleBindingRegistryServiceCreateResponse, error,
) {
+ g := req.IndexRuleBinding.Metadata.Group
+ rs.metrics.totalRegistryStarted.Inc(1, g, "indexRuleBinding", "create")
+ start := time.Now()
+ defer func() {
+ rs.metrics.totalRegistryFinished.Inc(1, g, "indexRuleBinding",
"create")
+
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g,
"indexRuleBinding", "create")
+ }()
if err :=
rs.schemaRegistry.IndexRuleBindingRegistry().CreateIndexRuleBinding(ctx,
req.GetIndexRuleBinding()); err != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "indexRuleBinding",
"create")
return nil, err
}
return &databasev1.IndexRuleBindingRegistryServiceCreateResponse{}, nil
@@ -146,7 +205,15 @@ func (rs *indexRuleBindingRegistryServer) Update(ctx
context.Context,
req *databasev1.IndexRuleBindingRegistryServiceUpdateRequest) (
*databasev1.IndexRuleBindingRegistryServiceUpdateResponse, error,
) {
+ g := req.IndexRuleBinding.Metadata.Group
+ rs.metrics.totalRegistryStarted.Inc(1, g, "indexRuleBinding", "update")
+ start := time.Now()
+ defer func() {
+ rs.metrics.totalRegistryFinished.Inc(1, g, "indexRuleBinding",
"update")
+
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g,
"indexRuleBinding", "update")
+ }()
if err :=
rs.schemaRegistry.IndexRuleBindingRegistry().UpdateIndexRuleBinding(ctx,
req.GetIndexRuleBinding()); err != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "indexRuleBinding",
"update")
return nil, err
}
return &databasev1.IndexRuleBindingRegistryServiceUpdateResponse{}, nil
@@ -156,8 +223,16 @@ func (rs *indexRuleBindingRegistryServer) Delete(ctx
context.Context,
req *databasev1.IndexRuleBindingRegistryServiceDeleteRequest) (
*databasev1.IndexRuleBindingRegistryServiceDeleteResponse, error,
) {
+ g := req.Metadata.Group
+ rs.metrics.totalRegistryStarted.Inc(1, g, "indexRuleBinding", "delete")
+ start := time.Now()
+ defer func() {
+ rs.metrics.totalRegistryFinished.Inc(1, g, "indexRuleBinding",
"delete")
+
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g,
"indexRuleBinding", "delete")
+ }()
ok, err :=
rs.schemaRegistry.IndexRuleBindingRegistry().DeleteIndexRuleBinding(ctx,
req.GetMetadata())
if err != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "indexRuleBinding",
"delete")
return nil, err
}
return &databasev1.IndexRuleBindingRegistryServiceDeleteResponse{
@@ -169,8 +244,16 @@ func (rs *indexRuleBindingRegistryServer) Get(ctx
context.Context,
req *databasev1.IndexRuleBindingRegistryServiceGetRequest) (
*databasev1.IndexRuleBindingRegistryServiceGetResponse, error,
) {
+ g := req.Metadata.Group
+ rs.metrics.totalRegistryStarted.Inc(1, g, "indexRuleBinding", "get")
+ start := time.Now()
+ defer func() {
+ rs.metrics.totalRegistryFinished.Inc(1, g, "indexRuleBinding",
"get")
+
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g,
"indexRuleBinding", "get")
+ }()
entity, err :=
rs.schemaRegistry.IndexRuleBindingRegistry().GetIndexRuleBinding(ctx,
req.GetMetadata())
if err != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "indexRuleBinding", "get")
return nil, err
}
return &databasev1.IndexRuleBindingRegistryServiceGetResponse{
@@ -182,9 +265,17 @@ func (rs *indexRuleBindingRegistryServer) List(ctx
context.Context,
req *databasev1.IndexRuleBindingRegistryServiceListRequest) (
*databasev1.IndexRuleBindingRegistryServiceListResponse, error,
) {
+ g := req.Group
+ rs.metrics.totalRegistryStarted.Inc(1, g, "indexRuleBinding", "list")
+ start := time.Now()
+ defer func() {
+ rs.metrics.totalRegistryFinished.Inc(1, g, "indexRuleBinding",
"list")
+
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g,
"indexRuleBinding", "list")
+ }()
entities, err := rs.schemaRegistry.IndexRuleBindingRegistry().
ListIndexRuleBinding(ctx, schema.ListOpt{Group: req.GetGroup()})
if err != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "indexRuleBinding",
"list")
return nil, err
}
return &databasev1.IndexRuleBindingRegistryServiceListResponse{
@@ -195,6 +286,13 @@ func (rs *indexRuleBindingRegistryServer) List(ctx
context.Context,
func (rs *indexRuleBindingRegistryServer) Exist(ctx context.Context, req
*databasev1.IndexRuleBindingRegistryServiceExistRequest) (
*databasev1.IndexRuleBindingRegistryServiceExistResponse, error,
) {
+ g := req.Metadata.Group
+ rs.metrics.totalRegistryStarted.Inc(1, g, "indexRuleBinding", "exist")
+ start := time.Now()
+ defer func() {
+ rs.metrics.totalRegistryFinished.Inc(1, g, "indexRuleBinding",
"exist")
+
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g,
"indexRuleBinding", "exist")
+ }()
_, err := rs.Get(ctx,
&databasev1.IndexRuleBindingRegistryServiceGetRequest{Metadata: req.Metadata})
if err == nil {
return &databasev1.IndexRuleBindingRegistryServiceExistResponse{
@@ -204,6 +302,7 @@ func (rs *indexRuleBindingRegistryServer) Exist(ctx
context.Context, req *databa
}
exist, errGroup := groupExist(ctx, err, req.Metadata,
rs.schemaRegistry.GroupRegistry())
if errGroup != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "indexRuleBinding",
"exist")
return nil, errGroup
}
return
&databasev1.IndexRuleBindingRegistryServiceExistResponse{HasGroup: exist,
HasIndexRuleBinding: false}, nil
@@ -212,31 +311,59 @@ func (rs *indexRuleBindingRegistryServer) Exist(ctx
context.Context, req *databa
type indexRuleRegistryServer struct {
databasev1.UnimplementedIndexRuleRegistryServiceServer
schemaRegistry metadata.Repo
+ metrics *metrics
}
-func (rs *indexRuleRegistryServer) Create(ctx context.Context, req
*databasev1.IndexRuleRegistryServiceCreateRequest) (
+func (rs *indexRuleRegistryServer) Create(ctx context.Context,
+ req *databasev1.IndexRuleRegistryServiceCreateRequest) (
*databasev1.IndexRuleRegistryServiceCreateResponse, error,
) {
+ g := req.IndexRule.Metadata.Group
+ rs.metrics.totalRegistryStarted.Inc(1, g, "indexRule", "create")
+ start := time.Now()
+ defer func() {
+ rs.metrics.totalRegistryFinished.Inc(1, g, "indexRule",
"create")
+
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g,
"indexRule", "create")
+ }()
if err := rs.schemaRegistry.IndexRuleRegistry().CreateIndexRule(ctx,
req.GetIndexRule()); err != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "indexRule", "create")
return nil, err
}
return &databasev1.IndexRuleRegistryServiceCreateResponse{}, nil
}
-func (rs *indexRuleRegistryServer) Update(ctx context.Context, req
*databasev1.IndexRuleRegistryServiceUpdateRequest) (
+func (rs *indexRuleRegistryServer) Update(ctx context.Context,
+ req *databasev1.IndexRuleRegistryServiceUpdateRequest) (
*databasev1.IndexRuleRegistryServiceUpdateResponse, error,
) {
+ g := req.IndexRule.Metadata.Group
+ rs.metrics.totalRegistryStarted.Inc(1, g, "indexRule", "update")
+ start := time.Now()
+ defer func() {
+ rs.metrics.totalRegistryFinished.Inc(1, g, "indexRule",
"update")
+
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g,
"indexRule", "update")
+ }()
if err := rs.schemaRegistry.IndexRuleRegistry().UpdateIndexRule(ctx,
req.GetIndexRule()); err != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "indexRule", "update")
return nil, err
}
return &databasev1.IndexRuleRegistryServiceUpdateResponse{}, nil
}
-func (rs *indexRuleRegistryServer) Delete(ctx context.Context, req
*databasev1.IndexRuleRegistryServiceDeleteRequest) (
+func (rs *indexRuleRegistryServer) Delete(ctx context.Context,
+ req *databasev1.IndexRuleRegistryServiceDeleteRequest) (
*databasev1.IndexRuleRegistryServiceDeleteResponse, error,
) {
+ g := req.Metadata.Group
+ rs.metrics.totalRegistryStarted.Inc(1, g, "indexRule", "delete")
+ start := time.Now()
+ defer func() {
+ rs.metrics.totalRegistryFinished.Inc(1, g, "indexRule",
"delete")
+
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g,
"indexRule", "delete")
+ }()
ok, err := rs.schemaRegistry.IndexRuleRegistry().DeleteIndexRule(ctx,
req.GetMetadata())
if err != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "indexRule", "delete")
return nil, err
}
return &databasev1.IndexRuleRegistryServiceDeleteResponse{
@@ -244,11 +371,20 @@ func (rs *indexRuleRegistryServer) Delete(ctx
context.Context, req *databasev1.I
}, nil
}
-func (rs *indexRuleRegistryServer) Get(ctx context.Context, req
*databasev1.IndexRuleRegistryServiceGetRequest) (
+func (rs *indexRuleRegistryServer) Get(ctx context.Context,
+ req *databasev1.IndexRuleRegistryServiceGetRequest) (
*databasev1.IndexRuleRegistryServiceGetResponse, error,
) {
+ g := req.Metadata.Group
+ rs.metrics.totalRegistryStarted.Inc(1, g, "indexRule", "get")
+ start := time.Now()
+ defer func() {
+ rs.metrics.totalRegistryFinished.Inc(1, g, "indexRule", "get")
+
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g,
"indexRule", "get")
+ }()
entity, err := rs.schemaRegistry.IndexRuleRegistry().GetIndexRule(ctx,
req.GetMetadata())
if err != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "indexRule", "get")
return nil, err
}
return &databasev1.IndexRuleRegistryServiceGetResponse{
@@ -256,11 +392,20 @@ func (rs *indexRuleRegistryServer) Get(ctx
context.Context, req *databasev1.Inde
}, nil
}
-func (rs *indexRuleRegistryServer) List(ctx context.Context, req
*databasev1.IndexRuleRegistryServiceListRequest) (
+func (rs *indexRuleRegistryServer) List(ctx context.Context,
+ req *databasev1.IndexRuleRegistryServiceListRequest) (
*databasev1.IndexRuleRegistryServiceListResponse, error,
) {
+ g := req.Group
+ rs.metrics.totalRegistryStarted.Inc(1, g, "indexRule", "list")
+ start := time.Now()
+ defer func() {
+ rs.metrics.totalRegistryFinished.Inc(1, g, "indexRule", "list")
+
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g,
"indexRule", "list")
+ }()
entities, err :=
rs.schemaRegistry.IndexRuleRegistry().ListIndexRule(ctx, schema.ListOpt{Group:
req.GetGroup()})
if err != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "indexRule", "list")
return nil, err
}
return &databasev1.IndexRuleRegistryServiceListResponse{
@@ -271,6 +416,13 @@ func (rs *indexRuleRegistryServer) List(ctx
context.Context, req *databasev1.Ind
func (rs *indexRuleRegistryServer) Exist(ctx context.Context, req
*databasev1.IndexRuleRegistryServiceExistRequest) (
*databasev1.IndexRuleRegistryServiceExistResponse, error,
) {
+ g := req.Metadata.Group
+ rs.metrics.totalRegistryStarted.Inc(1, g, "indexRule", "exist")
+ start := time.Now()
+ defer func() {
+ rs.metrics.totalRegistryFinished.Inc(1, g, "indexRule", "exist")
+
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g,
"indexRule", "exist")
+ }()
_, err := rs.Get(ctx,
&databasev1.IndexRuleRegistryServiceGetRequest{Metadata: req.Metadata})
if err == nil {
return &databasev1.IndexRuleRegistryServiceExistResponse{
@@ -280,6 +432,7 @@ func (rs *indexRuleRegistryServer) Exist(ctx
context.Context, req *databasev1.In
}
exist, errGroup := groupExist(ctx, err, req.Metadata,
rs.schemaRegistry.GroupRegistry())
if errGroup != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "indexRule", "exist")
return nil, errGroup
}
return &databasev1.IndexRuleRegistryServiceExistResponse{HasGroup:
exist, HasIndexRule: false}, nil
@@ -288,13 +441,23 @@ func (rs *indexRuleRegistryServer) Exist(ctx
context.Context, req *databasev1.In
type measureRegistryServer struct {
databasev1.UnimplementedMeasureRegistryServiceServer
schemaRegistry metadata.Repo
+ metrics *metrics
}
-func (rs *measureRegistryServer) Create(ctx context.Context, req
*databasev1.MeasureRegistryServiceCreateRequest) (
+func (rs *measureRegistryServer) Create(ctx context.Context,
+ req *databasev1.MeasureRegistryServiceCreateRequest) (
*databasev1.MeasureRegistryServiceCreateResponse, error,
) {
+ g := req.Measure.Metadata.Group
+ rs.metrics.totalRegistryStarted.Inc(1, g, "measure", "create")
+ start := time.Now()
+ defer func() {
+ rs.metrics.totalRegistryFinished.Inc(1, g, "measure", "create")
+
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "measure",
"create")
+ }()
modRevision, err :=
rs.schemaRegistry.MeasureRegistry().CreateMeasure(ctx, req.GetMeasure())
if err != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "measure", "create")
return nil, err
}
return &databasev1.MeasureRegistryServiceCreateResponse{
@@ -302,11 +465,20 @@ func (rs *measureRegistryServer) Create(ctx
context.Context, req *databasev1.Mea
}, nil
}
-func (rs *measureRegistryServer) Update(ctx context.Context, req
*databasev1.MeasureRegistryServiceUpdateRequest) (
+func (rs *measureRegistryServer) Update(ctx context.Context,
+ req *databasev1.MeasureRegistryServiceUpdateRequest) (
*databasev1.MeasureRegistryServiceUpdateResponse, error,
) {
+ g := req.Measure.Metadata.Group
+ rs.metrics.totalRegistryStarted.Inc(1, g, "measure", "update")
+ start := time.Now()
+ defer func() {
+ rs.metrics.totalRegistryFinished.Inc(1, g, "measure", "update")
+
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "measure",
"update")
+ }()
modRevision, err :=
rs.schemaRegistry.MeasureRegistry().UpdateMeasure(ctx, req.GetMeasure())
if err != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "measure", "update")
return nil, err
}
return &databasev1.MeasureRegistryServiceUpdateResponse{
@@ -314,11 +486,20 @@ func (rs *measureRegistryServer) Update(ctx
context.Context, req *databasev1.Mea
}, nil
}
-func (rs *measureRegistryServer) Delete(ctx context.Context, req
*databasev1.MeasureRegistryServiceDeleteRequest) (
+func (rs *measureRegistryServer) Delete(ctx context.Context,
+ req *databasev1.MeasureRegistryServiceDeleteRequest) (
*databasev1.MeasureRegistryServiceDeleteResponse, error,
) {
+ g := req.Metadata.Group
+ rs.metrics.totalRegistryStarted.Inc(1, g, "measure", "delete")
+ start := time.Now()
+ defer func() {
+ rs.metrics.totalRegistryFinished.Inc(1, g, "measure", "delete")
+
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "measure",
"delete")
+ }()
ok, err := rs.schemaRegistry.MeasureRegistry().DeleteMeasure(ctx,
req.GetMetadata())
if err != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "measure", "delete")
return nil, err
}
return &databasev1.MeasureRegistryServiceDeleteResponse{
@@ -326,11 +507,20 @@ func (rs *measureRegistryServer) Delete(ctx
context.Context, req *databasev1.Mea
}, nil
}
-func (rs *measureRegistryServer) Get(ctx context.Context, req
*databasev1.MeasureRegistryServiceGetRequest) (
+func (rs *measureRegistryServer) Get(ctx context.Context,
+ req *databasev1.MeasureRegistryServiceGetRequest) (
*databasev1.MeasureRegistryServiceGetResponse, error,
) {
+ g := req.Metadata.Group
+ rs.metrics.totalRegistryStarted.Inc(1, g, "measure", "get")
+ start := time.Now()
+ defer func() {
+ rs.metrics.totalRegistryFinished.Inc(1, g, "measure", "get")
+
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "measure",
"get")
+ }()
entity, err := rs.schemaRegistry.MeasureRegistry().GetMeasure(ctx,
req.GetMetadata())
if err != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "measure", "get")
return nil, err
}
return &databasev1.MeasureRegistryServiceGetResponse{
@@ -338,11 +528,20 @@ func (rs *measureRegistryServer) Get(ctx context.Context,
req *databasev1.Measur
}, nil
}
-func (rs *measureRegistryServer) List(ctx context.Context, req
*databasev1.MeasureRegistryServiceListRequest) (
+func (rs *measureRegistryServer) List(ctx context.Context,
+ req *databasev1.MeasureRegistryServiceListRequest) (
*databasev1.MeasureRegistryServiceListResponse, error,
) {
+ g := req.Group
+ rs.metrics.totalRegistryStarted.Inc(1, g, "measure", "list")
+ start := time.Now()
+ defer func() {
+ rs.metrics.totalRegistryFinished.Inc(1, g, "measure", "list")
+
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "measure",
"list")
+ }()
entities, err := rs.schemaRegistry.MeasureRegistry().ListMeasure(ctx,
schema.ListOpt{Group: req.GetGroup()})
if err != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "measure", "list")
return nil, err
}
return &databasev1.MeasureRegistryServiceListResponse{
@@ -350,7 +549,17 @@ func (rs *measureRegistryServer) List(ctx context.Context,
req *databasev1.Measu
}, nil
}
-func (rs *measureRegistryServer) Exist(ctx context.Context, req
*databasev1.MeasureRegistryServiceExistRequest)
(*databasev1.MeasureRegistryServiceExistResponse, error) {
+func (rs *measureRegistryServer) Exist(ctx context.Context,
+ req *databasev1.MeasureRegistryServiceExistRequest) (
+ *databasev1.MeasureRegistryServiceExistResponse, error,
+) {
+ g := req.Metadata.Group
+ rs.metrics.totalRegistryStarted.Inc(1, g, "measure", "exist")
+ start := time.Now()
+ defer func() {
+ rs.metrics.totalRegistryFinished.Inc(1, g, "measure", "exist")
+
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "measure",
"exist")
+ }()
_, err := rs.Get(ctx,
&databasev1.MeasureRegistryServiceGetRequest{Metadata: req.Metadata})
if err == nil {
return &databasev1.MeasureRegistryServiceExistResponse{
@@ -360,6 +569,7 @@ func (rs *measureRegistryServer) Exist(ctx context.Context,
req *databasev1.Meas
}
exist, errGroup := groupExist(ctx, err, req.Metadata,
rs.schemaRegistry.GroupRegistry())
if errGroup != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "measure", "exist")
return nil, errGroup
}
return &databasev1.MeasureRegistryServiceExistResponse{HasGroup: exist,
HasMeasure: false}, nil
@@ -368,12 +578,21 @@ func (rs *measureRegistryServer) Exist(ctx
context.Context, req *databasev1.Meas
type groupRegistryServer struct {
databasev1.UnimplementedGroupRegistryServiceServer
schemaRegistry metadata.Repo
+ metrics *metrics
}
func (rs *groupRegistryServer) Create(ctx context.Context, req
*databasev1.GroupRegistryServiceCreateRequest) (
*databasev1.GroupRegistryServiceCreateResponse, error,
) {
+ g := ""
+ rs.metrics.totalRegistryStarted.Inc(1, g, "group", "create")
+ start := time.Now()
+ defer func() {
+ rs.metrics.totalRegistryFinished.Inc(1, g, "group", "create")
+
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "group",
"create")
+ }()
if err := rs.schemaRegistry.GroupRegistry().CreateGroup(ctx,
req.GetGroup()); err != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "group", "create")
return nil, err
}
return &databasev1.GroupRegistryServiceCreateResponse{}, nil
@@ -382,7 +601,15 @@ func (rs *groupRegistryServer) Create(ctx context.Context,
req *databasev1.Group
func (rs *groupRegistryServer) Update(ctx context.Context, req
*databasev1.GroupRegistryServiceUpdateRequest) (
*databasev1.GroupRegistryServiceUpdateResponse, error,
) {
+ g := ""
+ rs.metrics.totalRegistryStarted.Inc(1, g, "group", "update")
+ start := time.Now()
+ defer func() {
+ rs.metrics.totalRegistryFinished.Inc(1, g, "group", "update")
+
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "group",
"update")
+ }()
if err := rs.schemaRegistry.GroupRegistry().UpdateGroup(ctx,
req.GetGroup()); err != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "group", "update")
return nil, err
}
return &databasev1.GroupRegistryServiceUpdateResponse{}, nil
@@ -391,8 +618,16 @@ func (rs *groupRegistryServer) Update(ctx context.Context,
req *databasev1.Group
func (rs *groupRegistryServer) Delete(ctx context.Context, req
*databasev1.GroupRegistryServiceDeleteRequest) (
*databasev1.GroupRegistryServiceDeleteResponse, error,
) {
+ g := ""
+ rs.metrics.totalRegistryStarted.Inc(1, g, "group", "delete")
+ start := time.Now()
+ defer func() {
+ rs.metrics.totalRegistryFinished.Inc(1, g, "group", "delete")
+
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "group",
"delete")
+ }()
deleted, err := rs.schemaRegistry.GroupRegistry().DeleteGroup(ctx,
req.GetGroup())
if err != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "group", "delete")
return nil, err
}
return &databasev1.GroupRegistryServiceDeleteResponse{
@@ -403,20 +638,36 @@ func (rs *groupRegistryServer) Delete(ctx
context.Context, req *databasev1.Group
func (rs *groupRegistryServer) Get(ctx context.Context, req
*databasev1.GroupRegistryServiceGetRequest) (
*databasev1.GroupRegistryServiceGetResponse, error,
) {
- g, err := rs.schemaRegistry.GroupRegistry().GetGroup(ctx,
req.GetGroup())
+ g := ""
+ rs.metrics.totalRegistryStarted.Inc(1, g, "group", "get")
+ start := time.Now()
+ defer func() {
+ rs.metrics.totalRegistryFinished.Inc(1, g, "group", "get")
+
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "group",
"get")
+ }()
+ group, err := rs.schemaRegistry.GroupRegistry().GetGroup(ctx,
req.GetGroup())
if err != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "group", "get")
return nil, err
}
return &databasev1.GroupRegistryServiceGetResponse{
- Group: g,
+ Group: group,
}, nil
}
func (rs *groupRegistryServer) List(ctx context.Context, _
*databasev1.GroupRegistryServiceListRequest) (
*databasev1.GroupRegistryServiceListResponse, error,
) {
+ g := ""
+ rs.metrics.totalRegistryStarted.Inc(1, g, "group", "list")
+ start := time.Now()
+ defer func() {
+ rs.metrics.totalRegistryFinished.Inc(1, g, "group", "list")
+
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "group",
"list")
+ }()
groups, err := rs.schemaRegistry.GroupRegistry().ListGroup(ctx)
if err != nil {
+ rs.metrics.totalRegistryErr.Inc(1, g, "group", "list")
return nil, err
}
return &databasev1.GroupRegistryServiceListResponse{
@@ -424,7 +675,16 @@ func (rs *groupRegistryServer) List(ctx context.Context, _
*databasev1.GroupRegi
}, nil
}
-func (rs *groupRegistryServer) Exist(ctx context.Context, req
*databasev1.GroupRegistryServiceExistRequest)
(*databasev1.GroupRegistryServiceExistResponse, error) {
+func (rs *groupRegistryServer) Exist(ctx context.Context, req
*databasev1.GroupRegistryServiceExistRequest) (
+ *databasev1.GroupRegistryServiceExistResponse, error,
+) {
+ g := ""
+ rs.metrics.totalRegistryStarted.Inc(1, g, "group", "exist")
+ start := time.Now()
+ defer func() {
+ rs.metrics.totalRegistryFinished.Inc(1, g, "group", "exist")
+
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "group",
"exist")
+ }()
_, err := rs.Get(ctx, &databasev1.GroupRegistryServiceGetRequest{Group:
req.Group})
if err == nil {
return &databasev1.GroupRegistryServiceExistResponse{
@@ -436,18 +696,28 @@ func (rs *groupRegistryServer) Exist(ctx context.Context,
req *databasev1.GroupR
HasGroup: false,
}, nil
}
+ rs.metrics.totalRegistryErr.Inc(1, g, "group", "exist")
return nil, err
}
type topNAggregationRegistryServer struct {
databasev1.UnimplementedTopNAggregationRegistryServiceServer
schemaRegistry metadata.Repo
+ metrics *metrics
}
func (ts *topNAggregationRegistryServer) Create(ctx context.Context,
req *databasev1.TopNAggregationRegistryServiceCreateRequest,
) (*databasev1.TopNAggregationRegistryServiceCreateResponse, error) {
+ g := req.TopNAggregation.Metadata.Group
+ ts.metrics.totalRegistryStarted.Inc(1, g, "topn_aggregation", "create")
+ start := time.Now()
+ defer func() {
+ ts.metrics.totalRegistryFinished.Inc(1, g, "topn_aggregation",
"create")
+
ts.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g,
"topn_aggregation", "create")
+ }()
if err :=
ts.schemaRegistry.TopNAggregationRegistry().CreateTopNAggregation(ctx,
req.GetTopNAggregation()); err != nil {
+ ts.metrics.totalRegistryErr.Inc(1, g, "topn_aggregation",
"create")
return nil, err
}
return &databasev1.TopNAggregationRegistryServiceCreateResponse{}, nil
@@ -456,7 +726,15 @@ func (ts *topNAggregationRegistryServer) Create(ctx
context.Context,
func (ts *topNAggregationRegistryServer) Update(ctx context.Context,
req *databasev1.TopNAggregationRegistryServiceUpdateRequest,
) (*databasev1.TopNAggregationRegistryServiceUpdateResponse, error) {
+ g := req.TopNAggregation.Metadata.Group
+ ts.metrics.totalRegistryStarted.Inc(1, g, "topn_aggregation", "update")
+ start := time.Now()
+ defer func() {
+ ts.metrics.totalRegistryFinished.Inc(1, g, "topn_aggregation",
"update")
+
ts.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g,
"topn_aggregation", "update")
+ }()
if err :=
ts.schemaRegistry.TopNAggregationRegistry().UpdateTopNAggregation(ctx,
req.GetTopNAggregation()); err != nil {
+ ts.metrics.totalRegistryErr.Inc(1, g, "topn_aggregation",
"update")
return nil, err
}
return &databasev1.TopNAggregationRegistryServiceUpdateResponse{}, nil
@@ -465,8 +743,16 @@ func (ts *topNAggregationRegistryServer) Update(ctx
context.Context,
func (ts *topNAggregationRegistryServer) Delete(ctx context.Context,
req *databasev1.TopNAggregationRegistryServiceDeleteRequest,
) (*databasev1.TopNAggregationRegistryServiceDeleteResponse, error) {
+ g := req.Metadata.Group
+ ts.metrics.totalRegistryStarted.Inc(1, g, "topn_aggregation", "delete")
+ start := time.Now()
+ defer func() {
+ ts.metrics.totalRegistryFinished.Inc(1, g, "topn_aggregation",
"delete")
+
ts.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g,
"topn_aggregation", "delete")
+ }()
ok, err :=
ts.schemaRegistry.TopNAggregationRegistry().DeleteTopNAggregation(ctx,
req.GetMetadata())
if err != nil {
+ ts.metrics.totalRegistryErr.Inc(1, g, "topn_aggregation",
"delete")
return nil, err
}
return &databasev1.TopNAggregationRegistryServiceDeleteResponse{
@@ -477,8 +763,16 @@ func (ts *topNAggregationRegistryServer) Delete(ctx
context.Context,
func (ts *topNAggregationRegistryServer) Get(ctx context.Context,
req *databasev1.TopNAggregationRegistryServiceGetRequest,
) (*databasev1.TopNAggregationRegistryServiceGetResponse, error) {
+ g := req.Metadata.Group
+ ts.metrics.totalRegistryStarted.Inc(1, g, "topn_aggregation", "get")
+ start := time.Now()
+ defer func() {
+ ts.metrics.totalRegistryFinished.Inc(1, g, "topn_aggregation",
"get")
+
ts.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g,
"topn_aggregation", "get")
+ }()
entity, err :=
ts.schemaRegistry.TopNAggregationRegistry().GetTopNAggregation(ctx,
req.GetMetadata())
if err != nil {
+ ts.metrics.totalRegistryErr.Inc(1, g, "topn_aggregation", "get")
return nil, err
}
return &databasev1.TopNAggregationRegistryServiceGetResponse{
@@ -489,8 +783,16 @@ func (ts *topNAggregationRegistryServer) Get(ctx
context.Context,
func (ts *topNAggregationRegistryServer) List(ctx context.Context,
req *databasev1.TopNAggregationRegistryServiceListRequest,
) (*databasev1.TopNAggregationRegistryServiceListResponse, error) {
+ g := req.Group
+ ts.metrics.totalRegistryStarted.Inc(1, g, "topn_aggregation", "list")
+ start := time.Now()
+ defer func() {
+ ts.metrics.totalRegistryFinished.Inc(1, g, "topn_aggregation",
"list")
+
ts.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g,
"topn_aggregation", "list")
+ }()
entities, err :=
ts.schemaRegistry.TopNAggregationRegistry().ListTopNAggregation(ctx,
schema.ListOpt{Group: req.GetGroup()})
if err != nil {
+ ts.metrics.totalRegistryErr.Inc(1, g, "topn_aggregation",
"list")
return nil, err
}
return &databasev1.TopNAggregationRegistryServiceListResponse{
@@ -501,6 +803,13 @@ func (ts *topNAggregationRegistryServer) List(ctx
context.Context,
func (ts *topNAggregationRegistryServer) Exist(ctx context.Context, req
*databasev1.TopNAggregationRegistryServiceExistRequest) (
*databasev1.TopNAggregationRegistryServiceExistResponse, error,
) {
+ g := req.Metadata.Group
+ ts.metrics.totalRegistryStarted.Inc(1, g, "topn_aggregation", "exist")
+ start := time.Now()
+ defer func() {
+ ts.metrics.totalRegistryFinished.Inc(1, g, "topn_aggregation",
"exist")
+
ts.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g,
"topn_aggregation", "exist")
+ }()
_, err := ts.Get(ctx,
&databasev1.TopNAggregationRegistryServiceGetRequest{Metadata: req.Metadata})
if err == nil {
return &databasev1.TopNAggregationRegistryServiceExistResponse{
@@ -510,6 +819,7 @@ func (ts *topNAggregationRegistryServer) Exist(ctx
context.Context, req *databas
}
exist, errGroup := groupExist(ctx, err, req.Metadata,
ts.schemaRegistry.GroupRegistry())
if errGroup != nil {
+ ts.metrics.totalRegistryErr.Inc(1, g, "topn_aggregation",
"exist")
return nil, errGroup
}
return
&databasev1.TopNAggregationRegistryServiceExistResponse{HasGroup: exist,
HasTopNAggregation: false}, nil
diff --git a/banyand/liaison/grpc/registry_test.go
b/banyand/liaison/grpc/registry_test.go
index b1f4ea94..bb29c731 100644
--- a/banyand/liaison/grpc/registry_test.go
+++ b/banyand/liaison/grpc/registry_test.go
@@ -34,6 +34,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/metadata/embeddedserver"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/grpchelper"
"github.com/apache/skywalking-banyandb/pkg/test"
@@ -178,8 +179,9 @@ func setupForRegistry() func() {
// Init `Metadata` module
metaSvc, err := embeddedserver.NewService(context.TODO())
Expect(err).NotTo(HaveOccurred())
+ metricSvc := observability.NewMetricService(metaSvc, pipeline,
"standalone", nil)
- tcp := grpc.NewServer(context.TODO(), pipeline, pipeline, metaSvc,
grpc.NewLocalNodeRegistry())
+ tcp := grpc.NewServer(context.TODO(), pipeline, pipeline, metaSvc,
grpc.NewLocalNodeRegistry(), metricSvc)
preloadStreamSvc := &preloadStreamService{metaSvc: metaSvc}
var flags []string
metaPath, metaDeferFunc, err := test.NewSpace()
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index 7485c4fe..31efab86 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -41,6 +41,7 @@ import (
streamv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
@@ -54,6 +55,8 @@ var (
errNoAddr = errors.New("no address")
errQueryMsg = errors.New("invalid query message")
errAccessLogRootPath = errors.New("access log root path is required")
+
+ liaisonGrpcScope = observability.RootScope.SubScope("liaison_grpc")
)
// Server defines the gRPC server.
@@ -63,24 +66,25 @@ type Server interface {
}
type server struct {
- creds credentials.TransportCredentials
- *streamRegistryServer
- log *logger.Logger
- *indexRuleBindingRegistryServer
- ser *grpclib.Server
+ creds credentials.TransportCredentials
+ omr observability.MetricsRegistry
+ measureSVC *measureService
+ ser *grpclib.Server
+ log *logger.Logger
*propertyServer
*topNAggregationRegistryServer
*groupRegistryServer
stopCh chan struct{}
*indexRuleRegistryServer
*measureRegistryServer
- streamSVC *streamService
- measureSVC *measureService
- host string
+ streamSVC *streamService
+ *streamRegistryServer
+ *indexRuleBindingRegistryServer
keyFile string
certFile string
accessLogRootPath string
addr string
+ host string
accessLogRecorders []accessLogRecorder
maxRecvMsgSize run.Bytes
port uint32
@@ -89,7 +93,7 @@ type server struct {
}
// NewServer returns a new gRPC server.
-func NewServer(_ context.Context, pipeline, broadcaster queue.Client,
schemaRegistry metadata.Repo, nodeRegistry NodeRegistry) Server {
+func NewServer(_ context.Context, pipeline, broadcaster queue.Client,
schemaRegistry metadata.Repo, nodeRegistry NodeRegistry, omr
observability.MetricsRegistry) Server {
streamSVC := &streamService{
discoveryService: newDiscoveryService(schema.KindStream,
schemaRegistry, nodeRegistry),
pipeline: pipeline,
@@ -101,6 +105,7 @@ func NewServer(_ context.Context, pipeline, broadcaster
queue.Client, schemaRegi
broadcaster: broadcaster,
}
s := &server{
+ omr: omr,
streamSVC: streamSVC,
measureSVC: measureSVC,
streamRegistryServer: &streamRegistryServer{
@@ -151,6 +156,16 @@ func (s *server) PreRun(_ context.Context) error {
}
}
}
+ metrics := newMetrics(s.omr.With(liaisonGrpcScope))
+ s.streamSVC.metrics = metrics
+ s.measureSVC.metrics = metrics
+ s.propertyServer.metrics = metrics
+ s.streamRegistryServer.metrics = metrics
+ s.indexRuleBindingRegistryServer.metrics = metrics
+ s.indexRuleRegistryServer.metrics = metrics
+ s.measureRegistryServer.metrics = metrics
+ s.groupRegistryServer.metrics = metrics
+ s.topNAggregationRegistryServer.metrics = metrics
return nil
}
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index 4bdaae9a..ba264fdc 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -43,12 +43,13 @@ import (
type streamService struct {
streamv1.UnimplementedStreamServiceServer
- *discoveryService
- sampled *logger.Logger
ingestionAccessLog accesslog.Log
pipeline queue.Client
broadcaster queue.Client
- writeTimeout time.Duration
+ *discoveryService
+ sampled *logger.Logger
+ metrics *metrics
+ writeTimeout time.Duration
}
func (s *streamService) setLogger(log *logger.Logger) {
@@ -65,12 +66,23 @@ func (s *streamService) activeIngestionAccessLog(root
string) (err error) {
func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error
{
reply := func(metadata *commonv1.Metadata, status modelv1.Status,
messageId uint64, stream streamv1.StreamService_WriteServer, logger
*logger.Logger) {
+ if status != modelv1.Status_STATUS_SUCCEED {
+ s.metrics.totalStreamMsgReceivedErr.Inc(1,
metadata.Group, "stream", "write")
+ }
+ s.metrics.totalStreamMsgReceived.Inc(1, metadata.Group,
"stream", "write")
if errResp := stream.Send(&streamv1.WriteResponse{Metadata:
metadata, Status: status, MessageId: messageId}); errResp != nil {
logger.Debug().Err(errResp).Msg("failed to send
response")
+ s.metrics.totalStreamMsgSentErr.Inc(1, metadata.Group,
"stream", "write")
}
}
+ s.metrics.totalStreamStarted.Inc(1, "stream", "write")
publisher := s.pipeline.NewBatchPublisher(s.writeTimeout)
- defer publisher.Close()
+ start := time.Now()
+ defer func() {
+ publisher.Close()
+ s.metrics.totalStreamFinished.Inc(1, "stream", "write")
+ s.metrics.totalStreamLatency.Inc(time.Since(start).Seconds(),
"stream", "write")
+ }()
ctx := stream.Context()
for {
select {
@@ -83,9 +95,12 @@ func (s *streamService) Write(stream
streamv1.StreamService_WriteServer) error {
return nil
}
if err != nil {
- s.sampled.Error().Stringer("written",
writeEntity).Err(err).Msg("failed to receive message")
+ if !errors.Is(err, context.DeadlineExceeded) &&
!errors.Is(err, context.Canceled) {
+ s.sampled.Error().Stringer("written",
writeEntity).Err(err).Msg("failed to receive message")
+ }
return err
}
+ s.metrics.totalStreamMsgReceived.Inc(1,
writeEntity.Metadata.Group, "stream", "write")
if errTime :=
timestamp.CheckPb(writeEntity.GetElement().Timestamp); errTime != nil {
s.sampled.Error().Stringer("written",
writeEntity).Err(errTime).Msg("the element time is invalid")
reply(nil, modelv1.Status_STATUS_INVALID_TIMESTAMP,
writeEntity.GetMessageId(), stream, s.sampled)
@@ -134,13 +149,26 @@ func (s *streamService) Write(stream
streamv1.StreamService_WriteServer) error {
reply(writeEntity.GetMetadata(),
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream,
s.sampled)
continue
}
- reply(nil, modelv1.Status_STATUS_SUCCEED,
writeEntity.GetMessageId(), stream, s.sampled)
+ reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_SUCCEED,
writeEntity.GetMessageId(), stream, s.sampled)
}
}
var emptyStreamQueryResponse = &streamv1.QueryResponse{Elements:
make([]*streamv1.Element, 0)}
func (s *streamService) Query(_ context.Context, req *streamv1.QueryRequest)
(resp *streamv1.QueryResponse, err error) {
+ for _, g := range req.Groups {
+ s.metrics.totalStarted.Inc(1, g, "stream", "query")
+ }
+ start := time.Now()
+ defer func() {
+ for _, g := range req.Groups {
+ s.metrics.totalFinished.Inc(1, g, "stream", "query")
+ if err != nil {
+ s.metrics.totalErr.Inc(1, g, "stream", "query")
+ }
+ s.metrics.totalLatency.Inc(time.Since(start).Seconds(),
g, "stream", "query")
+ }
+ }()
timeRange := req.GetTimeRange()
if timeRange == nil {
req.TimeRange = timestamp.DefaultTimeRange
diff --git a/banyand/measure/merger_test.go b/banyand/measure/merger_test.go
index c8de11bd..22c804e0 100644
--- a/banyand/measure/merger_test.go
+++ b/banyand/measure/merger_test.go
@@ -287,14 +287,14 @@ func Test_mergeParts(t *testing.T) {
name: "Test with multiple parts with a large
quantity of different ts",
dpsList: []*dataPoints{generateHugeDps(1, 5000, 1),
generateHugeDps(5001, 10000, 2)},
want: []blockMetadata{
- {seriesID: 1, count: 2500,
uncompressedSizeBytes: 4190000},
- {seriesID: 1, count: 2500,
uncompressedSizeBytes: 4190000},
- {seriesID: 1, count: 2500,
uncompressedSizeBytes: 4190000},
- {seriesID: 1, count: 2500,
uncompressedSizeBytes: 4190000},
- {seriesID: 1, count: 2500,
uncompressedSizeBytes: 4190000},
- {seriesID: 1, count: 2500,
uncompressedSizeBytes: 4190000},
- {seriesID: 1, count: 2500,
uncompressedSizeBytes: 4190000},
- {seriesID: 1, count: 1250,
uncompressedSizeBytes: 2095000},
+ {seriesID: 1, count: 1265,
uncompressedSizeBytes: 2120140},
+ {seriesID: 1, count: 1265,
uncompressedSizeBytes: 2120140},
+ {seriesID: 1, count: 1265,
uncompressedSizeBytes: 2120140},
+ {seriesID: 1, count: 2470,
uncompressedSizeBytes: 4139720},
+ {seriesID: 1, count: 2470,
uncompressedSizeBytes: 4139720},
+ {seriesID: 1, count: 2470,
uncompressedSizeBytes: 4139720},
+ {seriesID: 1, count: 2410,
uncompressedSizeBytes: 4039160},
+ {seriesID: 1, count: 1205,
uncompressedSizeBytes: 2019580},
{seriesID: 2, count: 2, uncompressedSizeBytes:
110},
{seriesID: 3, count: 2, uncompressedSizeBytes:
48},
},
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index e6e69959..21b4076e 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -292,21 +292,25 @@ func (s *supplier) ResourceSchema(md *commonv1.Metadata)
(resourceSchema.Resourc
}
func (s *supplier) OpenDB(groupSchema *commonv1.Group) (io.Closer, error) {
+ name := groupSchema.Metadata.Name
+ p := common.Position{
+ Module: "measure",
+ Database: name,
+ }
+ metrics, factory := s.newMetrics(p)
opts := storage.TSDBOpts[*tsTable, option]{
ShardNum:
groupSchema.ResourceOpts.ShardNum,
Location: path.Join(s.path,
groupSchema.Metadata.Name),
TSTableCreator: newTSTable,
- MetricsCreator: s.newMetrics,
+ TableMetrics: metrics,
SegmentInterval:
storage.MustToIntervalRule(groupSchema.ResourceOpts.SegmentInterval),
TTL:
storage.MustToIntervalRule(groupSchema.ResourceOpts.Ttl),
Option: s.option,
SeriesIndexFlushTimeoutSeconds:
s.option.flushTimeout.Nanoseconds() / int64(time.Second),
+ StorageMetricsFactory: factory,
}
- name := groupSchema.Metadata.Name
return storage.OpenTSDB(
- common.SetPosition(context.Background(), func(p
common.Position) common.Position {
- p.Module = "measure"
- p.Database = name
+ common.SetPosition(context.Background(), func(_
common.Position) common.Position {
return p
}),
opts)
diff --git a/banyand/measure/metrics.go b/banyand/measure/metrics.go
index 7f38f432..0629ec7d 100644
--- a/banyand/measure/metrics.go
+++ b/banyand/measure/metrics.go
@@ -41,10 +41,16 @@ func (m *metrics) DeleteAll() {
m.totalWritten.Delete()
}
-func (s *supplier) newMetrics(p common.Position) storage.Metrics {
+func (s *supplier) newMetrics(p common.Position) (storage.Metrics,
*observability.Factory) {
factory :=
s.omr.With(measureScope.ConstLabels(meter.LabelPairs{"group": p.Database}))
return &metrics{
totalWritten: factory.NewCounter("total_written"),
+ }, factory
+}
+
+func (tst *tsTable) Collect(m storage.Metrics) {
+ if m == nil {
+ return
}
}
diff --git a/banyand/measure/part.go b/banyand/measure/part.go
index 41baf29b..36a72eb8 100644
--- a/banyand/measure/part.go
+++ b/banyand/measure/part.go
@@ -23,7 +23,6 @@ import (
"path/filepath"
"sort"
"sync/atomic"
- "time"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/bytes"
@@ -203,7 +202,9 @@ func (mp *memPart) mustFlush(fileSystem fs.FileSystem, path
string) {
}
func uncompressedDataPointSizeBytes(index int, dps *dataPoints) uint64 {
- n := uint64(len(time.RFC3339Nano))
+ // 8 bytes for timestamp
+ // 8 bytes for version
+ n := uint64(8 + 8)
n += uint64(len(dps.fields[index].name))
for i := range dps.fields[index].values {
n += uint64(dps.fields[index].values[i].size())
diff --git a/banyand/queue/sub/server.go b/banyand/queue/sub/server.go
index 383cdc25..c7b2f643 100644
--- a/banyand/queue/sub/server.go
+++ b/banyand/queue/sub/server.go
@@ -37,9 +37,11 @@ import (
clusterv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/meter"
"github.com/apache/skywalking-banyandb/pkg/run"
)
@@ -52,18 +54,22 @@ var (
_ run.PreRunner = (*server)(nil)
_ run.Service = (*server)(nil)
+
+ queueSubScope = observability.RootScope.SubScope("queue_sub")
)
type server struct {
+ omr observability.MetricsRegistry
creds credentials.TransportCredentials
log *logger.Logger
ser *grpclib.Server
listeners map[bus.Topic]bus.MessageListener
*clusterv1.UnimplementedServiceServer
- addr string
+ metrics *metrics
certFile string
- keyFile string
host string
+ keyFile string
+ addr string
maxRecvMsgSize run.Bytes
listenersLock sync.RWMutex
port uint32
@@ -71,14 +77,16 @@ type server struct {
}
// NewServer returns a new gRPC server.
-func NewServer() queue.Server {
+func NewServer(omr observability.MetricsRegistry) queue.Server {
return &server{
listeners: make(map[bus.Topic]bus.MessageListener),
+ omr: omr,
}
}
func (s *server) PreRun(_ context.Context) error {
s.log = logger.GetLogger("server-queue")
+ s.metrics = newMetrics(s.omr.With(queueSubScope))
return nil
}
@@ -191,3 +199,28 @@ func (s *server) GracefulStop() {
s.log.Info().Msg("stopped gracefully")
}
}
+
+type metrics struct {
+ totalStarted meter.Counter
+ totalFinished meter.Counter
+ totalErr meter.Counter
+ totalLatency meter.Counter
+
+ totalMsgReceived meter.Counter
+ totalMsgReceivedErr meter.Counter
+ totalMsgSent meter.Counter
+ totalMsgSentErr meter.Counter
+}
+
+func newMetrics(factory *observability.Factory) *metrics {
+ return &metrics{
+ totalStarted: factory.NewCounter("total_started",
"topic"),
+ totalFinished: factory.NewCounter("total_finished",
"topic"),
+ totalErr: factory.NewCounter("total_err", "topic"),
+ totalLatency: factory.NewCounter("total_latency",
"topic"),
+ totalMsgReceived: factory.NewCounter("total_msg_received",
"topic"),
+ totalMsgReceivedErr:
factory.NewCounter("total_msg_received_err", "topic"),
+ totalMsgSent: factory.NewCounter("total_msg_sent",
"topic"),
+ totalMsgSentErr: factory.NewCounter("total_msg_sent_err",
"topic"),
+ }
+}
diff --git a/banyand/queue/sub/sub.go b/banyand/queue/sub/sub.go
index c58b99c5..a9e9e3cb 100644
--- a/banyand/queue/sub/sub.go
+++ b/banyand/queue/sub/sub.go
@@ -21,6 +21,7 @@ package sub
import (
"fmt"
"io"
+ "time"
"github.com/pkg/errors"
"google.golang.org/grpc/codes"
@@ -37,17 +38,25 @@ import (
func (s *server) Send(stream clusterv1.Service_SendServer) error {
reply := func(writeEntity *clusterv1.SendRequest, err error, message
string) {
s.log.Error().Stringer("written",
writeEntity).Err(err).Msg(message)
+ s.metrics.totalMsgReceivedErr.Inc(1, writeEntity.Topic)
+ s.metrics.totalMsgSentErr.Inc(1, writeEntity.Topic)
if errResp := stream.Send(&clusterv1.SendResponse{
MessageId: writeEntity.MessageId,
Error: message,
}); errResp != nil {
s.log.Err(errResp).Msg("failed to send response")
+ s.metrics.totalMsgSentErr.Inc(1, writeEntity.Topic)
}
}
ctx := stream.Context()
var topic *bus.Topic
var m bus.Message
var dataCollection []any
+ var start time.Time
+ defer func() {
+ s.metrics.totalFinished.Inc(1, topic.String())
+ s.metrics.totalLatency.Inc(time.Since(start).Seconds(),
topic.String())
+ }()
for {
select {
case <-ctx.Done():
@@ -77,6 +86,7 @@ func (s *server) Send(stream clusterv1.Service_SendServer)
error {
s.log.Error().Err(err).Msg("failed to receive message")
return err
}
+ s.metrics.totalMsgReceived.Inc(1, writeEntity.Topic)
if writeEntity.Topic != "" && topic == nil {
t, ok := data.TopicMap[writeEntity.Topic]
if !ok {
@@ -102,14 +112,22 @@ func (s *server) Send(stream
clusterv1.Service_SendServer) error {
continue
}
if writeEntity.BatchMod {
+ if len(dataCollection) == 0 {
+ s.metrics.totalStarted.Inc(1, writeEntity.Topic)
+ start = time.Now()
+ }
dataCollection = append(dataCollection,
writeEntity.Body)
if errSend := stream.Send(&clusterv1.SendResponse{
MessageId: writeEntity.MessageId,
}); errSend != nil {
s.log.Error().Stringer("written",
writeEntity).Err(errSend).Msg("failed to send response")
+ s.metrics.totalMsgSentErr.Inc(1,
writeEntity.Topic)
+ continue
}
+ s.metrics.totalMsgSent.Inc(1, writeEntity.Topic)
continue
}
+ s.metrics.totalStarted.Inc(1, writeEntity.Topic)
listener := s.getListeners(*topic)
if listener == nil {
reply(writeEntity, err, "no listener found")
@@ -122,7 +140,10 @@ func (s *server) Send(stream clusterv1.Service_SendServer)
error {
MessageId: writeEntity.MessageId,
}); errSend != nil {
s.log.Error().Stringer("written",
writeEntity).Err(errSend).Msg("failed to send response")
+ s.metrics.totalMsgSentErr.Inc(1,
writeEntity.Topic)
+ continue
}
+ s.metrics.totalMsgSent.Inc(1, writeEntity.Topic)
continue
}
var message proto.Message
@@ -146,7 +167,10 @@ func (s *server) Send(stream clusterv1.Service_SendServer)
error {
Body: anyMessage,
}); err != nil {
s.log.Error().Stringer("written",
writeEntity).Err(err).Msg("failed to send response")
+ s.metrics.totalMsgSentErr.Inc(1, writeEntity.Topic)
+ continue
}
+ s.metrics.totalMsgSent.Inc(1, writeEntity.Topic)
}
}
diff --git a/banyand/stream/flusher.go b/banyand/stream/flusher.go
index c388ee0e..ae7010b3 100644
--- a/banyand/stream/flusher.go
+++ b/banyand/stream/flusher.go
@@ -40,40 +40,49 @@ func (tst *tsTable) flusherLoop(flushCh chan
*flusherIntroduction, mergeCh chan
case e := <-flusherWatcher:
flusherWatchers.Add(e)
case <-epochWatcher.Watch():
- curSnapshot := tst.currentSnapshot()
- if curSnapshot != nil {
- flusherWatchers =
tst.pauseFlusherToPileupMemParts(epoch, flusherWatcher, flusherWatchers)
- curSnapshot.decRef()
- curSnapshot = nil
- }
- tst.RLock()
- if tst.snapshot != nil && tst.snapshot.epoch > epoch {
- curSnapshot = tst.snapshot
- curSnapshot.incRef()
- }
- tst.RUnlock()
- if curSnapshot != nil {
- merged, err := tst.mergeMemParts(curSnapshot,
mergeCh)
- if err != nil {
-
tst.l.Logger.Warn().Err(err).Msgf("cannot merge snapshot: %d",
curSnapshot.epoch)
+ if func() bool {
+ tst.incTotalFlushLoopStarted(1)
+ start := time.Now()
+ defer func() {
+ tst.incTotalFlushLoopFinished(1)
+
tst.incTotalFlushLatency(time.Since(start).Seconds())
+ }()
+ curSnapshot := tst.currentSnapshot()
+ if curSnapshot != nil {
+ flusherWatchers =
tst.pauseFlusherToPileupMemParts(epoch, flusherWatcher, flusherWatchers)
curSnapshot.decRef()
- continue
+ curSnapshot = nil
}
- if !merged {
- tst.flush(curSnapshot, flushCh)
+ tst.RLock()
+ if tst.snapshot != nil && tst.snapshot.epoch >
epoch {
+ curSnapshot = tst.snapshot
+ curSnapshot.incRef()
}
- epoch = curSnapshot.epoch
- // Notify merger to start a new round of merge.
- // This round might have be triggered in
pauseFlusherToPileupMemParts.
- flusherWatchers.Notify(math.MaxUint64)
- flusherWatchers = nil
- curSnapshot.decRef()
- if tst.currentEpoch() != epoch {
- continue
+ tst.RUnlock()
+ if curSnapshot != nil {
+ defer curSnapshot.decRef()
+ merged, err :=
tst.mergeMemParts(curSnapshot, mergeCh)
+ if err != nil {
+
tst.l.Logger.Warn().Err(err).Msgf("cannot merge snapshot: %d",
curSnapshot.epoch)
+ tst.incTotalFlushLoopErr(1)
+ return false
+ }
+ if !merged {
+ tst.flush(curSnapshot, flushCh)
+ }
+ epoch = curSnapshot.epoch
+ // Notify merger to start a new round
of merge.
+ // This round might have be triggered
in pauseFlusherToPileupMemParts.
+ flusherWatchers.Notify(math.MaxUint64)
+ flusherWatchers = nil
+ if tst.currentEpoch() != epoch {
+ tst.incTotalFlushLoopProgress(1)
+ return false
+ }
}
- }
- epochWatcher = introducerWatcher.Add(epoch,
tst.loopCloser.CloseNotify())
- if epochWatcher == nil {
+ epochWatcher = introducerWatcher.Add(epoch,
tst.loopCloser.CloseNotify())
+ return epochWatcher == nil
+ }() {
return
}
}
@@ -93,9 +102,11 @@ func (tst *tsTable) pauseFlusherToPileupMemParts(epoch
uint64, flushWatcher watc
select {
case <-tst.loopCloser.CloseNotify():
case <-time.After(tst.option.flushTimeout):
+ tst.incTotalFlushPauseCompleted(1)
case e := <-flushWatcher:
flusherWatchers.Add(e)
flusherWatchers.Notify(epoch)
+ tst.incTotalFlushPauseBreak(1)
}
return flusherWatchers
}
@@ -115,7 +126,8 @@ func (tst *tsTable) mergeMemParts(snp *snapshot, mergeCh
chan *mergerIntroductio
}
// merge memory must not be closed by the tsTable.close
closeCh := make(chan struct{})
- newPart, err :=
tst.mergePartsThenSendIntroduction(snapshotCreatorMergedFlusher, memParts,
mergedIDs, mergeCh, closeCh)
+ newPart, err :=
tst.mergePartsThenSendIntroduction(snapshotCreatorMergedFlusher, memParts,
+ mergedIDs, mergeCh, closeCh, "mem")
close(closeCh)
if err != nil {
if errors.Is(err, errClosed) {
@@ -132,10 +144,13 @@ func (tst *tsTable) mergeMemParts(snp *snapshot, mergeCh
chan *mergerIntroductio
func (tst *tsTable) flush(snapshot *snapshot, flushCh chan
*flusherIntroduction) {
ind := generateFlusherIntroduction()
defer releaseFlusherIntroduction(ind)
+ start := time.Now()
+ partsCount := 0
for _, pw := range snapshot.parts {
if pw.mp == nil || pw.mp.partMetadata.TotalCount < 1 {
continue
}
+ partsCount++
partPath := partPath(tst.root, pw.ID())
pw.mp.mustFlush(tst.fileSystem, partPath)
newPW := newPartWrapper(nil, mustOpenFilePart(pw.ID(),
tst.root, tst.fileSystem))
@@ -145,6 +160,10 @@ func (tst *tsTable) flush(snapshot *snapshot, flushCh chan
*flusherIntroduction)
if len(ind.flushed) < 1 {
return
}
+ end := time.Now()
+ tst.incTotalFlushed(1)
+ tst.incTotalFlushedMemParts(partsCount)
+ tst.incTotalFlushLatency(end.Sub(start).Seconds())
ind.applied = make(chan struct{})
select {
case flushCh <- ind:
@@ -155,6 +174,7 @@ func (tst *tsTable) flush(snapshot *snapshot, flushCh chan
*flusherIntroduction)
case <-ind.applied:
case <-tst.loopCloser.CloseNotify():
}
+ tst.incTotalFlushIntroLatency(time.Since(end).Seconds())
}
func (tst *tsTable) persistSnapshot(snapshot *snapshot) {
diff --git a/banyand/stream/index.go b/banyand/stream/index.go
index c3f4cdb2..56e21327 100644
--- a/banyand/stream/index.go
+++ b/banyand/stream/index.go
@@ -37,7 +37,7 @@ type elementIndex struct {
l *logger.Logger
}
-func newElementIndex(ctx context.Context, root string, flushTimeoutSeconds
int64) (*elementIndex, error) {
+func newElementIndex(ctx context.Context, root string, flushTimeoutSeconds
int64, metrics *inverted.Metrics) (*elementIndex, error) {
ei := &elementIndex{
l: logger.Fetch(ctx, "element_index"),
}
@@ -46,6 +46,7 @@ func newElementIndex(ctx context.Context, root string,
flushTimeoutSeconds int64
Path: path.Join(root, elementIndexFilename),
Logger: ei.l,
BatchWaitSec: flushTimeoutSeconds,
+ Metrics: metrics,
}); err != nil {
return nil, err
}
@@ -94,3 +95,7 @@ func (e *elementIndex) Search(seriesList pbv1.SeriesList,
filter index.Filter) (
func (e *elementIndex) Close() error {
return e.store.Close()
}
+
+func (e *elementIndex) collectMetrics(labelValues ...string) {
+ e.store.CollectMetrics(labelValues...)
+}
diff --git a/banyand/stream/introducer.go b/banyand/stream/introducer.go
index 072e8b39..d0dc0911 100644
--- a/banyand/stream/introducer.go
+++ b/banyand/stream/introducer.go
@@ -118,14 +118,20 @@ func (tst *tsTable) introducerLoop(flushCh chan
*flusherIntroduction, mergeCh ch
case <-tst.loopCloser.CloseNotify():
return
case next := <-tst.introductions:
+ tst.incTotalIntroduceLoopStarted(1, "mem")
tst.introduceMemPart(next, epoch)
+ tst.incTotalIntroduceLoopFinished(1, "mem")
epoch++
case next := <-flushCh:
+ tst.incTotalIntroduceLoopStarted(1, "flush")
tst.introduceFlushed(next, epoch)
+ tst.incTotalIntroduceLoopFinished(1, "flush")
tst.gc.clean()
epoch++
case next := <-mergeCh:
+ tst.incTotalIntroduceLoopStarted(1, "merge")
tst.introduceMerged(next, epoch)
+ tst.incTotalIntroduceLoopFinished(1, "merge")
tst.gc.clean()
epoch++
case epochWatcher := <-watcherCh:
diff --git a/banyand/stream/merger.go b/banyand/stream/merger.go
index cd4f4df5..59cde2cd 100644
--- a/banyand/stream/merger.go
+++ b/banyand/stream/merger.go
@@ -47,26 +47,29 @@ func (tst *tsTable) mergeLoop(merges chan
*mergerIntroduction, flusherNotifier w
case <-tst.loopCloser.CloseNotify():
return
case <-ew.Watch():
- curSnapshot := tst.currentSnapshot()
- if curSnapshot == nil {
- continue
- }
- if curSnapshot.epoch != epoch {
- var err error
- if pwsChunk, err =
tst.mergeSnapshot(curSnapshot, merges, pwsChunk[:0]); err != nil {
- if errors.Is(err, errClosed) {
- curSnapshot.decRef()
- return
+ if func() bool {
+ curSnapshot := tst.currentSnapshot()
+ if curSnapshot == nil {
+ return false
+ }
+ defer curSnapshot.decRef()
+ if curSnapshot.epoch != epoch {
+ tst.incTotalMergeLoopStarted(1)
+ defer tst.incTotalMergeLoopFinished(1)
+ var err error
+ if pwsChunk, err =
tst.mergeSnapshot(curSnapshot, merges, pwsChunk[:0]); err != nil {
+ if errors.Is(err, errClosed) {
+ return true
+ }
+
tst.l.Logger.Warn().Err(err).Msgf("cannot merge snapshot: %d",
curSnapshot.epoch)
+ tst.incTotalMergeLoopErr(1)
+ return false
}
-
tst.l.Logger.Warn().Err(err).Msgf("cannot merge snapshot: %d",
curSnapshot.epoch)
- curSnapshot.decRef()
- continue
+ epoch = curSnapshot.epoch
}
- epoch = curSnapshot.epoch
- }
- curSnapshot.decRef()
- ew = flusherNotifier.Add(epoch,
tst.loopCloser.CloseNotify())
- if ew == nil {
+ ew = flusherNotifier.Add(epoch,
tst.loopCloser.CloseNotify())
+ return ew == nil
+ }() {
return
}
}
@@ -81,14 +84,14 @@ func (tst *tsTable) mergeSnapshot(curSnapshot *snapshot,
merges chan *mergerIntr
return nil, nil
}
if _, err := tst.mergePartsThenSendIntroduction(snapshotCreatorMerger,
dst,
- toBeMerged, merges, tst.loopCloser.CloseNotify()); err != nil {
+ toBeMerged, merges, tst.loopCloser.CloseNotify(), "file"); err
!= nil {
return dst, err
}
return dst, nil
}
func (tst *tsTable) mergePartsThenSendIntroduction(creator snapshotCreator,
parts []*partWrapper, merged map[uint64]struct{}, merges chan
*mergerIntroduction,
- closeCh <-chan struct{},
+ closeCh <-chan struct{}, typ string,
) (*partWrapper, error) {
reservedSpace := tst.reserveSpace(parts)
defer releaseDiskSpace(reservedSpace)
@@ -98,6 +101,9 @@ func (tst *tsTable) mergePartsThenSendIntroduction(creator
snapshotCreator, part
return nil, err
}
elapsed := time.Since(start)
+ tst.incTotalMergeLatency(elapsed.Seconds(), typ)
+ tst.incTotalMerged(1, typ)
+ tst.incTotalMergedParts(len(parts), typ)
if elapsed > 30*time.Second {
var totalCount uint64
for _, pw := range parts {
diff --git a/banyand/stream/merger_test.go b/banyand/stream/merger_test.go
index d011b5d0..310d164b 100644
--- a/banyand/stream/merger_test.go
+++ b/banyand/stream/merger_test.go
@@ -275,11 +275,11 @@ func Test_mergeParts(t *testing.T) {
name: "Test with multiple parts with a large quantity
of different ts",
esList: []*elements{generateHugeEs(1, 5000, 1),
generateHugeEs(5001, 10000, 2)},
want: []blockMetadata{
- {seriesID: 1, count: 2395,
uncompressedSizeBytes: 2109995},
- {seriesID: 1, count: 2395,
uncompressedSizeBytes: 2109995},
- {seriesID: 1, count: 2605,
uncompressedSizeBytes: 2295005},
- {seriesID: 1, count: 2395,
uncompressedSizeBytes: 2109995},
- {seriesID: 1, count: 210,
uncompressedSizeBytes: 185010},
+ {seriesID: 1, count: 2448,
uncompressedSizeBytes: 2156688},
+ {seriesID: 1, count: 2448,
uncompressedSizeBytes: 2156688},
+ {seriesID: 1, count: 2552,
uncompressedSizeBytes: 2248312},
+ {seriesID: 1, count: 2448,
uncompressedSizeBytes: 2156688},
+ {seriesID: 1, count: 104,
uncompressedSizeBytes: 91624},
{seriesID: 2, count: 2, uncompressedSizeBytes:
110},
{seriesID: 3, count: 2, uncompressedSizeBytes:
16},
},
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index ad31ac08..58300ae8 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -36,6 +36,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/meter"
resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
)
@@ -275,21 +276,25 @@ func (s *supplier) ResourceSchema(md *commonv1.Metadata)
(resourceSchema.Resourc
}
func (s *supplier) OpenDB(groupSchema *commonv1.Group) (io.Closer, error) {
+ name := groupSchema.Metadata.Name
+ p := common.Position{
+ Module: "stream",
+ Database: name,
+ }
+
opts := storage.TSDBOpts[*tsTable, option]{
ShardNum:
groupSchema.ResourceOpts.ShardNum,
Location: path.Join(s.path,
groupSchema.Metadata.Name),
TSTableCreator: newTSTable,
- MetricsCreator: s.newMetrics,
+ TableMetrics: s.newMetrics(p),
SegmentInterval:
storage.MustToIntervalRule(groupSchema.ResourceOpts.SegmentInterval),
TTL:
storage.MustToIntervalRule(groupSchema.ResourceOpts.Ttl),
Option: s.option,
SeriesIndexFlushTimeoutSeconds:
s.option.flushTimeout.Nanoseconds() / int64(time.Second),
+ StorageMetricsFactory:
s.omr.With(storageScope.ConstLabels(meter.ToLabelPairs(common.DBLabelNames(),
p.DBLabelValues()))),
}
- name := groupSchema.Metadata.Name
return storage.OpenTSDB(
- common.SetPosition(context.Background(), func(p
common.Position) common.Position {
- p.Module = "stream"
- p.Database = name
+ common.SetPosition(context.Background(), func(_
common.Position) common.Position {
return p
}),
opts)
diff --git a/banyand/stream/metrics.go b/banyand/stream/metrics.go
index 9b78f361..f77bd41b 100644
--- a/banyand/stream/metrics.go
+++ b/banyand/stream/metrics.go
@@ -21,29 +21,337 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/banyand/observability"
+ "github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/meter"
)
-var streamScope = observability.RootScope.SubScope("stream")
+var (
+ // streamScope is the root metric scope of the stream module.
+ streamScope = observability.RootScope.SubScope("stream")
+ // tbScope ("tst") is the sub-scope newMetrics uses for table-level instruments.
+ tbScope = streamScope.SubScope("tst")
+ // storageScope ("storage") is the sub-scope OpenDB passes to the TSDB as StorageMetricsFactory.
+ storageScope = streamScope.SubScope("storage")
+)
+// metrics bundles every instrument a stream tsTable reports.
+// The embedded tbMetrics gauges are refreshed by Collect; indexMetrics is handed
+// to the element index in newTSTable; the counters are bumped through the
+// nil-safe incTotal* helpers on tsTable.
+// The *Latency fields are counters: they accumulate durations, they do not
+// observe a distribution.
type metrics struct {
- totalWritten meter.Counter
+ tbMetrics
+ indexMetrics *inverted.Metrics
+ // Write path.
+ totalWritten meter.Counter
+ totalBatch meter.Counter
+ totalBatchIntroLatency meter.Counter
+
+ // Introducer loop, labeled by "phase".
+ totalIntroduceLoopStarted meter.Counter
+ totalIntroduceLoopFinished meter.Counter
+
+ // Flush loop lifecycle.
+ totalFlushLoopStarted meter.Counter
+ totalFlushLoopFinished meter.Counter
+ totalFlushLoopErr meter.Counter
+
+ // Merge loop lifecycle.
+ totalMergeLoopStarted meter.Counter
+ totalMergeLoopFinished meter.Counter
+ totalMergeLoopErr meter.Counter
+
+ // Flush details.
+ totalFlushLoopProgress meter.Counter
+ totalFlushed meter.Counter
+ totalFlushedMemParts meter.Counter
+ totalFlushPauseCompleted meter.Counter
+ totalFlushPauseBreak meter.Counter
+ totalFlushIntroLatency meter.Counter
+ totalFlushLatency meter.Counter
+
+ // Merge details, labeled by "type".
+ totalMergedParts meter.Counter
+ totalMergeLatency meter.Counter
+ totalMerged meter.Counter
}
-func (m *metrics) incTotalWritten(delta int) {
- if m == nil {
+// incTotalWritten adds delta to the total_written counter.
+// It does nothing when the table or its metrics are nil.
+func (tst *tsTable) incTotalWritten(delta int) {
+	if tst == nil {
+		return
+	}
+	if m := tst.metrics; m != nil {
+		m.totalWritten.Inc(float64(delta))
+	}
+}
+
+// incTotalBatch adds delta to the total_batch counter.
+// It does nothing when the table or its metrics are nil.
+func (tst *tsTable) incTotalBatch(delta int) {
+	if tst == nil {
+		return
+	}
+	if m := tst.metrics; m != nil {
+		m.totalBatch.Inc(float64(delta))
+	}
+}
+
+// incTotalBatchIntroLatency adds delta to the total_batch_intro_time counter.
+// mustAddElements passes the elapsed introduction time in seconds.
+// It does nothing when the table or its metrics are nil.
+func (tst *tsTable) incTotalBatchIntroLatency(delta float64) {
+	if tst == nil {
+		return
+	}
+	if m := tst.metrics; m != nil {
+		m.totalBatchIntroLatency.Inc(delta)
+	}
+}
+
+// incTotalIntroduceLoopStarted adds delta to the total_introduce_loop_started
+// counter under the given phase label.
+// It does nothing when the table or its metrics are nil.
+func (tst *tsTable) incTotalIntroduceLoopStarted(delta int, phase string) {
+	if tst == nil {
+		return
+	}
+	if m := tst.metrics; m != nil {
+		m.totalIntroduceLoopStarted.Inc(float64(delta), phase)
+	}
+}
+
+// incTotalIntroduceLoopFinished adds delta to the total_introduce_loop_finished
+// counter under the given phase label.
+// It does nothing when the table or its metrics are nil.
+func (tst *tsTable) incTotalIntroduceLoopFinished(delta int, phase string) {
+	if tst == nil {
+		return
+	}
+	if m := tst.metrics; m != nil {
+		m.totalIntroduceLoopFinished.Inc(float64(delta), phase)
+	}
+}
+
+// incTotalFlushLoopStarted adds delta to the total_flush_loop_started counter.
+// It does nothing when the table or its metrics are nil.
+func (tst *tsTable) incTotalFlushLoopStarted(delta int) {
+	if tst == nil {
+		return
+	}
+	if m := tst.metrics; m != nil {
+		m.totalFlushLoopStarted.Inc(float64(delta))
+	}
+}
+
+// incTotalFlushLoopFinished adds delta to the total_flush_loop_finished counter.
+// It does nothing when the table or its metrics are nil.
+func (tst *tsTable) incTotalFlushLoopFinished(delta int) {
+	if tst == nil {
+		return
+	}
+	if m := tst.metrics; m != nil {
+		m.totalFlushLoopFinished.Inc(float64(delta))
+	}
+}
+
+// incTotalFlushLoopErr adds delta to the total_flush_loop_err counter.
+// It does nothing when the table or its metrics are nil.
+func (tst *tsTable) incTotalFlushLoopErr(delta int) {
+	if tst == nil {
+		return
+	}
+	if m := tst.metrics; m != nil {
+		m.totalFlushLoopErr.Inc(float64(delta))
+	}
+}
+
+// incTotalMergeLoopStarted adds delta to the total_merge_loop_started counter.
+// It does nothing when the table or its metrics are nil.
+func (tst *tsTable) incTotalMergeLoopStarted(delta int) {
+	if tst == nil {
+		return
+	}
+	if m := tst.metrics; m != nil {
+		m.totalMergeLoopStarted.Inc(float64(delta))
+	}
+}
+
+// incTotalMergeLoopFinished adds delta to the total_merge_loop_finished counter.
+// It does nothing when the table or its metrics are nil.
+func (tst *tsTable) incTotalMergeLoopFinished(delta int) {
+	if tst == nil {
+		return
+	}
+	if m := tst.metrics; m != nil {
+		m.totalMergeLoopFinished.Inc(float64(delta))
+	}
+}
+
+// incTotalMergeLoopErr adds delta to the total_merge_loop_err counter.
+// It does nothing when the table or its metrics are nil.
+func (tst *tsTable) incTotalMergeLoopErr(delta int) {
+	if tst == nil {
+		return
+	}
+	if m := tst.metrics; m != nil {
+		m.totalMergeLoopErr.Inc(float64(delta))
+	}
+}
+
+// incTotalFlushLoopProgress adds delta to the total_flush_loop_progress counter.
+// It does nothing when the table or its metrics are nil.
+func (tst *tsTable) incTotalFlushLoopProgress(delta int) {
+	if tst == nil {
+		return
+	}
+	if m := tst.metrics; m != nil {
+		m.totalFlushLoopProgress.Inc(float64(delta))
+	}
+}
+
+// incTotalFlushed adds delta to the total_flushed counter.
+// It does nothing when the table or its metrics are nil.
+func (tst *tsTable) incTotalFlushed(delta int) {
+ if tst == nil || tst.metrics == nil {
return
}
- m.totalWritten.Inc(float64(delta))
+ tst.metrics.totalFlushed.Inc(float64(delta))
+}
+
+// incTotalFlushedMemParts adds delta to the total_flushed_mem_parts counter.
+// It does nothing when the table or its metrics are nil.
+func (tst *tsTable) incTotalFlushedMemParts(delta int) {
+	if tst == nil {
+		return
+	}
+	if m := tst.metrics; m != nil {
+		m.totalFlushedMemParts.Inc(float64(delta))
+	}
+}
+
+// incTotalFlushPauseCompleted adds delta to the total_flush_pause_completed counter.
+// It does nothing when the table or its metrics are nil.
+func (tst *tsTable) incTotalFlushPauseCompleted(delta int) {
+	if tst == nil {
+		return
+	}
+	if m := tst.metrics; m != nil {
+		m.totalFlushPauseCompleted.Inc(float64(delta))
+	}
+}
+
+// incTotalFlushPauseBreak adds delta to the total_flush_pause_break counter.
+// It does nothing when the table or its metrics are nil.
+func (tst *tsTable) incTotalFlushPauseBreak(delta int) {
+	if tst == nil {
+		return
+	}
+	if m := tst.metrics; m != nil {
+		m.totalFlushPauseBreak.Inc(float64(delta))
+	}
+}
+
+// incTotalFlushIntroLatency adds delta to the total_flush_intro_latency counter.
+// It does nothing when the table or its metrics are nil.
+func (tst *tsTable) incTotalFlushIntroLatency(delta float64) {
+	if tst == nil {
+		return
+	}
+	if m := tst.metrics; m != nil {
+		m.totalFlushIntroLatency.Inc(delta)
+	}
+}
+
+// incTotalFlushLatency adds delta to the total_flush_latency counter.
+// It does nothing when the table or its metrics are nil.
+func (tst *tsTable) incTotalFlushLatency(delta float64) {
+	if tst == nil {
+		return
+	}
+	if m := tst.metrics; m != nil {
+		m.totalFlushLatency.Inc(delta)
+	}
+}
+
+// incTotalMergedParts adds delta to the total_merged_parts counter under the
+// given type label.
+// It does nothing when the table or its metrics are nil.
+func (tst *tsTable) incTotalMergedParts(delta int, typ string) {
+	if tst == nil {
+		return
+	}
+	if m := tst.metrics; m != nil {
+		m.totalMergedParts.Inc(float64(delta), typ)
+	}
+}
+
+// incTotalMergeLatency adds delta to the total_merge_latency counter under the
+// given type label.
+// It does nothing when the table or its metrics are nil.
+func (tst *tsTable) incTotalMergeLatency(delta float64, typ string) {
+	if tst == nil {
+		return
+	}
+	if m := tst.metrics; m != nil {
+		m.totalMergeLatency.Inc(delta, typ)
+	}
+}
+
+// incTotalMerged adds delta to the total_merged counter under the given type label.
+// It does nothing when the table or its metrics are nil.
+func (tst *tsTable) incTotalMerged(delta int, typ string) {
+	if tst == nil {
+		return
+	}
+	if m := tst.metrics; m != nil {
+		m.totalMerged.Inc(float64(delta), typ)
+	}
}
func (m *metrics) DeleteAll() {
+ if m == nil {
+ return
+ }
m.totalWritten.Delete()
+ m.totalBatch.Delete()
+ m.totalBatchIntroLatency.Delete()
+
+ m.totalIntroduceLoopStarted.Delete("mem")
+ m.totalIntroduceLoopStarted.Delete("flush")
+ m.totalIntroduceLoopStarted.Delete("merge")
+ m.totalIntroduceLoopFinished.Delete("mem")
+ m.totalIntroduceLoopFinished.Delete("flush")
+ m.totalIntroduceLoopFinished.Delete("merge")
+
+ m.totalFlushLoopStarted.Delete()
+ m.totalFlushLoopFinished.Delete()
+ m.totalFlushLoopErr.Delete()
+
+ m.totalMergeLoopStarted.Delete()
+ m.totalMergeLoopFinished.Delete()
+ m.totalMergeLoopErr.Delete()
+
+ m.totalFlushLoopProgress.Delete()
+ m.totalFlushed.Delete()
+ m.totalFlushedMemParts.Delete()
+ m.totalFlushPauseCompleted.Delete()
+ m.totalFlushPauseBreak.Delete()
+ m.totalFlushLatency.Delete()
+
+ m.totalMergedParts.Delete("mem")
+ m.totalMergeLatency.Delete("mem")
+ m.totalMerged.Delete("mem")
+ m.totalMergedParts.Delete("file")
+ m.totalMergeLatency.Delete("file")
+ m.totalMerged.Delete("file")
}
+// newMetrics builds the metric set for the database identified by p.
+// Every instrument comes from a factory scoped to tbScope whose const labels
+// pair common.DBLabelNames() with p.DBLabelValues(). Loop counters take an
+// extra "phase" label, merge counters an extra "type" label, table gauges use
+// common.ShardLabelNames(), and the inverted-index metrics use
+// common.SegLabelNames().
func (s *supplier) newMetrics(p common.Position) storage.Metrics {
- factory := s.omr.With(streamScope.ConstLabels(meter.LabelPairs{"group":
p.Database}))
+ factory :=
s.omr.With(tbScope.ConstLabels(meter.ToLabelPairs(common.DBLabelNames(),
p.DBLabelValues())))
return &metrics{
- totalWritten: factory.NewCounter("total_written"),
+ totalWritten: factory.NewCounter("total_written"),
+ totalBatch: factory.NewCounter("total_batch"),
+ totalBatchIntroLatency:
factory.NewCounter("total_batch_intro_time"),
+ totalIntroduceLoopStarted:
factory.NewCounter("total_introduce_loop_started", "phase"),
+ totalIntroduceLoopFinished:
factory.NewCounter("total_introduce_loop_finished", "phase"),
+ totalFlushLoopStarted:
factory.NewCounter("total_flush_loop_started"),
+ totalFlushLoopFinished:
factory.NewCounter("total_flush_loop_finished"),
+ totalFlushLoopErr:
factory.NewCounter("total_flush_loop_err"),
+ totalMergeLoopStarted:
factory.NewCounter("total_merge_loop_started"),
+ totalMergeLoopFinished:
factory.NewCounter("total_merge_loop_finished"),
+ totalMergeLoopErr:
factory.NewCounter("total_merge_loop_err"),
+ totalFlushLoopProgress:
factory.NewCounter("total_flush_loop_progress"),
+ totalFlushed: factory.NewCounter("total_flushed"),
+ totalFlushedMemParts:
factory.NewCounter("total_flushed_mem_parts"),
+ totalFlushPauseCompleted:
factory.NewCounter("total_flush_pause_completed"),
+ totalFlushPauseBreak:
factory.NewCounter("total_flush_pause_break"),
+ totalFlushIntroLatency:
factory.NewCounter("total_flush_intro_latency"),
+ totalFlushLatency:
factory.NewCounter("total_flush_latency"),
+ totalMergedParts:
factory.NewCounter("total_merged_parts", "type"),
+ totalMergeLatency:
factory.NewCounter("total_merge_latency", "type"),
+ totalMerged: factory.NewCounter("total_merged",
"type"),
+ tbMetrics: tbMetrics{
+ totalMemParts:
factory.NewGauge("total_mem_part", common.ShardLabelNames()...),
+ totalMemElements:
factory.NewGauge("total_mem_elements", common.ShardLabelNames()...),
+ totalMemBlocks:
factory.NewGauge("total_mem_blocks", common.ShardLabelNames()...),
+ totalMemPartBytes:
factory.NewGauge("total_mem_part_bytes", common.ShardLabelNames()...),
+ totalMemPartUncompressedBytes:
factory.NewGauge("total_mem_part_uncompressed_bytes",
common.ShardLabelNames()...),
+ totalFileParts:
factory.NewGauge("total_file_parts", common.ShardLabelNames()...),
+ totalFileElements:
factory.NewGauge("total_file_elements", common.ShardLabelNames()...),
+ totalFileBlocks:
factory.NewGauge("total_file_blocks", common.ShardLabelNames()...),
+ totalFilePartBytes:
factory.NewGauge("total_file_part_bytes", common.ShardLabelNames()...),
+ totalFilePartUncompressedBytes:
factory.NewGauge("total_file_part_uncompressed_bytes",
common.ShardLabelNames()...),
+ },
+ indexMetrics: inverted.NewMetrics(factory,
common.SegLabelNames()...),
}
}
+
+// Collect refreshes the shard-labeled table gauges in m from the parts of the
+// current snapshot, then asks the element index to publish its own metrics
+// under the segment label values.
+//
+// m is expected to be the *metrics built by newMetrics. A nil interface, a
+// typed-nil *metrics, or any other implementation is ignored instead of
+// panicking. A part counts as in-memory when its wrapper carries a memPart,
+// and as file-backed otherwise.
+func (tst *tsTable) Collect(m storage.Metrics) {
+	if m == nil {
+		return
+	}
+	// Comma-ok assertion: a typed-nil *metrics still makes m non-nil, and a
+	// foreign implementation must not crash the collector.
+	tm, ok := m.(*metrics)
+	if !ok || tm == nil {
+		return
+	}
+
+	var memParts, memElements, memBlocks, memBytes, memUncompressedBytes uint64
+	var fileParts, fileElements, fileBlocks, fileBytes, fileUncompressedBytes uint64
+	// NOTE(review): Close treats tst.snapshot as possibly nil, so guard the
+	// snapshot here and report zeros for a table that has none — confirm
+	// against currentSnapshot's contract.
+	if snp := tst.currentSnapshot(); snp != nil {
+		defer snp.decRef()
+		for _, pw := range snp.parts {
+			if pw.mp == nil {
+				fileParts++
+				fileElements += pw.p.partMetadata.TotalCount
+				fileBlocks += pw.p.partMetadata.BlocksCount
+				fileBytes += pw.p.partMetadata.CompressedSizeBytes
+				fileUncompressedBytes += pw.p.partMetadata.UncompressedSizeBytes
+				continue
+			}
+			memParts++
+			memElements += pw.mp.partMetadata.TotalCount
+			memBlocks += pw.mp.partMetadata.BlocksCount
+			memBytes += pw.mp.partMetadata.CompressedSizeBytes
+			memUncompressedBytes += pw.mp.partMetadata.UncompressedSizeBytes
+		}
+	}
+
+	// The label values are identical for every gauge; compute them once.
+	shardLabels := tst.p.ShardLabelValues()
+	tm.totalMemParts.Set(float64(memParts), shardLabels...)
+	tm.totalMemElements.Set(float64(memElements), shardLabels...)
+	tm.totalMemBlocks.Set(float64(memBlocks), shardLabels...)
+	tm.totalMemPartBytes.Set(float64(memBytes), shardLabels...)
+	tm.totalMemPartUncompressedBytes.Set(float64(memUncompressedBytes), shardLabels...)
+	tm.totalFileParts.Set(float64(fileParts), shardLabels...)
+	tm.totalFileElements.Set(float64(fileElements), shardLabels...)
+	tm.totalFileBlocks.Set(float64(fileBlocks), shardLabels...)
+	tm.totalFilePartBytes.Set(float64(fileBytes), shardLabels...)
+	tm.totalFilePartUncompressedBytes.Set(float64(fileUncompressedBytes), shardLabels...)
+	tst.index.collectMetrics(tst.p.SegLabelValues()...)
+}
+
+// deleteMetrics deletes this table's shard-labeled gauges and its
+// segment-labeled index metrics. Close calls it before releasing the table.
+// Like the incTotal* helpers, it does nothing when the table or its metrics
+// are nil.
+func (tst *tsTable) deleteMetrics() {
+	if tst == nil || tst.metrics == nil {
+		return
+	}
+	tb := &tst.metrics.tbMetrics
+	// Every gauge is keyed by the same label values; compute them once.
+	shardLabels := tst.p.ShardLabelValues()
+	for _, g := range []meter.Gauge{
+		tb.totalMemParts,
+		tb.totalMemElements,
+		tb.totalMemBlocks,
+		tb.totalMemPartBytes,
+		tb.totalMemPartUncompressedBytes,
+		tb.totalFileParts,
+		tb.totalFileElements,
+		tb.totalFileBlocks,
+		tb.totalFilePartBytes,
+		tb.totalFilePartUncompressedBytes,
+	} {
+		g.Delete(shardLabels...)
+	}
+	tst.metrics.indexMetrics.DeleteAll(tst.p.SegLabelValues()...)
+}
+
+// tbMetrics holds the per-shard gauges that Collect recomputes from the
+// current snapshot and deleteMetrics removes on Close.
+type tbMetrics struct {
+ // Totals over snapshot parts that carry a memPart.
+ totalMemParts meter.Gauge
+ totalMemElements meter.Gauge
+ totalMemBlocks meter.Gauge
+ totalMemPartBytes meter.Gauge
+ totalMemPartUncompressedBytes meter.Gauge
+
+ // Totals over snapshot parts without a memPart (file-backed).
+ totalFileParts meter.Gauge
+ totalFileElements meter.Gauge
+ totalFileBlocks meter.Gauge
+ totalFilePartBytes meter.Gauge
+ totalFilePartUncompressedBytes meter.Gauge
+}
diff --git a/banyand/stream/part.go b/banyand/stream/part.go
index 228e607c..b4ad9d47 100644
--- a/banyand/stream/part.go
+++ b/banyand/stream/part.go
@@ -23,7 +23,6 @@ import (
"path/filepath"
"sort"
"sync/atomic"
- "time"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/bytes"
@@ -183,7 +182,9 @@ func (mp *memPart) mustFlush(fileSystem fs.FileSystem, path
string) {
}
func uncompressedElementSizeBytes(index int, es *elements) uint64 {
- n := uint64(len(time.RFC3339Nano))
+ // 8 bytes for timestamp
+ // 8 bytes for elementID
+ n := uint64(8 + 8)
for i := range es.tagFamilies[index] {
n += uint64(len(es.tagFamilies[index][i].tag))
for j := range es.tagFamilies[index][i].values {
diff --git a/banyand/stream/query_test.go b/banyand/stream/query_test.go
index 4b5d7685..9eea344d 100644
--- a/banyand/stream/query_test.go
+++ b/banyand/stream/query_test.go
@@ -339,7 +339,7 @@ func TestQueryResult(t *testing.T) {
t.Run("memory snapshot", func(t *testing.T) {
tmpPath, defFn := test.Space(require.New(t))
defer defFn()
- index, _ := newElementIndex(context.TODO(),
tmpPath, 0)
+ index, _ := newElementIndex(context.TODO(),
tmpPath, 0, nil)
tst := &tsTable{
index: index,
loopCloser: run.NewCloser(2),
diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go
index 8e6cd0e3..14997423 100644
--- a/banyand/stream/tstable.go
+++ b/banyand/stream/tstable.go
@@ -33,6 +33,7 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/pool"
"github.com/apache/skywalking-banyandb/pkg/run"
@@ -169,21 +170,23 @@ func (tst *tsTable) mustReadSnapshot(snapshot uint64)
[]uint64 {
func newTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position,
l *logger.Logger, _ timestamp.TimeRange, option option, m any,
) (*tsTable, error) {
- index, err := newElementIndex(context.TODO(), rootPath,
option.elementIndexFlushTimeout.Nanoseconds()/int64(time.Second))
- if err != nil {
- return nil, err
- }
tst := tsTable{
- index: index,
fileSystem: fileSystem,
root: rootPath,
option: option,
l: l,
p: p,
}
+ var indexMetrics *inverted.Metrics
if m != nil {
tst.metrics = m.(*metrics)
+ indexMetrics = tst.metrics.indexMetrics
+ }
+ index, err := newElementIndex(context.TODO(), rootPath,
option.elementIndexFlushTimeout.Nanoseconds()/int64(time.Second), indexMetrics)
+ if err != nil {
+ return nil, err
}
+ tst.index = index
tst.gc.init(&tst)
ee := fileSystem.ReadDir(rootPath)
if len(ee) == 0 {
@@ -255,6 +258,7 @@ func (tst *tsTable) Close() error {
}
tst.Lock()
defer tst.Unlock()
+ tst.deleteMetrics()
if tst.snapshot == nil {
return tst.index.Close()
}
@@ -277,6 +281,7 @@ func (tst *tsTable) mustAddElements(es *elements) {
ind.applied = make(chan struct{})
ind.memPart = newPartWrapper(mp, p)
ind.memPart.p.partMetadata.ID = atomic.AddUint64(&tst.curPartID, 1)
+ startTime := time.Now()
select {
case tst.introductions <- ind:
case <-tst.loopCloser.CloseNotify():
@@ -286,7 +291,9 @@ func (tst *tsTable) mustAddElements(es *elements) {
case <-ind.applied:
case <-tst.loopCloser.CloseNotify():
}
- tst.metrics.incTotalWritten(len(es.timestamps))
+ tst.incTotalWritten(len(es.timestamps))
+ tst.incTotalBatch(1)
+ tst.incTotalBatchIntroLatency(time.Since(startTime).Seconds())
}
type tstIter struct {
diff --git a/banyand/stream/tstable_test.go b/banyand/stream/tstable_test.go
index 054fa288..c8759b9e 100644
--- a/banyand/stream/tstable_test.go
+++ b/banyand/stream/tstable_test.go
@@ -91,7 +91,7 @@ func Test_tsTable_mustAddDataPoints(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tmpPath, _ := test.Space(require.New(t))
- index, _ := newElementIndex(context.TODO(), tmpPath, 0)
+ index, _ := newElementIndex(context.TODO(), tmpPath, 0,
nil)
tst := &tsTable{
index: index,
loopCloser: run.NewCloser(2),
@@ -229,7 +229,7 @@ func Test_tstIter(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
tmpPath, defFn := test.Space(require.New(t))
- index, _ := newElementIndex(context.TODO(),
tmpPath, 0)
+ index, _ := newElementIndex(context.TODO(),
tmpPath, 0, nil)
defer defFn()
tst := &tsTable{
index: index,
diff --git a/go.mod b/go.mod
index a045afcb..703d1ee0 100644
--- a/go.mod
+++ b/go.mod
@@ -86,6 +86,7 @@ require (
github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 // indirect
github.com/gorilla/websocket v1.5.1 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
+ github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
diff --git a/go.sum b/go.sum
index 9dcd2a8a..efbfc08b 100644
--- a/go.sum
+++ b/go.sum
@@ -155,6 +155,8 @@ github.com/gorilla/websocket v1.5.1
h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/
github.com/gorilla/websocket v1.5.1/go.mod
h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
h1:UH//fgunKIs4JdUbpDl1VZCDaL56wXCB/5+wF6uHfaI=
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0/go.mod
h1:g5qyo/la0ALbONm6Vbp88Yd8NsDy6rZz+RcrMPxvld8=
+github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1
h1:qnpSQwGEnkcRpTqNOIR6bJbR0gAorgP9CSALpRcKoAA=
+github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus
v1.0.1/go.mod h1:lXGCsh6c22WGtjr+qGHj1otzZpV/1kwTMAqkwZsnWRU=
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0
h1:pRhl55Yx1eC7BZ1N+BBWwnKaMyD8uC+34TLdndZMAKk=
github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0/go.mod
h1:XKMd7iuf/RGPSMJ/U4HP0zS2Z9Fh8Ps9a+6X26m/tmI=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho=
diff --git a/pkg/cmdsetup/data.go b/pkg/cmdsetup/data.go
index 52d342bd..54ce8020 100644
--- a/pkg/cmdsetup/data.go
+++ b/pkg/cmdsetup/data.go
@@ -43,9 +43,9 @@ func newDataCmd(runners ...run.Unit) *cobra.Command {
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate metadata service")
}
- pipeline := sub.NewServer()
localPipeline := queue.Local()
metricSvc := observability.NewMetricService(metaSvc, localPipeline,
"data", nil)
+ pipeline := sub.NewServer(metricSvc)
streamSvc, err := stream.NewService(ctx, metaSvc, pipeline, metricSvc)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate stream service")
@@ -65,8 +65,8 @@ func newDataCmd(runners ...run.Unit) *cobra.Command {
units = append(units,
metaSvc,
localPipeline,
- pipeline,
metricSvc,
+ pipeline,
measureSvc,
streamSvc,
q,
diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go
index 66f3da41..bbb10db3 100644
--- a/pkg/cmdsetup/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -48,9 +48,9 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
localPipeline := queue.Local()
nodeSel := node.NewRoundRobinSelector(metaSvc)
nodeRegistry := grpc.NewClusterNodeRegistry(pipeline, nodeSel)
- grpcServer := grpc.NewServer(ctx, pipeline, localPipeline, metaSvc,
nodeRegistry)
- profSvc := observability.NewProfService()
metricSvc := observability.NewMetricService(metaSvc, pipeline,
"liaison", nodeRegistry)
+ grpcServer := grpc.NewServer(ctx, pipeline, localPipeline, metaSvc,
nodeRegistry, metricSvc)
+ profSvc := observability.NewProfService()
httpServer := http.NewServer()
dQuery, err := dquery.NewService(metaSvc, localPipeline, pipeline)
if err != nil {
@@ -63,14 +63,12 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
localPipeline,
pipeline,
nodeSel,
+ metricSvc,
dQuery,
grpcServer,
httpServer,
profSvc,
)
- if metricSvc != nil {
- units = append(units, metricSvc)
- }
liaisonGroup := run.NewGroup("liaison")
liaisonGroup.Register(units...)
liaisonCmd := &cobra.Command{
diff --git a/pkg/cmdsetup/standalone.go b/pkg/cmdsetup/standalone.go
index 74834126..1d43927d 100644
--- a/pkg/cmdsetup/standalone.go
+++ b/pkg/cmdsetup/standalone.go
@@ -21,6 +21,7 @@ import (
"context"
"os"
+ grpcprom
"github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
"github.com/spf13/cobra"
"github.com/apache/skywalking-banyandb/api/common"
@@ -50,6 +51,9 @@ func newStandaloneCmd(runners ...run.Unit) *cobra.Command {
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate stream service")
}
+ var srvMetrics *grpcprom.ServerMetrics
+ srvMetrics.UnaryServerInterceptor()
+ srvMetrics.UnaryServerInterceptor()
measureSvc, err := measure.NewService(ctx, metaSvc, pipeline, nil,
metricSvc)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate measure service")
@@ -58,7 +62,7 @@ func newStandaloneCmd(runners ...run.Unit) *cobra.Command {
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate query processor")
}
- grpcServer := grpc.NewServer(ctx, pipeline, pipeline, metaSvc,
grpc.NewLocalNodeRegistry())
+ grpcServer := grpc.NewServer(ctx, pipeline, pipeline, metaSvc,
grpc.NewLocalNodeRegistry(), metricSvc)
profSvc := observability.NewProfService()
httpServer := http.NewServer()
diff --git a/pkg/index/index.go b/pkg/index/index.go
index a7da177b..08c20615 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -184,7 +184,7 @@ type Store interface {
io.Closer
Writer
Searcher
- SizeOnDisk() int64
+ CollectMetrics(...string)
}
// Series represents a series in an index.
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 866aacc7..a3915bae 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -78,14 +78,16 @@ var _ index.Store = (*store)(nil)
// StoreOpts wraps options to create an inverted index repository.
type StoreOpts struct {
Logger *logger.Logger
+ Metrics *Metrics // optional; when nil, the store's CollectMetrics is a no-op
Path string
BatchWaitSec int64
}
type store struct {
- writer *bluge.Writer
- closer *run.Closer
- l *logger.Logger
+ writer *bluge.Writer
+ closer *run.Closer
+ l *logger.Logger
+ metrics *Metrics // copied from StoreOpts.Metrics by NewStore; may be nil
}
var batchPool = pool.Register[*blugeIndex.Batch]("index-bluge-batch")
@@ -161,9 +163,10 @@ func NewStore(opts StoreOpts) (index.SeriesStore, error) {
return nil, err
}
s := &store{
- writer: w,
- l: opts.Logger,
- closer: run.NewCloser(1),
+ writer: w,
+ l: opts.Logger,
+ closer: run.NewCloser(1),
+ metrics: opts.Metrics,
}
return s, nil
}
@@ -325,11 +328,6 @@ func (s *store) Range(fieldKey index.FieldKey, opts
index.RangeOpts) (list posti
return
}
-func (s *store) SizeOnDisk() int64 {
- _, bytes := s.writer.DirectoryStats()
- return int64(bytes)
-}
-
type blugeMatchIterator struct {
delegated search.DocumentMatchIterator
err error
diff --git a/pkg/index/inverted/metrics.go b/pkg/index/inverted/metrics.go
new file mode 100644
index 00000000..6816b90e
--- /dev/null
+++ b/pkg/index/inverted/metrics.go
@@ -0,0 +1,152 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package inverted
+
+import (
+ "github.com/apache/skywalking-banyandb/banyand/observability"
+ "github.com/apache/skywalking-banyandb/pkg/meter"
+)
+
+// Metrics is the metrics for the inverted index.
+// Every field is a gauge that store.CollectMetrics overwrites with the value
+// read from the writer's status, or from a reader's document count for
+// totalDocCount.
+type Metrics struct {
+ // Write activity.
+ totalUpdates meter.Gauge
+ totalDeletes meter.Gauge
+ totalBatches meter.Gauge
+ totalErrors meter.Gauge
+
+ // Time spent analysing and indexing.
+ totalAnalysisTime meter.Gauge
+ totalIndexTime meter.Gauge
+
+ // Term searcher lifecycle.
+ totalTermSearchersStarted meter.Gauge
+ totalTermSearchersFinished meter.Gauge
+
+ // Segment merges; these carry an extra "type" label ("mem" or "file").
+ totalMergeStarted meter.Gauge
+ totalMergeFinished meter.Gauge
+ totalMergeLatency meter.Gauge
+ totalMergeErrors meter.Gauge
+
+ // Segment and on-disk footprint.
+ totalMemSegments meter.Gauge
+ totalFileSegments meter.Gauge
+ curOnDiskBytes meter.Gauge
+ curOnDiskFiles meter.Gauge
+
+ // Document count reported by a reader.
+ totalDocCount meter.Gauge
+}
+
+// NewMetrics creates a new metrics for the inverted index.
+// Every gauge is registered on factory with labelNames; the four merge gauges
+// additionally carry a trailing "type" label.
+func NewMetrics(factory *observability.Factory, labelNames ...string) *Metrics {
+	// labelNames aliases the caller's slice when invoked as NewMetrics(f, s...).
+	// Clipping the capacity forces append to copy, so the caller's backing
+	// array is never written to, and the list is built once instead of four times.
+	mergeLabelNames := append(labelNames[:len(labelNames):len(labelNames)], "type")
+	return &Metrics{
+		totalUpdates: factory.NewGauge("inverted_index_total_updates", labelNames...),
+		totalDeletes: factory.NewGauge("inverted_index_total_deletes", labelNames...),
+		totalBatches: factory.NewGauge("inverted_index_total_batches", labelNames...),
+		totalErrors:  factory.NewGauge("inverted_index_total_errors", labelNames...),
+
+		totalAnalysisTime: factory.NewGauge("inverted_index_total_analysis_time", labelNames...),
+		totalIndexTime:    factory.NewGauge("inverted_index_total_index_time", labelNames...),
+
+		totalTermSearchersStarted:  factory.NewGauge("inverted_index_total_term_searchers_started", labelNames...),
+		totalTermSearchersFinished: factory.NewGauge("inverted_index_total_term_searchers_finished", labelNames...),
+
+		totalMergeStarted:  factory.NewGauge("inverted_index_total_merge_started", mergeLabelNames...),
+		totalMergeFinished: factory.NewGauge("inverted_index_total_merge_finished", mergeLabelNames...),
+		totalMergeLatency:  factory.NewGauge("inverted_index_total_merge_latency", mergeLabelNames...),
+		totalMergeErrors:   factory.NewGauge("inverted_index_total_merge_errors", mergeLabelNames...),
+
+		totalMemSegments:  factory.NewGauge("inverted_index_total_mem_segments", labelNames...),
+		totalFileSegments: factory.NewGauge("inverted_index_total_file_segments", labelNames...),
+		curOnDiskBytes:    factory.NewGauge("inverted_index_cur_on_disk_bytes", labelNames...),
+		curOnDiskFiles:    factory.NewGauge("inverted_index_cur_on_disk_files", labelNames...),
+
+		totalDocCount: factory.NewGauge("inverted_index_total_doc_count", labelNames...),
+	}
+}
+
+// DeleteAll deletes all metrics with the given label values.
+// The merge gauges are deleted for both "type" values, "mem" and "file".
+// A nil receiver is a no-op.
+func (m *Metrics) DeleteAll(labelValues ...string) {
+	if m == nil {
+		return
+	}
+	// Clip the capacity so the appends below always copy and never write into
+	// the caller's backing array (labelValues aliases it when passed as s...).
+	base := labelValues[:len(labelValues):len(labelValues)]
+	memLabels := append(base, "mem")
+	fileLabels := append(base, "file")
+
+	m.totalUpdates.Delete(labelValues...)
+	m.totalDeletes.Delete(labelValues...)
+	m.totalBatches.Delete(labelValues...)
+	m.totalErrors.Delete(labelValues...)
+
+	m.totalAnalysisTime.Delete(labelValues...)
+	m.totalIndexTime.Delete(labelValues...)
+
+	m.totalTermSearchersStarted.Delete(labelValues...)
+	m.totalTermSearchersFinished.Delete(labelValues...)
+
+	m.totalMergeStarted.Delete(memLabels...)
+	m.totalMergeFinished.Delete(memLabels...)
+	m.totalMergeLatency.Delete(memLabels...)
+	m.totalMergeErrors.Delete(memLabels...)
+
+	m.totalMergeStarted.Delete(fileLabels...)
+	m.totalMergeFinished.Delete(fileLabels...)
+	m.totalMergeLatency.Delete(fileLabels...)
+	m.totalMergeErrors.Delete(fileLabels...)
+
+	m.totalMemSegments.Delete(labelValues...)
+	m.totalFileSegments.Delete(labelValues...)
+	m.curOnDiskBytes.Delete(labelValues...)
+	m.curOnDiskFiles.Delete(labelValues...)
+
+	// totalDocCount is set by CollectMetrics with the same label values and
+	// has to be removed with the rest.
+	m.totalDocCount.Delete(labelValues...)
+}
+
+// CollectMetrics copies the writer's current status into the store's gauges
+// under labelValues; the merge gauges get an extra "mem" or "file" type value.
+// It does nothing when the store was created without Metrics. The document
+// count is best effort: if a reader cannot be opened or counted, the previous
+// totalDocCount value is left in place.
+func (s *store) CollectMetrics(labelValues ...string) {
+	if s.metrics == nil {
+		return
+	}
+	mt := s.metrics
+	// Clip the capacity so the appends below always copy and never write into
+	// the caller's backing array (labelValues aliases it when passed as s...).
+	base := labelValues[:len(labelValues):len(labelValues)]
+	memLabels := append(base, "mem")
+	fileLabels := append(base, "file")
+
+	status := s.writer.Status()
+	mt.totalUpdates.Set(float64(status.TotUpdates), labelValues...)
+	mt.totalDeletes.Set(float64(status.TotDeletes), labelValues...)
+	mt.totalBatches.Set(float64(status.TotBatches), labelValues...)
+	mt.totalErrors.Set(float64(status.TotOnErrors), labelValues...)
+
+	mt.totalAnalysisTime.Set(float64(status.TotAnalysisTime), labelValues...)
+	mt.totalIndexTime.Set(float64(status.TotIndexTime), labelValues...)
+
+	mt.totalTermSearchersStarted.Set(float64(status.TotTermSearchersStarted), labelValues...)
+	mt.totalTermSearchersFinished.Set(float64(status.TotTermSearchersFinished), labelValues...)
+
+	mt.totalMergeStarted.Set(float64(status.TotMemMergeZapBeg), memLabels...)
+	mt.totalMergeFinished.Set(float64(status.TotMemMergeZapEnd), memLabels...)
+	mt.totalMergeLatency.Set(float64(status.TotMemMergeZapTime), memLabels...)
+	mt.totalMergeErrors.Set(float64(status.TotMemMergeErr), memLabels...)
+
+	mt.totalMergeStarted.Set(float64(status.TotFileMergeZapBeg), fileLabels...)
+	mt.totalMergeFinished.Set(float64(status.TotFileMergeZapEnd), fileLabels...)
+	mt.totalMergeLatency.Set(float64(status.TotFileMergeZapTime), fileLabels...)
+	// File merge errors are the sum of the loop, plan and plan-task error counters.
+	mt.totalMergeErrors.Set(float64(status.TotFileMergeLoopErr+status.TotFileMergePlanErr+status.TotFileMergePlanTasksErr), fileLabels...)
+
+	mt.totalMemSegments.Set(float64(status.TotMemorySegmentsAtRoot), labelValues...)
+	mt.totalFileSegments.Set(float64(status.TotFileSegmentsAtRoot), labelValues...)
+	mt.curOnDiskBytes.Set(float64(status.CurOnDiskBytes), labelValues...)
+	mt.curOnDiskFiles.Set(float64(status.CurOnDiskFiles), labelValues...)
+
+	r, err := s.writer.Reader()
+	if err != nil {
+		return
+	}
+	defer r.Close()
+	n, err := r.Count()
+	if err != nil {
+		return
+	}
+	mt.totalDocCount.Set(float64(n), labelValues...)
+}
diff --git a/pkg/meter/meter.go b/pkg/meter/meter.go
index a76ab4c2..80f30ab6 100644
--- a/pkg/meter/meter.go
+++ b/pkg/meter/meter.go
@@ -80,3 +80,12 @@ type Histogram interface {
Instrument
Observe(value float64, labelValues ...string)
}
+
+// ToLabelPairs converts the given label names and label values to a map of
+// label names to label values.
+// Names and values are paired by position. A name without a matching value
+// maps to the empty string, and values beyond len(labelNames) are ignored, so
+// mismatched inputs no longer cause an index-out-of-range panic.
+func ToLabelPairs(labelNames, labelValues []string) LabelPairs {
+	labelPairs := make(LabelPairs, len(labelNames))
+	for i, name := range labelNames {
+		if i >= len(labelValues) {
+			// Keep the key so the label set stays stable for the caller.
+			labelPairs[name] = ""
+			continue
+		}
+		labelPairs[name] = labelValues[i]
+	}
+	return labelPairs
+}