This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 3a81f77b [GSoC][BanyanDB] Self-Observability: Write Metric Data to
Measure in Liaison and Data mode (#475)
3a81f77b is described below
commit 3a81f77b47c13b9715fefbe09f6f99d3575c050f
Author: Sylvie-Wxr <[email protected]>
AuthorDate: Wed Jun 26 17:51:54 2024 -0700
[GSoC][BanyanDB] Self-Observability: Write Metric Data to Measure in
Liaison and Data mode (#475)
* add new local pipeline to units
* add nodeSelector for liaison node
---------
Co-authored-by: Gao Hongtao <[email protected]>
---
banyand/measure/measure_suite_test.go | 2 +-
banyand/measure/service.go | 32 ++++++++++++++++--------
banyand/observability/meter_native.go | 4 +--
banyand/observability/service.go | 47 ++++++++++++++++++++++++-----------
pkg/cmdsetup/data.go | 7 ++++--
pkg/cmdsetup/liaison.go | 5 ++--
pkg/cmdsetup/standalone.go | 4 +--
pkg/meter/native/collection.go | 26 +++++++++++++++----
pkg/meter/native/instruments.go | 2 +-
pkg/meter/native/provider.go | 36 ++++++++++++++++++++-------
pkg/meter/native/vec.go | 8 +++---
11 files changed, 120 insertions(+), 53 deletions(-)
diff --git a/banyand/measure/measure_suite_test.go
b/banyand/measure/measure_suite_test.go
index 8765c2b2..c7e8b224 100644
--- a/banyand/measure/measure_suite_test.go
+++ b/banyand/measure/measure_suite_test.go
@@ -77,7 +77,7 @@ func setUp() (*services, func()) {
gomega.Expect(err).NotTo(gomega.HaveOccurred())
// Init Measure Service
- measureService, err := measure.NewService(context.TODO(),
metadataService, pipeline)
+ measureService, err := measure.NewService(context.TODO(),
metadataService, pipeline, nil)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
preloadMeasureSvc := &preloadMeasureService{metaSvc: metadataService}
var flags []string
diff --git a/banyand/measure/service.go b/banyand/measure/service.go
index 7b7ab71c..85aeb1ed 100644
--- a/banyand/measure/service.go
+++ b/banyand/measure/service.go
@@ -53,14 +53,15 @@ type Service interface {
var _ Service = (*service)(nil)
type service struct {
- schemaRepo *schemaRepo
- writeListener bus.MessageListener
- metadata metadata.Repo
- pipeline queue.Server
- localPipeline queue.Queue
- option option
- l *logger.Logger
- root string
+ schemaRepo *schemaRepo
+ writeListener bus.MessageListener
+ metadata metadata.Repo
+ pipeline queue.Server
+ localPipeline queue.Queue
+ metricPipeline queue.Server
+ option option
+ l *logger.Logger
+ root string
}
func (s *service) Measure(metadata *commonv1.Metadata) (Measure, error) {
@@ -108,6 +109,14 @@ func (s *service) PreRun(_ context.Context) error {
// run a serial watcher
s.writeListener = setUpWriteCallback(s.l, s.schemaRepo)
+ // only subscribe metricPipeline for data node
+ if s.metricPipeline != nil {
+ err := s.metricPipeline.Subscribe(data.TopicMeasureWrite,
s.writeListener)
+ if err != nil {
+ s.l.Err(err).Msgf("Fail to subscribe metricPipeline,
%v", err)
+ return err
+ }
+ }
err := s.pipeline.Subscribe(data.TopicMeasureWrite, s.writeListener)
if err != nil {
return err
@@ -125,9 +134,10 @@ func (s *service) GracefulStop() {
}
// NewService returns a new service.
-func NewService(_ context.Context, metadata metadata.Repo, pipeline
queue.Server) (Service, error) {
+func NewService(_ context.Context, metadata metadata.Repo, pipeline
queue.Server, metricPipeline queue.Server) (Service, error) {
return &service{
- metadata: metadata,
- pipeline: pipeline,
+ metadata: metadata,
+ pipeline: pipeline,
+ metricPipeline: metricPipeline,
}, nil
}
diff --git a/banyand/observability/meter_native.go
b/banyand/observability/meter_native.go
index 0f86da94..e239128d 100644
--- a/banyand/observability/meter_native.go
+++ b/banyand/observability/meter_native.go
@@ -35,8 +35,8 @@ var (
)
// NewMeterProvider returns a meter.Provider based on the given scope.
-func newNativeMeterProvider(ctx context.Context, metadata metadata.Repo)
meter.Provider {
- return native.NewProvider(ctx, SystemScope, metadata)
+func newNativeMeterProvider(ctx context.Context, metadata metadata.Repo,
nodeInfo native.NodeInfo) meter.Provider {
+ return native.NewProvider(ctx, SystemScope, metadata, nodeInfo)
}
// MetricsServerInterceptor returns a grpc.UnaryServerInterceptor and a
grpc.StreamServerInterceptor.
diff --git a/banyand/observability/service.go b/banyand/observability/service.go
index a0b8baf2..8e41c5ed 100644
--- a/banyand/observability/service.go
+++ b/banyand/observability/service.go
@@ -19,6 +19,7 @@ package observability
import (
"context"
+ "errors"
"net/http"
"sync"
"time"
@@ -26,6 +27,7 @@ import (
"github.com/robfig/cron/v3"
"google.golang.org/grpc"
+ "github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -54,24 +56,28 @@ type Service interface {
}
// NewMetricService returns a metric service.
-func NewMetricService(metadata metadata.Repo, pipeline queue.Client) Service {
+func NewMetricService(metadata metadata.Repo, pipeline queue.Client, nodeType
string, nodeSelector native.NodeSelector) Service {
return &metricService{
- closer: run.NewCloser(1),
- metadata: metadata,
- pipeline: pipeline,
+ closer: run.NewCloser(1),
+ metadata: metadata,
+ pipeline: pipeline,
+ nodeType: nodeType,
+ nodeSelector: nodeSelector,
}
}
type metricService struct {
- l *logger.Logger
- svr *http.Server
- closer *run.Closer
- scheduler *timestamp.Scheduler
- metadata metadata.Repo
- pipeline queue.Client
- listenAddr string
- modes []string
- mutex sync.Mutex
+ l *logger.Logger
+ svr *http.Server
+ closer *run.Closer
+ scheduler *timestamp.Scheduler
+ metadata metadata.Repo
+ pipeline queue.Client
+ nodeSelector native.NodeSelector
+ listenAddr string
+ nodeType string
+ modes []string
+ mutex sync.Mutex
}
func (p *metricService) FlagSet() *run.FlagSet {
@@ -106,8 +112,19 @@ func (p *metricService) PreRun(ctx context.Context) error {
MetricsServerInterceptor = promMetricsServerInterceptor
}
if containsMode(p.modes, flagNativeMode) {
- NativeMetricCollection = native.NewMetricsCollection(p.pipeline)
- NativeMeterProvider = newNativeMeterProvider(ctx, p.metadata)
+ NativeMetricCollection =
native.NewMetricsCollection(p.pipeline, p.nodeSelector)
+ val := ctx.Value(common.ContextNodeKey)
+ if val == nil {
+ return errors.New("metric service native mode, node id
is empty")
+ }
+ node := val.(common.Node)
+ nodeInfo := native.NodeInfo{
+ Type: p.nodeType,
+ NodeID: node.NodeID,
+ GrpcAddress: node.GrpcAddress,
+ HTTPAddress: node.HTTPAddress,
+ }
+ NativeMeterProvider = newNativeMeterProvider(ctx, p.metadata,
nodeInfo)
}
initMetrics(p.modes)
return nil
diff --git a/pkg/cmdsetup/data.go b/pkg/cmdsetup/data.go
index 991804ab..21ab9b4a 100644
--- a/pkg/cmdsetup/data.go
+++ b/pkg/cmdsetup/data.go
@@ -28,6 +28,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/query"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/banyand/queue/sub"
"github.com/apache/skywalking-banyandb/banyand/stream"
"github.com/apache/skywalking-banyandb/pkg/logger"
@@ -43,11 +44,12 @@ func newDataCmd(runners ...run.Unit) *cobra.Command {
l.Fatal().Err(err).Msg("failed to initiate metadata service")
}
pipeline := sub.NewServer()
+ localPipeline := queue.Local()
streamSvc, err := stream.NewService(ctx, metaSvc, pipeline)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate stream service")
}
- measureSvc, err := measure.NewService(ctx, metaSvc, pipeline)
+ measureSvc, err := measure.NewService(ctx, metaSvc, pipeline,
localPipeline)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate measure service")
}
@@ -57,12 +59,13 @@ func newDataCmd(runners ...run.Unit) *cobra.Command {
l.Fatal().Err(err).Msg("failed to initiate query processor")
}
profSvc := observability.NewProfService()
- metricSvc := observability.NewMetricService(metaSvc, nil)
+ metricSvc := observability.NewMetricService(metaSvc, localPipeline,
"data", nil)
var units []run.Unit
units = append(units, runners...)
units = append(units,
metaSvc,
+ localPipeline,
pipeline,
measureSvc,
streamSvc,
diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go
index 152ba7a0..a7112e94 100644
--- a/pkg/cmdsetup/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -50,9 +50,10 @@ func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate required node
selector")
}
- grpcServer := grpc.NewServer(ctx, pipeline, localPipeline, metaSvc,
grpc.NewClusterNodeRegistry(pipeline, nodeSel))
+ nodeRegistry := grpc.NewClusterNodeRegistry(pipeline, nodeSel)
+ grpcServer := grpc.NewServer(ctx, pipeline, localPipeline, metaSvc,
nodeRegistry)
profSvc := observability.NewProfService()
- metricSvc := observability.NewMetricService(metaSvc, nil)
+ metricSvc := observability.NewMetricService(metaSvc, pipeline,
"liaison", nodeRegistry)
httpServer := http.NewServer()
dQuery, err := dquery.NewService(metaSvc, localPipeline, pipeline)
if err != nil {
diff --git a/pkg/cmdsetup/standalone.go b/pkg/cmdsetup/standalone.go
index b98645d1..31b6daa7 100644
--- a/pkg/cmdsetup/standalone.go
+++ b/pkg/cmdsetup/standalone.go
@@ -49,7 +49,7 @@ func newStandaloneCmd(runners ...run.Unit) *cobra.Command {
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate stream service")
}
- measureSvc, err := measure.NewService(ctx, metaSvc, pipeline)
+ measureSvc, err := measure.NewService(ctx, metaSvc, pipeline, nil)
if err != nil {
l.Fatal().Err(err).Msg("failed to initiate measure service")
}
@@ -59,7 +59,7 @@ func newStandaloneCmd(runners ...run.Unit) *cobra.Command {
}
grpcServer := grpc.NewServer(ctx, pipeline, pipeline, metaSvc,
grpc.NewLocalNodeRegistry())
profSvc := observability.NewProfService()
- metricSvc := observability.NewMetricService(metaSvc, pipeline)
+ metricSvc := observability.NewMetricService(metaSvc, pipeline,
"standalone", nil)
httpServer := http.NewServer()
var units []run.Unit
diff --git a/pkg/meter/native/collection.go b/pkg/meter/native/collection.go
index 82630fa9..9904e11a 100644
--- a/pkg/meter/native/collection.go
+++ b/pkg/meter/native/collection.go
@@ -31,20 +31,27 @@ import (
"github.com/apache/skywalking-banyandb/pkg/bus"
)
+// NodeSelector has Locate method to select a nodeId.
+type NodeSelector interface {
+ Locate(group, name string, shardID uint32) (string, error)
+}
+
type collector interface {
Collect() (string, []metricWithLabelValues)
}
// MetricCollection contains all the native implementations of metrics.
type MetricCollection struct {
- pipeline queue.Client
- collectors []collector
+ pipeline queue.Client
+ nodeSelector NodeSelector
+ collectors []collector
}
// NewMetricsCollection creates a new MetricCollection.
-func NewMetricsCollection(pipeline queue.Client) MetricCollection {
+func NewMetricsCollection(pipeline queue.Client, nodeSelector NodeSelector)
MetricCollection {
return MetricCollection{
- pipeline: pipeline,
+ pipeline: pipeline,
+ nodeSelector: nodeSelector,
}
}
@@ -65,7 +72,16 @@ func (m *MetricCollection) FlushMetrics() {
name, metrics := collector.Collect()
for _, metric := range metrics {
iwr := m.buildIWR(name, metric)
- messages = append(messages,
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), "", iwr))
+ nodeID := ""
+ var err error
+ // only liaison node has a non-nil nodeSelector
+ if m.nodeSelector != nil {
+ nodeID, err =
m.nodeSelector.Locate(iwr.GetRequest().GetMetadata().GetGroup(),
iwr.GetRequest().GetMetadata().GetName(), uint32(0))
+ if err != nil {
+ log.Error().Err(err).Msg("Failed to
locate nodeID")
+ }
+ }
+ messages = append(messages,
bus.NewBatchMessageWithNode(bus.MessageID(time.Now().UnixNano()), nodeID, iwr))
}
}
_, err := publisher.Publish(data.TopicMeasureWrite, messages...)
diff --git a/pkg/meter/native/instruments.go b/pkg/meter/native/instruments.go
index d281a9a4..fcbee215 100644
--- a/pkg/meter/native/instruments.go
+++ b/pkg/meter/native/instruments.go
@@ -37,7 +37,7 @@ func (g *Gauge) Add(delta float64, labelValues ...string) {
func (g *Gauge) Set(value float64, labelValues ...string) {
g.mutex.Lock()
defer g.mutex.Unlock()
- tagValues := buildTagValues(g.scope, labelValues...)
+ tagValues := buildTagValues(g.nodeInfo, g.scope, labelValues...)
hash := seriesHash(tagValues)
key := string(hash)
g.metrics[key] = metricWithLabelValues{
diff --git a/pkg/meter/native/provider.go b/pkg/meter/native/provider.go
index 97c2747f..1ebc173b 100644
--- a/pkg/meter/native/provider.go
+++ b/pkg/meter/native/provider.go
@@ -36,22 +36,34 @@ const (
NativeObservabilityGroupName = "_monitoring"
defaultTagFamily = "default"
defaultFieldName = "value"
- nodeNameTag = "node_name"
- standaloneNodeName = "standalone"
+ tagNodeType = "node_type"
+ tagNodeID = "node_id"
+ tagGRPCAddress = "grpc_address"
+ tagHTTPAddress = "http_address"
)
var log = logger.GetLogger("observability", "metrics", "system")
+// NodeInfo is the struct that contains information used in native
observability mode.
+type NodeInfo struct {
+ Type string
+ NodeID string
+ GrpcAddress string
+ HTTPAddress string
+}
+
type provider struct {
metadata metadata.Repo
scope meter.Scope
+ nodeInfo NodeInfo
}
// NewProvider returns a native metrics Provider.
-func NewProvider(ctx context.Context, scope meter.Scope, metadata
metadata.Repo) meter.Provider {
+func NewProvider(ctx context.Context, scope meter.Scope, metadata
metadata.Repo, nodeInfo NodeInfo) meter.Provider {
p := &provider{
scope: scope,
metadata: metadata,
+ nodeInfo: nodeInfo,
}
err := p.createNativeObservabilityGroup(ctx)
if err != nil && !errors.Is(err, schema.ErrGRPCAlreadyExists) {
@@ -67,7 +79,7 @@ func (p *provider) Counter(name string, labelNames ...string)
meter.Counter {
log.Error().Err(err).Msgf("Failure to createMeasure for Counter
%s, labels: %v", name, labelNames)
}
return &Counter{
- newMetricVec(name, p.scope),
+ newMetricVec(name, p.scope, p.nodeInfo),
}
}
@@ -78,14 +90,14 @@ func (p *provider) Gauge(name string, labelNames ...string)
meter.Gauge {
log.Error().Err(err).Msgf("Failure to createMeasure for Gauge
%s, labels: %v", name, labelNames)
}
return &Gauge{
- newMetricVec(name, p.scope),
+ newMetricVec(name, p.scope, p.nodeInfo),
}
}
// Histogram returns a native implementation of the Histogram interface.
func (p *provider) Histogram(name string, _ meter.Buckets, _ ...string)
meter.Histogram {
return &Histogram{
- newMetricVec(name, p.scope),
+ newMetricVec(name, p.scope, p.nodeInfo),
}
}
@@ -151,7 +163,10 @@ func buildTags(scope meter.Scope, labels []string)
([]*databasev1.TagSpec, []str
entityTags = append(entityTags, label)
}
}
- addTags(nodeNameTag)
+ addTags(tagNodeType)
+ addTags(tagNodeID)
+ addTags(tagGRPCAddress)
+ addTags(tagHTTPAddress)
for label := range scope.GetLabels() {
addTags(label)
}
@@ -159,7 +174,7 @@ func buildTags(scope meter.Scope, labels []string)
([]*databasev1.TagSpec, []str
return tags, entityTags
}
-func buildTagValues(scope meter.Scope, labelValues ...string)
[]*modelv1.TagValue {
+func buildTagValues(nodeInfo NodeInfo, scope meter.Scope, labelValues
...string) []*modelv1.TagValue {
var tagValues []*modelv1.TagValue
addTagValues := func(labelValues ...string) {
for _, value := range labelValues {
@@ -173,7 +188,10 @@ func buildTagValues(scope meter.Scope, labelValues
...string) []*modelv1.TagValu
tagValues = append(tagValues, tagValue)
}
}
- addTagValues(standaloneNodeName)
+ addTagValues(nodeInfo.Type)
+ addTagValues(nodeInfo.NodeID)
+ addTagValues(nodeInfo.GrpcAddress)
+ addTagValues(nodeInfo.HTTPAddress)
for _, labelValue := range scope.GetLabels() {
addTagValues(labelValue)
}
diff --git a/pkg/meter/native/vec.go b/pkg/meter/native/vec.go
index 3b93fcdb..3dd93f4f 100644
--- a/pkg/meter/native/vec.go
+++ b/pkg/meter/native/vec.go
@@ -38,14 +38,16 @@ type metricWithLabelValues struct {
}
type metricVec struct {
+ nodeInfo NodeInfo
scope meter.Scope
metrics map[string]metricWithLabelValues
measureName string
mutex sync.Mutex
}
-func newMetricVec(measureName string, scope meter.Scope) *metricVec {
+func newMetricVec(measureName string, scope meter.Scope, nodeInfo NodeInfo)
*metricVec {
n := &metricVec{
+ nodeInfo: nodeInfo,
scope: scope,
measureName: measureName,
metrics: map[string]metricWithLabelValues{},
@@ -56,7 +58,7 @@ func newMetricVec(measureName string, scope meter.Scope)
*metricVec {
func (n *metricVec) Inc(delta float64, labelValues ...string) {
n.mutex.Lock()
defer n.mutex.Unlock()
- tagValues := buildTagValues(n.scope, labelValues...)
+ tagValues := buildTagValues(n.nodeInfo, n.scope, labelValues...)
hash := seriesHash(tagValues)
key := string(hash)
v, exist := n.metrics[key]
@@ -73,7 +75,7 @@ func (n *metricVec) Inc(delta float64, labelValues ...string)
{
func (n *metricVec) Delete(labelValues ...string) bool {
n.mutex.Lock()
defer n.mutex.Unlock()
- key := string(seriesHash(buildTagValues(n.scope, labelValues...)))
+ key := string(seriesHash(buildTagValues(n.nodeInfo, n.scope,
labelValues...)))
delete(n.metrics, key)
return true
}