This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch vectorized-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit cf44971fd1beecea33e6572b8d1942f3a64f8229
Author: Hongtao Gao <[email protected]>
AuthorDate: Wed May 13 13:59:32 2026 +0000

    feat(banyand/query): wire vec dispatch into executeMeasurePlan (G8d.2)
    
    executeMeasurePlan now tries plan.Dispatch (G8d.1) before invoking the
    deprecated row-path Analyze. When dispatch returns handled=true, the
    iterator is returned directly and the row plan is skipped — this is
    the structural goal of the G8 plan: the vec subsystem owns its eligible
    queries end-to-end, no leaf substitution.
    
    Plumbing:
    
      - executeMeasurePlan now returns (MIterator, string, error) instead
        of (MIterator, logical.Plan, error). The string is the rendered
        plan used for tracing — abstracted from logical.Plan so the vec
        subsystem (which does not produce one) can participate.
    
      - A small tryVecDispatch adapter extracts the optional
        VectorizedConfig + *databasev1.Measure capabilities from the
        measureExecutionContext's first ec via the new vecExecutionContext
        interface. Multi-measure queries fall through to row (G8 follow-up).
    
      - Both call sites of executeMeasurePlan (the QueryRequest handler at
        the standalone path and the InternalQueryRequest handler at the
        data-node path) use the new planStr return value for tracing in
        place of plan.String().
    
    Eligibility for vec dispatch is gated inside plan.Dispatch — the wire-
    up here just exposes the capability and falls through on rejection.
    GroupBy/Agg/Top queries continue through the row path unchanged.
    
    Build + lint + tests clean.
---
 banyand/query/processor.go | 68 ++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 60 insertions(+), 8 deletions(-)

diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index fe9ad7f1f..e42b1b1aa 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -28,6 +28,7 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/common"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
@@ -46,6 +47,8 @@ import (
        logical_stream 
"github.com/apache/skywalking-banyandb/pkg/query/logical/stream"
        logical_trace 
"github.com/apache/skywalking-banyandb/pkg/query/logical/trace"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
+       vmeasure 
"github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure"
+       vecplan 
"github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure/plan"
 )
 
 const (
@@ -203,17 +206,47 @@ func buildMeasureContext(measureService measure.Service, 
log *logger.Logger, que
        }, nil
 }
 
+// vecExecutionContext is the optional capability the production
+// MeasureExecutionContext implementation exposes to surface the active
+// VectorizedConfig and the *databasev1.Measure schema needed by the vec
+// dispatch. Production satisfies it; tests that swap the row executor
+// can leave it unimplemented to keep dispatch dormant.
+type vecExecutionContext interface {
+       executor.MeasureExecutionContext
+       VectorizedConfig() vmeasure.VectorizedConfig
+       GetSchema() *databasev1.Measure
+}
+
 // executeMeasurePlan executes the measure query plan and returns the iterator.
+//
+// G8d: the vec subsystem is tried first via vecplan.Dispatch. When the
+// request is vec-eligible (no GroupBy/Agg/Top, has TimeRange, no hidden
+// criteria tags), dispatch returns a vec MIterator and the row-path
+// Analyze is skipped entirely. Otherwise, control flows through to the
+// deprecated row plan unchanged.
+//
+// The second return value is the rendered plan string used by the caller
+// for tracing — abstracted to a string so the vec subsystem (which does
+// not produce a logical.Plan) can participate.
 func executeMeasurePlan(
        ctx context.Context,
        queryCriteria *measurev1.QueryRequest,
        mctx *measureExecutionContext,
        emitPartial bool,
-) (executor.MIterator, logical.Plan, error) {
+) (executor.MIterator, string, error) {
+       if mit, planStr, handled, dispatchErr := tryVecDispatch(ctx, 
queryCriteria, mctx); dispatchErr != nil {
+               return nil, "", fmt.Errorf("fail to dispatch the query request 
for measure %s: %w", queryCriteria.GetName(), dispatchErr)
+       } else if handled {
+               if e := mctx.ml.Debug(); e.Enabled() {
+                       e.Str("plan", planStr).Msg("vec query plan")
+               }
+               return mit, planStr, nil
+       }
+
        // nolint:staticcheck // SA1019 — row-path Analyze is the only 
production path until G8 ships.
        plan, planErr := logical_measure.Analyze(queryCriteria, mctx.metadata, 
mctx.schemas, mctx.ecc, emitPartial)
        if planErr != nil {
-               return nil, nil, fmt.Errorf("fail to analyze the query request 
for measure %s: %w", queryCriteria.GetName(), planErr)
+               return nil, "", fmt.Errorf("fail to analyze the query request 
for measure %s: %w", queryCriteria.GetName(), planErr)
        }
        if e := mctx.ml.Debug(); e.Enabled() {
                e.Str("plan", plan.String()).Msg("query plan")
@@ -221,9 +254,28 @@ func executeMeasurePlan(
        mIterator, execErr := plan.(executor.MeasureExecutable).Execute(ctx)
        if execErr != nil {
                mctx.ml.Error().Err(execErr).RawJSON("req", 
logger.Proto(queryCriteria)).Msg("fail to query")
-               return nil, nil, fmt.Errorf("fail to execute the query plan for 
measure %s: %w", queryCriteria.GetName(), execErr)
+               return nil, "", fmt.Errorf("fail to execute the query plan for 
measure %s: %w", queryCriteria.GetName(), execErr)
        }
-       return mIterator, plan, nil
+       return mIterator, plan.String(), nil
+}
+
+// tryVecDispatch is the thin adapter from measureExecutionContext into
+// the vec dispatch inputs. Multi-measure queries are not yet wired (G8
+// follow-up), so they fall through to the row path.
+func tryVecDispatch(
+       ctx context.Context,
+       queryCriteria *measurev1.QueryRequest,
+       mctx *measureExecutionContext,
+) (executor.MIterator, string, bool, error) {
+       if len(mctx.ecc) != 1 {
+               return nil, "", false, nil
+       }
+       vec, ok := mctx.ecc[0].(vecExecutionContext)
+       if !ok {
+               return nil, "", false, nil
+       }
+       return vecplan.Dispatch(ctx, queryCriteria, mctx.metadata[0], 
vec.GetSchema(),
+               mctx.schemas[0], vec, vec.VectorizedConfig())
 }
 
 // collectInternalDataPoints collects InternalDataPoints from the iterator.
@@ -269,7 +321,7 @@ func (p *measureQueryProcessor) executeQuery(ctx 
context.Context, queryCriteria
                e.RawJSON("req", logger.Proto(queryCriteria)).Msg("received a 
query event")
        }
 
-       mIterator, plan, execErr := executeMeasurePlan(ctx, queryCriteria, 
mctx, false)
+       mIterator, planStr, execErr := executeMeasurePlan(ctx, queryCriteria, 
mctx, false)
        if execErr != nil {
                resp = bus.NewMessage(bus.MessageID(now), common.NewError("%v", 
execErr))
                return
@@ -285,7 +337,7 @@ func (p *measureQueryProcessor) executeQuery(ctx 
context.Context, queryCriteria
        if queryCriteria.Trace {
                tracer, ctx = query.NewTracer(ctx, n.Format(time.RFC3339Nano))
                span, ctx = tracer.StartSpan(ctx, "data-%s", 
p.queryService.nodeID)
-               span.Tag("plan", plan.String())
+               span.Tag("plan", planStr)
                defer func() {
                        data := resp.Data()
                        switch d := data.(type) {
@@ -373,7 +425,7 @@ func (p *measureInternalQueryProcessor) Rev(ctx 
context.Context, message bus.Mes
                e.RawJSON("req", logger.Proto(queryCriteria)).Msg("received an 
internal query event")
        }
 
-       mIterator, plan, execErr := executeMeasurePlan(ctx, queryCriteria, 
mctx, internalRequest.GetAggReturnPartial())
+       mIterator, planStr, execErr := executeMeasurePlan(ctx, queryCriteria, 
mctx, internalRequest.GetAggReturnPartial())
        if execErr != nil {
                resp = bus.NewMessage(bus.MessageID(now), common.NewError("%v", 
execErr))
                return
@@ -389,7 +441,7 @@ func (p *measureInternalQueryProcessor) Rev(ctx 
context.Context, message bus.Mes
        if queryCriteria.Trace {
                tracer, ctx = query.NewTracer(ctx, n.Format(time.RFC3339Nano))
                span, _ = tracer.StartSpan(ctx, "data-%s", 
p.queryService.nodeID)
-               span.Tag("plan", plan.String())
+               span.Tag("plan", planStr)
                defer func() {
                        respData := resp.Data()
                        switch d := respData.(type) {

Reply via email to