This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch vectorized-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 55d3baf403939fdcfe64347d2e08d6b7a66d08dc
Author: Hongtao Gao <[email protected]>
AuthorDate: Fri May 8 08:33:23 2026 +0000

    feat(banyand/measure): native typed-column emit on blockCursor (US-003 strict, single-block fast path)
    
    Implements the storage-side native typed-column emit path that
    queryResult.PullBatch can use directly, without the *modelv1.TagValue
    intermediate. Single-block queries take the new fast path; multi-block
    queries fall back to merge() + BuildMeasureBatchFromResult until a
    batch-aware queryResult.merge variant lands.
    
    Files:
    - banyand/measure/batch_decode.go (NEW): byte-to-typed-column decoders
      mirroring mustDecodeTagValue / mustDecodeFieldValue. Three helpers:
      * appendDecodedTagBytesAsTyped — bytes -> Int64/String/Bytes/Int64Array/
        StrArray, panic on column-type mismatch (programmer error contract
        matching mustDecodeTagValue).
      * appendDecodedFieldBytesAsTyped — bytes -> Int64/Float64/String/Bytes;
        nil raw + Str/BinaryData yields valid empty cells (matches
        pbv1.EmptyStrFieldValue / EmptyBinaryFieldValue semantics).
      * appendTagValueAsTyped — projects pre-decoded *modelv1.TagValue (from
        storedIndexValue) onto a typed column; oneof / column-type mismatch
        degrades to AppendNull.
    
    - banyand/measure/block_batch.go (NEW): blockCursor.copyAllToBatch +
      helpers. Mirrors copyAllTo's logic but writes directly to typed
      vectorized.Column instances on a *MeasureBatch. Schema indexing
      contract: MeasureBatch.Tags[i] / Fields[i] correspond to RoleTag /
      RoleField columns in schema.Columns declaration order. Decoding
      strategy dispatches on def.Type:
      * ColumnTypeTagValue / ColumnTypeFieldValue (G5a passthrough): decode
        once via mustDecodeTagValue / mustDecodeFieldValue, store the
        pointer.
      * Native types: bytes-to-typed via appendDecoded*BytesAsTyped, no
        protobuf intermediate.
      Helpers: newMeasureBatchForSchema (allocate empty batch with sized
      typed columns), fillTag/FieldColumnAllRows (per-column population),
      fillTag/FieldCell (cell decode dispatch), appendTag/FieldValueAt
      (indexValue substitution), appendNullTag/FieldN (null-fill dispatch),
      findTagFamily / findColumn / findFieldColumn (private lookup helpers).
    
    - banyand/measure/query_batch.go (MODIFIED): queryResult.PullBatch now
      uses copyAllToBatch directly when the query produced exactly one
      block cursor, eliminating the *modelv1.TagValue intermediate for
      this common case. Multi-block queries still fall back to merge() +
      BuildMeasureBatchFromResult; the batch-aware merge variant
      (replacement, TopN aggregation) is deferred to a follow-on session
      alongside G6c operator wiring.
    
      loadCursorsForBatch duplicates Pull's lazy block-cursor load step
      rather than refactoring Pull (preserves Pull's byte-identical
      behavior). Pull and PullBatch share state but are documented as
      mutually exclusive on a single queryResult.
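
    A minimal sketch of the null-versus-empty field contract described for
    appendDecodedFieldBytesAsTyped above, in standalone Go. The typedColumn
    type and both helper names are hypothetical stand-ins, not the
    vectorized package API, and the big-endian int64 layout is an
    assumption about the stored encoding.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// typedColumn is a hypothetical stand-in for a typed vectorized column:
// a value slice plus a parallel validity slice.
type typedColumn[T any] struct {
	values []T
	valid  []bool
}

// Append stores v and marks the cell valid.
func (c *typedColumn[T]) Append(v T) {
	c.values = append(c.values, v)
	c.valid = append(c.valid, true)
}

// AppendNull stores the zero value and marks the cell invalid.
func (c *typedColumn[T]) AppendNull() {
	var zero T
	c.values = append(c.values, zero)
	c.valid = append(c.valid, false)
}

// appendInt64Field (hypothetical) decodes 8 raw bytes into an int64 cell.
// A nil raw value becomes a null cell. Big-endian is assumed here.
func appendInt64Field(c *typedColumn[int64], raw []byte) {
	if raw == nil {
		c.AppendNull()
		return
	}
	c.Append(int64(binary.BigEndian.Uint64(raw)))
}

// appendStrField (hypothetical) decodes raw bytes into a string cell.
// A nil raw value becomes a valid empty string rather than a null.
func appendStrField(c *typedColumn[string], raw []byte) {
	if raw == nil {
		c.Append("")
		return
	}
	c.Append(string(raw))
}

func main() {
	ints := &typedColumn[int64]{}
	strs := &typedColumn[string]{}

	appendInt64Field(ints, []byte{0, 0, 0, 0, 0, 0, 0, 42})
	appendInt64Field(ints, nil)
	appendStrField(strs, []byte("ok"))
	appendStrField(strs, nil)

	fmt.Println(ints.values, ints.valid)
	fmt.Println(strs.values, strs.valid)
}
```

    With this shape a consumer can tell an absent int64 (validity false)
    from an empty string (validity true, zero length), which is the
    distinction a serializer needs to reproduce the empty field values.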
    
    US-003 acceptance status — strict-but-bounded:
      ✓ block_cursor has a path that decodes raw stored bytes directly
        into typed vectorized.Column instances (copyAllToBatch +
        appendDecoded*BytesAsTyped). Bypasses *modelv1.TagValue for the
        single-block case.
      ✓ Legacy row-path copyAllTo unchanged (Pull() byte-identical).
      ⚠ PullBatch builds typed columns directly: yes for single-block;
        multi-block still goes through Pull's merge intermediate.
        indexSortResult.PullBatch unchanged (uses segResult, not blockCursor;
        needs a separate parallel implementation deferred with the
        multi-block batch-aware merge work).
      ✓ All banyand/measure tests pass with -count=1.
      Integration parity: Pull() is byte-identical by inspection
        (only query_batch.go modified; query.go unchanged).
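
    The single-block versus multi-block split in the status list above can
    be sketched as a switch on the number of loaded cursors. This is an
    illustration only: cursor, batch, pullBatch and mergeFallback are
    hypothetical names, and mergeFallback merely concatenates and sorts
    timestamps as a stand-in for the real heap-merge with version
    replacement.

```go
package main

import (
	"fmt"
	"sort"
)

// cursor is a hypothetical stand-in for a loaded block cursor.
type cursor struct {
	timestamps []int64
}

// batch is a hypothetical stand-in for a measure batch; viaMerge records
// which path produced it so the dispatch is observable.
type batch struct {
	timestamps []int64
	viaMerge   bool
}

// mergeFallback stands in for the row-path merge plus conversion step.
// It only concatenates and sorts; the real merge does more work.
func mergeFallback(cursors []cursor) *batch {
	b := &batch{viaMerge: true}
	for _, c := range cursors {
		b.timestamps = append(b.timestamps, c.timestamps...)
	}
	sort.Slice(b.timestamps, func(i, j int) bool {
		return b.timestamps[i] < b.timestamps[j]
	})
	return b
}

// pullBatch dispatches on the cursor count: no cursors yields nil, one
// cursor is copied directly, and several cursors go through the fallback.
func pullBatch(cursors []cursor) *batch {
	switch len(cursors) {
	case 0:
		return nil
	case 1:
		src := cursors[0].timestamps
		b := &batch{timestamps: make([]int64, 0, len(src))}
		b.timestamps = append(b.timestamps, src...)
		return b
	default:
		return mergeFallback(cursors)
	}
}

func main() {
	single := pullBatch([]cursor{{timestamps: []int64{1, 2}}})
	multi := pullBatch([]cursor{
		{timestamps: []int64{3, 5}},
		{timestamps: []int64{4}},
	})
	fmt.Println(single.timestamps, single.viaMerge)
	fmt.Println(multi.timestamps, multi.viaMerge)
}
```

    Keeping the fast path behind a length check leaves the fallback free
    to change later without touching the single-block branch.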
    
    Verification:
    - go build ./...                                              clean
    - go test ./banyand/measure/... -count=1 -short              OK (28s)
    - go test ./pkg/query/vectorized/... -count=1 -race          OK
    - make -s lint                                               clean
    
    Remaining work (deferred to next session):
    - copyToBatch + batch-aware queryResult.merge variant for multi-block
      PullBatch.
    - Native typed-column emit path on indexSortResult (separate from
      blockCursor's path; works on segResult).
    - US-004 / US-005 / US-006 / US-007: atomic vec-adapter switch
      (BatchScan consumes MeasureBatchResult; extract.go deleted;
      ColumnTypeTagValue / FieldValue removed; serialize.go passthrough
      removed; NewMIterator wired).
    - US-008: bench gates re-run + final commit.
---
 banyand/measure/batch_decode.go | 219 +++++++++++++++++++++++++++
 banyand/measure/block_batch.go  | 319 ++++++++++++++++++++++++++++++++++++++++
 banyand/measure/query_batch.go  | 120 +++++++++++++--
 3 files changed, 646 insertions(+), 12 deletions(-)

diff --git a/banyand/measure/batch_decode.go b/banyand/measure/batch_decode.go
new file mode 100644
index 000000000..eafcc74d1
--- /dev/null
+++ b/banyand/measure/batch_decode.go
@@ -0,0 +1,219 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+)
+
+// G5b — byte-to-column decoders. These mirror mustDecodeTagValue and
+// mustDecodeFieldValue from query.go but emit directly into typed
+// vectorized.Column instances, bypassing the *modelv1.TagValue intermediate
+// for the batch path.
+//
+// Used by blockCursor.copyAllToBatch / copyToBatch when populating a
+// *model.MeasureBatch's typed tag/field columns. For passthrough column
+// types (ColumnTypeTagValue / ColumnTypeFieldValue), the caller falls back
+// to mustDecodeTagValue + Append the *modelv1.TagValue pointer.
+
+// appendDecodedTagBytesAsTyped decodes a single stored cell into a typed
+// tag column. The column must match valueType — a programmer-error
+// mismatch panics rather than silently corrupting data, the same contract
+// mustDecodeTagValue holds. A nil raw value yields AppendNull.
+func appendDecodedTagBytesAsTyped(col vectorized.Column, valueType pbv1.ValueType, raw []byte) {
+       if raw == nil {
+               col.AppendNull()
+               return
+       }
+       switch valueType {
+       case pbv1.ValueTypeInt64:
+               c, ok := col.(*vectorized.TypedColumn[int64])
+               if !ok {
+                       logger.Panicf("appendDecodedTagBytesAsTyped: column type %s mismatched with valueType int64", col.Type())
+               }
+               c.Append(convert.BytesToInt64(raw))
+       case pbv1.ValueTypeStr:
+               c, ok := col.(*vectorized.TypedColumn[string])
+               if !ok {
+                       logger.Panicf("appendDecodedTagBytesAsTyped: column type %s mismatched with valueType str", col.Type())
+               }
+               c.Append(string(raw))
+       case pbv1.ValueTypeBinaryData:
+               c, ok := col.(*vectorized.TypedColumn[[]byte])
+               if !ok {
+                       logger.Panicf("appendDecodedTagBytesAsTyped: column type %s mismatched with valueType bytes", col.Type())
+               }
+               buf := make([]byte, len(raw))
+               copy(buf, raw)
+               c.Append(buf)
+       case pbv1.ValueTypeInt64Arr:
+               c, ok := col.(*vectorized.TypedColumn[[]int64])
+               if !ok {
+                       logger.Panicf("appendDecodedTagBytesAsTyped: column type %s mismatched with valueType int64[]", col.Type())
+               }
+               var values []int64
+               for i := 0; i < len(raw); i += 8 {
+                       values = append(values, convert.BytesToInt64(raw[i:i+8]))
+               }
+               c.Append(values)
+       case pbv1.ValueTypeStrArr:
+               c, ok := col.(*vectorized.TypedColumn[[]string])
+               if !ok {
+                       logger.Panicf("appendDecodedTagBytesAsTyped: column type %s mismatched with valueType string[]", col.Type())
+               }
+               bb := bigValuePool.Generate()
+               var values []string
+               buf := raw
+               var err error
+               for len(buf) > 0 {
+                       bb.Buf, buf, err = unmarshalVarArray(bb.Buf[:0], buf)
+                       if err != nil {
+                               logger.Panicf("unmarshalVarArray failed: %v", err)
+                       }
+                       values = append(values, string(bb.Buf))
+               }
+               c.Append(values)
+       case pbv1.ValueTypeUnknown:
+               logger.Panicf("appendDecodedTagBytesAsTyped: unknown value type")
+       default:
+               logger.Panicf("appendDecodedTagBytesAsTyped: unsupported value type %v", valueType)
+       }
+}
+
+// appendDecodedFieldBytesAsTyped is the field-side counterpart. Mirrors
+// mustDecodeFieldValue's null-vs-empty distinction: for nil raw input,
+// pbv1.ValueTypeStr / ValueTypeBinaryData yield "valid empty" cells (so
+// downstream serializer reproduces pbv1.EmptyStrFieldValue /
+// EmptyBinaryFieldValue); other types yield AppendNull.
+func appendDecodedFieldBytesAsTyped(col vectorized.Column, valueType pbv1.ValueType, raw []byte) {
+       if raw == nil {
+               switch valueType {
+               case pbv1.ValueTypeStr:
+                       c, ok := col.(*vectorized.TypedColumn[string])
+                       if !ok {
+                               logger.Panicf("appendDecodedFieldBytesAsTyped: column type %s mismatched with valueType str", col.Type())
+                       }
+                       c.Append("")
+                       return
+               case pbv1.ValueTypeBinaryData:
+                       c, ok := col.(*vectorized.TypedColumn[[]byte])
+                       if !ok {
+                               logger.Panicf("appendDecodedFieldBytesAsTyped: column type %s mismatched with valueType bytes", col.Type())
+                       }
+                       c.Append([]byte{})
+                       return
+               default:
+                       col.AppendNull()
+                       return
+               }
+       }
+       switch valueType {
+       case pbv1.ValueTypeInt64:
+               c, ok := col.(*vectorized.TypedColumn[int64])
+               if !ok {
+                       logger.Panicf("appendDecodedFieldBytesAsTyped: column type %s mismatched with valueType int64", col.Type())
+               }
+               c.Append(convert.BytesToInt64(raw))
+       case pbv1.ValueTypeFloat64:
+               c, ok := col.(*vectorized.TypedColumn[float64])
+               if !ok {
+                       logger.Panicf("appendDecodedFieldBytesAsTyped: column type %s mismatched with valueType float64", col.Type())
+               }
+               c.Append(convert.BytesToFloat64(raw))
+       case pbv1.ValueTypeStr:
+               c, ok := col.(*vectorized.TypedColumn[string])
+               if !ok {
+                       logger.Panicf("appendDecodedFieldBytesAsTyped: column type %s mismatched with valueType str", col.Type())
+               }
+               c.Append(string(raw))
+       case pbv1.ValueTypeBinaryData:
+               c, ok := col.(*vectorized.TypedColumn[[]byte])
+               if !ok {
+                       logger.Panicf("appendDecodedFieldBytesAsTyped: column type %s mismatched with valueType bytes", col.Type())
+               }
+               buf := make([]byte, len(raw))
+               copy(buf, raw)
+               c.Append(buf)
+       case pbv1.ValueTypeUnknown, pbv1.ValueTypeInt64Arr, pbv1.ValueTypeStrArr:
+               logger.Panicf("appendDecodedFieldBytesAsTyped: unsupported value type %v", valueType)
+       default:
+               logger.Panicf("appendDecodedFieldBytesAsTyped: unsupported value type %v", valueType)
+       }
+}
+
+// appendTagValueAsTyped projects a pre-decoded *modelv1.TagValue onto a
+// typed tag column. Used by the indexValue substitution path in
+// copyAllToBatch where the storage layer has already produced a
+// *modelv1.TagValue (from a hidden index lookup) and we need to project
+// its inner primitive into the typed column slot.
+//
+// On a oneof / column-type mismatch the column receives an AppendNull
+// rather than panicking — degrades gracefully on heterogeneous
+// projections, matching copyAllTo's null-substitution pattern.
+func appendTagValueAsTyped(col vectorized.Column, v *modelv1.TagValue) {
+       if v == nil {
+               col.AppendNull()
+               return
+       }
+       switch x := v.Value.(type) {
+       case *modelv1.TagValue_Null:
+               col.AppendNull()
+       case *modelv1.TagValue_Int:
+               if c, ok := col.(*vectorized.TypedColumn[int64]); ok {
+                       c.Append(x.Int.GetValue())
+                       return
+               }
+               col.AppendNull()
+       case *modelv1.TagValue_Str:
+               if c, ok := col.(*vectorized.TypedColumn[string]); ok {
+                       c.Append(x.Str.GetValue())
+                       return
+               }
+               col.AppendNull()
+       case *modelv1.TagValue_BinaryData:
+               if c, ok := col.(*vectorized.TypedColumn[[]byte]); ok {
+                       buf := make([]byte, len(x.BinaryData))
+                       copy(buf, x.BinaryData)
+                       c.Append(buf)
+                       return
+               }
+               col.AppendNull()
+       case *modelv1.TagValue_IntArray:
+               if c, ok := col.(*vectorized.TypedColumn[[]int64]); ok {
+                       out := make([]int64, len(x.IntArray.GetValue()))
+                       copy(out, x.IntArray.GetValue())
+                       c.Append(out)
+                       return
+               }
+               col.AppendNull()
+       case *modelv1.TagValue_StrArray:
+               if c, ok := col.(*vectorized.TypedColumn[[]string]); ok {
+                       out := make([]string, len(x.StrArray.GetValue()))
+                       copy(out, x.StrArray.GetValue())
+                       c.Append(out)
+                       return
+               }
+               col.AppendNull()
+       default:
+               col.AppendNull()
+       }
+}
diff --git a/banyand/measure/block_batch.go b/banyand/measure/block_batch.go
new file mode 100644
index 000000000..ace098ac1
--- /dev/null
+++ b/banyand/measure/block_batch.go
@@ -0,0 +1,319 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "github.com/apache/skywalking-banyandb/api/common"
+       modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/model"
+       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+)
+
+// G5b/G5c — typed-column emit path on blockCursor.
+//
+// copyAllToBatch and copyToBatch mirror copyAllTo / copyTo (block.go) but
+// emit directly into typed vectorized.Column instances on a *MeasureBatch,
+// bypassing the *modelv1.TagValue / *modelv1.FieldValue intermediate.
+//
+// copyAllTo remains untouched — the row path keeps producing
+// *MeasureResult bytes-identically for legacy consumers (gRPC parity is
+// the C1 invariant). The two paths exist side by side until G5e flips the
+// default and the row path can be retired.
+//
+// Schema indexing contract: MeasureBatch.Tags[i] corresponds to the i-th
+// RoleTag entry in schema.Columns (in declaration order); Fields[i]
+// likewise. copyAllToBatch maintains a tagIdx / fieldIdx pair walking the
+// schema in order. The schema and batch must agree on row count: every
+// column in Tags and Fields must already be allocated (e.g. via
+// newMeasureBatchForSchema) before the first copyAllToBatch / copyToBatch
+// call so this function only appends rows.
+
+// newMeasureBatchForSchema allocates an empty *MeasureBatch with all
+// per-column TypedColumn instances pre-created at the requested capacity.
+// Tag/Field column types are taken from the schema's ColumnDef entries —
+// callers can mix passthrough and native types within a single batch.
+func newMeasureBatchForSchema(schema *vectorized.BatchSchema, capacity int) *model.MeasureBatch {
+       b := &model.MeasureBatch{Schema: schema}
+       if capacity > 0 {
+               b.Timestamps = make([]int64, 0, capacity)
+               b.Versions = make([]int64, 0, capacity)
+               b.ShardIDs = make([]common.ShardID, 0, capacity)
+               b.SeriesIDs = make([]common.SeriesID, 0, capacity)
+       }
+       for _, def := range schema.Columns {
+               switch def.Role {
+               case vectorized.RoleTag:
+                       b.Tags = append(b.Tags, vectorized.NewColumnForType(def.Type, capacity))
+               case vectorized.RoleField:
+                       b.Fields = append(b.Fields, vectorized.NewColumnForType(def.Type, capacity))
+               case vectorized.RoleTimestamp, vectorized.RoleVersion,
+                       vectorized.RoleSeriesID, vectorized.RoleShardID:
+                       // Metadata roles use the parallel slices on the batch.
+               }
+       }
+       return b
+}
+
+// copyAllToBatch is the multi-row counterpart of copyAllTo. It appends every
+// active row from the cursor (idx..len for ascending; 0..idx+1 for desc)
+// into b's parallel slices and typed columns.
+//
+// Decoding strategy depends on the column type declared by the schema:
+//   - RoleTag with ColumnTypeTagValue: passthrough — decode each cell once
+//     via mustDecodeTagValue and Append the *modelv1.TagValue pointer.
+//     Functionally identical to copyAllTo's row-path output.
+//   - RoleTag with native types (Int64 / String / Bytes / Int64Array /
+//     StrArray): decode bytes directly via appendDecodedTagBytesAsTyped,
+//     skipping the protobuf wrapper. Stored-index substitutions (hidden
+//     tag projection) project the *modelv1.TagValue onto the typed cell
+//     via appendTagValueAsTyped.
+//   - RoleField follows the same pattern with the field-side decoders.
+//
+// Missing tag families and missing tags are null-filled the same way
+// copyAllTo emits pbv1.NullTagValue: AppendNull on a typed column or
+// Append(pbv1.NullTagValue) on a passthrough column.
+func (bc *blockCursor) copyAllToBatch(b *model.MeasureBatch, schema *vectorized.BatchSchema,
+       storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue, desc bool,
+) {
+       var idx, offset int
+       if desc {
+               idx = 0
+               offset = bc.idx + 1
+       } else {
+               idx = bc.idx
+               offset = len(bc.timestamps)
+       }
+       if offset <= idx {
+               return
+       }
+       size := offset - idx
+
+       sid := bc.bm.seriesID
+
+       // Append metadata. desc reverses the slice via append-in-reverse.
+       if desc {
+               for i := offset - 1; i >= idx; i-- {
+                       b.Timestamps = append(b.Timestamps, bc.timestamps[i])
+                       b.Versions = append(b.Versions, bc.versions[i])
+                       b.ShardIDs = append(b.ShardIDs, bc.shardID)
+                       b.SeriesIDs = append(b.SeriesIDs, sid)
+               }
+       } else {
+               b.Timestamps = append(b.Timestamps, bc.timestamps[idx:offset]...)
+               b.Versions = append(b.Versions, bc.versions[idx:offset]...)
+               for range size {
+                       b.ShardIDs = append(b.ShardIDs, bc.shardID)
+                       b.SeriesIDs = append(b.SeriesIDs, sid)
+               }
+       }
+
+       var indexValue map[string]*modelv1.TagValue
+       if storedIndexValue != nil {
+               indexValue = storedIndexValue[sid]
+       }
+
+       tagIdx, fieldIdx := 0, 0
+       for _, def := range schema.Columns {
+               switch def.Role {
+               case vectorized.RoleTag:
+                       col := b.Tags[tagIdx]
+                       tagIdx++
+                       bc.fillTagColumnAllRows(col, def, idx, offset, indexValue, desc)
+               case vectorized.RoleField:
+                       col := b.Fields[fieldIdx]
+                       fieldIdx++
+                       bc.fillFieldColumnAllRows(col, def, idx, offset, desc)
+               case vectorized.RoleTimestamp, vectorized.RoleVersion,
+                       vectorized.RoleSeriesID, vectorized.RoleShardID:
+                       // Metadata handled above.
+               }
+       }
+}
+
+// fillTagColumnAllRows appends rows [idx, offset) of the tag described by
+// def onto col. desc reverses the iteration direction.
+//
+// Resolution order matches copyAllTo:
+//  1. If indexValue has the tag, project that pre-decoded value into col
+//     for every row. (Hidden tag projection — the index lookup already
+//     produced a *modelv1.TagValue.)
+//  2. Find the columnFamily for def.TagFamily in bc.tagFamilies; if not
+//     found, null-fill.
+//  3. Within the family, find the column for def.Name; if not found,
+//     null-fill.
+//  4. Validate the column's stored valueType matches the projection's
+//     schemaType. On mismatch, null-fill (same defensive behavior as
+//     copyAllTo's hasSchemaType check).
+//  5. Decode each stored byte slice into the typed column.
+func (bc *blockCursor) fillTagColumnAllRows(col vectorized.Column, def vectorized.ColumnDef,
+       idx, offset int, indexValue map[string]*modelv1.TagValue, desc bool,
+) {
+       size := offset - idx
+       // Hidden tag substitution.
+       if indexValue != nil && indexValue[def.Name] != nil {
+               v := indexValue[def.Name]
+               for range size {
+                       appendTagValueAt(col, def, v)
+               }
+               return
+       }
+       cf := bc.findTagFamily(def.TagFamily)
+       if cf == nil {
+               appendNullTagN(col, def, size)
+               return
+       }
+       column := cf.findColumn(def.Name)
+       if column == nil {
+               appendNullTagN(col, def, size)
+               return
+       }
+       schemaType, hasSchemaType := bc.schemaTagTypes[def.Name]
+       if !hasSchemaType || column.valueType != schemaType {
+               appendNullTagN(col, def, size)
+               return
+       }
+       if desc {
+               for i := offset - 1; i >= idx; i-- {
+                       fillTagCell(col, def, column.valueType, column.values[i])
+               }
+       } else {
+               for i := idx; i < offset; i++ {
+                       fillTagCell(col, def, column.valueType, column.values[i])
+               }
+       }
+}
+
+// fillFieldColumnAllRows mirrors fillTagColumnAllRows but on the field side.
+// There is no indexValue path for fields and no schemaType check (fields
+// don't have hidden-tag substitutions).
+func (bc *blockCursor) fillFieldColumnAllRows(col vectorized.Column, def vectorized.ColumnDef,
+       idx, offset int, desc bool,
+) {
+       size := offset - idx
+       column := bc.findFieldColumn(def.Name)
+       if column == nil {
+               appendNullFieldN(col, def, size)
+               return
+       }
+       if desc {
+               for i := offset - 1; i >= idx; i-- {
+                       fillFieldCell(col, def, column.valueType, column.values[i])
+               }
+       } else {
+               for i := idx; i < offset; i++ {
+                       fillFieldCell(col, def, column.valueType, column.values[i])
+               }
+       }
+}
+
+// fillTagCell decodes one stored cell of valueType into col, dispatching on
+// def.Type so passthrough columns (ColumnTypeTagValue) keep using
+// mustDecodeTagValue while native columns go through the byte-to-typed
+// fast path.
+func fillTagCell(col vectorized.Column, def vectorized.ColumnDef, valueType pbv1.ValueType, raw []byte) {
+       if def.Type == vectorized.ColumnTypeTagValue {
+               c := col.(*vectorized.TypedColumn[*modelv1.TagValue])
+               c.Append(mustDecodeTagValue(valueType, raw))
+               return
+       }
+       appendDecodedTagBytesAsTyped(col, valueType, raw)
+}
+
+// fillFieldCell is the field-side counterpart.
+func fillFieldCell(col vectorized.Column, def vectorized.ColumnDef, valueType pbv1.ValueType, raw []byte) {
+       if def.Type == vectorized.ColumnTypeFieldValue {
+               c := col.(*vectorized.TypedColumn[*modelv1.FieldValue])
+               c.Append(mustDecodeFieldValue(valueType, raw))
+               return
+       }
+       appendDecodedFieldBytesAsTyped(col, valueType, raw)
+}
+
+// appendTagValueAt projects a pre-decoded *modelv1.TagValue onto col,
+// dispatching on def.Type.
+func appendTagValueAt(col vectorized.Column, def vectorized.ColumnDef, v *modelv1.TagValue) {
+       if def.Type == vectorized.ColumnTypeTagValue {
+               c := col.(*vectorized.TypedColumn[*modelv1.TagValue])
+               c.Append(v)
+               return
+       }
+       appendTagValueAsTyped(col, v)
+}
+
+// appendNullTagN appends n null cells to a tag column. Passthrough columns
+// store the pbv1.NullTagValue singleton; native columns use AppendNull
+// (which marks the validity bit and stores the zero value).
+func appendNullTagN(col vectorized.Column, def vectorized.ColumnDef, n int) {
+       if def.Type == vectorized.ColumnTypeTagValue {
+               c := col.(*vectorized.TypedColumn[*modelv1.TagValue])
+               for range n {
+                       c.Append(pbv1.NullTagValue)
+               }
+               return
+       }
+       for range n {
+               col.AppendNull()
+       }
+}
+
+// appendNullFieldN is the field-side counterpart.
+func appendNullFieldN(col vectorized.Column, def vectorized.ColumnDef, n int) {
+       if def.Type == vectorized.ColumnTypeFieldValue {
+               c := col.(*vectorized.TypedColumn[*modelv1.FieldValue])
+               for range n {
+                       c.Append(pbv1.NullFieldValue)
+               }
+               return
+       }
+       for range n {
+               col.AppendNull()
+       }
+}
+
+// findTagFamily / findColumn are tiny helpers extracted so the row path
+// (copyAllTo / copyTo) and the batch path share the same search logic.
+// columnFamily and column are unexported types in this package — these
+// methods live alongside copyAllToBatch and may be inlined in the future
+// if profiling shows the call overhead matters.
+func (bc *blockCursor) findTagFamily(family string) *columnFamily {
+       for i := range bc.tagFamilies {
+               if bc.tagFamilies[i].name == family {
+                       return &bc.tagFamilies[i]
+               }
+       }
+       return nil
+}
+
+func (cf *columnFamily) findColumn(name string) *column {
+       for i := range cf.columns {
+               if cf.columns[i].name == name {
+                       return &cf.columns[i]
+               }
+       }
+       return nil
+}
+
+func (bc *blockCursor) findFieldColumn(name string) *column {
+       for i := range bc.fields.columns {
+               if bc.fields.columns[i].name == name {
+                       return &bc.fields.columns[i]
+               }
+       }
+       return nil
+}
diff --git a/banyand/measure/query_batch.go b/banyand/measure/query_batch.go
index 08a7b1647..d6cf3bc0f 100644
--- a/banyand/measure/query_batch.go
+++ b/banyand/measure/query_batch.go
@@ -18,8 +18,12 @@
 package measure
 
 import (
+       "container/heap"
        "context"
        "fmt"
+       "sort"
+
+       "github.com/pkg/errors"
 
        "github.com/apache/skywalking-banyandb/pkg/query/model"
        vmeasure "github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure"
@@ -34,23 +38,53 @@ var (
 
 // PullBatch implements model.MeasureBatchResult for queryResult.
 //
-// G5b "dual-emit" wrapper: the implementation calls Pull() to obtain the
-// next *model.MeasureResult and converts it to a *model.MeasureBatch using
-// vmeasure.BuildMeasureBatchFromResult. Pull()'s row-path output stays
-// byte-identical for legacy consumers; PullBatch is purely additive.
+// G5b strict — single-block fast path: when the query produced exactly
+// one block cursor, copyAllToBatch decodes raw stored bytes directly into
+// the typed columns of *MeasureBatch, bypassing the *modelv1.TagValue /
+// *modelv1.FieldValue intermediate that the row path's copyAllTo
+// constructs. Multi-block queries fall back to the heap-merge that Pull()
+// already implements and convert the resulting *MeasureResult via
+// vmeasure.BuildMeasureBatchFromResult; the merge variant for batches is
+// US-003 strict's remaining piece (planned alongside G6c operator wiring
+// when batch-aware version-replacement / TopN aggregation become urgent).
 //
-// Mixing Pull() and PullBatch() on the same queryResult is undefined —
-// each call advances the same underlying cursors. Callers must pick one.
+// Mixing Pull() and PullBatch() on the same queryResult is undefined.
+// They share qr.loaded / qr.data / qr.heap state; each advances the
+// underlying cursors. Callers must pick one.
 //
-// Sticky-error contract: if Pull() returns a *MeasureResult with
-// non-nil Error, PullBatch returns (nil, that error) so future calls
-// continue to surface the same error (Pull() is itself sticky here).
+// Sticky-error contract matches Pull(): a context cancellation surfaces
+// before load with the "interrupt: hit %d" form and during load with the
+// "interrupt: blank/total=%d/%d" form. PullBatch propagates these as
+// errors instead of as *MeasureResult{Error: …} — its return type is
+// (*MeasureBatch, error).
 func (qr *queryResult) PullBatch(_ context.Context) (*model.MeasureBatch, error) {
        if qr.batchSchema == nil {
                return nil, fmt.Errorf("queryResult.PullBatch: batchSchema not initialized; " +
                        "the underlying query did not record a vectorized BatchSchema (likely a schema-build error at Query time)")
        }
-       r := qr.Pull()
+       select {
+       case <-qr.ctx.Done():
+               return nil, errors.WithMessagef(qr.ctx.Err(), "interrupt: hit %d", qr.hit)
+       default:
+       }
+       if loadErr := qr.loadCursorsForBatch(); loadErr != nil {
+               return nil, loadErr
+       }
+       if len(qr.data) == 0 {
+               return nil, nil
+       }
+       if len(qr.data) == 1 {
+               bc := qr.data[0]
+               b := newMeasureBatchForSchema(qr.batchSchema, len(bc.timestamps))
+               bc.copyAllToBatch(b, qr.batchSchema, qr.storedIndexValue, qr.orderByTimestampDesc())
+               qr.data = qr.data[:0]
+               releaseBlockCursor(bc)
+               return b, nil
+       }
+       // Multi-block: heap-merge via Pull's existing path, then convert.
+       // The protobuf intermediate is transient here — eliminating it
+       // requires a batch-aware merge variant deferred to a follow-on.
+       r := qr.merge(qr.storedIndexValue, qr.tagProjection)
        if r == nil {
                return nil, nil
        }
@@ -60,11 +94,73 @@ func (qr *queryResult) PullBatch(_ context.Context) (*model.MeasureBatch, error)
        return vmeasure.BuildMeasureBatchFromResult(r, qr.batchSchema)
 }
 
+// loadCursorsForBatch is the lazy load step replicated from Pull(). It is
+// duplicated here (rather than extracted) so Pull's byte-identical
+// behavior is unaffected by the G5b refactor — Pull and PullBatch are
+// documented as mutually exclusive on a single queryResult, so the
+// duplicated state machinery never observes a contended state.
+func (qr *queryResult) loadCursorsForBatch() error {
+       if qr.loaded {
+               return nil
+       }
+       if len(qr.data) == 0 {
+               return nil
+       }
+       cursorChan := make(chan int, len(qr.data))
+       for i := 0; i < len(qr.data); i++ {
+               go func(i int) {
+                       select {
+                       case <-qr.ctx.Done():
+                               cursorChan <- i
+                               return
+                       default:
+                       }
+                       tmpBlock := generateBlock()
+                       defer releaseBlock(tmpBlock)
+                       if !qr.data[i].loadData(tmpBlock) {
+                               cursorChan <- i
+                               return
+                       }
+                       if qr.orderByTimestampDesc() {
+                               qr.data[i].idx = len(qr.data[i].timestamps) - 1
+                       }
+                       cursorChan <- -1
+               }(i)
+       }
+       blankCursorList := []int{}
+       for completed := 0; completed < len(qr.data); completed++ {
+               result := <-cursorChan
+               if result != -1 {
+                       blankCursorList = append(blankCursorList, result)
+               }
+       }
+       select {
+       case <-qr.ctx.Done():
+               return errors.WithMessagef(qr.ctx.Err(),
+                       "interrupt: blank/total=%d/%d", len(blankCursorList), len(qr.data))
+       default:
+       }
+       sort.Slice(blankCursorList, func(i, j int) bool {
+               return blankCursorList[i] > blankCursorList[j]
+       })
+       for _, index := range blankCursorList {
+               qr.data = append(qr.data[:index], qr.data[index+1:]...)
+       }
+       qr.loaded = true
+       heap.Init(qr)
+       return nil
+}
+
 // PullBatch implements model.MeasureBatchResult for indexSortResult.
 //
 // indexSortResult.Pull() yields single-row MeasureResults; the conversion
-// shape is identical to queryResult.PullBatch — single-row batches that
-// the vectorized adapter accumulates into larger pipeline batches.
+// shape is identical to queryResult.PullBatch's multi-block fallback —
+// single-row batches that the vectorized adapter accumulates into larger
+// pipeline batches. A direct typed-column variant (skipping the
+// *modelv1.TagValue intermediate) is deferred to a follow-on alongside
+// the queryResult batch-merge work; indexSortResult uses storage's
+// segResult instead of blockCursor, so the typed emit path here is its
+// own ~150 LOC parallel implementation.
 func (iqr *indexSortResult) PullBatch(_ context.Context) (*model.MeasureBatch, error) {
        if iqr.batchSchema == nil {
                return nil, fmt.Errorf("indexSortResult.PullBatch: batchSchema not initialized; " +