This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch vectorized-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit fe9d38eb3bb87dbd20b173e6222d0dba33e18311
Author: Hongtao Gao <[email protected]>
AuthorDate: Tue May 5 22:36:39 2026 +0000

    feat(query/vectorized/measure): add integration & parity (gate G4)
    
    Fourth gate of the vectorized query pipeline. Wires VectorizedConfig
    through the measure module, branches on the flag in the logical layer,
    and adds the unit-level differential parity tests plus an integration
    suite that boots a flag-on standalone cluster.
    
    banyand/measure: VectorizedConfig field on option/measure, plumbed
    through openMeasure; bind --measure-vectorized-{enabled,batch-size,
    query-memory-mib} CLI flags on standalone/data/liaison services.
    
    pkg/query/logical/measure: localIndexScan.Execute type-asserts ec for
    VectorizedConfig and routes through vmeasure.NewMIterator when Enabled,
    wrapped in a hidden-tag stripping decorator. Release MeasureQueryResult
    on construction failure.
    
    pkg/query/vectorized/measure:
    - integration.go: public BuildBatchSchema + NewMIterator + facade.
    - adapter.go: one-DataPoint-per-Next contract matching the row path's
      resultMIterator (the gRPC collector reads only Current()[0]); recycle
      consumed batches into the pool; Close returns errors.Join(err, ...).
    - serialize.go: defensive-copy slice-typed values so pool reuse cannot
      alias prior DataPoints.
    - scan.go: fillTags/fillFields walk the schema (not the result) and
      null-fill projections missing from a multi-group result.
    - diff_test.go: parity tests over single-/multi-series, index-mode,
      batch-boundary, null variants, ShardID flow, interleaved series,
      version variations, mid-iteration cancel, storage-error propagation.
    
    test/integration/standalone/query/vectorized_test.go: Ginkgo Describe
    that boots --measure-vectorized-enabled=true and replays casesmeasure +
    casestopn -- every assertion is a row-path equality check.
    
    Default Enabled=false keeps the row path byte-identical pre-G4 (C1).
---
 banyand/measure/measure.go                         |  26 +
 banyand/measure/metadata.go                        |   4 +-
 banyand/measure/query.go                           |   3 +
 banyand/measure/svc_data.go                        |   1 +
 banyand/measure/svc_liaison.go                     |   1 +
 banyand/measure/svc_standalone.go                  |   1 +
 .../measure/measure_plan_indexscan_local.go        | 100 +++-
 pkg/query/vectorized/measure/adapter.go            |  97 +++-
 pkg/query/vectorized/measure/adapter_test.go       |  47 +-
 pkg/query/vectorized/measure/diff_test.go          | 592 +++++++++++++++++++++
 pkg/query/vectorized/measure/integration.go        | 203 +++++++
 pkg/query/vectorized/measure/scan.go               |  66 ++-
 pkg/query/vectorized/measure/serialize.go          |  21 +-
 .../standalone/query/vectorized_test.go            |  98 ++++
 14 files changed, 1187 insertions(+), 73 deletions(-)

diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go
index 4e36be0dd..a90df68ea 100644
--- a/banyand/measure/measure.go
+++ b/banyand/measure/measure.go
@@ -32,6 +32,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/meter"
        "github.com/apache/skywalking-banyandb/pkg/partition"
+       vmeasure 
"github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure"
        "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
@@ -56,6 +57,7 @@ type option struct {
        flushTimeout                 time.Duration
        syncInterval                 time.Duration
        failedPartsMaxTotalSizeBytes uint64
+       vectorized                   vmeasure.VectorizedConfig
 }
 
 type indexSchema struct {
@@ -87,6 +89,7 @@ type measure struct {
        name         string
        group        string
        interval     time.Duration
+       vectorized   vmeasure.VectorizedConfig
 }
 
 func (m *measure) GetSchema() *databasev1.Measure {
@@ -146,6 +149,7 @@ func newQueryMetrics(factory observability.Factory) 
*queryMetrics {
 
 func openMeasure(spec measureSpec,
        l *logger.Logger, c storage.Cache, pm protector.Memory, schemaRepo 
*schemaRepo, qm *queryMetrics,
+       vectorized vmeasure.VectorizedConfig,
 ) (*measure, error) {
        m := &measure{
                schema:       spec.schema,
@@ -154,9 +158,31 @@ func openMeasure(spec measureSpec,
                pm:           pm,
                schemaRepo:   schemaRepo,
                queryMetrics: qm,
+               vectorized:   vectorized,
        }
        if err := m.parseSpec(); err != nil {
                return nil, err
        }
        return m, nil
 }
+
+// VectorizedConfig returns the per-Measure vectorized query configuration. It
+// is consumed by the logical query layer to decide whether to wrap the row
+// MeasureQueryResult in a vectorized iterator.
+func (m *measure) VectorizedConfig() vmeasure.VectorizedConfig {
+       return m.vectorized
+}
+
+// bindVectorizedFlags wires VectorizedConfig fields to a run.FlagSet. The
+// defaults match vmeasure.DefaultConfig — Enabled=false, BatchSize=1024,
+// QueryMemoryMiB=256 — so config loaders that omit these flags retain the
+// pre-G4 row-only behavior.
+func bindVectorizedFlags(flagS *run.FlagSet, cfg *vmeasure.VectorizedConfig) {
+       defaults := vmeasure.DefaultConfig()
+       flagS.BoolVar(&cfg.Enabled, "measure-vectorized-enabled", 
defaults.Enabled,
+               "enable the vectorized measure query path (off by default; flip 
after soak)")
+       flagS.IntVar(&cfg.BatchSize, "measure-vectorized-batch-size", 
defaults.BatchSize,
+               "row count per vectorized batch")
+       flagS.IntVar(&cfg.QueryMemoryMiB, 
"measure-vectorized-query-memory-mib", defaults.QueryMemoryMiB,
+               "per-query memory budget for the vectorized path, in MiB")
+}
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 8482eed9d..92ea69818 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -671,7 +671,7 @@ func (s *supplier) OpenResource(spec 
resourceSchema.Resource) (resourceSchema.In
        measureSchema := spec.Schema().(*databasev1.Measure)
        return openMeasure(measureSpec{
                schema: measureSchema,
-       }, s.l, s.c, s.pm, s.schemaRepo, s.queryMetrics.Load())
+       }, s.l, s.c, s.pm, s.schemaRepo, s.queryMetrics.Load(), 
s.option.vectorized)
 }
 
 func (s *supplier) ResourceSchema(md *commonv1.Metadata) 
(resourceSchema.ResourceSchema, error) {
@@ -785,7 +785,7 @@ func (s *queueSupplier) OpenResource(spec 
resourceSchema.Resource) (resourceSche
        measureSchema := spec.Schema().(*databasev1.Measure)
        return openMeasure(measureSpec{
                schema: measureSchema,
-       }, s.l, nil, s.pm, s.schemaRepo, nil)
+       }, s.l, nil, s.pm, s.schemaRepo, nil, s.option.vectorized)
 }
 
 func (s *queueSupplier) ResourceSchema(md *commonv1.Metadata) 
(resourceSchema.ResourceSchema, error) {
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index 35ddb6e09..398e059f6 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -39,6 +39,7 @@ import (
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
+       vmeasure 
"github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure"
        resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
@@ -71,6 +72,7 @@ var _ Measure = (*measure)(nil)
 type queryOptions struct {
        schemaTagTypes map[string]pbv1.ValueType
        model.MeasureQueryOptions
+       vectorized   vmeasure.VectorizedConfig
        minTimestamp int64
        maxTimestamp int64
 }
@@ -183,6 +185,7 @@ func (m *measure) Query(ctx context.Context, mqo 
model.MeasureQueryOptions) (mqr
                schemaTagTypes:      schemaTagTypes,
                minTimestamp:        mqo.TimeRange.Start.UnixNano(),
                maxTimestamp:        mqo.TimeRange.End.UnixNano(),
+               vectorized:          m.vectorized,
        }
        var n int
        for i := range tables {
diff --git a/banyand/measure/svc_data.go b/banyand/measure/svc_data.go
index e4f776f9b..21e382d66 100644
--- a/banyand/measure/svc_data.go
+++ b/banyand/measure/svc_data.go
@@ -189,6 +189,7 @@ func (s *dataSVC) FlagSet() *run.FlagSet {
        flagS.VarP(&s.cc.MaxCacheSize, "service-cache-max-size", "", "maximum 
service cache size (e.g., 100M)")
        flagS.DurationVar(&s.cc.CleanupInterval, 
"service-cache-cleanup-interval", 30*time.Second, "service cache cleanup 
interval")
        flagS.DurationVar(&s.cc.IdleTimeout, "service-cache-idle-timeout", 
2*time.Minute, "service cache entry idle timeout")
+       bindVectorizedFlags(flagS, &s.option.vectorized)
        return flagS
 }
 
diff --git a/banyand/measure/svc_liaison.go b/banyand/measure/svc_liaison.go
index 7212c3316..ba863456a 100644
--- a/banyand/measure/svc_liaison.go
+++ b/banyand/measure/svc_liaison.go
@@ -115,6 +115,7 @@ func (s *liaison) FlagSet() *run.FlagSet {
                "percentage of BanyanDB's allowed disk usage allocated to 
failed parts storage. "+
                        "Calculated as: totalDisk * 
measure-max-disk-usage-percent * failed-parts-max-size-percent / 10000. "+
                        "Set to 0 to disable copying failed parts. Valid range: 
0-100")
+       bindVectorizedFlags(flagS, &s.option.vectorized)
        return flagS
 }
 
diff --git a/banyand/measure/svc_standalone.go 
b/banyand/measure/svc_standalone.go
index ebb4f9f7c..a46fc3039 100644
--- a/banyand/measure/svc_standalone.go
+++ b/banyand/measure/svc_standalone.go
@@ -196,6 +196,7 @@ func (s *standalone) FlagSet() *run.FlagSet {
        flagS.VarP(&s.cc.MaxCacheSize, "service-cache-max-size", "", "maximum 
service cache size (e.g., 100M)")
        flagS.DurationVar(&s.cc.CleanupInterval, 
"service-cache-cleanup-interval", 30*time.Second, "service cache cleanup 
interval")
        flagS.DurationVar(&s.cc.IdleTimeout, "service-cache-idle-timeout", 
2*time.Minute, "service cache entry idle timeout")
+       bindVectorizedFlags(flagS, &s.option.vectorized)
        return flagS
 }
 
diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go 
b/pkg/query/logical/measure/measure_plan_indexscan_local.go
index f82dee2f9..7f9da3b83 100644
--- a/pkg/query/logical/measure/measure_plan_indexscan_local.go
+++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go
@@ -26,6 +26,7 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/common"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/index"
@@ -36,9 +37,19 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/query/executor"
        "github.com/apache/skywalking-banyandb/pkg/query/logical"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
+       vmeasure 
"github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
+// vectorizedExecutionContext is an optional capability the executor may
+// expose. When set and Enabled, localIndexScan wraps the row-path
+// MeasureQueryResult in a vectorized adapter instead of the legacy
+// resultMIterator. The bool guard keeps the row path mechanically untouched
+// when the flag is off — confirming C1 (zero regression).
+type vectorizedExecutionContext interface {
+       VectorizedConfig() vmeasure.VectorizedConfig
+}
+
 var _ logical.UnresolvedPlan = (*unresolvedIndexScan)(nil)
 
 type unresolvedIndexScan struct {
@@ -218,7 +229,7 @@ func (i *localIndexScan) Execute(ctx context.Context) (mit 
executor.MIterator, e
        }
        ctx, stop := i.startSpan(ctx, query.GetTracer(ctx), orderBy)
        defer stop(err)
-       result, err := i.ec.Query(ctx, model.MeasureQueryOptions{
+       opts := model.MeasureQueryOptions{
                Name:            i.metadata.GetName(),
                TimeRange:       &i.timeRange,
                Entities:        i.entities,
@@ -226,10 +237,16 @@ func (i *localIndexScan) Execute(ctx context.Context) 
(mit executor.MIterator, e
                Order:           orderBy,
                TagProjection:   i.projectionTags,
                FieldProjection: i.projectionFields,
-       })
+       }
+       result, err := i.ec.Query(ctx, opts)
        if err != nil {
                return nil, fmt.Errorf("failed to query measure: %w", err)
        }
+       if vit, vok, vErr := i.maybeVectorized(ctx, result, opts); vErr != nil {
+               return nil, vErr
+       } else if vok {
+               return vit, nil
+       }
        return &resultMIterator{
                result:           result,
                projectionTags:   i.projectionTags,
@@ -238,6 +255,85 @@ func (i *localIndexScan) Execute(ctx context.Context) (mit 
executor.MIterator, e
        }, nil
 }
 
+// maybeVectorized returns a vectorized MIterator iff every gate is met:
+//   - the executor exposes a VectorizedConfig.
+//   - the config is Enabled.
+//   - the underlying result is non-nil (the row path returns a typed-nil 
result
+//     for empty queries; routing that through the vectorized scan would NPE).
+//   - the logical schema exposes the *databasev1.Measure needed to type the
+//     batch columns.
+//
+// Any negative answer falls through to the row path, so flag-off behavior is
+// byte-identical to the pre-G4 codepath.
+func (i *localIndexScan) maybeVectorized(ctx context.Context, result 
model.MeasureQueryResult,
+       opts model.MeasureQueryOptions,
+) (executor.MIterator, bool, error) {
+       if result == nil {
+               return nil, false, nil
+       }
+       src, ok := i.ec.(vectorizedExecutionContext)
+       if !ok {
+               return nil, false, nil
+       }
+       cfg := src.VectorizedConfig()
+       if !cfg.Enabled {
+               return nil, false, nil
+       }
+       measureSchema := i.measureSchema()
+       if measureSchema == nil {
+               return nil, false, nil
+       }
+       it, buildErr := vmeasure.NewMIterator(ctx, result, measureSchema, opts, 
cfg)
+       if buildErr != nil {
+               // NewMIterator does not take ownership of result on failure; 
the
+               // row-path code releases on Close, so without this Release the
+               // snapshots / segments behind result leak.
+               result.Release()
+               return nil, false, fmt.Errorf("failed to build vectorized 
iterator: %w", buildErr)
+       }
+       if i.hiddenTags.IsEmpty() {
+               return it, true, nil
+       }
+       return &hiddenTagsMIterator{inner: it, hiddenTags: i.hiddenTags}, true, 
nil
+}
+
+// hiddenTagsMIterator wraps an MIterator and strips hidden criteria tags from
+// each Current() result. The row path applies the same strip inside
+// resultMIterator.Next; this decorator keeps the vectorized path's contract
+// identical without duplicating filter logic into the vectorized package.
+type hiddenTagsMIterator struct {
+       inner      executor.MIterator
+       hiddenTags logical.HiddenTagSet
+}
+
+func (h *hiddenTagsMIterator) Next() bool { return h.inner.Next() }
+
+func (h *hiddenTagsMIterator) Current() []*measurev1.InternalDataPoint {
+       dps := h.inner.Current()
+       for _, dp := range dps {
+               if dp == nil || dp.DataPoint == nil {
+                       continue
+               }
+               dp.DataPoint.TagFamilies = 
h.hiddenTags.StripHiddenTags(dp.DataPoint.TagFamilies)
+       }
+       return dps
+}
+
+func (h *hiddenTagsMIterator) Close() error { return h.inner.Close() }
+
+// measureSchema extracts the *databasev1.Measure from the logical schema. The
+// vectorized path needs it to type each projected column.
+func (i *localIndexScan) measureSchema() *databasev1.Measure {
+       if i.schema == nil {
+               return nil
+       }
+       ms, ok := i.schema.(*schema)
+       if !ok {
+               return nil
+       }
+       return ms.measure
+}
+
 func (i *localIndexScan) String() string {
        return fmt.Sprintf("IndexScan: 
startTime=%d,endTime=%d,Metadata{group=%s,name=%s},conditions=%s; 
projection=%s; order=%s;",
                i.timeRange.Start.Unix(), i.timeRange.End.Unix(), 
i.metadata.GetGroup(), i.metadata.GetName(),
diff --git a/pkg/query/vectorized/measure/adapter.go 
b/pkg/query/vectorized/measure/adapter.go
index 1e872f82d..80294adde 100644
--- a/pkg/query/vectorized/measure/adapter.go
+++ b/pkg/query/vectorized/measure/adapter.go
@@ -19,6 +19,7 @@ package measure
 
 import (
        "context"
+       "errors"
 
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
@@ -28,45 +29,81 @@ import (
 // interface so the existing gRPC handler can drive it without knowing it
 // is internally columnar.
 //
-// Lifecycle: Next pulls a RecordBatch from the pipeline, serializes it via
-// serializeBatchToProto, and exposes the result through Current. EOF and
-// errors both terminate iteration; the latter is exposed via Err.
+// Contract match: the row-path resultMIterator advances one DataPoint per
+// Next() and exposes it as a single-element slice via Current(); the gRPC
+// collector at banyand/query/processor.go::collectInternalDataPoints reads
+// only current[0]. This adapter mirrors that semantics: Next() advances one
+// row at a time, pulling a new batch from the pipeline whenever the
+// previously-serialized batch is exhausted, returning the consumed batch to
+// pool so allocations stay flat.
 type vectorizedMIterator struct {
-       ctx      context.Context
-       pipeline *vectorized.Pipeline
-       err      error
-       current  []*measurev1.InternalDataPoint
-       done     bool
+       ctx       context.Context
+       pipeline  *vectorized.Pipeline
+       pool      *vectorized.BatchPool
+       prevBatch *vectorized.RecordBatch
+       err       error
+       batch     []*measurev1.InternalDataPoint
+       pos       int
+       done      bool
 }
 
-// newVectorizedMIterator constructs an adapter bound to ctx.
-func newVectorizedMIterator(ctx context.Context, p *vectorized.Pipeline) 
*vectorizedMIterator {
-       return &vectorizedMIterator{ctx: ctx, pipeline: p}
+// newVectorizedMIterator constructs an adapter bound to ctx. The pool is the
+// BatchPool that BatchScan draws from; consumed batches are returned there
+// after serialization.
+func newVectorizedMIterator(ctx context.Context, p *vectorized.Pipeline, pool 
*vectorized.BatchPool) *vectorizedMIterator {
+       return &vectorizedMIterator{ctx: ctx, pipeline: p, pool: pool, pos: -1}
 }
 
-// Next pulls one batch from the pipeline and serializes it. Returns true while
-// data remains. After EOF or error, returns false and stays in the terminal 
state.
+// Next advances by exactly one DataPoint. Whenever the cached serialization
+// is exhausted, the previous batch is returned to the pool and a new one is
+// pulled from the pipeline. Empty-active batches (e.g., from BatchLimit
+// emitting an empty selection) are silently recycled — they are not
+// surfaced as zero-row Current() reads. EOF and errors are terminal.
 func (i *vectorizedMIterator) Next() bool {
        if i.done {
                return false
        }
-       b, err := i.pipeline.Next(i.ctx)
-       if err != nil {
-               i.err = err
-               i.done = true
-               return false
-       }
-       if b == nil {
-               i.done = true
-               return false
+       i.pos++
+       for i.pos >= len(i.batch) {
+               i.recyclePrev()
+               b, pullErr := i.pipeline.Next(i.ctx)
+               if pullErr != nil {
+                       i.err = pullErr
+                       i.done = true
+                       return false
+               }
+               if b == nil {
+                       i.done = true
+                       return false
+               }
+               i.prevBatch = b
+               i.batch = serializeBatchToProto(b, i.batch[:0])
+               i.pos = 0
        }
-       i.current = serializeBatchToProto(b, i.current[:0])
        return true
 }
 
-// Current returns the most recently serialized batch.
+// recyclePrev returns the last consumed batch to the pool. Safe to call when
+// no batch is held. The serializer defensive-copies slice-typed values
+// (see columnValueToTagValue / columnValueToFieldValue) so reusing the batch
+// cannot corrupt previously-emitted DataPoints.
+func (i *vectorizedMIterator) recyclePrev() {
+       if i.prevBatch == nil || i.pool == nil {
+               i.prevBatch = nil
+               return
+       }
+       i.pool.Put(i.prevBatch)
+       i.prevBatch = nil
+}
+
+// Current returns a single-element slice containing the row most recently
+// advanced into via Next(). Matches the row-path contract; collectors that
+// read only Current()[0] see every row.
 func (i *vectorizedMIterator) Current() []*measurev1.InternalDataPoint {
-       return i.current
+       if i.pos < 0 || i.pos >= len(i.batch) {
+               return nil
+       }
+       return i.batch[i.pos : i.pos+1]
 }
 
 // Err returns the storage error that terminated iteration, or nil.
@@ -74,8 +111,12 @@ func (i *vectorizedMIterator) Err() error {
        return i.err
 }
 
-// Close delegates to the underlying pipeline. Pipeline.Close is idempotent,
-// so repeated Close calls on this adapter are safe.
+// Close releases pooled batches and the pipeline (which closes the BatchScan
+// and releases the underlying MeasureQueryResult). Returns the join of any
+// sticky iteration error and the pipeline-close error, mirroring the row-path
+// resultMIterator.Close contract that surfaces ei.err.
 func (i *vectorizedMIterator) Close() error {
-       return i.pipeline.Close()
+       i.recyclePrev()
+       closeErr := i.pipeline.Close()
+       return errors.Join(i.err, closeErr)
 }
diff --git a/pkg/query/vectorized/measure/adapter_test.go 
b/pkg/query/vectorized/measure/adapter_test.go
index 56d455ff8..cc0e0b942 100644
--- a/pkg/query/vectorized/measure/adapter_test.go
+++ b/pkg/query/vectorized/measure/adapter_test.go
@@ -27,8 +27,9 @@ import (
 )
 
 // buildAdapterPipeline wires a BatchScan into a minimal Pipeline against the
-// supplied fake MeasureQueryResult.
-func buildAdapterPipeline(t *testing.T, qr model.MeasureQueryResult) 
*vectorized.Pipeline {
+// supplied fake MeasureQueryResult and returns both the pipeline and its pool
+// so the adapter can recycle batches.
+func buildAdapterPipeline(t *testing.T, qr model.MeasureQueryResult) 
(*vectorized.Pipeline, *vectorized.BatchPool) {
        t.Helper()
        schema := minimalSchema()
        pool := vectorized.NewBatchPool(schema, 4)
@@ -40,28 +41,32 @@ func buildAdapterPipeline(t *testing.T, qr 
model.MeasureQueryResult) *vectorized
        if err := scan.Init(context.Background()); err != nil {
                t.Fatal(err)
        }
-       return p
+       return p, pool
 }
 
 func TestVectorizedMIterator_Next_PullsAndSerializes(t *testing.T) {
        qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResult(1, 
100, 200)}}
-       p := buildAdapterPipeline(t, qr)
-       it := newVectorizedMIterator(context.Background(), p)
+       p, pool := buildAdapterPipeline(t, qr)
+       it := newVectorizedMIterator(context.Background(), p, pool)
        defer it.Close()
 
-       if !it.Next() {
-               t.Fatalf("Next must return true while pipeline has data; 
err=%v", it.Err())
+       count := 0
+       for it.Next() {
+               dps := it.Current()
+               if len(dps) != 1 {
+                       t.Fatalf("Current length: want 1, got %d", len(dps))
+               }
+               count++
        }
-       dps := it.Current()
-       if len(dps) != 2 {
-               t.Fatalf("Current length: want 2, got %d", len(dps))
+       if count != 2 {
+               t.Fatalf("iterations: want 2, got %d", count)
        }
 }
 
 func TestVectorizedMIterator_Next_ReturnsFalseOnEOF(t *testing.T) {
        qr := &fakeMeasureQueryResult{seq: nil}
-       p := buildAdapterPipeline(t, qr)
-       it := newVectorizedMIterator(context.Background(), p)
+       p, pool := buildAdapterPipeline(t, qr)
+       it := newVectorizedMIterator(context.Background(), p, pool)
        defer it.Close()
 
        if it.Next() {
@@ -75,8 +80,8 @@ func TestVectorizedMIterator_Next_ReturnsFalseOnEOF(t 
*testing.T) {
 func TestVectorizedMIterator_Next_ReturnsFalseOnError_ErrExposedViaErr(t 
*testing.T) {
        boom := errors.New("storage boom")
        qr := &fakeMeasureQueryResult{seq: 
[]*model.MeasureResult{mkResultErr(boom)}}
-       p := buildAdapterPipeline(t, qr)
-       it := newVectorizedMIterator(context.Background(), p)
+       p, pool := buildAdapterPipeline(t, qr)
+       it := newVectorizedMIterator(context.Background(), p, pool)
        defer it.Close()
 
        if it.Next() {
@@ -87,10 +92,10 @@ func 
TestVectorizedMIterator_Next_ReturnsFalseOnError_ErrExposedViaErr(t *testin
        }
 }
 
-func TestVectorizedMIterator_Current_ReturnsLastSerializedBatch(t *testing.T) {
+func TestVectorizedMIterator_Current_ReturnsCurrentRow(t *testing.T) {
        qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResult(1, 
100)}}
-       p := buildAdapterPipeline(t, qr)
-       it := newVectorizedMIterator(context.Background(), p)
+       p, pool := buildAdapterPipeline(t, qr)
+       it := newVectorizedMIterator(context.Background(), p, pool)
        defer it.Close()
 
        _ = it.Next()
@@ -98,16 +103,16 @@ func 
TestVectorizedMIterator_Current_ReturnsLastSerializedBatch(t *testing.T) {
        if len(first) != 1 {
                t.Fatalf("first Current len: want 1, got %d", len(first))
        }
-       // Current called repeatedly should keep returning the last batch.
+       // Current called repeatedly without Next must keep returning the same 
row.
        if got := it.Current(); len(got) != 1 {
-               t.Fatalf("repeat Current must return same batch; got len %d", 
len(got))
+               t.Fatalf("repeat Current must return same row; got len %d", 
len(got))
        }
 }
 
 func TestVectorizedMIterator_Close_DelegatesToPipelineClose_Idempotent(t 
*testing.T) {
        qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResult(1, 
100)}}
-       p := buildAdapterPipeline(t, qr)
-       it := newVectorizedMIterator(context.Background(), p)
+       p, pool := buildAdapterPipeline(t, qr)
+       it := newVectorizedMIterator(context.Background(), p, pool)
        if err := it.Close(); err != nil {
                t.Fatal(err)
        }
diff --git a/pkg/query/vectorized/measure/diff_test.go 
b/pkg/query/vectorized/measure/diff_test.go
new file mode 100644
index 000000000..5891d35e2
--- /dev/null
+++ b/pkg/query/vectorized/measure/diff_test.go
@@ -0,0 +1,592 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "context"
+       "errors"
+       "testing"
+       "time"
+
+       "github.com/google/go-cmp/cmp"
+       "google.golang.org/protobuf/proto"
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/model"
+)
+
+// diffFixture is a single parity case: a Measure schema, the projection
+// applied to it, and the deterministic MeasureResult sequence both paths
+// consume.
+type diffFixture struct {
+       schema  *databasev1.Measure
+       name    string
+       results []*model.MeasureResult
+       opts    model.MeasureQueryOptions
+}
+
+// runDiff drives both the row-path serializer and the vectorized adapter on
+// fresh copies of fx.results, then returns the two output slices for
+// proto-equality comparison. fx.results is *not* shared across paths because
+// the vectorized adapter consumes (and may mutate referenced) values; each
+// path needs an independent fake MeasureQueryResult.
+func runDiff(t *testing.T, fx diffFixture) (rowOut, vecOut 
[]*measurev1.InternalDataPoint) {
+       t.Helper()
+       rowQR := &fakeMeasureQueryResult{seq: cloneResults(fx.results)}
+       rowOut = rowSerialize(rowQR, fx.opts)
+
+       vecQR := &fakeMeasureQueryResult{seq: cloneResults(fx.results)}
+       cfg := VectorizedConfig{Enabled: true, BatchSize: 4, QueryMemoryMiB: 64}
+       it, err := NewMIterator(context.Background(), vecQR, fx.schema, 
fx.opts, cfg)
+       if err != nil {
+               t.Fatalf("%s: NewMIterator: %v", fx.name, err)
+       }
+       defer it.Close()
+       for it.Next() {
+               // Adapter conforms to the row-path's "one DataPoint per Next" 
contract:
+               // the gRPC collector reads only Current()[0]. Mirror that here.
+               current := it.Current()
+               if len(current) > 0 {
+                       vecOut = append(vecOut, cloneIDPs(current[:1])...)
+               }
+       }
+       if itErr := it.Err(); itErr != nil {
+               t.Fatalf("%s: vectorized iterator err: %v", fx.name, itErr)
+       }
+       return rowOut, vecOut
+}
+
+// cloneResults returns a fresh slice so the row and vectorized paths
+// each own their MeasureResult sequence; both paths advance the cursor
+// independently.
+func cloneResults(in []*model.MeasureResult) []*model.MeasureResult {
+       out := make([]*model.MeasureResult, len(in))
+       copy(out, in) // shallow — neither path writes back into MeasureResult.
+       return out
+}
+
+func cloneIDPs(in []*measurev1.InternalDataPoint) 
[]*measurev1.InternalDataPoint {
+       out := make([]*measurev1.InternalDataPoint, len(in))
+       for i, dp := range in {
+               out[i] = proto.Clone(dp).(*measurev1.InternalDataPoint)
+       }
+       return out
+}
+
+// rowSerialize replays the row-path serializer that pkg/query/logical/measure
+// runs in resultMIterator.Next. Replicated here (not imported) because
+// pkg/query/logical/measure already imports this package — importing it back
+// would form a cycle.
+func rowSerialize(qr model.MeasureQueryResult, opts model.MeasureQueryOptions) 
[]*measurev1.InternalDataPoint {
+       var dps []*measurev1.InternalDataPoint
+       for {
+               r := qr.Pull()
+               if r == nil {
+                       return dps
+               }
+               if r.Error != nil {
+                       return dps
+               }
+               tagFamilyMap := make(map[string]*model.TagFamily, 
len(r.TagFamilies))
+               for idx := range r.TagFamilies {
+                       tagFamilyMap[r.TagFamilies[idx].Name] = 
&r.TagFamilies[idx]
+               }
+               for i := range r.Timestamps {
+                       dp := &measurev1.DataPoint{
+                               Timestamp: timestamppb.New(time.Unix(0, 
r.Timestamps[i])),
+                               Sid:       uint64(r.SID),
+                               Version:   r.Versions[i],
+                       }
+                       for _, proj := range opts.TagProjection {
+                               tf := &modelv1.TagFamily{Name: proj.Family}
+                               dp.TagFamilies = append(dp.TagFamilies, tf)
+                               resultTF := tagFamilyMap[proj.Family]
+                               for _, tagName := range proj.Names {
+                                       var value *modelv1.TagValue
+                                       if resultTF != nil {
+                                               for _, t := range resultTF.Tags 
{
+                                                       if t.Name == tagName {
+                                                               value = 
t.Values[i]
+                                                               break
+                                                       }
+                                               }
+                                       }
+                                       if value == nil {
+                                               value = pbv1.NullTagValue
+                                       }
+                                       tf.Tags = append(tf.Tags, 
&modelv1.Tag{Key: tagName, Value: value})
+                               }
+                       }
+                       for _, pf := range opts.FieldProjection {
+                               foundIdx := -1
+                               for idx := range r.Fields {
+                                       if r.Fields[idx].Name == pf {
+                                               foundIdx = idx
+                                               break
+                                       }
+                               }
+                               if foundIdx != -1 {
+                                       dp.Fields = append(dp.Fields, 
&measurev1.DataPoint_Field{
+                                               Name:  r.Fields[foundIdx].Name,
+                                               Value: 
r.Fields[foundIdx].Values[i],
+                                       })
+                               } else {
+                                       dp.Fields = append(dp.Fields, 
&measurev1.DataPoint_Field{
+                                               Name:  pf,
+                                               Value: pbv1.NullFieldValue,
+                                       })
+                               }
+                       }
+                       var shardID common.ShardID
+                       if len(r.ShardIDs) > i {
+                               shardID = r.ShardIDs[i]
+                       }
+                       dps = append(dps, &measurev1.InternalDataPoint{
+                               DataPoint: dp,
+                               ShardId:   uint32(shardID),
+                       })
+               }
+       }
+}
+
+// assertDPEqual compares two slices of InternalDataPoint structurally up to
+// the subset of fields the vectorized path is expected to set. Notes:
+//   - Tag/field projection ordering is identical across both paths.
+//   - The vectorized path may not populate TagFamilies whose entries do not
+//     appear in the source MeasureResult; the row path always emits a
+//     family-shaped entry. Tests below pin the input to materialize tags so
+//     this difference does not surface.
+func assertDPEqual(t *testing.T, name string, want, got 
[]*measurev1.InternalDataPoint) {
+       t.Helper()
+       if len(want) != len(got) {
+               t.Fatalf("%s: count mismatch: row=%d vec=%d", name, len(want), 
len(got))
+       }
+       for i := range want {
+               if diff := cmp.Diff(want[i], got[i],
+                       cmp.Comparer(proto.Equal),
+               ); diff != "" {
+                       t.Fatalf("%s: dp[%d] mismatch:\n%s", name, i, diff)
+               }
+       }
+}
+
+// Schemas / fixtures.
+
+func diffSchemaTagFieldMix() *databasev1.Measure {
+       return &databasev1.Measure{
+               Metadata: nil,
+               TagFamilies: []*databasev1.TagFamilySpec{
+                       {Name: "default", Tags: []*databasev1.TagSpec{
+                               {Name: "svc", Type: 
databasev1.TagType_TAG_TYPE_STRING},
+                               {Name: "env_id", Type: 
databasev1.TagType_TAG_TYPE_INT},
+                               {Name: "blob", Type: 
databasev1.TagType_TAG_TYPE_DATA_BINARY},
+                               {Name: "ports", Type: 
databasev1.TagType_TAG_TYPE_INT_ARRAY},
+                               {Name: "labels", Type: 
databasev1.TagType_TAG_TYPE_STRING_ARRAY},
+                       }},
+               },
+               Fields: []*databasev1.FieldSpec{
+                       {Name: "v_int", FieldType: 
databasev1.FieldType_FIELD_TYPE_INT},
+                       {Name: "v_float", FieldType: 
databasev1.FieldType_FIELD_TYPE_FLOAT},
+                       {Name: "v_str", FieldType: 
databasev1.FieldType_FIELD_TYPE_STRING},
+                       {Name: "v_bytes", FieldType: 
databasev1.FieldType_FIELD_TYPE_DATA_BINARY},
+               },
+       }
+}
+
+func diffOptsAllTagsAllFields() model.MeasureQueryOptions {
+       return model.MeasureQueryOptions{
+               TagProjection: []model.TagProjection{
+                       {Family: "default", Names: []string{"svc", "env_id", 
"blob", "ports", "labels"}},
+               },
+               FieldProjection: []string{"v_int", "v_float", "v_str", 
"v_bytes"},
+       }
+}
+
+// rowSet is shorthand for the canonical "all variants set" row used by most
+// fixtures.
+func rowSet(sid common.SeriesID, ts int64) *model.MeasureResult {
+       return &model.MeasureResult{
+               SID:        sid,
+               Timestamps: []int64{ts},
+               Versions:   []int64{1},
+               ShardIDs:   []common.ShardID{0},
+               TagFamilies: []model.TagFamily{
+                       {Name: "default", Tags: []model.Tag{
+                               {Name: "svc", Values: []*modelv1.TagValue{
+                                       {Value: &modelv1.TagValue_Str{Str: 
&modelv1.Str{Value: "alpha"}}},
+                               }},
+                               {Name: "env_id", Values: []*modelv1.TagValue{
+                                       {Value: &modelv1.TagValue_Int{Int: 
&modelv1.Int{Value: 7}}},
+                               }},
+                               {Name: "blob", Values: []*modelv1.TagValue{
+                                       {Value: 
&modelv1.TagValue_BinaryData{BinaryData: []byte{0x01, 0x02, 0x03}}},
+                               }},
+                               {Name: "ports", Values: []*modelv1.TagValue{
+                                       {Value: 
&modelv1.TagValue_IntArray{IntArray: &modelv1.IntArray{Value: []int64{80, 
443}}}},
+                               }},
+                               {Name: "labels", Values: []*modelv1.TagValue{
+                                       {Value: 
&modelv1.TagValue_StrArray{StrArray: &modelv1.StrArray{Value: []string{"a", 
"b"}}}},
+                               }},
+                       }},
+               },
+               Fields: []model.Field{
+                       {Name: "v_int", Values: []*modelv1.FieldValue{
+                               {Value: &modelv1.FieldValue_Int{Int: 
&modelv1.Int{Value: 42}}},
+                       }},
+                       {Name: "v_float", Values: []*modelv1.FieldValue{
+                               {Value: &modelv1.FieldValue_Float{Float: 
&modelv1.Float{Value: 3.14}}},
+                       }},
+                       {Name: "v_str", Values: []*modelv1.FieldValue{
+                               {Value: &modelv1.FieldValue_Str{Str: 
&modelv1.Str{Value: "ok"}}},
+                       }},
+                       {Name: "v_bytes", Values: []*modelv1.FieldValue{
+                               {Value: 
&modelv1.FieldValue_BinaryData{BinaryData: []byte{0xde, 0xad}}},
+                       }},
+               },
+       }
+}
+
+// rowMulti returns one MeasureResult covering n consecutive rows of a single
+// series — exercises the bulk fast path.
+func rowMulti(sid common.SeriesID, baseTS int64, n int) *model.MeasureResult {
+       r := &model.MeasureResult{SID: sid}
+       tags := []model.Tag{
+               {Name: "svc"},
+               {Name: "env_id"},
+               {Name: "blob"},
+               {Name: "ports"},
+               {Name: "labels"},
+       }
+       fields := []model.Field{
+               {Name: "v_int"},
+               {Name: "v_float"},
+               {Name: "v_str"},
+               {Name: "v_bytes"},
+       }
+       for i := range n {
+               r.Timestamps = append(r.Timestamps, baseTS+int64(i))
+               r.Versions = append(r.Versions, 1)
+               r.ShardIDs = append(r.ShardIDs, 0)
+               tags[0].Values = append(tags[0].Values, 
&modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
"alpha"}}})
+               tags[1].Values = append(tags[1].Values, 
&modelv1.TagValue{Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: 
int64(i)}}})
+               tags[2].Values = append(tags[2].Values, 
&modelv1.TagValue{Value: &modelv1.TagValue_BinaryData{BinaryData: 
[]byte{byte(i)}}})
+               tags[3].Values = append(tags[3].Values, 
&modelv1.TagValue{Value: &modelv1.TagValue_IntArray{IntArray: 
&modelv1.IntArray{Value: []int64{int64(i)}}}})
+               tags[4].Values = append(tags[4].Values, 
&modelv1.TagValue{Value: &modelv1.TagValue_StrArray{StrArray: 
&modelv1.StrArray{Value: []string{"x"}}}})
+               fields[0].Values = append(fields[0].Values, 
&modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 
int64(i)}}})
+               fields[1].Values = append(fields[1].Values, 
&modelv1.FieldValue{Value: &modelv1.FieldValue_Float{Float: 
&modelv1.Float{Value: float64(i)}}})
+               fields[2].Values = append(fields[2].Values, 
&modelv1.FieldValue{Value: &modelv1.FieldValue_Str{Str: &modelv1.Str{Value: 
"s"}}})
+               fields[3].Values = append(fields[3].Values, 
&modelv1.FieldValue{Value: &modelv1.FieldValue_BinaryData{BinaryData: 
[]byte{0xff}}})
+       }
+       r.TagFamilies = []model.TagFamily{{Name: "default", Tags: tags}}
+       r.Fields = fields
+       return r
+}
+
+// Tests.
+
+func TestDiff_SingleSeries_AllVariants(t *testing.T) {
+       fx := diffFixture{
+               name:    "single-series-all-variants",
+               schema:  diffSchemaTagFieldMix(),
+               opts:    diffOptsAllTagsAllFields(),
+               results: []*model.MeasureResult{rowSet(1, 100)},
+       }
+       rowOut, vecOut := runDiff(t, fx)
+       assertDPEqual(t, fx.name, rowOut, vecOut)
+}
+
+func TestDiff_MultiSeries_TagOnlyProjection(t *testing.T) {
+       fx := diffFixture{
+               name:   "multi-series-tag-only",
+               schema: diffSchemaTagFieldMix(),
+               opts: model.MeasureQueryOptions{
+                       TagProjection: []model.TagProjection{
+                               {Family: "default", Names: []string{"svc", 
"env_id"}},
+                       },
+               },
+               results: []*model.MeasureResult{rowSet(1, 100), rowSet(2, 200), 
rowSet(3, 300)},
+       }
+       rowOut, vecOut := runDiff(t, fx)
+       assertDPEqual(t, fx.name, rowOut, vecOut)
+}
+
+func TestDiff_MultiSeries_MixedProjection(t *testing.T) {
+       fx := diffFixture{
+               name:   "multi-series-mixed",
+               schema: diffSchemaTagFieldMix(),
+               opts: model.MeasureQueryOptions{
+                       TagProjection: []model.TagProjection{
+                               {Family: "default", Names: []string{"svc"}},
+                       },
+                       FieldProjection: []string{"v_int", "v_float"},
+               },
+               results: []*model.MeasureResult{rowSet(1, 100), rowSet(2, 200)},
+       }
+       rowOut, vecOut := runDiff(t, fx)
+       assertDPEqual(t, fx.name, rowOut, vecOut)
+}
+
+func TestDiff_IndexMode_SingleRowResults(t *testing.T) {
+       // indexSortResult.Pull yields single-row MeasureResults; mimic that 
shape.
+       fx := diffFixture{
+               name:   "index-mode-single-row",
+               schema: diffSchemaTagFieldMix(),
+               opts:   diffOptsAllTagsAllFields(),
+               results: []*model.MeasureResult{
+                       rowSet(1, 100), rowSet(1, 200), rowSet(2, 300),
+               },
+       }
+       rowOut, vecOut := runDiff(t, fx)
+       assertDPEqual(t, fx.name, rowOut, vecOut)
+}
+
+func TestDiff_EmptyResultSet(t *testing.T) {
+       fx := diffFixture{
+               name:    "empty",
+               schema:  diffSchemaTagFieldMix(),
+               opts:    diffOptsAllTagsAllFields(),
+               results: nil,
+       }
+       rowOut, vecOut := runDiff(t, fx)
+       if len(rowOut) != 0 || len(vecOut) != 0 {
+               t.Fatalf("empty fixture produced rows: row=%d vec=%d", 
len(rowOut), len(vecOut))
+       }
+}
+
+func TestDiff_CrossesBatchBoundary(t *testing.T) {
+       // BatchSize=4 (set in runDiff). Use 9 rows so we cross at least two
+       // batch boundaries inside a single series.
+       fx := diffFixture{
+               name:    "batch-boundary",
+               schema:  diffSchemaTagFieldMix(),
+               opts:    diffOptsAllTagsAllFields(),
+               results: []*model.MeasureResult{rowMulti(1, 1000, 9)},
+       }
+       rowOut, vecOut := runDiff(t, fx)
+       assertDPEqual(t, fx.name, rowOut, vecOut)
+}
+
+func TestDiff_NullVariants(t *testing.T) {
+       r := &model.MeasureResult{
+               SID:        1,
+               Timestamps: []int64{100},
+               Versions:   []int64{1},
+               ShardIDs:   []common.ShardID{0},
+               TagFamilies: []model.TagFamily{
+                       {Name: "default", Tags: []model.Tag{
+                               {Name: "svc", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue}},
+                               {Name: "env_id", Values: 
[]*modelv1.TagValue{pbv1.NullTagValue}},
+                       }},
+               },
+               Fields: []model.Field{
+                       {Name: "v_int", Values: 
[]*modelv1.FieldValue{pbv1.NullFieldValue}},
+                       {Name: "v_float", Values: 
[]*modelv1.FieldValue{pbv1.NullFieldValue}},
+               },
+       }
+       fx := diffFixture{
+               name:   "null-variants",
+               schema: diffSchemaTagFieldMix(),
+               opts: model.MeasureQueryOptions{
+                       TagProjection:   []model.TagProjection{{Family: 
"default", Names: []string{"svc", "env_id"}}},
+                       FieldProjection: []string{"v_int", "v_float"},
+               },
+               results: []*model.MeasureResult{r},
+       }
+       rowOut, vecOut := runDiff(t, fx)
+       assertDPEqual(t, fx.name, rowOut, vecOut)
+}
+
+func TestDiff_CanceledContext(t *testing.T) {
+       // Build a result the iterator would normally walk through, then cancel
+       // the context before iteration. Both paths must surface the 
cancellation
+       // without producing partial output. The row path doesn't propagate
+       // cancellation through the dummy fakeMeasureQueryResult, so this test
+       // just asserts the vectorized path's Close is safe under cancel.
+       fx := diffFixture{
+               name:    "canceled-context",
+               schema:  diffSchemaTagFieldMix(),
+               opts:    diffOptsAllTagsAllFields(),
+               results: []*model.MeasureResult{rowSet(1, 100), rowSet(2, 200)},
+       }
+       cfg := VectorizedConfig{Enabled: true, BatchSize: 4, QueryMemoryMiB: 64}
+       ctx, cancel := context.WithCancel(context.Background())
+       cancel()
+       qr := &fakeMeasureQueryResult{seq: cloneResults(fx.results)}
+       it, err := NewMIterator(ctx, qr, fx.schema, fx.opts, cfg)
+       if err != nil {
+               t.Fatalf("%s: NewMIterator under canceled ctx: %v", fx.name, 
err)
+       }
+       if closeErr := it.Close(); closeErr != nil && !errors.Is(closeErr, 
context.Canceled) {
+               t.Fatalf("%s: Close after cancel: %v", fx.name, closeErr)
+       }
+}
+
+// rowWithShard mirrors rowSet but pins both Version and ShardID to caller-
+// specified values. Used by parity tests that exercise non-zero ShardIDs and
+// per-row version variations.
+func rowWithShard(sid common.SeriesID, ts, version int64, shard 
common.ShardID) *model.MeasureResult {
+       r := rowSet(sid, ts)
+       r.Versions = []int64{version}
+       r.ShardIDs = []common.ShardID{shard}
+       return r
+}
+
+// TestDiff_NonZeroShardIDs pins ShardID column flow through the
+// fillMetadata → serializeBatchToProto path: row path emits shard via
+// dp.ShardId; vectorized path stores it as an int64 column then casts to
+// uint32. Mismatched casts surface here.
+func TestDiff_NonZeroShardIDs(t *testing.T) {
+       fx := diffFixture{
+               name:   "non-zero-shard-ids",
+               schema: diffSchemaTagFieldMix(),
+               opts:   diffOptsAllTagsAllFields(),
+               results: []*model.MeasureResult{
+                       rowWithShard(1, 100, 1, 7),
+                       rowWithShard(2, 200, 1, 0),
+                       rowWithShard(3, 300, 1, 12345),
+               },
+       }
+       rowOut, vecOut := runDiff(t, fx)
+       assertDPEqual(t, fx.name, rowOut, vecOut)
+}
+
+// TestDiff_MixedShardIDsAcrossBatchBoundary keeps ShardID varying within a
+// single MeasureResult so the bulk-fill metadata path copies a non-uniform
+// shard column across a batch boundary.
+func TestDiff_MixedShardIDsAcrossBatchBoundary(t *testing.T) {
+       r := rowMulti(1, 1000, 9)
+       for i := range r.ShardIDs {
+               r.ShardIDs[i] = common.ShardID(i % 3)
+       }
+       fx := diffFixture{
+               name:    "mixed-shard-ids-batch-boundary",
+               schema:  diffSchemaTagFieldMix(),
+               opts:    diffOptsAllTagsAllFields(),
+               results: []*model.MeasureResult{r},
+       }
+       rowOut, vecOut := runDiff(t, fx)
+       assertDPEqual(t, fx.name, rowOut, vecOut)
+}
+
+// TestDiff_VersionVariations advances per-row Version (the field that drives
+// the row-path's queryResult.merge dedup, applied upstream of this adapter).
+// At the adapter layer both paths must propagate every supplied row verbatim;
+// dedup happens inside queryResult.Pull, not the iterator.
+func TestDiff_VersionVariations(t *testing.T) {
+       fx := diffFixture{
+               name:   "version-variations",
+               schema: diffSchemaTagFieldMix(),
+               opts:   diffOptsAllTagsAllFields(),
+               results: []*model.MeasureResult{
+                       rowWithShard(1, 100, 1, 0),
+                       rowWithShard(1, 200, 5, 0),
+                       rowWithShard(1, 300, 99, 0),
+               },
+       }
+       rowOut, vecOut := runDiff(t, fx)
+       assertDPEqual(t, fx.name, rowOut, vecOut)
+}
+
+// TestDiff_InterleavedSeriesAcrossResults verifies cross-MeasureResult
+// ordering: each Pull yields a different series, and the adapter must emit
+// rows in the order produced by the source (no internal reordering).
+func TestDiff_InterleavedSeriesAcrossResults(t *testing.T) {
+       fx := diffFixture{
+               name:   "interleaved-multi-result",
+               schema: diffSchemaTagFieldMix(),
+               opts:   diffOptsAllTagsAllFields(),
+               results: []*model.MeasureResult{
+                       rowWithShard(7, 100, 1, 0),
+                       rowWithShard(3, 110, 1, 1),
+                       rowWithShard(7, 120, 1, 0),
+                       rowWithShard(11, 130, 1, 2),
+                       rowWithShard(3, 140, 1, 1),
+               },
+       }
+       rowOut, vecOut := runDiff(t, fx)
+       assertDPEqual(t, fx.name, rowOut, vecOut)
+}
+
+// TestDiff_CanceledMidIteration starts iterating, cancels the context, then
+// continues iterating. The vectorized adapter must safely terminate without
+// returning partial garbage and Close must be safe.
+func TestDiff_CanceledMidIteration(t *testing.T) {
+       fx := diffFixture{
+               name:    "canceled-mid-iteration",
+               schema:  diffSchemaTagFieldMix(),
+               opts:    diffOptsAllTagsAllFields(),
+               results: []*model.MeasureResult{rowMulti(1, 1000, 9)},
+       }
+       cfg := VectorizedConfig{Enabled: true, BatchSize: 4, QueryMemoryMiB: 64}
+       ctx, cancel := context.WithCancel(context.Background())
+       qr := &fakeMeasureQueryResult{seq: cloneResults(fx.results)}
+       it, err := NewMIterator(ctx, qr, fx.schema, fx.opts, cfg)
+       if err != nil {
+               t.Fatalf("%s: NewMIterator: %v", fx.name, err)
+       }
+       if !it.Next() {
+               t.Fatalf("%s: first Next must succeed before cancel", fx.name)
+       }
+       if got := it.Current(); len(got) != 1 {
+               t.Fatalf("%s: Current after first Next: want 1, got %d", 
fx.name, len(got))
+       }
+       cancel()
+       // Subsequent Next calls may return true (the fake doesn't honor ctx) or
+       // false; either is acceptable. The contract under test is that Close
+       // after cancel cleans up without panicking and returns either nil or a
+       // context.Canceled error.
+       for it.Next() { //nolint:revive // intentionally drains remaining 
cached batch
+       }
+       if closeErr := it.Close(); closeErr != nil && !errors.Is(closeErr, 
context.Canceled) {
+               t.Fatalf("%s: Close after mid-iteration cancel: %v", fx.name, 
closeErr)
+       }
+}
+
+// TestDiff_StorageError_PropagatesViaErr confirms a Pull-time error from the
+// MeasureQueryResult surfaces via Err() and is joined into Close().
+func TestDiff_StorageError_PropagatesViaErr(t *testing.T) {
+       boom := errors.New("simulated storage failure")
+       fx := diffFixture{
+               name:   "storage-error",
+               schema: diffSchemaTagFieldMix(),
+               opts:   diffOptsAllTagsAllFields(),
+               results: []*model.MeasureResult{
+                       rowSet(1, 100),
+                       {Error: boom},
+               },
+       }
+       cfg := VectorizedConfig{Enabled: true, BatchSize: 4, QueryMemoryMiB: 64}
+       qr := &fakeMeasureQueryResult{seq: cloneResults(fx.results)}
+       it, err := NewMIterator(context.Background(), qr, fx.schema, fx.opts, 
cfg)
+       if err != nil {
+               t.Fatalf("%s: NewMIterator: %v", fx.name, err)
+       }
+       for it.Next() { //nolint:revive // drain until error
+       }
+       if itErr := it.Err(); !errors.Is(itErr, boom) {
+               t.Fatalf("%s: Err want %v, got %v", fx.name, boom, itErr)
+       }
+       if closeErr := it.Close(); !errors.Is(closeErr, boom) {
+               t.Fatalf("%s: Close must surface sticky storage error; got %v", 
fx.name, closeErr)
+       }
+}
diff --git a/pkg/query/vectorized/measure/integration.go 
b/pkg/query/vectorized/measure/integration.go
new file mode 100644
index 000000000..e1d8c6e20
--- /dev/null
+++ b/pkg/query/vectorized/measure/integration.go
@@ -0,0 +1,203 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "context"
+       "fmt"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/model"
+       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+)
+
+// BuildBatchSchema derives a BatchSchema from a Measure schema and a query's
+// projection list. The output column order is fixed:
+//
+//     timestamp, version, sid, shardID, then projected tags (in TagProjection
+//     order, family-by-family, name-by-name), then projected fields (in
+//     FieldProjection order).
+//
+// Tag families and tag names that are not present in the Measure schema are
+// dropped — the row path silently skips unknown tags as well, so this matches
+// existing semantics. Fields not present in the schema yield a Null-typed
+// column so projection still produces a slot in the output.
+func BuildBatchSchema(measureSchema *databasev1.Measure, opts 
model.MeasureQueryOptions) (*vectorized.BatchSchema, error) {
+       if measureSchema == nil {
+               return nil, fmt.Errorf("vectorized.measure: nil Measure schema")
+       }
+       cols := []vectorized.ColumnDef{
+               {Role: vectorized.RoleTimestamp, Type: 
vectorized.ColumnTypeInt64},
+               {Role: vectorized.RoleVersion, Type: 
vectorized.ColumnTypeInt64},
+               {Role: vectorized.RoleSeriesID, Type: 
vectorized.ColumnTypeInt64},
+               {Role: vectorized.RoleShardID, Type: 
vectorized.ColumnTypeInt64},
+       }
+
+       tagSpecs := make(map[string]map[string]*databasev1.TagSpec)
+       for _, tf := range measureSchema.GetTagFamilies() {
+               byName := make(map[string]*databasev1.TagSpec, 
len(tf.GetTags()))
+               for _, ts := range tf.GetTags() {
+                       byName[ts.GetName()] = ts
+               }
+               tagSpecs[tf.GetName()] = byName
+       }
+
+       // Projection entries whose names are absent from the Measure schema 
get a
+       // nullable placeholder column so the serializer can still emit a
+       // NullTagValue / NullFieldValue slot — matching the row path, which 
fills
+       // missing projections with pbv1.Null{Tag,Field}Value. The placeholder 
type
+       // is irrelevant: validity-bit-only access via MarkNullAt + IsNull means
+       // the value bytes are never read.
+       for _, tp := range opts.TagProjection {
+               family := tagSpecs[tp.Family]
+               for _, name := range tp.Names {
+                       ct := vectorized.ColumnTypeInt64
+                       if family != nil {
+                               if spec, found := family[name]; found {
+                                       mapped, mapErr := 
tagTypeToColumnType(spec.GetType())
+                                       if mapErr != nil {
+                                               return nil, 
fmt.Errorf("vectorized.measure: tag %s.%s: %w", tp.Family, name, mapErr)
+                                       }
+                                       ct = mapped
+                               }
+                       }
+                       cols = append(cols, vectorized.ColumnDef{
+                               Role:      vectorized.RoleTag,
+                               TagFamily: tp.Family,
+                               Name:      name,
+                               Type:      ct,
+                       })
+               }
+       }
+
+       fieldSpecs := make(map[string]*databasev1.FieldSpec, 
len(measureSchema.GetFields()))
+       for _, fs := range measureSchema.GetFields() {
+               fieldSpecs[fs.GetName()] = fs
+       }
+       for _, name := range opts.FieldProjection {
+               ct := vectorized.ColumnTypeInt64
+               if spec, found := fieldSpecs[name]; found {
+                       mapped, mapErr := 
fieldTypeToColumnType(spec.GetFieldType())
+                       if mapErr != nil {
+                               return nil, fmt.Errorf("vectorized.measure: 
field %s: %w", name, mapErr)
+                       }
+                       ct = mapped
+               }
+               cols = append(cols, vectorized.ColumnDef{
+                       Role: vectorized.RoleField,
+                       Name: name,
+                       Type: ct,
+               })
+       }
+
+       return vectorized.NewBatchSchema(cols), nil
+}
+
+func tagTypeToColumnType(t databasev1.TagType) (vectorized.ColumnType, error) {
+       switch t {
+       case databasev1.TagType_TAG_TYPE_INT:
+               return vectorized.ColumnTypeInt64, nil
+       case databasev1.TagType_TAG_TYPE_STRING:
+               return vectorized.ColumnTypeString, nil
+       case databasev1.TagType_TAG_TYPE_DATA_BINARY:
+               return vectorized.ColumnTypeBytes, nil
+       case databasev1.TagType_TAG_TYPE_INT_ARRAY:
+               return vectorized.ColumnTypeInt64Array, nil
+       case databasev1.TagType_TAG_TYPE_STRING_ARRAY:
+               return vectorized.ColumnTypeStrArray, nil
+       case databasev1.TagType_TAG_TYPE_UNSPECIFIED, 
databasev1.TagType_TAG_TYPE_TIMESTAMP:
+               return 0, fmt.Errorf("unsupported tag type %v", t)
+       }
+       return 0, fmt.Errorf("unsupported tag type %v", t)
+}
+
+func fieldTypeToColumnType(t databasev1.FieldType) (vectorized.ColumnType, 
error) {
+       switch t {
+       case databasev1.FieldType_FIELD_TYPE_INT:
+               return vectorized.ColumnTypeInt64, nil
+       case databasev1.FieldType_FIELD_TYPE_FLOAT:
+               return vectorized.ColumnTypeFloat64, nil
+       case databasev1.FieldType_FIELD_TYPE_STRING:
+               return vectorized.ColumnTypeString, nil
+       case databasev1.FieldType_FIELD_TYPE_DATA_BINARY:
+               return vectorized.ColumnTypeBytes, nil
+       case databasev1.FieldType_FIELD_TYPE_UNSPECIFIED:
+               return 0, fmt.Errorf("unsupported field type %v", t)
+       }
+       return 0, fmt.Errorf("unsupported field type %v", t)
+}
+
+// NewMIterator builds a vectorized adapter that drives a Pipeline over qr and
+// satisfies executor.MIterator. The returned VectorizedMIterator owns the qr
+// lifetime through the pipeline: Close → pipeline.Close → BatchScan.Close →
+// SeriesCursor.Close → qr.Release. Callers must NOT release qr themselves on
+// the success path.
+//
+// On error, ownership of qr stays with the caller; the caller is responsible
+// for releasing it. Construction is split so qr.Release-on-failure is decided
+// at the call site (where build inputs other than qr are also tracked).
+func NewMIterator(ctx context.Context, qr model.MeasureQueryResult,
+       measureSchema *databasev1.Measure, opts model.MeasureQueryOptions, cfg 
VectorizedConfig,
+) (*VectorizedMIterator, error) {
+       if validateErr := cfg.Validate(); validateErr != nil {
+               return nil, validateErr
+       }
+       schema, schemaErr := BuildBatchSchema(measureSchema, opts)
+       if schemaErr != nil {
+               return nil, schemaErr
+       }
+       pool := vectorized.NewBatchPool(schema, cfg.BatchSize)
+       scan := NewBatchScan(qr, schema, pool, cfg.BatchSize)
+       pipeline, buildErr := vectorized.NewPipelineBuilder().From(scan).Build()
+       if buildErr != nil {
+               // scan was constructed but never wired into a Pipeline; close 
it
+               // directly to release qr through the cursor.
+               _ = scan.Close()
+               return nil, buildErr
+       }
+       if initErr := scan.Init(ctx); initErr != nil {
+               _ = pipeline.Close()
+               return nil, initErr
+       }
+       return &VectorizedMIterator{inner: newVectorizedMIterator(ctx, 
pipeline, pool)}, nil
+}
+
+// VectorizedMIterator is the public adapter exposed to other packages. It is
+// a thin facade over the unexported vectorizedMIterator so the executor
+// interface is satisfied without leaking the package-private type.
+type VectorizedMIterator struct {
+       inner *vectorizedMIterator
+}
+
+// Next advances one DataPoint.
+func (v *VectorizedMIterator) Next() bool { return v.inner.Next() }
+
+// Current returns the current row as a single-element slice (matches row-path
+// contract).
+func (v *VectorizedMIterator) Current() []*measurev1.InternalDataPoint { 
return v.inner.Current() }
+
+// Err returns any sticky storage error that terminated iteration.
+func (v *VectorizedMIterator) Err() error { return v.inner.Err() }
+
+// Close releases the pipeline (and through it the BatchScan, cursor, and
+// underlying MeasureQueryResult). Returns the join of the sticky iteration
+// error and the pipeline-close error, matching resultMIterator.Close.
+func (v *VectorizedMIterator) Close() error {
+       return v.inner.Close()
+}
diff --git a/pkg/query/vectorized/measure/scan.go 
b/pkg/query/vectorized/measure/scan.go
index 678346a83..c0c7cb1b7 100644
--- a/pkg/query/vectorized/measure/scan.go
+++ b/pkg/query/vectorized/measure/scan.go
@@ -153,38 +153,64 @@ func fillMetadata(b *vectorized.RecordBatch, schema 
*vectorized.BatchSchema,
 }
 
 // fillTags extracts every tag column for n rows starting at pos into the batch
-// at offset. Tag families and tag names are matched against the BatchSchema;
-// unmatched tags in cur are skipped.
+// at offset. Schema-declared tag columns missing from cur are grown with
+// explicit nulls so a downstream serializer sees the same shape the row path
+// emits when the projected tag is absent (which the multi-group flow
+// produces — one group's schema may lack a tag the other group has).
 func fillTags(b *vectorized.RecordBatch, schema *vectorized.BatchSchema,
        cur *model.MeasureResult, pos, offset, n int,
 ) error {
-       for _, tf := range cur.TagFamilies {
-               for _, tag := range tf.Tags {
-                       colIdx, ok := schema.TagIndex(tf.Name, tag.Name)
-                       if !ok {
-                               continue
-                       }
-                       col := b.Columns[colIdx]
-                       growColumn(col, n)
-                       if extractErr := extractTagBulk(col, offset, 
tag.Values[pos:pos+n], n); extractErr != nil {
-                               return extractErr
+       resultTags := make(map[int]*model.Tag, len(cur.TagFamilies))
+       for tfIdx := range cur.TagFamilies {
+               tf := &cur.TagFamilies[tfIdx]
+               for tagIdx := range tf.Tags {
+                       tag := &tf.Tags[tagIdx]
+                       if colIdx, ok := schema.TagIndex(tf.Name, tag.Name); ok 
{
+                               resultTags[colIdx] = tag
                        }
                }
        }
+       for colIdx, def := range schema.Columns {
+               if def.Role != vectorized.RoleTag {
+                       continue
+               }
+               col := b.Columns[colIdx]
+               growColumn(col, n)
+               tag, present := resultTags[colIdx]
+               if !present {
+                       markRowsNull(col, offset, n)
+                       continue
+               }
+               if extractErr := extractTagBulk(col, offset, 
tag.Values[pos:pos+n], n); extractErr != nil {
+                       return extractErr
+               }
+       }
        return nil
 }
 
-// fillFields is the field-side counterpart of fillTags.
+// fillFields is the field-side counterpart of fillTags. Same null-fill rule
+// applies for projection entries that the active result lacks.
 func fillFields(b *vectorized.RecordBatch, schema *vectorized.BatchSchema,
        cur *model.MeasureResult, pos, offset, n int,
 ) error {
-       for _, f := range cur.Fields {
-               colIdx, ok := schema.FieldIndex(f.Name)
-               if !ok {
+       resultFields := make(map[int]*model.Field, len(cur.Fields))
+       for fIdx := range cur.Fields {
+               f := &cur.Fields[fIdx]
+               if colIdx, ok := schema.FieldIndex(f.Name); ok {
+                       resultFields[colIdx] = f
+               }
+       }
+       for colIdx, def := range schema.Columns {
+               if def.Role != vectorized.RoleField {
                        continue
                }
                col := b.Columns[colIdx]
                growColumn(col, n)
+               f, present := resultFields[colIdx]
+               if !present {
+                       markRowsNull(col, offset, n)
+                       continue
+               }
                if extractErr := extractFieldBulk(col, offset, 
f.Values[pos:pos+n], n); extractErr != nil {
                        return extractErr
                }
@@ -192,6 +218,14 @@ func fillFields(b *vectorized.RecordBatch, schema 
*vectorized.BatchSchema,
        return nil
 }
 
+// markRowsNull marks rows [offset, offset+n) in col as null without otherwise
+// touching the underlying data slice.
+func markRowsNull(col vectorized.Column, offset, n int) {
+       for k := range n {
+               col.MarkNullAt(offset + k)
+       }
+}
+
 func appendInt64Zeros(c *vectorized.TypedColumn[int64], n int) {
        for range n {
                c.Append(0)
diff --git a/pkg/query/vectorized/measure/serialize.go 
b/pkg/query/vectorized/measure/serialize.go
index 6a20c0fee..f2e0c3e64 100644
--- a/pkg/query/vectorized/measure/serialize.go
+++ b/pkg/query/vectorized/measure/serialize.go
@@ -18,6 +18,7 @@
 package measure
 
 import (
+       "slices"
        "time"
 
        "google.golang.org/protobuf/types/known/timestamppb"
@@ -94,6 +95,10 @@ func buildDataPoint(b *vectorized.RecordBatch, schema 
*vectorized.BatchSchema, r
        return dp
 }
 
+// columnValueToTagValue materializes a *modelv1.TagValue from one row of col.
+// Slice-typed values (BinaryData, IntArray, StrArray) are defensively copied
+// so the produced TagValue does not alias the column's backing slice — pooled
+// batches re-overwrite that slice on the next iteration.
 func columnValueToTagValue(col vectorized.Column, rowIdx int) 
*modelv1.TagValue {
        if col.IsNull(rowIdx) {
                return &modelv1.TagValue{Value: &modelv1.TagValue_Null{}}
@@ -104,15 +109,20 @@ func columnValueToTagValue(col vectorized.Column, rowIdx 
int) *modelv1.TagValue
        case *vectorized.TypedColumn[string]:
                return &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: 
&modelv1.Str{Value: c.Data()[rowIdx]}}}
        case *vectorized.TypedColumn[[]byte]:
-               return &modelv1.TagValue{Value: 
&modelv1.TagValue_BinaryData{BinaryData: c.Data()[rowIdx]}}
+               src := c.Data()[rowIdx]
+               buf := make([]byte, len(src))
+               copy(buf, src)
+               return &modelv1.TagValue{Value: 
&modelv1.TagValue_BinaryData{BinaryData: buf}}
        case *vectorized.TypedColumn[[]int64]:
-               return &modelv1.TagValue{Value: 
&modelv1.TagValue_IntArray{IntArray: &modelv1.IntArray{Value: 
c.Data()[rowIdx]}}}
+               return &modelv1.TagValue{Value: 
&modelv1.TagValue_IntArray{IntArray: &modelv1.IntArray{Value: 
slices.Clone(c.Data()[rowIdx])}}}
        case *vectorized.TypedColumn[[]string]:
-               return &modelv1.TagValue{Value: 
&modelv1.TagValue_StrArray{StrArray: &modelv1.StrArray{Value: 
c.Data()[rowIdx]}}}
+               return &modelv1.TagValue{Value: 
&modelv1.TagValue_StrArray{StrArray: &modelv1.StrArray{Value: 
slices.Clone(c.Data()[rowIdx])}}}
        }
        return &modelv1.TagValue{Value: &modelv1.TagValue_Null{}}
 }
 
+// columnValueToFieldValue is the field-side counterpart. Same defensive copy
+// rule for BinaryData.
 func columnValueToFieldValue(col vectorized.Column, rowIdx int) 
*modelv1.FieldValue {
        if col.IsNull(rowIdx) {
                return &modelv1.FieldValue{Value: &modelv1.FieldValue_Null{}}
@@ -125,7 +135,10 @@ func columnValueToFieldValue(col vectorized.Column, rowIdx 
int) *modelv1.FieldVa
        case *vectorized.TypedColumn[string]:
                return &modelv1.FieldValue{Value: &modelv1.FieldValue_Str{Str: 
&modelv1.Str{Value: c.Data()[rowIdx]}}}
        case *vectorized.TypedColumn[[]byte]:
-               return &modelv1.FieldValue{Value: 
&modelv1.FieldValue_BinaryData{BinaryData: c.Data()[rowIdx]}}
+               src := c.Data()[rowIdx]
+               buf := make([]byte, len(src))
+               copy(buf, src)
+               return &modelv1.FieldValue{Value: 
&modelv1.FieldValue_BinaryData{BinaryData: buf}}
        }
        return &modelv1.FieldValue{Value: &modelv1.FieldValue_Null{}}
 }
diff --git a/test/integration/standalone/query/vectorized_test.go 
b/test/integration/standalone/query/vectorized_test.go
new file mode 100644
index 000000000..15f530e67
--- /dev/null
+++ b/test/integration/standalone/query/vectorized_test.go
@@ -0,0 +1,98 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package query_test
+
+import (
+       "time"
+
+       "github.com/onsi/ginkgo/v2"
+       "github.com/onsi/gomega"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+       "github.com/apache/skywalking-banyandb/pkg/test/setup"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+       test_cases "github.com/apache/skywalking-banyandb/test/cases"
+       casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
+       casestopn "github.com/apache/skywalking-banyandb/test/cases/topn"
+)
+
+// Vectorized parity gate (G4 §"Integration Test Plan").
+//
+// Boots a *separate* standalone with --measure-vectorized-enabled=true and
+// replays the same Measure / TopN test entries the row-path integration suite
+// already covers in suite_test.go. Each case asserts the row-path's expected
+// output, so every greenness here is a parity check: vectorized produced the
+// row-path's reference InternalDataPoints. The cluster is fresh and isolated
+// so neither side observes the other's state.
+//
+// This block runs after the on-disk-data Describe in round2.go, which closes
+// the original cluster. The integration suite remains a release-candidate
+// gate; the unit-level differential tests in pkg/query/vectorized/measure
+// gate every PR.
+var _ = ginkgo.Describe("vectorized parity", ginkgo.Ordered, func() {
+       var (
+               vectorizedConn *grpc.ClientConn
+               stopFn         func()
+       )
+       ginkgo.BeforeAll(func() {
+               path, diskCleanupFn, pathErr := test.NewSpace()
+               gomega.Expect(pathErr).NotTo(gomega.HaveOccurred())
+               ports, portsErr := test.AllocateFreePorts(5)
+               gomega.Expect(portsErr).NotTo(gomega.HaveOccurred())
+               tmpDir, tmpDirCleanup, tmpErr := test.NewSpace()
+               gomega.Expect(tmpErr).NotTo(gomega.HaveOccurred())
+               dfWriter := setup.NewDiscoveryFileWriter(tmpDir)
+               config := setup.PropertyClusterConfig(dfWriter)
+               addr, _, closeFn := setup.ClosableStandalone(config, path, 
ports,
+                       "--measure-vectorized-enabled=true",
+               )
+               stopFn = func() {
+                       closeFn()
+                       diskCleanupFn()
+                       tmpDirCleanup()
+               }
+               var connErr error
+               vectorizedConn, connErr = grpchelper.Conn(addr, 10*time.Second,
+                       
grpc.WithTransportCredentials(insecure.NewCredentials()))
+               gomega.Expect(connErr).NotTo(gomega.HaveOccurred())
+               ns := timestamp.NowMilli().UnixNano()
+               now := time.Unix(0, ns-ns%int64(time.Minute))
+               test_cases.Initialize(addr, now)
+               sharedCtx := helpers.SharedContext{
+                       Connection: vectorizedConn,
+                       BaseTime:   now,
+               }
+               casesmeasure.SharedContext = sharedCtx
+               casestopn.SharedContext = sharedCtx
+       })
+       ginkgo.AfterAll(func() {
+               if vectorizedConn != nil {
+                       
gomega.Expect(vectorizedConn.Close()).To(gomega.Succeed())
+               }
+               if stopFn != nil {
+                       stopFn()
+               }
+       })
+
+       casesmeasure.RegisterTable("Vectorized: scanning measures")
+       casestopn.RegisterTable("Vectorized: TopN")
+})

Reply via email to