This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch metrics
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/metrics by this push:
     new fddd6499 Apply metrics to main components
fddd6499 is described below

commit fddd64995010c2e18c0d137594aadb580081c3f5
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Aug 22 10:17:16 2024 +0800

    Apply metrics to main components
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 api/common/id.go                          |  26 ++-
 banyand/internal/storage/index.go         |  21 +-
 banyand/internal/storage/index_test.go    |   4 +-
 banyand/internal/storage/metrics.go       | 112 ++++++++++
 banyand/internal/storage/rotation.go      |  14 +-
 banyand/internal/storage/rotation_test.go |   8 +-
 banyand/internal/storage/segment.go       |  58 +++--
 banyand/internal/storage/storage.go       |   4 +-
 banyand/internal/storage/tsdb.go          |  43 +++-
 banyand/liaison/grpc/measure.go           |  40 +++-
 banyand/liaison/grpc/metrics.go           |  66 ++++++
 banyand/liaison/grpc/property.go          |  58 +++++
 banyand/liaison/grpc/registry.go          | 338 ++++++++++++++++++++++++++++--
 banyand/liaison/grpc/registry_test.go     |   4 +-
 banyand/liaison/grpc/server.go            |  33 ++-
 banyand/liaison/grpc/stream.go            |  40 +++-
 banyand/measure/merger_test.go            |  16 +-
 banyand/measure/metadata.go               |  14 +-
 banyand/measure/metrics.go                |   8 +-
 banyand/measure/part.go                   |   5 +-
 banyand/queue/sub/server.go               |  39 +++-
 banyand/queue/sub/sub.go                  |  24 +++
 banyand/stream/flusher.go                 |  82 +++++---
 banyand/stream/index.go                   |   7 +-
 banyand/stream/introducer.go              |   6 +
 banyand/stream/merger.go                  |  46 ++--
 banyand/stream/merger_test.go             |  10 +-
 banyand/stream/metadata.go                |  15 +-
 banyand/stream/metrics.go                 | 322 +++++++++++++++++++++++++++-
 banyand/stream/part.go                    |   5 +-
 banyand/stream/query_test.go              |   2 +-
 banyand/stream/tstable.go                 |  19 +-
 banyand/stream/tstable_test.go            |   4 +-
 go.mod                                    |   1 +
 go.sum                                    |   2 +
 pkg/cmdsetup/data.go                      |   4 +-
 pkg/cmdsetup/liaison.go                   |   8 +-
 pkg/cmdsetup/standalone.go                |   6 +-
 pkg/index/index.go                        |   2 +-
 pkg/index/inverted/inverted.go            |  20 +-
 pkg/index/inverted/metrics.go             | 152 ++++++++++++++
 pkg/meter/meter.go                        |   9 +
 42 files changed, 1491 insertions(+), 206 deletions(-)

diff --git a/api/common/id.go b/api/common/id.go
index 4bb0d3ec..8ed29316 100644
--- a/api/common/id.go
+++ b/api/common/id.go
@@ -64,24 +64,34 @@ type Position struct {
        Segment  string
 }
 
-// LabelNames returns the label names of Position.
-func LabelNames() []string {
-       return []string{"module", "database", "shard", "seg"}
+// DBLabelNames returns the label names of Position in the database level.
+func DBLabelNames() []string {
+       return []string{"group"}
+}
+
+// SegLabelNames returns the label names of Position in the segment level.
+func SegLabelNames() []string {
+       return []string{"seg"}
 }
 
 // ShardLabelNames returns the label names of Position. It is used for shard 
level metrics.
 func ShardLabelNames() []string {
-       return []string{"module", "database", "shard"}
+       return []string{"seg", "shard"}
+}
+
+// DBLabelValues returns the label values of Position in the database level.
+func (p Position) DBLabelValues() []string {
+       return []string{p.Database}
 }
 
-// LabelValues returns the label values of Position.
-func (p Position) LabelValues() []string {
-       return []string{p.Module, p.Database, p.Shard, p.Segment}
+// SegLabelValues returns the label values of Position.
+func (p Position) SegLabelValues() []string {
+       return []string{p.Segment}
 }
 
 // ShardLabelValues returns the label values of Position. It is used for shard 
level metrics.
 func (p Position) ShardLabelValues() []string {
-       return []string{p.Module, p.Database, p.Shard}
+       return []string{p.Segment, p.Shard}
 }
 
 // SetPosition sets a position returned from fn to attach it to ctx, then 
return a new context.
diff --git a/banyand/internal/storage/index.go 
b/banyand/internal/storage/index.go
index d3b059ae..5a8e4900 100644
--- a/banyand/internal/storage/index.go
+++ b/banyand/internal/storage/index.go
@@ -43,20 +43,28 @@ func (s *segment[T, O]) Lookup(ctx context.Context, series 
[]*pbv1.Series) (pbv1
 }
 
 type seriesIndex struct {
-       store index.SeriesStore
-       l     *logger.Logger
+       store   index.SeriesStore
+       l       *logger.Logger
+       metrics *inverted.Metrics
+       p       common.Position
 }
 
-func newSeriesIndex(ctx context.Context, root string, flushTimeoutSeconds 
int64) (*seriesIndex, error) {
+func newSeriesIndex(ctx context.Context, root string, flushTimeoutSeconds 
int64, metrics *inverted.Metrics) (*seriesIndex, error) {
        si := &seriesIndex{
                l: logger.Fetch(ctx, "series_index"),
+               p: common.GetPosition(ctx),
        }
-       var err error
-       if si.store, err = inverted.NewStore(inverted.StoreOpts{
+       opts := inverted.StoreOpts{
                Path:         path.Join(root, "sidx"),
                Logger:       si.l,
                BatchWaitSec: flushTimeoutSeconds,
-       }); err != nil {
+       }
+       if metrics != nil {
+               opts.Metrics = metrics
+               si.metrics = opts.Metrics
+       }
+       var err error
+       if si.store, err = inverted.NewStore(opts); err != nil {
                return nil, err
        }
        return si, nil
@@ -270,5 +278,6 @@ func (s *seriesIndex) Search(ctx context.Context, series 
[]*pbv1.Series, opts In
 }
 
 func (s *seriesIndex) Close() error {
+       s.metrics.DeleteAll(s.p.SegLabelValues()...)
        return s.store.Close()
 }
diff --git a/banyand/internal/storage/index_test.go 
b/banyand/internal/storage/index_test.go
index 2b196ec7..304bcff9 100644
--- a/banyand/internal/storage/index_test.go
+++ b/banyand/internal/storage/index_test.go
@@ -36,7 +36,7 @@ import (
 func TestSeriesIndex_Primary(t *testing.T) {
        ctx := context.Background()
        path, fn := setUp(require.New(t))
-       si, err := newSeriesIndex(ctx, path, 0)
+       si, err := newSeriesIndex(ctx, path, 0, nil)
        require.NoError(t, err)
        defer func() {
                require.NoError(t, si.Close())
@@ -66,7 +66,7 @@ func TestSeriesIndex_Primary(t *testing.T) {
        require.NoError(t, si.Write(docs))
        // Restart the index
        require.NoError(t, si.Close())
-       si, err = newSeriesIndex(ctx, path, 0)
+       si, err = newSeriesIndex(ctx, path, 0, nil)
        require.NoError(t, err)
        tests := []struct {
                name         string
diff --git a/banyand/internal/storage/metrics.go 
b/banyand/internal/storage/metrics.go
new file mode 100644
index 00000000..54f9d81a
--- /dev/null
+++ b/banyand/internal/storage/metrics.go
@@ -0,0 +1,112 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package storage
+
+import (
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/meter"
+)
+
+type metrics struct {
+       lastTickTime meter.Gauge
+       totalSegRefs meter.Gauge
+
+       totalRotationStarted  meter.Counter
+       totalRotationFinished meter.Counter
+       totalRotationErr      meter.Counter
+
+       totalRetentionStarted        meter.Counter
+       totalRetentionFinished       meter.Counter
+       totalRetentionHasData        meter.Counter
+       totalRetentionErr            meter.Counter
+       totalRetentionHasDataLatency meter.Counter
+}
+
+func newMetrics(factory *observability.Factory) *metrics {
+       if factory == nil {
+               return nil
+       }
+       return &metrics{
+               lastTickTime:                 
factory.NewGauge("last_tick_time"),
+               totalSegRefs:                 
factory.NewGauge("total_segment_refs"),
+               totalRotationStarted:         
factory.NewCounter("total_rotation_started"),
+               totalRotationFinished:        
factory.NewCounter("total_rotation_finished"),
+               totalRotationErr:             
factory.NewCounter("total_rotation_err"),
+               totalRetentionStarted:        
factory.NewCounter("total_retention_started"),
+               totalRetentionFinished:       
factory.NewCounter("total_retention_finished"),
+               totalRetentionErr:            
factory.NewCounter("total_retention_err"),
+               totalRetentionHasDataLatency: 
factory.NewCounter("total_retention_has_data_latency"),
+               totalRetentionHasData:        
factory.NewCounter("total_retention_has_data"),
+       }
+}
+
+func (d *database[T, O]) incTotalRotationStarted(delta int) {
+       if d.metrics == nil {
+               return
+       }
+       d.metrics.totalRotationStarted.Inc(float64(delta))
+}
+
+func (d *database[T, O]) incTotalRotationFinished(delta int) {
+       if d.metrics == nil {
+               return
+       }
+       d.metrics.totalRotationFinished.Inc(float64(delta))
+}
+
+func (d *database[T, O]) incTotalRotationErr(delta int) {
+       if d.metrics == nil {
+               return
+       }
+       d.metrics.totalRotationErr.Inc(float64(delta))
+}
+
+func (d *database[T, O]) incTotalRetentionStarted(delta int) {
+       if d.metrics == nil {
+               return
+       }
+       d.metrics.totalRetentionStarted.Inc(float64(delta))
+}
+
+func (d *database[T, O]) incTotalRetentionFinished(delta int) {
+       if d.metrics == nil {
+               return
+       }
+       d.metrics.totalRetentionFinished.Inc(float64(delta))
+}
+
+func (d *database[T, O]) incTotalRetentionHasData(delta int) {
+       if d.metrics == nil {
+               return
+       }
+       d.metrics.totalRetentionHasData.Inc(float64(delta))
+}
+
+func (d *database[T, O]) incTotalRetentionErr(delta int) {
+       if d.metrics == nil {
+               return
+       }
+       d.metrics.totalRetentionErr.Inc(float64(delta))
+}
+
+func (d *database[T, O]) incTotalRetentionHasDataLatency(delta float64) {
+       if d.metrics == nil {
+               return
+       }
+       d.metrics.totalRetentionHasDataLatency.Inc(delta)
+}
diff --git a/banyand/internal/storage/rotation.go 
b/banyand/internal/storage/rotation.go
index 9850dc62..ab18daec 100644
--- a/banyand/internal/storage/rotation.go
+++ b/banyand/internal/storage/rotation.go
@@ -68,11 +68,14 @@ func (d *database[T, O]) startRotationTask() error {
                                        if gap <= 0 || gap > newSegmentTimeGap {
                                                return
                                        }
+                                       d.incTotalRotationStarted(1)
+                                       defer d.incTotalRotationFinished(1)
                                        start := 
d.segmentController.opts.SegmentInterval.nextTime(t)
                                        d.logger.Info().Time("segment_start", 
start).Time("event_time", t).Msg("create new segment")
                                        _, err := 
d.segmentController.create(start)
                                        if err != nil {
                                                
d.logger.Error().Err(err).Msgf("failed to create new segment.")
+                                               d.incTotalRotationErr(1)
                                        }
                                }()
                        }(ts)
@@ -110,9 +113,18 @@ func (rc *retentionTask[T, O]) run(now time.Time, l 
*logger.Logger) bool {
                <-rc.running
        }()
 
+       rc.database.incTotalRetentionStarted(1)
+       defer rc.database.incTotalRetentionFinished(1)
        deadline := now.Add(-rc.duration)
-       if err := rc.database.segmentController.remove(deadline); err != nil {
+       start := time.Now()
+       hasData, err := rc.database.segmentController.remove(deadline)
+       if hasData {
+               rc.database.incTotalRetentionHasData(1)
+               
rc.database.incTotalRetentionHasDataLatency(time.Since(start).Seconds())
+       }
+       if err != nil {
                l.Error().Err(err)
+               rc.database.incTotalRetentionErr(1)
        }
        return true
 }
diff --git a/banyand/internal/storage/rotation_test.go 
b/banyand/internal/storage/rotation_test.go
index 69fdfab2..e9b1c9da 100644
--- a/banyand/internal/storage/rotation_test.go
+++ b/banyand/internal/storage/rotation_test.go
@@ -26,6 +26,7 @@ import (
        "github.com/stretchr/testify/require"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/test"
@@ -140,7 +141,6 @@ func setUpDB(t *testing.T) (*database[*MockTSTable, any], 
timestamp.MockClock, *
                TTL:             IntervalRule{Unit: DAY, Num: 3},
                ShardNum:        1,
                TSTableCreator:  MockTSTableCreator,
-               MetricsCreator:  MockMetricsCreator,
        }
        ctx := context.Background()
        mc := timestamp.NewMockClock()
@@ -169,6 +169,8 @@ func (m *MockTSTable) Close() error {
        return nil
 }
 
+func (m *MockTSTable) Collect(_ Metrics) {}
+
 var MockTSTableCreator = func(_ fs.FileSystem, _ string, _ common.Position,
        _ *logger.Logger, _ timestamp.TimeRange, _, _ any,
 ) (*MockTSTable, error) {
@@ -179,4 +181,8 @@ type MockMetrics struct{}
 
 func (m *MockMetrics) DeleteAll() {}
 
+func (m *MockMetrics) Factory() *observability.Factory {
+       return nil
+}
+
 var MockMetricsCreator = func(_ common.Position) Metrics { return 
&MockMetrics{} }
diff --git a/banyand/internal/storage/segment.go 
b/banyand/internal/storage/segment.go
index 1b184794..d3e0e214 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -33,6 +33,7 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/index/inverted"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
@@ -57,14 +58,19 @@ type segment[T TSTable, O any] struct {
 }
 
 func (sc *segmentController[T, O]) openSegment(ctx context.Context, startTime, 
endTime time.Time, path, suffix string,
-       p common.Position, opts TSDBOpts[T, O],
+       opts TSDBOpts[T, O],
 ) (s *segment[T, O], err error) {
        suffixInteger, err := strconv.Atoi(suffix)
        if err != nil {
                return nil, err
        }
+       p := common.GetPosition(ctx)
+       p.Segment = suffix
+       ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
+               return p
+       })
        id := generateSegID(sc.opts.SegmentInterval.Unit, suffixInteger)
-       sir, err := newSeriesIndex(ctx, path, 
sc.opts.SeriesIndexFlushTimeoutSeconds)
+       sir, err := newSeriesIndex(ctx, path, 
sc.opts.SeriesIndexFlushTimeoutSeconds, sc.indexMetrics)
        if err != nil {
                return nil, errors.Wrap(errOpenDatabase, 
errors.WithMessage(err, "create series index controller failed").Error())
        }
@@ -195,28 +201,30 @@ func (s *segment[T, O]) String() string {
 }
 
 type segmentController[T TSTable, O any] struct {
-       clock    timestamp.Clock
-       metrics  Metrics
-       l        *logger.Logger
-       position common.Position
-       location string
-       lst      []*segment[T, O]
-       opts     TSDBOpts[T, O]
-       deadline atomic.Int64
+       clock        timestamp.Clock
+       metrics      Metrics
+       l            *logger.Logger
+       indexMetrics *inverted.Metrics
+       position     common.Position
+       location     string
+       lst          []*segment[T, O]
+       opts         TSDBOpts[T, O]
+       deadline     atomic.Int64
        sync.RWMutex
 }
 
 func newSegmentController[T TSTable, O any](ctx context.Context, location 
string,
-       l *logger.Logger, opts TSDBOpts[T, O], metrics Metrics,
+       l *logger.Logger, opts TSDBOpts[T, O], indexMetrics *inverted.Metrics, 
metrics Metrics,
 ) *segmentController[T, O] {
        clock, _ := timestamp.GetClock(ctx)
        return &segmentController[T, O]{
-               location: location,
-               opts:     opts,
-               l:        l,
-               clock:    clock,
-               position: common.GetPosition(ctx),
-               metrics:  metrics,
+               location:     location,
+               opts:         opts,
+               l:            l,
+               clock:        clock,
+               position:     common.GetPosition(ctx),
+               metrics:      metrics,
+               indexMetrics: indexMetrics,
        }
 }
 
@@ -367,9 +375,10 @@ func (sc *segmentController[T, O]) sortLst() {
 func (sc *segmentController[T, O]) load(start, end time.Time, root string) 
(seg *segment[T, O], err error) {
        suffix := sc.format(start)
        segPath := path.Join(root, fmt.Sprintf(segTemplate, suffix))
-       p := sc.position
-       p.Segment = suffix
-       seg, err = sc.openSegment(context.WithValue(context.Background(), 
logger.ContextKey, sc.l), start, end, segPath, suffix, p, sc.opts)
+       ctx := common.SetPosition(context.WithValue(context.Background(), 
logger.ContextKey, sc.l), func(_ common.Position) common.Position {
+               return sc.position
+       })
+       seg, err = sc.openSegment(ctx, start, end, segPath, suffix, sc.opts)
        if err != nil {
                return nil, err
        }
@@ -378,9 +387,10 @@ func (sc *segmentController[T, O]) load(start, end 
time.Time, root string) (seg
        return seg, nil
 }
 
-func (sc *segmentController[T, O]) remove(deadline time.Time) (err error) {
+func (sc *segmentController[T, O]) remove(deadline time.Time) (hasSegment 
bool, err error) {
        for _, s := range sc.segments() {
                if s.Before(deadline) {
+                       hasSegment = true
                        s.delete()
                        sc.Lock()
                        sc.removeSeg(s.id)
@@ -389,7 +399,7 @@ func (sc *segmentController[T, O]) remove(deadline 
time.Time) (err error) {
                }
                s.DecRef()
        }
-       return err
+       return hasSegment, err
 }
 
 func (sc *segmentController[T, O]) removeSeg(segID segmentID) {
@@ -413,7 +423,9 @@ func (sc *segmentController[T, O]) close() {
                s.DecRef()
        }
        sc.lst = sc.lst[:0]
-       sc.metrics.DeleteAll()
+       if sc.metrics != nil {
+               sc.metrics.DeleteAll()
+       }
 }
 
 func loadSegments[T TSTable, O any](root, prefix string, parser 
*segmentController[T, O], intervalRule IntervalRule, loadFn func(start, end 
time.Time) error) error {
diff --git a/banyand/internal/storage/storage.go 
b/banyand/internal/storage/storage.go
index d1bb070c..4bcc43d8 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -105,6 +105,7 @@ type Segment[T TSTable, O any] interface {
 // TSTable is time series table.
 type TSTable interface {
        io.Closer
+       Collect(Metrics)
 }
 
 // TSTableCreator creates a TSTable.
@@ -117,9 +118,6 @@ type Metrics interface {
        DeleteAll()
 }
 
-// MetricsCreator creates a Metrics.
-type MetricsCreator func(position common.Position) Metrics
-
 // IntervalUnit denotes the unit of a time point.
 type IntervalUnit int
 
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index 03d8b062..d1539d18 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -28,7 +28,9 @@ import (
        "github.com/pkg/errors"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/index/inverted"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
@@ -50,13 +52,14 @@ const (
 // TSDBOpts wraps options to create a tsdb.
 type TSDBOpts[T TSTable, O any] struct {
        Option                         O
+       TableMetrics                   Metrics
        TSTableCreator                 TSTableCreator[T, O]
-       MetricsCreator                 MetricsCreator
+       StorageMetricsFactory          *observability.Factory
        Location                       string
        SegmentInterval                IntervalRule
        TTL                            IntervalRule
-       ShardNum                       uint32
        SeriesIndexFlushTimeoutSeconds int64
+       ShardNum                       uint32
 }
 
 type (
@@ -73,10 +76,11 @@ type database[T TSTable, O any] struct {
        scheduler         *timestamp.Scheduler
        tsEventCh         chan int64
        segmentController *segmentController[T, O]
-       p                 common.Position
-       location          string
-       opts              TSDBOpts[T, O]
-       latestTickTime    atomic.Int64
+       *metrics
+       p              common.Position
+       location       string
+       opts           TSDBOpts[T, O]
+       latestTickTime atomic.Int64
        sync.RWMutex
        rotationProcessOn atomic.Bool
 }
@@ -109,6 +113,11 @@ func OpenTSDB[T TSTable, O any](ctx context.Context, opts 
TSDBOpts[T, O]) (TSDB[
        l := logger.Fetch(ctx, p.Database)
        clock, _ := timestamp.GetClock(ctx)
        scheduler := timestamp.NewScheduler(l, clock)
+
+       var indexMetrics *inverted.Metrics
+       if opts.StorageMetricsFactory != nil {
+               indexMetrics = inverted.NewMetrics(opts.StorageMetricsFactory, 
common.SegLabelNames()...)
+       }
        db := &database[T, O]{
                location:  location,
                scheduler: scheduler,
@@ -117,7 +126,8 @@ func OpenTSDB[T TSTable, O any](ctx context.Context, opts 
TSDBOpts[T, O]) (TSDB[
                tsEventCh: make(chan int64),
                p:         p,
                segmentController: newSegmentController[T](ctx, location,
-                       l, opts, opts.MetricsCreator(p)),
+                       l, opts, indexMetrics, opts.TableMetrics),
+               metrics: newMetrics(opts.StorageMetricsFactory),
        }
        db.logger.Info().Str("path", opts.Location).Msg("initialized")
        lockPath := filepath.Join(opts.Location, lockFilename)
@@ -129,6 +139,7 @@ func OpenTSDB[T TSTable, O any](ctx context.Context, opts 
TSDBOpts[T, O]) (TSDB[
        if err := db.segmentController.open(); err != nil {
                return nil, err
        }
+       observability.MetricsCollector.Register(location, db.collect)
        return db, db.startRotationTask()
 }
 
@@ -140,6 +151,24 @@ func (d *database[T, O]) SelectSegments(timeRange 
timestamp.TimeRange) []Segment
        return d.segmentController.selectSegments(timeRange)
 }
 
+func (d *database[T, O]) collect() {
+       if d.metrics == nil {
+               return
+       }
+       d.metrics.lastTickTime.Set(float64(d.latestTickTime.Load()))
+       refCount := int32(0)
+       ss := d.segmentController.segments()
+       for _, s := range ss {
+               for _, t := range s.Tables() {
+                       t.Collect(d.segmentController.metrics)
+               }
+               s.index.store.CollectMetrics(s.index.p.SegLabelValues()...)
+               s.DecRef()
+               refCount += atomic.LoadInt32(&s.refCount)
+       }
+       d.totalSegRefs.Set(float64(refCount))
+}
+
 type walkFn func(suffix string) error
 
 func walkDir(root, prefix string, wf walkFn) error {
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index 392fedca..de1a860d 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -43,12 +43,13 @@ import (
 
 type measureService struct {
        measurev1.UnimplementedMeasureServiceServer
-       *discoveryService
-       sampled            *logger.Logger
        ingestionAccessLog accesslog.Log
        pipeline           queue.Client
        broadcaster        queue.Client
-       writeTimeout       time.Duration
+       *discoveryService
+       sampled      *logger.Logger
+       metrics      *metrics
+       writeTimeout time.Duration
 }
 
 func (ms *measureService) setLogger(log *logger.Logger) {
@@ -65,13 +66,24 @@ func (ms *measureService) activeIngestionAccessLog(root 
string) (err error) {
 
 func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) 
error {
        reply := func(metadata *commonv1.Metadata, status modelv1.Status, 
messageId uint64, measure measurev1.MeasureService_WriteServer, logger 
*logger.Logger) {
+               if status != modelv1.Status_STATUS_SUCCEED {
+                       ms.metrics.totalStreamMsgReceivedErr.Inc(1, 
metadata.Group, "measure", "write")
+               }
+               ms.metrics.totalStreamMsgReceived.Inc(1, metadata.Group, 
"measure", "write")
                if errResp := measure.Send(&measurev1.WriteResponse{Metadata: 
metadata, Status: status, MessageId: messageId}); errResp != nil {
                        logger.Debug().Err(errResp).Msg("failed to send 
response")
+                       ms.metrics.totalStreamMsgSentErr.Inc(1, metadata.Group, 
"measure", "write")
                }
        }
        ctx := measure.Context()
        publisher := ms.pipeline.NewBatchPublisher(ms.writeTimeout)
-       defer publisher.Close()
+       ms.metrics.totalStreamStarted.Inc(1, "measure", "write")
+       start := time.Now()
+       defer func() {
+               publisher.Close()
+               ms.metrics.totalStreamFinished.Inc(1, "measure", "write")
+               ms.metrics.totalStreamLatency.Inc(time.Since(start).Seconds(), 
"measure", "write")
+       }()
        for {
                select {
                case <-ctx.Done():
@@ -83,9 +95,12 @@ func (ms *measureService) Write(measure 
measurev1.MeasureService_WriteServer) er
                        return nil
                }
                if err != nil {
-                       ms.sampled.Error().Err(err).Stringer("written", 
writeRequest).Msg("failed to receive message")
+                       if !errors.Is(err, context.DeadlineExceeded) && 
!errors.Is(err, context.Canceled) {
+                               ms.sampled.Error().Err(err).Stringer("written", 
writeRequest).Msg("failed to receive message")
+                       }
                        return err
                }
+               ms.metrics.totalStreamMsgReceived.Inc(1, 
writeRequest.Metadata.Group, "measure", "write")
                if errTime := 
timestamp.CheckPb(writeRequest.DataPoint.Timestamp); errTime != nil {
                        ms.sampled.Error().Err(errTime).Stringer("written", 
writeRequest).Msg("the data point time is invalid")
                        reply(writeRequest.GetMetadata(), 
modelv1.Status_STATUS_INVALID_TIMESTAMP, writeRequest.GetMessageId(), measure, 
ms.sampled)
@@ -140,13 +155,26 @@ func (ms *measureService) Write(measure 
measurev1.MeasureService_WriteServer) er
                        reply(writeRequest.GetMetadata(), 
modelv1.Status_STATUS_INTERNAL_ERROR, writeRequest.GetMessageId(), measure, 
ms.sampled)
                        continue
                }
-               reply(nil, modelv1.Status_STATUS_SUCCEED, 
writeRequest.GetMessageId(), measure, ms.sampled)
+               reply(writeRequest.GetMetadata(), 
modelv1.Status_STATUS_SUCCEED, writeRequest.GetMessageId(), measure, ms.sampled)
        }
 }
 
 var emptyMeasureQueryResponse = &measurev1.QueryResponse{DataPoints: 
make([]*measurev1.DataPoint, 0)}
 
 func (ms *measureService) Query(_ context.Context, req 
*measurev1.QueryRequest) (resp *measurev1.QueryResponse, err error) {
+       for _, g := range req.Groups {
+               ms.metrics.totalStarted.Inc(1, g, "measure", "query")
+       }
+       start := time.Now()
+       defer func() {
+               for _, g := range req.Groups {
+                       ms.metrics.totalFinished.Inc(1, g, "measure", "query")
+                       if err != nil {
+                               ms.metrics.totalErr.Inc(1, g, "measure", 
"query")
+                       }
+                       
ms.metrics.totalLatency.Inc(time.Since(start).Seconds(), g, "measure", "query")
+               }
+       }()
        if err = timestamp.CheckTimeRange(req.GetTimeRange()); err != nil {
                return nil, status.Errorf(codes.InvalidArgument, "%v is invalid 
:%s", req.GetTimeRange(), err)
        }
diff --git a/banyand/liaison/grpc/metrics.go b/banyand/liaison/grpc/metrics.go
new file mode 100644
index 00000000..dee5a378
--- /dev/null
+++ b/banyand/liaison/grpc/metrics.go
@@ -0,0 +1,66 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/meter"
+)
+
+type metrics struct {
+       totalStarted  meter.Counter
+       totalFinished meter.Counter
+       totalErr      meter.Counter
+       totalLatency  meter.Counter
+
+       totalStreamStarted  meter.Counter
+       totalStreamFinished meter.Counter
+       totalStreamErr      meter.Counter
+       totalStreamLatency  meter.Counter
+
+       totalStreamMsgReceived    meter.Counter
+       totalStreamMsgReceivedErr meter.Counter
+       totalStreamMsgSent        meter.Counter
+       totalStreamMsgSentErr     meter.Counter
+
+       totalRegistryStarted  meter.Counter
+       totalRegistryFinished meter.Counter
+       totalRegistryErr      meter.Counter
+       totalRegistryLatency  meter.Counter
+}
+
+func newMetrics(factory *observability.Factory) *metrics {
+       return &metrics{
+               totalStarted:              factory.NewCounter("total_started", 
"group", "service", "method"),
+               totalFinished:             factory.NewCounter("total_finished", 
"group", "service", "method"),
+               totalErr:                  factory.NewCounter("total_err", 
"group", "service", "method"),
+               totalLatency:              factory.NewCounter("total_latency", 
"group", "service", "method"),
+               totalStreamStarted:        
factory.NewCounter("total_stream_started", "service", "method"),
+               totalStreamFinished:       
factory.NewCounter("total_stream_finished", "service", "method"),
+               totalStreamErr:            
factory.NewCounter("total_stream_err", "service", "method"),
+               totalStreamLatency:        
factory.NewCounter("total_stream_latency", "service", "method"),
+               totalStreamMsgReceived:    
factory.NewCounter("total_stream_msg_received", "group", "service", "method"),
+               totalStreamMsgReceivedErr: 
factory.NewCounter("total_stream_msg_received_err", "group", "service", 
"method"),
+               totalStreamMsgSent:        
factory.NewCounter("total_stream_msg_sent", "group", "service", "method"),
+               totalStreamMsgSentErr:     
factory.NewCounter("total_stream_msg_sent_err", "group", "service", "method"),
+               totalRegistryStarted:      
factory.NewCounter("total_registry_started", "group", "service", "method"),
+               totalRegistryFinished:     
factory.NewCounter("total_registry_finished", "group", "service", "method"),
+               totalRegistryErr:          
factory.NewCounter("total_registry_err", "group", "service", "method"),
+               totalRegistryLatency:      
factory.NewCounter("total_registry_latency", "group", "service", "method"),
+       }
+}
diff --git a/banyand/liaison/grpc/property.go b/banyand/liaison/grpc/property.go
index 6c0bcbc5..ca4fa61f 100644
--- a/banyand/liaison/grpc/property.go
+++ b/banyand/liaison/grpc/property.go
@@ -15,10 +15,28 @@
 // specific language governing permissions and limitations
 // under the License.
 
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
 package grpc
 
 import (
        "context"
+       "time"
 
        propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
@@ -27,19 +45,36 @@ import (
 type propertyServer struct {
        propertyv1.UnimplementedPropertyServiceServer
        schemaRegistry metadata.Repo
+       metrics        *metrics
 }
 
 func (ps *propertyServer) Apply(ctx context.Context, req 
*propertyv1.ApplyRequest) (*propertyv1.ApplyResponse, error) {
+       g := req.Property.Metadata.Container.Group
+       ps.metrics.totalStarted.Inc(1, g, "property", "apply")
+       start := time.Now()
+       defer func() {
+               ps.metrics.totalFinished.Inc(1, g, "property", "apply")
+               ps.metrics.totalLatency.Inc(time.Since(start).Seconds(), g, 
"property", "apply")
+       }()
        created, tagsNum, leaseID, err := 
ps.schemaRegistry.PropertyRegistry().ApplyProperty(ctx, req.Property, 
req.Strategy)
        if err != nil {
+               ps.metrics.totalErr.Inc(1, g, "property", "apply")
                return nil, err
        }
        return &propertyv1.ApplyResponse{Created: created, TagsNum: tagsNum, 
LeaseId: leaseID}, nil
 }
 
 func (ps *propertyServer) Delete(ctx context.Context, req 
*propertyv1.DeleteRequest) (*propertyv1.DeleteResponse, error) {
+       g := req.Metadata.Container.Group
+       ps.metrics.totalStarted.Inc(1, g, "property", "delete")
+       start := time.Now()
+       defer func() {
+               ps.metrics.totalFinished.Inc(1, g, "property", "delete")
+               ps.metrics.totalLatency.Inc(time.Since(start).Seconds(), g, 
"property", "delete")
+       }()
        ok, tagsNum, err := 
ps.schemaRegistry.PropertyRegistry().DeleteProperty(ctx, req.GetMetadata(), 
req.Tags)
        if err != nil {
+               ps.metrics.totalErr.Inc(1, g, "property", "delete")
                return nil, err
        }
        return &propertyv1.DeleteResponse{
@@ -49,8 +84,16 @@ func (ps *propertyServer) Delete(ctx context.Context, req 
*propertyv1.DeleteRequ
 }
 
 func (ps *propertyServer) Get(ctx context.Context, req *propertyv1.GetRequest) 
(*propertyv1.GetResponse, error) {
+       g := req.Metadata.Container.Group
+       ps.metrics.totalStarted.Inc(1, g, "property", "get")
+       start := time.Now()
+       defer func() {
+               ps.metrics.totalFinished.Inc(1, g, "property", "get")
+               ps.metrics.totalLatency.Inc(time.Since(start).Seconds(), g, 
"property", "get")
+       }()
        entity, err := ps.schemaRegistry.PropertyRegistry().GetProperty(ctx, 
req.GetMetadata(), req.GetTags())
        if err != nil {
+               ps.metrics.totalErr.Inc(1, g, "property", "get")
                return nil, err
        }
        return &propertyv1.GetResponse{
@@ -59,8 +102,16 @@ func (ps *propertyServer) Get(ctx context.Context, req 
*propertyv1.GetRequest) (
 }
 
 func (ps *propertyServer) List(ctx context.Context, req 
*propertyv1.ListRequest) (*propertyv1.ListResponse, error) {
+       g := req.Container.Group
+       ps.metrics.totalStarted.Inc(1, g, "property", "list")
+       start := time.Now()
+       defer func() {
+               ps.metrics.totalFinished.Inc(1, g, "property", "list")
+               ps.metrics.totalLatency.Inc(time.Since(start).Seconds(), g, 
"property", "list")
+       }()
        entities, err := ps.schemaRegistry.PropertyRegistry().ListProperty(ctx, 
req.GetContainer(), req.Ids, req.Tags)
        if err != nil {
+               ps.metrics.totalErr.Inc(1, g, "property", "list")
                return nil, err
        }
        return &propertyv1.ListResponse{
@@ -69,8 +120,15 @@ func (ps *propertyServer) List(ctx context.Context, req 
*propertyv1.ListRequest)
 }
 
 func (ps *propertyServer) KeepAlive(ctx context.Context, req 
*propertyv1.KeepAliveRequest) (*propertyv1.KeepAliveResponse, error) {
+       ps.metrics.totalStarted.Inc(1, "", "property", "keep_alive")
+       start := time.Now()
+       defer func() {
+               ps.metrics.totalFinished.Inc(1, "", "property", "keep_alive")
+               ps.metrics.totalLatency.Inc(time.Since(start).Seconds(), "", 
"property", "keep_alive")
+       }()
        err := ps.schemaRegistry.PropertyRegistry().KeepAlive(ctx, 
req.GetLeaseId())
        if err != nil {
+               ps.metrics.totalErr.Inc(1, "", "property", "keep_alive")
                return nil, err
        }
        return &propertyv1.KeepAliveResponse{}, nil
diff --git a/banyand/liaison/grpc/registry.go b/banyand/liaison/grpc/registry.go
index 511d4e9b..ef3a59e7 100644
--- a/banyand/liaison/grpc/registry.go
+++ b/banyand/liaison/grpc/registry.go
@@ -20,6 +20,7 @@ package grpc
 import (
        "context"
        "errors"
+       "time"
 
        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/status"
@@ -33,13 +34,22 @@ import (
 type streamRegistryServer struct {
        databasev1.UnimplementedStreamRegistryServiceServer
        schemaRegistry metadata.Repo
+       metrics        *metrics
 }
 
 func (rs *streamRegistryServer) Create(ctx context.Context,
        req *databasev1.StreamRegistryServiceCreateRequest,
 ) (*databasev1.StreamRegistryServiceCreateResponse, error) {
+       g := req.Stream.Metadata.Group
+       rs.metrics.totalRegistryStarted.Inc(1, g, "stream", "create")
+       start := time.Now()
+       defer func() {
+               rs.metrics.totalRegistryFinished.Inc(1, g, "stream", "create")
+               
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "stream", 
"create")
+       }()
        modRevision, err := 
rs.schemaRegistry.StreamRegistry().CreateStream(ctx, req.GetStream())
        if err != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "stream", "create")
                return nil, err
        }
        return &databasev1.StreamRegistryServiceCreateResponse{
@@ -50,8 +60,16 @@ func (rs *streamRegistryServer) Create(ctx context.Context,
 func (rs *streamRegistryServer) Update(ctx context.Context,
        req *databasev1.StreamRegistryServiceUpdateRequest,
 ) (*databasev1.StreamRegistryServiceUpdateResponse, error) {
+       g := req.Stream.Metadata.Group
+       rs.metrics.totalRegistryStarted.Inc(1, g, "stream", "update")
+       start := time.Now()
+       defer func() {
+               rs.metrics.totalRegistryFinished.Inc(1, g, "stream", "update")
+               
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "stream", 
"update")
+       }()
        modRevision, err := 
rs.schemaRegistry.StreamRegistry().UpdateStream(ctx, req.GetStream())
        if err != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "stream", "update")
                return nil, err
        }
        return &databasev1.StreamRegistryServiceUpdateResponse{
@@ -62,8 +80,16 @@ func (rs *streamRegistryServer) Update(ctx context.Context,
 func (rs *streamRegistryServer) Delete(ctx context.Context,
        req *databasev1.StreamRegistryServiceDeleteRequest,
 ) (*databasev1.StreamRegistryServiceDeleteResponse, error) {
+       g := req.Metadata.Group
+       rs.metrics.totalRegistryStarted.Inc(1, g, "stream", "delete")
+       start := time.Now()
+       defer func() {
+               rs.metrics.totalRegistryFinished.Inc(1, g, "stream", "delete")
+               
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "stream", 
"delete")
+       }()
        ok, err := rs.schemaRegistry.StreamRegistry().DeleteStream(ctx, 
req.GetMetadata())
        if err != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "stream", "delete")
                return nil, err
        }
        return &databasev1.StreamRegistryServiceDeleteResponse{
@@ -74,8 +100,16 @@ func (rs *streamRegistryServer) Delete(ctx context.Context,
 func (rs *streamRegistryServer) Get(ctx context.Context,
        req *databasev1.StreamRegistryServiceGetRequest,
 ) (*databasev1.StreamRegistryServiceGetResponse, error) {
+       g := req.Metadata.Group
+       rs.metrics.totalRegistryStarted.Inc(1, g, "stream", "get")
+       start := time.Now()
+       defer func() {
+               rs.metrics.totalRegistryFinished.Inc(1, g, "stream", "get")
+               
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "stream", 
"get")
+       }()
        entity, err := rs.schemaRegistry.StreamRegistry().GetStream(ctx, 
req.GetMetadata())
        if err != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "stream", "get")
                return nil, err
        }
        return &databasev1.StreamRegistryServiceGetResponse{
@@ -86,8 +120,16 @@ func (rs *streamRegistryServer) Get(ctx context.Context,
 func (rs *streamRegistryServer) List(ctx context.Context,
        req *databasev1.StreamRegistryServiceListRequest,
 ) (*databasev1.StreamRegistryServiceListResponse, error) {
+       g := req.Group
+       rs.metrics.totalRegistryStarted.Inc(1, g, "stream", "list")
+       start := time.Now()
+       defer func() {
+               rs.metrics.totalRegistryFinished.Inc(1, g, "stream", "list")
+               
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "stream", 
"list")
+       }()
        entities, err := rs.schemaRegistry.StreamRegistry().ListStream(ctx, 
schema.ListOpt{Group: req.GetGroup()})
        if err != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "stream", "list")
                return nil, err
        }
        return &databasev1.StreamRegistryServiceListResponse{
@@ -96,6 +138,13 @@ func (rs *streamRegistryServer) List(ctx context.Context,
 }
 
 func (rs *streamRegistryServer) Exist(ctx context.Context, req 
*databasev1.StreamRegistryServiceExistRequest) 
(*databasev1.StreamRegistryServiceExistResponse, error) {
+       g := req.Metadata.Group
+       rs.metrics.totalRegistryStarted.Inc(1, g, "stream", "exist")
+       start := time.Now()
+       defer func() {
+               rs.metrics.totalRegistryFinished.Inc(1, g, "stream", "exist")
+               
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "stream", 
"exist")
+       }()
        _, err := rs.Get(ctx, 
&databasev1.StreamRegistryServiceGetRequest{Metadata: req.Metadata})
        if err == nil {
                return &databasev1.StreamRegistryServiceExistResponse{
@@ -105,6 +154,7 @@ func (rs *streamRegistryServer) Exist(ctx context.Context, 
req *databasev1.Strea
        }
        exist, errGroup := groupExist(ctx, err, req.Metadata, 
rs.schemaRegistry.GroupRegistry())
        if errGroup != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "stream", "exist")
                return nil, errGroup
        }
        return &databasev1.StreamRegistryServiceExistResponse{HasGroup: exist, 
HasStream: false}, nil
@@ -130,13 +180,22 @@ func groupExist(ctx context.Context, errResource error, 
metadata *commonv1.Metad
 type indexRuleBindingRegistryServer struct {
        databasev1.UnimplementedIndexRuleBindingRegistryServiceServer
        schemaRegistry metadata.Repo
+       metrics        *metrics
 }
 
 func (rs *indexRuleBindingRegistryServer) Create(ctx context.Context,
        req *databasev1.IndexRuleBindingRegistryServiceCreateRequest) (
        *databasev1.IndexRuleBindingRegistryServiceCreateResponse, error,
 ) {
+       g := req.IndexRuleBinding.Metadata.Group
+       rs.metrics.totalRegistryStarted.Inc(1, g, "indexRuleBinding", "create")
+       start := time.Now()
+       defer func() {
+               rs.metrics.totalRegistryFinished.Inc(1, g, "indexRuleBinding", 
"create")
+               
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, 
"indexRuleBinding", "create")
+       }()
        if err := 
rs.schemaRegistry.IndexRuleBindingRegistry().CreateIndexRuleBinding(ctx, 
req.GetIndexRuleBinding()); err != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "indexRuleBinding", 
"create")
                return nil, err
        }
        return &databasev1.IndexRuleBindingRegistryServiceCreateResponse{}, nil
@@ -146,7 +205,15 @@ func (rs *indexRuleBindingRegistryServer) Update(ctx 
context.Context,
        req *databasev1.IndexRuleBindingRegistryServiceUpdateRequest) (
        *databasev1.IndexRuleBindingRegistryServiceUpdateResponse, error,
 ) {
+       g := req.IndexRuleBinding.Metadata.Group
+       rs.metrics.totalRegistryStarted.Inc(1, g, "indexRuleBinding", "update")
+       start := time.Now()
+       defer func() {
+               rs.metrics.totalRegistryFinished.Inc(1, g, "indexRuleBinding", 
"update")
+               
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, 
"indexRuleBinding", "update")
+       }()
        if err := 
rs.schemaRegistry.IndexRuleBindingRegistry().UpdateIndexRuleBinding(ctx, 
req.GetIndexRuleBinding()); err != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "indexRuleBinding", 
"update")
                return nil, err
        }
        return &databasev1.IndexRuleBindingRegistryServiceUpdateResponse{}, nil
@@ -156,8 +223,16 @@ func (rs *indexRuleBindingRegistryServer) Delete(ctx 
context.Context,
        req *databasev1.IndexRuleBindingRegistryServiceDeleteRequest) (
        *databasev1.IndexRuleBindingRegistryServiceDeleteResponse, error,
 ) {
+       g := req.Metadata.Group
+       rs.metrics.totalRegistryStarted.Inc(1, g, "indexRuleBinding", "delete")
+       start := time.Now()
+       defer func() {
+               rs.metrics.totalRegistryFinished.Inc(1, g, "indexRuleBinding", 
"delete")
+               
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, 
"indexRuleBinding", "delete")
+       }()
        ok, err := 
rs.schemaRegistry.IndexRuleBindingRegistry().DeleteIndexRuleBinding(ctx, 
req.GetMetadata())
        if err != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "indexRuleBinding", 
"delete")
                return nil, err
        }
        return &databasev1.IndexRuleBindingRegistryServiceDeleteResponse{
@@ -169,8 +244,16 @@ func (rs *indexRuleBindingRegistryServer) Get(ctx 
context.Context,
        req *databasev1.IndexRuleBindingRegistryServiceGetRequest) (
        *databasev1.IndexRuleBindingRegistryServiceGetResponse, error,
 ) {
+       g := req.Metadata.Group
+       rs.metrics.totalRegistryStarted.Inc(1, g, "indexRuleBinding", "get")
+       start := time.Now()
+       defer func() {
+               rs.metrics.totalRegistryFinished.Inc(1, g, "indexRuleBinding", 
"get")
+               
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, 
"indexRuleBinding", "get")
+       }()
        entity, err := 
rs.schemaRegistry.IndexRuleBindingRegistry().GetIndexRuleBinding(ctx, 
req.GetMetadata())
        if err != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "indexRuleBinding", "get")
                return nil, err
        }
        return &databasev1.IndexRuleBindingRegistryServiceGetResponse{
@@ -182,9 +265,17 @@ func (rs *indexRuleBindingRegistryServer) List(ctx 
context.Context,
        req *databasev1.IndexRuleBindingRegistryServiceListRequest) (
        *databasev1.IndexRuleBindingRegistryServiceListResponse, error,
 ) {
+       g := req.Group
+       rs.metrics.totalRegistryStarted.Inc(1, g, "indexRuleBinding", "list")
+       start := time.Now()
+       defer func() {
+               rs.metrics.totalRegistryFinished.Inc(1, g, "indexRuleBinding", 
"list")
+               
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, 
"indexRuleBinding", "list")
+       }()
        entities, err := rs.schemaRegistry.IndexRuleBindingRegistry().
                ListIndexRuleBinding(ctx, schema.ListOpt{Group: req.GetGroup()})
        if err != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "indexRuleBinding", 
"list")
                return nil, err
        }
        return &databasev1.IndexRuleBindingRegistryServiceListResponse{
@@ -195,6 +286,13 @@ func (rs *indexRuleBindingRegistryServer) List(ctx 
context.Context,
 func (rs *indexRuleBindingRegistryServer) Exist(ctx context.Context, req 
*databasev1.IndexRuleBindingRegistryServiceExistRequest) (
        *databasev1.IndexRuleBindingRegistryServiceExistResponse, error,
 ) {
+       g := req.Metadata.Group
+       rs.metrics.totalRegistryStarted.Inc(1, g, "indexRuleBinding", "exist")
+       start := time.Now()
+       defer func() {
+               rs.metrics.totalRegistryFinished.Inc(1, g, "indexRuleBinding", 
"exist")
+               
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, 
"indexRuleBinding", "exist")
+       }()
        _, err := rs.Get(ctx, 
&databasev1.IndexRuleBindingRegistryServiceGetRequest{Metadata: req.Metadata})
        if err == nil {
                return &databasev1.IndexRuleBindingRegistryServiceExistResponse{
@@ -204,6 +302,7 @@ func (rs *indexRuleBindingRegistryServer) Exist(ctx 
context.Context, req *databa
        }
        exist, errGroup := groupExist(ctx, err, req.Metadata, 
rs.schemaRegistry.GroupRegistry())
        if errGroup != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "indexRuleBinding", 
"exist")
                return nil, errGroup
        }
        return 
&databasev1.IndexRuleBindingRegistryServiceExistResponse{HasGroup: exist, 
HasIndexRuleBinding: false}, nil
@@ -212,31 +311,59 @@ func (rs *indexRuleBindingRegistryServer) Exist(ctx 
context.Context, req *databa
 type indexRuleRegistryServer struct {
        databasev1.UnimplementedIndexRuleRegistryServiceServer
        schemaRegistry metadata.Repo
+       metrics        *metrics
 }
 
-func (rs *indexRuleRegistryServer) Create(ctx context.Context, req 
*databasev1.IndexRuleRegistryServiceCreateRequest) (
+func (rs *indexRuleRegistryServer) Create(ctx context.Context,
+       req *databasev1.IndexRuleRegistryServiceCreateRequest) (
        *databasev1.IndexRuleRegistryServiceCreateResponse, error,
 ) {
+       g := req.IndexRule.Metadata.Group
+       rs.metrics.totalRegistryStarted.Inc(1, g, "indexRule", "create")
+       start := time.Now()
+       defer func() {
+               rs.metrics.totalRegistryFinished.Inc(1, g, "indexRule", 
"create")
+               
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, 
"indexRule", "create")
+       }()
        if err := rs.schemaRegistry.IndexRuleRegistry().CreateIndexRule(ctx, 
req.GetIndexRule()); err != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "indexRule", "create")
                return nil, err
        }
        return &databasev1.IndexRuleRegistryServiceCreateResponse{}, nil
 }
 
-func (rs *indexRuleRegistryServer) Update(ctx context.Context, req 
*databasev1.IndexRuleRegistryServiceUpdateRequest) (
+func (rs *indexRuleRegistryServer) Update(ctx context.Context,
+       req *databasev1.IndexRuleRegistryServiceUpdateRequest) (
        *databasev1.IndexRuleRegistryServiceUpdateResponse, error,
 ) {
+       g := req.IndexRule.Metadata.Group
+       rs.metrics.totalRegistryStarted.Inc(1, g, "indexRule", "update")
+       start := time.Now()
+       defer func() {
+               rs.metrics.totalRegistryFinished.Inc(1, g, "indexRule", 
"update")
+               
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, 
"indexRule", "update")
+       }()
        if err := rs.schemaRegistry.IndexRuleRegistry().UpdateIndexRule(ctx, 
req.GetIndexRule()); err != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "indexRule", "update")
                return nil, err
        }
        return &databasev1.IndexRuleRegistryServiceUpdateResponse{}, nil
 }
 
-func (rs *indexRuleRegistryServer) Delete(ctx context.Context, req 
*databasev1.IndexRuleRegistryServiceDeleteRequest) (
+func (rs *indexRuleRegistryServer) Delete(ctx context.Context,
+       req *databasev1.IndexRuleRegistryServiceDeleteRequest) (
        *databasev1.IndexRuleRegistryServiceDeleteResponse, error,
 ) {
+       g := req.Metadata.Group
+       rs.metrics.totalRegistryStarted.Inc(1, g, "indexRule", "delete")
+       start := time.Now()
+       defer func() {
+               rs.metrics.totalRegistryFinished.Inc(1, g, "indexRule", 
"delete")
+               
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, 
"indexRule", "delete")
+       }()
        ok, err := rs.schemaRegistry.IndexRuleRegistry().DeleteIndexRule(ctx, 
req.GetMetadata())
        if err != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "indexRule", "delete")
                return nil, err
        }
        return &databasev1.IndexRuleRegistryServiceDeleteResponse{
@@ -244,11 +371,20 @@ func (rs *indexRuleRegistryServer) Delete(ctx 
context.Context, req *databasev1.I
        }, nil
 }
 
-func (rs *indexRuleRegistryServer) Get(ctx context.Context, req 
*databasev1.IndexRuleRegistryServiceGetRequest) (
+func (rs *indexRuleRegistryServer) Get(ctx context.Context,
+       req *databasev1.IndexRuleRegistryServiceGetRequest) (
        *databasev1.IndexRuleRegistryServiceGetResponse, error,
 ) {
+       g := req.Metadata.Group
+       rs.metrics.totalRegistryStarted.Inc(1, g, "indexRule", "get")
+       start := time.Now()
+       defer func() {
+               rs.metrics.totalRegistryFinished.Inc(1, g, "indexRule", "get")
+               
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, 
"indexRule", "get")
+       }()
        entity, err := rs.schemaRegistry.IndexRuleRegistry().GetIndexRule(ctx, 
req.GetMetadata())
        if err != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "indexRule", "get")
                return nil, err
        }
        return &databasev1.IndexRuleRegistryServiceGetResponse{
@@ -256,11 +392,20 @@ func (rs *indexRuleRegistryServer) Get(ctx 
context.Context, req *databasev1.Inde
        }, nil
 }
 
-func (rs *indexRuleRegistryServer) List(ctx context.Context, req 
*databasev1.IndexRuleRegistryServiceListRequest) (
+func (rs *indexRuleRegistryServer) List(ctx context.Context,
+       req *databasev1.IndexRuleRegistryServiceListRequest) (
        *databasev1.IndexRuleRegistryServiceListResponse, error,
 ) {
+       g := req.Group
+       rs.metrics.totalRegistryStarted.Inc(1, g, "indexRule", "list")
+       start := time.Now()
+       defer func() {
+               rs.metrics.totalRegistryFinished.Inc(1, g, "indexRule", "list")
+               
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, 
"indexRule", "list")
+       }()
        entities, err := 
rs.schemaRegistry.IndexRuleRegistry().ListIndexRule(ctx, schema.ListOpt{Group: 
req.GetGroup()})
        if err != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "indexRule", "list")
                return nil, err
        }
        return &databasev1.IndexRuleRegistryServiceListResponse{
@@ -271,6 +416,13 @@ func (rs *indexRuleRegistryServer) List(ctx 
context.Context, req *databasev1.Ind
 func (rs *indexRuleRegistryServer) Exist(ctx context.Context, req 
*databasev1.IndexRuleRegistryServiceExistRequest) (
        *databasev1.IndexRuleRegistryServiceExistResponse, error,
 ) {
+       g := req.Metadata.Group
+       rs.metrics.totalRegistryStarted.Inc(1, g, "indexRule", "exist")
+       start := time.Now()
+       defer func() {
+               rs.metrics.totalRegistryFinished.Inc(1, g, "indexRule", "exist")
+               
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, 
"indexRule", "exist")
+       }()
        _, err := rs.Get(ctx, 
&databasev1.IndexRuleRegistryServiceGetRequest{Metadata: req.Metadata})
        if err == nil {
                return &databasev1.IndexRuleRegistryServiceExistResponse{
@@ -280,6 +432,7 @@ func (rs *indexRuleRegistryServer) Exist(ctx 
context.Context, req *databasev1.In
        }
        exist, errGroup := groupExist(ctx, err, req.Metadata, 
rs.schemaRegistry.GroupRegistry())
        if errGroup != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "indexRule", "exist")
                return nil, errGroup
        }
        return &databasev1.IndexRuleRegistryServiceExistResponse{HasGroup: 
exist, HasIndexRule: false}, nil
@@ -288,13 +441,23 @@ func (rs *indexRuleRegistryServer) Exist(ctx 
context.Context, req *databasev1.In
 type measureRegistryServer struct {
        databasev1.UnimplementedMeasureRegistryServiceServer
        schemaRegistry metadata.Repo
+       metrics        *metrics
 }
 
-func (rs *measureRegistryServer) Create(ctx context.Context, req 
*databasev1.MeasureRegistryServiceCreateRequest) (
+func (rs *measureRegistryServer) Create(ctx context.Context,
+       req *databasev1.MeasureRegistryServiceCreateRequest) (
        *databasev1.MeasureRegistryServiceCreateResponse, error,
 ) {
+       g := req.Measure.Metadata.Group
+       rs.metrics.totalRegistryStarted.Inc(1, g, "measure", "create")
+       start := time.Now()
+       defer func() {
+               rs.metrics.totalRegistryFinished.Inc(1, g, "measure", "create")
+               
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "measure", 
"create")
+       }()
        modRevision, err := 
rs.schemaRegistry.MeasureRegistry().CreateMeasure(ctx, req.GetMeasure())
        if err != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "measure", "create")
                return nil, err
        }
        return &databasev1.MeasureRegistryServiceCreateResponse{
@@ -302,11 +465,20 @@ func (rs *measureRegistryServer) Create(ctx 
context.Context, req *databasev1.Mea
        }, nil
 }
 
-func (rs *measureRegistryServer) Update(ctx context.Context, req 
*databasev1.MeasureRegistryServiceUpdateRequest) (
+func (rs *measureRegistryServer) Update(ctx context.Context,
+       req *databasev1.MeasureRegistryServiceUpdateRequest) (
        *databasev1.MeasureRegistryServiceUpdateResponse, error,
 ) {
+       g := req.Measure.Metadata.Group
+       rs.metrics.totalRegistryStarted.Inc(1, g, "measure", "update")
+       start := time.Now()
+       defer func() {
+               rs.metrics.totalRegistryFinished.Inc(1, g, "measure", "update")
+               
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "measure", 
"update")
+       }()
        modRevision, err := 
rs.schemaRegistry.MeasureRegistry().UpdateMeasure(ctx, req.GetMeasure())
        if err != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "measure", "update")
                return nil, err
        }
        return &databasev1.MeasureRegistryServiceUpdateResponse{
@@ -314,11 +486,20 @@ func (rs *measureRegistryServer) Update(ctx 
context.Context, req *databasev1.Mea
        }, nil
 }
 
-func (rs *measureRegistryServer) Delete(ctx context.Context, req 
*databasev1.MeasureRegistryServiceDeleteRequest) (
+func (rs *measureRegistryServer) Delete(ctx context.Context,
+       req *databasev1.MeasureRegistryServiceDeleteRequest) (
        *databasev1.MeasureRegistryServiceDeleteResponse, error,
 ) {
+       g := req.Metadata.Group
+       rs.metrics.totalRegistryStarted.Inc(1, g, "measure", "delete")
+       start := time.Now()
+       defer func() {
+               rs.metrics.totalRegistryFinished.Inc(1, g, "measure", "delete")
+               
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "measure", 
"delete")
+       }()
        ok, err := rs.schemaRegistry.MeasureRegistry().DeleteMeasure(ctx, 
req.GetMetadata())
        if err != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "measure", "delete")
                return nil, err
        }
        return &databasev1.MeasureRegistryServiceDeleteResponse{
@@ -326,11 +507,20 @@ func (rs *measureRegistryServer) Delete(ctx 
context.Context, req *databasev1.Mea
        }, nil
 }
 
-func (rs *measureRegistryServer) Get(ctx context.Context, req 
*databasev1.MeasureRegistryServiceGetRequest) (
+func (rs *measureRegistryServer) Get(ctx context.Context,
+       req *databasev1.MeasureRegistryServiceGetRequest) (
        *databasev1.MeasureRegistryServiceGetResponse, error,
 ) {
+       g := req.Metadata.Group
+       rs.metrics.totalRegistryStarted.Inc(1, g, "measure", "get")
+       start := time.Now()
+       defer func() {
+               rs.metrics.totalRegistryFinished.Inc(1, g, "measure", "get")
+               
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "measure", 
"get")
+       }()
        entity, err := rs.schemaRegistry.MeasureRegistry().GetMeasure(ctx, 
req.GetMetadata())
        if err != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "measure", "get")
                return nil, err
        }
        return &databasev1.MeasureRegistryServiceGetResponse{
@@ -338,11 +528,20 @@ func (rs *measureRegistryServer) Get(ctx context.Context, 
req *databasev1.Measur
        }, nil
 }
 
-func (rs *measureRegistryServer) List(ctx context.Context, req 
*databasev1.MeasureRegistryServiceListRequest) (
+func (rs *measureRegistryServer) List(ctx context.Context,
+       req *databasev1.MeasureRegistryServiceListRequest) (
        *databasev1.MeasureRegistryServiceListResponse, error,
 ) {
+       g := req.Group
+       rs.metrics.totalRegistryStarted.Inc(1, g, "measure", "list")
+       start := time.Now()
+       defer func() {
+               rs.metrics.totalRegistryFinished.Inc(1, g, "measure", "list")
+               
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "measure", 
"list")
+       }()
        entities, err := rs.schemaRegistry.MeasureRegistry().ListMeasure(ctx, 
schema.ListOpt{Group: req.GetGroup()})
        if err != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "measure", "list")
                return nil, err
        }
        return &databasev1.MeasureRegistryServiceListResponse{
@@ -350,7 +549,17 @@ func (rs *measureRegistryServer) List(ctx context.Context, 
req *databasev1.Measu
        }, nil
 }
 
-func (rs *measureRegistryServer) Exist(ctx context.Context, req 
*databasev1.MeasureRegistryServiceExistRequest) 
(*databasev1.MeasureRegistryServiceExistResponse, error) {
+func (rs *measureRegistryServer) Exist(ctx context.Context,
+       req *databasev1.MeasureRegistryServiceExistRequest) (
+       *databasev1.MeasureRegistryServiceExistResponse, error,
+) {
+       g := req.Metadata.Group
+       rs.metrics.totalRegistryStarted.Inc(1, g, "measure", "exist")
+       start := time.Now()
+       defer func() {
+               rs.metrics.totalRegistryFinished.Inc(1, g, "measure", "exist")
+               
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "measure", 
"exist")
+       }()
        _, err := rs.Get(ctx, 
&databasev1.MeasureRegistryServiceGetRequest{Metadata: req.Metadata})
        if err == nil {
                return &databasev1.MeasureRegistryServiceExistResponse{
@@ -360,6 +569,7 @@ func (rs *measureRegistryServer) Exist(ctx context.Context, 
req *databasev1.Meas
        }
        exist, errGroup := groupExist(ctx, err, req.Metadata, 
rs.schemaRegistry.GroupRegistry())
        if errGroup != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "measure", "exist")
                return nil, errGroup
        }
        return &databasev1.MeasureRegistryServiceExistResponse{HasGroup: exist, 
HasMeasure: false}, nil
@@ -368,12 +578,21 @@ func (rs *measureRegistryServer) Exist(ctx 
context.Context, req *databasev1.Meas
 type groupRegistryServer struct {
        databasev1.UnimplementedGroupRegistryServiceServer
        schemaRegistry metadata.Repo
+       metrics        *metrics
 }
 
 func (rs *groupRegistryServer) Create(ctx context.Context, req 
*databasev1.GroupRegistryServiceCreateRequest) (
        *databasev1.GroupRegistryServiceCreateResponse, error,
 ) {
+       g := ""
+       rs.metrics.totalRegistryStarted.Inc(1, g, "group", "create")
+       start := time.Now()
+       defer func() {
+               rs.metrics.totalRegistryFinished.Inc(1, g, "group", "create")
+               
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "group", 
"create")
+       }()
        if err := rs.schemaRegistry.GroupRegistry().CreateGroup(ctx, 
req.GetGroup()); err != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "group", "create")
                return nil, err
        }
        return &databasev1.GroupRegistryServiceCreateResponse{}, nil
@@ -382,7 +601,15 @@ func (rs *groupRegistryServer) Create(ctx context.Context, 
req *databasev1.Group
 func (rs *groupRegistryServer) Update(ctx context.Context, req 
*databasev1.GroupRegistryServiceUpdateRequest) (
        *databasev1.GroupRegistryServiceUpdateResponse, error,
 ) {
+       g := ""
+       rs.metrics.totalRegistryStarted.Inc(1, g, "group", "update")
+       start := time.Now()
+       defer func() {
+               rs.metrics.totalRegistryFinished.Inc(1, g, "group", "update")
+               
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "group", 
"update")
+       }()
        if err := rs.schemaRegistry.GroupRegistry().UpdateGroup(ctx, 
req.GetGroup()); err != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "group", "update")
                return nil, err
        }
        return &databasev1.GroupRegistryServiceUpdateResponse{}, nil
@@ -391,8 +618,16 @@ func (rs *groupRegistryServer) Update(ctx context.Context, 
req *databasev1.Group
 func (rs *groupRegistryServer) Delete(ctx context.Context, req 
*databasev1.GroupRegistryServiceDeleteRequest) (
        *databasev1.GroupRegistryServiceDeleteResponse, error,
 ) {
+       g := ""
+       rs.metrics.totalRegistryStarted.Inc(1, g, "group", "delete")
+       start := time.Now()
+       defer func() {
+               rs.metrics.totalRegistryFinished.Inc(1, g, "group", "delete")
+               
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "group", 
"delete")
+       }()
        deleted, err := rs.schemaRegistry.GroupRegistry().DeleteGroup(ctx, 
req.GetGroup())
        if err != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "group", "delete")
                return nil, err
        }
        return &databasev1.GroupRegistryServiceDeleteResponse{
@@ -403,20 +638,36 @@ func (rs *groupRegistryServer) Delete(ctx 
context.Context, req *databasev1.Group
 func (rs *groupRegistryServer) Get(ctx context.Context, req 
*databasev1.GroupRegistryServiceGetRequest) (
        *databasev1.GroupRegistryServiceGetResponse, error,
 ) {
-       g, err := rs.schemaRegistry.GroupRegistry().GetGroup(ctx, 
req.GetGroup())
+       g := ""
+       rs.metrics.totalRegistryStarted.Inc(1, g, "group", "get")
+       start := time.Now()
+       defer func() {
+               rs.metrics.totalRegistryFinished.Inc(1, g, "group", "get")
+               
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "group", 
"get")
+       }()
+       group, err := rs.schemaRegistry.GroupRegistry().GetGroup(ctx, 
req.GetGroup())
        if err != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "group", "get")
                return nil, err
        }
        return &databasev1.GroupRegistryServiceGetResponse{
-               Group: g,
+               Group: group,
        }, nil
 }
 
 func (rs *groupRegistryServer) List(ctx context.Context, _ 
*databasev1.GroupRegistryServiceListRequest) (
        *databasev1.GroupRegistryServiceListResponse, error,
 ) {
+       g := ""
+       rs.metrics.totalRegistryStarted.Inc(1, g, "group", "list")
+       start := time.Now()
+       defer func() {
+               rs.metrics.totalRegistryFinished.Inc(1, g, "group", "list")
+               
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "group", 
"list")
+       }()
        groups, err := rs.schemaRegistry.GroupRegistry().ListGroup(ctx)
        if err != nil {
+               rs.metrics.totalRegistryErr.Inc(1, g, "group", "list")
                return nil, err
        }
        return &databasev1.GroupRegistryServiceListResponse{
@@ -424,7 +675,16 @@ func (rs *groupRegistryServer) List(ctx context.Context, _ 
*databasev1.GroupRegi
        }, nil
 }
 
-func (rs *groupRegistryServer) Exist(ctx context.Context, req 
*databasev1.GroupRegistryServiceExistRequest) 
(*databasev1.GroupRegistryServiceExistResponse, error) {
+func (rs *groupRegistryServer) Exist(ctx context.Context, req 
*databasev1.GroupRegistryServiceExistRequest) (
+       *databasev1.GroupRegistryServiceExistResponse, error,
+) {
+       g := ""
+       rs.metrics.totalRegistryStarted.Inc(1, g, "group", "exist")
+       start := time.Now()
+       defer func() {
+               rs.metrics.totalRegistryFinished.Inc(1, g, "group", "exist")
+               
rs.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, "group", 
"exist")
+       }()
        _, err := rs.Get(ctx, &databasev1.GroupRegistryServiceGetRequest{Group: 
req.Group})
        if err == nil {
                return &databasev1.GroupRegistryServiceExistResponse{
@@ -436,18 +696,28 @@ func (rs *groupRegistryServer) Exist(ctx context.Context, 
req *databasev1.GroupR
                        HasGroup: false,
                }, nil
        }
+       rs.metrics.totalRegistryErr.Inc(1, g, "group", "exist")
        return nil, err
 }
 
 type topNAggregationRegistryServer struct {
        databasev1.UnimplementedTopNAggregationRegistryServiceServer
        schemaRegistry metadata.Repo
+       metrics        *metrics
 }
 
 func (ts *topNAggregationRegistryServer) Create(ctx context.Context,
        req *databasev1.TopNAggregationRegistryServiceCreateRequest,
 ) (*databasev1.TopNAggregationRegistryServiceCreateResponse, error) {
+       g := req.TopNAggregation.Metadata.Group
+       ts.metrics.totalRegistryStarted.Inc(1, g, "topn_aggregation", "create")
+       start := time.Now()
+       defer func() {
+               ts.metrics.totalRegistryFinished.Inc(1, g, "topn_aggregation", 
"create")
+               
ts.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, 
"topn_aggregation", "create")
+       }()
        if err := 
ts.schemaRegistry.TopNAggregationRegistry().CreateTopNAggregation(ctx, 
req.GetTopNAggregation()); err != nil {
+               ts.metrics.totalRegistryErr.Inc(1, g, "topn_aggregation", 
"create")
                return nil, err
        }
        return &databasev1.TopNAggregationRegistryServiceCreateResponse{}, nil
@@ -456,7 +726,15 @@ func (ts *topNAggregationRegistryServer) Create(ctx 
context.Context,
 func (ts *topNAggregationRegistryServer) Update(ctx context.Context,
        req *databasev1.TopNAggregationRegistryServiceUpdateRequest,
 ) (*databasev1.TopNAggregationRegistryServiceUpdateResponse, error) {
+       g := req.TopNAggregation.Metadata.Group
+       ts.metrics.totalRegistryStarted.Inc(1, g, "topn_aggregation", "update")
+       start := time.Now()
+       defer func() {
+               ts.metrics.totalRegistryFinished.Inc(1, g, "topn_aggregation", 
"update")
+               
ts.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, 
"topn_aggregation", "update")
+       }()
        if err := 
ts.schemaRegistry.TopNAggregationRegistry().UpdateTopNAggregation(ctx, 
req.GetTopNAggregation()); err != nil {
+               ts.metrics.totalRegistryErr.Inc(1, g, "topn_aggregation", 
"update")
                return nil, err
        }
        return &databasev1.TopNAggregationRegistryServiceUpdateResponse{}, nil
@@ -465,8 +743,16 @@ func (ts *topNAggregationRegistryServer) Update(ctx 
context.Context,
 func (ts *topNAggregationRegistryServer) Delete(ctx context.Context,
        req *databasev1.TopNAggregationRegistryServiceDeleteRequest,
 ) (*databasev1.TopNAggregationRegistryServiceDeleteResponse, error) {
+       g := req.Metadata.Group
+       ts.metrics.totalRegistryStarted.Inc(1, g, "topn_aggregation", "delete")
+       start := time.Now()
+       defer func() {
+               ts.metrics.totalRegistryFinished.Inc(1, g, "topn_aggregation", 
"delete")
+               
ts.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, 
"topn_aggregation", "delete")
+       }()
        ok, err := 
ts.schemaRegistry.TopNAggregationRegistry().DeleteTopNAggregation(ctx, 
req.GetMetadata())
        if err != nil {
+               ts.metrics.totalRegistryErr.Inc(1, g, "topn_aggregation", 
"delete")
                return nil, err
        }
        return &databasev1.TopNAggregationRegistryServiceDeleteResponse{
@@ -477,8 +763,16 @@ func (ts *topNAggregationRegistryServer) Delete(ctx 
context.Context,
 func (ts *topNAggregationRegistryServer) Get(ctx context.Context,
        req *databasev1.TopNAggregationRegistryServiceGetRequest,
 ) (*databasev1.TopNAggregationRegistryServiceGetResponse, error) {
+       g := req.Metadata.Group
+       ts.metrics.totalRegistryStarted.Inc(1, g, "topn_aggregation", "get")
+       start := time.Now()
+       defer func() {
+               ts.metrics.totalRegistryFinished.Inc(1, g, "topn_aggregation", 
"get")
+               
ts.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, 
"topn_aggregation", "get")
+       }()
        entity, err := 
ts.schemaRegistry.TopNAggregationRegistry().GetTopNAggregation(ctx, 
req.GetMetadata())
        if err != nil {
+               ts.metrics.totalRegistryErr.Inc(1, g, "topn_aggregation", "get")
                return nil, err
        }
        return &databasev1.TopNAggregationRegistryServiceGetResponse{
@@ -489,8 +783,16 @@ func (ts *topNAggregationRegistryServer) Get(ctx 
context.Context,
 func (ts *topNAggregationRegistryServer) List(ctx context.Context,
        req *databasev1.TopNAggregationRegistryServiceListRequest,
 ) (*databasev1.TopNAggregationRegistryServiceListResponse, error) {
+       g := req.Group
+       ts.metrics.totalRegistryStarted.Inc(1, g, "topn_aggregation", "list")
+       start := time.Now()
+       defer func() {
+               ts.metrics.totalRegistryFinished.Inc(1, g, "topn_aggregation", 
"list")
+               
ts.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, 
"topn_aggregation", "list")
+       }()
        entities, err := 
ts.schemaRegistry.TopNAggregationRegistry().ListTopNAggregation(ctx, 
schema.ListOpt{Group: req.GetGroup()})
        if err != nil {
+               ts.metrics.totalRegistryErr.Inc(1, g, "topn_aggregation", 
"list")
                return nil, err
        }
        return &databasev1.TopNAggregationRegistryServiceListResponse{
@@ -501,6 +803,13 @@ func (ts *topNAggregationRegistryServer) List(ctx 
context.Context,
 func (ts *topNAggregationRegistryServer) Exist(ctx context.Context, req 
*databasev1.TopNAggregationRegistryServiceExistRequest) (
        *databasev1.TopNAggregationRegistryServiceExistResponse, error,
 ) {
+       g := req.Metadata.Group
+       ts.metrics.totalRegistryStarted.Inc(1, g, "topn_aggregation", "exist")
+       start := time.Now()
+       defer func() {
+               ts.metrics.totalRegistryFinished.Inc(1, g, "topn_aggregation", 
"exist")
+               
ts.metrics.totalRegistryLatency.Inc(time.Since(start).Seconds(), g, 
"topn_aggregation", "exist")
+       }()
        _, err := ts.Get(ctx, 
&databasev1.TopNAggregationRegistryServiceGetRequest{Metadata: req.Metadata})
        if err == nil {
                return &databasev1.TopNAggregationRegistryServiceExistResponse{
@@ -510,6 +819,7 @@ func (ts *topNAggregationRegistryServer) Exist(ctx 
context.Context, req *databas
        }
        exist, errGroup := groupExist(ctx, err, req.Metadata, 
ts.schemaRegistry.GroupRegistry())
        if errGroup != nil {
+               ts.metrics.totalRegistryErr.Inc(1, g, "topn_aggregation", 
"exist")
                return nil, errGroup
        }
        return 
&databasev1.TopNAggregationRegistryServiceExistResponse{HasGroup: exist, 
HasTopNAggregation: false}, nil
diff --git a/banyand/liaison/grpc/registry_test.go 
b/banyand/liaison/grpc/registry_test.go
index b1f4ea94..bb29c731 100644
--- a/banyand/liaison/grpc/registry_test.go
+++ b/banyand/liaison/grpc/registry_test.go
@@ -34,6 +34,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/liaison/grpc"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedserver"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/grpchelper"
        "github.com/apache/skywalking-banyandb/pkg/test"
@@ -178,8 +179,9 @@ func setupForRegistry() func() {
        // Init `Metadata` module
        metaSvc, err := embeddedserver.NewService(context.TODO())
        Expect(err).NotTo(HaveOccurred())
+       metricSvc := observability.NewMetricService(metaSvc, pipeline, 
"standalone", nil)
 
-       tcp := grpc.NewServer(context.TODO(), pipeline, pipeline, metaSvc, 
grpc.NewLocalNodeRegistry())
+       tcp := grpc.NewServer(context.TODO(), pipeline, pipeline, metaSvc, 
grpc.NewLocalNodeRegistry(), metricSvc)
        preloadStreamSvc := &preloadStreamService{metaSvc: metaSvc}
        var flags []string
        metaPath, metaDeferFunc, err := test.NewSpace()
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index 7485c4fe..31efab86 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -41,6 +41,7 @@ import (
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
@@ -54,6 +55,8 @@ var (
        errNoAddr            = errors.New("no address")
        errQueryMsg          = errors.New("invalid query message")
        errAccessLogRootPath = errors.New("access log root path is required")
+
+       liaisonGrpcScope = observability.RootScope.SubScope("liaison_grpc")
 )
 
 // Server defines the gRPC server.
@@ -63,24 +66,25 @@ type Server interface {
 }
 
 type server struct {
-       creds credentials.TransportCredentials
-       *streamRegistryServer
-       log *logger.Logger
-       *indexRuleBindingRegistryServer
-       ser *grpclib.Server
+       creds      credentials.TransportCredentials
+       omr        observability.MetricsRegistry
+       measureSVC *measureService
+       ser        *grpclib.Server
+       log        *logger.Logger
        *propertyServer
        *topNAggregationRegistryServer
        *groupRegistryServer
        stopCh chan struct{}
        *indexRuleRegistryServer
        *measureRegistryServer
-       streamSVC                *streamService
-       measureSVC               *measureService
-       host                     string
+       streamSVC *streamService
+       *streamRegistryServer
+       *indexRuleBindingRegistryServer
        keyFile                  string
        certFile                 string
        accessLogRootPath        string
        addr                     string
+       host                     string
        accessLogRecorders       []accessLogRecorder
        maxRecvMsgSize           run.Bytes
        port                     uint32
@@ -89,7 +93,7 @@ type server struct {
 }
 
 // NewServer returns a new gRPC server.
-func NewServer(_ context.Context, pipeline, broadcaster queue.Client, 
schemaRegistry metadata.Repo, nodeRegistry NodeRegistry) Server {
+func NewServer(_ context.Context, pipeline, broadcaster queue.Client, 
schemaRegistry metadata.Repo, nodeRegistry NodeRegistry, omr 
observability.MetricsRegistry) Server {
        streamSVC := &streamService{
                discoveryService: newDiscoveryService(schema.KindStream, 
schemaRegistry, nodeRegistry),
                pipeline:         pipeline,
@@ -101,6 +105,7 @@ func NewServer(_ context.Context, pipeline, broadcaster 
queue.Client, schemaRegi
                broadcaster:      broadcaster,
        }
        s := &server{
+               omr:        omr,
                streamSVC:  streamSVC,
                measureSVC: measureSVC,
                streamRegistryServer: &streamRegistryServer{
@@ -151,6 +156,16 @@ func (s *server) PreRun(_ context.Context) error {
                        }
                }
        }
+       metrics := newMetrics(s.omr.With(liaisonGrpcScope))
+       s.streamSVC.metrics = metrics
+       s.measureSVC.metrics = metrics
+       s.propertyServer.metrics = metrics
+       s.streamRegistryServer.metrics = metrics
+       s.indexRuleBindingRegistryServer.metrics = metrics
+       s.indexRuleRegistryServer.metrics = metrics
+       s.measureRegistryServer.metrics = metrics
+       s.groupRegistryServer.metrics = metrics
+       s.topNAggregationRegistryServer.metrics = metrics
        return nil
 }
 
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index 4bdaae9a..ba264fdc 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -43,12 +43,13 @@ import (
 
 type streamService struct {
        streamv1.UnimplementedStreamServiceServer
-       *discoveryService
-       sampled            *logger.Logger
        ingestionAccessLog accesslog.Log
        pipeline           queue.Client
        broadcaster        queue.Client
-       writeTimeout       time.Duration
+       *discoveryService
+       sampled      *logger.Logger
+       metrics      *metrics
+       writeTimeout time.Duration
 }
 
 func (s *streamService) setLogger(log *logger.Logger) {
@@ -65,12 +66,23 @@ func (s *streamService) activeIngestionAccessLog(root 
string) (err error) {
 
 func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error 
{
        reply := func(metadata *commonv1.Metadata, status modelv1.Status, 
messageId uint64, stream streamv1.StreamService_WriteServer, logger 
*logger.Logger) {
+               if status != modelv1.Status_STATUS_SUCCEED {
+                       s.metrics.totalStreamMsgReceivedErr.Inc(1, 
metadata.Group, "stream", "write")
+               }
+               s.metrics.totalStreamMsgReceived.Inc(1, metadata.Group, 
"stream", "write")
                if errResp := stream.Send(&streamv1.WriteResponse{Metadata: 
metadata, Status: status, MessageId: messageId}); errResp != nil {
                        logger.Debug().Err(errResp).Msg("failed to send 
response")
+                       s.metrics.totalStreamMsgSentErr.Inc(1, metadata.Group, 
"stream", "write")
                }
        }
+       s.metrics.totalStreamStarted.Inc(1, "stream", "write")
        publisher := s.pipeline.NewBatchPublisher(s.writeTimeout)
-       defer publisher.Close()
+       start := time.Now()
+       defer func() {
+               publisher.Close()
+               s.metrics.totalStreamFinished.Inc(1, "stream", "write")
+               s.metrics.totalStreamLatency.Inc(time.Since(start).Seconds(), 
"stream", "write")
+       }()
        ctx := stream.Context()
        for {
                select {
@@ -83,9 +95,12 @@ func (s *streamService) Write(stream 
streamv1.StreamService_WriteServer) error {
                        return nil
                }
                if err != nil {
-                       s.sampled.Error().Stringer("written", 
writeEntity).Err(err).Msg("failed to receive message")
+                       if !errors.Is(err, context.DeadlineExceeded) && 
!errors.Is(err, context.Canceled) {
+                               s.sampled.Error().Stringer("written", 
writeEntity).Err(err).Msg("failed to receive message")
+                       }
                        return err
                }
+               s.metrics.totalStreamMsgReceived.Inc(1, 
writeEntity.Metadata.Group, "stream", "write")
                if errTime := 
timestamp.CheckPb(writeEntity.GetElement().Timestamp); errTime != nil {
                        s.sampled.Error().Stringer("written", 
writeEntity).Err(errTime).Msg("the element time is invalid")
                        reply(nil, modelv1.Status_STATUS_INVALID_TIMESTAMP, 
writeEntity.GetMessageId(), stream, s.sampled)
@@ -134,13 +149,26 @@ func (s *streamService) Write(stream 
streamv1.StreamService_WriteServer) error {
                        reply(writeEntity.GetMetadata(), 
modelv1.Status_STATUS_INTERNAL_ERROR, writeEntity.GetMessageId(), stream, 
s.sampled)
                        continue
                }
-               reply(nil, modelv1.Status_STATUS_SUCCEED, 
writeEntity.GetMessageId(), stream, s.sampled)
+               reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_SUCCEED, 
writeEntity.GetMessageId(), stream, s.sampled)
        }
 }
 
 var emptyStreamQueryResponse = &streamv1.QueryResponse{Elements: 
make([]*streamv1.Element, 0)}
 
 func (s *streamService) Query(_ context.Context, req *streamv1.QueryRequest) 
(resp *streamv1.QueryResponse, err error) {
+       for _, g := range req.Groups {
+               s.metrics.totalStarted.Inc(1, g, "stream", "query")
+       }
+       start := time.Now()
+       defer func() {
+               for _, g := range req.Groups {
+                       s.metrics.totalFinished.Inc(1, g, "stream", "query")
+                       if err != nil {
+                               s.metrics.totalErr.Inc(1, g, "stream", "query")
+                       }
+                       s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
g, "stream", "query")
+               }
+       }()
        timeRange := req.GetTimeRange()
        if timeRange == nil {
                req.TimeRange = timestamp.DefaultTimeRange
diff --git a/banyand/measure/merger_test.go b/banyand/measure/merger_test.go
index c8de11bd..22c804e0 100644
--- a/banyand/measure/merger_test.go
+++ b/banyand/measure/merger_test.go
@@ -287,14 +287,14 @@ func Test_mergeParts(t *testing.T) {
                        name:    "Test with multiple parts with a large 
quantity of different ts",
                        dpsList: []*dataPoints{generateHugeDps(1, 5000, 1), 
generateHugeDps(5001, 10000, 2)},
                        want: []blockMetadata{
-                               {seriesID: 1, count: 2500, 
uncompressedSizeBytes: 4190000},
-                               {seriesID: 1, count: 2500, 
uncompressedSizeBytes: 4190000},
-                               {seriesID: 1, count: 2500, 
uncompressedSizeBytes: 4190000},
-                               {seriesID: 1, count: 2500, 
uncompressedSizeBytes: 4190000},
-                               {seriesID: 1, count: 2500, 
uncompressedSizeBytes: 4190000},
-                               {seriesID: 1, count: 2500, 
uncompressedSizeBytes: 4190000},
-                               {seriesID: 1, count: 2500, 
uncompressedSizeBytes: 4190000},
-                               {seriesID: 1, count: 1250, 
uncompressedSizeBytes: 2095000},
+                               {seriesID: 1, count: 1265, 
uncompressedSizeBytes: 2120140},
+                               {seriesID: 1, count: 1265, 
uncompressedSizeBytes: 2120140},
+                               {seriesID: 1, count: 1265, 
uncompressedSizeBytes: 2120140},
+                               {seriesID: 1, count: 2470, 
uncompressedSizeBytes: 4139720},
+                               {seriesID: 1, count: 2470, 
uncompressedSizeBytes: 4139720},
+                               {seriesID: 1, count: 2470, 
uncompressedSizeBytes: 4139720},
+                               {seriesID: 1, count: 2410, 
uncompressedSizeBytes: 4039160},
+                               {seriesID: 1, count: 1205, 
uncompressedSizeBytes: 2019580},
                                {seriesID: 2, count: 2, uncompressedSizeBytes: 
110},
                                {seriesID: 3, count: 2, uncompressedSizeBytes: 
48},
                        },
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index e6e69959..21b4076e 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -292,21 +292,25 @@ func (s *supplier) ResourceSchema(md *commonv1.Metadata) 
(resourceSchema.Resourc
 }
 
 func (s *supplier) OpenDB(groupSchema *commonv1.Group) (io.Closer, error) {
+       name := groupSchema.Metadata.Name
+       p := common.Position{
+               Module:   "measure",
+               Database: name,
+       }
+       metrics, factory := s.newMetrics(p)
        opts := storage.TSDBOpts[*tsTable, option]{
                ShardNum:                       
groupSchema.ResourceOpts.ShardNum,
                Location:                       path.Join(s.path, 
groupSchema.Metadata.Name),
                TSTableCreator:                 newTSTable,
-               MetricsCreator:                 s.newMetrics,
+               TableMetrics:                   metrics,
                SegmentInterval:                
storage.MustToIntervalRule(groupSchema.ResourceOpts.SegmentInterval),
                TTL:                            
storage.MustToIntervalRule(groupSchema.ResourceOpts.Ttl),
                Option:                         s.option,
                SeriesIndexFlushTimeoutSeconds: 
s.option.flushTimeout.Nanoseconds() / int64(time.Second),
+               StorageMetricsFactory:          factory,
        }
-       name := groupSchema.Metadata.Name
        return storage.OpenTSDB(
-               common.SetPosition(context.Background(), func(p 
common.Position) common.Position {
-                       p.Module = "measure"
-                       p.Database = name
+               common.SetPosition(context.Background(), func(_ 
common.Position) common.Position {
                        return p
                }),
                opts)
diff --git a/banyand/measure/metrics.go b/banyand/measure/metrics.go
index 7f38f432..0629ec7d 100644
--- a/banyand/measure/metrics.go
+++ b/banyand/measure/metrics.go
@@ -41,10 +41,16 @@ func (m *metrics) DeleteAll() {
        m.totalWritten.Delete()
 }
 
-func (s *supplier) newMetrics(p common.Position) storage.Metrics {
+func (s *supplier) newMetrics(p common.Position) (storage.Metrics, 
*observability.Factory) {
        factory := 
s.omr.With(measureScope.ConstLabels(meter.LabelPairs{"group": p.Database}))
        return &metrics{
                totalWritten: factory.NewCounter("total_written"),
+       }, factory
+}
+
+func (tst *tsTable) Collect(m storage.Metrics) {
+       if m == nil {
+               return
        }
 }
 
diff --git a/banyand/measure/part.go b/banyand/measure/part.go
index 41baf29b..36a72eb8 100644
--- a/banyand/measure/part.go
+++ b/banyand/measure/part.go
@@ -23,7 +23,6 @@ import (
        "path/filepath"
        "sort"
        "sync/atomic"
-       "time"
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/bytes"
@@ -203,7 +202,9 @@ func (mp *memPart) mustFlush(fileSystem fs.FileSystem, path 
string) {
 }
 
 func uncompressedDataPointSizeBytes(index int, dps *dataPoints) uint64 {
-       n := uint64(len(time.RFC3339Nano))
+       // 8 bytes for timestamp
+       // 8 bytes for version
+       n := uint64(8 + 8)
        n += uint64(len(dps.fields[index].name))
        for i := range dps.fields[index].values {
                n += uint64(dps.fields[index].values[i].size())
diff --git a/banyand/queue/sub/server.go b/banyand/queue/sub/server.go
index 383cdc25..c7b2f643 100644
--- a/banyand/queue/sub/server.go
+++ b/banyand/queue/sub/server.go
@@ -37,9 +37,11 @@ import (
 
        clusterv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/meter"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
@@ -52,18 +54,22 @@ var (
 
        _ run.PreRunner = (*server)(nil)
        _ run.Service   = (*server)(nil)
+
+       queueSubScope = observability.RootScope.SubScope("queue_sub")
 )
 
 type server struct {
+       omr       observability.MetricsRegistry
        creds     credentials.TransportCredentials
        log       *logger.Logger
        ser       *grpclib.Server
        listeners map[bus.Topic]bus.MessageListener
        *clusterv1.UnimplementedServiceServer
-       addr           string
+       metrics        *metrics
        certFile       string
-       keyFile        string
        host           string
+       keyFile        string
+       addr           string
        maxRecvMsgSize run.Bytes
        listenersLock  sync.RWMutex
        port           uint32
@@ -71,14 +77,16 @@ type server struct {
 }
 
 // NewServer returns a new gRPC server.
-func NewServer() queue.Server {
+func NewServer(omr observability.MetricsRegistry) queue.Server {
        return &server{
                listeners: make(map[bus.Topic]bus.MessageListener),
+               omr:       omr,
        }
 }
 
 func (s *server) PreRun(_ context.Context) error {
        s.log = logger.GetLogger("server-queue")
+       s.metrics = newMetrics(s.omr.With(queueSubScope))
        return nil
 }
 
@@ -191,3 +199,28 @@ func (s *server) GracefulStop() {
                s.log.Info().Msg("stopped gracefully")
        }
 }
+
+type metrics struct {
+       totalStarted  meter.Counter
+       totalFinished meter.Counter
+       totalErr      meter.Counter
+       totalLatency  meter.Counter
+
+       totalMsgReceived    meter.Counter
+       totalMsgReceivedErr meter.Counter
+       totalMsgSent        meter.Counter
+       totalMsgSentErr     meter.Counter
+}
+
+func newMetrics(factory *observability.Factory) *metrics {
+       return &metrics{
+               totalStarted:        factory.NewCounter("total_started", 
"topic"),
+               totalFinished:       factory.NewCounter("total_finished", 
"topic"),
+               totalErr:            factory.NewCounter("total_err", "topic"),
+               totalLatency:        factory.NewCounter("total_latency", 
"topic"),
+               totalMsgReceived:    factory.NewCounter("total_msg_received", 
"topic"),
+               totalMsgReceivedErr: 
factory.NewCounter("total_msg_received_err", "topic"),
+               totalMsgSent:        factory.NewCounter("total_msg_sent", 
"topic"),
+               totalMsgSentErr:     factory.NewCounter("total_msg_sent_err", 
"topic"),
+       }
+}
diff --git a/banyand/queue/sub/sub.go b/banyand/queue/sub/sub.go
index c58b99c5..a9e9e3cb 100644
--- a/banyand/queue/sub/sub.go
+++ b/banyand/queue/sub/sub.go
@@ -21,6 +21,7 @@ package sub
 import (
        "fmt"
        "io"
+       "time"
 
        "github.com/pkg/errors"
        "google.golang.org/grpc/codes"
@@ -37,17 +38,25 @@ import (
 func (s *server) Send(stream clusterv1.Service_SendServer) error {
        reply := func(writeEntity *clusterv1.SendRequest, err error, message 
string) {
                s.log.Error().Stringer("written", 
writeEntity).Err(err).Msg(message)
+               s.metrics.totalMsgReceivedErr.Inc(1, writeEntity.Topic)
+               s.metrics.totalMsgSentErr.Inc(1, writeEntity.Topic)
                if errResp := stream.Send(&clusterv1.SendResponse{
                        MessageId: writeEntity.MessageId,
                        Error:     message,
                }); errResp != nil {
                        s.log.Err(errResp).Msg("failed to send response")
+                       s.metrics.totalMsgSentErr.Inc(1, writeEntity.Topic)
                }
        }
        ctx := stream.Context()
        var topic *bus.Topic
        var m bus.Message
        var dataCollection []any
+       var start time.Time
+       defer func() {
+               s.metrics.totalFinished.Inc(1, topic.String())
+               s.metrics.totalLatency.Inc(time.Since(start).Seconds(), 
topic.String())
+       }()
        for {
                select {
                case <-ctx.Done():
@@ -77,6 +86,7 @@ func (s *server) Send(stream clusterv1.Service_SendServer) 
error {
                        s.log.Error().Err(err).Msg("failed to receive message")
                        return err
                }
+               s.metrics.totalMsgReceived.Inc(1, writeEntity.Topic)
                if writeEntity.Topic != "" && topic == nil {
                        t, ok := data.TopicMap[writeEntity.Topic]
                        if !ok {
@@ -102,14 +112,22 @@ func (s *server) Send(stream 
clusterv1.Service_SendServer) error {
                        continue
                }
                if writeEntity.BatchMod {
+                       if len(dataCollection) == 0 {
+                               s.metrics.totalStarted.Inc(1, writeEntity.Topic)
+                               start = time.Now()
+                       }
                        dataCollection = append(dataCollection, 
writeEntity.Body)
                        if errSend := stream.Send(&clusterv1.SendResponse{
                                MessageId: writeEntity.MessageId,
                        }); errSend != nil {
                                s.log.Error().Stringer("written", 
writeEntity).Err(errSend).Msg("failed to send response")
+                               s.metrics.totalMsgSentErr.Inc(1, 
writeEntity.Topic)
+                               continue
                        }
+                       s.metrics.totalMsgSent.Inc(1, writeEntity.Topic)
                        continue
                }
+               s.metrics.totalStarted.Inc(1, writeEntity.Topic)
                listener := s.getListeners(*topic)
                if listener == nil {
                        reply(writeEntity, err, "no listener found")
@@ -122,7 +140,10 @@ func (s *server) Send(stream clusterv1.Service_SendServer) 
error {
                                MessageId: writeEntity.MessageId,
                        }); errSend != nil {
                                s.log.Error().Stringer("written", 
writeEntity).Err(errSend).Msg("failed to send response")
+                               s.metrics.totalMsgSentErr.Inc(1, 
writeEntity.Topic)
+                               continue
                        }
+                       s.metrics.totalMsgSent.Inc(1, writeEntity.Topic)
                        continue
                }
                var message proto.Message
@@ -146,7 +167,10 @@ func (s *server) Send(stream clusterv1.Service_SendServer) 
error {
                        Body:      anyMessage,
                }); err != nil {
                        s.log.Error().Stringer("written", 
writeEntity).Err(err).Msg("failed to send response")
+                       s.metrics.totalMsgSentErr.Inc(1, writeEntity.Topic)
+                       continue
                }
+               s.metrics.totalMsgSent.Inc(1, writeEntity.Topic)
        }
 }
 
diff --git a/banyand/stream/flusher.go b/banyand/stream/flusher.go
index c388ee0e..ae7010b3 100644
--- a/banyand/stream/flusher.go
+++ b/banyand/stream/flusher.go
@@ -40,40 +40,49 @@ func (tst *tsTable) flusherLoop(flushCh chan 
*flusherIntroduction, mergeCh chan
                case e := <-flusherWatcher:
                        flusherWatchers.Add(e)
                case <-epochWatcher.Watch():
-                       curSnapshot := tst.currentSnapshot()
-                       if curSnapshot != nil {
-                               flusherWatchers = 
tst.pauseFlusherToPileupMemParts(epoch, flusherWatcher, flusherWatchers)
-                               curSnapshot.decRef()
-                               curSnapshot = nil
-                       }
-                       tst.RLock()
-                       if tst.snapshot != nil && tst.snapshot.epoch > epoch {
-                               curSnapshot = tst.snapshot
-                               curSnapshot.incRef()
-                       }
-                       tst.RUnlock()
-                       if curSnapshot != nil {
-                               merged, err := tst.mergeMemParts(curSnapshot, 
mergeCh)
-                               if err != nil {
-                                       
tst.l.Logger.Warn().Err(err).Msgf("cannot merge snapshot: %d", 
curSnapshot.epoch)
+                       if func() bool {
+                               tst.incTotalFlushLoopStarted(1)
+                               start := time.Now()
+                               defer func() {
+                                       tst.incTotalFlushLoopFinished(1)
+                                       
tst.incTotalFlushLatency(time.Since(start).Seconds())
+                               }()
+                               curSnapshot := tst.currentSnapshot()
+                               if curSnapshot != nil {
+                                       flusherWatchers = 
tst.pauseFlusherToPileupMemParts(epoch, flusherWatcher, flusherWatchers)
                                        curSnapshot.decRef()
-                                       continue
+                                       curSnapshot = nil
                                }
-                               if !merged {
-                                       tst.flush(curSnapshot, flushCh)
+                               tst.RLock()
+                               if tst.snapshot != nil && tst.snapshot.epoch > 
epoch {
+                                       curSnapshot = tst.snapshot
+                                       curSnapshot.incRef()
                                }
-                               epoch = curSnapshot.epoch
-                               // Notify merger to start a new round of merge.
-                               // This round might have be triggered in 
pauseFlusherToPileupMemParts.
-                               flusherWatchers.Notify(math.MaxUint64)
-                               flusherWatchers = nil
-                               curSnapshot.decRef()
-                               if tst.currentEpoch() != epoch {
-                                       continue
+                               tst.RUnlock()
+                               if curSnapshot != nil {
+                                       defer curSnapshot.decRef()
+                                       merged, err := 
tst.mergeMemParts(curSnapshot, mergeCh)
+                                       if err != nil {
+                                               
tst.l.Logger.Warn().Err(err).Msgf("cannot merge snapshot: %d", 
curSnapshot.epoch)
+                                               tst.incTotalFlushLoopErr(1)
+                                               return false
+                                       }
+                                       if !merged {
+                                               tst.flush(curSnapshot, flushCh)
+                                       }
+                                       epoch = curSnapshot.epoch
+                                       // Notify merger to start a new round 
of merge.
+                                       // This round might have been triggered 
in pauseFlusherToPileupMemParts.
+                                       flusherWatchers.Notify(math.MaxUint64)
+                                       flusherWatchers = nil
+                                       if tst.currentEpoch() != epoch {
+                                               tst.incTotalFlushLoopProgress(1)
+                                               return false
+                                       }
                                }
-                       }
-                       epochWatcher = introducerWatcher.Add(epoch, 
tst.loopCloser.CloseNotify())
-                       if epochWatcher == nil {
+                               epochWatcher = introducerWatcher.Add(epoch, 
tst.loopCloser.CloseNotify())
+                               return epochWatcher == nil
+                       }() {
                                return
                        }
                }
@@ -93,9 +102,11 @@ func (tst *tsTable) pauseFlusherToPileupMemParts(epoch 
uint64, flushWatcher watc
        select {
        case <-tst.loopCloser.CloseNotify():
        case <-time.After(tst.option.flushTimeout):
+               tst.incTotalFlushPauseCompleted(1)
        case e := <-flushWatcher:
                flusherWatchers.Add(e)
                flusherWatchers.Notify(epoch)
+               tst.incTotalFlushPauseBreak(1)
        }
        return flusherWatchers
 }
@@ -115,7 +126,8 @@ func (tst *tsTable) mergeMemParts(snp *snapshot, mergeCh 
chan *mergerIntroductio
        }
        // merge memory must not be closed by the tsTable.close
        closeCh := make(chan struct{})
-       newPart, err := 
tst.mergePartsThenSendIntroduction(snapshotCreatorMergedFlusher, memParts, 
mergedIDs, mergeCh, closeCh)
+       newPart, err := 
tst.mergePartsThenSendIntroduction(snapshotCreatorMergedFlusher, memParts,
+               mergedIDs, mergeCh, closeCh, "mem")
        close(closeCh)
        if err != nil {
                if errors.Is(err, errClosed) {
@@ -132,10 +144,13 @@ func (tst *tsTable) mergeMemParts(snp *snapshot, mergeCh 
chan *mergerIntroductio
 func (tst *tsTable) flush(snapshot *snapshot, flushCh chan 
*flusherIntroduction) {
        ind := generateFlusherIntroduction()
        defer releaseFlusherIntroduction(ind)
+       start := time.Now()
+       partsCount := 0
        for _, pw := range snapshot.parts {
                if pw.mp == nil || pw.mp.partMetadata.TotalCount < 1 {
                        continue
                }
+               partsCount++
                partPath := partPath(tst.root, pw.ID())
                pw.mp.mustFlush(tst.fileSystem, partPath)
                newPW := newPartWrapper(nil, mustOpenFilePart(pw.ID(), 
tst.root, tst.fileSystem))
@@ -145,6 +160,10 @@ func (tst *tsTable) flush(snapshot *snapshot, flushCh chan 
*flusherIntroduction)
        if len(ind.flushed) < 1 {
                return
        }
+       end := time.Now()
+       tst.incTotalFlushed(1)
+       tst.incTotalFlushedMemParts(partsCount)
+       tst.incTotalFlushLatency(end.Sub(start).Seconds())
        ind.applied = make(chan struct{})
        select {
        case flushCh <- ind:
@@ -155,6 +174,7 @@ func (tst *tsTable) flush(snapshot *snapshot, flushCh chan 
*flusherIntroduction)
        case <-ind.applied:
        case <-tst.loopCloser.CloseNotify():
        }
+       tst.incTotalFlushIntroLatency(time.Since(end).Seconds())
 }
 
 func (tst *tsTable) persistSnapshot(snapshot *snapshot) {
diff --git a/banyand/stream/index.go b/banyand/stream/index.go
index c3f4cdb2..56e21327 100644
--- a/banyand/stream/index.go
+++ b/banyand/stream/index.go
@@ -37,7 +37,7 @@ type elementIndex struct {
        l     *logger.Logger
 }
 
-func newElementIndex(ctx context.Context, root string, flushTimeoutSeconds 
int64) (*elementIndex, error) {
+func newElementIndex(ctx context.Context, root string, flushTimeoutSeconds 
int64, metrics *inverted.Metrics) (*elementIndex, error) {
        ei := &elementIndex{
                l: logger.Fetch(ctx, "element_index"),
        }
@@ -46,6 +46,7 @@ func newElementIndex(ctx context.Context, root string, 
flushTimeoutSeconds int64
                Path:         path.Join(root, elementIndexFilename),
                Logger:       ei.l,
                BatchWaitSec: flushTimeoutSeconds,
+               Metrics:      metrics,
        }); err != nil {
                return nil, err
        }
@@ -94,3 +95,7 @@ func (e *elementIndex) Search(seriesList pbv1.SeriesList, 
filter index.Filter) (
 func (e *elementIndex) Close() error {
        return e.store.Close()
 }
+
+func (e *elementIndex) collectMetrics(labelValues ...string) {
+       e.store.CollectMetrics(labelValues...)
+}
diff --git a/banyand/stream/introducer.go b/banyand/stream/introducer.go
index 072e8b39..d0dc0911 100644
--- a/banyand/stream/introducer.go
+++ b/banyand/stream/introducer.go
@@ -118,14 +118,20 @@ func (tst *tsTable) introducerLoop(flushCh chan 
*flusherIntroduction, mergeCh ch
                case <-tst.loopCloser.CloseNotify():
                        return
                case next := <-tst.introductions:
+                       tst.incTotalIntroduceLoopStarted(1, "mem")
                        tst.introduceMemPart(next, epoch)
+                       tst.incTotalIntroduceLoopFinished(1, "mem")
                        epoch++
                case next := <-flushCh:
+                       tst.incTotalIntroduceLoopStarted(1, "flush")
                        tst.introduceFlushed(next, epoch)
+                       tst.incTotalIntroduceLoopFinished(1, "flush")
                        tst.gc.clean()
                        epoch++
                case next := <-mergeCh:
+                       tst.incTotalIntroduceLoopStarted(1, "merge")
                        tst.introduceMerged(next, epoch)
+                       tst.incTotalIntroduceLoopFinished(1, "merge")
                        tst.gc.clean()
                        epoch++
                case epochWatcher := <-watcherCh:
diff --git a/banyand/stream/merger.go b/banyand/stream/merger.go
index cd4f4df5..59cde2cd 100644
--- a/banyand/stream/merger.go
+++ b/banyand/stream/merger.go
@@ -47,26 +47,29 @@ func (tst *tsTable) mergeLoop(merges chan 
*mergerIntroduction, flusherNotifier w
                case <-tst.loopCloser.CloseNotify():
                        return
                case <-ew.Watch():
-                       curSnapshot := tst.currentSnapshot()
-                       if curSnapshot == nil {
-                               continue
-                       }
-                       if curSnapshot.epoch != epoch {
-                               var err error
-                               if pwsChunk, err = 
tst.mergeSnapshot(curSnapshot, merges, pwsChunk[:0]); err != nil {
-                                       if errors.Is(err, errClosed) {
-                                               curSnapshot.decRef()
-                                               return
+                       if func() bool {
+                               curSnapshot := tst.currentSnapshot()
+                               if curSnapshot == nil {
+                                       return false
+                               }
+                               defer curSnapshot.decRef()
+                               if curSnapshot.epoch != epoch {
+                                       tst.incTotalMergeLoopStarted(1)
+                                       defer tst.incTotalMergeLoopFinished(1)
+                                       var err error
+                                       if pwsChunk, err = 
tst.mergeSnapshot(curSnapshot, merges, pwsChunk[:0]); err != nil {
+                                               if errors.Is(err, errClosed) {
+                                                       return true
+                                               }
+                                               
tst.l.Logger.Warn().Err(err).Msgf("cannot merge snapshot: %d", 
curSnapshot.epoch)
+                                               tst.incTotalMergeLoopErr(1)
+                                               return false
                                        }
-                                       
tst.l.Logger.Warn().Err(err).Msgf("cannot merge snapshot: %d", 
curSnapshot.epoch)
-                                       curSnapshot.decRef()
-                                       continue
+                                       epoch = curSnapshot.epoch
                                }
-                               epoch = curSnapshot.epoch
-                       }
-                       curSnapshot.decRef()
-                       ew = flusherNotifier.Add(epoch, 
tst.loopCloser.CloseNotify())
-                       if ew == nil {
+                               ew = flusherNotifier.Add(epoch, 
tst.loopCloser.CloseNotify())
+                               return ew == nil
+                       }() {
                                return
                        }
                }
@@ -81,14 +84,14 @@ func (tst *tsTable) mergeSnapshot(curSnapshot *snapshot, 
merges chan *mergerIntr
                return nil, nil
        }
        if _, err := tst.mergePartsThenSendIntroduction(snapshotCreatorMerger, 
dst,
-               toBeMerged, merges, tst.loopCloser.CloseNotify()); err != nil {
+               toBeMerged, merges, tst.loopCloser.CloseNotify(), "file"); err 
!= nil {
                return dst, err
        }
        return dst, nil
 }
 
 func (tst *tsTable) mergePartsThenSendIntroduction(creator snapshotCreator, 
parts []*partWrapper, merged map[uint64]struct{}, merges chan 
*mergerIntroduction,
-       closeCh <-chan struct{},
+       closeCh <-chan struct{}, typ string,
 ) (*partWrapper, error) {
        reservedSpace := tst.reserveSpace(parts)
        defer releaseDiskSpace(reservedSpace)
@@ -98,6 +101,9 @@ func (tst *tsTable) mergePartsThenSendIntroduction(creator 
snapshotCreator, part
                return nil, err
        }
        elapsed := time.Since(start)
+       tst.incTotalMergeLatency(elapsed.Seconds(), typ)
+       tst.incTotalMerged(1, typ)
+       tst.incTotalMergedParts(len(parts), typ)
        if elapsed > 30*time.Second {
                var totalCount uint64
                for _, pw := range parts {
diff --git a/banyand/stream/merger_test.go b/banyand/stream/merger_test.go
index d011b5d0..310d164b 100644
--- a/banyand/stream/merger_test.go
+++ b/banyand/stream/merger_test.go
@@ -275,11 +275,11 @@ func Test_mergeParts(t *testing.T) {
                        name:   "Test with multiple parts with a large quantity 
of different ts",
                        esList: []*elements{generateHugeEs(1, 5000, 1), 
generateHugeEs(5001, 10000, 2)},
                        want: []blockMetadata{
-                               {seriesID: 1, count: 2395, 
uncompressedSizeBytes: 2109995},
-                               {seriesID: 1, count: 2395, 
uncompressedSizeBytes: 2109995},
-                               {seriesID: 1, count: 2605, 
uncompressedSizeBytes: 2295005},
-                               {seriesID: 1, count: 2395, 
uncompressedSizeBytes: 2109995},
-                               {seriesID: 1, count: 210, 
uncompressedSizeBytes: 185010},
+                               {seriesID: 1, count: 2448, 
uncompressedSizeBytes: 2156688},
+                               {seriesID: 1, count: 2448, 
uncompressedSizeBytes: 2156688},
+                               {seriesID: 1, count: 2552, 
uncompressedSizeBytes: 2248312},
+                               {seriesID: 1, count: 2448, 
uncompressedSizeBytes: 2156688},
+                               {seriesID: 1, count: 104, 
uncompressedSizeBytes: 91624},
                                {seriesID: 2, count: 2, uncompressedSizeBytes: 
110},
                                {seriesID: 3, count: 2, uncompressedSizeBytes: 
16},
                        },
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index ad31ac08..58300ae8 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -36,6 +36,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/meter"
        resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
 )
 
@@ -275,21 +276,25 @@ func (s *supplier) ResourceSchema(md *commonv1.Metadata) 
(resourceSchema.Resourc
 }
 
 func (s *supplier) OpenDB(groupSchema *commonv1.Group) (io.Closer, error) {
+       name := groupSchema.Metadata.Name
+       p := common.Position{
+               Module:   "stream",
+               Database: name,
+       }
+
        opts := storage.TSDBOpts[*tsTable, option]{
                ShardNum:                       
groupSchema.ResourceOpts.ShardNum,
                Location:                       path.Join(s.path, 
groupSchema.Metadata.Name),
                TSTableCreator:                 newTSTable,
-               MetricsCreator:                 s.newMetrics,
+               TableMetrics:                   s.newMetrics(p),
                SegmentInterval:                
storage.MustToIntervalRule(groupSchema.ResourceOpts.SegmentInterval),
                TTL:                            
storage.MustToIntervalRule(groupSchema.ResourceOpts.Ttl),
                Option:                         s.option,
                SeriesIndexFlushTimeoutSeconds: 
s.option.flushTimeout.Nanoseconds() / int64(time.Second),
+               StorageMetricsFactory:          
s.omr.With(storageScope.ConstLabels(meter.ToLabelPairs(common.DBLabelNames(), 
p.DBLabelValues()))),
        }
-       name := groupSchema.Metadata.Name
        return storage.OpenTSDB(
-               common.SetPosition(context.Background(), func(p 
common.Position) common.Position {
-                       p.Module = "stream"
-                       p.Database = name
+               common.SetPosition(context.Background(), func(_ 
common.Position) common.Position {
                        return p
                }),
                opts)
diff --git a/banyand/stream/metrics.go b/banyand/stream/metrics.go
index 9b78f361..f77bd41b 100644
--- a/banyand/stream/metrics.go
+++ b/banyand/stream/metrics.go
@@ -21,29 +21,337 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/index/inverted"
        "github.com/apache/skywalking-banyandb/pkg/meter"
 )
 
-var streamScope = observability.RootScope.SubScope("stream")
+var (
+       streamScope  = observability.RootScope.SubScope("stream")
+       tbScope      = streamScope.SubScope("tst")
+       storageScope = streamScope.SubScope("storage")
+)
 
 type metrics struct {
-       totalWritten meter.Counter
+       tbMetrics
+       indexMetrics           *inverted.Metrics
+       totalWritten           meter.Counter
+       totalBatch             meter.Counter
+       totalBatchIntroLatency meter.Counter
+
+       totalIntroduceLoopStarted  meter.Counter
+       totalIntroduceLoopFinished meter.Counter
+
+       totalFlushLoopStarted  meter.Counter
+       totalFlushLoopFinished meter.Counter
+       totalFlushLoopErr      meter.Counter
+
+       totalMergeLoopStarted  meter.Counter
+       totalMergeLoopFinished meter.Counter
+       totalMergeLoopErr      meter.Counter
+
+       totalFlushLoopProgress   meter.Counter
+       totalFlushed             meter.Counter
+       totalFlushedMemParts     meter.Counter
+       totalFlushPauseCompleted meter.Counter
+       totalFlushPauseBreak     meter.Counter
+       totalFlushIntroLatency   meter.Counter
+       totalFlushLatency        meter.Counter
+
+       totalMergedParts  meter.Counter
+       totalMergeLatency meter.Counter
+       totalMerged       meter.Counter
 }
 
-func (m *metrics) incTotalWritten(delta int) {
-       if m == nil {
+func (tst *tsTable) incTotalWritten(delta int) {
+       if tst == nil || tst.metrics == nil {
+               return
+       }
+       tst.metrics.totalWritten.Inc(float64(delta))
+}
+
+func (tst *tsTable) incTotalBatch(delta int) {
+       if tst == nil || tst.metrics == nil {
+               return
+       }
+       tst.metrics.totalBatch.Inc(float64(delta))
+}
+
+func (tst *tsTable) incTotalBatchIntroLatency(delta float64) {
+       if tst == nil || tst.metrics == nil {
+               return
+       }
+       tst.metrics.totalBatchIntroLatency.Inc(delta)
+}
+
+func (tst *tsTable) incTotalIntroduceLoopStarted(delta int, phase string) {
+       if tst == nil || tst.metrics == nil {
+               return
+       }
+       tst.metrics.totalIntroduceLoopStarted.Inc(float64(delta), phase)
+}
+
+func (tst *tsTable) incTotalIntroduceLoopFinished(delta int, phase string) {
+       if tst == nil || tst.metrics == nil {
+               return
+       }
+       tst.metrics.totalIntroduceLoopFinished.Inc(float64(delta), phase)
+}
+
+func (tst *tsTable) incTotalFlushLoopStarted(delta int) {
+       if tst == nil || tst.metrics == nil {
+               return
+       }
+       tst.metrics.totalFlushLoopStarted.Inc(float64(delta))
+}
+
+func (tst *tsTable) incTotalFlushLoopFinished(delta int) {
+       if tst == nil || tst.metrics == nil {
+               return
+       }
+       tst.metrics.totalFlushLoopFinished.Inc(float64(delta))
+}
+
+func (tst *tsTable) incTotalFlushLoopErr(delta int) {
+       if tst == nil || tst.metrics == nil {
+               return
+       }
+       tst.metrics.totalFlushLoopErr.Inc(float64(delta))
+}
+
+func (tst *tsTable) incTotalMergeLoopStarted(delta int) {
+       if tst == nil || tst.metrics == nil {
+               return
+       }
+       tst.metrics.totalMergeLoopStarted.Inc(float64(delta))
+}
+
+func (tst *tsTable) incTotalMergeLoopFinished(delta int) {
+       if tst == nil || tst.metrics == nil {
+               return
+       }
+       tst.metrics.totalMergeLoopFinished.Inc(float64(delta))
+}
+
+func (tst *tsTable) incTotalMergeLoopErr(delta int) {
+       if tst == nil || tst.metrics == nil {
+               return
+       }
+       tst.metrics.totalMergeLoopErr.Inc(float64(delta))
+}
+
+func (tst *tsTable) incTotalFlushLoopProgress(delta int) {
+       if tst == nil || tst.metrics == nil {
+               return
+       }
+       tst.metrics.totalFlushLoopProgress.Inc(float64(delta))
+}
+
+func (tst *tsTable) incTotalFlushed(delta int) {
+       if tst == nil || tst.metrics == nil {
                return
        }
-       m.totalWritten.Inc(float64(delta))
+       tst.metrics.totalFlushed.Inc(float64(delta))
+}
+
+func (tst *tsTable) incTotalFlushedMemParts(delta int) {
+       if tst == nil || tst.metrics == nil {
+               return
+       }
+       tst.metrics.totalFlushedMemParts.Inc(float64(delta))
+}
+
+func (tst *tsTable) incTotalFlushPauseCompleted(delta int) {
+       if tst == nil || tst.metrics == nil {
+               return
+       }
+       tst.metrics.totalFlushPauseCompleted.Inc(float64(delta))
+}
+
+func (tst *tsTable) incTotalFlushPauseBreak(delta int) {
+       if tst == nil || tst.metrics == nil {
+               return
+       }
+       tst.metrics.totalFlushPauseBreak.Inc(float64(delta))
+}
+
+func (tst *tsTable) incTotalFlushIntroLatency(delta float64) {
+       if tst == nil || tst.metrics == nil {
+               return
+       }
+       tst.metrics.totalFlushIntroLatency.Inc(delta)
+}
+
+func (tst *tsTable) incTotalFlushLatency(delta float64) {
+       if tst == nil || tst.metrics == nil {
+               return
+       }
+       tst.metrics.totalFlushLatency.Inc(delta)
+}
+
+func (tst *tsTable) incTotalMergedParts(delta int, typ string) {
+       if tst == nil || tst.metrics == nil {
+               return
+       }
+       tst.metrics.totalMergedParts.Inc(float64(delta), typ)
+}
+
+func (tst *tsTable) incTotalMergeLatency(delta float64, typ string) {
+       if tst == nil || tst.metrics == nil {
+               return
+       }
+       tst.metrics.totalMergeLatency.Inc(delta, typ)
+}
+
+func (tst *tsTable) incTotalMerged(delta int, typ string) {
+       if tst == nil || tst.metrics == nil {
+               return
+       }
+       tst.metrics.totalMerged.Inc(float64(delta), typ)
 }
 
 func (m *metrics) DeleteAll() {
+       if m == nil {
+               return
+       }
        m.totalWritten.Delete()
+       m.totalBatch.Delete()
+       m.totalBatchIntroLatency.Delete()
+
+       m.totalIntroduceLoopStarted.Delete("mem")
+       m.totalIntroduceLoopStarted.Delete("flush")
+       m.totalIntroduceLoopStarted.Delete("merge")
+       m.totalIntroduceLoopFinished.Delete("mem")
+       m.totalIntroduceLoopFinished.Delete("flush")
+       m.totalIntroduceLoopFinished.Delete("merge")
+
+       m.totalFlushLoopStarted.Delete()
+       m.totalFlushLoopFinished.Delete()
+       m.totalFlushLoopErr.Delete()
+
+       m.totalMergeLoopStarted.Delete()
+       m.totalMergeLoopFinished.Delete()
+       m.totalMergeLoopErr.Delete()
+
+       m.totalFlushLoopProgress.Delete()
+       m.totalFlushed.Delete()
+       m.totalFlushedMemParts.Delete()
+       m.totalFlushPauseCompleted.Delete()
+       m.totalFlushPauseBreak.Delete()
+       m.totalFlushLatency.Delete()
+
+       m.totalMergedParts.Delete("mem")
+       m.totalMergeLatency.Delete("mem")
+       m.totalMerged.Delete("mem")
+       m.totalMergedParts.Delete("file")
+       m.totalMergeLatency.Delete("file")
+       m.totalMerged.Delete("file")
 }
 
 func (s *supplier) newMetrics(p common.Position) storage.Metrics {
-       factory := s.omr.With(streamScope.ConstLabels(meter.LabelPairs{"group": 
p.Database}))
+       factory := 
s.omr.With(tbScope.ConstLabels(meter.ToLabelPairs(common.DBLabelNames(), 
p.DBLabelValues())))
        return &metrics{
-               totalWritten: factory.NewCounter("total_written"),
+               totalWritten:               factory.NewCounter("total_written"),
+               totalBatch:                 factory.NewCounter("total_batch"),
+               totalBatchIntroLatency:     
factory.NewCounter("total_batch_intro_time"),
+               totalIntroduceLoopStarted:  
factory.NewCounter("total_introduce_loop_started", "phase"),
+               totalIntroduceLoopFinished: 
factory.NewCounter("total_introduce_loop_finished", "phase"),
+               totalFlushLoopStarted:      
factory.NewCounter("total_flush_loop_started"),
+               totalFlushLoopFinished:     
factory.NewCounter("total_flush_loop_finished"),
+               totalFlushLoopErr:          
factory.NewCounter("total_flush_loop_err"),
+               totalMergeLoopStarted:      
factory.NewCounter("total_merge_loop_started"),
+               totalMergeLoopFinished:     
factory.NewCounter("total_merge_loop_finished"),
+               totalMergeLoopErr:          
factory.NewCounter("total_merge_loop_err"),
+               totalFlushLoopProgress:     
factory.NewCounter("total_flush_loop_progress"),
+               totalFlushed:               factory.NewCounter("total_flushed"),
+               totalFlushedMemParts:       
factory.NewCounter("total_flushed_mem_parts"),
+               totalFlushPauseCompleted:   
factory.NewCounter("total_flush_pause_completed"),
+               totalFlushPauseBreak:       
factory.NewCounter("total_flush_pause_break"),
+               totalFlushIntroLatency:     
factory.NewCounter("total_flush_intro_latency"),
+               totalFlushLatency:          
factory.NewCounter("total_flush_latency"),
+               totalMergedParts:           
factory.NewCounter("total_merged_parts", "type"),
+               totalMergeLatency:          
factory.NewCounter("total_merge_latency", "type"),
+               totalMerged:                factory.NewCounter("total_merged", 
"type"),
+               tbMetrics: tbMetrics{
+                       totalMemParts:                  
factory.NewGauge("total_mem_part", common.ShardLabelNames()...),
+                       totalMemElements:               
factory.NewGauge("total_mem_elements", common.ShardLabelNames()...),
+                       totalMemBlocks:                 
factory.NewGauge("total_mem_blocks", common.ShardLabelNames()...),
+                       totalMemPartBytes:              
factory.NewGauge("total_mem_part_bytes", common.ShardLabelNames()...),
+                       totalMemPartUncompressedBytes:  
factory.NewGauge("total_mem_part_uncompressed_bytes", 
common.ShardLabelNames()...),
+                       totalFileParts:                 
factory.NewGauge("total_file_parts", common.ShardLabelNames()...),
+                       totalFileElements:              
factory.NewGauge("total_file_elements", common.ShardLabelNames()...),
+                       totalFileBlocks:                
factory.NewGauge("total_file_blocks", common.ShardLabelNames()...),
+                       totalFilePartBytes:             
factory.NewGauge("total_file_part_bytes", common.ShardLabelNames()...),
+                       totalFilePartUncompressedBytes: 
factory.NewGauge("total_file_part_uncompressed_bytes", 
common.ShardLabelNames()...),
+               },
+               indexMetrics: inverted.NewMetrics(factory, 
common.SegLabelNames()...),
        }
 }
+
+func (tst *tsTable) Collect(m storage.Metrics) {
+       if m == nil {
+               return
+       }
+       metrics := m.(*metrics)
+       snp := tst.currentSnapshot()
+       defer snp.decRef()
+
+       var totalMemPart, totalMemElements, totalMemBlocks, totalMemPartBytes, 
totalMemPartUncompressedBytes uint64
+       var totalFileParts, totalFileElements, totalFileBlocks, 
totalFilePartBytes, totalFilePartUncompressedBytes uint64
+       for _, p := range snp.parts {
+               if p.mp == nil {
+                       totalFileParts++
+                       totalFileElements += p.p.partMetadata.TotalCount
+                       totalFileBlocks += p.p.partMetadata.BlocksCount
+                       totalFilePartBytes += 
p.p.partMetadata.CompressedSizeBytes
+                       totalFilePartUncompressedBytes += 
p.p.partMetadata.UncompressedSizeBytes
+                       continue
+               }
+               totalMemPart++
+               totalMemElements += p.mp.partMetadata.TotalCount
+               totalMemBlocks += p.mp.partMetadata.BlocksCount
+               totalMemPartBytes += p.mp.partMetadata.CompressedSizeBytes
+               totalMemPartUncompressedBytes += 
p.mp.partMetadata.UncompressedSizeBytes
+       }
+       metrics.totalMemParts.Set(float64(totalMemPart), 
tst.p.ShardLabelValues()...)
+       metrics.totalMemElements.Set(float64(totalMemElements), 
tst.p.ShardLabelValues()...)
+       metrics.totalMemBlocks.Set(float64(totalMemBlocks), 
tst.p.ShardLabelValues()...)
+       metrics.totalMemPartBytes.Set(float64(totalMemPartBytes), 
tst.p.ShardLabelValues()...)
+       
metrics.totalMemPartUncompressedBytes.Set(float64(totalMemPartUncompressedBytes),
 tst.p.ShardLabelValues()...)
+       metrics.totalFileParts.Set(float64(totalFileParts), 
tst.p.ShardLabelValues()...)
+       metrics.totalFileElements.Set(float64(totalFileElements), 
tst.p.ShardLabelValues()...)
+       metrics.totalFileBlocks.Set(float64(totalFileBlocks), 
tst.p.ShardLabelValues()...)
+       metrics.totalFilePartBytes.Set(float64(totalFilePartBytes), 
tst.p.ShardLabelValues()...)
+       
metrics.totalFilePartUncompressedBytes.Set(float64(totalFilePartUncompressedBytes),
 tst.p.ShardLabelValues()...)
+       tst.index.collectMetrics(tst.p.SegLabelValues()...)
+}
+
+func (tst *tsTable) deleteMetrics() {
+       if tst.metrics == nil {
+               return
+       }
+       tst.metrics.tbMetrics.totalMemParts.Delete(tst.p.ShardLabelValues()...)
+       
tst.metrics.tbMetrics.totalMemElements.Delete(tst.p.ShardLabelValues()...)
+       tst.metrics.tbMetrics.totalMemBlocks.Delete(tst.p.ShardLabelValues()...)
+       
tst.metrics.tbMetrics.totalMemPartBytes.Delete(tst.p.ShardLabelValues()...)
+       
tst.metrics.tbMetrics.totalMemPartUncompressedBytes.Delete(tst.p.ShardLabelValues()...)
+       tst.metrics.tbMetrics.totalFileParts.Delete(tst.p.ShardLabelValues()...)
+       
tst.metrics.tbMetrics.totalFileElements.Delete(tst.p.ShardLabelValues()...)
+       
tst.metrics.tbMetrics.totalFileBlocks.Delete(tst.p.ShardLabelValues()...)
+       
tst.metrics.tbMetrics.totalFilePartBytes.Delete(tst.p.ShardLabelValues()...)
+       
tst.metrics.tbMetrics.totalFilePartUncompressedBytes.Delete(tst.p.ShardLabelValues()...)
+       tst.metrics.indexMetrics.DeleteAll(tst.p.SegLabelValues()...)
+}
+
// tbMetrics holds the per-shard gauges that describe the parts of the current
// snapshot. They are populated by Collect and removed by deleteMetrics.
type tbMetrics struct {
	// Gauges for parts still resident in memory (part wrappers with a memPart).
	totalMemParts                 meter.Gauge
	totalMemElements              meter.Gauge
	totalMemBlocks                meter.Gauge
	totalMemPartBytes             meter.Gauge
	totalMemPartUncompressedBytes meter.Gauge

	// Gauges for parts persisted on disk (part wrappers without a memPart).
	totalFileParts                 meter.Gauge
	totalFileElements              meter.Gauge
	totalFileBlocks                meter.Gauge
	totalFilePartBytes             meter.Gauge
	totalFilePartUncompressedBytes meter.Gauge
}
diff --git a/banyand/stream/part.go b/banyand/stream/part.go
index 228e607c..b4ad9d47 100644
--- a/banyand/stream/part.go
+++ b/banyand/stream/part.go
@@ -23,7 +23,6 @@ import (
        "path/filepath"
        "sort"
        "sync/atomic"
-       "time"
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/bytes"
@@ -183,7 +182,9 @@ func (mp *memPart) mustFlush(fileSystem fs.FileSystem, path 
string) {
 }
 
 func uncompressedElementSizeBytes(index int, es *elements) uint64 {
-       n := uint64(len(time.RFC3339Nano))
+       // 8 bytes for timestamp
+       // 8 bytes for elementID
+       n := uint64(8 + 8)
        for i := range es.tagFamilies[index] {
                n += uint64(len(es.tagFamilies[index][i].tag))
                for j := range es.tagFamilies[index][i].values {
diff --git a/banyand/stream/query_test.go b/banyand/stream/query_test.go
index 4b5d7685..9eea344d 100644
--- a/banyand/stream/query_test.go
+++ b/banyand/stream/query_test.go
@@ -339,7 +339,7 @@ func TestQueryResult(t *testing.T) {
                        t.Run("memory snapshot", func(t *testing.T) {
                                tmpPath, defFn := test.Space(require.New(t))
                                defer defFn()
-                               index, _ := newElementIndex(context.TODO(), 
tmpPath, 0)
+                               index, _ := newElementIndex(context.TODO(), 
tmpPath, 0, nil)
                                tst := &tsTable{
                                        index:         index,
                                        loopCloser:    run.NewCloser(2),
diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go
index 8e6cd0e3..14997423 100644
--- a/banyand/stream/tstable.go
+++ b/banyand/stream/tstable.go
@@ -33,6 +33,7 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/index/inverted"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/run"
@@ -169,21 +170,23 @@ func (tst *tsTable) mustReadSnapshot(snapshot uint64) 
[]uint64 {
 func newTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position,
        l *logger.Logger, _ timestamp.TimeRange, option option, m any,
 ) (*tsTable, error) {
-       index, err := newElementIndex(context.TODO(), rootPath, 
option.elementIndexFlushTimeout.Nanoseconds()/int64(time.Second))
-       if err != nil {
-               return nil, err
-       }
        tst := tsTable{
-               index:      index,
                fileSystem: fileSystem,
                root:       rootPath,
                option:     option,
                l:          l,
                p:          p,
        }
+       var indexMetrics *inverted.Metrics
        if m != nil {
                tst.metrics = m.(*metrics)
+               indexMetrics = tst.metrics.indexMetrics
+       }
+       index, err := newElementIndex(context.TODO(), rootPath, 
option.elementIndexFlushTimeout.Nanoseconds()/int64(time.Second), indexMetrics)
+       if err != nil {
+               return nil, err
        }
+       tst.index = index
        tst.gc.init(&tst)
        ee := fileSystem.ReadDir(rootPath)
        if len(ee) == 0 {
@@ -255,6 +258,7 @@ func (tst *tsTable) Close() error {
        }
        tst.Lock()
        defer tst.Unlock()
+       tst.deleteMetrics()
        if tst.snapshot == nil {
                return tst.index.Close()
        }
@@ -277,6 +281,7 @@ func (tst *tsTable) mustAddElements(es *elements) {
        ind.applied = make(chan struct{})
        ind.memPart = newPartWrapper(mp, p)
        ind.memPart.p.partMetadata.ID = atomic.AddUint64(&tst.curPartID, 1)
+       startTime := time.Now()
        select {
        case tst.introductions <- ind:
        case <-tst.loopCloser.CloseNotify():
@@ -286,7 +291,9 @@ func (tst *tsTable) mustAddElements(es *elements) {
        case <-ind.applied:
        case <-tst.loopCloser.CloseNotify():
        }
-       tst.metrics.incTotalWritten(len(es.timestamps))
+       tst.incTotalWritten(len(es.timestamps))
+       tst.incTotalBatch(1)
+       tst.incTotalBatchIntroLatency(time.Since(startTime).Seconds())
 }
 
 type tstIter struct {
diff --git a/banyand/stream/tstable_test.go b/banyand/stream/tstable_test.go
index 054fa288..c8759b9e 100644
--- a/banyand/stream/tstable_test.go
+++ b/banyand/stream/tstable_test.go
@@ -91,7 +91,7 @@ func Test_tsTable_mustAddDataPoints(t *testing.T) {
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
                        tmpPath, _ := test.Space(require.New(t))
-                       index, _ := newElementIndex(context.TODO(), tmpPath, 0)
+                       index, _ := newElementIndex(context.TODO(), tmpPath, 0, 
nil)
                        tst := &tsTable{
                                index:         index,
                                loopCloser:    run.NewCloser(2),
@@ -229,7 +229,7 @@ func Test_tstIter(t *testing.T) {
                for _, tt := range tests {
                        t.Run(tt.name, func(t *testing.T) {
                                tmpPath, defFn := test.Space(require.New(t))
-                               index, _ := newElementIndex(context.TODO(), 
tmpPath, 0)
+                               index, _ := newElementIndex(context.TODO(), 
tmpPath, 0, nil)
                                defer defFn()
                                tst := &tsTable{
                                        index:         index,
diff --git a/go.mod b/go.mod
index a045afcb..703d1ee0 100644
--- a/go.mod
+++ b/go.mod
@@ -86,6 +86,7 @@ require (
        github.com/google/pprof v0.0.0-20240424215950-a892ee059fd6 // indirect
        github.com/gorilla/websocket v1.5.1 // indirect
        github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 // indirect
+       github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1
        github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
        github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
        github.com/hashicorp/hcl v1.0.0 // indirect
diff --git a/go.sum b/go.sum
index 9dcd2a8a..efbfc08b 100644
--- a/go.sum
+++ b/go.sum
@@ -155,6 +155,8 @@ github.com/gorilla/websocket v1.5.1 
h1:gmztn0JnHVt9JZquRuzLw3g4wouNVzKL15iLr/zn/
 github.com/gorilla/websocket v1.5.1/go.mod 
h1:x3kM2JMyaluk02fnUJpQuwD2dCS5NDG2ZHL0uE0tcaY=
 github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 
h1:UH//fgunKIs4JdUbpDl1VZCDaL56wXCB/5+wF6uHfaI=
 github.com/grpc-ecosystem/go-grpc-middleware v1.4.0/go.mod 
h1:g5qyo/la0ALbONm6Vbp88Yd8NsDy6rZz+RcrMPxvld8=
+github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus v1.0.1 
h1:qnpSQwGEnkcRpTqNOIR6bJbR0gAorgP9CSALpRcKoAA=
+github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus 
v1.0.1/go.mod h1:lXGCsh6c22WGtjr+qGHj1otzZpV/1kwTMAqkwZsnWRU=
 github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0 
h1:pRhl55Yx1eC7BZ1N+BBWwnKaMyD8uC+34TLdndZMAKk=
 github.com/grpc-ecosystem/go-grpc-middleware/v2 v2.1.0/go.mod 
h1:XKMd7iuf/RGPSMJ/U4HP0zS2Z9Fh8Ps9a+6X26m/tmI=
 github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 
h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho=
diff --git a/pkg/cmdsetup/data.go b/pkg/cmdsetup/data.go
index 52d342bd..54ce8020 100644
--- a/pkg/cmdsetup/data.go
+++ b/pkg/cmdsetup/data.go
@@ -43,9 +43,9 @@ func newDataCmd(runners ...run.Unit) *cobra.Command {
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate metadata service")
        }
-       pipeline := sub.NewServer()
        localPipeline := queue.Local()
        metricSvc := observability.NewMetricService(metaSvc, localPipeline, 
"data", nil)
+       pipeline := sub.NewServer(metricSvc)
        streamSvc, err := stream.NewService(ctx, metaSvc, pipeline, metricSvc)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate stream service")
@@ -65,8 +65,8 @@ func newDataCmd(runners ...run.Unit) *cobra.Command {
        units = append(units,
                metaSvc,
                localPipeline,
-               pipeline,
                metricSvc,
+               pipeline,
                measureSvc,
                streamSvc,
                q,
diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go
index 66f3da41..bbb10db3 100644
--- a/pkg/cmdsetup/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -48,9 +48,9 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
        localPipeline := queue.Local()
        nodeSel := node.NewRoundRobinSelector(metaSvc)
        nodeRegistry := grpc.NewClusterNodeRegistry(pipeline, nodeSel)
-       grpcServer := grpc.NewServer(ctx, pipeline, localPipeline, metaSvc, 
nodeRegistry)
-       profSvc := observability.NewProfService()
        metricSvc := observability.NewMetricService(metaSvc, pipeline, 
"liaison", nodeRegistry)
+       grpcServer := grpc.NewServer(ctx, pipeline, localPipeline, metaSvc, 
nodeRegistry, metricSvc)
+       profSvc := observability.NewProfService()
        httpServer := http.NewServer()
        dQuery, err := dquery.NewService(metaSvc, localPipeline, pipeline)
        if err != nil {
@@ -63,14 +63,12 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
                localPipeline,
                pipeline,
                nodeSel,
+               metricSvc,
                dQuery,
                grpcServer,
                httpServer,
                profSvc,
        )
-       if metricSvc != nil {
-               units = append(units, metricSvc)
-       }
        liaisonGroup := run.NewGroup("liaison")
        liaisonGroup.Register(units...)
        liaisonCmd := &cobra.Command{
diff --git a/pkg/cmdsetup/standalone.go b/pkg/cmdsetup/standalone.go
index 74834126..1d43927d 100644
--- a/pkg/cmdsetup/standalone.go
+++ b/pkg/cmdsetup/standalone.go
@@ -21,6 +21,7 @@ import (
        "context"
        "os"
 
+       grpcprom 
"github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
        "github.com/spf13/cobra"
 
        "github.com/apache/skywalking-banyandb/api/common"
@@ -50,6 +51,9 @@ func newStandaloneCmd(runners ...run.Unit) *cobra.Command {
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate stream service")
        }
+       var srvMetrics *grpcprom.ServerMetrics
+       srvMetrics.UnaryServerInterceptor()
+       srvMetrics.UnaryServerInterceptor()
        measureSvc, err := measure.NewService(ctx, metaSvc, pipeline, nil, 
metricSvc)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate measure service")
@@ -58,7 +62,7 @@ func newStandaloneCmd(runners ...run.Unit) *cobra.Command {
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate query processor")
        }
-       grpcServer := grpc.NewServer(ctx, pipeline, pipeline, metaSvc, 
grpc.NewLocalNodeRegistry())
+       grpcServer := grpc.NewServer(ctx, pipeline, pipeline, metaSvc, 
grpc.NewLocalNodeRegistry(), metricSvc)
        profSvc := observability.NewProfService()
        httpServer := http.NewServer()
 
diff --git a/pkg/index/index.go b/pkg/index/index.go
index a7da177b..08c20615 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -184,7 +184,7 @@ type Store interface {
        io.Closer
        Writer
        Searcher
-       SizeOnDisk() int64
+       CollectMetrics(...string)
 }
 
 // Series represents a series in an index.
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 866aacc7..a3915bae 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -78,14 +78,16 @@ var _ index.Store = (*store)(nil)
 // StoreOpts wraps options to create an inverted index repository.
 type StoreOpts struct {
        Logger       *logger.Logger
+       Metrics      *Metrics
        Path         string
        BatchWaitSec int64
 }
 
 type store struct {
-       writer *bluge.Writer
-       closer *run.Closer
-       l      *logger.Logger
+       writer  *bluge.Writer
+       closer  *run.Closer
+       l       *logger.Logger
+       metrics *Metrics
 }
 
 var batchPool = pool.Register[*blugeIndex.Batch]("index-bluge-batch")
@@ -161,9 +163,10 @@ func NewStore(opts StoreOpts) (index.SeriesStore, error) {
                return nil, err
        }
        s := &store{
-               writer: w,
-               l:      opts.Logger,
-               closer: run.NewCloser(1),
+               writer:  w,
+               l:       opts.Logger,
+               closer:  run.NewCloser(1),
+               metrics: opts.Metrics,
        }
        return s, nil
 }
@@ -325,11 +328,6 @@ func (s *store) Range(fieldKey index.FieldKey, opts 
index.RangeOpts) (list posti
        return
 }
 
-func (s *store) SizeOnDisk() int64 {
-       _, bytes := s.writer.DirectoryStats()
-       return int64(bytes)
-}
-
 type blugeMatchIterator struct {
        delegated        search.DocumentMatchIterator
        err              error
diff --git a/pkg/index/inverted/metrics.go b/pkg/index/inverted/metrics.go
new file mode 100644
index 00000000..6816b90e
--- /dev/null
+++ b/pkg/index/inverted/metrics.go
@@ -0,0 +1,152 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package inverted
+
+import (
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/meter"
+)
+
// Metrics is the metrics for the inverted index. All values are exported as
// gauges mirroring the bluge writer's cumulative status counters; they are
// refreshed wholesale by CollectMetrics rather than incremented piecemeal.
type Metrics struct {
	// Batch/update activity of the index writer.
	totalUpdates meter.Gauge
	totalDeletes meter.Gauge
	totalBatches meter.Gauge
	totalErrors  meter.Gauge

	// Cumulative time counters reported by the writer status.
	totalAnalysisTime meter.Gauge
	totalIndexTime    meter.Gauge

	// Term-searcher lifecycle counters.
	totalTermSearchersStarted  meter.Gauge
	totalTermSearchersFinished meter.Gauge

	// Merge activity; these carry an extra "type" label ("mem" or "file").
	totalMergeStarted  meter.Gauge
	totalMergeFinished meter.Gauge
	totalMergeLatency  meter.Gauge
	totalMergeErrors   meter.Gauge

	// Segment and on-disk footprint of the index.
	totalMemSegments  meter.Gauge
	totalFileSegments meter.Gauge
	curOnDiskBytes    meter.Gauge
	curOnDiskFiles    meter.Gauge

	// Live document count, read from a fresh index reader in CollectMetrics.
	totalDocCount meter.Gauge
}
+
+// NewMetrics creates a new metrics for the inverted index.
+func NewMetrics(factory *observability.Factory, labelNames ...string) *Metrics 
{
+       return &Metrics{
+               totalUpdates: factory.NewGauge("inverted_index_total_updates", 
labelNames...),
+               totalDeletes: factory.NewGauge("inverted_index_total_deletes", 
labelNames...),
+               totalBatches: factory.NewGauge("inverted_index_total_batches", 
labelNames...),
+               totalErrors:  factory.NewGauge("inverted_index_total_errors", 
labelNames...),
+
+               totalAnalysisTime: 
factory.NewGauge("inverted_index_total_analysis_time", labelNames...),
+               totalIndexTime:    
factory.NewGauge("inverted_index_total_index_time", labelNames...),
+
+               totalTermSearchersStarted:  
factory.NewGauge("inverted_index_total_term_searchers_started", labelNames...),
+               totalTermSearchersFinished: 
factory.NewGauge("inverted_index_total_term_searchers_finished", labelNames...),
+
+               totalMergeStarted:  
factory.NewGauge("inverted_index_total_merge_started", append(labelNames, 
"type")...),
+               totalMergeFinished: 
factory.NewGauge("inverted_index_total_merge_finished", append(labelNames, 
"type")...),
+               totalMergeLatency:  
factory.NewGauge("inverted_index_total_merge_latency", append(labelNames, 
"type")...),
+               totalMergeErrors:   
factory.NewGauge("inverted_index_total_merge_errors", append(labelNames, 
"type")...),
+
+               totalMemSegments:  
factory.NewGauge("inverted_index_total_mem_segments", labelNames...),
+               totalFileSegments: 
factory.NewGauge("inverted_index_total_file_segments", labelNames...),
+               curOnDiskBytes:    
factory.NewGauge("inverted_index_cur_on_disk_bytes", labelNames...),
+               curOnDiskFiles:    
factory.NewGauge("inverted_index_cur_on_disk_files", labelNames...),
+
+               totalDocCount: 
factory.NewGauge("inverted_index_total_doc_count", labelNames...),
+       }
+}
+
+// DeleteAll deletes all metrics with the given label values.
+func (m *Metrics) DeleteAll(labelValues ...string) {
+       if m == nil {
+               return
+       }
+       m.totalUpdates.Delete(labelValues...)
+       m.totalDeletes.Delete(labelValues...)
+       m.totalBatches.Delete(labelValues...)
+       m.totalErrors.Delete(labelValues...)
+
+       m.totalAnalysisTime.Delete(labelValues...)
+       m.totalIndexTime.Delete(labelValues...)
+
+       m.totalTermSearchersStarted.Delete(labelValues...)
+       m.totalTermSearchersFinished.Delete(labelValues...)
+
+       m.totalMergeStarted.Delete(append(labelValues, "mem")...)
+       m.totalMergeFinished.Delete(append(labelValues, "mem")...)
+       m.totalMergeLatency.Delete(append(labelValues, "mem")...)
+       m.totalMergeErrors.Delete(append(labelValues, "mem")...)
+
+       m.totalMergeStarted.Delete(append(labelValues, "file")...)
+       m.totalMergeFinished.Delete(append(labelValues, "file")...)
+       m.totalMergeLatency.Delete(append(labelValues, "file")...)
+       m.totalMergeErrors.Delete(append(labelValues, "file")...)
+
+       m.totalMemSegments.Delete(labelValues...)
+       m.totalFileSegments.Delete(labelValues...)
+       m.curOnDiskBytes.Delete(labelValues...)
+       m.curOnDiskFiles.Delete(labelValues...)
+}
+
+func (s *store) CollectMetrics(labelValues ...string) {
+       if s.metrics == nil {
+               return
+       }
+       status := s.writer.Status()
+       s.metrics.totalUpdates.Set(float64(status.TotUpdates), labelValues...)
+       s.metrics.totalDeletes.Set(float64(status.TotDeletes), labelValues...)
+       s.metrics.totalBatches.Set(float64(status.TotBatches), labelValues...)
+       s.metrics.totalErrors.Set(float64(status.TotOnErrors), labelValues...)
+
+       s.metrics.totalAnalysisTime.Set(float64(status.TotAnalysisTime), 
labelValues...)
+       s.metrics.totalIndexTime.Set(float64(status.TotIndexTime), 
labelValues...)
+
+       
s.metrics.totalTermSearchersStarted.Set(float64(status.TotTermSearchersStarted),
 labelValues...)
+       
s.metrics.totalTermSearchersFinished.Set(float64(status.TotTermSearchersFinished),
 labelValues...)
+
+       s.metrics.totalMergeStarted.Set(float64(status.TotMemMergeZapBeg), 
append(labelValues, "mem")...)
+       s.metrics.totalMergeFinished.Set(float64(status.TotMemMergeZapEnd), 
append(labelValues, "mem")...)
+       s.metrics.totalMergeLatency.Set(float64(status.TotMemMergeZapTime), 
append(labelValues, "mem")...)
+       s.metrics.totalMergeErrors.Set(float64(status.TotMemMergeErr), 
append(labelValues, "mem")...)
+
+       s.metrics.totalMergeStarted.Set(float64(status.TotFileMergeZapBeg), 
append(labelValues, "file")...)
+       s.metrics.totalMergeFinished.Set(float64(status.TotFileMergeZapEnd), 
append(labelValues, "file")...)
+       s.metrics.totalMergeLatency.Set(float64(status.TotFileMergeZapTime), 
append(labelValues, "file")...)
+       
s.metrics.totalMergeErrors.Set(float64(status.TotFileMergeLoopErr+status.TotFileMergePlanErr+status.TotFileMergePlanTasksErr),
 append(labelValues, "file")...)
+
+       s.metrics.totalMemSegments.Set(float64(status.TotMemorySegmentsAtRoot), 
labelValues...)
+       s.metrics.totalFileSegments.Set(float64(status.TotFileSegmentsAtRoot), 
labelValues...)
+       s.metrics.curOnDiskBytes.Set(float64(status.CurOnDiskBytes), 
labelValues...)
+       s.metrics.curOnDiskFiles.Set(float64(status.CurOnDiskFiles), 
labelValues...)
+
+       r, err := s.writer.Reader()
+       if err != nil {
+               return
+       }
+       defer r.Close()
+       n, err := r.Count()
+       if err != nil {
+               return
+       }
+       s.metrics.totalDocCount.Set(float64(n), labelValues...)
+}
diff --git a/pkg/meter/meter.go b/pkg/meter/meter.go
index a76ab4c2..80f30ab6 100644
--- a/pkg/meter/meter.go
+++ b/pkg/meter/meter.go
@@ -80,3 +80,12 @@ type Histogram interface {
        Instrument
        Observe(value float64, labelValues ...string)
 }
+
+// ToLabelPairs converts the given label names and label values to a map of 
label names to label values.
+func ToLabelPairs(labelNames, labelValues []string) LabelPairs {
+       labelPairs := make(LabelPairs, len(labelNames))
+       for i := range labelNames {
+               labelPairs[labelNames[i]] = labelValues[i]
+       }
+       return labelPairs
+}

Reply via email to