This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit bb1f751419bd8bd7f0ab6496895cc910a62deb6a Author: Hongtao Gao <[email protected]> AuthorDate: Wed May 13 13:45:37 2026 +0000 feat(query/vectorized/measure/plan): vec executor (G8c) Execute(ctx, plan, cfg) composes a VecPlan tree into a runnable *vectorized.Pipeline and returns it as an executor.MIterator: 1. Builds a per-pipeline MemoryTracker sized from cfg.QueryMemoryMiB so every memory-bookkeeping operator (G7a) draws from a single budget. 2. Threads the tracker + PipelineBuilder + cfg through a BuildContext that each plan node consults during Build. 3. Walks the tree leaf-up via plan.Build (Scan calls Builder.From; fusibles call Builder.Apply; breakers call Builder.Break). 4. Finalizes via builder.Build() and Pipeline.Init(ctx). 5. Wraps the Pipeline as a VectorizedMIterator via measure.NewIteratorFromPipeline, with the egress BatchPool sized from the terminal node's output schema (Schema-rewriting breakers like BatchAggregation change the column layout). The executor is independent of storage — Scan.Source must be populated by the caller before Execute. G8d's top-level dispatch is responsible for building scan sources from a MeasureExecutionContext. Tests cover nil plan, invalid config, ScanLimit drain end-to-end, GroupByAgg producing aggregated rows with nil Timestamp (D2), and Build-error propagation (e.g. unset Scan.Source). Known gap (TODO(G8d)): an end-to-end Analyze+Execute test against the analyzer-produced BatchSchema would require column-type bridging since BuildBatchSchema emits passthrough *modelv1.FieldValue columns but BatchAggregation.fold needs native int64/float64. G8d will need either a passthrough->native fusible at the scan-source boundary or an extension to BatchAggregation. The current executor tests use hand-built native-typed schemas to verify executor correctness in isolation; the bridge is tracked for G8d. 
--- pkg/query/vectorized/measure/plan/executor.go | 80 +++++++++++ pkg/query/vectorized/measure/plan/executor_test.go | 150 +++++++++++++++++++++ 2 files changed, 230 insertions(+) diff --git a/pkg/query/vectorized/measure/plan/executor.go b/pkg/query/vectorized/measure/plan/executor.go new file mode 100644 index 000000000..e4f8db759 --- /dev/null +++ b/pkg/query/vectorized/measure/plan/executor.go @@ -0,0 +1,80 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package plan + +import ( + "context" + "fmt" + + "github.com/apache/skywalking-banyandb/pkg/query/executor" + "github.com/apache/skywalking-banyandb/pkg/query/vectorized" + measure "github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure" +) + +// Execute composes a vec plan tree into a runnable *vectorized.Pipeline and +// returns it as an executor.MIterator. The Scan node(s) in the plan must +// have Source already set — the executor (G8c) does not resolve storage on +// its own; G8d's top-level dispatch is responsible for building the scan +// source from a MeasureExecutionContext and stitching it in. 
+//
// Lifecycle: a per-pipeline MemoryTracker is constructed from
// cfg.QueryMemoryMiB and threaded through the BuildContext so every
// memory-bookkeeping operator (BatchAggregation, future BatchGroupBy)
// charges against a single budget (G7a). Pipeline.Init is invoked before
// the iterator is returned so breaker stages (which lazily allocate their
// state on Init) are ready for the first Next.
//
// Errors: a nil plan, an invalid cfg, or a failure in plan.Build,
// builder.Build, or Pipeline.Init is returned wrapped with a
// "plan.Execute:" prefix and a nil iterator.
//
// Only the Init failure path has a constructed pipeline to release, so it
// is the only path that calls Pipeline.Close (intended to release source
// resources such as the BatchPool and underlying MeasureBatchResult). If
// that Close also fails, its error is reported in the message next to the
// wrapped Init error instead of being dropped.
//
// plan.Build and builder.Build failures return before any pipeline exists,
// and nothing is closed on those paths. NOTE(review): confirm whether the
// caller must release Scan.Source itself when Execute fails there.
func Execute(ctx context.Context, plan VecPlan, cfg measure.VectorizedConfig) (executor.MIterator, error) {
	if plan == nil {
		return nil, fmt.Errorf("plan.Execute: nil plan")
	}
	if cfgErr := cfg.Validate(); cfgErr != nil {
		return nil, fmt.Errorf("plan.Execute: %w", cfgErr)
	}

	// One tracker instance is handed to both the builder and the
	// BuildContext so every operator draws from the same budget.
	tracker := vectorized.NewMemoryTracker(int64(cfg.QueryMemoryMiB) * 1024 * 1024)
	bc := &BuildContext{
		Builder: vectorized.NewPipelineBuilder().WithMemoryTracker(tracker),
		Tracker: tracker,
		Config:  cfg,
	}

	if buildErr := plan.Build(ctx, bc); buildErr != nil {
		return nil, fmt.Errorf("plan.Execute: %w", buildErr)
	}

	pipeline, pipelineErr := bc.Builder.Build()
	if pipelineErr != nil {
		return nil, fmt.Errorf("plan.Execute: %w", pipelineErr)
	}

	if initErr := pipeline.Init(ctx); initErr != nil {
		// Release what Init may have allocated. The Init error remains the
		// wrapped cause (visible to errors.Is/As). A Close failure is added
		// as text so it is not silently lost.
		if closeErr := pipeline.Close(); closeErr != nil {
			return nil, fmt.Errorf("plan.Execute: %w (close after init failure: %v)", initErr, closeErr)
		}
		return nil, fmt.Errorf("plan.Execute: %w", initErr)
	}

	// The egress pool matches the terminal operator's output schema. For a
	// schema-rewriting breaker (BatchAggregation), that is the agg output
	// (keys + agg field); for schema-preserving plans, it is the scan
	// schema. plan.Schema() walks to the root node.
	egressPool := vectorized.NewBatchPool(plan.Schema(), cfg.BatchSize)
	return measure.NewIteratorFromPipeline(ctx, pipeline, egressPool), nil
}
diff --git a/pkg/query/vectorized/measure/plan/executor_test.go b/pkg/query/vectorized/measure/plan/executor_test.go new file mode 100644 index 000000000..81e170ab5 --- /dev/null +++ b/pkg/query/vectorized/measure/plan/executor_test.go @@ -0,0 +1,150 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Apache Software Foundation (ASF) licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
+ +package plan + +import ( + "context" + "strings" + "testing" + + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/pkg/query/model" + "github.com/apache/skywalking-banyandb/pkg/query/vectorized" + measure "github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure" +) + +func execCfg() measure.VectorizedConfig { + return measure.VectorizedConfig{Enabled: true, BatchSize: 1024, QueryMemoryMiB: 16} +} + +func TestExecute_NilPlan_Errors(t *testing.T) { + if _, err := Execute(context.Background(), nil, execCfg()); err == nil { + t.Fatal("nil plan must error") + } +} + +func TestExecute_InvalidConfig_Errors(t *testing.T) { + schema, _ := buildScanInput(t) + scan := NewScan(schema, ScanParams{}) + scan.Source = &fakePullSource{schema: schema} + root := NewLimit(scan, 0, 10) + + bad := measure.VectorizedConfig{BatchSize: 0, QueryMemoryMiB: 16} + if _, err := Execute(context.Background(), root, bad); err == nil { + t.Fatal("invalid config (BatchSize=0) must error") + } +} + +func TestExecute_ScanLimit_DrainsThroughIterator(t *testing.T) { + schema, batch := buildScanInput(t) + src := &fakePullSource{schema: schema, batches: []*vectorized.RecordBatch{batch}} + scan := NewScan(schema, ScanParams{}) + scan.Source = src + root := NewLimit(scan, 0, 10) + + iter, err := Execute(context.Background(), root, execCfg()) + if err != nil { + t.Fatalf("Execute: %v", err) + } + defer iter.Close() + + rows := 0 + for iter.Next() { + rows += len(iter.Current()) + } + if rows != 5 { + t.Fatalf("Scan+Limit should emit 5 rows, got %d", rows) + } +} + +func TestExecute_GroupByAgg_EmitsAggregatedRowsWithNilTimestamp(t *testing.T) { + schema, batch := buildScanInput(t) + src := &fakePullSource{schema: schema, batches: []*vectorized.RecordBatch{batch}} + scan := NewScan(schema, ScanParams{}) + scan.Source = src + gba, err := NewGroupByAgg(scan, + &model.MeasureGroupBy{TagFamily: "default", TagNames: []string{"svc"}}, + 
&model.MeasureAgg{FieldName: "value", Func: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM}, + ) + if err != nil { + t.Fatalf("NewGroupByAgg: %v", err) + } + root := NewLimit(gba, 0, 10) + + iter, err := Execute(context.Background(), root, execCfg()) + if err != nil { + t.Fatalf("Execute: %v", err) + } + defer iter.Close() + + bySvc := map[string]int64{} + for iter.Next() { + dps := iter.Current() + if len(dps) != 1 { + t.Fatalf("Current must yield 1 InternalDataPoint per Next, got %d", len(dps)) + } + idp := dps[0] + if idp.DataPoint.Timestamp != nil { + t.Fatalf("aggregation row must have nil Timestamp (D2); got %v", idp.DataPoint.Timestamp) + } + if len(idp.DataPoint.TagFamilies) != 1 || idp.DataPoint.TagFamilies[0].Name != "default" { + t.Fatalf("want one TagFamily 'default', got %+v", idp.DataPoint.TagFamilies) + } + tags := idp.DataPoint.TagFamilies[0].Tags + if len(tags) != 1 || tags[0].Key != "svc" { + t.Fatalf("want one Tag 'svc', got %+v", tags) + } + svc := tags[0].Value.GetStr().GetValue() + if len(idp.DataPoint.Fields) != 1 || idp.DataPoint.Fields[0].Name != "value_sum" { + t.Fatalf("want one Field 'value_sum', got %+v", idp.DataPoint.Fields) + } + bySvc[svc] = idp.DataPoint.Fields[0].Value.GetInt().GetValue() + } + if bySvc["a"] != 6 { + t.Fatalf("sum(a): want 6, got %d", bySvc["a"]) + } + if bySvc["b"] != 9 { + t.Fatalf("sum(b): want 9, got %d", bySvc["b"]) + } +} + +// TODO(G8d): an end-to-end Analyze+Execute test against the analyzer- +// produced BatchSchema would exercise the agg pipeline against the same +// column layout the scan source emits in production. Today +// BuildBatchSchema uses passthrough *modelv1.FieldValue columns (to keep +// row-path egress zero-alloc), but BatchAggregation.fold requires native +// int64/float64. G8d must either: +// - convert passthrough → native via a fusible at the scan-source +// boundary when the pipeline contains a fold operator, or +// - teach BatchAggregation to fold over *modelv1.FieldValue. 
+// Until that bridge lands, the executor tests above use a hand-built +// schema with native typed columns to verify executor correctness in +// isolation. + +func TestExecute_BuildError_SurfacesAsExecuteError(t *testing.T) { + schema, _ := buildScanInput(t) + scan := NewScan(schema, ScanParams{}) // Source intentionally unset + root := NewLimit(scan, 0, 10) + _, err := Execute(context.Background(), root, execCfg()) + if err == nil { + t.Fatal("Execute must propagate Build error from unset Scan.Source") + } + if !strings.Contains(err.Error(), "Source") { + t.Fatalf("error should mention Source, got %v", err) + } +}
