This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch vectorized-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 4a0479e8652898650cf974c623ccd12da7b1e302
Author: Hongtao Gao <[email protected]>
AuthorDate: Wed May 13 09:19:15 2026 +0000

    feat(query/vectorized): shared per-pipeline MemoryTracker (G7a)
    
    `PipelineBuilder.WithMemoryTracker` attaches a `*MemoryTracker` to the
    constructed `*Pipeline`; `Pipeline.Tracker()` exposes it. Operators that
    bookkeep memory (BatchGroupBy already; BatchAggregation as of this commit)
    should be constructed with the same tracker so that their reservations
    stack against a single per-pipeline (per-query) budget. Otherwise each
    operator is checked against the limit independently, and the pipeline as
    a whole can consume more than the intended ceiling.
    
    `NewBatchAggregation` gains two parameters, `tracker` and `entrySize`,
    which BatchAggregation stores in fields of the same names. On Consume,
    each new group reserves `entrySize` bytes; on Close, the outstanding
    reservation is refunded. Pass `entrySize=0` to disable per-group
    bookkeeping (the existing aggregation unit tests do this because they do
    not exercise the budget). The constructor signature change is breaking;
    call-sites in `aggregation_test.go` are updated in this commit.
    
    `shared_tracker_test.go` pins two invariants:
    - GroupBy + Aggregation against the same tracker overflow as a single
      budget (not two independent ones).
    - Aggregation.Close refunds its reservation; the tracker returns to zero
      after the operator is closed.
---
 pkg/query/vectorized/measure/aggregation.go        |  28 ++++-
 pkg/query/vectorized/measure/aggregation_test.go   |  24 ++--
 .../vectorized/measure/shared_tracker_test.go      | 139 +++++++++++++++++++++
 pkg/query/vectorized/pipeline.go                   |  21 +++-
 4 files changed, 196 insertions(+), 16 deletions(-)

diff --git a/pkg/query/vectorized/measure/aggregation.go 
b/pkg/query/vectorized/measure/aggregation.go
index 587daba17..240755d3b 100644
--- a/pkg/query/vectorized/measure/aggregation.go
+++ b/pkg/query/vectorized/measure/aggregation.go
@@ -79,11 +79,14 @@ type BatchAggregation struct {
        inputSchema  *vectorized.BatchSchema
        outputSchema *vectorized.BatchSchema
        pool         *vectorized.BatchPool
+       tracker      *vectorized.MemoryTracker
        groups       map[string]*aggGroup
        insertion    []*aggGroup
        keyIndices   []int
        aggs         []AggSpec
        mode         AggMode
+       entrySize    int64
+       reserved     int64
        batchSize    int
        cursor       int
        closed       bool
@@ -113,19 +116,27 @@ type aggSlot struct {
 
 // NewBatchAggregation constructs a BatchAggregation. It builds the output
 // schema internally (keys + agg outputs) and owns its output BatchPool.
+//
+// tracker carries the per-pipeline memory budget; entrySize is the bytes
+// reserved per new group bucket (key columns + slots + map entry overhead).
+// Pass entrySize=0 to disable per-group bookkeeping. tracker must not be nil
+// — use a large NewMemoryTracker for unit tests that don't care about budget.
 func NewBatchAggregation(
        input *vectorized.BatchSchema, keyIndices []int,
        aggs []AggSpec, mode AggMode, batchSize int,
+       tracker *vectorized.MemoryTracker, entrySize int64,
 ) *BatchAggregation {
        outputSchema := buildAggOutputSchema(input, keyIndices, aggs)
        return &BatchAggregation{
                inputSchema:  input,
                outputSchema: outputSchema,
                pool:         vectorized.NewBatchPool(outputSchema, batchSize),
+               tracker:      tracker,
                keyIndices:   slices.Clone(keyIndices),
                aggs:         slices.Clone(aggs),
                mode:         mode,
                batchSize:    batchSize,
+               entrySize:    entrySize,
        }
 }
 
@@ -143,6 +154,10 @@ func (a *BatchAggregation) OutputSchema() 
*vectorized.BatchSchema { return a.out
 
 // Consume folds every active row into its group's accumulator. Null values are
 // excluded from aggregation (count not incremented; sum/min/max unchanged).
+//
+// Each new group reserves entrySize bytes from the shared MemoryTracker. If
+// the budget is exhausted, Consume returns the wrapped tracker error and the
+// row's group is not added — partial-batch state is consistent.
 func (a *BatchAggregation) Consume(_ context.Context, b 
*vectorized.RecordBatch) error {
        if a.mode != AggModeAll {
                return ErrAggModeNotImplemented
@@ -152,6 +167,12 @@ func (a *BatchAggregation) Consume(_ context.Context, b 
*vectorized.RecordBatch)
                key := a.computeKey(b, int(rowIdx))
                group, exists := a.groups[key]
                if !exists {
+                       if a.entrySize > 0 {
+                               if reserveErr := 
a.tracker.Reserve(a.entrySize); reserveErr != nil {
+                                       return fmt.Errorf("aggregation memory 
budget exceeded: %w", reserveErr)
+                               }
+                               a.reserved += a.entrySize
+                       }
                        newGroup, newErr := a.newGroup(b, int(rowIdx), key)
                        if newErr != nil {
                                return newErr
@@ -198,12 +219,17 @@ func (a *BatchAggregation) NextBatch(_ context.Context) 
(*vectorized.RecordBatch
        return out, nil
 }
 
-// Close releases the group map. Idempotent.
+// Close releases the group map and refunds the outstanding memory
+// reservation. Idempotent.
 func (a *BatchAggregation) Close() error {
        if a.closed {
                return nil
        }
        a.closed = true
+       if a.reserved > 0 {
+               a.tracker.Release(a.reserved)
+               a.reserved = 0
+       }
        a.groups = nil
        a.insertion = nil
        return nil
diff --git a/pkg/query/vectorized/measure/aggregation_test.go 
b/pkg/query/vectorized/measure/aggregation_test.go
index 8c77c519f..79da95e3d 100644
--- a/pkg/query/vectorized/measure/aggregation_test.go
+++ b/pkg/query/vectorized/measure/aggregation_test.go
@@ -82,7 +82,7 @@ func findAggRow(t *testing.T, b *vectorized.RecordBatch, key 
string) int {
 func TestBatchAggregation_AggModeAll_SumInt64(t *testing.T) {
        s := aggIntSchema()
        op := NewBatchAggregation(s, []int{0},
-               []AggSpec{{Func: AggSum, InputCol: 1, Output: "sum_v"}}, 
AggModeAll, 8)
+               []AggSpec{{Func: AggSum, InputCol: 1, Output: "sum_v"}}, 
AggModeAll, 8, vectorized.NewMemoryTracker(1<<30), 0)
        _ = op.Init(context.Background())
        defer op.Close()
 
@@ -125,7 +125,7 @@ func TestBatchAggregation_AggModeAll_SumInt64(t *testing.T) 
{
 func TestBatchAggregation_AggModeAll_SumFloat64(t *testing.T) {
        s := aggFloatSchema()
        op := NewBatchAggregation(s, []int{0},
-               []AggSpec{{Func: AggSum, InputCol: 1, Output: "sum_v"}}, 
AggModeAll, 8)
+               []AggSpec{{Func: AggSum, InputCol: 1, Output: "sum_v"}}, 
AggModeAll, 8, vectorized.NewMemoryTracker(1<<30), 0)
        _ = op.Init(context.Background())
        defer op.Close()
 
@@ -151,7 +151,7 @@ func TestBatchAggregation_AggModeAll_SumFloat64(t 
*testing.T) {
 func TestBatchAggregation_AggModeAll_Count(t *testing.T) {
        s := aggIntSchema()
        op := NewBatchAggregation(s, []int{0},
-               []AggSpec{{Func: AggCount, InputCol: 1, Output: "n"}}, 
AggModeAll, 8)
+               []AggSpec{{Func: AggCount, InputCol: 1, Output: "n"}}, 
AggModeAll, 8, vectorized.NewMemoryTracker(1<<30), 0)
        _ = op.Init(context.Background())
        defer op.Close()
 
@@ -191,7 +191,7 @@ func TestBatchAggregation_AggModeAll_Count(t *testing.T) {
 func TestBatchAggregation_AggModeAll_Min_Int64(t *testing.T) {
        s := aggIntSchema()
        op := NewBatchAggregation(s, []int{0},
-               []AggSpec{{Func: AggMin, InputCol: 1, Output: "min_v"}}, 
AggModeAll, 8)
+               []AggSpec{{Func: AggMin, InputCol: 1, Output: "min_v"}}, 
AggModeAll, 8, vectorized.NewMemoryTracker(1<<30), 0)
        _ = op.Init(context.Background())
        defer op.Close()
        feedAggInt(t, op, s,
@@ -222,7 +222,7 @@ func TestBatchAggregation_AggModeAll_Min_Int64(t 
*testing.T) {
 func TestBatchAggregation_AggModeAll_Max_Float64(t *testing.T) {
        s := aggFloatSchema()
        op := NewBatchAggregation(s, []int{0},
-               []AggSpec{{Func: AggMax, InputCol: 1, Output: "max_v"}}, 
AggModeAll, 8)
+               []AggSpec{{Func: AggMax, InputCol: 1, Output: "max_v"}}, 
AggModeAll, 8, vectorized.NewMemoryTracker(1<<30), 0)
        _ = op.Init(context.Background())
        defer op.Close()
 
@@ -246,7 +246,7 @@ func TestBatchAggregation_AggModeAll_Max_Float64(t 
*testing.T) {
 func TestBatchAggregation_AggModeAll_Mean_Float64(t *testing.T) {
        s := aggFloatSchema()
        op := NewBatchAggregation(s, []int{0},
-               []AggSpec{{Func: AggMean, InputCol: 1, Output: "mean_v"}}, 
AggModeAll, 8)
+               []AggSpec{{Func: AggMean, InputCol: 1, Output: "mean_v"}}, 
AggModeAll, 8, vectorized.NewMemoryTracker(1<<30), 0)
        _ = op.Init(context.Background())
        defer op.Close()
 
@@ -275,7 +275,7 @@ func 
TestBatchAggregation_AggModeAll_NullField_ExcludedFromAggregation(t *testin
                []AggSpec{
                        {Func: AggSum, InputCol: 1, Output: "sum_v"},
                        {Func: AggCount, InputCol: 1, Output: "n"},
-               }, AggModeAll, 8)
+               }, AggModeAll, 8, vectorized.NewMemoryTracker(1<<30), 0)
        _ = op.Init(context.Background())
        defer op.Close()
        feedAggInt(t, op, s,
@@ -317,7 +317,7 @@ func TestBatchAggregation_NULInStringKey_NoCollision(t 
*testing.T) {
                {Role: vectorized.RoleField, Name: "v", Type: 
vectorized.ColumnTypeInt64},
        })
        op := NewBatchAggregation(s, []int{0, 1},
-               []AggSpec{{Func: AggSum, InputCol: 2, Output: "sum"}}, 
AggModeAll, 8)
+               []AggSpec{{Func: AggSum, InputCol: 2, Output: "sum"}}, 
AggModeAll, 8, vectorized.NewMemoryTracker(1<<30), 0)
        _ = op.Init(context.Background())
        defer op.Close()
 
@@ -341,7 +341,7 @@ func TestBatchAggregation_NULInStringKey_NoCollision(t 
*testing.T) {
 func TestBatchAggregation_AggModeMap_ReturnsErrNotImplemented(t *testing.T) {
        s := aggIntSchema()
        op := NewBatchAggregation(s, []int{0},
-               []AggSpec{{Func: AggSum, InputCol: 1, Output: "sum_v"}}, 
AggModeMap, 8)
+               []AggSpec{{Func: AggSum, InputCol: 1, Output: "sum_v"}}, 
AggModeMap, 8, vectorized.NewMemoryTracker(1<<30), 0)
        if err := op.Init(context.Background()); err != nil {
                t.Fatalf("Init must succeed regardless of mode; got %v", err)
        }
@@ -364,7 +364,7 @@ func 
TestBatchAggregation_AggModeMap_ReturnsErrNotImplemented(t *testing.T) {
 func TestBatchAggregation_AggModeReduce_ReturnsErrNotImplemented(t *testing.T) 
{
        s := aggIntSchema()
        op := NewBatchAggregation(s, []int{0},
-               []AggSpec{{Func: AggSum, InputCol: 1, Output: "sum_v"}}, 
AggModeReduce, 8)
+               []AggSpec{{Func: AggSum, InputCol: 1, Output: "sum_v"}}, 
AggModeReduce, 8, vectorized.NewMemoryTracker(1<<30), 0)
        if err := op.Init(context.Background()); err != nil {
                t.Fatalf("Init must succeed regardless of mode; got %v", err)
        }
@@ -392,7 +392,7 @@ func 
TestBatchAggregation_AggModeReduce_ReturnsErrNotImplemented(t *testing.T) {
 func TestBatchAggregation_DelegatesToAggregationPackage(t *testing.T) {
        s := aggIntSchema()
        op := NewBatchAggregation(s, []int{0},
-               []AggSpec{{Func: AggSum, InputCol: 1, Output: "sum_v"}}, 
AggModeAll, 8)
+               []AggSpec{{Func: AggSum, InputCol: 1, Output: "sum_v"}}, 
AggModeAll, 8, vectorized.NewMemoryTracker(1<<30), 0)
        _ = op.Init(context.Background())
        defer op.Close()
 
@@ -432,7 +432,7 @@ func 
TestBatchAggregation_Correctness_MatchesManualComputation(t *testing.T) {
                        {Func: AggSum, InputCol: 1, Output: "sum_v"},
                        {Func: AggMin, InputCol: 1, Output: "min_v"},
                        {Func: AggMax, InputCol: 1, Output: "max_v"},
-               }, AggModeAll, 8)
+               }, AggModeAll, 8, vectorized.NewMemoryTracker(1<<30), 0)
        _ = op.Init(context.Background())
        defer op.Close()
        feedAggInt(t, op, s,
diff --git a/pkg/query/vectorized/measure/shared_tracker_test.go 
b/pkg/query/vectorized/measure/shared_tracker_test.go
new file mode 100644
index 000000000..5e4e1de22
--- /dev/null
+++ b/pkg/query/vectorized/measure/shared_tracker_test.go
@@ -0,0 +1,139 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "context"
+       "strings"
+       "testing"
+
+       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+)
+
+// TestSharedTracker_GroupByAggregation_StacksAgainstSingleBudget pins the G7a
+// invariant: when BatchGroupBy and BatchAggregation are constructed with the
+// same MemoryTracker, their per-group reservations stack against one budget.
+// Without sharing, each operator would independently fit under the limit and
+// the pipeline would consume 2x the intended ceiling.
+func TestSharedTracker_GroupByAggregation_StacksAgainstSingleBudget(t 
*testing.T) {
+       schema := vectorized.NewBatchSchema([]vectorized.ColumnDef{
+               {Role: vectorized.RoleTag, TagFamily: "default", Name: "g", 
Type: vectorized.ColumnTypeString},
+               {Role: vectorized.RoleField, Name: "v", Type: 
vectorized.ColumnTypeInt64},
+       })
+
+       // Tight budget: only enough for GroupBy's 2 groups (2*100=200), no
+       // headroom for Aggregation to reserve for the same groups.
+       const budget = 250
+       tracker := vectorized.NewMemoryTracker(budget)
+
+       gbPool := vectorized.NewBatchPool(schema, 8)
+       gb := NewBatchGroupBy(schema, []int{0}, gbPool, 8, tracker, 100, 0)
+       if initErr := gb.Init(context.Background()); initErr != nil {
+               t.Fatalf("groupby init: %v", initErr)
+       }
+       defer gb.Close()
+
+       agg := NewBatchAggregation(schema, []int{0},
+               []AggSpec{{Func: AggSum, InputCol: 1, Output: "sum_v"}},
+               AggModeAll, 8, tracker, 100)
+       if initErr := agg.Init(context.Background()); initErr != nil {
+               t.Fatalf("agg init: %v", initErr)
+       }
+       defer agg.Close()
+
+       // Two distinct groups, one row each. GroupBy reserves 2*100=200 
(used=200).
+       // Aggregation then sees the GroupBy output and tries to reserve 100 for
+       // its first new group: 200+100=300 > 250 → budget exhausted.
+       in := vectorized.NewRecordBatch(schema, 2)
+       in.Columns[0].(*vectorized.TypedColumn[string]).Append("a")
+       in.Columns[1].(*vectorized.TypedColumn[int64]).Append(1)
+       in.Columns[0].(*vectorized.TypedColumn[string]).Append("b")
+       in.Columns[1].(*vectorized.TypedColumn[int64]).Append(2)
+       in.Len = 2
+
+       if consumeErr := gb.Consume(context.Background(), in); consumeErr != 
nil {
+               t.Fatalf("groupby consume should fit under budget: %v", 
consumeErr)
+       }
+       if got := tracker.Used(); got != 200 {
+               t.Fatalf("after groupby consume: tracker used = %d, want 200", 
got)
+       }
+       if finalizeErr := gb.Finalize(context.Background()); finalizeErr != nil 
{
+               t.Fatalf("groupby finalize: %v", finalizeErr)
+       }
+
+       // Drain GroupBy and feed each batch into Aggregation. Expect a single
+       // budget-exhausted error.
+       var aggErr error
+       for {
+               out, pullErr := gb.NextBatch(context.Background())
+               if pullErr != nil {
+                       t.Fatalf("groupby NextBatch: %v", pullErr)
+               }
+               if out == nil {
+                       break
+               }
+               aggErr = agg.Consume(context.Background(), out)
+               if aggErr != nil {
+                       break
+               }
+       }
+       if aggErr == nil {
+               t.Fatal("aggregation consume should overflow shared budget but 
did not")
+       }
+       if !strings.Contains(aggErr.Error(), "memory budget exceeded") {
+               t.Fatalf("aggregation error must surface budget exhaustion, 
got: %v", aggErr)
+       }
+}
+
+// TestSharedTracker_ReleaseOnClose pins the lifecycle: Close releases the
+// outstanding reservation, returning the budget to fully unused. Important
+// for queries reusing trackers across stages or pipelines.
+func TestSharedTracker_ReleaseOnClose(t *testing.T) {
+       schema := vectorized.NewBatchSchema([]vectorized.ColumnDef{
+               {Role: vectorized.RoleTag, TagFamily: "default", Name: "g", 
Type: vectorized.ColumnTypeString},
+               {Role: vectorized.RoleField, Name: "v", Type: 
vectorized.ColumnTypeInt64},
+       })
+       tracker := vectorized.NewMemoryTracker(1 << 20)
+
+       agg := NewBatchAggregation(schema, []int{0},
+               []AggSpec{{Func: AggSum, InputCol: 1, Output: "sum_v"}},
+               AggModeAll, 8, tracker, 128)
+       _ = agg.Init(context.Background())
+
+       in := vectorized.NewRecordBatch(schema, 3)
+       in.Columns[0].(*vectorized.TypedColumn[string]).Append("a")
+       in.Columns[1].(*vectorized.TypedColumn[int64]).Append(1)
+       in.Columns[0].(*vectorized.TypedColumn[string]).Append("b")
+       in.Columns[1].(*vectorized.TypedColumn[int64]).Append(2)
+       in.Columns[0].(*vectorized.TypedColumn[string]).Append("c")
+       in.Columns[1].(*vectorized.TypedColumn[int64]).Append(3)
+       in.Len = 3
+
+       if consumeErr := agg.Consume(context.Background(), in); consumeErr != 
nil {
+               t.Fatalf("agg consume: %v", consumeErr)
+       }
+       if got := tracker.Used(); got != 3*128 {
+               t.Fatalf("after consume: tracker used = %d, want %d", got, 
3*128)
+       }
+       if closeErr := agg.Close(); closeErr != nil {
+               t.Fatalf("agg close: %v", closeErr)
+       }
+       if got := tracker.Used(); got != 0 {
+               t.Fatalf("after close: tracker used = %d, want 0 (reservation 
must be refunded)", got)
+       }
+}
diff --git a/pkg/query/vectorized/pipeline.go b/pkg/query/vectorized/pipeline.go
index b12221555..150c9b2e7 100644
--- a/pkg/query/vectorized/pipeline.go
+++ b/pkg/query/vectorized/pipeline.go
@@ -25,8 +25,9 @@ import (
 // Pipeline is the composed sequence of stages from source to final breaker.
 // It exposes a single PullOperator-shaped Next method to the driver.
 type Pipeline struct {
-       head   PullOperator
-       closed bool
+       head    PullOperator
+       tracker *MemoryTracker
+       closed  bool
 }
 
 // Next returns the next batch from the head stage.
@@ -34,6 +35,11 @@ func (p *Pipeline) Next(ctx context.Context) (*RecordBatch, 
error) {
        return p.head.NextBatch(ctx)
 }
 
+// Tracker returns the shared per-pipeline MemoryTracker, or nil if the builder
+// did not set one. Operators that bookkeep memory should be constructed with
+// this tracker so they all draw from a single budget.
+func (p *Pipeline) Tracker() *MemoryTracker { return p.tracker }
+
 // Close closes the head stage. Idempotent — repeat calls are no-ops.
 func (p *Pipeline) Close() error {
        if p.closed {
@@ -46,6 +52,7 @@ func (p *Pipeline) Close() error {
 // PipelineBuilder fluently composes a Pipeline.
 type PipelineBuilder struct {
        source       PullOperator
+       tracker      *MemoryTracker
        pendingFused []FusibleOperator
        breakers     []BreakerOperator
 }
@@ -56,6 +63,14 @@ func NewPipelineBuilder() *PipelineBuilder { return 
&PipelineBuilder{} }
 // From sets the leaf source.
 func (b *PipelineBuilder) From(p PullOperator) *PipelineBuilder { b.source = 
p; return b }
 
+// WithMemoryTracker attaches a shared MemoryTracker to the pipeline. Operators
+// that bookkeep memory (BatchGroupBy, BatchAggregation) should be constructed
+// with this same tracker so reservations stack against a single budget.
+func (b *PipelineBuilder) WithMemoryTracker(t *MemoryTracker) *PipelineBuilder 
{
+       b.tracker = t
+       return b
+}
+
 // Apply queues a FusibleOperator to fold into the next stage.
 func (b *PipelineBuilder) Apply(f FusibleOperator) *PipelineBuilder {
        b.pendingFused = append(b.pendingFused, f)
@@ -77,7 +92,7 @@ func (b *PipelineBuilder) Build() (*Pipeline, error) {
        for _, br := range b.breakers {
                head = newBreakerStage(head, br)
        }
-       return &Pipeline{head: head}, nil
+       return &Pipeline{head: head, tracker: b.tracker}, nil
 }
 
 // breakerStage wraps a BreakerOperator so it acts as a PullOperator for the 
next stage.

Reply via email to