This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit d71bc8890e9455987c5004de17e587ac59659863 Author: Hongtao Gao <[email protected]> AuthorDate: Wed May 13 13:59:22 2026 +0000 feat(query/vectorized/measure/plan): vec dispatch entry point (G8d.1) Adds Dispatch — the top-level entry into the vec measure subsystem. When the request is eligible, Dispatch: 1. Analyzes the request into a VecPlan via plan.Analyze (G8b). 2. Resolves the index.Query + entity table the storage layer needs, using inverted.BuildQuery / BuildIndexModeQuery — the same helpers the deprecated row path uses. The logicalSchema parameter threads through unchanged, isolating the dependency on the deprecated row-path schema to a single hop at the dispatch site. 3. Calls ec.Query(ctx, opts) to obtain the storage result, then wraps it as a vec PullOperator (BatchSourceFromBatchResult fast path when available; BatchScan fallback) and installs it on the leaf Scan node. 4. Executes the plan via plan.Execute (G8c) and returns the iterator. Returns (iter, planStr, true, nil) when handled; the caller MUST return that iterator and skip the row plan. Returns (nil, "", false, nil) when the request is NOT eligible — the caller should fall through. Returns (nil, "", true, err) when the request was eligible but execution failed; the caller surfaces the error rather than re-trying the row path. 
Eligibility gate (v1): - cfg.Enabled must be true - request must NOT carry GroupBy or Agg (column-type bridging at the scan source still pending; see executor.go's TODO(G8d)) - request must NOT carry Top (BatchTop's single-heap semantics differ from the row path's per-timestamp top-N) - request must carry TimeRange - request must NOT have hidden criteria tags (those need an egress strip wrapper that v1 dispatch does not yet implement) - runtime context must be non-nil Tests cover the disabled / GroupBy / Agg / Top / no-TimeRange / nil-runtime-ctx gates (the hidden-criteria-tag gate is not yet covered by a test) plus the eligibility path through index.Query construction with an empty-result fakeEC (must fall through to row) and the ec.Query error propagation (must report handled=true so the caller does not double-execute on the row path). The processor.go wire-up follows in the next commit. --- pkg/query/vectorized/measure/plan/dispatch.go | 226 ++++++++++++++++++++ pkg/query/vectorized/measure/plan/dispatch_test.go | 230 +++++++++++++++++++++ 2 files changed, 456 insertions(+) diff --git a/pkg/query/vectorized/measure/plan/dispatch.go b/pkg/query/vectorized/measure/plan/dispatch.go new file mode 100644 index 000000000..a57228050 --- /dev/null +++ b/pkg/query/vectorized/measure/plan/dispatch.go @@ -0,0 +1,226 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package plan + +import ( + "context" + "fmt" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/pkg/index" + "github.com/apache/skywalking-banyandb/pkg/index/inverted" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/query/executor" + "github.com/apache/skywalking-banyandb/pkg/query/logical" + "github.com/apache/skywalking-banyandb/pkg/query/model" + "github.com/apache/skywalking-banyandb/pkg/query/vectorized" + measure "github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure" + "github.com/apache/skywalking-banyandb/pkg/timestamp" +) + +// Dispatch is the G8d top-level entry into the vec measure subsystem. +// +// Called from banyand/query/processor.go before the row-path Analyze runs. +// When the request is eligible for the vec subsystem, Dispatch: +// +// 1. Analyzes the request into a VecPlan via plan.Analyze (G8b) +// 2. Resolves the index.Query + entity table the storage layer needs +// (using inverted.BuildQuery / BuildIndexModeQuery — the same helpers +// the deprecated row path uses; the logical.Schema parameter threads +// through unchanged) +// 3. Calls ec.Query(ctx, opts) to obtain the MeasureQueryResult +// 4. 
Wraps the result as a vec PullOperator (BatchSourceFromBatchResult +// fast path when available; BatchScan fallback otherwise) and installs +// it on the leaf Scan node +// 5. Executes the plan via plan.Execute (G8c) to return an MIterator +// +// Returns (iter, planStr, true, nil) when the request is handled; the +// caller MUST return that iterator and skip the row plan. Returns +// (nil, "", false, nil) when the request is NOT eligible — the caller +// should fall through to the row path. Returns (nil, "", true, err) +// when the request was eligible but execution failed; the caller must +// surface the error rather than fall through (the storage query may +// have already touched state). +// +// Eligibility gate (v1): +// - cfg.Enabled must be true +// - request must NOT carry GroupBy or Agg (column-type bridging at the +// scan source still pending; see executor.go's TODO(G8d)) +// - request must NOT carry Top (BatchTop's single-heap semantic differs +// from the row path's per-timestamp top-N) +// - request must carry TimeRange (storage requires a bounded window) +// - request must NOT have hidden criteria tags (those need an egress +// strip wrapper that v1 dispatch does not implement) +// - req, metadata, measureSchema, logicalSchema, and ec must be non-nil +func Dispatch( + ctx context.Context, + req *measurev1.QueryRequest, + metadata *commonv1.Metadata, + measureSchema *databasev1.Measure, + logicalSchema logical.Schema, + ec executor.MeasureExecutionContext, + cfg measure.VectorizedConfig, +) (executor.MIterator, string, bool, error) { + if !cfg.Enabled { + return nil, "", false, nil + } + if req == nil { + return nil, "", false, nil + } + if req.GetGroupBy() != nil || req.GetAgg() != nil || req.GetTop() != nil { + // G8d.2 will lift GroupBy/Agg once the scan-source column type + // bridging is in place. Top awaits per-timestamp partitioning of + // BatchTop. 
+ return nil, "", false, nil + } + if req.GetTimeRange() == nil { + return nil, "", false, nil + } + // Defensive nil guards on the runtime context. These should not fire + // in production paths — buildMeasureContext populates all of them — + // but a defensive fallthrough is safer than a nil dereference. + if measureSchema == nil || logicalSchema == nil || ec == nil || metadata == nil { + return nil, "", false, nil + } + + // Hidden-tag detection: criteria may reference tags that are NOT in + // the projection (they're needed only as filter inputs). The row + // path strips them at egress via hiddenTagsMIterator. v1 dispatch + // does not implement that strip yet, so fall through when present. + projectedTagNames := projectedNames(req.GetTagProjection()) + entityList := logicalSchema.EntityList() + entityMap := make(map[string]int, len(entityList)) + entity := make([]*modelv1.TagValue, len(entityList)) + for idx, e := range entityList { + entityMap[e] = idx + entity[idx] = pbv1.AnyTagValue + } + familyNames := make([]string, 0, len(measureSchema.GetTagFamilies())) + for _, tf := range measureSchema.GetTagFamilies() { + familyNames = append(familyNames, tf.GetName()) + } + hidden, _ := logical.CollectHiddenCriteriaTags( + req.GetCriteria(), projectedTagNames, entityMap, logicalSchema, + func() []string { return familyNames }, + ) + if !hidden.IsEmpty() { + return nil, "", false, nil + } + + // Resolve the index.Query + entities the same way the row path does + // in unresolvedIndexScan.Analyze. + var query index.Query + var entities [][]*modelv1.TagValue + var qErr error + if measureSchema.GetIndexMode() { + query, qErr = inverted.BuildIndexModeQuery(metadata.GetName(), req.GetCriteria(), logicalSchema) + } else { + query, entities, _, qErr = inverted.BuildQuery(req.GetCriteria(), logicalSchema, entityMap, entity) + } + if qErr != nil { + return nil, "", true, fmt.Errorf("vec dispatch: build query: %w", qErr) + } + + // Build the structural plan tree. 
+ p, analyzeErr := Analyze(req, measureSchema) + if analyzeErr != nil { + return nil, "", true, fmt.Errorf("vec dispatch: analyze: %w", analyzeErr) + } + scan := locateScan(p) + if scan == nil { + return nil, "", true, fmt.Errorf("vec dispatch: plan missing Scan node") + } + tr := timestamp.NewInclusiveTimeRange( + req.GetTimeRange().GetBegin().AsTime(), + req.GetTimeRange().GetEnd().AsTime(), + ) + scan.Params.TimeRange = &tr + scan.Params.Query = query + scan.Params.Entities = entities + + // Execute the storage query. The vec source is constructed from the + // returned MeasureQueryResult and threaded into the Scan node. + opts := model.MeasureQueryOptions{ + Name: metadata.GetName(), + TimeRange: scan.Params.TimeRange, + Entities: entities, + Query: query, + TagProjection: scan.Params.TagProjection, + FieldProjection: scan.Params.FieldProjection, + } + result, queryErr := ec.Query(ctx, opts) + if queryErr != nil { + return nil, "", true, fmt.Errorf("vec dispatch: query measure: %w", queryErr) + } + if result == nil { + // Match the row path's typed-nil handling: an empty query result + // flows through the row iterator as a no-op. Falling back lets + // that machinery surface the empty response unchanged. + return nil, "", false, nil + } + + pool := vectorized.NewBatchPool(scan.BatchSchema, cfg.BatchSize) + var source vectorized.PullOperator + if br, ok := result.(model.MeasureBatchResult); ok { + source = measure.NewBatchSourceFromBatchResult(br, scan.BatchSchema, pool, cfg.BatchSize) + } else { + source = measure.NewBatchScan(result, scan.BatchSchema, pool, cfg.BatchSize) + } + scan.Source = source + + iter, execErr := Execute(ctx, p, cfg) + if execErr != nil { + // Execute closes the pipeline on Build/Init failure, which + // closes the source, which releases result. No extra Release + // here. 
+ return nil, "", true, fmt.Errorf("vec dispatch: execute: %w", execErr) + } + return iter, p.String(), true, nil +} + +// locateScan walks a vec plan tree to find the leaf Scan node. Today there +// is exactly one Scan per plan (multi-measure merge is a G8 follow-up). +func locateScan(p VecPlan) *Scan { + if s, ok := p.(*Scan); ok { + return s + } + for _, c := range p.Children() { + if s := locateScan(c); s != nil { + return s + } + } + return nil +} + +// projectedNames flattens a TagProjection into the {tagName -> struct{}} +// set used by logical.CollectHiddenCriteriaTags. +func projectedNames(tp *modelv1.TagProjection) map[string]struct{} { + out := make(map[string]struct{}) + if tp == nil { + return out + } + for _, tf := range tp.GetTagFamilies() { + for _, t := range tf.GetTags() { + out[t] = struct{}{} + } + } + return out +} diff --git a/pkg/query/vectorized/measure/plan/dispatch_test.go b/pkg/query/vectorized/measure/plan/dispatch_test.go new file mode 100644 index 000000000..ea5f9ae99 --- /dev/null +++ b/pkg/query/vectorized/measure/plan/dispatch_test.go @@ -0,0 +1,230 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package plan + +import ( + "context" + "testing" + "time" + + "google.golang.org/protobuf/types/known/timestamppb" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + logicalmeasure "github.com/apache/skywalking-banyandb/pkg/query/logical/measure" + "github.com/apache/skywalking-banyandb/pkg/query/model" + measure "github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure" +) + +func dispatchCfg(enabled bool) measure.VectorizedConfig { + return measure.VectorizedConfig{Enabled: enabled, BatchSize: 1024, QueryMemoryMiB: 16} +} + +func bareReq() *measurev1.QueryRequest { + return &measurev1.QueryRequest{ + Name: "demo", + Groups: []string{"default"}, + TagProjection: projTagProj(), + FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: []string{"value"}}, + TimeRange: &modelv1.TimeRange{ + Begin: timestamppb.New(time.Unix(0, 0)), + End: timestamppb.New(time.Unix(0, 1_000_000)), + }, + } +} + +// TestDispatch_NotEnabled_FallsThrough verifies cfg.Enabled=false returns +// (nil, "", false, nil) immediately, before any other check. +func TestDispatch_NotEnabled_FallsThrough(t *testing.T) { + iter, planStr, handled, err := Dispatch(context.Background(), + bareReq(), nil, nil, nil, nil, dispatchCfg(false)) + if err != nil { + t.Fatalf("disabled config should not error: %v", err) + } + if handled { + t.Fatal("disabled config must not handle the request") + } + if iter != nil || planStr != "" { + t.Fatalf("disabled config: iter/planStr must be zero, got %v / %q", iter, planStr) + } +} + +// TestDispatch_GroupBy_FallsThrough covers the column-bridging gate. 
+func TestDispatch_GroupBy_FallsThrough(t *testing.T) { + req := bareReq() + req.GroupBy = &measurev1.QueryRequest_GroupBy{ + TagProjection: projTagProj(), + FieldName: "value", + } + req.Agg = &measurev1.QueryRequest_Aggregation{ + Function: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM, + FieldName: "value", + } + _, _, handled, err := Dispatch(context.Background(), + req, nil, nil, nil, nil, dispatchCfg(true)) + if err != nil { + t.Fatalf("GroupBy fallthrough must not error: %v", err) + } + if handled { + t.Fatal("GroupBy+Agg must fall through to row path in G8d.1") + } +} + +// TestDispatch_Agg_FallsThrough covers Agg-without-GroupBy (which the +// analyzer would reject, but the dispatch gate fires before Analyze). +func TestDispatch_Agg_FallsThrough(t *testing.T) { + req := bareReq() + req.Agg = &measurev1.QueryRequest_Aggregation{ + Function: modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM, + FieldName: "value", + } + _, _, handled, err := Dispatch(context.Background(), + req, nil, nil, nil, nil, dispatchCfg(true)) + if err != nil { + t.Fatalf("Agg fallthrough must not error: %v", err) + } + if handled { + t.Fatal("Agg must fall through to row path in G8d.1") + } +} + +// TestDispatch_Top_FallsThrough covers the per-timestamp top-N gap. +func TestDispatch_Top_FallsThrough(t *testing.T) { + req := bareReq() + req.Top = &measurev1.QueryRequest_Top{ + Number: 5, + FieldName: "value", + FieldValueSort: modelv1.Sort_SORT_DESC, + } + _, _, handled, err := Dispatch(context.Background(), + req, nil, nil, nil, nil, dispatchCfg(true)) + if err != nil { + t.Fatalf("Top fallthrough must not error: %v", err) + } + if handled { + t.Fatal("Top must fall through (BatchTop semantics differ from row TopN)") + } +} + +// TestDispatch_NoTimeRange_FallsThrough covers the bounded-window +// requirement. 
+func TestDispatch_NoTimeRange_FallsThrough(t *testing.T) { + req := bareReq() + req.TimeRange = nil + _, _, handled, err := Dispatch(context.Background(), + req, nil, nil, nil, nil, dispatchCfg(true)) + if err != nil { + t.Fatalf("no-TimeRange fallthrough must not error: %v", err) + } + if handled { + t.Fatal("missing TimeRange must fall through") + } +} + +// TestDispatch_NilRuntimeContext_FallsThrough covers the defensive guard +// against nil ec / schema / metadata. These should not arise in +// production but a fallthrough is safer than a nil dereference. +func TestDispatch_NilRuntimeContext_FallsThrough(t *testing.T) { + _, _, handled, err := Dispatch(context.Background(), + bareReq(), nil, nil, nil, nil, dispatchCfg(true)) + if err != nil { + t.Fatalf("nil runtime ctx must not error, got %v", err) + } + if handled { + t.Fatal("nil runtime ctx must fall through") + } +} + +// fakeEC is a stub MeasureExecutionContext that records its Query call +// and returns a configured (result, error) pair. +type fakeEC struct { + wantResult model.MeasureQueryResult + wantErr error + lastOpts model.MeasureQueryOptions + called bool +} + +func (f *fakeEC) Query(_ context.Context, opts model.MeasureQueryOptions) (model.MeasureQueryResult, error) { + f.called = true + f.lastOpts = opts + return f.wantResult, f.wantErr +} + +// TestDispatch_EmptyResult_FallsThrough exercises the full eligibility +// path: an eligible request reaches ec.Query, ec returns (nil, nil) +// (empty range), Dispatch reports fallthrough so the row path can surface +// the empty response. This also confirms the index.Query construction +// and Analyze invocation complete without error against a real +// logical.Schema. +func TestDispatch_EmptyResult_FallsThrough(t *testing.T) { + measureSchema := testMeasureSchema() + // nolint:staticcheck // SA1019 — row-path BuildSchema is the only schema builder until G8 replaces it. 
+ logicalSchema, schemaErr := logicalmeasure.BuildSchema(measureSchema, nil) + if schemaErr != nil { + t.Fatalf("BuildSchema: %v", schemaErr) + } + metadata := &commonv1.Metadata{Name: "demo", Group: "default"} + ec := &fakeEC{wantResult: nil, wantErr: nil} + + iter, planStr, handled, err := Dispatch(context.Background(), + bareReq(), metadata, measureSchema, logicalSchema, ec, dispatchCfg(true)) + if err != nil { + t.Fatalf("dispatch must not error on empty result: %v", err) + } + if handled { + t.Fatal("empty result must fall through to row path") + } + if iter != nil || planStr != "" { + t.Fatalf("expect zero outputs on fallthrough, got iter=%v planStr=%q", iter, planStr) + } + if !ec.called { + t.Fatal("ec.Query must be invoked before fallthrough decision") + } + if ec.lastOpts.Name != "demo" { + t.Fatalf("opts.Name: want demo, got %q", ec.lastOpts.Name) + } + if ec.lastOpts.TimeRange == nil { + t.Fatal("opts.TimeRange must be set from req.TimeRange") + } +} + +// TestDispatch_QueryError_BubblesUp covers the error propagation when +// the storage query itself fails. Dispatch must report (nil, "", true, +// err) so the caller surfaces the error rather than re-trying the row +// path. +func TestDispatch_QueryError_BubblesUp(t *testing.T) { + measureSchema := testMeasureSchema() + // nolint:staticcheck // SA1019 — row-path BuildSchema is the only schema builder until G8 replaces it. 
+ logicalSchema, schemaErr := logicalmeasure.BuildSchema(measureSchema, nil) + if schemaErr != nil { + t.Fatalf("BuildSchema: %v", schemaErr) + } + metadata := &commonv1.Metadata{Name: "demo", Group: "default"} + wantErr := context.DeadlineExceeded + ec := &fakeEC{wantErr: wantErr} + + _, _, handled, err := Dispatch(context.Background(), + bareReq(), metadata, measureSchema, logicalSchema, ec, dispatchCfg(true)) + if err == nil { + t.Fatal("ec.Query error must surface as a dispatch error") + } + if !handled { + t.Fatal("ec.Query error must report handled=true so caller does not re-try row path") + } +}
