This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch vectorized-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 348c09c297d07d633870d3e18f99d6dc63cde7a3
Author: Hongtao Gao <[email protected]>
AuthorDate: Thu May 14 04:39:20 2026 +0000

    fix(query/vectorized/measure/plan): thread GroupBy/Agg into Scan schema so vec/storage column types agree (G8d.3)
    
    plan.Analyze built scan.BatchSchema without GroupBy/Agg context, so the
    operator-facing schema declared passthrough TagValue/FieldValue columns
    even though storage rebuilt result.batchSchema with native typed columns
    for the agg-relevant keys/field. The mismatch surfaced as a hard
    type-assert panic in BatchAggregation.fold (TypedColumn[int64] against a
    passthrough source) or, with storage forced to match, a TypedColumn[T]
    mismatch in BatchSourceFromBatchResult.appendColumnRange.
    
    Now both halves of the bridge derive their schema from the same opts.
    ScanParams.GroupBy/Agg carry the analyzer's translated specs through to
    dispatch's ec.Query call.
    
    via [HAPI](https://hapi.run)
---
 pkg/query/vectorized/measure/plan/analyzer.go | 33 +++++++++++++++++++--------
 pkg/query/vectorized/measure/plan/dispatch.go | 15 ++++++++++--
 pkg/query/vectorized/measure/plan/scan.go     |  2 ++
 3 files changed, 39 insertions(+), 11 deletions(-)

diff --git a/pkg/query/vectorized/measure/plan/analyzer.go 
b/pkg/query/vectorized/measure/plan/analyzer.go
index 69632f5ea..620c9dcc5 100644
--- a/pkg/query/vectorized/measure/plan/analyzer.go
+++ b/pkg/query/vectorized/measure/plan/analyzer.go
@@ -60,9 +60,30 @@ func Analyze(req *measurev1.QueryRequest, measureSchema 
*databasev1.Measure) (Ve
 
        tagProjection := buildTagProjection(req)
        fieldProjection := req.GetFieldProjection().GetNames()
+
+       // GroupBy + Agg must be resolved BEFORE BuildBatchSchema so the Scan
+       // node's BatchSchema declares native typed columns for the GroupBy
+       // keys and Agg field. The operator (BatchAggregation.fold) hard-casts
+       // those columns to TypedColumn[int64] / [float64]; passthrough
+       // columns would panic. Storage's queryResult.batchSchema is rebuilt
+       // from the same opts in banyand/measure/query.go, so both halves of
+       // the bridge agree on column types.
+       hasGroupBy := req.GetGroupBy() != nil
+       hasAgg := req.GetAgg() != nil
+       var gbModel *model.MeasureGroupBy
+       var aggModel *model.MeasureAgg
+       if hasGroupBy && hasAgg {
+               var translateErr error
+               gbModel, aggModel, translateErr = translateGroupByAgg(req, 
measureSchema)
+               if translateErr != nil {
+                       return nil, translateErr
+               }
+       }
        opts := model.MeasureQueryOptions{
                TagProjection:   tagProjection,
                FieldProjection: fieldProjection,
+               GroupBy:         gbModel,
+               Agg:             aggModel,
        }
        batchSchema, schemaErr := measure.BuildBatchSchema(measureSchema, opts)
        if schemaErr != nil {
@@ -80,19 +101,13 @@ func Analyze(req *measurev1.QueryRequest, measureSchema 
*databasev1.Measure) (Ve
                TimeRange:       tr,
                TagProjection:   tagProjection,
                FieldProjection: fieldProjection,
+               GroupBy:         gbModel,
+               Agg:             aggModel,
        })
 
-       // GroupBy + Agg coalesced into a single GroupByAgg node. Validation
-       // matches the G7d planner's contract.
-       hasGroupBy := req.GetGroupBy() != nil
-       hasAgg := req.GetAgg() != nil
        switch {
        case hasGroupBy && hasAgg:
-               gb, aggSpec, validateErr := translateGroupByAgg(req, 
measureSchema)
-               if validateErr != nil {
-                       return nil, validateErr
-               }
-               gba, gbaErr := NewGroupByAgg(plan, gb, aggSpec)
+               gba, gbaErr := NewGroupByAgg(plan, gbModel, aggModel)
                if gbaErr != nil {
                        return nil, gbaErr
                }
diff --git a/pkg/query/vectorized/measure/plan/dispatch.go 
b/pkg/query/vectorized/measure/plan/dispatch.go
index 8ec0f9ee0..6666af7cc 100644
--- a/pkg/query/vectorized/measure/plan/dispatch.go
+++ b/pkg/query/vectorized/measure/plan/dispatch.go
@@ -239,11 +239,20 @@ func Dispatch(
 
        // Execute the storage query. The vec source is constructed from the
        // returned MeasureQueryResult and threaded into the Scan node.
+       //
+       // GroupBy/Agg must be threaded into opts so banyand/measure/query.go
+       // rebuilds result.batchSchema with the same native typed columns the
+       // analyzer baked into scan.BatchSchema. Mismatched halves (one side
+       // native, the other passthrough) surface as a type-assert panic in
+       // BatchAggregation.fold or a TypedColumn[T] mismatch in
+       // BatchSourceFromBatchResult.appendColumnRange.
        opts := model.MeasureQueryOptions{
                Name:            metadata.GetName(),
                TimeRange:       scan.Params.TimeRange,
                Entities:        entities,
                Query:           query,
+               GroupBy:         scan.Params.GroupBy,
+               Agg:             scan.Params.Agg,
                TagProjection:   scan.Params.TagProjection,
                FieldProjection: scan.Params.FieldProjection,
        }
@@ -364,7 +373,9 @@ func findSchemaTagFamily(m *databasev1.Measure, name 
string) *databasev1.TagFami
 //
 // v1 requires GroupBy.tag_projection to name a single family; that
 // family must appear in req.TagProjection with every tag in GroupBy
-// present. Agg.field_name must appear in req.FieldProjection.
+// present. Agg.field_name must appear in req.FieldProjection. Non-key
+// projected tags are allowed; BatchAggregation carries them forward as
+// first-seen-per-group, matching the row path.
 func aggProjectionCoverage(req *measurev1.QueryRequest) bool {
        gb := req.GetGroupBy()
        if gb == nil {
@@ -385,7 +396,6 @@ func aggProjectionCoverage(req *measurev1.QueryRequest) 
bool {
                        return false
                }
        }
-       // Agg field must be in FieldProjection.
        aggField := req.GetAgg().GetFieldName()
        if aggField == "" {
                return false
@@ -414,3 +424,4 @@ func projectedTagsByFamily(tp *modelv1.TagProjection) 
map[string]map[string]stru
        }
        return out
 }
+
diff --git a/pkg/query/vectorized/measure/plan/scan.go 
b/pkg/query/vectorized/measure/plan/scan.go
index f54f28726..2469ec411 100644
--- a/pkg/query/vectorized/measure/plan/scan.go
+++ b/pkg/query/vectorized/measure/plan/scan.go
@@ -39,6 +39,8 @@ type ScanParams struct {
        TimeRange       *timestamp.TimeRange
        Query           index.Query
        Entities        [][]*modelv1.TagValue
+       GroupBy         *model.MeasureGroupBy
+       Agg             *model.MeasureAgg
        TagProjection   []model.TagProjection
        FieldProjection []string
 }

Reply via email to