This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch vectorized-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit e2f30bda6876cb406624dec70d8fe49d43ba5395
Author: Hongtao Gao <[email protected]>
AuthorDate: Wed May 13 09:56:32 2026 +0000

    feat(query/vectorized): expose Pipeline.Init for multi-stage cascades
    
    Pipeline carried an unexported head whose Init cascaded through every
    stage (fusedStage / breakerStage / source). Pre-G8 production used only
    the source — NewMIterator called source.Init directly — so the cascade
    was unreachable from outside the package.
    
    G8a wires real breakers (BatchAggregation) into the pipeline via plan
    nodes outside this package. Breaker stages need their Init called so
    the operator's per-pipeline state (e.g. BatchAggregation's groups map)
    is allocated before the first Consume. Add Pipeline.Init as the public
    entry point that triggers the cascade.
    
    Existing NewMIterator continues to call source.Init for now; G8c will
    migrate it to Pipeline.Init when the vec executor lands.
---
 pkg/query/vectorized/pipeline.go | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/pkg/query/vectorized/pipeline.go b/pkg/query/vectorized/pipeline.go
index 150c9b2e7..d3c4158cc 100644
--- a/pkg/query/vectorized/pipeline.go
+++ b/pkg/query/vectorized/pipeline.go
@@ -30,6 +30,14 @@ type Pipeline struct {
        closed  bool
 }
 
+// Init cascades initialization down through every stage to the source.
+// It must be called once, after Build and before the first Next. A repeated
+// call is unnecessary and is safe only if the Init of every stage's
+// underlying operator is idempotent.
+func (p *Pipeline) Init(ctx context.Context) error {
+       return p.head.Init(ctx)
+}
+
 // Next returns the next batch from the head stage.
 func (p *Pipeline) Next(ctx context.Context) (*RecordBatch, error) {
        return p.head.NextBatch(ctx)
