This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch vectorized-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit b5f5513196fba381d9a562d3acb4388a8f86fed6
Author: Hongtao Gao <[email protected]>
AuthorDate: Fri May 15 15:27:18 2026 +0000

    perf(query/vectorized/measure): defer BatchTop row copy to heap admission
    
    BatchTop.materialize deep-copied every candidate input row (all
    schema columns into fresh 1-row typed columns) before the bounded-heap
    check, so high-input / low-N top-N paid a full per-row materialize for
    rows immediately discarded. Split into a cheap extractKey (sort key
    only, every row) and materializeCols (deep copy, only on heap
    admission). Rejected rows now allocate nothing.
    
    W3 Top-N bench (100k rows): vec 58.0M -> 8.75M ns/op, 1.40M -> 117.6K
    allocs/op -- from 1.32x slower than the row path to ~4.85x faster.
    Output is byte-identical (retained rows still fully copied while the
    batch is valid; cmpTopVal/shouldReplace read only key fields):
    integration parity gate green (321s, all fixtures proto.Equal),
    TestBatchTop_* + measure/plan unit suites green.
---
 pkg/query/vectorized/measure/top.go | 55 +++++++++++++++++++++++++------------
 1 file changed, 38 insertions(+), 17 deletions(-)

diff --git a/pkg/query/vectorized/measure/top.go b/pkg/query/vectorized/measure/top.go
index f58fc99b3..bd9a91a05 100644
--- a/pkg/query/vectorized/measure/top.go
+++ b/pkg/query/vectorized/measure/top.go
@@ -149,16 +149,30 @@ func (t *BatchTop) Consume(_ context.Context, b *vectorized.RecordBatch) error {
        }
        active := activeIndices(b)
        for _, rowIdx := range active {
-               candidate := t.materialize(b, int(rowIdx))
-               candidate.seq = t.inputCount
+               ri := int(rowIdx)
+               seq := t.inputCount
                t.inputCount++
+               // Heap not yet full: the row is retained, so pay the full
+               // per-column deep copy now (the batch is recycled after Consume
+               // returns, so a retained row cannot defer its copy).
                if t.heapState.Len() < t.n {
-                       heap.Push(t.heapState, candidate)
+                       row := &topRow{seq: seq}
+                       t.extractKey(b, ri, row)
+                       row.cols = t.materializeCols(b, ri)
+                       heap.Push(t.heapState, row)
                        continue
                }
+               // Heap full: extract only the cheap sort key and compare against
+               // the root. The expensive per-column materialize is deferred
+               // until the row actually displaces the root — for high-input /
+               // low-N top-N the vast majority of rows lose here and never pay
+               // the copy. cmpTopVal/shouldReplace read only the key fields.
+               cand := topRow{seq: seq}
+               t.extractKey(b, ri, &cand)
                root := t.heapState.rows[0]
-               if t.shouldReplace(candidate, root) {
-                       t.heapState.rows[0] = candidate
+               if t.shouldReplace(&cand, root) {
+                       cand.cols = t.materializeCols(b, ri)
+                       t.heapState.rows[0] = &cand
                        heap.Fix(t.heapState, 0)
                }
        }
@@ -224,8 +238,10 @@ func (t *BatchTop) Close() error {
        return nil
 }
 
-// materialize copies row rowIdx of b into a new topRow, reading the sort key
-// from the configured field column.
+// extractKey reads only the sort key for row rowIdx from the configured
+// field column into row — no per-column deep copy. This is the cheap
+// half of the old materialize: it runs for every input row, while the
+// expensive materializeCols runs only for rows admitted to the heap.
 //
 // The key column may be a native typed column (ColumnTypeInt64 /
 // ColumnTypeFloat64 — promoted when an Agg reduces over the field) or a
@@ -234,26 +250,20 @@ func (t *BatchTop) Close() error {
 // fields as passthrough). Both are handled so the float/int decision is
 // per-column-shape, matching the row path's schema-field-type dispatch
 // (pkg/query/logical/measure.topOp.Execute).
-func (t *BatchTop) materialize(b *vectorized.RecordBatch, rowIdx int) *topRow {
-       cols := make([]vectorized.Column, len(t.schema.Columns))
-       for i, def := range t.schema.Columns {
-               cols[i] = vectorized.NewColumnForType(def.Type, 1)
-               copyOneValue(cols[i], b.Columns[i], rowIdx)
-       }
-       row := &topRow{cols: cols}
+func (t *BatchTop) extractKey(b *vectorized.RecordBatch, rowIdx int, row *topRow) {
        keyCol := b.Columns[t.fieldCol]
        switch c := keyCol.(type) {
        case *vectorized.TypedColumn[float64]:
                row.isFloat = true
                if c.IsNull(rowIdx) {
                        row.isNull = true
-                       return row
+                       return
                }
                row.floatVal = c.Data()[rowIdx]
        case *vectorized.TypedColumn[int64]:
                if c.IsNull(rowIdx) {
                        row.isNull = true
-                       return row
+                       return
                }
                row.intVal = c.Data()[rowIdx]
        case *vectorized.TypedColumn[*modelv1.FieldValue]:
@@ -275,5 +285,16 @@ func (t *BatchTop) materialize(b *vectorized.RecordBatch, rowIdx int) *topRow {
                // rather than panicking.
                row.isNull = true
        }
-       return row
+}
+
+// materializeCols deep-copies row rowIdx of b into schema-shaped 1-row
+// columns. Only called for rows admitted to the bounded heap (heap not
+// full, or the row displaces the root) — rejected rows never pay this.
+func (t *BatchTop) materializeCols(b *vectorized.RecordBatch, rowIdx int) []vectorized.Column {
+       cols := make([]vectorized.Column, len(t.schema.Columns))
+       for i, def := range t.schema.Columns {
+               cols[i] = vectorized.NewColumnForType(def.Type, 1)
+               copyOneValue(cols[i], b.Columns[i], rowIdx)
+       }
+       return cols
 }

Reply via email to