This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 348c09c297d07d633870d3e18f99d6dc63cde7a3 Author: Hongtao Gao <[email protected]> AuthorDate: Thu May 14 04:39:20 2026 +0000 fix(query/vectorized/measure/plan): thread GroupBy/Agg into Scan schema so vec/storage column types agree (G8d.3) plan.Analyze built scan.BatchSchema without GroupBy/Agg context, so the operator-facing schema declared passthrough TagValue/FieldValue columns even though storage rebuilt result.batchSchema with native typed columns for the agg-relevant keys/field. The mismatch surfaced as a hard type-assert panic in BatchAggregation.fold (TypedColumn[int64] against a passthrough source) or, with storage forced to match, a TypedColumn[T] mismatch in BatchSourceFromBatchResult.appendColumnRange. Now both halves of the bridge derive their schema from the same opts. ScanParams.GroupBy/Agg carries the analyzer's translated specs through to dispatch's ec.Query call. via [HAPI](https://hapi.run) --- pkg/query/vectorized/measure/plan/analyzer.go | 33 +++++++++++++++++++-------- pkg/query/vectorized/measure/plan/dispatch.go | 15 ++++++++++-- pkg/query/vectorized/measure/plan/scan.go | 2 ++ 3 files changed, 39 insertions(+), 11 deletions(-) diff --git a/pkg/query/vectorized/measure/plan/analyzer.go b/pkg/query/vectorized/measure/plan/analyzer.go index 69632f5ea..620c9dcc5 100644 --- a/pkg/query/vectorized/measure/plan/analyzer.go +++ b/pkg/query/vectorized/measure/plan/analyzer.go @@ -60,9 +60,30 @@ func Analyze(req *measurev1.QueryRequest, measureSchema *databasev1.Measure) (Ve tagProjection := buildTagProjection(req) fieldProjection := req.GetFieldProjection().GetNames() + + // GroupBy + Agg must be resolved BEFORE BuildBatchSchema so the Scan + // node's BatchSchema declares native typed columns for the GroupBy + // keys and Agg field. The operator (BatchAggregation.fold) hard-casts + // those columns to TypedColumn[int64] / [float64]; passthrough + // columns would panic. Storage's queryResult.batchSchema is rebuilt + // from the same opts in banyand/measure/query.go, so both halves of + // the bridge agree on column types. + hasGroupBy := req.GetGroupBy() != nil + hasAgg := req.GetAgg() != nil + var gbModel *model.MeasureGroupBy + var aggModel *model.MeasureAgg + if hasGroupBy && hasAgg { + var translateErr error + gbModel, aggModel, translateErr = translateGroupByAgg(req, measureSchema) + if translateErr != nil { + return nil, translateErr + } + } opts := model.MeasureQueryOptions{ TagProjection: tagProjection, FieldProjection: fieldProjection, + GroupBy: gbModel, + Agg: aggModel, } batchSchema, schemaErr := measure.BuildBatchSchema(measureSchema, opts) if schemaErr != nil { @@ -80,19 +101,13 @@ func Analyze(req *measurev1.QueryRequest, measureSchema *databasev1.Measure) (Ve TimeRange: tr, TagProjection: tagProjection, FieldProjection: fieldProjection, + GroupBy: gbModel, + Agg: aggModel, }) - // GroupBy + Agg coalesced into a single GroupByAgg node. Validation - // matches the G7d planner's contract. - hasGroupBy := req.GetGroupBy() != nil - hasAgg := req.GetAgg() != nil switch { case hasGroupBy && hasAgg: - gb, aggSpec, validateErr := translateGroupByAgg(req, measureSchema) - if validateErr != nil { - return nil, validateErr - } - gba, gbaErr := NewGroupByAgg(plan, gb, aggSpec) + gba, gbaErr := NewGroupByAgg(plan, gbModel, aggModel) if gbaErr != nil { return nil, gbaErr } diff --git a/pkg/query/vectorized/measure/plan/dispatch.go b/pkg/query/vectorized/measure/plan/dispatch.go index 8ec0f9ee0..6666af7cc 100644 --- a/pkg/query/vectorized/measure/plan/dispatch.go +++ b/pkg/query/vectorized/measure/plan/dispatch.go @@ -239,11 +239,20 @@ func Dispatch( // Execute the storage query. The vec source is constructed from the // returned MeasureQueryResult and threaded into the Scan node. + // + // GroupBy/Agg must be threaded into opts so banyand/measure/query.go + // rebuilds result.batchSchema with the same native typed columns the + // analyzer baked into scan.BatchSchema. Mismatched halves (one side + // native, the other passthrough) surface as a type-assert panic in + // BatchAggregation.fold or a TypedColumn[T] mismatch in + // BatchSourceFromBatchResult.appendColumnRange. opts := model.MeasureQueryOptions{ Name: metadata.GetName(), TimeRange: scan.Params.TimeRange, Entities: entities, Query: query, + GroupBy: scan.Params.GroupBy, + Agg: scan.Params.Agg, TagProjection: scan.Params.TagProjection, FieldProjection: scan.Params.FieldProjection, } @@ -364,7 +373,9 @@ func findSchemaTagFamily(m *databasev1.Measure, name string) *databasev1.TagFami // // v1 requires GroupBy.tag_projection to name a single family; that // family must appear in req.TagProjection with every tag in GroupBy -// present. Agg.field_name must appear in req.FieldProjection. +// present. Agg.field_name must appear in req.FieldProjection. Non-key +// projected tags are allowed; BatchAggregation carries them forward as +// first-seen-per-group, matching the row path. func aggProjectionCoverage(req *measurev1.QueryRequest) bool { gb := req.GetGroupBy() if gb == nil { @@ -385,7 +396,6 @@ func aggProjectionCoverage(req *measurev1.QueryRequest) bool { return false } } - // Agg field must be in FieldProjection. aggField := req.GetAgg().GetFieldName() if aggField == "" { return false @@ -414,3 +424,4 @@ func projectedTagsByFamily(tp *modelv1.TagProjection) map[string]map[string]stru } return out } + diff --git a/pkg/query/vectorized/measure/plan/scan.go b/pkg/query/vectorized/measure/plan/scan.go index f54f28726..2469ec411 100644 --- a/pkg/query/vectorized/measure/plan/scan.go +++ b/pkg/query/vectorized/measure/plan/scan.go @@ -39,6 +39,8 @@ type ScanParams struct { TimeRange *timestamp.TimeRange Query index.Query Entities [][]*modelv1.TagValue + GroupBy *model.MeasureGroupBy + Agg *model.MeasureAgg TagProjection []model.TagProjection FieldProjection []string }
