This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch phase-2-cp5-march in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit c6c9ae8216e4765f4738f89fd5e2d0ac1db799ae Author: Hongtao Gao <[email protected]> AuthorDate: Wed May 6 23:58:59 2026 +0000 feat(barrier): metrics + access logging for the schema-consistency cluster (Phase 2 Step 2.7, §A17) Wires the A17 observability surface the plan calls out as a CP-6 prerequisite. Three duration histograms — one per Await* RPC — record every call labelled by `result` ("applied" / "timeout" / "invalid_argument" / "error"); a single counter aggregates per-laggard observations across all three barriers, broken out by `barrier`, `role` ("liaison" / "data" / "self" for unprefixed standalone laggards), and `node`. The split decodes the <role>-<Metadata.Name> identifier the cluster barrier already emits (see member.laggardName in barrier_cluster.go). Two status counters cover the gate's user-visible verdict on the same node: schema_status_schema_not_applied_total{rpc,group,reason} schema_status_expired_schema_total{rpc,group} The reason label is fixed at "wait_timeout" in v0.11.0 — the only emitting path is awaitRevisionReached's deadline expiration — but the label is retained on the metric so dashboards built today do not break if optional fast-sync paths land in a later release. The rpc label covers the six gate entry points (measure / stream / trace × write / query). barrierService grows two fields populated in server.go PreRun: a metrics handle and a named logger child. Each AwaitX entry point uses named return values + defer to record duration, recordBarrierLaggards, and emit a structured access-log line carrying min_revision (or keys count for schema_applied/deleted), laggards count, duration, and result. validateWriteRequest and the per-group query gate caller in measure.go / stream.go / trace.go bump the status counters at the same return points where the response status is set. 
19 unit tests in banyand/liaison/grpc/barrier_metrics_test.go pin the contract via a recordingFactory that captures every Counter.Inc and Histogram.Observe call: result-label routing under each outcome, role/name split for laggards, per-RPC counter labels for the gate's three-way split, and nil-safe behavior for fixtures without a metrics handle. Distributed schema integration smoke (./test/integration/distributed/ schema/...) still green; full liaison/grpc unit suite green. via [HAPI](https://hapi.run) Co-Authored-By: HAPI <[email protected]> Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]> --- banyand/liaison/grpc/barrier.go | 26 +- banyand/liaison/grpc/barrier_metrics.go | 250 +++++++++++++++++ banyand/liaison/grpc/barrier_metrics_test.go | 405 +++++++++++++++++++++++++++ banyand/liaison/grpc/measure.go | 3 + banyand/liaison/grpc/metrics.go | 62 ++-- banyand/liaison/grpc/server.go | 4 + banyand/liaison/grpc/stream.go | 3 + banyand/liaison/grpc/trace.go | 3 + 8 files changed, 733 insertions(+), 23 deletions(-) diff --git a/banyand/liaison/grpc/barrier.go b/banyand/liaison/grpc/barrier.go index c4a606736..7ac0cb6dc 100644 --- a/banyand/liaison/grpc/barrier.go +++ b/banyand/liaison/grpc/barrier.go @@ -28,6 +28,7 @@ import ( schemav1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/banyand/queue" + "github.com/apache/skywalking-banyandb/pkg/logger" ) const ( @@ -60,6 +61,13 @@ type barrierService struct { peerLiaisons func() queue.Client dataNodes func() queue.Client selfName func() string + // metrics and l are wired in during PreRun (server.go) so the barrier + // can record schema_await_*_duration_seconds histograms, + // schema_barrier_laggard_nodes_total counters, and structured access-log + // lines on every call. Both are nil for legacy fixtures that exercise + // the standalone path with newBarrierService directly. 
+ metrics *metrics + l *logger.Logger } func newBarrierService(cacheProvider func() barrierCacheReader) *barrierService { @@ -101,7 +109,11 @@ func (b *barrierService) cache() barrierCacheReader { // those dependencies (Phase 1 unit-test path), it falls back to a single // in-process cache poll, returning a self-only laggard on timeout so callers // can diagnose how far behind the standalone cache is. -func (b *barrierService) AwaitRevisionApplied(ctx context.Context, req *schemav1.AwaitRevisionAppliedRequest) (*schemav1.AwaitRevisionAppliedResponse, error) { +func (b *barrierService) AwaitRevisionApplied(ctx context.Context, req *schemav1.AwaitRevisionAppliedRequest) (resp *schemav1.AwaitRevisionAppliedResponse, err error) { + start := time.Now() + defer func() { + b.recordAwaitRevisionAppliedResult(start, req, resp, err) + }() if b.peerLiaisons != nil && b.dataNodes != nil && b.selfName != nil { return b.awaitRevisionAppliedCluster(ctx, req) } @@ -142,7 +154,11 @@ func (b *barrierService) AwaitRevisionApplied(ctx context.Context, req *schemav1 // dependencies are wired (production), the call probes the frozen tier1 + // tier2 + self watched set in parallel via GetKeyRevisions; without them // (Phase 1 unit-test path), it falls back to a single in-process cache poll. 
-func (b *barrierService) AwaitSchemaApplied(ctx context.Context, req *schemav1.AwaitSchemaAppliedRequest) (*schemav1.AwaitSchemaAppliedResponse, error) { +func (b *barrierService) AwaitSchemaApplied(ctx context.Context, req *schemav1.AwaitSchemaAppliedRequest) (resp *schemav1.AwaitSchemaAppliedResponse, err error) { + start := time.Now() + defer func() { + b.recordAwaitSchemaAppliedResult(start, req, resp, err) + }() if b.peerLiaisons != nil && b.dataNodes != nil && b.selfName != nil { return b.awaitSchemaAppliedCluster(ctx, req) } @@ -181,7 +197,11 @@ func (b *barrierService) AwaitSchemaApplied(ctx context.Context, req *schemav1.A // wired (production), the call probes the frozen tier1 + tier2 + self // watched set in parallel via GetAbsentKeys; without them (Phase 1 // unit-test path), it falls back to a single in-process cache poll. -func (b *barrierService) AwaitSchemaDeleted(ctx context.Context, req *schemav1.AwaitSchemaDeletedRequest) (*schemav1.AwaitSchemaDeletedResponse, error) { +func (b *barrierService) AwaitSchemaDeleted(ctx context.Context, req *schemav1.AwaitSchemaDeletedRequest) (resp *schemav1.AwaitSchemaDeletedResponse, err error) { + start := time.Now() + defer func() { + b.recordAwaitSchemaDeletedResult(start, req, resp, err) + }() if b.peerLiaisons != nil && b.dataNodes != nil && b.selfName != nil { return b.awaitSchemaDeletedCluster(ctx, req) } diff --git a/banyand/liaison/grpc/barrier_metrics.go b/banyand/liaison/grpc/barrier_metrics.go new file mode 100644 index 000000000..a29904cfe --- /dev/null +++ b/banyand/liaison/grpc/barrier_metrics.go @@ -0,0 +1,250 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. 
Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package grpc + +import ( + "strings" + "time" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + schemav1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1" + "github.com/apache/skywalking-banyandb/pkg/meter" +) + +const ( + // barrierLabelRevisionApplied / SchemaApplied / SchemaDeleted are the + // three values used for the `barrier` label on + // schema_barrier_laggard_nodes_total. The histogram metrics carry the + // barrier identity in their metric name instead, so this label is only + // emitted on the laggard counter. + barrierLabelRevisionApplied = "revision_applied" + barrierLabelSchemaApplied = "schema_applied" + barrierLabelSchemaDeleted = "schema_deleted" + + // resultLabel* are the values for the `result` label on the three + // schema_await_*_duration_seconds histograms. Recorded once per call + // at return time. + resultLabelApplied = "applied" + resultLabelTimeout = "timeout" + resultLabelInvalidArgument = "invalid_argument" + resultLabelError = "error" + + // roleLabelSelf is used when a laggard's Node field is unprefixed — + // the standalone barrier path emits a single self-laggard with no name + // on timeout, where there is no role to label against. 
+ roleLabelSelf = "self" + + // statusReasonWaitTimeout is the only `reason` value emitted for + // schema_status_schema_not_applied_total in v0.11.0. The label is + // retained for forward-compat with optional fast-sync paths. + statusReasonWaitTimeout = "wait_timeout" + + // rpcLabel* are the values for the `rpc` label on the two status + // counters (schema_status_schema_not_applied_total / + // schema_status_expired_schema_total). Bound at call sites in + // measure.go / stream.go / trace.go for the write and query gates. + rpcLabelMeasureWrite = "measure_write" + rpcLabelStreamWrite = "stream_write" + rpcLabelTraceWrite = "trace_write" + rpcLabelMeasureQuery = "measure_query" + rpcLabelStreamQuery = "stream_query" + rpcLabelTraceQuery = "trace_query" +) + +// splitRoleNode extracts the `<role>` and `<name>` halves of a laggard's +// Node identifier per the cluster barrier's `<role>-<Metadata.Name>` +// convention (see member.laggardName in barrier_cluster.go). Unprefixed +// values — emitted by the standalone barrier path on timeout — map to +// role="self" with name set to the whole (possibly empty) value, so the +// laggard counter still increments without dropping the observation. +func splitRoleNode(node string) (role, name string) { + if i := strings.IndexByte(node, '-'); i >= 0 { + return node[:i], node[i+1:] + } + return roleLabelSelf, node +} + +// barrierResultLabel maps a barrier RPC's (response.Applied, error) outcome +// to the `result` label value used on the duration histogram. An +// InvalidArgument status reflects the 10 000-key cap rejection; any other +// error is reported as "error" so dashboards can flag transport-layer +// failures separately from cache-applied vs cache-timeout. 
+func barrierResultLabel(applied bool, err error) string { + if err != nil { + if s, ok := status.FromError(err); ok && s.Code() == codes.InvalidArgument { + return resultLabelInvalidArgument + } + return resultLabelError + } + if applied { + return resultLabelApplied + } + return resultLabelTimeout +} + +// recordBarrierLaggards bumps the schema_barrier_laggard_nodes_total counter +// once per laggard in the response. The role/name split lets dashboards +// answer "which node fell behind on which barrier" without unbounded +// cardinality on a single label. +// +// Metric is silently skipped when counter is nil (test fixtures that +// construct barrierService without metrics) or when laggards is empty. +func recordBarrierLaggards(counter meter.Counter, barrier string, laggards []*schemav1.NodeLaggard) { + if counter == nil { + return + } + for _, lag := range laggards { + role, name := splitRoleNode(lag.GetNode()) + counter.Inc(1, barrier, role, name) + } +} + +// recordAwaitRevisionAppliedResult observes the duration histogram, bumps +// the laggard counter, and emits a structured access-log line for one +// AwaitRevisionApplied call. Defensive about nil metrics / nil logger so +// fixtures that construct barrierService directly (without server.PreRun) +// stay zero-cost. +func (b *barrierService) recordAwaitRevisionAppliedResult( + start time.Time, + req *schemav1.AwaitRevisionAppliedRequest, + resp *schemav1.AwaitRevisionAppliedResponse, + err error, +) { + duration := time.Since(start) + result := barrierResultLabel(resp.GetApplied(), err) + if b.metrics != nil { + if h := b.metrics.schemaAwaitRevisionAppliedDuration; h != nil { + h.Observe(duration.Seconds(), result) + } + recordBarrierLaggards(b.metrics.schemaBarrierLaggards, barrierLabelRevisionApplied, resp.GetLaggards()) + } + if b.l != nil { + b.l.Info(). + Str("barrier", barrierLabelRevisionApplied). + Str("result", result). + Int64("min_revision", req.GetMinRevision()). 
+ Int("laggards", len(resp.GetLaggards())). + Dur("duration", duration). + Msg("schema barrier completed") + } +} + +// recordAwaitSchemaAppliedResult is the AwaitSchemaApplied counterpart of +// recordAwaitRevisionAppliedResult. The access-log carries the request's +// key count instead of min_revision because per-key revisions are too +// chatty to log at INFO level. +func (b *barrierService) recordAwaitSchemaAppliedResult( + start time.Time, + req *schemav1.AwaitSchemaAppliedRequest, + resp *schemav1.AwaitSchemaAppliedResponse, + err error, +) { + duration := time.Since(start) + result := barrierResultLabel(resp.GetApplied(), err) + if b.metrics != nil { + if h := b.metrics.schemaAwaitSchemaAppliedDuration; h != nil { + h.Observe(duration.Seconds(), result) + } + recordBarrierLaggards(b.metrics.schemaBarrierLaggards, barrierLabelSchemaApplied, resp.GetLaggards()) + } + if b.l != nil { + b.l.Info(). + Str("barrier", barrierLabelSchemaApplied). + Str("result", result). + Int("keys", len(req.GetKeys())). + Int("laggards", len(resp.GetLaggards())). + Dur("duration", duration). + Msg("schema barrier completed") + } +} + +// emitStatusExpired bumps schema_status_expired_schema_total once for an +// (rpc, group) pair. Skipped when m or the counter is nil so legacy test +// fixtures stay zero-cost. +func (m *metrics) emitStatusExpired(rpc, group string) { + if m == nil || m.schemaStatusExpired == nil { + return + } + m.schemaStatusExpired.Inc(1, rpc, group) +} + +// emitStatusNotAppliedTimeout bumps schema_status_schema_not_applied_total +// with reason="wait_timeout" — the only reason emitted in v0.11.0. The +// reason label is retained on the metric so dashboards built against +// v0.11.0 do not break if fast-sync paths land in a later release. 
+func (m *metrics) emitStatusNotAppliedTimeout(rpc, group string) { + if m == nil || m.schemaStatusNotApplied == nil { + return + } + m.schemaStatusNotApplied.Inc(1, rpc, group, statusReasonWaitTimeout) +} + +// recordQueryGateStatuses iterates the per-group gate verdicts produced by +// checkQueryGate and bumps the matching status counters. STATUS_SUCCEED is +// a no-op; STATUS_EXPIRED_SCHEMA bumps schema_status_expired_schema_total; +// STATUS_SCHEMA_NOT_APPLIED bumps schema_status_schema_not_applied_total +// with reason="wait_timeout". Other statuses (STATUS_NOT_FOUND) carry no +// counter — they are surfaced through the gate's normal response shape. +func recordQueryGateStatuses(m *metrics, rpc string, statuses map[string]modelv1.Status) { + if m == nil { + return + } + for group, st := range statuses { + switch st { + case modelv1.Status_STATUS_EXPIRED_SCHEMA: + m.emitStatusExpired(rpc, group) + case modelv1.Status_STATUS_SCHEMA_NOT_APPLIED: + m.emitStatusNotAppliedTimeout(rpc, group) + default: + // STATUS_SUCCEED (no-op) and STATUS_NOT_FOUND / other non-gate + // statuses are surfaced through the response shape and carry + // no counter in v0.11.0. + } + } +} + +// recordAwaitSchemaDeletedResult mirrors recordAwaitSchemaAppliedResult for +// the deletion barrier; the only differences are the metric routing and +// the access-log barrier label. +func (b *barrierService) recordAwaitSchemaDeletedResult( + start time.Time, + req *schemav1.AwaitSchemaDeletedRequest, + resp *schemav1.AwaitSchemaDeletedResponse, + err error, +) { + duration := time.Since(start) + result := barrierResultLabel(resp.GetApplied(), err) + if b.metrics != nil { + if h := b.metrics.schemaAwaitSchemaDeletedDuration; h != nil { + h.Observe(duration.Seconds(), result) + } + recordBarrierLaggards(b.metrics.schemaBarrierLaggards, barrierLabelSchemaDeleted, resp.GetLaggards()) + } + if b.l != nil { + b.l.Info(). + Str("barrier", barrierLabelSchemaDeleted). + Str("result", result). 
+ Int("keys", len(req.GetKeys())). + Int("laggards", len(resp.GetLaggards())). + Dur("duration", duration). + Msg("schema barrier completed") + } +} diff --git a/banyand/liaison/grpc/barrier_metrics_test.go b/banyand/liaison/grpc/barrier_metrics_test.go new file mode 100644 index 000000000..e0433d416 --- /dev/null +++ b/banyand/liaison/grpc/barrier_metrics_test.go @@ -0,0 +1,405 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package grpc + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/durationpb" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + schemav1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1" + "github.com/apache/skywalking-banyandb/banyand/observability" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/meter" +) + +// recordingFactory is a minimal observability.Factory that captures every +// Counter.Inc / Histogram.Observe / Gauge.Set call for assertion. Tests use +// it to pin the schema-barrier metric labels emitted by the production +// instrumentation without depending on Prometheus or the bypass registry's +// no-op semantics. 
+type recordingFactory struct { + counters map[string]*recordingCounter + histograms map[string]*recordingHistogram + gauges map[string]*recordingGauge + mu sync.Mutex +} + +func newRecordingFactory() *recordingFactory { + return &recordingFactory{ + counters: map[string]*recordingCounter{}, + histograms: map[string]*recordingHistogram{}, + gauges: map[string]*recordingGauge{}, + } +} + +func (f *recordingFactory) NewCounter(name string, _ ...string) meter.Counter { + f.mu.Lock() + defer f.mu.Unlock() + c := &recordingCounter{} + f.counters[name] = c + return c +} + +func (f *recordingFactory) NewHistogram(name string, _ meter.Buckets, _ ...string) meter.Histogram { + f.mu.Lock() + defer f.mu.Unlock() + h := &recordingHistogram{} + f.histograms[name] = h + return h +} + +func (f *recordingFactory) NewGauge(name string, _ ...string) meter.Gauge { + f.mu.Lock() + defer f.mu.Unlock() + g := &recordingGauge{} + f.gauges[name] = g + return g +} + +func (f *recordingFactory) Close() {} + +func (f *recordingFactory) counter(name string) *recordingCounter { + f.mu.Lock() + defer f.mu.Unlock() + return f.counters[name] +} + +func (f *recordingFactory) histogram(name string) *recordingHistogram { + f.mu.Lock() + defer f.mu.Unlock() + return f.histograms[name] +} + +type recordedCall struct { + labels []string + value float64 +} + +type recordingCounter struct { + calls []recordedCall + mu sync.Mutex +} + +func (r *recordingCounter) Inc(delta float64, labelValues ...string) { + r.mu.Lock() + defer r.mu.Unlock() + r.calls = append(r.calls, recordedCall{value: delta, labels: append([]string{}, labelValues...)}) +} + +func (r *recordingCounter) Delete(_ ...string) bool { return true } + +func (r *recordingCounter) snapshot() []recordedCall { + r.mu.Lock() + defer r.mu.Unlock() + out := make([]recordedCall, len(r.calls)) + copy(out, r.calls) + return out +} + +type recordingHistogram struct { + calls []recordedCall + mu sync.Mutex +} + +func (r *recordingHistogram) Observe(value 
float64, labelValues ...string) { + r.mu.Lock() + defer r.mu.Unlock() + r.calls = append(r.calls, recordedCall{value: value, labels: append([]string{}, labelValues...)}) +} + +func (r *recordingHistogram) Delete(_ ...string) bool { return true } + +func (r *recordingHistogram) snapshot() []recordedCall { + r.mu.Lock() + defer r.mu.Unlock() + out := make([]recordedCall, len(r.calls)) + copy(out, r.calls) + return out +} + +type recordingGauge struct{} + +func (r *recordingGauge) Set(_ float64, _ ...string) {} +func (r *recordingGauge) Add(_ float64, _ ...string) {} +func (r *recordingGauge) Delete(_ ...string) bool { return true } + +var _ observability.Factory = (*recordingFactory)(nil) + +func newRecordingMetrics(t *testing.T) (*recordingFactory, *metrics) { + t.Helper() + f := newRecordingFactory() + return f, newMetrics(f) +} + +// fakeBarrierCache satisfies barrierCacheReader with controllable revision +// + per-key state for the standalone path tests below. +type fakeBarrierCache struct { + keyRevs map[string]int64 + maxRev int64 +} + +func (f fakeBarrierCache) GetMaxModRevision() int64 { + return f.maxRev +} + +func (f fakeBarrierCache) GetKeyModRevision(propID string) (int64, bool) { + if f.keyRevs == nil { + return 0, false + } + rev, ok := f.keyRevs[propID] + return rev, ok +} + +// --- Helper-level tests --------------------------------------------------. 
+ +func TestBarrierResultLabel_AppliedTrue_ReturnsApplied(t *testing.T) { + assert.Equal(t, resultLabelApplied, barrierResultLabel(true, nil)) +} + +func TestBarrierResultLabel_AppliedFalseNoErr_ReturnsTimeout(t *testing.T) { + assert.Equal(t, resultLabelTimeout, barrierResultLabel(false, nil)) +} + +func TestBarrierResultLabel_InvalidArgument_ReturnsInvalidArgument(t *testing.T) { + err := status.Errorf(codes.InvalidArgument, "too many keys") + assert.Equal(t, resultLabelInvalidArgument, barrierResultLabel(false, err)) +} + +func TestBarrierResultLabel_OtherError_ReturnsError(t *testing.T) { + err := status.Errorf(codes.Internal, "boom") + assert.Equal(t, resultLabelError, barrierResultLabel(false, err)) +} + +func TestSplitRoleNode_LiaisonPrefix(t *testing.T) { + role, name := splitRoleNode("liaison-foo") + assert.Equal(t, "liaison", role) + assert.Equal(t, "foo", name) +} + +func TestSplitRoleNode_DataPrefix(t *testing.T) { + role, name := splitRoleNode("data-bar") + assert.Equal(t, "data", role) + assert.Equal(t, "bar", name) +} + +func TestSplitRoleNode_NoPrefix_FallsBackToSelf(t *testing.T) { + role, name := splitRoleNode("nodash") + assert.Equal(t, roleLabelSelf, role) + assert.Equal(t, "nodash", name) +} + +func TestSplitRoleNode_Empty_FallsBackToSelf(t *testing.T) { + role, name := splitRoleNode("") + assert.Equal(t, roleLabelSelf, role) + assert.Equal(t, "", name) +} + +func TestSplitRoleNode_DataWithEmbeddedDashes_PreservesTail(t *testing.T) { + role, name := splitRoleNode("data-host-with-dashes") + assert.Equal(t, "data", role) + assert.Equal(t, "host-with-dashes", name) +} + +func TestRecordBarrierLaggards_NilCounter_NoPanic(_ *testing.T) { + // Defensive: a nil counter must not panic even when laggards are present. 
+ recordBarrierLaggards(nil, barrierLabelRevisionApplied, []*schemav1.NodeLaggard{{Node: "data-bar"}}) +} + +func TestRecordBarrierLaggards_BumpsPerLaggardWithRoleSplit(t *testing.T) { + f, m := newRecordingMetrics(t) + recordBarrierLaggards(m.schemaBarrierLaggards, barrierLabelSchemaApplied, []*schemav1.NodeLaggard{ + {Node: "liaison-foo"}, + {Node: "data-bar"}, + {Node: "noprefix"}, + }) + calls := f.counter("schema_barrier_laggard_nodes_total").snapshot() + require.Len(t, calls, 3) + assert.Equal(t, []string{barrierLabelSchemaApplied, "liaison", "foo"}, calls[0].labels) + assert.Equal(t, []string{barrierLabelSchemaApplied, "data", "bar"}, calls[1].labels) + assert.Equal(t, []string{barrierLabelSchemaApplied, roleLabelSelf, "noprefix"}, calls[2].labels) +} + +// --- Barrier integration tests ------------------------------------------. + +func newRecordingBarrier(t *testing.T, cache barrierCacheReader) (*barrierService, *recordingFactory) { + t.Helper() + f, m := newRecordingMetrics(t) + b := newBarrierService(func() barrierCacheReader { return cache }) + b.metrics = m + b.l = logger.GetLogger("test-barrier") + return b, f +} + +func TestAwaitRevisionApplied_AppliedRecordsAppliedHistogram(t *testing.T) { + b, f := newRecordingBarrier(t, fakeBarrierCache{maxRev: 100}) + + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + resp, err := b.AwaitRevisionApplied(ctx, &schemav1.AwaitRevisionAppliedRequest{ + MinRevision: 100, + Timeout: durationpb.New(50 * time.Millisecond), + }) + require.NoError(t, err) + assert.True(t, resp.GetApplied()) + + hist := f.histogram("schema_await_revision_applied_duration_seconds") + require.NotNil(t, hist) + calls := hist.snapshot() + require.Len(t, calls, 1) + assert.Equal(t, []string{resultLabelApplied}, calls[0].labels) + + lag := f.counter("schema_barrier_laggard_nodes_total").snapshot() + assert.Empty(t, lag, "applied=true response carries no laggards") +} + +func 
TestAwaitRevisionApplied_TimeoutRecordsTimeoutAndLaggard(t *testing.T) { + b, f := newRecordingBarrier(t, fakeBarrierCache{maxRev: 50}) + + ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) + defer cancel() + resp, err := b.AwaitRevisionApplied(ctx, &schemav1.AwaitRevisionAppliedRequest{ + MinRevision: 100, + Timeout: durationpb.New(30 * time.Millisecond), + }) + require.NoError(t, err) + assert.False(t, resp.GetApplied()) + + calls := f.histogram("schema_await_revision_applied_duration_seconds").snapshot() + require.Len(t, calls, 1) + assert.Equal(t, []string{resultLabelTimeout}, calls[0].labels) + + lag := f.counter("schema_barrier_laggard_nodes_total").snapshot() + require.Len(t, lag, 1, "standalone timeout emits a single self laggard") + assert.Equal(t, []string{barrierLabelRevisionApplied, roleLabelSelf, ""}, lag[0].labels) +} + +func TestAwaitSchemaApplied_TooManyKeys_RecordsInvalidArgument(t *testing.T) { + b, f := newRecordingBarrier(t, fakeBarrierCache{maxRev: 0}) + + keys := make([]*schemav1.SchemaKey, barrierMaxKeys+1) + for i := range keys { + keys[i] = &schemav1.SchemaKey{Kind: "measure", Group: "g", Name: "m"} + } + resp, err := b.AwaitSchemaApplied(context.Background(), &schemav1.AwaitSchemaAppliedRequest{Keys: keys}) + require.Error(t, err, "expected InvalidArgument when keys exceed cap") + require.Nil(t, resp) + + calls := f.histogram("schema_await_schema_applied_duration_seconds").snapshot() + require.Len(t, calls, 1) + assert.Equal(t, []string{resultLabelInvalidArgument}, calls[0].labels) +} + +func TestAwaitSchemaDeleted_AllAbsent_RecordsAppliedHistogram(t *testing.T) { + b, f := newRecordingBarrier(t, fakeBarrierCache{maxRev: 0}) + + resp, err := b.AwaitSchemaDeleted(context.Background(), &schemav1.AwaitSchemaDeletedRequest{ + Keys: []*schemav1.SchemaKey{{Kind: "measure", Group: "g", Name: "m"}}, + Timeout: durationpb.New(20 * time.Millisecond), + }) + require.NoError(t, err) + assert.True(t, resp.GetApplied()) + + 
calls := f.histogram("schema_await_schema_deleted_duration_seconds").snapshot() + require.Len(t, calls, 1) + assert.Equal(t, []string{resultLabelApplied}, calls[0].labels) +} + +// --- Status counter integration -----------------------------------------. + +func newRecordingMeasureService(t *testing.T, er *entityRepo, maxWait time.Duration) (*measureService, *recordingFactory) { + t.Helper() + f, m := newRecordingMetrics(t) + return &measureService{ + discoveryService: &discoveryService{entityRepo: er}, + maxWaitDuration: maxWait, + l: logger.GetLogger("test"), + metrics: m, + }, f +} + +func TestWriteGate_Measure_StaleClient_BumpsExpiredCounter(t *testing.T) { + id := identity{group: "g", name: "m"} + er := seededLocatorRepo(id) + svc, f := newRecordingMeasureService(t, er, 30*time.Millisecond) + mock := &mockBidiServer[measurev1.WriteRequest, measurev1.WriteResponse]{} + + meta := &commonv1.Metadata{Group: "g", Name: "m", ModRevision: 50} + st := svc.validateWriteRequest(validMeasureWriteRequest(), meta, mock) + require.Equal(t, modelv1.Status_STATUS_EXPIRED_SCHEMA, st) + + calls := f.counter("schema_status_expired_schema_total").snapshot() + require.Len(t, calls, 1) + assert.Equal(t, []string{rpcLabelMeasureWrite, "g"}, calls[0].labels) + + notApplied := f.counter("schema_status_schema_not_applied_total").snapshot() + assert.Empty(t, notApplied, "stale-client path must not bump the not-applied counter") +} + +func TestWriteGate_Measure_AheadClient_TimesOut_BumpsNotAppliedCounter(t *testing.T) { + id := identity{group: "g", name: "m"} + er := seededLocatorRepo(id) + svc, f := newRecordingMeasureService(t, er, 30*time.Millisecond) + mock := &mockBidiServer[measurev1.WriteRequest, measurev1.WriteResponse]{} + + meta := &commonv1.Metadata{Group: "g", Name: "m", ModRevision: 9999} + st := svc.validateWriteRequest(validMeasureWriteRequest(), meta, mock) + require.Equal(t, modelv1.Status_STATUS_SCHEMA_NOT_APPLIED, st) + + calls := 
f.counter("schema_status_schema_not_applied_total").snapshot() + require.Len(t, calls, 1) + assert.Equal(t, []string{rpcLabelMeasureWrite, "g", statusReasonWaitTimeout}, calls[0].labels) + + expired := f.counter("schema_status_expired_schema_total").snapshot() + assert.Empty(t, expired, "ahead-client path must not bump the expired counter") +} + +func TestRecordQueryGateStatuses_BumpsPerGroupCounters(t *testing.T) { + f, m := newRecordingMetrics(t) + statuses := map[string]modelv1.Status{ + "g_succ": modelv1.Status_STATUS_SUCCEED, + "g_expired": modelv1.Status_STATUS_EXPIRED_SCHEMA, + "g_lag": modelv1.Status_STATUS_SCHEMA_NOT_APPLIED, + "g_other": modelv1.Status_STATUS_NOT_FOUND, + } + recordQueryGateStatuses(m, rpcLabelMeasureQuery, statuses) + + expired := f.counter("schema_status_expired_schema_total").snapshot() + require.Len(t, expired, 1) + assert.Equal(t, []string{rpcLabelMeasureQuery, "g_expired"}, expired[0].labels) + + notApplied := f.counter("schema_status_schema_not_applied_total").snapshot() + require.Len(t, notApplied, 1) + assert.Equal(t, []string{rpcLabelMeasureQuery, "g_lag", statusReasonWaitTimeout}, notApplied[0].labels) +} + +func TestRecordQueryGateStatuses_NilMetrics_NoPanic(_ *testing.T) { + recordQueryGateStatuses(nil, rpcLabelMeasureQuery, map[string]modelv1.Status{ + "g": modelv1.Status_STATUS_EXPIRED_SCHEMA, + }) +} diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go index fd3aaaba9..b5e6d87ba 100644 --- a/banyand/liaison/grpc/measure.go +++ b/banyand/liaison/grpc/measure.go @@ -209,6 +209,7 @@ func (ms *measureService) validateWriteRequest(writeRequest *measurev1.WriteRequ cacheRev := resolveSchemaRevision(reg, schema.KindMeasure, metadata.GetGroup(), metadata.GetName(), measureCache.ModRevision) if clientRev < cacheRev { ms.l.Error().Stringer("written", writeRequest).Msg("the measure schema is expired") + ms.metrics.emitStatusExpired(rpcLabelMeasureWrite, metadata.GetGroup()) ms.sendReply(metadata, 
modelv1.Status_STATUS_EXPIRED_SCHEMA, writeRequest.GetMessageId(), measure) return modelv1.Status_STATUS_EXPIRED_SCHEMA } @@ -221,6 +222,7 @@ func (ms *measureService) validateWriteRequest(writeRequest *measurev1.WriteRequ return resolveSchemaRevision(reg, schema.KindMeasure, metadata.GetGroup(), metadata.GetName(), fallback) }, clientRev, ms.maxWaitDuration) if !reached { + ms.metrics.emitStatusNotAppliedTimeout(rpcLabelMeasureWrite, metadata.GetGroup()) ms.sendReply(metadata, modelv1.Status_STATUS_SCHEMA_NOT_APPLIED, writeRequest.GetMessageId(), measure) return modelv1.Status_STATUS_SCHEMA_NOT_APPLIED } @@ -450,6 +452,7 @@ func (ms *measureService) Query(ctx context.Context, req *measurev1.QueryRequest loc, ok := ms.entityRepo.getLocator(identity{name: name, group: group}) return resolveQueryGateRevision(measureReg, schema.KindMeasure, group, name, loc.ModRevision, ok) }, ms.maxWaitDuration) + recordQueryGateStatuses(ms.metrics, rpcLabelMeasureQuery, gatedStatuses) if shortCircuit { return &measurev1.QueryResponse{GroupStatuses: gatedStatuses}, nil } diff --git a/banyand/liaison/grpc/metrics.go b/banyand/liaison/grpc/metrics.go index ed7d546c2..e97e35ce7 100644 --- a/banyand/liaison/grpc/metrics.go +++ b/banyand/liaison/grpc/metrics.go @@ -47,30 +47,52 @@ type metrics struct { memoryLoadSheddingRejections meter.Counter grpcBufferSize meter.Gauge // Shared gauge for both conn and stream buffer sizes memoryState meter.Gauge + + // Step 2.7 — schema-barrier observability. The three Await* RPCs each + // emit one histogram observation per call labeled by `result` + // ("applied", "timeout", "invalid_argument", "error"). Laggards observed during + // any barrier are counted by `barrier`, `role`, `node` so dashboards + // can break out which node fell behind on which call. 
The two status + // counters cover the gate's STATUS_SCHEMA_NOT_APPLIED / EXPIRED_SCHEMA + // outcomes by `rpc` and `group`; in v0.11.0 the only `reason` value + // emitted is "wait_timeout" — the label is retained for forward-compat + // with optional fast-sync paths that may land in a later release. + schemaAwaitRevisionAppliedDuration meter.Histogram + schemaAwaitSchemaAppliedDuration meter.Histogram + schemaAwaitSchemaDeletedDuration meter.Histogram + schemaBarrierLaggards meter.Counter + schemaStatusNotApplied meter.Counter + schemaStatusExpired meter.Counter } func newMetrics(factory observability.Factory) *metrics { return &metrics{ - totalStarted: factory.NewCounter("total_started", "group", "service", "method"), - totalFinished: factory.NewCounter("total_finished", "group", "service", "method"), - totalErr: factory.NewCounter("total_err", "group", "service", "method"), - totalPanic: factory.NewCounter("total_panic"), - totalLatency: factory.NewCounter("total_latency", "group", "service", "method"), - totalStreamStarted: factory.NewCounter("total_stream_started", "service", "method"), - totalStreamFinished: factory.NewCounter("total_stream_finished", "service", "method"), - totalStreamErr: factory.NewCounter("total_stream_err", "service", "method"), - totalStreamLatency: factory.NewCounter("total_stream_latency", "service", "method"), - totalStreamMsgReceived: factory.NewCounter("total_stream_msg_received", "group", "service", "method"), - totalStreamMsgReceivedErr: factory.NewCounter("total_stream_msg_received_err", "group", "service", "method"), - totalStreamMsgSent: factory.NewCounter("total_stream_msg_sent", "group", "service", "method"), - totalStreamMsgSentErr: factory.NewCounter("total_stream_msg_sent_err", "group", "service", "method"), - totalRegistryStarted: factory.NewCounter("total_registry_started", "group", "service", "method"), - totalRegistryFinished: factory.NewCounter("total_registry_finished", "group", "service", "method"), - 
totalRegistryErr: factory.NewCounter("total_registry_err", "group", "service", "method"), - totalRegistryLatency: factory.NewCounter("total_registry_latency", "group", "service", "method"), - memoryLoadSheddingRejections: factory.NewCounter("memory_load_shedding_rejections_total", "service"), - grpcBufferSize: factory.NewGauge("grpc_buffer_size_bytes", "type"), - memoryState: factory.NewGauge("memory_state"), + totalStarted: factory.NewCounter("total_started", "group", "service", "method"), + totalFinished: factory.NewCounter("total_finished", "group", "service", "method"), + totalErr: factory.NewCounter("total_err", "group", "service", "method"), + totalPanic: factory.NewCounter("total_panic"), + totalLatency: factory.NewCounter("total_latency", "group", "service", "method"), + totalStreamStarted: factory.NewCounter("total_stream_started", "service", "method"), + totalStreamFinished: factory.NewCounter("total_stream_finished", "service", "method"), + totalStreamErr: factory.NewCounter("total_stream_err", "service", "method"), + totalStreamLatency: factory.NewCounter("total_stream_latency", "service", "method"), + totalStreamMsgReceived: factory.NewCounter("total_stream_msg_received", "group", "service", "method"), + totalStreamMsgReceivedErr: factory.NewCounter("total_stream_msg_received_err", "group", "service", "method"), + totalStreamMsgSent: factory.NewCounter("total_stream_msg_sent", "group", "service", "method"), + totalStreamMsgSentErr: factory.NewCounter("total_stream_msg_sent_err", "group", "service", "method"), + totalRegistryStarted: factory.NewCounter("total_registry_started", "group", "service", "method"), + totalRegistryFinished: factory.NewCounter("total_registry_finished", "group", "service", "method"), + totalRegistryErr: factory.NewCounter("total_registry_err", "group", "service", "method"), + totalRegistryLatency: factory.NewCounter("total_registry_latency", "group", "service", "method"), + memoryLoadSheddingRejections: 
factory.NewCounter("memory_load_shedding_rejections_total", "service"), + grpcBufferSize: factory.NewGauge("grpc_buffer_size_bytes", "type"), + memoryState: factory.NewGauge("memory_state"), + schemaAwaitRevisionAppliedDuration: factory.NewHistogram("schema_await_revision_applied_duration_seconds", meter.DefBuckets, "result"), + schemaAwaitSchemaAppliedDuration: factory.NewHistogram("schema_await_schema_applied_duration_seconds", meter.DefBuckets, "result"), + schemaAwaitSchemaDeletedDuration: factory.NewHistogram("schema_await_schema_deleted_duration_seconds", meter.DefBuckets, "result"), + schemaBarrierLaggards: factory.NewCounter("schema_barrier_laggard_nodes_total", "barrier", "role", "node"), + schemaStatusNotApplied: factory.NewCounter("schema_status_schema_not_applied_total", "rpc", "group", "reason"), + schemaStatusExpired: factory.NewCounter("schema_status_expired_schema_total", "rpc", "group"), } } diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go index e787368d7..0e3cdd722 100644 --- a/banyand/liaison/grpc/server.go +++ b/banyand/liaison/grpc/server.go @@ -364,6 +364,10 @@ func (s *server) PreRun(ctx context.Context) error { s.traceSVC.metrics = metrics s.bydbQLSVC.metrics = metrics s.propertyServer.metrics = metrics + if s.barrierSVC != nil { + s.barrierSVC.metrics = metrics + s.barrierSVC.l = s.log.Named("barrier") + } s.streamRegistryServer.metrics = metrics s.indexRuleBindingRegistryServer.metrics = metrics s.indexRuleRegistryServer.metrics = metrics diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go index 34beea76b..01aac40e4 100644 --- a/banyand/liaison/grpc/stream.go +++ b/banyand/liaison/grpc/stream.go @@ -114,6 +114,7 @@ func (s *streamService) validateWriteRequest(writeEntity *streamv1.WriteRequest, cacheRev := resolveSchemaRevision(reg, schema.KindStream, metadata.GetGroup(), metadata.GetName(), streamCache.ModRevision) if clientRev < cacheRev { s.l.Error().Stringer("written", 
writeEntity).Msg("the stream schema is expired") + s.metrics.emitStatusExpired(rpcLabelStreamWrite, metadata.GetGroup()) s.sendReply(metadata, modelv1.Status_STATUS_EXPIRED_SCHEMA, writeEntity.GetMessageId(), stream) return modelv1.Status_STATUS_EXPIRED_SCHEMA } @@ -126,6 +127,7 @@ func (s *streamService) validateWriteRequest(writeEntity *streamv1.WriteRequest, return resolveSchemaRevision(reg, schema.KindStream, metadata.GetGroup(), metadata.GetName(), fallback) }, clientRev, s.maxWaitDuration) if !reached { + s.metrics.emitStatusNotAppliedTimeout(rpcLabelStreamWrite, metadata.GetGroup()) s.sendReply(metadata, modelv1.Status_STATUS_SCHEMA_NOT_APPLIED, writeEntity.GetMessageId(), stream) return modelv1.Status_STATUS_SCHEMA_NOT_APPLIED } @@ -415,6 +417,7 @@ func (s *streamService) Query(ctx context.Context, req *streamv1.QueryRequest) ( loc, ok := s.entityRepo.getLocator(identity{name: name, group: group}) return resolveQueryGateRevision(streamReg, schema.KindStream, group, name, loc.ModRevision, ok) }, s.maxWaitDuration) + recordQueryGateStatuses(s.metrics, rpcLabelStreamQuery, gatedStatuses) if shortCircuit { return &streamv1.QueryResponse{GroupStatuses: gatedStatuses}, nil } diff --git a/banyand/liaison/grpc/trace.go b/banyand/liaison/grpc/trace.go index 75b0b6b9a..4d1fcabd1 100644 --- a/banyand/liaison/grpc/trace.go +++ b/banyand/liaison/grpc/trace.go @@ -156,6 +156,7 @@ func (s *traceService) validateWriteRequest(writeEntity *tracev1.WriteRequest, cacheRev := resolveSchemaRevision(reg, schema.KindTrace, metadata.GetGroup(), metadata.GetName(), traceEntity.GetMetadata().GetModRevision()) if clientRev < cacheRev { s.l.Error().Stringer("written", writeEntity).Msg("the trace schema is expired") + s.metrics.emitStatusExpired(rpcLabelTraceWrite, metadata.GetGroup()) s.sendReply(metadata, modelv1.Status_STATUS_EXPIRED_SCHEMA, writeEntity.GetVersion(), stream) return modelv1.Status_STATUS_EXPIRED_SCHEMA } @@ -168,6 +169,7 @@ func (s *traceService) 
validateWriteRequest(writeEntity *tracev1.WriteRequest, return resolveSchemaRevision(reg, schema.KindTrace, metadata.GetGroup(), metadata.GetName(), fallback) }, clientRev, s.maxWaitDuration) if !reached { + s.metrics.emitStatusNotAppliedTimeout(rpcLabelTraceWrite, metadata.GetGroup()) s.sendReply(metadata, modelv1.Status_STATUS_SCHEMA_NOT_APPLIED, writeEntity.GetVersion(), stream) return modelv1.Status_STATUS_SCHEMA_NOT_APPLIED } @@ -534,6 +536,7 @@ func (s *traceService) Query(ctx context.Context, req *tracev1.QueryRequest) (re loc, ok := s.entityRepo.getLocator(identity{name: name, group: group}) return resolveQueryGateRevision(traceReg, schema.KindTrace, group, name, loc.ModRevision, ok) }, s.maxWaitDuration) + recordQueryGateStatuses(s.metrics, rpcLabelTraceQuery, gatedStatuses) if shortCircuit { return &tracev1.QueryResponse{GroupStatuses: gatedStatuses}, nil }
