This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 0b163e6d9420bc4d9ba0860b898ee84164a117ad Author: Hongtao Gao <[email protected]> AuthorDate: Fri May 15 15:17:00 2026 +0000 test(query/vectorized/measure): G9e.1 real plan-path bench harness Replace the deprecated NewMIterator leaf-substitution bench (scan/decode only; G7e operator wiring was reverted) with paired row/vec benchmarks that drive the production operator pipeline (BatchScan -> BatchTop / BuildOperators -> BatchLimit -> NewIteratorFromPipeline) against a faithful row-path baseline replicated via pkg/query/aggregation. Workloads W1-W8: scan (W1/W2), Top-N (W3), scalar reduce (W4), raw GroupBy (W5), GroupBy+Agg (W6), hidden-tags egress strip (W7), COUNT-on-float (W8). Gates opt-in via RUN_BENCH_GATES=1 (not in test-ci). Aggregation shapes show vec 4-8x faster than row; W3 Top-N currently exceeds the ns gate (vec ~1.3x) because BatchTop.materialize deep-copies every candidate row before the bounded-heap check -- a genuine operator regression tracked for the next fix, not absorbed by loosening the gate. --- pkg/query/vectorized/measure/bench_gates_test.go | 85 ++-- pkg/query/vectorized/measure/bench_test.go | 598 ++++++++++++++++++++--- 2 files changed, 565 insertions(+), 118 deletions(-) diff --git a/pkg/query/vectorized/measure/bench_gates_test.go b/pkg/query/vectorized/measure/bench_gates_test.go index 11e24a0ff..0e0bed5f1 100644 --- a/pkg/query/vectorized/measure/bench_gates_test.go +++ b/pkg/query/vectorized/measure/bench_gates_test.go @@ -22,19 +22,34 @@ import ( "testing" ) -// G5a acceptance gates per spec §"Performance Evaluation Plan". Ratios are -// vectorized / row; failing the gate is a regression that blocks the -// default-flip rollout. +// G9e acceptance gates. Ratios are vectorized / row; failing the gate is a +// regression that blocks the default-flip rollout. The vec half here is the +// production compute pipeline (source → [BatchAggregation | BatchTop] → +// BatchLimit drained through the public adapter), not the deprecated +// NewMIterator leaf-substitution path. // -// The alloc gate is set to 1.005 (0.5% tolerance) rather than the spec's -// literal 1.00. Reason: each runVectorizedPath call constructs a fresh -// BatchSchema, BatchPool, BatchScan, Pipeline, and MIterator wrapper — -// roughly 20 fixture allocations per query. The row path's resultMIterator -// is a struct literal with effectively zero fixture cost. Spread over -// W1's 10K rows that's a 0.05% per-iteration delta; over W3/W4's 100K rows, -// 0.014%. The spec author's "architectural benefit must materialize" -// intent is satisfied at 1.005 — a real per-row alloc regression would -// blow far past 0.5%, while fixture noise stays under it. +// Tolerance rationale: +// +// - Scan shapes (W1, W2, W7): ns ≤ 1.05, allocs ≤ 1.005, bytes ≤ 1.20. +// The alloc gate is 1.005 (0.5%) rather than a literal 1.00 because each +// runVectorizedPath call constructs a fresh BatchSchema, BatchPool, +// BatchScan, Pipeline, and adapter — roughly 20 fixture allocations per +// query. The row path's serializer is a struct literal with effectively +// zero fixture cost. Spread over W1's 10K rows that is a 0.05% +// per-iteration delta; over 100K-row workloads, 0.014%. The "per-cell +// wrapper bias" rationale from G5a still applies: buildResults allocates +// a fresh TagValue/FieldValue per cell so both paths pay the same +// storage-decode cost; a real per-row alloc regression blows far past +// 0.5% while fixture noise stays under it. bytes ≤ 1.20 absorbs the +// RecordBatch column backing the row path never allocates. +// +// - Agg / Top / GroupBy shapes (W3, W4, W5, W6, W8): vec should win or +// tie because aggregation/top runs on typed columns and the output row +// count is bounded by group/heap cardinality, amortizing wrapper +// reconstruction at egress. ns ≤ 1.05, allocs ≤ 1.05, bytes ≤ 1.20 +// initially — these can tighten once a baseline is recorded (the vec +// advantage should let ns/allocs drop well below 1.0 for high-input, +// low-output shapes). type benchGate struct { id string maxNsRatio float64 // ns/op ≤ row × maxNsRatio @@ -42,34 +57,30 @@ type benchGate struct { maxBytesRatio float64 // B/op ≤ row × maxBytesRatio } -// W3's spec gate is `vec ≤ row × 1.00` — tighter than the others — because -// W3 is "GroupBy + SUM/COUNT" and columnar should win outright once -// aggregation runs on the columns. With G4's wiring, operators are not yet -// wired into NewMIterator's pipeline, so W3 here measures the same scan + -// serialize cost as W2 — the strict gate is shape-mismatched. Relaxed to -// 1.05 to match the other scan-shape gates; tighten back to 1.00 once -// BatchAggregation/BatchGroupBy execute end-to-end (post-G6b). var benchGates = map[string]benchGate{ - "W1": {id: "W1", maxNsRatio: 1.05, maxAllocRatio: 1.005, maxBytesRatio: 1.20}, - "W2": {id: "W2", maxNsRatio: 1.05, maxAllocRatio: 1.005, maxBytesRatio: 1.20}, - "W3": {id: "W3", maxNsRatio: 1.05, maxAllocRatio: 1.005, maxBytesRatio: 1.20}, - "W4": {id: "W4", maxNsRatio: 1.05, maxAllocRatio: 1.005, maxBytesRatio: 1.20}, - "W5": {id: "W5", maxNsRatio: 1.05, maxAllocRatio: 1.005, maxBytesRatio: 1.20}, - "W2-MB": {id: "W2-MB", maxNsRatio: 1.05, maxAllocRatio: 1.005, maxBytesRatio: 1.20}, - "W4-MB": {id: "W4-MB", maxNsRatio: 1.05, maxAllocRatio: 1.005, maxBytesRatio: 1.20}, - "W5-MB": {id: "W5-MB", maxNsRatio: 1.05, maxAllocRatio: 1.005, maxBytesRatio: 1.20}, + // Scan-shaped (regression continuity + hidden-tags egress strip). + "W1": {id: "W1", maxNsRatio: 1.05, maxAllocRatio: 1.005, maxBytesRatio: 1.20}, + "W2": {id: "W2", maxNsRatio: 1.05, maxAllocRatio: 1.005, maxBytesRatio: 1.20}, + "W7": {id: "W7", maxNsRatio: 1.05, maxAllocRatio: 1.005, maxBytesRatio: 1.20}, + // Compute-shaped (Top / scalar reduce / raw GroupBy / GroupBy+Agg / + // COUNT-on-float). vec should win or tie; tighten once baselined. + "W3": {id: "W3", maxNsRatio: 1.05, maxAllocRatio: 1.05, maxBytesRatio: 1.20}, + "W4": {id: "W4", maxNsRatio: 1.05, maxAllocRatio: 1.05, maxBytesRatio: 1.20}, + "W5": {id: "W5", maxNsRatio: 1.05, maxAllocRatio: 1.05, maxBytesRatio: 1.20}, + "W6": {id: "W6", maxNsRatio: 1.05, maxAllocRatio: 1.05, maxBytesRatio: 1.20}, + "W8": {id: "W8", maxNsRatio: 1.05, maxAllocRatio: 1.05, maxBytesRatio: 1.20}, } -// TestBenchGates_PerWorkload runs both serialization paths inside testing.B -// harnesses and asserts the spec's vec/row ratios. A regression fails this -// test, not just the markdown report — gates are enforced as code. +// TestBenchGates_PerWorkload runs both paths inside testing.B harnesses and +// asserts the vec/row ratios. A regression fails this test, not just the +// markdown report — gates are enforced as code. // -// Skipped unless RUN_BENCH_GATES=1 is set (or short mode is off and the host -// is not under load): this test takes ~10–20s of wall time per workload and -// is gated on a CI-tunable knob to keep `go test ./...` fast. +// Skipped unless RUN_BENCH_GATES=1 is set: this test takes ~10–20s of wall +// time per workload and is gated on a CI-tunable knob to keep +// `go test ./...` fast. func TestBenchGates_PerWorkload(t *testing.T) { if os.Getenv("RUN_BENCH_GATES") != "1" { - t.Skip("set RUN_BENCH_GATES=1 to run G5a bench gates") + t.Skip("set RUN_BENCH_GATES=1 to run G9e bench gates") } if testing.Short() { t.Skip("skipping bench gates in -short mode") @@ -120,15 +131,15 @@ func timeWorkload(spec workloadSpec, vectorized bool) testing.BenchmarkResult { results := buildResults(spec) schema := buildSchema(spec) opts := buildOpts(spec) - cfg := VectorizedConfig{Enabled: true, BatchSize: 1024, QueryMemoryMiB: 64} + cfg := cfgFor() return testing.Benchmark(func(b *testing.B) { b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { if vectorized { - runVectorizedPath(results, schema, opts, cfg) + runVectorizedPath(spec, results, schema, opts, cfg) } else { - runRowPath(results, opts) + runRowPath(spec, results, opts) } } }) diff --git a/pkg/query/vectorized/measure/bench_test.go b/pkg/query/vectorized/measure/bench_test.go index b655d547e..a2886eee6 100644 --- a/pkg/query/vectorized/measure/bench_test.go +++ b/pkg/query/vectorized/measure/bench_test.go @@ -18,44 +18,99 @@ package measure import ( + "container/heap" "context" "testing" "github.com/apache/skywalking-banyandb/api/common" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/pkg/query/aggregation" + "github.com/apache/skywalking-banyandb/pkg/query/logical" "github.com/apache/skywalking-banyandb/pkg/query/model" + "github.com/apache/skywalking-banyandb/pkg/query/vectorized" ) -// Microbenchmarks (G5a) — paired row-path vs vectorized-path benchmarks for -// W1..W5 per the spec's Performance Evaluation Plan. Both paths consume the -// same fake MeasureQueryResult; only the serialization implementation -// differs. ns/op, B/op, allocs/op are reported via testing.B. +// Microbenchmarks (G9e) — paired row-path vs vectorized-path benchmarks for +// the G9 query shapes. The vec half is the *production* compute path: a +// vectorized.Pipeline composed of the same measure-level operators +// plan.Execute wires (source → [BatchAggregation | BatchTop] → BatchLimit → +// NewIteratorFromPipeline), drained through the public adapter. The row half +// is a faithful replica of pkg/query/logical/measure's reduce/top — it cannot +// be imported (cycle: logical/measure → this package), so it is replicated +// here exactly as diff_test.go documents for rowSerialize. +// +// The earlier harness benchmarked NewMIterator (the deprecated +// leaf-substitution path) which is scan/decode only and never runs +// Top/Agg/GroupBy — that did not measure the production vec compute the +// parity gate validates. This file replaces it. // // Workload scales are bounded for unit-bench tractability — the integration // macro suite at test/integration/standalone/benchmark/ exercises full-scale // shapes against the real Measure module. Acceptance gates are ratios -// (vec/row), so the relative comparison holds at any scale; absolute -// throughput is not the gate. +// (vec/row), so the relative comparison holds at any scale. // // Run via: // // go test ./pkg/query/vectorized/measure -bench=. -benchmem -count=5 -benchtime=2s -// workloadSpec parameterizes a benchmark workload. +// Repeated literals (goconst min-occurrences 4). +const ( + benchSvc = "svc" + benchValue = "value" + benchDefault = "default" + benchEnvID = "env_id" + benchVInt = "v_int" + benchVFloat = "v_float" + benchCriteria = "criteria" + defaultBatch = 1024 + defaultMemMiB = 64 + defaultTopN = 50 + defaultLimitN = 100 + groupKeyCardin = 16 +) + +// queryShape selects which operator pipeline a workload exercises. Each +// shape names one production query class the vec subsystem handles via +// plan.Dispatch (scan/top/scalarReduce/rawGroupBy/groupByAgg/hiddenTags/ +// countFloat); the row half reduces the same input with the row path's +// algorithm so the gate ratio reflects the real compute trade-off. +type queryShape int + +// queryShape values. +const ( + shapeScan queryShape = iota + shapeTop + shapeScalarReduce + shapeRawGroupBy + shapeGroupByAgg + shapeHiddenTags + shapeCountFloat +) + +// workloadSpec parameterizes a benchmark workload and the query shape applied +// on top of its scan output. // // chunksPerSeries > 1 splits each series's rowsPer rows into that many // *model.MeasureResult instances, simulating the multi-block storage path -// where queryResult.merge produces multiple Pull() results per series. The -// vec adapter must accumulate rows across PullBatch calls; the row adapter -// just sees more Pull invocations. Default 1 = single-block per series. +// where queryResult.merge produces multiple Pull() results per series. +// Default 1 = single-block per series. type workloadSpec struct { + groupBy *model.MeasureGroupBy + agg *model.MeasureAgg id string + topField string + criteriaTag string tagFamilies []tagSpec fields []fieldSpec series int rowsPer int chunksPerSeries int + topN int + limitN int + shape queryShape + topAsc bool } type tagSpec struct { @@ -69,76 +124,101 @@ type fieldSpec struct { col databasev1.FieldType } -// w1..w5 mirror the spec's catalog, scaled for unit-bench memory budgets. -// Total rows per workload is held near 100k so each benchmark iteration -// completes quickly enough for -benchtime=2s to amortize fixture build cost. +// Workload catalog: a scan baseline (W1/W2 for regression continuity), then +// one workload per G9 query shape. Total rows per workload is held near 100k +// so each iteration amortizes fixture build cost under -benchtime=2s. var ( + // W1 — single-series scan baseline (regression continuity). w1 = workloadSpec{ id: "W1", + shape: shapeScan, series: 1, rowsPer: 10000, - fields: []fieldSpec{{name: "v_int", col: databasev1.FieldType_FIELD_TYPE_INT}}, + fields: []fieldSpec{{name: benchVInt, col: databasev1.FieldType_FIELD_TYPE_INT}}, } + // W2 — multi-tag multi-series scan baseline (regression continuity). w2 = workloadSpec{ id: "W2", + shape: shapeScan, series: 100, rowsPer: 1000, tagFamilies: []tagSpec{ - {family: "default", name: "svc", col: databasev1.TagType_TAG_TYPE_STRING}, - {family: "default", name: "env_id", col: databasev1.TagType_TAG_TYPE_INT}, + {family: benchDefault, name: benchSvc, col: databasev1.TagType_TAG_TYPE_STRING}, + {family: benchDefault, name: benchEnvID, col: databasev1.TagType_TAG_TYPE_INT}, }, fields: []fieldSpec{ - {name: "v_int", col: databasev1.FieldType_FIELD_TYPE_INT}, - {name: "v_float", col: databasev1.FieldType_FIELD_TYPE_FLOAT}, + {name: benchVInt, col: databasev1.FieldType_FIELD_TYPE_INT}, + {name: benchVFloat, col: databasev1.FieldType_FIELD_TYPE_FLOAT}, }, } + // W3 — Top-N by an int field over a scan (Scan → Top → Limit). w3 = workloadSpec{ id: "W3", - series: 1000, rowsPer: 100, - tagFamilies: []tagSpec{{family: "default", name: "svc", col: databasev1.TagType_TAG_TYPE_STRING}}, - fields: []fieldSpec{{name: "v_int", col: databasev1.FieldType_FIELD_TYPE_INT}}, + shape: shapeTop, + series: 100, rowsPer: 1000, + tagFamilies: []tagSpec{{family: benchDefault, name: benchSvc, col: databasev1.TagType_TAG_TYPE_STRING}}, + fields: []fieldSpec{{name: benchValue, col: databasev1.FieldType_FIELD_TYPE_INT}}, + topField: benchValue, topN: defaultTopN, topAsc: false, limitN: defaultLimitN, } + // W4 — scalar reduce: SUM over an int field, no GroupBy (Agg only). w4 = workloadSpec{ id: "W4", + shape: shapeScalarReduce, series: 100, rowsPer: 1000, - tagFamilies: []tagSpec{{family: "default", name: "svc", col: databasev1.TagType_TAG_TYPE_STRING}}, - fields: []fieldSpec{{name: "v_int", col: databasev1.FieldType_FIELD_TYPE_INT}}, + tagFamilies: []tagSpec{{family: benchDefault, name: benchSvc, col: databasev1.TagType_TAG_TYPE_STRING}}, + fields: []fieldSpec{{name: benchValue, col: databasev1.FieldType_FIELD_TYPE_INT}}, + agg: &model.MeasureAgg{FieldName: benchValue, Func: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM}, + limitN: defaultLimitN, } + // W5 — raw GroupBy by svc, no Agg: first-seen row per group. w5 = workloadSpec{ id: "W5", - series: 1000, rowsPer: 100, + shape: shapeRawGroupBy, + series: groupKeyCardin, rowsPer: 6250, + tagFamilies: []tagSpec{{family: benchDefault, name: benchSvc, col: databasev1.TagType_TAG_TYPE_STRING}}, + fields: []fieldSpec{{name: benchValue, col: databasev1.FieldType_FIELD_TYPE_INT}}, + groupBy: &model.MeasureGroupBy{TagFamily: benchDefault, TagNames: []string{benchSvc}}, + limitN: defaultLimitN, + } + // W6 — GroupBy by svc + SUM over an int field. + w6 = workloadSpec{ + id: "W6", + shape: shapeGroupByAgg, + series: groupKeyCardin, rowsPer: 6250, + tagFamilies: []tagSpec{{family: benchDefault, name: benchSvc, col: databasev1.TagType_TAG_TYPE_STRING}}, + fields: []fieldSpec{{name: benchValue, col: databasev1.FieldType_FIELD_TYPE_INT}}, + groupBy: &model.MeasureGroupBy{TagFamily: benchDefault, TagNames: []string{benchSvc}}, + agg: &model.MeasureAgg{FieldName: benchValue, Func: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM}, + limitN: defaultLimitN, + } + // W7 — hidden-tags egress strip: a criteria tag materialized for + // storage-side filtering but absent from the visible projection, then + // stripped at egress (both paths pay the strip cost). + w7 = workloadSpec{ + id: "W7", + shape: shapeHiddenTags, + series: 100, rowsPer: 1000, tagFamilies: []tagSpec{ - {family: "default", name: "svc", col: databasev1.TagType_TAG_TYPE_STRING}, - {family: "default", name: "env_id", col: databasev1.TagType_TAG_TYPE_INT}, - {family: "default", name: "blob", col: databasev1.TagType_TAG_TYPE_DATA_BINARY}, - {family: "default", name: "ports", col: databasev1.TagType_TAG_TYPE_INT_ARRAY}, - {family: "default", name: "labels", col: databasev1.TagType_TAG_TYPE_STRING_ARRAY}, - }, - fields: []fieldSpec{ - {name: "v_int", col: databasev1.FieldType_FIELD_TYPE_INT}, - {name: "v_float", col: databasev1.FieldType_FIELD_TYPE_FLOAT}, - {name: "v_str", col: databasev1.FieldType_FIELD_TYPE_STRING}, - {name: "v_bytes", col: databasev1.FieldType_FIELD_TYPE_DATA_BINARY}, + {family: benchDefault, name: benchSvc, col: databasev1.TagType_TAG_TYPE_STRING}, + {family: benchDefault, name: benchCriteria, col: databasev1.TagType_TAG_TYPE_STRING}, }, + fields: []fieldSpec{{name: benchVInt, col: databasev1.FieldType_FIELD_TYPE_INT}}, + criteriaTag: benchCriteria, + } + // W8 — COUNT over a float field: exercises the float-typed COUNT slot + // (output follows input type, so COUNT-on-float emits a float). + w8 = workloadSpec{ + id: "W8", + shape: shapeCountFloat, + series: groupKeyCardin, rowsPer: 6250, + tagFamilies: []tagSpec{{family: benchDefault, name: benchSvc, col: databasev1.TagType_TAG_TYPE_STRING}}, + fields: []fieldSpec{{name: benchVFloat, col: databasev1.FieldType_FIELD_TYPE_FLOAT}}, + groupBy: &model.MeasureGroupBy{TagFamily: benchDefault, TagNames: []string{benchSvc}}, + agg: &model.MeasureAgg{FieldName: benchVFloat, Func: modelv1.AggregationFunction_AGGREGATION_FUNCTION_COUNT}, + limitN: defaultLimitN, } - // Multi-block variants: same shape as their single-block counterpart but - // with chunksPerSeries=4, so each series produces 4 *MeasureResult entries - // instead of 1. Exercises BatchSourceFromBatchResult's cross-Pull - // accumulation path that copies rows across PullBatch calls into a single - // RecordBatch. - w2mb = withChunks(w2, "W2-MB", 4) - w4mb = withChunks(w4, "W4-MB", 4) - w5mb = withChunks(w5, "W5-MB", 4) - - allWorkloads = []workloadSpec{w1, w2, w3, w4, w5, w2mb, w4mb, w5mb} + allWorkloads = []workloadSpec{w1, w2, w3, w4, w5, w6, w7, w8} ) -func withChunks(base workloadSpec, id string, chunks int) workloadSpec { - c := base - c.id = id - c.chunksPerSeries = chunks - return c -} - // buildResults materializes a deterministic []*model.MeasureResult for the // workload. Each cell allocates a fresh *modelv1.TagValue / *modelv1.FieldValue // — mirroring production storage's mustDecodeTagValue, which produces a new @@ -150,6 +230,11 @@ func withChunks(base workloadSpec, id string, chunks int) workloadSpec { // wrappers, all paths see the same storage-decode cost the production // queryResult pays — the ratios reflect the real pipeline trade-off. // +// To exercise GroupBy/Top with realistic cardinality, the svc tag and the +// numeric field vary per series/row: svc = "svc-<sid mod groupKeyCardin>" +// so the number of distinct groups is bounded, and the numeric value is +// (row index) so Top has a non-degenerate ordering. +// // chunksPerSeries > 1 splits each series's rows across multiple // *model.MeasureResult entries, simulating multi-block heap-merge output // from queryResult.merge. @@ -158,6 +243,7 @@ func buildResults(spec workloadSpec) []*model.MeasureResult { results := make([]*model.MeasureResult, 0, spec.series*chunks) for s := range spec.series { sid := common.SeriesID(s + 1) + group := s % groupKeyCardin for c := range chunks { start := (spec.rowsPer * c) / chunks end := (spec.rowsPer * (c + 1)) / chunks @@ -178,7 +264,7 @@ func buildResults(spec workloadSpec) []*model.MeasureResult { for _, ts := range spec.tagFamilies { values := make([]*modelv1.TagValue, n) for i := range values { - values[i] = freshTagValue(ts.col) + values[i] = freshTagValueFor(ts, group, start+i) } tags = append(tags, model.Tag{Name: ts.name, Values: values}) } @@ -189,7 +275,7 @@ func buildResults(spec workloadSpec) []*model.MeasureResult { for _, f := range spec.fields { values := make([]*modelv1.FieldValue, n) for i := range values { - values[i] = freshFieldValue(f.col) + values[i] = freshFieldValueFor(f, start+i) } r.Fields = append(r.Fields, model.Field{Name: f.name, Values: values}) } @@ -200,6 +286,39 @@ func buildResults(spec workloadSpec) []*model.MeasureResult { return results } +// freshTagValueFor builds a brand-new *modelv1.TagValue per call. The svc +// tag varies by group so GroupBy has bounded cardinality; everything else +// follows freshTagValue's storage-decode-shaped wrappers. +func freshTagValueFor(ts tagSpec, group, row int) *modelv1.TagValue { + if ts.col == databasev1.TagType_TAG_TYPE_STRING && ts.name == benchSvc { + return &modelv1.TagValue{Value: &modelv1.TagValue_Str{ + Str: &modelv1.Str{Value: "svc-" + string(rune('a'+group%26))}, + }} + } + if ts.col == databasev1.TagType_TAG_TYPE_STRING { + return &modelv1.TagValue{Value: &modelv1.TagValue_Str{ + Str: &modelv1.Str{Value: "c-" + string(rune('a'+(row+group)%26))}, + }} + } + return freshTagValue(ts.col) +} + +// freshFieldValueFor varies the numeric value by row so Top/Agg see a +// non-degenerate distribution; non-numeric fields fall back to the static +// storage-decode-shaped wrapper. +func freshFieldValueFor(f fieldSpec, row int) *modelv1.FieldValue { + switch f.col { + case databasev1.FieldType_FIELD_TYPE_INT: + return &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: int64(row)}}} + case databasev1.FieldType_FIELD_TYPE_FLOAT: + return &modelv1.FieldValue{Value: &modelv1.FieldValue_Float{Float: &modelv1.Float{Value: float64(row)}}} + case databasev1.FieldType_FIELD_TYPE_STRING, databasev1.FieldType_FIELD_TYPE_DATA_BINARY, + databasev1.FieldType_FIELD_TYPE_UNSPECIFIED: + return freshFieldValue(f.col) + } + return freshFieldValue(f.col) +} + // freshTagValue builds a brand-new *modelv1.TagValue per call. The shape and // inner alloc count mirror mustDecodeTagValue (wrapper + oneof + inner). func freshTagValue(t databasev1.TagType) *modelv1.TagValue { @@ -233,6 +352,8 @@ func freshFieldValue(t databasev1.FieldType) *modelv1.FieldValue { return &modelv1.FieldValue{Value: &modelv1.FieldValue_Str{Str: &modelv1.Str{Value: "ok"}}} case databasev1.FieldType_FIELD_TYPE_DATA_BINARY: return &modelv1.FieldValue{Value: &modelv1.FieldValue_BinaryData{BinaryData: []byte{0xab, 0xcd}}} + case databasev1.FieldType_FIELD_TYPE_UNSPECIFIED: + return &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 42}}} } return &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 42}}} } @@ -255,9 +376,13 @@ func buildSchema(spec workloadSpec) *databasev1.Measure { return m } -// buildOpts derives MeasureQueryOptions matching the workload's projection. +// buildOpts derives MeasureQueryOptions matching the workload's projection, +// including GroupBy/Agg so BuildBatchSchema promotes the referenced columns +// to native typed columns exactly as the production planner does. For the +// hidden-tags shape the criteria tag IS materialized (storage-side filter +// input) but is removed from the *visible* projection by buildVisibleOpts. func buildOpts(spec workloadSpec) model.MeasureQueryOptions { - opts := model.MeasureQueryOptions{} + opts := model.MeasureQueryOptions{GroupBy: spec.groupBy, Agg: spec.agg} if len(spec.tagFamilies) > 0 { names := make([]string, 0, len(spec.tagFamilies)) for _, t := range spec.tagFamilies { @@ -274,30 +399,341 @@ func buildOpts(spec workloadSpec) model.MeasureQueryOptions { return opts } +// hiddenSet returns the criteria tags that were projected only for +// storage-side filtering and must be stripped at egress, or an empty set. +func hiddenSet(spec workloadSpec) logical.HiddenTagSet { + h := logical.NewHiddenTagSet() + if spec.criteriaTag != "" { + h.Add(spec.criteriaTag) + } + return h +} + // benchSink prevents the compiler from eliding the work inside benchmark // loops — every drained row is summed into it. var benchSink int -// runRowPath drains the row-path serializer once over a fresh cursor backed -// by the supplied results, accumulating the row count into benchSink. -func runRowPath(results []*model.MeasureResult, opts model.MeasureQueryOptions) { - qr := &fakeMeasureQueryResult{seq: results} - benchSink += len(rowSerialize(qr, opts)) +// Row-path baseline. +// +// rowSerialize (diff_test.go) replays the scan/serialize the row path's +// resultMIterator runs. The Agg/Top/GroupBy reduces below replicate +// pkg/query/logical/measure faithfully enough for representative cost — they +// cannot be imported (cycle: logical/measure → this package), exactly as +// diff_test.go:96-99 documents for rowSerialize. aggregation.* IS imported +// (cycle-free, the same package the row path uses; see +// measure_plan_aggregation.go:59,63,84,186,197). + +// runRowPath drains the row baseline once for the workload's shape over a +// fresh cursor backed by cloned results, accumulating into benchSink. +func runRowPath(spec workloadSpec, results []*model.MeasureResult, opts model.MeasureQueryOptions) { + qr := &fakeMeasureQueryResult{seq: cloneResults(results)} + dps := rowSerialize(qr, opts) + switch spec.shape { + case shapeScan: + benchSink += len(dps) + case shapeHiddenTags: + hidden := hiddenSet(spec) + for _, idp := range dps { + idp.DataPoint.TagFamilies = hidden.StripHiddenTags(idp.DataPoint.TagFamilies) + } + benchSink += len(dps) + case shapeTop: + benchSink += len(rowTopReduce(dps, spec)) + case shapeScalarReduce, shapeGroupByAgg, shapeCountFloat: + benchSink += len(rowAggReduce(dps, spec)) + case shapeRawGroupBy: + benchSink += len(rowGroupByFirst(dps, spec)) + } +} + +// rowFieldIndex finds the projected index of name in opts.FieldProjection, +// matching the row path's logical.FieldRef.Spec.FieldIdx semantics +// (DataPoint.Fields are appended in FieldProjection order by rowSerialize). +func rowFieldIndex(opts model.MeasureQueryOptions, name string) int { + for i, f := range opts.FieldProjection { + if f == name { + return i + } + } + return -1 +} + +// rowTopReduce mirrors pkg/query/logical/measure.topOp.Execute: every data +// point is inserted into one bounded TopQueue keyed on the field value, then +// drained in sorted order. asc=false → top-N (largest); asc=true → bottom-N. +func rowTopReduce(dps []*measurev1.InternalDataPoint, spec workloadSpec) []*measurev1.InternalDataPoint { + if spec.topN <= 0 { + return dps + } + opts := buildOpts(spec) + fieldIdx := rowFieldIndex(opts, spec.topField) + h := &rowTopHeap{asc: spec.topAsc} + for _, idp := range dps { + v := rowNumericFieldValue(idp.GetDataPoint().GetFields(), fieldIdx) + el := rowTopEl{idp: idp, val: v, seq: h.seq} + h.seq++ + if h.Len() < spec.topN { + heap.Push(h, el) + continue + } + root := h.rows[0] + if (spec.topAsc && el.val < root.val) || (!spec.topAsc && el.val > root.val) { + h.rows[0] = el + heap.Fix(h, 0) + } + } + out := make([]*measurev1.InternalDataPoint, 0, h.Len()) + for h.Len() > 0 { + out = append(out, heap.Pop(h).(rowTopEl).idp) + } + for i, j := 0, len(out)-1; i < j; i, j = i+1, j-1 { + out[i], out[j] = out[j], out[i] + } + return rowApplyLimit(out, spec.limitN) +} + +func rowNumericFieldValue(fields []*measurev1.DataPoint_Field, idx int) float64 { + if idx < 0 || idx >= len(fields) { + return 0 + } + switch v := fields[idx].GetValue().GetValue().(type) { + case *modelv1.FieldValue_Int: + return float64(v.Int.GetValue()) + case *modelv1.FieldValue_Float: + return v.Float.GetValue() + } + return 0 +} + +type rowTopEl struct { + idp *measurev1.InternalDataPoint + val float64 + seq int } -// runVectorizedPath drains the vectorized adapter once over a fresh cursor. -func runVectorizedPath(results []*model.MeasureResult, schema *databasev1.Measure, - opts model.MeasureQueryOptions, cfg VectorizedConfig, +type rowTopHeap struct { + rows []rowTopEl + seq int + asc bool +} + +func (h *rowTopHeap) Len() int { return len(h.rows) } + +func (h *rowTopHeap) Less(i, j int) bool { + if h.rows[i].val != h.rows[j].val { + if h.asc { + return h.rows[i].val > h.rows[j].val + } + return h.rows[i].val < h.rows[j].val + } + return h.rows[i].seq > h.rows[j].seq +} + +func (h *rowTopHeap) Swap(i, j int) { h.rows[i], h.rows[j] = h.rows[j], h.rows[i] } + +func (h *rowTopHeap) Push(x any) { h.rows = append(h.rows, x.(rowTopEl)) } + +func (h *rowTopHeap) Pop() any { + n := len(h.rows) + x := h.rows[n-1] + h.rows = h.rows[:n-1] + return x +} + +// rowAggReduce mirrors pkg/query/logical/measure.aggGroupIterator / +// aggAllIterator: rows are partitioned by the GroupBy key (the whole result +// when GroupBy is unset → scalar reduce), each group folded through an +// aggregation.Map dispatched on the field's declared type (FIELD_TYPE_INT → +// int64, FIELD_TYPE_FLOAT → float64), then emitted one row per group. +func rowAggReduce(dps []*measurev1.InternalDataPoint, spec workloadSpec) []*measurev1.InternalDataPoint { + opts := buildOpts(spec) + fieldIdx := rowFieldIndex(opts, spec.agg.FieldName) + isFloat := false + for _, f := range spec.fields { + if f.name == spec.agg.FieldName && f.col == databasev1.FieldType_FIELD_TYPE_FLOAT { + isFloat = true + } + } + keyOf := rowGroupKeyFunc(spec) + order := make([]string, 0) + seen := make(map[string]struct{}) + intMaps := make(map[string]aggregation.Map[int64]) + floatMaps := make(map[string]aggregation.Map[float64]) + tagsOf := make(map[string][]*modelv1.TagFamily) + for _, idp := range dps { + dp := idp.GetDataPoint() + key := keyOf(dp) + if _, ok := seen[key]; !ok { + seen[key] = struct{}{} + order = append(order, key) + tagsOf[key] = dp.GetTagFamilies() + if isFloat { + m, _ := aggregation.NewMap[float64](spec.agg.Func) + floatMaps[key] = m + } else { + m, _ := aggregation.NewMap[int64](spec.agg.Func) + intMaps[key] = m + } + } + fv := dp.GetFields()[fieldIdx].GetValue() + if isFloat { + n, _ := aggregation.FromFieldValue[float64](fv) + floatMaps[key].In(n) + } else { + n, _ := aggregation.FromFieldValue[int64](fv) + intMaps[key].In(n) + } + } + out := make([]*measurev1.InternalDataPoint, 0, len(order)) + for _, key := range order { + var val *modelv1.FieldValue + if isFloat { + val, _ = aggregation.ToFieldValue(floatMaps[key].Val()) + } else { + val, _ = aggregation.ToFieldValue(intMaps[key].Val()) + } + out = append(out, &measurev1.InternalDataPoint{DataPoint: &measurev1.DataPoint{ + TagFamilies: tagsOf[key], + Fields: []*measurev1.DataPoint_Field{{Name: spec.agg.FieldName, Value: val}}, + }}) + } + return rowApplyLimit(out, spec.limitN) +} + +// rowGroupByFirst mirrors the raw-GroupBy row path: groupIterator yields the +// whole group, processor.go keeps only current[0], so exactly the first-seen +// row of each group surfaces in group-insertion order. +func rowGroupByFirst(dps []*measurev1.InternalDataPoint, spec workloadSpec) []*measurev1.InternalDataPoint { + keyOf := rowGroupKeyFunc(spec) + seen := make(map[string]struct{}) + out := make([]*measurev1.InternalDataPoint, 0) + for _, idp := range dps { + key := keyOf(idp.GetDataPoint()) + if _, ok := seen[key]; ok { + continue + } + seen[key] = struct{}{} + out = append(out, idp) + } + return rowApplyLimit(out, spec.limitN) +} + +// rowGroupKeyFunc returns a function computing the GroupBy key string from a +// DataPoint's projected tag families. When GroupBy is unset every row maps to +// the same key (scalar reduce). +func rowGroupKeyFunc(spec workloadSpec) func(*measurev1.DataPoint) string { + if spec.groupBy == nil || len(spec.groupBy.TagNames) == 0 { + return func(*measurev1.DataPoint) string { return "" } + } + names := spec.groupBy.TagNames + family := spec.groupBy.TagFamily + return func(dp *measurev1.DataPoint) string { + var b []byte + for _, tf := range dp.GetTagFamilies() { + if tf.GetName() != family { + continue + } + for _, want := range names { + for _, tag := range tf.GetTags() { + if tag.GetKey() == want { + b = append(b, tag.GetValue().GetStr().GetValue()...) + b = append(b, 0) + } + } + } + } + return string(b) + } +} + +func rowApplyLimit(in []*measurev1.InternalDataPoint, limitN int) []*measurev1.InternalDataPoint { + if limitN > 0 && len(in) > limitN { + return in[:limitN] + } + return in +} + +// Vectorized path — the *production* compute pipeline. +// +// Constructed the same way plan.Execute does (executor.go): a +// vectorized.PipelineBuilder with a shared MemoryTracker, source → +// [BatchAggregation | BatchTop] → BatchLimit, built, Init'd, then wrapped via +// NewIteratorFromPipeline and drained through the public adapter. package +// plan cannot be imported (cycle: plan → measure), so the measure-level +// operators are wired directly — this is byte-for-byte the operator graph +// plan.Execute produces for these shapes. + +// runVectorizedPath drains the production vec pipeline once over a fresh +// cursor backed by cloned results. +func runVectorizedPath(spec workloadSpec, results []*model.MeasureResult, + schema *databasev1.Measure, opts model.MeasureQueryOptions, cfg VectorizedConfig, ) { - qr := &fakeMeasureQueryResult{seq: results} - it, err := NewMIterator(context.Background(), qr, schema, opts, cfg) - if err != nil { - panic(err) + qr := &fakeMeasureQueryResult{seq: cloneResults(results)} + batchSchema, schemaErr := BuildBatchSchema(schema, opts) + if schemaErr != nil { + panic(schemaErr) } - defer it.Close() - for it.Next() { - benchSink++ + pool := vectorized.NewBatchPool(batchSchema, cfg.BatchSize) + source := NewBatchScan(qr, batchSchema, pool, cfg.BatchSize) + + tracker := vectorized.NewMemoryTracker(int64(cfg.QueryMemoryMiB) * 1024 * 1024) + builder := vectorized.NewPipelineBuilder().WithMemoryTracker(tracker).From(source) + + terminalSchema := batchSchema + switch spec.shape { + case shapeScan, shapeHiddenTags: + // Schema-preserving scan; no breaker. + case shapeTop: + fieldIdx, ok := batchSchema.FieldIndex(spec.topField) + if !ok { + panic("bench: top field not in schema") + } + builder.Break(NewBatchTop(batchSchema, fieldIdx, spec.topN, spec.topAsc, cfg.BatchSize)) + case shapeScalarReduce, shapeRawGroupBy, shapeGroupByAgg, shapeCountFloat: + ops, opsErr := BuildOperators(opts, batchSchema, tracker, cfg.BatchSize) + if opsErr != nil { + panic(opsErr) + } + if len(ops) != 1 { + panic("bench: expected exactly one breaker operator") + } + builder.Break(ops[0]) + terminalSchema = ops[0].OutputSchema() } + if spec.limitN > 0 { + builder.Apply(NewBatchLimit(terminalSchema, 0, uint32(spec.limitN))) + } + + pipeline, buildErr := builder.Build() + if buildErr != nil { + panic(buildErr) + } + if initErr := pipeline.Init(context.Background()); initErr != nil { + panic(initErr) + } + egressPool := vectorized.NewBatchPool(terminalSchema, cfg.BatchSize) + it := NewIteratorFromPipeline(context.Background(), pipeline, egressPool) + if spec.shape == shapeHiddenTags { + hidden := hiddenSet(spec) + for it.Next() { + for _, dp := range it.Current() { + dp.DataPoint.TagFamilies = hidden.StripHiddenTags(dp.DataPoint.TagFamilies) + } + benchSink++ + } + } else { + for it.Next() { + benchSink++ + } + } + if closeErr := it.Close(); closeErr != nil { + panic(closeErr) + } +} + +// cfgFor returns the standard bench VectorizedConfig. +func cfgFor() VectorizedConfig { + return VectorizedConfig{Enabled: true, BatchSize: defaultBatch, QueryMemoryMiB: defaultMemMiB} } func benchmarkRow(b *testing.B, spec workloadSpec) { @@ -306,7 +742,7 @@ func benchmarkRow(b *testing.B, spec workloadSpec) { b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - runRowPath(results, opts) + runRowPath(spec, results, opts) } } @@ -314,15 +750,15 @@ func benchmarkVectorized(b *testing.B, spec workloadSpec) { results := buildResults(spec) schema := buildSchema(spec) opts := buildOpts(spec) - cfg := VectorizedConfig{Enabled: true, BatchSize: 1024, QueryMemoryMiB: 64} + cfg := cfgFor() b.ReportAllocs() b.ResetTimer() for i := 0; i < b.N; i++ { - runVectorizedPath(results, schema, opts, cfg) + runVectorizedPath(spec, results, schema, opts, cfg) } } -// Paired benchmarks per spec. +// Paired benchmarks per workload. func BenchmarkRowPath_W1(b *testing.B) { benchmarkRow(b, w1) } func BenchmarkVectorizedPath_W1(b *testing.B) { benchmarkVectorized(b, w1) } @@ -339,11 +775,11 @@ func BenchmarkVectorizedPath_W4(b *testing.B) { benchmarkVectorized(b, w4) } func BenchmarkRowPath_W5(b *testing.B) { benchmarkRow(b, w5) } func BenchmarkVectorizedPath_W5(b *testing.B) { benchmarkVectorized(b, w5) } -func BenchmarkRowPath_W2MB(b *testing.B) { benchmarkRow(b, w2mb) } -func BenchmarkVectorizedPath_W2MB(b *testing.B) { benchmarkVectorized(b, w2mb) } +func BenchmarkRowPath_W6(b *testing.B) { benchmarkRow(b, w6) } +func BenchmarkVectorizedPath_W6(b *testing.B) { benchmarkVectorized(b, w6) } -func BenchmarkRowPath_W4MB(b *testing.B) { benchmarkRow(b, w4mb) } -func BenchmarkVectorizedPath_W4MB(b *testing.B) { benchmarkVectorized(b, w4mb) } +func BenchmarkRowPath_W7(b *testing.B) { benchmarkRow(b, w7) } +func BenchmarkVectorizedPath_W7(b *testing.B) { benchmarkVectorized(b, w7) } -func BenchmarkRowPath_W5MB(b *testing.B) { benchmarkRow(b, w5mb) } -func BenchmarkVectorizedPath_W5MB(b *testing.B) { benchmarkVectorized(b, w5mb) } +func BenchmarkRowPath_W8(b *testing.B) { benchmarkRow(b, w8) } +func BenchmarkVectorizedPath_W8(b *testing.B) { benchmarkVectorized(b, w8) }
