This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch vectorized-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 9128d9fc2ac0c3c42af458133c1d75899f14adca
Author: Hongtao Gao <[email protected]>
AuthorDate: Fri May 15 10:10:49 2026 +0000

    feat(query/vectorized/measure/plan): G9a single-node Top dispatch + 
analyzer sort-direction fix
---
 pkg/query/vectorized/measure/plan/analyzer.go      |  6 ++-
 pkg/query/vectorized/measure/plan/analyzer_test.go | 35 ++++++++++++++
 pkg/query/vectorized/measure/plan/dispatch.go      | 14 +++---
 pkg/query/vectorized/measure/plan/dispatch_test.go | 46 ++++++++++++------
 pkg/query/vectorized/measure/plan/top.go           |  8 ++--
 pkg/query/vectorized/measure/top.go                | 54 +++++++++++++++++-----
 pkg/query/vectorized/measure/top_test.go           | 44 ++++++++++++++++++
 7 files changed, 169 insertions(+), 38 deletions(-)

diff --git a/pkg/query/vectorized/measure/plan/analyzer.go 
b/pkg/query/vectorized/measure/plan/analyzer.go
index 6eee444fb..035ae3882 100644
--- a/pkg/query/vectorized/measure/plan/analyzer.go
+++ b/pkg/query/vectorized/measure/plan/analyzer.go
@@ -22,6 +22,7 @@ import (
 
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/query/model"
        measure 
"github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -137,7 +138,10 @@ func Analyze(req *measurev1.QueryRequest, measureSchema 
*databasev1.Measure) (Ve
        }
 
        if t := req.GetTop(); t != nil {
-               asc := t.GetFieldValueSort() == 1 // SORT_ASC == 1 in 
modelv1.Sort
+               // Match the row path (pkg/query/logical/measure.unresolvedTop):
+               // FieldValueSort==SORT_ASC keeps the lowest N (BatchTop 
asc=true);
+               // anything else (SORT_DESC / SORT_UNSPECIFIED) keeps the 
highest N.
+               asc := t.GetFieldValueSort() == modelv1.Sort_SORT_ASC
                plan = NewTop(plan, t.GetFieldName(), int(t.GetNumber()), asc)
        }
 
diff --git a/pkg/query/vectorized/measure/plan/analyzer_test.go 
b/pkg/query/vectorized/measure/plan/analyzer_test.go
index f7041dd87..544b6fc5b 100644
--- a/pkg/query/vectorized/measure/plan/analyzer_test.go
+++ b/pkg/query/vectorized/measure/plan/analyzer_test.go
@@ -142,6 +142,41 @@ func TestAnalyze_TopBetweenGroupByAggAndLimit(t 
*testing.T) {
        if _, ok := top.Child.(*GroupByAgg); !ok {
                t.Fatalf("Top child: want *GroupByAgg, got %T", top.Child)
        }
+       // SORT_DESC must map to a descending top (asc=false) so the vec
+       // analyzer matches the row path's reverted semantics. Sort enum
+       // values are SORT_UNSPECIFIED=0, SORT_DESC=1, SORT_ASC=2 — a naive
+       // "==1 means asc" inverts every Top fixture.
+       if top.Asc {
+               t.Fatal("SORT_DESC must produce a descending Top (Asc=false)")
+       }
+}
+
+func TestAnalyze_TopSortAsc_MapsToAscending(t *testing.T) {
+       req := &measurev1.QueryRequest{
+               Name:            "demo",
+               TagProjection:   projTagProj(),
+               FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: 
[]string{"value"}},
+               Top: &measurev1.QueryRequest_Top{
+                       Number:         3,
+                       FieldName:      "value",
+                       FieldValueSort: modelv1.Sort_SORT_ASC,
+               },
+       }
+       p, err := Analyze(req, testMeasureSchema())
+       if err != nil {
+               t.Fatalf("Analyze: %v", err)
+       }
+       limit, ok := p.(*Limit)
+       if !ok {
+               t.Fatalf("root: want *Limit, got %T", p)
+       }
+       top, ok := limit.Child.(*Top)
+       if !ok {
+               t.Fatalf("Limit child: want *Top, got %T", limit.Child)
+       }
+       if !top.Asc {
+               t.Fatal("SORT_ASC must produce an ascending Top (Asc=true)")
+       }
 }
 
 // TestAnalyze_GroupByWithoutAgg_BuildsRawGroupBy verifies the raw
diff --git a/pkg/query/vectorized/measure/plan/dispatch.go 
b/pkg/query/vectorized/measure/plan/dispatch.go
index bafceb089..c1864a21e 100644
--- a/pkg/query/vectorized/measure/plan/dispatch.go
+++ b/pkg/query/vectorized/measure/plan/dispatch.go
@@ -91,8 +91,9 @@ func FellThroughCount() int64 { return 
fellThroughCount.Load() }
 //   - request may carry GroupBy and/or Agg in any combination (group+agg,
 //     scalar reduce, raw GroupBy); plan.Analyze auto-extends the
 //     projection so the keys / agg field always resolve
-//   - request must NOT carry Top (BatchTop's single-heap semantic differs
-//     from the row path's per-timestamp top-N)
+//   - request may carry Top: the analyzer emits Scan → Top → Limit
+//     (or Scan → GroupByAgg → Top → Limit) and BatchTop reproduces the
+//     row path's top-N (G9a)
 //   - request must carry TimeRange (storage requires a bounded window)
 //   - hidden criteria tags (criteria tags absent from the projection)
 //     are projected for storage-side filtering, then stripped at egress
@@ -126,12 +127,9 @@ func Dispatch(
        if req == nil {
                return nil, "", false, nil
        }
-       if req.GetTop() != nil {
-               // Top awaits per-timestamp partitioning of BatchTop; the row
-               // path's TopN semantic differs from BatchTop's single-heap
-               // today.
-               return nil, "", false, nil
-       }
+       // Top is handled by the vec subsystem: plan.Analyze emits
+       // Scan → Top → Limit (or Scan → GroupByAgg → Top → Limit) and
+       // BatchTop reproduces the row path's top-N (G9a).
        // GroupBy and Agg are handled by the vec subsystem in all three
        // shapes — group+agg, scalar reduce (Agg only), raw GroupBy (GroupBy
        // only). plan.Analyze auto-extends the projection so the GroupBy keys
diff --git a/pkg/query/vectorized/measure/plan/dispatch_test.go 
b/pkg/query/vectorized/measure/plan/dispatch_test.go
index a809995b2..2e2a979a8 100644
--- a/pkg/query/vectorized/measure/plan/dispatch_test.go
+++ b/pkg/query/vectorized/measure/plan/dispatch_test.go
@@ -163,21 +163,37 @@ func 
TestDispatch_GroupByAggUncoveredProjection_FallsThrough(t *testing.T) {
        }
 }
 
-// TestDispatch_Top_FallsThrough covers the per-timestamp top-N gap.
-func TestDispatch_Top_FallsThrough(t *testing.T) {
+// TestDispatch_Top_ReachesEcQuery confirms G9a removed the Top gate:
+// req.Top no longer triggers an eligibility fall-through. The request
+// proceeds to ec.Query, where the analyzer has emitted
+// Scan → Top → Limit. fakeEC returns nil so dispatch falls through after
+// ec.Query (the empty-result branch) — what matters is that ec.Query was
+// invoked at all, proving the Top gate no longer rejects the request.
+func TestDispatch_Top_ReachesEcQuery(t *testing.T) {
+       measureSchema := testMeasureSchema()
+       // nolint:staticcheck // SA1019 — row-path BuildSchema is the only 
schema builder until G8 replaces it.
+       logicalSchema, schemaErr := logicalmeasure.BuildSchema(measureSchema, 
nil)
+       if schemaErr != nil {
+               t.Fatalf("BuildSchema: %v", schemaErr)
+       }
+       metadata := &commonv1.Metadata{Name: "demo", Group: "default"}
+       ec := &fakeEC{wantResult: nil, wantErr: nil}
+
        req := bareReq()
        req.Top = &measurev1.QueryRequest_Top{
                Number:         5,
                FieldName:      "value",
                FieldValueSort: modelv1.Sort_SORT_DESC,
        }
-       _, _, handled, err := Dispatch(context.Background(),
-               req, nil, nil, nil, nil, dispatchCfg(true))
+
+       iter, planStr, handled, err := Dispatch(context.Background(),
+               req, metadata, measureSchema, logicalSchema, ec, 
dispatchCfg(true))
        if err != nil {
-               t.Fatalf("Top fallthrough must not error: %v", err)
+               t.Fatalf("Top request must not error before ec.Query: %v", err)
        }
-       if handled {
-               t.Fatal("Top must fall through (BatchTop semantics differ from 
row TopN)")
+       if !ec.called {
+               t.Fatalf("Top request must reach ec.Query (G9a removed the Top 
gate); "+
+                       "got iter=%v planStr=%q handled=%v", iter, planStr, 
handled)
        }
 }
 
@@ -467,17 +483,17 @@ func TestDispatch_Counters_TrackFellThroughCalls(t 
*testing.T) {
        startHandled := HandledCount()
        startFellThrough := FellThroughCount()
 
-       // Three fallthroughs of distinct shapes. topReq trips the Top gate;
-       // the other two trip the PERMANENT nil-runtime-context guard (all-nil
-       // schema/ec/metadata). Note: post-G9c a nil TimeRange is no longer a
-       // fall-through on its own — noTimeReq falls through here only because
-       // the runtime context is nil.
-       topReq := bareReq()
-       topReq.Top = &measurev1.QueryRequest_Top{Number: 5, FieldName: "value"}
+       // Post-G9 the Top / GroupBy / nil-TimeRange shapes are all handled by
+       // the vec subsystem, so these requests fall through only via the
+       // PERMANENT nil-runtime-context guard (Dispatch is called with all-nil
+       // schema/ec/metadata in the loop below). The counter is still
+       // exercised on the clean (non-error) fall-through path.
+       gbReq := bareReq()
+       gbReq.GroupBy = &measurev1.QueryRequest_GroupBy{TagProjection: 
projTagProj(), FieldName: "value"}
        noTimeReq := bareReq()
        noTimeReq.TimeRange = nil
 
-       for _, req := range []*measurev1.QueryRequest{topReq, noTimeReq, 
bareReq() /* nil ec */} {
+       for _, req := range []*measurev1.QueryRequest{gbReq, noTimeReq, 
bareReq() /* nil ec */} {
                _, _, handled, dispatchErr := Dispatch(context.Background(),
                        req, nil, nil, nil, nil, dispatchCfg(true))
                if dispatchErr != nil {
diff --git a/pkg/query/vectorized/measure/plan/top.go 
b/pkg/query/vectorized/measure/plan/top.go
index 65dc62628..f80f1bfcc 100644
--- a/pkg/query/vectorized/measure/plan/top.go
+++ b/pkg/query/vectorized/measure/plan/top.go
@@ -26,9 +26,11 @@ import (
 )
 
 // Top selects the top-N (or bottom-N when Asc) rows by FieldName. Wraps
-// `measure.BatchTop`, which uses a single global heap — the row path's
-// per-timestamp TopN semantic is not yet reproduced here (BatchTop
-// extension is tracked separately; see G6 plan).
+// `measure.BatchTop`, a single global heap. This matches the row path's
+// req.Top handler (pkg/query/logical/measure.topOp), which also inserts
+// every data point into one TopQueue — req.Top is a whole-result top-N,
+// not the per-timestamp TopNQuery RPC (out of scope, see .omc/g9-plan.md
+// G9a).
 //
 // Schema-preserving.
 type Top struct {
diff --git a/pkg/query/vectorized/measure/top.go 
b/pkg/query/vectorized/measure/top.go
index 4bf63e0ba..f58fc99b3 100644
--- a/pkg/query/vectorized/measure/top.go
+++ b/pkg/query/vectorized/measure/top.go
@@ -21,6 +21,7 @@ import (
        "container/heap"
        "context"
 
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
 )
 
@@ -147,9 +148,8 @@ func (t *BatchTop) Consume(_ context.Context, b 
*vectorized.RecordBatch) error {
                return nil
        }
        active := activeIndices(b)
-       isFloat := t.schema.Columns[t.fieldCol].Type == 
vectorized.ColumnTypeFloat64
        for _, rowIdx := range active {
-               candidate := t.materialize(b, int(rowIdx), isFloat)
+               candidate := t.materialize(b, int(rowIdx))
                candidate.seq = t.inputCount
                t.inputCount++
                if t.heapState.Len() < t.n {
@@ -226,22 +226,54 @@ func (t *BatchTop) Close() error {
 
 // materialize copies row rowIdx of b into a new topRow, reading the sort key
 // from the configured field column.
-func (t *BatchTop) materialize(b *vectorized.RecordBatch, rowIdx int, isFloat 
bool) *topRow {
+//
+// The key column may be a native typed column (ColumnTypeInt64 /
+// ColumnTypeFloat64 — promoted when an Agg reduces over the field) or a
+// passthrough *modelv1.FieldValue column (ColumnTypeFieldValue — the
+// non-Agg Scan→Top→Limit path, where BuildBatchSchema leaves projected
+// fields as passthrough). Both are handled so the float/int decision is
+// per-column-shape, matching the row path's schema-field-type dispatch
+// (pkg/query/logical/measure.topOp.Execute).
+func (t *BatchTop) materialize(b *vectorized.RecordBatch, rowIdx int) *topRow {
        cols := make([]vectorized.Column, len(t.schema.Columns))
        for i, def := range t.schema.Columns {
                cols[i] = vectorized.NewColumnForType(def.Type, 1)
                copyOneValue(cols[i], b.Columns[i], rowIdx)
        }
-       row := &topRow{cols: cols, isFloat: isFloat}
+       row := &topRow{cols: cols}
        keyCol := b.Columns[t.fieldCol]
-       if keyCol.IsNull(rowIdx) {
+       switch c := keyCol.(type) {
+       case *vectorized.TypedColumn[float64]:
+               row.isFloat = true
+               if c.IsNull(rowIdx) {
+                       row.isNull = true
+                       return row
+               }
+               row.floatVal = c.Data()[rowIdx]
+       case *vectorized.TypedColumn[int64]:
+               if c.IsNull(rowIdx) {
+                       row.isNull = true
+                       return row
+               }
+               row.intVal = c.Data()[rowIdx]
+       case *vectorized.TypedColumn[*modelv1.FieldValue]:
+               fv := c.Data()[rowIdx]
+               switch v := fv.GetValue().(type) {
+               case *modelv1.FieldValue_Float:
+                       row.isFloat = true
+                       row.floatVal = v.Float.GetValue()
+               case *modelv1.FieldValue_Int:
+                       row.intVal = v.Int.GetValue()
+               default:
+                       // Null / unset / non-numeric field value: treat as 
lowest,
+                       // matching the native-column null handling and the row 
path.
+                       row.isNull = true
+               }
+       default:
+               // Unexpected key column shape (string / bytes / arrays): no
+               // numeric sort key. Treat as lowest so the query still 
completes
+               // rather than panicking.
                row.isNull = true
-               return row
-       }
-       if isFloat {
-               row.floatVal = 
keyCol.(*vectorized.TypedColumn[float64]).Data()[rowIdx]
-       } else {
-               row.intVal = 
keyCol.(*vectorized.TypedColumn[int64]).Data()[rowIdx]
        }
        return row
 }
diff --git a/pkg/query/vectorized/measure/top_test.go 
b/pkg/query/vectorized/measure/top_test.go
index 3515d2203..c68202d40 100644
--- a/pkg/query/vectorized/measure/top_test.go
+++ b/pkg/query/vectorized/measure/top_test.go
@@ -21,6 +21,7 @@ import (
        "context"
        "testing"
 
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/query/vectorized"
 )
 
@@ -205,6 +206,49 @@ func TestBatchTop_ZeroN_NoOp(t *testing.T) {
        }
 }
 
+// TestBatchTop_FieldValuePassthroughKey_Float pins the G9a regression: the
+// non-Agg Scan→Top→Limit path leaves the projected field as a passthrough
+// ColumnTypeFieldValue column (BuildBatchSchema only promotes Agg fields to
+// native typed columns). BatchTop must read the float sort key out of the
+// *modelv1.FieldValue rather than panicking on a TypedColumn[int64] cast.
+func TestBatchTop_FieldValuePassthroughKey_Float(t *testing.T) {
+       s := vectorized.NewBatchSchema([]vectorized.ColumnDef{
+               {Role: vectorized.RoleField, Name: "value", Type: 
vectorized.ColumnTypeFieldValue},
+       })
+       top := NewBatchTop(s, 0, 3, false, 8) // desc top-3
+       _ = top.Init(context.Background())
+       defer top.Close()
+
+       vals := []float64{12.5, 3.0, 99.25, 7.1, 42.0, 88.0}
+       b := vectorized.NewRecordBatch(s, len(vals))
+       col := b.Columns[0].(*vectorized.TypedColumn[*modelv1.FieldValue])
+       for _, v := range vals {
+               col.Append(&modelv1.FieldValue{Value: 
&modelv1.FieldValue_Float{Float: &modelv1.Float{Value: v}}})
+       }
+       b.Len = len(vals)
+
+       if err := top.Consume(context.Background(), b); err != nil {
+               t.Fatalf("Consume must not panic on passthrough FieldValue key: 
%v", err)
+       }
+       if err := top.Finalize(context.Background()); err != nil {
+               t.Fatal(err)
+       }
+       out, err := top.NextBatch(context.Background())
+       if err != nil {
+               t.Fatal(err)
+       }
+       if out == nil || out.Len != 3 {
+               t.Fatalf("want 3 rows, got %v", out)
+       }
+       got := 
out.Columns[0].(*vectorized.TypedColumn[*modelv1.FieldValue]).Data()
+       want := []float64{99.25, 88.0, 42.0} // highest 3, descending
+       for i := range want {
+               if g := got[i].GetFloat().GetValue(); g != want[i] {
+                       t.Fatalf("row %d: want %v, got %v", i, want[i], g)
+               }
+       }
+}
+
 func TestBatchTop_Close_AfterPartialConsume_ReleasesHeapAllocation(t 
*testing.T) {
        s := topTestSchema()
        top := NewBatchTop(s, 0, 3, true, 8)

Reply via email to