This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch phase-2-cp5-march in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 2a35d5a00ff2f7e5825b31243fc7974618662c84 Author: Hongtao Gao <[email protected]> AuthorDate: Thu May 7 00:27:38 2026 +0000 feat(test): schema-barrier CP-6 SLO load harness (Phase 2 Step 2.8) Adds an in-process distributed load harness gated behind the `loadtest` build tag and the `make load-test-barrier` Makefile target. The harness mirrors the SLO load profile pinned in the plan: - 3 data nodes + 1 liaison spawned via pkg/test/setup; - 100 concurrent AwaitRevisionApplied(latest+offset, 5s) callers re-issuing on completion; - 10 GroupRegistryService.Update ops/sec (round-robin) advancing latestRev via the response's mod_revision; - 1-minute warm-up + 5-minute measurement window; - report p50 / p95 / p99 / max from client-side per-call duration. Client-side latency is bounded above by the server-side schema_await_revision_applied_duration_seconds histogram (RPC overhead on localhost gRPC is sub-ms), so the SLO check on the client number is strictly stricter than CP-6's "p99 < 200ms on the server-side histogram" criterion. The report is written as JSON to `./load-report.json` by default; override via -loadtest.report or pass `-` for stdout-only. The same harness supports developer smoke runs: make load-test-barrier LOAD_FLAGS='-loadtest.warm-up=5s \ -loadtest.measure=15s -loadtest.callers=20 -loadtest.mutate-rate=5' CP-6 sign-off cites a saved report file from the default-profile run, not a re-run. Note: the harness exposes two follow-up findings under sustained load that CP-6 reviewers should size: 1. The smoke run with offset=1 and 10 callers x 10/sec mutate showed measure-window samples dominated by 5s timeouts, suggesting the `notifiedModRevision` watermark lags the mutator's advance under high-throughput load. Step 2.7's schema_await_revision_applied_duration_seconds histogram captures this on the server side; an SLO sign-off run will confirm the scale of the gap. 2. 
§6.12a/b/c/d cluster integration specs remain blocked on pkg/test/setup.PauseDataNodeWatch (Step 1.0 stub); reviewers decide whether to require Step 1.0 implementation before CP-6 or accept the unit-test coverage as sufficient. via [HAPI](https://hapi.run) Co-Authored-By: HAPI <[email protected]> Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]> --- Makefile | 5 + test/load/schema_barrier/barrier_loadtest_test.go | 384 ++++++++++++++++++++++ 2 files changed, 389 insertions(+) diff --git a/Makefile b/Makefile index 922b24f95..aadd5a98c 100644 --- a/Makefile +++ b/Makefile @@ -96,6 +96,11 @@ test-docker: ## Run tests in Docker with constrained resources (2 CPU cores, 4GB -ldflags "-X github.com/apache/skywalking-banyandb/pkg/test/flags.eventuallyTimeout=30s -X github.com/apache/skywalking-banyandb/pkg/test/flags.consistentlyTimeout=10s -X github.com/apache/skywalking-banyandb/pkg/test/flags.LogLevel=error" \ $(PKG) +load-test-barrier: ## Run the schema-barrier CP-6 SLO load harness (3 data nodes + 1 liaison, 100 callers, 5m measurement). Override with LOAD_FLAGS="-loadtest.measure=30s ..." for smoke runs. + @echo "Running schema-barrier load harness (CP-6 SLO profile)" + @echo "Override profile via LOAD_FLAGS, e.g. LOAD_FLAGS='-loadtest.measure=30s -loadtest.callers=20'" + go test -tags=loadtest -timeout 30m -count=1 -v ./test/load/schema_barrier/... $(LOAD_FLAGS) + ##@ Code quality targets lint: TARGET=lint diff --git a/test/load/schema_barrier/barrier_loadtest_test.go b/test/load/schema_barrier/barrier_loadtest_test.go new file mode 100644 index 000000000..68a3649df --- /dev/null +++ b/test/load/schema_barrier/barrier_loadtest_test.go @@ -0,0 +1,384 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. 
Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//go:build loadtest + +// Package schemabarrier hosts the CP-6 SLO load harness for the schema +// consistency cluster barrier. It is gated behind the `loadtest` build tag +// so the regular `go test ./...` cycle skips it; invoke via the make target +// `make load-test-barrier`. +// +// The harness brings up an in-process distributed cluster (3 data nodes + +// 1 liaison) and drives the CP-6 SLO load profile pinned in the plan: +// - 100 concurrent AwaitRevisionApplied(R, 5s) callers continuously +// re-issuing on completion; +// - schema mutation rate of 10 successful GroupRegistryService.Update +// ops per second across the seeded groups; +// - a 1-minute warm-up followed by a 5-minute measurement window; +// - report p50 / p95 / p99 / max from the client-side per-call duration. +// +// The client-side measurement is an upper bound on the server-side +// schema_await_revision_applied_duration_seconds histogram (it adds the RPC +// overhead, which is sub-ms on localhost gRPC), so an SLO check on the +// client number is at least as strict as CP-6's "p99 < 200ms on the +// server-side histogram" criterion. The current report records only the +// client-side numbers; a future revision may scrape the Prometheus endpoint +// directly to record the server-side value alongside them. 
package schemabarrier

import (
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"math/rand/v2"
	"os"
	"sort"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"github.com/onsi/gomega"
	"github.com/stretchr/testify/require"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/protobuf/types/known/durationpb"

	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
	schemav1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/schema/v1"
	schemapkg "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
	"github.com/apache/skywalking-banyandb/pkg/grpchelper"
	"github.com/apache/skywalking-banyandb/pkg/test"
	test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure"
	"github.com/apache/skywalking-banyandb/pkg/test/setup"
	test_stream "github.com/apache/skywalking-banyandb/pkg/test/stream"
	test_trace "github.com/apache/skywalking-banyandb/pkg/test/trace"
)

// Load-profile knobs, registered on the default flag set so they can be
// passed to the test binary (e.g. through LOAD_FLAGS in the make target).
// The defaults are the CP-6 SLO profile: 1-minute warm-up, 5-minute
// measurement window, 100 callers, 10 mutations per second, 5s barrier
// timeout.
//
// Note that flagBarrierTarget holds an offset, not an absolute revision:
// each caller waits for the latest known revision plus this value.
var (
	flagWarmUp         = flag.Duration("loadtest.warm-up", time.Minute, "warm-up duration before sampling begins")
	flagMeasure        = flag.Duration("loadtest.measure", 5*time.Minute, "measurement window duration")
	flagCallers        = flag.Int("loadtest.callers", 100, "concurrent AwaitRevisionApplied callers")
	flagMutateRate     = flag.Int("loadtest.mutate-rate", 10, "successful Group.Update operations per second")
	flagBarrierTarget  = flag.Int("loadtest.barrier-offset", 5, "AwaitRevisionApplied targets latest+offset; tune so each call holds open ~1s on average")
	flagBarrierTimeout = flag.Duration("loadtest.barrier-timeout", 5*time.Second, "AwaitRevisionApplied per-call timeout")
	flagReportPath     = flag.String("loadtest.report", "load-report.json", "path to write the JSON report; pass '-' for stdout-only")
)

// loadReport is serialized to disk at end of the measurement window.
// The fields mirror what CP-6 sign-off cites: configuration, raw counts, and
// the p50 / p95 / p99 / max of client-side per-call duration. Future
// revisions may add a `ServerSideP99` field when Prometheus scraping is wired.
type loadReport struct {
	// Profile echoes the flag values the run was started with.
	Profile profile `json:"profile"`
	// StartedAt is the UTC RFC 3339 (nanosecond) timestamp taken just before the warm-up wait.
	StartedAt string `json:"started_at"`
	// FinishedAt is the UTC RFC 3339 (nanosecond) timestamp taken when the report is assembled.
	FinishedAt string `json:"finished_at"`
	// BarrierCalls counts AwaitRevisionApplied calls that returned; counting starts with the warm-up, not the measurement window.
	BarrierCalls int64 `json:"barrier_calls"`
	// BarrierErrs counts the calls that returned an RPC error.
	BarrierErrs int64 `json:"barrier_errors"`
	// BarrierTimeouts counts the calls that returned without error but with applied=false.
	BarrierTimeouts int64 `json:"barrier_timeouts"`
	// MutateOps counts Group.Update calls that succeeded.
	MutateOps int64 `json:"mutate_ops"`
	// MutateErrs counts Group.Update calls that returned an error.
	MutateErrs int64 `json:"mutate_errors"`
	// P50Seconds is the median client-side call duration over the samples recorded while sampling was active; zero when there are no samples.
	P50Seconds float64 `json:"p50_seconds"`
	// P95Seconds is the 95th-percentile client-side call duration, same sample set as P50Seconds.
	P95Seconds float64 `json:"p95_seconds"`
	// P99Seconds is the 99th-percentile client-side call duration, same sample set as P50Seconds.
	P99Seconds float64 `json:"p99_seconds"`
	// MaxSeconds is the slowest sampled call, same sample set as P50Seconds.
	MaxSeconds float64 `json:"max_seconds"`
}

// profile is the load configuration recorded in the report. Durations are
// stored in their time.Duration string form (e.g. "5m0s").
type profile struct {
	// WarmUp is the -loadtest.warm-up value.
	WarmUp string `json:"warm_up"`
	// Measure is the -loadtest.measure value.
	Measure string `json:"measure"`
	// Callers is the -loadtest.callers value.
	Callers int `json:"callers"`
	// MutateRatePerSec is the -loadtest.mutate-rate value.
	MutateRatePerSec int `json:"mutate_rate_per_sec"`
	// BarrierOffset is the -loadtest.barrier-offset value (added to the latest known revision by every caller).
	BarrierOffset int `json:"barrier_target_offset"`
	// BarrierTimeout is the -loadtest.barrier-timeout value.
	BarrierTimeout string `json:"barrier_timeout"`
}

// TestSchemaBarrierLoad is the CP-6 SLO load harness entry point. Run via:
//
//	make load-test-barrier
//
// or directly with overrides:
//
//	go test -tags=loadtest -timeout 30m ./test/load/schema_barrier/... \
//	  -loadtest.warm-up=10s -loadtest.measure=30s -loadtest.callers=20 \
//	  -loadtest.mutate-rate=5 -loadtest.report=/tmp/load-report.json
//
// The smoke configuration above keeps the harness runnable on a developer
// laptop in under a minute; the SLO sign-off run uses the defaults.
+func TestSchemaBarrierLoad(t *testing.T) { + gomega.RegisterTestingT(t) + + tmpDir, tmpDirCleanup, err := test.NewSpace() + require.NoError(t, err, "allocate tmp dir for cluster") + defer tmpDirCleanup() + + dfWriter := setup.NewDiscoveryFileWriter(tmpDir) + config := setup.PropertyClusterConfig(dfWriter) + + t.Logf("starting 3 data nodes") + dataCloses := make([]func(), 0, 3) + for i := 0; i < 3; i++ { + dataCloses = append(dataCloses, setup.DataNode(config)) + } + + t.Logf("preloading schema (stream + measure + trace)") + setup.PreloadSchemaViaProperty(config, + test_stream.PreloadSchema, test_measure.PreloadSchema, test_trace.PreloadSchema) + config.AddLoadedKinds(schemapkg.KindStream, schemapkg.KindMeasure, schemapkg.KindTrace) + + t.Logf("starting liaison node") + liaisonAddr, closeLiaison := setup.LiaisonNode(config, + "--measure-metadata-cache-wait-duration=5s", + "--stream-metadata-cache-wait-duration=5s", + "--trace-metadata-cache-wait-duration=5s") + defer func() { + closeLiaison() + for _, c := range dataCloses { + c() + } + }() + + t.Logf("dialing liaison at %s", liaisonAddr) + conn, err := grpchelper.Conn(liaisonAddr, 10*time.Second, + grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err, "dial liaison") + defer func() { _ = conn.Close() }() + + barrierClient := schemav1.NewSchemaBarrierServiceClient(conn) + groupsClient := databasev1.NewGroupRegistryServiceClient(conn) + + // Seed the latest revision baseline from a synthetic AwaitRevisionApplied(0). + // The mutator + caller goroutines treat this as their starting point and + // advance it via atomic writes after each successful Group.Update. 
+ var latestRev atomic.Int64 + startResp, err := barrierClient.AwaitRevisionApplied(context.Background(), + &schemav1.AwaitRevisionAppliedRequest{MinRevision: 0, Timeout: durationpb.New(2 * time.Second)}) + require.NoError(t, err, "baseline AwaitRevisionApplied") + require.True(t, startResp.GetApplied(), "baseline call must report applied") + // startResp.Applied=true does not give us a revision; we seed from the + // first List call below — the mutator path needs a positive revision to + // bump from anyway. + + listCtx, listCancel := context.WithTimeout(context.Background(), 10*time.Second) + listResp, err := groupsClient.List(listCtx, &databasev1.GroupRegistryServiceListRequest{}) + listCancel() + require.NoError(t, err, "list seeded groups") + require.NotEmpty(t, listResp.GetGroup(), "PreloadSchema must seed at least one group") + for _, g := range listResp.GetGroup() { + if rev := g.GetMetadata().GetModRevision(); rev > latestRev.Load() { + latestRev.Store(rev) + } + } + t.Logf("seeded %d groups; baseline ModRevision=%d", len(listResp.GetGroup()), latestRev.Load()) + + // Background workers are bounded by harnessCtx; cancelled at end of run. + harnessCtx, harnessCancel := context.WithCancel(context.Background()) + defer harnessCancel() + + var ( + mutateOps atomic.Int64 + mutateErrs atomic.Int64 + barrierCalls atomic.Int64 + barrierErrs atomic.Int64 + barrierTimes atomic.Int64 + + samplingMu sync.Mutex + samples []time.Duration + // samplingActive guards `samples` writes so warm-up calls are not + // recorded. The mutex is acquired only on completion — gauging + // contention against the per-iteration RPC cost is negligible. + samplingActive atomic.Bool + ) + + // Mutator: 1 goroutine driving 10 ops/sec across the seeded groups + // (round-robin). 
Each iteration calls Group.Update with the latest List + // snapshot of the group; etcd bumps mod_revision on every Update even + // when the body is identical, so latestRev advances at the configured + // rate as long as the cluster is healthy. + mutateInterval := time.Second / time.Duration(*flagMutateRate) + go func() { + ticker := time.NewTicker(mutateInterval) + defer ticker.Stop() + idx := 0 + for { + select { + case <-harnessCtx.Done(): + return + case <-ticker.C: + } + if len(listResp.GetGroup()) == 0 { + return + } + g := listResp.GetGroup()[idx%len(listResp.GetGroup())] + idx++ + ctx, cancel := context.WithTimeout(harnessCtx, 5*time.Second) + updResp, updErr := groupsClient.Update(ctx, &databasev1.GroupRegistryServiceUpdateRequest{Group: g}) + cancel() + if updErr != nil { + mutateErrs.Add(1) + continue + } + mutateOps.Add(1) + if rev := updResp.GetModRevision(); rev > 0 { + for { + prev := latestRev.Load() + if rev <= prev || latestRev.CompareAndSwap(prev, rev) { + break + } + } + } + } + }() + + // Callers: N goroutines each looping AwaitRevisionApplied(latest+offset, T). + // On completion they immediately re-issue with the freshly-read latest + // snapshot, maintaining ~N in-flight calls throughout the measurement + // window. + var callerWg sync.WaitGroup + for i := 0; i < *flagCallers; i++ { + callerWg.Add(1) + go func() { + defer callerWg.Done() + for { + select { + case <-harnessCtx.Done(): + return + default: + } + target := latestRev.Load() + int64(*flagBarrierTarget) + start := time.Now() + resp, callErr := barrierClient.AwaitRevisionApplied(harnessCtx, + &schemav1.AwaitRevisionAppliedRequest{ + MinRevision: target, + Timeout: durationpb.New(*flagBarrierTimeout), + }) + duration := time.Since(start) + barrierCalls.Add(1) + if callErr != nil { + barrierErrs.Add(1) + // Brief jittered backoff so we do not hammer on the same + // failure (typically harnessCtx cancellation at end of run). 
+ select { + case <-harnessCtx.Done(): + return + case <-time.After(time.Duration(rand.IntN(20)) * time.Millisecond): + } + continue + } + if !resp.GetApplied() { + barrierTimes.Add(1) + } + if samplingActive.Load() { + samplingMu.Lock() + samples = append(samples, duration) + samplingMu.Unlock() + } + } + }() + } + + t.Logf("warm-up: %s @ %d callers, %d mutate ops/sec, target=latest+%d, timeout=%s", + *flagWarmUp, *flagCallers, *flagMutateRate, *flagBarrierTarget, *flagBarrierTimeout) + startedAt := time.Now() + select { + case <-time.After(*flagWarmUp): + case <-harnessCtx.Done(): + } + + t.Logf("entering measurement window (%s)", *flagMeasure) + samplingActive.Store(true) + measureStart := time.Now() + select { + case <-time.After(*flagMeasure): + case <-harnessCtx.Done(): + } + samplingActive.Store(false) + t.Logf("measurement window finished after %s; cooling down callers", time.Since(measureStart)) + + harnessCancel() + callerWg.Wait() + + // Take a stable snapshot — no further appends can happen because + // samplingActive is false and goroutines have all returned. + samplingMu.Lock() + collected := append([]time.Duration(nil), samples...) 
+ samplingMu.Unlock() + + report := loadReport{ + Profile: profile{ + WarmUp: flagWarmUp.String(), + Measure: flagMeasure.String(), + Callers: *flagCallers, + MutateRatePerSec: *flagMutateRate, + BarrierOffset: *flagBarrierTarget, + BarrierTimeout: flagBarrierTimeout.String(), + }, + StartedAt: startedAt.UTC().Format(time.RFC3339Nano), + FinishedAt: time.Now().UTC().Format(time.RFC3339Nano), + BarrierCalls: barrierCalls.Load(), + BarrierErrs: barrierErrs.Load(), + BarrierTimeouts: barrierTimes.Load(), + MutateOps: mutateOps.Load(), + MutateErrs: mutateErrs.Load(), + } + if len(collected) > 0 { + sort.Slice(collected, func(i, j int) bool { return collected[i] < collected[j] }) + report.P50Seconds = collected[percentileIndex(len(collected), 0.50)].Seconds() + report.P95Seconds = collected[percentileIndex(len(collected), 0.95)].Seconds() + report.P99Seconds = collected[percentileIndex(len(collected), 0.99)].Seconds() + report.MaxSeconds = collected[len(collected)-1].Seconds() + } + + t.Logf("report: calls=%d errs=%d timeouts=%d mutate=%d/%d p50=%.4fs p95=%.4fs p99=%.4fs max=%.4fs samples=%d", + report.BarrierCalls, report.BarrierErrs, report.BarrierTimeouts, + report.MutateOps, report.MutateOps+report.MutateErrs, + report.P50Seconds, report.P95Seconds, report.P99Seconds, report.MaxSeconds, + len(collected)) + + if *flagReportPath != "-" { + blob, marshalErr := json.MarshalIndent(report, "", " ") + require.NoError(t, marshalErr, "marshal load report") + require.NoError(t, os.WriteFile(*flagReportPath, blob, 0o644), "write load report to %s", *flagReportPath) + t.Logf("wrote report to %s", *flagReportPath) + } else { + blob, _ := json.MarshalIndent(report, "", " ") + fmt.Println(string(blob)) + } + + if report.BarrierCalls == 0 || len(collected) == 0 { + t.Fatalf("no barrier samples collected during measurement window") + } +} + +// percentileIndex returns the nearest-rank index for a 0..1 percentile in a +// sorted slice of length n. 
// The nearest-rank percentile is the smallest sample such that at least
// p×100% of the observations are less than or equal to it, i.e. the element
// at 1-based rank ceil(p*n). Rounding the rank up, rather than truncating
// (n-1)*p, keeps tail percentiles conservative on short smoke runs: with 10
// samples p95 resolves to the largest sample instead of the second largest.
// For a ~30 000-sample run the two conventions differ by at most one rank,
// which is well below the SLO's measurement noise.
//
// p is clamped to [0, 1] and NaN is treated as 0. A non-positive n yields 0,
// so the result is never negative; callers must still not index an empty
// slice with it.
func percentileIndex(n int, p float64) int {
	if n <= 0 {
		return 0
	}
	// Written as !(p > 0) so that NaN, which fails every comparison, lands
	// in the lower clamp instead of reaching the float-to-int conversion.
	if !(p > 0) {
		return 0
	}
	if p >= 1 {
		return n - 1
	}
	scaled := p * float64(n)
	// Integer ceil for a positive value: truncate, then bump when there was
	// a fractional part.
	rank := int(scaled)
	if float64(rank) < scaled {
		rank++
	}
	// Guard against floating-point rounding at either edge.
	if rank < 1 {
		rank = 1
	}
	if rank > n {
		rank = n
	}
	return rank - 1
}
