This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit a59dabeef53d7daaac7711e8feaf536f820b75ac Author: Hongtao Gao <[email protected]> AuthorDate: Thu May 14 09:17:42 2026 +0000 fix(banyand/query): fall through to row path on emitPartial GroupBy+Agg requests In distributed mode, the data-node processor calls executeMeasurePlan with emitPartial=true for GroupBy+Agg requests, so that the row path emits Map-mode partial results (sum + count for MEAN) that the liaison can Reduce. The vec subsystem implements only AggModeAll (single-node full reduce); when vec dispatch handled the data-node call, each shard emitted a final aggregation, and the liaison double-counted those results as if they were partials. The distributed/multi_segments/Scanning Measures suite hit this in nine specs covering MEAN groups and top/bottom by id (which use GroupBy+Agg under the hood). Standalone parity was unaffected because executeMeasurePlan runs there with emitPartial=false. tryVecDispatch now declines any request that carries a GroupBy or an Agg when emitPartial is true. Implementing AggModeMap is post-v1 work and would let vec take this path too. 
via [HAPI](https://hapi.run) --- banyand/query/processor.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/banyand/query/processor.go b/banyand/query/processor.go index e42b1b1aa..7ef4477a1 100644 --- a/banyand/query/processor.go +++ b/banyand/query/processor.go @@ -234,7 +234,7 @@ func executeMeasurePlan( mctx *measureExecutionContext, emitPartial bool, ) (executor.MIterator, string, error) { - if mit, planStr, handled, dispatchErr := tryVecDispatch(ctx, queryCriteria, mctx); dispatchErr != nil { + if mit, planStr, handled, dispatchErr := tryVecDispatch(ctx, queryCriteria, mctx, emitPartial); dispatchErr != nil { return nil, "", fmt.Errorf("fail to dispatch the query request for measure %s: %w", queryCriteria.GetName(), dispatchErr) } else if handled { if e := mctx.ml.Debug(); e.Enabled() { @@ -262,14 +262,25 @@ func executeMeasurePlan( // tryVecDispatch is the thin adapter from measureExecutionContext into // the vec dispatch inputs. Multi-measure queries are not yet wired (G8 // follow-up), so they fall through to the row path. +// +// Distributed Map-mode GroupBy+Agg requests (emitPartial=true) also fall +// through: the vec subsystem only implements AggModeAll (single-node full +// reduce), so emitting AggModeAll results on each data node and letting +// the liaison naively merge them would double-count groups. The row path +// handles the Map/Reduce split via measure_plan_aggregation.go's +// emitPartial flag. func tryVecDispatch( ctx context.Context, queryCriteria *measurev1.QueryRequest, mctx *measureExecutionContext, + emitPartial bool, ) (executor.MIterator, string, bool, error) { if len(mctx.ecc) != 1 { return nil, "", false, nil } + if emitPartial && (queryCriteria.GetGroupBy() != nil || queryCriteria.GetAgg() != nil) { + return nil, "", false, nil + } vec, ok := mctx.ecc[0].(vecExecutionContext) if !ok { return nil, "", false, nil
