This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit cf44971fd1beecea33e6572b8d1942f3a64f8229 Author: Hongtao Gao <[email protected]> AuthorDate: Wed May 13 13:59:32 2026 +0000 feat(banyand/query): wire vec dispatch into executeMeasurePlan (G8d.2) executeMeasurePlan now tries plan.Dispatch (G8d.1) before invoking the deprecated row-path Analyze. When dispatch returns handled=true, the iterator is returned directly and the row plan is skipped — this is the structural goal of the G8 plan: the vec subsystem owns its eligible queries end-to-end, no leaf substitution. Plumbing: - executeMeasurePlan now returns (MIterator, string, error) instead of (MIterator, logical.Plan, error). The string is the rendered plan used for tracing — abstracted from logical.Plan so the vec subsystem (which does not produce one) can participate. - A small tryVecDispatch adapter extracts the optional VectorizedConfig + *databasev1.Measure capabilities from the measureExecutionContext's first ec via the new vecExecutionContext interface. Multi-measure queries fall through to row (G8 follow-up). - Both call sites of executeMeasurePlan (the QueryRequest handler at the standalone path and the InternalQueryRequest handler at the data-node path) use the new planStr return value for tracing in place of plan.String(). Eligibility for vec dispatch is gated inside plan.Dispatch — the wire-up here just exposes the capability and falls through on rejection. GroupBy/Agg/Top queries continue through the row path unchanged. Build + lint + tests clean. 
--- banyand/query/processor.go | 68 ++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 60 insertions(+), 8 deletions(-) diff --git a/banyand/query/processor.go b/banyand/query/processor.go index fe9ad7f1f..e42b1b1aa 100644 --- a/banyand/query/processor.go +++ b/banyand/query/processor.go @@ -28,6 +28,7 @@ import ( "github.com/apache/skywalking-banyandb/api/common" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" @@ -46,6 +47,8 @@ import ( logical_stream "github.com/apache/skywalking-banyandb/pkg/query/logical/stream" logical_trace "github.com/apache/skywalking-banyandb/pkg/query/logical/trace" "github.com/apache/skywalking-banyandb/pkg/query/model" + vmeasure "github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure" + vecplan "github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure/plan" ) const ( @@ -203,17 +206,47 @@ func buildMeasureContext(measureService measure.Service, log *logger.Logger, que }, nil } +// vecExecutionContext is the optional capability the production +// MeasureExecutionContext implementation exposes to surface the active +// VectorizedConfig and the *databasev1.Measure schema needed by the vec +// dispatch. Production satisfies it; tests that swap the row executor +// can leave it unimplemented to keep dispatch dormant. +type vecExecutionContext interface { + executor.MeasureExecutionContext + VectorizedConfig() vmeasure.VectorizedConfig + GetSchema() *databasev1.Measure +} + // executeMeasurePlan executes the measure query plan and returns the iterator. +// +// G8d: the vec subsystem is tried first via vecplan.Dispatch. 
When the +// request is vec-eligible (no GroupBy/Agg/Top, has TimeRange, no hidden +// criteria tags), dispatch returns a vec MIterator and the row-path +// Analyze is skipped entirely. Otherwise, control flows through to the +// deprecated row plan unchanged. +// +// The second return value is the rendered plan string used by the caller +// for tracing — abstracted to a string so the vec subsystem (which does +// not produce a logical.Plan) can participate. func executeMeasurePlan( ctx context.Context, queryCriteria *measurev1.QueryRequest, mctx *measureExecutionContext, emitPartial bool, -) (executor.MIterator, logical.Plan, error) { +) (executor.MIterator, string, error) { + if mit, planStr, handled, dispatchErr := tryVecDispatch(ctx, queryCriteria, mctx); dispatchErr != nil { + return nil, "", fmt.Errorf("fail to dispatch the query request for measure %s: %w", queryCriteria.GetName(), dispatchErr) + } else if handled { + if e := mctx.ml.Debug(); e.Enabled() { + e.Str("plan", planStr).Msg("vec query plan") + } + return mit, planStr, nil + } + // nolint:staticcheck // SA1019 — row-path Analyze is the only production path until G8 ships. 
plan, planErr := logical_measure.Analyze(queryCriteria, mctx.metadata, mctx.schemas, mctx.ecc, emitPartial) if planErr != nil { - return nil, nil, fmt.Errorf("fail to analyze the query request for measure %s: %w", queryCriteria.GetName(), planErr) + return nil, "", fmt.Errorf("fail to analyze the query request for measure %s: %w", queryCriteria.GetName(), planErr) } if e := mctx.ml.Debug(); e.Enabled() { e.Str("plan", plan.String()).Msg("query plan") @@ -221,9 +254,28 @@ func executeMeasurePlan( mIterator, execErr := plan.(executor.MeasureExecutable).Execute(ctx) if execErr != nil { mctx.ml.Error().Err(execErr).RawJSON("req", logger.Proto(queryCriteria)).Msg("fail to query") - return nil, nil, fmt.Errorf("fail to execute the query plan for measure %s: %w", queryCriteria.GetName(), execErr) + return nil, "", fmt.Errorf("fail to execute the query plan for measure %s: %w", queryCriteria.GetName(), execErr) } - return mIterator, plan, nil + return mIterator, plan.String(), nil +} + +// tryVecDispatch is the thin adapter from measureExecutionContext into +// the vec dispatch inputs. Multi-measure queries are not yet wired (G8 +// follow-up), so they fall through to the row path. +func tryVecDispatch( + ctx context.Context, + queryCriteria *measurev1.QueryRequest, + mctx *measureExecutionContext, +) (executor.MIterator, string, bool, error) { + if len(mctx.ecc) != 1 { + return nil, "", false, nil + } + vec, ok := mctx.ecc[0].(vecExecutionContext) + if !ok { + return nil, "", false, nil + } + return vecplan.Dispatch(ctx, queryCriteria, mctx.metadata[0], vec.GetSchema(), + mctx.schemas[0], vec, vec.VectorizedConfig()) } // collectInternalDataPoints collects InternalDataPoints from the iterator. 
@@ -269,7 +321,7 @@ func (p *measureQueryProcessor) executeQuery(ctx context.Context, queryCriteria e.RawJSON("req", logger.Proto(queryCriteria)).Msg("received a query event") } - mIterator, plan, execErr := executeMeasurePlan(ctx, queryCriteria, mctx, false) + mIterator, planStr, execErr := executeMeasurePlan(ctx, queryCriteria, mctx, false) if execErr != nil { resp = bus.NewMessage(bus.MessageID(now), common.NewError("%v", execErr)) return @@ -285,7 +337,7 @@ func (p *measureQueryProcessor) executeQuery(ctx context.Context, queryCriteria if queryCriteria.Trace { tracer, ctx = query.NewTracer(ctx, n.Format(time.RFC3339Nano)) span, ctx = tracer.StartSpan(ctx, "data-%s", p.queryService.nodeID) - span.Tag("plan", plan.String()) + span.Tag("plan", planStr) defer func() { data := resp.Data() switch d := data.(type) { @@ -373,7 +425,7 @@ func (p *measureInternalQueryProcessor) Rev(ctx context.Context, message bus.Mes e.RawJSON("req", logger.Proto(queryCriteria)).Msg("received an internal query event") } - mIterator, plan, execErr := executeMeasurePlan(ctx, queryCriteria, mctx, internalRequest.GetAggReturnPartial()) + mIterator, planStr, execErr := executeMeasurePlan(ctx, queryCriteria, mctx, internalRequest.GetAggReturnPartial()) if execErr != nil { resp = bus.NewMessage(bus.MessageID(now), common.NewError("%v", execErr)) return @@ -389,7 +441,7 @@ func (p *measureInternalQueryProcessor) Rev(ctx context.Context, message bus.Mes if queryCriteria.Trace { tracer, ctx = query.NewTracer(ctx, n.Format(time.RFC3339Nano)) span, _ = tracer.StartSpan(ctx, "data-%s", p.queryService.nodeID) - span.Tag("plan", plan.String()) + span.Tag("plan", planStr) defer func() { respData := resp.Data() switch d := respData.(type) {
