This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 9128d9fc2ac0c3c42af458133c1d75899f14adca Author: Hongtao Gao <[email protected]> AuthorDate: Fri May 15 10:10:49 2026 +0000 feat(query/vectorized/measure/plan): G9a single-node Top dispatch + analyzer sort-direction fix --- pkg/query/vectorized/measure/plan/analyzer.go | 6 ++- pkg/query/vectorized/measure/plan/analyzer_test.go | 35 ++++++++++++++ pkg/query/vectorized/measure/plan/dispatch.go | 14 +++--- pkg/query/vectorized/measure/plan/dispatch_test.go | 46 ++++++++++++------ pkg/query/vectorized/measure/plan/top.go | 8 ++-- pkg/query/vectorized/measure/top.go | 54 +++++++++++++++++----- pkg/query/vectorized/measure/top_test.go | 44 ++++++++++++++++++ 7 files changed, 169 insertions(+), 38 deletions(-) diff --git a/pkg/query/vectorized/measure/plan/analyzer.go b/pkg/query/vectorized/measure/plan/analyzer.go index 6eee444fb..035ae3882 100644 --- a/pkg/query/vectorized/measure/plan/analyzer.go +++ b/pkg/query/vectorized/measure/plan/analyzer.go @@ -22,6 +22,7 @@ import ( databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/query/model" measure "github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure" "github.com/apache/skywalking-banyandb/pkg/timestamp" @@ -137,7 +138,10 @@ func Analyze(req *measurev1.QueryRequest, measureSchema *databasev1.Measure) (Ve } if t := req.GetTop(); t != nil { - asc := t.GetFieldValueSort() == 1 // SORT_ASC == 1 in modelv1.Sort + // Match the row path (pkg/query/logical/measure.unresolvedTop): + // FieldValueSort==SORT_ASC keeps the lowest N (BatchTop asc=true); + // anything else (SORT_DESC / SORT_UNSPECIFIED) keeps the highest N. + asc := t.GetFieldValueSort() == modelv1.Sort_SORT_ASC plan = NewTop(plan, t.GetFieldName(), int(t.GetNumber()), asc) } diff --git a/pkg/query/vectorized/measure/plan/analyzer_test.go b/pkg/query/vectorized/measure/plan/analyzer_test.go index f7041dd87..544b6fc5b 100644 --- a/pkg/query/vectorized/measure/plan/analyzer_test.go +++ b/pkg/query/vectorized/measure/plan/analyzer_test.go @@ -142,6 +142,41 @@ func TestAnalyze_TopBetweenGroupByAggAndLimit(t *testing.T) { if _, ok := top.Child.(*GroupByAgg); !ok { t.Fatalf("Top child: want *GroupByAgg, got %T", top.Child) } + // SORT_DESC must map to a descending top (asc=false) so the vec + // analyzer matches the row path's reverted semantics. Sort enum + // values are SORT_UNSPECIFIED=0, SORT_DESC=1, SORT_ASC=2 — a naive + // "==1 means asc" inverts every Top fixture. + if top.Asc { + t.Fatal("SORT_DESC must produce a descending Top (Asc=false)") + } +} + +func TestAnalyze_TopSortAsc_MapsToAscending(t *testing.T) { + req := &measurev1.QueryRequest{ + Name: "demo", + TagProjection: projTagProj(), + FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: []string{"value"}}, + Top: &measurev1.QueryRequest_Top{ + Number: 3, + FieldName: "value", + FieldValueSort: modelv1.Sort_SORT_ASC, + }, + } + p, err := Analyze(req, testMeasureSchema()) + if err != nil { + t.Fatalf("Analyze: %v", err) + } + limit, ok := p.(*Limit) + if !ok { + t.Fatalf("root: want *Limit, got %T", p) + } + top, ok := limit.Child.(*Top) + if !ok { + t.Fatalf("Limit child: want *Top, got %T", limit.Child) + } + if !top.Asc { + t.Fatal("SORT_ASC must produce an ascending Top (Asc=true)") + } } // TestAnalyze_GroupByWithoutAgg_BuildsRawGroupBy verifies the raw diff --git a/pkg/query/vectorized/measure/plan/dispatch.go b/pkg/query/vectorized/measure/plan/dispatch.go index bafceb089..c1864a21e 100644 --- a/pkg/query/vectorized/measure/plan/dispatch.go +++ b/pkg/query/vectorized/measure/plan/dispatch.go @@ -91,8 +91,9 @@ func FellThroughCount() int64 { return fellThroughCount.Load() } // - request may carry GroupBy and/or Agg in any combination (group+agg, // scalar reduce, raw GroupBy); plan.Analyze auto-extends the // projection so the keys / agg field always resolve -// - request must NOT carry Top (BatchTop's single-heap semantic differs -// from the row path's per-timestamp top-N) +// - request may carry Top: the analyzer emits Scan → Top → Limit +// (or Scan → GroupByAgg → Top → Limit) and BatchTop reproduces the +// row path's top-N (G9a) // - request must carry TimeRange (storage requires a bounded window) // - hidden criteria tags (criteria tags absent from the projection) // are projected for storage-side filtering, then stripped at egress @@ -126,12 +127,9 @@ func Dispatch( if req == nil { return nil, "", false, nil } - if req.GetTop() != nil { - // Top awaits per-timestamp partitioning of BatchTop; the row - // path's TopN semantic differs from BatchTop's single-heap - // today. - return nil, "", false, nil - } + // Top is handled by the vec subsystem: plan.Analyze emits + // Scan → Top → Limit (or Scan → GroupByAgg → Top → Limit) and + // BatchTop reproduces the row path's top-N (G9a). // GroupBy and Agg are handled by the vec subsystem in all three // shapes — group+agg, scalar reduce (Agg only), raw GroupBy (GroupBy // only). plan.Analyze auto-extends the projection so the GroupBy keys diff --git a/pkg/query/vectorized/measure/plan/dispatch_test.go b/pkg/query/vectorized/measure/plan/dispatch_test.go index a809995b2..2e2a979a8 100644 --- a/pkg/query/vectorized/measure/plan/dispatch_test.go +++ b/pkg/query/vectorized/measure/plan/dispatch_test.go @@ -163,21 +163,37 @@ func TestDispatch_GroupByAggUncoveredProjection_FallsThrough(t *testing.T) { } } -// TestDispatch_Top_FallsThrough covers the per-timestamp top-N gap. -func TestDispatch_Top_FallsThrough(t *testing.T) { +// TestDispatch_Top_ReachesEcQuery confirms G9a removed the Top gate: +// req.Top no longer triggers an eligibility fall-through. The request +// proceeds to ec.Query, where the analyzer has emitted +// Scan → Top → Limit. fakeEC returns nil so dispatch falls through after +// ec.Query (the empty-result branch) — what matters is that ec.Query was +// invoked at all, proving the Top gate no longer rejects the request. +func TestDispatch_Top_ReachesEcQuery(t *testing.T) { + measureSchema := testMeasureSchema() + // nolint:staticcheck // SA1019 — row-path BuildSchema is the only schema builder until G8 replaces it. + logicalSchema, schemaErr := logicalmeasure.BuildSchema(measureSchema, nil) + if schemaErr != nil { + t.Fatalf("BuildSchema: %v", schemaErr) + } + metadata := &commonv1.Metadata{Name: "demo", Group: "default"} + ec := &fakeEC{wantResult: nil, wantErr: nil} + req := bareReq() req.Top = &measurev1.QueryRequest_Top{ Number: 5, FieldName: "value", FieldValueSort: modelv1.Sort_SORT_DESC, } - _, _, handled, err := Dispatch(context.Background(), - req, nil, nil, nil, nil, dispatchCfg(true)) + + iter, planStr, handled, err := Dispatch(context.Background(), + req, metadata, measureSchema, logicalSchema, ec, dispatchCfg(true)) if err != nil { - t.Fatalf("Top fallthrough must not error: %v", err) + t.Fatalf("Top request must not error before ec.Query: %v", err) } - if handled { - t.Fatal("Top must fall through (BatchTop semantics differ from row TopN)") + if !ec.called { + t.Fatalf("Top request must reach ec.Query (G9a removed the Top gate); "+ + "got iter=%v planStr=%q handled=%v", iter, planStr, handled) } } @@ -467,17 +483,17 @@ func TestDispatch_Counters_TrackFellThroughCalls(t *testing.T) { startHandled := HandledCount() startFellThrough := FellThroughCount() - // Three fallthroughs of distinct shapes. topReq trips the Top gate; - // the other two trip the PERMANENT nil-runtime-context guard (all-nil - // schema/ec/metadata). Note: post-G9c a nil TimeRange is no longer a - // fall-through on its own — noTimeReq falls through here only because - // the runtime context is nil. - topReq := bareReq() - topReq.Top = &measurev1.QueryRequest_Top{Number: 5, FieldName: "value"} + // Post-G9 the Top / GroupBy / nil-TimeRange shapes are all handled by + // the vec subsystem, so these requests fall through only via the + // PERMANENT nil-runtime-context guard (Dispatch is called with all-nil + // schema/ec/metadata in the loop below). The counter is still + // exercised on the clean (non-error) fall-through path. + gbReq := bareReq() + gbReq.GroupBy = &measurev1.QueryRequest_GroupBy{TagProjection: projTagProj(), FieldName: "value"} noTimeReq := bareReq() noTimeReq.TimeRange = nil - for _, req := range []*measurev1.QueryRequest{topReq, noTimeReq, bareReq() /* nil ec */} { + for _, req := range []*measurev1.QueryRequest{gbReq, noTimeReq, bareReq() /* nil ec */} { _, _, handled, dispatchErr := Dispatch(context.Background(), req, nil, nil, nil, nil, dispatchCfg(true)) if dispatchErr != nil { diff --git a/pkg/query/vectorized/measure/plan/top.go b/pkg/query/vectorized/measure/plan/top.go index 65dc62628..f80f1bfcc 100644 --- a/pkg/query/vectorized/measure/plan/top.go +++ b/pkg/query/vectorized/measure/plan/top.go @@ -26,9 +26,11 @@ import ( ) // Top selects the top-N (or bottom-N when Asc) rows by FieldName. Wraps -// `measure.BatchTop`, which uses a single global heap — the row path's -// per-timestamp TopN semantic is not yet reproduced here (BatchTop -// extension is tracked separately; see G6 plan). +// `measure.BatchTop`, a single global heap. This matches the row path's +// req.Top handler (pkg/query/logical/measure.topOp), which also inserts +// every data point into one TopQueue — req.Top is a whole-result top-N, +// not the per-timestamp TopNQuery RPC (out of scope, see .omc/g9-plan.md +// G9a). // // Schema-preserving. type Top struct { diff --git a/pkg/query/vectorized/measure/top.go b/pkg/query/vectorized/measure/top.go index 4bf63e0ba..f58fc99b3 100644 --- a/pkg/query/vectorized/measure/top.go +++ b/pkg/query/vectorized/measure/top.go @@ -21,6 +21,7 @@ import ( "container/heap" "context" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/query/vectorized" ) @@ -147,9 +148,8 @@ func (t *BatchTop) Consume(_ context.Context, b *vectorized.RecordBatch) error { return nil } active := activeIndices(b) - isFloat := t.schema.Columns[t.fieldCol].Type == vectorized.ColumnTypeFloat64 for _, rowIdx := range active { - candidate := t.materialize(b, int(rowIdx), isFloat) + candidate := t.materialize(b, int(rowIdx)) candidate.seq = t.inputCount t.inputCount++ if t.heapState.Len() < t.n { @@ -226,22 +226,54 @@ func (t *BatchTop) Close() error { // materialize copies row rowIdx of b into a new topRow, reading the sort key // from the configured field column. -func (t *BatchTop) materialize(b *vectorized.RecordBatch, rowIdx int, isFloat bool) *topRow { +// +// The key column may be a native typed column (ColumnTypeInt64 / +// ColumnTypeFloat64 — promoted when an Agg reduces over the field) or a +// passthrough *modelv1.FieldValue column (ColumnTypeFieldValue — the +// non-Agg Scan→Top→Limit path, where BuildBatchSchema leaves projected +// fields as passthrough). Both are handled so the float/int decision is +// per-column-shape, matching the row path's schema-field-type dispatch +// (pkg/query/logical/measure.topOp.Execute). +func (t *BatchTop) materialize(b *vectorized.RecordBatch, rowIdx int) *topRow { cols := make([]vectorized.Column, len(t.schema.Columns)) for i, def := range t.schema.Columns { cols[i] = vectorized.NewColumnForType(def.Type, 1) copyOneValue(cols[i], b.Columns[i], rowIdx) } - row := &topRow{cols: cols, isFloat: isFloat} + row := &topRow{cols: cols} keyCol := b.Columns[t.fieldCol] - if keyCol.IsNull(rowIdx) { + switch c := keyCol.(type) { + case *vectorized.TypedColumn[float64]: + row.isFloat = true + if c.IsNull(rowIdx) { + row.isNull = true + return row + } + row.floatVal = c.Data()[rowIdx] + case *vectorized.TypedColumn[int64]: + if c.IsNull(rowIdx) { + row.isNull = true + return row + } + row.intVal = c.Data()[rowIdx] + case *vectorized.TypedColumn[*modelv1.FieldValue]: + fv := c.Data()[rowIdx] + switch v := fv.GetValue().(type) { + case *modelv1.FieldValue_Float: + row.isFloat = true + row.floatVal = v.Float.GetValue() + case *modelv1.FieldValue_Int: + row.intVal = v.Int.GetValue() + default: + // Null / unset / non-numeric field value: treat as lowest, + // matching the native-column null handling and the row path. + row.isNull = true + } + default: + // Unexpected key column shape (string / bytes / arrays): no + // numeric sort key. Treat as lowest so the query still completes + // rather than panicking. row.isNull = true - return row - } - if isFloat { - row.floatVal = keyCol.(*vectorized.TypedColumn[float64]).Data()[rowIdx] - } else { - row.intVal = keyCol.(*vectorized.TypedColumn[int64]).Data()[rowIdx] } return row } diff --git a/pkg/query/vectorized/measure/top_test.go b/pkg/query/vectorized/measure/top_test.go index 3515d2203..c68202d40 100644 --- a/pkg/query/vectorized/measure/top_test.go +++ b/pkg/query/vectorized/measure/top_test.go @@ -21,6 +21,7 @@ import ( "context" "testing" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/query/vectorized" ) @@ -205,6 +206,49 @@ func TestBatchTop_ZeroN_NoOp(t *testing.T) { } } +// TestBatchTop_FieldValuePassthroughKey_Float pins the G9a regression: the +// non-Agg Scan→Top→Limit path leaves the projected field as a passthrough +// ColumnTypeFieldValue column (BuildBatchSchema only promotes Agg fields to +// native typed columns). BatchTop must read the float sort key out of the +// *modelv1.FieldValue rather than panicking on a TypedColumn[int64] cast. +func TestBatchTop_FieldValuePassthroughKey_Float(t *testing.T) { + s := vectorized.NewBatchSchema([]vectorized.ColumnDef{ + {Role: vectorized.RoleField, Name: "value", Type: vectorized.ColumnTypeFieldValue}, + }) + top := NewBatchTop(s, 0, 3, false, 8) // desc top-3 + _ = top.Init(context.Background()) + defer top.Close() + + vals := []float64{12.5, 3.0, 99.25, 7.1, 42.0, 88.0} + b := vectorized.NewRecordBatch(s, len(vals)) + col := b.Columns[0].(*vectorized.TypedColumn[*modelv1.FieldValue]) + for _, v := range vals { + col.Append(&modelv1.FieldValue{Value: &modelv1.FieldValue_Float{Float: &modelv1.Float{Value: v}}}) + } + b.Len = len(vals) + + if err := top.Consume(context.Background(), b); err != nil { + t.Fatalf("Consume must not panic on passthrough FieldValue key: %v", err) + } + if err := top.Finalize(context.Background()); err != nil { + t.Fatal(err) + } + out, err := top.NextBatch(context.Background()) + if err != nil { + t.Fatal(err) + } + if out == nil || out.Len != 3 { + t.Fatalf("want 3 rows, got %v", out) + } + got := out.Columns[0].(*vectorized.TypedColumn[*modelv1.FieldValue]).Data() + want := []float64{99.25, 88.0, 42.0} // highest 3, descending + for i := range want { + if g := got[i].GetFloat().GetValue(); g != want[i] { + t.Fatalf("row %d: want %v, got %v", i, want[i], g) + } + } +} + func TestBatchTop_Close_AfterPartialConsume_ReleasesHeapAllocation(t *testing.T) { s := topTestSchema() top := NewBatchTop(s, 0, 3, true, 8)
