This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch vectorized-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit bb1f751419bd8bd7f0ab6496895cc910a62deb6a
Author: Hongtao Gao <[email protected]>
AuthorDate: Wed May 13 13:45:37 2026 +0000

    feat(query/vectorized/measure/plan): vec executor (G8c)
    
    Execute(ctx, plan, cfg) composes a VecPlan tree into a runnable
    *vectorized.Pipeline and returns it as an executor.MIterator:
    
    1. Builds a per-pipeline MemoryTracker sized from cfg.QueryMemoryMiB
       so every memory-bookkeeping operator (G7a) draws from a single
       budget.
    2. Threads the tracker + PipelineBuilder + cfg through a BuildContext
       that each plan node consults during Build.
    3. Walks the tree leaf-up via plan.Build (Scan calls Builder.From;
       fusibles call Builder.Apply; breakers call Builder.Break).
    4. Finalizes via builder.Build() and Pipeline.Init(ctx).
    5. Wraps the Pipeline as a VectorizedMIterator via
       measure.NewIteratorFromPipeline, with the egress BatchPool sized
       from the terminal node's output schema (Schema-rewriting breakers
       like BatchAggregation change the column layout).
    
    The executor is independent of storage — Scan.Source must be populated
    by the caller before Execute. G8d's top-level dispatch is responsible
    for building scan sources from a MeasureExecutionContext.
    
    Tests cover nil plan, invalid config, ScanLimit drain end-to-end,
    GroupByAgg producing aggregated rows with nil Timestamp (D2), and
    Build-error propagation (e.g. unset Scan.Source).
    
    Known gap (TODO(G8d)): an end-to-end Analyze+Execute test against the
    analyzer-produced BatchSchema would require column-type bridging since
    BuildBatchSchema emits passthrough *modelv1.FieldValue columns but
    BatchAggregation.fold needs native int64/float64. G8d will need either
    a passthrough->native fusible at the scan-source boundary or an
    extension to BatchAggregation. The current executor tests use
    hand-built native-typed schemas to verify executor correctness in
    isolation; the bridge is tracked for G8d.
---
 pkg/query/vectorized/measure/plan/executor.go      |  80 +++++++++++
 pkg/query/vectorized/measure/plan/executor_test.go | 150 +++++++++++++++++++++
 2 files changed, 230 insertions(+)

diff --git a/pkg/query/vectorized/measure/plan/executor.go 
b/pkg/query/vectorized/measure/plan/executor.go
new file mode 100644
index 000000000..e4f8db759
--- /dev/null
+++ b/pkg/query/vectorized/measure/plan/executor.go
@@ -0,0 +1,80 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package plan
+
+import (
+       "context"
+       "fmt"
+
+       "github.com/apache/skywalking-banyandb/pkg/query/executor"
+       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+       measure 
"github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure"
+)
+
+// Execute composes a vec plan tree into a runnable *vectorized.Pipeline and
+// returns it as an executor.MIterator. The Scan node(s) in the plan must
+// have Source already set — the executor (G8c) does not resolve storage on
+// its own; G8d's top-level dispatch is responsible for building the scan
+// source from a MeasureExecutionContext and stitching it in.
+//
+// Lifecycle: a per-pipeline MemoryTracker is constructed from
+// cfg.QueryMemoryMiB and threaded through the BuildContext so every
+// memory-bookkeeping operator (BatchAggregation, future BatchGroupBy)
+// charges against a single budget (G7a). Pipeline.Init is invoked before
+// the iterator is returned so breaker stages (which lazily allocate their
+// state on Init) are ready for the first Next.
+//
+// Errors: a nil plan, an invalid cfg, or a failure in plan.Build,
+// builder.Build, or Pipeline.Init is returned prefixed with "plan.Execute:".
+// Only the Init failure path closes the pipeline; the earlier ones do not.
+func Execute(ctx context.Context, plan VecPlan, cfg measure.VectorizedConfig) 
(executor.MIterator, error) {
+       if plan == nil {
+               return nil, fmt.Errorf("plan.Execute: nil plan")
+       }
+       if cfgErr := cfg.Validate(); cfgErr != nil {
+               return nil, fmt.Errorf("plan.Execute: %w", cfgErr)
+       }
+
+       tracker := vectorized.NewMemoryTracker(int64(cfg.QueryMemoryMiB) * 1024 
* 1024)
+       bc := &BuildContext{
+               Builder: 
vectorized.NewPipelineBuilder().WithMemoryTracker(tracker),
+               Tracker: tracker,
+               Config:  cfg,
+       }
+
+       if buildErr := plan.Build(ctx, bc); buildErr != nil {
+               return nil, fmt.Errorf("plan.Execute: %w", buildErr)
+       }
+
+       pipeline, pipelineErr := bc.Builder.Build()
+       if pipelineErr != nil {
+               return nil, fmt.Errorf("plan.Execute: %w", pipelineErr)
+       }
+
+       if initErr := pipeline.Init(ctx); initErr != nil {
+               _ = pipeline.Close() // best-effort cleanup; initErr is the error reported
+               return nil, fmt.Errorf("plan.Execute: %w", initErr)
+       }
+
+       // The egress pool matches the terminal operator's output schema. For a
+       // schema-rewriting breaker (BatchAggregation), that is the agg output
+       // (keys + agg field); for schema-preserving plans, it is the scan
+       // schema. plan.Schema() walks to the root node.
+       egressPool := vectorized.NewBatchPool(plan.Schema(), cfg.BatchSize)
+       return measure.NewIteratorFromPipeline(ctx, pipeline, egressPool), nil
+}
diff --git a/pkg/query/vectorized/measure/plan/executor_test.go 
b/pkg/query/vectorized/measure/plan/executor_test.go
new file mode 100644
index 000000000..81e170ab5
--- /dev/null
+++ b/pkg/query/vectorized/measure/plan/executor_test.go
@@ -0,0 +1,150 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package plan
+
+import (
+       "context"
+       "strings"
+       "testing"
+
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/model"
+       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+       measure 
"github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure"
+)
+
+// execCfg returns the VectorizedConfig shared by the executor tests:
+// Enabled is true, BatchSize is 1024 and QueryMemoryMiB is 16.
+func execCfg() measure.VectorizedConfig {
+       cfg := measure.VectorizedConfig{
+               Enabled:        true,
+               BatchSize:      1024,
+               QueryMemoryMiB: 16,
+       }
+       return cfg
+}
+
+// TestExecute_NilPlan_Errors checks that Execute returns an error when it is
+// handed a nil plan.
+func TestExecute_NilPlan_Errors(t *testing.T) {
+       _, err := Execute(context.Background(), nil, execCfg())
+       if err != nil {
+               return
+       }
+       t.Fatal("nil plan must error")
+}
+
+// TestExecute_InvalidConfig_Errors checks that Execute returns an error for
+// a config whose BatchSize is zero, even when the plan itself has a source.
+func TestExecute_InvalidConfig_Errors(t *testing.T) {
+       schema, _ := buildScanInput(t)
+       leaf := NewScan(schema, ScanParams{})
+       leaf.Source = &fakePullSource{schema: schema}
+       limited := NewLimit(leaf, 0, 10)
+
+       // BatchSize is spelled out as zero: it is the value the test expects
+       // Execute to reject.
+       invalid := measure.VectorizedConfig{
+               BatchSize:      0,
+               QueryMemoryMiB: 16,
+       }
+       _, err := Execute(context.Background(), limited, invalid)
+       if err == nil {
+               t.Fatal("invalid config (BatchSize=0) must error")
+       }
+}
+
+func TestExecute_ScanLimit_DrainsThroughIterator(t *testing.T) {
+       schema, batch := buildScanInput(t)
+       src := &fakePullSource{schema: schema, batches: 
[]*vectorized.RecordBatch{batch}}
+       scan := NewScan(schema, ScanParams{})
+       scan.Source = src
+       root := NewLimit(scan, 0, 10)
+
+       iter, err := Execute(context.Background(), root, execCfg())
+       if err != nil {
+               t.Fatalf("Execute: %v", err)
+       }
+       defer iter.Close()
+
+       rows := 0
+       for iter.Next() {
+               rows += len(iter.Current())
+       }
+       if rows != 5 {
+               t.Fatalf("Scan+Limit should emit 5 rows, got %d", rows)
+       }
+}
+
+func TestExecute_GroupByAgg_EmitsAggregatedRowsWithNilTimestamp(t *testing.T) {
+       schema, batch := buildScanInput(t)
+       src := &fakePullSource{schema: schema, batches: 
[]*vectorized.RecordBatch{batch}}
+       scan := NewScan(schema, ScanParams{})
+       scan.Source = src
+       gba, err := NewGroupByAgg(scan,
+               &model.MeasureGroupBy{TagFamily: "default", TagNames: 
[]string{"svc"}},
+               &model.MeasureAgg{FieldName: "value", Func: 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM},
+       )
+       if err != nil {
+               t.Fatalf("NewGroupByAgg: %v", err)
+       }
+       root := NewLimit(gba, 0, 10)
+
+       iter, err := Execute(context.Background(), root, execCfg())
+       if err != nil {
+               t.Fatalf("Execute: %v", err)
+       }
+       defer iter.Close()
+
+       bySvc := map[string]int64{}
+       for iter.Next() {
+               dps := iter.Current()
+               if len(dps) != 1 {
+                       t.Fatalf("Current must yield 1 InternalDataPoint per 
Next, got %d", len(dps))
+               }
+               idp := dps[0]
+               if idp.DataPoint.Timestamp != nil {
+                       t.Fatalf("aggregation row must have nil Timestamp (D2); 
got %v", idp.DataPoint.Timestamp)
+               }
+               if len(idp.DataPoint.TagFamilies) != 1 || 
idp.DataPoint.TagFamilies[0].Name != "default" {
+                       t.Fatalf("want one TagFamily 'default', got %+v", 
idp.DataPoint.TagFamilies)
+               }
+               tags := idp.DataPoint.TagFamilies[0].Tags
+               if len(tags) != 1 || tags[0].Key != "svc" {
+                       t.Fatalf("want one Tag 'svc', got %+v", tags)
+               }
+               svc := tags[0].Value.GetStr().GetValue()
+               if len(idp.DataPoint.Fields) != 1 || 
idp.DataPoint.Fields[0].Name != "value_sum" {
+                       t.Fatalf("want one Field 'value_sum', got %+v", 
idp.DataPoint.Fields)
+               }
+               bySvc[svc] = idp.DataPoint.Fields[0].Value.GetInt().GetValue()
+       }
+       if bySvc["a"] != 6 {
+               t.Fatalf("sum(a): want 6, got %d", bySvc["a"])
+       }
+       if bySvc["b"] != 9 {
+               t.Fatalf("sum(b): want 9, got %d", bySvc["b"])
+       }
+}
+
+// TODO(G8d): an end-to-end Analyze+Execute test against the analyzer-
+// produced BatchSchema would exercise the agg pipeline against the same
+// column layout the scan source emits in production. Today
+// BuildBatchSchema uses passthrough *modelv1.FieldValue columns (to keep
+// row-path egress zero-alloc), but BatchAggregation.fold requires native
+// int64/float64. G8d must either:
+//   - convert passthrough → native via a fusible at the scan-source
+//     boundary when the pipeline contains a fold operator, or
+//   - teach BatchAggregation to fold over *modelv1.FieldValue.
+// Until that bridge lands, the executor tests above use a hand-built
+// schema with native typed columns to verify executor correctness in
+// isolation.
+
+func TestExecute_BuildError_SurfacesAsExecuteError(t *testing.T) {
+       schema, _ := buildScanInput(t)
+       scan := NewScan(schema, ScanParams{}) // Source intentionally unset
+       root := NewLimit(scan, 0, 10)
+       _, err := Execute(context.Background(), root, execCfg())
+       if err == nil {
+               t.Fatal("Execute must propagate Build error from unset 
Scan.Source")
+       }
+       if !strings.Contains(err.Error(), "Source") {
+               t.Fatalf("error should mention Source, got %v", err)
+       }
+}

Reply via email to