This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch vectorized-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 3755984c43b58dbf469c2716a8286062bdaba012
Author: Hongtao Gao <[email protected]>
AuthorDate: Sun May 3 11:41:42 2026 +0000

    feat(query/vectorized/measure): add measure scan layer (gate G2)
    
    Second gate of the vectorized query pipeline. Implements the
    measure-specific extract/cursor/scan stack that consumes the existing
    MeasureQueryResult interface and produces RecordBatches.
    
    Modules:
      - config.go      VectorizedConfig with Enabled/BatchSize/QueryMemoryMiB
                       and Validate; defaults Enabled=false, 1024 rows, 256 MiB.
      - extract.go     extractTagRow / extractFieldRow / extractTagBulk /
                       extractFieldBulk. Reads already-decoded
                       *modelv1.TagValue / *modelv1.FieldValue values from
                       MeasureResult and writes them into TypedColumn[T] slices.
                       Defensive copies for slice-typed values; hard-fail on
                       unknown variants; handles all 7 TagValue cases including
                       Timestamp (mapped to int64 UnixNano).
      - cursor.go      SeriesCursor with explicit state machine, sticky-error
                       propagation (R1 critical), and bulk-copy fast path
                       exposed via Current/Advance accessors.
      - scan.go        BatchScan PullOperator. Drains the cursor with a bulk
                       fill of timestamp/version/sid/shardid + per-row tag/field
                       extracts; discards the partial batch immediately on any
                       cursor error so failures cannot be silently buried.
    
    Risks closed at this gate (per the design spec):
      R1 multi-series cursor with sticky-error semantics,
      R2 extract-not-decode with defensive slice copies and hard-fail on
      unknown variants.
    
    47 tests, all green under -race in ~1s. go vet clean.
---
 pkg/query/vectorized/measure/config.go        |  45 ++++
 pkg/query/vectorized/measure/config_test.go   |  52 ++++
 pkg/query/vectorized/measure/cursor.go        | 166 ++++++++++++
 pkg/query/vectorized/measure/cursor_test.go   | 206 +++++++++++++++
 pkg/query/vectorized/measure/doc.go           |  20 ++
 pkg/query/vectorized/measure/extract.go       | 171 +++++++++++++
 pkg/query/vectorized/measure/extract_test.go  | 353 ++++++++++++++++++++++++++
 pkg/query/vectorized/measure/fixtures_test.go |  61 +++++
 pkg/query/vectorized/measure/scan.go          | 233 +++++++++++++++++
 pkg/query/vectorized/measure/scan_test.go     | 255 +++++++++++++++++++
 10 files changed, 1562 insertions(+)

diff --git a/pkg/query/vectorized/measure/config.go 
b/pkg/query/vectorized/measure/config.go
new file mode 100644
index 000000000..83d2eed7a
--- /dev/null
+++ b/pkg/query/vectorized/measure/config.go
@@ -0,0 +1,45 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import "fmt"
+
+// VectorizedConfig controls the v1 vectorized Measure query path.
+type VectorizedConfig struct {
+       BatchSize      int
+       QueryMemoryMiB int
+       Enabled        bool
+}
+
+// DefaultConfig returns the v1 default — disabled, 1024-row batches, 256 MiB
+// per-query memory budget. v1 ships with Enabled false; the rollout PR flips
+// the default after soak and bench gates pass.
+func DefaultConfig() VectorizedConfig {
+       return VectorizedConfig{Enabled: false, BatchSize: 1024, 
QueryMemoryMiB: 256}
+}
+
+// Validate rejects nonsense configurations.
+func (c VectorizedConfig) Validate() error {
+       if c.BatchSize <= 0 {
+               return fmt.Errorf("vectorized.measure: BatchSize must be > 0, 
got %d", c.BatchSize)
+       }
+       if c.QueryMemoryMiB <= 0 {
+               return fmt.Errorf("vectorized.measure: QueryMemoryMiB must be > 
0, got %d", c.QueryMemoryMiB)
+       }
+       return nil
+}
diff --git a/pkg/query/vectorized/measure/config_test.go 
b/pkg/query/vectorized/measure/config_test.go
new file mode 100644
index 000000000..628bd77b2
--- /dev/null
+++ b/pkg/query/vectorized/measure/config_test.go
@@ -0,0 +1,52 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import "testing"
+
+func TestVectorizedConfig_Default_Disabled_BatchSize1024_Memory256(t 
*testing.T) {
+       c := DefaultConfig()
+       if c.Enabled {
+               t.Fatal("default Enabled must be false (v1 ships disabled)")
+       }
+       if c.BatchSize != 1024 {
+               t.Fatalf("default BatchSize: want 1024, got %d", c.BatchSize)
+       }
+       if c.QueryMemoryMiB != 256 {
+               t.Fatalf("default QueryMemoryMiB: want 256, got %d", 
c.QueryMemoryMiB)
+       }
+       if err := c.Validate(); err != nil {
+               t.Fatalf("default config must Validate cleanly, got %v", err)
+       }
+}
+
+func TestVectorizedConfig_Validate_ZeroBatchSize_ReturnsError(t *testing.T) {
+       c := DefaultConfig()
+       c.BatchSize = 0
+       if err := c.Validate(); err == nil {
+               t.Fatal("BatchSize=0 must fail Validate")
+       }
+}
+
+func TestVectorizedConfig_Validate_NegativeMemoryMiB_ReturnsError(t 
*testing.T) {
+       c := DefaultConfig()
+       c.QueryMemoryMiB = -1
+       if err := c.Validate(); err == nil {
+               t.Fatal("negative QueryMemoryMiB must fail Validate")
+       }
+}
diff --git a/pkg/query/vectorized/measure/cursor.go 
b/pkg/query/vectorized/measure/cursor.go
new file mode 100644
index 000000000..bf183c1d8
--- /dev/null
+++ b/pkg/query/vectorized/measure/cursor.go
@@ -0,0 +1,166 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/pkg/query/model"
+)
+
+// seriesRow is the ephemeral per-row view exposed by SeriesCursor.NextRow.
+// The slices and pointers reference data owned by the underlying 
MeasureResult;
+// callers must not retain a row past the next NextRow call.
+type seriesRow struct {
+       tagFamilies []model.TagFamily
+       fields      []model.Field
+       timestamp   int64
+       version     int64
+       sid         common.SeriesID
+       shardID     common.ShardID
+       rowIdx      int
+}
+
+// SeriesCursor walks across MeasureResult instances yielded by a 
MeasureQueryResult,
+// presenting either single-row access (NextRow) or bulk-copy access 
(Current+Advance)
+// to BatchScan.
+//
+// Sticky-error contract: once Pull yields a result with Error != nil, the
+// cursor stores the error and every subsequent NextRow returns it. Init is
+// the only way to reset.
+type SeriesCursor struct {
+       qr        model.MeasureQueryResult
+       current   *model.MeasureResult
+       err       error
+       pos       int
+       exhausted bool
+}
+
+// Init resets the cursor and advances to the first non-empty series (or 
EOF/err).
+func (c *SeriesCursor) Init(qr model.MeasureQueryResult) {
+       c.qr = qr
+       c.current = nil
+       c.pos = 0
+       c.err = nil
+       c.exhausted = false
+       c.advanceSeries()
+}
+
+// Err returns the sticky storage error, or nil. Once non-nil it remains so
+// until Init is called again.
+func (c *SeriesCursor) Err() error { return c.err }
+
+// Exhausted reports whether the cursor has run out of input (clean EOF or
+// after an error).
+func (c *SeriesCursor) Exhausted() bool { return c.exhausted }
+
+// RemainingInSeries returns how many rows are left in the current 
MeasureResult.
+// 0 when at EOF, on error, or between series.
+func (c *SeriesCursor) RemainingInSeries() int {
+       if c.exhausted || c.current == nil {
+               return 0
+       }
+       return len(c.current.Timestamps) - c.pos
+}
+
+// Current returns the underlying MeasureResult and the cursor's position 
within
+// it. Used by BatchScan's bulk-copy path to read parallel arrays directly.
+func (c *SeriesCursor) Current() (*model.MeasureResult, int) {
+       return c.current, c.pos
+}
+
+// Advance moves the cursor n rows forward. Crosses to the next series if the
+// new position reaches the end of the current MeasureResult.
+func (c *SeriesCursor) Advance(n int) {
+       c.pos += n
+       if c.current != nil && c.pos >= len(c.current.Timestamps) {
+               c.advanceSeries()
+       }
+}
+
+// NextRow returns the next row across series boundaries.
+//
+// Returns:
+//   - (row, nil)         when a row is available;
+//   - (zero, nil)        on clean EOF;
+//   - (zero, stickyErr)  if a storage error has been observed; subsequent 
calls
+//     return the same error until Init is called again.
+func (c *SeriesCursor) NextRow() (seriesRow, error) {
+       if c.err != nil {
+               return seriesRow{}, c.err
+       }
+       if c.exhausted {
+               return seriesRow{}, nil
+       }
+       if c.current == nil || c.pos >= len(c.current.Timestamps) {
+               c.advanceSeries()
+               if c.err != nil {
+                       return seriesRow{}, c.err
+               }
+               if c.exhausted {
+                       return seriesRow{}, nil
+               }
+       }
+       row := seriesRow{
+               sid:         c.current.SID,
+               timestamp:   c.current.Timestamps[c.pos],
+               version:     c.current.Versions[c.pos],
+               rowIdx:      c.pos,
+               tagFamilies: c.current.TagFamilies,
+               fields:      c.current.Fields,
+       }
+       if c.pos < len(c.current.ShardIDs) {
+               row.shardID = c.current.ShardIDs[c.pos]
+       }
+       c.pos++
+       return row, nil
+}
+
+// Close releases the underlying MeasureQueryResult exactly once. Idempotent
+// and safe after EOF.
+func (c *SeriesCursor) Close() {
+       if c.qr != nil {
+               c.qr.Release()
+               c.qr = nil
+       }
+       c.exhausted = true
+}
+
+// advanceSeries pulls successive MeasureResults until one yields rows, EOF is
+// reached, or an error is observed. Empty results (zero timestamps) are 
skipped.
+func (c *SeriesCursor) advanceSeries() {
+       for {
+               next := c.qr.Pull()
+               if next == nil {
+                       c.exhausted = true
+                       c.current = nil
+                       return
+               }
+               if next.Error != nil {
+                       c.err = next.Error
+                       c.exhausted = true
+                       c.current = nil
+                       return
+               }
+               if len(next.Timestamps) == 0 {
+                       continue
+               }
+               c.current = next
+               c.pos = 0
+               return
+       }
+}
diff --git a/pkg/query/vectorized/measure/cursor_test.go 
b/pkg/query/vectorized/measure/cursor_test.go
new file mode 100644
index 000000000..eb6d64d8c
--- /dev/null
+++ b/pkg/query/vectorized/measure/cursor_test.go
@@ -0,0 +1,206 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "errors"
+       "testing"
+
+       "github.com/apache/skywalking-banyandb/pkg/query/model"
+)
+
+func TestSeriesCursor_Init_FirstSeriesActivated(t *testing.T) {
+       qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResult(1, 
100, 200, 300)}}
+       var c SeriesCursor
+       c.Init(qr)
+       if got := c.RemainingInSeries(); got != 3 {
+               t.Fatalf("RemainingInSeries after Init: want 3, got %d", got)
+       }
+}
+
+func TestSeriesCursor_NextRow_AdvancesWithinSeries(t *testing.T) {
+       qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResult(1, 
10, 20, 30)}}
+       var c SeriesCursor
+       c.Init(qr)
+       r1, err := c.NextRow()
+       if err != nil {
+               t.Fatal(err)
+       }
+       if r1.timestamp != 10 || r1.sid != 1 {
+               t.Fatalf("row 1: ts=%d sid=%d", r1.timestamp, r1.sid)
+       }
+       r2, _ := c.NextRow()
+       if r2.timestamp != 20 {
+               t.Fatalf("row 2: ts=%d", r2.timestamp)
+       }
+       r3, _ := c.NextRow()
+       if r3.timestamp != 30 {
+               t.Fatalf("row 3: ts=%d", r3.timestamp)
+       }
+}
+
+func TestSeriesCursor_NextRow_CrossesSeriesBoundary(t *testing.T) {
+       qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{
+               mkResult(1, 100, 200),
+               mkResult(2, 300),
+       }}
+       var c SeriesCursor
+       c.Init(qr)
+       _, _ = c.NextRow() // ts=100, sid=1
+       _, _ = c.NextRow() // ts=200, sid=1
+       r, err := c.NextRow()
+       if err != nil {
+               t.Fatal(err)
+       }
+       if r.sid != 2 || r.timestamp != 300 {
+               t.Fatalf("expected (sid=2, ts=300), got (sid=%d, ts=%d)", 
r.sid, r.timestamp)
+       }
+}
+
+func TestSeriesCursor_NextRow_SkipsEmptyMeasureResult(t *testing.T) {
+       qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{
+               mkResult(1, 100),
+               mkResult(2),       // empty — must be skipped
+               mkResult(3, 300),
+       }}
+       var c SeriesCursor
+       c.Init(qr)
+       _, _ = c.NextRow() // sid=1
+       r, _ := c.NextRow()
+       if r.sid != 3 || r.timestamp != 300 {
+               t.Fatalf("expected to skip empty series 2; got sid=%d ts=%d", 
r.sid, r.timestamp)
+       }
+}
+
+func TestSeriesCursor_NextRow_EOF_ReturnsZeroValueAndNilErr(t *testing.T) {
+       qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResult(1, 
100)}}
+       var c SeriesCursor
+       c.Init(qr)
+       _, _ = c.NextRow()
+       r, err := c.NextRow()
+       if err != nil {
+               t.Fatalf("EOF must return nil err, got %v", err)
+       }
+       if r.timestamp != 0 || r.sid != 0 {
+               t.Fatalf("EOF must return zero seriesRow, got %+v", r)
+       }
+}
+
+func TestSeriesCursor_StorageError_StickyOnFirstCall(t *testing.T) {
+       boom := errors.New("storage boom")
+       qr := &fakeMeasureQueryResult{seq: 
[]*model.MeasureResult{mkResultErr(boom)}}
+       var c SeriesCursor
+       c.Init(qr)
+       _, err := c.NextRow()
+       if !errors.Is(err, boom) {
+               t.Fatalf("first NextRow must surface storage error, got %v", 
err)
+       }
+}
+
+// R1-critical — sticky-error contract. The previous draft of this code lost
+// the error and silently returned EOF. This test ensures the regression 
cannot recur.
+func TestSeriesCursor_StorageError_StickyOnSubsequentCalls(t *testing.T) {
+       boom := errors.New("storage boom")
+       qr := &fakeMeasureQueryResult{seq: 
[]*model.MeasureResult{mkResultErr(boom)}}
+       var c SeriesCursor
+       c.Init(qr)
+       for i := range 3 {
+               _, err := c.NextRow()
+               if !errors.Is(err, boom) {
+                       t.Fatalf("call %d: storage error must remain sticky, 
got %v", i, err)
+               }
+       }
+}
+
+func TestSeriesCursor_RemainingInSeries_DropsAsPosAdvances(t *testing.T) {
+       qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResult(1, 
10, 20, 30)}}
+       var c SeriesCursor
+       c.Init(qr)
+       if c.RemainingInSeries() != 3 {
+               t.Fatalf("initial RemainingInSeries: want 3, got %d", 
c.RemainingInSeries())
+       }
+       _, _ = c.NextRow()
+       if c.RemainingInSeries() != 2 {
+               t.Fatalf("after 1 NextRow: want 2, got %d", 
c.RemainingInSeries())
+       }
+       _, _ = c.NextRow()
+       if c.RemainingInSeries() != 1 {
+               t.Fatalf("after 2 NextRows: want 1, got %d", 
c.RemainingInSeries())
+       }
+}
+
+func TestSeriesCursor_Current_ReturnsCurrentResultAndPos(t *testing.T) {
+       qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResult(7, 
11, 22, 33)}}
+       var c SeriesCursor
+       c.Init(qr)
+       cur, pos := c.Current()
+       if cur == nil || cur.SID != 7 || pos != 0 {
+               t.Fatalf("Current after Init: got cur=%v pos=%d", cur, pos)
+       }
+       _, _ = c.NextRow()
+       cur, pos = c.Current()
+       if cur == nil || pos != 1 {
+               t.Fatalf("Current after 1 NextRow: pos=%d", pos)
+       }
+}
+
+func TestSeriesCursor_Advance_CrossesSeriesWhenPosReachesEnd(t *testing.T) {
+       qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{
+               mkResult(1, 100, 200, 300),
+               mkResult(2, 400),
+       }}
+       var c SeriesCursor
+       c.Init(qr)
+       c.Advance(3) // exhaust series 1
+       r, err := c.NextRow()
+       if err != nil {
+               t.Fatal(err)
+       }
+       if r.sid != 2 || r.timestamp != 400 {
+               t.Fatalf("after Advance(3) we should be at series 2: got sid=%d 
ts=%d", r.sid, r.timestamp)
+       }
+}
+
+func TestSeriesCursor_Close_Idempotent_SafeAfterEOF(t *testing.T) {
+       qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResult(1, 
100)}}
+       var c SeriesCursor
+       c.Init(qr)
+       _, _ = c.NextRow()
+       _, _ = c.NextRow() // EOF
+       defer func() {
+               if r := recover(); r != nil {
+                       t.Fatalf("Close must not panic: %v", r)
+               }
+       }()
+       c.Close()
+       c.Close()
+}
+
+func TestSeriesCursor_Close_ReleasesUnderlyingMeasureQueryResult(t *testing.T) 
{
+       qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResult(1, 
100)}}
+       var c SeriesCursor
+       c.Init(qr)
+       c.Close()
+       if qr.releaseCnt != 1 {
+               t.Fatalf("Release must be called exactly once on first Close: 
got %d", qr.releaseCnt)
+       }
+       c.Close() // idempotent — must not double-release
+       if qr.releaseCnt != 1 {
+               t.Fatalf("second Close must not re-release: got %d", 
qr.releaseCnt)
+       }
+}
diff --git a/pkg/query/vectorized/measure/doc.go 
b/pkg/query/vectorized/measure/doc.go
new file mode 100644
index 000000000..7eb6e6575
--- /dev/null
+++ b/pkg/query/vectorized/measure/doc.go
@@ -0,0 +1,20 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package measure implements measure-specific vectorized operators (scan,
+// cursor, extract, limit, group-by, aggregation, top, output serialization).
+package measure
diff --git a/pkg/query/vectorized/measure/extract.go 
b/pkg/query/vectorized/measure/extract.go
new file mode 100644
index 000000000..5673b407c
--- /dev/null
+++ b/pkg/query/vectorized/measure/extract.go
@@ -0,0 +1,171 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "fmt"
+       "slices"
+
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+)
+
+// extractTagRow writes one already-decoded *modelv1.TagValue into row rowIdx
+// of col. The row must already exist in the column (length >= rowIdx+1);
+// extract does not grow the column.
+//
+// Slice-typed values (BinaryData, IntArray, StrArray) are defensively copied
+// because the storage layer may reuse the protobuf objects between Pull()
+// calls; storing slices by reference would alias prior batches into later 
ones.
+//
+// An unknown TagValue oneof variant returns an error — silent null fallback
+// would mask schema-evolution bugs.
+func extractTagRow(col vectorized.Column, rowIdx int, tv *modelv1.TagValue) 
error {
+       if tv == nil {
+               return fmt.Errorf("vectorized.measure: nil TagValue at row %d", 
rowIdx)
+       }
+       switch v := tv.Value.(type) {
+       case *modelv1.TagValue_Null:
+               col.MarkNullAt(rowIdx)
+               return nil
+       case *modelv1.TagValue_Int:
+               c, ok := col.(*vectorized.TypedColumn[int64])
+               if !ok {
+                       return columnTypeMismatch(col, "int64", rowIdx)
+               }
+               c.Data()[rowIdx] = v.Int.GetValue()
+               return nil
+       case *modelv1.TagValue_Str:
+               c, ok := col.(*vectorized.TypedColumn[string])
+               if !ok {
+                       return columnTypeMismatch(col, "string", rowIdx)
+               }
+               c.Data()[rowIdx] = v.Str.GetValue()
+               return nil
+       case *modelv1.TagValue_BinaryData:
+               c, ok := col.(*vectorized.TypedColumn[[]byte])
+               if !ok {
+                       return columnTypeMismatch(col, "bytes", rowIdx)
+               }
+               buf := make([]byte, len(v.BinaryData))
+               copy(buf, v.BinaryData)
+               c.Data()[rowIdx] = buf
+               return nil
+       case *modelv1.TagValue_IntArray:
+               c, ok := col.(*vectorized.TypedColumn[[]int64])
+               if !ok {
+                       return columnTypeMismatch(col, "int64[]", rowIdx)
+               }
+               c.Data()[rowIdx] = slices.Clone(v.IntArray.GetValue())
+               return nil
+       case *modelv1.TagValue_StrArray:
+               c, ok := col.(*vectorized.TypedColumn[[]string])
+               if !ok {
+                       return columnTypeMismatch(col, "string[]", rowIdx)
+               }
+               c.Data()[rowIdx] = slices.Clone(v.StrArray.GetValue())
+               return nil
+       case *modelv1.TagValue_Timestamp:
+               c, ok := col.(*vectorized.TypedColumn[int64])
+               if !ok {
+                       return columnTypeMismatch(col, "int64", rowIdx)
+               }
+               c.Data()[rowIdx] = v.Timestamp.AsTime().UnixNano()
+               return nil
+       default:
+               return fmt.Errorf("vectorized.measure: unsupported TagValue 
variant %T at row %d", tv.Value, rowIdx)
+       }
+}
+
+// extractFieldRow is the field-side counterpart of extractTagRow. Same rules
+// for defensive copies and unknown variants.
+func extractFieldRow(col vectorized.Column, rowIdx int, fv 
*modelv1.FieldValue) error {
+       if fv == nil {
+               return fmt.Errorf("vectorized.measure: nil FieldValue at row 
%d", rowIdx)
+       }
+       switch v := fv.Value.(type) {
+       case *modelv1.FieldValue_Null:
+               col.MarkNullAt(rowIdx)
+               return nil
+       case *modelv1.FieldValue_Int:
+               c, ok := col.(*vectorized.TypedColumn[int64])
+               if !ok {
+                       return columnTypeMismatch(col, "int64", rowIdx)
+               }
+               c.Data()[rowIdx] = v.Int.GetValue()
+               return nil
+       case *modelv1.FieldValue_Float:
+               c, ok := col.(*vectorized.TypedColumn[float64])
+               if !ok {
+                       return columnTypeMismatch(col, "float64", rowIdx)
+               }
+               c.Data()[rowIdx] = v.Float.GetValue()
+               return nil
+       case *modelv1.FieldValue_Str:
+               c, ok := col.(*vectorized.TypedColumn[string])
+               if !ok {
+                       return columnTypeMismatch(col, "string", rowIdx)
+               }
+               c.Data()[rowIdx] = v.Str.GetValue()
+               return nil
+       case *modelv1.FieldValue_BinaryData:
+               c, ok := col.(*vectorized.TypedColumn[[]byte])
+               if !ok {
+                       return columnTypeMismatch(col, "bytes", rowIdx)
+               }
+               buf := make([]byte, len(v.BinaryData))
+               copy(buf, v.BinaryData)
+               c.Data()[rowIdx] = buf
+               return nil
+       default:
+               return fmt.Errorf("vectorized.measure: unsupported FieldValue 
variant %T at row %d", fv.Value, rowIdx)
+       }
+}
+
+// extractTagBulk extracts n tag values starting at src[0] into 
col[offset..offset+n).
+// The v1 implementation is a tight per-row loop; future SIMD-friendly fast 
paths
+// can specialize this signature without changing callers.
+func extractTagBulk(col vectorized.Column, offset int, src 
[]*modelv1.TagValue, n int) error {
+       if n > len(src) {
+               return fmt.Errorf("vectorized.measure: extractTagBulk n=%d > 
len(src)=%d", n, len(src))
+       }
+       for i := range n {
+               if extractErr := extractTagRow(col, offset+i, src[i]); 
extractErr != nil {
+                       return extractErr
+               }
+       }
+       return nil
+}
+
+// extractFieldBulk is the field-side counterpart of extractTagBulk.
+func extractFieldBulk(col vectorized.Column, offset int, src 
[]*modelv1.FieldValue, n int) error {
+       if n > len(src) {
+               return fmt.Errorf("vectorized.measure: extractFieldBulk n=%d > 
len(src)=%d", n, len(src))
+       }
+       for i := range n {
+               if extractErr := extractFieldRow(col, offset+i, src[i]); 
extractErr != nil {
+                       return extractErr
+               }
+       }
+       return nil
+}
+
+func columnTypeMismatch(col vectorized.Column, want string, row int) error {
+       return fmt.Errorf("vectorized.measure: column type %s, value type %s, 
row %d",
+               col.Type(), want, row)
+}
diff --git a/pkg/query/vectorized/measure/extract_test.go 
b/pkg/query/vectorized/measure/extract_test.go
new file mode 100644
index 000000000..76b146a59
--- /dev/null
+++ b/pkg/query/vectorized/measure/extract_test.go
@@ -0,0 +1,353 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "bytes"
+       "testing"
+       "time"
+
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+)
+
+// Test helpers — minimal constructors for *modelv1.TagValue / 
*modelv1.FieldValue.
+
+func tvNull() *modelv1.TagValue { return &modelv1.TagValue{Value: 
&modelv1.TagValue_Null{}} }
+
+func tvInt(v int64) *modelv1.TagValue {
+       return &modelv1.TagValue{Value: &modelv1.TagValue_Int{Int: 
&modelv1.Int{Value: v}}}
+}
+
+func tvStr(v string) *modelv1.TagValue {
+       return &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: 
&modelv1.Str{Value: v}}}
+}
+
+func tvBytes(v []byte) *modelv1.TagValue {
+       return &modelv1.TagValue{Value: 
&modelv1.TagValue_BinaryData{BinaryData: v}}
+}
+
+func tvIntArr(v []int64) *modelv1.TagValue {
+       return &modelv1.TagValue{Value: &modelv1.TagValue_IntArray{IntArray: 
&modelv1.IntArray{Value: v}}}
+}
+
+func tvStrArr(v []string) *modelv1.TagValue {
+       return &modelv1.TagValue{Value: &modelv1.TagValue_StrArray{StrArray: 
&modelv1.StrArray{Value: v}}}
+}
+
+func tvTimestamp(ts time.Time) *modelv1.TagValue {
+       return &modelv1.TagValue{Value: &modelv1.TagValue_Timestamp{Timestamp: 
timestamppb.New(ts)}}
+}
+
+func fvNull() *modelv1.FieldValue {
+       return &modelv1.FieldValue{Value: &modelv1.FieldValue_Null{}}
+}
+
+func fvInt(v int64) *modelv1.FieldValue {
+       return &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: 
&modelv1.Int{Value: v}}}
+}
+
+func fvFloat(v float64) *modelv1.FieldValue {
+       return &modelv1.FieldValue{Value: &modelv1.FieldValue_Float{Float: 
&modelv1.Float{Value: v}}}
+}
+
+func fvStr(v string) *modelv1.FieldValue {
+       return &modelv1.FieldValue{Value: &modelv1.FieldValue_Str{Str: 
&modelv1.Str{Value: v}}}
+}
+
+func fvBytes(v []byte) *modelv1.FieldValue {
+       return &modelv1.FieldValue{Value: 
&modelv1.FieldValue_BinaryData{BinaryData: v}}
+}
+
+// preallocInt64 grows an Int64 column by n zero rows so extractTagRow can
+// write into pre-existing slots (mirrors how BatchScan pre-grows columns
+// before calling per-row extractors at series boundaries).
+func preallocInt64(c *vectorized.TypedColumn[int64], n int) {
+       for range n {
+               c.Append(0)
+       }
+}
+
+// === extractTagRow positives ===
+
+func TestExtractTagRow_Int_WritesValue(t *testing.T) {
+       col := vectorized.NewInt64Column(4)
+       preallocInt64(col, 3)
+       if err := extractTagRow(col, 1, tvInt(42)); err != nil {
+               t.Fatal(err)
+       }
+       if col.Data()[1] != 42 {
+               t.Fatalf("row 1: got %d, want 42", col.Data()[1])
+       }
+       if col.IsNull(1) {
+               t.Fatal("row 1 must not be null after writing valid int")
+       }
+}
+
+func TestExtractTagRow_Str_WritesValue(t *testing.T) {
+       col := vectorized.NewStringColumn(4)
+       col.Append("")
+       col.Append("")
+       if err := extractTagRow(col, 0, tvStr("hello")); err != nil {
+               t.Fatal(err)
+       }
+       if col.Data()[0] != "hello" {
+               t.Fatalf("row 0: got %q, want hello", col.Data()[0])
+       }
+}
+
+func TestExtractTagRow_Timestamp_WritesUnixNano(t *testing.T) {
+       ts := time.Date(2026, 5, 3, 12, 34, 56, 789, time.UTC)
+       col := vectorized.NewInt64Column(2)
+       preallocInt64(col, 1)
+       if err := extractTagRow(col, 0, tvTimestamp(ts)); err != nil {
+               t.Fatal(err)
+       }
+       if got := col.Data()[0]; got != ts.UnixNano() {
+               t.Fatalf("timestamp roundtrip: got %d, want %d", got, 
ts.UnixNano())
+       }
+}
+
+// === extractTagRow defensive-copy parity-killers ===
+
+func TestExtractTagRow_BinaryData_DefensiveCopy(t *testing.T) {
+       src := []byte("abc")
+       tv := tvBytes(src)
+       col := vectorized.NewBytesColumn(2)
+       col.Append(nil)
+       if err := extractTagRow(col, 0, tv); err != nil {
+               t.Fatal(err)
+       }
+       src[0] = 'z' // mutate source AFTER extract
+       if !bytes.Equal(col.Data()[0], []byte("abc")) {
+               t.Fatalf("aliasing detected: column observed mutation of 
source, got %q", col.Data()[0])
+       }
+}
+
+func TestExtractTagRow_IntArray_DefensiveCopy(t *testing.T) {
+       src := []int64{1, 2, 3}
+       tv := tvIntArr(src)
+       col := vectorized.NewInt64ArrayColumn(2)
+       col.Append(nil)
+       if err := extractTagRow(col, 0, tv); err != nil {
+               t.Fatal(err)
+       }
+       src[0] = 99
+       if col.Data()[0][0] != 1 {
+               t.Fatalf("aliasing detected: column observed source mutation, 
got %v", col.Data()[0])
+       }
+}
+
+func TestExtractTagRow_StrArray_DefensiveCopy(t *testing.T) {
+       src := []string{"a", "b"}
+       tv := tvStrArr(src)
+       col := vectorized.NewStrArrayColumn(2)
+       col.Append(nil)
+       if err := extractTagRow(col, 0, tv); err != nil {
+               t.Fatal(err)
+       }
+       src[0] = "z"
+       if col.Data()[0][0] != "a" {
+               t.Fatalf("aliasing detected: column observed source mutation, 
got %v", col.Data()[0])
+       }
+}
+
+// === extractTagRow Null ===
+
+func TestExtractTagRow_Null_MarksValidityWithoutGrowingLen(t *testing.T) {
+       col := vectorized.NewInt64Column(4)
+       preallocInt64(col, 3)
+       beforeLen := col.Len()
+       if err := extractTagRow(col, 1, tvNull()); err != nil {
+               t.Fatal(err)
+       }
+       if col.Len() != beforeLen {
+               t.Fatalf("Len must not change on Null extract: before=%d 
after=%d", beforeLen, col.Len())
+       }
+       if !col.IsNull(1) {
+               t.Fatal("row 1 must be null")
+       }
+}
+
+// === extractTagRow error paths ===
+
+func TestExtractTagRow_UnknownVariant_ReturnsError(t *testing.T) {
+       tv := &modelv1.TagValue{} // Value is nil — no oneof set
+       col := vectorized.NewInt64Column(1)
+       preallocInt64(col, 1)
+       if err := extractTagRow(col, 0, tv); err == nil {
+               t.Fatal("unknown TagValue variant must return error (silent 
null fallback would mask schema-evolution bugs)")
+       }
+}
+
+func TestExtractTagRow_NilTagValue_ReturnsError(t *testing.T) {
+       col := vectorized.NewInt64Column(1)
+       preallocInt64(col, 1)
+       if err := extractTagRow(col, 0, nil); err == nil {
+               t.Fatal("nil TagValue must return error")
+       }
+}
+
+func TestExtractTagRow_ColumnTypeMismatch_ReturnsError(t *testing.T) {
+       intCol := vectorized.NewInt64Column(1)
+       preallocInt64(intCol, 1)
+       if err := extractTagRow(intCol, 0, tvStr("oops")); err == nil {
+               t.Fatal("string TagValue into int64 column must return error")
+       }
+}
+
+// === extractFieldRow positives ===
+
+func TestExtractFieldRow_Int_WritesValue(t *testing.T) {
+       col := vectorized.NewInt64Column(2)
+       preallocInt64(col, 1)
+       if err := extractFieldRow(col, 0, fvInt(7)); err != nil {
+               t.Fatal(err)
+       }
+       if col.Data()[0] != 7 {
+               t.Fatalf("got %d, want 7", col.Data()[0])
+       }
+}
+
+func TestExtractFieldRow_Float_WritesValue(t *testing.T) {
+       col := vectorized.NewFloat64Column(1)
+       col.Append(0)
+       if err := extractFieldRow(col, 0, fvFloat(3.14)); err != nil {
+               t.Fatal(err)
+       }
+       if col.Data()[0] != 3.14 {
+               t.Fatalf("got %v, want 3.14", col.Data()[0])
+       }
+}
+
+func TestExtractFieldRow_Str_WritesValue(t *testing.T) {
+       col := vectorized.NewStringColumn(1)
+       col.Append("")
+       if err := extractFieldRow(col, 0, fvStr("hi")); err != nil {
+               t.Fatal(err)
+       }
+       if col.Data()[0] != "hi" {
+               t.Fatal("string field roundtrip failed")
+       }
+}
+
+func TestExtractFieldRow_BinaryData_DefensiveCopy(t *testing.T) {
+       src := []byte("xyz")
+       fv := fvBytes(src)
+       col := vectorized.NewBytesColumn(1)
+       col.Append(nil)
+       if err := extractFieldRow(col, 0, fv); err != nil {
+               t.Fatal(err)
+       }
+       src[0] = 'q'
+       if !bytes.Equal(col.Data()[0], []byte("xyz")) {
+               t.Fatalf("aliasing detected on FieldValue_BinaryData: %q", 
col.Data()[0])
+       }
+}
+
+func TestExtractFieldRow_Null_MarksValidity(t *testing.T) {
+       col := vectorized.NewInt64Column(1)
+       preallocInt64(col, 1)
+       if err := extractFieldRow(col, 0, fvNull()); err != nil {
+               t.Fatal(err)
+       }
+       if !col.IsNull(0) {
+               t.Fatal("Null FieldValue must mark column null")
+       }
+}
+
+// === extractFieldRow error paths ===
+
+func TestExtractFieldRow_UnknownVariant_ReturnsError(t *testing.T) {
+       fv := &modelv1.FieldValue{}
+       col := vectorized.NewInt64Column(1)
+       preallocInt64(col, 1)
+       if err := extractFieldRow(col, 0, fv); err == nil {
+               t.Fatal("unknown FieldValue variant must return error")
+       }
+}
+
+func TestExtractFieldRow_NilFieldValue_ReturnsError(t *testing.T) {
+       col := vectorized.NewInt64Column(1)
+       preallocInt64(col, 1)
+       if err := extractFieldRow(col, 0, nil); err == nil {
+               t.Fatal("nil FieldValue must return error")
+       }
+}
+
+func TestExtractFieldRow_ColumnTypeMismatch_ReturnsError(t *testing.T) {
+       intCol := vectorized.NewInt64Column(1)
+       preallocInt64(intCol, 1)
+       if err := extractFieldRow(intCol, 0, fvFloat(1.5)); err == nil {
+               t.Fatal("Float FieldValue into int64 column must return error")
+       }
+}
+
+// === extract*Bulk ===
+
+func TestExtractTagBulk_HappyPath(t *testing.T) {
+       col := vectorized.NewStringColumn(4)
+       for range 4 {
+               col.Append("")
+       }
+       src := []*modelv1.TagValue{tvStr("a"), tvStr("b"), tvStr("c"), 
tvStr("d")}
+       if err := extractTagBulk(col, 0, src, 4); err != nil {
+               t.Fatal(err)
+       }
+       for i, want := range []string{"a", "b", "c", "d"} {
+               if col.Data()[i] != want {
+                       t.Fatalf("row %d: got %q, want %q", i, col.Data()[i], 
want)
+               }
+       }
+}
+
+func TestExtractTagBulk_NTooLarge_ReturnsError(t *testing.T) {
+       col := vectorized.NewStringColumn(4)
+       for range 4 {
+               col.Append("")
+       }
+       src := []*modelv1.TagValue{tvStr("a")}
+       if err := extractTagBulk(col, 0, src, 5); err == nil {
+               t.Fatal("n > len(src) must return error")
+       }
+}
+
+func TestExtractFieldBulk_HappyPath(t *testing.T) {
+       col := vectorized.NewInt64Column(3)
+       preallocInt64(col, 3)
+       src := []*modelv1.FieldValue{fvInt(10), fvInt(20), fvInt(30)}
+       if err := extractFieldBulk(col, 0, src, 3); err != nil {
+               t.Fatal(err)
+       }
+       for i, want := range []int64{10, 20, 30} {
+               if col.Data()[i] != want {
+                       t.Fatalf("row %d: got %d, want %d", i, col.Data()[i], 
want)
+               }
+       }
+}
+
+func TestExtractFieldBulk_NTooLarge_ReturnsError(t *testing.T) {
+       col := vectorized.NewInt64Column(2)
+       preallocInt64(col, 2)
+       src := []*modelv1.FieldValue{fvInt(1)}
+       if err := extractFieldBulk(col, 0, src, 2); err == nil {
+               t.Fatal("n > len(src) must return error")
+       }
+}
diff --git a/pkg/query/vectorized/measure/fixtures_test.go 
b/pkg/query/vectorized/measure/fixtures_test.go
new file mode 100644
index 000000000..6b15a2d2c
--- /dev/null
+++ b/pkg/query/vectorized/measure/fixtures_test.go
@@ -0,0 +1,61 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/pkg/query/model"
+)
+
+// fakeMeasureQueryResult yields a hand-built sequence of MeasureResults.
+// Mirrors the shape produced by banyand/measure/query.go's queryResult.Pull.
+type fakeMeasureQueryResult struct {
+       seq        []*model.MeasureResult
+       idx        int
+       releaseCnt int
+}
+
+func (f *fakeMeasureQueryResult) Pull() *model.MeasureResult {
+       if f.idx >= len(f.seq) {
+               return nil
+       }
+       r := f.seq[f.idx]
+       f.idx++
+       return r
+}
+
+func (f *fakeMeasureQueryResult) Release() {
+       f.releaseCnt++
+}
+
+// mkResult constructs a minimal MeasureResult with parallel arrays sized to
+// match ts. Versions and ShardIDs are zero-valued; tags and fields are empty.
+func mkResult(sid common.SeriesID, ts ...int64) *model.MeasureResult {
+       return &model.MeasureResult{
+               SID:        sid,
+               Timestamps: ts,
+               Versions:   make([]int64, len(ts)),
+               ShardIDs:   make([]common.ShardID, len(ts)),
+       }
+}
+
+// mkResultErr constructs a MeasureResult carrying a storage error.
+// MeasureQueryResult.Pull may return such results to signal pull-time 
failures.
+func mkResultErr(err error) *model.MeasureResult {
+       return &model.MeasureResult{Error: err}
+}
diff --git a/pkg/query/vectorized/measure/scan.go 
b/pkg/query/vectorized/measure/scan.go
new file mode 100644
index 000000000..6f06b6d1e
--- /dev/null
+++ b/pkg/query/vectorized/measure/scan.go
@@ -0,0 +1,233 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "context"
+
+       "github.com/apache/skywalking-banyandb/pkg/query/model"
+       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+)
+
+// BatchScan is the v1 PullOperator that wraps a MeasureQueryResult and
+// produces RecordBatches. It uses a SeriesCursor to manage cross-series
+// boundaries and bulk-extracts metadata, tags, and fields per series fill.
+type BatchScan struct {
+       qr        model.MeasureQueryResult
+       schema    *vectorized.BatchSchema
+       pool      *vectorized.BatchPool
+       cursor    SeriesCursor
+       batchSize int
+       closed    bool
+}
+
+// NewBatchScan returns a BatchScan; call Init before NextBatch.
+func NewBatchScan(qr model.MeasureQueryResult, schema *vectorized.BatchSchema,
+       pool *vectorized.BatchPool, batchSize int,
+) *BatchScan {
+       return &BatchScan{qr: qr, schema: schema, pool: pool, batchSize: 
batchSize}
+}
+
+// Init prepares the cursor and pulls the first non-empty MeasureResult.
+func (s *BatchScan) Init(_ context.Context) error {
+       s.cursor.Init(s.qr)
+       return nil
+}
+
+// OutputSchema returns the schema declared at construction.
+func (s *BatchScan) OutputSchema() *vectorized.BatchSchema { return s.schema }
+
+// Close releases the underlying cursor exactly once.
+func (s *BatchScan) Close() error {
+       if s.closed {
+               return nil
+       }
+       s.closed = true
+       s.cursor.Close()
+       return nil
+}
+
+// NextBatch fills a fresh batch up to batchSize rows. Returns:
+//   - (batch, nil) for a valid batch with Len > 0;
+//   - (nil, nil) for clean EOF;
+//   - (nil, err) for storage error (the partial batch is dropped to GC).
+func (s *BatchScan) NextBatch(_ context.Context) (*vectorized.RecordBatch, 
error) {
+       b := s.pool.Get()
+       for b.Len < s.batchSize && !s.cursor.Exhausted() && s.cursor.Err() == 
nil {
+               remaining := s.cursor.RemainingInSeries()
+               if remaining == 0 {
+                       break
+               }
+               n := remaining
+               if avail := s.batchSize - b.Len; n > avail {
+                       n = avail
+               }
+               if fillErr := s.fillFromCurrent(b, n); fillErr != nil {
+                       // R3-style discard: do not return b to the pool.
+                       return nil, fillErr
+               }
+               b.Len += n
+               s.cursor.Advance(n)
+               // Advance may trigger an internal advanceSeries that observes a
+               // storage error on the next MeasureResult. Discard the partial
+               // batch immediately so the error is never silently buried — the
+               // caller must learn about the failure on this call, not the 
next.
+               if cursorErr := s.cursor.Err(); cursorErr != nil {
+                       return nil, cursorErr
+               }
+       }
+       if b.Len == 0 {
+               s.pool.Put(b)
+               if cursorErr := s.cursor.Err(); cursorErr != nil {
+                       return nil, cursorErr
+               }
+               return nil, nil
+       }
+       return b, nil
+}
+
+// fillFromCurrent copies n rows from the cursor's current MeasureResult into
+// b starting at b.Len. Caller must ensure n <= cursor.RemainingInSeries().
+func (s *BatchScan) fillFromCurrent(b *vectorized.RecordBatch, n int) error {
+       cur, pos := s.cursor.Current()
+       offset := b.Len
+       if fillErr := fillMetadata(b, s.schema, cur, pos, n); fillErr != nil {
+               return fillErr
+       }
+       if fillErr := fillTags(b, s.schema, cur, pos, offset, n); fillErr != 
nil {
+               return fillErr
+       }
+       if fillErr := fillFields(b, s.schema, cur, pos, offset, n); fillErr != 
nil {
+               return fillErr
+       }
+       return nil
+}
+
+// fillMetadata populates the timestamp, version, series-id, and shard-id
+// columns for n rows starting at pos. Uses copy() for parallel int64 arrays;
+// SID is a constant per series and is filled with n repetitions.
+func fillMetadata(b *vectorized.RecordBatch, schema *vectorized.BatchSchema,
+       cur *model.MeasureResult, pos, n int,
+) error {
+       offset := b.Len
+       if i := schema.TimestampIndex(); i >= 0 {
+               c := b.Columns[i].(*vectorized.TypedColumn[int64])
+               appendInt64Zeros(c, n)
+               copy(c.Data()[offset:offset+n], cur.Timestamps[pos:pos+n])
+       }
+       if i := schema.VersionIndex(); i >= 0 {
+               c := b.Columns[i].(*vectorized.TypedColumn[int64])
+               appendInt64Zeros(c, n)
+               copy(c.Data()[offset:offset+n], cur.Versions[pos:pos+n])
+       }
+       if i := schema.SeriesIDIndex(); i >= 0 {
+               c := b.Columns[i].(*vectorized.TypedColumn[int64])
+               sid := int64(cur.SID)
+               for range n {
+                       c.Append(sid)
+               }
+       }
+       if i := schema.ShardIDIndex(); i >= 0 {
+               c := b.Columns[i].(*vectorized.TypedColumn[int64])
+               for k := range n {
+                       var v int64
+                       if pos+k < len(cur.ShardIDs) {
+                               v = int64(cur.ShardIDs[pos+k])
+                       }
+                       c.Append(v)
+               }
+       }
+       return nil
+}
+
+// fillTags extracts every tag column for n rows starting at pos into the batch
+// at offset. Tag families and tag names are matched against the BatchSchema;
+// unmatched tags in cur are skipped.
+func fillTags(b *vectorized.RecordBatch, schema *vectorized.BatchSchema,
+       cur *model.MeasureResult, pos, offset, n int,
+) error {
+       for _, tf := range cur.TagFamilies {
+               for _, tag := range tf.Tags {
+                       colIdx, ok := schema.TagIndex(tf.Name, tag.Name)
+                       if !ok {
+                               continue
+                       }
+                       col := b.Columns[colIdx]
+                       growColumn(col, n)
+                       if extractErr := extractTagBulk(col, offset, 
tag.Values[pos:pos+n], n); extractErr != nil {
+                               return extractErr
+                       }
+               }
+       }
+       return nil
+}
+
+// fillFields is the field-side counterpart of fillTags.
+func fillFields(b *vectorized.RecordBatch, schema *vectorized.BatchSchema,
+       cur *model.MeasureResult, pos, offset, n int,
+) error {
+       for _, f := range cur.Fields {
+               colIdx, ok := schema.FieldIndex(f.Name)
+               if !ok {
+                       continue
+               }
+               col := b.Columns[colIdx]
+               growColumn(col, n)
+               if extractErr := extractFieldBulk(col, offset, 
f.Values[pos:pos+n], n); extractErr != nil {
+                       return extractErr
+               }
+       }
+       return nil
+}
+
+func appendInt64Zeros(c *vectorized.TypedColumn[int64], n int) {
+       for range n {
+               c.Append(0)
+       }
+}
+
+// growColumn appends n zero-valued rows so subsequent extract* calls have
+// pre-existing slots to overwrite.
+func growColumn(col vectorized.Column, n int) {
+       switch c := col.(type) {
+       case *vectorized.TypedColumn[int64]:
+               for range n {
+                       c.Append(0)
+               }
+       case *vectorized.TypedColumn[float64]:
+               for range n {
+                       c.Append(0)
+               }
+       case *vectorized.TypedColumn[string]:
+               for range n {
+                       c.Append("")
+               }
+       case *vectorized.TypedColumn[[]byte]:
+               for range n {
+                       c.Append(nil)
+               }
+       case *vectorized.TypedColumn[[]int64]:
+               for range n {
+                       c.Append(nil)
+               }
+       case *vectorized.TypedColumn[[]string]:
+               for range n {
+                       c.Append(nil)
+               }
+       }
+}
diff --git a/pkg/query/vectorized/measure/scan_test.go 
b/pkg/query/vectorized/measure/scan_test.go
new file mode 100644
index 000000000..76f67f1fc
--- /dev/null
+++ b/pkg/query/vectorized/measure/scan_test.go
@@ -0,0 +1,255 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "context"
+       "errors"
+       "testing"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/model"
+       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+)
+
+func minimalSchema() *vectorized.BatchSchema {
+       return vectorized.NewBatchSchema([]vectorized.ColumnDef{
+               {Role: vectorized.RoleTimestamp, Type: 
vectorized.ColumnTypeInt64},
+               {Role: vectorized.RoleVersion, Type: 
vectorized.ColumnTypeInt64},
+               {Role: vectorized.RoleSeriesID, Type: 
vectorized.ColumnTypeInt64},
+       })
+}
+
+func newScan(qr model.MeasureQueryResult, schema *vectorized.BatchSchema, 
batchSize int) *BatchScan {
+       pool := vectorized.NewBatchPool(schema, batchSize)
+       return NewBatchScan(qr, schema, pool, batchSize)
+}
+
+func TestBatchScan_SingleSeries_FillsBatchViaBulkPath(t *testing.T) {
+       qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResult(1, 
100, 200, 300)}}
+       schema := minimalSchema()
+       scan := newScan(qr, schema, 4)
+       if err := scan.Init(context.Background()); err != nil {
+               t.Fatal(err)
+       }
+       defer scan.Close()
+       b, err := scan.NextBatch(context.Background())
+       if err != nil || b == nil {
+               t.Fatalf("err=%v b=%v", err, b)
+       }
+       if b.Len != 3 {
+               t.Fatalf("Len: want 3, got %d", b.Len)
+       }
+       ts := b.Columns[0].(*vectorized.TypedColumn[int64]).Data()
+       for i, want := range []int64{100, 200, 300} {
+               if ts[i] != want {
+                       t.Fatalf("ts[%d]: got %d, want %d", i, ts[i], want)
+               }
+       }
+}
+
+func TestBatchScan_MultiSeries_CrossingBoundaryUsesPerRowPath(t *testing.T) {
+       qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{
+               mkResult(1, 1, 2),
+               mkResult(2, 3, 4, 5),
+       }}
+       schema := minimalSchema()
+       scan := newScan(qr, schema, 4)
+       _ = scan.Init(context.Background())
+       defer scan.Close()
+       b, err := scan.NextBatch(context.Background())
+       if err != nil {
+               t.Fatal(err)
+       }
+       if b.Len != 4 {
+               t.Fatalf("first batch Len: want 4, got %d", b.Len)
+       }
+       sids := b.Columns[2].(*vectorized.TypedColumn[int64]).Data()
+       want := []int64{1, 1, 2, 2}
+       for i := range want {
+               if sids[i] != want[i] {
+                       t.Fatalf("sid[%d]: got %d, want %d", i, sids[i], 
want[i])
+               }
+       }
+       b2, _ := scan.NextBatch(context.Background())
+       if b2.Len != 1 || 
b2.Columns[2].(*vectorized.TypedColumn[int64]).Data()[0] != 2 {
+               t.Fatalf("second batch: Len=%d sid[0]=%d", b2.Len, 
b2.Columns[2].(*vectorized.TypedColumn[int64]).Data()[0])
+       }
+}
+
+func TestBatchScan_BatchSizeExceedsTotalRows_OneBatchThenEOF(t *testing.T) {
+       qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResult(1, 
10, 20)}}
+       schema := minimalSchema()
+       scan := newScan(qr, schema, 16)
+       _ = scan.Init(context.Background())
+       defer scan.Close()
+       b, _ := scan.NextBatch(context.Background())
+       if b.Len != 2 {
+               t.Fatalf("Len: want 2, got %d", b.Len)
+       }
+       eof, err := scan.NextBatch(context.Background())
+       if err != nil || eof != nil {
+               t.Fatalf("expected EOF, err=%v eof=%v", err, eof)
+       }
+}
+
+func TestBatchScan_BatchSizeBelowSeriesSize_MultipleFullBatches(t *testing.T) {
+       qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResult(1, 
1, 2, 3, 4, 5)}}
+       schema := minimalSchema()
+       scan := newScan(qr, schema, 2)
+       _ = scan.Init(context.Background())
+       defer scan.Close()
+       b1, _ := scan.NextBatch(context.Background())
+       if b1.Len != 2 {
+               t.Fatalf("first batch Len: want 2, got %d", b1.Len)
+       }
+       b2, _ := scan.NextBatch(context.Background())
+       if b2.Len != 2 {
+               t.Fatalf("second batch Len: want 2, got %d", b2.Len)
+       }
+       b3, _ := scan.NextBatch(context.Background())
+       if b3.Len != 1 {
+               t.Fatalf("third (tail) batch Len: want 1, got %d", b3.Len)
+       }
+       eof, _ := scan.NextBatch(context.Background())
+       if eof != nil {
+               t.Fatal("expected EOF after tail")
+       }
+}
+
+func TestBatchScan_EmptyResult_EOFImmediately(t *testing.T) {
+       qr := &fakeMeasureQueryResult{seq: nil}
+       schema := minimalSchema()
+       scan := newScan(qr, schema, 4)
+       _ = scan.Init(context.Background())
+       defer scan.Close()
+       b, err := scan.NextBatch(context.Background())
+       if err != nil || b != nil {
+               t.Fatalf("expected (nil, nil), got (%v, %v)", b, err)
+       }
+}
+
+// Pins the regression flagged by Copilot: when a valid series precedes an
+// errored MeasureResult, the partial batch must be discarded on the same
+// NextBatch call — not returned as if successful with the error buried.
+func TestBatchScan_CursorErrorAfterValidSeries_DiscardsPartialBatch(t 
*testing.T) {
+       boom := errors.New("storage boom mid-stream")
+       qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{
+               mkResult(1, 100, 200),
+               mkResultErr(boom),
+       }}
+       schema := minimalSchema()
+       scan := newScan(qr, schema, 4)
+       _ = scan.Init(context.Background())
+       defer scan.Close()
+       b, err := scan.NextBatch(context.Background())
+       if !errors.Is(err, boom) {
+               t.Fatalf("want storage error, got err=%v", err)
+       }
+       if b != nil {
+               t.Fatalf("partial batch must be discarded on cursor error; got 
b.Len=%d", b.Len)
+       }
+}
+
+func TestBatchScan_CursorError_PropagatesAsError_NoBatchEmitted(t *testing.T) {
+       boom := errors.New("storage boom")
+       qr := &fakeMeasureQueryResult{seq: 
[]*model.MeasureResult{mkResultErr(boom)}}
+       schema := minimalSchema()
+       scan := newScan(qr, schema, 4)
+       _ = scan.Init(context.Background())
+       defer scan.Close()
+       b, err := scan.NextBatch(context.Background())
+       if !errors.Is(err, boom) {
+               t.Fatalf("want storage error, got %v", err)
+       }
+       if b != nil {
+               t.Fatal("error path must return nil batch")
+       }
+}
+
+func TestBatchScan_TagAndFieldColumns_PopulatedFromAlreadyDecodedValues(t 
*testing.T) {
+       schema := vectorized.NewBatchSchema([]vectorized.ColumnDef{
+               {Role: vectorized.RoleTimestamp, Type: 
vectorized.ColumnTypeInt64},
+               {Role: vectorized.RoleSeriesID, Type: 
vectorized.ColumnTypeInt64},
+               {Role: vectorized.RoleTag, TagFamily: "default", Name: 
"service", Type: vectorized.ColumnTypeString},
+               {Role: vectorized.RoleField, Name: "value", Type: 
vectorized.ColumnTypeInt64},
+       })
+       mr := &model.MeasureResult{
+               SID:        common.SeriesID(1),
+               Timestamps: []int64{100, 200},
+               Versions:   []int64{0, 0},
+               ShardIDs:   []common.ShardID{0, 0},
+               TagFamilies: []model.TagFamily{
+                       {Name: "default", Tags: []model.Tag{
+                               {Name: "service", Values: 
[]*modelv1.TagValue{tvStr("a"), tvStr("b")}},
+                       }},
+               },
+               Fields: []model.Field{
+                       {Name: "value", Values: 
[]*modelv1.FieldValue{fvInt(10), fvInt(20)}},
+               },
+       }
+       qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mr}}
+       scan := newScan(qr, schema, 4)
+       _ = scan.Init(context.Background())
+       defer scan.Close()
+       b, err := scan.NextBatch(context.Background())
+       if err != nil {
+               t.Fatal(err)
+       }
+       tagIdx, _ := schema.TagIndex("default", "service")
+       tags := b.Columns[tagIdx].(*vectorized.TypedColumn[string]).Data()
+       if tags[0] != "a" || tags[1] != "b" {
+               t.Fatalf("tags: got %v", tags)
+       }
+       fieldIdx, _ := schema.FieldIndex("value")
+       fields := b.Columns[fieldIdx].(*vectorized.TypedColumn[int64]).Data()
+       if fields[0] != 10 || fields[1] != 20 {
+               t.Fatalf("fields: got %v", fields)
+       }
+}
+
+func TestBatchScan_SeriesIDColumn_AlwaysPopulated(t *testing.T) {
+       qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{
+               mkResult(common.SeriesID(42), 100, 200),
+       }}
+       schema := minimalSchema()
+       scan := newScan(qr, schema, 4)
+       _ = scan.Init(context.Background())
+       defer scan.Close()
+       b, _ := scan.NextBatch(context.Background())
+       sids := b.Columns[2].(*vectorized.TypedColumn[int64]).Data()
+       for i, sid := range sids {
+               if sid != 42 {
+                       t.Fatalf("sid[%d]: got %d, want 42", i, sid)
+               }
+       }
+}
+
+func TestBatchScan_Close_ReleasesCursor(t *testing.T) {
+       qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResult(1, 
100)}}
+       schema := minimalSchema()
+       scan := newScan(qr, schema, 4)
+       _ = scan.Init(context.Background())
+       if err := scan.Close(); err != nil {
+               t.Fatal(err)
+       }
+       if qr.releaseCnt != 1 {
+               t.Fatalf("Close must release the underlying MeasureQueryResult: 
got releaseCnt=%d", qr.releaseCnt)
+       }
+}

Reply via email to