This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new d2e2b68fd Fix write handler resetting accumulated groups on error (#1004)
d2e2b68fd is described below

commit d2e2b68fd56a5c1413fe799de32e2114c14e7f2e
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Mar 12 09:00:54 2026 +0800

    Fix write handler resetting accumulated groups on error (#1004)
    
    * Fix measure standalone write handler to prevent dropping events on error; update write liaison handlers for better error handling. Add observability integration tests for native metrics collection.
    
    
    ---------
    
    Co-authored-by: 吴晟 Wu Sheng <[email protected]>
---
 CHANGES.md                                         |   1 +
 banyand/measure/write_liaison.go                   |  12 +-
 banyand/measure/write_standalone.go                |  12 +-
 banyand/observability/services/meter_native.go     |  26 ++++-
 banyand/observability/services/service.go          |  50 ++++++---
 banyand/stream/write_liaison.go                    |  12 +-
 banyand/stream/write_standalone.go                 |  12 +-
 banyand/trace/write_liaison.go                     |  12 +-
 banyand/trace/write_standalone.go                  |  12 +-
 pkg/meter/native/collection.go                     |  22 +++-
 .../integration/standalone/observability/common.go | 124 +++++++++++++++++++++
 .../standalone/observability/measure_helper.go     | 112 +++++++++++++++++++
 .../observability/native_metrics_test.go           |  92 +++++++++++++++
 .../standalone/observability/suite_test.go         |  37 ++++++
 14 files changed, 475 insertions(+), 61 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index cacf66f52..30b531489 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -41,6 +41,7 @@ Release Notes.
 - Fix trace queries with range conditions on the same tag (e.g., duration) combined with ORDER BY by deduplicating tag names when merging logical expression branches.
 - Fix sidx tag filter range check returning inverted skip decision and use correct int64 encoding for block min/max.
 - Ignore take snapshot when no data.
+- Fix measure standalone write handler resetting accumulated groups on error, which dropped all successfully processed events in the batch.
 
 ### Document
 
diff --git a/banyand/measure/write_liaison.go b/banyand/measure/write_liaison.go
index ec5639f45..7d0247e63 100644
--- a/banyand/measure/write_liaison.go
+++ b/banyand/measure/write_liaison.go
@@ -105,15 +105,15 @@ func (w *writeQueueCallback) Rev(ctx context.Context, 
message bus.Message) (resp
                if req != nil && req.GetDataPointSpec() != nil {
                        spec = req.GetDataPointSpec()
                }
-               var err error
-               if groups, err = w.handle(groups, writeEvent, metadata, spec); 
err != nil {
-                       w.l.Error().Err(err).Msg("cannot handle write event")
-                       groups = make(map[string]*dataPointsInQueue)
+               newGroups, handleErr := w.handle(groups, writeEvent, metadata, 
spec)
+               if handleErr != nil {
+                       w.l.Error().Err(handleErr).Msg("cannot handle write 
event")
                        continue
                }
+               groups = newGroups
        }
-       for i := range groups {
-               g := groups[i]
+       for groupName := range groups {
+               g := groups[groupName]
                for j := range g.tables {
                        es := g.tables[j]
                        // Marshal series metadata for persistence in part 
folder
diff --git a/banyand/measure/write_standalone.go 
b/banyand/measure/write_standalone.go
index 530e990e6..c6a430bba 100644
--- a/banyand/measure/write_standalone.go
+++ b/banyand/measure/write_standalone.go
@@ -477,15 +477,15 @@ func (w *writeCallback) Rev(_ context.Context, message 
bus.Message) (resp bus.Me
                if req != nil && req.GetDataPointSpec() != nil {
                        spec = req.GetDataPointSpec()
                }
-               var err error
-               if groups, err = w.handle(groups, writeEvent, metadata, spec); 
err != nil {
-                       w.l.Error().Err(err).RawJSON("written", 
logger.Proto(writeEvent)).Msg("cannot handle write event")
-                       groups = make(map[string]*dataPointsInGroup)
+               newGroups, handleErr := w.handle(groups, writeEvent, metadata, 
spec)
+               if handleErr != nil {
+                       w.l.Error().Err(handleErr).RawJSON("written", 
logger.Proto(writeEvent)).Msg("cannot handle write event")
                        continue
                }
+               groups = newGroups
        }
-       for i := range groups {
-               g := groups[i]
+       for groupName := range groups {
+               g := groups[groupName]
                for j := range g.tables {
                        dps := g.tables[j]
                        if dps.tsTable != nil && dps.dataPoints != nil {
diff --git a/banyand/observability/services/meter_native.go 
b/banyand/observability/services/meter_native.go
index 1c55220bb..dfd9d516a 100644
--- a/banyand/observability/services/meter_native.go
+++ b/banyand/observability/services/meter_native.go
@@ -20,6 +20,8 @@ package services
 import (
        "context"
        "sync"
+       "sync/atomic"
+       "time"
 
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/pkg/meter"
@@ -27,10 +29,11 @@ import (
 )
 
 type nativeProviderFactory struct {
-       metadata  metadata.Repo
-       nodeInfo  native.NodeInfo
-       providers []meter.Provider
-       mu        sync.Mutex
+       metadata     metadata.Repo
+       nodeInfo     native.NodeInfo
+       providers    []meter.Provider
+       serveStarted atomic.Bool
+       mu           sync.Mutex
 }
 
 func (f *nativeProviderFactory) provider(scope meter.Scope) meter.Provider {
@@ -38,14 +41,25 @@ func (f *nativeProviderFactory) provider(scope meter.Scope) 
meter.Provider {
        f.mu.Lock()
        f.providers = append(f.providers, p)
        f.mu.Unlock()
+       if f.serveStarted.Load() {
+               go func(prov meter.Provider) {
+                       ctx, cancel := 
context.WithTimeout(context.Background(), 30*time.Second)
+                       defer cancel()
+                       native.InitSchema(ctx, prov)
+               }(p)
+       }
        return p
 }
 
+func (f *nativeProviderFactory) setServeStarted() {
+       f.serveStarted.Store(true)
+}
+
 func (f *nativeProviderFactory) initAllSchemas(ctx context.Context) {
        f.mu.Lock()
        providers := append([]meter.Provider(nil), f.providers...)
        f.mu.Unlock()
-       for _, p := range providers {
-               native.InitSchema(ctx, p)
+       for _, prov := range providers {
+               native.InitSchema(ctx, prov)
        }
 }
diff --git a/banyand/observability/services/service.go 
b/banyand/observability/services/service.go
index 8f2b71690..e89917145 100644
--- a/banyand/observability/services/service.go
+++ b/banyand/observability/services/service.go
@@ -20,6 +20,7 @@ package services
 import (
        "context"
        "errors"
+       "fmt"
        "net/http"
        "sync"
        "time"
@@ -77,27 +78,31 @@ func NewMetricService(metadata metadata.Repo, pipeline 
queue.Client, nodeType st
 }
 
 type metricService struct {
-       metadata         metadata.Repo
-       nodeSelector     native.NodeSelector
-       pipeline         queue.Client
-       scheduler        *timestamp.Scheduler
-       l                *logger.Logger
-       closer           *run.Closer
-       svr              *http.Server
-       nCollection      *native.MetricCollection
-       promReg          *prometheus.Registry
-       schedulerMetrics *SchedulerMetrics
-       listenAddr       string
-       nodeType         string
-       modes            []string
-       npf              nativeProviderFactory
-       mutex            sync.Mutex
+       metadata            metadata.Repo
+       nodeSelector        native.NodeSelector
+       pipeline            queue.Client
+       promReg             *prometheus.Registry
+       l                   *logger.Logger
+       closer              *run.Closer
+       svr                 *http.Server
+       nCollection         *native.MetricCollection
+       scheduler           *timestamp.Scheduler
+       schedulerMetrics    *SchedulerMetrics
+       listenAddr          string
+       nodeType            string
+       modes               []string
+       npf                 nativeProviderFactory
+       metricsInterval     time.Duration
+       nativeFlushInterval time.Duration
+       mutex               sync.Mutex
 }
 
 func (p *metricService) FlagSet() *run.FlagSet {
        flagSet := run.NewFlagSet("observability")
        flagSet.StringVar(&p.listenAddr, "observability-listener-addr", 
":2121", "listen addr for observability")
        flagSet.StringSliceVar(&p.modes, "observability-modes", 
[]string{"prometheus"}, "modes for observability")
+       flagSet.DurationVar(&p.metricsInterval, 
"observability-metrics-interval", 15*time.Second, "interval for metrics 
collection")
+       flagSet.DurationVar(&p.nativeFlushInterval, 
"observability-native-flush-interval", 5*time.Second, "interval for native 
metrics flush")
        return flagSet
 }
 
@@ -108,6 +113,9 @@ func (p *metricService) Validate() error {
        if len(p.modes) == 0 {
                return errNoMode
        }
+       if p.metricsInterval <= 0 || p.nativeFlushInterval <= 0 {
+               return errors.New("observability-metrics-interval and 
observability-native-flush-interval must be greater than 0")
+       }
        set := make(map[string]struct{})
        for _, mode := range p.modes {
                if mode != flagNativeMode && mode != flagPromethusMode {
@@ -158,14 +166,21 @@ func (p *metricService) Serve() run.StopNotify {
        defer p.mutex.Unlock()
        p.initMetrics()
        if containsMode(p.modes, flagNativeMode) {
+               p.npf.setServeStarted()
                ctx, cancel := context.WithTimeout(context.Background(), 
30*time.Second)
                p.npf.initAllSchemas(ctx)
                cancel()
        }
+       // First collection on startup so metrics arrive sooner.
+       MetricsCollector.collect()
+       if containsMode(p.modes, flagNativeMode) {
+               p.nCollection.FlushMetrics()
+       }
        clock, _ := timestamp.GetClock(context.TODO())
        p.scheduler = timestamp.NewScheduler(p.l, clock)
        p.schedulerMetrics = NewSchedulerMetrics(p.With(obScope))
-       err := p.scheduler.Register("metrics-collector", cron.Descriptor, 
"@every 15s", func(_ time.Time, _ *logger.Logger) bool {
+       metricsCollectorExpr := fmt.Sprintf("@every %s", p.metricsInterval)
+       err := p.scheduler.Register("metrics-collector", cron.Descriptor, 
metricsCollectorExpr, func(_ time.Time, _ *logger.Logger) bool {
                MetricsCollector.collect()
                metrics := p.scheduler.Metrics()
                for job, m := range metrics {
@@ -182,7 +197,8 @@ func (p *metricService) Serve() run.StopNotify {
                registerMetricsEndpoint(p.promReg, metricsMux)
        }
        if containsMode(p.modes, flagNativeMode) {
-               err = p.scheduler.Register("native-metric-collection", 
cron.Descriptor, "@every 5s", func(_ time.Time, _ *logger.Logger) bool {
+               nativeFlushExpr := fmt.Sprintf("@every %s", 
p.nativeFlushInterval)
+               err = p.scheduler.Register("native-metric-collection", 
cron.Descriptor, nativeFlushExpr, func(_ time.Time, _ *logger.Logger) bool {
                        p.nCollection.FlushMetrics()
                        return true
                })
diff --git a/banyand/stream/write_liaison.go b/banyand/stream/write_liaison.go
index 4a9e93e75..76abc3ef5 100644
--- a/banyand/stream/write_liaison.go
+++ b/banyand/stream/write_liaison.go
@@ -182,15 +182,15 @@ func (w *writeQueueCallback) Rev(ctx context.Context, 
message bus.Message) (resp
                if req != nil && req.GetTagFamilySpec() != nil {
                        spec = req.GetTagFamilySpec()
                }
-               var err error
-               if groups, err = w.handle(groups, writeEvent, metadata, spec); 
err != nil {
-                       w.l.Error().Err(err).Msg("cannot handle write event")
-                       groups = make(map[string]*elementsInQueue)
+               newGroups, handleErr := w.handle(groups, writeEvent, metadata, 
spec)
+               if handleErr != nil {
+                       w.l.Error().Err(handleErr).Msg("cannot handle write 
event")
                        continue
                }
+               groups = newGroups
        }
-       for i := range groups {
-               g := groups[i]
+       for groupName := range groups {
+               g := groups[groupName]
                for j := range g.tables {
                        es := g.tables[j]
                        // Marshal series metadata for persistence in part 
folder
diff --git a/banyand/stream/write_standalone.go 
b/banyand/stream/write_standalone.go
index 0bf69a207..7a3db5b31 100644
--- a/banyand/stream/write_standalone.go
+++ b/banyand/stream/write_standalone.go
@@ -313,15 +313,15 @@ func (w *writeCallback) Rev(_ context.Context, message 
bus.Message) (resp bus.Me
                if req != nil && req.GetTagFamilySpec() != nil {
                        spec = req.GetTagFamilySpec()
                }
-               var err error
-               if groups, err = w.handle(groups, writeEvent, metadata, spec); 
err != nil {
-                       w.l.Error().Err(err).Msg("cannot handle write event")
-                       groups = make(map[string]*elementsInGroup)
+               newGroups, handleErr := w.handle(groups, writeEvent, metadata, 
spec)
+               if handleErr != nil {
+                       w.l.Error().Err(handleErr).Msg("cannot handle write 
event")
                        continue
                }
+               groups = newGroups
        }
-       for i := range groups {
-               g := groups[i]
+       for groupName := range groups {
+               g := groups[groupName]
                for j := range g.tables {
                        es := g.tables[j]
                        es.tsTable.mustAddElements(es.elements)
diff --git a/banyand/trace/write_liaison.go b/banyand/trace/write_liaison.go
index e80cff24e..34bf7c719 100644
--- a/banyand/trace/write_liaison.go
+++ b/banyand/trace/write_liaison.go
@@ -193,15 +193,15 @@ func (w *writeQueueCallback) Rev(ctx context.Context, 
message bus.Message) (resp
                if req != nil && req.GetTagSpec() != nil {
                        spec = req.GetTagSpec()
                }
-               var err error
-               if groups, err = w.handle(groups, writeEvent, metadata, spec); 
err != nil {
-                       w.l.Error().Err(err).Msg("cannot handle write event")
-                       groups = make(map[string]*tracesInQueue)
+               newGroups, handleErr := w.handle(groups, writeEvent, metadata, 
spec)
+               if handleErr != nil {
+                       w.l.Error().Err(handleErr).Msg("cannot handle write 
event")
                        continue
                }
+               groups = newGroups
        }
-       for i := range groups {
-               g := groups[i]
+       for groupName := range groups {
+               g := groups[groupName]
                for j := range g.tables {
                        es := g.tables[j]
                        // Marshal series metadata for persistence in part 
folder
diff --git a/banyand/trace/write_standalone.go 
b/banyand/trace/write_standalone.go
index 1ef83d11a..077d1e86a 100644
--- a/banyand/trace/write_standalone.go
+++ b/banyand/trace/write_standalone.go
@@ -416,15 +416,15 @@ func (w *writeCallback) Rev(_ context.Context, message 
bus.Message) (resp bus.Me
                if req != nil && req.GetTagSpec() != nil {
                        spec = req.GetTagSpec()
                }
-               var err error
-               if groups, err = w.handle(groups, writeEvent, metadata, spec); 
err != nil {
-                       w.l.Error().Err(err).Msg("cannot handle write event")
-                       groups = make(map[string]*tracesInGroup)
+               newGroups, handleErr := w.handle(groups, writeEvent, metadata, 
spec)
+               if handleErr != nil {
+                       w.l.Error().Err(handleErr).Msg("cannot handle write 
event")
                        continue
                }
+               groups = newGroups
        }
-       for i := range groups {
-               g := groups[i]
+       for groupName := range groups {
+               g := groups[groupName]
                for j := range g.tables {
                        es := g.tables[j]
                        var sidxMemPartMap map[string]*sidx.MemPart
diff --git a/pkg/meter/native/collection.go b/pkg/meter/native/collection.go
index a345530e8..6779a5697 100644
--- a/pkg/meter/native/collection.go
+++ b/pkg/meter/native/collection.go
@@ -21,6 +21,7 @@ package native
 import (
        "context"
        "fmt"
+       "sync"
        "time"
 
        "google.golang.org/protobuf/types/known/timestamppb"
@@ -48,6 +49,7 @@ type MetricCollection struct {
        pipeline     queue.Client
        nodeSelector NodeSelector
        collectors   []collector
+       mu           sync.RWMutex
 }
 
 // NewMetricsCollection creates a new MetricCollection.
@@ -60,19 +62,33 @@ func NewMetricsCollection(pipeline queue.Client, 
nodeSelector NodeSelector) *Met
 
 // AddCollector Add native metric to MetricCollection.
 func (m *MetricCollection) AddCollector(c collector) {
+       m.mu.Lock()
        m.collectors = append(m.collectors, c)
+       m.mu.Unlock()
 }
 
 // FlushMetrics write all the metrics by flushing.
 func (m *MetricCollection) FlushMetrics() {
+       m.mu.RLock()
        if len(m.collectors) == 0 {
+               m.mu.RUnlock()
+               log.Debug().Msg("native metric collection skipped: no 
collectors registered")
                return
        }
+       collectorsCopy := append([]collector(nil), m.collectors...)
+       m.mu.RUnlock()
+
+       log.Debug().Int("collector_count", len(collectorsCopy)).Msg("native 
metric collection started")
        publisher := m.pipeline.NewBatchPublisher(writeTimeout)
        defer publisher.Close()
        var messages []bus.Message
-       for _, collector := range m.collectors {
+       for _, collector := range collectorsCopy {
                name, metrics := collector.Collect()
+               if len(metrics) == 0 {
+                       log.Debug().Str("metric_name", name).Msg("native metric 
collector returned no metrics")
+                       continue
+               }
+               log.Debug().Str("metric_name", name).Int("metric_count", 
len(metrics)).Msg("native metric collector collected metrics")
                for _, metric := range metrics {
                        iwr := m.buildIWR(name, metric)
                        nodeID := ""
@@ -89,8 +105,10 @@ func (m *MetricCollection) FlushMetrics() {
        }
        _, err := publisher.Publish(context.TODO(), data.TopicMeasureWrite, 
messages...)
        if err != nil {
-               log.Error().Err(err).Msg("Failed to publish messasges")
+               log.Error().Err(err).Msg("Failed to publish messages")
+               return
        }
+       log.Debug().Int("message_count", len(messages)).Msg("native metric 
collection published messages")
 }
 
 func (m *MetricCollection) buildIWR(metricName string, metric 
metricWithLabelValues) *measurev1.InternalWriteRequest {
diff --git a/test/integration/standalone/observability/common.go 
b/test/integration/standalone/observability/common.go
new file mode 100644
index 000000000..7125af98d
--- /dev/null
+++ b/test/integration/standalone/observability/common.go
@@ -0,0 +1,124 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package observability provides shared setup and helpers for standalone 
observability integration tests.
+package observability
+
+import (
+       "net"
+       "time"
+
+       g "github.com/onsi/ginkgo/v2"
+       gm "github.com/onsi/gomega"
+       "github.com/onsi/gomega/gleak"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+       "github.com/apache/skywalking-banyandb/pkg/test/setup"
+)
+
+type sharedEnv struct {
+       closeFn  func()
+       grpcAddr string
+       httpAddr string
+}
+
+var (
+       env           sharedEnv
+       conn          *grpc.ClientConn
+       goods         []gleak.Goroutine
+       sharedContext helpers.SharedContext
+)
+
+// Setup initializes the shared standalone BanyanDB instance with native 
observability enabled.
+func Setup() {
+       gm.Expect(logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: flags.LogLevel,
+       })).To(gm.Succeed())
+
+       // tmpDir is only used for discovery file; the actual data root will be 
created
+       // by setup.EmptyStandalone and printed separately.
+       tmpDir, tmpDirCleanup, tmpErr := test.NewSpace()
+       gm.Expect(tmpErr).NotTo(gm.HaveOccurred())
+
+       dfWriter := setup.NewDiscoveryFileWriter(tmpDir)
+       config := setup.PropertyClusterConfig(dfWriter)
+
+       // Use EmptyStandalone so we can get the real data root path used by 
the server.
+       grpcAddr, httpAddr, closeFn := setup.EmptyStandalone(config,
+               "--observability-modes=native",
+               "--observability-listener-addr=:0",
+               "--observability-metrics-interval=2s",
+               "--observability-native-flush-interval=2s",
+               "--measure-flush-timeout=100ms",
+       )
+
+       // The measure root path is embedded in the standalone server's root 
path.
+       // For EmptyStandalone, setup.StandaloneWithSchemaLoaders internally 
calls test.NewSpace()
+       // and passes that path as --measure-root-path. We reuse the discovery 
tmpDir just to
+       // construct the discovery file, but the actual data lives under that 
internal root.
+       env = sharedEnv{
+               grpcAddr: grpcAddr,
+               httpAddr: httpAddr,
+               closeFn: func() {
+                       closeFn()
+                       tmpDirCleanup()
+               },
+       }
+
+       waitForHTTPReady(httpAddr)
+
+       var err error
+       conn, err = grpchelper.Conn(env.grpcAddr, 10*time.Second, 
grpc.WithTransportCredentials(insecure.NewCredentials()))
+       gm.Expect(err).NotTo(gm.HaveOccurred())
+
+       sharedContext = helpers.SharedContext{
+               Connection: conn,
+       }
+
+       goods = gleak.Goroutines()
+
+       g.DeferCleanup(func() {
+               gm.Expect(conn.Close()).To(gm.Succeed())
+               env.closeFn()
+               gm.Eventually(gleak.Goroutines, 
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+       })
+}
+
+func waitForHTTPReady(httpAddr string) {
+       host, port, err := net.SplitHostPort(httpAddr)
+       gm.Expect(err).NotTo(gm.HaveOccurred())
+       if host == "" {
+               host = "localhost"
+       }
+       addr := net.JoinHostPort(host, port)
+       client := &net.Dialer{Timeout: 2 * time.Second}
+       gm.Eventually(func() error {
+               conn, dialErr := client.Dial("tcp", addr)
+               if dialErr != nil {
+                       return dialErr
+               }
+               _ = conn.Close()
+               return nil
+       }, flags.EventuallyTimeout).Should(gm.Succeed())
+}
diff --git a/test/integration/standalone/observability/measure_helper.go 
b/test/integration/standalone/observability/measure_helper.go
new file mode 100644
index 000000000..d907bf343
--- /dev/null
+++ b/test/integration/standalone/observability/measure_helper.go
@@ -0,0 +1,112 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package observability
+
+import (
+       "context"
+       "fmt"
+       "time"
+
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/meter/native"
+)
+
+// Point represents a simplified view of a datapoint in the _monitoring group.
+type Point struct {
+       Tags  map[string]string
+       Value float64
+}
+
+// QueryObservabilityMeasure queries a measure from the _monitoring group and 
returns flattened datapoints.
+// tagNames are additional tag names to project beyond the default entity tags 
(node_type, node_id, grpc_address, http_address).
+// For example, cpu_state and memory_state need "kind"; disk needs "path" and 
"kind".
+func QueryObservabilityMeasure(metricName string, tagNames ...string) 
([]Point, error) {
+       ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+       defer cancel()
+
+       client := measurev1.NewMeasureServiceClient(sharedContext.Connection)
+
+       now := time.Now().Truncate(time.Second)
+       resp, err := client.Query(ctx, &measurev1.QueryRequest{
+               Groups: []string{native.ObservabilityGroupName},
+               Name:   metricName,
+               TimeRange: &modelv1.TimeRange{
+                       Begin: timestamppb.New(now.Add(-10 * time.Minute)),
+                       End:   timestamppb.New(now.Add(1 * time.Minute)),
+               },
+               TagProjection: &modelv1.TagProjection{
+                       TagFamilies: []*modelv1.TagProjection_TagFamily{
+                               {
+                                       Name: "default",
+                                       Tags: append([]string{"node_type", 
"node_id", "grpc_address", "http_address"}, tagNames...),
+                               },
+                       },
+               },
+               FieldProjection: &measurev1.QueryRequest_FieldProjection{
+                       Names: []string{"value"},
+               },
+       })
+       if err != nil {
+               return nil, fmt.Errorf("query %s failed: %w", metricName, err)
+       }
+
+       var points []Point
+       for _, dp := range resp.GetDataPoints() {
+               if len(dp.TagFamilies) == 0 {
+                       continue
+               }
+               tags := make(map[string]string)
+               for _, tagFamily := range dp.TagFamilies {
+                       for _, tag := range tagFamily.Tags {
+                               if tag == nil {
+                                       continue
+                               }
+                               tagValue := tag.GetValue()
+                               if tagValue == nil {
+                                       continue
+                               }
+                               str := tagValue.GetStr()
+                               if str == nil {
+                                       continue
+                               }
+                               if tag.Key == "" {
+                                       continue
+                               }
+                               tags[tag.Key] = str.Value
+                       }
+               }
+               if len(dp.Fields) == 0 {
+                       continue
+               }
+               field := dp.Fields[0]
+               floatField := field.GetValue().GetFloat()
+               if floatField == nil {
+                       continue
+               }
+               value := floatField.Value
+               points = append(points, Point{
+                       Value: value,
+                       Tags:  tags,
+               })
+       }
+
+       return points, nil
+}
diff --git a/test/integration/standalone/observability/native_metrics_test.go 
b/test/integration/standalone/observability/native_metrics_test.go
new file mode 100644
index 000000000..5c4f050f6
--- /dev/null
+++ b/test/integration/standalone/observability/native_metrics_test.go
@@ -0,0 +1,92 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package observability
+
+import (
+       "time"
+
+       g "github.com/onsi/ginkgo/v2"
+       gm "github.com/onsi/gomega"
+)
+
+var _ = g.Describe("Native self-observability metrics in _monitoring group", 
func() {
+       g.It("collects up_time metric", func() {
+               gm.Eventually(func() (float64, error) {
+                       points, err := QueryObservabilityMeasure("up_time")
+                       if err != nil {
+                               return 0, err
+                       }
+                       if len(points) == 0 {
+                               return 0, nil
+                       }
+                       var maxValue float64
+                       for _, p := range points {
+                               if p.Value > maxValue {
+                                       maxValue = p.Value
+                               }
+                       }
+                       return maxValue, nil
+               }, 90*time.Second, 5*time.Second).Should(gm.BeNumerically(">", 
0.0))
+       })
+
+       g.It("collects cpu_state metric with reasonable fractions", func() {
+               gm.Eventually(func() (bool, error) {
+                       points, err := QueryObservabilityMeasure("cpu_state", 
"kind")
+                       if err != nil {
+                               return false, err
+                       }
+                       if len(points) == 0 {
+                               return false, nil
+                       }
+                       var hasUserOrSystem bool
+                       for _, p := range points {
+                               kind, ok := p.Tags["kind"]
+                               if !ok {
+                                       continue
+                               }
+                               if kind == "user" || kind == "system" {
+                                       if p.Value < 0.0 || p.Value > 1.0 {
+                                               return false, nil
+                                       }
+                                       hasUserOrSystem = true
+                               }
+                       }
+                       return hasUserOrSystem, nil
+               }, 90*time.Second, 5*time.Second).Should(gm.BeTrue())
+       })
+
+       g.It("collects memory_state metric with basic sanity", func() {
+               // Note: memory_state may be created by liaison without "kind" tag, so we query with base tags only.
+               // Validate that we get at least one datapoint with a positive value (total/used bytes or used_percent).
+               gm.Eventually(func() (bool, error) {
+                       points, err := QueryObservabilityMeasure("memory_state")
+                       if err != nil {
+                               return false, err
+                       }
+                       if len(points) == 0 {
+                               return false, nil
+                       }
+                       for _, p := range points {
+                               if p.Value > 0 {
+                                       return true, nil
+                               }
+                       }
+                       return false, nil
+               }, 90*time.Second, 5*time.Second).Should(gm.BeTrue())
+       })
+})
diff --git a/test/integration/standalone/observability/suite_test.go b/test/integration/standalone/observability/suite_test.go
new file mode 100644
index 000000000..bf4ec0e91
--- /dev/null
+++ b/test/integration/standalone/observability/suite_test.go
@@ -0,0 +1,37 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package observability_test
+
+import (
+       "testing"
+
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+
+       integration_standalone 
"github.com/apache/skywalking-banyandb/test/integration/standalone"
+       
"github.com/apache/skywalking-banyandb/test/integration/standalone/observability"
+)
+
+func TestNativeObservabilityMetrics(t *testing.T) {
+       RegisterFailHandler(Fail)
+       RunSpecs(t, "Native Observability Metrics Suite", 
Label(integration_standalone.Labels...))
+}
+
+var _ = BeforeSuite(func() {
+       observability.Setup()
+})

Reply via email to