This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit b5f5513196fba381d9a562d3acb4388a8f86fed6
Author: Hongtao Gao <[email protected]>
AuthorDate: Fri May 15 15:27:18 2026 +0000

    perf(query/vectorized/measure): defer BatchTop row copy to heap admission

    BatchTop.materialize deep-copied every candidate input row (all schema
    columns into fresh 1-row typed columns) before the bounded-heap check,
    so high-input / low-N top-N paid a full per-row materialize for rows
    immediately discarded. Split into a cheap extractKey (sort key only,
    every row) and materializeCols (deep copy, only on heap admission).
    Rejected rows now allocate nothing.

    W3 Top-N bench (100k rows): vec 58.0M -> 8.75M ns/op, 1.40M -> 117.6K
    allocs/op -- from 1.32x slower than the row path to ~4.85x faster.

    Output is byte-identical (retained rows still fully copied while the
    batch is valid; cmpTopVal/shouldReplace read only key fields):
    integration parity gate green (321s, all fixtures proto.Equal),
    TestBatchTop_* + measure/plan unit suites green.
---
 pkg/query/vectorized/measure/top.go | 55 +++++++++++++++++++++++++------------
 1 file changed, 38 insertions(+), 17 deletions(-)

diff --git a/pkg/query/vectorized/measure/top.go b/pkg/query/vectorized/measure/top.go
index f58fc99b3..bd9a91a05 100644
--- a/pkg/query/vectorized/measure/top.go
+++ b/pkg/query/vectorized/measure/top.go
@@ -149,16 +149,30 @@ func (t *BatchTop) Consume(_ context.Context, b *vectorized.RecordBatch) error {
 	}
 	active := activeIndices(b)
 	for _, rowIdx := range active {
-		candidate := t.materialize(b, int(rowIdx))
-		candidate.seq = t.inputCount
+		ri := int(rowIdx)
+		seq := t.inputCount
 		t.inputCount++
+		// Heap not yet full: the row is retained, so pay the full
+		// per-column deep copy now (the batch is recycled after Consume
+		// returns, so a retained row cannot defer its copy).
 		if t.heapState.Len() < t.n {
-			heap.Push(t.heapState, candidate)
+			row := &topRow{seq: seq}
+			t.extractKey(b, ri, row)
+			row.cols = t.materializeCols(b, ri)
+			heap.Push(t.heapState, row)
 			continue
 		}
+		// Heap full: extract only the cheap sort key and compare against
+		// the root. The expensive per-column materialize is deferred
+		// until the row actually displaces the root — for high-input /
+		// low-N top-N the vast majority of rows lose here and never pay
+		// the copy. cmpTopVal/shouldReplace read only the key fields.
+		cand := topRow{seq: seq}
+		t.extractKey(b, ri, &cand)
 		root := t.heapState.rows[0]
-		if t.shouldReplace(candidate, root) {
-			t.heapState.rows[0] = candidate
+		if t.shouldReplace(&cand, root) {
+			cand.cols = t.materializeCols(b, ri)
+			t.heapState.rows[0] = &cand
 			heap.Fix(t.heapState, 0)
 		}
 	}
@@ -224,8 +238,10 @@ func (t *BatchTop) Close() error {
 	return nil
 }
 
-// materialize copies row rowIdx of b into a new topRow, reading the sort key
-// from the configured field column.
+// extractKey reads only the sort key for row rowIdx from the configured
+// field column into row — no per-column deep copy. This is the cheap
+// half of the old materialize: it runs for every input row, while the
+// expensive materializeCols runs only for rows admitted to the heap.
 //
 // The key column may be a native typed column (ColumnTypeInt64 /
 // ColumnTypeFloat64 — promoted when an Agg reduces over the field) or a
@@ -234,26 +250,20 @@ func (t *BatchTop) Close() error {
 // fields as passthrough). Both are handled so the float/int decision is
 // per-column-shape, matching the row path's schema-field-type dispatch
 // (pkg/query/logical/measure.topOp.Execute).
-func (t *BatchTop) materialize(b *vectorized.RecordBatch, rowIdx int) *topRow {
-	cols := make([]vectorized.Column, len(t.schema.Columns))
-	for i, def := range t.schema.Columns {
-		cols[i] = vectorized.NewColumnForType(def.Type, 1)
-		copyOneValue(cols[i], b.Columns[i], rowIdx)
-	}
-	row := &topRow{cols: cols}
+func (t *BatchTop) extractKey(b *vectorized.RecordBatch, rowIdx int, row *topRow) {
 	keyCol := b.Columns[t.fieldCol]
 	switch c := keyCol.(type) {
 	case *vectorized.TypedColumn[float64]:
 		row.isFloat = true
 		if c.IsNull(rowIdx) {
 			row.isNull = true
-			return row
+			return
 		}
 		row.floatVal = c.Data()[rowIdx]
 	case *vectorized.TypedColumn[int64]:
 		if c.IsNull(rowIdx) {
 			row.isNull = true
-			return row
+			return
 		}
 		row.intVal = c.Data()[rowIdx]
 	case *vectorized.TypedColumn[*modelv1.FieldValue]:
@@ -275,5 +285,16 @@ func (t *BatchTop) materialize(b *vectorized.RecordBatch, rowIdx int) *topRow {
 		// rather than panicking.
 		row.isNull = true
 	}
-	return row
+}
+
+// materializeCols deep-copies row rowIdx of b into schema-shaped 1-row
+// columns. Only called for rows admitted to the bounded heap (heap not
+// full, or the row displaces the root) — rejected rows never pay this.
+func (t *BatchTop) materializeCols(b *vectorized.RecordBatch, rowIdx int) []vectorized.Column {
+	cols := make([]vectorized.Column, len(t.schema.Columns))
+	for i, def := range t.schema.Columns {
+		cols[i] = vectorized.NewColumnForType(def.Type, 1)
+		copyOneValue(cols[i], b.Columns[i], rowIdx)
+	}
+	return cols
 }
