This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch vectorized-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit b2d3c08028131c66f1807f1a696633e6a2782122
Author: Hongtao Gao <[email protected]>
AuthorDate: Fri May 8 22:00:12 2026 +0000

    feat(query/vectorized/measure): wire NewMIterator through 
MeasureBatchResult.PullBatch (US-007 partial)
    
    Adds the architectural wiring step that lets the vectorized adapter
    consume typed columns from storage's PullBatch directly, bypassing
    extract.go's per-cell decode pass. Storage's queryResult /
    indexSortResult both implement MeasureBatchResult; NewMIterator
    type-asserts the input and dispatches to the new source.
    
    Files:
    - pkg/query/vectorized/measure/batchsource.go (NEW): a PullOperator
      that drives MeasureBatchResult.PullBatch and accumulates rows into
      *vectorized.RecordBatch up to batchSize. Per-row metadata copied
      from MeasureBatch's parallel slices; tag/field columns delegated to
      appendColumnRange (typed Append per element with validity bit
      propagation).
    - pkg/query/vectorized/measure/batchsource_test.go (NEW): table-driven
      tests for EOF, single-batch-smaller-than-batchSize, accumulate-
      across-pulls, sticky error, idempotent Close, and pointer
      preservation for passthrough TagValue/FieldValue columns.
    - pkg/query/vectorized/measure/integration.go: NewMIterator now
      type-asserts qr to MeasureBatchResult; if satisfied, builds a
      BatchSourceFromBatchResult. Otherwise falls back to the original
      BatchScan path (which decodes via extract.go). Both paths share the
      schema and batch pool.
    - banyand/measure/query.go: queryResult.batchSchema is now built from
      result.tagProjection (the *original* projection captured before
      the entity/index tag strip) rather than from the post-strip
      mqo.TagProjection. Without this fix the storage-emitted MeasureBatch
      schema is narrower than the vec adapter's NewMIterator schema, and
      the vectorized parity integration suite fails on hidden-tag
      projection cases (the storedIndexValue substitution has no slot in
      the narrowed schema). The narrowed mqo.TagProjection is still used
      for the row path's queryResult.merge consumers via the existing
      qr.tagProjection field.
    
    Acceptance:
      ✓ go build ./...
      ✓ go test ./pkg/query/vectorized/... -count=1 -race
      ✓ go test ./banyand/measure/... -count=1 -short -race  (28.835s)
      ✓ scripts/bench-vectorized.sh COUNT=2 BENCHTIME=1s exits 0
        (W1..W5 gates green: ns/op vec ≤ row × 1.05; allocs vec ≤ row ×
        1.005; B/op vec ≤ row × 1.20)
      ✓ go test ./test/integration/standalone/query/
        -ginkgo.focus="vectorized parity"  (90s green)
      ✓ make -s lint clean
    
    Architectural state after this commit:
      - Storage layer dual-emits: Pull() → *MeasureResult (row);
        PullBatch() → *MeasureBatch (passthrough columns).
      - Vec adapter consumes PullBatch when available — no extract.go on
        the live path. extract.go remains for any caller producing a
        plain MeasureQueryResult (none in the v1 storage layer).
      - BuildBatchSchema still emits passthrough column types
        (ColumnTypeTagValue / ColumnTypeFieldValue). Switching to native
        types is US-005, gated behind atomic deletion of extract.go +
        passthrough fast paths in serialize.go (US-004 / US-006).
    
    Remaining for the next iteration:
      - US-005: BuildBatchSchema → native column types.
      - US-006: serialize.go drop passthrough fast paths.
      - US-004: delete extract.go + ColumnTypeTagValue/FieldValue
        constants + factories.
      - US-003 strict full: multi-block batch-aware queryResult.merge
        variant; indexSortResult native typed emit.
      - US-008: bench gates re-run after the cleanup; final commit.
---
 banyand/measure/query.go                         |  16 +-
 pkg/query/vectorized/measure/batchsource.go      | 297 +++++++++++++++++++++++
 pkg/query/vectorized/measure/batchsource_test.go | 247 +++++++++++++++++++
 pkg/query/vectorized/measure/integration.go      |  29 ++-
 4 files changed, 580 insertions(+), 9 deletions(-)

diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index fd5f9ee70..2819552a4 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -219,9 +219,19 @@ func (m *measure) Query(ctx context.Context, mqo 
model.MeasureQueryOptions) (mqr
        }
 
        // Build the columnar BatchSchema once for PullBatch consumers (G5b).
-       // Falls back to nil on schema-build failure; PullBatch checks for nil
-       // and returns a clean error rather than degrading the row-path Pull().
-       result.batchSchema, _ = vmeasure.BuildBatchSchema(m.schema, mqo)
+       // IMPORTANT: use result.tagProjection (the *original* projection
+       // captured before the entity/index tag strip) rather than mqo —
+       // mqo.TagProjection was rewritten to newTagProjection above and no
+       // longer carries entity-tag slots that the row path's copyAllTo
+       // fills via storedIndexValue. The batch path's column layout must
+       // match the vec adapter's expectation, which is the original
+       // projection. Falls back to nil on schema-build failure; PullBatch
+       // checks for nil and returns a clean error rather than degrading
+       // the row-path Pull().
+       result.batchSchema, _ = vmeasure.BuildBatchSchema(m.schema, 
model.MeasureQueryOptions{
+               TagProjection:   result.tagProjection,
+               FieldProjection: mqo.FieldProjection,
+       })
 
        return &result, nil
 }
diff --git a/pkg/query/vectorized/measure/batchsource.go 
b/pkg/query/vectorized/measure/batchsource.go
new file mode 100644
index 000000000..2a81920b5
--- /dev/null
+++ b/pkg/query/vectorized/measure/batchsource.go
@@ -0,0 +1,297 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "context"
+       "fmt"
+
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/model"
+       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+)
+
+// BatchSourceFromBatchResult is a PullOperator that drives a
+// model.MeasureBatchResult through PullBatch and accumulates rows into
+// *vectorized.RecordBatch instances of the configured batchSize.
+//
+// G5c (US-007 partial) — provides the wiring path that lets NewMIterator
+// consume PullBatch output directly. Storage's queryResult /
+// indexSortResult both implement MeasureBatchResult; the columns inside
+// the *MeasureBatch are typed (passthrough today; native after
+// US-005). BatchSourceFromBatchResult column-copies into a RecordBatch
+// whose schema matches the source. extract.go is bypassed entirely on
+// this path — the source columns are already typed.
+//
+// Schema contract: the supplied schema must match every *MeasureBatch
+// produced by br.PullBatch. Enforced loosely — the column-copy helper
+// returns an error on TypedColumn[T] mismatches, which surfaces as a
+// pipeline error.
+type BatchSourceFromBatchResult struct {
+       br        model.MeasureBatchResult
+       schema    *vectorized.BatchSchema
+       pool      *vectorized.BatchPool
+       pending   *model.MeasureBatch
+       err       error
+       batchSize int
+       pendPos   int
+       eof       bool
+       closed    bool
+}
+
+// NewBatchSourceFromBatchResult constructs the source. Init is required
+// before NextBatch (no-op today, kept to satisfy PullOperator).
+func NewBatchSourceFromBatchResult(br model.MeasureBatchResult, schema 
*vectorized.BatchSchema,
+       pool *vectorized.BatchPool, batchSize int,
+) *BatchSourceFromBatchResult {
+       return &BatchSourceFromBatchResult{
+               br:        br,
+               schema:    schema,
+               pool:      pool,
+               batchSize: batchSize,
+       }
+}
+
+// Init satisfies PullOperator.
+func (s *BatchSourceFromBatchResult) Init(_ context.Context) error { return 
nil }
+
+// OutputSchema returns the schema declared at construction.
+func (s *BatchSourceFromBatchResult) OutputSchema() *vectorized.BatchSchema { 
return s.schema }
+
+// NextBatch pulls *MeasureBatch instances from the underlying
+// MeasureBatchResult and copies their rows into a fresh RecordBatch from
+// the pool, up to batchSize rows or source EOF, whichever comes first.
+//
+// EOF is sticky: once observed, subsequent calls return (nil, nil)
+// without re-entering the source. Errors are sticky too.
+func (s *BatchSourceFromBatchResult) NextBatch(ctx context.Context) 
(*vectorized.RecordBatch, error) {
+       if s.err != nil {
+               return nil, s.err
+       }
+       if s.eof {
+               return nil, nil
+       }
+       out := s.pool.Get()
+       for out.Len < s.batchSize {
+               if s.pending == nil || s.pendPos >= s.pending.RowCount() {
+                       mb, pullErr := s.br.PullBatch(ctx)
+                       if pullErr != nil {
+                               s.err = pullErr
+                               return nil, pullErr
+                       }
+                       if mb == nil {
+                               s.eof = true
+                               break
+                       }
+                       if mb.RowCount() == 0 {
+                               s.pending = nil
+                               s.pendPos = 0
+                               continue
+                       }
+                       s.pending = mb
+                       s.pendPos = 0
+               }
+               rowsAvail := s.pending.RowCount() - s.pendPos
+               rowsTake := min(s.batchSize-out.Len, rowsAvail)
+               if copyErr := s.copyRowsInto(out, s.pending, s.pendPos, 
rowsTake); copyErr != nil {
+                       s.err = copyErr
+                       return nil, copyErr
+               }
+               out.Len += rowsTake
+               s.pendPos += rowsTake
+       }
+       if out.Len == 0 {
+               s.pool.Put(out)
+               return nil, nil
+       }
+       return out, nil
+}
+
+// copyRowsInto copies n rows starting at srcPos from mb into the
+// RecordBatch out (appending). Metadata roles read mb's parallel slices;
+// tag/field roles delegate to appendColumnRange for each typed column.
+func (s *BatchSourceFromBatchResult) copyRowsInto(out *vectorized.RecordBatch,
+       mb *model.MeasureBatch, srcPos, n int,
+) error {
+       if tsIdx := s.schema.TimestampIndex(); tsIdx >= 0 {
+               c := out.Columns[tsIdx].(*vectorized.TypedColumn[int64])
+               for k := range n {
+                       c.Append(mb.Timestamps[srcPos+k])
+               }
+       }
+       if vIdx := s.schema.VersionIndex(); vIdx >= 0 {
+               c := out.Columns[vIdx].(*vectorized.TypedColumn[int64])
+               for k := range n {
+                       c.Append(mb.Versions[srcPos+k])
+               }
+       }
+       if sidIdx := s.schema.SeriesIDIndex(); sidIdx >= 0 {
+               c := out.Columns[sidIdx].(*vectorized.TypedColumn[int64])
+               for k := range n {
+                       c.Append(int64(mb.SeriesIDs[srcPos+k]))
+               }
+       }
+       if shIdx := s.schema.ShardIDIndex(); shIdx >= 0 {
+               c := out.Columns[shIdx].(*vectorized.TypedColumn[int64])
+               for k := range n {
+                       c.Append(int64(mb.ShardIDs[srcPos+k]))
+               }
+       }
+       tagIdx, fieldIdx := 0, 0
+       for outColIdx, def := range s.schema.Columns {
+               switch def.Role {
+               case vectorized.RoleTag:
+                       if tagIdx >= len(mb.Tags) {
+                               return fmt.Errorf("BatchSourceFromBatchResult: 
schema declares %d tag columns but MeasureBatch has %d",
+                                       tagIdx+1, len(mb.Tags))
+                       }
+                       if copyErr := appendColumnRange(out.Columns[outColIdx], 
mb.Tags[tagIdx], srcPos, n); copyErr != nil {
+                               return fmt.Errorf("tag %s.%s: %w", 
def.TagFamily, def.Name, copyErr)
+                       }
+                       tagIdx++
+               case vectorized.RoleField:
+                       if fieldIdx >= len(mb.Fields) {
+                               return fmt.Errorf("BatchSourceFromBatchResult: 
schema declares %d field columns but MeasureBatch has %d",
+                                       fieldIdx+1, len(mb.Fields))
+                       }
+                       if copyErr := appendColumnRange(out.Columns[outColIdx], 
mb.Fields[fieldIdx], srcPos, n); copyErr != nil {
+                               return fmt.Errorf("field %s: %w", def.Name, 
copyErr)
+                       }
+                       fieldIdx++
+               case vectorized.RoleTimestamp, vectorized.RoleVersion,
+                       vectorized.RoleSeriesID, vectorized.RoleShardID:
+                       // Metadata roles handled above via parallel slices.
+               }
+       }
+       return nil
+}
+
+// Close releases the underlying MeasureBatchResult exactly once. Idempotent.
+func (s *BatchSourceFromBatchResult) Close() error {
+       if s.closed {
+               return nil
+       }
+       s.closed = true
+       if s.br != nil {
+               s.br.Release()
+               s.br = nil
+       }
+       return nil
+}
+
+// appendColumnRange copies n rows starting at srcPos from src into dst
+// (appending). Both columns must share the same TypedColumn[T] type;
+// returns an error on mismatch. Validity bits are propagated cell-by-cell
+// via dst.MarkNullAt when src.IsNull reports null at the corresponding
+// row. Slice-typed values ([]byte / []int64 / []string) are deeply
+// copied so the source MeasureBatch can be released independently of the
+// produced RecordBatch.
+func appendColumnRange(dst, src vectorized.Column, srcPos, n int) error {
+       startLen := dst.Len()
+       switch d := dst.(type) {
+       case *vectorized.TypedColumn[int64]:
+               sCol, ok := src.(*vectorized.TypedColumn[int64])
+               if !ok {
+                       return fmt.Errorf("appendColumnRange: dst int64 vs src 
%s", src.Type())
+               }
+               sData := sCol.Data()
+               for k := range n {
+                       d.Append(sData[srcPos+k])
+               }
+       case *vectorized.TypedColumn[float64]:
+               sCol, ok := src.(*vectorized.TypedColumn[float64])
+               if !ok {
+                       return fmt.Errorf("appendColumnRange: dst float64 vs 
src %s", src.Type())
+               }
+               sData := sCol.Data()
+               for k := range n {
+                       d.Append(sData[srcPos+k])
+               }
+       case *vectorized.TypedColumn[string]:
+               sCol, ok := src.(*vectorized.TypedColumn[string])
+               if !ok {
+                       return fmt.Errorf("appendColumnRange: dst string vs src 
%s", src.Type())
+               }
+               sData := sCol.Data()
+               for k := range n {
+                       d.Append(sData[srcPos+k])
+               }
+       case *vectorized.TypedColumn[[]byte]:
+               sCol, ok := src.(*vectorized.TypedColumn[[]byte])
+               if !ok {
+                       return fmt.Errorf("appendColumnRange: dst bytes vs src 
%s", src.Type())
+               }
+               sData := sCol.Data()
+               for k := range n {
+                       origin := sData[srcPos+k]
+                       buf := make([]byte, len(origin))
+                       copy(buf, origin)
+                       d.Append(buf)
+               }
+       case *vectorized.TypedColumn[[]int64]:
+               sCol, ok := src.(*vectorized.TypedColumn[[]int64])
+               if !ok {
+                       return fmt.Errorf("appendColumnRange: dst int64[] vs 
src %s", src.Type())
+               }
+               sData := sCol.Data()
+               for k := range n {
+                       origin := sData[srcPos+k]
+                       buf := make([]int64, len(origin))
+                       copy(buf, origin)
+                       d.Append(buf)
+               }
+       case *vectorized.TypedColumn[[]string]:
+               sCol, ok := src.(*vectorized.TypedColumn[[]string])
+               if !ok {
+                       return fmt.Errorf("appendColumnRange: dst string[] vs 
src %s", src.Type())
+               }
+               sData := sCol.Data()
+               for k := range n {
+                       origin := sData[srcPos+k]
+                       buf := make([]string, len(origin))
+                       copy(buf, origin)
+                       d.Append(buf)
+               }
+       case *vectorized.TypedColumn[*modelv1.TagValue]:
+               sCol, ok := src.(*vectorized.TypedColumn[*modelv1.TagValue])
+               if !ok {
+                       return fmt.Errorf("appendColumnRange: dst tagvalue vs 
src %s", src.Type())
+               }
+               sData := sCol.Data()
+               for k := range n {
+                       d.Append(sData[srcPos+k])
+               }
+       case *vectorized.TypedColumn[*modelv1.FieldValue]:
+               sCol, ok := src.(*vectorized.TypedColumn[*modelv1.FieldValue])
+               if !ok {
+                       return fmt.Errorf("appendColumnRange: dst fieldvalue vs 
src %s", src.Type())
+               }
+               sData := sCol.Data()
+               for k := range n {
+                       d.Append(sData[srcPos+k])
+               }
+       default:
+               return fmt.Errorf("appendColumnRange: unsupported dst type %s", 
dst.Type())
+       }
+       for k := range n {
+               if src.IsNull(srcPos + k) {
+                       dst.MarkNullAt(startLen + k)
+               }
+       }
+       return nil
+}
diff --git a/pkg/query/vectorized/measure/batchsource_test.go 
b/pkg/query/vectorized/measure/batchsource_test.go
new file mode 100644
index 000000000..283b5dc2f
--- /dev/null
+++ b/pkg/query/vectorized/measure/batchsource_test.go
@@ -0,0 +1,247 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "context"
+       "errors"
+       "testing"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/model"
+       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+)
+
+// fakeBatchResult yields a hand-built sequence of *MeasureBatch
+// instances for BatchSourceFromBatchResult tests. The interface is
+// minimal — PullBatch returns the next batch in seq, then nil.
+type fakeBatchResult struct {
+       err        error
+       seq        []*model.MeasureBatch
+       idx        int
+       releaseCnt int
+}
+
+func (f *fakeBatchResult) PullBatch(_ context.Context) (*model.MeasureBatch, 
error) {
+       if f.err != nil {
+               return nil, f.err
+       }
+       if f.idx >= len(f.seq) {
+               return nil, nil
+       }
+       b := f.seq[f.idx]
+       f.idx++
+       return b, nil
+}
+
+func (f *fakeBatchResult) Release() {
+       f.releaseCnt++
+}
+
+// passthroughSchema mirrors what BuildBatchSchema (G5a) emits — a single
+// passthrough tag column + one passthrough field column on top of the
+// metadata roles.
+func passthroughSchema() *vectorized.BatchSchema {
+       return vectorized.NewBatchSchema([]vectorized.ColumnDef{
+               {Role: vectorized.RoleTimestamp, Type: 
vectorized.ColumnTypeInt64},
+               {Role: vectorized.RoleVersion, Type: 
vectorized.ColumnTypeInt64},
+               {Role: vectorized.RoleSeriesID, Type: 
vectorized.ColumnTypeInt64},
+               {Role: vectorized.RoleShardID, Type: 
vectorized.ColumnTypeInt64},
+               {Role: vectorized.RoleTag, TagFamily: "default", Name: "svc", 
Type: vectorized.ColumnTypeTagValue},
+               {Role: vectorized.RoleField, Name: "value", Type: 
vectorized.ColumnTypeFieldValue},
+       })
+}
+
+// makePassthroughBatch constructs a MeasureBatch with the passthrough
+// schema, populating ts/version/sid/shard, one tag value (svc=alpha),
+// and one field value (value=int 42) per row.
+func makePassthroughBatch(schema *vectorized.BatchSchema, sid common.SeriesID, 
baseTS int64, n int) *model.MeasureBatch {
+       tagCol := vectorized.NewTagValueColumn(n)
+       fldCol := vectorized.NewFieldValueColumn(n)
+       tv := &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: 
&modelv1.Str{Value: "alpha"}}}
+       fv := &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: 
&modelv1.Int{Value: 42}}}
+       timestamps := make([]int64, n)
+       versions := make([]int64, n)
+       shardIDs := make([]common.ShardID, n)
+       seriesIDs := make([]common.SeriesID, n)
+       for i := range n {
+               timestamps[i] = baseTS + int64(i)
+               versions[i] = 1
+               seriesIDs[i] = sid
+               tagCol.Append(tv)
+               fldCol.Append(fv)
+       }
+       return &model.MeasureBatch{
+               Schema:     schema,
+               Timestamps: timestamps,
+               Versions:   versions,
+               ShardIDs:   shardIDs,
+               SeriesIDs:  seriesIDs,
+               Tags:       []vectorized.Column{tagCol},
+               Fields:     []vectorized.Column{fldCol},
+       }
+}
+
+func TestBatchSource_EOFOnEmptySource(t *testing.T) {
+       schema := passthroughSchema()
+       pool := vectorized.NewBatchPool(schema, 4)
+       src := NewBatchSourceFromBatchResult(&fakeBatchResult{}, schema, pool, 
4)
+       defer src.Close()
+       b, err := src.NextBatch(context.Background())
+       if err != nil {
+               t.Fatalf("EOF must not error; got %v", err)
+       }
+       if b != nil {
+               t.Fatalf("EOF must yield nil batch; got %v", b)
+       }
+}
+
+func TestBatchSource_SingleBatchSmallerThanBatchSize(t *testing.T) {
+       schema := passthroughSchema()
+       pool := vectorized.NewBatchPool(schema, 8)
+       mb := makePassthroughBatch(schema, 1, 100, 3)
+       src := NewBatchSourceFromBatchResult(&fakeBatchResult{seq: 
[]*model.MeasureBatch{mb}},
+               schema, pool, 8)
+       defer src.Close()
+
+       b, err := src.NextBatch(context.Background())
+       if err != nil {
+               t.Fatalf("NextBatch: %v", err)
+       }
+       if b == nil {
+               t.Fatal("first NextBatch must return a non-nil batch")
+       }
+       if b.Len != 3 {
+               t.Fatalf("Len: want 3, got %d", b.Len)
+       }
+       // Metadata copied through.
+       if got := 
b.Columns[schema.TimestampIndex()].(*vectorized.TypedColumn[int64]).Data(); 
!equalsInt64s(got, []int64{100, 101, 102}) {
+               t.Fatalf("Timestamps: want [100 101 102], got %v", got)
+       }
+       if got := 
b.Columns[schema.SeriesIDIndex()].(*vectorized.TypedColumn[int64]).Data(); 
!equalsInt64s(got, []int64{1, 1, 1}) {
+               t.Fatalf("SeriesIDs: want [1 1 1], got %v", got)
+       }
+       // Subsequent call: EOF.
+       next, err2 := src.NextBatch(context.Background())
+       if err2 != nil {
+               t.Fatalf("post-EOF NextBatch: %v", err2)
+       }
+       if next != nil {
+               t.Fatalf("post-EOF must return nil; got %v", next)
+       }
+}
+
+func TestBatchSource_AccumulatesAcrossPulls(t *testing.T) {
+       schema := passthroughSchema()
+       pool := vectorized.NewBatchPool(schema, 4)
+       a := makePassthroughBatch(schema, 1, 100, 2)
+       b := makePassthroughBatch(schema, 2, 200, 3)
+       src := NewBatchSourceFromBatchResult(&fakeBatchResult{seq: 
[]*model.MeasureBatch{a, b}},
+               schema, pool, 4)
+       defer src.Close()
+
+       first, err := src.NextBatch(context.Background())
+       if err != nil {
+               t.Fatalf("NextBatch1: %v", err)
+       }
+       if first == nil || first.Len != 4 {
+               t.Fatalf("first batch Len: want 4, got %v", first)
+       }
+       second, err := src.NextBatch(context.Background())
+       if err != nil {
+               t.Fatalf("NextBatch2: %v", err)
+       }
+       if second == nil || second.Len != 1 {
+               t.Fatalf("second batch Len: want 1, got %v", second)
+       }
+       final, err := src.NextBatch(context.Background())
+       if err != nil {
+               t.Fatalf("NextBatch3: %v", err)
+       }
+       if final != nil {
+               t.Fatalf("third call must EOF; got %v", final)
+       }
+}
+
+func TestBatchSource_StickyError(t *testing.T) {
+       boom := errors.New("scan failure")
+       schema := passthroughSchema()
+       pool := vectorized.NewBatchPool(schema, 4)
+       src := NewBatchSourceFromBatchResult(&fakeBatchResult{err: boom}, 
schema, pool, 4)
+       defer src.Close()
+       if _, err := src.NextBatch(context.Background()); !errors.Is(err, boom) 
{
+               t.Fatalf("first call: want %v, got %v", boom, err)
+       }
+       if _, err := src.NextBatch(context.Background()); !errors.Is(err, boom) 
{
+               t.Fatalf("second call must surface sticky error; got %v", err)
+       }
+}
+
+func TestBatchSource_CloseReleasesSource(t *testing.T) {
+       schema := passthroughSchema()
+       pool := vectorized.NewBatchPool(schema, 4)
+       fr := &fakeBatchResult{}
+       src := NewBatchSourceFromBatchResult(fr, schema, pool, 4)
+       if err := src.Close(); err != nil {
+               t.Fatal(err)
+       }
+       if err := src.Close(); err != nil {
+               t.Fatalf("second Close must be no-op; got %v", err)
+       }
+       if fr.releaseCnt != 1 {
+               t.Fatalf("Release called %d times; want 1 (idempotent Close)", 
fr.releaseCnt)
+       }
+}
+
+func TestBatchSource_PassthroughTagFieldPointersPreserved(t *testing.T) {
+       // Passthrough columns must not deep-copy *modelv1.TagValue or
+       // *modelv1.FieldValue pointers — the storage emits the canonical
+       // pointer and the serializer expects it back unchanged.
+       schema := passthroughSchema()
+       pool := vectorized.NewBatchPool(schema, 4)
+       mb := makePassthroughBatch(schema, 1, 100, 2)
+       srcTag := 
mb.Tags[0].(*vectorized.TypedColumn[*modelv1.TagValue]).Data()[0]
+       srcFld := 
mb.Fields[0].(*vectorized.TypedColumn[*modelv1.FieldValue]).Data()[0]
+       src := NewBatchSourceFromBatchResult(&fakeBatchResult{seq: 
[]*model.MeasureBatch{mb}},
+               schema, pool, 4)
+       defer src.Close()
+       b, _ := src.NextBatch(context.Background())
+       tagIdx, _ := schema.TagIndex("default", "svc")
+       fldIdx, _ := schema.FieldIndex("value")
+       gotTag := 
b.Columns[tagIdx].(*vectorized.TypedColumn[*modelv1.TagValue]).Data()[0]
+       gotFld := 
b.Columns[fldIdx].(*vectorized.TypedColumn[*modelv1.FieldValue]).Data()[0]
+       if gotTag != srcTag {
+               t.Fatalf("tag pointer was copied; want passthrough 
preservation")
+       }
+       if gotFld != srcFld {
+               t.Fatalf("field pointer was copied; want passthrough 
preservation")
+       }
+}
+
+func equalsInt64s(a, b []int64) bool {
+       if len(a) != len(b) {
+               return false
+       }
+       for i := range a {
+               if a[i] != b[i] {
+                       return false
+               }
+       }
+       return true
+}
diff --git a/pkg/query/vectorized/measure/integration.go 
b/pkg/query/vectorized/measure/integration.go
index b52cfb5fc..55e7c047a 100644
--- a/pkg/query/vectorized/measure/integration.go
+++ b/pkg/query/vectorized/measure/integration.go
@@ -159,15 +159,32 @@ func NewMIterator(ctx context.Context, qr 
model.MeasureQueryResult,
                return nil, schemaErr
        }
        pool := vectorized.NewBatchPool(schema, cfg.BatchSize)
-       scan := NewBatchScan(qr, schema, pool, cfg.BatchSize)
-       pipeline, buildErr := vectorized.NewPipelineBuilder().From(scan).Build()
+
+       // G5c (US-007): when the storage result also satisfies
+       // MeasureBatchResult, drive the pipeline from PullBatch directly via
+       // BatchSourceFromBatchResult. This bypasses extract.go's per-cell
+       // decode pass — the source columns are already typed (passthrough or
+       // native, depending on BuildBatchSchema's column type choice).
+       //
+       // The fallback is the original BatchScan path that consumes
+       // MeasureQueryResult.Pull and runs extract* on each cell. It stays
+       // alive for any future caller that produces a query result not
+       // satisfying MeasureBatchResult; v1's storage layer satisfies both.
+       var source vectorized.PullOperator
+       if br, ok := qr.(model.MeasureBatchResult); ok {
+               source = NewBatchSourceFromBatchResult(br, schema, pool, 
cfg.BatchSize)
+       } else {
+               source = NewBatchScan(qr, schema, pool, cfg.BatchSize)
+       }
+
+       pipeline, buildErr := 
vectorized.NewPipelineBuilder().From(source).Build()
        if buildErr != nil {
-               // scan was constructed but never wired into a Pipeline; close 
it
-               // directly to release qr through the cursor.
-               _ = scan.Close()
+               // source was constructed but never wired into a Pipeline; close
+               // it directly to release qr through the source.
+               _ = source.Close()
                return nil, buildErr
        }
-       if initErr := scan.Init(ctx); initErr != nil {
+       if initErr := source.Init(ctx); initErr != nil {
                _ = pipeline.Close()
                return nil, initErr
        }

Reply via email to