This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch scheduler-metrics in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit b8ab6e49bf24bd0f6717196bf0a952a4c5ffb86d Author: Gao Hongtao <[email protected]> AuthorDate: Fri Aug 23 23:57:56 2024 +0000 Fix lint Signed-off-by: Gao Hongtao <[email protected]> --- banyand/dquery/dquery.go | 2 +- banyand/measure/block.go | 4 +--- banyand/measure/query.go | 2 +- banyand/observability/service.go | 9 ++++++--- pkg/schema/cache.go | 2 +- pkg/schema/metrics.go | 2 ++ pkg/timestamp/scheduler.go | 4 +++- 7 files changed, 15 insertions(+), 10 deletions(-) diff --git a/banyand/dquery/dquery.go b/banyand/dquery/dquery.go index f629d703..14e9e775 100644 --- a/banyand/dquery/dquery.go +++ b/banyand/dquery/dquery.go @@ -53,13 +53,13 @@ var ( type queryService struct { metaService metadata.Repo pipeline queue.Server + omr observability.MetricsRegistry log *logger.Logger sqp *streamQueryProcessor mqp *measureQueryProcessor tqp *topNQueryProcessor closer *run.Closer nodeID string - omr observability.MetricsRegistry } // NewService return a new query service. diff --git a/banyand/measure/block.go b/banyand/measure/block.go index f676a542..b95329a1 100644 --- a/banyand/measure/block.go +++ b/banyand/measure/block.go @@ -632,9 +632,7 @@ func (bc *blockCursor) copyTo(r *model.MeasureResult, storedIndexValue map[commo } } -func (bc *blockCursor) replace(r *model.MeasureResult, storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue, - tagProjection []model.TagProjection, -) { +func (bc *blockCursor) replace(r *model.MeasureResult, storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue) { r.SID = bc.bm.seriesID r.Timestamps[len(r.Timestamps)-1] = bc.timestamps[bc.idx] r.Versions[len(r.Versions)-1] = bc.versions[bc.idx] diff --git a/banyand/measure/query.go b/banyand/measure/query.go index e107b4b0..bf2fa1bf 100644 --- a/banyand/measure/query.go +++ b/banyand/measure/query.go @@ -575,7 +575,7 @@ func (qr *queryResult) merge(storedIndexValue map[common.SeriesID]map[string]*mo if len(result.Timestamps) > 0 && topBC.timestamps[topBC.idx] == result.Timestamps[len(result.Timestamps)-1] { if topBC.versions[topBC.idx] > lastVersion { - topBC.replace(result, storedIndexValue, tagProjection) + topBC.replace(result, storedIndexValue) } } else { topBC.copyTo(result, storedIndexValue, tagProjection) diff --git a/banyand/observability/service.go b/banyand/observability/service.go index 4692d707..c08b9f10 100644 --- a/banyand/observability/service.go +++ b/banyand/observability/service.go @@ -71,18 +71,18 @@ type metricService struct { metadata metadata.Repo nodeSelector native.NodeSelector pipeline queue.Client - scheduler *timestamp.Scheduler + svr *http.Server l *logger.Logger closer *run.Closer - svr *http.Server + scheduler *timestamp.Scheduler nCollection *native.MetricCollection promReg *prometheus.Registry + schedulerMetrics *SchedulerMetrics npf nativeProviderFactory listenAddr string nodeType string modes []string mutex sync.Mutex - schedulerMetrics *SchedulerMetrics } func (p *metricService) FlagSet() *run.FlagSet { @@ -217,6 +217,7 @@ func containsMode(modes []string, mode string) bool { return false } +// SchedulerMetrics is the metrics for scheduler. type SchedulerMetrics struct { totalJobsStarted meter.Gauge totalJobsFinished meter.Gauge @@ -226,6 +227,7 @@ type SchedulerMetrics struct { totalTaskLatency meter.Gauge } +// NewSchedulerMetrics creates a new scheduler metrics. func NewSchedulerMetrics(factory *Factory) *SchedulerMetrics { return &SchedulerMetrics{ totalJobsStarted: factory.NewGauge("scheduler_jobs_started", "job"), @@ -237,6 +239,7 @@ func NewSchedulerMetrics(factory *Factory) *SchedulerMetrics { } } +// Collect collects the scheduler metrics. func (sm *SchedulerMetrics) Collect(job string, m *timestamp.SchedulerMetrics) { sm.totalJobsStarted.Set(float64(m.TotalJobsStarted.Load()), job) sm.totalJobsFinished.Set(float64(m.TotalJobsFinished.Load()), job) diff --git a/pkg/schema/cache.go b/pkg/schema/cache.go index 37b0ed0c..9abe21ec 100644 --- a/pkg/schema/cache.go +++ b/pkg/schema/cache.go @@ -108,12 +108,12 @@ type schemaRepo struct { l *logger.Logger closer *run.ChannelCloser eventCh chan MetadataEvent + metrics *Metrics groupMap sync.Map resourceMap sync.Map workerNum int resourceMutex sync.Mutex groupMux sync.Mutex - metrics *Metrics } func (sr *schemaRepo) SendMetadataEvent(event MetadataEvent) { diff --git a/pkg/schema/metrics.go b/pkg/schema/metrics.go index e83fe684..3ada77d5 100644 --- a/pkg/schema/metrics.go +++ b/pkg/schema/metrics.go @@ -22,12 +22,14 @@ import ( "github.com/apache/skywalking-banyandb/pkg/meter" ) +// Metrics is a collection of metrics. type Metrics struct { totalErrs meter.Counter totalRetries meter.Counter totalPanics meter.Counter } +// NewMetrics creates a new Metrics. func NewMetrics(factory *observability.Factory) *Metrics { return &Metrics{ totalErrs: factory.NewCounter("total_err"), diff --git a/pkg/timestamp/scheduler.go b/pkg/timestamp/scheduler.go index 62f77b18..1c2394f1 100644 --- a/pkg/timestamp/scheduler.go +++ b/pkg/timestamp/scheduler.go @@ -142,6 +142,7 @@ func (s *Scheduler) Close() { } } +// Metrics returns the metrics of all registered tasks. func (s *Scheduler) Metrics() map[string]*SchedulerMetrics { s.RLock() defer s.RUnlock() @@ -158,8 +159,8 @@ type task struct { closer *run.Closer l *logger.Logger action SchedulerAction - name string metrics *SchedulerMetrics + name string } func newTask(l *logger.Logger, name string, clock clock.Clock, schedule cron.Schedule, action SchedulerAction) *task { @@ -221,6 +222,7 @@ func (t *task) close() { t.closer.CloseThenWait() } +// SchedulerMetrics collects the metrics of a Scheduler. type SchedulerMetrics struct { TotalJobsStarted atomic.Uint64 TotalJobsFinished atomic.Uint64
