This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit e2f30bda6876cb406624dec70d8fe49d43ba5395 Author: Hongtao Gao <[email protected]> AuthorDate: Wed May 13 09:56:32 2026 +0000 feat(query/vectorized): expose Pipeline.Init for multi-stage cascades Pipeline carried an unexported head whose Init cascaded through every stage (fusedStage / breakerStage / source). Pre-G8 production used only the source — NewMIterator called source.Init directly — so the cascade was unreachable from outside the package. G8a wires real breakers (BatchAggregation) into the pipeline via plan nodes outside this package. Breaker stages need their Init called so the operator's per-pipeline state (e.g. BatchAggregation's groups map) is allocated before the first Consume. Add Pipeline.Init as the public entry point that triggers the cascade. Existing NewMIterator continues to call source.Init for now; G8c will migrate it to Pipeline.Init when the vec executor lands. --- pkg/query/vectorized/pipeline.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pkg/query/vectorized/pipeline.go b/pkg/query/vectorized/pipeline.go index 150c9b2e7..d3c4158cc 100644 --- a/pkg/query/vectorized/pipeline.go +++ b/pkg/query/vectorized/pipeline.go @@ -30,6 +30,14 @@ type Pipeline struct { closed bool } +// Init cascades initialization down through every stage to the source. +// Must be called once before the first Next, after Build. Re-calling is +// safe but pointless — each stage's Init is idempotent only if its +// underlying operator's Init is. +func (p *Pipeline) Init(ctx context.Context) error { + return p.head.Init(ctx) +} + // Next returns the next batch from the head stage. func (p *Pipeline) Next(ctx context.Context) (*RecordBatch, error) { return p.head.NextBatch(ctx)
