This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch vectorized-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit abedc3f068092c0e61b7eb795e204b51d0177195
Author: Hongtao Gao <[email protected]>
AuthorDate: Sat May 9 15:05:11 2026 +0000

    feat(soak): G5d soak harness — docker-compose + soak-driver + runbook
    
    Land the Phase A harness for the G5d soak gate. Local-only, no kind/k8s,
    no cloud — sized for a 31 GB no-swap dev box.
    
    Architecture: sequential, single-stack. Phase 0 boots vec-off, seeds a
    deterministic group + measure (soak/soak_metric), records a baseline
    over the seed window, snapshots the data dir, tears down. Phase 1
    boots vec-on with the restored snapshot, drives synthetic + OAP
    traffic for SOAK_HOURS, captures pprof every PPROF_INTERVAL_MIN,
    replays the catalog every PARITY_INTERVAL_MIN to diff against the
    baseline, and tails BanyanDB logs for MemoryTracker exhaustion. The
    parity catalog is time-bounded to the pre-snapshot window so OAP's
    ongoing writes do not pollute the diff.
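
    A minimal sketch of why the time bound keeps later writes out of the
    diff. It assumes the fixed one-hour lookback that buildQueryRequest
    applies, and it treats the range as half-open [begin, end). That is an
    assumption about BanyanDB's TimeRange semantics, not something this
    patch states. parityWindow and inWindow are hypothetical helpers and
    are not part of soak-driver.

```go
package main

import (
	"fmt"
	"time"
)

// lookback mirrors the fixed one-hour window applied by buildQueryRequest.
const lookback = time.Hour

// parityWindow derives the query window from a baseline's until bound
// (unix milliseconds). Hypothetical helper, not part of soak-driver.
func parityWindow(untilMs int64) (begin, end time.Time) {
	end = time.UnixMilli(untilMs)
	return end.Add(-lookback), end
}

// inWindow treats the range as half-open: begin inclusive, end exclusive
// (an assumption of this sketch).
func inWindow(ts, begin, end time.Time) bool {
	return !ts.Before(begin) && ts.Before(end)
}

func main() {
	untilMs := time.Date(2026, 5, 9, 12, 0, 0, 0, time.UTC).UnixMilli()
	begin, end := parityWindow(untilMs)

	seeded := end.Add(-10 * time.Minute) // fixture row written before the snapshot
	late := end.Add(5 * time.Minute)     // point written during Phase 1

	fmt.Println(inWindow(seeded, begin, end)) // true
	fmt.Println(inWindow(late, begin, end))   // false
}
```

    In this model, anything timestamped after the snapshot falls past end.
    It therefore cannot change a replay's result, however much traffic
    Phase 1 adds.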
    
    Files:
      test/soak/docker-compose.soak.yaml       (235 LOC) — overlay over the
        quick-start. BanyanDB built from local source so the vectorized
        branch is exercised. Every container has explicit memory + cpus
        limits totalling ~6.5 GB. BanyanDB runs as host UID/GID
        (${SOAK_UID}:${SOAK_GID}) so the bind-mounted /data is readable
        from the host shell for snapshot/restore. Default root paths are
        overridden to /data so the bind mount actually contains the data
        (--measure-root-path, --stream-root-path, --property-root-path,
        --trace-root-path, --schema-server-root-path).
      test/soak/Dockerfile.banyand             (52 LOC) — multi-stage build
        of /banyand from this branch's source.
      cmd/soak-driver/main.go                  (649 LOC) — Go CLI with four
        subcommands: seed-fixture (creates Group + Measure + writes N
        deterministic rows, polls until visible — handles the 5s flush
        timeout), record-baseline (queries the catalog, skips queries that
        fail, and persists protojson-marshalled DataPoints), replay-and-diff
        (re-issues the same queries against current state and compares each
        point with proto.Equal; a failed query does not abort the run but is
        counted as a divergence), pprof-grab (heap.pb.gz + goroutine.txt).
      cmd/soak-driver/catalog/default.json     (9 LOC) — single query
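        against the seeded soak_metric, projecting the value and count
        fields. The query Limit (set in buildQueryRequest, not in the
        catalog) is raised so 1000-row seeds aren't truncated to BanyanDB's
        default of 100.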
      scripts/soak-vectorized.sh               (329 LOC) — orchestrator.
        SMOKE=1 runs ~25 min for end-to-end validation. Captures host
        UID/GID. Cleanup trap on SIGINT/TERM/EXIT. Goroutine count parsed
        from "goroutine profile: total N" header.
      docs/soak/g5d-runbook.md                 (265 LOC) — start-to-finish
        instructions, artefact layout, and how each artefact maps to the
        four spec acceptance criteria.
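
    The replay-and-diff comparison can be modelled with the standard
    library alone. In this sketch, a generic equality function stands in
    for proto.Equal. As in the driver, a length mismatch is reported
    without per-index diffs, and only the first three differing indices
    are kept. diffPoints and diffResult are hypothetical names, not the
    driver's own types.

```go
package main

import "fmt"

// maxFirstDiffs mirrors the driver's cap of three recorded diffs per query.
const maxFirstDiffs = 3

// diffResult summarises one query's comparison (hypothetical type).
type diffResult struct {
	BaselineLen, ReplayLen int
	FirstDiffs             []int // indices of the first differing points
	Diverged               bool
}

// diffPoints compares baseline and replay element by element using eq,
// which stands in for proto.Equal in the real driver.
func diffPoints[T any](baseline, replay []T, eq func(a, b T) bool) diffResult {
	res := diffResult{BaselineLen: len(baseline), ReplayLen: len(replay)}
	if len(baseline) != len(replay) {
		// A length mismatch is a divergence on its own; no per-index diff.
		res.Diverged = true
		return res
	}
	for i := range baseline {
		if !eq(baseline[i], replay[i]) {
			res.Diverged = true
			if len(res.FirstDiffs) < maxFirstDiffs {
				res.FirstDiffs = append(res.FirstDiffs, i)
			}
		}
	}
	return res
}

func main() {
	eq := func(a, b int) bool { return a == b }
	fmt.Println(diffPoints([]int{1, 2, 3}, []int{1, 2, 3}, eq).Diverged) // false
	r := diffPoints([]int{1, 2, 3, 4, 5}, []int{0, 2, 0, 0, 0}, eq)
	fmt.Println(r.Diverged, r.FirstDiffs) // true [0 2 3]
}
```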
    
    Smoke (SMOKE=1, ~25 min) validated end-to-end on this branch:
      - 1000 rows seeded, 954 captured in baseline (95.4%, edge-truncation)
      - 20+ replay-and-diff iterations, all PASS (0 divergences)
      - 23 pprof captures spanning the 20-min Phase 1 window
      - Goroutine count: 556 → 556 (0% drift, threshold ≤5%)
      - MemoryTracker alerts: 0
      - Final parity_pass=true, exit 0
      - Memory peak (docker stats during smoke): banyandb ~110 MB, oap
        ~590 MB, no OOM kills against the 2 GB / 1 GB caps.
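
    A sketch of the goroutine-drift check. It assumes that each count
    comes from the "goroutine profile: total N" header the script parses,
    and that drift means growth from the first to the last capture as a
    percentage of the first. parseGoroutineTotal and driftPct are
    hypothetical names. The 5% threshold is the one quoted above.

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"strconv"
	"strings"
)

// headerPrefix is the first-line prefix of a debug=1 goroutine profile.
const headerPrefix = "goroutine profile: total "

// parseGoroutineTotal extracts N from the "goroutine profile: total N" line.
func parseGoroutineTotal(body string) (int, error) {
	sc := bufio.NewScanner(strings.NewReader(body))
	for sc.Scan() {
		line := strings.TrimSpace(sc.Text())
		if rest, ok := strings.CutPrefix(line, headerPrefix); ok {
			return strconv.Atoi(strings.TrimSpace(rest))
		}
	}
	if err := sc.Err(); err != nil {
		return 0, err
	}
	return 0, errors.New("goroutine profile header not found")
}

// driftPct returns the change from first to last as a percentage of first.
// first must be positive.
func driftPct(first, last int) float64 {
	return float64(last-first) / float64(first) * 100
}

func main() {
	first, err := parseGoroutineTotal("goroutine profile: total 556\n10 @ 0x1 0x2\n")
	if err != nil {
		panic(err)
	}
	last, err := parseGoroutineTotal("goroutine profile: total 556\n")
	if err != nil {
		panic(err)
	}
	drift := driftPct(first, last)
	fmt.Printf("drift=%.1f%% pass=%v\n", drift, drift <= 5.0) // drift=0.0% pass=true
}
```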
    
    Eleven smoke iterations were needed during bring-up. Root-cause notes
    are left as code comments where they shape config (5s flush timeouts,
    ms-truncated timestamps, 100-row default query Limit, root paths
    defaulting to /tmp).
    
    Deferred:
      - The 48 h run itself: harness is ready, calendar window is yours.
      - kind/k8s deployment shape: not needed for the four G5d criteria;
        runbook documents how to scale duration via env vars.
---
 .gitignore                           |   4 +
 cmd/soak-driver/catalog/default.json |   9 +
 cmd/soak-driver/main.go              | 649 +++++++++++++++++++++++++++++++++++
 docs/soak/g5d-runbook.md             | 265 ++++++++++++++
 scripts/soak-vectorized.sh           | 329 ++++++++++++++++++
 test/soak/Dockerfile.banyand         |  52 +++
 test/soak/docker-compose.soak.yaml   | 235 +++++++++++++
 7 files changed, 1543 insertions(+)

diff --git a/.gitignore b/.gitignore
index 4785e2c8b..59896f986 100644
--- a/.gitignore
+++ b/.gitignore
@@ -87,3 +87,7 @@ fodc/agent/internal/ktm/iomonitor/ebpf/generated/vmlinux.h
 # Vectorized benchmark report artifacts (G5a)
 dist/bench/
 
+# G5d soak harness runtime artifacts
+dist/soak/
+test/soak/data/
+
diff --git a/cmd/soak-driver/catalog/default.json b/cmd/soak-driver/catalog/default.json
new file mode 100644
index 000000000..b78cf5ece
--- /dev/null
+++ b/cmd/soak-driver/catalog/default.json
@@ -0,0 +1,9 @@
+[
+  {
+    "name": "soak_metric",
+    "groups": ["soak"],
+    "field_projection": {
+      "names": ["value", "count"]
+    }
+  }
+]
diff --git a/cmd/soak-driver/main.go b/cmd/soak-driver/main.go
new file mode 100644
index 000000000..85d962a25
--- /dev/null
+++ b/cmd/soak-driver/main.go
@@ -0,0 +1,649 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package main implements the soak-driver CLI for the G5d soak harness.
+// It provides four subcommands: seed-fixture, record-baseline, replay-and-diff, and pprof-grab.
+package main
+
+import (
+       "bufio"
+       "compress/gzip"
+       "context"
+       "encoding/json"
+       "fmt"
+       "io"
+       "net/http"
+       "os"
+       "path/filepath"
+       "strconv"
+       "strings"
+       "time"
+
+       "github.com/spf13/cobra"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+       "google.golang.org/protobuf/encoding/protojson"
+       "google.golang.org/protobuf/proto"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+)
+
+// Soak fixture constants — used by seed-fixture and the default query
+// catalog so the harness does not depend on OAP-driven measure naming.
+const (
+       soakGroup       = "soak"
+       soakMeasure     = "soak_metric"
+       soakTagFamily   = "default"
+       soakTagService  = "service"
+       soakTagInstance = "instance_id"
+       soakFieldVal    = "value"
+       soakFieldCount  = "count"
+)
+
+// catalogEntry holds the user-visible fields from the JSON catalog.
+// TimeRange is injected at runtime.
+type catalogEntry struct {
+       FieldProjection *measurev1.QueryRequest_FieldProjection `json:"field_projection,omitempty"`
+       Name            string                                  `json:"name"`
+       Groups          []string                                `json:"groups"`
+}
+
+// baselineRecord is persisted to disk after record-baseline runs.
+type baselineRecord struct {
+       QueryName  string            `json:"query_name"`
+       DataPoints []json.RawMessage `json:"data_points"`
+       Groups     []string          `json:"groups"`
+       UntilMs    int64             `json:"until_ms"`
+}
+
+// diffReport is written by replay-and-diff.
+type diffReport struct {
+       RunAt       string       `json:"run_at"`
+       Divergences []divergence `json:"divergences"`
+       QueriesRun  int          `json:"queries_run"`
+       Pass        bool         `json:"pass"`
+}
+
+type divergence struct {
+       QueryName   string      `json:"query_name"`
+       FirstDiffs  []pointDiff `json:"first_diffs,omitempty"`
+       BaselineLen int         `json:"baseline_len"`
+       ReplayLen   int         `json:"replay_len"`
+}
+
+type pointDiff struct {
+       Baseline string `json:"baseline"`
+       Replay   string `json:"replay"`
+       Index    int    `json:"index"`
+}
+
+func main() {
+       root := &cobra.Command{
+               Use:   "soak-driver",
+               Short: "G5d soak harness driver — baseline, diff, and pprof capture",
+       }
+       root.AddCommand(newSeedFixtureCmd(), newRecordBaselineCmd(), newReplayAndDiffCmd(), newPprofGrabCmd())
+       if execErr := root.Execute(); execErr != nil {
+               _, _ = fmt.Fprintln(os.Stderr, execErr)
+               os.Exit(1)
+       }
+}
+
+// dialInsecure opens an unauthenticated gRPC connection to addr.
+func dialInsecure(addr string) (*grpc.ClientConn, error) {
+       conn, connErr := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
+       if connErr != nil {
+               return nil, fmt.Errorf("grpc.NewClient %s: %w", addr, connErr)
+       }
+       return conn, nil
+}
+
+// loadCatalog reads the JSON catalog file and returns its entries.
+func loadCatalog(path string) ([]catalogEntry, error) {
+       raw, readErr := os.ReadFile(path)
+       if readErr != nil {
+               return nil, fmt.Errorf("read catalog %s: %w", path, readErr)
+       }
+       var entries []catalogEntry
+       if unmarshalErr := json.Unmarshal(raw, &entries); unmarshalErr != nil {
+               return nil, fmt.Errorf("unmarshal catalog: %w", unmarshalErr)
+       }
+       return entries, nil
+}
+
+// buildQueryRequest constructs a MeasureService QueryRequest from a catalog entry.
+// The time range always spans the hour ending at untilMs (begin = untilMs - 1h).
+// Limit is set high so a 1000-row seed isn't truncated to BanyanDB's default of 100.
+func buildQueryRequest(entry catalogEntry, untilMs int64) *measurev1.QueryRequest {
+       untilTime := time.UnixMilli(untilMs)
+       beginTime := untilTime.Add(-1 * time.Hour)
+       req := &measurev1.QueryRequest{
+               Name:   entry.Name,
+               Groups: entry.Groups,
+               TimeRange: &modelv1.TimeRange{
+                       Begin: timestamppb.New(beginTime),
+                       End:   timestamppb.New(untilTime),
+               },
+               Limit: 100000,
+       }
+       if entry.FieldProjection != nil {
+               req.FieldProjection = entry.FieldProjection
+       }
+       return req
+}
+
+// newRecordBaselineCmd returns the record-baseline subcommand.
+func newRecordBaselineCmd() *cobra.Command {
+       var addr, catalogPath, outPath string
+       var untilMs int64
+
+       cmd := &cobra.Command{
+               Use:   "record-baseline",
+               Short: "Query measures up to --until and write a deterministic baseline JSON",
+               RunE: func(_ *cobra.Command, _ []string) error {
+                       entries, loadErr := loadCatalog(catalogPath)
+                       if loadErr != nil {
+                               return loadErr
+                       }
+                       conn, dialErr := dialInsecure(addr)
+                       if dialErr != nil {
+                               return dialErr
+                       }
+                       defer conn.Close()
+
+                       client := measurev1.NewMeasureServiceClient(conn)
+                       var records []baselineRecord
+                       succeeded, failed := 0, 0
+
+                       for _, entry := range entries {
+                               req := buildQueryRequest(entry, untilMs)
+                               ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+                               resp, queryErr := client.Query(ctx, req)
+                               cancel()
+                               if queryErr != nil {
+                                       failed++
+                                       fmt.Printf("[record-baseline] %s: SKIP (%v)\n", entry.Name, queryErr)
+                                       continue
+                               }
+                               rec := baselineRecord{
+                                       QueryName: entry.Name,
+                                       Groups:    entry.Groups,
+                                       UntilMs:   untilMs,
+                               }
+                               for _, dp := range resp.GetDataPoints() {
+                                       raw, marshalErr := protojson.Marshal(dp)
+                                       if marshalErr != nil {
+                                               return fmt.Errorf("marshal data point for %s: %w", entry.Name, marshalErr)
+                                       }
+                                       rec.DataPoints = append(rec.DataPoints, json.RawMessage(raw))
+                               }
+                               records = append(records, rec)
+                               succeeded++
+                               fmt.Printf("[record-baseline] %s: %d data points\n", entry.Name, len(rec.DataPoints))
+                       }
+                       if succeeded == 0 {
+                               return fmt.Errorf("record-baseline: all %d catalog queries failed (no usable baseline)", failed)
+                       }
+                       fmt.Printf("[record-baseline] %d queries succeeded, %d skipped\n", succeeded, failed)
+
+                       out, createErr := os.Create(outPath)
+                       if createErr != nil {
+                               return fmt.Errorf("create output %s: %w", outPath, createErr)
+                       }
+                       defer out.Close()
+
+                       enc := json.NewEncoder(out)
+                       enc.SetIndent("", "  ")
+                       if encErr := enc.Encode(records); encErr != nil {
+                               return fmt.Errorf("encode baseline: %w", encErr)
+                       }
+                       fmt.Printf("[record-baseline] written to %s\n", outPath)
+                       return nil
+               },
+       }
+       cmd.Flags().StringVar(&addr, "addr", "localhost:17912", "BanyanDB gRPC address")
+       cmd.Flags().StringVar(&catalogPath, "catalog", "", "Path to query catalog JSON")
+       cmd.Flags().Int64Var(&untilMs, "until", 0, "Upper bound for time range (unix milliseconds)")
+       cmd.Flags().StringVar(&outPath, "out", "baseline.json", "Output path for baseline JSON")
+       _ = cmd.MarkFlagRequired("catalog")
+       _ = cmd.MarkFlagRequired("until")
+       return cmd
+}
+
+// newReplayAndDiffCmd returns the replay-and-diff subcommand.
+func newReplayAndDiffCmd() *cobra.Command {
+       var addr, catalogPath, baselinePath, reportPath string
+
+       cmd := &cobra.Command{
+               Use:   "replay-and-diff",
+               Short: "Re-run catalog queries and compare against baseline; exit non-zero on divergence",
+               RunE: func(_ *cobra.Command, _ []string) error {
+                       raw, readErr := os.ReadFile(baselinePath)
+                       if readErr != nil {
+                               return fmt.Errorf("read baseline %s: %w", baselinePath, readErr)
+                       }
+                       var records []baselineRecord
+                       if unmarshalErr := json.Unmarshal(raw, &records); unmarshalErr != nil {
+                               return fmt.Errorf("unmarshal baseline: %w", unmarshalErr)
+                       }
+
+                       entries, loadErr := loadCatalog(catalogPath)
+                       if loadErr != nil {
+                               return loadErr
+                       }
+
+                       conn, dialErr := dialInsecure(addr)
+                       if dialErr != nil {
+                               return dialErr
+                       }
+                       defer conn.Close()
+
+                       client := measurev1.NewMeasureServiceClient(conn)
+                       report := diffReport{
+                               RunAt: time.Now().UTC().Format(time.RFC3339),
+                               Pass:  true,
+                       }
+
+                       // Build a map from name to baseline record for O(1) lookup.
+                       baselineMap := make(map[string]baselineRecord, len(records))
+                       for _, rec := range records {
+                               baselineMap[rec.QueryName] = rec
+                       }
+
+                       for _, entry := range entries {
+                               rec, ok := baselineMap[entry.Name]
+                               if !ok {
+                                       // Catalog has a query the baseline doesn't (e.g. baseline
+                                       // skipped it because the measure wasn't installed yet).
+                                       // Skip the diff for this entry rather than failing the
+                                       // whole replay.
+                                       fmt.Printf("[replay-and-diff] %s: SKIP (no baseline)\n", entry.Name)
+                                       continue
+                               }
+                               req := buildQueryRequest(entry, rec.UntilMs)
+                               ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+                               resp, queryErr := client.Query(ctx, req)
+                               cancel()
+                               if queryErr != nil {
+                                       // Treat replay-side query failure as a divergence so the
+                                       // pass/fail signal still flips, but don't abort early;
+                                       // we want to attempt every catalog entry.
+                                       report.Divergences = append(report.Divergences, divergence{
+                                               QueryName: entry.Name,
+                                       })
+                                       report.Pass = false
+                                       fmt.Printf("[replay-and-diff] %s: FAIL (%v)\n", entry.Name, queryErr)
+                                       continue
+                               }
+                               report.QueriesRun++
+
+                               replayDPs := resp.GetDataPoints()
+                               if len(replayDPs) != len(rec.DataPoints) {
+                                       div := divergence{
+                                               QueryName:   entry.Name,
+                                               BaselineLen: len(rec.DataPoints),
+                                               ReplayLen:   len(replayDPs),
+                                       }
+                                       report.Divergences = append(report.Divergences, div)
+                                       report.Pass = false
+                                       continue
+                               }
+
+                               div := divergence{QueryName: entry.Name, BaselineLen: len(rec.DataPoints), ReplayLen: len(replayDPs)}
+                               hasDiff := false
+                               for idx, baselineRaw := range rec.DataPoints {
+                                       baselineDP := new(measurev1.DataPoint)
+                                       if parseErr := protojson.Unmarshal(baselineRaw, baselineDP); parseErr != nil {
+                                               return fmt.Errorf("unmarshal baseline dp %d for %s: %w", idx, entry.Name, parseErr)
+                                       }
+                                       if !proto.Equal(baselineDP, replayDPs[idx]) {
+                                               hasDiff = true
+                                               if len(div.FirstDiffs) < 3 {
+                                                       div.FirstDiffs = append(div.FirstDiffs, pointDiff{
+                                                               Index:    idx,
+                                                               Baseline: baselineDP.String(),
+                                                               Replay:   replayDPs[idx].String(),
+                                                       })
+                                               }
+                                       }
+                               }
+                               if hasDiff {
+                                       report.Divergences = append(report.Divergences, div)
+                                       report.Pass = false
+                               }
+                       }
+
+                       outFile, createErr := os.Create(reportPath)
+                       if createErr != nil {
+                               return fmt.Errorf("create report %s: %w", reportPath, createErr)
+                       }
+                       defer outFile.Close()
+
+                       enc := json.NewEncoder(outFile)
+                       enc.SetIndent("", "  ")
+                       if encErr := enc.Encode(report); encErr != nil {
+                               return fmt.Errorf("encode report: %w", encErr)
+                       }
+                       fmt.Printf("[replay-and-diff] %d queries run, %d divergences — %s\n",
+                               report.QueriesRun, len(report.Divergences), map[bool]string{true: "PASS", false: "FAIL"}[report.Pass])
+
+                       if !report.Pass {
+                               return fmt.Errorf("parity check failed: %d divergences found", len(report.Divergences))
+                       }
+                       return nil
+               },
+       }
+       cmd.Flags().StringVar(&addr, "addr", "localhost:17912", "BanyanDB gRPC address")
+       cmd.Flags().StringVar(&catalogPath, "catalog", "", "Path to query catalog JSON")
+       cmd.Flags().StringVar(&baselinePath, "baseline", "baseline.json", "Path to baseline JSON produced by record-baseline")
+       cmd.Flags().StringVar(&reportPath, "report", "diff.json", "Output path for diff report JSON")
+       _ = cmd.MarkFlagRequired("catalog")
+       return cmd
+}
+
+// newPprofGrabCmd returns the pprof-grab subcommand.
+func newPprofGrabCmd() *cobra.Command {
+       var addr, outDir string
+
+       cmd := &cobra.Command{
+               Use:   "pprof-grab",
+               Short: "Capture heap and goroutine pprof profiles; print goroutine count to stdout",
+               RunE: func(_ *cobra.Command, _ []string) error {
+                       if mkdirErr := os.MkdirAll(outDir, 0o750); mkdirErr != nil {
+                               return fmt.Errorf("mkdir %s: %w", outDir, mkdirErr)
+                       }
+                       ts := strconv.FormatInt(time.Now().Unix(), 10)
+
+                       heapPath := filepath.Join(outDir, fmt.Sprintf("heap-%s.pb.gz", ts))
+                       goroutinePath := filepath.Join(outDir, fmt.Sprintf("goroutine-%s.txt", ts))
+
+                       baseURL := fmt.Sprintf("http://%s/debug/pprof", addr)
+                       if fetchErr := fetchGzip(baseURL+"/heap", heapPath); fetchErr != nil {
+                               return fmt.Errorf("fetch heap profile: %w", fetchErr)
+                       }
+
+                       goroutineBody, fetchErr := fetchBytes(baseURL + "/goroutine?debug=1")
+                       if fetchErr != nil {
+                               return fmt.Errorf("fetch goroutine profile: %w", fetchErr)
+                       }
+                       if writeErr := os.WriteFile(goroutinePath, goroutineBody, 0o600); writeErr != nil {
+                               return fmt.Errorf("write goroutine file: %w", writeErr)
+                       }
+
+                       count := countGoroutines(goroutineBody)
+                       fmt.Printf("%d\n", count)
+                       return nil
+               },
+       }
+       cmd.Flags().StringVar(&addr, "addr", "localhost:6060", "BanyanDB pprof/debug HTTP address (host:port)")
+       cmd.Flags().StringVar(&outDir, "out-dir", ".", "Directory to write profile files into")
+       return cmd
+}
+
+// newSeedFixtureCmd creates a deterministic Group + Measure schema and
+// writes N data points into it, so the parity-diff path does not depend
+// on OAP-managed measure naming. Idempotent on the schema side: if the
+// group/measure already exist, we proceed straight to the writes.
+//
+// Layout:
+//
+//     group   = "soak"
+//     measure = "soak_metric"
+//     tags    = service (string), instance_id (int)   — family "default"
+//     fields  = value (int), count (int)
+//     rows    = N rows at 1-second intervals ending now-1 minute
+//
+// Writes use the streaming MeasureService.Write RPC. After completion,
+// the highest timestamp written is printed to stdout — pass it as
+// --until to record-baseline so the time-bounded queries hit only this
+// fixture.
+func newSeedFixtureCmd() *cobra.Command {
+       var addr string
+       var rows int
+
+       cmd := &cobra.Command{
+               Use:   "seed-fixture",
+               Short: "Create a deterministic group/measure and write N rows",
+               RunE: func(_ *cobra.Command, _ []string) error {
+                       conn, dialErr := dialInsecure(addr)
+                       if dialErr != nil {
+                               return dialErr
+                       }
+                       defer conn.Close()
+
+                       ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
+                       defer cancel()
+
+                       groupClient := databasev1.NewGroupRegistryServiceClient(conn)
+                       if _, createErr := groupClient.Create(ctx, &databasev1.GroupRegistryServiceCreateRequest{
+                               Group: &commonv1.Group{
+                                       Metadata: &commonv1.Metadata{Name: soakGroup},
+                                       Catalog:  commonv1.Catalog_CATALOG_MEASURE,
+                                       ResourceOpts: &commonv1.ResourceOpts{
+                                               ShardNum: 1,
+                                               SegmentInterval: &commonv1.IntervalRule{
+                                                       Unit: commonv1.IntervalRule_UNIT_DAY,
+                                                       Num:  1,
+                                               },
+                                               Ttl: &commonv1.IntervalRule{
+                                                       Unit: commonv1.IntervalRule_UNIT_DAY,
+                                                       Num:  7,
+                                               },
+                                       },
+                               },
+                       }); createErr != nil && !strings.Contains(createErr.Error(), "already exist") {
+                               return fmt.Errorf("create group: %w", createErr)
+                       }
+
+                       measureClient := databasev1.NewMeasureRegistryServiceClient(conn)
+                       if _, createErr := measureClient.Create(ctx, &databasev1.MeasureRegistryServiceCreateRequest{
+                               Measure: &databasev1.Measure{
+                                       Metadata: &commonv1.Metadata{Name: soakMeasure, Group: soakGroup},
+                                       TagFamilies: []*databasev1.TagFamilySpec{{
+                                               Name: soakTagFamily,
+                                               Tags: []*databasev1.TagSpec{
+                                                       {Name: soakTagService, Type: databasev1.TagType_TAG_TYPE_STRING},
+                                                       {Name: soakTagInstance, Type: databasev1.TagType_TAG_TYPE_INT},
+                                               },
+                                       }},
+                                       Fields: []*databasev1.FieldSpec{
+                                               {
+                                                       Name:              soakFieldVal,
+                                                       FieldType:         databasev1.FieldType_FIELD_TYPE_INT,
+                                                       EncodingMethod:    databasev1.EncodingMethod_ENCODING_METHOD_GORILLA,
+                                                       CompressionMethod: databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD,
+                                               },
+                                               {
+                                                       Name:              soakFieldCount,
+                                                       FieldType:         databasev1.FieldType_FIELD_TYPE_INT,
+                                                       EncodingMethod:    databasev1.EncodingMethod_ENCODING_METHOD_GORILLA,
+                                                       CompressionMethod: databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD,
+                                               },
+                                       },
+                                       Entity: &databasev1.Entity{TagNames: []string{soakTagService, soakTagInstance}},
+                               },
+                       }); createErr != nil && !strings.Contains(createErr.Error(), "already exist") {
+                               return fmt.Errorf("create measure: %w", createErr)
+                       }
+
+                       writeClient := measurev1.NewMeasureServiceClient(conn)
+                       writeStream, streamErr := writeClient.Write(ctx)
+                       if streamErr != nil {
+                               return fmt.Errorf("open write stream: %w", streamErr)
+                       }
+
+                       // BanyanDB's timestamp validator rejects sub-millisecond
+                       // precision (Status_STATUS_INVALID_TIMESTAMP). Truncate to
+                       // millisecond before deriving the seed window.
+                       now := time.Now().Truncate(time.Millisecond).Add(-1 * time.Minute)
+                       lowestMs := now.Add(-time.Duration(rows) * time.Second).UnixMilli()
+                       highestMs := now.Add(-time.Second).UnixMilli()
+                       md := &commonv1.Metadata{Name: soakMeasure, Group: soakGroup}
+                       for i := range rows {
+                               ts := now.Add(-time.Duration(rows-1-i) * time.Second)
+                               dp := &measurev1.DataPointValue{
+                                       Timestamp: timestamppb.New(ts),
+                                       TagFamilies: []*modelv1.TagFamilyForWrite{{
+                                               Tags: []*modelv1.TagValue{
+                                                       {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: fmt.Sprintf("svc-%d", i%4)}}},
+                                                       {Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: int64(i % 8)}}},
+                                               },
+                                       }},
+                                       Fields: []*modelv1.FieldValue{
+                                               {Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: int64(1000 + i)}}},
+                                               {Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: int64(1)}}},
+                                       },
+                               }
+                               if sendErr := writeStream.Send(&measurev1.WriteRequest{
+                                       Metadata:  md,
+                                       DataPoint: dp,
+                                       MessageId: uint64(time.Now().UnixNano()),
+                               }); sendErr != nil {
+                                       return fmt.Errorf("send dp %d: %w", i, sendErr)
+                               }
+                       }
+                       if closeErr := writeStream.CloseSend(); closeErr != nil {
+                               return fmt.Errorf("close write stream: %w", closeErr)
+                       }
+                       // Drain server responses; status != STATUS_SUCCEED means the
+                       // write was rejected (e.g. unknown measure, schema mismatch).
+                       succeeded, badStatus := 0, 0
+                       var firstBadStatus string
+                       for {
+                               resp, recvErr := writeStream.Recv()
+                               if recvErr == io.EOF {
+                                       break
+                               }
+                               if recvErr != nil {
+                                       return fmt.Errorf("recv ack: %w", recvErr)
+                               }
+                               if resp.GetStatus() == modelv1.Status_STATUS_SUCCEED.String() {
+                                       succeeded++
+                                       continue
+                               }
+                               badStatus++
+                               if firstBadStatus == "" {
+                                       firstBadStatus = resp.GetStatus()
+                               }
+                       }
+                       fmt.Printf("[seed-fixture] write acks: %d succeeded, %d failed (first status: %q)\n",
+                               succeeded, badStatus, firstBadStatus)
+                       if succeeded == 0 {
+                               return fmt.Errorf("seed-fixture: 0 of %d writes succeeded; first bad status: %q", rows, firstBadStatus)
+                       }
+
+                       fmt.Printf("[seed-fixture] wrote %d rows to %s/%s, ts range [%d, %d] ms\n",
+                               rows, soakGroup, soakMeasure, lowestMs, highestMs)
+
+                       // Wait until the writes are queryable. BanyanDB buffers
+                       // measure data in memory and flushes on a timer (default
+                       // 5s), so querying immediately can return zero results. Poll
+                       // the measure over the seed window (padded by 1s on each
+                       // side) until at least 90% of the rows are visible, with a
+                       // 60s overall cap.
+                       queryClient := measurev1.NewMeasureServiceClient(conn)
+                       deadline := time.Now().Add(60 * time.Second)
+                       req := &measurev1.QueryRequest{
+                               Name:   soakMeasure,
+                               Groups: []string{soakGroup},
+                               TimeRange: &modelv1.TimeRange{
+                                       Begin: timestamppb.New(time.UnixMilli(lowestMs).Add(-time.Second)),
+                                       End:   timestamppb.New(time.UnixMilli(highestMs).Add(time.Second)),
+                               },
+                               FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: []string{soakFieldVal, soakFieldCount}},
+                               Limit:           uint32(rows * 2),
+                       }
+                       minVisible := int(float64(rows) * 0.9)
+                       var lastSeen int
+                       for time.Now().Before(deadline) {
+                               qctx, qcancel := context.WithTimeout(context.Background(), 5*time.Second)
+                               resp, qerr := queryClient.Query(qctx, req)
+                               qcancel()
+                               if qerr == nil && resp != nil {
+                                       lastSeen = len(resp.GetDataPoints())
+                                       if lastSeen >= minVisible {
+                                               fmt.Printf("[seed-fixture] %d rows visible to query\n", lastSeen)
+                                               fmt.Printf("%d\n", highestMs)
+                                               return nil
+                                       }
+                               }
+                               time.Sleep(2 * time.Second)
+                       }
+                       return fmt.Errorf("seed-fixture: only %d of %d rows visible after 60s — flush/index lag", lastSeen, rows)
+               },
+       }
+       cmd.Flags().StringVar(&addr, "addr", "localhost:17912", "BanyanDB gRPC address")
+       cmd.Flags().IntVar(&rows, "rows", 1000, "number of data points to write")
+       return cmd
+}
+
+// fetchGzip downloads url and writes the body verbatim to dst.
+// The pprof heap endpoint already returns a gzip-compressed protobuf, so the
+// bytes are stored as-is; wrapping them in a second gzip layer would leave a
+// file that `go tool pprof` cannot decode.
+func fetchGzip(url, dst string) error {
+       body, fetchErr := fetchBytes(url)
+       if fetchErr != nil {
+               return fetchErr
+       }
+       // Reject non-gzip bodies (e.g. a text error page) instead of saving
+       // them under a .pb.gz name.
+       if _, gzErr := gzip.NewReader(strings.NewReader(string(body))); gzErr != nil {
+               return fmt.Errorf("%s did not return gzip data: %w", url, gzErr)
+       }
+       if writeErr := os.WriteFile(dst, body, 0o600); writeErr != nil {
+               return fmt.Errorf("write %s: %w", dst, writeErr)
+       }
+       return nil
+}
+
+// fetchBytes issues a GET request and returns the response body.
+func fetchBytes(url string) ([]byte, error) {
+       resp, getErr := http.Get(url) //nolint:gosec,noctx
+       if getErr != nil {
+               return nil, fmt.Errorf("GET %s: %w", url, getErr)
+       }
+       defer resp.Body.Close()
+       if resp.StatusCode != http.StatusOK {
+               return nil, fmt.Errorf("GET %s: status %d", url, resp.StatusCode)
+       }
+       data, readErr := io.ReadAll(resp.Body)
+       if readErr != nil {
+               return nil, fmt.Errorf("read body from %s: %w", url, readErr)
+       }
+       return data, nil
+}
+
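+// countGoroutines returns the goroutine count from a text goroutine profile.
+// For /debug/pprof/goroutine?debug=1 output it reads the
+// "goroutine profile: total N" header; for debug=2 dumps it counts the
+// "goroutine N [state]:" stanza headers instead.
+func countGoroutines(body []byte) int {
+       count := 0
+       scanner := bufio.NewScanner(strings.NewReader(string(body)))
+       for scanner.Scan() {
+               line := scanner.Text()
+               var total int
+               if _, scanErr := fmt.Sscanf(line, "goroutine profile: total %d", &total); scanErr == nil {
+                       return total
+               }
+               if strings.HasPrefix(line, "goroutine ") && strings.Contains(line, " [") {
+                       count++
+               }
+       }
+       return count
+}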
diff --git a/docs/soak/g5d-runbook.md b/docs/soak/g5d-runbook.md
new file mode 100644
index 000000000..95c25e9d6
--- /dev/null
+++ b/docs/soak/g5d-runbook.md
@@ -0,0 +1,265 @@
+# G5d Soak Harness Runbook
+
+This document covers how to run and interpret the G5d soak test for the
+vectorized-query path in SkyWalking BanyanDB.
+
+## Prerequisites
+
+- Docker daemon running and reachable (`docker info` succeeds).
+- Docker Compose v2 (`docker compose version`).
+- At least ~6 GB RAM headroom on the host (the compose stack peaks at ~5.4 GB).
+  The host is assumed to have **no swap**: the resource limits in the compose
+  file are the only protection against OOM kills.
+- At least 10 GB free disk under the repo root for data snapshots, profiles, and logs.
+- Go toolchain installed to build `soak-driver` (the BanyanDB image is compiled
+  inside Docker by `test/soak/Dockerfile.banyand`, so it needs no host Go).
+- The `vectorized-query` branch checked out locally.
+
+Check headroom before starting:
+
+```bash
+free -h          # look at "available" column
+df -h .          # look at "Avail" column
+```
+
+## Smoke Validation
+
+Run a condensed ~30-minute smoke to verify the harness is wired correctly
+before committing to a 48-hour run:
+
+```bash
+cd /path/to/repo
+SMOKE=1 ./scripts/soak-vectorized.sh
+```
+
+Expected behaviour:
+
+| Step | What you should see |
+|---|---|
+| Build | `soak-driver built at bin/soak-driver` |
+| Phase 0 up | `BanyanDB is healthy` |
+| Warmup | Skipped (`SMOKE=1` sets `WARMUP_MIN=0`) |
+| Seed | `T1 snapshot timestamp: <ms> ms` |
+| Snapshot | `Copying data to dist/soak/<ts>/data-snapshot/` |
+| Baseline | `[record-baseline] written to dist/soak/<ts>/baseline.json` |
+| Phase 1 up | `BanyanDB is healthy` |
+| pprof-start | `Initial pprof captured` |
+| Soak loop | Parity and pprof messages every 1 minute |
+| pprof-end | `Final pprof captured` |
+| Summary | `Summary written to dist/soak/<ts>/summary.json` |
+
+Expected runtime: 25–35 minutes.
+
+Healthy artefact tree after smoke:
+
+```
+dist/soak/<ts>/
+  baseline.json              – array of per-query baseline records
+  data-snapshot/             – raw BanyanDB data directory (vec-off)
+  pprof-start/
+    heap-<unix>.pb.gz
+    goroutine-<unix>.txt
+  pprof-<ts>/                – one dir per interval
+    heap-<unix>.pb.gz
+    goroutine-<unix>.txt
+  pprof-end/
+    heap-<unix>.pb.gz
+    goroutine-<unix>.txt
+  diff-<ts>.json             – one per parity interval + diff-final.json
+  banyand.log                – full stdout/stderr from the BanyanDB container
+  run.log                    – orchestrator output (everything after the build step)
+  memory-alerts.log          – lines matching MemoryTracker / budget exhausted
+  summary.json               – machine-readable summary
+```
+
+Check `summary.json` for a healthy smoke result:
+
+```json
+{
+  "final_parity_pass": true,
+  "goroutine_count_start": <N>,
+  "goroutine_count_end":   <M>,   // M/N ≤ 1.05 is healthy
+  "memory_alert_lines": 0
+}
+```
+
+## Production 48-Hour Run
+
+```bash
+cd /path/to/repo
+./scripts/soak-vectorized.sh
+```
+
+Key environment variables:
+
+| Variable | Default | Purpose |
+|---|---|---|
+| `WARMUP_MIN` | 60 | Minutes of OAP write traffic before snapshot |
+| `SOAK_HOURS` | 48 | Total Phase 1 duration |
+| `PPROF_INTERVAL_MIN` | 30 | Minutes between heap/goroutine captures |
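+| `PARITY_INTERVAL_MIN` | 5 | Minutes between replay-and-diff runs |
+| `SEED_ROWS` | 1000 | Rows written by `soak-driver seed-fixture` before the baseline |
+| `SMOKE` | unset | `1` selects the smoke profile and overrides the durations above |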
+
+Monitoring during the run:
+
+```bash
+# Live BanyanDB logs
+tail -f dist/soak/<ts>/banyand.log
+
+# MemoryTracker alerts (should stay empty)
+tail -f dist/soak/<ts>/memory-alerts.log
+
+# Live parity results
+ls -ltr dist/soak/<ts>/diff-*.json
+cat dist/soak/<ts>/diff-<latest>.json | jq .pass
+
+# Container resource usage
+docker stats banyandb skywalking-oap
+```
+
+To abort safely, press Ctrl-C. The `trap` in the script runs
+`docker compose down -v` before exiting, leaving no dangling containers.
+
+## Reading the Artefacts Against the Four Acceptance Criteria
+
+### Criterion 1 — ≥48h staging run with `--measure-vectorized-enabled=true`
+
+Check `summary.json`:
+
+```bash
+jq '{smoke, soak_hours}' dist/soak/<ts>/summary.json
+```
+
+`smoke` must be `"false"` and `soak_hours` must be at least `48`.
+The script logs a timestamped start and end line, both in the terminal and in
+`dist/soak/<ts>/run.log`; their difference confirms the wall-clock duration.
+
+### Criterion 2 — No parity regression on `[]*measurev1.InternalDataPoint`
+
+Every `diff-*.json` file must have `"pass": true`.
+Check them all at once:
+
+```bash
+jq -r '.pass' dist/soak/<ts>/diff-*.json | sort | uniq -c
+```
+
+The output should be a single `<count> true` line; any `false` line means at least one interval diverged. A quick overview:
+
+```bash
+jq -s '[.[].pass] | {total: length, passing: map(select(. == true)) | length}' \
+   dist/soak/<ts>/diff-*.json
+```
+
+The final canonical result is `diff-final.json` and is also reflected in
+`summary.json` under `"final_parity_pass"`.
+
+### Criterion 3 — No MemoryTracker budget exhaustion
+
+```bash
+wc -l dist/soak/<ts>/memory-alerts.log
+cat  dist/soak/<ts>/memory-alerts.log
+```
+
+A healthy run produces an empty file (0 lines). Any non-zero line count is a
+failure; see the **Failure Modes** section.
+
+`summary.json` also records `"memory_alert_lines"` for CI consumption.
+
+### Criterion 4 — No goroutine leak (heap delta ≤5%)
+
+Compare the goroutine totals in the start and end profiles. Each
+`goroutine-*.txt` file is a `?debug=1` dump whose first line reads
+`goroutine profile: total N`:
+
+```bash
+# Total goroutines in start profile
+grep -m1 '^goroutine profile: total' dist/soak/<ts>/pprof-start/goroutine-*.txt
+
+# Total goroutines in end profile
+grep -m1 '^goroutine profile: total' dist/soak/<ts>/pprof-end/goroutine-*.txt
+```
+
+Calculate the ratio `end / start`; a ratio ≤1.05 passes.
+
+`summary.json` captures `goroutine_count_start` and `goroutine_count_end`
+for automated evaluation.
+
+For a heap size delta, use the `go tool pprof` binary:
+
+```bash
+go tool pprof -top dist/soak/<ts>/pprof-start/heap-*.pb.gz
+go tool pprof -top dist/soak/<ts>/pprof-end/heap-*.pb.gz
+```
+
+In-use bytes should plateau rather than climb steadily between start and end. `go tool pprof -top -base <start-profile> <end-profile>` lists only the growth.
+
+## Failure Modes
+
+### MemoryTracker budget exhaustion
+
+**Symptom**: `memory-alerts.log` is non-empty; BanyanDB may slow down or log
+errors.
+
+**Action**:
+1. Check `banyand.log` for context around the alert timestamp.
+2. Reduce query concurrency in `catalog/default.json` (fewer simultaneous
+   queries) or project fewer fields per query.
+3. If the budget is misconfigured, adjust the BanyanDB `--measure-memory-budget`
+   flag in the compose service command.
+
+### Goroutine leak
+
+**Symptom**: `goroutine_count_end / goroutine_count_start > 1.05`.
+
+**Action**:
+1. Diff the goroutine profiles:
+   ```bash
+   diff dist/soak/<ts>/pprof-start/goroutine-*.txt \
+        dist/soak/<ts>/pprof-end/goroutine-*.txt | head -80
+   ```
+2. Identify which goroutine stacks appear only in the end profile.
+3. File a bug against the vectorized pipeline with the diffed profiles attached.
+
+### Parity divergence
+
+**Symptom**: Any `diff-*.json` has `"pass": false`; `summary.json`
+`"final_parity_pass"` is `false`.
+
+**Action**:
+1. Open the failing diff file:
+   ```bash
+   jq '.divergences' dist/soak/<ts>/diff-<ts>.json
+   ```
+2. The `first_diffs` array shows the first 3 mismatched data points (index,
+   baseline string, replay string).
+3. Reproduce the query manually:
+   ```bash
+   bin/soak-driver replay-and-diff \
+     --addr localhost:17912 \
+     --catalog cmd/soak-driver/catalog/default.json \
+     --baseline dist/soak/<ts>/baseline.json \
+     --report /tmp/debug-diff.json
+   ```
+4. Check whether the divergence is in field values, tag values, or shard IDs.
+5. File a bug against the vectorized query path with the diff JSON attached.
+
+### OOMKill
+
+**Symptom**: A container disappears; `docker compose -f test/soak/docker-compose.soak.yaml ps -a` shows it as `exited`.
+
+**Action**:
+1. `docker inspect <container> | jq '.[].State.OOMKilled'` — if `true`, the
+   container exceeded its memory limit.
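+2. `docker stats` reports live usage only, so watch it while the stack runs; for
+   a container that has already been killed, the host kernel log (`dmesg`)
+   records the cgroup OOM kill.
+3. The per-container limits in `test/soak/docker-compose.soak.yaml` are sized to
+   the host budget. Do **not** increase them without re-evaluating that budget.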
+4. If BanyanDB OOM-kills, reduce the vectorized batch size or the number of
+   concurrent queries in the catalog.
+
+## Rollback
+
+To run Phase 1 with the flag disabled (reproducing the baseline behaviour):
+
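+```bash
+SOAK_UID="$(id -u)" SOAK_GID="$(id -g)" \
+BANYANDB_VEC_ENABLED=false \
+SOAK_DATA_DIR="${PWD}/test/soak/data" \
+docker compose -f test/soak/docker-compose.soak.yaml up -d
+```
+
+Run this from the repo root. `SOAK_DATA_DIR` must be absolute here: Compose
+resolves a relative bind-mount source against the project directory (by default
+the compose file's directory, `test/soak/`), not the shell's working directory.
+Setting `SOAK_UID`/`SOAK_GID` keeps `/data` owned by the host user, as the script does.
+
+The same command recovers from a failed Phase 1: tear down the stack
+with `docker compose -f test/soak/docker-compose.soak.yaml down -v`, then
+restart with `BANYANDB_VEC_ENABLED=false`.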
diff --git a/scripts/soak-vectorized.sh b/scripts/soak-vectorized.sh
new file mode 100755
index 000000000..b47048dfd
--- /dev/null
+++ b/scripts/soak-vectorized.sh
@@ -0,0 +1,329 @@
+#!/usr/bin/env bash
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# G5d soak harness orchestrator.
+#
+# Configuration (set as env vars before running):
+#   WARMUP_MIN          – minutes OAP has to write data before baseline snapshot (default 60)
+#   SOAK_HOURS          – duration of the vec-on Phase 1 run in hours (default 48)
+#   PPROF_INTERVAL_MIN  – minutes between each pprof capture (default 30)
+#   PARITY_INTERVAL_MIN – minutes between each replay-and-diff run (default 5)
+#   SEED_ROWS           – rows written by `soak-driver seed-fixture` (default 1000)
+#   SMOKE               – set to 1 for a quick ~30-min smoke run (overrides durations)
+#
+# Artefacts are written under dist/soak/<timestamp>/ relative to the repo root.
+#
+# Usage:
+#   ./scripts/soak-vectorized.sh
+#   SMOKE=1 ./scripts/soak-vectorized.sh
+
+set -euo pipefail
+
+# ── configuration ────────────────────────────────────────────────────────────
+WARMUP_MIN="${WARMUP_MIN:-60}"
+SOAK_HOURS="${SOAK_HOURS:-48}"
+PPROF_INTERVAL_MIN="${PPROF_INTERVAL_MIN:-30}"
+PARITY_INTERVAL_MIN="${PARITY_INTERVAL_MIN:-5}"
+
+if [[ "${SMOKE:-}" == "1" ]]; then
+  # SMOKE skips the OAP-warmup wait — parity is driven by deterministic
+  # data seeded by `soak-driver seed-fixture`, so we don't depend on OAP
+  # propagation timing.
+  WARMUP_MIN=0
+  SOAK_HOURS=0.34   # ~20 min — fits inside a tractable smoke window
+  PPROF_INTERVAL_MIN=1
+  PARITY_INTERVAL_MIN=1
+fi
+
+SEED_ROWS="${SEED_ROWS:-1000}"
+
+SOAK_HOURS_SEC=$(awk "BEGIN{printf \"%d\", ${SOAK_HOURS}*3600}")
+WARMUP_SEC=$(( WARMUP_MIN * 60 ))
+PPROF_INTERVAL_SEC=$(( PPROF_INTERVAL_MIN * 60 ))
+PARITY_INTERVAL_SEC=$(( PARITY_INTERVAL_MIN * 60 ))
+
+REPO_ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
+COMPOSE_FILE="${REPO_ROOT}/test/soak/docker-compose.soak.yaml"
+CATALOG="${REPO_ROOT}/cmd/soak-driver/catalog/default.json"
+BANYANDB_GRPC="localhost:17912"
+BANYANDB_PPROF="localhost:6060"
+
+# Pass host UID/GID to compose so the BanyanDB container writes the
+# bind-mounted /data dir as the host user (otherwise root-owned files
+# break snapshot/restore from the host shell).
+export SOAK_UID="$(id -u)"
+export SOAK_GID="$(id -g)"
+
+RUN_TS="$(date +%Y%m%dT%H%M%S)"
+DIST="${REPO_ROOT}/dist/soak/${RUN_TS}"
+DATA_DIR="${REPO_ROOT}/test/soak/data"
+SNAPSHOT_DIR="${DIST}/data-snapshot"
+
+# ── helpers ──────────────────────────────────────────────────────────────────
+log() { echo "[$(date -u +%Y-%m-%dT%H:%M:%SZ)] $*"; }
+
+compose_cmd() {
+  docker compose -f "${COMPOSE_FILE}" "$@"
+}
+
+wait_banyandb_healthy() {
+  log "Waiting for BanyanDB to become healthy..."
+  local attempts=0
+  until curl -sf "http://localhost:17913/api/healthz" >/dev/null 2>&1; do
+    attempts=$(( attempts + 1 ))
+    if (( attempts > 120 )); then
+      log "ERROR: BanyanDB did not become healthy after 120 attempts"
+      return 1
+    fi
+    sleep 5
+  done
+  log "BanyanDB is healthy."
+}
+
+soak_driver() {
+  "${REPO_ROOT}/bin/soak-driver" "$@"
+}
+
+# ── cleanup trap ─────────────────────────────────────────────────────────────
+cleanup() {
+  log "Caught signal — tearing down compose stack..."
+  compose_cmd down -v --remove-orphans 2>/dev/null || true
+  log "Cleanup complete."
+}
+trap cleanup INT TERM EXIT
+
+# ── build soak-driver ────────────────────────────────────────────────────────
+log "Building soak-driver..."
+mkdir -p "${REPO_ROOT}/bin"
+(cd "${REPO_ROOT}" && go build -o bin/soak-driver ./cmd/soak-driver)
+log "soak-driver built at bin/soak-driver"
+
+# ── prepare output dirs ───────────────────────────────────────────────────────
+mkdir -p "${DIST}" "${SNAPSHOT_DIR}" "${DATA_DIR}"
+
+# Tee everything from this point into the run log so silent failures leave
+# evidence behind. Earlier output (build) is already in stdout.
+exec > >(tee -a "${DIST}/run.log") 2>&1
+
+log "Run artefacts will be written to: ${DIST}"
+log "Config: WARMUP_MIN=${WARMUP_MIN} SOAK_HOURS=${SOAK_HOURS} PPROF_INTERVAL_MIN=${PPROF_INTERVAL_MIN} PARITY_INTERVAL_MIN=${PARITY_INTERVAL_MIN}"
+
+# ╔══════════════════════════════════════════════════════════════════════════╗
+# ║  PHASE 0 — Baseline (vec-off)                                          ║
+# ╚══════════════════════════════════════════════════════════════════════════╝
+log "=== PHASE 0: Baseline (BANYANDB_VEC_ENABLED=false) ==="
+
+SOAK_DATA_DIR="${DATA_DIR}" BANYANDB_VEC_ENABLED=false compose_cmd up -d
+wait_banyandb_healthy
+
+log "Waiting for OAP to become healthy (schema install + agent chain)..."
+oap_attempts=0
+until docker compose -f "${COMPOSE_FILE}" ps oap --format '{{.Status}}' 2>/dev/null | grep -q '(healthy)'; do
+  oap_attempts=$(( oap_attempts + 1 ))
+  if (( oap_attempts > 60 )); then
+    log "ERROR: OAP did not become healthy after 5 min — abort"
+    exit 1
+  fi
+  sleep 5
+done
+log "OAP healthy."
+
+if (( WARMUP_SEC > 0 )); then
+  log "Warming up for ${WARMUP_MIN} minutes to let OAP populate data..."
+  sleep "${WARMUP_SEC}"
+fi
+
+log "Seeding deterministic fixture (${SEED_ROWS} rows into soak/soak_metric)..."
+T1_MS=$(soak_driver seed-fixture --addr "${BANYANDB_GRPC}" --rows "${SEED_ROWS}" | tail -1)
+if [[ -z "${T1_MS}" ]] || ! [[ "${T1_MS}" =~ ^[0-9]+$ ]]; then
+  log "ERROR: seed-fixture did not return a valid T1 timestamp"
+  exit 1
+fi
+log "T1 snapshot timestamp: ${T1_MS} ms"
+
+# seed-fixture polls until the rows are visible to query, so by the
+# time it returns the measure data is queryable. Schema-property has a
+# 5s flush timeout — wait once more before snapshotting so the schema
+# segs land on disk for Phase 1.
+log "Waiting 8s for schema-server flush to persist..."
+sleep 8
+
+log "Recording baseline..."
+soak_driver record-baseline \
+  --addr "${BANYANDB_GRPC}" \
+  --catalog "${CATALOG}" \
+  --until "${T1_MS}" \
+  --out "${DIST}/baseline.json"
+
+# Verify the baseline has data points. The baseline JSON's data_points
+# field is an array of protojson-encoded DataPoint messages; an empty
+# slice means writes weren't visible and parity is meaningless.
+baseline_dp=$(python3 -c "import json; d=json.load(open('${DIST}/baseline.json')); print(sum(len(r.get('data_points') or []) for r in d))" 2>/dev/null || echo 0)
+log "Baseline data points captured: ${baseline_dp}"
+if [[ "${baseline_dp}" == "0" ]]; then
+  log "ERROR: baseline contains zero data points despite seed of ${SEED_ROWS} rows"
+  exit 1
+fi
+
+log "Stopping BanyanDB to snapshot data..."
+compose_cmd stop banyandb
+
+log "Copying data to ${SNAPSHOT_DIR}..."
+cp -a "${DATA_DIR}/." "${SNAPSHOT_DIR}/"
+
+snap_size=$(du -sb "${SNAPSHOT_DIR}" 2>/dev/null | awk '{print $1}')
+log "Snapshot size: ${snap_size:-0} bytes"
+
+log "Tearing down Phase 0 stack..."
+# Disable trap during intentional down so we don't double-down.
+trap - EXIT
+compose_cmd down -v --remove-orphans
+trap cleanup INT TERM EXIT
+
+# ╔══════════════════════════════════════════════════════════════════════════╗
+# ║  PHASE 1 — Soak (vec-on)                                               ║
+# ╚══════════════════════════════════════════════════════════════════════════╝
+log "=== PHASE 1: Soak (BANYANDB_VEC_ENABLED=true, duration=${SOAK_HOURS}h) ==="
+
+log "Restoring data snapshot..."
+rm -rf "${DATA_DIR:?}"/*
+cp -a "${SNAPSHOT_DIR}/." "${DATA_DIR}/"
+
+SOAK_DATA_DIR="${DATA_DIR}" BANYANDB_VEC_ENABLED=true compose_cmd up -d
+wait_banyandb_healthy
+
+# Initial pprof grab.
+mkdir -p "${DIST}/pprof-start"
+soak_driver pprof-grab --addr "${BANYANDB_PPROF}" --out-dir "${DIST}/pprof-start"
+log "Initial pprof captured."
+
+# Tail BanyanDB logs into persistent log files in the background.
+compose_cmd logs -f banyandb >> "${DIST}/banyand.log" 2>&1 &
+LOGS_PID=$!
+
+# Grep for MemoryTracker budget exhaustion in the background.
+(
+  tail -f "${DIST}/banyand.log" 2>/dev/null | \
+    grep --line-buffered -i "MemoryTracker\|budget exhausted\|memory budget" \
+    >> "${DIST}/memory-alerts.log" || true
+) &
+GREP_PID=$!
+
+# Background pprof + parity loops.
+SOAK_END=$(( $(date +%s) + SOAK_HOURS_SEC ))
+
+(
+  while (( $(date +%s) < SOAK_END )); do
+    sleep "${PPROF_INTERVAL_SEC}"
+    (( $(date +%s) >= SOAK_END )) && break
+    INTERVAL_TS="$(date +%Y%m%dT%H%M%S)"
+    PPROF_DIR="${DIST}/pprof-${INTERVAL_TS}"
+    mkdir -p "${PPROF_DIR}"
+    soak_driver pprof-grab --addr "${BANYANDB_PPROF}" --out-dir "${PPROF_DIR}" || \
+      log "WARN: pprof-grab failed at ${INTERVAL_TS}"
+    log "pprof captured: ${PPROF_DIR}"
+  done
+) &
+PPROF_LOOP_PID=$!
+
+(
+  while (( $(date +%s) < SOAK_END )); do
+    sleep "${PARITY_INTERVAL_SEC}"
+    (( $(date +%s) >= SOAK_END )) && break
+    DIFF_TS="$(date +%Y%m%dT%H%M%S)"
+    DIFF_REPORT="${DIST}/diff-${DIFF_TS}.json"
+    soak_driver replay-and-diff \
+      --addr "${BANYANDB_GRPC}" \
+      --catalog "${CATALOG}" \
+      --baseline "${DIST}/baseline.json" \
+      --report "${DIFF_REPORT}" || \
+      log "WARN: parity divergence detected — see ${DIFF_REPORT}"
+    log "parity check done: ${DIFF_REPORT}"
+  done
+) &
+PARITY_LOOP_PID=$!
+
+log "Soak running for ${SOAK_HOURS} hours. Loops started (pids: pprof=${PPROF_LOOP_PID} parity=${PARITY_LOOP_PID})."
+
+# Wait for soak duration.
+REMAINING=$(( SOAK_END - $(date +%s) ))
+if (( REMAINING > 0 )); then
+  sleep "${REMAINING}"
+fi
+
+log "Soak window complete. Collecting final artefacts..."
+
+# Stop background loops gracefully.
+kill "${PPROF_LOOP_PID}" "${PARITY_LOOP_PID}" 2>/dev/null || true
+wait "${PPROF_LOOP_PID}" "${PARITY_LOOP_PID}" 2>/dev/null || true
+
+# Final pprof.
+mkdir -p "${DIST}/pprof-end"
+soak_driver pprof-grab --addr "${BANYANDB_PPROF}" --out-dir "${DIST}/pprof-end"
+log "Final pprof captured."
+
+# Final parity check.
+FINAL_DIFF="${DIST}/diff-final.json"
+soak_driver replay-and-diff \
+  --addr "${BANYANDB_GRPC}" \
+  --catalog "${CATALOG}" \
+  --baseline "${DIST}/baseline.json" \
+  --report "${FINAL_DIFF}" && FINAL_PASS=true || FINAL_PASS=false
+log "Final parity check: pass=${FINAL_PASS}"
+
+# Stop log tailing.
+kill "${LOGS_PID}" "${GREP_PID}" 2>/dev/null || true
+
+# Write summary manifest.
+# Goroutine count is read from the "goroutine profile: total N" header
+# line that /debug/pprof/goroutine?debug=1 writes. Cheaper and more
+# robust than counting per-goroutine entries.
+extract_goroutine_total() {
+  local dir="$1"
+  local f
+  f=$(ls "${dir}"/goroutine-*.txt 2>/dev/null | head -1)
+  if [[ -z "${f}" ]]; then
+    echo 0
+    return
+  fi
+  awk '/^goroutine profile: total/ {print $4; exit}' "${f}" 2>/dev/null || echo 0
+}
+GOROUTINE_START=$(extract_goroutine_total "${DIST}/pprof-start")
+GOROUTINE_END=$(extract_goroutine_total "${DIST}/pprof-end")
+MEMORY_ALERTS=$(wc -l < "${DIST}/memory-alerts.log" 2>/dev/null || echo 0)
+
+cat > "${DIST}/summary.json" <<EOF
+{
+  "run_ts": "${RUN_TS}",
+  "smoke": "${SMOKE:-false}",
+  "warmup_min": ${WARMUP_MIN},
+  "soak_hours": ${SOAK_HOURS},
+  "t1_ms": ${T1_MS},
+  "final_parity_pass": ${FINAL_PASS},
+  "goroutine_count_start": ${GOROUTINE_START},
+  "goroutine_count_end": ${GOROUTINE_END},
+  "memory_alert_lines": ${MEMORY_ALERTS},
+  "artefacts_dir": "${DIST}"
+}
+EOF
+
+log "Summary written to ${DIST}/summary.json"
+log "=== Soak complete. Artefacts: ${DIST} ==="
+
+# Intentional final teardown — disable trap first.
+trap - EXIT INT TERM
+compose_cmd down -v --remove-orphans
diff --git a/test/soak/Dockerfile.banyand b/test/soak/Dockerfile.banyand
new file mode 100644
index 000000000..5d8cc3939
--- /dev/null
+++ b/test/soak/Dockerfile.banyand
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Multi-stage build: compile banyand from the local source tree.
+# This ensures the soak runs the vectorized-query branch, not a hub image.
+
+FROM golang:1.25-alpine AS builder
+
+RUN apk add --no-cache git make gcc musl-dev
+
+WORKDIR /src
+COPY go.mod go.sum ./
+RUN go mod download
+
+COPY . .
+
+RUN CGO_ENABLED=0 GOOS=linux go build \
+    -ldflags="-s -w" \
+    -o /out/banyand \
+    ./banyand/cmd/server
+
+FROM alpine:edge AS certs
+RUN apk add --no-cache ca-certificates && update-ca-certificates
+
+FROM busybox:stable-glibc AS final
+
+COPY --from=builder /out/banyand /banyand
+COPY --from=certs /etc/ssl/certs /etc/ssl/certs
+
+ENV GRPC_GO_LOG_SEVERITY_LEVEL=ERROR
+ENV GRPC_GO_LOG_FORMATTER=json
+
+EXPOSE 17912
+EXPOSE 17913
+EXPOSE 6060
+EXPOSE 2121
+
+ENTRYPOINT ["/banyand"]
diff --git a/test/soak/docker-compose.soak.yaml b/test/soak/docker-compose.soak.yaml
new file mode 100644
index 000000000..cb2f6c431
--- /dev/null
+++ b/test/soak/docker-compose.soak.yaml
@@ -0,0 +1,235 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Soak-test overlay for the G5d vectorized-query gate.
+#
+# Usage (via scripts/soak-vectorized.sh — do not invoke directly):
+#   BANYANDB_VEC_ENABLED=false docker compose -f test/soak/docker-compose.soak.yaml up -d
+#   BANYANDB_VEC_ENABLED=true  docker compose -f test/soak/docker-compose.soak.yaml up -d
+#
+# Key differences from the quick-start compose:
+#  - BanyanDB is built from local source (multi-stage Go build) instead of pulling a hub image.
+#  - The --measure-vectorized-enabled flag is controlled by BANYANDB_VEC_ENABLED (default: false).
+#  - Every container has explicit memory and CPU limits so a 31 GB no-swap host cannot OOM-kill silently.
+#  - SOAK_DATA_DIR (default ./data) is bind-mounted so snapshots can be copied between phases.
+
+services:
+  banyandb:
+    build:
+      context: ../..
+      dockerfile: test/soak/Dockerfile.banyand
+    container_name: banyandb
+    # Run as host UID/GID so the bind-mounted /data dir stays readable
+    # from the host shell, which snapshot/restore between phases relies on.
+    # Override via SOAK_UID/SOAK_GID if your host UID/GID differ.
+    user: "${SOAK_UID:-1008}:${SOAK_GID:-1009}"
+    command:
+      - standalone
+      - --measure-vectorized-enabled=${BANYANDB_VEC_ENABLED:-false}
+      - --measure-root-path=/data
+      - --stream-root-path=/data
+      - --property-root-path=/data
+      - --trace-root-path=/data
+      - --schema-server-root-path=/data
+    ports:
+      - "17912:17912"   # gRPC — soak-driver record-baseline / replay-and-diff
+      - "17913:17913"   # HTTP/healthz
+      - "6060:6060"     # pprof
+      - "2121:2121"     # observability/metrics
+    volumes:
+      - type: bind
+        source: ${SOAK_DATA_DIR:-./data}
+        target: /data
+    networks:
+      - demo
+    healthcheck:
+      test: ["CMD", "sh", "-c", "wget -qO- http://127.0.0.1:17913/api/healthz >/dev/null 2>&1"]
+      interval: 5s
+      timeout: 120s
+      retries: 120
+    deploy:
+      resources:
+        limits:
+          memory: 2G
+          cpus: "2"
+
+  oap:
+    image: ${OAP_IMAGE:-ghcr.io/apache/skywalking/oap:b4a8811df9f48c66edb61c7b72d0293e3ecfa7cf}
+    container_name: skywalking-oap
+    environment:
+      SW_STORAGE: banyandb
+      SW_STORAGE_BANYANDB_TARGETS: banyandb:17912
+      SW_OTEL_RECEIVER: default
+      SW_OTEL_RECEIVER_ENABLED_OC_RULES: vm
+      SW_OTEL_RECEIVER_ENABLED_OTEL_METRICS_RULES: vm
+      SW_TELEMETRY: prometheus
+      SW_CORE_RECORD_DATA_TTL: 7
+      SW_CORE_METRICS_DATA_TTL: 7
+      JAVA_OPTS: -Xmx1500m
+    expose:
+      - 11800
+      - 12800
+      - 9090
+    ports:
+      - "12800:12800"
+      - "11800:11800"
+    networks:
+      - demo
+    depends_on:
+      banyandb:
+        condition: service_healthy
+    healthcheck:
+      test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/12800"]
+      interval: 10s
+      timeout: 60s
+      retries: 30
+    restart: unless-stopped
+    deploy:
+      resources:
+        limits:
+          memory: 2G
+          cpus: "2"
+
+  oap-ui:
+    image: ${OAP_UI_IMAGE:-ghcr.io/apache/skywalking/ui:b4a8811df9f48c66edb61c7b72d0293e3ecfa7cf}
+    container_name: skywalking-ui
+    ports:
+      - "8080:8080"
+    environment:
+      - SW_OAP_ADDRESS=http://oap:12800
+    networks:
+      - demo
+    depends_on:
+      oap:
+        condition: service_healthy
+    restart: unless-stopped
+    deploy:
+      resources:
+        limits:
+          memory: 256M
+          cpus: "0.5"
+
+  agent:
+    image: ${AGENT_IMAGE:-ghcr.io/apache/skywalking-java/skywalking-java:f750006020249d29a21cae63ae50da67f182c6ab-java8}
+    command: cp -r /skywalking/agent/ /skywalking-java-agent/
+    volumes:
+      - sw_agent:/skywalking-java-agent
+    networks:
+      - demo
+    deploy:
+      resources:
+        limits:
+          memory: 64M
+          cpus: "0.5"
+
+  provider:
+    image: "ghcr.io/apache/skywalking/e2e-service-provider:a2a67ca63084cddf82303155c185e3c24cf07eef"
+    volumes:
+      - sw_agent:/sw-java-agent
+    environment:
+      JAVA_TOOL_OPTIONS: -javaagent:/sw-java-agent/agent/skywalking-agent.jar -Xmx384m
+      SW_AGENT_COLLECTOR_BACKEND_SERVICES: oap:11800
+      SW_LOGGING_OUTPUT: CONSOLE
+      SW_AGENT_NAME: service-provider
+      SW_AGENT_INSTANCE_NAME: provider1
+      SW_AGENT_COLLECTOR_GET_PROFILE_TASK_INTERVAL: 1
+      SW_AGENT_COLLECTOR_GET_AGENT_DYNAMIC_CONFIG_INTERVAL: 1
+    ports:
+      - "9090"
+    networks:
+      - demo
+    depends_on:
+      oap:
+        condition: service_healthy
+      agent:
+        condition: service_completed_successfully
+    healthcheck:
+      test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9090"]
+      interval: 5s
+      timeout: 60s
+      retries: 120
+    deploy:
+      resources:
+        limits:
+          memory: 1G
+          cpus: "1"
+
+  consumer:
+    image: "ghcr.io/apache/skywalking/e2e-service-consumer:a2a67ca63084cddf82303155c185e3c24cf07eef"
+    volumes:
+      - sw_agent:/sw-java-agent
+    environment:
+      JAVA_TOOL_OPTIONS: -javaagent:/sw-java-agent/agent/skywalking-agent.jar -Xmx384m
+      SW_AGENT_COLLECTOR_BACKEND_SERVICES: oap:11800
+      SW_LOGGING_OUTPUT: CONSOLE
+      PROVIDER_URL: http://provider:9090
+      SW_AGENT_NAME: service-consumer
+      SW_AGENT_INSTANCE_NAME: consumer1
+      SW_AGENT_COLLECTOR_GET_PROFILE_TASK_INTERVAL: 1
+      SW_AGENT_COLLECTOR_GET_AGENT_DYNAMIC_CONFIG_INTERVAL: 1
+    ports:
+      - "9092"
+    networks:
+      - demo
+    depends_on:
+      oap:
+        condition: service_healthy
+      provider:
+        condition: service_healthy
+      agent:
+        condition: service_completed_successfully
+    healthcheck:
+      test: ["CMD", "bash", "-c", "cat < /dev/null > /dev/tcp/127.0.0.1/9092"]
+      interval: 5s
+      timeout: 60s
+      retries: 120
+    deploy:
+      resources:
+        limits:
+          memory: 1G
+          cpus: "1"
+
+  traffic_loader:
+    image: curlimages/curl
+    networks:
+      - demo
+    depends_on:
+      oap:
+        condition: service_healthy
+      provider:
+        condition: service_healthy
+      consumer:
+        condition: service_healthy
+    entrypoint: >
+      sh -c "
+      echo 'Starting traffic generation...';
+      while true; do
+        curl -sf -X POST http://consumer:9092/info || true;
+        sleep 1;
+      done
+      "
+    deploy:
+      resources:
+        limits:
+          memory: 64M
+          cpus: "0.5"
+
+networks:
+  demo:
+
+volumes:
+  sw_agent: {}
