This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch vectorized-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 40fcf37b7a81d60b4a73ff837f658ca11edbe63e
Author: Hongtao Gao <[email protected]>
AuthorDate: Thu May 14 04:39:52 2026 +0000

    feat(query/vectorized/measure): BatchAggregation row-path parity for MEAN type and non-key projected tags (G8d.3)
    
    Two egress gaps closed so vec dispatch can serve every GroupBy+Agg shape
    the row path serves:
    
    - MEAN now preserves the input type, as SUM/MIN/MAX already did
      (Map[int64] for FIELD_TYPE_INT, Map[float64] for FIELD_TYPE_FLOAT), and
      the agg output column inherits the input type. Matches
      measure_plan_aggregation.go's int/float dispatch, so vec emits the same
      FieldValue oneof variant the row path does (was: MEAN always produced
      float64, breaking proto.Equal parity).
    
    - BatchAggregation carries every projected tag column forward, not just
      GroupBy keys. Non-key projected tags get the first-seen value per
      group, matching the row path's mapAccumulator. tagIndices is derived
      from the input schema (RoleTag columns in projection order) so callers
      don't have to thread it through; legacy callers passing keyIndices ==
      schema tags see identical output.
    
      copyOneValue gains passthrough TagValue/FieldValue cases because
      non-key tags arrive as *modelv1.TagValue columns (BuildBatchSchema only
      emits native columns for GroupBy keys + Agg field).
    
    via [HAPI](https://hapi.run)
---
 pkg/query/vectorized/measure/aggregation.go | 90 ++++++++++++++++++++---------
 pkg/query/vectorized/measure/groupby.go     | 11 +++-
 2 files changed, 72 insertions(+), 29 deletions(-)

diff --git a/pkg/query/vectorized/measure/aggregation.go 
b/pkg/query/vectorized/measure/aggregation.go
index 240755d3b..c3cbba065 100644
--- a/pkg/query/vectorized/measure/aggregation.go
+++ b/pkg/query/vectorized/measure/aggregation.go
@@ -73,8 +73,15 @@ type AggSpec struct {
 // the row-based path uses. This keeps numeric semantics in lockstep across the
 // two paths so a fix in one path is shared by the other.
 //
-// Output schema = key columns (same definitions as input) + one column per 
AggSpec.
-// Output rows are emitted one per group, in group-insertion order, paginated 
by batchSize.
+// Output schema = all projected tag columns (in input schema order — keys
+// AND non-key tags) followed by one column per AggSpec. Non-key tags are
+// carried forward as the first-seen value per group, matching the row
+// path's aggregator (pkg/query/logical/measure/measure_plan_aggregation.go),
+// which preserves any tag the request projected even when it is not a
+// GroupBy key.
+//
+// Output rows are emitted one per group, in group-insertion order,
+// paginated by batchSize.
 type BatchAggregation struct {
        inputSchema  *vectorized.BatchSchema
        outputSchema *vectorized.BatchSchema
@@ -83,6 +90,7 @@ type BatchAggregation struct {
        groups       map[string]*aggGroup
        insertion    []*aggGroup
        keyIndices   []int
+       tagIndices   []int
        aggs         []AggSpec
        mode         AggMode
        entrySize    int64
@@ -92,10 +100,13 @@ type BatchAggregation struct {
        closed       bool
 }
 
-// aggGroup carries one bucket's reduction state plus a copy of its key column 
values.
+// aggGroup carries one bucket's reduction state plus a copy of every
+// projected tag column for this group. tagCols is indexed by position in
+// BatchAggregation.tagIndices (NOT just the GroupBy keys), so non-key
+// projected tags can be emitted as their first-seen value.
 type aggGroup struct {
        key     string
-       keyCols []vectorized.Column
+       tagCols []vectorized.Column
        slots   []aggSlot
 }
 
@@ -126,13 +137,15 @@ func NewBatchAggregation(
        aggs []AggSpec, mode AggMode, batchSize int,
        tracker *vectorized.MemoryTracker, entrySize int64,
 ) *BatchAggregation {
-       outputSchema := buildAggOutputSchema(input, keyIndices, aggs)
+       tagIndices := collectTagIndices(input, keyIndices)
+       outputSchema := buildAggOutputSchema(input, tagIndices, aggs)
        return &BatchAggregation{
                inputSchema:  input,
                outputSchema: outputSchema,
                pool:         vectorized.NewBatchPool(outputSchema, batchSize),
                tracker:      tracker,
                keyIndices:   slices.Clone(keyIndices),
+               tagIndices:   tagIndices,
                aggs:         slices.Clone(aggs),
                mode:         mode,
                batchSize:    batchSize,
@@ -140,6 +153,25 @@ func NewBatchAggregation(
        }
 }
 
+// collectTagIndices returns every tag column index in input, in input
+// schema order. When the schema has no RoleTag columns at all (synthetic
+// unit-test fixtures that pre-date the storage bridge), fall back to
+// keyIndices so the operator still produces the keys-only output those
+// tests expect. Production paths always have RoleTag columns because
+// BuildBatchSchema emits one per projected tag.
+func collectTagIndices(input *vectorized.BatchSchema, keyIndices []int) []int {
+       out := make([]int, 0, len(input.Columns))
+       for i, def := range input.Columns {
+               if def.Role == vectorized.RoleTag {
+                       out = append(out, i)
+               }
+       }
+       if len(out) == 0 {
+               return slices.Clone(keyIndices)
+       }
+       return out
+}
+
 // Init prepares the group map. It does NOT validate the mode — mode rejection
 // happens at the per-method level (Consume/Finalize/NextBatch) so the
 // dispatcher matches the spec's distributed forward-compat language.
@@ -236,10 +268,10 @@ func (a *BatchAggregation) Close() error {
 }
 
 func (a *BatchAggregation) newGroup(b *vectorized.RecordBatch, rowIdx int, key 
string) (*aggGroup, error) {
-       keyCols := make([]vectorized.Column, len(a.keyIndices))
-       for i, kIdx := range a.keyIndices {
-               keyCols[i] = 
vectorized.NewColumnForType(a.inputSchema.Columns[kIdx].Type, 1)
-               copyOneValue(keyCols[i], b.Columns[kIdx], rowIdx)
+       tagCols := make([]vectorized.Column, len(a.tagIndices))
+       for i, tIdx := range a.tagIndices {
+               tagCols[i] = 
vectorized.NewColumnForType(a.inputSchema.Columns[tIdx].Type, 1)
+               copyOneValue(tagCols[i], b.Columns[tIdx], rowIdx)
        }
        slots := make([]aggSlot, len(a.aggs))
        for i, spec := range a.aggs {
@@ -250,7 +282,7 @@ func (a *BatchAggregation) newGroup(b 
*vectorized.RecordBatch, rowIdx int, key s
                }
                slots[i] = slot
        }
-       return &aggGroup{key: key, keyCols: keyCols, slots: slots}, nil
+       return &aggGroup{key: key, tagCols: tagCols, slots: slots}, nil
 }
 
 // fold delegates one row's value to the slot's underlying aggregation.Map.
@@ -280,13 +312,15 @@ func (a *BatchAggregation) fold(b 
*vectorized.RecordBatch, rowIdx int, slot *agg
 }
 
 func (a *BatchAggregation) emitGroupRow(out *vectorized.RecordBatch, group 
*aggGroup) {
-       // Key columns come first in the output schema, in keyIndices order.
-       for i := range a.keyIndices {
-               copyOneValue(out.Columns[i], group.keyCols[i], 0)
+       // All projected tag columns come first in the output schema, in
+       // tagIndices order — including non-key tags carried forward as the
+       // first-seen value.
+       for i := range a.tagIndices {
+               copyOneValue(out.Columns[i], group.tagCols[i], 0)
        }
        // Then agg output columns.
        for slotIdx := range a.aggs {
-               colIdx := len(a.keyIndices) + slotIdx
+               colIdx := len(a.tagIndices) + slotIdx
                group.slots[slotIdx].write(out.Columns[colIdx])
        }
 }
@@ -315,9 +349,12 @@ func newAggSlot(fn AggFunc, inputIsFloat bool) (aggSlot, 
error) {
        switch fn {
        case AggCount:
                useFloat = false
-       case AggMean:
-               useFloat = true
        default:
+               // MEAN follows input type to match the row path's
+               // aggregation.NewMap[int64] / [float64] dispatch in
+               // pkg/query/logical/measure/measure_plan_aggregation.go
+               // (FIELD_TYPE_INT → int64 truncation; FIELD_TYPE_FLOAT →
+               // float64). Same handling for SUM/MIN/MAX.
                useFloat = inputIsFloat
        }
        if useFloat {
@@ -363,11 +400,11 @@ func toModelAggFunc(fn AggFunc) 
(modelv1.AggregationFunction, error) {
 }
 
 func buildAggOutputSchema(
-       input *vectorized.BatchSchema, keyIndices []int, aggs []AggSpec,
+       input *vectorized.BatchSchema, tagIndices []int, aggs []AggSpec,
 ) *vectorized.BatchSchema {
-       defs := make([]vectorized.ColumnDef, 0, len(keyIndices)+len(aggs))
-       for _, ki := range keyIndices {
-               defs = append(defs, input.Columns[ki])
+       defs := make([]vectorized.ColumnDef, 0, len(tagIndices)+len(aggs))
+       for _, ti := range tagIndices {
+               defs = append(defs, input.Columns[ti])
        }
        for _, agg := range aggs {
                defs = append(defs, vectorized.ColumnDef{
@@ -381,16 +418,13 @@ func buildAggOutputSchema(
 
 // aggOutputType maps (input type, agg func) to the output column type.
 //   - COUNT is always int64.
-//   - MEAN is always float64.
-//   - SUM/MIN/MAX preserve the input type.
+//   - MEAN, SUM, MIN, MAX preserve the input type so vec egress emits the
+//     same FieldValue oneof variant the row path uses (its accumulator is
+//     dispatched on FIELD_TYPE_INT → Map[int64] / FIELD_TYPE_FLOAT →
+//     Map[float64]; see measure_plan_aggregation.go).
 func aggOutputType(in vectorized.ColumnType, fn AggFunc) vectorized.ColumnType 
{
-       switch fn {
-       case AggCount:
+       if fn == AggCount {
                return vectorized.ColumnTypeInt64
-       case AggMean:
-               return vectorized.ColumnTypeFloat64
-       case AggSum, AggMin, AggMax:
-               return in
        }
        return in
 }
diff --git a/pkg/query/vectorized/measure/groupby.go 
b/pkg/query/vectorized/measure/groupby.go
index 37441d232..9a8101131 100644
--- a/pkg/query/vectorized/measure/groupby.go
+++ b/pkg/query/vectorized/measure/groupby.go
@@ -26,6 +26,7 @@ import (
        "math"
        "slices"
 
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
 )
 
@@ -268,7 +269,11 @@ func newGroupBucket(schema *vectorized.BatchSchema) 
*groupBucket {
        return &groupBucket{cols: cols}
 }
 
-// copyOneValue appends src[rowIdx] to dst, preserving null status.
+// copyOneValue appends src[rowIdx] to dst, preserving null status. The
+// passthrough TagValue / FieldValue cases let BatchAggregation carry
+// non-key projected tag columns forward unchanged (the pointer is
+// owned by the upstream MeasureBatch, which the pipeline keeps live for
+// the duration of aggregation).
 func copyOneValue(dst, src vectorized.Column, rowIdx int) {
        if src.IsNull(rowIdx) {
                dst.AppendNull()
@@ -287,5 +292,9 @@ func copyOneValue(dst, src vectorized.Column, rowIdx int) {
                dst.(*vectorized.TypedColumn[[]int64]).Append(s.Data()[rowIdx])
        case *vectorized.TypedColumn[[]string]:
                dst.(*vectorized.TypedColumn[[]string]).Append(s.Data()[rowIdx])
+       case *vectorized.TypedColumn[*modelv1.TagValue]:
+               
dst.(*vectorized.TypedColumn[*modelv1.TagValue]).Append(s.Data()[rowIdx])
+       case *vectorized.TypedColumn[*modelv1.FieldValue]:
+               
dst.(*vectorized.TypedColumn[*modelv1.FieldValue]).Append(s.Data()[rowIdx])
        }
 }

Reply via email to