This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 36140ca9403f1dab3165a2362e55894ec63cbdb4 Author: Hongtao Gao <[email protected]> AuthorDate: Sat May 9 05:32:03 2026 +0000 feat(banyand/measure): batch-aware multi-block merge for PullBatch (US-003 strict completion) Replace the BuildMeasureBatchFromResult fallback in queryResult.PullBatch's multi-block branch with a native mergeBatch that writes directly into a *model.MeasureBatch via typed column helpers, eliminating the *modelv1.TagValue / *modelv1.FieldValue row intermediate. New additions: - vectorized.TypedColumn.SetAt / validityBitmap.ClearNull: minimal overwrite support needed by the replace-in-batch path. - blockCursor.copyToBatch / replaceInBatch: single-row typed-column emit mirroring copyTo / replace (block.go), with set-based column mutators for in-place version replacement (setTagCellAt, setFieldCellAt, etc.). - queryResult.mergeBatch: heap-merge producing a *MeasureBatch directly, capped at mergeBatchMaxRows (4096) rows per call; respects series boundaries and version replacement exactly as merge() does. - TopN aggregation (mergeTopNResultBatch) deferred: topNQueryOptions queries fall back to the existing merge -> BuildMeasureBatchFromResult path; TopN is not exercised by the vectorized bench corpus or integration suite. - indexSortResult.PullBatch unchanged (segResult layer makes a typed emit a separate ~150 LOC project; documented deferral). Verified: go build ./..., go test ./banyand/measure/... -short -race PASS, go test ./pkg/query/vectorized/... -race PASS, RUN_BENCH_GATES=1 TestBenchGates_PerWorkload (W1-W5, W2-MB, W4-MB, W5-MB) PASS, integration vectorized parity 110/110 PASS, make lint PASS. 
--- banyand/measure/block_batch.go | 323 +++++++++++++++++++++++++++++++++++ banyand/measure/query_batch.go | 101 ++++++++++- pkg/query/vectorized/column.go | 10 ++ pkg/query/vectorized/typed_column.go | 10 ++ 4 files changed, 436 insertions(+), 8 deletions(-) diff --git a/banyand/measure/block_batch.go b/banyand/measure/block_batch.go index ace098ac1..48cb257b9 100644 --- a/banyand/measure/block_batch.go +++ b/banyand/measure/block_batch.go @@ -20,6 +20,8 @@ package measure import ( "github.com/apache/skywalking-banyandb/api/common" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/pkg/convert" + "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/query/model" "github.com/apache/skywalking-banyandb/pkg/query/vectorized" @@ -245,6 +247,327 @@ func fillFieldCell(col vectorized.Column, def vectorized.ColumnDef, valueType pb appendDecodedFieldBytesAsTyped(col, valueType, raw) } +// copyToBatch appends the single row at bc.idx into b's parallel slices and +// typed columns. It is the batch-path mirror of blockCursor.copyTo (block.go) +// and is used by mergeBatch when emitting a fresh (non-duplicate) row. +// +// Schema/column indexing contract matches copyAllToBatch: Tags[tagIdx] +// aligns with the tagIdx-th RoleTag entry in schema.Columns, Fields similarly. 
+func (bc *blockCursor) copyToBatch(b *model.MeasureBatch, schema *vectorized.BatchSchema, + storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue, +) { + sid := bc.bm.seriesID + b.Timestamps = append(b.Timestamps, bc.timestamps[bc.idx]) + b.Versions = append(b.Versions, bc.versions[bc.idx]) + b.ShardIDs = append(b.ShardIDs, bc.shardID) + b.SeriesIDs = append(b.SeriesIDs, sid) + + var indexValue map[string]*modelv1.TagValue + if storedIndexValue != nil { + indexValue = storedIndexValue[sid] + } + + tagIdx, fieldIdx := 0, 0 + for _, def := range schema.Columns { + switch def.Role { + case vectorized.RoleTag: + col := b.Tags[tagIdx] + tagIdx++ + bc.fillTagCell(col, def, bc.idx, indexValue) + case vectorized.RoleField: + col := b.Fields[fieldIdx] + fieldIdx++ + bc.fillFieldCell(col, def, bc.idx) + case vectorized.RoleTimestamp, vectorized.RoleVersion, + vectorized.RoleSeriesID, vectorized.RoleShardID: + // Metadata handled via the parallel slices above. + } + } +} + +// fillTagCell appends a single tag cell at row rowIdx from bc into col. +func (bc *blockCursor) fillTagCell(col vectorized.Column, def vectorized.ColumnDef, + rowIdx int, indexValue map[string]*modelv1.TagValue, +) { + if indexValue != nil && indexValue[def.Name] != nil { + appendTagValueAt(col, def, indexValue[def.Name]) + return + } + cf := bc.findTagFamily(def.TagFamily) + if cf == nil { + appendNullTagN(col, def, 1) + return + } + column := cf.findColumn(def.Name) + if column == nil { + appendNullTagN(col, def, 1) + return + } + schemaType, hasSchemaType := bc.schemaTagTypes[def.Name] + if !hasSchemaType || column.valueType != schemaType { + appendNullTagN(col, def, 1) + return + } + fillTagCell(col, def, column.valueType, column.values[rowIdx]) +} + +// fillFieldCell appends a single field cell at row rowIdx from bc into col. 
+func (bc *blockCursor) fillFieldCell(col vectorized.Column, def vectorized.ColumnDef, rowIdx int) { + column := bc.findFieldColumn(def.Name) + if column == nil { + appendNullFieldN(col, def, 1) + return + } + fillFieldCell(col, def, column.valueType, column.values[rowIdx]) +} + +// replaceInBatch overwrites the LAST row of b (at index lastRow) with the +// data from bc.idx. It is the batch-path mirror of blockCursor.replace +// (block.go) and is called by mergeBatch when a duplicate timestamp with a +// newer version is encountered. +// +// Of the parallel metadata slices only b.Versions is rewritten in place; typed +// column cells are overwritten via setTagCellAt / setFieldCellAt. +func (bc *blockCursor) replaceInBatch(b *model.MeasureBatch, schema *vectorized.BatchSchema, + storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue, +) { + sid := bc.bm.seriesID + lastRow := len(b.Timestamps) - 1 + b.Versions[lastRow] = bc.versions[bc.idx] + + var indexValue map[string]*modelv1.TagValue + if storedIndexValue != nil { + indexValue = storedIndexValue[sid] + } + + tagIdx, fieldIdx := 0, 0 + for _, def := range schema.Columns { + switch def.Role { + case vectorized.RoleTag: + col := b.Tags[tagIdx] + tagIdx++ + bc.setTagCellAt(col, def, bc.idx, lastRow, indexValue) + case vectorized.RoleField: + col := b.Fields[fieldIdx] + fieldIdx++ + bc.setFieldCellAt(col, def, bc.idx, lastRow) + case vectorized.RoleTimestamp, vectorized.RoleVersion, + vectorized.RoleSeriesID, vectorized.RoleShardID: + // Metadata handled above. + } + } +} + +// setTagCellAt overwrites column position destRow with the tag value decoded +// from bc at srcRow. 
+func (bc *blockCursor) setTagCellAt(col vectorized.Column, def vectorized.ColumnDef, + srcRow, destRow int, indexValue map[string]*modelv1.TagValue, +) { + if indexValue != nil && indexValue[def.Name] != nil { + setTagValueAt(col, def, destRow, indexValue[def.Name]) + return + } + cf := bc.findTagFamily(def.TagFamily) + if cf == nil { + setNullTagAt(col, def, destRow) + return + } + column := cf.findColumn(def.Name) + if column == nil { + setNullTagAt(col, def, destRow) + return + } + schemaType, hasSchemaType := bc.schemaTagTypes[def.Name] + if !hasSchemaType || column.valueType != schemaType { + setNullTagAt(col, def, destRow) + return + } + setTagCellAt(col, def, column.valueType, column.values[srcRow], destRow) +} + +// setFieldCellAt overwrites column position destRow with the field value +// decoded from bc at srcRow. +func (bc *blockCursor) setFieldCellAt(col vectorized.Column, def vectorized.ColumnDef, srcRow, destRow int) { + column := bc.findFieldColumn(def.Name) + if column == nil { + setNullFieldAt(col, def, destRow) + return + } + setFieldCellAt(col, def, column.valueType, column.values[srcRow], destRow) +} + +// setTagCellAt overwrites one cell in col at destRow. Mirrors fillTagCell but +// uses SetAt instead of Append. +func setTagCellAt(col vectorized.Column, def vectorized.ColumnDef, valueType pbv1.ValueType, raw []byte, destRow int) { + if def.Type == vectorized.ColumnTypeTagValue { + c := col.(*vectorized.TypedColumn[*modelv1.TagValue]) + c.SetAt(destRow, mustDecodeTagValue(valueType, raw)) + return + } + setDecodedTagBytesAt(col, valueType, raw, destRow) +} + +// setFieldCellAt overwrites one field cell in col at destRow. 
+func setFieldCellAt(col vectorized.Column, def vectorized.ColumnDef, valueType pbv1.ValueType, raw []byte, destRow int) { + if def.Type == vectorized.ColumnTypeFieldValue { + c := col.(*vectorized.TypedColumn[*modelv1.FieldValue]) + c.SetAt(destRow, mustDecodeFieldValue(valueType, raw)) + return + } + setDecodedFieldBytesAt(col, valueType, raw, destRow) +} + +// setTagValueAt overwrites one passthrough/native tag cell with a pre-decoded +// *modelv1.TagValue, dispatching on def.Type. +func setTagValueAt(col vectorized.Column, def vectorized.ColumnDef, destRow int, v *modelv1.TagValue) { + if def.Type == vectorized.ColumnTypeTagValue { + c := col.(*vectorized.TypedColumn[*modelv1.TagValue]) + c.SetAt(destRow, v) + return + } + setTagValueTypedAt(col, v, destRow) +} + +// setNullTagAt marks destRow null in a tag column. +func setNullTagAt(col vectorized.Column, def vectorized.ColumnDef, destRow int) { + if def.Type == vectorized.ColumnTypeTagValue { + c := col.(*vectorized.TypedColumn[*modelv1.TagValue]) + c.SetAt(destRow, pbv1.NullTagValue) + return + } + col.MarkNullAt(destRow) +} + +// setNullFieldAt marks destRow null in a field column. +func setNullFieldAt(col vectorized.Column, def vectorized.ColumnDef, destRow int) { + if def.Type == vectorized.ColumnTypeFieldValue { + c := col.(*vectorized.TypedColumn[*modelv1.FieldValue]) + c.SetAt(destRow, pbv1.NullFieldValue) + return + } + col.MarkNullAt(destRow) +} + +// setDecodedTagBytesAt overwrites a native typed tag column at destRow by +// decoding raw bytes of the given valueType. 
+func setDecodedTagBytesAt(col vectorized.Column, valueType pbv1.ValueType, raw []byte, destRow int) { + if raw == nil { + col.MarkNullAt(destRow) + return + } + switch valueType { + case pbv1.ValueTypeInt64: + col.(*vectorized.TypedColumn[int64]).SetAt(destRow, convert.BytesToInt64(raw)) + case pbv1.ValueTypeStr: + col.(*vectorized.TypedColumn[string]).SetAt(destRow, string(raw)) + case pbv1.ValueTypeBinaryData: + buf := make([]byte, len(raw)) + copy(buf, raw) + col.(*vectorized.TypedColumn[[]byte]).SetAt(destRow, buf) + case pbv1.ValueTypeInt64Arr: + var values []int64 + for i := 0; i < len(raw); i += 8 { + values = append(values, convert.BytesToInt64(raw[i:i+8])) + } + col.(*vectorized.TypedColumn[[]int64]).SetAt(destRow, values) + case pbv1.ValueTypeStrArr: + bb := bigValuePool.Generate() + var values []string + buf := raw + var unmarshalErr error + for len(buf) > 0 { + bb.Buf, buf, unmarshalErr = unmarshalVarArray(bb.Buf[:0], buf) + if unmarshalErr != nil { + logger.Panicf("setDecodedTagBytesAt unmarshalVarArray failed: %v", unmarshalErr) + } + values = append(values, string(bb.Buf)) + } + col.(*vectorized.TypedColumn[[]string]).SetAt(destRow, values) + default: + col.MarkNullAt(destRow) + } +} + +// setDecodedFieldBytesAt overwrites a native typed field column at destRow. 
+func setDecodedFieldBytesAt(col vectorized.Column, valueType pbv1.ValueType, raw []byte, destRow int) { + if raw == nil { + switch valueType { + case pbv1.ValueTypeStr: + col.(*vectorized.TypedColumn[string]).SetAt(destRow, "") + case pbv1.ValueTypeBinaryData: + col.(*vectorized.TypedColumn[[]byte]).SetAt(destRow, []byte{}) + default: + col.MarkNullAt(destRow) + } + return + } + switch valueType { + case pbv1.ValueTypeInt64: + col.(*vectorized.TypedColumn[int64]).SetAt(destRow, convert.BytesToInt64(raw)) + case pbv1.ValueTypeFloat64: + col.(*vectorized.TypedColumn[float64]).SetAt(destRow, convert.BytesToFloat64(raw)) + case pbv1.ValueTypeStr: + col.(*vectorized.TypedColumn[string]).SetAt(destRow, string(raw)) + case pbv1.ValueTypeBinaryData: + buf := make([]byte, len(raw)) + copy(buf, raw) + col.(*vectorized.TypedColumn[[]byte]).SetAt(destRow, buf) + default: + col.MarkNullAt(destRow) + } +} + +// setTagValueTypedAt projects a pre-decoded *modelv1.TagValue onto a native +// typed column at destRow. 
+func setTagValueTypedAt(col vectorized.Column, v *modelv1.TagValue, destRow int) { + if v == nil { + col.MarkNullAt(destRow) + return + } + switch x := v.Value.(type) { + case *modelv1.TagValue_Null: + col.MarkNullAt(destRow) + case *modelv1.TagValue_Int: + if c, ok := col.(*vectorized.TypedColumn[int64]); ok { + c.SetAt(destRow, x.Int.GetValue()) + return + } + col.MarkNullAt(destRow) + case *modelv1.TagValue_Str: + if c, ok := col.(*vectorized.TypedColumn[string]); ok { + c.SetAt(destRow, x.Str.GetValue()) + return + } + col.MarkNullAt(destRow) + case *modelv1.TagValue_BinaryData: + if c, ok := col.(*vectorized.TypedColumn[[]byte]); ok { + buf := make([]byte, len(x.BinaryData)) + copy(buf, x.BinaryData) + c.SetAt(destRow, buf) + return + } + col.MarkNullAt(destRow) + case *modelv1.TagValue_IntArray: + if c, ok := col.(*vectorized.TypedColumn[[]int64]); ok { + out := make([]int64, len(x.IntArray.GetValue())) + copy(out, x.IntArray.GetValue()) + c.SetAt(destRow, out) + return + } + col.MarkNullAt(destRow) + case *modelv1.TagValue_StrArray: + if c, ok := col.(*vectorized.TypedColumn[[]string]); ok { + out := make([]string, len(x.StrArray.GetValue())) + copy(out, x.StrArray.GetValue()) + c.SetAt(destRow, out) + return + } + col.MarkNullAt(destRow) + default: + col.MarkNullAt(destRow) + } +} + // appendTagValueAt projects a pre-decoded *modelv1.TagValue onto col, // dispatching on def.Type. 
func appendTagValueAt(col vectorized.Column, def vectorized.ColumnDef, v *modelv1.TagValue) { diff --git a/banyand/measure/query_batch.go b/banyand/measure/query_batch.go index d6cf3bc0f..448a64695 100644 --- a/banyand/measure/query_batch.go +++ b/banyand/measure/query_batch.go @@ -25,7 +25,10 @@ import ( "github.com/pkg/errors" + "github.com/apache/skywalking-banyandb/api/common" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/query/model" + "github.com/apache/skywalking-banyandb/pkg/query/vectorized" vmeasure "github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure" ) @@ -81,17 +84,99 @@ func (qr *queryResult) PullBatch(_ context.Context) (*model.MeasureBatch, error) releaseBlockCursor(bc) return b, nil } - // Multi-block: heap-merge via Pull's existing path, then convert. - // The protobuf intermediate is transient here — eliminating it - // requires a batch-aware merge variant deferred to a follow-on. - r := qr.merge(qr.storedIndexValue, qr.tagProjection) - if r == nil { + // Multi-block: TopN aggregation requires mergeTopNResult which rewrites + // field binary payloads via the row path's PostProcessor. The vectorized + // batch path defers TopN merge support; fall back to the row path so + // correctness is preserved. Non-topN queries take the native batch merge. + if qr.topNQueryOptions != nil { + r := qr.merge(qr.storedIndexValue, qr.tagProjection) + if r == nil { + return nil, nil + } + if r.Error != nil { + return nil, r.Error + } + return vmeasure.BuildMeasureBatchFromResult(r, qr.batchSchema) + } + return qr.mergeBatch(qr.storedIndexValue, qr.batchSchema) +} + +// mergeBatch is the batch-aware counterpart of queryResult.merge (query.go). 
+// It consumes the heap with the same semantics — version-replacement for +// duplicate timestamps within the same series, series-boundary stop — but +// writes directly into a *model.MeasureBatch via the typed column helpers in +// block_batch.go, bypassing the *modelv1.TagValue / *modelv1.FieldValue row +// intermediate that the original merge → BuildMeasureBatchFromResult path +// allocates. +// +// TopN aggregation (mergeTopNResult) is NOT implemented here; callers must +// check topNQueryOptions and fall back to the row path before calling +// mergeBatch. See the PullBatch dispatch above. +// +// Batch sizing: mergeBatch emits at most mergeBatchMaxRows rows per call so +// that large multi-series results do not exceed a reasonable working-set. The cap is tested before the duplicate-timestamp check, so a duplicate of the last emitted row that surfaces once the cap is reached is emitted by the next call as a separate row instead of replacing it. +// The batchsource (BatchSourceFromBatchResult) slices the returned batch +// further to its configured batchSize, so any value here that is at least as +// large as the caller's batchSize is fine. 4096 is chosen conservatively. +const mergeBatchMaxRows = 4096 + +func (qr *queryResult) mergeBatch( + storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue, + schema *vectorized.BatchSchema, +) (*model.MeasureBatch, error) { + if qr.Len() == 0 { return nil, nil } - if r.Error != nil { - return nil, r.Error + step := 1 + if qr.orderByTimestampDesc() { + step = -1 + } + + b := newMeasureBatchForSchema(schema, mergeBatchMaxRows) + var lastVersion int64 + var lastSid common.SeriesID + + for qr.Len() > 0 && b.RowCount() < mergeBatchMaxRows { + topBC := qr.data[0] + // Series boundary: stop and let the caller call again for the next series. + if lastSid != 0 && topBC.bm.seriesID != lastSid { + break + } + lastSid = topBC.bm.seriesID + + if b.RowCount() > 0 && + topBC.timestamps[topBC.idx] == b.Timestamps[len(b.Timestamps)-1] { + // Duplicate timestamp within the same series: keep the higher version. 
+ if topBC.versions[topBC.idx] > lastVersion { + topBC.replaceInBatch(b, schema, storedIndexValue) + lastVersion = topBC.versions[topBC.idx] + } + } else { + topBC.copyToBatch(b, schema, storedIndexValue) + lastVersion = topBC.versions[topBC.idx] + } + + topBC.idx += step + + if qr.orderByTimestampDesc() { + if topBC.idx < 0 { + heap.Pop(qr) + } else { + heap.Fix(qr, 0) + } + } else { + if topBC.idx >= len(topBC.timestamps) { + heap.Pop(qr) + } else { + heap.Fix(qr, 0) + } + } + } + + if b.RowCount() == 0 { + return nil, nil } - return vmeasure.BuildMeasureBatchFromResult(r, qr.batchSchema) + return b, nil } // loadCursorsForBatch is the lazy load step replicated from Pull(). It is diff --git a/pkg/query/vectorized/column.go b/pkg/query/vectorized/column.go index b1b6b5476..797b07e3f 100644 --- a/pkg/query/vectorized/column.go +++ b/pkg/query/vectorized/column.go @@ -99,6 +99,16 @@ func (v *validityBitmap) MarkNull(i int) { v.bits[word] |= 1 << uint(i%64) } +// ClearNull clears bit i (marks row i as valid). No-op if i is beyond +// the current bitmap length (meaning it was never marked null). +func (v *validityBitmap) ClearNull(i int) { + word := i / 64 + if word >= len(v.bits) { + return + } + v.bits[word] &^= 1 << uint(i%64) +} + // Reset clears every null mark but keeps the underlying slice for reuse. func (v *validityBitmap) Reset() { for i := range v.bits { diff --git a/pkg/query/vectorized/typed_column.go b/pkg/query/vectorized/typed_column.go index 6d8ee697a..616eeca3d 100644 --- a/pkg/query/vectorized/typed_column.go +++ b/pkg/query/vectorized/typed_column.go @@ -57,6 +57,16 @@ func (c *TypedColumn[T]) MarkNullAt(i int) { c.validity.MarkNull(i) } +// SetAt overwrites the value at index i without changing the column length. +// Any null mark at i is cleared, so the row becomes valid. Panics if i is out +// of range — the same contract as a direct slice write. 
+func (c *TypedColumn[T]) SetAt(i int, v T) { + c.data[i] = v + if c.validity.IsNull(i) { + c.validity.ClearNull(i) + } +} + // Reset clears length and validity. Capacity is retained. func (c *TypedColumn[T]) Reset() { c.data = c.data[:0]
