This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch vectorized-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 8008aa30f9b437dcc34e6c7b0a49bb0ea772e3cc
Author: Hongtao Gao <[email protected]>
AuthorDate: Wed May 13 09:42:27 2026 +0000

    Revert "feat(query/vectorized/measure): wire aggregation planner into 
NewMIterator (G7e)"
    
    This reverts commit 275866acaf3ac85c6d44d7d6c113bb3cc7432c2c.
---
 pkg/query/vectorized/measure/agg_e2e_test.go | 118 ---------------------------
 pkg/query/vectorized/measure/integration.go  |  28 +------
 2 files changed, 2 insertions(+), 144 deletions(-)

diff --git a/pkg/query/vectorized/measure/agg_e2e_test.go 
b/pkg/query/vectorized/measure/agg_e2e_test.go
deleted file mode 100644
index 8a99f6324..000000000
--- a/pkg/query/vectorized/measure/agg_e2e_test.go
+++ /dev/null
@@ -1,118 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package measure
-
-import (
-       "context"
-       "testing"
-
-       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
-       "github.com/apache/skywalking-banyandb/pkg/query/model"
-       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
-)
-
-// TestAggregation_EndToEnd_BatchAggregationToProto exercises the full
-// planner → operator → serialize path against a known dataset. Verifies the
-// final wire shape (InternalDataPoint) matches D2 (nil timestamp) and that
-// group keys reappear as tags while aggregation results appear as fields.
-func TestAggregation_EndToEnd_BatchAggregationToProto(t *testing.T) {
-       inputSchema := vectorized.NewBatchSchema([]vectorized.ColumnDef{
-               {Role: vectorized.RoleTimestamp, Type: 
vectorized.ColumnTypeInt64},
-               {Role: vectorized.RoleTag, TagFamily: "default", Name: "svc", 
Type: vectorized.ColumnTypeString},
-               {Role: vectorized.RoleField, Name: "value", Type: 
vectorized.ColumnTypeInt64},
-       })
-       opts := model.MeasureQueryOptions{
-               GroupBy: &model.MeasureGroupBy{TagFamily: "default", TagNames: 
[]string{"svc"}},
-               Agg:     &model.MeasureAgg{FieldName: "value", Func: 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM},
-       }
-
-       tracker := vectorized.NewMemoryTracker(1 << 20)
-       ops, err := BuildOperators(opts, inputSchema, tracker, 1024)
-       if err != nil {
-               t.Fatalf("BuildOperators: %v", err)
-       }
-       if len(ops) != 1 {
-               t.Fatalf("want 1 operator, got %d", len(ops))
-       }
-       agg := ops[0].(*BatchAggregation)
-       if initErr := agg.Init(context.Background()); initErr != nil {
-               t.Fatalf("agg init: %v", initErr)
-       }
-       defer agg.Close()
-
-       // Feed 5 rows: 3×"a" (sum=6), 2×"b" (sum=9).
-       in := vectorized.NewRecordBatch(inputSchema, 5)
-       pushRow := func(ts int64, svc string, v int64) {
-               in.Columns[0].(*vectorized.TypedColumn[int64]).Append(ts)
-               in.Columns[1].(*vectorized.TypedColumn[string]).Append(svc)
-               in.Columns[2].(*vectorized.TypedColumn[int64]).Append(v)
-       }
-       pushRow(1, "a", 1)
-       pushRow(2, "b", 4)
-       pushRow(3, "a", 2)
-       pushRow(4, "a", 3)
-       pushRow(5, "b", 5)
-       in.Len = 5
-
-       if consumeErr := agg.Consume(context.Background(), in); consumeErr != 
nil {
-               t.Fatalf("agg consume: %v", consumeErr)
-       }
-       if finalizeErr := agg.Finalize(context.Background()); finalizeErr != 
nil {
-               t.Fatalf("agg finalize: %v", finalizeErr)
-       }
-
-       out, pullErr := agg.NextBatch(context.Background())
-       if pullErr != nil {
-               t.Fatalf("agg nextbatch: %v", pullErr)
-       }
-       if out == nil || out.Len != 2 {
-               t.Fatalf("want 2 output rows (2 groups), got %v", out)
-       }
-
-       // Egress: convert to protobuf and verify the wire shape.
-       dps := serializeBatchToProto(out, nil)
-       if len(dps) != 2 {
-               t.Fatalf("want 2 InternalDataPoints, got %d", len(dps))
-       }
-
-       // Index by svc tag for deterministic assertions.
-       bySvc := map[string]int64{}
-       for _, idp := range dps {
-               if idp.DataPoint.Timestamp != nil {
-                       t.Fatalf("aggregation row must have nil Timestamp (D2); 
got %v", idp.DataPoint.Timestamp)
-               }
-               if len(idp.DataPoint.TagFamilies) != 1 || 
idp.DataPoint.TagFamilies[0].Name != "default" {
-                       t.Fatalf("want one TagFamily 'default', got %+v", 
idp.DataPoint.TagFamilies)
-               }
-               tags := idp.DataPoint.TagFamilies[0].Tags
-               if len(tags) != 1 || tags[0].Key != "svc" {
-                       t.Fatalf("want one Tag 'svc', got %+v", tags)
-               }
-               svc := tags[0].Value.GetStr().GetValue()
-               if len(idp.DataPoint.Fields) != 1 || 
idp.DataPoint.Fields[0].Name != "value_sum" {
-                       t.Fatalf("want one Field 'value_sum', got %+v", 
idp.DataPoint.Fields)
-               }
-               bySvc[svc] = idp.DataPoint.Fields[0].Value.GetInt().GetValue()
-       }
-       if bySvc["a"] != 6 {
-               t.Fatalf("sum(a): want 6, got %d", bySvc["a"])
-       }
-       if bySvc["b"] != 9 {
-               t.Fatalf("sum(b): want 9, got %d", bySvc["b"])
-       }
-}
diff --git a/pkg/query/vectorized/measure/integration.go 
b/pkg/query/vectorized/measure/integration.go
index 0db67ce24..f0805ae2e 100644
--- a/pkg/query/vectorized/measure/integration.go
+++ b/pkg/query/vectorized/measure/integration.go
@@ -190,42 +190,18 @@ func NewMIterator(ctx context.Context, qr 
model.MeasureQueryResult,
                source = NewBatchScan(qr, schema, pool, cfg.BatchSize)
        }
 
-       // G7e: when opts carries GroupBy + Agg, BuildOperators emits the
-       // BatchAggregation breaker that does both grouping and folding. The
-       // per-pipeline MemoryTracker drawn from cfg.QueryMemoryMiB is shared
-       // across operators so reservations stack against one budget (G7a).
-       tracker := vectorized.NewMemoryTracker(int64(cfg.QueryMemoryMiB) * 1024 
* 1024)
-       ops, opsErr := BuildOperators(opts, schema, tracker, cfg.BatchSize)
-       if opsErr != nil {
-               _ = source.Close()
-               return nil, opsErr
-       }
-
-       builder := 
vectorized.NewPipelineBuilder().From(source).WithMemoryTracker(tracker)
-       for _, op := range ops {
-               builder = builder.Break(op)
-       }
-       pipeline, buildErr := builder.Build()
+       pipeline, buildErr := 
vectorized.NewPipelineBuilder().From(source).Build()
        if buildErr != nil {
                // source was constructed but never wired into a Pipeline; close
                // it directly to release qr through the source.
                _ = source.Close()
                return nil, buildErr
        }
-       // When operators are present, choose the egress pool based on the
-       // terminal operator's output schema — its column layout differs from
-       // the source schema (e.g., agg output drops timestamp and adds the
-       // agg result field). The serializer reads each batch's own Schema so
-       // pooling is the only schema-coupled concern here.
-       egressPool := pool
-       if len(ops) > 0 {
-               egressPool = 
vectorized.NewBatchPool(ops[len(ops)-1].OutputSchema(), cfg.BatchSize)
-       }
        if initErr := source.Init(ctx); initErr != nil {
                _ = pipeline.Close()
                return nil, initErr
        }
-       return &VectorizedMIterator{inner: newVectorizedMIterator(ctx, 
pipeline, egressPool)}, nil
+       return &VectorizedMIterator{inner: newVectorizedMIterator(ctx, 
pipeline, pool)}, nil
 }
 
 // VectorizedMIterator is the public adapter exposed to other packages. It is

Reply via email to