Copilot commented on code in PR #1121:
URL: 
https://github.com/apache/skywalking-banyandb/pull/1121#discussion_r3206394270


##########
banyand/liaison/grpc/barrier.go:
##########
@@ -138,8 +150,18 @@ func (b *barrierService) AwaitRevisionApplied(ctx 
context.Context, req *schemav1
 }
 
 // AwaitSchemaApplied blocks until all requested keys are present at or above 
their
-// per-key min_revisions, or the timeout elapses.
-func (b *barrierService) AwaitSchemaApplied(ctx context.Context, req 
*schemav1.AwaitSchemaAppliedRequest) (*schemav1.AwaitSchemaAppliedResponse, 
error) {
+// per-key min_revisions, or the timeout elapses. When the cluster fan-out
+// dependencies are wired (production), the call probes the frozen tier1 +
+// tier2 + self watched set in parallel via GetKeyRevisions; without them
+// (Phase 1 unit-test path), it falls back to a single in-process cache poll.
+func (b *barrierService) AwaitSchemaApplied(ctx context.Context, req 
*schemav1.AwaitSchemaAppliedRequest) (resp 
*schemav1.AwaitSchemaAppliedResponse, err error) {
+       start := time.Now()
+       defer func() {
+               b.recordAwaitSchemaAppliedResult(start, req, resp, err)
+       }()
+       if b.peerLiaisons != nil && b.dataNodes != nil && b.selfName != nil {
+               return b.awaitSchemaAppliedCluster(ctx, req)
+       }
        if len(req.GetKeys()) > barrierMaxKeys {
                return nil, status.Errorf(codes.InvalidArgument, "too many 
keys: max=%d", barrierMaxKeys)
        }

Review Comment:
   `AwaitSchemaApplied` can return an error with `resp == nil` (e.g., too many 
keys, or cluster fan-out returning an error), but the deferred recorder still 
runs and will dereference `resp`. This will turn a clean gRPC error into a 
server panic; please ensure the recorder path handles nil responses.



##########
test/load/schema_barrier/barrier_loadtest_test.go:
##########
@@ -0,0 +1,384 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//go:build loadtest
+
+// Package schemabarrier hosts the CP-6 SLO load harness for the schema
+// consistency cluster barrier. It is gated behind the `loadtest` build tag
+// so the regular `go test ./...` cycle skips it; invoke via the make target
+// `make load-test-barrier`.
+//
+// The harness brings up an in-process distributed cluster (3 data nodes +
+// 1 liaison) and drives the CP-6 SLO load profile pinned in the plan:
+//   - 100 concurrent AwaitRevisionApplied(R, 5s) callers continuously
+//     re-issuing on completion;
+//   - schema mutation rate of 10 successful GroupRegistryService.Update
+//     ops per second across the seeded groups;
+//   - a 1-minute warm-up followed by a 5-minute measurement window;
+//   - report p50 / p95 / p99 from the client-side per-call duration.
+//
+// The client-side measurement is bounded below by the server-side
+// schema_await_revision_applied_duration_seconds histogram (RPC overhead on
+// localhost gRPC is sub-ms), so an SLO check on the client number is
+// strictly stricter than CP-6's "p99 < 200ms on the server-side histogram"
+// criterion. A future revision may scrape the Prometheus endpoint directly;
+// the current report records both numbers when the listener is reachable.
+package schemabarrier
+
+import (
+       "context"
+       "encoding/json"
+       "flag"
+       "fmt"
+       "math/rand/v2"
+       "os"
+       "sort"
+       "sync"
+       "sync/atomic"
+       "testing"
+       "time"
+
+       "github.com/onsi/gomega"
+       "github.com/stretchr/testify/require"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+       "google.golang.org/protobuf/types/known/durationpb"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+       schemapkg 
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure"
+       "github.com/apache/skywalking-banyandb/pkg/test/setup"
+       test_stream "github.com/apache/skywalking-banyandb/pkg/test/stream"
+       test_trace "github.com/apache/skywalking-banyandb/pkg/test/trace"
+)
+
+var (
+       flagWarmUp        = flag.Duration("loadtest.warm-up", time.Minute, 
"warm-up duration before sampling begins")
+       flagMeasure       = flag.Duration("loadtest.measure", 5*time.Minute, 
"measurement window duration")
+       flagCallers       = flag.Int("loadtest.callers", 100, "concurrent 
AwaitRevisionApplied callers")
+       flagMutateRate    = flag.Int("loadtest.mutate-rate", 10, "successful 
Group.Update operations per second")
+       flagBarrierTarget = flag.Int("loadtest.barrier-offset", 5, 
"AwaitRevisionApplied targets latest+offset; tune so each call holds open ~1s 
on average")
+       flagBarrierTimeout = flag.Duration("loadtest.barrier-timeout", 
5*time.Second, "AwaitRevisionApplied per-call timeout")
+       flagReportPath    = flag.String("loadtest.report", "load-report.json", 
"path to write the JSON report; pass '-' for stdout-only")
+)
+
+// loadReport is serialized to disk at end of the measurement window. The
+// fields mirror what CP-6 sign-off cites: configuration, raw counts, and
+// the p50 / p95 / p99 of client-side per-call duration. Future revisions
+// may add a `ServerSideP99` field when Prometheus scraping is wired.
+type loadReport struct {
+       Profile      profile  `json:"profile"`
+       StartedAt    string   `json:"started_at"`
+       FinishedAt   string   `json:"finished_at"`
+       BarrierCalls int64    `json:"barrier_calls"`
+       BarrierErrs  int64    `json:"barrier_errors"`
+       BarrierTimeouts int64 `json:"barrier_timeouts"`
+       MutateOps    int64    `json:"mutate_ops"`
+       MutateErrs   int64    `json:"mutate_errors"`
+       P50Seconds   float64  `json:"p50_seconds"`
+       P95Seconds   float64  `json:"p95_seconds"`
+       P99Seconds   float64  `json:"p99_seconds"`
+       MaxSeconds   float64  `json:"max_seconds"`
+}
+
+type profile struct {
+       WarmUp           string `json:"warm_up"`
+       Measure          string `json:"measure"`
+       Callers          int    `json:"callers"`
+       MutateRatePerSec int    `json:"mutate_rate_per_sec"`
+       BarrierOffset    int    `json:"barrier_target_offset"`
+       BarrierTimeout   string `json:"barrier_timeout"`
+}
+
+// TestSchemaBarrierLoad is the CP-6 SLO load harness entry point. Run via:
+//
+//     make load-test-barrier
+//
+// or directly with overrides:
+//
+//     go test -tags=loadtest -timeout 30m ./test/load/schema_barrier/... \
+//         -loadtest.warm-up=10s -loadtest.measure=30s -loadtest.callers=20 \
+//         -loadtest.mutate-rate=5 -loadtest.report=/tmp/load-report.json
+//
+// The smoke configuration above keeps the harness runnable on a developer
+// laptop in under a minute; the SLO sign-off run uses the defaults.
+func TestSchemaBarrierLoad(t *testing.T) {
+       gomega.RegisterTestingT(t)
+
+       tmpDir, tmpDirCleanup, err := test.NewSpace()
+       require.NoError(t, err, "allocate tmp dir for cluster")
+       defer tmpDirCleanup()
+
+       dfWriter := setup.NewDiscoveryFileWriter(tmpDir)
+       config := setup.PropertyClusterConfig(dfWriter)
+
+       t.Logf("starting 3 data nodes")
+       dataCloses := make([]func(), 0, 3)
+       for i := 0; i < 3; i++ {
+               dataCloses = append(dataCloses, setup.DataNode(config))
+       }
+
+       t.Logf("preloading schema (stream + measure + trace)")
+       setup.PreloadSchemaViaProperty(config,
+               test_stream.PreloadSchema, test_measure.PreloadSchema, 
test_trace.PreloadSchema)
+       config.AddLoadedKinds(schemapkg.KindStream, schemapkg.KindMeasure, 
schemapkg.KindTrace)
+
+       t.Logf("starting liaison node")
+       liaisonAddr, closeLiaison := setup.LiaisonNode(config,
+               "--measure-metadata-cache-wait-duration=5s",
+               "--stream-metadata-cache-wait-duration=5s",
+               "--trace-metadata-cache-wait-duration=5s")
+       defer func() {
+               closeLiaison()
+               for _, c := range dataCloses {
+                       c()
+               }
+       }()
+
+       t.Logf("dialing liaison at %s", liaisonAddr)
+       conn, err := grpchelper.Conn(liaisonAddr, 10*time.Second,
+               grpc.WithTransportCredentials(insecure.NewCredentials()))
+       require.NoError(t, err, "dial liaison")
+       defer func() { _ = conn.Close() }()
+
+       barrierClient := schemav1.NewSchemaBarrierServiceClient(conn)
+       groupsClient := databasev1.NewGroupRegistryServiceClient(conn)
+
+       // Seed the latest revision baseline from a synthetic 
AwaitRevisionApplied(0).
+       // The mutator + caller goroutines treat this as their starting point 
and
+       // advance it via atomic writes after each successful Group.Update.
+       var latestRev atomic.Int64
+       startResp, err := 
barrierClient.AwaitRevisionApplied(context.Background(),
+               &schemav1.AwaitRevisionAppliedRequest{MinRevision: 0, Timeout: 
durationpb.New(2 * time.Second)})
+       require.NoError(t, err, "baseline AwaitRevisionApplied")
+       require.True(t, startResp.GetApplied(), "baseline call must report 
applied")
+       // startResp.Applied=true does not give us a revision; we seed from the
+       // first List call below — the mutator path needs a positive revision to
+       // bump from anyway.
+
+       listCtx, listCancel := context.WithTimeout(context.Background(), 
10*time.Second)
+       listResp, err := groupsClient.List(listCtx, 
&databasev1.GroupRegistryServiceListRequest{})
+       listCancel()
+       require.NoError(t, err, "list seeded groups")
+       require.NotEmpty(t, listResp.GetGroup(), "PreloadSchema must seed at 
least one group")
+       for _, g := range listResp.GetGroup() {
+               if rev := g.GetMetadata().GetModRevision(); rev > 
latestRev.Load() {
+                       latestRev.Store(rev)
+               }
+       }
+       t.Logf("seeded %d groups; baseline ModRevision=%d", 
len(listResp.GetGroup()), latestRev.Load())
+
+       // Background workers are bounded by harnessCtx; cancelled at end of 
run.
+       harnessCtx, harnessCancel := context.WithCancel(context.Background())
+       defer harnessCancel()
+
+       var (
+               mutateOps    atomic.Int64
+               mutateErrs   atomic.Int64
+               barrierCalls atomic.Int64
+               barrierErrs  atomic.Int64
+               barrierTimes atomic.Int64
+
+               samplingMu sync.Mutex
+               samples    []time.Duration
+               // samplingActive guards `samples` writes so warm-up calls are 
not
+               // recorded. The mutex is acquired only on completion, so its
+               // contention is negligible relative to the per-iteration RPC cost.
+               samplingActive atomic.Bool
+       )
+
+       // Mutator: 1 goroutine driving 10 ops/sec across the seeded groups
+       // (round-robin). Each iteration calls Group.Update with the latest List
+       // snapshot of the group; etcd bumps mod_revision on every Update even
+       // when the body is identical, so latestRev advances at the configured
+       // rate as long as the cluster is healthy.
+       mutateInterval := time.Second / time.Duration(*flagMutateRate)
+       go func() {
+               ticker := time.NewTicker(mutateInterval)
+               defer ticker.Stop()
+               idx := 0
+               for {
+                       select {
+                       case <-harnessCtx.Done():
+                               return
+                       case <-ticker.C:
+                       }
+                       if len(listResp.GetGroup()) == 0 {
+                               return
+                       }
+                       g := listResp.GetGroup()[idx%len(listResp.GetGroup())]
+                       idx++
+                       ctx, cancel := context.WithTimeout(harnessCtx, 
5*time.Second)
+                       updResp, updErr := groupsClient.Update(ctx, 
&databasev1.GroupRegistryServiceUpdateRequest{Group: g})
+                       cancel()
+                       if updErr != nil {
+                               mutateErrs.Add(1)
+                               continue
+                       }
+                       mutateOps.Add(1)
+                       if rev := updResp.GetModRevision(); rev > 0 {
+                               for {
+                                       prev := latestRev.Load()
+                                       if rev <= prev || 
latestRev.CompareAndSwap(prev, rev) {
+                                               break
+                                       }
+                               }
+                       }
+               }
+       }()
+
+       // Callers: N goroutines each looping 
AwaitRevisionApplied(latest+offset, T).
+       // On completion they immediately re-issue with the freshly-read latest
+       // snapshot, maintaining ~N in-flight calls throughout the measurement
+       // window.
+       var callerWg sync.WaitGroup
+       for i := 0; i < *flagCallers; i++ {
+               callerWg.Add(1)
+               go func() {
+                       defer callerWg.Done()
+                       for {
+                               select {
+                               case <-harnessCtx.Done():
+                                       return
+                               default:
+                               }
+                               target := latestRev.Load() + 
int64(*flagBarrierTarget)
+                               start := time.Now()
+                               resp, callErr := 
barrierClient.AwaitRevisionApplied(harnessCtx,
+                                       &schemav1.AwaitRevisionAppliedRequest{
+                                               MinRevision: target,
+                                               Timeout:     
durationpb.New(*flagBarrierTimeout),
+                                       })
+                               duration := time.Since(start)
+                               barrierCalls.Add(1)
+                               if callErr != nil {
+                                       barrierErrs.Add(1)
+                                       // Brief jittered backoff so we do not 
hammer on the same
+                                       // failure (typically harnessCtx 
cancellation at end of run).
+                                       select {
+                                       case <-harnessCtx.Done():
+                                               return
+                                       case 
<-time.After(time.Duration(rand.IntN(20)) * time.Millisecond):
+                                       }
+                                       continue
+                               }
+                               if !resp.GetApplied() {
+                                       barrierTimes.Add(1)
+                               }
+                               if samplingActive.Load() {
+                                       samplingMu.Lock()
+                                       samples = append(samples, duration)
+                                       samplingMu.Unlock()
+                               }
+                       }
+               }()
+       }
+
+       t.Logf("warm-up: %s @ %d callers, %d mutate ops/sec, target=latest+%d, 
timeout=%s",
+               *flagWarmUp, *flagCallers, *flagMutateRate, *flagBarrierTarget, 
*flagBarrierTimeout)
+       startedAt := time.Now()
+       select {
+       case <-time.After(*flagWarmUp):
+       case <-harnessCtx.Done():
+       }
+
+       t.Logf("entering measurement window (%s)", *flagMeasure)
+       samplingActive.Store(true)
+       measureStart := time.Now()
+       select {
+       case <-time.After(*flagMeasure):
+       case <-harnessCtx.Done():
+       }
+       samplingActive.Store(false)
+       t.Logf("measurement window finished after %s; cooling down callers", 
time.Since(measureStart))
+
+       harnessCancel()
+       callerWg.Wait()
+

Review Comment:
   The mutator goroutine is not joined. After `harnessCancel()` you only wait 
for `callerWg`, so the mutator can still be running (or blocked in an Update 
RPC) while the test proceeds to write the report / return, which risks 
goroutine leaks and flakiness. Consider tracking it with a WaitGroup and 
waiting for it to exit after cancellation.



##########
banyand/liaison/grpc/barrier.go:
##########
@@ -101,7 +109,11 @@ func (b *barrierService) cache() barrierCacheReader {
 // those dependencies (Phase 1 unit-test path), it falls back to a single
 // in-process cache poll, returning a self-only laggard on timeout so callers
 // can diagnose how far behind the standalone cache is.
-func (b *barrierService) AwaitRevisionApplied(ctx context.Context, req 
*schemav1.AwaitRevisionAppliedRequest) (*schemav1.AwaitRevisionAppliedResponse, 
error) {
+func (b *barrierService) AwaitRevisionApplied(ctx context.Context, req 
*schemav1.AwaitRevisionAppliedRequest) (resp 
*schemav1.AwaitRevisionAppliedResponse, err error) {
+       start := time.Now()
+       defer func() {

Review Comment:
   The deferred metrics/logging recorder can panic on error paths because 
`resp` may be nil (e.g., InvalidArgument / Unavailable returned directly). 
`recordAwait*Result` calls `resp.GetApplied()`/`resp.GetLaggards()` 
unconditionally. Consider guarding in the defer (instantiate an empty response 
when resp==nil) or making the recorders tolerate a nil response.
   



##########
banyand/liaison/grpc/barrier_metrics.go:
##########
@@ -0,0 +1,250 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+       "strings"
+       "time"
+
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
+
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+       "github.com/apache/skywalking-banyandb/pkg/meter"
+)
+
+const (
+       // barrierLabelRevisionApplied / SchemaApplied / SchemaDeleted are the
+       // three values used for the `barrier` label on
+       // schema_barrier_laggard_nodes_total. The histogram metrics carry the
+       // barrier identity in their metric name instead, so this label is only
+       // emitted on the laggard counter.
+       barrierLabelRevisionApplied = "revision_applied"
+       barrierLabelSchemaApplied   = "schema_applied"
+       barrierLabelSchemaDeleted   = "schema_deleted"
+
+       // resultLabel* are the values for the `result` label on the three
+       // schema_await_*_duration_seconds histograms. Recorded once per call
+       // at return time.
+       resultLabelApplied         = "applied"
+       resultLabelTimeout         = "timeout"
+       resultLabelInvalidArgument = "invalid_argument"
+       resultLabelError           = "error"
+
+       // roleLabelSelf is used when a laggard's Node field is unprefixed —
+       // the standalone barrier path emits a single self-laggard with no name
+       // on timeout, where there is no role to label against.
+       roleLabelSelf = "self"
+
+       // statusReasonWaitTimeout is the only `reason` value emitted for
+       // schema_status_schema_not_applied_total in v0.11.0. The label is
+       // retained for forward-compat with optional fast-sync paths.
+       statusReasonWaitTimeout = "wait_timeout"
+
+       // rpcLabel* are the values for the `rpc` label on the two status
+       // counters (schema_status_schema_not_applied_total /
+       // schema_status_expired_schema_total). Bound at call sites in
+       // measure.go / stream.go / trace.go for the write and query gates.
+       rpcLabelMeasureWrite = "measure_write"
+       rpcLabelStreamWrite  = "stream_write"
+       rpcLabelTraceWrite   = "trace_write"
+       rpcLabelMeasureQuery = "measure_query"
+       rpcLabelStreamQuery  = "stream_query"
+       rpcLabelTraceQuery   = "trace_query"
+)
+
+// splitRoleNode extracts the `<role>` and `<name>` halves of a laggard's
+// Node identifier per the cluster barrier's `<role>-<Metadata.Name>`
+// convention (see member.laggardName in barrier_cluster.go). Unprefixed
+// values — emitted by the standalone barrier path on timeout — map to
+// role="self" and name="" so the laggard counter still increments without
+// dropping the observation.
+func splitRoleNode(node string) (role, name string) {
+       if i := strings.IndexByte(node, '-'); i >= 0 {
+               return node[:i], node[i+1:]
+       }
+       return roleLabelSelf, node
+}
+
+// barrierResultLabel maps a barrier RPC's (response.Applied, error) outcome
+// to the `result` label value used on the duration histogram. An
+// InvalidArgument status reflects the 10 000-key cap rejection; any other
+// error is reported as "error" so dashboards can flag transport-layer
+// failures separately from cache-applied vs cache-timeout.
+func barrierResultLabel(applied bool, err error) string {
+       if err != nil {
+               if s, ok := status.FromError(err); ok && s.Code() == 
codes.InvalidArgument {
+                       return resultLabelInvalidArgument
+               }
+               return resultLabelError
+       }
+       if applied {
+               return resultLabelApplied
+       }
+       return resultLabelTimeout
+}
+
+// recordBarrierLaggards bumps the schema_barrier_laggard_nodes_total counter
+// once per laggard in the response. The role/name split lets dashboards
+// answer "which node fell behind on which barrier" without unbounded
+// cardinality on a single label.
+//
+// Metric is silently skipped when counter is nil (test fixtures that
+// construct barrierService without metrics) or when laggards is empty.
+func recordBarrierLaggards(counter meter.Counter, barrier string, laggards 
[]*schemav1.NodeLaggard) {
+       if counter == nil {
+               return
+       }
+       for _, lag := range laggards {
+               role, name := splitRoleNode(lag.GetNode())
+               counter.Inc(1, barrier, role, name)
+       }
+}
+
+// recordAwaitRevisionAppliedResult observes the duration histogram, bumps
+// the laggard counter, and emits a structured access-log line for one
+// AwaitRevisionApplied call. Defensive about nil metrics / nil logger so
+// fixtures that construct barrierService directly (without server.PreRun)
+// stay zero-cost.
+func (b *barrierService) recordAwaitRevisionAppliedResult(
+       start time.Time,
+       req *schemav1.AwaitRevisionAppliedRequest,
+       resp *schemav1.AwaitRevisionAppliedResponse,
+       err error,
+) {
+       duration := time.Since(start)
+       result := barrierResultLabel(resp.GetApplied(), err)
+       if b.metrics != nil {
+               if h := b.metrics.schemaAwaitRevisionAppliedDuration; h != nil {
+                       h.Observe(duration.Seconds(), result)
+               }
+               recordBarrierLaggards(b.metrics.schemaBarrierLaggards, 
barrierLabelRevisionApplied, resp.GetLaggards())
+       }
+       if b.l != nil {
+               b.l.Info().
+                       Str("barrier", barrierLabelRevisionApplied).
+                       Str("result", result).
+                       Int64("min_revision", req.GetMinRevision()).
+                       Int("laggards", len(resp.GetLaggards())).

Review Comment:
   The `recordAwait*Result` helpers assume `resp` is non-nil 
(`resp.GetApplied()`, `resp.GetLaggards()`), but the barrier RPCs can return 
`(nil, err)` on validation or availability errors. That would panic inside the 
deferred recorder. Please make these recorders nil-safe (e.g., treat nil resp 
as an empty response and derive `result` from `err` alone).
   



##########
test/load/schema_barrier/barrier_loadtest_test.go:
##########
@@ -0,0 +1,384 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//go:build loadtest
+
+// Package schemabarrier hosts the CP-6 SLO load harness for the schema
+// consistency cluster barrier. It is gated behind the `loadtest` build tag
+// so the regular `go test ./...` cycle skips it; invoke via the make target
+// `make load-test-barrier`.
+//
+// The harness brings up an in-process distributed cluster (3 data nodes +
+// 1 liaison) and drives the CP-6 SLO load profile pinned in the plan:
+//   - 100 concurrent AwaitRevisionApplied(R, 5s) callers continuously
+//     re-issuing on completion;
+//   - schema mutation rate of 10 successful GroupRegistryService.Update
+//     ops per second across the seeded groups;
+//   - a 1-minute warm-up followed by a 5-minute measurement window;
+//   - report p50 / p95 / p99 from the client-side per-call duration.
+//
+// The client-side measurement is bounded below by the server-side
+// schema_await_revision_applied_duration_seconds histogram (RPC overhead on
+// localhost gRPC is sub-ms), so an SLO check on the client number is
+// strictly stricter than CP-6's "p99 < 200ms on the server-side histogram"
+// criterion. A future revision may scrape the Prometheus endpoint directly;
+// the current report records both numbers when the listener is reachable.
+package schemabarrier
+
+import (
+       "context"
+       "encoding/json"
+       "flag"
+       "fmt"
+       "math/rand/v2"
+       "os"
+       "sort"
+       "sync"
+       "sync/atomic"
+       "testing"
+       "time"
+
+       "github.com/onsi/gomega"
+       "github.com/stretchr/testify/require"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+       "google.golang.org/protobuf/types/known/durationpb"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+       schemapkg 
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure"
+       "github.com/apache/skywalking-banyandb/pkg/test/setup"
+       test_stream "github.com/apache/skywalking-banyandb/pkg/test/stream"
+       test_trace "github.com/apache/skywalking-banyandb/pkg/test/trace"
+)
+
+var (
+       flagWarmUp        = flag.Duration("loadtest.warm-up", time.Minute, 
"warm-up duration before sampling begins")
+       flagMeasure       = flag.Duration("loadtest.measure", 5*time.Minute, 
"measurement window duration")
+       flagCallers       = flag.Int("loadtest.callers", 100, "concurrent 
AwaitRevisionApplied callers")
+       flagMutateRate    = flag.Int("loadtest.mutate-rate", 10, "successful 
Group.Update operations per second")
+       flagBarrierTarget = flag.Int("loadtest.barrier-offset", 5, 
"AwaitRevisionApplied targets latest+offset; tune so each call holds open ~1s 
on average")
+       flagBarrierTimeout = flag.Duration("loadtest.barrier-timeout", 
5*time.Second, "AwaitRevisionApplied per-call timeout")
+       flagReportPath    = flag.String("loadtest.report", "load-report.json", 
"path to write the JSON report; pass '-' for stdout-only")
+)
+
+// loadReport is serialized to disk at end of the measurement window. The
+// fields mirror what CP-6 sign-off cites: configuration, raw counts, and
+// the p50 / p95 / p99 of client-side per-call duration. Future revisions
+// may add a `ServerSideP99` field when Prometheus scraping is wired.
+type loadReport struct {
+       Profile      profile  `json:"profile"`
+       StartedAt    string   `json:"started_at"`
+       FinishedAt   string   `json:"finished_at"`
+       BarrierCalls int64    `json:"barrier_calls"`
+       BarrierErrs  int64    `json:"barrier_errors"`
+       BarrierTimeouts int64 `json:"barrier_timeouts"`
+       MutateOps    int64    `json:"mutate_ops"`
+       MutateErrs   int64    `json:"mutate_errors"`
+       P50Seconds   float64  `json:"p50_seconds"`
+       P95Seconds   float64  `json:"p95_seconds"`
+       P99Seconds   float64  `json:"p99_seconds"`
+       MaxSeconds   float64  `json:"max_seconds"`
+}
+
+type profile struct {
+       WarmUp           string `json:"warm_up"`
+       Measure          string `json:"measure"`
+       Callers          int    `json:"callers"`
+       MutateRatePerSec int    `json:"mutate_rate_per_sec"`
+       BarrierOffset    int    `json:"barrier_target_offset"`
+       BarrierTimeout   string `json:"barrier_timeout"`
+}
+
+// TestSchemaBarrierLoad is the CP-6 SLO load harness entry point. Run via:
+//
+//     make load-test-barrier
+//
+// or directly with overrides:
+//
+//     go test -tags=loadtest -timeout 30m ./test/load/schema_barrier/... \
+//         -loadtest.warm-up=10s -loadtest.measure=30s -loadtest.callers=20 \
+//         -loadtest.mutate-rate=5 -loadtest.report=/tmp/load-report.json
+//
+// The smoke configuration above keeps the harness runnable on a developer
+// laptop in under a minute; the SLO sign-off run uses the defaults.
+func TestSchemaBarrierLoad(t *testing.T) {
+       gomega.RegisterTestingT(t)
+
+       tmpDir, tmpDirCleanup, err := test.NewSpace()
+       require.NoError(t, err, "allocate tmp dir for cluster")
+       defer tmpDirCleanup()
+
+       dfWriter := setup.NewDiscoveryFileWriter(tmpDir)
+       config := setup.PropertyClusterConfig(dfWriter)
+
+       t.Logf("starting 3 data nodes")
+       dataCloses := make([]func(), 0, 3)
+       for i := 0; i < 3; i++ {
+               dataCloses = append(dataCloses, setup.DataNode(config))
+       }
+
+       t.Logf("preloading schema (stream + measure + trace)")
+       setup.PreloadSchemaViaProperty(config,
+               test_stream.PreloadSchema, test_measure.PreloadSchema, 
test_trace.PreloadSchema)
+       config.AddLoadedKinds(schemapkg.KindStream, schemapkg.KindMeasure, 
schemapkg.KindTrace)
+
+       t.Logf("starting liaison node")
+       liaisonAddr, closeLiaison := setup.LiaisonNode(config,
+               "--measure-metadata-cache-wait-duration=5s",
+               "--stream-metadata-cache-wait-duration=5s",
+               "--trace-metadata-cache-wait-duration=5s")
+       defer func() {
+               closeLiaison()
+               for _, c := range dataCloses {
+                       c()
+               }
+       }()
+
+       t.Logf("dialing liaison at %s", liaisonAddr)
+       conn, err := grpchelper.Conn(liaisonAddr, 10*time.Second,
+               grpc.WithTransportCredentials(insecure.NewCredentials()))
+       require.NoError(t, err, "dial liaison")
+       defer func() { _ = conn.Close() }()
+
+       barrierClient := schemav1.NewSchemaBarrierServiceClient(conn)
+       groupsClient := databasev1.NewGroupRegistryServiceClient(conn)
+
+       // Seed the latest revision baseline from a synthetic 
AwaitRevisionApplied(0).
+       // The mutator + caller goroutines treat this as their starting point 
and
+       // advance it via atomic writes after each successful Group.Update.
+       var latestRev atomic.Int64
+       startResp, err := 
barrierClient.AwaitRevisionApplied(context.Background(),
+               &schemav1.AwaitRevisionAppliedRequest{MinRevision: 0, Timeout: 
durationpb.New(2 * time.Second)})
+       require.NoError(t, err, "baseline AwaitRevisionApplied")
+       require.True(t, startResp.GetApplied(), "baseline call must report 
applied")
+       // startResp.Applied=true does not give us a revision; we seed from the
+       // first List call below — the mutator path needs a positive revision to
+       // bump from anyway.
+
+       listCtx, listCancel := context.WithTimeout(context.Background(), 
10*time.Second)
+       listResp, err := groupsClient.List(listCtx, 
&databasev1.GroupRegistryServiceListRequest{})
+       listCancel()
+       require.NoError(t, err, "list seeded groups")
+       require.NotEmpty(t, listResp.GetGroup(), "PreloadSchema must seed at 
least one group")
+       for _, g := range listResp.GetGroup() {
+               if rev := g.GetMetadata().GetModRevision(); rev > 
latestRev.Load() {
+                       latestRev.Store(rev)
+               }
+       }
+       t.Logf("seeded %d groups; baseline ModRevision=%d", 
len(listResp.GetGroup()), latestRev.Load())
+
+       // Background workers are bounded by harnessCtx; cancelled at end of 
run.
+       harnessCtx, harnessCancel := context.WithCancel(context.Background())
+       defer harnessCancel()
+
+       var (
+               mutateOps    atomic.Int64
+               mutateErrs   atomic.Int64
+               barrierCalls atomic.Int64
+               barrierErrs  atomic.Int64
+               barrierTimes atomic.Int64
+
+               samplingMu sync.Mutex
+               samples    []time.Duration
+               // samplingActive guards `samples` writes so warm-up calls are 
not
+               // recorded. The mutex is acquired only on completion — gauging
+               // contention against the per-iteration RPC cost is negligible.
+               samplingActive atomic.Bool
+       )
+
+       // Mutator: 1 goroutine driving 10 ops/sec across the seeded groups
+       // (round-robin). Each iteration calls Group.Update with the latest List
+       // snapshot of the group; etcd bumps mod_revision on every Update even
+       // when the body is identical, so latestRev advances at the configured
+       // rate as long as the cluster is healthy.
+       mutateInterval := time.Second / time.Duration(*flagMutateRate)
+       go func() {
+               ticker := time.NewTicker(mutateInterval)

Review Comment:
   `mutateInterval := time.Second / time.Duration(*flagMutateRate)` panics with 
an integer divide-by-zero when `-loadtest.mutate-rate=0`, and a negative rate 
produces a negative interval that makes `time.NewTicker` panic as well. Since 
these are user-facing flags, please validate inputs early (mutate-rate > 0, 
callers >= 0, timeouts > 0, etc.) and fail the test with a clear error instead 
of panicking.



##########
test/cases/schema/barrier_cluster.go:
##########
@@ -0,0 +1,340 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+       "context"
+       "fmt"
+       "time"
+
+       g "github.com/onsi/ginkgo/v2"
+       gm "github.com/onsi/gomega"
+       "google.golang.org/protobuf/types/known/durationpb"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+       "github.com/apache/skywalking-banyandb/pkg/test/setup"
+)
+
+// §6.12 — Cluster-only specs that exercise the schema-watch pause primitive
+// end-to-end through the public AwaitX RPCs. They pause the receiving
+// liaison's own SchemaRegistry; the cluster barrier's selfName probe reads
+// through that SR, so pausing it surfaces a laggard via the public AwaitX
+// API. Data-node fan-out via NodeSchemaStatusService is covered by the
+// unit tests in banyand/liaison/grpc/barrier_cluster_test.go (§FA-1..FD-2).
+// These integration specs cover the orthogonal contract: the pause
+// primitive's effect is observable through the public AwaitX RPC and
+// resume drains the queued events so the barrier converges.
+//
+// Specs skip themselves under standalone mode and when the receiving
+// liaison address is empty (the standalone harness has none).
+
+func barrierClusterMeasureGroup(name string) *commonv1.Group {
+       return &commonv1.Group{
+               Metadata: &commonv1.Metadata{Name: name},
+               Catalog:  commonv1.Catalog_CATALOG_MEASURE,
+               ResourceOpts: &commonv1.ResourceOpts{
+                       ShardNum:        2,
+                       SegmentInterval: &commonv1.IntervalRule{Unit: 
commonv1.IntervalRule_UNIT_DAY, Num: 1},
+                       Ttl:             &commonv1.IntervalRule{Unit: 
commonv1.IntervalRule_UNIT_DAY, Num: 7},
+               },
+       }
+}
+
+func barrierClusterMeasureSpec(group, name string) *databasev1.Measure {
+       return &databasev1.Measure{
+               Metadata: &commonv1.Metadata{Name: name, Group: group},
+               Entity:   &databasev1.Entity{TagNames: []string{"host"}},
+               TagFamilies: []*databasev1.TagFamilySpec{
+                       {
+                               Name: "default",
+                               Tags: []*databasev1.TagSpec{
+                                       {Name: "host", Type: 
databasev1.TagType_TAG_TYPE_STRING},
+                               },
+                       },
+               },
+       }
+}
+
+var _ = g.Describe("Cluster barrier under partial-cluster conditions (§6.12)", 
func() {
+       var (
+               ctx     context.Context
+               clients *Clients
+               paused  string
+       )
+
+       g.BeforeEach(func() {
+               if SharedContext.Mode != helpers.ModeDistributed {
+                       g.Skip("§6.12 cluster barrier specs are 
distributed-only")
+               }
+               if SharedContext.LiaisonAddr == "" {
+                       g.Skip("§6.12 specs need a registered liaison address 
(set by the distributed BeforeSuite)")
+               }
+               ctx = context.Background()
+               clients = NewClients(SharedContext.Connection)
+               paused = ""
+       })
+
+       g.AfterEach(func() {
+               if paused == "" {
+                       return
+               }
+               // Best-effort resume so a failing assertion does not leave the
+               // liaison's SR permanently paused for downstream specs.
+               _ = setup.ResumeDataNodeWatch(paused)
+       })
+
+               // §6.12a — AwaitRevisionApplied surfaces a paused liaison as a
+               // laggard via its selfName probe; resume drains the queue and 
the
+               // barrier converges. The GetMaxRevision min-aggregation 
regression
+               // that caused post-resume laggards:3 timeouts has been repaired
+               // (removed LatestModRevision from NodeRepoRegistry; 
GetMaxRevision
+               // now reads cache-only).
+       g.It("§6.12a AwaitRevisionApplied reports the paused liaison as a 
laggard", func() {

Review Comment:
   This file does not appear to be gofmt'd: the §6.12a comment block is 
indented one level deeper than the `g.It` call it documents. gofmt will 
reformat it, so formatting/lint checks may fail. Please run gofmt on this file 
before merging.



##########
banyand/liaison/grpc/barrier_metrics.go:
##########
@@ -0,0 +1,250 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+       "strings"
+       "time"
+
+       "google.golang.org/grpc/codes"
+       "google.golang.org/grpc/status"
+
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+       "github.com/apache/skywalking-banyandb/pkg/meter"
+)
+
+const (
+       // barrierLabelRevisionApplied / SchemaApplied / SchemaDeleted are the
+       // three values used for the `barrier` label on
+       // schema_barrier_laggard_nodes_total. The histogram metrics carry the
+       // barrier identity in their metric name instead, so this label is only
+       // emitted on the laggard counter.
+       barrierLabelRevisionApplied = "revision_applied"
+       barrierLabelSchemaApplied   = "schema_applied"
+       barrierLabelSchemaDeleted   = "schema_deleted"
+
+       // resultLabel* are the values for the `result` label on the three
+       // schema_await_*_duration_seconds histograms. Recorded once per call
+       // at return time.
+       resultLabelApplied         = "applied"
+       resultLabelTimeout         = "timeout"
+       resultLabelInvalidArgument = "invalid_argument"
+       resultLabelError           = "error"
+
+       // roleLabelSelf is used when a laggard's Node field is unprefixed —
+       // the standalone barrier path emits a single self-laggard with no name
+       // on timeout, where there is no role to label against.
+       roleLabelSelf = "self"
+
+       // statusReasonWaitTimeout is the only `reason` value emitted for
+       // schema_status_schema_not_applied_total in v0.11.0. The label is
+       // retained for forward-compat with optional fast-sync paths.
+       statusReasonWaitTimeout = "wait_timeout"
+
+       // rpcLabel* are the values for the `rpc` label on the two status
+       // counters (schema_status_schema_not_applied_total /
+       // schema_status_expired_schema_total). Bound at call sites in
+       // measure.go / stream.go / trace.go for the write and query gates.
+       rpcLabelMeasureWrite = "measure_write"
+       rpcLabelStreamWrite  = "stream_write"
+       rpcLabelTraceWrite   = "trace_write"
+       rpcLabelMeasureQuery = "measure_query"
+       rpcLabelStreamQuery  = "stream_query"
+       rpcLabelTraceQuery   = "trace_query"
+)
+
+// splitRoleNode extracts the `<role>` and `<name>` halves of a laggard's
+// Node identifier per the cluster barrier's `<role>-<Metadata.Name>`
+// convention (see member.laggardName in barrier_cluster.go). Unprefixed
+// values — emitted by the standalone barrier path on timeout — map to
+// role="self" and name="" so the laggard counter still increments without
+// dropping the observation.
+func splitRoleNode(node string) (role, name string) {
+       if i := strings.IndexByte(node, '-'); i >= 0 {
+               return node[:i], node[i+1:]
+       }
+       return roleLabelSelf, node
+}

Review Comment:
   The `splitRoleNode` docstring says unprefixed node IDs map to role="self" 
and name=""; however the implementation returns role="self" and name=node (and 
tests assert the non-empty name). Please adjust the comment to match the actual 
behavior to avoid confusion when interpreting metric labels.



##########
CHANGES.md:
##########
@@ -31,10 +31,26 @@ Release Notes.
   - Add tombstone retention/GC (default 7 days, configurable via 
`--schema-server-tombstone-retention`) with a per-cache count cap to bound 
memory under bulk deletes.
   - Reject `Create` with `updated_at <= tombstone.delete_time` to prevent 
replayed creates from overwriting newer deletes.
   - Guard `pkg/schema/cache` against out-of-order `EventDelete` events; expose 
monotonic `LatestModRevision` watermark.
-- Schema consistency (Phase 2 in progress): cluster-wide barrier groundwork. 
Internal-only; no client-facing surface impact yet.
-  - Add `NodeSchemaStatusService` (`GetMaxRevision`, `GetKeyRevisions`, 
`GetAbsentKeys`) registered on every cluster member that holds a schema cache, 
so peer liaisons and data nodes can be probed identically by the upcoming 
barrier fan-out (#1108).
+- Schema consistency (Phase 2 in progress): cluster-wide barrier. 
Internal-only; no client-facing surface impact yet.
+  - Add `NodeSchemaStatusService` (`GetMaxRevision`, `GetKeyRevisions`, 
`GetAbsentKeys`) registered on every cluster member that holds a schema cache, 
so peer liaisons and data nodes can be probed identically by the barrier 
fan-out (#1108).
   - Extend `queue.Client` with `NewNodeSchemaStatusClient(node)` so the 
barrier fan-out can borrow the existing tier1/tier2 connection pools instead of 
opening a parallel mesh (#1109).
   - `AwaitRevisionApplied` now fans out across the receiving liaison's frozen 
tier1 (peer-liaison) + tier2 (data-node) Active set, probing each member in 
parallel via `GetMaxRevision` with shared per-call deadline. Cross-version 
peers returning `codes.Unimplemented` are treated as ready so partial-upgrade 
clusters do not deadlock; transient RPC errors count as per-iteration laggards. 
Empty Active set fails fast with `codes.Unavailable`.
+  - Frozen-snapshot mid-call semantics: members that transition `Active → 
Evictable` during a call are dropped from subsequent probes and surfaced once 
as a `NodeLaggard{reason="evicted_during_poll"}`; members that disappear from 
the route table altogether are dropped silently; late joiners are excluded from 
the watched set until the next call. Adds `reason` field (5) to `NodeLaggard` 
proto.
+  - `AwaitSchemaApplied` and `AwaitSchemaDeleted` follow the same fan-out 
shape using `GetKeyRevisions` / `GetAbsentKeys` respectively, with per-node 
calls chunked at 1000 keys and a shared call-wide deadline (no equal-slice 
division across chunks). Per-node laggards carry the per-member `missing_keys` 
/ `still_present_keys` they observed.
+  - First-attempt re-enable of the four Phase-1-deferred distributed specs 
(§6.8, §6.11, §4.6.4, §4.6.2) confirmed all four still flake under the cluster 
barrier alone (first run had §4.6.2 passing; second run reproduced the `group 
not found` race). All four guards stay in place pending Step 2.5's cluster 
query gate; comments updated to cite the gate as the next prerequisite.
+  - Add `pkg/schema/registry.NodeRepoRegistry`, the per-node aggregator that 
routes barrier and node-status RPC lookups to the same per-service 
`pkg/schema.schemaRepo` instances the data-node executor consults via 
`LoadGroup` / `LoadResource`. Each banyand service (measure / stream / trace) 
registers its `schemaRepo` here during PreRun under a kind bitmask covering 
`KindGroup` + the catalog's primary kind + `KindIndexRule` + 
`KindIndexRuleBinding`. `metadata.Service` exposes a `NodeRepoRegistry()` 
accessor and `clientService` constructs a single registry per process.
+  - Repoint `NodeSchemaStatusService` at the `NodeRepoRegistry` for 
executor-tracked kinds: `GetMaxRevision` returns 
`min(schemaCache.notifiedModRevision, NodeRepoRegistry.LatestModRevision)` and 
`GetKeyRevisions` / `GetAbsentKeys` route per-key lookups through the 
per-service `schemaRepo` aggregator. TopN / Property keys still consult the 
property `schemaCache`. This closes the `SendMetadataEvent` eventCh-retry leak 
where the schemaCache watermark advanced before `schemaRepo.groupMap` applied 
the event, so the cluster barrier can no longer certify a key the executor is 
about to miss.

Review Comment:
   CHANGES.md states `GetMaxRevision` returns 
`min(schemaCache.notifiedModRevision, NodeRepoRegistry.LatestModRevision)`, but 
later bullets say `LatestModRevision` was removed and `GetMaxRevision` reads 
cache-only. Please update this bullet to reflect the current implementation 
(and avoid referencing the removed API).
   



##########
pkg/test/setup/setup.go:
##########
@@ -702,6 +725,23 @@ func startLiaisonNode(config *ClusterConfig, path string, 
flags ...string) (stri
        }
        waitForActiveDataNodes(grpcAddr, config)
 
+       // Bind the liaison's SchemaRegistry to its gRPC address so cluster-only
+       // specs can call PauseDataNodeWatch / ResumeDataNodeWatch on the
+       // receiving liaison itself — pausing the liaison's own SR makes its
+       // barrier selfName probe lag, which is the §6.12 contract since the
+       // in-process distributed harness does not expose
+       // NodeSchemaStatusService on data-node ports.

Review Comment:
   The comment says the in-process distributed harness does not expose 
`NodeSchemaStatusService` on data-node ports, but data nodes started via 
`startDataNode` run the `data` command and (per this PR) wire 
`SetNodeSchemaStatusRepo`, which should register the service. Please 
update/remove this comment to avoid misleading future readers.



##########
CHANGES.md:
##########
@@ -31,10 +31,26 @@ Release Notes.
   - Add tombstone retention/GC (default 7 days, configurable via 
`--schema-server-tombstone-retention`) with a per-cache count cap to bound 
memory under bulk deletes.
   - Reject `Create` with `updated_at <= tombstone.delete_time` to prevent 
replayed creates from overwriting newer deletes.
   - Guard `pkg/schema/cache` against out-of-order `EventDelete` events; expose 
monotonic `LatestModRevision` watermark.
-- Schema consistency (Phase 2 in progress): cluster-wide barrier groundwork. 
Internal-only; no client-facing surface impact yet.
-  - Add `NodeSchemaStatusService` (`GetMaxRevision`, `GetKeyRevisions`, 
`GetAbsentKeys`) registered on every cluster member that holds a schema cache, 
so peer liaisons and data nodes can be probed identically by the upcoming 
barrier fan-out (#1108).
+- Schema consistency (Phase 2 in progress): cluster-wide barrier. 
Internal-only; no client-facing surface impact yet.
+  - Add `NodeSchemaStatusService` (`GetMaxRevision`, `GetKeyRevisions`, 
`GetAbsentKeys`) registered on every cluster member that holds a schema cache, 
so peer liaisons and data nodes can be probed identically by the barrier 
fan-out (#1108).
   - Extend `queue.Client` with `NewNodeSchemaStatusClient(node)` so the 
barrier fan-out can borrow the existing tier1/tier2 connection pools instead of 
opening a parallel mesh (#1109).
   - `AwaitRevisionApplied` now fans out across the receiving liaison's frozen 
tier1 (peer-liaison) + tier2 (data-node) Active set, probing each member in 
parallel via `GetMaxRevision` with shared per-call deadline. Cross-version 
peers returning `codes.Unimplemented` are treated as ready so partial-upgrade 
clusters do not deadlock; transient RPC errors count as per-iteration laggards. 
Empty Active set fails fast with `codes.Unavailable`.
+  - Frozen-snapshot mid-call semantics: members that transition `Active → 
Evictable` during a call are dropped from subsequent probes and surfaced once 
as a `NodeLaggard{reason="evicted_during_poll"}`; members that disappear from 
the route table altogether are dropped silently; late joiners are excluded from 
the watched set until the next call. Adds `reason` field (5) to `NodeLaggard` 
proto.
+  - `AwaitSchemaApplied` and `AwaitSchemaDeleted` follow the same fan-out 
shape using `GetKeyRevisions` / `GetAbsentKeys` respectively, with per-node 
calls chunked at 1000 keys and a shared call-wide deadline (no equal-slice 
division across chunks). Per-node laggards carry the per-member `missing_keys` 
/ `still_present_keys` they observed.
+  - First-attempt re-enable of the four Phase-1-deferred distributed specs 
(§6.8, §6.11, §4.6.4, §4.6.2) confirmed all four still flake under the cluster 
barrier alone (first run had §4.6.2 passing; second run reproduced the `group 
not found` race). All four guards stay in place pending Step 2.5's cluster 
query gate; comments updated to cite the gate as the next prerequisite.
+  - Add `pkg/schema/registry.NodeRepoRegistry`, the per-node aggregator that 
routes barrier and node-status RPC lookups to the same per-service 
`pkg/schema.schemaRepo` instances the data-node executor consults via 
`LoadGroup` / `LoadResource`. Each banyand service (measure / stream / trace) 
registers its `schemaRepo` here during PreRun under a kind bitmask covering 
`KindGroup` + the catalog's primary kind + `KindIndexRule` + 
`KindIndexRuleBinding`. `metadata.Service` exposes a `NodeRepoRegistry()` 
accessor and `clientService` constructs a single registry per process.
+  - Repoint `NodeSchemaStatusService` at the `NodeRepoRegistry` for 
executor-tracked kinds: `GetMaxRevision` returns 
`min(schemaCache.notifiedModRevision, NodeRepoRegistry.LatestModRevision)` and 
`GetKeyRevisions` / `GetAbsentKeys` route per-key lookups through the 
per-service `schemaRepo` aggregator. TopN / Property keys still consult the 
property `schemaCache`. This closes the `SendMetadataEvent` eventCh-retry leak 
where the schemaCache watermark advanced before `schemaRepo.groupMap` applied 
the event, so the cluster barrier can no longer certify a key the executor is 
about to miss.
+  - Write gate (`validateWriteRequest` for measure / stream / trace) and 
per-group query gate (`checkQueryGate`) read `cacheRev` through the 
`NodeRepoRegistry` rather than the liaison `entityRepo` locator. The locator 
still answers existence checks (`STATUS_NOT_FOUND` signal) and downstream 
navigation; only the revision scalar moves. Net contract: if 
`AwaitRevisionApplied(R)` on a node returns `applied=true`, the write gate, 
query gate, and downstream executor on that same node all see ≥ R for any key 
included in R.
+  - Re-enable §4.6.2 / §4.6.4 / §6.8 / §6.11 in distributed mode (final pass). 
Distributed schema integration suite reports `Ran 28 of 28 Specs, 0 Skipped`.

Review Comment:
   CHANGES.md bullet says the four distributed specs still flake and guards 
stay in place, but a later bullet says they were re-enabled and the suite ran 
with 0 skipped. These statements conflict; please reconcile/remove the outdated 
entry so the release notes are internally consistent.
   



##########
test/cases/schema/barrier_cluster.go:
##########
@@ -0,0 +1,340 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package schema
+
+import (
+       "context"
+       "fmt"
+       "time"
+
+       g "github.com/onsi/ginkgo/v2"
+       gm "github.com/onsi/gomega"
+       "google.golang.org/protobuf/types/known/durationpb"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       schemav1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
+       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+       "github.com/apache/skywalking-banyandb/pkg/test/setup"
+)
+
+// §6.12 — Cluster-only specs that exercise the schema-watch pause primitive
+// end-to-end through the public AwaitX RPCs. They pause the receiving
+// liaison's own SchemaRegistry; the cluster barrier's selfName probe reads
+// through that SR, so pausing it surfaces a laggard via the public AwaitX
+// API. Data-node fan-out via NodeSchemaStatusService is covered by the
+// unit tests in banyand/liaison/grpc/barrier_cluster_test.go (§FA-1..FD-2).
+// These integration specs cover the orthogonal contract: the pause
+// primitive's effect is observable through the public AwaitX RPC and
+// resume drains the queued events so the barrier converges.
+//
+// Specs skip themselves under standalone mode and when the receiving
+// liaison address is empty (the standalone harness has none).
+
+func barrierClusterMeasureGroup(name string) *commonv1.Group {
+       return &commonv1.Group{
+               Metadata: &commonv1.Metadata{Name: name},
+               Catalog:  commonv1.Catalog_CATALOG_MEASURE,
+               ResourceOpts: &commonv1.ResourceOpts{
+                       ShardNum:        2,
+                       SegmentInterval: &commonv1.IntervalRule{Unit: 
commonv1.IntervalRule_UNIT_DAY, Num: 1},
+                       Ttl:             &commonv1.IntervalRule{Unit: 
commonv1.IntervalRule_UNIT_DAY, Num: 7},
+               },
+       }
+}
+
+func barrierClusterMeasureSpec(group, name string) *databasev1.Measure {
+       return &databasev1.Measure{
+               Metadata: &commonv1.Metadata{Name: name, Group: group},
+               Entity:   &databasev1.Entity{TagNames: []string{"host"}},
+               TagFamilies: []*databasev1.TagFamilySpec{
+                       {
+                               Name: "default",
+                               Tags: []*databasev1.TagSpec{
+                                       {Name: "host", Type: 
databasev1.TagType_TAG_TYPE_STRING},
+                               },
+                       },
+               },
+       }
+}
+
+var _ = g.Describe("Cluster barrier under partial-cluster conditions (§6.12)", 
func() {
+       var (
+               ctx     context.Context
+               clients *Clients
+               paused  string
+       )
+
+       g.BeforeEach(func() {
+               if SharedContext.Mode != helpers.ModeDistributed {
+                       g.Skip("§6.12 cluster barrier specs are 
distributed-only")
+               }
+               if SharedContext.LiaisonAddr == "" {
+                       g.Skip("§6.12 specs need a registered liaison address 
(set by the distributed BeforeSuite)")
+               }
+               ctx = context.Background()
+               clients = NewClients(SharedContext.Connection)
+               paused = ""
+       })
+
+       g.AfterEach(func() {
+               if paused == "" {
+                       return
+               }
+               // Best-effort resume so a failing assertion does not leave the
+               // liaison's SR permanently paused for downstream specs.
+               _ = setup.ResumeDataNodeWatch(paused)
+       })
+
+               // §6.12a — AwaitRevisionApplied surfaces a paused liaison as a
+               // laggard via its selfName probe; resume drains the queue and 
the
+               // barrier converges. The GetMaxRevision min-aggregation 
regression
+               // that caused post-resume laggards:3 timeouts has been repaired
+               // (removed LatestModRevision from NodeRepoRegistry; 
GetMaxRevision
+               // now reads cache-only).
+       g.It("§6.12a AwaitRevisionApplied reports the paused liaison as a 
laggard", func() {
+               groupName := fmt.Sprintf("bc-rev-%d", time.Now().UnixNano())
+               measureName := "bc_rev_measure"
+
+               g.By("Seeding the group + measure at a known mod_revision")
+               _, createGroupErr := clients.GroupClient.Create(ctx, 
&databasev1.GroupRegistryServiceCreateRequest{Group: 
barrierClusterMeasureGroup(groupName)})
+               gm.Expect(createGroupErr).ShouldNot(gm.HaveOccurred())
+               createMeasureResp, createMeasureErr := 
clients.MeasureRegClient.Create(ctx, 
&databasev1.MeasureRegistryServiceCreateRequest{
+                       Measure: barrierClusterMeasureSpec(groupName, 
measureName),
+               })
+               gm.Expect(createMeasureErr).ShouldNot(gm.HaveOccurred())
+               baselineRev := createMeasureResp.GetModRevision()
+               gm.Expect(clients.AwaitRevision(ctx, baselineRev, 
10*time.Second)).Should(gm.Succeed())
+
+               g.By("Pausing the receiving liaison's schema watch")
+               paused = SharedContext.LiaisonAddr
+               gm.Expect(setup.PauseDataNodeWatch(paused)).Should(gm.Succeed())
+
+               g.By("Bumping the measure's mod_revision while the liaison is 
paused")
+               getResp, getErr := clients.MeasureRegClient.Get(ctx, 
&databasev1.MeasureRegistryServiceGetRequest{
+                       Metadata: &commonv1.Metadata{Group: groupName, Name: 
measureName},
+               })
+               gm.Expect(getErr).ShouldNot(gm.HaveOccurred())
+               measure := getResp.GetMeasure()
+               measure.TagFamilies[0].Tags = 
append(measure.TagFamilies[0].Tags,
+                       &databasev1.TagSpec{Name: "region", Type: 
databasev1.TagType_TAG_TYPE_STRING})
+               updResp, updErr := clients.MeasureRegClient.Update(ctx, 
&databasev1.MeasureRegistryServiceUpdateRequest{Measure: measure})
+               gm.Expect(updErr).ShouldNot(gm.HaveOccurred())
+               newRev := updResp.GetModRevision()
+               gm.Expect(newRev).Should(gm.BeNumerically(">", baselineRev))
+
+               g.By("Calling awaitRevisionApplied — paused liaison must 
surface as a laggard")
+               // Brief settle so the bumped revision's watch event has time to
+               // reach the paused liaison's SR (which queues it under pause).
+               // Without this, the barrier can race the watch broadcast.
+               time.Sleep(200 * time.Millisecond)
+               callCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
+               defer cancel()
+               resp, rpcErr := 
clients.BarrierClient.AwaitRevisionApplied(callCtx, 
&schemav1.AwaitRevisionAppliedRequest{
+                       MinRevision: newRev,
+                       Timeout:     durationpb.New(2 * time.Second),
+               })
+               gm.Expect(rpcErr).ShouldNot(gm.HaveOccurred())
+               gm.Expect(resp.GetApplied()).Should(gm.BeFalse(),
+                       "barrier must not report applied while the receiving 
liaison is paused")
+               gm.Expect(resp.GetLaggards()).ShouldNot(gm.BeEmpty(),
+                       "barrier must surface a laggard while the receiving 
liaison is paused")
+
+               g.By("Resuming and verifying the barrier converges")
+               
gm.Expect(setup.ResumeDataNodeWatch(paused)).Should(gm.Succeed())
+               paused = ""
+               gm.Expect(clients.AwaitRevision(ctx, newRev, 
10*time.Second)).Should(gm.Succeed())
+
+               _, _ = clients.GroupClient.Delete(ctx, 
&databasev1.GroupRegistryServiceDeleteRequest{Group: groupName})
+       })
+
+       // §6.12b — AwaitSchemaApplied surfaces a paused liaison as a laggard
+       // when a measure's mod_revision has bumped but the liaison's SR has
+       // queued the watch event.
+       g.It("§6.12b AwaitSchemaApplied reports the paused liaison as a 
laggard", func() {
+               groupName := fmt.Sprintf("bc-applied-%d", time.Now().UnixNano())
+               measureName := "bc_measure"
+
+               g.By("Seeding the group + measure")
+               _, createGroupErr := clients.GroupClient.Create(ctx, 
&databasev1.GroupRegistryServiceCreateRequest{Group: 
barrierClusterMeasureGroup(groupName)})
+               gm.Expect(createGroupErr).ShouldNot(gm.HaveOccurred())
+               createMeasureResp, createMeasureErr := 
clients.MeasureRegClient.Create(ctx, 
&databasev1.MeasureRegistryServiceCreateRequest{
+                       Measure: barrierClusterMeasureSpec(groupName, 
measureName),
+               })
+               gm.Expect(createMeasureErr).ShouldNot(gm.HaveOccurred())
+               baselineRev := createMeasureResp.GetModRevision()
+               gm.Expect(clients.AwaitRevision(ctx, baselineRev, 
10*time.Second)).Should(gm.Succeed())
+
+               g.By("Pausing the receiving liaison's schema watch")
+               paused = SharedContext.LiaisonAddr
+               gm.Expect(setup.PauseDataNodeWatch(paused)).Should(gm.Succeed())
+
+               g.By("Updating the measure to bump its mod_revision")
+               getResp, getErr := clients.MeasureRegClient.Get(ctx, 
&databasev1.MeasureRegistryServiceGetRequest{
+                       Metadata: &commonv1.Metadata{Group: groupName, Name: 
measureName},
+               })
+               gm.Expect(getErr).ShouldNot(gm.HaveOccurred())
+               measure := getResp.GetMeasure()
+               measure.TagFamilies[0].Tags = 
append(measure.TagFamilies[0].Tags,
+                       &databasev1.TagSpec{Name: "region", Type: 
databasev1.TagType_TAG_TYPE_STRING})
+               updResp, updErr := clients.MeasureRegClient.Update(ctx, 
&databasev1.MeasureRegistryServiceUpdateRequest{Measure: measure})
+               gm.Expect(updErr).ShouldNot(gm.HaveOccurred())
+               newRev := updResp.GetModRevision()
+               gm.Expect(newRev).Should(gm.BeNumerically(">", baselineRev))
+
+               g.By("Calling AwaitSchemaApplied — paused liaison must surface 
as a laggard")
+               callCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
+               defer cancel()
+               resp, rpcErr := 
clients.BarrierClient.AwaitSchemaApplied(callCtx, 
&schemav1.AwaitSchemaAppliedRequest{
+                       Keys: []*schemav1.SchemaKey{{
+                               Kind: "measure", Group: groupName, Name: 
measureName,
+                       }},
+                       MinRevisions: []int64{newRev},
+                       Timeout:      durationpb.New(2 * time.Second),
+               })
+               gm.Expect(rpcErr).ShouldNot(gm.HaveOccurred())
+               gm.Expect(resp.GetApplied()).Should(gm.BeFalse())
+               gm.Expect(resp.GetLaggards()).ShouldNot(gm.BeEmpty(),
+                       "barrier must surface a laggard while the receiving 
liaison is paused")
+
+               g.By("Resuming and verifying the barrier converges")
+               
gm.Expect(setup.ResumeDataNodeWatch(paused)).Should(gm.Succeed())
+               paused = ""
+               gm.Expect(clients.AwaitApplied(ctx, 
[]string{fmt.Sprintf("measure:%s/%s", groupName, measureName)}, 
10*time.Second)).Should(gm.Succeed())
+
+               _, _ = clients.GroupClient.Delete(ctx, 
&databasev1.GroupRegistryServiceDeleteRequest{Group: groupName})
+       })
+
+       // §6.12c — AwaitSchemaDeleted surfaces a paused liaison as a laggard
+       // when a measure was deleted but the liaison's SR has queued the
+       // delete event (so its cache still holds the entry).
+       g.It("§6.12c AwaitSchemaDeleted reports the paused liaison as a 
laggard", func() {
+               groupName := fmt.Sprintf("bc-deleted-%d", time.Now().UnixNano())
+               measureName := "bc_del_measure"
+
+               g.By("Seeding the group + measure")
+               _, createGroupErr := clients.GroupClient.Create(ctx, 
&databasev1.GroupRegistryServiceCreateRequest{Group: 
barrierClusterMeasureGroup(groupName)})
+               gm.Expect(createGroupErr).ShouldNot(gm.HaveOccurred())
+               createMeasureResp, createMeasureErr := 
clients.MeasureRegClient.Create(ctx, 
&databasev1.MeasureRegistryServiceCreateRequest{
+                       Measure: barrierClusterMeasureSpec(groupName, 
measureName),
+               })
+               gm.Expect(createMeasureErr).ShouldNot(gm.HaveOccurred())
+               gm.Expect(clients.AwaitRevision(ctx, 
createMeasureResp.GetModRevision(), 10*time.Second)).Should(gm.Succeed())
+
+               g.By("Pausing the receiving liaison's schema watch")
+               paused = SharedContext.LiaisonAddr
+               gm.Expect(setup.PauseDataNodeWatch(paused)).Should(gm.Succeed())
+
+               g.By("Deleting the measure while the liaison is paused")
+               _, delErr := clients.MeasureRegClient.Delete(ctx, 
&databasev1.MeasureRegistryServiceDeleteRequest{
+                       Metadata: &commonv1.Metadata{Group: groupName, Name: 
measureName},
+               })
+               gm.Expect(delErr).ShouldNot(gm.HaveOccurred())
+
+               g.By("Calling AwaitSchemaDeleted — paused liaison must surface 
as a laggard")
+               callCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
+               defer cancel()
+               resp, rpcErr := 
clients.BarrierClient.AwaitSchemaDeleted(callCtx, 
&schemav1.AwaitSchemaDeletedRequest{
+                       Keys: []*schemav1.SchemaKey{{
+                               Kind: "measure", Group: groupName, Name: 
measureName,
+                       }},
+                       Timeout: durationpb.New(2 * time.Second),
+               })
+               gm.Expect(rpcErr).ShouldNot(gm.HaveOccurred())
+               gm.Expect(resp.GetApplied()).Should(gm.BeFalse())
+               gm.Expect(resp.GetLaggards()).ShouldNot(gm.BeEmpty(),
+                       "barrier must surface a laggard while the receiving 
liaison is paused")
+
+               g.By("Resuming and verifying the deletion barrier converges")
+               
gm.Expect(setup.ResumeDataNodeWatch(paused)).Should(gm.Succeed())
+               paused = ""
+               gm.Expect(clients.AwaitDeleted(ctx, 
[]string{fmt.Sprintf("measure:%s/%s", groupName, measureName)}, 
10*time.Second)).Should(gm.Succeed())
+
+               _, _ = clients.GroupClient.Delete(ctx, 
&databasev1.GroupRegistryServiceDeleteRequest{Group: groupName})
+       })
+
+               // §6.12d — Cross-barrier recovery: after a multi-step 
pause-and-mutate
+               // sequence, resume drains the queued events in arrival order 
so a
+               // follow-up AwaitRevisionApplied at the post-mutate revision 
returns
+               // applied=true with no laggards.
+       g.It("§6.12d cross-barrier recovery: resume drains queued events and 
clears the laggard", func() {
+               groupName := fmt.Sprintf("bc-recovery-%d", 
time.Now().UnixNano())
+               measureName := "bc_recovery_measure"
+
+               g.By("Seeding the group + measure")
+               _, createGroupErr := clients.GroupClient.Create(ctx, 
&databasev1.GroupRegistryServiceCreateRequest{Group: 
barrierClusterMeasureGroup(groupName)})
+               gm.Expect(createGroupErr).ShouldNot(gm.HaveOccurred())
+               createMeasureResp, createMeasureErr := 
clients.MeasureRegClient.Create(ctx, 
&databasev1.MeasureRegistryServiceCreateRequest{
+                       Measure: barrierClusterMeasureSpec(groupName, 
measureName),
+               })
+               gm.Expect(createMeasureErr).ShouldNot(gm.HaveOccurred())
+               baselineRev := createMeasureResp.GetModRevision()
+               gm.Expect(clients.AwaitRevision(ctx, baselineRev, 
10*time.Second)).Should(gm.Succeed())
+
+               g.By("Pausing the receiving liaison and bumping the measure 
twice while paused")
+               paused = SharedContext.LiaisonAddr
+               gm.Expect(setup.PauseDataNodeWatch(paused)).Should(gm.Succeed())
+
+               getResp, getErr := clients.MeasureRegClient.Get(ctx, 
&databasev1.MeasureRegistryServiceGetRequest{
+                       Metadata: &commonv1.Metadata{Group: groupName, Name: 
measureName},
+               })
+               gm.Expect(getErr).ShouldNot(gm.HaveOccurred())
+                       measure := getResp.GetMeasure()
+                       measure.TagFamilies[0].Tags = 
append(measure.TagFamilies[0].Tags,
+                               &databasev1.TagSpec{Name: "region", Type: 
databasev1.TagType_TAG_TYPE_STRING})
+                       _, firstErr := clients.MeasureRegClient.Update(ctx, 
&databasev1.MeasureRegistryServiceUpdateRequest{Measure: measure})
+                       gm.Expect(firstErr).ShouldNot(gm.HaveOccurred())
+                       measure.TagFamilies[0].Tags = 
append(measure.TagFamilies[0].Tags,
+                               &databasev1.TagSpec{Name: "zone", Type: 
databasev1.TagType_TAG_TYPE_STRING})
+                       secondResp, secondErr := 
clients.MeasureRegClient.Update(ctx, 
&databasev1.MeasureRegistryServiceUpdateRequest{Measure: measure})
+                       gm.Expect(secondErr).ShouldNot(gm.HaveOccurred())

Review Comment:
   The §6.12d block has several lines that are incorrectly indented (e.g., 
`measure := ...` and subsequent updates are shifted right). This is another 
gofmt issue and also makes the control flow harder to read. Please gofmt (or 
otherwise fix indentation) so the spec code is consistently formatted.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@skywalking.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

Reply via email to