This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch vectorized-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit d71bc8890e9455987c5004de17e587ac59659863
Author: Hongtao Gao <[email protected]>
AuthorDate: Wed May 13 13:59:22 2026 +0000

    feat(query/vectorized/measure/plan): vec dispatch entry point (G8d.1)
    
    Adds Dispatch — the top-level entry into the vec measure subsystem.
    When the request is eligible, Dispatch:
    
      1. Analyzes the request into a VecPlan via plan.Analyze (G8b).
      2. Resolves the index.Query + entity table the storage layer needs,
         using inverted.BuildQuery / BuildIndexModeQuery — the same helpers
         the deprecated row path uses. The logicalSchema parameter threads
         through unchanged, isolating the dependency on the deprecated
         row-path schema to a single hop at the dispatch site.
      3. Calls ec.Query(ctx, opts) to obtain the storage result, then
         wraps it as a vec PullOperator (BatchSourceFromBatchResult fast
         path when available; BatchScan fallback) and installs it on the
         leaf Scan node.
      4. Executes the plan via plan.Execute (G8c) and returns the iterator.
    
    Returns (iter, planStr, true, nil) when handled; the caller MUST return
    that iterator and skip the row plan. Returns (nil, "", false, nil) when
    the request is NOT eligible — the caller should fall through. Returns
    (nil, "", true, err) when the request was eligible but execution
    failed; the caller surfaces the error rather than re-trying the row
    path.
    
    Eligibility gate (v1):
      - cfg.Enabled must be true
      - request must NOT carry GroupBy or Agg (column-type bridging at the
        scan source still pending; see executor.go's TODO(G8d))
      - request must NOT carry Top (BatchTop's single-heap semantic differs
        from the row path's per-timestamp top-N)
      - request must carry TimeRange
      - request must NOT have hidden criteria tags (those need an egress
        strip wrapper that v1 dispatch does not yet implement)
      - runtime context must be non-nil
    
    Tests cover the disabled / GroupBy / Agg / Top / no-TimeRange /
    nil-runtime-ctx gates (the hidden-criteria-tag gate is not yet
    exercised), plus the eligibility path through index.Query
    construction with an empty-result fakeEC (must fall through to row) and
    the ec.Query error propagation (must report handled=true so the caller
    does not double-execute on the row path).
    
    The processor.go wire-up follows in the next commit.
---
 pkg/query/vectorized/measure/plan/dispatch.go      | 226 ++++++++++++++++++++
 pkg/query/vectorized/measure/plan/dispatch_test.go | 230 +++++++++++++++++++++
 2 files changed, 456 insertions(+)

diff --git a/pkg/query/vectorized/measure/plan/dispatch.go b/pkg/query/vectorized/measure/plan/dispatch.go
new file mode 100644
index 000000000..a57228050
--- /dev/null
+++ b/pkg/query/vectorized/measure/plan/dispatch.go
@@ -0,0 +1,226 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package plan
+
+import (
+       "context"
+       "fmt"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/index"
+       "github.com/apache/skywalking-banyandb/pkg/index/inverted"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/executor"
+       "github.com/apache/skywalking-banyandb/pkg/query/logical"
+       "github.com/apache/skywalking-banyandb/pkg/query/model"
+       "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+       measure 
"github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+// Dispatch is the G8d top-level entry into the vec measure subsystem.
+//
+// Called from banyand/query/processor.go before the row-path Analyze runs.
+// When the request is eligible for the vec subsystem, Dispatch:
+//
+//  1. Analyzes the request into a VecPlan via plan.Analyze (G8b)
+//  2. Resolves the index.Query + entity table the storage layer needs
+//     (using inverted.BuildQuery / BuildIndexModeQuery — the same helpers
+//     the deprecated row path uses; the logical.Schema parameter threads
+//     through unchanged)
+//  3. Calls ec.Query(ctx, opts) to obtain the MeasureQueryResult
+//  4. Wraps the result as a vec PullOperator (BatchSourceFromBatchResult
+//     fast path when available; BatchScan fallback otherwise) and installs
+//     it on the leaf Scan node
+//  5. Executes the plan via plan.Execute (G8c) to return an MIterator
+//
+// Returns (iter, planStr, true, nil) when the request is handled; the
+// caller MUST return that iterator and skip the row plan. Returns
+// (nil, "", false, nil) when the request is NOT eligible — the caller
+// should fall through to the row path. Returns (nil, "", true, err)
+// when the request was eligible but execution failed; the caller must
+// surface the error rather than fall through (the storage query may
+// have already touched state).
+//
+// Eligibility gate (v1):
+//   - cfg.Enabled must be true
+//   - request must NOT carry GroupBy or Agg (column-type bridging at the
+//     scan source still pending; see executor.go's TODO(G8d))
+//   - request must NOT carry Top (BatchTop's single-heap semantic differs
+//     from the row path's per-timestamp top-N)
+//   - request must carry TimeRange (storage requires a bounded window)
+//   - request must NOT have hidden criteria tags (those need an egress
+//     strip wrapper that v1 dispatch does not implement)
+//   - measureSchema and logicalSchema must be non-nil
+func Dispatch(
+       ctx context.Context,
+       req *measurev1.QueryRequest,
+       metadata *commonv1.Metadata,
+       measureSchema *databasev1.Measure,
+       logicalSchema logical.Schema,
+       ec executor.MeasureExecutionContext,
+       cfg measure.VectorizedConfig,
+) (executor.MIterator, string, bool, error) {
+       if !cfg.Enabled {
+               return nil, "", false, nil
+       }
+       if req == nil {
+               return nil, "", false, nil
+       }
+       if req.GetGroupBy() != nil || req.GetAgg() != nil || req.GetTop() != 
nil {
+               // G8d.2 will lift GroupBy/Agg once the scan-source column type
+               // bridging is in place. Top awaits per-timestamp partitioning 
of
+               // BatchTop.
+               return nil, "", false, nil
+       }
+       if req.GetTimeRange() == nil {
+               return nil, "", false, nil
+       }
+       // Defensive nil guards on the runtime context. These should not fire
+       // in production paths — buildMeasureContext populates all of them —
+       // but a defensive fallthrough is safer than a nil dereference.
+       if measureSchema == nil || logicalSchema == nil || ec == nil || 
metadata == nil {
+               return nil, "", false, nil
+       }
+
+       // Hidden-tag detection: criteria may reference tags that are NOT in
+       // the projection (they're needed only as filter inputs). The row
+       // path strips them at egress via hiddenTagsMIterator. v1 dispatch
+       // does not implement that strip yet, so fall through when present.
+       projectedTagNames := projectedNames(req.GetTagProjection())
+       entityList := logicalSchema.EntityList()
+       entityMap := make(map[string]int, len(entityList))
+       entity := make([]*modelv1.TagValue, len(entityList))
+       for idx, e := range entityList {
+               entityMap[e] = idx
+               entity[idx] = pbv1.AnyTagValue
+       }
+       familyNames := make([]string, 0, len(measureSchema.GetTagFamilies()))
+       for _, tf := range measureSchema.GetTagFamilies() {
+               familyNames = append(familyNames, tf.GetName())
+       }
+       hidden, _ := logical.CollectHiddenCriteriaTags(
+               req.GetCriteria(), projectedTagNames, entityMap, logicalSchema,
+               func() []string { return familyNames },
+       )
+       if !hidden.IsEmpty() {
+               return nil, "", false, nil
+       }
+
+       // Resolve the index.Query + entities the same way the row path does
+       // in unresolvedIndexScan.Analyze.
+       var query index.Query
+       var entities [][]*modelv1.TagValue
+       var qErr error
+       if measureSchema.GetIndexMode() {
+               query, qErr = inverted.BuildIndexModeQuery(metadata.GetName(), 
req.GetCriteria(), logicalSchema)
+       } else {
+               query, entities, _, qErr = 
inverted.BuildQuery(req.GetCriteria(), logicalSchema, entityMap, entity)
+       }
+       if qErr != nil {
+               return nil, "", true, fmt.Errorf("vec dispatch: build query: 
%w", qErr)
+       }
+
+       // Build the structural plan tree.
+       p, analyzeErr := Analyze(req, measureSchema)
+       if analyzeErr != nil {
+               return nil, "", true, fmt.Errorf("vec dispatch: analyze: %w", 
analyzeErr)
+       }
+       scan := locateScan(p)
+       if scan == nil {
+               return nil, "", true, fmt.Errorf("vec dispatch: plan missing 
Scan node")
+       }
+       tr := timestamp.NewInclusiveTimeRange(
+               req.GetTimeRange().GetBegin().AsTime(),
+               req.GetTimeRange().GetEnd().AsTime(),
+       )
+       scan.Params.TimeRange = &tr
+       scan.Params.Query = query
+       scan.Params.Entities = entities
+
+       // Execute the storage query. The vec source is constructed from the
+       // returned MeasureQueryResult and threaded into the Scan node.
+       opts := model.MeasureQueryOptions{
+               Name:            metadata.GetName(),
+               TimeRange:       scan.Params.TimeRange,
+               Entities:        entities,
+               Query:           query,
+               TagProjection:   scan.Params.TagProjection,
+               FieldProjection: scan.Params.FieldProjection,
+       }
+       result, queryErr := ec.Query(ctx, opts)
+       if queryErr != nil {
+               return nil, "", true, fmt.Errorf("vec dispatch: query measure: 
%w", queryErr)
+       }
+       if result == nil {
+               // Match the row path's typed-nil handling: an empty query 
result
+               // flows through the row iterator as a no-op. Falling back lets
+               // that machinery surface the empty response unchanged.
+               return nil, "", false, nil
+       }
+
+       pool := vectorized.NewBatchPool(scan.BatchSchema, cfg.BatchSize)
+       var source vectorized.PullOperator
+       if br, ok := result.(model.MeasureBatchResult); ok {
+               source = measure.NewBatchSourceFromBatchResult(br, 
scan.BatchSchema, pool, cfg.BatchSize)
+       } else {
+               source = measure.NewBatchScan(result, scan.BatchSchema, pool, 
cfg.BatchSize)
+       }
+       scan.Source = source
+
+       iter, execErr := Execute(ctx, p, cfg)
+       if execErr != nil {
+               // Execute closes the pipeline on Build/Init failure, which
+               // closes the source, which releases result. No extra Release
+               // here.
+               return nil, "", true, fmt.Errorf("vec dispatch: execute: %w", 
execErr)
+       }
+       return iter, p.String(), true, nil
+}
+
+// locateScan walks a vec plan tree to find the leaf Scan node. Today there
+// is exactly one Scan per plan (multi-measure merge is a G8 follow-up).
+func locateScan(p VecPlan) *Scan {
+       if s, ok := p.(*Scan); ok {
+               return s
+       }
+       for _, c := range p.Children() {
+               if s := locateScan(c); s != nil {
+                       return s
+               }
+       }
+       return nil
+}
+
+// projectedNames flattens a TagProjection into the {tagName -> struct{}}
+// set used by logical.CollectHiddenCriteriaTags.
+func projectedNames(tp *modelv1.TagProjection) map[string]struct{} {
+       out := make(map[string]struct{})
+       if tp == nil {
+               return out
+       }
+       for _, tf := range tp.GetTagFamilies() {
+               for _, t := range tf.GetTags() {
+                       out[t] = struct{}{}
+               }
+       }
+       return out
+}
diff --git a/pkg/query/vectorized/measure/plan/dispatch_test.go b/pkg/query/vectorized/measure/plan/dispatch_test.go
new file mode 100644
index 000000000..ea5f9ae99
--- /dev/null
+++ b/pkg/query/vectorized/measure/plan/dispatch_test.go
@@ -0,0 +1,230 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package plan
+
+import (
+       "context"
+       "testing"
+       "time"
+
+       "google.golang.org/protobuf/types/known/timestamppb"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       logicalmeasure 
"github.com/apache/skywalking-banyandb/pkg/query/logical/measure"
+       "github.com/apache/skywalking-banyandb/pkg/query/model"
+       measure 
"github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure"
+)
+
+func dispatchCfg(enabled bool) measure.VectorizedConfig {
+       return measure.VectorizedConfig{Enabled: enabled, BatchSize: 1024, 
QueryMemoryMiB: 16}
+}
+
+func bareReq() *measurev1.QueryRequest {
+       return &measurev1.QueryRequest{
+               Name:            "demo",
+               Groups:          []string{"default"},
+               TagProjection:   projTagProj(),
+               FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: 
[]string{"value"}},
+               TimeRange: &modelv1.TimeRange{
+                       Begin: timestamppb.New(time.Unix(0, 0)),
+                       End:   timestamppb.New(time.Unix(0, 1_000_000)),
+               },
+       }
+}
+
+// TestDispatch_NotEnabled_FallsThrough verifies cfg.Enabled=false returns
+// (nil, "", false, nil) immediately, before any other check.
+func TestDispatch_NotEnabled_FallsThrough(t *testing.T) {
+       iter, planStr, handled, err := Dispatch(context.Background(),
+               bareReq(), nil, nil, nil, nil, dispatchCfg(false))
+       if err != nil {
+               t.Fatalf("disabled config should not error: %v", err)
+       }
+       if handled {
+               t.Fatal("disabled config must not handle the request")
+       }
+       if iter != nil || planStr != "" {
+               t.Fatalf("disabled config: iter/planStr must be zero, got %v / 
%q", iter, planStr)
+       }
+}
+
+// TestDispatch_GroupBy_FallsThrough covers the column-bridging gate.
+func TestDispatch_GroupBy_FallsThrough(t *testing.T) {
+       req := bareReq()
+       req.GroupBy = &measurev1.QueryRequest_GroupBy{
+               TagProjection: projTagProj(),
+               FieldName:     "value",
+       }
+       req.Agg = &measurev1.QueryRequest_Aggregation{
+               Function:  modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM,
+               FieldName: "value",
+       }
+       _, _, handled, err := Dispatch(context.Background(),
+               req, nil, nil, nil, nil, dispatchCfg(true))
+       if err != nil {
+               t.Fatalf("GroupBy fallthrough must not error: %v", err)
+       }
+       if handled {
+               t.Fatal("GroupBy+Agg must fall through to row path in G8d.1")
+       }
+}
+
+// TestDispatch_Agg_FallsThrough covers Agg-without-GroupBy (which the
+// analyzer would reject, but the dispatch gate fires before Analyze).
+func TestDispatch_Agg_FallsThrough(t *testing.T) {
+       req := bareReq()
+       req.Agg = &measurev1.QueryRequest_Aggregation{
+               Function:  modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM,
+               FieldName: "value",
+       }
+       _, _, handled, err := Dispatch(context.Background(),
+               req, nil, nil, nil, nil, dispatchCfg(true))
+       if err != nil {
+               t.Fatalf("Agg fallthrough must not error: %v", err)
+       }
+       if handled {
+               t.Fatal("Agg must fall through to row path in G8d.1")
+       }
+}
+
+// TestDispatch_Top_FallsThrough covers the per-timestamp top-N gap.
+func TestDispatch_Top_FallsThrough(t *testing.T) {
+       req := bareReq()
+       req.Top = &measurev1.QueryRequest_Top{
+               Number:         5,
+               FieldName:      "value",
+               FieldValueSort: modelv1.Sort_SORT_DESC,
+       }
+       _, _, handled, err := Dispatch(context.Background(),
+               req, nil, nil, nil, nil, dispatchCfg(true))
+       if err != nil {
+               t.Fatalf("Top fallthrough must not error: %v", err)
+       }
+       if handled {
+               t.Fatal("Top must fall through (BatchTop semantics differ from 
row TopN)")
+       }
+}
+
+// TestDispatch_NoTimeRange_FallsThrough covers the bounded-window
+// requirement.
+func TestDispatch_NoTimeRange_FallsThrough(t *testing.T) {
+       req := bareReq()
+       req.TimeRange = nil
+       _, _, handled, err := Dispatch(context.Background(),
+               req, nil, nil, nil, nil, dispatchCfg(true))
+       if err != nil {
+               t.Fatalf("no-TimeRange fallthrough must not error: %v", err)
+       }
+       if handled {
+               t.Fatal("missing TimeRange must fall through")
+       }
+}
+
+// TestDispatch_NilRuntimeContext_FallsThrough covers the defensive guard
+// against nil ec / schema / metadata. These should not arise in
+// production but a fallthrough is safer than a nil dereference.
+func TestDispatch_NilRuntimeContext_FallsThrough(t *testing.T) {
+       _, _, handled, err := Dispatch(context.Background(),
+               bareReq(), nil, nil, nil, nil, dispatchCfg(true))
+       if err != nil {
+               t.Fatalf("nil runtime ctx must not error, got %v", err)
+       }
+       if handled {
+               t.Fatal("nil runtime ctx must fall through")
+       }
+}
+
+// fakeEC is a stub MeasureExecutionContext that records its Query call
+// and returns a configured (result, error) pair.
+type fakeEC struct {
+       wantResult model.MeasureQueryResult
+       wantErr    error
+       lastOpts   model.MeasureQueryOptions
+       called     bool
+}
+
+func (f *fakeEC) Query(_ context.Context, opts model.MeasureQueryOptions) 
(model.MeasureQueryResult, error) {
+       f.called = true
+       f.lastOpts = opts
+       return f.wantResult, f.wantErr
+}
+
+// TestDispatch_EmptyResult_FallsThrough exercises the full eligibility
+// path: an eligible request reaches ec.Query, ec returns (nil, nil)
+// (empty range), Dispatch reports fallthrough so the row path can surface
+// the empty response. This also confirms the index.Query construction
+// and Analyze invocation complete without error against a real
+// logical.Schema.
+func TestDispatch_EmptyResult_FallsThrough(t *testing.T) {
+       measureSchema := testMeasureSchema()
+       // nolint:staticcheck // SA1019 — row-path BuildSchema is the only 
schema builder until G8 replaces it.
+       logicalSchema, schemaErr := logicalmeasure.BuildSchema(measureSchema, 
nil)
+       if schemaErr != nil {
+               t.Fatalf("BuildSchema: %v", schemaErr)
+       }
+       metadata := &commonv1.Metadata{Name: "demo", Group: "default"}
+       ec := &fakeEC{wantResult: nil, wantErr: nil}
+
+       iter, planStr, handled, err := Dispatch(context.Background(),
+               bareReq(), metadata, measureSchema, logicalSchema, ec, 
dispatchCfg(true))
+       if err != nil {
+               t.Fatalf("dispatch must not error on empty result: %v", err)
+       }
+       if handled {
+               t.Fatal("empty result must fall through to row path")
+       }
+       if iter != nil || planStr != "" {
+               t.Fatalf("expect zero outputs on fallthrough, got iter=%v 
planStr=%q", iter, planStr)
+       }
+       if !ec.called {
+               t.Fatal("ec.Query must be invoked before fallthrough decision")
+       }
+       if ec.lastOpts.Name != "demo" {
+               t.Fatalf("opts.Name: want demo, got %q", ec.lastOpts.Name)
+       }
+       if ec.lastOpts.TimeRange == nil {
+               t.Fatal("opts.TimeRange must be set from req.TimeRange")
+       }
+}
+
+// TestDispatch_QueryError_BubblesUp covers the error propagation when
+// the storage query itself fails. Dispatch must report (nil, "", true,
+// err) so the caller surfaces the error rather than re-trying the row
+// path.
+func TestDispatch_QueryError_BubblesUp(t *testing.T) {
+       measureSchema := testMeasureSchema()
+       // nolint:staticcheck // SA1019 — row-path BuildSchema is the only 
schema builder until G8 replaces it.
+       logicalSchema, schemaErr := logicalmeasure.BuildSchema(measureSchema, 
nil)
+       if schemaErr != nil {
+               t.Fatalf("BuildSchema: %v", schemaErr)
+       }
+       metadata := &commonv1.Metadata{Name: "demo", Group: "default"}
+       wantErr := context.DeadlineExceeded
+       ec := &fakeEC{wantErr: wantErr}
+
+       _, _, handled, err := Dispatch(context.Background(),
+               bareReq(), metadata, measureSchema, logicalSchema, ec, 
dispatchCfg(true))
+       if err == nil {
+               t.Fatal("ec.Query error must surface as a dispatch error")
+       }
+       if !handled {
+               t.Fatal("ec.Query error must report handled=true so caller does 
not re-try row path")
+       }
+}

Reply via email to