This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch vectorized-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 253c56538f51755069828f1fbca8be8481c3d510
Author: Hongtao Gao <[email protected]>
AuthorDate: Wed May 13 22:48:36 2026 +0000

    feat(query/vectorized/measure): BuildBatchSchema emits native columns for GroupBy keys + Agg field (G8d.2)
    
    Schema/storage bridge for the vec aggregation path. BuildBatchSchema
    defaults to passthrough TagValue / FieldValue columns (preserves the
    G5a zero-alloc egress for plain scans), but when opts.GroupBy or
    opts.Agg name a projected column, that specific column is emitted as
    a NATIVE typed column instead (int64 / float64 / string / etc.):
    
      - opts.GroupBy.TagFamily + each TagName → native tag columns so
        BatchGroupBy / BatchAggregation can compute group keys directly
        from typed primitives.
    
      - opts.Agg.FieldName → native field column so BatchAggregation.fold
        reads int64 / float64 values directly instead of unwrapping a
        *modelv1.FieldValue per row.
    
    The storage decoders (banyand/measure/batch_decode.go's
    appendDecodedTagBytesAsTyped / appendDecodedFieldBytesAsTyped) already
    handle both shapes — switching on the destination column type — so the
    column-type choice is fully owned by BuildBatchSchema.
    
    Also threads opts.GroupBy + opts.Agg through the storage-side
    BuildBatchSchema invocation at banyand/measure/query.go so the
    queryResult.batchSchema matches what the dispatch caller built. Without
    this the storage layer would produce passthrough columns while
    dispatch's schema declared native, leading to appendColumnRange type
    mismatches in BatchSourceFromBatchResult.
    
    The dispatch gate that consumes the native columns lands in the next
    commit behind an AggregationEnabled feature flag (default off) so
    production keeps routing GroupBy+Agg through the row path until the
    egress contract reaches row-path parity. Plain queries are unaffected.
    
    Tests: existing measure + plan suites pass; integration suite green
    (488/488) with the gate closed (default state for this commit).
---
 banyand/measure/query.go                    |  8 +++
 pkg/query/vectorized/measure/integration.go | 91 +++++++++++++++++++++--------
 2 files changed, 74 insertions(+), 25 deletions(-)

diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index 2819552a4..3d5f82ef1 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -228,9 +228,17 @@ func (m *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions) (mqr
        // projection. Falls back to nil on schema-build failure; PullBatch
        // checks for nil and returns a clean error rather than degrading
        // the row-path Pull().
+       // G8d.2: thread GroupBy + Agg through so BuildBatchSchema emits native
+       // column types for the agg-relevant columns. Plain queries get
+       // passthrough as before; queries that the vec subsystem will fold
+       // over get native int64/float64/string/etc. columns ready for the
+       // BatchAggregation operator (computeKey + fold). The storage decoders
+       // (banyand/measure/batch_decode.go) already handle both column shapes.
        result.batchSchema, _ = vmeasure.BuildBatchSchema(m.schema, model.MeasureQueryOptions{
                TagProjection:   result.tagProjection,
                FieldProjection: mqo.FieldProjection,
+               GroupBy:         mqo.GroupBy,
+               Agg:             mqo.Agg,
        })
 
        return &result, nil
diff --git a/pkg/query/vectorized/measure/integration.go b/pkg/query/vectorized/measure/integration.go
index ee46610ea..2e3d40257 100644
--- a/pkg/query/vectorized/measure/integration.go
+++ b/pkg/query/vectorized/measure/integration.go
@@ -38,6 +38,25 @@ import (
 // dropped — the row path silently skips unknown tags as well, so this matches
 // existing semantics. Fields not present in the schema yield a Null-typed
 // column so projection still produces a slot in the output.
+//
+// Column types — passthrough vs native (G8d.2):
+//
+// Tag and field projections default to passthrough columns: the column
+// cell type is *modelv1.TagValue / *modelv1.FieldValue, holding the
+// original protobuf pointer from the scan source unchanged. The egress
+// serializer returns those pointers directly, matching the row path's
+// zero-alloc per-cell behavior. With the gRPC wire format frozen
+// (`*measurev1.InternalDataPoint` is row-shaped), passthrough wins for
+// plain scans because native columns would force egress to reconstruct
+// the protobuf wrapper (3 allocs/cell), regressing the G5a bench gates.
+//
+// When opts.GroupBy or opts.Agg name a projected tag or field that the
+// schema declares, that column is emitted as a NATIVE typed column. The
+// downstream BatchAggregation operator reads typed primitives directly
+// from those columns (computeKey / fold) and produces aggregated rows
+// whose count is bounded by group cardinality, amortizing the eventual
+// wrapper reconstruction at egress. Columns not referenced by GroupBy /
+// Agg remain passthrough.
 func BuildBatchSchema(measureSchema *databasev1.Measure, opts model.MeasureQueryOptions) (*vectorized.BatchSchema, error) {
        if measureSchema == nil {
                return nil, fmt.Errorf("vectorized.measure: nil Measure schema")
@@ -58,42 +77,29 @@ func BuildBatchSchema(measureSchema *databasev1.Measure, opts model.MeasureQuery
                tagSpecs[tf.GetName()] = byName
        }
 
-       // Tag and field projections become passthrough columns: the column cell
-       // type is *modelv1.TagValue / *modelv1.FieldValue, holding the original
-       // protobuf pointer from the scan source unchanged. The egress serializer
-       // returns those pointers directly, matching the row path's zero-alloc
-       // per-cell behavior.
-       //
-       // Why passthrough beats native here: with the gRPC wire format frozen
-       // (`*measurev1.InternalDataPoint` is row-shaped), egress must produce
-       // a *modelv1.TagValue per cell. Passthrough lets the cell flow through
-       // the pipeline pre-built; native columns force the egress to
-       // reconstruct the protobuf wrapper (3 allocs/cell), regressing the
-       // G5a bench gates by ~1.5–2× ns/op. Native column types only pay off
-       // when downstream operators (BatchGroupBy / BatchAggregation /
-       // BatchTop, planned for G6) consume the typed primitives — at which
-       // point the operator output reduces row count enough to amortize the
-       // reconstruction. Until then, passthrough is the production-correct
-       // choice.
-       //
-       // We still validate that the schema declares each projected name with
-       // a supported variant so the row-path null fill (for projection
-       // entries absent from a multi-group result) carries known semantics.
+       nativeTagSet := buildNativeTagSet(opts.GroupBy)
+       nativeFieldSet := buildNativeFieldSet(opts.Agg)
+
        for _, tp := range opts.TagProjection {
                family := tagSpecs[tp.Family]
                for _, name := range tp.Names {
+                       colType := vectorized.ColumnTypeTagValue
                        if family != nil {
                                if spec, found := family[name]; found {
-                                       if _, mapErr := tagTypeToColumnType(spec.GetType()); mapErr != nil {
+                                       mapped, mapErr := tagTypeToColumnType(spec.GetType())
+                                       if mapErr != nil {
                                                return nil, fmt.Errorf("vectorized.measure: tag %s.%s: %w", tp.Family, name, mapErr)
                                        }
+                                       if _, native := nativeTagSet[nativeKey(tp.Family, name)]; native {
+                                               colType = mapped
+                                       }
                                }
                        }
                        cols = append(cols, vectorized.ColumnDef{
                                Role:      vectorized.RoleTag,
                                TagFamily: tp.Family,
                                Name:      name,
-                               Type:      vectorized.ColumnTypeTagValue,
+                               Type:      colType,
                        })
                }
        }
@@ -103,21 +109,56 @@ func BuildBatchSchema(measureSchema *databasev1.Measure, opts model.MeasureQuery
                fieldSpecs[fs.GetName()] = fs
        }
        for _, name := range opts.FieldProjection {
+               colType := vectorized.ColumnTypeFieldValue
                if spec, found := fieldSpecs[name]; found {
-                       if _, mapErr := fieldTypeToColumnType(spec.GetFieldType()); mapErr != nil {
+                       mapped, mapErr := fieldTypeToColumnType(spec.GetFieldType())
+                       if mapErr != nil {
                                return nil, fmt.Errorf("vectorized.measure: field %s: %w", name, mapErr)
                        }
+                       if _, native := nativeFieldSet[name]; native {
+                               colType = mapped
+                       }
                }
                cols = append(cols, vectorized.ColumnDef{
                        Role: vectorized.RoleField,
                        Name: name,
-                       Type: vectorized.ColumnTypeFieldValue,
+                       Type: colType,
                })
        }
 
        return vectorized.NewBatchSchema(cols), nil
 }
 
+// nativeKey joins a tag (family, name) pair with a NUL separator to form
+// the composite string key used to look it up in the native-tag set.
+func nativeKey(family, name string) string { return family + "\x00" + name }
+
+// buildNativeTagSet collects the (family, name) tuples that should be
+// materialized as native typed columns because a GroupBy clause keys
+// off them. Returns an empty set when GroupBy is nil or has no TagFamily.
+func buildNativeTagSet(gb *model.MeasureGroupBy) map[string]struct{} {
+       out := make(map[string]struct{})
+       if gb == nil || gb.TagFamily == "" {
+               return out
+       }
+       for _, name := range gb.TagNames {
+               out[nativeKey(gb.TagFamily, name)] = struct{}{}
+       }
+       return out
+}
+
+// buildNativeFieldSet collects the field names whose columns must be
+// native because Agg reduces over them. Returns an empty set when Agg is
+// nil or names no field.
+func buildNativeFieldSet(agg *model.MeasureAgg) map[string]struct{} {
+       out := make(map[string]struct{})
+       if agg == nil || agg.FieldName == "" {
+               return out
+       }
+       out[agg.FieldName] = struct{}{}
+       return out
+}
+
 func tagTypeToColumnType(t databasev1.TagType) (vectorized.ColumnType, error) {
        switch t {
        case databasev1.TagType_TAG_TYPE_INT:

Reply via email to