This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 3560a960a51561d3068950ff6d2947b8f28cb058 Author: Hongtao Gao <[email protected]> AuthorDate: Sat May 9 04:11:26 2026 +0000 test(query/vectorized/measure): realistic per-cell bench fixture + multi-block workloads The previous fixture shared one *modelv1.TagValue / *modelv1.FieldValue singleton across every row of every series, so neither the row path nor the passthrough column path paid the per-cell wrapper allocation that production storage's mustDecodeTagValue does. This biased the gate toward passthrough and made native column types look like a 1.5-2x regression that doesn't reproduce against production-true cost. Replace the singleton fixture with freshTagValue / freshFieldValue — each cell allocates its own wrapper + oneof + inner struct, mirroring mustDecodeTagValue. Add chunksPerSeries to workloadSpec and W2-MB / W4-MB / W5-MB variants that split each series's rows into 4 *MeasureResults, exercising BatchSourceFromBatchResult's cross-PullBatch accumulation. Wire the new IDs into bench_gates_test.go. Gate ratios with this fixture (vec/row): W1 0.95x ns / 1.00x allocs, W2 0.78x / 0.85x, W3 0.92x / 1.00x, W4 0.95x / 1.00x, W5 0.74x / 0.76x; multi-block W2-MB 0.88x / 0.85x, W4-MB 0.93x / 1.00x, W5-MB 0.75x / 0.76x. Once the row path no longer inherits a free pre-built singleton, vec is faster than row on every workload and matches or beats it on allocations (ties at 1.00x on W1, W3, W4 and W4-MB). --- pkg/query/vectorized/measure/bench_gates_test.go | 13 +- pkg/query/vectorized/measure/bench_test.go | 172 ++++++++++++++--------- 2 files changed, 114 insertions(+), 71 deletions(-) diff --git a/pkg/query/vectorized/measure/bench_gates_test.go b/pkg/query/vectorized/measure/bench_gates_test.go index 379c4cdd9..11e24a0ff 100644 --- a/pkg/query/vectorized/measure/bench_gates_test.go +++ b/pkg/query/vectorized/measure/bench_gates_test.go @@ -50,11 +50,14 @@ type benchGate struct { // 1.05 to match the other scan-shape gates; tighten back to 1.00 once // BatchAggregation/BatchGroupBy execute end-to-end (post-G6b).
var benchGates = map[string]benchGate{ - "W1": {id: "W1", maxNsRatio: 1.05, maxAllocRatio: 1.005, maxBytesRatio: 1.20}, - "W2": {id: "W2", maxNsRatio: 1.05, maxAllocRatio: 1.005, maxBytesRatio: 1.20}, - "W3": {id: "W3", maxNsRatio: 1.05, maxAllocRatio: 1.005, maxBytesRatio: 1.20}, - "W4": {id: "W4", maxNsRatio: 1.05, maxAllocRatio: 1.005, maxBytesRatio: 1.20}, - "W5": {id: "W5", maxNsRatio: 1.05, maxAllocRatio: 1.005, maxBytesRatio: 1.20}, + "W1": {id: "W1", maxNsRatio: 1.05, maxAllocRatio: 1.005, maxBytesRatio: 1.20}, + "W2": {id: "W2", maxNsRatio: 1.05, maxAllocRatio: 1.005, maxBytesRatio: 1.20}, + "W3": {id: "W3", maxNsRatio: 1.05, maxAllocRatio: 1.005, maxBytesRatio: 1.20}, + "W4": {id: "W4", maxNsRatio: 1.05, maxAllocRatio: 1.005, maxBytesRatio: 1.20}, + "W5": {id: "W5", maxNsRatio: 1.05, maxAllocRatio: 1.005, maxBytesRatio: 1.20}, + "W2-MB": {id: "W2-MB", maxNsRatio: 1.05, maxAllocRatio: 1.005, maxBytesRatio: 1.20}, + "W4-MB": {id: "W4-MB", maxNsRatio: 1.05, maxAllocRatio: 1.005, maxBytesRatio: 1.20}, + "W5-MB": {id: "W5-MB", maxNsRatio: 1.05, maxAllocRatio: 1.005, maxBytesRatio: 1.20}, } // TestBenchGates_PerWorkload runs both serialization paths inside testing.B diff --git a/pkg/query/vectorized/measure/bench_test.go b/pkg/query/vectorized/measure/bench_test.go index 79563ecbe..b655d547e 100644 --- a/pkg/query/vectorized/measure/bench_test.go +++ b/pkg/query/vectorized/measure/bench_test.go @@ -43,12 +43,19 @@ import ( // go test ./pkg/query/vectorized/measure -bench=. -benchmem -count=5 -benchtime=2s // workloadSpec parameterizes a benchmark workload. +// +// chunksPerSeries > 1 splits each series's rowsPer rows into that many +// *model.MeasureResult instances, simulating the multi-block storage path +// where queryResult.merge produces multiple Pull() results per series. The +// vec adapter must accumulate rows across PullBatch calls; the row adapter +// just sees more Pull invocations. Default 1 = single-block per series. 
type workloadSpec struct { - id string - tagFamilies []tagSpec - fields []fieldSpec - series int - rowsPer int + id string + tagFamilies []tagSpec + fields []fieldSpec + series int + rowsPer int + chunksPerSeries int } type tagSpec struct { @@ -113,97 +120,121 @@ var ( }, } - allWorkloads = []workloadSpec{w1, w2, w3, w4, w5} + // Multi-block variants: same shape as their single-block counterpart but + // with chunksPerSeries=4, so each series produces 4 *MeasureResult entries + // instead of 1. Exercises BatchSourceFromBatchResult's cross-Pull + // accumulation path that copies rows across PullBatch calls into a single + // RecordBatch. + w2mb = withChunks(w2, "W2-MB", 4) + w4mb = withChunks(w4, "W4-MB", 4) + w5mb = withChunks(w5, "W5-MB", 4) + + allWorkloads = []workloadSpec{w1, w2, w3, w4, w5, w2mb, w4mb, w5mb} ) +func withChunks(base workloadSpec, id string, chunks int) workloadSpec { + c := base + c.id = id + c.chunksPerSeries = chunks + return c +} + // buildResults materializes a deterministic []*model.MeasureResult for the -// workload. Tag and field protobuf values are shared singletons across rows -// to keep allocation cost concentrated on serialization (the path under test) -// rather than fixture construction. +// workload. Each cell allocates a fresh *modelv1.TagValue / *modelv1.FieldValue +// — mirroring production storage's mustDecodeTagValue, which produces a new +// wrapper per cell from raw bytes (3 allocs/cell). An earlier version reused +// singleton wrappers across all rows; that biased the gates because the row +// path and passthrough column type both inherit a free pre-built pointer that +// production never has, while a native-typed column type pays the wrapper +// reconstruction cost at egress and looks artificially slower. With per-cell +// wrappers, all paths see the same storage-decode cost the production +// queryResult pays — the ratios reflect the real pipeline trade-off. 
+// +// chunksPerSeries > 1 splits each series's rows across multiple +// *model.MeasureResult entries, simulating multi-block heap-merge output +// from queryResult.merge. func buildResults(spec workloadSpec) []*model.MeasureResult { - tagSingletons := map[string]*modelv1.TagValue{ - "str": {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "alpha"}}}, - "int": {Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: 7}}}, - "binary": {Value: &modelv1.TagValue_BinaryData{BinaryData: []byte{0xfe, 0xed}}}, - "intarr": {Value: &modelv1.TagValue_IntArray{IntArray: &modelv1.IntArray{Value: []int64{1, 2}}}}, - "strarr": {Value: &modelv1.TagValue_StrArray{StrArray: &modelv1.StrArray{Value: []string{"x", "y"}}}}, - } - fieldSingletons := map[string]*modelv1.FieldValue{ - "int": {Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 42}}}, - "float": {Value: &modelv1.FieldValue_Float{Float: &modelv1.Float{Value: 3.14}}}, - "str": {Value: &modelv1.FieldValue_Str{Str: &modelv1.Str{Value: "ok"}}}, - "binary": {Value: &modelv1.FieldValue_BinaryData{BinaryData: []byte{0xab, 0xcd}}}, - } - - results := make([]*model.MeasureResult, 0, spec.series) - for s := 0; s < spec.series; s++ { - r := &model.MeasureResult{SID: common.SeriesID(s + 1)} - r.Timestamps = make([]int64, spec.rowsPer) - r.Versions = make([]int64, spec.rowsPer) - r.ShardIDs = make([]common.ShardID, spec.rowsPer) - for i := 0; i < spec.rowsPer; i++ { - r.Timestamps[i] = int64(i) - r.Versions[i] = 1 - } - if len(spec.tagFamilies) > 0 { - tags := make([]model.Tag, 0, len(spec.tagFamilies)) - for _, ts := range spec.tagFamilies { - values := make([]*modelv1.TagValue, spec.rowsPer) - singleton := pickTagSingleton(ts.col, tagSingletons) - for i := range values { - values[i] = singleton + chunks := max(spec.chunksPerSeries, 1) + results := make([]*model.MeasureResult, 0, spec.series*chunks) + for s := range spec.series { + sid := common.SeriesID(s + 1) + for c := range chunks { + start := (spec.rowsPer * c) / chunks + 
end := (spec.rowsPer * (c + 1)) / chunks + n := end - start + if n <= 0 { + continue + } + r := &model.MeasureResult{SID: sid} + r.Timestamps = make([]int64, n) + r.Versions = make([]int64, n) + r.ShardIDs = make([]common.ShardID, n) + for i := range n { + r.Timestamps[i] = int64(start + i) + r.Versions[i] = 1 + } + if len(spec.tagFamilies) > 0 { + tags := make([]model.Tag, 0, len(spec.tagFamilies)) + for _, ts := range spec.tagFamilies { + values := make([]*modelv1.TagValue, n) + for i := range values { + values[i] = freshTagValue(ts.col) + } + tags = append(tags, model.Tag{Name: ts.name, Values: values}) } - tags = append(tags, model.Tag{Name: ts.name, Values: values}) + r.TagFamilies = []model.TagFamily{{Name: spec.tagFamilies[0].family, Tags: tags}} } - r.TagFamilies = []model.TagFamily{{Name: spec.tagFamilies[0].family, Tags: tags}} - } - if len(spec.fields) > 0 { - r.Fields = make([]model.Field, 0, len(spec.fields)) - for _, f := range spec.fields { - values := make([]*modelv1.FieldValue, spec.rowsPer) - singleton := pickFieldSingleton(f.col, fieldSingletons) - for i := range values { - values[i] = singleton + if len(spec.fields) > 0 { + r.Fields = make([]model.Field, 0, len(spec.fields)) + for _, f := range spec.fields { + values := make([]*modelv1.FieldValue, n) + for i := range values { + values[i] = freshFieldValue(f.col) + } + r.Fields = append(r.Fields, model.Field{Name: f.name, Values: values}) } - r.Fields = append(r.Fields, model.Field{Name: f.name, Values: values}) } + results = append(results, r) } - results = append(results, r) } return results } -func pickTagSingleton(t databasev1.TagType, m map[string]*modelv1.TagValue) *modelv1.TagValue { +// freshTagValue builds a brand-new *modelv1.TagValue per call. The shape and +// inner alloc count mirror mustDecodeTagValue (wrapper + oneof + inner). 
+func freshTagValue(t databasev1.TagType) *modelv1.TagValue { switch t { case databasev1.TagType_TAG_TYPE_INT: - return m["int"] + return &modelv1.TagValue{Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: 7}}} case databasev1.TagType_TAG_TYPE_STRING: - return m["str"] + return &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "alpha"}}} case databasev1.TagType_TAG_TYPE_DATA_BINARY: - return m["binary"] + return &modelv1.TagValue{Value: &modelv1.TagValue_BinaryData{BinaryData: []byte{0xfe, 0xed}}} case databasev1.TagType_TAG_TYPE_INT_ARRAY: - return m["intarr"] + return &modelv1.TagValue{Value: &modelv1.TagValue_IntArray{IntArray: &modelv1.IntArray{Value: []int64{1, 2}}}} case databasev1.TagType_TAG_TYPE_STRING_ARRAY: - return m["strarr"] + return &modelv1.TagValue{Value: &modelv1.TagValue_StrArray{StrArray: &modelv1.StrArray{Value: []string{"x", "y"}}}} case databasev1.TagType_TAG_TYPE_UNSPECIFIED, databasev1.TagType_TAG_TYPE_TIMESTAMP: // Bench fixtures never use these variants. - return m["str"] + return &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "alpha"}}} } - return m["str"] + return &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "alpha"}}} } -func pickFieldSingleton(t databasev1.FieldType, m map[string]*modelv1.FieldValue) *modelv1.FieldValue { +// freshFieldValue is the field-side counterpart to freshTagValue. Each call +// allocates a new wrapper (mirroring mustDecodeFieldValue). 
+func freshFieldValue(t databasev1.FieldType) *modelv1.FieldValue { switch t { case databasev1.FieldType_FIELD_TYPE_INT: - return m["int"] + return &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 42}}} case databasev1.FieldType_FIELD_TYPE_FLOAT: - return m["float"] + return &modelv1.FieldValue{Value: &modelv1.FieldValue_Float{Float: &modelv1.Float{Value: 3.14}}} case databasev1.FieldType_FIELD_TYPE_STRING: - return m["str"] + return &modelv1.FieldValue{Value: &modelv1.FieldValue_Str{Str: &modelv1.Str{Value: "ok"}}} case databasev1.FieldType_FIELD_TYPE_DATA_BINARY: - return m["binary"] + return &modelv1.FieldValue{Value: &modelv1.FieldValue_BinaryData{BinaryData: []byte{0xab, 0xcd}}} } - return m["int"] + return &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 42}}} } // buildSchema reconstructs the *databasev1.Measure schema corresponding to @@ -307,3 +338,12 @@ func BenchmarkVectorizedPath_W4(b *testing.B) { benchmarkVectorized(b, w4) } func BenchmarkRowPath_W5(b *testing.B) { benchmarkRow(b, w5) } func BenchmarkVectorizedPath_W5(b *testing.B) { benchmarkVectorized(b, w5) } + +func BenchmarkRowPath_W2MB(b *testing.B) { benchmarkRow(b, w2mb) } +func BenchmarkVectorizedPath_W2MB(b *testing.B) { benchmarkVectorized(b, w2mb) } + +func BenchmarkRowPath_W4MB(b *testing.B) { benchmarkRow(b, w4mb) } +func BenchmarkVectorizedPath_W4MB(b *testing.B) { benchmarkVectorized(b, w4mb) } + +func BenchmarkRowPath_W5MB(b *testing.B) { benchmarkRow(b, w5mb) } +func BenchmarkVectorizedPath_W5MB(b *testing.B) { benchmarkVectorized(b, w5mb) }
