This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 7e541d02 Scheduler metrics and Several Bugs (#519)
7e541d02 is described below

commit 7e541d0275cd776360a14c75044be4bdac872b11
Author: Gao Hongtao <[email protected]>
AuthorDate: Sat Aug 24 09:55:33 2024 +0800

    Scheduler metrics and Several Bugs (#519)
---
 CHANGES.md                          |  4 ++-
 banyand/dquery/dquery.go            | 19 +++++++---
 banyand/internal/storage/metrics.go |  3 ++
 banyand/internal/storage/segment.go |  3 +-
 banyand/internal/storage/tsdb.go    |  4 +++
 banyand/liaison/grpc/metrics.go     |  2 ++
 banyand/liaison/grpc/server.go      |  4 ++-
 banyand/measure/block.go            | 38 ++++++++++++++++++++
 banyand/measure/merger_test.go      |  2 +-
 banyand/measure/metadata.go         |  6 +++-
 banyand/measure/query.go            |  2 +-
 banyand/observability/service.go    | 69 +++++++++++++++++++++++++++++--------
 banyand/stream/metadata.go          |  6 +++-
 pkg/cmdsetup/liaison.go             |  2 +-
 pkg/schema/cache.go                 |  8 +++++
 pkg/schema/metrics.go               | 39 +++++++++++++++++++++
 pkg/timestamp/scheduler.go          | 31 +++++++++++++++++
 17 files changed, 216 insertions(+), 26 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 05910c3c..57d781eb 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -23,7 +23,7 @@ Release Notes.
 - Add the topN query trace.
 - Introduce the round-robin selector to Liaison Node.
 - Optimize query performance of series index.
-- Add storage metrics.
+- Add liaison, remote queue, storage (rotation), time-series tables, metadata cache and scheduler metrics.
 
 ### Bugs
 
@@ -38,6 +38,8 @@ Release Notes.
 - Fix a bug where a distributed query would return an empty result if the 
"limit" was set much lower than the "offset".
 - Fix duplicated measure data in a single part.
 - Fix several "sync.Pool" leak issues by adding a tracker to the pool.
+- Fix panic when removing an expired segment.
+- Fix panic when reading an out-of-order measure block, i.e. one whose versions are not sorted in descending order.
 
 ### Documentation
 
diff --git a/banyand/dquery/dquery.go b/banyand/dquery/dquery.go
index d378febb..14e9e775 100644
--- a/banyand/dquery/dquery.go
+++ b/banyand/dquery/dquery.go
@@ -29,23 +29,31 @@ import (
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/measure"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/banyand/stream"
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/query/executor"
        "github.com/apache/skywalking-banyandb/pkg/run"
+       "github.com/apache/skywalking-banyandb/pkg/schema"
 )
 
 const (
        moduleName = "distributed-query"
 )
 
-var _ run.Service = (*queryService)(nil)
+var _ run.Service = (*queryService)(nil)
+
+// Metric scopes of the distributed query service. streamScope and
+// measureScope feed the schema metrics of the portable stream and measure
+// repositories created in PreRun.
+var (
+       distributedQueryScope = observability.RootScope.SubScope("dquery")
+       streamScope           = distributedQueryScope.SubScope("stream")
+       measureScope          = distributedQueryScope.SubScope("measure")
+)
 
 type queryService struct {
        metaService metadata.Repo
        pipeline    queue.Server
+       omr         observability.MetricsRegistry
        log         *logger.Logger
        sqp         *streamQueryProcessor
        mqp         *measureQueryProcessor
@@ -55,12 +63,13 @@ type queryService struct {
 }
 
 // NewService return a new query service.
-func NewService(metaService metadata.Repo, pipeline queue.Server, broadcaster 
bus.Broadcaster,
+func NewService(metaService metadata.Repo, pipeline queue.Server, broadcaster 
bus.Broadcaster, omr observability.MetricsRegistry,
 ) (run.Unit, error) {
        svc := &queryService{
                metaService: metaService,
                closer:      run.NewCloser(1),
                pipeline:    pipeline,
+               omr:         omr,
        }
        svc.sqp = &streamQueryProcessor{
                queryService: svc,
@@ -89,8 +98,10 @@ func (q *queryService) PreRun(ctx context.Context) error {
        node := val.(common.Node)
        q.nodeID = node.NodeID
        q.log = logger.GetLogger(moduleName)
-       q.sqp.streamService = stream.NewPortableRepository(q.metaService, q.log)
-       q.mqp.measureService = measure.NewPortableRepository(q.metaService, 
q.log)
+       q.sqp.streamService = stream.NewPortableRepository(q.metaService, q.log,
+               schema.NewMetrics(q.omr.With(streamScope)))
+       q.mqp.measureService = measure.NewPortableRepository(q.metaService, 
q.log,
+               schema.NewMetrics(q.omr.With(measureScope)))
        return multierr.Combine(
                q.pipeline.Subscribe(data.TopicStreamQuery, q.sqp),
                q.pipeline.Subscribe(data.TopicMeasureQuery, q.mqp),
diff --git a/banyand/internal/storage/metrics.go 
b/banyand/internal/storage/metrics.go
index 54f9d81a..a7003360 100644
--- a/banyand/internal/storage/metrics.go
+++ b/banyand/internal/storage/metrics.go
@@ -35,6 +35,8 @@ type metrics struct {
        totalRetentionHasData        meter.Counter
        totalRetentionErr            meter.Counter
        totalRetentionHasDataLatency meter.Counter
+
+       schedulerMetrics *observability.SchedulerMetrics
 }
 
 func newMetrics(factory *observability.Factory) *metrics {
@@ -52,6 +54,7 @@ func newMetrics(factory *observability.Factory) *metrics {
                totalRetentionErr:            
factory.NewCounter("total_retention_err"),
                totalRetentionHasDataLatency: 
factory.NewCounter("total_retention_has_data_latency"),
                totalRetentionHasData:        
factory.NewCounter("total_retention_has_data"),
+               schedulerMetrics:             
observability.NewSchedulerMetrics(factory),
        }
 }
 
diff --git a/banyand/internal/storage/segment.go 
b/banyand/internal/storage/segment.go
index d3e0e214..0f7d816b 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -391,9 +391,10 @@ func (sc *segmentController[T, O]) remove(deadline 
time.Time) (hasSegment bool,
        for _, s := range sc.segments() {
                if s.Before(deadline) {
                        hasSegment = true
+                       id := s.id
                        s.delete()
                        sc.Lock()
-                       sc.removeSeg(s.id)
+                       sc.removeSeg(id)
                        sc.Unlock()
                        sc.l.Info().Stringer("segment", s).Msg("removed a 
segment")
                }
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index d1539d18..f17dc7c2 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -167,6 +167,10 @@ func (d *database[T, O]) collect() {
                refCount += atomic.LoadInt32(&s.refCount)
        }
        d.totalSegRefs.Set(float64(refCount))
+       metrics := d.scheduler.Metrics()
+       for job, m := range metrics {
+               d.metrics.schedulerMetrics.Collect(job, m)
+       }
 }
 
 type walkFn func(suffix string) error
diff --git a/banyand/liaison/grpc/metrics.go b/banyand/liaison/grpc/metrics.go
index dee5a378..1321158f 100644
--- a/banyand/liaison/grpc/metrics.go
+++ b/banyand/liaison/grpc/metrics.go
@@ -26,6 +26,7 @@ type metrics struct {
        totalStarted  meter.Counter
        totalFinished meter.Counter
        totalErr      meter.Counter
+       totalPanic    meter.Counter
        totalLatency  meter.Counter
 
        totalStreamStarted  meter.Counter
@@ -49,6 +50,7 @@ func newMetrics(factory *observability.Factory) *metrics {
                totalStarted:              factory.NewCounter("total_started", 
"group", "service", "method"),
                totalFinished:             factory.NewCounter("total_finished", 
"group", "service", "method"),
                totalErr:                  factory.NewCounter("total_err", 
"group", "service", "method"),
+               totalPanic:                factory.NewCounter("total_panic"),
                totalLatency:              factory.NewCounter("total_latency", 
"group", "service", "method"),
                totalStreamStarted:        
factory.NewCounter("total_stream_started", "service", "method"),
                totalStreamFinished:       
factory.NewCounter("total_stream_finished", "service", "method"),
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index 31efab86..af89eb96 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -80,6 +80,7 @@ type server struct {
        streamSVC *streamService
        *streamRegistryServer
        *indexRuleBindingRegistryServer
+       metrics                  *metrics
        keyFile                  string
        certFile                 string
        accessLogRootPath        string
@@ -157,6 +158,7 @@ func (s *server) PreRun(_ context.Context) error {
                }
        }
        metrics := newMetrics(s.omr.With(liaisonGrpcScope))
+       s.metrics = metrics
        s.streamSVC.metrics = metrics
        s.measureSVC.metrics = metrics
        s.propertyServer.metrics = metrics
@@ -229,7 +231,7 @@ func (s *server) Serve() run.StopNotify {
        }
        grpcPanicRecoveryHandler := func(p any) (err error) {
                s.log.Error().Interface("panic", p).Str("stack", 
string(debug.Stack())).Msg("recovered from panic")
-
+               s.metrics.totalPanic.Inc(1)
                return status.Errorf(codes.Internal, "%s", p)
        }
 
diff --git a/banyand/measure/block.go b/banyand/measure/block.go
index ae5006e6..b95329a1 100644
--- a/banyand/measure/block.go
+++ b/banyand/measure/block.go
@@ -632,6 +632,44 @@ func (bc *blockCursor) copyTo(r *model.MeasureResult, 
storedIndexValue map[commo
        }
 }
 
+// replace overwrites the most recent row of r with the data point at bc.idx.
+// queryResult.merge calls it when this cursor yields the same timestamp as the
+// last row already in r but with a higher version, so the newer version wins
+// instead of being appended as a duplicate.
+//
+// Tag values cached in storedIndexValue (keyed by series ID, then tag name)
+// take precedence over values decoded from the block. A tag whose family or
+// column is missing from this block is set to a null value: the replaced row
+// must not keep data from the older version, and a missing tag family must not
+// lead to a nil-pointer dereference.
+//
+// NOTE(review): r is assumed to have been filled by copyTo for the same series,
+// so every tag/field value slice already holds at least one element and
+// r.Fields follows the order of bc.fields.columns — confirm against copyTo.
+func (bc *blockCursor) replace(r *model.MeasureResult, storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue) {
+       r.SID = bc.bm.seriesID
+       r.Timestamps[len(r.Timestamps)-1] = bc.timestamps[bc.idx]
+       r.Versions[len(r.Versions)-1] = bc.versions[bc.idx]
+       var indexValue map[string]*modelv1.TagValue
+       if storedIndexValue != nil {
+               indexValue = storedIndexValue[r.SID]
+       }
+       for i := range r.TagFamilies {
+               tfName := r.TagFamilies[i].Name
+               var cf *columnFamily
+               for j := range r.TagFamilies[i].Tags {
+                       tagName := r.TagFamilies[i].Tags[j].Name
+                       // values shares its backing array with the result, so writing
+                       // through it updates r in place.
+                       values := r.TagFamilies[i].Tags[j].Values
+                       last := len(values) - 1
+                       // Reading a nil map yields nil, so a missing indexValue is fine here.
+                       if v := indexValue[tagName]; v != nil {
+                               values[last] = v
+                               continue
+                       }
+                       if cf == nil {
+                               for k := range bc.tagFamilies {
+                                       if bc.tagFamilies[k].name == tfName {
+                                               cf = &bc.tagFamilies[k]
+                                               break
+                                       }
+                               }
+                       }
+                       found := false
+                       if cf != nil {
+                               for _, c := range cf.columns {
+                                       if c.name == tagName {
+                                               values[last] = mustDecodeTagValue(c.valueType, c.values[bc.idx])
+                                               found = true
+                                               break
+                                       }
+                               }
+                       }
+                       if !found {
+                               // The newer version does not carry this tag (e.g. written under
+                               // another schema); report null instead of the stale older value.
+                               values[last] = &modelv1.TagValue{Value: &modelv1.TagValue_Null{}}
+                       }
+               }
+       }
+       for i, c := range bc.fields.columns {
+               values := r.Fields[i].Values
+               values[len(values)-1] = mustDecodeFieldValue(c.valueType, c.values[bc.idx])
+       }
+}
+
 func (bc *blockCursor) loadData(tmpBlock *block) bool {
        tmpBlock.reset()
        cfm := make([]columnMetadata, 0, len(bc.fieldProjection))
diff --git a/banyand/measure/merger_test.go b/banyand/measure/merger_test.go
index 22c804e0..b3983407 100644
--- a/banyand/measure/merger_test.go
+++ b/banyand/measure/merger_test.go
@@ -276,7 +276,7 @@ func Test_mergeParts(t *testing.T) {
                },
                {
                        name:    "Test with multiple parts with same ts",
-                       dpsList: []*dataPoints{dpsTS1, dpsTS1, dpsTS1},
+                       dpsList: []*dataPoints{dpsTS11, dpsTS1},
                        want: []blockMetadata{
                                {seriesID: 1, count: 1, uncompressedSizeBytes: 
1676},
                                {seriesID: 2, count: 1, uncompressedSizeBytes: 
55},
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 21b4076e..83dbbc02 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -39,6 +39,8 @@ import (
        resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
 )
 
+// metadataScope is the metric scope of this package's schema repository;
+// newSchemaRepo passes it (via svc.omr.With) to resourceSchema.NewMetrics.
+var metadataScope = measureScope.SubScope("metadata")
+
 // SchemaService allows querying schema information.
 type SchemaService interface {
        Query
@@ -58,6 +60,7 @@ func newSchemaRepo(path string, svc *service) *schemaRepo {
                        svc.metadata,
                        svc.l,
                        newSupplier(path, svc),
+                       resourceSchema.NewMetrics(svc.omr.With(metadataScope)),
                ),
        }
        sr.start()
@@ -65,7 +68,7 @@ func newSchemaRepo(path string, svc *service) *schemaRepo {
 }
 
 // NewPortableRepository creates a new portable repository.
-func NewPortableRepository(metadata metadata.Repo, l *logger.Logger) 
SchemaService {
+func NewPortableRepository(metadata metadata.Repo, l *logger.Logger, metrics 
*resourceSchema.Metrics) SchemaService {
        r := &schemaRepo{
                l:        l,
                metadata: metadata,
@@ -73,6 +76,7 @@ func NewPortableRepository(metadata metadata.Repo, l 
*logger.Logger) SchemaServi
                        metadata,
                        l,
                        newPortableSupplier(metadata, l),
+                       metrics,
                ),
        }
        r.start()
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index c9ac2117..bf2fa1bf 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -575,7 +575,7 @@ func (qr *queryResult) merge(storedIndexValue 
map[common.SeriesID]map[string]*mo
                if len(result.Timestamps) > 0 &&
                        topBC.timestamps[topBC.idx] == 
result.Timestamps[len(result.Timestamps)-1] {
                        if topBC.versions[topBC.idx] > lastVersion {
-                               logger.Panicf("following parts version should 
be less or equal to the previous one")
+                               topBC.replace(result, storedIndexValue)
                        }
                } else {
                        topBC.copyTo(result, storedIndexValue, tagProjection)
diff --git a/banyand/observability/service.go b/banyand/observability/service.go
index eb7bac61..593f9b04 100644
--- a/banyand/observability/service.go
+++ b/banyand/observability/service.go
@@ -32,6 +32,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/meter"
        "github.com/apache/skywalking-banyandb/pkg/meter/native"
        "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -45,6 +46,8 @@ const (
 var (
        _ run.Service = (*metricService)(nil)
        _ run.Config  = (*metricService)(nil)
+
+       // obScope is the metric scope under which Serve registers the scheduler
+       // metrics of the metric service's own scheduler (the metrics-collector job).
+       obScope = RootScope.SubScope("observability")
 )
 
 // Service type for Metric Service.
@@ -65,20 +68,21 @@ func NewMetricService(metadata metadata.Repo, pipeline 
queue.Client, nodeType st
 }
 
 type metricService struct {
-       metadata     metadata.Repo
-       nodeSelector native.NodeSelector
-       pipeline     queue.Client
-       scheduler    *timestamp.Scheduler
-       l            *logger.Logger
-       closer       *run.Closer
-       svr          *http.Server
-       nCollection  *native.MetricCollection
-       promReg      *prometheus.Registry
-       npf          nativeProviderFactory
-       listenAddr   string
-       nodeType     string
-       modes        []string
-       mutex        sync.Mutex
+       metadata         metadata.Repo
+       nodeSelector     native.NodeSelector
+       pipeline         queue.Client
+       svr              *http.Server
+       l                *logger.Logger
+       closer           *run.Closer
+       scheduler        *timestamp.Scheduler
+       nCollection      *native.MetricCollection
+       promReg          *prometheus.Registry
+       schedulerMetrics *SchedulerMetrics
+       npf              nativeProviderFactory
+       listenAddr       string
+       nodeType         string
+       modes            []string
+       mutex            sync.Mutex
 }
 
 func (p *metricService) FlagSet() *run.FlagSet {
@@ -143,8 +147,13 @@ func (p *metricService) Serve() run.StopNotify {
        p.initMetrics()
        clock, _ := timestamp.GetClock(context.TODO())
        p.scheduler = timestamp.NewScheduler(p.l, clock)
+       p.schedulerMetrics = NewSchedulerMetrics(p.With(obScope))
        err := p.scheduler.Register("metrics-collector", cron.Descriptor, 
"@every 15s", func(_ time.Time, _ *logger.Logger) bool {
                MetricsCollector.collect()
+               metrics := p.scheduler.Metrics()
+               for job, m := range metrics {
+                       p.schedulerMetrics.Collect(job, m)
+               }
                return true
        })
        if err != nil {
@@ -207,3 +216,35 @@ func containsMode(modes []string, mode string) bool {
        }
        return false
 }
+
+// SchedulerMetrics exports the per-job counters of a timestamp.Scheduler as
+// gauges labelled by job name.
+type SchedulerMetrics struct {
+       totalJobsStarted   meter.Gauge
+       totalJobsFinished  meter.Gauge
+       totalTasksStarted  meter.Gauge
+       totalTasksFinished meter.Gauge
+       totalTasksPanic    meter.Gauge
+       totalTaskLatency   meter.Gauge
+}
+
+// NewSchedulerMetrics registers the scheduler gauges on factory, each carrying
+// a single "job" label.
+func NewSchedulerMetrics(factory *Factory) *SchedulerMetrics {
+       jobGauge := func(name string) meter.Gauge {
+               return factory.NewGauge(name, "job")
+       }
+       return &SchedulerMetrics{
+               totalJobsStarted:   jobGauge("scheduler_jobs_started"),
+               totalJobsFinished:  jobGauge("scheduler_jobs_finished"),
+               totalTasksStarted:  jobGauge("scheduler_tasks_started"),
+               totalTasksFinished: jobGauge("scheduler_tasks_finished"),
+               totalTasksPanic:    jobGauge("scheduler_tasks_panic"),
+               totalTaskLatency:   jobGauge("scheduler_task_latency"),
+       }
+}
+
+// Collect publishes one job's cumulative scheduler counters as the current
+// gauge values; the accumulated task latency is converted to seconds.
+func (sm *SchedulerMetrics) Collect(job string, m *timestamp.SchedulerMetrics) {
+       publish := func(g meter.Gauge, v float64) {
+               g.Set(v, job)
+       }
+       publish(sm.totalJobsStarted, float64(m.TotalJobsStarted.Load()))
+       publish(sm.totalJobsFinished, float64(m.TotalJobsFinished.Load()))
+       publish(sm.totalTasksStarted, float64(m.TotalTasksStarted.Load()))
+       publish(sm.totalTasksFinished, float64(m.TotalTasksFinished.Load()))
+       publish(sm.totalTasksPanic, float64(m.TotalTasksPanic.Load()))
+       publish(sm.totalTaskLatency, float64(m.TotalTaskLatencyInNanoseconds.Load())/float64(time.Second))
+}
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 58300ae8..1103b3d5 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -40,6 +40,8 @@ import (
        resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
 )
 
+// metadataScope is the metric scope of this package's schema repository;
+// newSchemaRepo passes it (via svc.omr.With) to resourceSchema.NewMetrics.
+var metadataScope = streamScope.SubScope("metadata")
+
 // SchemaService allows querying schema information.
 type SchemaService interface {
        Query
@@ -59,6 +61,7 @@ func newSchemaRepo(path string, svc *service) schemaRepo {
                        svc.metadata,
                        svc.l,
                        newSupplier(path, svc),
+                       resourceSchema.NewMetrics(svc.omr.With(metadataScope)),
                ),
        }
        sr.start()
@@ -66,7 +69,7 @@ func newSchemaRepo(path string, svc *service) schemaRepo {
 }
 
 // NewPortableRepository creates a new portable repository.
-func NewPortableRepository(metadata metadata.Repo, l *logger.Logger) 
SchemaService {
+func NewPortableRepository(metadata metadata.Repo, l *logger.Logger, metrics 
*resourceSchema.Metrics) SchemaService {
        r := &schemaRepo{
                l:        l,
                metadata: metadata,
@@ -74,6 +77,7 @@ func NewPortableRepository(metadata metadata.Repo, l 
*logger.Logger) SchemaServi
                        metadata,
                        l,
                        newPortableSupplier(metadata, l),
+                       metrics,
                ),
        }
        r.start()
diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go
index bbb10db3..40ea283e 100644
--- a/pkg/cmdsetup/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -52,7 +52,7 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
        grpcServer := grpc.NewServer(ctx, pipeline, localPipeline, metaSvc, 
nodeRegistry, metricSvc)
        profSvc := observability.NewProfService()
        httpServer := http.NewServer()
-       dQuery, err := dquery.NewService(metaSvc, localPipeline, pipeline)
+       dQuery, err := dquery.NewService(metaSvc, localPipeline, pipeline, 
metricSvc)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate distributed query 
service")
        }
diff --git a/pkg/schema/cache.go b/pkg/schema/cache.go
index 5c0ff1f5..9abe21ec 100644
--- a/pkg/schema/cache.go
+++ b/pkg/schema/cache.go
@@ -108,6 +108,7 @@ type schemaRepo struct {
        l                      *logger.Logger
        closer                 *run.ChannelCloser
        eventCh                chan MetadataEvent
+       metrics                *Metrics
        groupMap               sync.Map
        resourceMap            sync.Map
        workerNum              int
@@ -136,6 +137,7 @@ func NewRepository(
        metadata metadata.Repo,
        l *logger.Logger,
        resourceSupplier ResourceSupplier,
+       metrics *Metrics,
 ) Repository {
        workNum := getWorkerNum()
        return &schemaRepo{
@@ -146,6 +148,7 @@ func NewRepository(
                eventCh:                make(chan MetadataEvent, workNum),
                workerNum:              workNum,
                closer:                 run.NewChannelCloser(),
+               metrics:                metrics,
        }
 }
 
@@ -154,6 +157,7 @@ func NewPortableRepository(
        metadata metadata.Repo,
        l *logger.Logger,
        supplier ResourceSchemaSupplier,
+       metrics *Metrics,
 ) Repository {
        workNum := getWorkerNum()
        return &schemaRepo{
@@ -163,6 +167,7 @@ func NewPortableRepository(
                eventCh:                make(chan MetadataEvent, workNum),
                workerNum:              workNum,
                closer:                 run.NewChannelCloser(),
+               metrics:                metrics,
        }
 }
 
@@ -177,6 +182,7 @@ func (sr *schemaRepo) Watcher() {
                                if err := recover(); err != nil {
                                        sr.l.Warn().Interface("err", 
err).Msg("watching the events")
                                }
+                               sr.metrics.totalPanics.Inc(1)
                        }()
                        for {
                                select {
@@ -224,8 +230,10 @@ func (sr *schemaRepo) Watcher() {
                                                default:
                                                }
                                                
sr.l.Err(err).Interface("event", evt).Msg("fail to handle the metadata event. 
retry...")
+                                               sr.metrics.totalErrs.Inc(1)
                                                go func() {
                                                        
sr.SendMetadataEvent(evt)
+                                                       
sr.metrics.totalRetries.Inc(1)
                                                }()
                                        }
                                case <-sr.closer.CloseNotify():
diff --git a/pkg/schema/metrics.go b/pkg/schema/metrics.go
new file mode 100644
index 00000000..3ada77d5
--- /dev/null
+++ b/pkg/schema/metrics.go
@@ -0,0 +1,39 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/meter"
+)
+
+// Metrics holds the counters a schema repository reports while processing
+// metadata events.
+//
+// NOTE(review): in Watcher the totalPanics increment sits after, not inside,
+// the `if recover() != nil` check, so it also counts normal goroutine exits.
+type Metrics struct {
+       totalErrs    meter.Counter // events whose handling returned an error
+       totalRetries meter.Counter // failed events that were re-sent
+       totalPanics  meter.Counter // bumped in the watcher goroutine's recover handler
+}
+
+// NewMetrics registers the schema-repository counters on factory and returns
+// them bundled together.
+func NewMetrics(factory *observability.Factory) *Metrics {
+       m := &Metrics{}
+       m.totalErrs = factory.NewCounter("total_err")
+       m.totalRetries = factory.NewCounter("total_retries")
+       m.totalPanics = factory.NewCounter("total_panics")
+       return m
+}
diff --git a/pkg/timestamp/scheduler.go b/pkg/timestamp/scheduler.go
index effadf37..1c2394f1 100644
--- a/pkg/timestamp/scheduler.go
+++ b/pkg/timestamp/scheduler.go
@@ -20,6 +20,7 @@ package timestamp
 import (
        "runtime/debug"
        "sync"
+       "sync/atomic"
        "time"
 
        "github.com/benbjohnson/clock"
@@ -141,12 +142,24 @@ func (s *Scheduler) Close() {
        }
 }
 
+// Metrics returns the live counters of every registered task, keyed the same
+// way as s.tasks. The map is a fresh copy built under the read lock, but the
+// *SchedulerMetrics values are shared with the running tasks, which keep
+// updating them atomically.
+func (s *Scheduler) Metrics() map[string]*SchedulerMetrics {
+       s.RLock()
+       defer s.RUnlock()
+       snapshot := make(map[string]*SchedulerMetrics, len(s.tasks))
+       for key, tk := range s.tasks {
+               snapshot[key] = tk.metrics
+       }
+       return snapshot
+}
+
+// task drives one scheduled action: run loops computing the next fire time
+// from schedule and invoking action, recording its counters in metrics.
 type task struct {
        clock    Clock
        schedule cron.Schedule
        closer   *run.Closer
        l        *logger.Logger
        action   SchedulerAction
+       // metrics is allocated in newTask and handed out by Scheduler.Metrics
+       // while run keeps updating it.
+       metrics  *SchedulerMetrics
        name     string
 }
 
@@ -158,6 +171,7 @@ func newTask(l *logger.Logger, name string, clock 
clock.Clock, schedule cron.Sch
                schedule: schedule,
                action:   action,
                closer:   run.NewCloser(1),
+               metrics:  &SchedulerMetrics{},
        }
 }
 
@@ -165,6 +179,8 @@ func (t *task) run() {
        defer t.closer.Done()
        now := t.clock.Now()
        t.l.Info().Str("name", t.name).Time("now", now).Msg("start")
+       t.metrics.TotalJobsStarted.Add(1)
+       defer t.metrics.TotalJobsFinished.Add(1)
        for {
                next := t.schedule.Next(now)
                d := next.Sub(now)
@@ -178,10 +194,15 @@ func (t *task) run() {
                                e.Str("name", t.name).Time("now", 
now).Msg("wake")
                        }
                        if !func() (ret bool) {
+                               t.metrics.TotalTasksStarted.Add(1)
+                               start := time.Now()
                                defer func() {
+                                       t.metrics.TotalTasksFinished.Add(1)
+                                       
t.metrics.TotalTaskLatencyInNanoseconds.Add(time.Since(start).Nanoseconds())
                                        if r := recover(); r != nil {
                                                t.l.Error().Str("name", 
t.name).Interface("panic", r).Str("stack", string(debug.Stack())).Msg("panic")
                                                ret = true
+                                               t.metrics.TotalTasksPanic.Add(1)
                                        }
                                }()
                                return t.action(now, t.l)
@@ -200,3 +221,13 @@ func (t *task) run() {
 func (t *task) close() {
        t.closer.CloseThenWait()
 }
+
+// SchedulerMetrics holds the cumulative counters of one scheduled task. Each
+// task gets its own instance in newTask; task.run updates it and
+// Scheduler.Metrics hands out the same pointer, hence every field is atomic.
+//
+//   - TotalJobsStarted / TotalJobsFinished: incremented when task.run starts
+//     and when it returns.
+//   - TotalTasksStarted / TotalTasksFinished: incremented around each
+//     invocation of the task's action; an invocation that panics still
+//     counts as finished.
+//   - TotalTasksPanic: action invocations whose panic was recovered.
+//   - TotalTaskLatencyInNanoseconds: summed wall-clock duration (measured
+//     with time.Now, not the scheduler's clock) of all action invocations.
+type SchedulerMetrics struct {
+       TotalJobsStarted              atomic.Uint64
+       TotalJobsFinished             atomic.Uint64
+       TotalTasksStarted             atomic.Uint64
+       TotalTasksFinished            atomic.Uint64
+       TotalTasksPanic               atomic.Uint64
+       TotalTaskLatencyInNanoseconds atomic.Int64
+}

Reply via email to