This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 275866acaf3ac85c6d44d7d6c113bb3cc7432c2c Author: Hongtao Gao <[email protected]> AuthorDate: Wed May 13 09:19:37 2026 +0000 feat(query/vectorized/measure): wire aggregation planner into NewMIterator (G7e) `NewMIterator` now calls `BuildOperators` with the per-pipeline `MemoryTracker` (sized from `cfg.QueryMemoryMiB`) and chains every returned breaker into the pipeline via `PipelineBuilder.Break`. When operators rewrite the schema (aggregation's output drops the timestamp column and adds the agg result field), the egress `BatchPool` is rebuilt from the terminal operator's output schema so the adapter's batch serialization sees the correct column layout. This change is dormant in production: `MeasureQueryOptions.GroupBy/Agg` are not yet populated by the logical-plan layer. A follow-up will plumb `QueryRequest.group_by`/`agg` from `measure_analyzer.go` down to `measure_plan_indexscan_local.go:232` so the wiring activates. `agg_e2e_test.go` exercises the full planner → BatchAggregation → serializeBatchToProto path against a known dataset (svc tag, value field, SUM aggregation, 5 rows over 2 groups). Asserts the wire shape: two `InternalDataPoint`s, each with nil Timestamp (D2: aggregation collapses the time dimension), the GroupBy tag carried as `TagFamilies[0].Tags[0]`, and the aggregation result as `Fields[0]` with the auto-derived `value_sum` name. --- pkg/query/vectorized/measure/agg_e2e_test.go | 118 +++++++++++++++++++++++++++ pkg/query/vectorized/measure/integration.go | 28 ++++++- 2 files changed, 144 insertions(+), 2 deletions(-) diff --git a/pkg/query/vectorized/measure/agg_e2e_test.go b/pkg/query/vectorized/measure/agg_e2e_test.go new file mode 100644 index 000000000..8a99f6324 --- /dev/null +++ b/pkg/query/vectorized/measure/agg_e2e_test.go @@ -0,0 +1,118 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package measure + +import ( + "context" + "testing" + + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/pkg/query/model" + "github.com/apache/skywalking-banyandb/pkg/query/vectorized" +) + +// TestAggregation_EndToEnd_BatchAggregationToProto exercises the full +// planner → operator → serialize path against a known dataset. Verifies the +// final wire shape (InternalDataPoint) matches D2 (nil timestamp) and that +// group keys reappear as tags while aggregation results appear as fields. +func TestAggregation_EndToEnd_BatchAggregationToProto(t *testing.T) { + inputSchema := vectorized.NewBatchSchema([]vectorized.ColumnDef{ + {Role: vectorized.RoleTimestamp, Type: vectorized.ColumnTypeInt64}, + {Role: vectorized.RoleTag, TagFamily: "default", Name: "svc", Type: vectorized.ColumnTypeString}, + {Role: vectorized.RoleField, Name: "value", Type: vectorized.ColumnTypeInt64}, + }) + opts := model.MeasureQueryOptions{ + GroupBy: &model.MeasureGroupBy{TagFamily: "default", TagNames: []string{"svc"}}, + Agg: &model.MeasureAgg{FieldName: "value", Func: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM}, + } + + tracker := vectorized.NewMemoryTracker(1 << 20) + ops, err := BuildOperators(opts, inputSchema, tracker, 1024) + if err != nil { + t.Fatalf("BuildOperators: %v", err) + } + if len(ops) != 1 { + t.Fatalf("want 1 operator, got %d", len(ops)) + } + agg := ops[0].(*BatchAggregation) + if initErr := agg.Init(context.Background()); initErr != nil { + t.Fatalf("agg init: %v", initErr) + } + defer agg.Close() + + // Feed 5 rows: 3×"a" (sum=6), 2×"b" (sum=9). + in := vectorized.NewRecordBatch(inputSchema, 5) + pushRow := func(ts int64, svc string, v int64) { + in.Columns[0].(*vectorized.TypedColumn[int64]).Append(ts) + in.Columns[1].(*vectorized.TypedColumn[string]).Append(svc) + in.Columns[2].(*vectorized.TypedColumn[int64]).Append(v) + } + pushRow(1, "a", 1) + pushRow(2, "b", 4) + pushRow(3, "a", 2) + pushRow(4, "a", 3) + pushRow(5, "b", 5) + in.Len = 5 + + if consumeErr := agg.Consume(context.Background(), in); consumeErr != nil { + t.Fatalf("agg consume: %v", consumeErr) + } + if finalizeErr := agg.Finalize(context.Background()); finalizeErr != nil { + t.Fatalf("agg finalize: %v", finalizeErr) + } + + out, pullErr := agg.NextBatch(context.Background()) + if pullErr != nil { + t.Fatalf("agg nextbatch: %v", pullErr) + } + if out == nil || out.Len != 2 { + t.Fatalf("want 2 output rows (2 groups), got %v", out) + } + + // Egress: convert to protobuf and verify the wire shape. + dps := serializeBatchToProto(out, nil) + if len(dps) != 2 { + t.Fatalf("want 2 InternalDataPoints, got %d", len(dps)) + } + + // Index by svc tag for deterministic assertions. + bySvc := map[string]int64{} + for _, idp := range dps { + if idp.DataPoint.Timestamp != nil { + t.Fatalf("aggregation row must have nil Timestamp (D2); got %v", idp.DataPoint.Timestamp) + } + if len(idp.DataPoint.TagFamilies) != 1 || idp.DataPoint.TagFamilies[0].Name != "default" { + t.Fatalf("want one TagFamily 'default', got %+v", idp.DataPoint.TagFamilies) + } + tags := idp.DataPoint.TagFamilies[0].Tags + if len(tags) != 1 || tags[0].Key != "svc" { + t.Fatalf("want one Tag 'svc', got %+v", tags) + } + svc := tags[0].Value.GetStr().GetValue() + if len(idp.DataPoint.Fields) != 1 || idp.DataPoint.Fields[0].Name != "value_sum" { + t.Fatalf("want one Field 'value_sum', got %+v", idp.DataPoint.Fields) + } + bySvc[svc] = idp.DataPoint.Fields[0].Value.GetInt().GetValue() + } + if bySvc["a"] != 6 { + t.Fatalf("sum(a): want 6, got %d", bySvc["a"]) + } + if bySvc["b"] != 9 { + t.Fatalf("sum(b): want 9, got %d", bySvc["b"]) + } +} diff --git a/pkg/query/vectorized/measure/integration.go b/pkg/query/vectorized/measure/integration.go index f0805ae2e..0db67ce24 100644 --- a/pkg/query/vectorized/measure/integration.go +++ b/pkg/query/vectorized/measure/integration.go @@ -190,18 +190,42 @@ func NewMIterator(ctx context.Context, qr model.MeasureQueryResult, source = NewBatchScan(qr, schema, pool, cfg.BatchSize) } - pipeline, buildErr := vectorized.NewPipelineBuilder().From(source).Build() + // G7e: when opts carries GroupBy + Agg, BuildOperators emits the + // BatchAggregation breaker that does both grouping and folding. The + // per-pipeline MemoryTracker drawn from cfg.QueryMemoryMiB is shared + // across operators so reservations stack against one budget (G7a). + tracker := vectorized.NewMemoryTracker(int64(cfg.QueryMemoryMiB) * 1024 * 1024) + ops, opsErr := BuildOperators(opts, schema, tracker, cfg.BatchSize) + if opsErr != nil { + _ = source.Close() + return nil, opsErr + } + + builder := vectorized.NewPipelineBuilder().From(source).WithMemoryTracker(tracker) + for _, op := range ops { + builder = builder.Break(op) + } + pipeline, buildErr := builder.Build() if buildErr != nil { // source was constructed but never wired into a Pipeline; close // it directly to release qr through the source. _ = source.Close() return nil, buildErr } + // When operators are present, choose the egress pool based on the + // terminal operator's output schema — its column layout differs from + // the source schema (e.g., agg output drops timestamp and adds the + // agg result field). The serializer reads each batch's own Schema so + // pooling is the only schema-coupled concern here. + egressPool := pool + if len(ops) > 0 { + egressPool = vectorized.NewBatchPool(ops[len(ops)-1].OutputSchema(), cfg.BatchSize) + } if initErr := source.Init(ctx); initErr != nil { _ = pipeline.Close() return nil, initErr } - return &VectorizedMIterator{inner: newVectorizedMIterator(ctx, pipeline, pool)}, nil + return &VectorizedMIterator{inner: newVectorizedMIterator(ctx, pipeline, egressPool)}, nil } // VectorizedMIterator is the public adapter exposed to other packages. It is
