This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 8008aa30f9b437dcc34e6c7b0a49bb0ea772e3cc Author: Hongtao Gao <[email protected]> AuthorDate: Wed May 13 09:42:27 2026 +0000 Revert "feat(query/vectorized/measure): wire aggregation planner into NewMIterator (G7e)" This reverts commit 275866acaf3ac85c6d44d7d6c113bb3cc7432c2c. --- pkg/query/vectorized/measure/agg_e2e_test.go | 118 --------------------------- pkg/query/vectorized/measure/integration.go | 28 +------ 2 files changed, 2 insertions(+), 144 deletions(-) diff --git a/pkg/query/vectorized/measure/agg_e2e_test.go b/pkg/query/vectorized/measure/agg_e2e_test.go deleted file mode 100644 index 8a99f6324..000000000 --- a/pkg/query/vectorized/measure/agg_e2e_test.go +++ /dev/null @@ -1,118 +0,0 @@ -// Licensed to Apache Software Foundation (ASF) under one or more contributor -// license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright -// ownership. Apache Software Foundation (ASF) licenses this file to you under -// the Apache License, Version 2.0 (the "License"); you may -// not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package measure - -import ( - "context" - "testing" - - modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" - "github.com/apache/skywalking-banyandb/pkg/query/model" - "github.com/apache/skywalking-banyandb/pkg/query/vectorized" -) - -// TestAggregation_EndToEnd_BatchAggregationToProto exercises the full -// planner → operator → serialize path against a known dataset. Verifies the -// final wire shape (InternalDataPoint) matches D2 (nil timestamp) and that -// group keys reappear as tags while aggregation results appear as fields. -func TestAggregation_EndToEnd_BatchAggregationToProto(t *testing.T) { - inputSchema := vectorized.NewBatchSchema([]vectorized.ColumnDef{ - {Role: vectorized.RoleTimestamp, Type: vectorized.ColumnTypeInt64}, - {Role: vectorized.RoleTag, TagFamily: "default", Name: "svc", Type: vectorized.ColumnTypeString}, - {Role: vectorized.RoleField, Name: "value", Type: vectorized.ColumnTypeInt64}, - }) - opts := model.MeasureQueryOptions{ - GroupBy: &model.MeasureGroupBy{TagFamily: "default", TagNames: []string{"svc"}}, - Agg: &model.MeasureAgg{FieldName: "value", Func: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM}, - } - - tracker := vectorized.NewMemoryTracker(1 << 20) - ops, err := BuildOperators(opts, inputSchema, tracker, 1024) - if err != nil { - t.Fatalf("BuildOperators: %v", err) - } - if len(ops) != 1 { - t.Fatalf("want 1 operator, got %d", len(ops)) - } - agg := ops[0].(*BatchAggregation) - if initErr := agg.Init(context.Background()); initErr != nil { - t.Fatalf("agg init: %v", initErr) - } - defer agg.Close() - - // Feed 5 rows: 3×"a" (sum=6), 2×"b" (sum=9). - in := vectorized.NewRecordBatch(inputSchema, 5) - pushRow := func(ts int64, svc string, v int64) { - in.Columns[0].(*vectorized.TypedColumn[int64]).Append(ts) - in.Columns[1].(*vectorized.TypedColumn[string]).Append(svc) - in.Columns[2].(*vectorized.TypedColumn[int64]).Append(v) - } - pushRow(1, "a", 1) - pushRow(2, "b", 4) - pushRow(3, "a", 2) - pushRow(4, "a", 3) - pushRow(5, "b", 5) - in.Len = 5 - - if consumeErr := agg.Consume(context.Background(), in); consumeErr != nil { - t.Fatalf("agg consume: %v", consumeErr) - } - if finalizeErr := agg.Finalize(context.Background()); finalizeErr != nil { - t.Fatalf("agg finalize: %v", finalizeErr) - } - - out, pullErr := agg.NextBatch(context.Background()) - if pullErr != nil { - t.Fatalf("agg nextbatch: %v", pullErr) - } - if out == nil || out.Len != 2 { - t.Fatalf("want 2 output rows (2 groups), got %v", out) - } - - // Egress: convert to protobuf and verify the wire shape. - dps := serializeBatchToProto(out, nil) - if len(dps) != 2 { - t.Fatalf("want 2 InternalDataPoints, got %d", len(dps)) - } - - // Index by svc tag for deterministic assertions. - bySvc := map[string]int64{} - for _, idp := range dps { - if idp.DataPoint.Timestamp != nil { - t.Fatalf("aggregation row must have nil Timestamp (D2); got %v", idp.DataPoint.Timestamp) - } - if len(idp.DataPoint.TagFamilies) != 1 || idp.DataPoint.TagFamilies[0].Name != "default" { - t.Fatalf("want one TagFamily 'default', got %+v", idp.DataPoint.TagFamilies) - } - tags := idp.DataPoint.TagFamilies[0].Tags - if len(tags) != 1 || tags[0].Key != "svc" { - t.Fatalf("want one Tag 'svc', got %+v", tags) - } - svc := tags[0].Value.GetStr().GetValue() - if len(idp.DataPoint.Fields) != 1 || idp.DataPoint.Fields[0].Name != "value_sum" { - t.Fatalf("want one Field 'value_sum', got %+v", idp.DataPoint.Fields) - } - bySvc[svc] = idp.DataPoint.Fields[0].Value.GetInt().GetValue() - } - if bySvc["a"] != 6 { - t.Fatalf("sum(a): want 6, got %d", bySvc["a"]) - } - if bySvc["b"] != 9 { - t.Fatalf("sum(b): want 9, got %d", bySvc["b"]) - } -} diff --git a/pkg/query/vectorized/measure/integration.go b/pkg/query/vectorized/measure/integration.go index 0db67ce24..f0805ae2e 100644 --- a/pkg/query/vectorized/measure/integration.go +++ b/pkg/query/vectorized/measure/integration.go @@ -190,42 +190,18 @@ func NewMIterator(ctx context.Context, qr model.MeasureQueryResult, source = NewBatchScan(qr, schema, pool, cfg.BatchSize) } - // G7e: when opts carries GroupBy + Agg, BuildOperators emits the - // BatchAggregation breaker that does both grouping and folding. The - // per-pipeline MemoryTracker drawn from cfg.QueryMemoryMiB is shared - // across operators so reservations stack against one budget (G7a). - tracker := vectorized.NewMemoryTracker(int64(cfg.QueryMemoryMiB) * 1024 * 1024) - ops, opsErr := BuildOperators(opts, schema, tracker, cfg.BatchSize) - if opsErr != nil { - _ = source.Close() - return nil, opsErr - } - - builder := vectorized.NewPipelineBuilder().From(source).WithMemoryTracker(tracker) - for _, op := range ops { - builder = builder.Break(op) - } - pipeline, buildErr := builder.Build() + pipeline, buildErr := vectorized.NewPipelineBuilder().From(source).Build() if buildErr != nil { // source was constructed but never wired into a Pipeline; close // it directly to release qr through the source. _ = source.Close() return nil, buildErr } - // When operators are present, choose the egress pool based on the - // terminal operator's output schema — its column layout differs from - // the source schema (e.g., agg output drops timestamp and adds the - // agg result field). The serializer reads each batch's own Schema so - // pooling is the only schema-coupled concern here. - egressPool := pool - if len(ops) > 0 { - egressPool = vectorized.NewBatchPool(ops[len(ops)-1].OutputSchema(), cfg.BatchSize) - } if initErr := source.Init(ctx); initErr != nil { _ = pipeline.Close() return nil, initErr } - return &VectorizedMIterator{inner: newVectorizedMIterator(ctx, pipeline, egressPool)}, nil + return &VectorizedMIterator{inner: newVectorizedMIterator(ctx, pipeline, pool)}, nil } // VectorizedMIterator is the public adapter exposed to other packages. It is
