This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 7e541d02 Scheduler metrics and Several Bugs (#519)
7e541d02 is described below
commit 7e541d0275cd776360a14c75044be4bdac872b11
Author: Gao Hongtao <[email protected]>
AuthorDate: Sat Aug 24 09:55:33 2024 +0800
Scheduler metrics and Several Bugs (#519)
---
CHANGES.md | 4 ++-
banyand/dquery/dquery.go | 19 +++++++---
banyand/internal/storage/metrics.go | 3 ++
banyand/internal/storage/segment.go | 3 +-
banyand/internal/storage/tsdb.go | 4 +++
banyand/liaison/grpc/metrics.go | 2 ++
banyand/liaison/grpc/server.go | 4 ++-
banyand/measure/block.go | 38 ++++++++++++++++++++
banyand/measure/merger_test.go | 2 +-
banyand/measure/metadata.go | 6 +++-
banyand/measure/query.go | 2 +-
banyand/observability/service.go | 69 +++++++++++++++++++++++++++++--------
banyand/stream/metadata.go | 6 +++-
pkg/cmdsetup/liaison.go | 2 +-
pkg/schema/cache.go | 8 +++++
pkg/schema/metrics.go | 39 +++++++++++++++++++++
pkg/timestamp/scheduler.go | 31 +++++++++++++++++
17 files changed, 216 insertions(+), 26 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 05910c3c..57d781eb 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -23,7 +23,7 @@ Release Notes.
- Add the topN query trace.
- Introduce the round-robin selector to Liaison Node.
- Optimize query performance of series index.
-- Add storage metrics.
+- Add liaison, remote queue, storage (rotation), time-series tables, metadata
cache and scheduler metrics.
### Bugs
@@ -38,6 +38,8 @@ Release Notes.
- Fix a bug where a distributed query would return an empty result if the
"limit" was set much lower than the "offset".
- Fix duplicated measure data in a single part.
- Fix several "sync.Pool" leak issues by adding a tracker to the pool.
+- Fix panic when removing an expired segment.
+- Fix panic when reading a disordered measure block, i.e. one whose versions
are not sorted in descending order.
### Documentation
diff --git a/banyand/dquery/dquery.go b/banyand/dquery/dquery.go
index d378febb..14e9e775 100644
--- a/banyand/dquery/dquery.go
+++ b/banyand/dquery/dquery.go
@@ -29,23 +29,31 @@ import (
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/measure"
"github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/banyand/stream"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/query/executor"
"github.com/apache/skywalking-banyandb/pkg/run"
+ "github.com/apache/skywalking-banyandb/pkg/schema"
)
const (
moduleName = "distributed-query"
)
-var _ run.Service = (*queryService)(nil)
+// queryService must satisfy run.Service.
+var _ run.Service = (*queryService)(nil)
+
+// Metrics scopes of the distributed query service. The stream and measure
+// sub-scopes back the metrics of the portable schema repositories built in
+// PreRun.
+var (
+	distributedQueryScope = observability.RootScope.SubScope("dquery")
+	streamScope           = distributedQueryScope.SubScope("stream")
+	measureScope          = distributedQueryScope.SubScope("measure")
+)
type queryService struct {
metaService metadata.Repo
pipeline queue.Server
+ omr observability.MetricsRegistry
log *logger.Logger
sqp *streamQueryProcessor
mqp *measureQueryProcessor
@@ -55,12 +63,13 @@ type queryService struct {
}
// NewService return a new query service.
-func NewService(metaService metadata.Repo, pipeline queue.Server, broadcaster
bus.Broadcaster,
+func NewService(metaService metadata.Repo, pipeline queue.Server, broadcaster
bus.Broadcaster, omr observability.MetricsRegistry,
) (run.Unit, error) {
svc := &queryService{
metaService: metaService,
closer: run.NewCloser(1),
pipeline: pipeline,
+ omr: omr,
}
svc.sqp = &streamQueryProcessor{
queryService: svc,
@@ -89,8 +98,10 @@ func (q *queryService) PreRun(ctx context.Context) error {
node := val.(common.Node)
q.nodeID = node.NodeID
q.log = logger.GetLogger(moduleName)
- q.sqp.streamService = stream.NewPortableRepository(q.metaService, q.log)
- q.mqp.measureService = measure.NewPortableRepository(q.metaService,
q.log)
+ q.sqp.streamService = stream.NewPortableRepository(q.metaService, q.log,
+ schema.NewMetrics(q.omr.With(streamScope)))
+ q.mqp.measureService = measure.NewPortableRepository(q.metaService,
q.log,
+ schema.NewMetrics(q.omr.With(measureScope)))
return multierr.Combine(
q.pipeline.Subscribe(data.TopicStreamQuery, q.sqp),
q.pipeline.Subscribe(data.TopicMeasureQuery, q.mqp),
diff --git a/banyand/internal/storage/metrics.go
b/banyand/internal/storage/metrics.go
index 54f9d81a..a7003360 100644
--- a/banyand/internal/storage/metrics.go
+++ b/banyand/internal/storage/metrics.go
@@ -35,6 +35,8 @@ type metrics struct {
totalRetentionHasData meter.Counter
totalRetentionErr meter.Counter
totalRetentionHasDataLatency meter.Counter
+
+ schedulerMetrics *observability.SchedulerMetrics
}
func newMetrics(factory *observability.Factory) *metrics {
@@ -52,6 +54,7 @@ func newMetrics(factory *observability.Factory) *metrics {
totalRetentionErr:
factory.NewCounter("total_retention_err"),
totalRetentionHasDataLatency:
factory.NewCounter("total_retention_has_data_latency"),
totalRetentionHasData:
factory.NewCounter("total_retention_has_data"),
+ schedulerMetrics:
observability.NewSchedulerMetrics(factory),
}
}
diff --git a/banyand/internal/storage/segment.go
b/banyand/internal/storage/segment.go
index d3e0e214..0f7d816b 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -391,9 +391,10 @@ func (sc *segmentController[T, O]) remove(deadline
time.Time) (hasSegment bool,
for _, s := range sc.segments() {
if s.Before(deadline) {
hasSegment = true
+ id := s.id
s.delete()
sc.Lock()
- sc.removeSeg(s.id)
+ sc.removeSeg(id)
sc.Unlock()
sc.l.Info().Stringer("segment", s).Msg("removed a
segment")
}
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index d1539d18..f17dc7c2 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -167,6 +167,10 @@ func (d *database[T, O]) collect() {
refCount += atomic.LoadInt32(&s.refCount)
}
d.totalSegRefs.Set(float64(refCount))
+ metrics := d.scheduler.Metrics()
+ for job, m := range metrics {
+ d.metrics.schedulerMetrics.Collect(job, m)
+ }
}
type walkFn func(suffix string) error
diff --git a/banyand/liaison/grpc/metrics.go b/banyand/liaison/grpc/metrics.go
index dee5a378..1321158f 100644
--- a/banyand/liaison/grpc/metrics.go
+++ b/banyand/liaison/grpc/metrics.go
@@ -26,6 +26,7 @@ type metrics struct {
totalStarted meter.Counter
totalFinished meter.Counter
totalErr meter.Counter
+ totalPanic meter.Counter
totalLatency meter.Counter
totalStreamStarted meter.Counter
@@ -49,6 +50,7 @@ func newMetrics(factory *observability.Factory) *metrics {
totalStarted: factory.NewCounter("total_started",
"group", "service", "method"),
totalFinished: factory.NewCounter("total_finished",
"group", "service", "method"),
totalErr: factory.NewCounter("total_err",
"group", "service", "method"),
+ totalPanic: factory.NewCounter("total_panic"),
totalLatency: factory.NewCounter("total_latency",
"group", "service", "method"),
totalStreamStarted:
factory.NewCounter("total_stream_started", "service", "method"),
totalStreamFinished:
factory.NewCounter("total_stream_finished", "service", "method"),
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index 31efab86..af89eb96 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -80,6 +80,7 @@ type server struct {
streamSVC *streamService
*streamRegistryServer
*indexRuleBindingRegistryServer
+ metrics *metrics
keyFile string
certFile string
accessLogRootPath string
@@ -157,6 +158,7 @@ func (s *server) PreRun(_ context.Context) error {
}
}
metrics := newMetrics(s.omr.With(liaisonGrpcScope))
+ s.metrics = metrics
s.streamSVC.metrics = metrics
s.measureSVC.metrics = metrics
s.propertyServer.metrics = metrics
@@ -229,7 +231,7 @@ func (s *server) Serve() run.StopNotify {
}
grpcPanicRecoveryHandler := func(p any) (err error) {
s.log.Error().Interface("panic", p).Str("stack",
string(debug.Stack())).Msg("recovered from panic")
-
+ s.metrics.totalPanic.Inc(1)
return status.Errorf(codes.Internal, "%s", p)
}
diff --git a/banyand/measure/block.go b/banyand/measure/block.go
index ae5006e6..b95329a1 100644
--- a/banyand/measure/block.go
+++ b/banyand/measure/block.go
@@ -632,6 +632,44 @@ func (bc *blockCursor) copyTo(r *model.MeasureResult,
storedIndexValue map[commo
}
}
+// replace overwrites the last row already appended to r with the row at the
+// cursor's current position: series ID, timestamp, version, tag values and
+// field values. The merge loop calls it when the cursor yields the same
+// timestamp as the last row in r but with a higher version, so the newer data
+// point wins instead of triggering a panic.
+func (bc *blockCursor) replace(r *model.MeasureResult, storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue) {
+	r.SID = bc.bm.seriesID
+	r.Timestamps[len(r.Timestamps)-1] = bc.timestamps[bc.idx]
+	r.Versions[len(r.Versions)-1] = bc.versions[bc.idx]
+	var indexValue map[string]*modelv1.TagValue
+	if storedIndexValue != nil {
+		indexValue = storedIndexValue[r.SID]
+	}
+	for i := range r.TagFamilies {
+		tfName := r.TagFamilies[i].Name
+		// cf is looked up lazily: it is only needed for tags that are not
+		// served from the stored index values.
+		var cf *columnFamily
+		for j := range r.TagFamilies[i].Tags {
+			tagName := r.TagFamilies[i].Tags[j].Name
+			last := len(r.TagFamilies[i].Tags[j].Values) - 1
+			if indexValue != nil && indexValue[tagName] != nil {
+				r.TagFamilies[i].Tags[j].Values[last] = indexValue[tagName]
+				continue
+			}
+			if cf == nil {
+				for k := range bc.tagFamilies {
+					if bc.tagFamilies[k].name == tfName {
+						cf = &bc.tagFamilies[k]
+						break
+					}
+				}
+			}
+			if cf == nil {
+				// This block does not carry the tag family. Keep the value
+				// copied earlier (the same outcome as a missing column below)
+				// rather than dereferencing a nil column family.
+				continue
+			}
+			for _, c := range cf.columns {
+				if c.name == tagName {
+					r.TagFamilies[i].Tags[j].Values[last] = mustDecodeTagValue(c.valueType, c.values[bc.idx])
+					break
+				}
+			}
+		}
+	}
+	// NOTE(review): assumes bc.fields.columns is ordered like r.Fields (the
+	// field projection) — confirm against copyTo.
+	for i, c := range bc.fields.columns {
+		r.Fields[i].Values[len(r.Fields[i].Values)-1] = mustDecodeFieldValue(c.valueType, c.values[bc.idx])
+	}
+}
+
func (bc *blockCursor) loadData(tmpBlock *block) bool {
tmpBlock.reset()
cfm := make([]columnMetadata, 0, len(bc.fieldProjection))
diff --git a/banyand/measure/merger_test.go b/banyand/measure/merger_test.go
index 22c804e0..b3983407 100644
--- a/banyand/measure/merger_test.go
+++ b/banyand/measure/merger_test.go
@@ -276,7 +276,7 @@ func Test_mergeParts(t *testing.T) {
},
{
name: "Test with multiple parts with same ts",
- dpsList: []*dataPoints{dpsTS1, dpsTS1, dpsTS1},
+ dpsList: []*dataPoints{dpsTS11, dpsTS1},
want: []blockMetadata{
{seriesID: 1, count: 1, uncompressedSizeBytes:
1676},
{seriesID: 2, count: 1, uncompressedSizeBytes:
55},
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 21b4076e..83dbbc02 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -39,6 +39,8 @@ import (
resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
)
+// metadataScope is the metrics scope that newSchemaRepo hands (via
+// svc.omr.With) to resourceSchema.NewMetrics for the measure schema repository.
+var metadataScope = measureScope.SubScope("metadata")
+
// SchemaService allows querying schema information.
type SchemaService interface {
Query
@@ -58,6 +60,7 @@ func newSchemaRepo(path string, svc *service) *schemaRepo {
svc.metadata,
svc.l,
newSupplier(path, svc),
+ resourceSchema.NewMetrics(svc.omr.With(metadataScope)),
),
}
sr.start()
@@ -65,7 +68,7 @@ func newSchemaRepo(path string, svc *service) *schemaRepo {
}
// NewPortableRepository creates a new portable repository.
-func NewPortableRepository(metadata metadata.Repo, l *logger.Logger)
SchemaService {
+func NewPortableRepository(metadata metadata.Repo, l *logger.Logger, metrics
*resourceSchema.Metrics) SchemaService {
r := &schemaRepo{
l: l,
metadata: metadata,
@@ -73,6 +76,7 @@ func NewPortableRepository(metadata metadata.Repo, l
*logger.Logger) SchemaServi
metadata,
l,
newPortableSupplier(metadata, l),
+ metrics,
),
}
r.start()
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index c9ac2117..bf2fa1bf 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -575,7 +575,7 @@ func (qr *queryResult) merge(storedIndexValue
map[common.SeriesID]map[string]*mo
if len(result.Timestamps) > 0 &&
topBC.timestamps[topBC.idx] ==
result.Timestamps[len(result.Timestamps)-1] {
if topBC.versions[topBC.idx] > lastVersion {
- logger.Panicf("following parts version should
be less or equal to the previous one")
+ topBC.replace(result, storedIndexValue)
}
} else {
topBC.copyTo(result, storedIndexValue, tagProjection)
diff --git a/banyand/observability/service.go b/banyand/observability/service.go
index eb7bac61..593f9b04 100644
--- a/banyand/observability/service.go
+++ b/banyand/observability/service.go
@@ -32,6 +32,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/meter"
"github.com/apache/skywalking-banyandb/pkg/meter/native"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -45,6 +46,8 @@ const (
var (
_ run.Service = (*metricService)(nil)
_ run.Config = (*metricService)(nil)
+
+ obScope = RootScope.SubScope("observability")
)
// Service type for Metric Service.
@@ -65,20 +68,21 @@ func NewMetricService(metadata metadata.Repo, pipeline
queue.Client, nodeType st
}
type metricService struct {
- metadata metadata.Repo
- nodeSelector native.NodeSelector
- pipeline queue.Client
- scheduler *timestamp.Scheduler
- l *logger.Logger
- closer *run.Closer
- svr *http.Server
- nCollection *native.MetricCollection
- promReg *prometheus.Registry
- npf nativeProviderFactory
- listenAddr string
- nodeType string
- modes []string
- mutex sync.Mutex
+ metadata metadata.Repo
+ nodeSelector native.NodeSelector
+ pipeline queue.Client
+ svr *http.Server
+ l *logger.Logger
+ closer *run.Closer
+ scheduler *timestamp.Scheduler
+ nCollection *native.MetricCollection
+ promReg *prometheus.Registry
+ schedulerMetrics *SchedulerMetrics
+ npf nativeProviderFactory
+ listenAddr string
+ nodeType string
+ modes []string
+ mutex sync.Mutex
}
func (p *metricService) FlagSet() *run.FlagSet {
@@ -143,8 +147,13 @@ func (p *metricService) Serve() run.StopNotify {
p.initMetrics()
clock, _ := timestamp.GetClock(context.TODO())
p.scheduler = timestamp.NewScheduler(p.l, clock)
+ p.schedulerMetrics = NewSchedulerMetrics(p.With(obScope))
err := p.scheduler.Register("metrics-collector", cron.Descriptor,
"@every 15s", func(_ time.Time, _ *logger.Logger) bool {
MetricsCollector.collect()
+ metrics := p.scheduler.Metrics()
+ for job, m := range metrics {
+ p.schedulerMetrics.Collect(job, m)
+ }
return true
})
if err != nil {
@@ -207,3 +216,35 @@ func containsMode(modes []string, mode string) bool {
}
return false
}
+
+// SchedulerMetrics publishes the per-job counters of a timestamp.Scheduler as
+// gauges labelled by job name.
+type SchedulerMetrics struct {
+	totalJobsStarted   meter.Gauge
+	totalJobsFinished  meter.Gauge
+	totalTasksStarted  meter.Gauge
+	totalTasksFinished meter.Gauge
+	totalTasksPanic    meter.Gauge
+	totalTaskLatency   meter.Gauge
+}
+
+// NewSchedulerMetrics registers the scheduler gauges on factory and returns a
+// SchedulerMetrics that writes to them.
+func NewSchedulerMetrics(factory *Factory) *SchedulerMetrics {
+	const jobLabel = "job"
+	sm := new(SchedulerMetrics)
+	sm.totalJobsStarted = factory.NewGauge("scheduler_jobs_started", jobLabel)
+	sm.totalJobsFinished = factory.NewGauge("scheduler_jobs_finished", jobLabel)
+	sm.totalTasksStarted = factory.NewGauge("scheduler_tasks_started", jobLabel)
+	sm.totalTasksFinished = factory.NewGauge("scheduler_tasks_finished", jobLabel)
+	sm.totalTasksPanic = factory.NewGauge("scheduler_tasks_panic", jobLabel)
+	sm.totalTaskLatency = factory.NewGauge("scheduler_task_latency", jobLabel)
+	return sm
+}
+
+// Collect copies the cumulative counters held in m into the gauges labelled
+// with job. The accumulated task latency is converted to seconds.
+func (sm *SchedulerMetrics) Collect(job string, m *timestamp.SchedulerMetrics) {
+	set := func(g meter.Gauge, v float64) { g.Set(v, job) }
+	set(sm.totalJobsStarted, float64(m.TotalJobsStarted.Load()))
+	set(sm.totalJobsFinished, float64(m.TotalJobsFinished.Load()))
+	set(sm.totalTasksStarted, float64(m.TotalTasksStarted.Load()))
+	set(sm.totalTasksFinished, float64(m.TotalTasksFinished.Load()))
+	set(sm.totalTasksPanic, float64(m.TotalTasksPanic.Load()))
+	set(sm.totalTaskLatency, float64(m.TotalTaskLatencyInNanoseconds.Load())/float64(time.Second))
+}
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 58300ae8..1103b3d5 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -40,6 +40,8 @@ import (
resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
)
+// metadataScope is the metrics scope that newSchemaRepo hands (via
+// svc.omr.With) to resourceSchema.NewMetrics for the stream schema repository.
+var metadataScope = streamScope.SubScope("metadata")
+
// SchemaService allows querying schema information.
type SchemaService interface {
Query
@@ -59,6 +61,7 @@ func newSchemaRepo(path string, svc *service) schemaRepo {
svc.metadata,
svc.l,
newSupplier(path, svc),
+ resourceSchema.NewMetrics(svc.omr.With(metadataScope)),
),
}
sr.start()
@@ -66,7 +69,7 @@ func newSchemaRepo(path string, svc *service) schemaRepo {
}
// NewPortableRepository creates a new portable repository.
-func NewPortableRepository(metadata metadata.Repo, l *logger.Logger)
SchemaService {
+func NewPortableRepository(metadata metadata.Repo, l *logger.Logger, metrics
*resourceSchema.Metrics) SchemaService {
r := &schemaRepo{
l: l,
metadata: metadata,
@@ -74,6 +77,7 @@ func NewPortableRepository(metadata metadata.Repo, l
*logger.Logger) SchemaServi
metadata,
l,
newPortableSupplier(metadata, l),
+ metrics,
),
}
r.start()
diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go
index bbb10db3..40ea283e 100644
--- a/pkg/cmdsetup/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -52,7 +52,7 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
grpcServer := grpc.NewServer(ctx, pipeline, localPipeline, metaSvc,
nodeRegistry, metricSvc)
profSvc := observability.NewProfService()
httpServer := http.NewServer()
- dQuery, err := dquery.NewService(metaSvc, localPipeline, pipeline)
+ dQuery, err := dquery.NewService(metaSvc, localPipeline, pipeline,
metricSvc)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate distributed query
service")
}
diff --git a/pkg/schema/cache.go b/pkg/schema/cache.go
index 5c0ff1f5..9abe21ec 100644
--- a/pkg/schema/cache.go
+++ b/pkg/schema/cache.go
@@ -108,6 +108,7 @@ type schemaRepo struct {
l *logger.Logger
closer *run.ChannelCloser
eventCh chan MetadataEvent
+ metrics *Metrics
groupMap sync.Map
resourceMap sync.Map
workerNum int
@@ -136,6 +137,7 @@ func NewRepository(
metadata metadata.Repo,
l *logger.Logger,
resourceSupplier ResourceSupplier,
+ metrics *Metrics,
) Repository {
workNum := getWorkerNum()
return &schemaRepo{
@@ -146,6 +148,7 @@ func NewRepository(
eventCh: make(chan MetadataEvent, workNum),
workerNum: workNum,
closer: run.NewChannelCloser(),
+ metrics: metrics,
}
}
@@ -154,6 +157,7 @@ func NewPortableRepository(
metadata metadata.Repo,
l *logger.Logger,
supplier ResourceSchemaSupplier,
+ metrics *Metrics,
) Repository {
workNum := getWorkerNum()
return &schemaRepo{
@@ -163,6 +167,7 @@ func NewPortableRepository(
eventCh: make(chan MetadataEvent, workNum),
workerNum: workNum,
closer: run.NewChannelCloser(),
+ metrics: metrics,
}
}
@@ -177,6 +182,7 @@ func (sr *schemaRepo) Watcher() {
if err := recover(); err != nil {
sr.l.Warn().Interface("err",
err).Msg("watching the events")
}
+ sr.metrics.totalPanics.Inc(1)
}()
for {
select {
@@ -224,8 +230,10 @@ func (sr *schemaRepo) Watcher() {
default:
}
sr.l.Err(err).Interface("event", evt).Msg("fail to handle the metadata event.
retry...")
+ sr.metrics.totalErrs.Inc(1)
go func() {
sr.SendMetadataEvent(evt)
+
sr.metrics.totalRetries.Inc(1)
}()
}
case <-sr.closer.CloseNotify():
diff --git a/pkg/schema/metrics.go b/pkg/schema/metrics.go
new file mode 100644
index 00000000..3ada77d5
--- /dev/null
+++ b/pkg/schema/metrics.go
@@ -0,0 +1,39 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+ "github.com/apache/skywalking-banyandb/banyand/observability"
+ "github.com/apache/skywalking-banyandb/pkg/meter"
+)
+
+// Metrics holds the counters reported by a schema repository's metadata
+// event watcher. NewRepository and NewPortableRepository store the pointer
+// and the watcher dereferences it without a nil check, so callers should
+// always pass a non-nil *Metrics.
+type Metrics struct {
+ // totalErrs counts metadata events whose handling returned an error.
+ totalErrs meter.Counter
+ // totalRetries counts failed metadata events that were re-sent.
+ totalRetries meter.Counter
+ // totalPanics is meant to count panics recovered by the watcher.
+ // NOTE(review): Watcher increments it in its deferred function even when
+ // recover() returns nil, i.e. on every exit — confirm and move the
+ // increment inside the recover branch.
+ totalPanics meter.Counter
+}
+
+// NewMetrics registers the "total_err", "total_retries" and "total_panics"
+// counters on factory and returns them as a Metrics.
+func NewMetrics(factory *observability.Factory) *Metrics {
+ return &Metrics{
+ totalErrs: factory.NewCounter("total_err"),
+ totalRetries: factory.NewCounter("total_retries"),
+ totalPanics: factory.NewCounter("total_panics"),
+ }
+}
diff --git a/pkg/timestamp/scheduler.go b/pkg/timestamp/scheduler.go
index effadf37..1c2394f1 100644
--- a/pkg/timestamp/scheduler.go
+++ b/pkg/timestamp/scheduler.go
@@ -20,6 +20,7 @@ package timestamp
import (
"runtime/debug"
"sync"
+ "sync/atomic"
"time"
"github.com/benbjohnson/clock"
@@ -141,12 +142,24 @@ func (s *Scheduler) Close() {
}
}
+// Metrics returns a freshly built map from task name to that task's metrics.
+// The values are the live per-task counters rather than copies, so read them
+// with their atomic Load methods.
+func (s *Scheduler) Metrics() map[string]*SchedulerMetrics {
+	s.RLock()
+	defer s.RUnlock()
+	result := make(map[string]*SchedulerMetrics, len(s.tasks))
+	for name, tsk := range s.tasks {
+		result[name] = tsk.metrics
+	}
+	return result
+}
+
type task struct {
clock Clock
schedule cron.Schedule
closer *run.Closer
l *logger.Logger
action SchedulerAction
+ metrics *SchedulerMetrics
name string
}
@@ -158,6 +171,7 @@ func newTask(l *logger.Logger, name string, clock
clock.Clock, schedule cron.Sch
schedule: schedule,
action: action,
closer: run.NewCloser(1),
+ metrics: &SchedulerMetrics{},
}
}
@@ -165,6 +179,8 @@ func (t *task) run() {
defer t.closer.Done()
now := t.clock.Now()
t.l.Info().Str("name", t.name).Time("now", now).Msg("start")
+ t.metrics.TotalJobsStarted.Add(1)
+ defer t.metrics.TotalJobsFinished.Add(1)
for {
next := t.schedule.Next(now)
d := next.Sub(now)
@@ -178,10 +194,15 @@ func (t *task) run() {
e.Str("name", t.name).Time("now",
now).Msg("wake")
}
if !func() (ret bool) {
+ t.metrics.TotalTasksStarted.Add(1)
+ start := time.Now()
defer func() {
+ t.metrics.TotalTasksFinished.Add(1)
+
t.metrics.TotalTaskLatencyInNanoseconds.Add(time.Since(start).Nanoseconds())
if r := recover(); r != nil {
t.l.Error().Str("name",
t.name).Interface("panic", r).Str("stack", string(debug.Stack())).Msg("panic")
ret = true
+ t.metrics.TotalTasksPanic.Add(1)
}
}()
return t.action(now, t.l)
@@ -200,3 +221,13 @@ func (t *task) run() {
func (t *task) close() {
t.closer.CloseThenWait()
}
+
+// SchedulerMetrics holds the counters of a single scheduled task; newTask
+// gives every task its own instance. The fields are atomics so they can be
+// read (for example through Scheduler.Metrics) while the task updates them.
+type SchedulerMetrics struct {
+ // TotalJobsStarted is incremented when the task's run loop starts.
+ TotalJobsStarted atomic.Uint64
+ // TotalJobsFinished is incremented (deferred) when the run loop returns.
+ TotalJobsFinished atomic.Uint64
+ // TotalTasksStarted is incremented before each invocation of the action.
+ TotalTasksStarted atomic.Uint64
+ // TotalTasksFinished is incremented after each invocation, including ones
+ // that panicked.
+ TotalTasksFinished atomic.Uint64
+ // TotalTasksPanic counts action invocations whose panic was recovered.
+ TotalTasksPanic atomic.Uint64
+ // TotalTaskLatencyInNanoseconds is the summed wall-clock duration of all
+ // action invocations, in nanoseconds.
+ TotalTaskLatencyInNanoseconds atomic.Int64
+}