This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch vectorized-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit a421813e12e9bfa75b53682bf4a07b14c5415648
Author: Hongtao Gao <[email protected]>
AuthorDate: Fri May 8 07:50:37 2026 +0000

    feat(query/model): add MeasureBatchResult interface; storage dual-emits 
(G5b foundation)
    
    First two stories of G5b — adds the columnar storage→executor interface
    and a wrapper-style implementation on the existing storage layer. Pull()
    remains byte-identical for legacy consumers; PullBatch is purely
    additive.
    
    US-001 — MeasureBatchResult interface + MeasureBatch struct
    - pkg/query/model/batch.go defines:
      * MeasureBatchResult { PullBatch(ctx) (*MeasureBatch, error); Release() }
      * MeasureBatch { Schema, Timestamps, Versions, ShardIDs, SeriesIDs,
        Tags, Fields, SeriesBoundaries }
    - pkg/query/model/batch_test.go covers nil receiver, empty/single/mixed
      column shapes, multi-series boundary semantics, sticky-error contract,
      and idempotent Release.
    
    US-002 — *queryResult and *indexSortResult dual-emit
    - pkg/query/vectorized/measure/build_batch.go provides
      BuildMeasureBatchFromResult — wrapper-style conversion of the existing
      *model.MeasureResult into *model.MeasureBatch with passthrough columns
      (TypedColumn[*modelv1.TagValue] / [*modelv1.FieldValue]). Multi-group
      null fill matches scan.go::fillTags / fillFields.
      Tests cover full row, missing-tag null fill, missing-field null fill,
      length-invariant violations, nil result/schema.
    - banyand/measure/query.go gains a *vectorized.BatchSchema field on
      queryResult and indexSortResult, populated once per Query() /
      buildIndexQueryResult() via vmeasure.BuildBatchSchema. Builds are
      best-effort — schema-build errors leave batchSchema nil; PullBatch
      returns a clean error rather than degrading Pull().
    - banyand/measure/query_batch.go implements PullBatch on both result
      types as a wrapper around Pull() + BuildMeasureBatchFromResult.
      Compile-time assertions confirm both types satisfy
      model.MeasureBatchResult.
    
    Mixing Pull() and PullBatch() on the same result is undefined; each
    advances the same underlying cursors. Documented on each PullBatch
    method.
    
    This is the lazy/wrapper design — the *modelv1.TagValue intermediate
    still exists, just transiently. The architectural decode-elimination is
    US-003 (block_cursor emits typed columns natively), which removes the
    intermediate from the storage hot path.
    
    Verification:
    - go test ./pkg/query/model/... -count=1 -race                        OK
    - go test ./pkg/query/vectorized/... -count=1 -race                   OK
    - go test ./banyand/measure/... -count=1 -short                       OK 
(28s)
    - go test ./test/integration/standalone/query/ -timeout=15m           OK 
(245s)
    - make -s lint                                                        clean
    - go build ./...                                                      clean
    
    Pull() byte-identical at the gRPC layer (integration suite parity).
    PullBatch is unwired — no consumer calls it yet (US-007 wires
    NewMIterator). G5a bench gates green at 5de3e0eb; this commit keeps
    them green by being purely additive.
---
 banyand/measure/query.go                         |  17 +-
 banyand/measure/query_batch.go                   |  81 ++++++++
 pkg/query/model/batch.go                         |  92 +++++++++
 pkg/query/model/batch_test.go                    | 186 +++++++++++++++++++
 pkg/query/vectorized/measure/build_batch.go      | 137 ++++++++++++++
 pkg/query/vectorized/measure/build_batch_test.go | 226 +++++++++++++++++++++++
 6 files changed, 737 insertions(+), 2 deletions(-)

diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index 398e059f6..fd5f9ee70 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -39,6 +39,7 @@ import (
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/pool"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
+       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
        vmeasure 
"github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure"
        resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -217,6 +218,11 @@ func (m *measure) Query(ctx context.Context, mqo 
model.MeasureQueryOptions) (mqr
                m.queryMetrics.resultPoints.Observe(float64(len(result.data)))
        }
 
+       // Build the columnar BatchSchema once for PullBatch consumers (G5b).
+       // Falls back to nil on schema-build failure; PullBatch checks for nil
+       // and returns a clean error rather than degrading the row-path Pull().
+       result.batchSchema, _ = vmeasure.BuildBatchSchema(m.schema, mqo)
+
        return &result, nil
 }
 
@@ -535,6 +541,11 @@ func (m *measure) buildIndexQueryResult(ctx 
context.Context, mqo model.MeasureQu
                r.segResults.sortDesc = true
        }
 
+       // G5b — build the columnar BatchSchema once for PullBatch consumers.
+       // Errors here are non-fatal for the row-path Pull(); PullBatch will
+       // return a clean error if batchSchema is nil at call time.
+       r.batchSchema, _ = vmeasure.BuildBatchSchema(m.schema, mqo)
+
        heap.Init(&r.segResults)
        return r, nil
 }
@@ -743,6 +754,7 @@ type queryResult struct {
        topNQueryOptions *topNQueryOptions
        sidToIndex       map[common.SeriesID]int
        storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue
+       batchSchema      *vectorized.BatchSchema
        tagProjection    []model.TagProjection
        data             []*blockCursor
        snapshots        []*snapshot
@@ -959,8 +971,9 @@ func (qr *queryResult) merge(storedIndexValue 
map[common.SeriesID]map[string]*mo
 }
 
 type indexSortResult struct {
-       tfl        []tagFamilyLocation
-       segResults segResultHeap
+       batchSchema *vectorized.BatchSchema
+       tfl         []tagFamilyLocation
+       segResults  segResultHeap
 }
 
 // Pull implements model.MeasureQueryResult.
diff --git a/banyand/measure/query_batch.go b/banyand/measure/query_batch.go
new file mode 100644
index 000000000..08a7b1647
--- /dev/null
+++ b/banyand/measure/query_batch.go
@@ -0,0 +1,81 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "context"
+       "fmt"
+
+       "github.com/apache/skywalking-banyandb/pkg/query/model"
+       vmeasure 
"github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure"
+)
+
+// Compile-time assertions that both result types satisfy the columnar
+// interface alongside MeasureQueryResult.
+var (
+       _ model.MeasureBatchResult = (*queryResult)(nil)
+       _ model.MeasureBatchResult = (*indexSortResult)(nil)
+)
+
+// PullBatch implements model.MeasureBatchResult for queryResult.
+//
+// G5b "dual-emit" wrapper: the implementation calls Pull() to obtain the
+// next *model.MeasureResult and converts it to a *model.MeasureBatch using
+// vmeasure.BuildMeasureBatchFromResult. Pull()'s row-path output stays
+// byte-identical for legacy consumers; PullBatch is purely additive.
+//
+// Mixing Pull() and PullBatch() on the same queryResult is undefined —
+// each call advances the same underlying cursors. Callers must pick one.
+//
+// Sticky-error contract: if Pull() returns a *MeasureResult with
+// non-nil Error, PullBatch returns (nil, that error) so future calls
+// continue to surface the same error (Pull() is itself sticky here).
+func (qr *queryResult) PullBatch(_ context.Context) (*model.MeasureBatch, 
error) {
+       if qr.batchSchema == nil {
+               return nil, fmt.Errorf("queryResult.PullBatch: batchSchema not 
initialized; " +
+                       "the underlying query did not record a vectorized 
BatchSchema (likely a schema-build error at Query time)")
+       }
+       r := qr.Pull()
+       if r == nil {
+               return nil, nil
+       }
+       if r.Error != nil {
+               return nil, r.Error
+       }
+       return vmeasure.BuildMeasureBatchFromResult(r, qr.batchSchema)
+}
+
+// PullBatch implements model.MeasureBatchResult for indexSortResult.
+//
+// indexSortResult.Pull() yields single-row MeasureResults; the conversion
+// shape is identical to queryResult.PullBatch — single-row batches that
+// the vectorized adapter accumulates into larger pipeline batches.
+func (iqr *indexSortResult) PullBatch(_ context.Context) (*model.MeasureBatch, 
error) {
+       if iqr.batchSchema == nil {
+               return nil, fmt.Errorf("indexSortResult.PullBatch: batchSchema 
not initialized; " +
+                       "the underlying query did not record a vectorized 
BatchSchema (likely a schema-build error at Query time)")
+       }
+       r := iqr.Pull()
+       if r == nil {
+               return nil, nil
+       }
+       if r.Error != nil {
+               return nil, r.Error
+       }
+       return vmeasure.BuildMeasureBatchFromResult(r, iqr.batchSchema)
+}
diff --git a/pkg/query/model/batch.go b/pkg/query/model/batch.go
new file mode 100644
index 000000000..aaad3a191
--- /dev/null
+++ b/pkg/query/model/batch.go
@@ -0,0 +1,92 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package model
+
+import (
+       "context"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+)
+
+// MeasureBatchResult is the columnar counterpart to MeasureQueryResult.
+// Implementations return a *MeasureBatch carrying parallel typed slices
+// instead of the row-shaped *modelv1.TagValue / *modelv1.FieldValue cells.
+//
+// The contract is independent of MeasureQueryResult: a single underlying
+// query may satisfy both interfaces (dual-emit) or only one. Callers that
+// want the columnar path call PullBatch; callers that still want the row
+// path call MeasureQueryResult.Pull. Release is shared semantics — the
+// caller invokes it exactly once after iteration is complete; calling
+// Release twice (e.g. once via each interface) is safe but unnecessary.
+type MeasureBatchResult interface {
+       // PullBatch returns the next batch from the underlying scan. It returns
+       // (nil, nil) on EOF; (nil, err) on storage error. Once an error is
+       // returned, subsequent calls must continue to return that same error
+       // (sticky-error contract — matches MeasureQueryResult.Pull's
+       // MeasureResult.Error semantics).
+       PullBatch(ctx context.Context) (*MeasureBatch, error)
+
+       // Release frees any resources held by the result. Idempotent.
+       Release()
+}
+
+// MeasureBatch is a single columnar batch flowing out of the storage layer.
+//
+// Row count is the length of Timestamps. Versions, ShardIDs and SeriesIDs
+// have the same length. Tags and Fields are parallel column slices whose
+// indices correspond to Schema.Columns entries with RoleTag / RoleField
+// respectively (the column at Tags[i] aligns with the i-th tag slot in
+// Schema, in declaration order; same for Fields[i]).
+//
+// SeriesBoundaries records the exclusive end-of-series row indices within
+// the batch. For example, a batch holding 100 rows of series A followed by
+// 200 rows of series B reports SeriesBoundaries = [100, 300]. Empty (nil
+// or zero-length) means "single series" — every row in the batch belongs
+// to the same series.
+type MeasureBatch struct {
+       // Schema describes the column layout. Tags[i] / Fields[i] entries are
+       // indexed via Schema.TagIndex / Schema.FieldIndex.
+       Schema *vectorized.BatchSchema
+
+       // Per-row metadata columns. Length equals the row count.
+       Timestamps []int64
+       Versions   []int64
+       ShardIDs   []common.ShardID
+       SeriesIDs  []common.SeriesID
+
+       // Typed tag and field columns. Tags[i] corresponds to the i-th
+       // RoleTag entry in Schema.Columns (in declaration order); same for
+       // Fields. Each column reports the same Len() as len(Timestamps).
+       Tags   []vectorized.Column
+       Fields []vectorized.Column
+
+       // SeriesBoundaries records exclusive end-of-series row indices for
+       // multi-series batches. nil or empty when the batch contains a single
+       // series.
+       SeriesBoundaries []int
+}
+
+// RowCount returns the number of rows in the batch. It reads len(Timestamps);
+// callers must keep the parallel column lengths in sync.
+func (b *MeasureBatch) RowCount() int {
+       if b == nil {
+               return 0
+       }
+       return len(b.Timestamps)
+}
diff --git a/pkg/query/model/batch_test.go b/pkg/query/model/batch_test.go
new file mode 100644
index 000000000..eb6081177
--- /dev/null
+++ b/pkg/query/model/batch_test.go
@@ -0,0 +1,186 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package model
+
+import (
+       "context"
+       "errors"
+       "testing"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+)
+
+func TestMeasureBatch_RowCount_Nil(t *testing.T) {
+       var b *MeasureBatch
+       if got := b.RowCount(); got != 0 {
+               t.Fatalf("nil batch RowCount: want 0, got %d", got)
+       }
+}
+
+func TestMeasureBatch_RowCount_Empty(t *testing.T) {
+       b := &MeasureBatch{}
+       if got := b.RowCount(); got != 0 {
+               t.Fatalf("empty batch RowCount: want 0, got %d", got)
+       }
+}
+
+func TestMeasureBatch_SingleTagColumn(t *testing.T) {
+       schema := vectorized.NewBatchSchema([]vectorized.ColumnDef{
+               {Role: vectorized.RoleTimestamp, Type: 
vectorized.ColumnTypeInt64},
+               {Role: vectorized.RoleTag, TagFamily: "default", Name: "svc", 
Type: vectorized.ColumnTypeString},
+       })
+       tagCol := vectorized.NewStringColumn(4)
+       tagCol.Append("alpha")
+       tagCol.Append("beta")
+       b := &MeasureBatch{
+               Schema:     schema,
+               Timestamps: []int64{1, 2},
+               Versions:   []int64{1, 1},
+               ShardIDs:   []common.ShardID{0, 0},
+               SeriesIDs:  []common.SeriesID{7, 7},
+               Tags:       []vectorized.Column{tagCol},
+       }
+       if got := b.RowCount(); got != 2 {
+               t.Fatalf("RowCount: want 2, got %d", got)
+       }
+       if got := len(b.Tags); got != 1 {
+               t.Fatalf("len(Tags): want 1, got %d", got)
+       }
+       if got := b.Tags[0].Len(); got != 2 {
+               t.Fatalf("Tags[0].Len: want 2, got %d", got)
+       }
+}
+
+func TestMeasureBatch_MixedTagAndField(t *testing.T) {
+       schema := vectorized.NewBatchSchema([]vectorized.ColumnDef{
+               {Role: vectorized.RoleTimestamp, Type: 
vectorized.ColumnTypeInt64},
+               {Role: vectorized.RoleTag, TagFamily: "default", Name: "svc", 
Type: vectorized.ColumnTypeString},
+               {Role: vectorized.RoleField, Name: "value", Type: 
vectorized.ColumnTypeInt64},
+       })
+       tagCol := vectorized.NewStringColumn(4)
+       tagCol.Append("alpha")
+       tagCol.Append("beta")
+       tagCol.Append("gamma")
+       fieldCol := vectorized.NewInt64Column(4)
+       fieldCol.Append(10)
+       fieldCol.Append(20)
+       fieldCol.Append(30)
+       b := &MeasureBatch{
+               Schema:     schema,
+               Timestamps: []int64{1, 2, 3},
+               Versions:   []int64{1, 1, 1},
+               ShardIDs:   []common.ShardID{0, 0, 0},
+               SeriesIDs:  []common.SeriesID{1, 1, 1},
+               Tags:       []vectorized.Column{tagCol},
+               Fields:     []vectorized.Column{fieldCol},
+       }
+       if got := b.RowCount(); got != 3 {
+               t.Fatalf("RowCount: want 3, got %d", got)
+       }
+       if got := len(b.Fields); got != 1 {
+               t.Fatalf("len(Fields): want 1, got %d", got)
+       }
+       if got := b.Fields[0].Len(); got != 3 {
+               t.Fatalf("Fields[0].Len: want 3, got %d", got)
+       }
+       // Schema lookup helpers must align with the columns.
+       if idx, ok := schema.TagIndex("default", "svc"); !ok || idx != 1 {
+               t.Fatalf("TagIndex: want (1, true), got (%d, %v)", idx, ok)
+       }
+       if idx, ok := schema.FieldIndex("value"); !ok || idx != 2 {
+               t.Fatalf("FieldIndex: want (2, true), got (%d, %v)", idx, ok)
+       }
+}
+
+func TestMeasureBatch_SeriesBoundariesMultiSeries(t *testing.T) {
+       // A batch holding 2 rows of series A followed by 3 rows of series B
+       // reports SeriesBoundaries = [2, 5].
+       b := &MeasureBatch{
+               Timestamps:       []int64{1, 2, 3, 4, 5},
+               Versions:         []int64{1, 1, 1, 1, 1},
+               ShardIDs:         []common.ShardID{0, 0, 0, 0, 0},
+               SeriesIDs:        []common.SeriesID{1, 1, 2, 2, 2},
+               SeriesBoundaries: []int{2, 5},
+       }
+       if got := b.RowCount(); got != 5 {
+               t.Fatalf("RowCount: want 5, got %d", got)
+       }
+       if got := b.SeriesBoundaries; len(got) != 2 || got[0] != 2 || got[1] != 
5 {
+               t.Fatalf("SeriesBoundaries: want [2 5], got %v", got)
+       }
+}
+
+// fakeMeasureBatchResult is the minimal MeasureBatchResult used to verify
+// the interface contract (sticky errors, EOF semantics, idempotent Release).
+type fakeMeasureBatchResult struct {
+       err        error
+       seq        []*MeasureBatch
+       idx        int
+       releaseCnt int
+}
+
+func (f *fakeMeasureBatchResult) PullBatch(_ context.Context) (*MeasureBatch, 
error) {
+       if f.err != nil {
+               return nil, f.err
+       }
+       if f.idx >= len(f.seq) {
+               return nil, nil
+       }
+       b := f.seq[f.idx]
+       f.idx++
+       return b, nil
+}
+
+func (f *fakeMeasureBatchResult) Release() {
+       f.releaseCnt++
+}
+
+func TestMeasureBatchResult_PullBatchEOF(t *testing.T) {
+       r := &fakeMeasureBatchResult{seq: nil}
+       b, err := r.PullBatch(context.Background())
+       if err != nil {
+               t.Fatalf("EOF must not return an error; got %v", err)
+       }
+       if b != nil {
+               t.Fatalf("EOF must return nil batch; got %v", b)
+       }
+}
+
+func TestMeasureBatchResult_PullBatchStickyError(t *testing.T) {
+       boom := errors.New("scan failed")
+       r := &fakeMeasureBatchResult{err: boom}
+       for i := range 3 {
+               b, err := r.PullBatch(context.Background())
+               if !errors.Is(err, boom) {
+                       t.Fatalf("call %d: want sticky error %v, got %v", i, 
boom, err)
+               }
+               if b != nil {
+                       t.Fatalf("call %d: error must yield nil batch, got %v", 
i, b)
+               }
+       }
+}
+
+func TestMeasureBatchResult_ReleaseIdempotent(t *testing.T) {
+       r := &fakeMeasureBatchResult{}
+       r.Release()
+       r.Release()
+       if r.releaseCnt != 2 {
+               t.Fatalf("Release calls counted: want 2, got %d", r.releaseCnt)
+       }
+}
diff --git a/pkg/query/vectorized/measure/build_batch.go 
b/pkg/query/vectorized/measure/build_batch.go
new file mode 100644
index 000000000..94702e574
--- /dev/null
+++ b/pkg/query/vectorized/measure/build_batch.go
@@ -0,0 +1,137 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "fmt"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/model"
+       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+)
+
+// BuildMeasureBatchFromResult converts a *model.MeasureResult produced by a
+// row-shaped MeasureQueryResult.Pull() call into a *model.MeasureBatch whose
+// columns match the supplied BatchSchema.
+//
+// This is the G5b "dual-emit" wrapper helper: the storage layer can
+// implement MeasureBatchResult.PullBatch by calling its existing Pull() and
+// passing the result through this converter. It preserves the existing
+// row-path decode pipeline; the architectural decode-elimination is left to
+// G5c/G5d (block_cursor native column emit).
+//
+// Schema-declared tag/field columns missing from the result are null-filled
+// using pbv1.Null{Tag,Field}Value singletons — matching the multi-group
+// projection behavior in fillTags / fillFields when one group's schema
+// lacks a tag the other has.
+//
+// Returns (nil, nil) when r is nil. Returns (nil, err) when a length
+// invariant is violated (a value slice is shorter than the timestamp count).
+func BuildMeasureBatchFromResult(r *model.MeasureResult, schema 
*vectorized.BatchSchema) (*model.MeasureBatch, error) {
+       if r == nil {
+               return nil, nil
+       }
+       if schema == nil {
+               return nil, fmt.Errorf("BuildMeasureBatchFromResult: nil 
schema")
+       }
+       n := len(r.Timestamps)
+
+       timestamps := make([]int64, n)
+       copy(timestamps, r.Timestamps)
+       versions := make([]int64, n)
+       if len(r.Versions) >= n {
+               copy(versions, r.Versions[:n])
+       }
+       shardIDs := make([]common.ShardID, n)
+       if len(r.ShardIDs) >= n {
+               copy(shardIDs, r.ShardIDs[:n])
+       }
+       seriesIDs := make([]common.SeriesID, n)
+       for i := range seriesIDs {
+               seriesIDs[i] = r.SID
+       }
+
+       resultTags := make(map[string]*model.Tag)
+       for i := range r.TagFamilies {
+               tf := &r.TagFamilies[i]
+               for j := range tf.Tags {
+                       tag := &tf.Tags[j]
+                       resultTags[tf.Name+"\x00"+tag.Name] = tag
+               }
+       }
+       resultFields := make(map[string]*model.Field, len(r.Fields))
+       for i := range r.Fields {
+               resultFields[r.Fields[i].Name] = &r.Fields[i]
+       }
+
+       tagCols := make([]vectorized.Column, 0, len(schema.Columns))
+       fieldCols := make([]vectorized.Column, 0, len(schema.Columns))
+       for _, def := range schema.Columns {
+               switch def.Role {
+               case vectorized.RoleTag:
+                       col := vectorized.NewTagValueColumn(n)
+                       tag, present := 
resultTags[def.TagFamily+"\x00"+def.Name]
+                       if !present {
+                               for range n {
+                                       col.Append(pbv1.NullTagValue)
+                               }
+                       } else {
+                               if len(tag.Values) < n {
+                                       return nil, 
fmt.Errorf("BuildMeasureBatchFromResult: tag %s.%s has %d values, expected %d",
+                                               def.TagFamily, def.Name, 
len(tag.Values), n)
+                               }
+                               for k := range n {
+                                       col.Append(tag.Values[k])
+                               }
+                       }
+                       tagCols = append(tagCols, col)
+               case vectorized.RoleField:
+                       col := vectorized.NewFieldValueColumn(n)
+                       fld, present := resultFields[def.Name]
+                       if !present {
+                               for range n {
+                                       col.Append(pbv1.NullFieldValue)
+                               }
+                       } else {
+                               if len(fld.Values) < n {
+                                       return nil, 
fmt.Errorf("BuildMeasureBatchFromResult: field %s has %d values, expected %d",
+                                               def.Name, len(fld.Values), n)
+                               }
+                               for k := range n {
+                                       col.Append(fld.Values[k])
+                               }
+                       }
+                       fieldCols = append(fieldCols, col)
+               case vectorized.RoleTimestamp, vectorized.RoleVersion,
+                       vectorized.RoleSeriesID, vectorized.RoleShardID:
+                       // Metadata roles are populated via the parallel slices 
on the
+                       // MeasureBatch itself; no per-column entry is needed.
+               }
+       }
+
+       return &model.MeasureBatch{
+               Schema:     schema,
+               Timestamps: timestamps,
+               Versions:   versions,
+               ShardIDs:   shardIDs,
+               SeriesIDs:  seriesIDs,
+               Tags:       tagCols,
+               Fields:     fieldCols,
+       }, nil
+}
diff --git a/pkg/query/vectorized/measure/build_batch_test.go 
b/pkg/query/vectorized/measure/build_batch_test.go
new file mode 100644
index 000000000..0dcbf63cb
--- /dev/null
+++ b/pkg/query/vectorized/measure/build_batch_test.go
@@ -0,0 +1,226 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "testing"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/model"
+       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+)
+
+// schemaSingleTagSingleField is the canonical mini-schema used across the
+// build_batch tests: timestamp + version + sid metadata + one tag (svc) +
+// one field (value). Passthrough column types match what the storage-side
+// converter emits in the G5b wrapper-style PullBatch.
+func schemaSingleTagSingleField() *vectorized.BatchSchema {
+       return vectorized.NewBatchSchema([]vectorized.ColumnDef{
+               {Role: vectorized.RoleTimestamp, Type: 
vectorized.ColumnTypeInt64},
+               {Role: vectorized.RoleVersion, Type: 
vectorized.ColumnTypeInt64},
+               {Role: vectorized.RoleSeriesID, Type: 
vectorized.ColumnTypeInt64},
+               {Role: vectorized.RoleTag, TagFamily: "default", Name: "svc", 
Type: vectorized.ColumnTypeTagValue},
+               {Role: vectorized.RoleField, Name: "value", Type: 
vectorized.ColumnTypeFieldValue},
+       })
+}
+
+func TestBuildMeasureBatchFromResult_NilResult(t *testing.T) {
+       batch, err := BuildMeasureBatchFromResult(nil, 
schemaSingleTagSingleField())
+       if err != nil {
+               t.Fatalf("nil result must return (nil, nil); got err %v", err)
+       }
+       if batch != nil {
+               t.Fatalf("nil result must return (nil, nil); got batch %v", 
batch)
+       }
+}
+
+func TestBuildMeasureBatchFromResult_NilSchema(t *testing.T) {
+       r := &model.MeasureResult{SID: 1, Timestamps: []int64{1}}
+       if _, err := BuildMeasureBatchFromResult(r, nil); err == nil {
+               t.Fatal("nil schema must return an error")
+       }
+}
+
+func TestBuildMeasureBatchFromResult_FullRow(t *testing.T) {
+       svcVal := &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: 
&modelv1.Str{Value: "alpha"}}}
+       valueFld := &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: 
&modelv1.Int{Value: 42}}}
+       r := &model.MeasureResult{
+               SID:        7,
+               Timestamps: []int64{100, 200, 300},
+               Versions:   []int64{1, 1, 1},
+               ShardIDs:   []common.ShardID{2, 2, 2},
+               TagFamilies: []model.TagFamily{
+                       {Name: "default", Tags: []model.Tag{
+                               {Name: "svc", Values: 
[]*modelv1.TagValue{svcVal, svcVal, svcVal}},
+                       }},
+               },
+               Fields: []model.Field{
+                       {Name: "value", Values: []*modelv1.FieldValue{valueFld, 
valueFld, valueFld}},
+               },
+       }
+       batch, err := BuildMeasureBatchFromResult(r, 
schemaSingleTagSingleField())
+       if err != nil {
+               t.Fatalf("BuildMeasureBatchFromResult: %v", err)
+       }
+       if got := batch.RowCount(); got != 3 {
+               t.Fatalf("RowCount: want 3, got %d", got)
+       }
+       for i, ts := range []int64{100, 200, 300} {
+               if batch.Timestamps[i] != ts {
+                       t.Fatalf("Timestamps[%d]: want %d, got %d", i, ts, 
batch.Timestamps[i])
+               }
+       }
+       for i, sid := range batch.SeriesIDs {
+               if sid != 7 {
+                       t.Fatalf("SeriesIDs[%d]: want 7, got %d", i, sid)
+               }
+       }
+       for i, sh := range batch.ShardIDs {
+               if sh != 2 {
+                       t.Fatalf("ShardIDs[%d]: want 2, got %d", i, sh)
+               }
+       }
+       if len(batch.Tags) != 1 {
+               t.Fatalf("len(Tags): want 1, got %d", len(batch.Tags))
+       }
+       tagCol, ok := batch.Tags[0].(*vectorized.TypedColumn[*modelv1.TagValue])
+       if !ok {
+               t.Fatalf("Tags[0] is not TypedColumn[*modelv1.TagValue]; got 
%T", batch.Tags[0])
+       }
+       if tagCol.Len() != 3 {
+               t.Fatalf("Tags[0].Len: want 3, got %d", tagCol.Len())
+       }
+       for i, v := range tagCol.Data() {
+               if v != svcVal {
+                       t.Fatalf("Tags[0][%d]: want passthrough pointer to 
svcVal, got %v", i, v)
+               }
+       }
+       if len(batch.Fields) != 1 {
+               t.Fatalf("len(Fields): want 1, got %d", len(batch.Fields))
+       }
+       fldCol, ok := 
batch.Fields[0].(*vectorized.TypedColumn[*modelv1.FieldValue])
+       if !ok {
+               t.Fatalf("Fields[0] is not TypedColumn[*modelv1.FieldValue]; 
got %T", batch.Fields[0])
+       }
+       for i, v := range fldCol.Data() {
+               if v != valueFld {
+                       t.Fatalf("Fields[0][%d]: want passthrough pointer to 
valueFld, got %v", i, v)
+               }
+       }
+}
+
+func TestBuildMeasureBatchFromResult_MissingTagNullFilled(t *testing.T) {
+       // Schema declares tag "svc" + field "value", but the result omits 
"svc".
+       // Output column for svc must be null-filled with pbv1.NullTagValue per
+       // the multi-group projection contract.
+       valueFld := &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: 
&modelv1.Int{Value: 42}}}
+       r := &model.MeasureResult{
+               SID:        1,
+               Timestamps: []int64{10, 20},
+               Versions:   []int64{1, 1},
+               ShardIDs:   []common.ShardID{0, 0},
+               Fields: []model.Field{
+                       {Name: "value", Values: []*modelv1.FieldValue{valueFld, 
valueFld}},
+               },
+       }
+       batch, err := BuildMeasureBatchFromResult(r, 
schemaSingleTagSingleField())
+       if err != nil {
+               t.Fatalf("BuildMeasureBatchFromResult: %v", err)
+       }
+       if batch.RowCount() != 2 {
+               t.Fatalf("RowCount: want 2, got %d", batch.RowCount())
+       }
+       tagCol, ok := batch.Tags[0].(*vectorized.TypedColumn[*modelv1.TagValue])
+       if !ok {
+               t.Fatalf("Tags[0] type")
+       }
+       for i, v := range tagCol.Data() {
+               if v != pbv1.NullTagValue {
+                       t.Fatalf("Tags[0][%d]: want pbv1.NullTagValue 
(singleton), got %v", i, v)
+               }
+       }
+}
+
+func TestBuildMeasureBatchFromResult_MissingFieldNullFilled(t *testing.T) {
+       svcVal := &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: 
&modelv1.Str{Value: "alpha"}}}
+       r := &model.MeasureResult{
+               SID:        1,
+               Timestamps: []int64{10, 20},
+               Versions:   []int64{1, 1},
+               ShardIDs:   []common.ShardID{0, 0},
+               TagFamilies: []model.TagFamily{
+                       {Name: "default", Tags: []model.Tag{
+                               {Name: "svc", Values: 
[]*modelv1.TagValue{svcVal, svcVal}},
+                       }},
+               },
+       }
+       batch, err := BuildMeasureBatchFromResult(r, 
schemaSingleTagSingleField())
+       if err != nil {
+               t.Fatalf("BuildMeasureBatchFromResult: %v", err)
+       }
+       fldCol, ok := 
batch.Fields[0].(*vectorized.TypedColumn[*modelv1.FieldValue])
+       if !ok {
+               t.Fatalf("Fields[0] type")
+       }
+       for i, v := range fldCol.Data() {
+               if v != pbv1.NullFieldValue {
+                       t.Fatalf("Fields[0][%d]: want pbv1.NullFieldValue 
(singleton), got %v", i, v)
+               }
+       }
+}
+
+func TestBuildMeasureBatchFromResult_TagValueLengthMismatchErrors(t 
*testing.T) {
+       // The schema demands 2 timestamps' worth of cells but the tag's Values
+       // slice is shorter — the converter must surface the length invariant
+       // rather than silently truncating.
+       r := &model.MeasureResult{
+               SID:        1,
+               Timestamps: []int64{1, 2},
+               Versions:   []int64{1, 1},
+               ShardIDs:   []common.ShardID{0, 0},
+               TagFamilies: []model.TagFamily{
+                       {Name: "default", Tags: []model.Tag{
+                               {Name: "svc", Values: []*modelv1.TagValue{
+                                       {Value: &modelv1.TagValue_Str{Str: 
&modelv1.Str{Value: "x"}}},
+                               }},
+                       }},
+               },
+       }
+       if _, err := BuildMeasureBatchFromResult(r, 
schemaSingleTagSingleField()); err == nil {
+               t.Fatal("length-invariant violation must return an error")
+       }
+}
+
+func TestBuildMeasureBatchFromResult_FieldValueLengthMismatchErrors(t 
*testing.T) {
+       r := &model.MeasureResult{
+               SID:        1,
+               Timestamps: []int64{1, 2},
+               Versions:   []int64{1, 1},
+               ShardIDs:   []common.ShardID{0, 0},
+               Fields: []model.Field{
+                       {Name: "value", Values: []*modelv1.FieldValue{
+                               {Value: &modelv1.FieldValue_Int{Int: 
&modelv1.Int{Value: 1}}},
+                       }},
+               },
+       }
+       if _, err := BuildMeasureBatchFromResult(r, 
schemaSingleTagSingleField()); err == nil {
+               t.Fatal("length-invariant violation must return an error")
+       }
+}

Reply via email to