This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch vectorized-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 90aacda9113c056a5561cb57e56fae33ad1342a7
Author: Hongtao Gao <[email protected]>
AuthorDate: Sun May 10 09:31:22 2026 +0000

    feat(banyand/measure): typed PullBatch for indexSortResult (closes US-003 multi-block deferral)
    
    Replace the indexSortResult.PullBatch row-path fallback
    (Pull → BuildMeasureBatchFromResult) with a direct typed-column emit path,
    completing the deferral noted in 36140ca9.
    
    New methods on *indexSortResult:
    - mergeSegResultsBatch: heap-merge variant of Pull's segResult loop; pops up to
      mergeBatchMaxRows rows, calls copyToBatch per row, pushes back non-exhausted
      segResults. Fast path for single-segResult case avoids heap overhead.
    - copyToBatch: batch-path mirror of copyTo (query.go); appends one row's metadata
      into MeasureBatch parallel slices and dispatches per schema column via
      fillSegTagCell / fillSegFieldCell.
    - fillSegTagCell: resolves one tag column cell — projectedEntityOffsets path for
      EntityValues (pre-decoded *modelv1.TagValue via appendTagValueAt), fieldToValueType
      path for raw field bytes (via fillTagCell from block_batch.go).
    - fillSegFieldCell: resolves one field column cell via fillFieldCell from block_batch.go.
    
    Reused from block_batch.go: appendTagValueAt, appendNullTagN, appendNullFieldN,
    fillTagCell, fillFieldCell. The existing Pull() and copyTo() methods are untouched
    (C1 invariant — row path remains byte-identical).
    
    Verification: go build ./..., go test ./banyand/measure/... -short -race (PASS),
    go test ./pkg/query/vectorized/... -race (PASS), RUN_BENCH_GATES=1 TestBenchGates_PerWorkload
    (PASS), integration vectorized parity suite 488 specs (PASS), make lint (PASS).
---
 banyand/measure/query_batch.go | 155 +++++++++++++++++++++++++++++++++++++----
 1 file changed, 142 insertions(+), 13 deletions(-)

diff --git a/banyand/measure/query_batch.go b/banyand/measure/query_batch.go
index 448a64695..3fdcfc556 100644
--- a/banyand/measure/query_batch.go
+++ b/banyand/measure/query_batch.go
@@ -27,6 +27,7 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/common"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
        "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
        vmeasure 
"github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure"
@@ -238,25 +239,153 @@ func (qr *queryResult) loadCursorsForBatch() error {
 
 // PullBatch implements model.MeasureBatchResult for indexSortResult.
 //
-// indexSortResult.Pull() yields single-row MeasureResults; the conversion
-// shape is identical to queryResult.PullBatch's multi-block fallback —
-// single-row batches that the vectorized adapter accumulates into larger
-// pipeline batches. A direct typed-column variant (skipping the
-// *modelv1.TagValue intermediate) is deferred to a follow-on alongside
-// the queryResult batch-merge work; indexSortResult uses storage's
-// segResult instead of blockCursor, so the typed emit path here is its
-// own ~150 LOC parallel implementation.
+// Typed-column variant: appends rows from the segResult heap directly into a
+// *model.MeasureBatch via copyToBatch, bypassing the *modelv1.TagValue /
+// *MeasureResult intermediate that the row-path Pull() constructs. This
+// mirrors what queryResult.mergeBatch does for blockCursor (committed in
+// 36140ca9) but on the segResult side. Pull() and copyTo() are untouched —
+// the row path remains byte-identical for legacy consumers (C1 invariant).
 func (iqr *indexSortResult) PullBatch(_ context.Context) (*model.MeasureBatch, 
error) {
        if iqr.batchSchema == nil {
                return nil, fmt.Errorf("indexSortResult.PullBatch: batchSchema 
not initialized; " +
                        "the underlying query did not record a vectorized 
BatchSchema (likely a schema-build error at Query time)")
        }
-       r := iqr.Pull()
-       if r == nil {
+       return iqr.mergeSegResultsBatch(iqr.batchSchema, mergeBatchMaxRows)
+}
+
+// mergeSegResultsBatch is the batch-aware counterpart of indexSortResult.Pull.
+// It pops from the segResult heap up to batchSize rows, calling copyToBatch
+// for each row, then pushes the segResult back if it has more rows. This
+// mirrors the heap-merge loop in Pull() but writes into typed columns instead
+// of building *MeasureResult values.
+func (iqr *indexSortResult) mergeSegResultsBatch(schema 
*vectorized.BatchSchema, batchSize int) (*model.MeasureBatch, error) {
+       if len(iqr.segResults.results) == 0 {
                return nil, nil
        }
-       if r.Error != nil {
-               return nil, r.Error
+       b := newMeasureBatchForSchema(schema, batchSize)
+       if len(iqr.segResults.results) == 1 {
+               sr := iqr.segResults.results[0]
+               for b.RowCount() < batchSize && sr.i < len(sr.SeriesList) {
+                       iqr.copyToBatch(b, schema, sr)
+                       sr.i++
+               }
+               if sr.i >= len(sr.SeriesList) {
+                       iqr.segResults.results = iqr.segResults.results[:0]
+               }
+               if b.RowCount() == 0 {
+                       return nil, nil
+               }
+               return b, nil
+       }
+       for b.RowCount() < batchSize && iqr.segResults.Len() > 0 {
+               top := heap.Pop(&iqr.segResults)
+               sr := top.(*segResult)
+               iqr.copyToBatch(b, schema, sr)
+               sr.i++
+               if sr.i < len(sr.SeriesList) {
+                       heap.Push(&iqr.segResults, sr)
+               }
+       }
+       if b.RowCount() == 0 {
+               return nil, nil
+       }
+       return b, nil
+}
+
+// copyToBatch appends the single row at src.i into b's parallel slices and
+// typed columns. It is the batch-path mirror of indexSortResult.copyTo
+// (query.go) and is used by mergeSegResultsBatch when emitting rows.
+//
+// Tag resolution order matches copyTo:
+//  1. If the tag name has an entry in tfl[i].projectedEntityOffsets, the value
+//     is a pre-decoded *modelv1.TagValue from EntityValues — project via
+//     appendTagValueAt (passthrough) or appendTagValueAsTyped (native).
+//  2. Otherwise, look up the tag name in tfl[i].fieldToValueType to get the
+//     raw bytes from the field result and decode via fillTagCell.
+//
+// Schema iteration follows the same tagIdx/fieldIdx convention as
+// blockCursor.copyToBatch: Tags[tagIdx] aligns with the tagIdx-th RoleTag
+// entry in schema.Columns, Fields[fieldIdx] with the fieldIdx-th RoleField.
+func (iqr *indexSortResult) copyToBatch(b *model.MeasureBatch, schema 
*vectorized.BatchSchema, src *segResult) {
+       rowIdx := src.i
+       b.Timestamps = append(b.Timestamps, src.Timestamps[rowIdx])
+       b.Versions = append(b.Versions, src.Versions[rowIdx])
+       b.SeriesIDs = append(b.SeriesIDs, src.SeriesList[rowIdx].ID)
+       // indexSortResult has no shardID; use zero value (matches copyTo behaviour).
+       b.ShardIDs = append(b.ShardIDs, 0)
+
+       var fr storage.FieldResult
+       if src.Fields != nil {
+               fr = src.Fields[rowIdx]
+       }
+
+       // No per-row lookup table is built: schema.Columns is walked in order
+       // to maintain tagIdx/fieldIdx alignment, and each column is resolved
+       // via the tfl projection tables.
+       tagIdx, fieldIdx := 0, 0
+       for _, def := range schema.Columns {
+               switch def.Role {
+               case vectorized.RoleTag:
+                       col := b.Tags[tagIdx]
+                       tagIdx++
+                       iqr.fillSegTagCell(col, def, src, rowIdx, fr)
+               case vectorized.RoleField:
+                       col := b.Fields[fieldIdx]
+                       fieldIdx++
+                       iqr.fillSegFieldCell(col, def, fr)
+               case vectorized.RoleTimestamp, vectorized.RoleVersion,
+                       vectorized.RoleSeriesID, vectorized.RoleShardID:
+                       // Metadata handled via the parallel slices above.
+               }
+       }
+}
+
+// fillSegTagCell resolves one tag column cell for the given schema column def
+// from a segResult row. Resolution order matches copyTo:
+//  1. projectedEntityOffsets: pre-decoded *modelv1.TagValue from EntityValues.
+//  2. fieldToValueType: raw bytes from the field result map.
+//  3. Neither found: null-fill.
+func (iqr *indexSortResult) fillSegTagCell(col vectorized.Column, def 
vectorized.ColumnDef,
+       src *segResult, rowIdx int, fr storage.FieldResult,
+) {
+       for tfIdx := range iqr.tfl {
+               if iqr.tfl[tfIdx].name != def.TagFamily {
+                       continue
+               }
+               tfl := &iqr.tfl[tfIdx]
+               if offset, ok := tfl.projectedEntityOffsets[def.Name]; ok {
+                       appendTagValueAt(col, def, 
src.SeriesList[rowIdx].EntityValues[offset])
+                       return
+               }
+               if fr == nil {
+                       appendNullTagN(col, def, 1)
+                       return
+               }
+               if tnt, ok := tfl.fieldToValueType[def.Name]; ok {
+                       fillTagCell(col, def, tnt.typ, fr[tnt.fieldName])
+                       return
+               }
+               appendNullTagN(col, def, 1)
+               return
+       }
+       appendNullTagN(col, def, 1)
+}
+
+// fillSegFieldCell resolves one field column cell. Fields are stored in the
+// FieldResult map; the schema def carries the field name and column type.
+// If the field is absent or fr is nil, null-fill.
+func (iqr *indexSortResult) fillSegFieldCell(col vectorized.Column, def 
vectorized.ColumnDef, fr storage.FieldResult) {
+       if fr == nil {
+               appendNullFieldN(col, def, 1)
+               return
+       }
+       // Fields in the FieldResult are keyed by the marshalled index key (tnt.fieldName).
+       // Walk all tfl entries to find the field's valueType.
+       for tfIdx := range iqr.tfl {
+               if tnt, ok := iqr.tfl[tfIdx].fieldToValueType[def.Name]; ok {
+                       fillFieldCell(col, def, tnt.typ, fr[tnt.fieldName])
+                       return
+               }
        }
-       return vmeasure.BuildMeasureBatchFromResult(r, iqr.batchSchema)
+       appendNullFieldN(col, def, 1)
 }

Reply via email to