This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit cadfec0bf5ef48169fc491cae309d2aaeb96976a Author: Hongtao Gao <[email protected]> AuthorDate: Fri May 15 10:08:28 2026 +0000 feat(query/vectorized/measure/plan): G9b GroupBy/Agg completeness (scalar reduce, raw GroupBy, projection auto-coverage) --- pkg/query/vectorized/measure/groupby.go | 33 +++++- pkg/query/vectorized/measure/plan.go | 46 +++++--- pkg/query/vectorized/measure/plan/analyzer.go | 129 ++++++++++++++++----- pkg/query/vectorized/measure/plan/analyzer_test.go | 49 ++++++-- pkg/query/vectorized/measure/plan/dispatch.go | 90 ++------------ pkg/query/vectorized/measure/plan/executor_test.go | 115 ++++++++++++++++++ pkg/query/vectorized/measure/plan/groupby_agg.go | 39 ++++--- pkg/query/vectorized/measure/plan_test.go | 44 +++++-- 8 files changed, 383 insertions(+), 162 deletions(-) diff --git a/pkg/query/vectorized/measure/groupby.go b/pkg/query/vectorized/measure/groupby.go index 9a8101131..7904da051 100644 --- a/pkg/query/vectorized/measure/groupby.go +++ b/pkg/query/vectorized/measure/groupby.go @@ -35,6 +35,13 @@ import ( // emitted in group-insertion order, with all rows of the first group flushed // before the next group begins. // +// When firstOnly is set the operator keeps only the first-seen row of each +// group and drops subsequent rows. This reproduces the row path's raw +// GroupBy egress: the row aggregator wraps every group in a single +// InternalDataPoint slice and banyand/query/processor.go reads only +// current[0] per Next, so a raw GroupBy surfaces exactly one row (the +// first-seen) per group in group-insertion order. +// // Memory accounting follows a pessimistic-reserve, refund-unused pattern: // // - entrySize is the per-new-group bucket overhead. @@ -58,6 +65,7 @@ type BatchGroupBy struct { cursorBucket int cursorRow int closed bool + firstOnly bool } type groupBucket struct { @@ -88,6 +96,22 @@ func NewBatchGroupBy( } } +// NewBatchGroupByFirst constructs a BatchGroupBy that emits only the +// first-seen row of each group (raw GroupBy without aggregation). It +// matches the row path: groupBy.Execute produces a groupIterator whose +// Current() returns the whole group, and processor.go keeps only +// current[0], so each group surfaces a single row in group-insertion +// order with the input schema unchanged. +func NewBatchGroupByFirst( + schema *vectorized.BatchSchema, keyIndices []int, + pool *vectorized.BatchPool, batchSize int, + tracker *vectorized.MemoryTracker, entrySize int64, +) *BatchGroupBy { + g := NewBatchGroupBy(schema, keyIndices, pool, batchSize, tracker, entrySize, 0) + g.firstOnly = true + return g +} + // Init prepares the group map. func (g *BatchGroupBy) Init(_ context.Context) error { g.groups = make(map[uint64][]*groupBucket) @@ -120,16 +144,23 @@ func (g *BatchGroupBy) Consume(_ context.Context, b *vectorized.RecordBatch) err func (g *BatchGroupBy) consumeRows(b *vectorized.RecordBatch, active []uint16) int64 { var newGroups int64 + var copiedRows int64 for _, rowIdx := range active { bucket, isNew := g.findOrCreate(b, int(rowIdx)) if isNew { newGroups++ } + if g.firstOnly && !isNew { + // Raw GroupBy keeps only the first-seen row per group; + // drop every later row in the same group. + continue + } for colIdx, srcCol := range b.Columns { copyOneValue(bucket.cols[colIdx], srcCol, int(rowIdx)) } + copiedRows++ } - return newGroups*g.entrySize + int64(len(active))*g.rowSize + return newGroups*g.entrySize + copiedRows*g.rowSize } // findOrCreate locates the bucket for the row's key, creating it if absent. diff --git a/pkg/query/vectorized/measure/plan.go b/pkg/query/vectorized/measure/plan.go index 747b96079..38235b1e8 100644 --- a/pkg/query/vectorized/measure/plan.go +++ b/pkg/query/vectorized/measure/plan.go @@ -33,15 +33,18 @@ const aggEntrySize int64 = 512 // into a list of BreakerOperators that, chained after the scan source, form // the vectorized aggregation pipeline. // -// Routing rules (v1): +// Routing rules: // - GroupBy + Agg both set → emit a single BatchAggregation. The operator's // own keyIndex map produces per-group buckets and folds the agg slot; // a separate BatchGroupBy would double the row materialization. -// - GroupBy set without Agg → unsupported. The proto field is paired with -// `agg` for the existing TopN path; without Agg the result semantic is -// ambiguous (selection? deduplication?). Return an error. -// - Agg set without GroupBy → unsupported. Scalar reduce → single output -// row is deferred to a future increment. +// - Agg set without GroupBy → scalar reduce: a BatchAggregation with no +// key columns. Every row maps to one group, so a single output row is +// emitted carrying the first-seen projected tags plus the agg result, +// matching the row path's aggAllIterator. +// - GroupBy set without Agg → raw GroupBy: a first-seen-row-per-group +// BatchGroupBy. The output preserves the input schema; one row per +// group is emitted in group-insertion order, matching the row path's +// groupIterator + processor.go's current[0] read. // - Neither set → empty operator list; caller emits raw rows. // // tracker is the per-pipeline MemoryTracker (G7a); it must be non-nil when @@ -57,12 +60,6 @@ func BuildOperators( if !hasGroupBy && !hasAgg { return nil, nil } - if hasGroupBy && !hasAgg { - return nil, fmt.Errorf("vectorized.measure: GroupBy without Agg is not supported in v1") - } - if hasAgg && !hasGroupBy { - return nil, fmt.Errorf("vectorized.measure: Agg without GroupBy (scalar reduce) is not supported in v1") - } if tracker == nil { return nil, fmt.Errorf("vectorized.measure: BuildOperators requires a non-nil shared MemoryTracker") } @@ -70,9 +67,22 @@ func BuildOperators( return nil, fmt.Errorf("vectorized.measure: batchSize must be > 0, got %d", batchSize) } - keyIndices, keyErr := lookupGroupByKeyIndices(schema, opts.GroupBy) - if keyErr != nil { - return nil, keyErr + var keyIndices []int + if hasGroupBy { + var keyErr error + keyIndices, keyErr = lookupGroupByKeyIndices(schema, opts.GroupBy) + if keyErr != nil { + return nil, keyErr + } + } + + if !hasAgg { + // Raw GroupBy: emit the first-seen row of each group with the + // input schema unchanged. entrySize accounts for the per-group + // bucket; rowSize is zero because at most one row is retained. + pool := vectorized.NewBatchPool(schema, batchSize) + gb := NewBatchGroupByFirst(schema, keyIndices, pool, batchSize, tracker, aggEntrySize) + return []vectorized.BreakerOperator{gb}, nil } fieldIdx, fieldErr := lookupFieldColumnIndex(schema, opts.Agg.FieldName) @@ -93,6 +103,12 @@ func BuildOperators( // like "value_sum" — the suffix would break proto.Equal parity in the // integration suite. The aggregation function lives on the operator // spec, not in the column name. + // + // keyIndices is empty for scalar reduce (Agg without GroupBy): + // BatchAggregation.computeKey then returns the same key for every + // row, so all rows collapse into a single output row carrying the + // first-seen projected tags plus the agg result — the columnar + // equivalent of the row path's aggAllIterator. spec := AggSpec{ Func: aggFn, InputCol: fieldIdx, diff --git a/pkg/query/vectorized/measure/plan/analyzer.go b/pkg/query/vectorized/measure/plan/analyzer.go index 620c9dcc5..6eee444fb 100644 --- a/pkg/query/vectorized/measure/plan/analyzer.go +++ b/pkg/query/vectorized/measure/plan/analyzer.go @@ -48,8 +48,11 @@ const defaultLimit uint32 = 100 // - tag/field projection naming columns not in the schema // - GroupBy referencing a tag absent from the schema // - Agg referencing a field absent from the schema -// - GroupBy set without Agg (raw groupby not supported in v1) -// - Agg set without GroupBy (scalar reduce not supported in v1) +// +// GroupBy and Agg may travel together (group + aggregate), or either +// alone: Agg without GroupBy is a scalar reduce (single output row); +// GroupBy without Agg is a raw GroupBy (first-seen row per group). Both +// mirror the row path (measure_plan_aggregation.go / measure_plan_groupby.go). func Analyze(req *measurev1.QueryRequest, measureSchema *databasev1.Measure) (VecPlan, error) { if req == nil { return nil, fmt.Errorf("plan.Analyze: nil request") @@ -72,13 +75,30 @@ func Analyze(req *measurev1.QueryRequest, measureSchema *databasev1.Measure) (Ve hasAgg := req.GetAgg() != nil var gbModel *model.MeasureGroupBy var aggModel *model.MeasureAgg - if hasGroupBy && hasAgg { - var translateErr error - gbModel, aggModel, translateErr = translateGroupByAgg(req, measureSchema) - if translateErr != nil { - return nil, translateErr + if hasGroupBy { + var gbErr error + gbModel, gbErr = translateGroupBy(req, measureSchema) + if gbErr != nil { + return nil, gbErr + } + } + if hasAgg { + var aggErr error + aggModel, aggErr = translateAgg(req, measureSchema) + if aggErr != nil { + return nil, aggErr } } + + // Projection auto-coverage: BatchAggregation / BatchGroupBy locate + // their key and value columns by name inside the BatchSchema, which + // BuildBatchSchema derives from TagProjection + FieldProjection. + // Extend the projection implicitly so the GroupBy keys and the Agg + // field always materialize a column, instead of falling through to + // the row path when the request omitted them from its projection. + tagProjection = ensureGroupByProjected(tagProjection, gbModel) + fieldProjection = ensureAggFieldProjected(fieldProjection, aggModel) + opts := model.MeasureQueryOptions{ TagProjection: tagProjection, FieldProjection: fieldProjection, @@ -105,17 +125,15 @@ func Analyze(req *measurev1.QueryRequest, measureSchema *databasev1.Measure) (Ve Agg: aggModel, }) - switch { - case hasGroupBy && hasAgg: + if hasGroupBy || hasAgg { + // One BatchAggregation / BatchGroupBy node covers all three + // shapes (group+agg, scalar reduce, raw groupby); BuildOperators + // routes on which of gbModel/aggModel is set. gba, gbaErr := NewGroupByAgg(plan, gbModel, aggModel) if gbaErr != nil { return nil, gbaErr } plan = gba - case hasGroupBy: - return nil, fmt.Errorf("plan.Analyze: GroupBy without Agg is not supported in v1") - case hasAgg: - return nil, fmt.Errorf("plan.Analyze: Agg without GroupBy (scalar reduce) is not supported in v1") } if t := req.GetTop(); t != nil { @@ -150,44 +168,101 @@ func buildTagProjection(req *measurev1.QueryRequest) []model.TagProjection { return out } -// translateGroupByAgg builds the model GroupBy/Agg structs from the proto, +// translateGroupBy builds the model GroupBy struct from the proto, // validating that: // - the GroupBy tag_projection names exactly one family with non-empty tags // (v1 single-family limitation) // - the named GroupBy tags exist in the Measure schema -// - the Agg field exists in the Measure schema -func translateGroupByAgg(req *measurev1.QueryRequest, measureSchema *databasev1.Measure) ( - *model.MeasureGroupBy, *model.MeasureAgg, error, -) { - gbProto := req.GetGroupBy() - families := gbProto.GetTagProjection().GetTagFamilies() +func translateGroupBy(req *measurev1.QueryRequest, measureSchema *databasev1.Measure) (*model.MeasureGroupBy, error) { + families := req.GetGroupBy().GetTagProjection().GetTagFamilies() if len(families) == 0 { - return nil, nil, fmt.Errorf("plan.Analyze: GroupBy.tag_projection must list at least one tag family") + return nil, fmt.Errorf("plan.Analyze: GroupBy.tag_projection must list at least one tag family") } if len(families) > 1 { - return nil, nil, fmt.Errorf("plan.Analyze: GroupBy.tag_projection v1 supports a single tag family, got %d", len(families)) + return nil, fmt.Errorf("plan.Analyze: GroupBy.tag_projection v1 supports a single tag family, got %d", len(families)) } family := families[0] if len(family.GetTags()) == 0 { - return nil, nil, fmt.Errorf("plan.Analyze: GroupBy.tag_projection family %q has no tags", family.GetName()) + return nil, fmt.Errorf("plan.Analyze: GroupBy.tag_projection family %q has no tags", family.GetName()) } gb := &model.MeasureGroupBy{ TagFamily: family.GetName(), TagNames: append([]string(nil), family.GetTags()...), } if validateErr := validateGroupByTags(measureSchema, gb); validateErr != nil { - return nil, nil, validateErr + return nil, validateErr } + return gb, nil +} +// translateAgg builds the model Agg struct from the proto, validating +// that the Agg field exists in the Measure schema. +func translateAgg(req *measurev1.QueryRequest, measureSchema *databasev1.Measure) (*model.MeasureAgg, error) { aggProto := req.GetAgg() if validateErr := validateAggField(measureSchema, aggProto.GetFieldName()); validateErr != nil { - return nil, nil, validateErr + return nil, validateErr } - agg := &model.MeasureAgg{ + return &model.MeasureAgg{ FieldName: aggProto.GetFieldName(), Func: aggProto.GetFunction(), + }, nil +} + +// ensureGroupByProjected returns a TagProjection slice guaranteed to +// include every GroupBy key tag. When the request already projects them +// the input slice is returned unchanged; otherwise the missing key tags +// are appended to (or create) the GroupBy family. Mirrors the row path, +// whose GroupBy resolves its key tag refs against the schema regardless +// of the request projection. +func ensureGroupByProjected(tp []model.TagProjection, gb *model.MeasureGroupBy) []model.TagProjection { + if gb == nil || gb.TagFamily == "" || len(gb.TagNames) == 0 { + return tp + } + out := append([]model.TagProjection(nil), tp...) + familyIdx := -1 + for i := range out { + if out[i].Family == gb.TagFamily { + familyIdx = i + break + } + } + if familyIdx < 0 { + out = append(out, model.TagProjection{ + Family: gb.TagFamily, + Names: append([]string(nil), gb.TagNames...), + }) + return out + } + present := make(map[string]struct{}, len(out[familyIdx].Names)) + for _, n := range out[familyIdx].Names { + present[n] = struct{}{} + } + names := append([]string(nil), out[familyIdx].Names...) + for _, n := range gb.TagNames { + if _, ok := present[n]; !ok { + names = append(names, n) + } + } + out[familyIdx].Names = names + return out +} + +// ensureAggFieldProjected returns a FieldProjection slice guaranteed to +// include the Agg field. When already present the input is returned +// unchanged; otherwise the field is appended. Mirrors the row path, +// whose aggregation resolves its field ref against the schema regardless +// of the request projection. +func ensureAggFieldProjected(fp []string, agg *model.MeasureAgg) []string { + if agg == nil || agg.FieldName == "" { + return fp + } + for _, n := range fp { + if n == agg.FieldName { + return fp + } } - return gb, agg, nil + out := append([]string(nil), fp...) + return append(out, agg.FieldName) } // validateGroupByTags ensures every name in gb.TagNames exists within the diff --git a/pkg/query/vectorized/measure/plan/analyzer_test.go b/pkg/query/vectorized/measure/plan/analyzer_test.go index 1605a3399..f7041dd87 100644 --- a/pkg/query/vectorized/measure/plan/analyzer_test.go +++ b/pkg/query/vectorized/measure/plan/analyzer_test.go @@ -144,7 +144,9 @@ func TestAnalyze_TopBetweenGroupByAggAndLimit(t *testing.T) { } } -func TestAnalyze_GroupByWithoutAgg_Errors(t *testing.T) { +// TestAnalyze_GroupByWithoutAgg_BuildsRawGroupBy verifies the raw +// GroupBy shape: a GroupByAgg node (Agg nil) below Limit. +func TestAnalyze_GroupByWithoutAgg_BuildsRawGroupBy(t *testing.T) { req := &measurev1.QueryRequest{ Name: "demo", TagProjection: projTagProj(), @@ -154,16 +156,29 @@ func TestAnalyze_GroupByWithoutAgg_Errors(t *testing.T) { FieldName: "value", }, } - _, err := Analyze(req, testMeasureSchema()) - if err == nil { - t.Fatal("GroupBy without Agg must error") + p, err := Analyze(req, testMeasureSchema()) + if err != nil { + t.Fatalf("GroupBy without Agg (raw groupby) must not error: %v", err) + } + limit, ok := p.(*Limit) + if !ok { + t.Fatalf("root should be *Limit, got %T", p) } - if !strings.Contains(err.Error(), "Agg") { - t.Fatalf("error should mention Agg, got %v", err) + gba, ok := limit.Child.(*GroupByAgg) + if !ok { + t.Fatalf("Limit child should be *GroupByAgg, got %T", limit.Child) + } + if gba.Agg != nil { + t.Fatalf("raw GroupBy must have nil Agg, got %+v", gba.Agg) + } + if gba.GroupBy == nil || gba.GroupBy.TagNames[0] != "svc" { + t.Fatalf("GroupBy.TagNames: want [svc], got %+v", gba.GroupBy) } } -func TestAnalyze_AggWithoutGroupBy_Errors(t *testing.T) { +// TestAnalyze_AggWithoutGroupBy_BuildsScalarReduce verifies the scalar +// reduce shape: a GroupByAgg node (GroupBy nil) below Limit. +func TestAnalyze_AggWithoutGroupBy_BuildsScalarReduce(t *testing.T) { req := &measurev1.QueryRequest{ Name: "demo", TagProjection: projTagProj(), @@ -173,9 +188,23 @@ func TestAnalyze_AggWithoutGroupBy_Errors(t *testing.T) { FieldName: "value", }, } - _, err := Analyze(req, testMeasureSchema()) - if err == nil { - t.Fatal("Agg without GroupBy must error (scalar reduce not supported)") + p, err := Analyze(req, testMeasureSchema()) + if err != nil { + t.Fatalf("Agg without GroupBy (scalar reduce) must not error: %v", err) + } + limit, ok := p.(*Limit) + if !ok { + t.Fatalf("root should be *Limit, got %T", p) + } + gba, ok := limit.Child.(*GroupByAgg) + if !ok { + t.Fatalf("Limit child should be *GroupByAgg, got %T", limit.Child) + } + if gba.GroupBy != nil { + t.Fatalf("scalar reduce must have nil GroupBy, got %+v", gba.GroupBy) + } + if gba.Agg == nil || gba.Agg.FieldName != "value" { + t.Fatalf("Agg.FieldName: want value, got %+v", gba.Agg) } } diff --git a/pkg/query/vectorized/measure/plan/dispatch.go b/pkg/query/vectorized/measure/plan/dispatch.go index 8a2a7b573..bafceb089 100644 --- a/pkg/query/vectorized/measure/plan/dispatch.go +++ b/pkg/query/vectorized/measure/plan/dispatch.go @@ -88,8 +88,9 @@ func FellThroughCount() int64 { return fellThroughCount.Load() } // // Eligibility gate (v1): // - cfg.Enabled must be true -// - request may carry GroupBy+Agg as a pair (scalar reduce + raw -// GroupBy are deferred); aggProjectionCoverage must also hold +// - request may carry GroupBy and/or Agg in any combination (group+agg, +// scalar reduce, raw GroupBy); plan.Analyze auto-extends the +// projection so the keys / agg field always resolve // - request must NOT carry Top (BatchTop's single-heap semantic differs // from the row path's per-timestamp top-N) // - request must carry TimeRange (storage requires a bounded window) @@ -131,24 +132,11 @@ func Dispatch( // today. return nil, "", false, nil } - hasGroupBy := req.GetGroupBy() != nil - hasAgg := req.GetAgg() != nil - if hasGroupBy || hasAgg { - // GroupBy and Agg must travel as a pair (scalar reduce + raw - // groupby are deferred). Either alone falls through. - if hasGroupBy != hasAgg { - return nil, "", false, nil - } - // Projection coverage: BatchAggregation locates its key + value - // columns by name inside the BatchSchema, which is built from - // TagProjection + FieldProjection. Missing coverage means the - // operator would fail at construction; fall through so the row - // path can extend projection implicitly or surface its - // canonical error. - if !aggProjectionCoverage(req) { - return nil, "", false, nil - } - } + // GroupBy and Agg are handled by the vec subsystem in all three + // shapes — group+agg, scalar reduce (Agg only), raw GroupBy (GroupBy + // only). plan.Analyze auto-extends the projection so the GroupBy keys + // and Agg field always materialize a column, so there is no + // projection-coverage gate here anymore (G9b). // G9c #9: a nil TimeRange is NOT a fall-through. The row path does not // reject it — parseFields feeds criteria.GetTimeRange().GetBegin().AsTime() // into the index scan, and a nil *timestamppb.Timestamp resolves to the @@ -409,65 +397,3 @@ func (emptyMIterator) Next() bool { return false } func (emptyMIterator) Current() []*measurev1.InternalDataPoint { return nil } func (emptyMIterator) Close() error { return nil } - -// aggProjectionCoverage reports whether the request's GroupBy keys and -// Agg field are all present in the request's projections. Required by -// the dispatch eligibility gate: the BatchAggregation operator locates -// its key + value columns by name inside the BatchSchema, and the -// BatchSchema is built from TagProjection + FieldProjection. Missing -// coverage means the operator would fail at construction; dispatch -// instead falls through so the row path can handle the request. -// -// v1 requires GroupBy.tag_projection to name a single family; that -// family must appear in req.TagProjection with every tag in GroupBy -// present. Agg.field_name must appear in req.FieldProjection. Non-key -// projected tags are allowed; BatchAggregation carries them forward as -// first-seen-per-group, matching the row path. -func aggProjectionCoverage(req *measurev1.QueryRequest) bool { - gb := req.GetGroupBy() - if gb == nil { - return false - } - gbFamilies := gb.GetTagProjection().GetTagFamilies() - if len(gbFamilies) != 1 { - return false - } - gbFamily := gbFamilies[0] - projected := projectedTagsByFamily(req.GetTagProjection()) - present, ok := projected[gbFamily.GetName()] - if !ok { - return false - } - for _, name := range gbFamily.GetTags() { - if _, hit := present[name]; !hit { - return false - } - } - aggField := req.GetAgg().GetFieldName() - if aggField == "" { - return false - } - for _, name := range req.GetFieldProjection().GetNames() { - if name == aggField { - return true - } - } - return false -} - -// projectedTagsByFamily flattens a TagProjection into family → name-set. -func projectedTagsByFamily(tp *modelv1.TagProjection) map[string]map[string]struct{} { - out := make(map[string]map[string]struct{}) - if tp == nil { - return out - } - for _, tf := range tp.GetTagFamilies() { - family := tf.GetName() - names := make(map[string]struct{}, len(tf.GetTags())) - for _, n := range tf.GetTags() { - names[n] = struct{}{} - } - out[family] = names - } - return out -} diff --git a/pkg/query/vectorized/measure/plan/executor_test.go b/pkg/query/vectorized/measure/plan/executor_test.go index f2148874e..cfce60e0f 100644 --- a/pkg/query/vectorized/measure/plan/executor_test.go +++ b/pkg/query/vectorized/measure/plan/executor_test.go @@ -123,6 +123,121 @@ func TestExecute_GroupByAgg_EmitsAggregatedRowsWithNilTimestamp(t *testing.T) { } } +// TestExecute_ScalarReduce_EmitsSingleRow drives Agg without GroupBy. +// The whole result collapses into one row carrying the first-seen +// projected tag plus the agg result, with nil Timestamp — matching the +// row path's aggAllIterator. Fixture: value column {1,4,2,3,5} → SUM 15. +func TestExecute_ScalarReduce_EmitsSingleRow(t *testing.T) { + schema, batch := buildScanInput(t) + src := &fakePullSource{schema: schema, batches: []*vectorized.RecordBatch{batch}} + scan := NewScan(schema, ScanParams{}) + scan.Source = src + gba, err := NewGroupByAgg(scan, nil, + &model.MeasureAgg{FieldName: "value", Func: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM}, + ) + if err != nil { + t.Fatalf("NewGroupByAgg: %v", err) + } + root := NewLimit(gba, 0, 10) + + iter, err := Execute(context.Background(), root, execCfg()) + if err != nil { + t.Fatalf("Execute: %v", err) + } + defer iter.Close() + + rows := 0 + var sum int64 + var svc string + for iter.Next() { + dps := iter.Current() + if len(dps) != 1 { + t.Fatalf("Current must yield 1 InternalDataPoint per Next, got %d", len(dps)) + } + idp := dps[0] + if idp.DataPoint.Timestamp != nil { + t.Fatalf("scalar reduce row must have nil Timestamp; got %v", idp.DataPoint.Timestamp) + } + tags := idp.DataPoint.TagFamilies[0].Tags + svc = tags[0].Value.GetStr().GetValue() + if len(idp.DataPoint.Fields) != 1 || idp.DataPoint.Fields[0].Name != "value" { + t.Fatalf("want one Field 'value', got %+v", idp.DataPoint.Fields) + } + sum = idp.DataPoint.Fields[0].Value.GetInt().GetValue() + rows++ + } + if rows != 1 { + t.Fatalf("scalar reduce must emit exactly 1 row, got %d", rows) + } + if sum != 15 { + t.Fatalf("scalar SUM(value): want 15, got %d", sum) + } + if svc != "a" { + t.Fatalf("scalar reduce carries first-seen tag: want svc=a, got %q", svc) + } +} + +// TestExecute_RawGroupBy_EmitsFirstRowPerGroup drives GroupBy without +// Agg. One row per group is emitted in group-insertion order with the +// input schema preserved (timestamp + field present), matching the row +// path's groupIterator + processor.go's current[0] read. Fixture groups: +// a → first row (ts=1,value=1); b → first row (ts=2,value=4). +func TestExecute_RawGroupBy_EmitsFirstRowPerGroup(t *testing.T) { + schema, batch := buildScanInput(t) + src := &fakePullSource{schema: schema, batches: []*vectorized.RecordBatch{batch}} + scan := NewScan(schema, ScanParams{}) + scan.Source = src + gba, err := NewGroupByAgg(scan, + &model.MeasureGroupBy{TagFamily: "default", TagNames: []string{"svc"}}, nil, + ) + if err != nil { + t.Fatalf("NewGroupByAgg: %v", err) + } + root := NewLimit(gba, 0, 10) + + iter, err := Execute(context.Background(), root, execCfg()) + if err != nil { + t.Fatalf("Execute: %v", err) + } + defer iter.Close() + + type row struct { + value int64 + first bool + } + got := map[string]row{} + order := make([]string, 0, 2) + for iter.Next() { + dps := iter.Current() + if len(dps) != 1 { + t.Fatalf("Current must yield 1 InternalDataPoint per Next, got %d", len(dps)) + } + idp := dps[0] + // Raw GroupBy preserves the input schema, so the field column + // survives (unlike the aggregation shapes). + if len(idp.DataPoint.Fields) != 1 || idp.DataPoint.Fields[0].Name != "value" { + t.Fatalf("raw GroupBy must preserve the field column, got %+v", idp.DataPoint.Fields) + } + svc := idp.DataPoint.TagFamilies[0].Tags[0].Value.GetStr().GetValue() + if _, seen := got[svc]; !seen { + order = append(order, svc) + } + got[svc] = row{value: idp.DataPoint.Fields[0].Value.GetInt().GetValue(), first: true} + } + if len(got) != 2 { + t.Fatalf("raw GroupBy must emit one row per group (2), got %d", len(got)) + } + if got["a"].value != 1 { + t.Fatalf("group a first-seen value: want 1, got %d", got["a"].value) + } + if got["b"].value != 4 { + t.Fatalf("group b first-seen value: want 4, got %d", got["b"].value) + } + if len(order) != 2 || order[0] != "a" || order[1] != "b" { + t.Fatalf("groups must emit in insertion order [a b], got %v", order) + } +} + func TestExecute_BuildError_SurfacesAsExecuteError(t *testing.T) { schema, _ := buildScanInput(t) scan := NewScan(schema, ScanParams{}) // Source intentionally unset diff --git a/pkg/query/vectorized/measure/plan/groupby_agg.go b/pkg/query/vectorized/measure/plan/groupby_agg.go index df678440b..326235b3b 100644 --- a/pkg/query/vectorized/measure/plan/groupby_agg.go +++ b/pkg/query/vectorized/measure/plan/groupby_agg.go @@ -27,14 +27,19 @@ import ( vmeasure "github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure" ) -// GroupByAgg is the v1 vec aggregation node: GroupBy + single Aggregation -// fused into one BatchAggregation operator (see G7d planner). The -// operator does its own grouping via keyIndices and folding via aggs. +// GroupByAgg is the vec aggregation/grouping node. It fuses GroupBy and a +// single Aggregation into one BatchAggregation operator (see G7d +// planner), and also covers the two single-sided shapes: // -// Schema-rewriting: output is key columns + one agg result column. The -// timestamp column is dropped, so serializeBatchToProto emits -// DataPoint.Timestamp == nil for every output row (per G7 design -// decision D2). +// - GroupBy + Agg → BatchAggregation: key columns + one agg result +// column (schema-rewriting; timestamp dropped, per G7 decision D2). +// - Agg without GroupBy → scalar reduce: BatchAggregation with no key +// columns, a single output row (first-seen tags + agg result). +// - GroupBy without Agg → raw GroupBy: a first-seen-row-per-group +// BatchGroupBy, schema-preserving. +// +// The concrete operator (and therefore the output schema) is chosen by +// vmeasure.BuildOperators from which of GroupBy/Agg is set. type GroupByAgg struct { Child VecPlan GroupBy *model.MeasureGroupBy @@ -42,15 +47,12 @@ type GroupByAgg struct { outputCache *vectorized.BatchSchema } -// NewGroupByAgg constructs a GroupByAgg node wrapping child. Returns an -// error if GroupBy or Agg are nil (v1 requires both: scalar reduce and -// raw groupby are not supported). +// NewGroupByAgg constructs a GroupByAgg node wrapping child. At least one +// of groupBy/agg must be set (BuildOperators routes on which); child must +// not be nil. func NewGroupByAgg(child VecPlan, groupBy *model.MeasureGroupBy, agg *model.MeasureAgg) (*GroupByAgg, error) { - if groupBy == nil { - return nil, fmt.Errorf("plan.GroupByAgg: GroupBy must not be nil (scalar reduce not supported in v1)") - } - if agg == nil { - return nil, fmt.Errorf("plan.GroupByAgg: Agg must not be nil (raw groupby not supported in v1)") + if groupBy == nil && agg == nil { + return nil, fmt.Errorf("plan.GroupByAgg: GroupBy and Agg must not both be nil") } if child == nil { return nil, fmt.Errorf("plan.GroupByAgg: Child must not be nil") @@ -116,9 +118,8 @@ func (g *GroupByAgg) String() string { if g.GroupBy != nil { tagNames = strings.Join(g.GroupBy.TagNames, ",") } - field := "" - if g.Agg != nil { - field = g.Agg.FieldName + if g.Agg == nil { + return fmt.Sprintf("GroupByAgg(keys=%s, raw)", tagNames) } - return fmt.Sprintf("GroupByAgg(keys=%s, fn=%v, field=%s)", tagNames, g.Agg.Func, field) + return fmt.Sprintf("GroupByAgg(keys=%s, fn=%v, field=%s)", tagNames, g.Agg.Func, g.Agg.FieldName) } diff --git a/pkg/query/vectorized/measure/plan_test.go b/pkg/query/vectorized/measure/plan_test.go index ed4b7e4ea..eade8311f 100644 --- a/pkg/query/vectorized/measure/plan_test.go +++ b/pkg/query/vectorized/measure/plan_test.go @@ -96,23 +96,51 @@ func TestBuildOperators_AggOutputName_InheritsInputFieldName(t *testing.T) { } } -func TestBuildOperators_GroupByWithoutAgg_Errors(t *testing.T) { +// TestBuildOperators_GroupByWithoutAgg_EmitsFirstOnlyGroupBy pins the +// raw-GroupBy shape: a first-seen-row-per-group BatchGroupBy whose output +// preserves the input schema. It matches the row path's groupIterator +// combined with processor.go's current[0] read. +func TestBuildOperators_GroupByWithoutAgg_EmitsFirstOnlyGroupBy(t *testing.T) { opts := model.MeasureQueryOptions{ GroupBy: &model.MeasureGroupBy{TagFamily: "default", TagNames: []string{"svc"}}, } - _, err := BuildOperators(opts, planSchema(), vectorized.NewMemoryTracker(1<<20), 1024) - if err == nil { - t.Fatal("GroupBy without Agg must error in v1") + ops, err := BuildOperators(opts, planSchema(), vectorized.NewMemoryTracker(1<<20), 1024) + if err != nil { + t.Fatalf("GroupBy without Agg (raw groupby) must not error: %v", err) + } + if len(ops) != 1 { + t.Fatalf("raw GroupBy should emit 1 operator, got %d", len(ops)) + } + gb, ok := ops[0].(*BatchGroupBy) + if !ok { + t.Fatalf("operator must be *BatchGroupBy, got %T", ops[0]) + } + if !gb.firstOnly { + t.Fatal("raw GroupBy must be first-only (one row per group)") } } -func TestBuildOperators_AggWithoutGroupBy_Errors(t *testing.T) { +// TestBuildOperators_AggWithoutGroupBy_EmitsBatchAggregation pins the +// scalar-reduce shape: a BatchAggregation with no key columns, so every +// row collapses into a single output row carrying the first-seen tags +// plus the agg result, matching the row path's aggAllIterator. +func TestBuildOperators_AggWithoutGroupBy_EmitsBatchAggregation(t *testing.T) { opts := model.MeasureQueryOptions{ Agg: &model.MeasureAgg{FieldName: "value", Func: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM}, } - _, err := BuildOperators(opts, planSchema(), vectorized.NewMemoryTracker(1<<20), 1024) - if err == nil { - t.Fatal("Agg without GroupBy (scalar reduce) must error in v1") + ops, err := BuildOperators(opts, planSchema(), vectorized.NewMemoryTracker(1<<20), 1024) + if err != nil { + t.Fatalf("Agg without GroupBy (scalar reduce) must not error: %v", err) + } + if len(ops) != 1 { + t.Fatalf("scalar reduce should emit 1 operator, got %d", len(ops)) + } + agg, ok := ops[0].(*BatchAggregation) + if !ok { + t.Fatalf("operator must be *BatchAggregation, got %T", ops[0]) + } + if len(agg.keyIndices) != 0 { + t.Fatalf("scalar reduce must have no key columns, got %d", len(agg.keyIndices)) } }
