This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch phase-2-cp5-march
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit c6c9ae8216e4765f4738f89fd5e2d0ac1db799ae
Author: Hongtao Gao <[email protected]>
AuthorDate: Wed May 6 23:58:59 2026 +0000

    feat(barrier): metrics + access logging for the schema-consistency cluster 
(Phase 2 Step 2.7, §A17)
    
    Wires the A17 observability surface the plan calls out as a CP-6
    prerequisite. Three duration histograms — one per Await* RPC — record
    every call labelled by `result` ("applied" / "timeout" /
    "invalid_argument" / "error"); a single counter aggregates per-laggard
    observations across all three barriers, broken out by `barrier`,
    `role` ("liaison" / "data" / "self" for unprefixed standalone
    laggards), and `node`. The split decodes the
    <role>-<Metadata.Name> identifier the cluster barrier already emits
    (see member.laggardName in barrier_cluster.go).
    
    Two status counters cover the gate's user-visible verdict on the same
    node:
      schema_status_schema_not_applied_total{rpc,group,reason}
      schema_status_expired_schema_total{rpc,group}
    The reason label is fixed at "wait_timeout" in v0.11.0 — the only
    emitting path is awaitRevisionReached's deadline expiration — but the
    label is retained on the metric so dashboards built today do not break
    if optional fast-sync paths land in a later release. The rpc label
    covers the six gate entry points (measure / stream / trace × write /
    query).
    
    barrierService grows two fields populated in server.go PreRun: a
    metrics handle and a named logger child. Each AwaitX entry point uses
    named return values + defer to record the duration, call
    recordBarrierLaggards, and emit a structured access-log line carrying
    min_revision (or the key count for schema_applied/deleted), the laggard
    count, duration, and result. validateWriteRequest and the per-group
    query gate caller in measure.go / stream.go / trace.go bump the status
    counters at the same return points where the response status is set.
    
    19 unit tests in banyand/liaison/grpc/barrier_metrics_test.go pin the
    contract via a recordingFactory that captures every Counter.Inc and
    Histogram.Observe call: result-label routing under each outcome,
    role/name split for laggards, per-RPC counter labels for the gate's
    three-way split, and nil-safe behavior for fixtures without a metrics
    handle.
    
    Distributed schema integration smoke (./test/integration/distributed/
    schema/...) still green; full liaison/grpc unit suite green.
    
    via [HAPI](https://hapi.run)
    
    Co-Authored-By: HAPI <[email protected]>
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
---
 banyand/liaison/grpc/barrier.go              |  26 +-
 banyand/liaison/grpc/barrier_metrics.go      | 250 +++++++++++++++++
 banyand/liaison/grpc/barrier_metrics_test.go | 405 +++++++++++++++++++++++++++
 banyand/liaison/grpc/measure.go              |   3 +
 banyand/liaison/grpc/metrics.go              |  62 ++--
 banyand/liaison/grpc/server.go               |   4 +
 banyand/liaison/grpc/stream.go               |   3 +
 banyand/liaison/grpc/trace.go                |   3 +
 8 files changed, 733 insertions(+), 23 deletions(-)

diff --git a/banyand/liaison/grpc/barrier.go b/banyand/liaison/grpc/barrier.go
index c4a606736..7ac0cb6dc 100644
--- a/banyand/liaison/grpc/barrier.go
+++ b/banyand/liaison/grpc/barrier.go
@@ -28,6 +28,7 @@ import (
        schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
 const (
@@ -60,6 +61,13 @@ type barrierService struct {
        peerLiaisons func() queue.Client
        dataNodes    func() queue.Client
        selfName     func() string
+       // metrics and l are wired in during PreRun (server.go) so the barrier
+       // can record schema_await_*_duration_seconds histograms,
+       // schema_barrier_laggard_nodes_total counters, and structured 
access-log
+       // lines on every call. Both are nil for legacy fixtures that exercise
+       // the standalone path with newBarrierService directly.
+       metrics *metrics
+       l       *logger.Logger
 }
 
 func newBarrierService(cacheProvider func() barrierCacheReader) 
*barrierService {
@@ -101,7 +109,11 @@ func (b *barrierService) cache() barrierCacheReader {
 // those dependencies (Phase 1 unit-test path), it falls back to a single
 // in-process cache poll, returning a self-only laggard on timeout so callers
 // can diagnose how far behind the standalone cache is.
-func (b *barrierService) AwaitRevisionApplied(ctx context.Context, req 
*schemav1.AwaitRevisionAppliedRequest) (*schemav1.AwaitRevisionAppliedResponse, 
error) {
+func (b *barrierService) AwaitRevisionApplied(ctx context.Context, req 
*schemav1.AwaitRevisionAppliedRequest) (resp 
*schemav1.AwaitRevisionAppliedResponse, err error) {
+       start := time.Now()
+       defer func() {
+               b.recordAwaitRevisionAppliedResult(start, req, resp, err)
+       }()
        if b.peerLiaisons != nil && b.dataNodes != nil && b.selfName != nil {
                return b.awaitRevisionAppliedCluster(ctx, req)
        }
@@ -142,7 +154,11 @@ func (b *barrierService) AwaitRevisionApplied(ctx 
context.Context, req *schemav1
 // dependencies are wired (production), the call probes the frozen tier1 +
 // tier2 + self watched set in parallel via GetKeyRevisions; without them
 // (Phase 1 unit-test path), it falls back to a single in-process cache poll.
-func (b *barrierService) AwaitSchemaApplied(ctx context.Context, req 
*schemav1.AwaitSchemaAppliedRequest) (*schemav1.AwaitSchemaAppliedResponse, 
error) {
+func (b *barrierService) AwaitSchemaApplied(ctx context.Context, req 
*schemav1.AwaitSchemaAppliedRequest) (resp 
*schemav1.AwaitSchemaAppliedResponse, err error) {
+       start := time.Now()
+       defer func() {
+               b.recordAwaitSchemaAppliedResult(start, req, resp, err)
+       }()
        if b.peerLiaisons != nil && b.dataNodes != nil && b.selfName != nil {
                return b.awaitSchemaAppliedCluster(ctx, req)
        }
@@ -181,7 +197,11 @@ func (b *barrierService) AwaitSchemaApplied(ctx 
context.Context, req *schemav1.A
 // wired (production), the call probes the frozen tier1 + tier2 + self
 // watched set in parallel via GetAbsentKeys; without them (Phase 1
 // unit-test path), it falls back to a single in-process cache poll.
-func (b *barrierService) AwaitSchemaDeleted(ctx context.Context, req 
*schemav1.AwaitSchemaDeletedRequest) (*schemav1.AwaitSchemaDeletedResponse, 
error) {
+func (b *barrierService) AwaitSchemaDeleted(ctx context.Context, req 
*schemav1.AwaitSchemaDeletedRequest) (resp 
*schemav1.AwaitSchemaDeletedResponse, err error) {
+       start := time.Now()
+       defer func() {
+               b.recordAwaitSchemaDeletedResult(start, req, resp, err)
+       }()
        if b.peerLiaisons != nil && b.dataNodes != nil && b.selfName != nil {
                return b.awaitSchemaDeletedCluster(ctx, req)
        }
diff --git a/banyand/liaison/grpc/barrier_metrics.go 
b/banyand/liaison/grpc/barrier_metrics.go
new file mode 100644
index 000000000..a29904cfe
--- /dev/null
+++ b/banyand/liaison/grpc/barrier_metrics.go
@@ -0,0 +1,250 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+       "strings"
+       "time"
+
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
+
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+       "github.com/apache/skywalking-banyandb/pkg/meter"
+)
+
+const (
+       // barrierLabelRevisionApplied / SchemaApplied / SchemaDeleted are the
+       // three values used for the `barrier` label on
+       // schema_barrier_laggard_nodes_total. The histogram metrics carry the
+       // barrier identity in their metric name instead, so this label is only
+       // emitted on the laggard counter.
+       barrierLabelRevisionApplied = "revision_applied"
+       barrierLabelSchemaApplied   = "schema_applied"
+       barrierLabelSchemaDeleted   = "schema_deleted"
+
+       // resultLabel* are the values for the `result` label on the three
+       // schema_await_*_duration_seconds histograms. Recorded once per call
+       // at return time.
+       resultLabelApplied         = "applied"
+       resultLabelTimeout         = "timeout"
+       resultLabelInvalidArgument = "invalid_argument"
+       resultLabelError           = "error"
+
+       // roleLabelSelf is used when a laggard's Node field is unprefixed —
+       // the standalone barrier path emits a single self-laggard with no name
+       // on timeout, where there is no role to label against.
+       roleLabelSelf = "self"
+
+       // statusReasonWaitTimeout is the only `reason` value emitted for
+       // schema_status_schema_not_applied_total in v0.11.0. The label is
+       // retained for forward-compat with optional fast-sync paths.
+       statusReasonWaitTimeout = "wait_timeout"
+
+       // rpcLabel* are the values for the `rpc` label on the two status
+       // counters (schema_status_schema_not_applied_total /
+       // schema_status_expired_schema_total). Bound at call sites in
+       // measure.go / stream.go / trace.go for the write and query gates.
+       rpcLabelMeasureWrite = "measure_write"
+       rpcLabelStreamWrite  = "stream_write"
+       rpcLabelTraceWrite   = "trace_write"
+       rpcLabelMeasureQuery = "measure_query"
+       rpcLabelStreamQuery  = "stream_query"
+       rpcLabelTraceQuery   = "trace_query"
+)
+
+// splitRoleNode extracts the `<role>` and `<name>` halves of a laggard's
+// Node identifier per the cluster barrier's `<role>-<Metadata.Name>`
+// convention (see member.laggardName in barrier_cluster.go). Unprefixed
+// values map to role="self" with the value itself as name; the standalone
+// barrier path emits an empty Node on timeout, which yields name="", so the
+// laggard counter still increments without dropping the observation.
+func splitRoleNode(node string) (role, name string) {
+       if i := strings.IndexByte(node, '-'); i >= 0 {
+               return node[:i], node[i+1:]
+       }
+       return roleLabelSelf, node
+}
+
+// barrierResultLabel maps a barrier RPC's (response.Applied, error) outcome
+// to the `result` label value used on the duration histogram. An
+// InvalidArgument status reflects the 10 000-key cap rejection; any other
+// error is reported as "error" so dashboards can flag transport-layer
+// failures separately from cache-applied vs cache-timeout.
+func barrierResultLabel(applied bool, err error) string {
+       if err != nil {
+               if s, ok := status.FromError(err); ok && s.Code() == 
codes.InvalidArgument {
+                       return resultLabelInvalidArgument
+               }
+               return resultLabelError
+       }
+       if applied {
+               return resultLabelApplied
+       }
+       return resultLabelTimeout
+}
+
+// recordBarrierLaggards bumps the schema_barrier_laggard_nodes_total counter
+// once per laggard in the response. The role/name split lets dashboards
+// answer "which node fell behind on which barrier" without unbounded
+// cardinality on a single label.
+//
+// Metric is silently skipped when counter is nil (test fixtures that
+// construct barrierService without metrics) or when laggards is empty.
+func recordBarrierLaggards(counter meter.Counter, barrier string, laggards 
[]*schemav1.NodeLaggard) {
+       if counter == nil {
+               return
+       }
+       for _, lag := range laggards {
+               role, name := splitRoleNode(lag.GetNode())
+               counter.Inc(1, barrier, role, name)
+       }
+}
+
+// recordAwaitRevisionAppliedResult observes the duration histogram, bumps
+// the laggard counter, and emits a structured access-log line for one
+// AwaitRevisionApplied call. Defensive about nil metrics / nil logger so
+// fixtures that construct barrierService directly (without server.PreRun)
+// stay zero-cost.
+func (b *barrierService) recordAwaitRevisionAppliedResult(
+       start time.Time,
+       req *schemav1.AwaitRevisionAppliedRequest,
+       resp *schemav1.AwaitRevisionAppliedResponse,
+       err error,
+) {
+       duration := time.Since(start)
+       result := barrierResultLabel(resp.GetApplied(), err)
+       if b.metrics != nil {
+               if h := b.metrics.schemaAwaitRevisionAppliedDuration; h != nil {
+                       h.Observe(duration.Seconds(), result)
+               }
+               recordBarrierLaggards(b.metrics.schemaBarrierLaggards, 
barrierLabelRevisionApplied, resp.GetLaggards())
+       }
+       if b.l != nil {
+               b.l.Info().
+                       Str("barrier", barrierLabelRevisionApplied).
+                       Str("result", result).
+                       Int64("min_revision", req.GetMinRevision()).
+                       Int("laggards", len(resp.GetLaggards())).
+                       Dur("duration", duration).
+                       Msg("schema barrier completed")
+       }
+}
+
+// recordAwaitSchemaAppliedResult is the AwaitSchemaApplied counterpart of
+// recordAwaitRevisionAppliedResult. The access-log carries the request's
+// key count instead of min_revision because per-key revisions are too
+// chatty to log at INFO level.
+func (b *barrierService) recordAwaitSchemaAppliedResult(
+       start time.Time,
+       req *schemav1.AwaitSchemaAppliedRequest,
+       resp *schemav1.AwaitSchemaAppliedResponse,
+       err error,
+) {
+       duration := time.Since(start)
+       result := barrierResultLabel(resp.GetApplied(), err)
+       if b.metrics != nil {
+               if h := b.metrics.schemaAwaitSchemaAppliedDuration; h != nil {
+                       h.Observe(duration.Seconds(), result)
+               }
+               recordBarrierLaggards(b.metrics.schemaBarrierLaggards, 
barrierLabelSchemaApplied, resp.GetLaggards())
+       }
+       if b.l != nil {
+               b.l.Info().
+                       Str("barrier", barrierLabelSchemaApplied).
+                       Str("result", result).
+                       Int("keys", len(req.GetKeys())).
+                       Int("laggards", len(resp.GetLaggards())).
+                       Dur("duration", duration).
+                       Msg("schema barrier completed")
+       }
+}
+
+// emitStatusExpired bumps schema_status_expired_schema_total once for an
+// (rpc, group) pair. Skipped when m or the counter is nil so legacy test
+// fixtures stay zero-cost.
+func (m *metrics) emitStatusExpired(rpc, group string) {
+       if m == nil || m.schemaStatusExpired == nil {
+               return
+       }
+       m.schemaStatusExpired.Inc(1, rpc, group)
+}
+
+// emitStatusNotAppliedTimeout bumps schema_status_schema_not_applied_total
+// with reason="wait_timeout" — the only reason emitted in v0.11.0. The
+// reason label is retained on the metric so dashboards built against
+// v0.11.0 do not break if fast-sync paths land in a later release.
+func (m *metrics) emitStatusNotAppliedTimeout(rpc, group string) {
+       if m == nil || m.schemaStatusNotApplied == nil {
+               return
+       }
+       m.schemaStatusNotApplied.Inc(1, rpc, group, statusReasonWaitTimeout)
+}
+
+// recordQueryGateStatuses iterates the per-group gate verdicts produced by
+// checkQueryGate and bumps the matching status counters. STATUS_SUCCEED is
+// a no-op; STATUS_EXPIRED_SCHEMA bumps schema_status_expired_schema_total;
+// STATUS_SCHEMA_NOT_APPLIED bumps schema_status_schema_not_applied_total
+// with reason="wait_timeout". Other statuses (STATUS_NOT_FOUND) carry no
+// counter — they are surfaced through the gate's normal response shape.
+func recordQueryGateStatuses(m *metrics, rpc string, statuses 
map[string]modelv1.Status) {
+       if m == nil {
+               return
+       }
+       for group, st := range statuses {
+               switch st {
+               case modelv1.Status_STATUS_EXPIRED_SCHEMA:
+                       m.emitStatusExpired(rpc, group)
+               case modelv1.Status_STATUS_SCHEMA_NOT_APPLIED:
+                       m.emitStatusNotAppliedTimeout(rpc, group)
+               default:
+                       // STATUS_SUCCEED (no-op) and STATUS_NOT_FOUND / other 
non-gate
+                       // statuses are surfaced through the response shape and 
carry
+                       // no counter in v0.11.0.
+               }
+       }
+}
+
+// recordAwaitSchemaDeletedResult mirrors recordAwaitSchemaAppliedResult for
+// the deletion barrier; the only differences are the metric routing and
+// the access-log barrier label.
+func (b *barrierService) recordAwaitSchemaDeletedResult(
+       start time.Time,
+       req *schemav1.AwaitSchemaDeletedRequest,
+       resp *schemav1.AwaitSchemaDeletedResponse,
+       err error,
+) {
+       duration := time.Since(start)
+       result := barrierResultLabel(resp.GetApplied(), err)
+       if b.metrics != nil {
+               if h := b.metrics.schemaAwaitSchemaDeletedDuration; h != nil {
+                       h.Observe(duration.Seconds(), result)
+               }
+               recordBarrierLaggards(b.metrics.schemaBarrierLaggards, 
barrierLabelSchemaDeleted, resp.GetLaggards())
+       }
+       if b.l != nil {
+               b.l.Info().
+                       Str("barrier", barrierLabelSchemaDeleted).
+                       Str("result", result).
+                       Int("keys", len(req.GetKeys())).
+                       Int("laggards", len(resp.GetLaggards())).
+                       Dur("duration", duration).
+                       Msg("schema barrier completed")
+       }
+}
diff --git a/banyand/liaison/grpc/barrier_metrics_test.go 
b/banyand/liaison/grpc/barrier_metrics_test.go
new file mode 100644
index 000000000..e0433d416
--- /dev/null
+++ b/banyand/liaison/grpc/barrier_metrics_test.go
@@ -0,0 +1,405 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+       "context"
+       "sync"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
+       "google.golang.org/protobuf/types/known/durationpb"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+       "github.com/apache/skywalking-banyandb/banyand/observability"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/meter"
+)
+
+// recordingFactory is a minimal observability.Factory that captures every
+// Counter.Inc / Histogram.Observe / Gauge.Set call for assertion. Tests use
+// it to pin the schema-barrier metric labels emitted by the production
+// instrumentation without depending on Prometheus or the bypass registry's
+// no-op semantics.
+type recordingFactory struct {
+       counters   map[string]*recordingCounter
+       histograms map[string]*recordingHistogram
+       gauges     map[string]*recordingGauge
+       mu         sync.Mutex
+}
+
+func newRecordingFactory() *recordingFactory {
+       return &recordingFactory{
+               counters:   map[string]*recordingCounter{},
+               histograms: map[string]*recordingHistogram{},
+               gauges:     map[string]*recordingGauge{},
+       }
+}
+
+func (f *recordingFactory) NewCounter(name string, _ ...string) meter.Counter {
+       f.mu.Lock()
+       defer f.mu.Unlock()
+       c := &recordingCounter{}
+       f.counters[name] = c
+       return c
+}
+
+func (f *recordingFactory) NewHistogram(name string, _ meter.Buckets, _ 
...string) meter.Histogram {
+       f.mu.Lock()
+       defer f.mu.Unlock()
+       h := &recordingHistogram{}
+       f.histograms[name] = h
+       return h
+}
+
+func (f *recordingFactory) NewGauge(name string, _ ...string) meter.Gauge {
+       f.mu.Lock()
+       defer f.mu.Unlock()
+       g := &recordingGauge{}
+       f.gauges[name] = g
+       return g
+}
+
+func (f *recordingFactory) Close() {}
+
+func (f *recordingFactory) counter(name string) *recordingCounter {
+       f.mu.Lock()
+       defer f.mu.Unlock()
+       return f.counters[name]
+}
+
+func (f *recordingFactory) histogram(name string) *recordingHistogram {
+       f.mu.Lock()
+       defer f.mu.Unlock()
+       return f.histograms[name]
+}
+
+type recordedCall struct {
+       labels []string
+       value  float64
+}
+
+type recordingCounter struct {
+       calls []recordedCall
+       mu    sync.Mutex
+}
+
+func (r *recordingCounter) Inc(delta float64, labelValues ...string) {
+       r.mu.Lock()
+       defer r.mu.Unlock()
+       r.calls = append(r.calls, recordedCall{value: delta, labels: 
append([]string{}, labelValues...)})
+}
+
+func (r *recordingCounter) Delete(_ ...string) bool { return true }
+
+func (r *recordingCounter) snapshot() []recordedCall {
+       r.mu.Lock()
+       defer r.mu.Unlock()
+       out := make([]recordedCall, len(r.calls))
+       copy(out, r.calls)
+       return out
+}
+
+type recordingHistogram struct {
+       calls []recordedCall
+       mu    sync.Mutex
+}
+
+func (r *recordingHistogram) Observe(value float64, labelValues ...string) {
+       r.mu.Lock()
+       defer r.mu.Unlock()
+       r.calls = append(r.calls, recordedCall{value: value, labels: 
append([]string{}, labelValues...)})
+}
+
+func (r *recordingHistogram) Delete(_ ...string) bool { return true }
+
+func (r *recordingHistogram) snapshot() []recordedCall {
+       r.mu.Lock()
+       defer r.mu.Unlock()
+       out := make([]recordedCall, len(r.calls))
+       copy(out, r.calls)
+       return out
+}
+
+type recordingGauge struct{}
+
+func (r *recordingGauge) Set(_ float64, _ ...string) {}
+func (r *recordingGauge) Add(_ float64, _ ...string) {}
+func (r *recordingGauge) Delete(_ ...string) bool    { return true }
+
+var _ observability.Factory = (*recordingFactory)(nil)
+
+func newRecordingMetrics(t *testing.T) (*recordingFactory, *metrics) {
+       t.Helper()
+       f := newRecordingFactory()
+       return f, newMetrics(f)
+}
+
+// fakeBarrierCache satisfies barrierCacheReader with controllable revision
+// + per-key state for the standalone path tests below.
+type fakeBarrierCache struct {
+       keyRevs map[string]int64
+       maxRev  int64
+}
+
+func (f fakeBarrierCache) GetMaxModRevision() int64 {
+       return f.maxRev
+}
+
+func (f fakeBarrierCache) GetKeyModRevision(propID string) (int64, bool) {
+       if f.keyRevs == nil {
+               return 0, false
+       }
+       rev, ok := f.keyRevs[propID]
+       return rev, ok
+}
+
+// --- Helper-level tests --------------------------------------------------.
+
+func TestBarrierResultLabel_AppliedTrue_ReturnsApplied(t *testing.T) {
+       assert.Equal(t, resultLabelApplied, barrierResultLabel(true, nil))
+}
+
+func TestBarrierResultLabel_AppliedFalseNoErr_ReturnsTimeout(t *testing.T) {
+       assert.Equal(t, resultLabelTimeout, barrierResultLabel(false, nil))
+}
+
+func TestBarrierResultLabel_InvalidArgument_ReturnsInvalidArgument(t 
*testing.T) {
+       err := status.Errorf(codes.InvalidArgument, "too many keys")
+       assert.Equal(t, resultLabelInvalidArgument, barrierResultLabel(false, 
err))
+}
+
+func TestBarrierResultLabel_OtherError_ReturnsError(t *testing.T) {
+       err := status.Errorf(codes.Internal, "boom")
+       assert.Equal(t, resultLabelError, barrierResultLabel(false, err))
+}
+
+func TestSplitRoleNode_LiaisonPrefix(t *testing.T) {
+       role, name := splitRoleNode("liaison-foo")
+       assert.Equal(t, "liaison", role)
+       assert.Equal(t, "foo", name)
+}
+
+func TestSplitRoleNode_DataPrefix(t *testing.T) {
+       role, name := splitRoleNode("data-bar")
+       assert.Equal(t, "data", role)
+       assert.Equal(t, "bar", name)
+}
+
+func TestSplitRoleNode_NoPrefix_FallsBackToSelf(t *testing.T) {
+       role, name := splitRoleNode("nodash")
+       assert.Equal(t, roleLabelSelf, role)
+       assert.Equal(t, "nodash", name)
+}
+
+func TestSplitRoleNode_Empty_FallsBackToSelf(t *testing.T) {
+       role, name := splitRoleNode("")
+       assert.Equal(t, roleLabelSelf, role)
+       assert.Equal(t, "", name)
+}
+
+func TestSplitRoleNode_DataWithEmbeddedDashes_PreservesTail(t *testing.T) {
+       role, name := splitRoleNode("data-host-with-dashes")
+       assert.Equal(t, "data", role)
+       assert.Equal(t, "host-with-dashes", name)
+}
+
+func TestRecordBarrierLaggards_NilCounter_NoPanic(_ *testing.T) {
+       // Defensive: a nil counter must not panic even when laggards are present.
+       recordBarrierLaggards(nil, barrierLabelRevisionApplied, 
[]*schemav1.NodeLaggard{{Node: "data-bar"}})
+}
+
+func TestRecordBarrierLaggards_BumpsPerLaggardWithRoleSplit(t *testing.T) {
+       f, m := newRecordingMetrics(t)
+       recordBarrierLaggards(m.schemaBarrierLaggards, 
barrierLabelSchemaApplied, []*schemav1.NodeLaggard{
+               {Node: "liaison-foo"},
+               {Node: "data-bar"},
+               {Node: "noprefix"},
+       })
+       calls := f.counter("schema_barrier_laggard_nodes_total").snapshot()
+       require.Len(t, calls, 3)
+       assert.Equal(t, []string{barrierLabelSchemaApplied, "liaison", "foo"}, 
calls[0].labels)
+       assert.Equal(t, []string{barrierLabelSchemaApplied, "data", "bar"}, 
calls[1].labels)
+       assert.Equal(t, []string{barrierLabelSchemaApplied, roleLabelSelf, 
"noprefix"}, calls[2].labels)
+}
+
+// --- Barrier integration tests ------------------------------------------.
+
+func newRecordingBarrier(t *testing.T, cache barrierCacheReader) 
(*barrierService, *recordingFactory) {
+       t.Helper()
+       f, m := newRecordingMetrics(t)
+       b := newBarrierService(func() barrierCacheReader { return cache })
+       b.metrics = m
+       b.l = logger.GetLogger("test-barrier")
+       return b, f
+}
+
+func TestAwaitRevisionApplied_AppliedRecordsAppliedHistogram(t *testing.T) {
+       b, f := newRecordingBarrier(t, fakeBarrierCache{maxRev: 100})
+
+       ctx, cancel := context.WithTimeout(context.Background(), 
200*time.Millisecond)
+       defer cancel()
+       resp, err := b.AwaitRevisionApplied(ctx, 
&schemav1.AwaitRevisionAppliedRequest{
+               MinRevision: 100,
+               Timeout:     durationpb.New(50 * time.Millisecond),
+       })
+       require.NoError(t, err)
+       assert.True(t, resp.GetApplied())
+
+       hist := f.histogram("schema_await_revision_applied_duration_seconds")
+       require.NotNil(t, hist)
+       calls := hist.snapshot()
+       require.Len(t, calls, 1)
+       assert.Equal(t, []string{resultLabelApplied}, calls[0].labels)
+
+       lag := f.counter("schema_barrier_laggard_nodes_total").snapshot()
+       assert.Empty(t, lag, "applied=true response carries no laggards")
+}
+
+func TestAwaitRevisionApplied_TimeoutRecordsTimeoutAndLaggard(t *testing.T) {
+       b, f := newRecordingBarrier(t, fakeBarrierCache{maxRev: 50})
+
+       ctx, cancel := context.WithTimeout(context.Background(), 
200*time.Millisecond)
+       defer cancel()
+       resp, err := b.AwaitRevisionApplied(ctx, 
&schemav1.AwaitRevisionAppliedRequest{
+               MinRevision: 100,
+               Timeout:     durationpb.New(30 * time.Millisecond),
+       })
+       require.NoError(t, err)
+       assert.False(t, resp.GetApplied())
+
+       calls := 
f.histogram("schema_await_revision_applied_duration_seconds").snapshot()
+       require.Len(t, calls, 1)
+       assert.Equal(t, []string{resultLabelTimeout}, calls[0].labels)
+
+       lag := f.counter("schema_barrier_laggard_nodes_total").snapshot()
+       require.Len(t, lag, 1, "standalone timeout emits a single self laggard")
+       assert.Equal(t, []string{barrierLabelRevisionApplied, roleLabelSelf, 
""}, lag[0].labels)
+}
+
+func TestAwaitSchemaApplied_TooManyKeys_RecordsInvalidArgument(t *testing.T) {
+       b, f := newRecordingBarrier(t, fakeBarrierCache{maxRev: 0})
+
+       keys := make([]*schemav1.SchemaKey, barrierMaxKeys+1)
+       for i := range keys {
+               keys[i] = &schemav1.SchemaKey{Kind: "measure", Group: "g", 
Name: "m"}
+       }
+       resp, err := b.AwaitSchemaApplied(context.Background(), 
&schemav1.AwaitSchemaAppliedRequest{Keys: keys})
+       require.Error(t, err, "expected InvalidArgument when keys exceed cap")
+       require.Nil(t, resp)
+
+       calls := 
f.histogram("schema_await_schema_applied_duration_seconds").snapshot()
+       require.Len(t, calls, 1)
+       assert.Equal(t, []string{resultLabelInvalidArgument}, calls[0].labels)
+}
+
+func TestAwaitSchemaDeleted_AllAbsent_RecordsAppliedHistogram(t *testing.T) {
+       b, f := newRecordingBarrier(t, fakeBarrierCache{maxRev: 0})
+
+       resp, err := b.AwaitSchemaDeleted(context.Background(), 
&schemav1.AwaitSchemaDeletedRequest{
+               Keys:    []*schemav1.SchemaKey{{Kind: "measure", Group: "g", 
Name: "m"}},
+               Timeout: durationpb.New(20 * time.Millisecond),
+       })
+       require.NoError(t, err)
+       assert.True(t, resp.GetApplied())
+
+       calls := 
f.histogram("schema_await_schema_deleted_duration_seconds").snapshot()
+       require.Len(t, calls, 1)
+       assert.Equal(t, []string{resultLabelApplied}, calls[0].labels)
+}
+
+// --- Status counter integration -----------------------------------------.
+
+func newRecordingMeasureService(t *testing.T, er *entityRepo, maxWait 
time.Duration) (*measureService, *recordingFactory) {
+       t.Helper()
+       f, m := newRecordingMetrics(t)
+       return &measureService{
+               discoveryService: &discoveryService{entityRepo: er},
+               maxWaitDuration:  maxWait,
+               l:                logger.GetLogger("test"),
+               metrics:          m,
+       }, f
+}
+
+func TestWriteGate_Measure_StaleClient_BumpsExpiredCounter(t *testing.T) {
+       id := identity{group: "g", name: "m"}
+       er := seededLocatorRepo(id)
+       svc, f := newRecordingMeasureService(t, er, 30*time.Millisecond)
+       mock := &mockBidiServer[measurev1.WriteRequest, 
measurev1.WriteResponse]{}
+
+       meta := &commonv1.Metadata{Group: "g", Name: "m", ModRevision: 50}
+       st := svc.validateWriteRequest(validMeasureWriteRequest(), meta, mock)
+       require.Equal(t, modelv1.Status_STATUS_EXPIRED_SCHEMA, st)
+
+       calls := f.counter("schema_status_expired_schema_total").snapshot()
+       require.Len(t, calls, 1)
+       assert.Equal(t, []string{rpcLabelMeasureWrite, "g"}, calls[0].labels)
+
+       notApplied := 
f.counter("schema_status_schema_not_applied_total").snapshot()
+       assert.Empty(t, notApplied, "stale-client path must not bump the 
not-applied counter")
+}
+
+func TestWriteGate_Measure_AheadClient_TimesOut_BumpsNotAppliedCounter(t 
*testing.T) {
+       id := identity{group: "g", name: "m"}
+       er := seededLocatorRepo(id)
+       svc, f := newRecordingMeasureService(t, er, 30*time.Millisecond)
+       mock := &mockBidiServer[measurev1.WriteRequest, 
measurev1.WriteResponse]{}
+
+       meta := &commonv1.Metadata{Group: "g", Name: "m", ModRevision: 9999}
+       st := svc.validateWriteRequest(validMeasureWriteRequest(), meta, mock)
+       require.Equal(t, modelv1.Status_STATUS_SCHEMA_NOT_APPLIED, st)
+
+       calls := f.counter("schema_status_schema_not_applied_total").snapshot()
+       require.Len(t, calls, 1)
+       assert.Equal(t, []string{rpcLabelMeasureWrite, "g", 
statusReasonWaitTimeout}, calls[0].labels)
+
+       expired := f.counter("schema_status_expired_schema_total").snapshot()
+       assert.Empty(t, expired, "ahead-client path must not bump the expired 
counter")
+}
+
+func TestRecordQueryGateStatuses_BumpsPerGroupCounters(t *testing.T) {
+       f, m := newRecordingMetrics(t)
+       statuses := map[string]modelv1.Status{
+               "g_succ":    modelv1.Status_STATUS_SUCCEED,
+               "g_expired": modelv1.Status_STATUS_EXPIRED_SCHEMA,
+               "g_lag":     modelv1.Status_STATUS_SCHEMA_NOT_APPLIED,
+               "g_other":   modelv1.Status_STATUS_NOT_FOUND,
+       }
+       recordQueryGateStatuses(m, rpcLabelMeasureQuery, statuses)
+
+       expired := f.counter("schema_status_expired_schema_total").snapshot()
+       require.Len(t, expired, 1)
+       assert.Equal(t, []string{rpcLabelMeasureQuery, "g_expired"}, 
expired[0].labels)
+
+       notApplied := 
f.counter("schema_status_schema_not_applied_total").snapshot()
+       require.Len(t, notApplied, 1)
+       assert.Equal(t, []string{rpcLabelMeasureQuery, "g_lag", 
statusReasonWaitTimeout}, notApplied[0].labels)
+}
+
+func TestRecordQueryGateStatuses_NilMetrics_NoPanic(_ *testing.T) {
+       recordQueryGateStatuses(nil, rpcLabelMeasureQuery, 
map[string]modelv1.Status{
+               "g": modelv1.Status_STATUS_EXPIRED_SCHEMA,
+       })
+}
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index fd3aaaba9..b5e6d87ba 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -209,6 +209,7 @@ func (ms *measureService) validateWriteRequest(writeRequest 
*measurev1.WriteRequ
                cacheRev := resolveSchemaRevision(reg, schema.KindMeasure, 
metadata.GetGroup(), metadata.GetName(), measureCache.ModRevision)
                if clientRev < cacheRev {
                        ms.l.Error().Stringer("written", writeRequest).Msg("the 
measure schema is expired")
+                       ms.metrics.emitStatusExpired(rpcLabelMeasureWrite, 
metadata.GetGroup())
                        ms.sendReply(metadata, 
modelv1.Status_STATUS_EXPIRED_SCHEMA, writeRequest.GetMessageId(), measure)
                        return modelv1.Status_STATUS_EXPIRED_SCHEMA
                }
@@ -221,6 +222,7 @@ func (ms *measureService) validateWriteRequest(writeRequest 
*measurev1.WriteRequ
                                return resolveSchemaRevision(reg, 
schema.KindMeasure, metadata.GetGroup(), metadata.GetName(), fallback)
                        }, clientRev, ms.maxWaitDuration)
                        if !reached {
+                               
ms.metrics.emitStatusNotAppliedTimeout(rpcLabelMeasureWrite, 
metadata.GetGroup())
                                ms.sendReply(metadata, 
modelv1.Status_STATUS_SCHEMA_NOT_APPLIED, writeRequest.GetMessageId(), measure)
                                return modelv1.Status_STATUS_SCHEMA_NOT_APPLIED
                        }
@@ -450,6 +452,7 @@ func (ms *measureService) Query(ctx context.Context, req 
*measurev1.QueryRequest
                        loc, ok := ms.entityRepo.getLocator(identity{name: 
name, group: group})
                        return resolveQueryGateRevision(measureReg, 
schema.KindMeasure, group, name, loc.ModRevision, ok)
                }, ms.maxWaitDuration)
+       recordQueryGateStatuses(ms.metrics, rpcLabelMeasureQuery, gatedStatuses)
        if shortCircuit {
                return &measurev1.QueryResponse{GroupStatuses: gatedStatuses}, 
nil
        }
diff --git a/banyand/liaison/grpc/metrics.go b/banyand/liaison/grpc/metrics.go
index ed7d546c2..e97e35ce7 100644
--- a/banyand/liaison/grpc/metrics.go
+++ b/banyand/liaison/grpc/metrics.go
@@ -47,30 +47,52 @@ type metrics struct {
        memoryLoadSheddingRejections meter.Counter
        grpcBufferSize               meter.Gauge // Shared gauge for both conn 
and stream buffer sizes
        memoryState                  meter.Gauge
+
+       // Step 2.7 — schema-barrier observability. The three Await* RPCs each
+       // emit one histogram observation per call labeled by `result`
+       // ("applied", "timeout", "invalid_argument", "error"). Laggards observed during
+       // any barrier are counted by `barrier`, `role`, `node` so dashboards
+       // can break out which node fell behind on which call. The two status
+       // counters cover the gate's STATUS_SCHEMA_NOT_APPLIED / EXPIRED_SCHEMA
+       // outcomes by `rpc` and `group`; in v0.11.0 the only `reason` value
+       // emitted is "wait_timeout" — the label is retained for forward-compat
+       // with optional fast-sync paths that may land in a later release.
+       schemaAwaitRevisionAppliedDuration meter.Histogram
+       schemaAwaitSchemaAppliedDuration   meter.Histogram
+       schemaAwaitSchemaDeletedDuration   meter.Histogram
+       schemaBarrierLaggards              meter.Counter
+       schemaStatusNotApplied             meter.Counter
+       schemaStatusExpired                meter.Counter
 }
 
 func newMetrics(factory observability.Factory) *metrics {
        return &metrics{
-               totalStarted:                 
factory.NewCounter("total_started", "group", "service", "method"),
-               totalFinished:                
factory.NewCounter("total_finished", "group", "service", "method"),
-               totalErr:                     factory.NewCounter("total_err", 
"group", "service", "method"),
-               totalPanic:                   factory.NewCounter("total_panic"),
-               totalLatency:                 
factory.NewCounter("total_latency", "group", "service", "method"),
-               totalStreamStarted:           
factory.NewCounter("total_stream_started", "service", "method"),
-               totalStreamFinished:          
factory.NewCounter("total_stream_finished", "service", "method"),
-               totalStreamErr:               
factory.NewCounter("total_stream_err", "service", "method"),
-               totalStreamLatency:           
factory.NewCounter("total_stream_latency", "service", "method"),
-               totalStreamMsgReceived:       
factory.NewCounter("total_stream_msg_received", "group", "service", "method"),
-               totalStreamMsgReceivedErr:    
factory.NewCounter("total_stream_msg_received_err", "group", "service", 
"method"),
-               totalStreamMsgSent:           
factory.NewCounter("total_stream_msg_sent", "group", "service", "method"),
-               totalStreamMsgSentErr:        
factory.NewCounter("total_stream_msg_sent_err", "group", "service", "method"),
-               totalRegistryStarted:         
factory.NewCounter("total_registry_started", "group", "service", "method"),
-               totalRegistryFinished:        
factory.NewCounter("total_registry_finished", "group", "service", "method"),
-               totalRegistryErr:             
factory.NewCounter("total_registry_err", "group", "service", "method"),
-               totalRegistryLatency:         
factory.NewCounter("total_registry_latency", "group", "service", "method"),
-               memoryLoadSheddingRejections: 
factory.NewCounter("memory_load_shedding_rejections_total", "service"),
-               grpcBufferSize:               
factory.NewGauge("grpc_buffer_size_bytes", "type"),
-               memoryState:                  factory.NewGauge("memory_state"),
+               totalStarted:                       
factory.NewCounter("total_started", "group", "service", "method"),
+               totalFinished:                      
factory.NewCounter("total_finished", "group", "service", "method"),
+               totalErr:                           
factory.NewCounter("total_err", "group", "service", "method"),
+               totalPanic:                         
factory.NewCounter("total_panic"),
+               totalLatency:                       
factory.NewCounter("total_latency", "group", "service", "method"),
+               totalStreamStarted:                 
factory.NewCounter("total_stream_started", "service", "method"),
+               totalStreamFinished:                
factory.NewCounter("total_stream_finished", "service", "method"),
+               totalStreamErr:                     
factory.NewCounter("total_stream_err", "service", "method"),
+               totalStreamLatency:                 
factory.NewCounter("total_stream_latency", "service", "method"),
+               totalStreamMsgReceived:             
factory.NewCounter("total_stream_msg_received", "group", "service", "method"),
+               totalStreamMsgReceivedErr:          
factory.NewCounter("total_stream_msg_received_err", "group", "service", 
"method"),
+               totalStreamMsgSent:                 
factory.NewCounter("total_stream_msg_sent", "group", "service", "method"),
+               totalStreamMsgSentErr:              
factory.NewCounter("total_stream_msg_sent_err", "group", "service", "method"),
+               totalRegistryStarted:               
factory.NewCounter("total_registry_started", "group", "service", "method"),
+               totalRegistryFinished:              
factory.NewCounter("total_registry_finished", "group", "service", "method"),
+               totalRegistryErr:                   
factory.NewCounter("total_registry_err", "group", "service", "method"),
+               totalRegistryLatency:               
factory.NewCounter("total_registry_latency", "group", "service", "method"),
+               memoryLoadSheddingRejections:       
factory.NewCounter("memory_load_shedding_rejections_total", "service"),
+               grpcBufferSize:                     
factory.NewGauge("grpc_buffer_size_bytes", "type"),
+               memoryState:                        
factory.NewGauge("memory_state"),
+               schemaAwaitRevisionAppliedDuration: 
factory.NewHistogram("schema_await_revision_applied_duration_seconds", 
meter.DefBuckets, "result"),
+               schemaAwaitSchemaAppliedDuration:   
factory.NewHistogram("schema_await_schema_applied_duration_seconds", 
meter.DefBuckets, "result"),
+               schemaAwaitSchemaDeletedDuration:   
factory.NewHistogram("schema_await_schema_deleted_duration_seconds", 
meter.DefBuckets, "result"),
+               schemaBarrierLaggards:              
factory.NewCounter("schema_barrier_laggard_nodes_total", "barrier", "role", 
"node"),
+               schemaStatusNotApplied:             
factory.NewCounter("schema_status_schema_not_applied_total", "rpc", "group", 
"reason"),
+               schemaStatusExpired:                
factory.NewCounter("schema_status_expired_schema_total", "rpc", "group"),
        }
 }
 
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index e787368d7..0e3cdd722 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -364,6 +364,10 @@ func (s *server) PreRun(ctx context.Context) error {
        s.traceSVC.metrics = metrics
        s.bydbQLSVC.metrics = metrics
        s.propertyServer.metrics = metrics
+       if s.barrierSVC != nil {
+               s.barrierSVC.metrics = metrics
+               s.barrierSVC.l = s.log.Named("barrier")
+       }
        s.streamRegistryServer.metrics = metrics
        s.indexRuleBindingRegistryServer.metrics = metrics
        s.indexRuleRegistryServer.metrics = metrics
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index 34beea76b..01aac40e4 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -114,6 +114,7 @@ func (s *streamService) validateWriteRequest(writeEntity 
*streamv1.WriteRequest,
                cacheRev := resolveSchemaRevision(reg, schema.KindStream, 
metadata.GetGroup(), metadata.GetName(), streamCache.ModRevision)
                if clientRev < cacheRev {
                        s.l.Error().Stringer("written", writeEntity).Msg("the 
stream schema is expired")
+                       s.metrics.emitStatusExpired(rpcLabelStreamWrite, 
metadata.GetGroup())
                        s.sendReply(metadata, 
modelv1.Status_STATUS_EXPIRED_SCHEMA, writeEntity.GetMessageId(), stream)
                        return modelv1.Status_STATUS_EXPIRED_SCHEMA
                }
@@ -126,6 +127,7 @@ func (s *streamService) validateWriteRequest(writeEntity 
*streamv1.WriteRequest,
                                return resolveSchemaRevision(reg, 
schema.KindStream, metadata.GetGroup(), metadata.GetName(), fallback)
                        }, clientRev, s.maxWaitDuration)
                        if !reached {
+                               
s.metrics.emitStatusNotAppliedTimeout(rpcLabelStreamWrite, metadata.GetGroup())
                                s.sendReply(metadata, 
modelv1.Status_STATUS_SCHEMA_NOT_APPLIED, writeEntity.GetMessageId(), stream)
                                return modelv1.Status_STATUS_SCHEMA_NOT_APPLIED
                        }
@@ -415,6 +417,7 @@ func (s *streamService) Query(ctx context.Context, req 
*streamv1.QueryRequest) (
                        loc, ok := s.entityRepo.getLocator(identity{name: name, 
group: group})
                        return resolveQueryGateRevision(streamReg, 
schema.KindStream, group, name, loc.ModRevision, ok)
                }, s.maxWaitDuration)
+       recordQueryGateStatuses(s.metrics, rpcLabelStreamQuery, gatedStatuses)
        if shortCircuit {
                return &streamv1.QueryResponse{GroupStatuses: gatedStatuses}, 
nil
        }
diff --git a/banyand/liaison/grpc/trace.go b/banyand/liaison/grpc/trace.go
index 75b0b6b9a..4d1fcabd1 100644
--- a/banyand/liaison/grpc/trace.go
+++ b/banyand/liaison/grpc/trace.go
@@ -156,6 +156,7 @@ func (s *traceService) validateWriteRequest(writeEntity 
*tracev1.WriteRequest,
                cacheRev := resolveSchemaRevision(reg, schema.KindTrace, 
metadata.GetGroup(), metadata.GetName(), 
traceEntity.GetMetadata().GetModRevision())
                if clientRev < cacheRev {
                        s.l.Error().Stringer("written", writeEntity).Msg("the 
trace schema is expired")
+                       s.metrics.emitStatusExpired(rpcLabelTraceWrite, 
metadata.GetGroup())
                        s.sendReply(metadata, 
modelv1.Status_STATUS_EXPIRED_SCHEMA, writeEntity.GetVersion(), stream)
                        return modelv1.Status_STATUS_EXPIRED_SCHEMA
                }
@@ -168,6 +169,7 @@ func (s *traceService) validateWriteRequest(writeEntity 
*tracev1.WriteRequest,
                                return resolveSchemaRevision(reg, 
schema.KindTrace, metadata.GetGroup(), metadata.GetName(), fallback)
                        }, clientRev, s.maxWaitDuration)
                        if !reached {
+                               
s.metrics.emitStatusNotAppliedTimeout(rpcLabelTraceWrite, metadata.GetGroup())
                                s.sendReply(metadata, 
modelv1.Status_STATUS_SCHEMA_NOT_APPLIED, writeEntity.GetVersion(), stream)
                                return modelv1.Status_STATUS_SCHEMA_NOT_APPLIED
                        }
@@ -534,6 +536,7 @@ func (s *traceService) Query(ctx context.Context, req 
*tracev1.QueryRequest) (re
                        loc, ok := s.entityRepo.getLocator(identity{name: name, 
group: group})
                        return resolveQueryGateRevision(traceReg, 
schema.KindTrace, group, name, loc.ModRevision, ok)
                }, s.maxWaitDuration)
+       recordQueryGateStatuses(s.metrics, rpcLabelTraceQuery, gatedStatuses)
        if shortCircuit {
                return &tracev1.QueryResponse{GroupStatuses: gatedStatuses}, nil
        }


Reply via email to