This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch vectorized-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit cadfec0bf5ef48169fc491cae309d2aaeb96976a
Author: Hongtao Gao <[email protected]>
AuthorDate: Fri May 15 10:08:28 2026 +0000

    feat(query/vectorized/measure/plan): G9b GroupBy/Agg completeness (scalar 
reduce, raw GroupBy, projection auto-coverage)
---
 pkg/query/vectorized/measure/groupby.go            |  33 +++++-
 pkg/query/vectorized/measure/plan.go               |  46 +++++---
 pkg/query/vectorized/measure/plan/analyzer.go      | 129 ++++++++++++++++-----
 pkg/query/vectorized/measure/plan/analyzer_test.go |  49 ++++++--
 pkg/query/vectorized/measure/plan/dispatch.go      |  90 ++------------
 pkg/query/vectorized/measure/plan/executor_test.go | 115 ++++++++++++++++++
 pkg/query/vectorized/measure/plan/groupby_agg.go   |  39 ++++---
 pkg/query/vectorized/measure/plan_test.go          |  44 +++++--
 8 files changed, 383 insertions(+), 162 deletions(-)

diff --git a/pkg/query/vectorized/measure/groupby.go 
b/pkg/query/vectorized/measure/groupby.go
index 9a8101131..7904da051 100644
--- a/pkg/query/vectorized/measure/groupby.go
+++ b/pkg/query/vectorized/measure/groupby.go
@@ -35,6 +35,13 @@ import (
 // emitted in group-insertion order, with all rows of the first group flushed
 // before the next group begins.
 //
+// When firstOnly is set the operator keeps only the first-seen row of each
+// group and drops subsequent rows. This reproduces the row path's raw
+// GroupBy egress: the row aggregator wraps every group in a single
+// InternalDataPoint slice and banyand/query/processor.go reads only
+// current[0] per Next, so a raw GroupBy surfaces exactly one row (the
+// first-seen) per group in group-insertion order.
+//
 // Memory accounting follows a pessimistic-reserve, refund-unused pattern:
 //
 //   - entrySize is the per-new-group bucket overhead.
@@ -58,6 +65,7 @@ type BatchGroupBy struct {
        cursorBucket int
        cursorRow    int
        closed       bool
+       firstOnly    bool
 }
 
 type groupBucket struct {
@@ -88,6 +96,22 @@ func NewBatchGroupBy(
        }
 }
 
+// NewBatchGroupByFirst constructs a BatchGroupBy that emits only the
+// first-seen row of each group (raw GroupBy without aggregation). It
+// matches the row path: groupBy.Execute produces a groupIterator whose
+// Current() returns the whole group, and processor.go keeps only
+// current[0], so each group surfaces a single row in group-insertion
+// order with the input schema unchanged.
+func NewBatchGroupByFirst(
+       schema *vectorized.BatchSchema, keyIndices []int,
+       pool *vectorized.BatchPool, batchSize int,
+       tracker *vectorized.MemoryTracker, entrySize int64,
+) *BatchGroupBy {
+       g := NewBatchGroupBy(schema, keyIndices, pool, batchSize, tracker, 
entrySize, 0)
+       g.firstOnly = true
+       return g
+}
+
 // Init prepares the group map.
 func (g *BatchGroupBy) Init(_ context.Context) error {
        g.groups = make(map[uint64][]*groupBucket)
@@ -120,16 +144,23 @@ func (g *BatchGroupBy) Consume(_ context.Context, b 
*vectorized.RecordBatch) err
 
 func (g *BatchGroupBy) consumeRows(b *vectorized.RecordBatch, active []uint16) 
int64 {
        var newGroups int64
+       var copiedRows int64
        for _, rowIdx := range active {
                bucket, isNew := g.findOrCreate(b, int(rowIdx))
                if isNew {
                        newGroups++
                }
+               if g.firstOnly && !isNew {
+                       // Raw GroupBy keeps only the first-seen row per group;
+                       // drop every later row in the same group.
+                       continue
+               }
                for colIdx, srcCol := range b.Columns {
                        copyOneValue(bucket.cols[colIdx], srcCol, int(rowIdx))
                }
+               copiedRows++
        }
-       return newGroups*g.entrySize + int64(len(active))*g.rowSize
+       return newGroups*g.entrySize + copiedRows*g.rowSize
 }
 
 // findOrCreate locates the bucket for the row's key, creating it if absent.
diff --git a/pkg/query/vectorized/measure/plan.go 
b/pkg/query/vectorized/measure/plan.go
index 747b96079..38235b1e8 100644
--- a/pkg/query/vectorized/measure/plan.go
+++ b/pkg/query/vectorized/measure/plan.go
@@ -33,15 +33,18 @@ const aggEntrySize int64 = 512
 // into a list of BreakerOperators that, chained after the scan source, form
 // the vectorized aggregation pipeline.
 //
-// Routing rules (v1):
+// Routing rules:
 //   - GroupBy + Agg both set → emit a single BatchAggregation. The operator's
 //     own keyIndex map produces per-group buckets and folds the agg slot;
 //     a separate BatchGroupBy would double the row materialization.
-//   - GroupBy set without Agg → unsupported. The proto field is paired with
-//     `agg` for the existing TopN path; without Agg the result semantic is
-//     ambiguous (selection? deduplication?). Return an error.
-//   - Agg set without GroupBy → unsupported. Scalar reduce → single output
-//     row is deferred to a future increment.
+//   - Agg set without GroupBy → scalar reduce: a BatchAggregation with no
+//     key columns. Every row maps to one group, so a single output row is
+//     emitted carrying the first-seen projected tags plus the agg result,
+//     matching the row path's aggAllIterator.
+//   - GroupBy set without Agg → raw GroupBy: a first-seen-row-per-group
+//     BatchGroupBy. The output preserves the input schema; one row per
+//     group is emitted in group-insertion order, matching the row path's
+//     groupIterator + processor.go's current[0] read.
 //   - Neither set → empty operator list; caller emits raw rows.
 //
 // tracker is the per-pipeline MemoryTracker (G7a); it must be non-nil when
@@ -57,12 +60,6 @@ func BuildOperators(
        if !hasGroupBy && !hasAgg {
                return nil, nil
        }
-       if hasGroupBy && !hasAgg {
-               return nil, fmt.Errorf("vectorized.measure: GroupBy without Agg 
is not supported in v1")
-       }
-       if hasAgg && !hasGroupBy {
-               return nil, fmt.Errorf("vectorized.measure: Agg without GroupBy 
(scalar reduce) is not supported in v1")
-       }
        if tracker == nil {
                return nil, fmt.Errorf("vectorized.measure: BuildOperators 
requires a non-nil shared MemoryTracker")
        }
@@ -70,9 +67,22 @@ func BuildOperators(
                return nil, fmt.Errorf("vectorized.measure: batchSize must be > 
0, got %d", batchSize)
        }
 
-       keyIndices, keyErr := lookupGroupByKeyIndices(schema, opts.GroupBy)
-       if keyErr != nil {
-               return nil, keyErr
+       var keyIndices []int
+       if hasGroupBy {
+               var keyErr error
+               keyIndices, keyErr = lookupGroupByKeyIndices(schema, 
opts.GroupBy)
+               if keyErr != nil {
+                       return nil, keyErr
+               }
+       }
+
+       if !hasAgg {
+               // Raw GroupBy: emit the first-seen row of each group with the
+               // input schema unchanged. entrySize accounts for the per-group
+               // bucket; rowSize is zero because at most one row is retained.
+               pool := vectorized.NewBatchPool(schema, batchSize)
+               gb := NewBatchGroupByFirst(schema, keyIndices, pool, batchSize, 
tracker, aggEntrySize)
+               return []vectorized.BreakerOperator{gb}, nil
        }
 
        fieldIdx, fieldErr := lookupFieldColumnIndex(schema, opts.Agg.FieldName)
@@ -93,6 +103,12 @@ func BuildOperators(
        // like "value_sum" — the suffix would break proto.Equal parity in the
        // integration suite. The aggregation function lives on the operator
        // spec, not in the column name.
+       //
+       // keyIndices is empty for scalar reduce (Agg without GroupBy):
+       // BatchAggregation.computeKey then returns the same key for every
+       // row, so all rows collapse into a single output row carrying the
+       // first-seen projected tags plus the agg result — the columnar
+       // equivalent of the row path's aggAllIterator.
        spec := AggSpec{
                Func:     aggFn,
                InputCol: fieldIdx,
diff --git a/pkg/query/vectorized/measure/plan/analyzer.go 
b/pkg/query/vectorized/measure/plan/analyzer.go
index 620c9dcc5..6eee444fb 100644
--- a/pkg/query/vectorized/measure/plan/analyzer.go
+++ b/pkg/query/vectorized/measure/plan/analyzer.go
@@ -48,8 +48,11 @@ const defaultLimit uint32 = 100
 //   - tag/field projection naming columns not in the schema
 //   - GroupBy referencing a tag absent from the schema
 //   - Agg referencing a field absent from the schema
-//   - GroupBy set without Agg (raw groupby not supported in v1)
-//   - Agg set without GroupBy (scalar reduce not supported in v1)
+//
+// GroupBy and Agg may travel together (group + aggregate), or either
+// alone: Agg without GroupBy is a scalar reduce (single output row);
+// GroupBy without Agg is a raw GroupBy (first-seen row per group). Both
+// mirror the row path (measure_plan_aggregation.go / measure_plan_groupby.go).
 func Analyze(req *measurev1.QueryRequest, measureSchema *databasev1.Measure) 
(VecPlan, error) {
        if req == nil {
                return nil, fmt.Errorf("plan.Analyze: nil request")
@@ -72,13 +75,30 @@ func Analyze(req *measurev1.QueryRequest, measureSchema 
*databasev1.Measure) (Ve
        hasAgg := req.GetAgg() != nil
        var gbModel *model.MeasureGroupBy
        var aggModel *model.MeasureAgg
-       if hasGroupBy && hasAgg {
-               var translateErr error
-               gbModel, aggModel, translateErr = translateGroupByAgg(req, 
measureSchema)
-               if translateErr != nil {
-                       return nil, translateErr
+       if hasGroupBy {
+               var gbErr error
+               gbModel, gbErr = translateGroupBy(req, measureSchema)
+               if gbErr != nil {
+                       return nil, gbErr
+               }
+       }
+       if hasAgg {
+               var aggErr error
+               aggModel, aggErr = translateAgg(req, measureSchema)
+               if aggErr != nil {
+                       return nil, aggErr
                }
        }
+
+       // Projection auto-coverage: BatchAggregation / BatchGroupBy locate
+       // their key and value columns by name inside the BatchSchema, which
+       // BuildBatchSchema derives from TagProjection + FieldProjection.
+       // Extend the projection implicitly so the GroupBy keys and the Agg
+       // field always materialize a column, instead of falling through to
+       // the row path when the request omitted them from its projection.
+       tagProjection = ensureGroupByProjected(tagProjection, gbModel)
+       fieldProjection = ensureAggFieldProjected(fieldProjection, aggModel)
+
        opts := model.MeasureQueryOptions{
                TagProjection:   tagProjection,
                FieldProjection: fieldProjection,
@@ -105,17 +125,15 @@ func Analyze(req *measurev1.QueryRequest, measureSchema 
*databasev1.Measure) (Ve
                Agg:             aggModel,
        })
 
-       switch {
-       case hasGroupBy && hasAgg:
+       if hasGroupBy || hasAgg {
+               // One BatchAggregation / BatchGroupBy node covers all three
+               // shapes (group+agg, scalar reduce, raw groupby); 
BuildOperators
+               // routes on which of gbModel/aggModel is set.
                gba, gbaErr := NewGroupByAgg(plan, gbModel, aggModel)
                if gbaErr != nil {
                        return nil, gbaErr
                }
                plan = gba
-       case hasGroupBy:
-               return nil, fmt.Errorf("plan.Analyze: GroupBy without Agg is 
not supported in v1")
-       case hasAgg:
-               return nil, fmt.Errorf("plan.Analyze: Agg without GroupBy 
(scalar reduce) is not supported in v1")
        }
 
        if t := req.GetTop(); t != nil {
@@ -150,44 +168,101 @@ func buildTagProjection(req *measurev1.QueryRequest) 
[]model.TagProjection {
        return out
 }
 
-// translateGroupByAgg builds the model GroupBy/Agg structs from the proto,
+// translateGroupBy builds the model GroupBy struct from the proto,
 // validating that:
 //   - the GroupBy tag_projection names exactly one family with non-empty tags
 //     (v1 single-family limitation)
 //   - the named GroupBy tags exist in the Measure schema
-//   - the Agg field exists in the Measure schema
-func translateGroupByAgg(req *measurev1.QueryRequest, measureSchema 
*databasev1.Measure) (
-       *model.MeasureGroupBy, *model.MeasureAgg, error,
-) {
-       gbProto := req.GetGroupBy()
-       families := gbProto.GetTagProjection().GetTagFamilies()
+func translateGroupBy(req *measurev1.QueryRequest, measureSchema 
*databasev1.Measure) (*model.MeasureGroupBy, error) {
+       families := req.GetGroupBy().GetTagProjection().GetTagFamilies()
        if len(families) == 0 {
-               return nil, nil, fmt.Errorf("plan.Analyze: 
GroupBy.tag_projection must list at least one tag family")
+               return nil, fmt.Errorf("plan.Analyze: GroupBy.tag_projection 
must list at least one tag family")
        }
        if len(families) > 1 {
-               return nil, nil, fmt.Errorf("plan.Analyze: 
GroupBy.tag_projection v1 supports a single tag family, got %d", len(families))
+               return nil, fmt.Errorf("plan.Analyze: GroupBy.tag_projection v1 
supports a single tag family, got %d", len(families))
        }
        family := families[0]
        if len(family.GetTags()) == 0 {
-               return nil, nil, fmt.Errorf("plan.Analyze: 
GroupBy.tag_projection family %q has no tags", family.GetName())
+               return nil, fmt.Errorf("plan.Analyze: GroupBy.tag_projection 
family %q has no tags", family.GetName())
        }
        gb := &model.MeasureGroupBy{
                TagFamily: family.GetName(),
                TagNames:  append([]string(nil), family.GetTags()...),
        }
        if validateErr := validateGroupByTags(measureSchema, gb); validateErr 
!= nil {
-               return nil, nil, validateErr
+               return nil, validateErr
        }
+       return gb, nil
+}
 
+// translateAgg builds the model Agg struct from the proto, validating
+// that the Agg field exists in the Measure schema.
+func translateAgg(req *measurev1.QueryRequest, measureSchema 
*databasev1.Measure) (*model.MeasureAgg, error) {
        aggProto := req.GetAgg()
        if validateErr := validateAggField(measureSchema, 
aggProto.GetFieldName()); validateErr != nil {
-               return nil, nil, validateErr
+               return nil, validateErr
        }
-       agg := &model.MeasureAgg{
+       return &model.MeasureAgg{
                FieldName: aggProto.GetFieldName(),
                Func:      aggProto.GetFunction(),
+       }, nil
+}
+
+// ensureGroupByProjected returns a TagProjection slice guaranteed to
+// include every GroupBy key tag. When the request already projects them
+// the input slice is returned unchanged; otherwise the missing key tags
+// are appended to (or create) the GroupBy family. Mirrors the row path,
+// whose GroupBy resolves its key tag refs against the schema regardless
+// of the request projection.
+func ensureGroupByProjected(tp []model.TagProjection, gb 
*model.MeasureGroupBy) []model.TagProjection {
+       if gb == nil || gb.TagFamily == "" || len(gb.TagNames) == 0 {
+               return tp
+       }
+       out := append([]model.TagProjection(nil), tp...)
+       familyIdx := -1
+       for i := range out {
+               if out[i].Family == gb.TagFamily {
+                       familyIdx = i
+                       break
+               }
+       }
+       if familyIdx < 0 {
+               out = append(out, model.TagProjection{
+                       Family: gb.TagFamily,
+                       Names:  append([]string(nil), gb.TagNames...),
+               })
+               return out
+       }
+       present := make(map[string]struct{}, len(out[familyIdx].Names))
+       for _, n := range out[familyIdx].Names {
+               present[n] = struct{}{}
+       }
+       names := append([]string(nil), out[familyIdx].Names...)
+       for _, n := range gb.TagNames {
+               if _, ok := present[n]; !ok {
+                       names = append(names, n)
+               }
+       }
+       out[familyIdx].Names = names
+       return out
+}
+
+// ensureAggFieldProjected returns a FieldProjection slice guaranteed to
+// include the Agg field. When already present the input is returned
+// unchanged; otherwise the field is appended. Mirrors the row path,
+// whose aggregation resolves its field ref against the schema regardless
+// of the request projection.
+func ensureAggFieldProjected(fp []string, agg *model.MeasureAgg) []string {
+       if agg == nil || agg.FieldName == "" {
+               return fp
+       }
+       for _, n := range fp {
+               if n == agg.FieldName {
+                       return fp
+               }
        }
-       return gb, agg, nil
+       out := append([]string(nil), fp...)
+       return append(out, agg.FieldName)
 }
 
 // validateGroupByTags ensures every name in gb.TagNames exists within the
diff --git a/pkg/query/vectorized/measure/plan/analyzer_test.go 
b/pkg/query/vectorized/measure/plan/analyzer_test.go
index 1605a3399..f7041dd87 100644
--- a/pkg/query/vectorized/measure/plan/analyzer_test.go
+++ b/pkg/query/vectorized/measure/plan/analyzer_test.go
@@ -144,7 +144,9 @@ func TestAnalyze_TopBetweenGroupByAggAndLimit(t *testing.T) 
{
        }
 }
 
-func TestAnalyze_GroupByWithoutAgg_Errors(t *testing.T) {
+// TestAnalyze_GroupByWithoutAgg_BuildsRawGroupBy verifies the raw
+// GroupBy shape: a GroupByAgg node (Agg nil) below Limit.
+func TestAnalyze_GroupByWithoutAgg_BuildsRawGroupBy(t *testing.T) {
        req := &measurev1.QueryRequest{
                Name:            "demo",
                TagProjection:   projTagProj(),
@@ -154,16 +156,29 @@ func TestAnalyze_GroupByWithoutAgg_Errors(t *testing.T) {
                        FieldName:     "value",
                },
        }
-       _, err := Analyze(req, testMeasureSchema())
-       if err == nil {
-               t.Fatal("GroupBy without Agg must error")
+       p, err := Analyze(req, testMeasureSchema())
+       if err != nil {
+               t.Fatalf("GroupBy without Agg (raw groupby) must not error: 
%v", err)
+       }
+       limit, ok := p.(*Limit)
+       if !ok {
+               t.Fatalf("root should be *Limit, got %T", p)
        }
-       if !strings.Contains(err.Error(), "Agg") {
-               t.Fatalf("error should mention Agg, got %v", err)
+       gba, ok := limit.Child.(*GroupByAgg)
+       if !ok {
+               t.Fatalf("Limit child should be *GroupByAgg, got %T", 
limit.Child)
+       }
+       if gba.Agg != nil {
+               t.Fatalf("raw GroupBy must have nil Agg, got %+v", gba.Agg)
+       }
+       if gba.GroupBy == nil || gba.GroupBy.TagNames[0] != "svc" {
+               t.Fatalf("GroupBy.TagNames: want [svc], got %+v", gba.GroupBy)
        }
 }
 
-func TestAnalyze_AggWithoutGroupBy_Errors(t *testing.T) {
+// TestAnalyze_AggWithoutGroupBy_BuildsScalarReduce verifies the scalar
+// reduce shape: a GroupByAgg node (GroupBy nil) below Limit.
+func TestAnalyze_AggWithoutGroupBy_BuildsScalarReduce(t *testing.T) {
        req := &measurev1.QueryRequest{
                Name:            "demo",
                TagProjection:   projTagProj(),
@@ -173,9 +188,23 @@ func TestAnalyze_AggWithoutGroupBy_Errors(t *testing.T) {
                        FieldName: "value",
                },
        }
-       _, err := Analyze(req, testMeasureSchema())
-       if err == nil {
-               t.Fatal("Agg without GroupBy must error (scalar reduce not 
supported)")
+       p, err := Analyze(req, testMeasureSchema())
+       if err != nil {
+               t.Fatalf("Agg without GroupBy (scalar reduce) must not error: 
%v", err)
+       }
+       limit, ok := p.(*Limit)
+       if !ok {
+               t.Fatalf("root should be *Limit, got %T", p)
+       }
+       gba, ok := limit.Child.(*GroupByAgg)
+       if !ok {
+               t.Fatalf("Limit child should be *GroupByAgg, got %T", 
limit.Child)
+       }
+       if gba.GroupBy != nil {
+               t.Fatalf("scalar reduce must have nil GroupBy, got %+v", 
gba.GroupBy)
+       }
+       if gba.Agg == nil || gba.Agg.FieldName != "value" {
+               t.Fatalf("Agg.FieldName: want value, got %+v", gba.Agg)
        }
 }
 
diff --git a/pkg/query/vectorized/measure/plan/dispatch.go 
b/pkg/query/vectorized/measure/plan/dispatch.go
index 8a2a7b573..bafceb089 100644
--- a/pkg/query/vectorized/measure/plan/dispatch.go
+++ b/pkg/query/vectorized/measure/plan/dispatch.go
@@ -88,8 +88,9 @@ func FellThroughCount() int64 { return 
fellThroughCount.Load() }
 //
 // Eligibility gate (v1):
 //   - cfg.Enabled must be true
-//   - request may carry GroupBy+Agg as a pair (scalar reduce + raw
-//     GroupBy are deferred); aggProjectionCoverage must also hold
+//   - request may carry GroupBy and/or Agg in any combination (group+agg,
+//     scalar reduce, raw GroupBy); plan.Analyze auto-extends the
+//     projection so the keys / agg field always resolve
 //   - request must NOT carry Top (BatchTop's single-heap semantic differs
 //     from the row path's per-timestamp top-N)
 //   - request must carry TimeRange (storage requires a bounded window)
@@ -131,24 +132,11 @@ func Dispatch(
                // today.
                return nil, "", false, nil
        }
-       hasGroupBy := req.GetGroupBy() != nil
-       hasAgg := req.GetAgg() != nil
-       if hasGroupBy || hasAgg {
-               // GroupBy and Agg must travel as a pair (scalar reduce + raw
-               // groupby are deferred). Either alone falls through.
-               if hasGroupBy != hasAgg {
-                       return nil, "", false, nil
-               }
-               // Projection coverage: BatchAggregation locates its key + value
-               // columns by name inside the BatchSchema, which is built from
-               // TagProjection + FieldProjection. Missing coverage means the
-               // operator would fail at construction; fall through so the row
-               // path can extend projection implicitly or surface its
-               // canonical error.
-               if !aggProjectionCoverage(req) {
-                       return nil, "", false, nil
-               }
-       }
+       // GroupBy and Agg are handled by the vec subsystem in all three
+       // shapes — group+agg, scalar reduce (Agg only), raw GroupBy (GroupBy
+       // only). plan.Analyze auto-extends the projection so the GroupBy keys
+       // and Agg field always materialize a column, so there is no
+       // projection-coverage gate here anymore (G9b).
        // G9c #9: a nil TimeRange is NOT a fall-through. The row path does not
        // reject it — parseFields feeds 
criteria.GetTimeRange().GetBegin().AsTime()
        // into the index scan, and a nil *timestamppb.Timestamp resolves to the
@@ -409,65 +397,3 @@ func (emptyMIterator) Next() bool { return false }
 func (emptyMIterator) Current() []*measurev1.InternalDataPoint { return nil }
 
 func (emptyMIterator) Close() error { return nil }
-
-// aggProjectionCoverage reports whether the request's GroupBy keys and
-// Agg field are all present in the request's projections. Required by
-// the dispatch eligibility gate: the BatchAggregation operator locates
-// its key + value columns by name inside the BatchSchema, and the
-// BatchSchema is built from TagProjection + FieldProjection. Missing
-// coverage means the operator would fail at construction; dispatch
-// instead falls through so the row path can handle the request.
-//
-// v1 requires GroupBy.tag_projection to name a single family; that
-// family must appear in req.TagProjection with every tag in GroupBy
-// present. Agg.field_name must appear in req.FieldProjection. Non-key
-// projected tags are allowed; BatchAggregation carries them forward as
-// first-seen-per-group, matching the row path.
-func aggProjectionCoverage(req *measurev1.QueryRequest) bool {
-       gb := req.GetGroupBy()
-       if gb == nil {
-               return false
-       }
-       gbFamilies := gb.GetTagProjection().GetTagFamilies()
-       if len(gbFamilies) != 1 {
-               return false
-       }
-       gbFamily := gbFamilies[0]
-       projected := projectedTagsByFamily(req.GetTagProjection())
-       present, ok := projected[gbFamily.GetName()]
-       if !ok {
-               return false
-       }
-       for _, name := range gbFamily.GetTags() {
-               if _, hit := present[name]; !hit {
-                       return false
-               }
-       }
-       aggField := req.GetAgg().GetFieldName()
-       if aggField == "" {
-               return false
-       }
-       for _, name := range req.GetFieldProjection().GetNames() {
-               if name == aggField {
-                       return true
-               }
-       }
-       return false
-}
-
-// projectedTagsByFamily flattens a TagProjection into family → name-set.
-func projectedTagsByFamily(tp *modelv1.TagProjection) 
map[string]map[string]struct{} {
-       out := make(map[string]map[string]struct{})
-       if tp == nil {
-               return out
-       }
-       for _, tf := range tp.GetTagFamilies() {
-               family := tf.GetName()
-               names := make(map[string]struct{}, len(tf.GetTags()))
-               for _, n := range tf.GetTags() {
-                       names[n] = struct{}{}
-               }
-               out[family] = names
-       }
-       return out
-}
diff --git a/pkg/query/vectorized/measure/plan/executor_test.go 
b/pkg/query/vectorized/measure/plan/executor_test.go
index f2148874e..cfce60e0f 100644
--- a/pkg/query/vectorized/measure/plan/executor_test.go
+++ b/pkg/query/vectorized/measure/plan/executor_test.go
@@ -123,6 +123,121 @@ func 
TestExecute_GroupByAgg_EmitsAggregatedRowsWithNilTimestamp(t *testing.T) {
        }
 }
 
+// TestExecute_ScalarReduce_EmitsSingleRow drives Agg without GroupBy.
+// The whole result collapses into one row carrying the first-seen
+// projected tag plus the agg result, with nil Timestamp — matching the
+// row path's aggAllIterator. Fixture: value column {1,4,2,3,5} → SUM 15.
+func TestExecute_ScalarReduce_EmitsSingleRow(t *testing.T) {
+       schema, batch := buildScanInput(t)
+       src := &fakePullSource{schema: schema, batches: 
[]*vectorized.RecordBatch{batch}}
+       scan := NewScan(schema, ScanParams{})
+       scan.Source = src
+       gba, err := NewGroupByAgg(scan, nil,
+               &model.MeasureAgg{FieldName: "value", Func: 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM},
+       )
+       if err != nil {
+               t.Fatalf("NewGroupByAgg: %v", err)
+       }
+       root := NewLimit(gba, 0, 10)
+
+       iter, err := Execute(context.Background(), root, execCfg())
+       if err != nil {
+               t.Fatalf("Execute: %v", err)
+       }
+       defer iter.Close()
+
+       rows := 0
+       var sum int64
+       var svc string
+       for iter.Next() {
+               dps := iter.Current()
+               if len(dps) != 1 {
+                       t.Fatalf("Current must yield 1 InternalDataPoint per 
Next, got %d", len(dps))
+               }
+               idp := dps[0]
+               if idp.DataPoint.Timestamp != nil {
+                       t.Fatalf("scalar reduce row must have nil Timestamp; 
got %v", idp.DataPoint.Timestamp)
+               }
+               tags := idp.DataPoint.TagFamilies[0].Tags
+               svc = tags[0].Value.GetStr().GetValue()
+               if len(idp.DataPoint.Fields) != 1 || 
idp.DataPoint.Fields[0].Name != "value" {
+                       t.Fatalf("want one Field 'value', got %+v", 
idp.DataPoint.Fields)
+               }
+               sum = idp.DataPoint.Fields[0].Value.GetInt().GetValue()
+               rows++
+       }
+       if rows != 1 {
+               t.Fatalf("scalar reduce must emit exactly 1 row, got %d", rows)
+       }
+       if sum != 15 {
+               t.Fatalf("scalar SUM(value): want 15, got %d", sum)
+       }
+       if svc != "a" {
+               t.Fatalf("scalar reduce carries first-seen tag: want svc=a, got 
%q", svc)
+       }
+}
+
+// TestExecute_RawGroupBy_EmitsFirstRowPerGroup drives GroupBy without
+// Agg. One row per group is emitted in group-insertion order with the
+// input schema preserved (timestamp + field present), matching the row
+// path's groupIterator + processor.go's current[0] read. Fixture groups:
+// a → first row (ts=1,value=1); b → first row (ts=2,value=4).
+func TestExecute_RawGroupBy_EmitsFirstRowPerGroup(t *testing.T) {
+       schema, batch := buildScanInput(t)
+       src := &fakePullSource{schema: schema, batches: 
[]*vectorized.RecordBatch{batch}}
+       scan := NewScan(schema, ScanParams{})
+       scan.Source = src
+       gba, err := NewGroupByAgg(scan,
+               &model.MeasureGroupBy{TagFamily: "default", TagNames: 
[]string{"svc"}}, nil,
+       )
+       if err != nil {
+               t.Fatalf("NewGroupByAgg: %v", err)
+       }
+       root := NewLimit(gba, 0, 10)
+
+       iter, err := Execute(context.Background(), root, execCfg())
+       if err != nil {
+               t.Fatalf("Execute: %v", err)
+       }
+       defer iter.Close()
+
+       type row struct {
+               value int64
+               first bool
+       }
+       got := map[string]row{}
+       order := make([]string, 0, 2)
+       for iter.Next() {
+               dps := iter.Current()
+               if len(dps) != 1 {
+                       t.Fatalf("Current must yield 1 InternalDataPoint per 
Next, got %d", len(dps))
+               }
+               idp := dps[0]
+               // Raw GroupBy preserves the input schema, so the field column
+               // survives (unlike the aggregation shapes).
+               if len(idp.DataPoint.Fields) != 1 || 
idp.DataPoint.Fields[0].Name != "value" {
+                       t.Fatalf("raw GroupBy must preserve the field column, 
got %+v", idp.DataPoint.Fields)
+               }
+               svc := 
idp.DataPoint.TagFamilies[0].Tags[0].Value.GetStr().GetValue()
+               if _, seen := got[svc]; !seen {
+                       order = append(order, svc)
+               }
+               got[svc] = row{value: 
idp.DataPoint.Fields[0].Value.GetInt().GetValue(), first: true}
+       }
+       if len(got) != 2 {
+               t.Fatalf("raw GroupBy must emit one row per group (2), got %d", 
len(got))
+       }
+       if got["a"].value != 1 {
+               t.Fatalf("group a first-seen value: want 1, got %d", 
got["a"].value)
+       }
+       if got["b"].value != 4 {
+               t.Fatalf("group b first-seen value: want 4, got %d", 
got["b"].value)
+       }
+       if len(order) != 2 || order[0] != "a" || order[1] != "b" {
+               t.Fatalf("groups must emit in insertion order [a b], got %v", 
order)
+       }
+}
+
 func TestExecute_BuildError_SurfacesAsExecuteError(t *testing.T) {
        schema, _ := buildScanInput(t)
        scan := NewScan(schema, ScanParams{}) // Source intentionally unset
diff --git a/pkg/query/vectorized/measure/plan/groupby_agg.go 
b/pkg/query/vectorized/measure/plan/groupby_agg.go
index df678440b..326235b3b 100644
--- a/pkg/query/vectorized/measure/plan/groupby_agg.go
+++ b/pkg/query/vectorized/measure/plan/groupby_agg.go
@@ -27,14 +27,19 @@ import (
        vmeasure 
"github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure"
 )
 
-// GroupByAgg is the v1 vec aggregation node: GroupBy + single Aggregation
-// fused into one BatchAggregation operator (see G7d planner). The
-// operator does its own grouping via keyIndices and folding via aggs.
+// GroupByAgg is the vec aggregation/grouping node. It fuses GroupBy and a
+// single Aggregation into one BatchAggregation operator (see G7d
+// planner), and also covers the two single-sided shapes:
 //
-// Schema-rewriting: output is key columns + one agg result column. The
-// timestamp column is dropped, so serializeBatchToProto emits
-// DataPoint.Timestamp == nil for every output row (per G7 design
-// decision D2).
+//   - GroupBy + Agg → BatchAggregation: key columns + one agg result
+//     column (schema-rewriting; timestamp dropped, per G7 decision D2).
+//   - Agg without GroupBy → scalar reduce: BatchAggregation with no key
+//     columns, a single output row (first-seen tags + agg result).
+//   - GroupBy without Agg → raw GroupBy: a first-seen-row-per-group
+//     BatchGroupBy, schema-preserving.
+//
+// The concrete operator (and therefore the output schema) is chosen by
+// vmeasure.BuildOperators from which of GroupBy/Agg is set.
 type GroupByAgg struct {
        Child       VecPlan
        GroupBy     *model.MeasureGroupBy
@@ -42,15 +47,12 @@ type GroupByAgg struct {
        outputCache *vectorized.BatchSchema
 }
 
-// NewGroupByAgg constructs a GroupByAgg node wrapping child. Returns an
-// error if GroupBy or Agg are nil (v1 requires both: scalar reduce and
-// raw groupby are not supported).
+// NewGroupByAgg constructs a GroupByAgg node wrapping child. At least one
+// of groupBy/agg must be set (BuildOperators routes on which); child must
+// not be nil.
 func NewGroupByAgg(child VecPlan, groupBy *model.MeasureGroupBy, agg 
*model.MeasureAgg) (*GroupByAgg, error) {
-       if groupBy == nil {
-               return nil, fmt.Errorf("plan.GroupByAgg: GroupBy must not be 
nil (scalar reduce not supported in v1)")
-       }
-       if agg == nil {
-               return nil, fmt.Errorf("plan.GroupByAgg: Agg must not be nil 
(raw groupby not supported in v1)")
+       if groupBy == nil && agg == nil {
+               return nil, fmt.Errorf("plan.GroupByAgg: GroupBy and Agg must 
not both be nil")
        }
        if child == nil {
                return nil, fmt.Errorf("plan.GroupByAgg: Child must not be nil")
@@ -116,9 +118,8 @@ func (g *GroupByAgg) String() string {
        if g.GroupBy != nil {
                tagNames = strings.Join(g.GroupBy.TagNames, ",")
        }
-       field := ""
-       if g.Agg != nil {
-               field = g.Agg.FieldName
+       if g.Agg == nil {
+               return fmt.Sprintf("GroupByAgg(keys=%s, raw)", tagNames)
        }
-       return fmt.Sprintf("GroupByAgg(keys=%s, fn=%v, field=%s)", tagNames, 
g.Agg.Func, field)
+       return fmt.Sprintf("GroupByAgg(keys=%s, fn=%v, field=%s)", tagNames, 
g.Agg.Func, g.Agg.FieldName)
 }
diff --git a/pkg/query/vectorized/measure/plan_test.go 
b/pkg/query/vectorized/measure/plan_test.go
index ed4b7e4ea..eade8311f 100644
--- a/pkg/query/vectorized/measure/plan_test.go
+++ b/pkg/query/vectorized/measure/plan_test.go
@@ -96,23 +96,51 @@ func 
TestBuildOperators_AggOutputName_InheritsInputFieldName(t *testing.T) {
        }
 }
 
-func TestBuildOperators_GroupByWithoutAgg_Errors(t *testing.T) {
+// TestBuildOperators_GroupByWithoutAgg_EmitsFirstOnlyGroupBy pins the
+// raw-GroupBy shape: a first-seen-row-per-group BatchGroupBy whose output
+// preserves the input schema. It matches the row path's groupIterator
+// combined with processor.go's current[0] read.
+func TestBuildOperators_GroupByWithoutAgg_EmitsFirstOnlyGroupBy(t *testing.T) {
        opts := model.MeasureQueryOptions{
                GroupBy: &model.MeasureGroupBy{TagFamily: "default", TagNames: 
[]string{"svc"}},
        }
-       _, err := BuildOperators(opts, planSchema(), 
vectorized.NewMemoryTracker(1<<20), 1024)
-       if err == nil {
-               t.Fatal("GroupBy without Agg must error in v1")
+       ops, err := BuildOperators(opts, planSchema(), 
vectorized.NewMemoryTracker(1<<20), 1024)
+       if err != nil {
+               t.Fatalf("GroupBy without Agg (raw groupby) must not error: 
%v", err)
+       }
+       if len(ops) != 1 {
+               t.Fatalf("raw GroupBy should emit 1 operator, got %d", len(ops))
+       }
+       gb, ok := ops[0].(*BatchGroupBy)
+       if !ok {
+               t.Fatalf("operator must be *BatchGroupBy, got %T", ops[0])
+       }
+       if !gb.firstOnly {
+               t.Fatal("raw GroupBy must be first-only (one row per group)")
        }
 }
 
-func TestBuildOperators_AggWithoutGroupBy_Errors(t *testing.T) {
+// TestBuildOperators_AggWithoutGroupBy_EmitsBatchAggregation pins the
+// scalar-reduce shape: a BatchAggregation with no key columns, so every
+// row collapses into a single output row carrying the first-seen tags
+// plus the agg result, matching the row path's aggAllIterator.
+func TestBuildOperators_AggWithoutGroupBy_EmitsBatchAggregation(t *testing.T) {
        opts := model.MeasureQueryOptions{
                Agg: &model.MeasureAgg{FieldName: "value", Func: 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM},
        }
-       _, err := BuildOperators(opts, planSchema(), 
vectorized.NewMemoryTracker(1<<20), 1024)
-       if err == nil {
-               t.Fatal("Agg without GroupBy (scalar reduce) must error in v1")
+       ops, err := BuildOperators(opts, planSchema(), 
vectorized.NewMemoryTracker(1<<20), 1024)
+       if err != nil {
+               t.Fatalf("Agg without GroupBy (scalar reduce) must not error: 
%v", err)
+       }
+       if len(ops) != 1 {
+               t.Fatalf("scalar reduce should emit 1 operator, got %d", 
len(ops))
+       }
+       agg, ok := ops[0].(*BatchAggregation)
+       if !ok {
+               t.Fatalf("operator must be *BatchAggregation, got %T", ops[0])
+       }
+       if len(agg.keyIndices) != 0 {
+               t.Fatalf("scalar reduce must have no key columns, got %d", 
len(agg.keyIndices))
        }
 }
 

Reply via email to