This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 90aacda9113c056a5561cb57e56fae33ad1342a7 Author: Hongtao Gao <[email protected]> AuthorDate: Sun May 10 09:31:22 2026 +0000 feat(banyand/measure): typed PullBatch for indexSortResult (closes US-003 multi-block deferral) Replace the indexSortResult.PullBatch row-path fallback (Pull → BuildMeasureBatchFromResult) with a direct typed-column emit path, completing the deferral noted in 36140ca9. New methods on *indexSortResult: - mergeSegResultsBatch: heap-merge variant of Pull's segResult loop; pops up to mergeBatchMaxRows rows, calls copyToBatch per row, pushes back non-exhausted segResults. A fast path for the single-segResult case avoids heap overhead. - copyToBatch: batch-path mirror of copyTo (query.go); appends one row's metadata to the MeasureBatch parallel slices and dispatches per schema column via fillSegTagCell / fillSegFieldCell. - fillSegTagCell: resolves one tag column cell — projectedEntityOffsets path for EntityValues (pre-decoded *modelv1.TagValue via appendTagValueAt), fieldToValueType path for raw field bytes (via fillTagCell from block_batch.go). - fillSegFieldCell: resolves one field column cell via fillFieldCell from block_batch.go. Reused from block_batch.go: appendTagValueAt, appendNullTagN, appendNullFieldN, fillTagCell, fillFieldCell. The existing Pull() and copyTo() methods are untouched (C1 invariant — the row path remains byte-identical). Verification: go build ./..., go test ./banyand/measure/... -short -race (PASS), go test ./pkg/query/vectorized/... -race (PASS), RUN_BENCH_GATES=1 TestBenchGates_PerWorkload (PASS), integration vectorized parity suite 488 specs (PASS), make lint (PASS). 
--- banyand/measure/query_batch.go | 155 +++++++++++++++++++++++++++++++++++++---- 1 file changed, 142 insertions(+), 13 deletions(-) diff --git a/banyand/measure/query_batch.go b/banyand/measure/query_batch.go index 448a64695..3fdcfc556 100644 --- a/banyand/measure/query_batch.go +++ b/banyand/measure/query_batch.go @@ -27,6 +27,7 @@ import ( "github.com/apache/skywalking-banyandb/api/common" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/pkg/query/model" "github.com/apache/skywalking-banyandb/pkg/query/vectorized" vmeasure "github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure" @@ -238,25 +239,153 @@ func (qr *queryResult) loadCursorsForBatch() error { // PullBatch implements model.MeasureBatchResult for indexSortResult. // -// indexSortResult.Pull() yields single-row MeasureResults; the conversion -// shape is identical to queryResult.PullBatch's multi-block fallback — -// single-row batches that the vectorized adapter accumulates into larger -// pipeline batches. A direct typed-column variant (skipping the -// *modelv1.TagValue intermediate) is deferred to a follow-on alongside -// the queryResult batch-merge work; indexSortResult uses storage's -// segResult instead of blockCursor, so the typed emit path here is its -// own ~150 LOC parallel implementation. +// Typed-column variant: appends rows from the segResult heap directly into a +// *model.MeasureBatch via copyToBatch, bypassing the *modelv1.TagValue / +// *MeasureResult intermediate that the row-path Pull() constructs. This +// mirrors what queryResult.mergeBatch does for blockCursor (committed in +// 36140ca9) but on the segResult side. Pull() and copyTo() are untouched — +// the row path remains byte-identical for legacy consumers (C1 invariant). 
func (iqr *indexSortResult) PullBatch(_ context.Context) (*model.MeasureBatch, error) { if iqr.batchSchema == nil { return nil, fmt.Errorf("indexSortResult.PullBatch: batchSchema not initialized; " + "the underlying query did not record a vectorized BatchSchema (likely a schema-build error at Query time)") } - r := iqr.Pull() - if r == nil { + return iqr.mergeSegResultsBatch(iqr.batchSchema, mergeBatchMaxRows) +} + +// mergeSegResultsBatch is the batch-aware counterpart of indexSortResult.Pull: +// it emits up to batchSize rows via copyToBatch into typed columns. A lone +// segResult is drained in place without heap ops; otherwise rows are popped +// from the heap and non-exhausted segResults pushed back. Returns (nil, nil) +// if no row was emitted (NOTE(review): including batchSize <= 0 — confirm). +func (iqr *indexSortResult) mergeSegResultsBatch(schema *vectorized.BatchSchema, batchSize int) (*model.MeasureBatch, error) { + if len(iqr.segResults.results) == 0 { return nil, nil } - if r.Error != nil { - return nil, r.Error + b := newMeasureBatchForSchema(schema, batchSize) + if len(iqr.segResults.results) == 1 { + sr := iqr.segResults.results[0] + for b.RowCount() < batchSize && sr.i < len(sr.SeriesList) { + iqr.copyToBatch(b, schema, sr) + sr.i++ + } + if sr.i >= len(sr.SeriesList) { + iqr.segResults.results = iqr.segResults.results[:0] + } + if b.RowCount() == 0 { + return nil, nil + } + return b, nil + } + for b.RowCount() < batchSize && iqr.segResults.Len() > 0 { + top := heap.Pop(&iqr.segResults) + sr := top.(*segResult) + iqr.copyToBatch(b, schema, sr) + sr.i++ + if sr.i < len(sr.SeriesList) { + heap.Push(&iqr.segResults, sr) + } + } + if b.RowCount() == 0 { + return nil, nil + } + return b, nil +} + +// copyToBatch appends the single row at src.i into b's parallel slices and +// typed columns. It is the batch-path mirror of indexSortResult.copyTo +// (query.go) and is used by mergeSegResultsBatch when emitting rows. +// +// Tag resolution order matches copyTo: +// 1.
If the tag name has an entry in tfl[i].projectedEntityOffsets, the value +// is a pre-decoded *modelv1.TagValue from EntityValues — project via +// appendTagValueAt (the only helper fillSegTagCell uses for this case). +// 2. Otherwise, look up the tag name in tfl[i].fieldToValueType to get the +// raw bytes from the field result and decode via fillTagCell. +// +// Schema iteration follows the same tagIdx/fieldIdx convention as +// blockCursor.copyToBatch: Tags[tagIdx] aligns with the tagIdx-th RoleTag +// entry in schema.Columns, Fields[fieldIdx] with the fieldIdx-th RoleField. +func (iqr *indexSortResult) copyToBatch(b *model.MeasureBatch, schema *vectorized.BatchSchema, src *segResult) { + rowIdx := src.i + b.Timestamps = append(b.Timestamps, src.Timestamps[rowIdx]) + b.Versions = append(b.Versions, src.Versions[rowIdx]) + b.SeriesIDs = append(b.SeriesIDs, src.SeriesList[rowIdx].ID) + // indexSortResult has no shardID; use zero value (matches copyTo behaviour). + b.ShardIDs = append(b.ShardIDs, 0) + + var fr storage.FieldResult + if src.Fields != nil { + fr = src.Fields[rowIdx] + } + + // No lookup table is built here: walk schema.Columns in order so tagIdx and + // fieldIdx stay aligned with b.Tags / b.Fields; each cell is then resolved + // by a linear scan of iqr.tfl in fillSegTagCell / fillSegFieldCell. + tagIdx, fieldIdx := 0, 0 + for _, def := range schema.Columns { + switch def.Role { + case vectorized.RoleTag: + col := b.Tags[tagIdx] + tagIdx++ + iqr.fillSegTagCell(col, def, src, rowIdx, fr) + case vectorized.RoleField: + col := b.Fields[fieldIdx] + fieldIdx++ + iqr.fillSegFieldCell(col, def, fr) + case vectorized.RoleTimestamp, vectorized.RoleVersion, + vectorized.RoleSeriesID, vectorized.RoleShardID: + // Metadata handled via the parallel slices above. + } + } +} + +// fillSegTagCell resolves one tag column cell for the given schema column def +// from a segResult row. Resolution order matches copyTo: +// 1.
projectedEntityOffsets: pre-decoded *modelv1.TagValue from EntityValues. +// 2. fieldToValueType: raw bytes from fr (null-fill when fr is nil). +// 3. Otherwise, or when no tfl entry is named def.TagFamily: null-fill. +func (iqr *indexSortResult) fillSegTagCell(col vectorized.Column, def vectorized.ColumnDef, + src *segResult, rowIdx int, fr storage.FieldResult, +) { + for tfIdx := range iqr.tfl { + if iqr.tfl[tfIdx].name != def.TagFamily { + continue + } + tfl := &iqr.tfl[tfIdx] + if offset, ok := tfl.projectedEntityOffsets[def.Name]; ok { + appendTagValueAt(col, def, src.SeriesList[rowIdx].EntityValues[offset]) + return + } + if fr == nil { + appendNullTagN(col, def, 1) + return + } + if tnt, ok := tfl.fieldToValueType[def.Name]; ok { + fillTagCell(col, def, tnt.typ, fr[tnt.fieldName]) + return + } + appendNullTagN(col, def, 1) + return + } + appendNullTagN(col, def, 1) +} + +// fillSegFieldCell resolves one field column cell: it looks def.Name up in +// each tfl entry's fieldToValueType and decodes fr[tnt.fieldName] via +// fillFieldCell. A nil fr, or a name found in no entry, null-fills. +func (iqr *indexSortResult) fillSegFieldCell(col vectorized.Column, def vectorized.ColumnDef, fr storage.FieldResult) { + if fr == nil { + appendNullFieldN(col, def, 1) + return + } + // NOTE(review): fillSegTagCell also looks tag names up in fieldToValueType; + // confirm field names cannot collide with them (first family hit wins here). + for tfIdx := range iqr.tfl { + if tnt, ok := iqr.tfl[tfIdx].fieldToValueType[def.Name]; ok { + fillFieldCell(col, def, tnt.typ, fr[tnt.fieldName]) + return + } } - return vmeasure.BuildMeasureBatchFromResult(r, iqr.batchSchema) + appendNullFieldN(col, def, 1) }
