This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 4a0479e8652898650cf974c623ccd12da7b1e302
Author: Hongtao Gao <[email protected]>
AuthorDate: Wed May 13 09:19:15 2026 +0000

feat(query/vectorized): shared per-pipeline MemoryTracker (G7a)

`PipelineBuilder.WithMemoryTracker` attaches a `*MemoryTracker` to the constructed `*Pipeline`; `Pipeline.Tracker()` exposes it. Operators that bookkeep memory (today BatchGroupBy; with this commit, BatchAggregation too) should be constructed with the same tracker so that their reservations stack against a single per-query budget. Otherwise each operator checks only its own reservations against the limit, so every operator can individually fit under it while the query as a whole exceeds the intended ceiling.

BatchAggregation gains an `entrySize` parameter and a `tracker` field. On Consume, each new group reserves `entrySize` bytes; on Close, the outstanding reservation is refunded. Pass `entrySize=0` to disable per-group bookkeeping. The existing aggregation unit tests do this because they do not exercise the budget; they still pass a large `NewMemoryTracker(1<<30)` as the tracker. The constructor signature change is breaking; the call sites in `aggregation_test.go` are updated in this commit.

`shared_tracker_test.go` pins two invariants:
- GroupBy and Aggregation sharing the same tracker overflow a single combined budget (not two independent ones).
- Aggregation.Close refunds its reservation; the tracker returns to zero after the operator is closed.
--- pkg/query/vectorized/measure/aggregation.go | 28 ++++- pkg/query/vectorized/measure/aggregation_test.go | 24 ++-- .../vectorized/measure/shared_tracker_test.go | 139 +++++++++++++++++++++ pkg/query/vectorized/pipeline.go | 21 +++- 4 files changed, 196 insertions(+), 16 deletions(-) diff --git a/pkg/query/vectorized/measure/aggregation.go b/pkg/query/vectorized/measure/aggregation.go index 587daba17..240755d3b 100644 --- a/pkg/query/vectorized/measure/aggregation.go +++ b/pkg/query/vectorized/measure/aggregation.go @@ -79,11 +79,14 @@ type BatchAggregation struct { inputSchema *vectorized.BatchSchema outputSchema *vectorized.BatchSchema pool *vectorized.BatchPool + tracker *vectorized.MemoryTracker groups map[string]*aggGroup insertion []*aggGroup keyIndices []int aggs []AggSpec mode AggMode + entrySize int64 + reserved int64 batchSize int cursor int closed bool @@ -113,19 +116,27 @@ type aggSlot struct { // NewBatchAggregation constructs a BatchAggregation. It builds the output // schema internally (keys + agg outputs) and owns its output BatchPool. +// +// tracker carries the per-pipeline memory budget; entrySize is the bytes +// reserved per new group bucket (key columns + slots + map entry overhead). +// Pass entrySize=0 to disable per-group bookkeeping. tracker must not be nil +// — use a large NewMemoryTracker for unit tests that don't care about budget. 
func NewBatchAggregation( input *vectorized.BatchSchema, keyIndices []int, aggs []AggSpec, mode AggMode, batchSize int, + tracker *vectorized.MemoryTracker, entrySize int64, ) *BatchAggregation { outputSchema := buildAggOutputSchema(input, keyIndices, aggs) return &BatchAggregation{ inputSchema: input, outputSchema: outputSchema, pool: vectorized.NewBatchPool(outputSchema, batchSize), + tracker: tracker, keyIndices: slices.Clone(keyIndices), aggs: slices.Clone(aggs), mode: mode, batchSize: batchSize, + entrySize: entrySize, } } @@ -143,6 +154,10 @@ func (a *BatchAggregation) OutputSchema() *vectorized.BatchSchema { return a.out // Consume folds every active row into its group's accumulator. Null values are // excluded from aggregation (count not incremented; sum/min/max unchanged). +// +// Each new group reserves entrySize bytes from the shared MemoryTracker. If +// the budget is exhausted, Consume returns the wrapped tracker error and the +// row's group is not added — partial-batch state is consistent. func (a *BatchAggregation) Consume(_ context.Context, b *vectorized.RecordBatch) error { if a.mode != AggModeAll { return ErrAggModeNotImplemented @@ -152,6 +167,12 @@ func (a *BatchAggregation) Consume(_ context.Context, b *vectorized.RecordBatch) key := a.computeKey(b, int(rowIdx)) group, exists := a.groups[key] if !exists { + if a.entrySize > 0 { + if reserveErr := a.tracker.Reserve(a.entrySize); reserveErr != nil { + return fmt.Errorf("aggregation memory budget exceeded: %w", reserveErr) + } + a.reserved += a.entrySize + } newGroup, newErr := a.newGroup(b, int(rowIdx), key) if newErr != nil { return newErr @@ -198,12 +219,17 @@ func (a *BatchAggregation) NextBatch(_ context.Context) (*vectorized.RecordBatch return out, nil } -// Close releases the group map. Idempotent. +// Close releases the group map and refunds the outstanding memory +// reservation. Idempotent. 
func (a *BatchAggregation) Close() error { if a.closed { return nil } a.closed = true + if a.reserved > 0 { + a.tracker.Release(a.reserved) + a.reserved = 0 + } a.groups = nil a.insertion = nil return nil diff --git a/pkg/query/vectorized/measure/aggregation_test.go b/pkg/query/vectorized/measure/aggregation_test.go index 8c77c519f..79da95e3d 100644 --- a/pkg/query/vectorized/measure/aggregation_test.go +++ b/pkg/query/vectorized/measure/aggregation_test.go @@ -82,7 +82,7 @@ func findAggRow(t *testing.T, b *vectorized.RecordBatch, key string) int { func TestBatchAggregation_AggModeAll_SumInt64(t *testing.T) { s := aggIntSchema() op := NewBatchAggregation(s, []int{0}, - []AggSpec{{Func: AggSum, InputCol: 1, Output: "sum_v"}}, AggModeAll, 8) + []AggSpec{{Func: AggSum, InputCol: 1, Output: "sum_v"}}, AggModeAll, 8, vectorized.NewMemoryTracker(1<<30), 0) _ = op.Init(context.Background()) defer op.Close() @@ -125,7 +125,7 @@ func TestBatchAggregation_AggModeAll_SumInt64(t *testing.T) { func TestBatchAggregation_AggModeAll_SumFloat64(t *testing.T) { s := aggFloatSchema() op := NewBatchAggregation(s, []int{0}, - []AggSpec{{Func: AggSum, InputCol: 1, Output: "sum_v"}}, AggModeAll, 8) + []AggSpec{{Func: AggSum, InputCol: 1, Output: "sum_v"}}, AggModeAll, 8, vectorized.NewMemoryTracker(1<<30), 0) _ = op.Init(context.Background()) defer op.Close() @@ -151,7 +151,7 @@ func TestBatchAggregation_AggModeAll_SumFloat64(t *testing.T) { func TestBatchAggregation_AggModeAll_Count(t *testing.T) { s := aggIntSchema() op := NewBatchAggregation(s, []int{0}, - []AggSpec{{Func: AggCount, InputCol: 1, Output: "n"}}, AggModeAll, 8) + []AggSpec{{Func: AggCount, InputCol: 1, Output: "n"}}, AggModeAll, 8, vectorized.NewMemoryTracker(1<<30), 0) _ = op.Init(context.Background()) defer op.Close() @@ -191,7 +191,7 @@ func TestBatchAggregation_AggModeAll_Count(t *testing.T) { func TestBatchAggregation_AggModeAll_Min_Int64(t *testing.T) { s := aggIntSchema() op := NewBatchAggregation(s, []int{0}, - 
[]AggSpec{{Func: AggMin, InputCol: 1, Output: "min_v"}}, AggModeAll, 8) + []AggSpec{{Func: AggMin, InputCol: 1, Output: "min_v"}}, AggModeAll, 8, vectorized.NewMemoryTracker(1<<30), 0) _ = op.Init(context.Background()) defer op.Close() feedAggInt(t, op, s, @@ -222,7 +222,7 @@ func TestBatchAggregation_AggModeAll_Min_Int64(t *testing.T) { func TestBatchAggregation_AggModeAll_Max_Float64(t *testing.T) { s := aggFloatSchema() op := NewBatchAggregation(s, []int{0}, - []AggSpec{{Func: AggMax, InputCol: 1, Output: "max_v"}}, AggModeAll, 8) + []AggSpec{{Func: AggMax, InputCol: 1, Output: "max_v"}}, AggModeAll, 8, vectorized.NewMemoryTracker(1<<30), 0) _ = op.Init(context.Background()) defer op.Close() @@ -246,7 +246,7 @@ func TestBatchAggregation_AggModeAll_Max_Float64(t *testing.T) { func TestBatchAggregation_AggModeAll_Mean_Float64(t *testing.T) { s := aggFloatSchema() op := NewBatchAggregation(s, []int{0}, - []AggSpec{{Func: AggMean, InputCol: 1, Output: "mean_v"}}, AggModeAll, 8) + []AggSpec{{Func: AggMean, InputCol: 1, Output: "mean_v"}}, AggModeAll, 8, vectorized.NewMemoryTracker(1<<30), 0) _ = op.Init(context.Background()) defer op.Close() @@ -275,7 +275,7 @@ func TestBatchAggregation_AggModeAll_NullField_ExcludedFromAggregation(t *testin []AggSpec{ {Func: AggSum, InputCol: 1, Output: "sum_v"}, {Func: AggCount, InputCol: 1, Output: "n"}, - }, AggModeAll, 8) + }, AggModeAll, 8, vectorized.NewMemoryTracker(1<<30), 0) _ = op.Init(context.Background()) defer op.Close() feedAggInt(t, op, s, @@ -317,7 +317,7 @@ func TestBatchAggregation_NULInStringKey_NoCollision(t *testing.T) { {Role: vectorized.RoleField, Name: "v", Type: vectorized.ColumnTypeInt64}, }) op := NewBatchAggregation(s, []int{0, 1}, - []AggSpec{{Func: AggSum, InputCol: 2, Output: "sum"}}, AggModeAll, 8) + []AggSpec{{Func: AggSum, InputCol: 2, Output: "sum"}}, AggModeAll, 8, vectorized.NewMemoryTracker(1<<30), 0) _ = op.Init(context.Background()) defer op.Close() @@ -341,7 +341,7 @@ func 
TestBatchAggregation_NULInStringKey_NoCollision(t *testing.T) { func TestBatchAggregation_AggModeMap_ReturnsErrNotImplemented(t *testing.T) { s := aggIntSchema() op := NewBatchAggregation(s, []int{0}, - []AggSpec{{Func: AggSum, InputCol: 1, Output: "sum_v"}}, AggModeMap, 8) + []AggSpec{{Func: AggSum, InputCol: 1, Output: "sum_v"}}, AggModeMap, 8, vectorized.NewMemoryTracker(1<<30), 0) if err := op.Init(context.Background()); err != nil { t.Fatalf("Init must succeed regardless of mode; got %v", err) } @@ -364,7 +364,7 @@ func TestBatchAggregation_AggModeMap_ReturnsErrNotImplemented(t *testing.T) { func TestBatchAggregation_AggModeReduce_ReturnsErrNotImplemented(t *testing.T) { s := aggIntSchema() op := NewBatchAggregation(s, []int{0}, - []AggSpec{{Func: AggSum, InputCol: 1, Output: "sum_v"}}, AggModeReduce, 8) + []AggSpec{{Func: AggSum, InputCol: 1, Output: "sum_v"}}, AggModeReduce, 8, vectorized.NewMemoryTracker(1<<30), 0) if err := op.Init(context.Background()); err != nil { t.Fatalf("Init must succeed regardless of mode; got %v", err) } @@ -392,7 +392,7 @@ func TestBatchAggregation_AggModeReduce_ReturnsErrNotImplemented(t *testing.T) { func TestBatchAggregation_DelegatesToAggregationPackage(t *testing.T) { s := aggIntSchema() op := NewBatchAggregation(s, []int{0}, - []AggSpec{{Func: AggSum, InputCol: 1, Output: "sum_v"}}, AggModeAll, 8) + []AggSpec{{Func: AggSum, InputCol: 1, Output: "sum_v"}}, AggModeAll, 8, vectorized.NewMemoryTracker(1<<30), 0) _ = op.Init(context.Background()) defer op.Close() @@ -432,7 +432,7 @@ func TestBatchAggregation_Correctness_MatchesManualComputation(t *testing.T) { {Func: AggSum, InputCol: 1, Output: "sum_v"}, {Func: AggMin, InputCol: 1, Output: "min_v"}, {Func: AggMax, InputCol: 1, Output: "max_v"}, - }, AggModeAll, 8) + }, AggModeAll, 8, vectorized.NewMemoryTracker(1<<30), 0) _ = op.Init(context.Background()) defer op.Close() feedAggInt(t, op, s, diff --git a/pkg/query/vectorized/measure/shared_tracker_test.go 
b/pkg/query/vectorized/measure/shared_tracker_test.go new file mode 100644 index 000000000..5e4e1de22 --- /dev/null +++ b/pkg/query/vectorized/measure/shared_tracker_test.go @@ -0,0 +1,139 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package measure + +import ( + "context" + "strings" + "testing" + + "github.com/apache/skywalking-banyandb/pkg/query/vectorized" +) + +// TestSharedTracker_GroupByAggregation_StacksAgainstSingleBudget pins the G7a +// invariant: when BatchGroupBy and BatchAggregation are constructed with the +// same MemoryTracker, their per-group reservations stack against one budget. +// Without sharing, each operator would independently fit under the limit and +// the pipeline would consume 2x the intended ceiling. +func TestSharedTracker_GroupByAggregation_StacksAgainstSingleBudget(t *testing.T) { + schema := vectorized.NewBatchSchema([]vectorized.ColumnDef{ + {Role: vectorized.RoleTag, TagFamily: "default", Name: "g", Type: vectorized.ColumnTypeString}, + {Role: vectorized.RoleField, Name: "v", Type: vectorized.ColumnTypeInt64}, + }) + + // Tight budget: only enough for GroupBy's 2 groups (2*100=200), no + // headroom for Aggregation to reserve for the same groups. 
+ const budget = 250 + tracker := vectorized.NewMemoryTracker(budget) + + gbPool := vectorized.NewBatchPool(schema, 8) + gb := NewBatchGroupBy(schema, []int{0}, gbPool, 8, tracker, 100, 0) + if initErr := gb.Init(context.Background()); initErr != nil { + t.Fatalf("groupby init: %v", initErr) + } + defer gb.Close() + + agg := NewBatchAggregation(schema, []int{0}, + []AggSpec{{Func: AggSum, InputCol: 1, Output: "sum_v"}}, + AggModeAll, 8, tracker, 100) + if initErr := agg.Init(context.Background()); initErr != nil { + t.Fatalf("agg init: %v", initErr) + } + defer agg.Close() + + // Two distinct groups, one row each. GroupBy reserves 2*100=200 (used=200). + // Aggregation then sees the GroupBy output and tries to reserve 100 for + // its first new group: 200+100=300 > 250 → budget exhausted. + in := vectorized.NewRecordBatch(schema, 2) + in.Columns[0].(*vectorized.TypedColumn[string]).Append("a") + in.Columns[1].(*vectorized.TypedColumn[int64]).Append(1) + in.Columns[0].(*vectorized.TypedColumn[string]).Append("b") + in.Columns[1].(*vectorized.TypedColumn[int64]).Append(2) + in.Len = 2 + + if consumeErr := gb.Consume(context.Background(), in); consumeErr != nil { + t.Fatalf("groupby consume should fit under budget: %v", consumeErr) + } + if got := tracker.Used(); got != 200 { + t.Fatalf("after groupby consume: tracker used = %d, want 200", got) + } + if finalizeErr := gb.Finalize(context.Background()); finalizeErr != nil { + t.Fatalf("groupby finalize: %v", finalizeErr) + } + + // Drain GroupBy and feed each batch into Aggregation. Expect a single + // budget-exhausted error. 
+ var aggErr error + for { + out, pullErr := gb.NextBatch(context.Background()) + if pullErr != nil { + t.Fatalf("groupby NextBatch: %v", pullErr) + } + if out == nil { + break + } + aggErr = agg.Consume(context.Background(), out) + if aggErr != nil { + break + } + } + if aggErr == nil { + t.Fatal("aggregation consume should overflow shared budget but did not") + } + if !strings.Contains(aggErr.Error(), "memory budget exceeded") { + t.Fatalf("aggregation error must surface budget exhaustion, got: %v", aggErr) + } +} + +// TestSharedTracker_ReleaseOnClose pins the lifecycle: Close releases the +// outstanding reservation, returning the budget to fully unused. Important +// for queries reusing trackers across stages or pipelines. +func TestSharedTracker_ReleaseOnClose(t *testing.T) { + schema := vectorized.NewBatchSchema([]vectorized.ColumnDef{ + {Role: vectorized.RoleTag, TagFamily: "default", Name: "g", Type: vectorized.ColumnTypeString}, + {Role: vectorized.RoleField, Name: "v", Type: vectorized.ColumnTypeInt64}, + }) + tracker := vectorized.NewMemoryTracker(1 << 20) + + agg := NewBatchAggregation(schema, []int{0}, + []AggSpec{{Func: AggSum, InputCol: 1, Output: "sum_v"}}, + AggModeAll, 8, tracker, 128) + _ = agg.Init(context.Background()) + + in := vectorized.NewRecordBatch(schema, 3) + in.Columns[0].(*vectorized.TypedColumn[string]).Append("a") + in.Columns[1].(*vectorized.TypedColumn[int64]).Append(1) + in.Columns[0].(*vectorized.TypedColumn[string]).Append("b") + in.Columns[1].(*vectorized.TypedColumn[int64]).Append(2) + in.Columns[0].(*vectorized.TypedColumn[string]).Append("c") + in.Columns[1].(*vectorized.TypedColumn[int64]).Append(3) + in.Len = 3 + + if consumeErr := agg.Consume(context.Background(), in); consumeErr != nil { + t.Fatalf("agg consume: %v", consumeErr) + } + if got := tracker.Used(); got != 3*128 { + t.Fatalf("after consume: tracker used = %d, want %d", got, 3*128) + } + if closeErr := agg.Close(); closeErr != nil { + t.Fatalf("agg close: 
%v", closeErr) + } + if got := tracker.Used(); got != 0 { + t.Fatalf("after close: tracker used = %d, want 0 (reservation must be refunded)", got) + } +} diff --git a/pkg/query/vectorized/pipeline.go b/pkg/query/vectorized/pipeline.go index b12221555..150c9b2e7 100644 --- a/pkg/query/vectorized/pipeline.go +++ b/pkg/query/vectorized/pipeline.go @@ -25,8 +25,9 @@ import ( // Pipeline is the composed sequence of stages from source to final breaker. // It exposes a single PullOperator-shaped Next method to the driver. type Pipeline struct { - head PullOperator - closed bool + head PullOperator + tracker *MemoryTracker + closed bool } // Next returns the next batch from the head stage. @@ -34,6 +35,11 @@ func (p *Pipeline) Next(ctx context.Context) (*RecordBatch, error) { return p.head.NextBatch(ctx) } +// Tracker returns the shared per-pipeline MemoryTracker, or nil if the builder +// did not set one. Operators that bookkeep memory should be constructed with +// this tracker so they all draw from a single budget. +func (p *Pipeline) Tracker() *MemoryTracker { return p.tracker } + // Close closes the head stage. Idempotent — repeat calls are no-ops. func (p *Pipeline) Close() error { if p.closed { @@ -46,6 +52,7 @@ func (p *Pipeline) Close() error { // PipelineBuilder fluently composes a Pipeline. type PipelineBuilder struct { source PullOperator + tracker *MemoryTracker pendingFused []FusibleOperator breakers []BreakerOperator } @@ -56,6 +63,14 @@ func NewPipelineBuilder() *PipelineBuilder { return &PipelineBuilder{} } // From sets the leaf source. func (b *PipelineBuilder) From(p PullOperator) *PipelineBuilder { b.source = p; return b } +// WithMemoryTracker attaches a shared MemoryTracker to the pipeline. Operators +// that bookkeep memory (BatchGroupBy, BatchAggregation) should be constructed +// with this same tracker so reservations stack against a single budget. 
+func (b *PipelineBuilder) WithMemoryTracker(t *MemoryTracker) *PipelineBuilder { + b.tracker = t + return b +} + // Apply queues a FusibleOperator to fold into the next stage. func (b *PipelineBuilder) Apply(f FusibleOperator) *PipelineBuilder { b.pendingFused = append(b.pendingFused, f) @@ -77,7 +92,7 @@ func (b *PipelineBuilder) Build() (*Pipeline, error) { for _, br := range b.breakers { head = newBreakerStage(head, br) } - return &Pipeline{head: head}, nil + return &Pipeline{head: head, tracker: b.tracker}, nil } // breakerStage wraps a BreakerOperator so it acts as a PullOperator for the next stage.
