This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch vectorized-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit ba6e8278766a97a3ce00fbe2b4a5a0386bcd35c8
Author: Hongtao Gao <[email protected]>
AuthorDate: Fri May 8 08:00:08 2026 +0000

    feat(query/vectorized/measure): native typed-column dispatch in BuildMeasureBatchFromResult (G5b/G5c partial)
    
    Adds the typed-column dispatch path to the dual-emit converter so
    PullBatch can produce either passthrough or native typed columns
    depending on the BatchSchema declared per ColumnDef. Lays the
    foundation for US-005 (BuildBatchSchema → native types) without
    breaking the G5a vec adapter, which still uses passthrough.
    
    Changes:
    - BuildMeasureBatchFromResult now factors the per-column fill into
      buildTagColumn / buildFieldColumn helpers. Each helper dispatches
      on def.Type:
      * ColumnTypeTagValue / ColumnTypeFieldValue → existing passthrough
        behavior (cell pointer stored directly).
      * Native primitives (Int64 / Float64 / String / Bytes / Int64Array /
        StrArray) → appendTagValueAsTyped / appendFieldValueAsTyped extract
        the typed value from the *modelv1.TagValue / *modelv1.FieldValue
        wrapper and append to the typed column.
    - appendTagValueAsTyped / appendFieldValueAsTyped degrade gracefully on
      oneof / column-type mismatch (AppendNull, no panic) so heterogeneous
      multi-group projections behave like the existing copyAllTo
      null-substitution path.
    - New tests:
      * BuildMeasureBatchFromResult_NativeTypedDispatch — string tag +
        int64 field round-trip via native columns.
      * BuildMeasureBatchFromResult_NativeMissingTagNullFilled — missing
        tag fills the typed column with IsNull(i) == true.
      * BuildMeasureBatchFromResult_NativeOneofMismatchYieldsNull — degrades
        cleanly when the schema-declared type doesn't match the inner
        TagValue oneof.
    - Existing passthrough tests still pass; row-path Pull() unchanged.
    
    US-003 acceptance status — partial:
      ✓ PullBatch builds typed-column batches (when batchSchema declares
        native types).
      ✓ Legacy row-path copyAllTo unchanged.
      ✓ All banyand/measure tests pass.
      ✗ "block_cursor has a path that decodes raw stored bytes directly
        into typed columns, bypassing the *modelv1.TagValue intermediate"
        — NOT YET. The transient *modelv1.TagValue still exists between
        Pull's mustDecodeTagValue and the converter's
        appendTagValueAsTyped. True bypass requires a parallel
        copyAllToBatch (~250 LOC mirroring copyAllTo) plus a batch-aware
        queryResult.merge. Deferred to a follow-on session due to scope
        (multi-day storage refactor on the C1-protected hot path;
        deserves focused review).
    
    US-004 (BatchScan consumes PullBatch / extract.go delete / passthrough
    type removal), US-005 (BuildBatchSchema → native), US-006 (serialize.go
    passthrough fast paths removal), and US-007 (NewMIterator wires
    PullBatch) are tightly coupled — they must land atomically as a single
    follow-on PR. Detailed handoff in .omc/progress.txt.
    
    Verification:
    - go test ./pkg/query/vectorized/measure -count=1 -race    OK
    - go build ./...                                            clean
    - make -s lint                                              clean
---
 pkg/query/vectorized/measure/build_batch.go      | 212 ++++++++++++++++++++---
 pkg/query/vectorized/measure/build_batch_test.go | 103 +++++++++++
 2 files changed, 287 insertions(+), 28 deletions(-)

diff --git a/pkg/query/vectorized/measure/build_batch.go 
b/pkg/query/vectorized/measure/build_batch.go
index 94702e574..426538830 100644
--- a/pkg/query/vectorized/measure/build_batch.go
+++ b/pkg/query/vectorized/measure/build_batch.go
@@ -21,6 +21,7 @@ import (
        "fmt"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
        "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
@@ -85,37 +86,17 @@ func BuildMeasureBatchFromResult(r *model.MeasureResult, 
schema *vectorized.Batc
        for _, def := range schema.Columns {
                switch def.Role {
                case vectorized.RoleTag:
-                       col := vectorized.NewTagValueColumn(n)
-                       tag, present := 
resultTags[def.TagFamily+"\x00"+def.Name]
-                       if !present {
-                               for range n {
-                                       col.Append(pbv1.NullTagValue)
-                               }
-                       } else {
-                               if len(tag.Values) < n {
-                                       return nil, 
fmt.Errorf("BuildMeasureBatchFromResult: tag %s.%s has %d values, expected %d",
-                                               def.TagFamily, def.Name, 
len(tag.Values), n)
-                               }
-                               for k := range n {
-                                       col.Append(tag.Values[k])
-                               }
+                       tag := resultTags[def.TagFamily+"\x00"+def.Name]
+                       col, fillErr := buildTagColumn(def, tag, n)
+                       if fillErr != nil {
+                               return nil, fillErr
                        }
                        tagCols = append(tagCols, col)
                case vectorized.RoleField:
-                       col := vectorized.NewFieldValueColumn(n)
-                       fld, present := resultFields[def.Name]
-                       if !present {
-                               for range n {
-                                       col.Append(pbv1.NullFieldValue)
-                               }
-                       } else {
-                               if len(fld.Values) < n {
-                                       return nil, 
fmt.Errorf("BuildMeasureBatchFromResult: field %s has %d values, expected %d",
-                                               def.Name, len(fld.Values), n)
-                               }
-                               for k := range n {
-                                       col.Append(fld.Values[k])
-                               }
+                       fld := resultFields[def.Name]
+                       col, fillErr := buildFieldColumn(def, fld, n)
+                       if fillErr != nil {
+                               return nil, fillErr
                        }
                        fieldCols = append(fieldCols, col)
                case vectorized.RoleTimestamp, vectorized.RoleVersion,
@@ -135,3 +116,178 @@ func BuildMeasureBatchFromResult(r *model.MeasureResult, 
schema *vectorized.Batc
                Fields:     fieldCols,
        }, nil
 }
+
+// buildTagColumn allocates the typed Column corresponding to def and fills
+// it from tag (or null-fills it when tag is nil — the multi-group "schema
+// declares but result lacks" case). Dispatches on def.Type so the same
+// helper handles passthrough columns (G5a, ColumnTypeTagValue) and native
+// typed columns (G5c+, Int64 / String / Bytes / Int64Array / StrArray).
+//
+// A tag carrying fewer than n values surfaces as an error rather than a
+// short column; only the first n values are read, so any extras are ignored.
+func buildTagColumn(def vectorized.ColumnDef, tag *model.Tag, n int) 
(vectorized.Column, error) {
+       if def.Type == vectorized.ColumnTypeTagValue {
+               col := vectorized.NewTagValueColumn(n)
+               if tag == nil {
+                       for range n {
+                               col.Append(pbv1.NullTagValue)
+                       }
+                       return col, nil
+               }
+               if len(tag.Values) < n {
+                       return nil, fmt.Errorf("BuildMeasureBatchFromResult: 
tag %s.%s has %d values, expected %d",
+                               def.TagFamily, def.Name, len(tag.Values), n)
+               }
+               for k := range n {
+                       col.Append(tag.Values[k])
+               }
+               return col, nil
+       }
+       col := vectorized.NewColumnForType(def.Type, n)
+       if tag == nil {
+               for range n {
+                       appendTagValueAsTyped(col, pbv1.NullTagValue)
+               }
+               return col, nil
+       }
+       if len(tag.Values) < n {
+               return nil, fmt.Errorf("BuildMeasureBatchFromResult: tag %s.%s 
has %d values, expected %d",
+                       def.TagFamily, def.Name, len(tag.Values), n)
+       }
+       for k := range n {
+               appendTagValueAsTyped(col, tag.Values[k])
+       }
+       return col, nil
+}
+
+// buildFieldColumn is the field-side counterpart of buildTagColumn.
+func buildFieldColumn(def vectorized.ColumnDef, fld *model.Field, n int) 
(vectorized.Column, error) {
+       if def.Type == vectorized.ColumnTypeFieldValue {
+               col := vectorized.NewFieldValueColumn(n)
+               if fld == nil {
+                       for range n {
+                               col.Append(pbv1.NullFieldValue)
+                       }
+                       return col, nil
+               }
+               if len(fld.Values) < n {
+                       return nil, fmt.Errorf("BuildMeasureBatchFromResult: 
field %s has %d values, expected %d",
+                               def.Name, len(fld.Values), n)
+               }
+               for k := range n {
+                       col.Append(fld.Values[k])
+               }
+               return col, nil
+       }
+       col := vectorized.NewColumnForType(def.Type, n)
+       if fld == nil {
+               for range n {
+                       appendFieldValueAsTyped(col, pbv1.NullFieldValue)
+               }
+               return col, nil
+       }
+       if len(fld.Values) < n {
+               return nil, fmt.Errorf("BuildMeasureBatchFromResult: field %s 
has %d values, expected %d",
+                       def.Name, len(fld.Values), n)
+       }
+       for k := range n {
+               appendFieldValueAsTyped(col, fld.Values[k])
+       }
+       return col, nil
+}
+
+// appendTagValueAsTyped projects a pre-decoded *modelv1.TagValue onto a
+// native typed column. On a oneof / column-type mismatch the column
+// receives an AppendNull rather than panicking, matching the existing
+// copyAllTo behavior of substituting NullTagValue for unsatisfied
+// projections in heterogeneous multi-group results.
+func appendTagValueAsTyped(col vectorized.Column, v *modelv1.TagValue) {
+       if v == nil {
+               col.AppendNull()
+               return
+       }
+       switch x := v.Value.(type) {
+       case *modelv1.TagValue_Null:
+               col.AppendNull()
+       case *modelv1.TagValue_Int:
+               if c, ok := col.(*vectorized.TypedColumn[int64]); ok {
+                       c.Append(x.Int.GetValue())
+                       return
+               }
+               col.AppendNull()
+       case *modelv1.TagValue_Str:
+               if c, ok := col.(*vectorized.TypedColumn[string]); ok {
+                       c.Append(x.Str.GetValue())
+                       return
+               }
+               col.AppendNull()
+       case *modelv1.TagValue_BinaryData:
+               if c, ok := col.(*vectorized.TypedColumn[[]byte]); ok {
+                       buf := make([]byte, len(x.BinaryData))
+                       copy(buf, x.BinaryData)
+                       c.Append(buf)
+                       return
+               }
+               col.AppendNull()
+       case *modelv1.TagValue_IntArray:
+               if c, ok := col.(*vectorized.TypedColumn[[]int64]); ok {
+                       out := make([]int64, len(x.IntArray.GetValue()))
+                       copy(out, x.IntArray.GetValue())
+                       c.Append(out)
+                       return
+               }
+               col.AppendNull()
+       case *modelv1.TagValue_StrArray:
+               if c, ok := col.(*vectorized.TypedColumn[[]string]); ok {
+                       out := make([]string, len(x.StrArray.GetValue()))
+                       copy(out, x.StrArray.GetValue())
+                       c.Append(out)
+                       return
+               }
+               col.AppendNull()
+       default:
+               col.AppendNull()
+       }
+}
+
+// appendFieldValueAsTyped is the field-side counterpart of
+// appendTagValueAsTyped. FieldValue has no Int64Array / StrArray variants
+// but adds Float (float64); the remaining cases mirror the tag side.
+func appendFieldValueAsTyped(col vectorized.Column, v *modelv1.FieldValue) {
+       if v == nil {
+               col.AppendNull()
+               return
+       }
+       switch x := v.Value.(type) {
+       case *modelv1.FieldValue_Null:
+               col.AppendNull()
+       case *modelv1.FieldValue_Int:
+               if c, ok := col.(*vectorized.TypedColumn[int64]); ok {
+                       c.Append(x.Int.GetValue())
+                       return
+               }
+               col.AppendNull()
+       case *modelv1.FieldValue_Float:
+               if c, ok := col.(*vectorized.TypedColumn[float64]); ok {
+                       c.Append(x.Float.GetValue())
+                       return
+               }
+               col.AppendNull()
+       case *modelv1.FieldValue_Str:
+               if c, ok := col.(*vectorized.TypedColumn[string]); ok {
+                       c.Append(x.Str.GetValue())
+                       return
+               }
+               col.AppendNull()
+       case *modelv1.FieldValue_BinaryData:
+               if c, ok := col.(*vectorized.TypedColumn[[]byte]); ok {
+                       buf := make([]byte, len(x.BinaryData))
+                       copy(buf, x.BinaryData)
+                       c.Append(buf)
+                       return
+               }
+               col.AppendNull()
+       default:
+               col.AppendNull()
+       }
+}
diff --git a/pkg/query/vectorized/measure/build_batch_test.go 
b/pkg/query/vectorized/measure/build_batch_test.go
index 0dcbf63cb..359d52638 100644
--- a/pkg/query/vectorized/measure/build_batch_test.go
+++ b/pkg/query/vectorized/measure/build_batch_test.go
@@ -224,3 +224,106 @@ func 
TestBuildMeasureBatchFromResult_FieldValueLengthMismatchErrors(t *testing.T
                t.Fatal("length-invariant violation must return an error")
        }
 }
+
+// schemaNativeTypedColumns mirrors schemaSingleTagSingleField but declares
+// native typed columns instead of the G5a passthrough types. Used to
+// exercise BuildMeasureBatchFromResult's native dispatch path.
+func schemaNativeTypedColumns() *vectorized.BatchSchema {
+       return vectorized.NewBatchSchema([]vectorized.ColumnDef{
+               {Role: vectorized.RoleTimestamp, Type: 
vectorized.ColumnTypeInt64},
+               {Role: vectorized.RoleVersion, Type: 
vectorized.ColumnTypeInt64},
+               {Role: vectorized.RoleSeriesID, Type: 
vectorized.ColumnTypeInt64},
+               {Role: vectorized.RoleTag, TagFamily: "default", Name: "svc", 
Type: vectorized.ColumnTypeString},
+               {Role: vectorized.RoleField, Name: "value", Type: 
vectorized.ColumnTypeInt64},
+       })
+}
+
+func TestBuildMeasureBatchFromResult_NativeTypedDispatch(t *testing.T) {
+       svcVal := &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: 
&modelv1.Str{Value: "alpha"}}}
+       valueFld := &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: 
&modelv1.Int{Value: 42}}}
+       r := &model.MeasureResult{
+               SID:        7,
+               Timestamps: []int64{100, 200},
+               Versions:   []int64{1, 1},
+               ShardIDs:   []common.ShardID{0, 0},
+               TagFamilies: []model.TagFamily{
+                       {Name: "default", Tags: []model.Tag{
+                               {Name: "svc", Values: 
[]*modelv1.TagValue{svcVal, svcVal}},
+                       }},
+               },
+               Fields: []model.Field{
+                       {Name: "value", Values: []*modelv1.FieldValue{valueFld, 
valueFld}},
+               },
+       }
+       batch, err := BuildMeasureBatchFromResult(r, schemaNativeTypedColumns())
+       if err != nil {
+               t.Fatalf("BuildMeasureBatchFromResult (native): %v", err)
+       }
+       tagCol, ok := batch.Tags[0].(*vectorized.TypedColumn[string])
+       if !ok {
+               t.Fatalf("Tags[0]: want TypedColumn[string], got %T", 
batch.Tags[0])
+       }
+       for i, v := range tagCol.Data() {
+               if v != "alpha" {
+                       t.Fatalf("Tags[0][%d]: want \"alpha\", got %q", i, v)
+               }
+       }
+       fldCol, ok := batch.Fields[0].(*vectorized.TypedColumn[int64])
+       if !ok {
+               t.Fatalf("Fields[0]: want TypedColumn[int64], got %T", 
batch.Fields[0])
+       }
+       for i, v := range fldCol.Data() {
+               if v != 42 {
+                       t.Fatalf("Fields[0][%d]: want 42, got %d", i, v)
+               }
+       }
+}
+
+func TestBuildMeasureBatchFromResult_NativeMissingTagNullFilled(t *testing.T) {
+       // Native column with missing tag must yield col.IsNull(i) == true for
+       // every row, matching the multi-group projection contract.
+       valueFld := &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: 
&modelv1.Int{Value: 1}}}
+       r := &model.MeasureResult{
+               SID:        1,
+               Timestamps: []int64{10, 20},
+               Versions:   []int64{1, 1},
+               ShardIDs:   []common.ShardID{0, 0},
+               Fields: []model.Field{
+                       {Name: "value", Values: []*modelv1.FieldValue{valueFld, 
valueFld}},
+               },
+       }
+       batch, err := BuildMeasureBatchFromResult(r, schemaNativeTypedColumns())
+       if err != nil {
+               t.Fatalf("BuildMeasureBatchFromResult: %v", err)
+       }
+       for i := range batch.Timestamps {
+               if !batch.Tags[0].IsNull(i) {
+                       t.Fatalf("Tags[0].IsNull(%d): want true (missing tag 
null fill)", i)
+               }
+       }
+}
+
+func TestBuildMeasureBatchFromResult_NativeOneofMismatchYieldsNull(t 
*testing.T) {
+       // The schema declares svc as ColumnTypeString, but the result delivers
+       // an Int TagValue. The converter must emit AppendNull rather than
+       // panicking — degrades gracefully on heterogeneous projections.
+       wrongTag := &modelv1.TagValue{Value: &modelv1.TagValue_Int{Int: 
&modelv1.Int{Value: 7}}}
+       r := &model.MeasureResult{
+               SID:        1,
+               Timestamps: []int64{1},
+               Versions:   []int64{1},
+               ShardIDs:   []common.ShardID{0},
+               TagFamilies: []model.TagFamily{
+                       {Name: "default", Tags: []model.Tag{
+                               {Name: "svc", Values: 
[]*modelv1.TagValue{wrongTag}},
+                       }},
+               },
+       }
+       batch, err := BuildMeasureBatchFromResult(r, schemaNativeTypedColumns())
+       if err != nil {
+               t.Fatalf("BuildMeasureBatchFromResult: %v", err)
+       }
+       if !batch.Tags[0].IsNull(0) {
+               t.Fatal("oneof mismatch must yield IsNull(0)=true")
+       }
+}

Reply via email to