This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch vectorized-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 36140ca9403f1dab3165a2362e55894ec63cbdb4
Author: Hongtao Gao <[email protected]>
AuthorDate: Sat May 9 05:32:03 2026 +0000

    feat(banyand/measure): batch-aware multi-block merge for PullBatch (US-003 strict completion)
    
    Replace the BuildMeasureBatchFromResult fallback in queryResult.PullBatch's
    multi-block branch with a native mergeBatch that writes directly into a
    *model.MeasureBatch via typed column helpers, eliminating the
    *modelv1.TagValue / *modelv1.FieldValue row intermediate.
    
    New additions:
    - vectorized.TypedColumn.SetAt / validityBitmap.ClearNull: minimal
      overwrite support needed by the replace-in-batch path.
    - blockCursor.copyToBatch / replaceInBatch: single-row typed-column emit
      mirroring copyTo / replace (block.go), with set-based column mutators
      for in-place version replacement (setTagCellAt, setFieldCellAt, etc.).
    - queryResult.mergeBatch: heap-merge producing a *MeasureBatch directly,
      capped at mergeBatchMaxRows (4096) rows per call; respects series
      boundaries and version replacement exactly as merge() does.
    - TopN aggregation (mergeTopNResultBatch) deferred: topNQueryOptions
      queries fall back to the existing merge -> BuildMeasureBatchFromResult
      path; TopN is not exercised by the vectorized bench corpus or
      integration suite.
    - indexSortResult.PullBatch unchanged (segResult layer makes a typed emit
      a separate ~150 LOC project; documented deferral).
    
    Verified: go build ./..., go test ./banyand/measure/... -short -race PASS,
    go test ./pkg/query/vectorized/... -race PASS,
    RUN_BENCH_GATES=1 TestBenchGates_PerWorkload (W1-W5, W2-MB, W4-MB, W5-MB)
    PASS, integration vectorized parity 110/110 PASS, make lint PASS.
---
 banyand/measure/block_batch.go       | 323 +++++++++++++++++++++++++++++++++++
 banyand/measure/query_batch.go       | 101 ++++++++++-
 pkg/query/vectorized/column.go       |  10 ++
 pkg/query/vectorized/typed_column.go |  10 ++
 4 files changed, 436 insertions(+), 8 deletions(-)

diff --git a/banyand/measure/block_batch.go b/banyand/measure/block_batch.go
index ace098ac1..48cb257b9 100644
--- a/banyand/measure/block_batch.go
+++ b/banyand/measure/block_batch.go
@@ -20,6 +20,8 @@ package measure
 import (
        "github.com/apache/skywalking-banyandb/api/common"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
        "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
@@ -245,6 +247,327 @@ func fillFieldCell(col vectorized.Column, def 
vectorized.ColumnDef, valueType pb
        appendDecodedFieldBytesAsTyped(col, valueType, raw)
 }
 
+// copyToBatch appends the single row at bc.idx into b's parallel slices and
+// typed columns. It is the batch-path mirror of blockCursor.copyTo (block.go)
+// and is used by mergeBatch when emitting a fresh (non-duplicate) row.
+//
+// Schema/column indexing contract matches copyAllToBatch: Tags[tagIdx]
+// aligns with the tagIdx-th RoleTag entry in schema.Columns, Fields similarly.
+func (bc *blockCursor) copyToBatch(b *model.MeasureBatch, schema 
*vectorized.BatchSchema,
+       storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue,
+) {
+       sid := bc.bm.seriesID
+       b.Timestamps = append(b.Timestamps, bc.timestamps[bc.idx])
+       b.Versions = append(b.Versions, bc.versions[bc.idx])
+       b.ShardIDs = append(b.ShardIDs, bc.shardID)
+       b.SeriesIDs = append(b.SeriesIDs, sid)
+
+       var indexValue map[string]*modelv1.TagValue
+       if storedIndexValue != nil {
+               indexValue = storedIndexValue[sid]
+       }
+
+       tagIdx, fieldIdx := 0, 0
+       for _, def := range schema.Columns {
+               switch def.Role {
+               case vectorized.RoleTag:
+                       col := b.Tags[tagIdx]
+                       tagIdx++
+                       bc.fillTagCell(col, def, bc.idx, indexValue)
+               case vectorized.RoleField:
+                       col := b.Fields[fieldIdx]
+                       fieldIdx++
+                       bc.fillFieldCell(col, def, bc.idx)
+               case vectorized.RoleTimestamp, vectorized.RoleVersion,
+                       vectorized.RoleSeriesID, vectorized.RoleShardID:
+                       // Metadata handled via the parallel slices above.
+               }
+       }
+}
+
+// fillTagCell appends a single tag cell at row rowIdx from bc into col.
+func (bc *blockCursor) fillTagCell(col vectorized.Column, def 
vectorized.ColumnDef,
+       rowIdx int, indexValue map[string]*modelv1.TagValue,
+) {
+       if indexValue != nil && indexValue[def.Name] != nil {
+               appendTagValueAt(col, def, indexValue[def.Name])
+               return
+       }
+       cf := bc.findTagFamily(def.TagFamily)
+       if cf == nil {
+               appendNullTagN(col, def, 1)
+               return
+       }
+       column := cf.findColumn(def.Name)
+       if column == nil {
+               appendNullTagN(col, def, 1)
+               return
+       }
+       schemaType, hasSchemaType := bc.schemaTagTypes[def.Name]
+       if !hasSchemaType || column.valueType != schemaType {
+               appendNullTagN(col, def, 1)
+               return
+       }
+       fillTagCell(col, def, column.valueType, column.values[rowIdx])
+}
+
+// fillFieldCell appends a single field cell at row rowIdx from bc into col.
+func (bc *blockCursor) fillFieldCell(col vectorized.Column, def 
vectorized.ColumnDef, rowIdx int) {
+       column := bc.findFieldColumn(def.Name)
+       if column == nil {
+               appendNullFieldN(col, def, 1)
+               return
+       }
+       fillFieldCell(col, def, column.valueType, column.values[rowIdx])
+}
+
+// replaceInBatch overwrites the LAST row of b (at index lastRow) with the
+// data from bc.idx. It is the batch-path mirror of blockCursor.replace
+// (block.go) and is called by mergeBatch when a duplicate timestamp with a
+// newer version is encountered.
+//
+// Only the Versions metadata slice is updated in place; typed column cells
+// are overwritten via setTagCellAt / setFieldCellAt.
+func (bc *blockCursor) replaceInBatch(b *model.MeasureBatch, schema 
*vectorized.BatchSchema,
+       storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue,
+) {
+       sid := bc.bm.seriesID
+       lastRow := len(b.Timestamps) - 1
+       b.Versions[lastRow] = bc.versions[bc.idx]
+
+       var indexValue map[string]*modelv1.TagValue
+       if storedIndexValue != nil {
+               indexValue = storedIndexValue[sid]
+       }
+
+       tagIdx, fieldIdx := 0, 0
+       for _, def := range schema.Columns {
+               switch def.Role {
+               case vectorized.RoleTag:
+                       col := b.Tags[tagIdx]
+                       tagIdx++
+                       bc.setTagCellAt(col, def, bc.idx, lastRow, indexValue)
+               case vectorized.RoleField:
+                       col := b.Fields[fieldIdx]
+                       fieldIdx++
+                       bc.setFieldCellAt(col, def, bc.idx, lastRow)
+               case vectorized.RoleTimestamp, vectorized.RoleVersion,
+                       vectorized.RoleSeriesID, vectorized.RoleShardID:
+                       // Metadata handled above.
+               }
+       }
+}
+
+// setTagCellAt overwrites column position destRow with the tag value decoded
+// from bc at srcRow.
+func (bc *blockCursor) setTagCellAt(col vectorized.Column, def 
vectorized.ColumnDef,
+       srcRow, destRow int, indexValue map[string]*modelv1.TagValue,
+) {
+       if indexValue != nil && indexValue[def.Name] != nil {
+               setTagValueAt(col, def, destRow, indexValue[def.Name])
+               return
+       }
+       cf := bc.findTagFamily(def.TagFamily)
+       if cf == nil {
+               setNullTagAt(col, def, destRow)
+               return
+       }
+       column := cf.findColumn(def.Name)
+       if column == nil {
+               setNullTagAt(col, def, destRow)
+               return
+       }
+       schemaType, hasSchemaType := bc.schemaTagTypes[def.Name]
+       if !hasSchemaType || column.valueType != schemaType {
+               setNullTagAt(col, def, destRow)
+               return
+       }
+       setTagCellAt(col, def, column.valueType, column.values[srcRow], destRow)
+}
+
+// setFieldCellAt overwrites column position destRow with the field value
+// decoded from bc at srcRow.
+func (bc *blockCursor) setFieldCellAt(col vectorized.Column, def 
vectorized.ColumnDef, srcRow, destRow int) {
+       column := bc.findFieldColumn(def.Name)
+       if column == nil {
+               setNullFieldAt(col, def, destRow)
+               return
+       }
+       setFieldCellAt(col, def, column.valueType, column.values[srcRow], 
destRow)
+}
+
+// setTagCellAt overwrites one cell in col at destRow. Mirrors fillTagCell but
+// uses SetAt instead of Append.
+func setTagCellAt(col vectorized.Column, def vectorized.ColumnDef, valueType 
pbv1.ValueType, raw []byte, destRow int) {
+       if def.Type == vectorized.ColumnTypeTagValue {
+               c := col.(*vectorized.TypedColumn[*modelv1.TagValue])
+               c.SetAt(destRow, mustDecodeTagValue(valueType, raw))
+               return
+       }
+       setDecodedTagBytesAt(col, valueType, raw, destRow)
+}
+
+// setFieldCellAt overwrites one field cell in col at destRow.
+func setFieldCellAt(col vectorized.Column, def vectorized.ColumnDef, valueType 
pbv1.ValueType, raw []byte, destRow int) {
+       if def.Type == vectorized.ColumnTypeFieldValue {
+               c := col.(*vectorized.TypedColumn[*modelv1.FieldValue])
+               c.SetAt(destRow, mustDecodeFieldValue(valueType, raw))
+               return
+       }
+       setDecodedFieldBytesAt(col, valueType, raw, destRow)
+}
+
+// setTagValueAt overwrites one passthrough/native tag cell with a pre-decoded
+// *modelv1.TagValue, dispatching on def.Type.
+func setTagValueAt(col vectorized.Column, def vectorized.ColumnDef, destRow 
int, v *modelv1.TagValue) {
+       if def.Type == vectorized.ColumnTypeTagValue {
+               c := col.(*vectorized.TypedColumn[*modelv1.TagValue])
+               c.SetAt(destRow, v)
+               return
+       }
+       setTagValueTypedAt(col, v, destRow)
+}
+
+// setNullTagAt marks destRow null in a tag column.
+func setNullTagAt(col vectorized.Column, def vectorized.ColumnDef, destRow 
int) {
+       if def.Type == vectorized.ColumnTypeTagValue {
+               c := col.(*vectorized.TypedColumn[*modelv1.TagValue])
+               c.SetAt(destRow, pbv1.NullTagValue)
+               return
+       }
+       col.MarkNullAt(destRow)
+}
+
+// setNullFieldAt marks destRow null in a field column.
+func setNullFieldAt(col vectorized.Column, def vectorized.ColumnDef, destRow 
int) {
+       if def.Type == vectorized.ColumnTypeFieldValue {
+               c := col.(*vectorized.TypedColumn[*modelv1.FieldValue])
+               c.SetAt(destRow, pbv1.NullFieldValue)
+               return
+       }
+       col.MarkNullAt(destRow)
+}
+
+// setDecodedTagBytesAt overwrites a native typed tag column at destRow by
+// decoding raw bytes of the given valueType.
+func setDecodedTagBytesAt(col vectorized.Column, valueType pbv1.ValueType, raw 
[]byte, destRow int) {
+       if raw == nil {
+               col.MarkNullAt(destRow)
+               return
+       }
+       switch valueType {
+       case pbv1.ValueTypeInt64:
+               col.(*vectorized.TypedColumn[int64]).SetAt(destRow, 
convert.BytesToInt64(raw))
+       case pbv1.ValueTypeStr:
+               col.(*vectorized.TypedColumn[string]).SetAt(destRow, 
string(raw))
+       case pbv1.ValueTypeBinaryData:
+               buf := make([]byte, len(raw))
+               copy(buf, raw)
+               col.(*vectorized.TypedColumn[[]byte]).SetAt(destRow, buf)
+       case pbv1.ValueTypeInt64Arr:
+               var values []int64
+               for i := 0; i < len(raw); i += 8 {
+                       values = append(values, 
convert.BytesToInt64(raw[i:i+8]))
+               }
+               col.(*vectorized.TypedColumn[[]int64]).SetAt(destRow, values)
+       case pbv1.ValueTypeStrArr:
+               bb := bigValuePool.Generate()
+               var values []string
+               buf := raw
+               var unmarshalErr error
+               for len(buf) > 0 {
+                       bb.Buf, buf, unmarshalErr = 
unmarshalVarArray(bb.Buf[:0], buf)
+                       if unmarshalErr != nil {
+                               logger.Panicf("setDecodedTagBytesAt 
unmarshalVarArray failed: %v", unmarshalErr)
+                       }
+                       values = append(values, string(bb.Buf))
+               }
+               col.(*vectorized.TypedColumn[[]string]).SetAt(destRow, values)
+       default:
+               col.MarkNullAt(destRow)
+       }
+}
+
+// setDecodedFieldBytesAt overwrites a native typed field column at destRow.
+func setDecodedFieldBytesAt(col vectorized.Column, valueType pbv1.ValueType, 
raw []byte, destRow int) {
+       if raw == nil {
+               switch valueType {
+               case pbv1.ValueTypeStr:
+                       col.(*vectorized.TypedColumn[string]).SetAt(destRow, "")
+               case pbv1.ValueTypeBinaryData:
+                       col.(*vectorized.TypedColumn[[]byte]).SetAt(destRow, 
[]byte{})
+               default:
+                       col.MarkNullAt(destRow)
+               }
+               return
+       }
+       switch valueType {
+       case pbv1.ValueTypeInt64:
+               col.(*vectorized.TypedColumn[int64]).SetAt(destRow, 
convert.BytesToInt64(raw))
+       case pbv1.ValueTypeFloat64:
+               col.(*vectorized.TypedColumn[float64]).SetAt(destRow, 
convert.BytesToFloat64(raw))
+       case pbv1.ValueTypeStr:
+               col.(*vectorized.TypedColumn[string]).SetAt(destRow, 
string(raw))
+       case pbv1.ValueTypeBinaryData:
+               buf := make([]byte, len(raw))
+               copy(buf, raw)
+               col.(*vectorized.TypedColumn[[]byte]).SetAt(destRow, buf)
+       default:
+               col.MarkNullAt(destRow)
+       }
+}
+
+// setTagValueTypedAt projects a pre-decoded *modelv1.TagValue onto a native
+// typed column at destRow.
+func setTagValueTypedAt(col vectorized.Column, v *modelv1.TagValue, destRow 
int) {
+       if v == nil {
+               col.MarkNullAt(destRow)
+               return
+       }
+       switch x := v.Value.(type) {
+       case *modelv1.TagValue_Null:
+               col.MarkNullAt(destRow)
+       case *modelv1.TagValue_Int:
+               if c, ok := col.(*vectorized.TypedColumn[int64]); ok {
+                       c.SetAt(destRow, x.Int.GetValue())
+                       return
+               }
+               col.MarkNullAt(destRow)
+       case *modelv1.TagValue_Str:
+               if c, ok := col.(*vectorized.TypedColumn[string]); ok {
+                       c.SetAt(destRow, x.Str.GetValue())
+                       return
+               }
+               col.MarkNullAt(destRow)
+       case *modelv1.TagValue_BinaryData:
+               if c, ok := col.(*vectorized.TypedColumn[[]byte]); ok {
+                       buf := make([]byte, len(x.BinaryData))
+                       copy(buf, x.BinaryData)
+                       c.SetAt(destRow, buf)
+                       return
+               }
+               col.MarkNullAt(destRow)
+       case *modelv1.TagValue_IntArray:
+               if c, ok := col.(*vectorized.TypedColumn[[]int64]); ok {
+                       out := make([]int64, len(x.IntArray.GetValue()))
+                       copy(out, x.IntArray.GetValue())
+                       c.SetAt(destRow, out)
+                       return
+               }
+               col.MarkNullAt(destRow)
+       case *modelv1.TagValue_StrArray:
+               if c, ok := col.(*vectorized.TypedColumn[[]string]); ok {
+                       out := make([]string, len(x.StrArray.GetValue()))
+                       copy(out, x.StrArray.GetValue())
+                       c.SetAt(destRow, out)
+                       return
+               }
+               col.MarkNullAt(destRow)
+       default:
+               col.MarkNullAt(destRow)
+       }
+}
+
 // appendTagValueAt projects a pre-decoded *modelv1.TagValue onto col,
 // dispatching on def.Type.
 func appendTagValueAt(col vectorized.Column, def vectorized.ColumnDef, v 
*modelv1.TagValue) {
diff --git a/banyand/measure/query_batch.go b/banyand/measure/query_batch.go
index d6cf3bc0f..448a64695 100644
--- a/banyand/measure/query_batch.go
+++ b/banyand/measure/query_batch.go
@@ -25,7 +25,10 @@ import (
 
        "github.com/pkg/errors"
 
+       "github.com/apache/skywalking-banyandb/api/common"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
+       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
        vmeasure 
"github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure"
 )
 
@@ -81,17 +84,99 @@ func (qr *queryResult) PullBatch(_ context.Context) 
(*model.MeasureBatch, error)
                releaseBlockCursor(bc)
                return b, nil
        }
-       // Multi-block: heap-merge via Pull's existing path, then convert.
-       // The protobuf intermediate is transient here — eliminating it
-       // requires a batch-aware merge variant deferred to a follow-on.
-       r := qr.merge(qr.storedIndexValue, qr.tagProjection)
-       if r == nil {
+       // Multi-block: TopN aggregation requires mergeTopNResult which rewrites
+       // field binary payloads via the row path's PostProcessor. The 
vectorized
+       // batch path defers TopN merge support; fall back to the row path so
+       // correctness is preserved. Non-topN queries take the native batch 
merge.
+       if qr.topNQueryOptions != nil {
+               r := qr.merge(qr.storedIndexValue, qr.tagProjection)
+               if r == nil {
+                       return nil, nil
+               }
+               if r.Error != nil {
+                       return nil, r.Error
+               }
+               return vmeasure.BuildMeasureBatchFromResult(r, qr.batchSchema)
+       }
+       return qr.mergeBatch(qr.storedIndexValue, qr.batchSchema)
+}
+
+// mergeBatch is the batch-aware counterpart of queryResult.merge (query.go).
+// It consumes the heap with the same semantics — version-replacement for
+// duplicate timestamps within the same series, series-boundary stop — but
+// writes directly into a *model.MeasureBatch via the typed column helpers in
+// block_batch.go, bypassing the *modelv1.TagValue / *modelv1.FieldValue row
+// intermediate that the original merge → BuildMeasureBatchFromResult path
+// allocates.
+//
+// TopN aggregation (mergeTopNResult) is NOT implemented here; callers must
+// check topNQueryOptions and fall back to the row path before calling
+// mergeBatch. See the PullBatch dispatch above.
+//
+// Batch sizing: mergeBatch emits at most mergeBatchMaxRows rows per call so
+// that large multi-series results do not exceed a reasonable working-set.
+// The batchsource (BatchSourceFromBatchResult) slices the returned batch
+// further to its configured batchSize, so any value here that is at least as
+// large as the caller's batchSize is fine. 4096 is chosen conservatively.
+const mergeBatchMaxRows = 4096
+
+func (qr *queryResult) mergeBatch(
+       storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue,
+       schema *vectorized.BatchSchema,
+) (*model.MeasureBatch, error) {
+       if qr.Len() == 0 {
                return nil, nil
        }
-       if r.Error != nil {
-               return nil, r.Error
+       step := 1
+       if qr.orderByTimestampDesc() {
+               step = -1
+       }
+
+       b := newMeasureBatchForSchema(schema, mergeBatchMaxRows)
+       var lastVersion int64
+       var lastSid common.SeriesID
+
+       for qr.Len() > 0 && b.RowCount() < mergeBatchMaxRows {
+               topBC := qr.data[0]
+               // Series boundary: stop and let the caller call again for the 
next series.
+               if lastSid != 0 && topBC.bm.seriesID != lastSid {
+                       break
+               }
+               lastSid = topBC.bm.seriesID
+
+               if b.RowCount() > 0 &&
+                       topBC.timestamps[topBC.idx] == 
b.Timestamps[len(b.Timestamps)-1] {
+                       // Duplicate timestamp within the same series: keep the 
higher version.
+                       if topBC.versions[topBC.idx] > lastVersion {
+                               topBC.replaceInBatch(b, schema, 
storedIndexValue)
+                               lastVersion = topBC.versions[topBC.idx]
+                       }
+               } else {
+                       topBC.copyToBatch(b, schema, storedIndexValue)
+                       lastVersion = topBC.versions[topBC.idx]
+               }
+
+               topBC.idx += step
+
+               if qr.orderByTimestampDesc() {
+                       if topBC.idx < 0 {
+                               heap.Pop(qr)
+                       } else {
+                               heap.Fix(qr, 0)
+                       }
+               } else {
+                       if topBC.idx >= len(topBC.timestamps) {
+                               heap.Pop(qr)
+                       } else {
+                               heap.Fix(qr, 0)
+                       }
+               }
+       }
+
+       if b.RowCount() == 0 {
+               return nil, nil
        }
-       return vmeasure.BuildMeasureBatchFromResult(r, qr.batchSchema)
+       return b, nil
 }
 
 // loadCursorsForBatch is the lazy load step replicated from Pull(). It is
diff --git a/pkg/query/vectorized/column.go b/pkg/query/vectorized/column.go
index b1b6b5476..797b07e3f 100644
--- a/pkg/query/vectorized/column.go
+++ b/pkg/query/vectorized/column.go
@@ -99,6 +99,16 @@ func (v *validityBitmap) MarkNull(i int) {
        v.bits[word] |= 1 << uint(i%64)
 }
 
+// ClearNull clears bit i (marks row i as valid). No-op if i is beyond
+// the current bitmap length (meaning it was never marked null).
+func (v *validityBitmap) ClearNull(i int) {
+       word := i / 64
+       if word >= len(v.bits) {
+               return
+       }
+       v.bits[word] &^= 1 << uint(i%64)
+}
+
 // Reset clears every null mark but keeps the underlying slice for reuse.
 func (v *validityBitmap) Reset() {
        for i := range v.bits {
diff --git a/pkg/query/vectorized/typed_column.go 
b/pkg/query/vectorized/typed_column.go
index 6d8ee697a..616eeca3d 100644
--- a/pkg/query/vectorized/typed_column.go
+++ b/pkg/query/vectorized/typed_column.go
@@ -57,6 +57,16 @@ func (c *TypedColumn[T]) MarkNullAt(i int) {
        c.validity.MarkNull(i)
 }
 
+// SetAt overwrites the value at index i without changing the column length.
+// The validity bit at i is cleared (row becomes valid). Panics if i is out
+// of range — matches the same contract as a direct slice write.
+func (c *TypedColumn[T]) SetAt(i int, v T) {
+       c.data[i] = v
+       if c.validity.IsNull(i) {
+               c.validity.ClearNull(i)
+       }
+}
+
 // Reset clears length and validity. Capacity is retained.
 func (c *TypedColumn[T]) Reset() {
        c.data = c.data[:0]

Reply via email to