This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new d2e2b68fd Fix write handler resetting accumulated groups on error (#1004)
d2e2b68fd is described below
commit d2e2b68fd56a5c1413fe799de32e2114c14e7f2e
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Mar 12 09:00:54 2026 +0800
Fix write handler resetting accumulated groups on error (#1004)
* Fix measure standalone write handler to prevent dropping events on error;
update write liaison handlers for better error handling. Add observability
integration tests for native metrics collection.
---------
Co-authored-by: 吴晟 Wu Sheng <[email protected]>
---
CHANGES.md | 1 +
banyand/measure/write_liaison.go | 12 +-
banyand/measure/write_standalone.go | 12 +-
banyand/observability/services/meter_native.go | 26 ++++-
banyand/observability/services/service.go | 50 ++++++---
banyand/stream/write_liaison.go | 12 +-
banyand/stream/write_standalone.go | 12 +-
banyand/trace/write_liaison.go | 12 +-
banyand/trace/write_standalone.go | 12 +-
pkg/meter/native/collection.go | 22 +++-
.../integration/standalone/observability/common.go | 124 +++++++++++++++++++++
.../standalone/observability/measure_helper.go | 112 +++++++++++++++++++
.../observability/native_metrics_test.go | 92 +++++++++++++++
.../standalone/observability/suite_test.go | 37 ++++++
14 files changed, 475 insertions(+), 61 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index cacf66f52..30b531489 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -41,6 +41,7 @@ Release Notes.
- Fix trace queries with range conditions on the same tag (e.g., duration)
combined with ORDER BY by deduplicating tag names when merging logical
expression branches.
- Fix sidx tag filter range check returning inverted skip decision and use
correct int64 encoding for block min/max.
- Ignore take snapshot when no data.
+- Fix write handlers (measure, stream, and trace, in both standalone and liaison modes) resetting accumulated groups on error,
  which dropped all successfully processed events in the batch.
### Document
diff --git a/banyand/measure/write_liaison.go b/banyand/measure/write_liaison.go
index ec5639f45..7d0247e63 100644
--- a/banyand/measure/write_liaison.go
+++ b/banyand/measure/write_liaison.go
@@ -105,15 +105,15 @@ func (w *writeQueueCallback) Rev(ctx context.Context,
message bus.Message) (resp
if req != nil && req.GetDataPointSpec() != nil {
spec = req.GetDataPointSpec()
}
- var err error
- if groups, err = w.handle(groups, writeEvent, metadata, spec);
err != nil {
- w.l.Error().Err(err).Msg("cannot handle write event")
- groups = make(map[string]*dataPointsInQueue)
+ newGroups, handleErr := w.handle(groups, writeEvent, metadata,
spec)
+ if handleErr != nil {
+ w.l.Error().Err(handleErr).Msg("cannot handle write
event")
continue
}
+ groups = newGroups
}
- for i := range groups {
- g := groups[i]
+ for groupName := range groups {
+ g := groups[groupName]
for j := range g.tables {
es := g.tables[j]
// Marshal series metadata for persistence in part
folder
diff --git a/banyand/measure/write_standalone.go
b/banyand/measure/write_standalone.go
index 530e990e6..c6a430bba 100644
--- a/banyand/measure/write_standalone.go
+++ b/banyand/measure/write_standalone.go
@@ -477,15 +477,15 @@ func (w *writeCallback) Rev(_ context.Context, message
bus.Message) (resp bus.Me
if req != nil && req.GetDataPointSpec() != nil {
spec = req.GetDataPointSpec()
}
- var err error
- if groups, err = w.handle(groups, writeEvent, metadata, spec);
err != nil {
- w.l.Error().Err(err).RawJSON("written",
logger.Proto(writeEvent)).Msg("cannot handle write event")
- groups = make(map[string]*dataPointsInGroup)
+ newGroups, handleErr := w.handle(groups, writeEvent, metadata,
spec)
+ if handleErr != nil {
+ w.l.Error().Err(handleErr).RawJSON("written",
logger.Proto(writeEvent)).Msg("cannot handle write event")
continue
}
+ groups = newGroups
}
- for i := range groups {
- g := groups[i]
+ for groupName := range groups {
+ g := groups[groupName]
for j := range g.tables {
dps := g.tables[j]
if dps.tsTable != nil && dps.dataPoints != nil {
diff --git a/banyand/observability/services/meter_native.go
b/banyand/observability/services/meter_native.go
index 1c55220bb..dfd9d516a 100644
--- a/banyand/observability/services/meter_native.go
+++ b/banyand/observability/services/meter_native.go
@@ -20,6 +20,8 @@ package services
import (
"context"
"sync"
+ "sync/atomic"
+ "time"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/pkg/meter"
@@ -27,10 +29,11 @@ import (
)
type nativeProviderFactory struct {
- metadata metadata.Repo
- nodeInfo native.NodeInfo
- providers []meter.Provider
- mu sync.Mutex
+ metadata metadata.Repo
+ nodeInfo native.NodeInfo
+ providers []meter.Provider
+ serveStarted atomic.Bool
+ mu sync.Mutex
}
func (f *nativeProviderFactory) provider(scope meter.Scope) meter.Provider {
@@ -38,14 +41,25 @@ func (f *nativeProviderFactory) provider(scope meter.Scope)
meter.Provider {
f.mu.Lock()
f.providers = append(f.providers, p)
f.mu.Unlock()
+ if f.serveStarted.Load() {
+ go func(prov meter.Provider) {
+ ctx, cancel :=
context.WithTimeout(context.Background(), 30*time.Second)
+ defer cancel()
+ native.InitSchema(ctx, prov)
+ }(p)
+ }
return p
}
+func (f *nativeProviderFactory) setServeStarted() {
+ f.serveStarted.Store(true)
+}
+
func (f *nativeProviderFactory) initAllSchemas(ctx context.Context) {
f.mu.Lock()
providers := append([]meter.Provider(nil), f.providers...)
f.mu.Unlock()
- for _, p := range providers {
- native.InitSchema(ctx, p)
+ for _, prov := range providers {
+ native.InitSchema(ctx, prov)
}
}
diff --git a/banyand/observability/services/service.go
b/banyand/observability/services/service.go
index 8f2b71690..e89917145 100644
--- a/banyand/observability/services/service.go
+++ b/banyand/observability/services/service.go
@@ -20,6 +20,7 @@ package services
import (
"context"
"errors"
+ "fmt"
"net/http"
"sync"
"time"
@@ -77,27 +78,31 @@ func NewMetricService(metadata metadata.Repo, pipeline
queue.Client, nodeType st
}
type metricService struct {
- metadata metadata.Repo
- nodeSelector native.NodeSelector
- pipeline queue.Client
- scheduler *timestamp.Scheduler
- l *logger.Logger
- closer *run.Closer
- svr *http.Server
- nCollection *native.MetricCollection
- promReg *prometheus.Registry
- schedulerMetrics *SchedulerMetrics
- listenAddr string
- nodeType string
- modes []string
- npf nativeProviderFactory
- mutex sync.Mutex
+ metadata metadata.Repo
+ nodeSelector native.NodeSelector
+ pipeline queue.Client
+ promReg *prometheus.Registry
+ l *logger.Logger
+ closer *run.Closer
+ svr *http.Server
+ nCollection *native.MetricCollection
+ scheduler *timestamp.Scheduler
+ schedulerMetrics *SchedulerMetrics
+ listenAddr string
+ nodeType string
+ modes []string
+ npf nativeProviderFactory
+ metricsInterval time.Duration
+ nativeFlushInterval time.Duration
+ mutex sync.Mutex
}
func (p *metricService) FlagSet() *run.FlagSet {
flagSet := run.NewFlagSet("observability")
flagSet.StringVar(&p.listenAddr, "observability-listener-addr",
":2121", "listen addr for observability")
flagSet.StringSliceVar(&p.modes, "observability-modes",
[]string{"prometheus"}, "modes for observability")
+ flagSet.DurationVar(&p.metricsInterval,
"observability-metrics-interval", 15*time.Second, "interval for metrics
collection")
+ flagSet.DurationVar(&p.nativeFlushInterval,
"observability-native-flush-interval", 5*time.Second, "interval for native
metrics flush")
return flagSet
}
@@ -108,6 +113,9 @@ func (p *metricService) Validate() error {
if len(p.modes) == 0 {
return errNoMode
}
+ if p.metricsInterval <= 0 || p.nativeFlushInterval <= 0 {
+ return errors.New("observability-metrics-interval and
observability-native-flush-interval must be greater than 0")
+ }
set := make(map[string]struct{})
for _, mode := range p.modes {
if mode != flagNativeMode && mode != flagPromethusMode {
@@ -158,14 +166,21 @@ func (p *metricService) Serve() run.StopNotify {
defer p.mutex.Unlock()
p.initMetrics()
if containsMode(p.modes, flagNativeMode) {
+ p.npf.setServeStarted()
ctx, cancel := context.WithTimeout(context.Background(),
30*time.Second)
p.npf.initAllSchemas(ctx)
cancel()
}
+ // First collection on startup so metrics arrive sooner.
+ MetricsCollector.collect()
+ if containsMode(p.modes, flagNativeMode) {
+ p.nCollection.FlushMetrics()
+ }
clock, _ := timestamp.GetClock(context.TODO())
p.scheduler = timestamp.NewScheduler(p.l, clock)
p.schedulerMetrics = NewSchedulerMetrics(p.With(obScope))
- err := p.scheduler.Register("metrics-collector", cron.Descriptor,
"@every 15s", func(_ time.Time, _ *logger.Logger) bool {
+ metricsCollectorExpr := fmt.Sprintf("@every %s", p.metricsInterval)
+ err := p.scheduler.Register("metrics-collector", cron.Descriptor,
metricsCollectorExpr, func(_ time.Time, _ *logger.Logger) bool {
MetricsCollector.collect()
metrics := p.scheduler.Metrics()
for job, m := range metrics {
@@ -182,7 +197,8 @@ func (p *metricService) Serve() run.StopNotify {
registerMetricsEndpoint(p.promReg, metricsMux)
}
if containsMode(p.modes, flagNativeMode) {
- err = p.scheduler.Register("native-metric-collection",
cron.Descriptor, "@every 5s", func(_ time.Time, _ *logger.Logger) bool {
+ nativeFlushExpr := fmt.Sprintf("@every %s",
p.nativeFlushInterval)
+ err = p.scheduler.Register("native-metric-collection",
cron.Descriptor, nativeFlushExpr, func(_ time.Time, _ *logger.Logger) bool {
p.nCollection.FlushMetrics()
return true
})
diff --git a/banyand/stream/write_liaison.go b/banyand/stream/write_liaison.go
index 4a9e93e75..76abc3ef5 100644
--- a/banyand/stream/write_liaison.go
+++ b/banyand/stream/write_liaison.go
@@ -182,15 +182,15 @@ func (w *writeQueueCallback) Rev(ctx context.Context,
message bus.Message) (resp
if req != nil && req.GetTagFamilySpec() != nil {
spec = req.GetTagFamilySpec()
}
- var err error
- if groups, err = w.handle(groups, writeEvent, metadata, spec);
err != nil {
- w.l.Error().Err(err).Msg("cannot handle write event")
- groups = make(map[string]*elementsInQueue)
+ newGroups, handleErr := w.handle(groups, writeEvent, metadata,
spec)
+ if handleErr != nil {
+ w.l.Error().Err(handleErr).Msg("cannot handle write
event")
continue
}
+ groups = newGroups
}
- for i := range groups {
- g := groups[i]
+ for groupName := range groups {
+ g := groups[groupName]
for j := range g.tables {
es := g.tables[j]
// Marshal series metadata for persistence in part
folder
diff --git a/banyand/stream/write_standalone.go
b/banyand/stream/write_standalone.go
index 0bf69a207..7a3db5b31 100644
--- a/banyand/stream/write_standalone.go
+++ b/banyand/stream/write_standalone.go
@@ -313,15 +313,15 @@ func (w *writeCallback) Rev(_ context.Context, message
bus.Message) (resp bus.Me
if req != nil && req.GetTagFamilySpec() != nil {
spec = req.GetTagFamilySpec()
}
- var err error
- if groups, err = w.handle(groups, writeEvent, metadata, spec);
err != nil {
- w.l.Error().Err(err).Msg("cannot handle write event")
- groups = make(map[string]*elementsInGroup)
+ newGroups, handleErr := w.handle(groups, writeEvent, metadata,
spec)
+ if handleErr != nil {
+ w.l.Error().Err(handleErr).Msg("cannot handle write
event")
continue
}
+ groups = newGroups
}
- for i := range groups {
- g := groups[i]
+ for groupName := range groups {
+ g := groups[groupName]
for j := range g.tables {
es := g.tables[j]
es.tsTable.mustAddElements(es.elements)
diff --git a/banyand/trace/write_liaison.go b/banyand/trace/write_liaison.go
index e80cff24e..34bf7c719 100644
--- a/banyand/trace/write_liaison.go
+++ b/banyand/trace/write_liaison.go
@@ -193,15 +193,15 @@ func (w *writeQueueCallback) Rev(ctx context.Context,
message bus.Message) (resp
if req != nil && req.GetTagSpec() != nil {
spec = req.GetTagSpec()
}
- var err error
- if groups, err = w.handle(groups, writeEvent, metadata, spec);
err != nil {
- w.l.Error().Err(err).Msg("cannot handle write event")
- groups = make(map[string]*tracesInQueue)
+ newGroups, handleErr := w.handle(groups, writeEvent, metadata,
spec)
+ if handleErr != nil {
+ w.l.Error().Err(handleErr).Msg("cannot handle write
event")
continue
}
+ groups = newGroups
}
- for i := range groups {
- g := groups[i]
+ for groupName := range groups {
+ g := groups[groupName]
for j := range g.tables {
es := g.tables[j]
// Marshal series metadata for persistence in part
folder
diff --git a/banyand/trace/write_standalone.go
b/banyand/trace/write_standalone.go
index 1ef83d11a..077d1e86a 100644
--- a/banyand/trace/write_standalone.go
+++ b/banyand/trace/write_standalone.go
@@ -416,15 +416,15 @@ func (w *writeCallback) Rev(_ context.Context, message
bus.Message) (resp bus.Me
if req != nil && req.GetTagSpec() != nil {
spec = req.GetTagSpec()
}
- var err error
- if groups, err = w.handle(groups, writeEvent, metadata, spec);
err != nil {
- w.l.Error().Err(err).Msg("cannot handle write event")
- groups = make(map[string]*tracesInGroup)
+ newGroups, handleErr := w.handle(groups, writeEvent, metadata,
spec)
+ if handleErr != nil {
+ w.l.Error().Err(handleErr).Msg("cannot handle write
event")
continue
}
+ groups = newGroups
}
- for i := range groups {
- g := groups[i]
+ for groupName := range groups {
+ g := groups[groupName]
for j := range g.tables {
es := g.tables[j]
var sidxMemPartMap map[string]*sidx.MemPart
diff --git a/pkg/meter/native/collection.go b/pkg/meter/native/collection.go
index a345530e8..6779a5697 100644
--- a/pkg/meter/native/collection.go
+++ b/pkg/meter/native/collection.go
@@ -21,6 +21,7 @@ package native
import (
"context"
"fmt"
+ "sync"
"time"
"google.golang.org/protobuf/types/known/timestamppb"
@@ -48,6 +49,7 @@ type MetricCollection struct {
pipeline queue.Client
nodeSelector NodeSelector
collectors []collector
+ mu sync.RWMutex
}
// NewMetricsCollection creates a new MetricCollection.
@@ -60,19 +62,33 @@ func NewMetricsCollection(pipeline queue.Client,
nodeSelector NodeSelector) *Met
// AddCollector registers c so that every later FlushMetrics call includes the
// metrics it collects. The collector list is guarded by m.mu; FlushMetrics
// copies it under the read lock, so registration may race safely with a flush.
func (m *MetricCollection) AddCollector(c collector) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.collectors = append(m.collectors, c)
}
// FlushMetrics write all the metrics by flushing.
func (m *MetricCollection) FlushMetrics() {
+ m.mu.RLock()
if len(m.collectors) == 0 {
+ m.mu.RUnlock()
+ log.Debug().Msg("native metric collection skipped: no
collectors registered")
return
}
+ collectorsCopy := append([]collector(nil), m.collectors...)
+ m.mu.RUnlock()
+
+ log.Debug().Int("collector_count", len(collectorsCopy)).Msg("native
metric collection started")
publisher := m.pipeline.NewBatchPublisher(writeTimeout)
defer publisher.Close()
var messages []bus.Message
- for _, collector := range m.collectors {
+ for _, collector := range collectorsCopy {
name, metrics := collector.Collect()
+ if len(metrics) == 0 {
+ log.Debug().Str("metric_name", name).Msg("native metric
collector returned no metrics")
+ continue
+ }
+ log.Debug().Str("metric_name", name).Int("metric_count",
len(metrics)).Msg("native metric collector collected metrics")
for _, metric := range metrics {
iwr := m.buildIWR(name, metric)
nodeID := ""
@@ -89,8 +105,10 @@ func (m *MetricCollection) FlushMetrics() {
}
_, err := publisher.Publish(context.TODO(), data.TopicMeasureWrite,
messages...)
if err != nil {
- log.Error().Err(err).Msg("Failed to publish messasges")
+ log.Error().Err(err).Msg("Failed to publish messages")
+ return
}
+ log.Debug().Int("message_count", len(messages)).Msg("native metric
collection published messages")
}
func (m *MetricCollection) buildIWR(metricName string, metric
metricWithLabelValues) *measurev1.InternalWriteRequest {
diff --git a/test/integration/standalone/observability/common.go
b/test/integration/standalone/observability/common.go
new file mode 100644
index 000000000..7125af98d
--- /dev/null
+++ b/test/integration/standalone/observability/common.go
@@ -0,0 +1,124 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package observability provides shared setup and helpers for standalone
observability integration tests.
+package observability
+
+import (
+ "net"
+ "time"
+
+ g "github.com/onsi/ginkgo/v2"
+ gm "github.com/onsi/gomega"
+ "github.com/onsi/gomega/gleak"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+
+ "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
+ "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+ "github.com/apache/skywalking-banyandb/pkg/test/setup"
+)
+
+type sharedEnv struct {
+ closeFn func()
+ grpcAddr string
+ httpAddr string
+}
+
+var (
+ env sharedEnv
+ conn *grpc.ClientConn
+ goods []gleak.Goroutine
+ sharedContext helpers.SharedContext
+)
+
+// Setup initializes the shared standalone BanyanDB instance with native
observability enabled.
+func Setup() {
+ gm.Expect(logger.Init(logger.Logging{
+ Env: "dev",
+ Level: flags.LogLevel,
+ })).To(gm.Succeed())
+
+ // tmpDir is only used for discovery file; the actual data root will be
created
+ // by setup.EmptyStandalone and printed separately.
+ tmpDir, tmpDirCleanup, tmpErr := test.NewSpace()
+ gm.Expect(tmpErr).NotTo(gm.HaveOccurred())
+
+ dfWriter := setup.NewDiscoveryFileWriter(tmpDir)
+ config := setup.PropertyClusterConfig(dfWriter)
+
+ // Use EmptyStandalone so we can get the real data root path used by
the server.
+ grpcAddr, httpAddr, closeFn := setup.EmptyStandalone(config,
+ "--observability-modes=native",
+ "--observability-listener-addr=:0",
+ "--observability-metrics-interval=2s",
+ "--observability-native-flush-interval=2s",
+ "--measure-flush-timeout=100ms",
+ )
+
+ // The measure root path is embedded in the standalone server's root
path.
+ // For EmptyStandalone, setup.StandaloneWithSchemaLoaders internally
calls test.NewSpace()
+ // and passes that path as --measure-root-path. We reuse the discovery
tmpDir just to
+ // construct the discovery file, but the actual data lives under that
internal root.
+ env = sharedEnv{
+ grpcAddr: grpcAddr,
+ httpAddr: httpAddr,
+ closeFn: func() {
+ closeFn()
+ tmpDirCleanup()
+ },
+ }
+
+ waitForHTTPReady(httpAddr)
+
+ var err error
+ conn, err = grpchelper.Conn(env.grpcAddr, 10*time.Second,
grpc.WithTransportCredentials(insecure.NewCredentials()))
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+
+ sharedContext = helpers.SharedContext{
+ Connection: conn,
+ }
+
+ goods = gleak.Goroutines()
+
+ g.DeferCleanup(func() {
+ gm.Expect(conn.Close()).To(gm.Succeed())
+ env.closeFn()
+ gm.Eventually(gleak.Goroutines,
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+ })
+}
+
+func waitForHTTPReady(httpAddr string) {
+ host, port, err := net.SplitHostPort(httpAddr)
+ gm.Expect(err).NotTo(gm.HaveOccurred())
+ if host == "" {
+ host = "localhost"
+ }
+ addr := net.JoinHostPort(host, port)
+ client := &net.Dialer{Timeout: 2 * time.Second}
+ gm.Eventually(func() error {
+ conn, dialErr := client.Dial("tcp", addr)
+ if dialErr != nil {
+ return dialErr
+ }
+ _ = conn.Close()
+ return nil
+ }, flags.EventuallyTimeout).Should(gm.Succeed())
+}
diff --git a/test/integration/standalone/observability/measure_helper.go
b/test/integration/standalone/observability/measure_helper.go
new file mode 100644
index 000000000..d907bf343
--- /dev/null
+++ b/test/integration/standalone/observability/measure_helper.go
@@ -0,0 +1,112 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package observability
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+ measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ "github.com/apache/skywalking-banyandb/pkg/meter/native"
+)
+
// Point is a flattened datapoint read from the _monitoring group. It keeps the
// datapoint's string-valued tags and the float value of its "value" field.
type Point struct {
	// Tags maps tag name to value for every string tag that was returned.
	Tags map[string]string
	// Value is the float payload of the datapoint's "value" field.
	Value float64
}
+
+// QueryObservabilityMeasure queries a measure from the _monitoring group and
returns flattened datapoints.
+// tagNames are additional tag names to project beyond the default entity tags
(node_type, node_id, grpc_address, http_address).
+// For example, cpu_state and memory_state need "kind"; disk needs "path" and
"kind".
+func QueryObservabilityMeasure(metricName string, tagNames ...string)
([]Point, error) {
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ client := measurev1.NewMeasureServiceClient(sharedContext.Connection)
+
+ now := time.Now().Truncate(time.Second)
+ resp, err := client.Query(ctx, &measurev1.QueryRequest{
+ Groups: []string{native.ObservabilityGroupName},
+ Name: metricName,
+ TimeRange: &modelv1.TimeRange{
+ Begin: timestamppb.New(now.Add(-10 * time.Minute)),
+ End: timestamppb.New(now.Add(1 * time.Minute)),
+ },
+ TagProjection: &modelv1.TagProjection{
+ TagFamilies: []*modelv1.TagProjection_TagFamily{
+ {
+ Name: "default",
+ Tags: append([]string{"node_type",
"node_id", "grpc_address", "http_address"}, tagNames...),
+ },
+ },
+ },
+ FieldProjection: &measurev1.QueryRequest_FieldProjection{
+ Names: []string{"value"},
+ },
+ })
+ if err != nil {
+ return nil, fmt.Errorf("query %s failed: %w", metricName, err)
+ }
+
+ var points []Point
+ for _, dp := range resp.GetDataPoints() {
+ if len(dp.TagFamilies) == 0 {
+ continue
+ }
+ tags := make(map[string]string)
+ for _, tagFamily := range dp.TagFamilies {
+ for _, tag := range tagFamily.Tags {
+ if tag == nil {
+ continue
+ }
+ tagValue := tag.GetValue()
+ if tagValue == nil {
+ continue
+ }
+ str := tagValue.GetStr()
+ if str == nil {
+ continue
+ }
+ if tag.Key == "" {
+ continue
+ }
+ tags[tag.Key] = str.Value
+ }
+ }
+ if len(dp.Fields) == 0 {
+ continue
+ }
+ field := dp.Fields[0]
+ floatField := field.GetValue().GetFloat()
+ if floatField == nil {
+ continue
+ }
+ value := floatField.Value
+ points = append(points, Point{
+ Value: value,
+ Tags: tags,
+ })
+ }
+
+ return points, nil
+}
diff --git a/test/integration/standalone/observability/native_metrics_test.go
b/test/integration/standalone/observability/native_metrics_test.go
new file mode 100644
index 000000000..5c4f050f6
--- /dev/null
+++ b/test/integration/standalone/observability/native_metrics_test.go
@@ -0,0 +1,92 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package observability
+
+import (
+ "time"
+
+ g "github.com/onsi/ginkgo/v2"
+ gm "github.com/onsi/gomega"
+)
+
+var _ = g.Describe("Native self-observability metrics in _monitoring group",
func() {
+ g.It("collects up_time metric", func() {
+ gm.Eventually(func() (float64, error) {
+ points, err := QueryObservabilityMeasure("up_time")
+ if err != nil {
+ return 0, err
+ }
+ if len(points) == 0 {
+ return 0, nil
+ }
+ var maxValue float64
+ for _, p := range points {
+ if p.Value > maxValue {
+ maxValue = p.Value
+ }
+ }
+ return maxValue, nil
+ }, 90*time.Second, 5*time.Second).Should(gm.BeNumerically(">",
0.0))
+ })
+
+ g.It("collects cpu_state metric with reasonable fractions", func() {
+ gm.Eventually(func() (bool, error) {
+ points, err := QueryObservabilityMeasure("cpu_state",
"kind")
+ if err != nil {
+ return false, err
+ }
+ if len(points) == 0 {
+ return false, nil
+ }
+ var hasUserOrSystem bool
+ for _, p := range points {
+ kind, ok := p.Tags["kind"]
+ if !ok {
+ continue
+ }
+ if kind == "user" || kind == "system" {
+ if p.Value < 0.0 || p.Value > 1.0 {
+ return false, nil
+ }
+ hasUserOrSystem = true
+ }
+ }
+ return hasUserOrSystem, nil
+ }, 90*time.Second, 5*time.Second).Should(gm.BeTrue())
+ })
+
+ g.It("collects memory_state metric with basic sanity", func() {
+ // Note: memory_state may be created by liaison without "kind"
tag, so we query with base tags only.
+ // Validate that we get at least one datapoint with a positive
value (total/used bytes or used_percent).
+ gm.Eventually(func() (bool, error) {
+ points, err := QueryObservabilityMeasure("memory_state")
+ if err != nil {
+ return false, err
+ }
+ if len(points) == 0 {
+ return false, nil
+ }
+ for _, p := range points {
+ if p.Value > 0 {
+ return true, nil
+ }
+ }
+ return false, nil
+ }, 90*time.Second, 5*time.Second).Should(gm.BeTrue())
+ })
+})
diff --git a/test/integration/standalone/observability/suite_test.go
b/test/integration/standalone/observability/suite_test.go
new file mode 100644
index 000000000..bf4ec0e91
--- /dev/null
+++ b/test/integration/standalone/observability/suite_test.go
@@ -0,0 +1,37 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package observability_test
+
+import (
+ "testing"
+
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+
+ integration_standalone
"github.com/apache/skywalking-banyandb/test/integration/standalone"
+
"github.com/apache/skywalking-banyandb/test/integration/standalone/observability"
+)
+
+func TestNativeObservabilityMetrics(t *testing.T) {
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "Native Observability Metrics Suite",
Label(integration_standalone.Labels...))
+}
+
+var _ = BeforeSuite(func() {
+ observability.Setup()
+})