This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 40fcf37b7a81d60b4a73ff837f658ca11edbe63e Author: Hongtao Gao <[email protected]> AuthorDate: Thu May 14 04:39:52 2026 +0000 feat(query/vectorized/measure): BatchAggregation row-path parity for MEAN type and non-key projected tags (G8d.3) Two egress gaps closed so vec dispatch can serve every GroupBy+Agg shape the row path serves: - MEAN/SUM/MIN/MAX now preserve input type (Map[int64] for FIELD_TYPE_INT, Map[float64] for FIELD_TYPE_FLOAT) and the agg output column inherits the input type. Matches measure_plan_aggregation.go's int/float dispatch, so vec emits the same FieldValue oneof variant the row path does (was: MEAN always produced float64, breaking proto.Equal parity). - BatchAggregation carries every projected tag column forward, not just GroupBy keys. Non-key projected tags get the first-seen value per group, matching the row path's mapAccumulator. tagIndices is derived from the input schema (RoleTag columns in projection order) so callers don't have to thread it through; legacy callers passing keyIndices == schema tags see identical output. copyOneValue gains passthrough TagValue/FieldValue cases because non-key tags arrive as *modelv1.TagValue columns (BuildBatchSchema only emits native columns for GroupBy keys + Agg field). via [HAPI](https://hapi.run) --- pkg/query/vectorized/measure/aggregation.go | 90 ++++++++++++++++++++--------- pkg/query/vectorized/measure/groupby.go | 11 +++- 2 files changed, 72 insertions(+), 29 deletions(-) diff --git a/pkg/query/vectorized/measure/aggregation.go b/pkg/query/vectorized/measure/aggregation.go index 240755d3b..c3cbba065 100644 --- a/pkg/query/vectorized/measure/aggregation.go +++ b/pkg/query/vectorized/measure/aggregation.go @@ -73,8 +73,15 @@ type AggSpec struct { // the row-based path uses. This keeps numeric semantics in lockstep across the // two paths so a fix in one path is shared by the other. // -// Output schema = key columns (same definitions as input) + one column per AggSpec. -// Output rows are emitted one per group, in group-insertion order, paginated by batchSize. +// Output schema = all projected tag columns (in input schema order — keys +// AND non-key tags) followed by one column per AggSpec. Non-key tags are +// carried forward as the first-seen value per group, matching the row +// path's aggregator (pkg/query/logical/measure/measure_plan_aggregation.go), +// which preserves any tag the request projected even when it is not a +// GroupBy key. +// +// Output rows are emitted one per group, in group-insertion order, +// paginated by batchSize. type BatchAggregation struct { inputSchema *vectorized.BatchSchema outputSchema *vectorized.BatchSchema @@ -83,6 +90,7 @@ type BatchAggregation struct { groups map[string]*aggGroup insertion []*aggGroup keyIndices []int + tagIndices []int aggs []AggSpec mode AggMode entrySize int64 @@ -92,10 +100,13 @@ type BatchAggregation struct { closed bool } -// aggGroup carries one bucket's reduction state plus a copy of its key column values. +// aggGroup carries one bucket's reduction state plus a copy of every +// projected tag column for this group. tagCols is indexed by position in +// BatchAggregation.tagIndices (NOT just the GroupBy keys), so non-key +// projected tags can be emitted as their first-seen value. type aggGroup struct { key string - keyCols []vectorized.Column + tagCols []vectorized.Column slots []aggSlot } @@ -126,13 +137,15 @@ func NewBatchAggregation( aggs []AggSpec, mode AggMode, batchSize int, tracker *vectorized.MemoryTracker, entrySize int64, ) *BatchAggregation { - outputSchema := buildAggOutputSchema(input, keyIndices, aggs) + tagIndices := collectTagIndices(input, keyIndices) + outputSchema := buildAggOutputSchema(input, tagIndices, aggs) return &BatchAggregation{ inputSchema: input, outputSchema: outputSchema, pool: vectorized.NewBatchPool(outputSchema, batchSize), tracker: tracker, keyIndices: slices.Clone(keyIndices), + tagIndices: tagIndices, aggs: slices.Clone(aggs), mode: mode, batchSize: batchSize, @@ -140,6 +153,25 @@ func NewBatchAggregation( } } +// collectTagIndices returns every tag column index in input, in input +// schema order. When the schema has no RoleTag columns at all (synthetic +// unit-test fixtures that pre-date the storage bridge), fall back to +// keyIndices so the operator still produces the keys-only output those +// tests expect. Production paths always have RoleTag columns because +// BuildBatchSchema emits one per projected tag. +func collectTagIndices(input *vectorized.BatchSchema, keyIndices []int) []int { + out := make([]int, 0, len(input.Columns)) + for i, def := range input.Columns { + if def.Role == vectorized.RoleTag { + out = append(out, i) + } + } + if len(out) == 0 { + return slices.Clone(keyIndices) + } + return out +} + // Init prepares the group map. It does NOT validate the mode — mode rejection // happens at the per-method level (Consume/Finalize/NextBatch) so the // dispatcher matches the spec's distributed forward-compat language. @@ -236,10 +268,10 @@ func (a *BatchAggregation) Close() error { } func (a *BatchAggregation) newGroup(b *vectorized.RecordBatch, rowIdx int, key string) (*aggGroup, error) { - keyCols := make([]vectorized.Column, len(a.keyIndices)) - for i, kIdx := range a.keyIndices { - keyCols[i] = vectorized.NewColumnForType(a.inputSchema.Columns[kIdx].Type, 1) - copyOneValue(keyCols[i], b.Columns[kIdx], rowIdx) + tagCols := make([]vectorized.Column, len(a.tagIndices)) + for i, tIdx := range a.tagIndices { + tagCols[i] = vectorized.NewColumnForType(a.inputSchema.Columns[tIdx].Type, 1) + copyOneValue(tagCols[i], b.Columns[tIdx], rowIdx) } slots := make([]aggSlot, len(a.aggs)) for i, spec := range a.aggs { @@ -250,7 +282,7 @@ func (a *BatchAggregation) newGroup(b *vectorized.RecordBatch, rowIdx int, key s } slots[i] = slot } - return &aggGroup{key: key, keyCols: keyCols, slots: slots}, nil + return &aggGroup{key: key, tagCols: tagCols, slots: slots}, nil } // fold delegates one row's value to the slot's underlying aggregation.Map. @@ -280,13 +312,15 @@ func (a *BatchAggregation) fold(b *vectorized.RecordBatch, rowIdx int, slot *agg } func (a *BatchAggregation) emitGroupRow(out *vectorized.RecordBatch, group *aggGroup) { - // Key columns come first in the output schema, in keyIndices order. - for i := range a.keyIndices { - copyOneValue(out.Columns[i], group.keyCols[i], 0) + // All projected tag columns come first in the output schema, in + // tagIndices order — including non-key tags carried forward as the + // first-seen value. + for i := range a.tagIndices { + copyOneValue(out.Columns[i], group.tagCols[i], 0) } // Then agg output columns. for slotIdx := range a.aggs { - colIdx := len(a.keyIndices) + slotIdx + colIdx := len(a.tagIndices) + slotIdx group.slots[slotIdx].write(out.Columns[colIdx]) } } @@ -315,9 +349,12 @@ func newAggSlot(fn AggFunc, inputIsFloat bool) (aggSlot, error) { switch fn { case AggCount: useFloat = false - case AggMean: - useFloat = true default: + // MEAN follows input type to match the row path's + // aggregation.NewMap[int64] / [float64] dispatch in + // pkg/query/logical/measure/measure_plan_aggregation.go + // (FIELD_TYPE_INT → int64 truncation; FIELD_TYPE_FLOAT → + // float64). Same handling for SUM/MIN/MAX. useFloat = inputIsFloat } if useFloat { @@ -363,11 +400,11 @@ func toModelAggFunc(fn AggFunc) (modelv1.AggregationFunction, error) { } func buildAggOutputSchema( - input *vectorized.BatchSchema, keyIndices []int, aggs []AggSpec, + input *vectorized.BatchSchema, tagIndices []int, aggs []AggSpec, ) *vectorized.BatchSchema { - defs := make([]vectorized.ColumnDef, 0, len(keyIndices)+len(aggs)) - for _, ki := range keyIndices { - defs = append(defs, input.Columns[ki]) + defs := make([]vectorized.ColumnDef, 0, len(tagIndices)+len(aggs)) + for _, ti := range tagIndices { + defs = append(defs, input.Columns[ti]) } for _, agg := range aggs { defs = append(defs, vectorized.ColumnDef{ @@ -381,16 +418,13 @@ func buildAggOutputSchema( // aggOutputType maps (input type, agg func) to the output column type. // - COUNT is always int64. -// - MEAN is always float64. -// - SUM/MIN/MAX preserve the input type. +// - MEAN, SUM, MIN, MAX preserve the input type so vec egress emits the +// same FieldValue oneof variant the row path uses (its accumulator is +// dispatched on FIELD_TYPE_INT → Map[int64] / FIELD_TYPE_FLOAT → +// Map[float64]; see measure_plan_aggregation.go). func aggOutputType(in vectorized.ColumnType, fn AggFunc) vectorized.ColumnType { - switch fn { - case AggCount: + if fn == AggCount { return vectorized.ColumnTypeInt64 - case AggMean: - return vectorized.ColumnTypeFloat64 - case AggSum, AggMin, AggMax: - return in } return in } diff --git a/pkg/query/vectorized/measure/groupby.go b/pkg/query/vectorized/measure/groupby.go index 37441d232..9a8101131 100644 --- a/pkg/query/vectorized/measure/groupby.go +++ b/pkg/query/vectorized/measure/groupby.go @@ -26,6 +26,7 @@ import ( "math" "slices" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/query/vectorized" ) @@ -268,7 +269,11 @@ func newGroupBucket(schema *vectorized.BatchSchema) *groupBucket { return &groupBucket{cols: cols} } -// copyOneValue appends src[rowIdx] to dst, preserving null status. +// copyOneValue appends src[rowIdx] to dst, preserving null status. The +// passthrough TagValue / FieldValue cases let BatchAggregation carry +// non-key projected tag columns forward unchanged (the pointer is +// owned by the upstream MeasureBatch, which the pipeline keeps live for +// the duration of aggregation). func copyOneValue(dst, src vectorized.Column, rowIdx int) { if src.IsNull(rowIdx) { dst.AppendNull() @@ -287,5 +292,9 @@ func copyOneValue(dst, src vectorized.Column, rowIdx int) { dst.(*vectorized.TypedColumn[[]int64]).Append(s.Data()[rowIdx]) case *vectorized.TypedColumn[[]string]: dst.(*vectorized.TypedColumn[[]string]).Append(s.Data()[rowIdx]) + case *vectorized.TypedColumn[*modelv1.TagValue]: + dst.(*vectorized.TypedColumn[*modelv1.TagValue]).Append(s.Data()[rowIdx]) + case *vectorized.TypedColumn[*modelv1.FieldValue]: + dst.(*vectorized.TypedColumn[*modelv1.FieldValue]).Append(s.Data()[rowIdx]) } }
