This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch vectorized-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 275866acaf3ac85c6d44d7d6c113bb3cc7432c2c
Author: Hongtao Gao <[email protected]>
AuthorDate: Wed May 13 09:19:37 2026 +0000

    feat(query/vectorized/measure): wire aggregation planner into NewMIterator (G7e)
    
    `NewMIterator` now calls `BuildOperators` with the per-pipeline
    `MemoryTracker` (sized from `cfg.QueryMemoryMiB`) and chains every
    returned breaker into the pipeline via `PipelineBuilder.Break`. When
    operators rewrite the schema (aggregation's output drops the timestamp
    column and adds the agg result field), the egress `BatchPool` is rebuilt
    from the terminal operator's output schema so the adapter's batch
    serialization sees the correct column layout.
    
    This change is dormant in production: `MeasureQueryOptions.GroupBy/Agg`
    are not yet populated by the logical-plan layer. A follow-up will plumb
    `QueryRequest.group_by`/`agg` from `measure_analyzer.go` down to
    `measure_plan_indexscan_local.go:232` so the wiring activates.
    
    `agg_e2e_test.go` exercises the full planner → BatchAggregation →
    serializeBatchToProto path against a known dataset (svc tag, value
    field, SUM aggregation, 5 rows over 2 groups). It asserts the wire shape:
    two `InternalDataPoint`s, each with a nil Timestamp (D2: aggregation
    collapses the time dimension), the GroupBy tag carried as
    `TagFamilies[0].Tags[0]`, and the aggregation result as
    `Fields[0]` with the auto-derived `value_sum` name.
---
 pkg/query/vectorized/measure/agg_e2e_test.go | 118 +++++++++++++++++++++++++++
 pkg/query/vectorized/measure/integration.go  |  28 ++++++-
 2 files changed, 144 insertions(+), 2 deletions(-)

diff --git a/pkg/query/vectorized/measure/agg_e2e_test.go b/pkg/query/vectorized/measure/agg_e2e_test.go
new file mode 100644
index 000000000..8a99f6324
--- /dev/null
+++ b/pkg/query/vectorized/measure/agg_e2e_test.go
@@ -0,0 +1,118 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "context"
+       "testing"
+
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/model"
+       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+)
+
+// TestAggregation_EndToEnd_BatchAggregationToProto exercises the full
+// planner → operator → serialize path against a known dataset. Verifies the
+// final wire shape (InternalDataPoint) matches D2 (nil timestamp) and that
+// group keys reappear as tags while aggregation results appear as fields.
+func TestAggregation_EndToEnd_BatchAggregationToProto(t *testing.T) {
+       inputSchema := vectorized.NewBatchSchema([]vectorized.ColumnDef{
+               {Role: vectorized.RoleTimestamp, Type: 
vectorized.ColumnTypeInt64},
+               {Role: vectorized.RoleTag, TagFamily: "default", Name: "svc", 
Type: vectorized.ColumnTypeString},
+               {Role: vectorized.RoleField, Name: "value", Type: 
vectorized.ColumnTypeInt64},
+       })
+       opts := model.MeasureQueryOptions{
+               GroupBy: &model.MeasureGroupBy{TagFamily: "default", TagNames: 
[]string{"svc"}},
+               Agg:     &model.MeasureAgg{FieldName: "value", Func: 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM},
+       }
+
+       tracker := vectorized.NewMemoryTracker(1 << 20)
+       ops, err := BuildOperators(opts, inputSchema, tracker, 1024)
+       if err != nil {
+               t.Fatalf("BuildOperators: %v", err)
+       }
+       if len(ops) != 1 {
+               t.Fatalf("want 1 operator, got %d", len(ops))
+       }
+       agg := ops[0].(*BatchAggregation)
+       if initErr := agg.Init(context.Background()); initErr != nil {
+               t.Fatalf("agg init: %v", initErr)
+       }
+       defer agg.Close()
+
+       // Feed 5 rows: 3×"a" (sum=6), 2×"b" (sum=9).
+       in := vectorized.NewRecordBatch(inputSchema, 5)
+       pushRow := func(ts int64, svc string, v int64) {
+               in.Columns[0].(*vectorized.TypedColumn[int64]).Append(ts)
+               in.Columns[1].(*vectorized.TypedColumn[string]).Append(svc)
+               in.Columns[2].(*vectorized.TypedColumn[int64]).Append(v)
+       }
+       pushRow(1, "a", 1)
+       pushRow(2, "b", 4)
+       pushRow(3, "a", 2)
+       pushRow(4, "a", 3)
+       pushRow(5, "b", 5)
+       in.Len = 5
+
+       if consumeErr := agg.Consume(context.Background(), in); consumeErr != 
nil {
+               t.Fatalf("agg consume: %v", consumeErr)
+       }
+       if finalizeErr := agg.Finalize(context.Background()); finalizeErr != 
nil {
+               t.Fatalf("agg finalize: %v", finalizeErr)
+       }
+
+       out, pullErr := agg.NextBatch(context.Background())
+       if pullErr != nil {
+               t.Fatalf("agg nextbatch: %v", pullErr)
+       }
+       if out == nil || out.Len != 2 {
+               t.Fatalf("want 2 output rows (2 groups), got %v", out)
+       }
+
+       // Egress: convert to protobuf and verify the wire shape.
+       dps := serializeBatchToProto(out, nil)
+       if len(dps) != 2 {
+               t.Fatalf("want 2 InternalDataPoints, got %d", len(dps))
+       }
+
+       // Index by svc tag for deterministic assertions.
+       bySvc := map[string]int64{}
+       for _, idp := range dps {
+               if idp.DataPoint.Timestamp != nil {
+                       t.Fatalf("aggregation row must have nil Timestamp (D2); 
got %v", idp.DataPoint.Timestamp)
+               }
+               if len(idp.DataPoint.TagFamilies) != 1 || 
idp.DataPoint.TagFamilies[0].Name != "default" {
+                       t.Fatalf("want one TagFamily 'default', got %+v", 
idp.DataPoint.TagFamilies)
+               }
+               tags := idp.DataPoint.TagFamilies[0].Tags
+               if len(tags) != 1 || tags[0].Key != "svc" {
+                       t.Fatalf("want one Tag 'svc', got %+v", tags)
+               }
+               svc := tags[0].Value.GetStr().GetValue()
+               if len(idp.DataPoint.Fields) != 1 || 
idp.DataPoint.Fields[0].Name != "value_sum" {
+                       t.Fatalf("want one Field 'value_sum', got %+v", 
idp.DataPoint.Fields)
+               }
+               bySvc[svc] = idp.DataPoint.Fields[0].Value.GetInt().GetValue()
+       }
+       if bySvc["a"] != 6 {
+               t.Fatalf("sum(a): want 6, got %d", bySvc["a"])
+       }
+       if bySvc["b"] != 9 {
+               t.Fatalf("sum(b): want 9, got %d", bySvc["b"])
+       }
+}
diff --git a/pkg/query/vectorized/measure/integration.go b/pkg/query/vectorized/measure/integration.go
index f0805ae2e..0db67ce24 100644
--- a/pkg/query/vectorized/measure/integration.go
+++ b/pkg/query/vectorized/measure/integration.go
@@ -190,18 +190,42 @@ func NewMIterator(ctx context.Context, qr 
model.MeasureQueryResult,
                source = NewBatchScan(qr, schema, pool, cfg.BatchSize)
        }
 
-       pipeline, buildErr := 
vectorized.NewPipelineBuilder().From(source).Build()
+       // G7e: when opts carries GroupBy + Agg, BuildOperators emits the
+       // BatchAggregation breaker that does both grouping and folding. The
+       // per-pipeline MemoryTracker drawn from cfg.QueryMemoryMiB is shared
+       // across operators so reservations stack against one budget (G7a).
+       tracker := vectorized.NewMemoryTracker(int64(cfg.QueryMemoryMiB) * 1024 
* 1024)
+       ops, opsErr := BuildOperators(opts, schema, tracker, cfg.BatchSize)
+       if opsErr != nil {
+               _ = source.Close()
+               return nil, opsErr
+       }
+
+       builder := 
vectorized.NewPipelineBuilder().From(source).WithMemoryTracker(tracker)
+       for _, op := range ops {
+               builder = builder.Break(op)
+       }
+       pipeline, buildErr := builder.Build()
        if buildErr != nil {
                // source was constructed but never wired into a Pipeline; close
                // it directly to release qr through the source.
                _ = source.Close()
                return nil, buildErr
        }
+       // When operators are present, choose the egress pool based on the
+       // terminal operator's output schema — its column layout differs from
+       // the source schema (e.g., agg output drops timestamp and adds the
+       // agg result field). The serializer reads each batch's own Schema so
+       // pooling is the only schema-coupled concern here.
+       egressPool := pool
+       if len(ops) > 0 {
+               egressPool = 
vectorized.NewBatchPool(ops[len(ops)-1].OutputSchema(), cfg.BatchSize)
+       }
        if initErr := source.Init(ctx); initErr != nil {
                _ = pipeline.Close()
                return nil, initErr
        }
-       return &VectorizedMIterator{inner: newVectorizedMIterator(ctx, 
pipeline, pool)}, nil
+       return &VectorizedMIterator{inner: newVectorizedMIterator(ctx, 
pipeline, egressPool)}, nil
 }
 
 // VectorizedMIterator is the public adapter exposed to other packages. It is

Reply via email to