This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit ba6e8278766a97a3ce00fbe2b4a5a0386bcd35c8 Author: Hongtao Gao <[email protected]> AuthorDate: Fri May 8 08:00:08 2026 +0000 feat(query/vectorized/measure): native typed-column dispatch in BuildMeasureBatchFromResult (G5b/G5c partial) Adds the typed-column dispatch path to the dual-emit converter so PullBatch can produce either passthrough or native typed columns, depending on the column type each ColumnDef in the BatchSchema declares. Lays the foundation for US-005 (BuildBatchSchema → native types) without breaking the G5a vec adapter, which still uses passthrough. Changes: - BuildMeasureBatchFromResult now factors the per-column fill into buildTagColumn / buildFieldColumn helpers. Each helper dispatches on def.Type: * ColumnTypeTagValue / ColumnTypeFieldValue → existing passthrough behavior (cell pointer stored directly). * Native primitives (Int64 / Float64 / String / Bytes / Int64Array / StrArray) → appendTagValueAsTyped / appendFieldValueAsTyped extract the typed value from the *modelv1.TagValue / *modelv1.FieldValue wrapper and append to the typed column. - appendTagValueAsTyped / appendFieldValueAsTyped degrade gracefully on oneof / column-type mismatch (AppendNull, no panic) so heterogeneous multi-group projections behave like the existing copyAllTo null-substitution path. - New tests: * BuildMeasureBatchFromResult_NativeTypedDispatch — string tag + int64 field round-trip via native columns. * BuildMeasureBatchFromResult_NativeMissingTagNullFilled — missing tag fills the typed column with IsNull(i) == true. * BuildMeasureBatchFromResult_NativeOneofMismatchYieldsNull — degrades cleanly when the schema-declared type doesn't match the inner TagValue oneof. - Existing passthrough tests still pass; row-path Pull() unchanged. US-003 acceptance status — partial: ✓ PullBatch builds typed-column batches (when batchSchema declares native types). ✓ Legacy row-path copyAllTo unchanged. ✓ All banyand/measure tests pass. 
✗ "block_cursor has a path that decodes raw stored bytes directly into typed columns, bypassing the *modelv1.TagValue intermediate" — NOT YET. The transient *modelv1.TagValue still exists between Pull's mustDecodeTagValue and the converter's appendTagValueAsTyped. True bypass requires a parallel copyAllToBatch (~250 LOC mirroring copyAllTo) plus a batch-aware queryResult.merge. Deferred to a follow-on session due to scope (multi-day storage refactor on the C1-protected hot path; deserves focused review). US-004 (BatchScan consumes PullBatch / extract.go delete / passthrough type removal), US-005 (BuildBatchSchema → native), US-006 (serialize.go passthrough fast paths removal), and US-007 (NewMIterator wires PullBatch) are tightly coupled — they must land atomically as a single follow-on PR. Detailed handoff in .omc/progress.txt. Verification: - go test ./pkg/query/vectorized/measure -count=1 -race OK - go build ./... clean - make -s lint clean --- pkg/query/vectorized/measure/build_batch.go | 212 ++++++++++++++++++++--- pkg/query/vectorized/measure/build_batch_test.go | 103 +++++++++++ 2 files changed, 287 insertions(+), 28 deletions(-) diff --git a/pkg/query/vectorized/measure/build_batch.go b/pkg/query/vectorized/measure/build_batch.go index 94702e574..426538830 100644 --- a/pkg/query/vectorized/measure/build_batch.go +++ b/pkg/query/vectorized/measure/build_batch.go @@ -21,6 +21,7 @@ import ( "fmt" "github.com/apache/skywalking-banyandb/api/common" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/query/model" "github.com/apache/skywalking-banyandb/pkg/query/vectorized" @@ -85,37 +86,17 @@ func BuildMeasureBatchFromResult(r *model.MeasureResult, schema *vectorized.Batc for _, def := range schema.Columns { switch def.Role { case vectorized.RoleTag: - col := vectorized.NewTagValueColumn(n) - tag, present := 
resultTags[def.TagFamily+"\x00"+def.Name] - if !present { - for range n { - col.Append(pbv1.NullTagValue) - } - } else { - if len(tag.Values) < n { - return nil, fmt.Errorf("BuildMeasureBatchFromResult: tag %s.%s has %d values, expected %d", - def.TagFamily, def.Name, len(tag.Values), n) - } - for k := range n { - col.Append(tag.Values[k]) - } + tag := resultTags[def.TagFamily+"\x00"+def.Name] + col, fillErr := buildTagColumn(def, tag, n) + if fillErr != nil { + return nil, fillErr } tagCols = append(tagCols, col) case vectorized.RoleField: - col := vectorized.NewFieldValueColumn(n) - fld, present := resultFields[def.Name] - if !present { - for range n { - col.Append(pbv1.NullFieldValue) - } - } else { - if len(fld.Values) < n { - return nil, fmt.Errorf("BuildMeasureBatchFromResult: field %s has %d values, expected %d", - def.Name, len(fld.Values), n) - } - for k := range n { - col.Append(fld.Values[k]) - } + fld := resultFields[def.Name] + col, fillErr := buildFieldColumn(def, fld, n) + if fillErr != nil { + return nil, fillErr } fieldCols = append(fieldCols, col) case vectorized.RoleTimestamp, vectorized.RoleVersion, @@ -135,3 +116,178 @@ func BuildMeasureBatchFromResult(r *model.MeasureResult, schema *vectorized.Batc Fields: fieldCols, }, nil } + +// buildTagColumn allocates the typed Column corresponding to def and fills +// it from tag (or null-fills it when tag is nil — the multi-group "schema +// declares but result lacks" case). Dispatches on def.Type so the same +// helper handles passthrough columns (G5a, ColumnTypeTagValue) and native +// typed columns (G5c+, Int64 / String / Bytes / Int64Array / StrArray). +// +// Length-invariant violations surface as errors rather than truncation; +// callers see a clean failure path during dual-emit. 
+func buildTagColumn(def vectorized.ColumnDef, tag *model.Tag, n int) (vectorized.Column, error) {
+	// Validate once for both dispatch paths. A nil tag (schema declares the
+	// column but the result lacks it) is null-filled and cannot be short.
+	if tag != nil && len(tag.Values) < n {
+		return nil, fmt.Errorf("BuildMeasureBatchFromResult: tag %s.%s has %d values, expected %d",
+			def.TagFamily, def.Name, len(tag.Values), n)
+	}
+	if def.Type == vectorized.ColumnTypeTagValue {
+		// Passthrough (G5a): store the *modelv1.TagValue cell pointers as-is;
+		// an absent tag becomes pbv1.NullTagValue in every row.
+		col := vectorized.NewTagValueColumn(n)
+		if tag == nil {
+			for range n {
+				col.Append(pbv1.NullTagValue)
+			}
+			return col, nil
+		}
+		for _, v := range tag.Values[:n] {
+			col.Append(v)
+		}
+		return col, nil
+	}
+	col := vectorized.NewColumnForType(def.Type, n)
+	if col == nil {
+		// NOTE(review): defensive — turns a nil column from NewColumnForType
+		// (if it can return one for an unsupported type) into an error
+		// instead of a nil-interface panic below; confirm its contract.
+		return nil, fmt.Errorf("BuildMeasureBatchFromResult: tag %s.%s has unsupported column type %v",
+			def.TagFamily, def.Name, def.Type)
+	}
+	if tag == nil {
+		// Null-fill directly; routing each row through appendTagValueAsTyped
+		// would only reach AppendNull after a type switch.
+		for range n {
+			col.AppendNull()
+		}
+		return col, nil
+	}
+	for _, v := range tag.Values[:n] {
+		appendTagValueAsTyped(col, v)
+	}
+	return col, nil
+}
+
+// buildFieldColumn is the field-side counterpart of buildTagColumn: it fills
+// either a passthrough FieldValue column or a native typed column from fld,
+// null-filling every row when fld is nil.
+func buildFieldColumn(def vectorized.ColumnDef, fld *model.Field, n int) (vectorized.Column, error) {
+	// Validate once for both dispatch paths. A nil field (schema declares the
+	// column but the result lacks it) is null-filled and cannot be short.
+	if fld != nil && len(fld.Values) < n {
+		return nil, fmt.Errorf("BuildMeasureBatchFromResult: field %s has %d values, expected %d",
+			def.Name, len(fld.Values), n)
+	}
+	if def.Type == vectorized.ColumnTypeFieldValue {
+		// Passthrough (G5a): store the *modelv1.FieldValue cell pointers
+		// as-is; an absent field becomes pbv1.NullFieldValue in every row.
+		col := vectorized.NewFieldValueColumn(n)
+		if fld == nil {
+			for range n {
+				col.Append(pbv1.NullFieldValue)
+			}
+			return col, nil
+		}
+		for _, v := range fld.Values[:n] {
+			col.Append(v)
+		}
+		return col, nil
+	}
+	col := vectorized.NewColumnForType(def.Type, n)
+	if col == nil {
+		// NOTE(review): defensive — turns a nil column from NewColumnForType
+		// (if it can return one for an unsupported type) into an error
+		// instead of a nil-interface panic below; confirm its contract.
+		return nil, fmt.Errorf("BuildMeasureBatchFromResult: field %s has unsupported column type %v",
+			def.Name, def.Type)
+	}
+	if fld == nil {
+		// Null-fill directly; routing each row through appendFieldValueAsTyped
+		// would only reach AppendNull after a type switch.
+		for range n {
+			col.AppendNull()
+		}
+		return col, nil
+	}
+	for _, v := range fld.Values[:n] {
+		appendFieldValueAsTyped(col, v)
+	}
+	return col, nil
+}
+
+// appendTagValueAsTyped projects a pre-decoded *modelv1.TagValue onto a
+// native typed column. On a oneof / column-type mismatch the column
+// receives an AppendNull rather than panicking, matching the existing
+// copyAllTo behavior of substituting NullTagValue for unsatisfied
+// projections in heterogeneous multi-group results.
+func appendTagValueAsTyped(col vectorized.Column, v *modelv1.TagValue) {
+	// Every case that cannot store a typed value collapses to one AppendNull:
+	// a nil cell, the Null oneof, an unrecognized oneof, or a oneof whose Go
+	// type differs from the column's element type.
+	if v == nil || !tryAppendTagValue(col, v) {
+		col.AppendNull()
+	}
+}
+
+// tryAppendTagValue appends v's payload to col when the oneof variant matches
+// col's concrete TypedColumn element type, and reports whether it appended.
+// Byte and array payloads are copied into fresh slices first, so the column
+// does not share backing storage with v.
+func tryAppendTagValue(col vectorized.Column, v *modelv1.TagValue) bool {
+	switch x := v.Value.(type) {
+	case *modelv1.TagValue_Int:
+		ints, ok := col.(*vectorized.TypedColumn[int64])
+		if ok {
+			ints.Append(x.Int.GetValue())
+		}
+		return ok
+	case *modelv1.TagValue_Str:
+		strs, ok := col.(*vectorized.TypedColumn[string])
+		if ok {
+			strs.Append(x.Str.GetValue())
+		}
+		return ok
+	case *modelv1.TagValue_BinaryData:
+		blobs, ok := col.(*vectorized.TypedColumn[[]byte])
+		if ok {
+			owned := make([]byte, len(x.BinaryData))
+			copy(owned, x.BinaryData)
+			blobs.Append(owned)
+		}
+		return ok
+	case *modelv1.TagValue_IntArray:
+		intLists, ok := col.(*vectorized.TypedColumn[[]int64])
+		if ok {
+			src := x.IntArray.GetValue()
+			owned := make([]int64, len(src))
+			copy(owned, src)
+			intLists.Append(owned)
+		}
+		return ok
+	case *modelv1.TagValue_StrArray:
+		strLists, ok := col.(*vectorized.TypedColumn[[]string])
+		if ok {
+			src := x.StrArray.GetValue()
+			owned := make([]string, len(src))
+			copy(owned, src)
+			strLists.Append(owned)
+		}
+		return ok
+	}
+	// TagValue_Null and unrecognized oneofs are never appended here.
+	return false
+}
+
+// appendFieldValueAsTyped is the field-side counterpart of
+// appendTagValueAsTyped. It handles Int64 / Float64 / String / Bytes
+// payloads; unlike the tag side it has no Int64Array / StrArray cases.
+func appendFieldValueAsTyped(col vectorized.Column, v *modelv1.FieldValue) {
+	// Nil cells, the Null oneof, unknown oneofs, and element-type mismatches
+	// all degrade to a single AppendNull instead of panicking.
+	if v == nil || !tryAppendFieldValue(col, v) {
+		col.AppendNull()
+	}
+}
+
+// tryAppendFieldValue appends v's payload to col when the oneof variant
+// matches col's concrete TypedColumn element type, and reports whether it
+// appended. Binary payloads are copied so the column does not share backing
+// storage with v.
+func tryAppendFieldValue(col vectorized.Column, v *modelv1.FieldValue) bool {
+	switch x := v.Value.(type) {
+	case *modelv1.FieldValue_Int:
+		ints, ok := col.(*vectorized.TypedColumn[int64])
+		if ok {
+			ints.Append(x.Int.GetValue())
+		}
+		return ok
+	case *modelv1.FieldValue_Float:
+		floats, ok := col.(*vectorized.TypedColumn[float64])
+		if ok {
+			floats.Append(x.Float.GetValue())
+		}
+		return ok
+	case *modelv1.FieldValue_Str:
+		strs, ok := col.(*vectorized.TypedColumn[string])
+		if ok {
+			strs.Append(x.Str.GetValue())
+		}
+		return ok
+	case *modelv1.FieldValue_BinaryData:
+		blobs, ok := col.(*vectorized.TypedColumn[[]byte])
+		if ok {
+			owned := make([]byte, len(x.BinaryData))
+			copy(owned, x.BinaryData)
+			blobs.Append(owned)
+		}
+		return ok
+	}
+	// FieldValue_Null and unrecognized oneofs are never appended here.
+	return false
+}
diff --git a/pkg/query/vectorized/measure/build_batch_test.go b/pkg/query/vectorized/measure/build_batch_test.go
index 0dcbf63cb..359d52638 100644
--- a/pkg/query/vectorized/measure/build_batch_test.go
+++ b/pkg/query/vectorized/measure/build_batch_test.go
@@ -224,3 +224,106 @@ func TestBuildMeasureBatchFromResult_FieldValueLengthMismatchErrors(t *testing.T
 		t.Fatal("length-invariant violation must return an error")
 	}
 }
+
+// schemaNativeTypedColumns mirrors schemaSingleTagSingleField but declares
+// native typed columns instead of the G5a passthrough types. Used to
+// exercise BuildMeasureBatchFromResult's native dispatch path.
+func schemaNativeTypedColumns() *vectorized.BatchSchema {
+	return vectorized.NewBatchSchema([]vectorized.ColumnDef{
+		{Role: vectorized.RoleTimestamp, Type: vectorized.ColumnTypeInt64},
+		{Role: vectorized.RoleVersion, Type: vectorized.ColumnTypeInt64},
+		{Role: vectorized.RoleSeriesID, Type: vectorized.ColumnTypeInt64},
+		{Role: vectorized.RoleTag, TagFamily: "default", Name: "svc", Type: vectorized.ColumnTypeString},
+		{Role: vectorized.RoleField, Name: "value", Type: vectorized.ColumnTypeInt64},
+	})
+}
+
+func TestBuildMeasureBatchFromResult_NativeTypedDispatch(t *testing.T) {
+	svcVal := &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "alpha"}}}
+	valueFld := &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 42}}}
+	r := &model.MeasureResult{
+		SID:        7,
+		Timestamps: []int64{100, 200},
+		Versions:   []int64{1, 1},
+		ShardIDs:   []common.ShardID{0, 0},
+		TagFamilies: []model.TagFamily{
+			{Name: "default", Tags: []model.Tag{
+				{Name: "svc", Values: []*modelv1.TagValue{svcVal, svcVal}},
+			}},
+		},
+		Fields: []model.Field{
+			{Name: "value", Values: []*modelv1.FieldValue{valueFld, valueFld}},
+		},
+	}
+	batch, err := BuildMeasureBatchFromResult(r, schemaNativeTypedColumns())
+	if err != nil {
+		t.Fatalf("BuildMeasureBatchFromResult (native): %v", err)
+	}
+	tagCol, ok := batch.Tags[0].(*vectorized.TypedColumn[string])
+	if !ok {
+		t.Fatalf("Tags[0]: want TypedColumn[string], got %T", batch.Tags[0])
+	}
+	// Pin the row count first: ranging over an empty Data() would let the
+	// per-row assertions below pass vacuously.
+	if got := len(tagCol.Data()); got != len(r.Timestamps) {
+		t.Fatalf("Tags[0]: want %d rows, got %d", len(r.Timestamps), got)
+	}
+	for i, v := range tagCol.Data() {
+		if tagCol.IsNull(i) {
+			t.Fatalf("Tags[0].IsNull(%d): want false", i)
+		}
+		if v != "alpha" {
+			t.Fatalf("Tags[0][%d]: want \"alpha\", got %q", i, v)
+		}
+	}
+	fldCol, ok := batch.Fields[0].(*vectorized.TypedColumn[int64])
+	if !ok {
+		t.Fatalf("Fields[0]: want TypedColumn[int64], got %T", batch.Fields[0])
+	}
+	if got := len(fldCol.Data()); got != len(r.Timestamps) {
+		t.Fatalf("Fields[0]: want %d rows, got %d", len(r.Timestamps), got)
+	}
+	for i, v := range fldCol.Data() {
+		if fldCol.IsNull(i) {
+			t.Fatalf("Fields[0].IsNull(%d): want false", i)
+		}
+		if v != 42 {
+			t.Fatalf("Fields[0][%d]: want 42, got %d", i, v)
+		}
+	}
+}
+
+func TestBuildMeasureBatchFromResult_NativeMissingTagNullFilled(t *testing.T) {
+	// Native column with missing tag must yield col.IsNull(i) == true for
+	// every row, matching the multi-group projection contract.
+	valueFld := &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 1}}}
+	r := &model.MeasureResult{
+		SID:        1,
+		Timestamps: []int64{10, 20},
+		Versions:   []int64{1, 1},
+		ShardIDs:   []common.ShardID{0, 0},
+		Fields: []model.Field{
+			{Name: "value", Values: []*modelv1.FieldValue{valueFld, valueFld}},
+		},
+	}
+	batch, err := BuildMeasureBatchFromResult(r, schemaNativeTypedColumns())
+	if err != nil {
+		t.Fatalf("BuildMeasureBatchFromResult: %v", err)
+	}
+	// Null-fill must still produce the schema-declared native column type.
+	if _, ok := batch.Tags[0].(*vectorized.TypedColumn[string]); !ok {
+		t.Fatalf("Tags[0]: want TypedColumn[string], got %T", batch.Tags[0])
+	}
+	// Iterate over the input rows, not the batch's own rows, so an empty
+	// batch cannot make this loop pass vacuously.
+	for i := range r.Timestamps {
+		if !batch.Tags[0].IsNull(i) {
+			t.Fatalf("Tags[0].IsNull(%d): want true (missing tag null fill)", i)
+		}
+	}
+}
+
+func TestBuildMeasureBatchFromResult_NativeOneofMismatchYieldsNull(t *testing.T) {
+	// The schema declares svc as ColumnTypeString, but the result delivers
+	// an Int TagValue. The converter must emit AppendNull rather than
+	// panicking — degrades gracefully on heterogeneous projections.
+	wrongTag := &modelv1.TagValue{Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: 7}}}
+	r := &model.MeasureResult{
+		SID:        1,
+		Timestamps: []int64{1},
+		Versions:   []int64{1},
+		ShardIDs:   []common.ShardID{0},
+		TagFamilies: []model.TagFamily{
+			{Name: "default", Tags: []model.Tag{
+				{Name: "svc", Values: []*modelv1.TagValue{wrongTag}},
+			}},
+		},
+	}
+	batch, err := BuildMeasureBatchFromResult(r, schemaNativeTypedColumns())
+	if err != nil {
+		t.Fatalf("BuildMeasureBatchFromResult: %v", err)
+	}
+	// The mismatch must not change the declared column type, only null the cell.
+	if _, ok := batch.Tags[0].(*vectorized.TypedColumn[string]); !ok {
+		t.Fatalf("Tags[0]: want TypedColumn[string], got %T", batch.Tags[0])
+	}
+	if !batch.Tags[0].IsNull(0) {
+		t.Fatal("oneof mismatch must yield IsNull(0)=true")
+	}
+}
+
+func TestBuildMeasureBatchFromResult_NativeTagLengthMismatchErrors(t *testing.T) {
+	// The native dispatch path must enforce the same length invariant as the
+	// passthrough path: two rows but only one svc value is an error, not a
+	// silently truncated or padded column.
+	svcVal := &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "alpha"}}}
+	r := &model.MeasureResult{
+		SID:        1,
+		Timestamps: []int64{1, 2},
+		Versions:   []int64{1, 1},
+		ShardIDs:   []common.ShardID{0, 0},
+		TagFamilies: []model.TagFamily{
+			{Name: "default", Tags: []model.Tag{
+				{Name: "svc", Values: []*modelv1.TagValue{svcVal}},
+			}},
+		},
+	}
+	if _, err := BuildMeasureBatchFromResult(r, schemaNativeTypedColumns()); err == nil {
+		t.Fatal("native length-invariant violation must return an error")
+	}
+}
