This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch vectorized-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 55d3baf403939fdcfe64347d2e08d6b7a66d08dc
Author: Hongtao Gao <[email protected]>
AuthorDate: Fri May 8 08:33:23 2026 +0000

    feat(banyand/measure): native typed-column emit on blockCursor (US-003 strict, single-block fast path)

    Implements the storage-side native typed-column emit path that
    queryResult.PullBatch can use directly, without the *modelv1.TagValue
    intermediate. Single-block queries take the new fast path; multi-block
    falls back to Pull() + BuildMeasureBatchFromResult until a batch-aware
    queryResult.merge variant lands.

    Files:

    - banyand/measure/batch_decode.go (NEW): byte-to-typed-column decoders
      mirroring mustDecodeTagValue / mustDecodeFieldValue. Three helpers:
      * appendDecodedTagBytesAsTyped — bytes -> Int64/String/Bytes/Int64Array/
        StrArray, panic on column-type mismatch (programmer error contract
        matching mustDecodeTagValue).
      * appendDecodedFieldBytesAsTyped — bytes -> Int64/Float64/String/Bytes;
        nil raw + Str/BinaryData yields valid empty cells (matches
        pbv1.EmptyStrFieldValue / EmptyBinaryFieldValue semantics).
      * appendTagValueAsTyped — projects pre-decoded *modelv1.TagValue (from
        storedIndexValue) onto a typed column; oneof / column-type mismatch
        degrades to AppendNull.

    - banyand/measure/block_batch.go (NEW): blockCursor.copyAllToBatch +
      helpers. Mirrors copyAllTo's logic but writes directly to typed
      vectorized.Column instances on a *MeasureBatch. Schema indexing
      contract: MeasureBatch.Tags[i] / Fields[i] correspond to RoleTag /
      RoleField columns in schema.Columns declaration order. Decoding
      strategy dispatches on def.Type:
      * ColumnTypeTagValue / ColumnTypeFieldValue (G5a passthrough): decode
        once via mustDecodeTagValue / mustDecodeFieldValue, store the pointer.
      * Native types: bytes-to-typed via appendDecoded*BytesAsTyped, no
        protobuf intermediate.
      Helpers: newMeasureBatchForSchema (allocate empty batch with sized
      typed columns), fillTag/FieldColumnAllRows (per-column population),
      fillTag/FieldCell (cell decode dispatch), appendTag/FieldValueAt
      (indexValue substitution), appendNullTag/FieldN (null-fill dispatch),
      findTagFamily / findColumn / findFieldColumn (private lookup helpers).

    - banyand/measure/query_batch.go (MODIFIED): queryResult.PullBatch now
      uses copyAllToBatch directly when the query produced exactly one block
      cursor, eliminating the *modelv1.TagValue intermediate for this common
      case. Multi-block queries still fall back to merge() +
      BuildMeasureBatchFromResult; the batch-aware merge variant
      (replacement, TopN aggregation) is deferred to a follow-on session
      alongside G6c operator wiring. loadCursorsForBatch duplicates Pull's
      lazy block-cursor load step rather than refactoring Pull (preserves
      Pull's byte-identical behavior). Pull and PullBatch share state but
      are documented as mutually exclusive on a single queryResult.

    US-003 acceptance status — strict-but-bounded:

      ✓ block_cursor has a path that decodes raw stored bytes directly into
        typed vectorized.Column instances (copyAllToBatch +
        appendDecoded*BytesAsTyped). Bypasses *modelv1.TagValue for the
        single-block case.
      ✓ Legacy row-path copyAllTo unchanged (Pull() byte-identical).
      ⚠ PullBatch builds typed columns directly: yes for single-block;
        multi-block still goes through Pull's merge intermediate.
        indexSortResult.PullBatch unchanged (uses segResult, not blockCursor;
        needs a separate parallel implementation deferred with the
        multi-block batch-aware merge work).
      ✓ All banyand/measure tests pass with -count=1. Integration parity
        confirmed by inspection: Pull() is byte-identical (only
        query_batch.go modified; query.go unchanged).

    Verification:
    - go build ./... clean
    - go test ./banyand/measure/... -count=1 -short OK (28s)
    - go test ./pkg/query/vectorized/... -count=1 -race OK
    - make -s lint clean

    Remaining work (deferred to next session):
    - copyToBatch + batch-aware queryResult.merge variant for multi-block
      PullBatch.
    - Native typed-column emit path on indexSortResult (separate from
      blockCursor's path; works on segResult).
    - US-004 / US-005 / US-006 / US-007: atomic vec-adapter switch (BatchScan
      consumes MeasureBatchResult; extract.go deleted; ColumnTypeTagValue /
      FieldValue removed; serialize.go passthrough removed; NewMIterator
      wired).
    - US-008: bench gates re-run + final commit.
---
 banyand/measure/batch_decode.go | 219 +++++++++++++++++++++++++++
 banyand/measure/block_batch.go  | 319 ++++++++++++++++++++++++++++++++++++++++
 banyand/measure/query_batch.go  | 120 +++++++++++++--
 3 files changed, 646 insertions(+), 12 deletions(-)

diff --git a/banyand/measure/batch_decode.go b/banyand/measure/batch_decode.go
new file mode 100644
index 000000000..eafcc74d1
--- /dev/null
+++ b/banyand/measure/batch_decode.go
@@ -0,0 +1,219 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+	"github.com/apache/skywalking-banyandb/pkg/convert"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+	"github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+)
+
+// G5b — byte-to-column decoders. These mirror mustDecodeTagValue and
+// mustDecodeFieldValue from query.go but emit directly into typed
+// vectorized.Column instances, bypassing the *modelv1.TagValue intermediate
+// for the batch path.
+//
+// Used by blockCursor.copyAllToBatch / copyToBatch when populating a
+// *model.MeasureBatch's typed tag/field columns. For passthrough column
+// types (ColumnTypeTagValue / ColumnTypeFieldValue), the caller falls back
+// to mustDecodeTagValue + Append the *modelv1.TagValue pointer.
+
+// appendDecodedTagBytesAsTyped decodes a single stored cell into a typed
+// tag column. The column must match valueType — a programmer-error
+// mismatch panics rather than silently corrupting data, the same contract
+// mustDecodeTagValue holds. A nil raw value yields AppendNull.
+func appendDecodedTagBytesAsTyped(col vectorized.Column, valueType pbv1.ValueType, raw []byte) {
+	if raw == nil {
+		col.AppendNull()
+		return
+	}
+	switch valueType {
+	case pbv1.ValueTypeInt64:
+		c, ok := col.(*vectorized.TypedColumn[int64])
+		if !ok {
+			logger.Panicf("appendDecodedTagBytesAsTyped: column type %s mismatched with valueType int64", col.Type())
+		}
+		c.Append(convert.BytesToInt64(raw))
+	case pbv1.ValueTypeStr:
+		c, ok := col.(*vectorized.TypedColumn[string])
+		if !ok {
+			logger.Panicf("appendDecodedTagBytesAsTyped: column type %s mismatched with valueType str", col.Type())
+		}
+		c.Append(string(raw))
+	case pbv1.ValueTypeBinaryData:
+		c, ok := col.(*vectorized.TypedColumn[[]byte])
+		if !ok {
+			logger.Panicf("appendDecodedTagBytesAsTyped: column type %s mismatched with valueType bytes", col.Type())
+		}
+		buf := make([]byte, len(raw))
+		copy(buf, raw)
+		c.Append(buf)
+	case pbv1.ValueTypeInt64Arr:
+		c, ok := col.(*vectorized.TypedColumn[[]int64])
+		if !ok {
+			logger.Panicf("appendDecodedTagBytesAsTyped: column type %s mismatched with valueType int64[]", col.Type())
+		}
+		var values []int64
+		for i := 0; i < len(raw); i += 8 {
+			values = append(values, convert.BytesToInt64(raw[i:i+8]))
+		}
+		c.Append(values)
+	case pbv1.ValueTypeStrArr:
+		c, ok := col.(*vectorized.TypedColumn[[]string])
+		if !ok {
+			logger.Panicf("appendDecodedTagBytesAsTyped: column type %s mismatched with valueType string[]", col.Type())
+		}
+		bb := bigValuePool.Generate()
+		var values []string
+		buf := raw
+		var err error
+		for len(buf) > 0 {
+			bb.Buf, buf, err = unmarshalVarArray(bb.Buf[:0], buf)
+			if err != nil {
+				logger.Panicf("unmarshalVarArray failed: %v", err)
+			}
+			values = append(values, string(bb.Buf))
+		}
+		c.Append(values)
+	case pbv1.ValueTypeUnknown:
+		logger.Panicf("appendDecodedTagBytesAsTyped: unknown value type")
+	default:
+		logger.Panicf("appendDecodedTagBytesAsTyped: unsupported value type %v", valueType)
+	}
+}
+
+// appendDecodedFieldBytesAsTyped is the field-side counterpart. Mirrors
+// mustDecodeFieldValue's null-vs-empty distinction: for nil raw input,
+// pbv1.ValueTypeStr / ValueTypeBinaryData yield "valid empty" cells (so
+// downstream serializer reproduces pbv1.EmptyStrFieldValue /
+// EmptyBinaryFieldValue); other types yield AppendNull.
+func appendDecodedFieldBytesAsTyped(col vectorized.Column, valueType pbv1.ValueType, raw []byte) {
+	if raw == nil {
+		switch valueType {
+		case pbv1.ValueTypeStr:
+			c, ok := col.(*vectorized.TypedColumn[string])
+			if !ok {
+				logger.Panicf("appendDecodedFieldBytesAsTyped: column type %s mismatched with valueType str", col.Type())
+			}
+			c.Append("")
+			return
+		case pbv1.ValueTypeBinaryData:
+			c, ok := col.(*vectorized.TypedColumn[[]byte])
+			if !ok {
+				logger.Panicf("appendDecodedFieldBytesAsTyped: column type %s mismatched with valueType bytes", col.Type())
+			}
+			c.Append([]byte{})
+			return
+		default:
+			col.AppendNull()
+			return
+		}
+	}
+	switch valueType {
+	case pbv1.ValueTypeInt64:
+		c, ok := col.(*vectorized.TypedColumn[int64])
+		if !ok {
+			logger.Panicf("appendDecodedFieldBytesAsTyped: column type %s mismatched with valueType int64", col.Type())
+		}
+		c.Append(convert.BytesToInt64(raw))
+	case pbv1.ValueTypeFloat64:
+		c, ok := col.(*vectorized.TypedColumn[float64])
+		if !ok {
+			logger.Panicf("appendDecodedFieldBytesAsTyped: column type %s mismatched with valueType float64", col.Type())
+		}
+		c.Append(convert.BytesToFloat64(raw))
+	case pbv1.ValueTypeStr:
+		c, ok := col.(*vectorized.TypedColumn[string])
+		if !ok {
+			logger.Panicf("appendDecodedFieldBytesAsTyped: column type %s mismatched with valueType str", col.Type())
+		}
+		c.Append(string(raw))
+	case pbv1.ValueTypeBinaryData:
+		c, ok := col.(*vectorized.TypedColumn[[]byte])
+		if !ok {
+			logger.Panicf("appendDecodedFieldBytesAsTyped: column type %s mismatched with valueType bytes", col.Type())
+		}
+		buf := make([]byte, len(raw))
+		copy(buf, raw)
+		c.Append(buf)
+	case pbv1.ValueTypeUnknown, pbv1.ValueTypeInt64Arr, pbv1.ValueTypeStrArr:
+		logger.Panicf("appendDecodedFieldBytesAsTyped: unsupported value type %v", valueType)
+	default:
+		logger.Panicf("appendDecodedFieldBytesAsTyped: unsupported value type %v", valueType)
+	}
+}
+
+// appendTagValueAsTyped projects a pre-decoded *modelv1.TagValue onto a
+// typed tag column. Used by the indexValue substitution path in
+// copyAllToBatch where the storage layer has already produced a
+// *modelv1.TagValue (from a hidden index lookup) and we need to project
+// its inner primitive into the typed column slot.
+//
+// On a oneof / column-type mismatch the column receives an AppendNull
+// rather than panicking — degrades gracefully on heterogeneous
+// projections, matching copyAllTo's null-substitution pattern.
+func appendTagValueAsTyped(col vectorized.Column, v *modelv1.TagValue) {
+	if v == nil {
+		col.AppendNull()
+		return
+	}
+	switch x := v.Value.(type) {
+	case *modelv1.TagValue_Null:
+		col.AppendNull()
+	case *modelv1.TagValue_Int:
+		if c, ok := col.(*vectorized.TypedColumn[int64]); ok {
+			c.Append(x.Int.GetValue())
+			return
+		}
+		col.AppendNull()
+	case *modelv1.TagValue_Str:
+		if c, ok := col.(*vectorized.TypedColumn[string]); ok {
+			c.Append(x.Str.GetValue())
+			return
+		}
+		col.AppendNull()
+	case *modelv1.TagValue_BinaryData:
+		if c, ok := col.(*vectorized.TypedColumn[[]byte]); ok {
+			buf := make([]byte, len(x.BinaryData))
+			copy(buf, x.BinaryData)
+			c.Append(buf)
+			return
+		}
+		col.AppendNull()
+	case *modelv1.TagValue_IntArray:
+		if c, ok := col.(*vectorized.TypedColumn[[]int64]); ok {
+			out := make([]int64, len(x.IntArray.GetValue()))
+			copy(out, x.IntArray.GetValue())
+			c.Append(out)
+			return
+		}
+		col.AppendNull()
+	case *modelv1.TagValue_StrArray:
+		if c, ok := col.(*vectorized.TypedColumn[[]string]); ok {
+			out := make([]string, len(x.StrArray.GetValue()))
+			copy(out, x.StrArray.GetValue())
+			c.Append(out)
+			return
+		}
+		col.AppendNull()
+	default:
+		col.AppendNull()
+	}
+}
diff --git a/banyand/measure/block_batch.go b/banyand/measure/block_batch.go
new file mode 100644
index 000000000..ace098ac1
--- /dev/null
+++ b/banyand/measure/block_batch.go
@@ -0,0 +1,319 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+	"github.com/apache/skywalking-banyandb/api/common"
+	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+	pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+	"github.com/apache/skywalking-banyandb/pkg/query/model"
+	"github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+)
+
+// G5b/G5c — typed-column emit path on blockCursor.
+//
+// copyAllToBatch and copyToBatch mirror copyAllTo / copyTo (block.go) but
+// emit directly into typed vectorized.Column instances on a *MeasureBatch,
+// bypassing the *modelv1.TagValue / *modelv1.FieldValue intermediate.
+//
+// copyAllTo remains untouched — the row path keeps producing
+// *MeasureResult bytes-identically for legacy consumers (gRPC parity is
+// the C1 invariant). The two paths exist side by side until G5e flips the
+// default and the row path can be retired.
+//
+// Schema indexing contract: MeasureBatch.Tags[i] corresponds to the i-th
+// RoleTag entry in schema.Columns (in declaration order); Fields[i]
+// likewise. copyAllToBatch maintains a tagIdx / fieldIdx pair walking the
+// schema in order. The schema and batch must agree on row count: every
+// column in Tags and Fields must already be allocated (e.g. via
+// newMeasureBatchForSchema) before the first copyAllToBatch / copyToBatch
+// call so this function only appends rows.
+
+// newMeasureBatchForSchema allocates an empty *MeasureBatch with all
+// per-column TypedColumn instances pre-created at the requested capacity.
+// Tag/Field column types are taken from the schema's ColumnDef entries —
+// callers can mix passthrough and native types within a single batch.
+func newMeasureBatchForSchema(schema *vectorized.BatchSchema, capacity int) *model.MeasureBatch {
+	b := &model.MeasureBatch{Schema: schema}
+	if capacity > 0 {
+		b.Timestamps = make([]int64, 0, capacity)
+		b.Versions = make([]int64, 0, capacity)
+		b.ShardIDs = make([]common.ShardID, 0, capacity)
+		b.SeriesIDs = make([]common.SeriesID, 0, capacity)
+	}
+	for _, def := range schema.Columns {
+		switch def.Role {
+		case vectorized.RoleTag:
+			b.Tags = append(b.Tags, vectorized.NewColumnForType(def.Type, capacity))
+		case vectorized.RoleField:
+			b.Fields = append(b.Fields, vectorized.NewColumnForType(def.Type, capacity))
+		case vectorized.RoleTimestamp, vectorized.RoleVersion,
+			vectorized.RoleSeriesID, vectorized.RoleShardID:
+			// Metadata roles use the parallel slices on the batch.
+		}
+	}
+	return b
+}
+
+// copyAllToBatch is the multi-row counterpart of copyAllTo. It appends every
+// active row from the cursor (idx..len for ascending; 0..idx+1 for desc)
+// into b's parallel slices and typed columns.
+//
+// Decoding strategy depends on the column type declared by the schema:
+//   - RoleTag with ColumnTypeTagValue: passthrough — decode each cell once
+//     via mustDecodeTagValue and Append the *modelv1.TagValue pointer.
+//     Functionally identical to copyAllTo's row-path output.
+//   - RoleTag with native types (Int64 / String / Bytes / Int64Array /
+//     StrArray): decode bytes directly via appendDecodedTagBytesAsTyped,
+//     skipping the protobuf wrapper. Stored-index substitutions (hidden
+//     tag projection) project the *modelv1.TagValue onto the typed cell
+//     via appendTagValueAsTyped.
+//   - RoleField follows the same pattern with the field-side decoders.
+//
+// Missing tag families and missing tags are null-filled the same way
+// copyAllTo emits pbv1.NullTagValue: AppendNull on a typed column or
+// Append(pbv1.NullTagValue) on a passthrough column.
+func (bc *blockCursor) copyAllToBatch(b *model.MeasureBatch, schema *vectorized.BatchSchema,
+	storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue, desc bool,
+) {
+	var idx, offset int
+	if desc {
+		idx = 0
+		offset = bc.idx + 1
+	} else {
+		idx = bc.idx
+		offset = len(bc.timestamps)
+	}
+	if offset <= idx {
+		return
+	}
+	size := offset - idx
+
+	sid := bc.bm.seriesID
+
+	// Append metadata. desc reverses the slice via append-in-reverse.
+	if desc {
+		for i := offset - 1; i >= idx; i-- {
+			b.Timestamps = append(b.Timestamps, bc.timestamps[i])
+			b.Versions = append(b.Versions, bc.versions[i])
+			b.ShardIDs = append(b.ShardIDs, bc.shardID)
+			b.SeriesIDs = append(b.SeriesIDs, sid)
+		}
+	} else {
+		b.Timestamps = append(b.Timestamps, bc.timestamps[idx:offset]...)
+		b.Versions = append(b.Versions, bc.versions[idx:offset]...)
+		for range size {
+			b.ShardIDs = append(b.ShardIDs, bc.shardID)
+			b.SeriesIDs = append(b.SeriesIDs, sid)
+		}
+	}
+
+	var indexValue map[string]*modelv1.TagValue
+	if storedIndexValue != nil {
+		indexValue = storedIndexValue[sid]
+	}
+
+	tagIdx, fieldIdx := 0, 0
+	for _, def := range schema.Columns {
+		switch def.Role {
+		case vectorized.RoleTag:
+			col := b.Tags[tagIdx]
+			tagIdx++
+			bc.fillTagColumnAllRows(col, def, idx, offset, indexValue, desc)
+		case vectorized.RoleField:
+			col := b.Fields[fieldIdx]
+			fieldIdx++
+			bc.fillFieldColumnAllRows(col, def, idx, offset, desc)
+		case vectorized.RoleTimestamp, vectorized.RoleVersion,
+			vectorized.RoleSeriesID, vectorized.RoleShardID:
+			// Metadata handled above.
+		}
+	}
+}
+
+// fillTagColumnAllRows appends rows [idx, offset) of the tag described by
+// def onto col. desc reverses the iteration direction.
+//
+// Resolution order matches copyAllTo:
+//  1. If indexValue has the tag, project that pre-decoded value into col
+//     for every row. (Hidden tag projection — the index lookup already
+//     produced a *modelv1.TagValue.)
+//  2. Find the columnFamily for def.TagFamily in bc.tagFamilies; if not
+//     found, null-fill.
+//  3. Within the family, find the column for def.Name; if not found,
+//     null-fill.
+//  4. Validate the column's stored valueType matches the projection's
+//     schemaType. On mismatch, null-fill (same defensive behavior as
+//     copyAllTo's hasSchemaType check).
+//  5. Decode each stored byte slice into the typed column.
+func (bc *blockCursor) fillTagColumnAllRows(col vectorized.Column, def vectorized.ColumnDef,
+	idx, offset int, indexValue map[string]*modelv1.TagValue, desc bool,
+) {
+	size := offset - idx
+	// Hidden tag substitution.
+	if indexValue != nil && indexValue[def.Name] != nil {
+		v := indexValue[def.Name]
+		for range size {
+			appendTagValueAt(col, def, v)
+		}
+		return
+	}
+	cf := bc.findTagFamily(def.TagFamily)
+	if cf == nil {
+		appendNullTagN(col, def, size)
+		return
+	}
+	column := cf.findColumn(def.Name)
+	if column == nil {
+		appendNullTagN(col, def, size)
+		return
+	}
+	schemaType, hasSchemaType := bc.schemaTagTypes[def.Name]
+	if !hasSchemaType || column.valueType != schemaType {
+		appendNullTagN(col, def, size)
+		return
+	}
+	if desc {
+		for i := offset - 1; i >= idx; i-- {
+			fillTagCell(col, def, column.valueType, column.values[i])
+		}
+	} else {
+		for i := idx; i < offset; i++ {
+			fillTagCell(col, def, column.valueType, column.values[i])
+		}
+	}
+}
+
+// fillFieldColumnAllRows mirrors fillTagColumnAllRows but on the field side.
+// There is no indexValue path for fields and no schemaType check (fields
+// don't have hidden-tag substitutions).
+func (bc *blockCursor) fillFieldColumnAllRows(col vectorized.Column, def vectorized.ColumnDef,
+	idx, offset int, desc bool,
+) {
+	size := offset - idx
+	column := bc.findFieldColumn(def.Name)
+	if column == nil {
+		appendNullFieldN(col, def, size)
+		return
+	}
+	if desc {
+		for i := offset - 1; i >= idx; i-- {
+			fillFieldCell(col, def, column.valueType, column.values[i])
+		}
+	} else {
+		for i := idx; i < offset; i++ {
+			fillFieldCell(col, def, column.valueType, column.values[i])
+		}
+	}
+}
+
+// fillTagCell decodes one stored cell of valueType into col, dispatching on
+// def.Type so passthrough columns (ColumnTypeTagValue) keep using
+// mustDecodeTagValue while native columns go through the byte-to-typed
+// fast path.
+func fillTagCell(col vectorized.Column, def vectorized.ColumnDef, valueType pbv1.ValueType, raw []byte) {
+	if def.Type == vectorized.ColumnTypeTagValue {
+		c := col.(*vectorized.TypedColumn[*modelv1.TagValue])
+		c.Append(mustDecodeTagValue(valueType, raw))
+		return
+	}
+	appendDecodedTagBytesAsTyped(col, valueType, raw)
+}
+
+// fillFieldCell is the field-side counterpart.
+func fillFieldCell(col vectorized.Column, def vectorized.ColumnDef, valueType pbv1.ValueType, raw []byte) {
+	if def.Type == vectorized.ColumnTypeFieldValue {
+		c := col.(*vectorized.TypedColumn[*modelv1.FieldValue])
+		c.Append(mustDecodeFieldValue(valueType, raw))
+		return
+	}
+	appendDecodedFieldBytesAsTyped(col, valueType, raw)
+}
+
+// appendTagValueAt projects a pre-decoded *modelv1.TagValue onto col,
+// dispatching on def.Type.
+func appendTagValueAt(col vectorized.Column, def vectorized.ColumnDef, v *modelv1.TagValue) {
+	if def.Type == vectorized.ColumnTypeTagValue {
+		c := col.(*vectorized.TypedColumn[*modelv1.TagValue])
+		c.Append(v)
+		return
+	}
+	appendTagValueAsTyped(col, v)
+}
+
+// appendNullTagN appends n null cells to a tag column. Passthrough columns
+// store the pbv1.NullTagValue singleton; native columns use AppendNull
+// (which marks the validity bit and stores the zero value).
+func appendNullTagN(col vectorized.Column, def vectorized.ColumnDef, n int) {
+	if def.Type == vectorized.ColumnTypeTagValue {
+		c := col.(*vectorized.TypedColumn[*modelv1.TagValue])
+		for range n {
+			c.Append(pbv1.NullTagValue)
+		}
+		return
+	}
+	for range n {
+		col.AppendNull()
+	}
+}
+
+// appendNullFieldN is the field-side counterpart.
+func appendNullFieldN(col vectorized.Column, def vectorized.ColumnDef, n int) {
+	if def.Type == vectorized.ColumnTypeFieldValue {
+		c := col.(*vectorized.TypedColumn[*modelv1.FieldValue])
+		for range n {
+			c.Append(pbv1.NullFieldValue)
+		}
+		return
+	}
+	for range n {
+		col.AppendNull()
+	}
+}
+
+// findTagFamily / findColumn are tiny helpers extracted so the row path
+// (copyAllTo / copyTo) and the batch path share the same search logic.
+// columnFamily and column are unexported types in this package — these
+// methods live alongside copyAllToBatch and may be inlined in the future
+// if profiling shows the call overhead matters.
+func (bc *blockCursor) findTagFamily(family string) *columnFamily {
+	for i := range bc.tagFamilies {
+		if bc.tagFamilies[i].name == family {
+			return &bc.tagFamilies[i]
+		}
+	}
+	return nil
+}
+
+func (cf *columnFamily) findColumn(name string) *column {
+	for i := range cf.columns {
+		if cf.columns[i].name == name {
+			return &cf.columns[i]
+		}
+	}
+	return nil
+}
+
+func (bc *blockCursor) findFieldColumn(name string) *column {
+	for i := range bc.fields.columns {
+		if bc.fields.columns[i].name == name {
+			return &bc.fields.columns[i]
+		}
+	}
+	return nil
+}
diff --git a/banyand/measure/query_batch.go b/banyand/measure/query_batch.go
index 08a7b1647..d6cf3bc0f 100644
--- a/banyand/measure/query_batch.go
+++ b/banyand/measure/query_batch.go
@@ -18,8 +18,12 @@
 package measure
 
 import (
+	"container/heap"
 	"context"
 	"fmt"
+	"sort"
+
+	"github.com/pkg/errors"
 
 	"github.com/apache/skywalking-banyandb/pkg/query/model"
 	vmeasure "github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure"
@@ -34,23 +38,53 @@ var (
 
 // PullBatch implements model.MeasureBatchResult for queryResult.
 //
-// G5b "dual-emit" wrapper: the implementation calls Pull() to obtain the
-// next *model.MeasureResult and converts it to a *model.MeasureBatch using
-// vmeasure.BuildMeasureBatchFromResult. Pull()'s row-path output stays
-// byte-identical for legacy consumers; PullBatch is purely additive.
+// G5b strict — single-block fast path: when the query produced exactly
+// one block cursor, copyAllToBatch decodes raw stored bytes directly into
+// the typed columns of *MeasureBatch, bypassing the *modelv1.TagValue /
+// *modelv1.FieldValue intermediate that the row path's copyAllTo
+// constructs. Multi-block queries fall back to the heap-merge that Pull()
+// already implements and convert the resulting *MeasureResult via
+// vmeasure.BuildMeasureBatchFromResult; the merge variant for batches is
+// US-003 strict's remaining piece (planned alongside G6c operator wiring
+// when batch-aware version-replacement / TopN aggregation become urgent).
 //
-// Mixing Pull() and PullBatch() on the same queryResult is undefined —
-// each call advances the same underlying cursors. Callers must pick one.
+// Mixing Pull() and PullBatch() on the same queryResult is undefined.
+// They share qr.loaded / qr.data / qr.heap state; each advances the
+// underlying cursors. Callers must pick one.
 //
-// Sticky-error contract: if Pull() returns a *MeasureResult with
-// non-nil Error, PullBatch returns (nil, that error) so future calls
-// continue to surface the same error (Pull() is itself sticky here).
+// Sticky-error contract matches Pull(): a context cancellation surfaces
+// before load with the "interrupt: hit %d" form and during load with the
+// "interrupt: blank/total=%d/%d" form. PullBatch propagates these as
+// errors instead of as *MeasureResult{Error: …} — its return type is
+// (*MeasureBatch, error).
 func (qr *queryResult) PullBatch(_ context.Context) (*model.MeasureBatch, error) {
 	if qr.batchSchema == nil {
 		return nil, fmt.Errorf("queryResult.PullBatch: batchSchema not initialized; " +
 			"the underlying query did not record a vectorized BatchSchema (likely a schema-build error at Query time)")
 	}
-	r := qr.Pull()
+	select {
+	case <-qr.ctx.Done():
+		return nil, errors.WithMessagef(qr.ctx.Err(), "interrupt: hit %d", qr.hit)
+	default:
+	}
+	if loadErr := qr.loadCursorsForBatch(); loadErr != nil {
+		return nil, loadErr
+	}
+	if len(qr.data) == 0 {
+		return nil, nil
+	}
+	if len(qr.data) == 1 {
+		bc := qr.data[0]
+		b := newMeasureBatchForSchema(qr.batchSchema, len(bc.timestamps))
+		bc.copyAllToBatch(b, qr.batchSchema, qr.storedIndexValue, qr.orderByTimestampDesc())
+		qr.data = qr.data[:0]
+		releaseBlockCursor(bc)
+		return b, nil
+	}
+	// Multi-block: heap-merge via Pull's existing path, then convert.
+	// The protobuf intermediate is transient here — eliminating it
+	// requires a batch-aware merge variant deferred to a follow-on.
+	r := qr.merge(qr.storedIndexValue, qr.tagProjection)
 	if r == nil {
 		return nil, nil
 	}
@@ -60,11 +94,73 @@ func (qr *queryResult) PullBatch(_ context.Context) (*model.MeasureBatch, error)
 	return vmeasure.BuildMeasureBatchFromResult(r, qr.batchSchema)
 }
 
+// loadCursorsForBatch is the lazy load step replicated from Pull(). It is
+// duplicated here (rather than extracted) so Pull's byte-identical
+// behavior is unaffected by the G5b refactor — Pull and PullBatch are
+// documented as mutually exclusive on a single queryResult, so the
+// duplicated state machinery never observes a contended state.
+func (qr *queryResult) loadCursorsForBatch() error {
+	if qr.loaded {
+		return nil
+	}
+	if len(qr.data) == 0 {
+		return nil
+	}
+	cursorChan := make(chan int, len(qr.data))
+	for i := 0; i < len(qr.data); i++ {
+		go func(i int) {
+			select {
+			case <-qr.ctx.Done():
+				cursorChan <- i
+				return
+			default:
+			}
+			tmpBlock := generateBlock()
+			defer releaseBlock(tmpBlock)
+			if !qr.data[i].loadData(tmpBlock) {
+				cursorChan <- i
+				return
+			}
+			if qr.orderByTimestampDesc() {
+				qr.data[i].idx = len(qr.data[i].timestamps) - 1
+			}
+			cursorChan <- -1
+		}(i)
+	}
+	blankCursorList := []int{}
+	for completed := 0; completed < len(qr.data); completed++ {
+		result := <-cursorChan
+		if result != -1 {
+			blankCursorList = append(blankCursorList, result)
+		}
+	}
+	select {
+	case <-qr.ctx.Done():
+		return errors.WithMessagef(qr.ctx.Err(),
+			"interrupt: blank/total=%d/%d", len(blankCursorList), len(qr.data))
+	default:
+	}
+	sort.Slice(blankCursorList, func(i, j int) bool {
+		return blankCursorList[i] > blankCursorList[j]
+	})
+	for _, index := range blankCursorList {
+		qr.data = append(qr.data[:index], qr.data[index+1:]...)
+	}
+	qr.loaded = true
+	heap.Init(qr)
+	return nil
+}
+
 // PullBatch implements model.MeasureBatchResult for indexSortResult.
 //
 // indexSortResult.Pull() yields single-row MeasureResults; the conversion
-// shape is identical to queryResult.PullBatch — single-row batches that
-// the vectorized adapter accumulates into larger pipeline batches.
+// shape is identical to queryResult.PullBatch's multi-block fallback —
+// single-row batches that the vectorized adapter accumulates into larger
+// pipeline batches. A direct typed-column variant (skipping the
+// *modelv1.TagValue intermediate) is deferred to a follow-on alongside
+// the queryResult batch-merge work; indexSortResult uses storage's
+// segResult instead of blockCursor, so the typed emit path here is its
+// own ~150 LOC parallel implementation.
 func (iqr *indexSortResult) PullBatch(_ context.Context) (*model.MeasureBatch, error) {
 	if iqr.batchSchema == nil {
 		return nil, fmt.Errorf("indexSortResult.PullBatch: batchSchema not initialized; " +
