This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit abedc3f068092c0e61b7eb795e204b51d0177195
Author: Hongtao Gao <[email protected]>
AuthorDate: Sat May 9 15:05:11 2026 +0000

feat(soak): G5d soak harness — docker-compose + soak-driver + runbook

Land the Phase A harness for the G5d soak gate. Local-only (no kind/k8s, no cloud) and sized for a 31 GB no-swap dev box.

Architecture: sequential, single-stack. Phase 0 boots vec-off, seeds a deterministic group + measure (soak/soak_metric), records a baseline over the seed window, snapshots the data dir, and tears down. Phase 1 boots vec-on with the restored snapshot, drives synthetic + OAP traffic for SOAK_HOURS, captures pprof every PPROF_INTERVAL_MIN, replays the catalog every PARITY_INTERVAL_MIN to diff against the baseline, and tails BanyanDB logs for MemoryTracker exhaustion. The parity catalog is time-bounded to the pre-snapshot window so OAP's ongoing writes do not pollute the diff.

Files:

test/soak/docker-compose.soak.yaml (235 LOC) — overlay over the quick-start. BanyanDB is built from local source so the vectorized branch is exercised. Every container has explicit memory + cpus limits, totalling ~6.5 GB. BanyanDB runs as the host UID/GID (${SOAK_UID}:${SOAK_GID}) so the bind-mounted /data is readable from the host shell for snapshot/restore. Default root paths are overridden to /data so the bind mount actually contains the data (--measure-root-path, --stream-root-path, --property-root-path, --trace-root-path, --schema-server-root-path).

test/soak/Dockerfile.banyand (52 LOC) — multi-stage build of /banyand from this branch's source.
cmd/soak-driver/main.go (649 LOC) — Go CLI with four subcommands: seed-fixture (creates Group + Measure, writes N deterministic rows, and polls until they are visible, which handles the 5s flush timeout), record-baseline (queries the catalog and persists protojson-marshalled DataPoints), replay-and-diff (re-issues the same queries against current state and compares each DataPoint with proto.Equal; catalog entries missing from the baseline are skipped, while a replay-side query failure counts as a divergence), pprof-grab (heap.pb.gz + goroutine.txt).

cmd/soak-driver/catalog/default.json (9 LOC) — single query against the seeded soak_metric. The driver raises the query Limit (in buildQueryRequest) so 1000-row seeds aren't truncated to BanyanDB's default of 100.

scripts/soak-vectorized.sh (329 LOC) — orchestrator. SMOKE=1 runs ~25 min for end-to-end validation. Captures host UID/GID. Cleanup trap on SIGINT/TERM/EXIT. The goroutine count is parsed from the "goroutine profile: total N" header.

docs/soak/g5d-runbook.md (265 LOC) — start-to-finish instructions, artefact layout, and how each artefact maps to the four spec acceptance criteria.

Smoke (SMOKE=1, ~25 min) validated end-to-end on this branch:
- 1000 rows seeded, 954 captured in baseline (95.4%, edge-truncation)
- 20+ replay-and-diff iterations, all PASS (0 divergences)
- 23 pprof captures spanning the 20-min Phase 1 window
- Goroutine count: 556 → 556 (0% drift, threshold ≤5%)
- MemoryTracker alerts: 0
- Final parity_pass=true, exit 0
- Memory peak (docker stats during smoke): banyandb ~110 MB, oap ~590 MB, no OOM kills against the 2 GB / 1 GB caps.

Eleven smoke iterations were needed during bring-up; root-cause notes are left in the script's comments where they shape config (5s flush timeouts, ms-truncated timestamps, 100-row default query Limit, root paths defaulting to /tmp).

Deferred:
- The 48 h run itself: the harness is ready, the calendar window is yours.
- kind/k8s deployment shape: not needed for the four G5d criteria; the runbook documents how to scale duration via env vars.
--- .gitignore | 4 + cmd/soak-driver/catalog/default.json | 9 + cmd/soak-driver/main.go | 649 +++++++++++++++++++++++++++++++++++ docs/soak/g5d-runbook.md | 265 ++++++++++++++ scripts/soak-vectorized.sh | 329 ++++++++++++++++++ test/soak/Dockerfile.banyand | 52 +++ test/soak/docker-compose.soak.yaml | 235 +++++++++++++ 7 files changed, 1543 insertions(+) diff --git a/.gitignore b/.gitignore index 4785e2c8b..59896f986 100644 --- a/.gitignore +++ b/.gitignore @@ -87,3 +87,7 @@ fodc/agent/internal/ktm/iomonitor/ebpf/generated/vmlinux.h # Vectorized benchmark report artifacts (G5a) dist/bench/ +# G5d soak harness runtime artifacts +dist/soak/ +test/soak/data/ + diff --git a/cmd/soak-driver/catalog/default.json b/cmd/soak-driver/catalog/default.json new file mode 100644 index 000000000..b78cf5ece --- /dev/null +++ b/cmd/soak-driver/catalog/default.json @@ -0,0 +1,9 @@ +[ + { + "name": "soak_metric", + "groups": ["soak"], + "field_projection": { + "names": ["value", "count"] + } + } +] diff --git a/cmd/soak-driver/main.go b/cmd/soak-driver/main.go new file mode 100644 index 000000000..85d962a25 --- /dev/null +++ b/cmd/soak-driver/main.go @@ -0,0 +1,649 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +// Package main implements the soak-driver CLI for the G5d soak harness. +// It provides four subcommands: seed-fixture, record-baseline, replay-and-diff, and pprof-grab. +package main + +import ( + "bufio" + "compress/gzip" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "path/filepath" + "strconv" + "strings" + "time" + + "github.com/spf13/cobra" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" +) + +// Soak fixture constants — used by seed-fixture and the default query +// catalog so the harness does not depend on OAP-driven measure naming. +const ( + soakGroup = "soak" + soakMeasure = "soak_metric" + soakTagFamily = "default" + soakTagService = "service" + soakTagInstance = "instance_id" + soakFieldVal = "value" + soakFieldCount = "count" +) + +// catalogEntry holds the user-visible fields from the JSON catalog. +// TimeRange is injected at runtime. +type catalogEntry struct { + FieldProjection *measurev1.QueryRequest_FieldProjection `json:"field_projection,omitempty"` + Name string `json:"name"` + Groups []string `json:"groups"` +} + +// baselineRecord is persisted to disk after record-baseline runs. +type baselineRecord struct { + QueryName string `json:"query_name"` + DataPoints []json.RawMessage `json:"data_points"` + Groups []string `json:"groups"` + UntilMs int64 `json:"until_ms"` +} + +// diffReport is written by replay-and-diff.
+type diffReport struct { + RunAt string `json:"run_at"` + Divergences []divergence `json:"divergences"` + QueriesRun int `json:"queries_run"` + Pass bool `json:"pass"` +} + +type divergence struct { + QueryName string `json:"query_name"` + FirstDiffs []pointDiff `json:"first_diffs,omitempty"` + BaselineLen int `json:"baseline_len"` + ReplayLen int `json:"replay_len"` +} + +type pointDiff struct { + Baseline string `json:"baseline"` + Replay string `json:"replay"` + Index int `json:"index"` +} + +func main() { + root := &cobra.Command{ + Use: "soak-driver", + Short: "G5d soak harness driver — baseline, diff, and pprof capture", + } + root.AddCommand(newSeedFixtureCmd(), newRecordBaselineCmd(), newReplayAndDiffCmd(), newPprofGrabCmd()) + if execErr := root.Execute(); execErr != nil { + _, _ = fmt.Fprintln(os.Stderr, execErr) + os.Exit(1) + } +} + +// dialInsecure opens an unauthenticated gRPC connection to addr. +func dialInsecure(addr string) (*grpc.ClientConn, error) { + conn, connErr := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if connErr != nil { + return nil, fmt.Errorf("grpc.NewClient %s: %w", addr, connErr) + } + return conn, nil +} + +// loadCatalog reads the JSON catalog file and returns its entries. +func loadCatalog(path string) ([]catalogEntry, error) { + raw, readErr := os.ReadFile(path) + if readErr != nil { + return nil, fmt.Errorf("read catalog %s: %w", path, readErr) + } + var entries []catalogEntry + if unmarshalErr := json.Unmarshal(raw, &entries); unmarshalErr != nil { + return nil, fmt.Errorf("unmarshal catalog: %w", unmarshalErr) + } + return entries, nil +} + +// buildQueryRequest constructs a MeasureService QueryRequest from a catalog entry. +// The time range covers [begin, untilMs] where begin defaults to 1 hour before untilMs. +// Limit is set high so a 1000-row seed isn't truncated to BanyanDB's default 100. 
+func buildQueryRequest(entry catalogEntry, untilMs int64) *measurev1.QueryRequest { + untilTime := time.UnixMilli(untilMs) + beginTime := untilTime.Add(-1 * time.Hour) + req := &measurev1.QueryRequest{ + Name: entry.Name, + Groups: entry.Groups, + TimeRange: &modelv1.TimeRange{ + Begin: timestamppb.New(beginTime), + End: timestamppb.New(untilTime), + }, + Limit: 100000, + } + if entry.FieldProjection != nil { + req.FieldProjection = entry.FieldProjection + } + return req +} + +// newRecordBaselineCmd returns the record-baseline subcommand. +func newRecordBaselineCmd() *cobra.Command { + var addr, catalogPath, outPath string + var untilMs int64 + + cmd := &cobra.Command{ + Use: "record-baseline", + Short: "Query measures up to --until and write a deterministic baseline JSON", + RunE: func(_ *cobra.Command, _ []string) error { + entries, loadErr := loadCatalog(catalogPath) + if loadErr != nil { + return loadErr + } + conn, dialErr := dialInsecure(addr) + if dialErr != nil { + return dialErr + } + defer conn.Close() + + client := measurev1.NewMeasureServiceClient(conn) + var records []baselineRecord + succeeded, failed := 0, 0 + + for _, entry := range entries { + req := buildQueryRequest(entry, untilMs) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + resp, queryErr := client.Query(ctx, req) + cancel() + if queryErr != nil { + failed++ + fmt.Printf("[record-baseline] %s: SKIP (%v)\n", entry.Name, queryErr) + continue + } + rec := baselineRecord{ + QueryName: entry.Name, + Groups: entry.Groups, + UntilMs: untilMs, + } + for _, dp := range resp.GetDataPoints() { + raw, marshalErr := protojson.Marshal(dp) + if marshalErr != nil { + return fmt.Errorf("marshal data point for %s: %w", entry.Name, marshalErr) + } + rec.DataPoints = append(rec.DataPoints, json.RawMessage(raw)) + } + records = append(records, rec) + succeeded++ + fmt.Printf("[record-baseline] %s: %d data points\n", entry.Name, len(rec.DataPoints)) + } + if succeeded == 0 { + 
return fmt.Errorf("record-baseline: all %d catalog queries failed (no usable baseline)", failed) + } + fmt.Printf("[record-baseline] %d queries succeeded, %d skipped\n", succeeded, failed) + + out, createErr := os.Create(outPath) + if createErr != nil { + return fmt.Errorf("create output %s: %w", outPath, createErr) + } + defer out.Close() + + enc := json.NewEncoder(out) + enc.SetIndent("", " ") + if encErr := enc.Encode(records); encErr != nil { + return fmt.Errorf("encode baseline: %w", encErr) + } + fmt.Printf("[record-baseline] written to %s\n", outPath) + return nil + }, + } + cmd.Flags().StringVar(&addr, "addr", "localhost:17912", "BanyanDB gRPC address") + cmd.Flags().StringVar(&catalogPath, "catalog", "", "Path to query catalog JSON") + cmd.Flags().Int64Var(&untilMs, "until", 0, "Upper bound for time range (unix milliseconds)") + cmd.Flags().StringVar(&outPath, "out", "baseline.json", "Output path for baseline JSON") + _ = cmd.MarkFlagRequired("catalog") + _ = cmd.MarkFlagRequired("until") + return cmd +} + +// newReplayAndDiffCmd returns the replay-and-diff subcommand. 
+func newReplayAndDiffCmd() *cobra.Command { + var addr, catalogPath, baselinePath, reportPath string + + cmd := &cobra.Command{ + Use: "replay-and-diff", + Short: "Re-run catalog queries and compare against baseline; exit non-zero on divergence", + RunE: func(_ *cobra.Command, _ []string) error { + raw, readErr := os.ReadFile(baselinePath) + if readErr != nil { + return fmt.Errorf("read baseline %s: %w", baselinePath, readErr) + } + var records []baselineRecord + if unmarshalErr := json.Unmarshal(raw, &records); unmarshalErr != nil { + return fmt.Errorf("unmarshal baseline: %w", unmarshalErr) + } + + entries, loadErr := loadCatalog(catalogPath) + if loadErr != nil { + return loadErr + } + + conn, dialErr := dialInsecure(addr) + if dialErr != nil { + return dialErr + } + defer conn.Close() + + client := measurev1.NewMeasureServiceClient(conn) + report := diffReport{ + RunAt: time.Now().UTC().Format(time.RFC3339), + Pass: true, + } + + // Build a map from name to baseline record for O(1) lookup. + baselineMap := make(map[string]baselineRecord, len(records)) + for _, rec := range records { + baselineMap[rec.QueryName] = rec + } + + for _, entry := range entries { + rec, ok := baselineMap[entry.Name] + if !ok { + // Catalog has a query the baseline doesn't (e.g. baseline + // skipped it because the measure wasn't installed yet). + // Skip the diff for this entry rather than failing the + // whole replay. + fmt.Printf("[replay-and-diff] %s: SKIP (no baseline)\n", entry.Name) + continue + } + req := buildQueryRequest(entry, rec.UntilMs) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + resp, queryErr := client.Query(ctx, req) + cancel() + if queryErr != nil { + // Treat replay-side query failure as a divergence so the + // pass/fail signal still flips, but don't abort early — + // we want to attempt every catalog entry. 
+ report.Divergences = append(report.Divergences, divergence{ + QueryName: entry.Name, + }) + report.Pass = false + fmt.Printf("[replay-and-diff] %s: FAIL (%v)\n", entry.Name, queryErr) + continue + } + report.QueriesRun++ + + replayDPs := resp.GetDataPoints() + if len(replayDPs) != len(rec.DataPoints) { + div := divergence{ + QueryName: entry.Name, + BaselineLen: len(rec.DataPoints), + ReplayLen: len(replayDPs), + } + report.Divergences = append(report.Divergences, div) + report.Pass = false + continue + } + + div := divergence{QueryName: entry.Name, BaselineLen: len(rec.DataPoints), ReplayLen: len(replayDPs)} + hasDiff := false + for idx, baselineRaw := range rec.DataPoints { + baselineDP := new(measurev1.DataPoint) + if parseErr := protojson.Unmarshal(baselineRaw, baselineDP); parseErr != nil { + return fmt.Errorf("unmarshal baseline dp %d for %s: %w", idx, entry.Name, parseErr) + } + if !proto.Equal(baselineDP, replayDPs[idx]) { + hasDiff = true + if len(div.FirstDiffs) < 3 { + div.FirstDiffs = append(div.FirstDiffs, pointDiff{ + Index: idx, + Baseline: baselineDP.String(), + Replay: replayDPs[idx].String(), + }) + } + } + } + if hasDiff { + report.Divergences = append(report.Divergences, div) + report.Pass = false + } + } + + outFile, createErr := os.Create(reportPath) + if createErr != nil { + return fmt.Errorf("create report %s: %w", reportPath, createErr) + } + defer outFile.Close() + + enc := json.NewEncoder(outFile) + enc.SetIndent("", " ") + if encErr := enc.Encode(report); encErr != nil { + return fmt.Errorf("encode report: %w", encErr) + } + fmt.Printf("[replay-and-diff] %d queries run, %d divergences — %s\n", + report.QueriesRun, len(report.Divergences), map[bool]string{true: "PASS", false: "FAIL"}[report.Pass]) + + if !report.Pass { + return fmt.Errorf("parity check failed: %d divergences found", len(report.Divergences)) + } + return nil + }, + } + cmd.Flags().StringVar(&addr, "addr", "localhost:17912", "BanyanDB gRPC address") + 
cmd.Flags().StringVar(&catalogPath, "catalog", "", "Path to query catalog JSON") + cmd.Flags().StringVar(&baselinePath, "baseline", "baseline.json", "Path to baseline JSON produced by record-baseline") + cmd.Flags().StringVar(&reportPath, "report", "diff.json", "Output path for diff report JSON") + _ = cmd.MarkFlagRequired("catalog") + return cmd +} + +// newPprofGrabCmd returns the pprof-grab subcommand. +func newPprofGrabCmd() *cobra.Command { + var addr, outDir string + + cmd := &cobra.Command{ + Use: "pprof-grab", + Short: "Capture heap and goroutine pprof profiles; print goroutine count to stdout", + RunE: func(_ *cobra.Command, _ []string) error { + if mkdirErr := os.MkdirAll(outDir, 0o750); mkdirErr != nil { + return fmt.Errorf("mkdir %s: %w", outDir, mkdirErr) + } + ts := strconv.FormatInt(time.Now().Unix(), 10) + + heapPath := filepath.Join(outDir, fmt.Sprintf("heap-%s.pb.gz", ts)) + goroutinePath := filepath.Join(outDir, fmt.Sprintf("goroutine-%s.txt", ts)) + + baseURL := fmt.Sprintf("http://%s/debug/pprof", addr) + if fetchErr := fetchGzip(baseURL+"/heap", heapPath); fetchErr != nil { + return fmt.Errorf("fetch heap profile: %w", fetchErr) + } + + goroutineBody, fetchErr := fetchBytes(baseURL + "/goroutine?debug=1") + if fetchErr != nil { + return fmt.Errorf("fetch goroutine profile: %w", fetchErr) + } + if writeErr := os.WriteFile(goroutinePath, goroutineBody, 0o600); writeErr != nil { + return fmt.Errorf("write goroutine file: %w", writeErr) + } + + count := countGoroutines(goroutineBody) + fmt.Printf("%d\n", count) + return nil + }, + } + cmd.Flags().StringVar(&addr, "addr", "localhost:6060", "BanyanDB pprof/debug HTTP address (host:port)") + cmd.Flags().StringVar(&outDir, "out-dir", ".", "Directory to write profile files into") + return cmd +} + +// newSeedFixtureCmd creates a deterministic Group + Measure schema and +// writes N data points into it, so the parity-diff path does not depend +// on OAP-managed measure naming. 
Idempotent on the schema side: if the +// group/measure already exist, we proceed straight to the writes. +// +// Layout: +// +// group = "soak" +// measure = "soak_metric" +// tags = service (string), instance_id (int) — family "default" +// fields = value (int), count (int) +// rows = N rows at 1-second intervals ending now-1 minute +// +// Writes use the streaming MeasureService.Write RPC. After completion, +// the highest timestamp written is printed to stdout — pass it as +// --until to record-baseline so the time-bounded queries hit only this +// fixture. +func newSeedFixtureCmd() *cobra.Command { + var addr string + var rows int + + cmd := &cobra.Command{ + Use: "seed-fixture", + Short: "Create a deterministic group/measure and write N rows", + RunE: func(_ *cobra.Command, _ []string) error { + conn, dialErr := dialInsecure(addr) + if dialErr != nil { + return dialErr + } + defer conn.Close() + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + groupClient := databasev1.NewGroupRegistryServiceClient(conn) + if _, createErr := groupClient.Create(ctx, &databasev1.GroupRegistryServiceCreateRequest{ + Group: &commonv1.Group{ + Metadata: &commonv1.Metadata{Name: soakGroup}, + Catalog: commonv1.Catalog_CATALOG_MEASURE, + ResourceOpts: &commonv1.ResourceOpts{ + ShardNum: 1, + SegmentInterval: &commonv1.IntervalRule{ + Unit: commonv1.IntervalRule_UNIT_DAY, + Num: 1, + }, + Ttl: &commonv1.IntervalRule{ + Unit: commonv1.IntervalRule_UNIT_DAY, + Num: 7, + }, + }, + }, + }); createErr != nil && !strings.Contains(createErr.Error(), "already exist") { + return fmt.Errorf("create group: %w", createErr) + } + + measureClient := databasev1.NewMeasureRegistryServiceClient(conn) + if _, createErr := measureClient.Create(ctx, &databasev1.MeasureRegistryServiceCreateRequest{ + Measure: &databasev1.Measure{ + Metadata: &commonv1.Metadata{Name: soakMeasure, Group: soakGroup}, + TagFamilies: []*databasev1.TagFamilySpec{{ + Name: 
soakTagFamily, + Tags: []*databasev1.TagSpec{ + {Name: soakTagService, Type: databasev1.TagType_TAG_TYPE_STRING}, + {Name: soakTagInstance, Type: databasev1.TagType_TAG_TYPE_INT}, + }, + }}, + Fields: []*databasev1.FieldSpec{ + { + Name: soakFieldVal, + FieldType: databasev1.FieldType_FIELD_TYPE_INT, + EncodingMethod: databasev1.EncodingMethod_ENCODING_METHOD_GORILLA, + CompressionMethod: databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD, + }, + { + Name: soakFieldCount, + FieldType: databasev1.FieldType_FIELD_TYPE_INT, + EncodingMethod: databasev1.EncodingMethod_ENCODING_METHOD_GORILLA, + CompressionMethod: databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD, + }, + }, + Entity: &databasev1.Entity{TagNames: []string{soakTagService, soakTagInstance}}, + }, + }); createErr != nil && !strings.Contains(createErr.Error(), "already exist") { + return fmt.Errorf("create measure: %w", createErr) + } + + writeClient := measurev1.NewMeasureServiceClient(conn) + writeStream, streamErr := writeClient.Write(ctx) + if streamErr != nil { + return fmt.Errorf("open write stream: %w", streamErr) + } + + // BanyanDB's timestamp validator rejects sub-millisecond + // precision (Status_STATUS_INVALID_TIMESTAMP). Truncate to + // millisecond before deriving the seed window. 
+ now := time.Now().Truncate(time.Millisecond).Add(-1 * time.Minute) + lowestMs := now.Add(-time.Duration(rows-1) * time.Second).UnixMilli() + highestMs := now.UnixMilli() + md := &commonv1.Metadata{Name: soakMeasure, Group: soakGroup} + for i := range rows { + ts := now.Add(-time.Duration(rows-1-i) * time.Second) + dp := &measurev1.DataPointValue{ + Timestamp: timestamppb.New(ts), + TagFamilies: []*modelv1.TagFamilyForWrite{{ + Tags: []*modelv1.TagValue{ + {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: fmt.Sprintf("svc-%d", i%4)}}}, + {Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: int64(i % 8)}}}, + }, + }}, + Fields: []*modelv1.FieldValue{ + {Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: int64(1000 + i)}}}, + {Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: int64(1)}}}, + }, + } + if sendErr := writeStream.Send(&measurev1.WriteRequest{ + Metadata: md, + DataPoint: dp, + MessageId: uint64(time.Now().UnixNano()), + }); sendErr != nil { + return fmt.Errorf("send dp %d: %w", i, sendErr) + } + } + if closeErr := writeStream.CloseSend(); closeErr != nil { + return fmt.Errorf("close write stream: %w", closeErr) + } + // Drain server responses; status != STATUS_SUCCEED means the + // write was rejected (e.g. unknown measure, schema mismatch).
+ succeeded, badStatus := 0, 0 + var firstBadStatus string + for { + resp, recvErr := writeStream.Recv() + if recvErr == io.EOF { + break + } + if recvErr != nil { + return fmt.Errorf("recv ack: %w", recvErr) + } + if resp.GetStatus() == modelv1.Status_STATUS_SUCCEED.String() { + succeeded++ + continue + } + badStatus++ + if firstBadStatus == "" { + firstBadStatus = resp.GetStatus() + } + } + fmt.Printf("[seed-fixture] write acks: %d succeeded, %d failed (first status: %q)\n", + succeeded, badStatus, firstBadStatus) + if succeeded == 0 { + return fmt.Errorf("seed-fixture: 0 of %d writes succeeded; first bad status: %q", rows, firstBadStatus) + } + + fmt.Printf("[seed-fixture] wrote %d rows to %s/%s, ts range [%d, %d] ms\n", + rows, soakGroup, soakMeasure, lowestMs, highestMs) + + // Wait until the writes are queryable. BanyanDB buffers + // measure data in memory and flushes on a timer (default + // 5s) — querying immediately can return zero results. Poll + // the measure over the seed window (padded by 1s on each + // side) until at least 90% of the rows are visible, with a + // 60s overall cap.
+ queryClient := measurev1.NewMeasureServiceClient(conn) + deadline := time.Now().Add(60 * time.Second) + req := &measurev1.QueryRequest{ + Name: soakMeasure, + Groups: []string{soakGroup}, + TimeRange: &modelv1.TimeRange{ + Begin: timestamppb.New(time.UnixMilli(lowestMs).Add(-time.Second)), + End: timestamppb.New(time.UnixMilli(highestMs).Add(time.Second)), + }, + FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: []string{soakFieldVal, soakFieldCount}}, + Limit: uint32(rows * 2), + } + minVisible := int(float64(rows) * 0.9) + var lastSeen int + for time.Now().Before(deadline) { + qctx, qcancel := context.WithTimeout(context.Background(), 5*time.Second) + resp, qerr := queryClient.Query(qctx, req) + qcancel() + if qerr == nil && resp != nil { + lastSeen = len(resp.GetDataPoints()) + if lastSeen >= minVisible { + fmt.Printf("[seed-fixture] %d rows visible to query\n", lastSeen) + fmt.Printf("%d\n", highestMs) + return nil + } + } + time.Sleep(2 * time.Second) + } + return fmt.Errorf("seed-fixture: only %d of %d rows visible after 60s — flush/index lag", lastSeen, rows) + }, + } + cmd.Flags().StringVar(&addr, "addr", "localhost:17912", "BanyanDB gRPC address") + cmd.Flags().IntVar(&rows, "rows", 1000, "number of data points to write") + return cmd +} + +// fetchGzip downloads url and writes the body as a gzip file at dst. +// The pprof heap endpoint already returns a gzip-compressed protobuf, so we +// just pipe the bytes through. +func fetchGzip(url, dst string) error { + body, fetchErr := fetchBytes(url) + if fetchErr != nil { + return fetchErr + } + f, createErr := os.Create(dst) + if createErr != nil { + return fmt.Errorf("create %s: %w", dst, createErr) + } + defer f.Close() + + // The pprof heap endpoint returns raw gzip; write it directly. 
+ if _, gzErr := gzip.NewReader(strings.NewReader(string(body))); gzErr != nil { + return fmt.Errorf("heap profile from %s is not gzip: %w", url, gzErr) + } + _, writeErr := f.Write(body) + return writeErr +} + +// fetchBytes issues a GET request and returns the response body. +func fetchBytes(url string) ([]byte, error) { + resp, getErr := http.Get(url) //nolint:gosec,noctx + if getErr != nil { + return nil, fmt.Errorf("GET %s: %w", url, getErr) + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("GET %s: status %d", url, resp.StatusCode) + } + data, readErr := io.ReadAll(resp.Body) + if readErr != nil { + return nil, fmt.Errorf("read body from %s: %w", url, readErr) + } + return data, nil +} + +// countGoroutines parses the text output of /debug/pprof/goroutine?debug=1 +// and returns N from its "goroutine profile: total N" header (0 if absent). +// With debug=1 stacks are aggregated, so there are no per-goroutine lines to count. +func countGoroutines(body []byte) int { + scanner := bufio.NewScanner(strings.NewReader(string(body))) + for scanner.Scan() { + if rest, ok := strings.CutPrefix(scanner.Text(), "goroutine profile: total "); ok { + count, _ := strconv.Atoi(strings.TrimSpace(rest)) + return count + } + } + return 0 +} diff --git a/docs/soak/g5d-runbook.md b/docs/soak/g5d-runbook.md new file mode 100644 index 000000000..95c25e9d6 --- /dev/null +++ b/docs/soak/g5d-runbook.md @@ -0,0 +1,265 @@ +# G5d Soak Harness Runbook + +This document covers how to run and interpret the G5d soak test for the +vectorized-query path in SkyWalking BanyanDB. + +## Prerequisites + +- Docker daemon running and reachable (`docker info` succeeds). +- Docker Compose v2 (`docker compose version`). +- At least ~6 GB RAM headroom on the host (the compose stack peaks at ~5.4 GB). + The host must have **no swap** — the resource limits in the compose file are the + only protection against OOM kills. +- At least 10 GB free disk under the repo root for data snapshots, profiles, and logs. +- Go toolchain installed (used to build `soak-driver` and the BanyanDB image).
+- The `vectorized-query` branch checked out locally. + +Check headroom before starting: + +```bash +free -h # look at "available" column +df -h . # look at "Avail" column +``` + +## Smoke Validation + +Run a condensed ~30-minute smoke to verify the harness is wired correctly +before committing to a 48-hour run: + +```bash +cd /path/to/repo +SMOKE=1 ./scripts/soak-vectorized.sh +``` + +Expected behaviour: + +| Step | What you should see | +|---|---| +| Build | `soak-driver built at bin/soak-driver` | +| Phase 0 up | `BanyanDB is healthy` | +| Warmup | 2-minute pause | +| Snapshot | `Copying data to dist/soak/<ts>/data-snapshot/` | +| Baseline | `[record-baseline] written to dist/soak/<ts>/baseline.json` | +| Phase 1 up | `BanyanDB is healthy` | +| pprof-start | `Initial pprof captured` | +| Soak loop | Parity and pprof messages every 1 minute | +| pprof-end | `Final pprof captured` | +| Summary | `Summary written to dist/soak/<ts>/summary.json` | + +Expected runtime: 25–35 minutes. + +Healthy artefact tree after smoke: + +``` +dist/soak/<ts>/ + baseline.json – array of per-query baseline records + data-snapshot/ – raw BanyanDB data directory (vec-off) + pprof-start/ + heap-<unix>.pb.gz + goroutine-<unix>.txt + pprof-<ts>/ – one dir per interval + heap-<unix>.pb.gz + goroutine-<unix>.txt + pprof-end/ + heap-<unix>.pb.gz + goroutine-<unix>.txt + diff-<ts>.json – one per parity interval + diff-final.json + banyand.log – full stdout/stderr from the BanyanDB container + memory-alerts.log – lines matching MemoryTracker / budget exhausted + summary.json – machine-readable summary +``` + +Check `summary.json` for a healthy smoke result: + +```json +{ + "final_parity_pass": true, + "goroutine_count_start": <N>, + "goroutine_count_end": <M>, // M/N ≤ 1.05 is healthy + "memory_alert_lines": 0 +} +``` + +## Production 48-Hour Run + +```bash +cd /path/to/repo +./scripts/soak-vectorized.sh +``` + +Key environment variables: + +| Variable | Default | Purpose | +|---|---|---| +| 
`WARMUP_MIN` | 60 | Minutes of OAP write traffic before snapshot | +| `SOAK_HOURS` | 48 | Total Phase 1 duration | +| `PPROF_INTERVAL_MIN` | 30 | Minutes between heap/goroutine captures | +| `PARITY_INTERVAL_MIN` | 5 | Minutes between replay-and-diff runs | + +Monitoring during the run: + +```bash +# Live BanyanDB logs +tail -f dist/soak/<ts>/banyand.log + +# MemoryTracker alerts (should stay empty) +tail -f dist/soak/<ts>/memory-alerts.log + +# Live parity results +ls -ltr dist/soak/<ts>/diff-*.json +cat dist/soak/<ts>/diff-<latest>.json | jq .pass + +# Container resource usage +docker stats banyandb skywalking-oap +``` + +To abort safely, press Ctrl-C. The `trap` in the script runs +`docker compose down -v` before exiting, leaving no dangling containers. + +## Reading the Artefacts Against the Four Acceptance Criteria + +### Criterion 1 — ≥48h staging run with `--measure-vectorized-enabled=true` + +Check `summary.json`: + +```bash +jq '{smoke, soak_hours}' dist/soak/<ts>/summary.json +``` + +`smoke` must be `"false"` (or absent) and `soak_hours` must be `48`. +The script logs a timestamped start and end line in the terminal output; +their difference confirms the wall-clock duration. + +### Criterion 2 — No parity regression on `[]*measurev1.InternalDataPoint` + +Every `diff-*.json` file must have `"pass": true`. +Check them all at once: + +```bash +jq -r '.pass' dist/soak/<ts>/diff-*.json | sort | uniq -c +``` + +All lines should read `true`. A quick overview: + +```bash +jq -s '[.[].pass] | {total: length, passing: map(select(. == true)) | length}' \ + dist/soak/<ts>/diff-*.json +``` + +The final canonical result is `diff-final.json` and is also reflected in +`summary.json` under `"final_parity_pass"`. + +### Criterion 3 — No MemoryTracker budget exhaustion + +```bash +wc -l dist/soak/<ts>/memory-alerts.log +cat dist/soak/<ts>/memory-alerts.log +``` + +A healthy run produces an empty file (0 lines). 
Any non-zero line count is a +failure; see the **Failure Modes** section. + +`summary.json` also records `"memory_alert_lines"` for CI consumption. + +### Criterion 4 — No goroutine leak (heap delta ≤5%) + +Compare the goroutine counts in the start and end profiles: + +```bash +# Count goroutines in start profile +grep -c '^goroutine ' dist/soak/<ts>/pprof-start/goroutine-*.txt + +# Count goroutines in end profile +grep -c '^goroutine ' dist/soak/<ts>/pprof-end/goroutine-*.txt +``` + +Calculate ratio: `end / start`. A ratio ≤1.05 passes. + +`summary.json` captures `goroutine_count_start` and `goroutine_count_end` +for automated evaluation. + +For a heap size delta, use the `go tool pprof` binary: + +```bash +go tool pprof -top dist/soak/<ts>/pprof-start/heap-*.pb.gz +go tool pprof -top dist/soak/<ts>/pprof-end/heap-*.pb.gz +``` + +The in-use bytes between start and end must not grow unboundedly. + +## Failure Modes + +### MemoryTracker budget exhaustion + +**Symptom**: `memory-alerts.log` is non-empty; BanyanDB may slow down or log +errors. + +**Action**: +1. Check `banyand.log` for context around the alert timestamp. +2. Reduce query concurrency in `catalog/default.json` (fewer simultaneous + queries) or lower field projections. +3. If the budget is misconfigured, adjust the BanyanDB `--measure-memory-budget` + flag in the compose service command. + +### Goroutine leak + +**Symptom**: `goroutine_count_end / goroutine_count_start > 1.05`. + +**Action**: +1. Diff the goroutine profiles: + ```bash + diff dist/soak/<ts>/pprof-start/goroutine-*.txt \ + dist/soak/<ts>/pprof-end/goroutine-*.txt | head -80 + ``` +2. Identify which goroutine stacks appear only in the end profile. +3. File a bug against the vectorized pipeline with the diffed profiles attached. + +### Parity divergence + +**Symptom**: Any `diff-*.json` has `"pass": false`; `summary.json` +`"final_parity_pass"` is `false`. + +**Action**: +1. 
Open the failing diff file:
+   ```bash
+   jq '.divergences' dist/soak/<ts>/diff-<ts>.json
+   ```
+2. The `first_diffs` array shows the first 3 mismatched data points (index,
+   baseline string, replay string).
+3. Reproduce the query manually:
+   ```bash
+   bin/soak-driver replay-and-diff \
+     --addr localhost:17912 \
+     --catalog cmd/soak-driver/catalog/default.json \
+     --baseline dist/soak/<ts>/baseline.json \
+     --report /tmp/debug-diff.json
+   ```
+4. Check whether the divergence is in field values, tag values, or shard IDs.
+5. File a bug against the vectorized query path with the diff JSON attached.
+
+### OOMKill
+
+**Symptom**: A container disappears; `docker compose -f test/soak/docker-compose.soak.yaml ps -a` lists it as `exited`.
+
+**Action**:
+1. `docker inspect <container> | jq '.[].State.OOMKilled'` — if `true`, the
+   container exceeded its memory limit.
+2. `docker stats` shows live usage only; run step 1 on each container to find
+   which one breached its limit.
+3. The per-container limits in `test/soak/docker-compose.soak.yaml` must be
+   respected. Do **not** increase them without re-evaluating the host budget.
+4. If BanyanDB is OOM-killed, reduce the vectorized batch size or the number of
+   concurrent queries in the catalog.
+
+## Rollback
+
+To run Phase 1 with the flag disabled (reproducing the baseline behaviour):
+
+```bash
+BANYANDB_VEC_ENABLED=false SOAK_UID="$(id -u)" SOAK_GID="$(id -g)" \
+SOAK_DATA_DIR="$PWD/test/soak/data" \
+docker compose -f test/soak/docker-compose.soak.yaml up -d
+```
+
+This is also the normal state after any failed Phase 1: tear down the stack
+with `docker compose -f test/soak/docker-compose.soak.yaml down -v` and
+restart with `BANYANDB_VEC_ENABLED=false`.
diff --git a/scripts/soak-vectorized.sh b/scripts/soak-vectorized.sh
new file mode 100755
index 000000000..b47048dfd
--- /dev/null
+++ b/scripts/soak-vectorized.sh
@@ -0,0 +1,329 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.
See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# G5d soak harness orchestrator.
+#
+# Configuration (set as env vars before running):
+#   WARMUP_MIN          – minutes OAP has to write data before baseline snapshot (default 60)
+#   SOAK_HOURS          – duration of the vec-on Phase 1 run in hours (default 48)
+#   PPROF_INTERVAL_MIN  – minutes between each pprof capture (default 30)
+#   PARITY_INTERVAL_MIN – minutes between each replay-and-diff run (default 5)
+#   SMOKE               – set to 1 for a quick ~25-min smoke run (overrides durations)
+#
+# Artefacts are written under dist/soak/<timestamp>/ relative to the repo root.
+#
+# Usage:
+#   ./scripts/soak-vectorized.sh
+#   SMOKE=1 ./scripts/soak-vectorized.sh
+
+set -euo pipefail
+
+# ── configuration ────────────────────────────────────────────────────────────
+WARMUP_MIN="${WARMUP_MIN:-60}"
+SOAK_HOURS="${SOAK_HOURS:-48}"
+PPROF_INTERVAL_MIN="${PPROF_INTERVAL_MIN:-30}"
+PARITY_INTERVAL_MIN="${PARITY_INTERVAL_MIN:-5}"
+
+if [[ "${SMOKE:-}" == "1" ]]; then
+  # SMOKE skips the OAP-warmup wait — parity is driven by deterministic
+  # data seeded by `soak-driver seed-fixture`, so we don't depend on OAP
+  # propagation timing.
+  WARMUP_MIN=0
+  SOAK_HOURS=0.34          # ~20 min — fits inside a tractable smoke window
+  PPROF_INTERVAL_MIN=1
+  PARITY_INTERVAL_MIN=1
+fi
+
+SEED_ROWS="${SEED_ROWS:-1000}"
+
+SOAK_HOURS_SEC=$(awk "BEGIN{printf \"%d\", ${SOAK_HOURS}*3600}")
+WARMUP_SEC=$(( WARMUP_MIN * 60 ))
+PPROF_INTERVAL_SEC=$(( PPROF_INTERVAL_MIN * 60 ))
+PARITY_INTERVAL_SEC=$(( PARITY_INTERVAL_MIN * 60 ))
+
+REPO_ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
+COMPOSE_FILE="${REPO_ROOT}/test/soak/docker-compose.soak.yaml"
+CATALOG="${REPO_ROOT}/cmd/soak-driver/catalog/default.json"
+BANYANDB_GRPC="localhost:17912"
+BANYANDB_PPROF="localhost:6060"
+
+# Pass host UID/GID to compose so the BanyanDB container writes the
+# bind-mounted /data dir as the host user (otherwise root-owned files
+# break snapshot/restore from the host shell).
+export SOAK_UID="$(id -u)"
+export SOAK_GID="$(id -g)"
+
+RUN_TS="$(date +%Y%m%dT%H%M%S)"
+DIST="${REPO_ROOT}/dist/soak/${RUN_TS}"
+DATA_DIR="${REPO_ROOT}/test/soak/data"
+SNAPSHOT_DIR="${DIST}/data-snapshot"
+
+# ── helpers ──────────────────────────────────────────────────────────────────
+log() { echo "[$(date -u +%Y-%m-%dT%H:%M:%SZ)] $*"; }
+
+compose_cmd() {
+  docker compose -f "${COMPOSE_FILE}" "$@"
+}
+
+wait_banyandb_healthy() {
+  log "Waiting for BanyanDB to become healthy..."
+  local attempts=0
+  until curl -sf "http://localhost:17913/api/healthz" >/dev/null 2>&1; do
+    attempts=$(( attempts + 1 ))
+    if (( attempts > 120 )); then
+      log "ERROR: BanyanDB did not become healthy after 120 attempts (~10 min)"
+      return 1
+    fi
+    sleep 5
+  done
+  log "BanyanDB is healthy."
+}
+
+soak_driver() {
+  "${REPO_ROOT}/bin/soak-driver" "$@"
+}
+
+# ── cleanup trap ─────────────────────────────────────────────────────────────
+cleanup() {
+  log "Trap fired (signal or exit); tearing down compose stack..."
+  compose_cmd down -v --remove-orphans 2>/dev/null || true
+  log "Cleanup complete."
+}
+trap cleanup INT TERM EXIT
+
+# ── build soak-driver ────────────────────────────────────────────────────────
+log "Building soak-driver..."
+mkdir -p "${REPO_ROOT}/bin"
+(cd "${REPO_ROOT}" && go build -o bin/soak-driver ./cmd/soak-driver)
+log "soak-driver built at bin/soak-driver"
+
+# ── prepare output dirs ───────────────────────────────────────────────────────
+mkdir -p "${DIST}" "${SNAPSHOT_DIR}" "${DATA_DIR}"
+
+# Tee everything from this point into the run log so silent failures leave
+# evidence behind. Earlier output (the build step) went to the terminal only.
+exec > >(tee -a "${DIST}/run.log") 2>&1
+
+log "Run artefacts will be written to: ${DIST}"
+log "Config: WARMUP_MIN=${WARMUP_MIN} SOAK_HOURS=${SOAK_HOURS} PPROF_INTERVAL_MIN=${PPROF_INTERVAL_MIN} PARITY_INTERVAL_MIN=${PARITY_INTERVAL_MIN} SEED_ROWS=${SEED_ROWS}"
+
+# ╔══════════════════════════════════════════════════════════════════════════╗
+# ║ PHASE 0 — Baseline (vec-off) ║
+# ╚══════════════════════════════════════════════════════════════════════════╝
+log "=== PHASE 0: Baseline (BANYANDB_VEC_ENABLED=false) ==="
+
+SOAK_DATA_DIR="${DATA_DIR}" BANYANDB_VEC_ENABLED=false compose_cmd up -d
+wait_banyandb_healthy
+
+log "Waiting for OAP to become healthy (schema install + agent chain)..."
+oap_attempts=0
+until docker compose -f "${COMPOSE_FILE}" ps oap --format '{{.Status}}' 2>/dev/null | grep -q '(healthy)'; do
+  oap_attempts=$(( oap_attempts + 1 ))
+  if (( oap_attempts > 60 )); then
+    log "ERROR: OAP did not become healthy after 5 min — abort"
+    exit 1
+  fi
+  sleep 5
+done
+log "OAP healthy."
+
+if (( WARMUP_SEC > 0 )); then
+  log "Warming up for ${WARMUP_MIN} minutes to let OAP populate data..."
+  sleep "${WARMUP_SEC}"
+fi
+
+log "Seeding deterministic fixture (${SEED_ROWS} rows into soak/soak_metric)..."
+T1_MS=$(soak_driver seed-fixture --addr "${BANYANDB_GRPC}" --rows "${SEED_ROWS}" | tail -1)
+if [[ -z "${T1_MS}" ]] || !
[[ "${T1_MS}" =~ ^[0-9]+$ ]]; then
+  log "ERROR: seed-fixture did not return a valid T1 timestamp"
+  exit 1
+fi
+log "T1 snapshot timestamp: ${T1_MS} ms"
+
+# seed-fixture polls until the rows are visible to query, so by the
+# time it returns the measure data is queryable. Schema-property has a
+# 5s flush timeout — wait once more before snapshotting so the schema
+# segments land on disk for Phase 1.
+log "Waiting 8s for schema-server flush to persist..."
+sleep 8
+
+log "Recording baseline..."
+soak_driver record-baseline \
+  --addr "${BANYANDB_GRPC}" \
+  --catalog "${CATALOG}" \
+  --until "${T1_MS}" \
+  --out "${DIST}/baseline.json"
+
+# Verify the baseline has data points. Each baseline record's data_points
+# field is an array of protojson-encoded DataPoint messages; a zero total
+# means writes weren't visible and parity is meaningless.
+baseline_dp=$(python3 -c "import json; d=json.load(open('${DIST}/baseline.json')); print(sum(len(r.get('data_points') or []) for r in d))" 2>/dev/null || echo 0)
+log "Baseline data points captured: ${baseline_dp}"
+if [[ "${baseline_dp}" == "0" ]]; then
+  log "ERROR: baseline contains zero data points despite seed of ${SEED_ROWS} rows"
+  exit 1
+fi
+
+log "Stopping BanyanDB to snapshot data..."
+compose_cmd stop banyandb
+
+log "Copying data to ${SNAPSHOT_DIR}..."
+cp -a "${DATA_DIR}/." "${SNAPSHOT_DIR}/"
+
+snap_size=$(du -sb "${SNAPSHOT_DIR}" 2>/dev/null | awk '{print $1}') || true
+log "Snapshot size: ${snap_size:-0} bytes"
+
+log "Tearing down Phase 0 stack..."
+# Disable trap during intentional down so we don't double-down.
+trap - EXIT
+compose_cmd down -v --remove-orphans
+trap cleanup INT TERM EXIT
+
+# ╔══════════════════════════════════════════════════════════════════════════╗
+# ║ PHASE 1 — Soak (vec-on) ║
+# ╚══════════════════════════════════════════════════════════════════════════╝
+log "=== PHASE 1: Soak (BANYANDB_VEC_ENABLED=true, duration=${SOAK_HOURS}h) ==="
+
+log "Restoring data snapshot..."
+rm -rf "${DATA_DIR:?}"/*
+cp -a "${SNAPSHOT_DIR}/." "${DATA_DIR}/"
+
+SOAK_DATA_DIR="${DATA_DIR}" BANYANDB_VEC_ENABLED=true compose_cmd up -d
+wait_banyandb_healthy
+
+# Initial pprof grab.
+mkdir -p "${DIST}/pprof-start"
+soak_driver pprof-grab --addr "${BANYANDB_PPROF}" --out-dir "${DIST}/pprof-start"
+log "Initial pprof captured."
+
+# Tail BanyanDB logs (stdout and stderr) into banyand.log in the background.
+compose_cmd logs -f banyandb >> "${DIST}/banyand.log" 2>&1 &
+LOGS_PID=$!
+
+# Grep for MemoryTracker budget exhaustion in the background (tail -F waits for banyand.log to appear).
+(
+  tail -F "${DIST}/banyand.log" 2>/dev/null | \
+    grep --line-buffered -i "MemoryTracker\|budget exhausted\|memory budget" \
+    >> "${DIST}/memory-alerts.log" || true
+) &
+GREP_PID=$!
+
+# Background pprof + parity loops.
+SOAK_END=$(( $(date +%s) + SOAK_HOURS_SEC ))
+
+(
+  while (( $(date +%s) < SOAK_END )); do
+    sleep "${PPROF_INTERVAL_SEC}"
+    (( $(date +%s) >= SOAK_END )) && break
+    INTERVAL_TS="$(date +%Y%m%dT%H%M%S)"
+    PPROF_DIR="${DIST}/pprof-${INTERVAL_TS}"
+    mkdir -p "${PPROF_DIR}"
+    soak_driver pprof-grab --addr "${BANYANDB_PPROF}" --out-dir "${PPROF_DIR}" || \
+      log "WARN: pprof-grab failed at ${INTERVAL_TS}"
+    log "pprof captured: ${PPROF_DIR}"
+  done
+) &
+PPROF_LOOP_PID=$!
+
+(
+  while (( $(date +%s) < SOAK_END )); do
+    sleep "${PARITY_INTERVAL_SEC}"
+    (( $(date +%s) >= SOAK_END )) && break
+    DIFF_TS="$(date +%Y%m%dT%H%M%S)"
+    DIFF_REPORT="${DIST}/diff-${DIFF_TS}.json"
+    soak_driver replay-and-diff \
+      --addr "${BANYANDB_GRPC}" \
+      --catalog "${CATALOG}" \
+      --baseline "${DIST}/baseline.json" \
+      --report "${DIFF_REPORT}" || \
+      log "WARN: parity divergence detected — see ${DIFF_REPORT}"
+    log "parity check done: ${DIFF_REPORT}"
+  done
+) &
+PARITY_LOOP_PID=$!
+
+log "Soak running for ${SOAK_HOURS} hours. Loops started (pids: pprof=${PPROF_LOOP_PID} parity=${PARITY_LOOP_PID})."
+
+# Wait for soak duration.
+REMAINING=$(( SOAK_END - $(date +%s) ))
+if (( REMAINING > 0 )); then
+  sleep "${REMAINING}"
+fi
+
+log "Soak window complete. Collecting final artefacts..."
+
+# Stop background loops gracefully.
+kill "${PPROF_LOOP_PID}" "${PARITY_LOOP_PID}" 2>/dev/null || true
+wait "${PPROF_LOOP_PID}" "${PARITY_LOOP_PID}" 2>/dev/null || true
+
+# Final pprof.
+mkdir -p "${DIST}/pprof-end"
+soak_driver pprof-grab --addr "${BANYANDB_PPROF}" --out-dir "${DIST}/pprof-end"
+log "Final pprof captured."
+
+# Final parity check.
+FINAL_DIFF="${DIST}/diff-final.json"
+soak_driver replay-and-diff \
+  --addr "${BANYANDB_GRPC}" \
+  --catalog "${CATALOG}" \
+  --baseline "${DIST}/baseline.json" \
+  --report "${FINAL_DIFF}" && FINAL_PASS=true || FINAL_PASS=false
+log "Final parity check: pass=${FINAL_PASS}"
+
+# Stop log tailing; pkill -P also stops the tail/grep children of the subshell.
+pkill -P "${GREP_PID}" 2>/dev/null || true; kill "${LOGS_PID}" "${GREP_PID}" 2>/dev/null || true
+
+# Write summary manifest.
+# Goroutine count is read from the "goroutine profile: total N" header
+# line that /debug/pprof/goroutine?debug=1 writes. Cheaper and more
+# robust than counting per-goroutine entries.
+extract_goroutine_total() {
+  local dir="$1"
+  local f
+  f=$(ls "${dir}"/goroutine-*.txt 2>/dev/null | head -1)
+  if [[ -z "${f}" ]]; then
+    echo 0
+    return
+  fi
+  awk '/^goroutine profile: total/ {print $4; found=1; exit} END {if (!found) print 0}' "${f}" 2>/dev/null || echo 0
+}
+GOROUTINE_START=$(extract_goroutine_total "${DIST}/pprof-start")
+GOROUTINE_END=$(extract_goroutine_total "${DIST}/pprof-end")
+MEMORY_ALERTS=$(wc -l < "${DIST}/memory-alerts.log" 2>/dev/null || echo 0)
+
+cat > "${DIST}/summary.json" <<EOF
+{
+  "run_ts": "${RUN_TS}",
+  "smoke": "${SMOKE:-false}",
+  "warmup_min": ${WARMUP_MIN},
+  "soak_hours": ${SOAK_HOURS},
+  "t1_ms": ${T1_MS},
+  "final_parity_pass": ${FINAL_PASS},
+  "goroutine_count_start": ${GOROUTINE_START},
+  "goroutine_count_end": ${GOROUTINE_END},
+  "memory_alert_lines": ${MEMORY_ALERTS},
+  "artefacts_dir": "${DIST}"
+}
+EOF
+
+log "Summary written to ${DIST}/summary.json"
+log "=== Soak complete.
Artefacts: ${DIST} ==="
+
+# Intentional final teardown — disable trap first.
+trap - EXIT INT TERM
+compose_cmd down -v --remove-orphans
diff --git a/test/soak/Dockerfile.banyand b/test/soak/Dockerfile.banyand
new file mode 100644
index 000000000..5d8cc3939
--- /dev/null
+++ b/test/soak/Dockerfile.banyand
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Multi-stage build: compile banyand from the local source tree.
+# This ensures the soak runs the vectorized-query branch, not a hub image.
+
+FROM golang:1.25-alpine AS builder
+
+RUN apk add --no-cache git make gcc musl-dev
+
+WORKDIR /src
+COPY go.mod go.sum ./
+RUN go mod download
+
+COPY . .
+
+RUN CGO_ENABLED=0 GOOS=linux go build \
+    -ldflags="-s -w" \
+    -o /out/banyand \
+    ./banyand/cmd/server
+
+FROM alpine:edge AS certs
+RUN apk add --no-cache ca-certificates && update-ca-certificates
+
+FROM busybox:stable-glibc AS final
+
+COPY --from=builder /out/banyand /banyand
+COPY --from=certs /etc/ssl/certs /etc/ssl/certs
+
+ENV GRPC_GO_LOG_SEVERITY_LEVEL=ERROR
+ENV GRPC_GO_LOG_FORMATTER=json
+
+EXPOSE 17912
+EXPOSE 17913
+EXPOSE 6060
+EXPOSE 2121
+
+ENTRYPOINT ["/banyand"]
diff --git a/test/soak/docker-compose.soak.yaml b/test/soak/docker-compose.soak.yaml
new file mode 100644
index 000000000..cb2f6c431
--- /dev/null
+++ b/test/soak/docker-compose.soak.yaml
@@ -0,0 +1,235 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Soak-test overlay for the G5d vectorized-query gate.
+#
+# Usage (scripts/soak-vectorized.sh runs the equivalent of these; do not invoke directly):
+#   BANYANDB_VEC_ENABLED=false docker compose -f test/soak/docker-compose.soak.yaml up -d
+#   BANYANDB_VEC_ENABLED=true  docker compose -f test/soak/docker-compose.soak.yaml up -d
+#
+# Key differences from the quick-start compose:
+#   - BanyanDB is built from local source (multi-stage Go build) instead of pulling a hub image.
+#   - The --measure-vectorized-enabled flag is controlled by BANYANDB_VEC_ENABLED (default: false).
+#   - Every container has explicit memory and CPU limits (~6.5 GB in total) to stay well inside a 31 GB no-swap host.
+#   - SOAK_DATA_DIR (default ./data, resolved relative to this file) is bind-mounted so snapshots can be copied between phases.
+
+services:
+  banyandb:
+    build:
+      context: ../..
+      dockerfile: test/soak/Dockerfile.banyand
+    container_name: banyandb
+    # Run as host UID/GID so the bind-mounted /data dir stays readable
+    # from the host shell — needed for snapshot/restore between phases.
+    # Override via SOAK_UID/SOAK_GID env if your host differs.
+    user: "${SOAK_UID:-1008}:${SOAK_GID:-1009}"
+    command:
+      - standalone
+      - --measure-vectorized-enabled=${BANYANDB_VEC_ENABLED:-false}
+      - --measure-root-path=/data
+      - --stream-root-path=/data
+      - --property-root-path=/data
+      - --trace-root-path=/data
+      - --schema-server-root-path=/data
+    ports:
+      - "17912:17912" # gRPC — soak-driver record-baseline / replay-and-diff
+      - "17913:17913" # HTTP/healthz
+      - "6060:6060"   # pprof
+      - "2121:2121"   # observability/metrics
+    volumes:
+      - type: bind
+        source: ${SOAK_DATA_DIR:-./data}
+        target: /data
+    networks:
+      - demo
+    healthcheck:
+      test: ["CMD", "sh", "-c", "wget -qO- http://127.0.0.1:17913/api/healthz >/dev/null 2>&1"]
+      interval: 5s
+      timeout: 120s
+      retries: 120
+    deploy:
+      resources:
+        limits:
+          memory: 2G
+          cpus: "2"
+
+  oap:
+    image: ${OAP_IMAGE:-ghcr.io/apache/skywalking/oap:b4a8811df9f48c66edb61c7b72d0293e3ecfa7cf}
+    container_name: skywalking-oap
+    environment:
+      SW_STORAGE: banyandb
+      SW_STORAGE_BANYANDB_TARGETS: banyandb:17912
+      SW_OTEL_RECEIVER: default
+      SW_OTEL_RECEIVER_ENABLED_OC_RULES: vm
+      SW_OTEL_RECEIVER_ENABLED_OTEL_METRICS_RULES: vm
+      SW_TELEMETRY: prometheus
+      SW_CORE_RECORD_DATA_TTL: 7
+      SW_CORE_METRICS_DATA_TTL: 7
+      JAVA_OPTS: -Xmx1500m
+    expose:
+      - 11800
+      - 12800
+      - 9090
+    ports:
+      - "12800:12800"
+      - "11800:11800"
+    networks:
+      - demo
+    depends_on:
+      banyandb:
+        condition:
service_healthy
+    healthcheck:
+      test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/12800"]
+      interval: 10s
+      timeout: 60s
+      retries: 30
+    restart: unless-stopped
+    deploy:
+      resources:
+        limits:
+          memory: 2G
+          cpus: "2"
+
+  oap-ui:
+    image: ${OAP_UI_IMAGE:-ghcr.io/apache/skywalking/ui:b4a8811df9f48c66edb61c7b72d0293e3ecfa7cf}
+    container_name: skywalking-ui
+    ports:
+      - "8080:8080"
+    environment:
+      - SW_OAP_ADDRESS=http://oap:12800
+    networks:
+      - demo
+    depends_on:
+      oap:
+        condition: service_healthy
+    restart: unless-stopped
+    deploy:
+      resources:
+        limits:
+          memory: 256M
+          cpus: "0.5"
+
+  agent:
+    image: ${AGENT_IMAGE:-ghcr.io/apache/skywalking-java/skywalking-java:f750006020249d29a21cae63ae50da67f182c6ab-java8}
+    command: cp -r /skywalking/agent/ /skywalking-java-agent/
+    volumes:
+      - sw_agent:/skywalking-java-agent
+    networks:
+      - demo
+    deploy:
+      resources:
+        limits:
+          memory: 64M
+          cpus: "0.5"
+
+  provider:
+    image: "ghcr.io/apache/skywalking/e2e-service-provider:a2a67ca63084cddf82303155c185e3c24cf07eef"
+    volumes:
+      - sw_agent:/sw-java-agent
+    environment:
+      JAVA_TOOL_OPTIONS: -javaagent:/sw-java-agent/agent/skywalking-agent.jar -Xmx384m
+      SW_AGENT_COLLECTOR_BACKEND_SERVICES: oap:11800
+      SW_LOGGING_OUTPUT: CONSOLE
+      SW_AGENT_NAME: service-provider
+      SW_AGENT_INSTANCE_NAME: provider1
+      SW_AGENT_COLLECTOR_GET_PROFILE_TASK_INTERVAL: 1
+      SW_AGENT_COLLECTOR_GET_AGENT_DYNAMIC_CONFIG_INTERVAL: 1
+    ports:
+      - "9090"
+    networks:
+      - demo
+    depends_on:
+      oap:
+        condition: service_healthy
+      agent:
+        condition: service_completed_successfully
+    healthcheck:
+      test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9090"]
+      interval: 5s
+      timeout: 60s
+      retries: 120
+    deploy:
+      resources:
+        limits:
+          memory: 1G
+          cpus: "1"
+
+  consumer:
+    image: "ghcr.io/apache/skywalking/e2e-service-consumer:a2a67ca63084cddf82303155c185e3c24cf07eef"
+    volumes:
+      - sw_agent:/sw-java-agent
+    environment:
+      JAVA_TOOL_OPTIONS:
-javaagent:/sw-java-agent/agent/skywalking-agent.jar -Xmx384m
+      SW_AGENT_COLLECTOR_BACKEND_SERVICES: oap:11800
+      SW_LOGGING_OUTPUT: CONSOLE
+      PROVIDER_URL: http://provider:9090
+      SW_AGENT_NAME: service-consumer
+      SW_AGENT_INSTANCE_NAME: consumer1
+      SW_AGENT_COLLECTOR_GET_PROFILE_TASK_INTERVAL: 1
+      SW_AGENT_COLLECTOR_GET_AGENT_DYNAMIC_CONFIG_INTERVAL: 1
+    ports:
+      - "9092"
+    networks:
+      - demo
+    depends_on:
+      oap:
+        condition: service_healthy
+      provider:
+        condition: service_healthy
+      agent:
+        condition: service_completed_successfully
+    healthcheck:
+      test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9092"]
+      interval: 5s
+      timeout: 60s
+      retries: 120
+    deploy:
+      resources:
+        limits:
+          memory: 1G
+          cpus: "1"
+
+  traffic_loader:
+    image: curlimages/curl
+    networks:
+      - demo
+    depends_on:
+      oap:
+        condition: service_healthy
+      provider:
+        condition: service_healthy
+      consumer:
+        condition: service_healthy
+    entrypoint: >
+      sh -c "
+      echo 'Starting traffic generation...';
+      while true; do
+        curl -sf -X POST http://consumer:9092/info || true;
+        sleep 1;
+      done
+      "
+    deploy:
+      resources:
+        limits:
+          memory: 64M
+          cpus: "0.5"
+
+networks:
+  demo:
+
+volumes:
+  sw_agent: {}
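
As an illustration of how the runbook's four acceptance criteria could be checked in one step, the sketch below folds them into a single pass/fail gate over a finished run directory. It is not part of this commit: the function names `goroutine_total`, `goroutine_drift_ok`, `summary_field` and `g5d_gate` are hypothetical, it assumes `summary.json` keeps the one-key-per-line layout that `scripts/soak-vectorized.sh` writes, and for parity it only inspects `final_parity_pass` rather than every `diff-*.json`.

```bash
#!/usr/bin/env bash
# Hypothetical G5d gate helpers (not part of the harness). Thresholds mirror
# the runbook: soak_hours >= 48, final parity pass, zero MemoryTracker alert
# lines, goroutine end/start ratio <= 1.05.

# Print N from the "goroutine profile: total N" header written by
# /debug/pprof/goroutine?debug=1 (the same parse the orchestrator uses).
goroutine_total() {
  awk '/^goroutine profile: total/ { print $4; exit }' "$1"
}

# Exit 0 when start > 0 and end/start <= max ratio (default 1.05).
goroutine_drift_ok() {
  awk -v s="$1" -v e="$2" -v max="${3:-1.05}" \
    'BEGIN { exit !(s > 0 && e / s <= max) }'
}

# Print one scalar from summary.json. Assumes the one-key-per-line layout
# produced by the orchestrator's heredoc; use jq for anything richer.
summary_field() {
  awk -v key="\"$2\":" '$1 == key { v = $2; gsub(/[",]/, "", v); print v; exit }' "$1"
}

# Check all four criteria for the run directory in $1; print each failure
# and return non-zero if any criterion fails.
g5d_gate() {
  local s="$1/summary.json" rc=0
  # Criterion 1: a full-length run (a SMOKE=1 run records soak_hours=0.34).
  if ! awk -v h="$(summary_field "$s" soak_hours)" 'BEGIN { exit !(h >= 48) }'; then
    echo "FAIL criterion 1: soak_hours below 48"; rc=1
  fi
  # Criterion 2: the final replay-and-diff matched the baseline.
  if [[ "$(summary_field "$s" final_parity_pass)" != "true" ]]; then
    echo "FAIL criterion 2: final_parity_pass is not true"; rc=1
  fi
  # Criterion 3: memory-alerts.log stayed empty.
  if [[ "$(summary_field "$s" memory_alert_lines)" != "0" ]]; then
    echo "FAIL criterion 3: MemoryTracker alert lines present"; rc=1
  fi
  # Criterion 4: goroutine count drift within 5%.
  if ! goroutine_drift_ok "$(summary_field "$s" goroutine_count_start)" \
                          "$(summary_field "$s" goroutine_count_end)"; then
    echo "FAIL criterion 4: goroutine end/start ratio above 1.05"; rc=1
  fi
  return "$rc"
}
```

Usage, after sourcing the functions: `g5d_gate dist/soak/<ts> && echo PASS`. The line-oriented awk parse keeps the sketch free of extra dependencies; the `jq` queries in the runbook remain the more robust option if the summary layout ever changes.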
