This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 278b2949091ab0295ab049906a82c47757c77b92 Author: Hongtao Gao <[email protected]> AuthorDate: Mon May 4 06:40:31 2026 +0000 feat(query/vectorized/measure): add operators + output (gate G3) Third gate of the vectorized query pipeline. Implements the four core measure operators (Limit, GroupBy, Aggregation, Top), the RecordBatch serializer, and the MIterator adapter that lets the existing gRPC handler drive a vectorized pipeline transparently. Modules: - limit.go BatchLimit FusibleOperator. Window via cumulative-seen counter; returns vectorized.ErrLimitExhausted on close, fused stage emits the truncated batch + EOF on next pull. - groupby.go BatchGroupBy BreakerOperator. Per-batch reserve+refund memory accounting (entrySize + rowSize); fnv64a hashed buckets with collision-chain disambiguation; length- prefixed key encoding to prevent NUL-in-key collisions; Close releases every outstanding reservation. - aggregation.go BatchAggregation BreakerOperator. AggMode dispatcher (All/Map/Reduce) with Map/Reduce returning ErrAggModeNotImplemented per-method (R4 distributed forward-compat). Per-function arithmetic delegates to pkg/query/aggregation Map[N] so int/float semantics match the row path. - top.go BatchTop BreakerOperator. Bounded heap of size N keyed on a designated field column; stable tie-break via seq ordering; nulls treated as lowest; n<=0 short-circuits. - serialize.go serializeBatchToProto. Iterates active rows, builds measurev1.InternalDataPoint with grouped TagFamilies + Fields. The focal point of differential parity testing. - adapter.go vectorizedMIterator. Adapts a vectorized.Pipeline to executor.MIterator so the existing gRPC handler can drive a vectorized query without knowing it is columnar. 
Folded into this gate: revert of the post-G2 TagValue_Timestamp case in extract.go (Copilot G3 review issue #4) — without schema-level tag-kind metadata the variant cannot round-trip through serialize, so v1 rejects it consistently with the row path's mustDecodeTagValue. Risks closed: R4 distributed forward-compat (AggMode dispatcher present, Map/Reduce paths exist and explicitly error rather than silently behave wrong). 92 tests, all green under -race in ~1s. go vet clean. Includes regression pins for both Copilot review rounds: NUL-in-key collisions, +0/-0 group equivalence, BatchTop n<=0 no-op, Timestamp-variant rejection. --- pkg/query/vectorized/measure/adapter.go | 81 ++++ pkg/query/vectorized/measure/adapter_test.go | 119 ++++++ pkg/query/vectorized/measure/aggregation.go | 369 ++++++++++++++++++ pkg/query/vectorized/measure/aggregation_test.go | 474 +++++++++++++++++++++++ pkg/query/vectorized/measure/extract.go | 12 +- pkg/query/vectorized/measure/extract_test.go | 18 +- pkg/query/vectorized/measure/groupby.go | 291 ++++++++++++++ pkg/query/vectorized/measure/groupby_test.go | 433 +++++++++++++++++++++ pkg/query/vectorized/measure/limit.go | 85 ++++ pkg/query/vectorized/measure/limit_test.go | 154 ++++++++ pkg/query/vectorized/measure/serialize.go | 127 ++++++ pkg/query/vectorized/measure/serialize_test.go | 169 ++++++++ pkg/query/vectorized/measure/top.go | 247 ++++++++++++ pkg/query/vectorized/measure/top_test.go | 221 +++++++++++ 14 files changed, 2785 insertions(+), 15 deletions(-) diff --git a/pkg/query/vectorized/measure/adapter.go b/pkg/query/vectorized/measure/adapter.go new file mode 100644 index 000000000..1e872f82d --- /dev/null +++ b/pkg/query/vectorized/measure/adapter.go @@ -0,0 +1,81 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. 
Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package measure + +import ( + "context" + + measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" + "github.com/apache/skywalking-banyandb/pkg/query/vectorized" +) + +// vectorizedMIterator adapts a vectorized Pipeline to the executor.MIterator +// interface so the existing gRPC handler can drive it without knowing it +// is internally columnar. +// +// Lifecycle: Next pulls a RecordBatch from the pipeline, serializes it via +// serializeBatchToProto, and exposes the result through Current. EOF and +// errors both terminate iteration; the latter is exposed via Err. +type vectorizedMIterator struct { + ctx context.Context + pipeline *vectorized.Pipeline + err error + current []*measurev1.InternalDataPoint + done bool +} + +// newVectorizedMIterator constructs an adapter bound to ctx. +func newVectorizedMIterator(ctx context.Context, p *vectorized.Pipeline) *vectorizedMIterator { + return &vectorizedMIterator{ctx: ctx, pipeline: p} +} + +// Next pulls one batch from the pipeline and serializes it. Returns true while +// data remains. After EOF or error, returns false and stays in the terminal state. 
+func (i *vectorizedMIterator) Next() bool { + if i.done { + return false + } + b, err := i.pipeline.Next(i.ctx) + if err != nil { + i.err = err + i.done = true + return false + } + if b == nil { + i.done = true + return false + } + i.current = serializeBatchToProto(b, i.current[:0]) + return true +} + +// Current returns the most recently serialized batch. +func (i *vectorizedMIterator) Current() []*measurev1.InternalDataPoint { + return i.current +} + +// Err returns the storage error that terminated iteration, or nil. +func (i *vectorizedMIterator) Err() error { + return i.err +} + +// Close delegates to the underlying pipeline. Pipeline.Close is idempotent, +// so repeated Close calls on this adapter are safe. +func (i *vectorizedMIterator) Close() error { + return i.pipeline.Close() +} diff --git a/pkg/query/vectorized/measure/adapter_test.go b/pkg/query/vectorized/measure/adapter_test.go new file mode 100644 index 000000000..56d455ff8 --- /dev/null +++ b/pkg/query/vectorized/measure/adapter_test.go @@ -0,0 +1,119 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+package measure
+
+import (
+	"context"
+	"errors"
+	"testing"
+
+	"github.com/apache/skywalking-banyandb/pkg/query/model"
+	"github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+)
+
+// buildAdapterPipeline wires a BatchScan into a minimal Pipeline against the
+// supplied fake MeasureQueryResult, using a batch size of 4.
+func buildAdapterPipeline(t *testing.T, qr model.MeasureQueryResult) *vectorized.Pipeline {
+	t.Helper()
+	schema := minimalSchema()
+	pool := vectorized.NewBatchPool(schema, 4)
+	scan := NewBatchScan(qr, schema, pool, 4)
+	p, err := vectorized.NewPipelineBuilder().From(scan).Build()
+	if err != nil {
+		t.Fatal(err)
+	}
+	if err := scan.Init(context.Background()); err != nil {
+		t.Fatal(err)
+	}
+	return p
+}
+
+func TestVectorizedMIterator_Next_PullsAndSerializes(t *testing.T) {
+	qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResult(1, 100, 200)}}
+	p := buildAdapterPipeline(t, qr)
+	it := newVectorizedMIterator(context.Background(), p)
+	defer it.Close()
+
+	if !it.Next() {
+		t.Fatalf("Next must return true while pipeline has data; err=%v", it.Err())
+	}
+	dps := it.Current()
+	if len(dps) != 2 {
+		t.Fatalf("Current length: want 2, got %d", len(dps))
+	}
+}
+
+func TestVectorizedMIterator_Next_ReturnsFalseOnEOF(t *testing.T) {
+	qr := &fakeMeasureQueryResult{seq: nil}
+	p := buildAdapterPipeline(t, qr)
+	it := newVectorizedMIterator(context.Background(), p)
+	defer it.Close()
+
+	if it.Next() {
+		t.Fatal("Next on empty source must return false")
+	}
+	if err := it.Err(); err != nil {
+		t.Fatalf("EOF must not surface as error; got %v", err)
+	}
+}
+
+func TestVectorizedMIterator_Next_ReturnsFalseOnError_ErrExposedViaErr(t *testing.T) {
+	boom := errors.New("storage boom")
+	qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResultErr(boom)}}
+	p := buildAdapterPipeline(t, qr)
+	it := newVectorizedMIterator(context.Background(), p)
+	defer it.Close()
+
+	if it.Next() {
+		t.Fatal("Next must return false on storage error")
+	}
+	if !errors.Is(it.Err(), boom) {
+		t.Fatalf("Err must surface the storage error; got %v", it.Err())
+	}
+}
+
+func TestVectorizedMIterator_Current_ReturnsLastSerializedBatch(t *testing.T) {
+	qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResult(1, 100)}}
+	p := buildAdapterPipeline(t, qr)
+	it := newVectorizedMIterator(context.Background(), p)
+	defer it.Close()
+
+	// Assert Next succeeded so a pipeline failure is reported as such rather
+	// than as a confusing length mismatch below.
+	if !it.Next() {
+		t.Fatalf("Next must return true while pipeline has data; err=%v", it.Err())
+	}
+	first := it.Current()
+	if len(first) != 1 {
+		t.Fatalf("first Current len: want 1, got %d", len(first))
+	}
+	// Current called repeatedly should keep returning the last batch.
+	if got := it.Current(); len(got) != 1 {
+		t.Fatalf("repeat Current must return same batch; got len %d", len(got))
+	}
+}
+
+func TestVectorizedMIterator_Close_DelegatesToPipelineClose_Idempotent(t *testing.T) {
+	qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResult(1, 100)}}
+	p := buildAdapterPipeline(t, qr)
+	it := newVectorizedMIterator(context.Background(), p)
+	if err := it.Close(); err != nil {
+		t.Fatal(err)
+	}
+	// Pipeline.Close is idempotent at the pipeline level; calling adapter.Close
+	// twice must not error.
+	if err := it.Close(); err != nil {
+		t.Fatalf("second Close must be no-op, got %v", err)
+	}
+}
diff --git a/pkg/query/vectorized/measure/aggregation.go b/pkg/query/vectorized/measure/aggregation.go
new file mode 100644
index 000000000..4a6f5f4c7
--- /dev/null
+++ b/pkg/query/vectorized/measure/aggregation.go
@@ -0,0 +1,369 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"slices"
+
+	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+	"github.com/apache/skywalking-banyandb/pkg/query/aggregation"
+	"github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+)
+
+// AggMode selects the per-node aggregation strategy. v1 implements AggModeAll
+// (single-node, full reduce). Map and Reduce are dispatcher slots reserved for
+// future distributed work; they return ErrAggModeNotImplemented today so the
+// switch site does not need to change when distributed lands.
+type AggMode int
+
+// AggMode values.
+const (
+	AggModeAll AggMode = iota
+	AggModeMap
+	AggModeReduce
+)
+
+// AggFunc selects the reduction function applied to a column.
+type AggFunc int
+
+// AggFunc values.
+const (
+	AggSum AggFunc = iota
+	AggCount
+	AggMin
+	AggMax
+	AggMean
+)
+
+// ErrAggModeNotImplemented is returned by Consume / Finalize / NextBatch when
+// AggMode is Map or Reduce. v1 implements only AggModeAll; the dispatcher
+// exists at the per-method level so distributed mode fills in those branches
+// without an interface change.
+var ErrAggModeNotImplemented = errors.New("vectorized.measure: AggMode Map/Reduce not implemented in v1")
+
+// AggSpec configures one aggregation output column.
+type AggSpec struct {
+	Output   string
+	Func     AggFunc
+	InputCol int // index into the input schema; must be int64 or float64 (checked by Init)
+}
+
+// BatchAggregation is a BreakerOperator that groups input rows by the configured
+// key columns and reduces value columns via the configured AggSpec list.
+//
+// Per-function arithmetic delegates to pkg/query/aggregation, the same package
+// the row-based path uses. This keeps numeric semantics in lockstep across the
+// two paths so a fix in one path is shared by the other.
+//
+// Output schema = key columns (same definitions as input) + one column per AggSpec.
+// Output rows are emitted one per group, in group-insertion order, paginated by batchSize.
+type BatchAggregation struct {
+	inputSchema  *vectorized.BatchSchema
+	outputSchema *vectorized.BatchSchema
+	pool         *vectorized.BatchPool
+	groups       map[string]*aggGroup
+	insertion    []*aggGroup
+	keyBuf       []byte // scratch buffer reused by Consume to encode each row's group key
+	keyIndices   []int
+	aggs         []AggSpec
+	mode         AggMode
+	batchSize    int
+	cursor       int
+	closed       bool
+}
+
+// aggGroup carries one bucket's reduction state plus a copy of its key column values.
+type aggGroup struct {
+	keyCols []vectorized.Column // each holds exactly one row — the representative key
+	slots   []aggSlot           // one per AggSpec
+	key     string
+}
+
+// aggSlot holds an aggregation.Map of either int64 or float64. Whether int or
+// float is decided at construction time by AggFunc + input column type:
+//
+//   - SUM/MIN/MAX: matches input type (preserve precision).
+//   - COUNT: always int64.
+//   - MEAN: always float64 (so int inputs yield fractional means).
+//
+// Exactly one of intMap/floatMap is non-nil per slot.
+type aggSlot struct {
+	intMap       aggregation.Map[int64]
+	floatMap     aggregation.Map[float64]
+	fn           AggFunc
+	inputIsFloat bool
+}
+
+// NewBatchAggregation constructs a BatchAggregation. It builds the output
+// schema internally (keys + agg outputs) and owns its output BatchPool.
+//
+// A non-positive batchSize is clamped to 1: otherwise NextBatch could never
+// place a row in a batch and every group would be silently dropped.
+func NewBatchAggregation(
+	input *vectorized.BatchSchema, keyIndices []int,
+	aggs []AggSpec, mode AggMode, batchSize int,
+) *BatchAggregation {
+	if batchSize <= 0 {
+		batchSize = 1
+	}
+	outputSchema := buildAggOutputSchema(input, keyIndices, aggs)
+	return &BatchAggregation{
+		inputSchema:  input,
+		outputSchema: outputSchema,
+		pool:         vectorized.NewBatchPool(outputSchema, batchSize),
+		keyIndices:   slices.Clone(keyIndices),
+		aggs:         slices.Clone(aggs),
+		mode:         mode,
+		batchSize:    batchSize,
+	}
+}
+
+// Init validates the AggSpecs and prepares the group map. It does NOT validate
+// the mode — mode rejection happens at the per-method level (Consume/Finalize/
+// NextBatch) so the dispatcher matches the spec's distributed forward-compat
+// language.
+//
+// Specs are checked here, before any data flows. Otherwise an unknown AggFunc
+// surfaces only when the first group is created, and a non-numeric input
+// column panics in fold's type assertion.
+func (a *BatchAggregation) Init(_ context.Context) error {
+	for _, spec := range a.aggs {
+		if _, err := toModelAggFunc(spec.Func); err != nil {
+			return err
+		}
+		switch a.inputSchema.Columns[spec.InputCol].Type {
+		case vectorized.ColumnTypeInt64, vectorized.ColumnTypeFloat64:
+		default:
+			return fmt.Errorf("vectorized.measure: agg %q: input column %d must be int64 or float64",
+				spec.Output, spec.InputCol)
+		}
+	}
+	a.groups = make(map[string]*aggGroup)
+	return nil
+}
+
+// OutputSchema returns the schema of emitted batches: key columns followed by
+// agg output columns.
+func (a *BatchAggregation) OutputSchema() *vectorized.BatchSchema { return a.outputSchema }
+
+// Consume folds every active row into its group's accumulator. Null values are
+// excluded from aggregation (count not incremented; sum/min/max unchanged).
+// It returns an error instead of panicking on a nil map when called before
+// Init or after Close.
+func (a *BatchAggregation) Consume(_ context.Context, b *vectorized.RecordBatch) error {
+	if a.mode != AggModeAll {
+		return ErrAggModeNotImplemented
+	}
+	if a.groups == nil {
+		return errors.New("vectorized.measure: BatchAggregation.Consume called before Init or after Close")
+	}
+	for _, rowIdx := range activeIndices(b) {
+		row := int(rowIdx)
+		a.keyBuf = a.appendKey(a.keyBuf[:0], b, row)
+		// Indexing a map with string(byteSlice) does not allocate, so rows
+		// that land in an existing group cost no key allocation; only a new
+		// group materialises its key string.
+		group, exists := a.groups[string(a.keyBuf)]
+		if !exists {
+			newGroup, err := a.newGroup(b, row, string(a.keyBuf))
+			if err != nil {
+				return err
+			}
+			a.groups[newGroup.key] = newGroup
+			a.insertion = append(a.insertion, newGroup)
+			group = newGroup
+		}
+		for slotIdx, spec := range a.aggs {
+			a.fold(b, row, &group.slots[slotIdx], spec)
+		}
+	}
+	return nil
+}
+
+// Finalize rejects unsupported modes; AggModeAll is a no-op (accumulators
+// are eagerly maintained in Consume).
+func (a *BatchAggregation) Finalize(_ context.Context) error {
+	if a.mode != AggModeAll {
+		return ErrAggModeNotImplemented
+	}
+	return nil
+}
+
+// NextBatch emits aggregated rows in group-insertion order, paginated by
+// batchSize. It returns (nil, nil) once every group has been emitted. Because
+// batchSize is at least 1, every non-nil batch holds at least one row.
+func (a *BatchAggregation) NextBatch(_ context.Context) (*vectorized.RecordBatch, error) {
+	if a.mode != AggModeAll {
+		return nil, ErrAggModeNotImplemented
+	}
+	if a.cursor >= len(a.insertion) {
+		return nil, nil
+	}
+	out := a.pool.Get()
+	for out.Len < a.batchSize && a.cursor < len(a.insertion) {
+		a.emitGroupRow(out, a.insertion[a.cursor])
+		out.Len++
+		a.cursor++
+	}
+	return out, nil
+}
+
+// Close releases the group map and key scratch buffer. Idempotent.
+func (a *BatchAggregation) Close() error {
+	if a.closed {
+		return nil
+	}
+	a.closed = true
+	a.groups = nil
+	a.insertion = nil
+	a.keyBuf = nil
+	return nil
+}
+
+// newGroup creates a bucket for key, copying the representative key values
+// from row rowIdx of b and allocating one aggregation slot per AggSpec.
+func (a *BatchAggregation) newGroup(b *vectorized.RecordBatch, rowIdx int, key string) (*aggGroup, error) {
+	keyCols := make([]vectorized.Column, len(a.keyIndices))
+	for i, kIdx := range a.keyIndices {
+		keyCols[i] = vectorized.NewColumnForType(a.inputSchema.Columns[kIdx].Type, 1)
+		copyOneValue(keyCols[i], b.Columns[kIdx], rowIdx)
+	}
+	slots := make([]aggSlot, len(a.aggs))
+	for i, spec := range a.aggs {
+		inputIsFloat := a.inputSchema.Columns[spec.InputCol].Type == vectorized.ColumnTypeFloat64
+		slot, slotErr := newAggSlot(spec.Func, inputIsFloat)
+		if slotErr != nil {
+			return nil, slotErr
+		}
+		slots[i] = slot
+	}
+	return &aggGroup{key: key, keyCols: keyCols, slots: slots}, nil
+}
+
+// fold delegates one row's value to the slot's underlying aggregation.Map.
+// Nulls are skipped — neither the count nor the running min/max/sum is touched.
+func (a *BatchAggregation) fold(b *vectorized.RecordBatch, rowIdx int, slot *aggSlot, spec AggSpec) {
+	col := b.Columns[spec.InputCol]
+	if col.IsNull(rowIdx) {
+		return
+	}
+	if slot.intMap != nil {
+		var v int64
+		if slot.inputIsFloat {
+			v = int64(col.(*vectorized.TypedColumn[float64]).Data()[rowIdx])
+		} else {
+			v = col.(*vectorized.TypedColumn[int64]).Data()[rowIdx]
+		}
+		slot.intMap.In(v)
+		return
+	}
+	var v float64
+	if slot.inputIsFloat {
+		v = col.(*vectorized.TypedColumn[float64]).Data()[rowIdx]
+	} else {
+		v = float64(col.(*vectorized.TypedColumn[int64]).Data()[rowIdx])
+	}
+	slot.floatMap.In(v)
+}
+
+// emitGroupRow appends one output row for group: key columns first, in
+// keyIndices order, then one reduced value per AggSpec.
+func (a *BatchAggregation) emitGroupRow(out *vectorized.RecordBatch, group *aggGroup) {
+	for i := range a.keyIndices {
+		copyOneValue(out.Columns[i], group.keyCols[i], 0)
+	}
+	for slotIdx := range a.aggs {
+		colIdx := len(a.keyIndices) + slotIdx
+		group.slots[slotIdx].write(out.Columns[colIdx])
+	}
+}
+
+// appendKey appends the encoded group key of row rowIdx to dst and returns the
+// extended slice. The encoding is shared with BatchGroupBy (length-prefixed
+// variable components, canonicalised float zero) — see appendKeyComponent in
+// groupby.go.
+func (a *BatchAggregation) appendKey(dst []byte, b *vectorized.RecordBatch, rowIdx int) []byte {
+	for _, kIdx := range a.keyIndices {
+		dst = appendKeyComponent(dst, b.Columns[kIdx], rowIdx)
+	}
+	return dst
+}
+
+// newAggSlot builds an aggregation.Map of the appropriate numeric type for the
+// (function, input type) pair. The mapping rules mirror aggOutputType so the
+// slot's value can be Append'd directly to the typed output column.
+func newAggSlot(fn AggFunc, inputIsFloat bool) (aggSlot, error) {
+	af, err := toModelAggFunc(fn)
+	if err != nil {
+		return aggSlot{}, err
+	}
+	slot := aggSlot{fn: fn, inputIsFloat: inputIsFloat}
+	// SUM/MIN/MAX follow the input type; COUNT is always int, MEAN always float.
+	useFloat := inputIsFloat
+	switch fn {
+	case AggCount:
+		useFloat = false
+	case AggMean:
+		useFloat = true
+	}
+	if useFloat {
+		m, mapErr := aggregation.NewMap[float64](af)
+		if mapErr != nil {
+			return aggSlot{}, mapErr
+		}
+		slot.floatMap = m
+	} else {
+		m, mapErr := aggregation.NewMap[int64](af)
+		if mapErr != nil {
+			return aggSlot{}, mapErr
+		}
+		slot.intMap = m
+	}
+	return slot, nil
+}
+
+// write emits the slot's reduced value to the typed output column.
+func (s *aggSlot) write(col vectorized.Column) {
+	if s.intMap != nil {
+		col.(*vectorized.TypedColumn[int64]).Append(s.intMap.Val())
+		return
+	}
+	col.(*vectorized.TypedColumn[float64]).Append(s.floatMap.Val())
+}
+
+// toModelAggFunc maps an AggFunc to its modelv1 enum; unknown values error.
+func toModelAggFunc(fn AggFunc) (modelv1.AggregationFunction, error) {
+	switch fn {
+	case AggSum:
+		return modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM, nil
+	case AggCount:
+		return modelv1.AggregationFunction_AGGREGATION_FUNCTION_COUNT, nil
+	case AggMin:
+		return modelv1.AggregationFunction_AGGREGATION_FUNCTION_MIN, nil
+	case AggMax:
+		return modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX, nil
+	case AggMean:
+		return modelv1.AggregationFunction_AGGREGATION_FUNCTION_MEAN, nil
+	}
+	return modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED,
+		fmt.Errorf("vectorized.measure: unknown AggFunc %d", fn)
+}
+
+// buildAggOutputSchema returns the key column definitions (copied from input)
+// followed by one RoleField column per AggSpec, typed via aggOutputType.
+func buildAggOutputSchema(
+	input *vectorized.BatchSchema, keyIndices []int, aggs []AggSpec,
+) *vectorized.BatchSchema {
+	defs := make([]vectorized.ColumnDef, 0, len(keyIndices)+len(aggs))
+	for _, ki := range keyIndices {
+		defs = append(defs, input.Columns[ki])
+	}
+	for _, agg := range aggs {
+		defs = append(defs, vectorized.ColumnDef{
+			Role: vectorized.RoleField,
+			Name: agg.Output,
+			Type: aggOutputType(input.Columns[agg.InputCol].Type, agg.Func),
+		})
+	}
+	return vectorized.NewBatchSchema(defs)
+}
+
+// aggOutputType maps (input type, agg func) to the output column type.
+//   - COUNT is always int64.
+//   - MEAN is always float64.
+//   - SUM/MIN/MAX preserve the input type.
+func aggOutputType(in vectorized.ColumnType, fn AggFunc) vectorized.ColumnType {
+	switch fn {
+	case AggCount:
+		return vectorized.ColumnTypeInt64
+	case AggMean:
+		return vectorized.ColumnTypeFloat64
+	}
+	return in
+}
diff --git a/pkg/query/vectorized/measure/aggregation_test.go b/pkg/query/vectorized/measure/aggregation_test.go
new file mode 100644
index 000000000..8c77c519f
--- /dev/null
+++ b/pkg/query/vectorized/measure/aggregation_test.go
@@ -0,0 +1,474 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +package measure + +import ( + "context" + "errors" + "testing" + + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/pkg/query/aggregation" + "github.com/apache/skywalking-banyandb/pkg/query/vectorized" +) + +func aggIntSchema() *vectorized.BatchSchema { + return vectorized.NewBatchSchema([]vectorized.ColumnDef{ + {Role: vectorized.RoleTag, TagFamily: "default", Name: "g", Type: vectorized.ColumnTypeString}, + {Role: vectorized.RoleField, Name: "v", Type: vectorized.ColumnTypeInt64}, + }) +} + +func aggFloatSchema() *vectorized.BatchSchema { + return vectorized.NewBatchSchema([]vectorized.ColumnDef{ + {Role: vectorized.RoleTag, TagFamily: "default", Name: "g", Type: vectorized.ColumnTypeString}, + {Role: vectorized.RoleField, Name: "v", Type: vectorized.ColumnTypeFloat64}, + }) +} + +// feedAggInt builds a single batch of (g,v) int64 pairs and Consumes it. +func feedAggInt(t *testing.T, op *BatchAggregation, schema *vectorized.BatchSchema, pairs ...struct { + g string + v int64 + null bool +}, +) { + t.Helper() + b := vectorized.NewRecordBatch(schema, len(pairs)) + gCol := b.Columns[0].(*vectorized.TypedColumn[string]) + vCol := b.Columns[1].(*vectorized.TypedColumn[int64]) + for _, p := range pairs { + gCol.Append(p.g) + if p.null { + vCol.AppendNull() + } else { + vCol.Append(p.v) + } + } + b.Len = len(pairs) + if err := op.Consume(context.Background(), b); err != nil { + t.Fatal(err) + } +} + +// findAggRow returns the output row index whose key column has the requested value. 
+func findAggRow(t *testing.T, b *vectorized.RecordBatch, key string) int { + t.Helper() + col := b.Columns[0].(*vectorized.TypedColumn[string]) + for i := range b.Len { + if col.Data()[i] == key { + return i + } + } + t.Fatalf("output row for key %q not found", key) + return -1 +} + +func TestBatchAggregation_AggModeAll_SumInt64(t *testing.T) { + s := aggIntSchema() + op := NewBatchAggregation(s, []int{0}, + []AggSpec{{Func: AggSum, InputCol: 1, Output: "sum_v"}}, AggModeAll, 8) + _ = op.Init(context.Background()) + defer op.Close() + + feedAggInt(t, op, s, + struct { + g string + v int64 + null bool + }{"a", 1, false}, + struct { + g string + v int64 + null bool + }{"b", 3, false}, + struct { + g string + v int64 + null bool + }{"a", 2, false}, + struct { + g string + v int64 + null bool + }{"b", 4, false}, + ) + _ = op.Finalize(context.Background()) + out, _ := op.NextBatch(context.Background()) + if out.Len != 2 { + t.Fatalf("Len: want 2, got %d", out.Len) + } + sums := out.Columns[1].(*vectorized.TypedColumn[int64]).Data() + if got := sums[findAggRow(t, out, "a")]; got != 3 { + t.Fatalf("sum(a): want 3, got %d", got) + } + if got := sums[findAggRow(t, out, "b")]; got != 7 { + t.Fatalf("sum(b): want 7, got %d", got) + } +} + +func TestBatchAggregation_AggModeAll_SumFloat64(t *testing.T) { + s := aggFloatSchema() + op := NewBatchAggregation(s, []int{0}, + []AggSpec{{Func: AggSum, InputCol: 1, Output: "sum_v"}}, AggModeAll, 8) + _ = op.Init(context.Background()) + defer op.Close() + + b := vectorized.NewRecordBatch(s, 3) + b.Columns[0].(*vectorized.TypedColumn[string]).Append("x") + b.Columns[1].(*vectorized.TypedColumn[float64]).Append(1.5) + b.Columns[0].(*vectorized.TypedColumn[string]).Append("x") + b.Columns[1].(*vectorized.TypedColumn[float64]).Append(2.5) + b.Columns[0].(*vectorized.TypedColumn[string]).Append("y") + b.Columns[1].(*vectorized.TypedColumn[float64]).Append(0.5) + b.Len = 3 + if err := op.Consume(context.Background(), b); err != nil { + 
t.Fatal(err) + } + _ = op.Finalize(context.Background()) + out, _ := op.NextBatch(context.Background()) + sums := out.Columns[1].(*vectorized.TypedColumn[float64]).Data() + if got := sums[findAggRow(t, out, "x")]; got != 4.0 { + t.Fatalf("sum(x): want 4.0, got %v", got) + } +} + +func TestBatchAggregation_AggModeAll_Count(t *testing.T) { + s := aggIntSchema() + op := NewBatchAggregation(s, []int{0}, + []AggSpec{{Func: AggCount, InputCol: 1, Output: "n"}}, AggModeAll, 8) + _ = op.Init(context.Background()) + defer op.Close() + + feedAggInt(t, op, s, + struct { + g string + v int64 + null bool + }{"a", 0, false}, + struct { + g string + v int64 + null bool + }{"a", 0, false}, + struct { + g string + v int64 + null bool + }{"a", 0, false}, + struct { + g string + v int64 + null bool + }{"b", 0, false}, + ) + _ = op.Finalize(context.Background()) + out, _ := op.NextBatch(context.Background()) + counts := out.Columns[1].(*vectorized.TypedColumn[int64]).Data() + if got := counts[findAggRow(t, out, "a")]; got != 3 { + t.Fatalf("count(a): want 3, got %d", got) + } + if got := counts[findAggRow(t, out, "b")]; got != 1 { + t.Fatalf("count(b): want 1, got %d", got) + } +} + +func TestBatchAggregation_AggModeAll_Min_Int64(t *testing.T) { + s := aggIntSchema() + op := NewBatchAggregation(s, []int{0}, + []AggSpec{{Func: AggMin, InputCol: 1, Output: "min_v"}}, AggModeAll, 8) + _ = op.Init(context.Background()) + defer op.Close() + feedAggInt(t, op, s, + struct { + g string + v int64 + null bool + }{"a", 7, false}, + struct { + g string + v int64 + null bool + }{"a", 3, false}, + struct { + g string + v int64 + null bool + }{"a", 5, false}, + ) + _ = op.Finalize(context.Background()) + out, _ := op.NextBatch(context.Background()) + mins := out.Columns[1].(*vectorized.TypedColumn[int64]).Data() + if got := mins[findAggRow(t, out, "a")]; got != 3 { + t.Fatalf("min(a): want 3, got %d", got) + } +} + +func TestBatchAggregation_AggModeAll_Max_Float64(t *testing.T) { + s := 
aggFloatSchema() + op := NewBatchAggregation(s, []int{0}, + []AggSpec{{Func: AggMax, InputCol: 1, Output: "max_v"}}, AggModeAll, 8) + _ = op.Init(context.Background()) + defer op.Close() + + b := vectorized.NewRecordBatch(s, 3) + b.Columns[0].(*vectorized.TypedColumn[string]).Append("g") + b.Columns[1].(*vectorized.TypedColumn[float64]).Append(1.0) + b.Columns[0].(*vectorized.TypedColumn[string]).Append("g") + b.Columns[1].(*vectorized.TypedColumn[float64]).Append(9.5) + b.Columns[0].(*vectorized.TypedColumn[string]).Append("g") + b.Columns[1].(*vectorized.TypedColumn[float64]).Append(4.0) + b.Len = 3 + _ = op.Consume(context.Background(), b) + _ = op.Finalize(context.Background()) + out, _ := op.NextBatch(context.Background()) + maxs := out.Columns[1].(*vectorized.TypedColumn[float64]).Data() + if got := maxs[findAggRow(t, out, "g")]; got != 9.5 { + t.Fatalf("max(g): want 9.5, got %v", got) + } +} + +func TestBatchAggregation_AggModeAll_Mean_Float64(t *testing.T) { + s := aggFloatSchema() + op := NewBatchAggregation(s, []int{0}, + []AggSpec{{Func: AggMean, InputCol: 1, Output: "mean_v"}}, AggModeAll, 8) + _ = op.Init(context.Background()) + defer op.Close() + + b := vectorized.NewRecordBatch(s, 4) + b.Columns[0].(*vectorized.TypedColumn[string]).Append("g") + b.Columns[1].(*vectorized.TypedColumn[float64]).Append(1.0) + b.Columns[0].(*vectorized.TypedColumn[string]).Append("g") + b.Columns[1].(*vectorized.TypedColumn[float64]).Append(2.0) + b.Columns[0].(*vectorized.TypedColumn[string]).Append("g") + b.Columns[1].(*vectorized.TypedColumn[float64]).Append(3.0) + b.Columns[0].(*vectorized.TypedColumn[string]).Append("g") + b.Columns[1].(*vectorized.TypedColumn[float64]).Append(4.0) + b.Len = 4 + _ = op.Consume(context.Background(), b) + _ = op.Finalize(context.Background()) + out, _ := op.NextBatch(context.Background()) + means := out.Columns[1].(*vectorized.TypedColumn[float64]).Data() + if got := means[findAggRow(t, out, "g")]; got != 2.5 { + t.Fatalf("mean(g): 
want 2.5, got %v", got) + } +} + +func TestBatchAggregation_AggModeAll_NullField_ExcludedFromAggregation(t *testing.T) { + s := aggIntSchema() + op := NewBatchAggregation(s, []int{0}, + []AggSpec{ + {Func: AggSum, InputCol: 1, Output: "sum_v"}, + {Func: AggCount, InputCol: 1, Output: "n"}, + }, AggModeAll, 8) + _ = op.Init(context.Background()) + defer op.Close() + feedAggInt(t, op, s, + struct { + g string + v int64 + null bool + }{"a", 5, false}, + struct { + g string + v int64 + null bool + }{"a", 0, true}, // null — must be skipped + struct { + g string + v int64 + null bool + }{"a", 7, false}, + ) + _ = op.Finalize(context.Background()) + out, _ := op.NextBatch(context.Background()) + row := findAggRow(t, out, "a") + sum := out.Columns[1].(*vectorized.TypedColumn[int64]).Data()[row] + count := out.Columns[2].(*vectorized.TypedColumn[int64]).Data()[row] + if sum != 12 { + t.Fatalf("sum(a): null must be excluded; want 12, got %d", sum) + } + if count != 2 { + t.Fatalf("count(a): null must be excluded; want 2, got %d", count) + } +} + +// Pins the regression flagged by Copilot: distinct (a,b) tuples whose +// components contain embedded NUL bytes must NOT collapse into one group. 
+func TestBatchAggregation_NULInStringKey_NoCollision(t *testing.T) {
+	s := vectorized.NewBatchSchema([]vectorized.ColumnDef{
+		{Role: vectorized.RoleTag, TagFamily: "default", Name: "a", Type: vectorized.ColumnTypeString},
+		{Role: vectorized.RoleTag, TagFamily: "default", Name: "b", Type: vectorized.ColumnTypeString},
+		{Role: vectorized.RoleField, Name: "v", Type: vectorized.ColumnTypeInt64},
+	})
+	op := NewBatchAggregation(s, []int{0, 1},
+		[]AggSpec{{Func: AggSum, InputCol: 2, Output: "sum"}}, AggModeAll, 8)
+	_ = op.Init(context.Background())
+	defer op.Close()
+
+	b := vectorized.NewRecordBatch(s, 2)
+	b.Columns[0].(*vectorized.TypedColumn[string]).Append("a\x00b")
+	b.Columns[1].(*vectorized.TypedColumn[string]).Append("c")
+	b.Columns[2].(*vectorized.TypedColumn[int64]).Append(10)
+	b.Columns[0].(*vectorized.TypedColumn[string]).Append("a")
+	b.Columns[1].(*vectorized.TypedColumn[string]).Append("b\x00c")
+	b.Columns[2].(*vectorized.TypedColumn[int64]).Append(20)
+	b.Len = 2
+
+	_ = op.Consume(context.Background(), b)
+	_ = op.Finalize(context.Background())
+	out, _ := op.NextBatch(context.Background())
+	if out.Len != 2 { // one output row per distinct (a,b) key tuple
+		t.Fatalf("two distinct (a,b) tuples must produce two output rows; got Len=%d (NUL-in-key collision merged)", out.Len)
+	}
+}
+
+func TestBatchAggregation_AggModeMap_ReturnsErrNotImplemented(t *testing.T) {
+	s := aggIntSchema()
+	op := NewBatchAggregation(s, []int{0},
+		[]AggSpec{{Func: AggSum, InputCol: 1, Output: "sum_v"}}, AggModeMap, 8)
+	if err := op.Init(context.Background()); err != nil {
+		t.Fatalf("Init must succeed regardless of mode; got %v", err)
+	}
+	defer op.Close()
+	b := vectorized.NewRecordBatch(s, 1)
+	b.Columns[0].(*vectorized.TypedColumn[string]).Append("a")
+	b.Columns[1].(*vectorized.TypedColumn[int64]).Append(1)
+	b.Len = 1
+	if err := op.Consume(context.Background(), b); !errors.Is(err, ErrAggModeNotImplemented) {
+		t.Fatalf("Consume(AggModeMap) must surface ErrAggModeNotImplemented, got %v", err)
+	}
+	if err := op.Finalize(context.Background()); !errors.Is(err, ErrAggModeNotImplemented) {
+		t.Fatalf("Finalize(AggModeMap) must surface ErrAggModeNotImplemented, got %v", err)
+	}
+	if _, err := op.NextBatch(context.Background()); !errors.Is(err, ErrAggModeNotImplemented) {
+		t.Fatalf("NextBatch(AggModeMap) must surface ErrAggModeNotImplemented, got %v", err)
+	}
+}
+
+func TestBatchAggregation_AggModeReduce_ReturnsErrNotImplemented(t *testing.T) {
+	s := aggIntSchema()
+	op := NewBatchAggregation(s, []int{0},
+		[]AggSpec{{Func: AggSum, InputCol: 1, Output: "sum_v"}}, AggModeReduce, 8)
+	if err := op.Init(context.Background()); err != nil {
+		t.Fatalf("Init must succeed regardless of mode; got %v", err)
+	}
+	defer op.Close()
+	b := vectorized.NewRecordBatch(s, 1)
+	b.Columns[0].(*vectorized.TypedColumn[string]).Append("a")
+	b.Columns[1].(*vectorized.TypedColumn[int64]).Append(1)
+	b.Len = 1
+	if err := op.Consume(context.Background(), b); !errors.Is(err, ErrAggModeNotImplemented) {
+		t.Fatalf("Consume(AggModeReduce) must surface ErrAggModeNotImplemented, got %v", err)
+	}
+	if err := op.Finalize(context.Background()); !errors.Is(err, ErrAggModeNotImplemented) {
+		t.Fatalf("Finalize(AggModeReduce) must surface ErrAggModeNotImplemented, got %v", err)
+	}
+	if _, err := op.NextBatch(context.Background()); !errors.Is(err, ErrAggModeNotImplemented) {
+		t.Fatalf("NextBatch(AggModeReduce) must surface ErrAggModeNotImplemented, got %v", err)
+	}
+}
+
+// TestBatchAggregation_DelegatesToAggregationPackage verifies that the same
+// arithmetic computed via pkg/query/aggregation directly produces results
+// identical to BatchAggregation's. A matching SUM is a drift guard, not a
+// proof of delegation: a reimplementation that agrees on these inputs would
+// also pass, but divergent reduction logic would surface here.
+func TestBatchAggregation_DelegatesToAggregationPackage(t *testing.T) {
+	s := aggIntSchema()
+	op := NewBatchAggregation(s, []int{0},
+		[]AggSpec{{Func: AggSum, InputCol: 1, Output: "sum_v"}}, AggModeAll, 8)
+	_ = op.Init(context.Background())
+	defer op.Close()
+
+	values := []int64{3, 1, 4, 1, 5, 9, 2, 6}
+	for _, v := range values {
+		feedAggInt(t, op, s, struct {
+			g    string
+			v    int64
+			null bool
+		}{"a", v, false})
+	}
+	_ = op.Finalize(context.Background())
+	out, _ := op.NextBatch(context.Background())
+	gotSum := out.Columns[1].(*vectorized.TypedColumn[int64]).Data()[findAggRow(t, out, "a")]
+
+	// Reference: same computation via aggregation.NewMap[int64] directly.
+	ref, refErr := aggregation.NewMap[int64](modelv1.AggregationFunction_AGGREGATION_FUNCTION_SUM)
+	if refErr != nil {
+		t.Fatal(refErr)
+	}
+	for _, v := range values {
+		ref.In(v)
+	}
+	wantSum := ref.Val() // expected 3+1+4+1+5+9+2+6 = 31 if SUM is plain addition
+	if gotSum != wantSum {
+		t.Fatalf("BatchAggregation SUM (%d) must equal aggregation.NewMap SUM (%d) — divergence indicates delegation broke",
+			gotSum, wantSum)
+	}
+}
+
+// TestBatchAggregation_Correctness_MatchesManualComputation pins parity-relevant
+// arithmetic against a manual reference, complementing the delegation assertion.
+func TestBatchAggregation_Correctness_MatchesManualComputation(t *testing.T) {
+	s := aggIntSchema()
+	op := NewBatchAggregation(s, []int{0},
+		[]AggSpec{
+			{Func: AggSum, InputCol: 1, Output: "sum_v"},
+			{Func: AggMin, InputCol: 1, Output: "min_v"},
+			{Func: AggMax, InputCol: 1, Output: "max_v"},
+		}, AggModeAll, 8)
+	_ = op.Init(context.Background())
+	defer op.Close()
+	feedAggInt(t, op, s,
+		struct {
+			g    string
+			v    int64
+			null bool
+		}{"a", 3, false},
+		struct {
+			g    string
+			v    int64
+			null bool
+		}{"a", 1, false},
+		struct {
+			g    string
+			v    int64
+			null bool
+		}{"a", 4, false},
+		struct {
+			g    string
+			v    int64
+			null bool
+		}{"a", 1, false},
+		struct {
+			g    string
+			v    int64
+			null bool
+		}{"a", 5, false},
+	)
+	_ = op.Finalize(context.Background())
+	out, _ := op.NextBatch(context.Background())
+	row := findAggRow(t, out, "a")
+	sum := out.Columns[1].(*vectorized.TypedColumn[int64]).Data()[row]
+	mn := out.Columns[2].(*vectorized.TypedColumn[int64]).Data()[row]
+	mx := out.Columns[3].(*vectorized.TypedColumn[int64]).Data()[row]
+	if sum != 14 || mn != 1 || mx != 5 {
+		t.Fatalf("sum/min/max: want 14/1/5, got %d/%d/%d", sum, mn, mx)
+	}
+}
diff --git a/pkg/query/vectorized/measure/extract.go b/pkg/query/vectorized/measure/extract.go
index 5673b407c..70504f31e 100644
--- a/pkg/query/vectorized/measure/extract.go
+++ b/pkg/query/vectorized/measure/extract.go
@@ -80,14 +80,12 @@ func extractTagRow(col vectorized.Column, rowIdx int, tv *modelv1.TagValue) erro
 		}
 		c.Data()[rowIdx] = slices.Clone(v.StrArray.GetValue())
 		return nil
-	case *modelv1.TagValue_Timestamp:
-		c, ok := col.(*vectorized.TypedColumn[int64])
-		if !ok {
-			return columnTypeMismatch(col, "int64", rowIdx)
-		}
-		c.Data()[rowIdx] = v.Timestamp.AsTime().UnixNano()
-		return nil
 	default:
+		// TagValue_Timestamp falls through here in v1: the schema doesn't
+		// carry tag-kind metadata, so a timestamp variant cannot round-trip
+		// to TagValue_Timestamp on serialize. Failing fast here keeps the
+		// vectorized path consistent with the row-path's mustDecodeTagValue
+		// (which panics on Timestamp value type).
 		return fmt.Errorf("vectorized.measure: unsupported TagValue variant %T at row %d", tv.Value, rowIdx)
 	}
 }
diff --git a/pkg/query/vectorized/measure/extract_test.go b/pkg/query/vectorized/measure/extract_test.go
index f81557004..955df2e6a 100644
--- a/pkg/query/vectorized/measure/extract_test.go
+++ b/pkg/query/vectorized/measure/extract_test.go
@@ -113,15 +113,17 @@ func TestExtractTagRow_Str_WritesValue(t *testing.T) {
 	}
 }
 
-func TestExtractTagRow_Timestamp_WritesUnixNano(t *testing.T) {
-	ts := time.Date(2026, 5, 3, 12, 34, 56, 789, time.UTC)
-	col := vectorized.NewInt64Column(2)
+// Per Copilot G3 review (option A): TagValue_Timestamp is unsupported in v1.
+// Without schema-level tag-kind metadata, extract+serialize cannot round-trip
+// the variant; the row path's mustDecodeTagValue panics on Timestamp tags
+// anyway. This test pins the explicit-error contract — silent degradation is
+// forbidden.
+func TestExtractTagRow_Timestamp_ReturnsError(t *testing.T) {
+	ts := time.Date(2026, 5, 4, 12, 34, 56, 789, time.UTC)
+	col := vectorized.NewInt64Column(1)
 	preallocInt64(col, 1)
-	if err := extractTagRow(col, 0, tvTimestamp(ts)); err != nil {
-		t.Fatal(err)
-	}
-	if got := col.Data()[0]; got != ts.UnixNano() {
-		t.Fatalf("timestamp roundtrip: got %d, want %d", got, ts.UnixNano())
+	if err := extractTagRow(col, 0, tvTimestamp(ts)); err == nil {
+		t.Fatal("Timestamp variant unsupported in v1 — extract must return error, not silently degrade")
 	}
 }
 
diff --git a/pkg/query/vectorized/measure/groupby.go b/pkg/query/vectorized/measure/groupby.go
new file mode 100644
index 000000000..37441d232
--- /dev/null
+++ b/pkg/query/vectorized/measure/groupby.go
@@ -0,0 +1,305 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+	"bytes"
+	"context"
+	"encoding/binary"
+	"fmt"
+	"hash/fnv"
+	"math"
+	"slices"
+
+	"github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+)
+
+// BatchGroupBy is a BreakerOperator that partitions input rows by the values
+// of one or more key columns. Output preserves the input schema; rows are
+// emitted in group-insertion order, with all rows of the first group flushed
+// before the next group begins.
+//
+// Memory accounting follows a pessimistic-reserve, refund-unused pattern:
+//
+//   - entrySize is the per-new-group bucket overhead.
+//   - rowSize is the per-row data cost (independent of group identity).
+//   - On Consume, reserve worst-case = activeRows * (entrySize + rowSize).
+//   - After actual consumption, refund (worstCaseNewGroups - actualNewGroups) * entrySize.
+//
+// Close releases every outstanding reservation, even mid-Consume cancellations.
+type BatchGroupBy struct {
+	schema       *vectorized.BatchSchema
+	pool         *vectorized.BatchPool
+	tracker      *vectorized.MemoryTracker
+	groups       map[uint64][]*groupBucket
+	insertion    []*groupBucket
+	keyIndices   []int
+	keyEncBuf    []byte // scratch buffer reused across rows during Consume
+	entrySize    int64
+	rowSize      int64
+	reserved     int64
+	batchSize    int
+	cursorBucket int
+	cursorRow    int
+	closed       bool
+}
+
+type groupBucket struct {
+	cols     []vectorized.Column
+	keyBytes []byte
+}
+
+// NewBatchGroupBy constructs a BatchGroupBy.
+//
+//   - entrySize: per-new-group bucket overhead.
+//   - rowSize: per-row data cost.
+//
+// Pass rowSize=0 to charge only per-new-group; pass entrySize=0 to charge
+// only per-row.
+func NewBatchGroupBy(
+	schema *vectorized.BatchSchema, keyIndices []int,
+	pool *vectorized.BatchPool, batchSize int,
+	tracker *vectorized.MemoryTracker, entrySize, rowSize int64,
+) *BatchGroupBy {
+	return &BatchGroupBy{
+		schema:     schema,
+		keyIndices: slices.Clone(keyIndices),
+		pool:       pool,
+		batchSize:  batchSize,
+		tracker:    tracker,
+		entrySize:  entrySize,
+		rowSize:    rowSize,
+	}
+}
+
+// Init prepares the group map.
+func (g *BatchGroupBy) Init(_ context.Context) error {
+	g.groups = make(map[uint64][]*groupBucket)
+	return nil
+}
+
+// OutputSchema returns the unchanged input schema.
+func (g *BatchGroupBy) OutputSchema() *vectorized.BatchSchema { return g.schema }
+
+// Consume reserves the worst-case bytes for new groups + per-row cost,
+// accumulates rows into per-group buckets, then refunds the unused reservation.
+func (g *BatchGroupBy) Consume(_ context.Context, b *vectorized.RecordBatch) error {
+	active := activeIndices(b)
+	n := int64(len(active))
+	estBytes := n*g.entrySize + n*g.rowSize
+	if estBytes > 0 {
+		if reserveErr := g.tracker.Reserve(estBytes); reserveErr != nil {
+			return fmt.Errorf("group-by memory budget exceeded: %w", reserveErr)
+		}
+		g.reserved += estBytes
+	}
+	actualBytes := g.consumeRows(b, active)
+	if actualBytes < estBytes {
+		refund := estBytes - actualBytes
+		g.tracker.Release(refund)
+		g.reserved -= refund
+	}
+	return nil
+}
+
+func (g *BatchGroupBy) consumeRows(b *vectorized.RecordBatch, active []uint16) int64 {
+	var newGroups int64
+	for _, rowIdx := range active {
+		bucket, isNew := g.findOrCreate(b, int(rowIdx))
+		if isNew {
+			newGroups++
+		}
+		for colIdx, srcCol := range b.Columns {
+			copyOneValue(bucket.cols[colIdx], srcCol, int(rowIdx))
+		}
+	}
+	return newGroups*g.entrySize + int64(len(active))*g.rowSize
+}
+
+// findOrCreate locates the bucket for the row's key, creating it if absent.
+// Hash collisions are resolved by linear scan over the bucket chain — the
+// keyBytes are compared exactly, so two distinct keys that hash to the same
+// uint64 still map to different buckets.
+func (g *BatchGroupBy) findOrCreate(b *vectorized.RecordBatch, rowIdx int) (*groupBucket, bool) {
+	g.keyEncBuf = g.encodeKey(g.keyEncBuf[:0], b, rowIdx)
+	h := hashKey(g.keyEncBuf)
+	candidates := g.groups[h]
+	for _, candidate := range candidates {
+		if bytes.Equal(candidate.keyBytes, g.keyEncBuf) {
+			return candidate, false
+		}
+	}
+	bucket := newGroupBucket(g.schema)
+	bucket.keyBytes = slices.Clone(g.keyEncBuf)
+	g.groups[h] = append(candidates, bucket)
+	g.insertion = append(g.insertion, bucket)
+	return bucket, true
+}
+
+// Finalize is a no-op for v1 — groups are already materialized.
+func (g *BatchGroupBy) Finalize(_ context.Context) error { return nil }
+
+// NextBatch emits accumulated rows in group-insertion order, paginated by
+// batchSize. Returns (nil, nil) when all groups are drained.
+func (g *BatchGroupBy) NextBatch(_ context.Context) (*vectorized.RecordBatch, error) {
+	if g.cursorBucket >= len(g.insertion) {
+		return nil, nil
+	}
+	out := g.pool.Get()
+	for out.Len < g.batchSize && g.cursorBucket < len(g.insertion) {
+		bucket := g.insertion[g.cursorBucket]
+		bucketLen := bucket.cols[0].Len()
+		for g.cursorRow < bucketLen && out.Len < g.batchSize {
+			for colIdx := range out.Columns {
+				copyOneValue(out.Columns[colIdx], bucket.cols[colIdx], g.cursorRow)
+			}
+			out.Len++
+			g.cursorRow++
+		}
+		if g.cursorRow >= bucketLen {
+			g.cursorBucket++
+			g.cursorRow = 0
+		}
+	}
+	if out.Len == 0 {
+		g.pool.Put(out)
+		return nil, nil
+	}
+	return out, nil
+}
+
+// Close releases every outstanding memory reservation. Idempotent.
+func (g *BatchGroupBy) Close() error {
+	if g.closed {
+		return nil
+	}
+	g.closed = true
+	if g.reserved > 0 {
+		g.tracker.Release(g.reserved)
+		g.reserved = 0
+	}
+	g.groups = nil
+	g.insertion = nil
+	return nil
+}
+
+// encodeKey serializes the configured key column values at rowIdx into a
+// length-stable byte sequence suitable both as a hash input and as an
+// exact-equality comparand on hash collisions.
+//
+// Each component carries a null marker; variable-width components (string,
+// bytes, arrays) are additionally length-prefixed so embedded NUL bytes
+// cannot collapse distinct tuples into the same encoding. Float zero is
+// canonicalised so +0.0 and -0.0 produce identical bytes.
+func (g *BatchGroupBy) encodeKey(dst []byte, b *vectorized.RecordBatch, rowIdx int) []byte {
+	for _, kIdx := range g.keyIndices {
+		dst = appendKeyComponent(dst, b.Columns[kIdx], rowIdx)
+	}
+	return dst
+}
+
+// appendKeyComponent encodes one column's value at rowIdx into a length-
+// stable byte representation. Every component starts with a 1-byte null
+// marker (0 = null, 1 = present) so a null key never shares an encoding
+// with a zero, empty-string or empty-array key. The value bytes follow:
+//   - int64: 8 raw little-endian bytes.
+//   - float64: 8 little-endian bytes of math.Float64bits, with -0.0 → +0.0.
+//   - string, bytes: 4-byte little-endian length prefix + raw bytes.
+//   - []int64: 4-byte element count + 8 little-endian bytes per element.
+//   - []string: 4-byte element count + each element length-prefixed.
+//
+// This helper is shared by BatchGroupBy.encodeKey and BatchAggregation.computeKey
+// so the two operators agree on key equivalence (Copilot G3 review issues 1+2).
+func appendKeyComponent(dst []byte, col vectorized.Column, rowIdx int) []byte {
+	if col.IsNull(rowIdx) {
+		return append(dst, 0) // null marker; the value slot is never read
+	}
+	dst = append(dst, 1) // presence marker
+	switch c := col.(type) {
+	case *vectorized.TypedColumn[int64]:
+		return binary.LittleEndian.AppendUint64(dst, uint64(c.Data()[rowIdx]))
+	case *vectorized.TypedColumn[float64]:
+		v := c.Data()[rowIdx]
+		if v == 0 {
+			v = 0 // canonicalise -0.0 → +0.0 so they hash identically
+		}
+		return binary.LittleEndian.AppendUint64(dst, math.Float64bits(v))
+	case *vectorized.TypedColumn[string]:
+		s := c.Data()[rowIdx]
+		dst = binary.LittleEndian.AppendUint32(dst, uint32(len(s)))
+		return append(dst, s...)
+	case *vectorized.TypedColumn[[]byte]:
+		bs := c.Data()[rowIdx]
+		dst = binary.LittleEndian.AppendUint32(dst, uint32(len(bs)))
+		return append(dst, bs...)
+	case *vectorized.TypedColumn[[]int64]:
+		vs := c.Data()[rowIdx]
+		dst = binary.LittleEndian.AppendUint32(dst, uint32(len(vs)))
+		for _, v := range vs {
+			dst = binary.LittleEndian.AppendUint64(dst, uint64(v))
+		}
+		return dst
+	case *vectorized.TypedColumn[[]string]:
+		ss := c.Data()[rowIdx]
+		dst = binary.LittleEndian.AppendUint32(dst, uint32(len(ss)))
+		for _, s := range ss {
+			dst = binary.LittleEndian.AppendUint32(dst, uint32(len(s)))
+			dst = append(dst, s...)
+		}
+		return dst
+	}
+	return dst // any other column type contributes only the presence marker
+}
+
+// hashKey returns a stable uint64 fingerprint of the encoded key bytes.
+// fnv64a is fast and has good dispersion for the typical 8–64-byte keys we see.
+func hashKey(b []byte) uint64 {
+	h := fnv.New64a()
+	_, _ = h.Write(b)
+	return h.Sum64()
+}
+
+func newGroupBucket(schema *vectorized.BatchSchema) *groupBucket {
+	cols := make([]vectorized.Column, len(schema.Columns))
+	for i, def := range schema.Columns {
+		cols[i] = vectorized.NewColumnForType(def.Type, 8)
+	}
+	return &groupBucket{cols: cols}
+}
+
+// copyOneValue appends src[rowIdx] to dst, preserving null status.
+func copyOneValue(dst, src vectorized.Column, rowIdx int) {
+	if src.IsNull(rowIdx) {
+		dst.AppendNull()
+		return
+	}
+	switch s := src.(type) {
+	case *vectorized.TypedColumn[int64]:
+		dst.(*vectorized.TypedColumn[int64]).Append(s.Data()[rowIdx])
+	case *vectorized.TypedColumn[float64]:
+		dst.(*vectorized.TypedColumn[float64]).Append(s.Data()[rowIdx])
+	case *vectorized.TypedColumn[string]:
+		dst.(*vectorized.TypedColumn[string]).Append(s.Data()[rowIdx])
+	case *vectorized.TypedColumn[[]byte]:
+		dst.(*vectorized.TypedColumn[[]byte]).Append(s.Data()[rowIdx])
+	case *vectorized.TypedColumn[[]int64]:
+		dst.(*vectorized.TypedColumn[[]int64]).Append(s.Data()[rowIdx])
+	case *vectorized.TypedColumn[[]string]:
+		dst.(*vectorized.TypedColumn[[]string]).Append(s.Data()[rowIdx])
+	}
+}
diff --git a/pkg/query/vectorized/measure/groupby_test.go b/pkg/query/vectorized/measure/groupby_test.go
new file mode 100644
index 000000000..5d304b07c
--- /dev/null
+++ b/pkg/query/vectorized/measure/groupby_test.go
@@ -0,0 +1,460 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+	"context"
+	"errors"
+	"math"
+	"testing"
+
+	"github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+)
+
+// groupbyTestSchema is "tag.svc (string), field value (int64)".
+func groupbyTestSchema() *vectorized.BatchSchema {
+	return vectorized.NewBatchSchema([]vectorized.ColumnDef{
+		{Role: vectorized.RoleTag, TagFamily: "default", Name: "svc", Type: vectorized.ColumnTypeString},
+		{Role: vectorized.RoleField, Name: "v", Type: vectorized.ColumnTypeInt64},
+	})
+}
+
+// mkGroupbyBatch constructs a batch with one (svc, v) row per pair.
+func mkGroupbyBatch(s *vectorized.BatchSchema, pairs ...struct {
+	svc string
+	v   int64
+},
+) *vectorized.RecordBatch {
+	b := vectorized.NewRecordBatch(s, len(pairs))
+	svcCol := b.Columns[0].(*vectorized.TypedColumn[string])
+	vCol := b.Columns[1].(*vectorized.TypedColumn[int64])
+	for _, p := range pairs {
+		svcCol.Append(p.svc)
+		vCol.Append(p.v)
+	}
+	b.Len = len(pairs)
+	return b
+}
+
+// Pins null-key handling: a null key must form its own group instead of
+// sharing an encoding (and therefore a group) with the empty-string key.
+func TestBatchGroupBy_NullKey_DistinctFromEmptyString(t *testing.T) {
+	s := groupbyTestSchema()
+	pool := vectorized.NewBatchPool(s, 8)
+	tracker := vectorized.NewMemoryTracker(1 << 20)
+	const entrySize = int64(64)
+	g := NewBatchGroupBy(s, []int{0}, pool, 8, tracker, entrySize, 0)
+	_ = g.Init(context.Background())
+	defer g.Close()
+
+	b := vectorized.NewRecordBatch(s, 2)
+	b.Columns[0].(*vectorized.TypedColumn[string]).Append("")
+	b.Columns[1].(*vectorized.TypedColumn[int64]).Append(1)
+	b.Columns[0].AppendNull()
+	b.Columns[1].(*vectorized.TypedColumn[int64]).Append(2)
+	b.Len = 2
+
+	if err := g.Consume(context.Background(), b); err != nil {
+		t.Fatal(err)
+	}
+	if got := tracker.Used(); got != 2*entrySize {
+		t.Fatalf("null and empty-string keys must form two groups; tracker.Used=%d, want %d",
+			got, 2*entrySize)
+	}
+}
+
+// drainGroupBy pulls every output batch and returns them concatenated as
+// (svc, v) pairs in emission order.
+func drainGroupBy(t *testing.T, g *BatchGroupBy) []struct {
+	svc string
+	v   int64
+} {
+	t.Helper()
+	var out []struct {
+		svc string
+		v   int64
+	}
+	for {
+		b, err := g.NextBatch(context.Background())
+		if err != nil {
+			t.Fatal(err)
+		}
+		if b == nil {
+			break
+		}
+		svcCol := b.Columns[0].(*vectorized.TypedColumn[string])
+		vCol := b.Columns[1].(*vectorized.TypedColumn[int64])
+		for i := range b.Len {
+			out = append(out, struct {
+				svc string
+				v   int64
+			}{svcCol.Data()[i], vCol.Data()[i]})
+		}
+	}
+	return out
+}
+
+func TestBatchGroupBy_SingleKeyColumn_GroupsByDistinctValue(t *testing.T) {
+	s := groupbyTestSchema()
+	pool := vectorized.NewBatchPool(s, 8)
+	tracker := vectorized.NewMemoryTracker(1 << 20)
+	g := NewBatchGroupBy(s, []int{0}, pool, 8, tracker, 64, 0)
+	_ = g.Init(context.Background())
+	defer g.Close()
+
+	b := mkGroupbyBatch(s,
+		struct {
+			svc string
+			v   int64
+		}{"a", 1},
+		struct {
+			svc string
+			v   int64
+		}{"b", 2},
+		struct {
+			svc string
+			v   int64
+		}{"a", 3},
+		struct {
+			svc string
+			v   int64
+		}{"c", 4},
+	)
+	if err := g.Consume(context.Background(), b); err != nil {
+		t.Fatal(err)
+	}
+	if err := g.Finalize(context.Background()); err != nil {
+		t.Fatal(err)
+	}
+	rows := drainGroupBy(t, g)
+	// Expected emission order (insertion order): all of "a" rows, then "b", then "c".
+	want := []string{"a", "a", "b", "c"}
+	if len(rows) != len(want) {
+		t.Fatalf("row count: want %d, got %d", len(want), len(rows))
+	}
+	for i, w := range want {
+		if rows[i].svc != w {
+			t.Fatalf("row %d svc: want %q, got %q", i, w, rows[i].svc)
+		}
+	}
+}
+
+func TestBatchGroupBy_MultipleKeyColumns_HashCombines(t *testing.T) {
+	s := vectorized.NewBatchSchema([]vectorized.ColumnDef{
+		{Role: vectorized.RoleTag, TagFamily: "default", Name: "a", Type: vectorized.ColumnTypeString},
+		{Role: vectorized.RoleTag, TagFamily: "default", Name: "b", Type: vectorized.ColumnTypeString},
+	})
+	pool := vectorized.NewBatchPool(s, 8)
+	tracker := vectorized.NewMemoryTracker(1 << 20)
+	g := NewBatchGroupBy(s, []int{0, 1}, pool, 8, tracker, 64, 0)
+	_ = g.Init(context.Background())
+	defer g.Close()
+
+	b := vectorized.NewRecordBatch(s, 3)
+	b.Columns[0].(*vectorized.TypedColumn[string]).Append("x")
+	b.Columns[1].(*vectorized.TypedColumn[string]).Append("1")
+	b.Columns[0].(*vectorized.TypedColumn[string]).Append("x")
+	b.Columns[1].(*vectorized.TypedColumn[string]).Append("2")
+	b.Columns[0].(*vectorized.TypedColumn[string]).Append("x")
+	b.Columns[1].(*vectorized.TypedColumn[string]).Append("1")
+	b.Len = 3
+
+	if err := g.Consume(context.Background(), b); err != nil {
+		t.Fatal(err)
+	}
+	if err := g.Finalize(context.Background()); err != nil {
+		t.Fatal(err)
+	}
+	// Two distinct (a,b) groups: ("x","1") with 2 rows, ("x","2") with 1 row. The
+	// output is pulled directly (drainGroupBy assumes a (string, int64) schema).
+	out, err := g.NextBatch(context.Background())
+	if err != nil {
+		t.Fatal(err)
+	}
+	if out == nil {
+		t.Fatal("expected output batch")
+	}
+	if out.Len != 3 {
+		t.Fatalf("row count: want 3, got %d", out.Len)
+	}
+	bs := out.Columns[1].(*vectorized.TypedColumn[string]).Data()
+	// Group order: ("x","1") rows first (2 rows), then ("x","2") (1 row).
+	want := []string{"1", "1", "2"}
+	for i, w := range want {
+		if bs[i] != w {
+			t.Fatalf("row %d b: want %q, got %q", i, w, bs[i])
+		}
+	}
+}
+
+func TestBatchGroupBy_RepeatedKeyAcrossBatches_AccumulatesIntoSameGroup(t *testing.T) {
+	s := groupbyTestSchema()
+	pool := vectorized.NewBatchPool(s, 8)
+	tracker := vectorized.NewMemoryTracker(1 << 20)
+	g := NewBatchGroupBy(s, []int{0}, pool, 8, tracker, 64, 0)
+	_ = g.Init(context.Background())
+	defer g.Close()
+
+	_ = g.Consume(context.Background(), mkGroupbyBatch(s,
+		struct {
+			svc string
+			v   int64
+		}{"a", 1},
+		struct {
+			svc string
+			v   int64
+		}{"b", 2},
+	))
+	_ = g.Consume(context.Background(), mkGroupbyBatch(s,
+		struct {
+			svc string
+			v   int64
+		}{"a", 3}, // same group as first batch's "a"
+	))
+	_ = g.Finalize(context.Background())
+
+	rows := drainGroupBy(t, g)
+	// Expected: "a" group has 2 rows, "b" has 1; insertion order ["a","b"].
+	want := []string{"a", "a", "b"}
+	if len(rows) != len(want) {
+		t.Fatalf("row count: want %d, got %d", len(want), len(rows))
+	}
+	for i, w := range want {
+		if rows[i].svc != w {
+			t.Fatalf("row %d svc: want %q, got %q", i, w, rows[i].svc)
+		}
+	}
+}
+
+func TestBatchGroupBy_MemoryReserve_FailsAtLimit_BubblesError(t *testing.T) {
+	s := groupbyTestSchema()
+	pool := vectorized.NewBatchPool(s, 8)
+	tracker := vectorized.NewMemoryTracker(50) // tight
+	g := NewBatchGroupBy(s, []int{0}, pool, 8, tracker, 100, 0)
+	_ = g.Init(context.Background())
+	defer g.Close()
+
+	b := mkGroupbyBatch(s, struct {
+		svc string
+		v   int64
+	}{"a", 1})
+	err := g.Consume(context.Background(), b)
+	if err == nil {
+		t.Fatal("Consume must surface tracker error when reserve exceeds limit")
+	}
+}
+
+func TestBatchGroupBy_MemoryRefund_OnRepeatedKeys_LeavesUsedAccurate(t *testing.T) {
+	s := groupbyTestSchema()
+	pool := vectorized.NewBatchPool(s, 8)
+	tracker := vectorized.NewMemoryTracker(1 << 20)
+	const entrySize int64 = 100
+	g := NewBatchGroupBy(s, []int{0}, pool, 8, tracker, entrySize, 0)
+	_ = g.Init(context.Background())
+	defer g.Close()
+
+	// Consume a batch with all 4 rows in the same group → only 1 new group.
+	// Worst-case reserve = 4 * 100 = 400; actual usage = 1 * 100 = 100.
+	// After refund, tracker.Used must be 100.
+	b := mkGroupbyBatch(s,
+		struct {
+			svc string
+			v   int64
+		}{"a", 1},
+		struct {
+			svc string
+			v   int64
+		}{"a", 2},
+		struct {
+			svc string
+			v   int64
+		}{"a", 3},
+		struct {
+			svc string
+			v   int64
+		}{"a", 4},
+	)
+	if err := g.Consume(context.Background(), b); err != nil {
+		t.Fatal(err)
+	}
+	if got := tracker.Used(); got != entrySize {
+		t.Fatalf("tracker.Used after refund: want %d, got %d", entrySize, got)
+	}
+}
+
+func TestBatchGroupBy_NextBatch_PaginatesGroupsAcrossOutputBatches(t *testing.T) {
+	s := groupbyTestSchema()
+	pool := vectorized.NewBatchPool(s, 2)
+	tracker := vectorized.NewMemoryTracker(1 << 20)
+	g := NewBatchGroupBy(s, []int{0}, pool, 2, tracker, 64, 0) // batchSize=2
+	_ = g.Init(context.Background())
+	defer g.Close()
+
+	_ = g.Consume(context.Background(), mkGroupbyBatch(s,
+		struct {
+			svc string
+			v   int64
+		}{"a", 1},
+		struct {
+			svc string
+			v   int64
+		}{"b", 2},
+		struct {
+			svc string
+			v   int64
+		}{"c", 3},
+		struct {
+			svc string
+			v   int64
+		}{"d", 4},
+		struct {
+			svc string
+			v   int64
+		}{"e", 5},
+	))
+	_ = g.Finalize(context.Background())
+
+	var batches int
+	for {
+		b, err := g.NextBatch(context.Background())
+		if err != nil {
+			t.Fatal(err)
+		}
+		if b == nil {
+			break
+		}
+		batches++
+		if b.Len > 2 {
+			t.Fatalf("batch %d Len exceeds batchSize: %d", batches, b.Len)
+		}
+	}
+	if batches != 3 { // (a,b) | (c,d) | (e)
+		t.Fatalf("5 groups @ batchSize=2 should yield exactly 3 batches, got %d", batches)
+	}
+}
+
+// Pins the regression flagged by Copilot: NUL-byte separators alone cannot
+// disambiguate string tuples whose components contain embedded NUL bytes.
+// Distinct tuples like ("a\x00b","c") and ("a","b\x00c") must NOT collapse
+// into one group.
+func TestBatchGroupBy_NULInStringKey_NoCollision(t *testing.T) {
+	s := vectorized.NewBatchSchema([]vectorized.ColumnDef{
+		{Role: vectorized.RoleTag, TagFamily: "default", Name: "a", Type: vectorized.ColumnTypeString},
+		{Role: vectorized.RoleTag, TagFamily: "default", Name: "b", Type: vectorized.ColumnTypeString},
+	})
+	pool := vectorized.NewBatchPool(s, 8)
+	tracker := vectorized.NewMemoryTracker(1 << 20)
+	const entrySize = int64(64)
+	g := NewBatchGroupBy(s, []int{0, 1}, pool, 8, tracker, entrySize, 0)
+	_ = g.Init(context.Background())
+	defer g.Close()
+
+	b := vectorized.NewRecordBatch(s, 2)
+	b.Columns[0].(*vectorized.TypedColumn[string]).Append("a\x00b")
+	b.Columns[1].(*vectorized.TypedColumn[string]).Append("c")
+	b.Columns[0].(*vectorized.TypedColumn[string]).Append("a")
+	b.Columns[1].(*vectorized.TypedColumn[string]).Append("b\x00c")
+	b.Len = 2
+
+	if err := g.Consume(context.Background(), b); err != nil {
+		t.Fatal(err)
+	}
+	if got := tracker.Used(); got != 2*entrySize { // rowSize=0: only entrySize per new group stays charged
+		t.Fatalf("two distinct (a,b) tuples must produce two groups; tracker.Used=%d, want %d (NUL-in-key collision merged the rows)",
+			got, 2*entrySize)
+	}
+}
+
+// Pins the regression flagged by Copilot: float keys must canonicalize +0.0
+// and -0.0 before hashing so they fall into the same group, matching what
+// BatchAggregation already does.
+func TestBatchGroupBy_PositiveAndNegativeZero_SameGroup(t *testing.T) {
+	s := vectorized.NewBatchSchema([]vectorized.ColumnDef{
+		{Role: vectorized.RoleTag, TagFamily: "default", Name: "k", Type: vectorized.ColumnTypeFloat64},
+	})
+	pool := vectorized.NewBatchPool(s, 8)
+	tracker := vectorized.NewMemoryTracker(1 << 20)
+	const entrySize = int64(64)
+	g := NewBatchGroupBy(s, []int{0}, pool, 8, tracker, entrySize, 0)
+	_ = g.Init(context.Background())
+	defer g.Close()
+
+	b := vectorized.NewRecordBatch(s, 2)
+	b.Columns[0].(*vectorized.TypedColumn[float64]).Append(0.0)
+	b.Columns[0].(*vectorized.TypedColumn[float64]).Append(math.Copysign(0, -1)) // -0.0
+	b.Len = 2
+
+	if err := g.Consume(context.Background(), b); err != nil {
+		t.Fatal(err)
+	}
+	if got := tracker.Used(); got != entrySize { // exactly one group charged
+		t.Fatalf("+0 and -0 must produce the same group; tracker.Used=%d, want %d",
+			got, entrySize)
+	}
+}
+
+func TestBatchGroupBy_Close_AfterPartialConsume_ReleasesEveryReservation(t *testing.T) {
+	s := groupbyTestSchema()
+	pool := vectorized.NewBatchPool(s, 8)
+	tracker := vectorized.NewMemoryTracker(1 << 20)
+	g := NewBatchGroupBy(s, []int{0}, pool, 8, tracker, 100, 0)
+	_ = g.Init(context.Background())
+
+	_ = g.Consume(context.Background(), mkGroupbyBatch(s,
+		struct {
+			svc string
+			v   int64
+		}{"a", 1},
+		struct {
+			svc string
+			v   int64
+		}{"b", 2},
+	))
+	if tracker.Used() == 0 {
+		t.Fatal("tracker should have outstanding reservation after Consume")
+	}
+	if err := g.Close(); err != nil {
+		t.Fatal(err)
+	}
+	if got := tracker.Used(); got != 0 {
+		t.Fatalf("Close after partial Consume must release everything: tracker.Used=%d", got)
+	}
+}
+
+func TestBatchGroupBy_Close_Idempotent_NoDoubleRelease(t *testing.T) {
+	s := groupbyTestSchema()
+	pool := vectorized.NewBatchPool(s, 8)
+	tracker := vectorized.NewMemoryTracker(1 << 20)
+	g := NewBatchGroupBy(s, []int{0}, pool, 8, tracker, 100, 0)
+	_ = g.Init(context.Background())
+
+	_ = g.Consume(context.Background(), mkGroupbyBatch(s, struct {
+		svc string
+		v   int64
+	}{"a", 1}))
+	_ = g.Close()
+	used1 := tracker.Used()
+	_ = g.Close() // second Close should not double-release
+	used2 := tracker.Used()
+	if used1 != used2 {
+		t.Fatalf("second Close changed tracker.Used: before=%d after=%d", used1, used2)
+	}
+	if !errors.Is(nil, nil) { // NOTE(review): errors.Is(nil, nil) is always true, so this branch is dead
+		_ = used2 // unused-import guard
+	}
+}
diff --git a/pkg/query/vectorized/measure/limit.go b/pkg/query/vectorized/measure/limit.go
new file mode 100644
index 000000000..37f15301d
--- /dev/null
+++ b/pkg/query/vectorized/measure/limit.go
@@ -0,0 +1,85 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+	"context"
+
+	"github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+)
+
+// BatchLimit applies offset+limit windowing as a fusible in-place selection
+// rewrite. State across batches is carried via a cumulative-seen counter.
+//
+// When the window closes (seen >= offset+limit), the current batch's selection
+// is sliced to whatever portion of the window it contributed and Process
+// returns vectorized.ErrLimitExhausted. The fused stage translates that
+// sentinel into "emit current batch, then EOF on next pull".
+type BatchLimit struct {
+	schema *vectorized.BatchSchema
+	offset uint32
+	limit  uint32
+	// seen counts active rows observed so far across all batches. It is
+	// 64-bit so that neither the counter nor the window end (offset+limit)
+	// can wrap around for large offset/limit values.
+	seen uint64
+}
+
+// NewBatchLimit constructs a fusible limit operator.
+func NewBatchLimit(schema *vectorized.BatchSchema, offset, limit uint32) *BatchLimit {
+	return &BatchLimit{schema: schema, offset: offset, limit: limit}
+}
+
+// Init is a no-op. Limit has no per-pipeline setup.
+func (l *BatchLimit) Init(_ context.Context) error { return nil }
+
+// OutputSchema returns the unchanged input schema.
+func (l *BatchLimit) OutputSchema() *vectorized.BatchSchema { return l.schema }
+
+// Close is idempotent and a no-op.
+func (l *BatchLimit) Close() error { return nil }
+
+// Process rewrites b.Selection to keep only rows in [offset, offset+limit) of
+// the cumulative active stream.
+//
+// It returns vectorized.ErrLimitExhausted as soon as the row that reaches
+// offset+limit has been counted; b.Selection then holds only the rows of b
+// that fell inside the window (possibly none). Otherwise it returns nil and
+// b.Selection is a non-nil, possibly empty, slice.
+func (l *BatchLimit) Process(_ context.Context, b *vectorized.RecordBatch) error {
+	active := activeIndices(b)
+	out := make([]uint16, 0, len(active))
+	// Compute the window bounds in 64 bits: in uint32 arithmetic offset+limit
+	// wraps whenever the sum exceeds math.MaxUint32 (for example a caller
+	// passing math.MaxUint32 as "no limit"), which would close the window
+	// after only a handful of rows.
+	start := uint64(l.offset)
+	end := start + uint64(l.limit)
+	for _, idx := range active {
+		if l.seen >= start && l.seen < end {
+			out = append(out, idx)
+		}
+		l.seen++
+		if l.seen >= end {
+			b.Selection = out
+			return vectorized.ErrLimitExhausted
+		}
+	}
+	b.Selection = out
+	return nil
+}
+
+// activeIndices returns the row indices of b that are currently active. If
+// Selection is nil it materializes [0, Len).
+//
+// When b.Selection is non-nil the returned slice is b.Selection itself (not a
+// copy), so callers should treat it as read-only. The uint16 conversion
+// assumes b.Len never exceeds 65536 — NOTE(review): confirm the batch
+// capacity bound.
+func activeIndices(b *vectorized.RecordBatch) []uint16 {
+	if b.Selection != nil {
+		return b.Selection
+	}
+	out := make([]uint16, b.Len)
+	for i := range out {
+		out[i] = uint16(i)
+	}
+	return out
+}
diff --git a/pkg/query/vectorized/measure/limit_test.go b/pkg/query/vectorized/measure/limit_test.go
new file mode 100644
index 000000000..dd2669fa1
--- /dev/null
+++ b/pkg/query/vectorized/measure/limit_test.go
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership.
Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+	"context"
+	"errors"
+	"testing"
+
+	"github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+)
+
+// limitTestSchema is a one-column schema: a single int64 timestamp column.
+func limitTestSchema() *vectorized.BatchSchema {
+	return vectorized.NewBatchSchema([]vectorized.ColumnDef{
+		{Role: vectorized.RoleTimestamp, Type: vectorized.ColumnTypeInt64},
+	})
+}
+
+// mkBatchN returns a batch of n rows whose single column holds the values
+// 0..n-1. It sets Len but does not touch Selection.
+func mkBatchN(s *vectorized.BatchSchema, n int) *vectorized.RecordBatch {
+	b := vectorized.NewRecordBatch(s, n)
+	col := b.Columns[0].(*vectorized.TypedColumn[int64])
+	for i := range n {
+		col.Append(int64(i))
+	}
+	b.Len = n
+	return b
+}
+
+// processAllowExhausted runs op.Process on b and fails the test on any error
+// other than vectorized.ErrLimitExhausted, for cases where the window may or
+// may not close inside b.
+func processAllowExhausted(t *testing.T, op *BatchLimit, b *vectorized.RecordBatch) {
+	t.Helper()
+	err := op.Process(context.Background(), b)
+	if err != nil && !errors.Is(err, vectorized.ErrLimitExhausted) {
+		t.Fatalf("unexpected error: %v", err)
+	}
+}
+
+func TestBatchLimit_OffsetZero_LimitN_KeepsFirstN(t *testing.T) {
+	s := limitTestSchema()
+	op := NewBatchLimit(s, 0, 3)
+	_ = op.Init(context.Background())
+	b := mkBatchN(s, 5)
+	processAllowExhausted(t, op, b) // limit closes mid-batch
+	if got := b.ActiveLen(); got != 3 {
+		t.Fatalf("ActiveLen: want 3, got %d", got)
+	}
+	for i, want := range []uint16{0, 1, 2} {
+		if b.Selection[i] != want {
+			t.Fatalf("selection[%d]: got %d, want %d", i, b.Selection[i], want)
+		}
+	}
+}
+
+func TestBatchLimit_OffsetN_LimitM_KeepsRowsNToNPlusM(t *testing.T) {
+	s := limitTestSchema()
+	op := NewBatchLimit(s, 2, 2)
+	_ = op.Init(context.Background())
+	b := mkBatchN(s, 5)
+	processAllowExhausted(t, op, b)
+	if got := b.ActiveLen(); got != 2 {
+		t.Fatalf("ActiveLen: want 2, got %d", got)
+	}
+	for i, want := range []uint16{2, 3} {
+		if b.Selection[i] != want {
+			t.Fatalf("selection[%d]: got %d, want %d", i, b.Selection[i], want)
+		}
+	}
+}
+
+func TestBatchLimit_OffsetBeyondData_EmptyNonNilSelection(t *testing.T) {
+	s := limitTestSchema()
+	op := NewBatchLimit(s, 100, 5)
+	_ = op.Init(context.Background())
+	b := mkBatchN(s, 3)
+	_ = op.Process(context.Background(), b)
+	if b.Selection == nil {
+		t.Fatal("Selection must be non-nil empty slice, got nil")
+	}
+	if len(b.Selection) != 0 {
+		t.Fatalf("Selection must be empty, got %v", b.Selection)
+	}
+}
+
+func TestBatchLimit_LimitExhausted_ReturnsErrLimitExhausted_AndCurrentBatchSliced(t *testing.T) {
+	s := limitTestSchema()
+	op := NewBatchLimit(s, 0, 2)
+	_ = op.Init(context.Background())
+	b := mkBatchN(s, 5)
+	err := op.Process(context.Background(), b)
+	if !errors.Is(err, vectorized.ErrLimitExhausted) {
+		t.Fatalf("want ErrLimitExhausted, got %v", err)
+	}
+	if got := b.ActiveLen(); got != 2 {
+		t.Fatalf("current batch must be sliced to limit: got ActiveLen=%d", got)
+	}
+}
+
+func TestBatchLimit_PriorSelectionRespected(t *testing.T) {
+	s := limitTestSchema()
+	op := NewBatchLimit(s, 0, 2)
+	_ = op.Init(context.Background())
+	b := mkBatchN(s, 5)
+	b.Selection = []uint16{1, 3, 4} // 3 active rows
+	processAllowExhausted(t, op, b)
+	if got := b.ActiveLen(); got != 2 {
+		t.Fatalf("limit windows over active rows; ActiveLen want 2, got %d", got)
+	}
+	for i, want := range []uint16{1, 3} {
+		if b.Selection[i] != want {
+			t.Fatalf("selection[%d]: got %d, want %d", i, b.Selection[i], want)
+		}
+	}
+}
+
+func TestBatchLimit_AcrossMultipleBatches_StateCarriesViaSeenCounter(t *testing.T) {
+	s := limitTestSchema()
+	op := NewBatchLimit(s, 1, 4)
+	_ = op.Init(context.Background())
+
+	// Window is [1, 5). Batch 1 has 3 rows: row 0 (seen=0) is skipped by the
+	// offset, rows 1 and 2 (seen=1, 2) are kept; afterwards seen=3 and the
+	// window is still open, so Process returns nil.
+	b1 := mkBatchN(s, 3)
+	if err := op.Process(context.Background(), b1); err != nil {
+		t.Fatalf("batch 1: %v", err)
+	}
+	if got := b1.ActiveLen(); got != 2 {
+		t.Fatalf("batch 1: ActiveLen want 2, got %d", got)
+	}
+
+	// Batch 2 has 4 rows and the window needs 2 more. Rows 0 and 1 (seen=3, 4)
+	// are kept; after row 1 seen=5 reaches the window end, so Process returns
+	// ErrLimitExhausted with the selection sliced to those 2 rows and rows 2-3
+	// are never examined.
+	b2 := mkBatchN(s, 4)
+	err := op.Process(context.Background(), b2)
+	if !errors.Is(err, vectorized.ErrLimitExhausted) {
+		t.Fatalf("batch 2: want ErrLimitExhausted, got %v", err)
+	}
+	if got := b2.ActiveLen(); got != 2 {
+		t.Fatalf("batch 2: tail kept rows want 2, got %d", got)
+	}
+}
diff --git a/pkg/query/vectorized/measure/serialize.go b/pkg/query/vectorized/measure/serialize.go
new file mode 100644
index 000000000..7bb6d5aba
--- /dev/null
+++ b/pkg/query/vectorized/measure/serialize.go
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+	"time"
+
+	"google.golang.org/protobuf/types/known/timestamppb"
+
+	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+	"github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+)
+
+// serializeBatchToProto converts the active rows of b into measurev1.InternalDataPoint
+// messages, appending to dst. Pass dst=nil to allocate; pass dst[:0] to reuse capacity.
+//
+// This is the focal point of differential parity testing: it is the only place
+// where the vectorized output shape diverges from the row-based output shape.
+// Row order matches batch row order (respecting Selection); the row-based path
+// produces messages in the same order from MeasureResult iteration.
+//
+// When the schema has a shard-ID column its int64 value is narrowed with
+// uint32() into InternalDataPoint.ShardId; otherwise ShardId stays zero.
+func serializeBatchToProto(b *vectorized.RecordBatch, dst []*measurev1.InternalDataPoint) []*measurev1.InternalDataPoint {
+	if dst == nil {
+		dst = make([]*measurev1.InternalDataPoint, 0, b.ActiveLen())
+	}
+	schema := b.Schema
+	// The shard-ID column index and its typed data are the same for every
+	// row, so resolve them once per batch rather than once per row.
+	shardCol := schema.ShardIDIndex()
+	var shardIDs []int64
+	if shardCol >= 0 {
+		shardIDs = b.Columns[shardCol].(*vectorized.TypedColumn[int64]).Data()
+	}
+	for _, rowIdx := range activeIndices(b) {
+		idp := &measurev1.InternalDataPoint{DataPoint: buildDataPoint(b, schema, int(rowIdx))}
+		if shardCol >= 0 {
+			idp.ShardId = uint32(shardIDs[rowIdx])
+		}
+		dst = append(dst, idp)
+	}
+	return dst
+}
+
+// buildDataPoint materializes one DataPoint from row rowIdx of b. Tags are
+// grouped into TagFamilies by their TagFamily name; field columns become
+// DataPoint_Field entries in schema order.
+func buildDataPoint(b *vectorized.RecordBatch, schema *vectorized.BatchSchema, rowIdx int) *measurev1.DataPoint {
+	dp := &measurev1.DataPoint{}
+	// Timestamp column values are interpreted as nanoseconds since the Unix epoch.
+	if i := schema.TimestampIndex(); i >= 0 {
+		ns := b.Columns[i].(*vectorized.TypedColumn[int64]).Data()[rowIdx]
+		dp.Timestamp = timestamppb.New(time.Unix(0, ns))
+	}
+	if i := schema.VersionIndex(); i >= 0 {
+		dp.Version = b.Columns[i].(*vectorized.TypedColumn[int64]).Data()[rowIdx]
+	}
+	// The series ID is stored as int64 in the batch and reinterpreted as uint64.
+	if i := schema.SeriesIDIndex(); i >= 0 {
+		dp.Sid = uint64(b.Columns[i].(*vectorized.TypedColumn[int64]).Data()[rowIdx])
+	}
+	// tagFamilies maps a family name to the TagFamily already appended to dp,
+	// so tags of one family are grouped together while families appear in the
+	// order they are first met in schema.Columns. Columns with other roles
+	// (timestamp, version, series ID, ...) are skipped by the switch.
+	tagFamilies := map[string]*modelv1.TagFamily{}
+	for colIdx, def := range schema.Columns {
+		switch def.Role {
+		case vectorized.RoleTag:
+			tf, exists := tagFamilies[def.TagFamily]
+			if !exists {
+				tf = &modelv1.TagFamily{Name: def.TagFamily}
+				tagFamilies[def.TagFamily] = tf
+				dp.TagFamilies = append(dp.TagFamilies, tf)
+			}
+			tf.Tags = append(tf.Tags, &modelv1.Tag{
+				Key:   def.Name,
+				Value: columnValueToTagValue(b.Columns[colIdx], rowIdx),
+			})
+		case vectorized.RoleField:
+			dp.Fields = append(dp.Fields, &measurev1.DataPoint_Field{
+				Name:  def.Name,
+				Value: columnValueToFieldValue(b.Columns[colIdx], rowIdx),
+			})
+		}
+	}
+	return dp
+}
+
+// columnValueToTagValue converts the value at rowIdx of col into a
+// modelv1.TagValue. Null entries, and column types not listed in the switch
+// (for example float64), both produce TagValue_Null.
+//
+// NOTE(review): the []byte, []int64 and []string cases put the column's slice
+// into the message without copying. If the batch's storage is reused (e.g.
+// returned to a BatchPool) before the message is marshaled, the output could
+// change — confirm the batch lifetime in the caller.
+func columnValueToTagValue(col vectorized.Column, rowIdx int) *modelv1.TagValue {
+	if col.IsNull(rowIdx) {
+		return &modelv1.TagValue{Value: &modelv1.TagValue_Null{}}
+	}
+	switch c := col.(type) {
+	case *vectorized.TypedColumn[int64]:
+		return &modelv1.TagValue{Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: c.Data()[rowIdx]}}}
+	case *vectorized.TypedColumn[string]:
+		return &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: c.Data()[rowIdx]}}}
+	case *vectorized.TypedColumn[[]byte]:
+		return &modelv1.TagValue{Value: &modelv1.TagValue_BinaryData{BinaryData: c.Data()[rowIdx]}}
+	case *vectorized.TypedColumn[[]int64]:
+		return &modelv1.TagValue{Value: &modelv1.TagValue_IntArray{IntArray: &modelv1.IntArray{Value: c.Data()[rowIdx]}}}
+	case *vectorized.TypedColumn[[]string]:
+		return &modelv1.TagValue{Value: &modelv1.TagValue_StrArray{StrArray: &modelv1.StrArray{Value: c.Data()[rowIdx]}}}
+	}
+	// Unsupported column type: emitted as Null rather than reported as an error.
+	return &modelv1.TagValue{Value: &modelv1.TagValue_Null{}}
+}
+
+// columnValueToFieldValue converts the value at rowIdx of col into a
+// modelv1.FieldValue. Null entries, and column types not listed in the switch,
+// both produce FieldValue_Null. As with tags, the []byte case does not copy
+// the column's slice (see the aliasing note on columnValueToTagValue).
+func columnValueToFieldValue(col vectorized.Column, rowIdx int) *modelv1.FieldValue {
+	if col.IsNull(rowIdx) {
+		return &modelv1.FieldValue{Value: &modelv1.FieldValue_Null{}}
+	}
+	switch c := col.(type) {
+	case *vectorized.TypedColumn[int64]:
+		return &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: c.Data()[rowIdx]}}}
+	case *vectorized.TypedColumn[float64]:
+		return &modelv1.FieldValue{Value: &modelv1.FieldValue_Float{Float: &modelv1.Float{Value: c.Data()[rowIdx]}}}
+	case *vectorized.TypedColumn[string]:
+		return &modelv1.FieldValue{Value: &modelv1.FieldValue_Str{Str: &modelv1.Str{Value: c.Data()[rowIdx]}}}
+	case *vectorized.TypedColumn[[]byte]:
+		return &modelv1.FieldValue{Value: &modelv1.FieldValue_BinaryData{BinaryData: c.Data()[rowIdx]}}
+	}
+	// Unsupported column type: emitted as Null rather than reported as an error.
+	return &modelv1.FieldValue{Value: &modelv1.FieldValue_Null{}}
+}
diff --git a/pkg/query/vectorized/measure/serialize_test.go b/pkg/query/vectorized/measure/serialize_test.go
new file mode 100644
index 000000000..ced5be7a2
--- /dev/null
+++ b/pkg/query/vectorized/measure/serialize_test.go
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+	"bytes"
+	"testing"
+
+	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+	"github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+)
+
+// serializeFullSchema is "ts (int64), version (int64), sid (int64),
+// tag.svc (string), field v_int (int64), field v_float (float64), field v_str (string),
+// field v_bytes (bytes)".
+func serializeFullSchema() *vectorized.BatchSchema {
+	return vectorized.NewBatchSchema([]vectorized.ColumnDef{
+		{Role: vectorized.RoleTimestamp, Type: vectorized.ColumnTypeInt64},
+		{Role: vectorized.RoleVersion, Type: vectorized.ColumnTypeInt64},
+		{Role: vectorized.RoleSeriesID, Type: vectorized.ColumnTypeInt64},
+		{Role: vectorized.RoleTag, TagFamily: "default", Name: "svc", Type: vectorized.ColumnTypeString},
+		{Role: vectorized.RoleField, Name: "v_int", Type: vectorized.ColumnTypeInt64},
+		{Role: vectorized.RoleField, Name: "v_float", Type: vectorized.ColumnTypeFloat64},
+		{Role: vectorized.RoleField, Name: "v_str", Type: vectorized.ColumnTypeString},
+		{Role: vectorized.RoleField, Name: "v_bytes", Type: vectorized.ColumnTypeBytes},
+	})
+}
+
+// mkSerializeRow appends one non-null row to a batch built from
+// serializeFullSchema and increments b.Len.
+func mkSerializeRow(b *vectorized.RecordBatch, ts, ver, sid int64, svc string,
+	vInt int64, vFloat float64, vStr string, vBytes []byte,
+) {
+	b.Columns[0].(*vectorized.TypedColumn[int64]).Append(ts)
+	b.Columns[1].(*vectorized.TypedColumn[int64]).Append(ver)
+	b.Columns[2].(*vectorized.TypedColumn[int64]).Append(sid)
+	b.Columns[3].(*vectorized.TypedColumn[string]).Append(svc)
+	b.Columns[4].(*vectorized.TypedColumn[int64]).Append(vInt)
+	b.Columns[5].(*vectorized.TypedColumn[float64]).Append(vFloat)
+	b.Columns[6].(*vectorized.TypedColumn[string]).Append(vStr)
+	b.Columns[7].(*vectorized.TypedColumn[[]byte]).Append(vBytes)
+	b.Len++
+}
+
+func TestSerializeBatchToProto_RoundTrips_AllScalarVariants(t *testing.T) {
+	s := serializeFullSchema()
+	b := vectorized.NewRecordBatch(s, 2)
+	mkSerializeRow(b, 100, 1, 7, "svcA", 42, 3.14, "hello", []byte("xyz"))
+
+	out := serializeBatchToProto(b, nil)
+	if len(out) != 1 {
+		t.Fatalf("len: want 1, got %d", len(out))
+	}
+	dp := out[0].DataPoint
+	// The timestamp column holds nanoseconds since the epoch; it must survive
+	// the trip through timestamppb unchanged.
+	if got := dp.GetTimestamp().AsTime().UnixNano(); got != 100 {
+		t.Fatalf("timestamp: want 100ns, got %d", got)
+	}
+	if dp.Sid != 7 {
+		t.Fatalf("sid: want 7, got %d", dp.Sid)
+	}
+	if dp.Version != 1 {
+		t.Fatalf("version: want 1, got %d", dp.Version)
+	}
+	// Tag round-trip: svc must be Str variant with value "svcA".
+	tf := dp.TagFamilies[0]
+	if tf.Name != "default" || tf.Tags[0].Key != "svc" {
+		t.Fatalf("tag family/key mismatch: %+v", tf)
+	}
+	if got := tf.Tags[0].Value.GetStr().GetValue(); got != "svcA" {
+		t.Fatalf("svc tag: want svcA, got %q", got)
+	}
+	// Field round-trips.
+	if got := dp.Fields[0].Value.GetInt().GetValue(); got != 42 {
+		t.Fatalf("v_int: want 42, got %d", got)
+	}
+	if got := dp.Fields[1].Value.GetFloat().GetValue(); got != 3.14 {
+		t.Fatalf("v_float: want 3.14, got %v", got)
+	}
+	if got := dp.Fields[2].Value.GetStr().GetValue(); got != "hello" {
+		t.Fatalf("v_str: want hello, got %q", got)
+	}
+	if got := dp.Fields[3].Value.GetBinaryData(); !bytes.Equal(got, []byte("xyz")) {
+		t.Fatalf("v_bytes: want xyz, got %q", got)
+	}
+}
+
+func TestSerializeBatchToProto_RespectsSelectionVector(t *testing.T) {
+	s := serializeFullSchema()
+	b := vectorized.NewRecordBatch(s, 4)
+	mkSerializeRow(b, 100, 1, 1, "a", 10, 1.0, "", nil)
+	mkSerializeRow(b, 200, 1, 2, "b", 20, 2.0, "", nil)
+	mkSerializeRow(b, 300, 1, 3, "c", 30, 3.0, "", nil)
+	b.Selection = []uint16{0, 2} // only rows 0 and 2 are active
+
+	out := serializeBatchToProto(b, nil)
+	if len(out) != 2 {
+		t.Fatalf("len: want 2, got %d", len(out))
+	}
+	if out[0].DataPoint.Sid != 1 || out[1].DataPoint.Sid != 3 {
+		t.Fatalf("selected sids: want [1, 3], got [%d, %d]",
+			out[0].DataPoint.Sid, out[1].DataPoint.Sid)
+	}
+}
+
+func TestSerializeBatchToProto_NullsRoundtripped(t *testing.T) {
+	s := serializeFullSchema()
+	b := vectorized.NewRecordBatch(s, 1)
+	// Build a row with explicit nulls in the tag and one field.
+	b.Columns[0].(*vectorized.TypedColumn[int64]).Append(100)
+	b.Columns[1].(*vectorized.TypedColumn[int64]).Append(1)
+	b.Columns[2].(*vectorized.TypedColumn[int64]).Append(7)
+	b.Columns[3].(*vectorized.TypedColumn[string]).AppendNull()
+	b.Columns[4].(*vectorized.TypedColumn[int64]).AppendNull()
+	b.Columns[5].(*vectorized.TypedColumn[float64]).Append(0)
+	b.Columns[6].(*vectorized.TypedColumn[string]).Append("")
+	b.Columns[7].(*vectorized.TypedColumn[[]byte]).Append(nil)
+	b.Len = 1
+
+	out := serializeBatchToProto(b, nil)
+	tagVal := out[0].DataPoint.TagFamilies[0].Tags[0].Value
+	if _, ok := tagVal.Value.(*modelv1.TagValue_Null); !ok {
+		t.Fatalf("null tag must serialize to TagValue_Null, got %T", tagVal.Value)
+	}
+	fieldVal := out[0].DataPoint.Fields[0].Value
+	if _, ok := fieldVal.Value.(*modelv1.FieldValue_Null); !ok {
+		t.Fatalf("null field must serialize to FieldValue_Null, got %T", fieldVal.Value)
+	}
+}
+
+func TestSerializeBatchToProto_RowOrderMatchesRowPath(t *testing.T) {
+	// Parity contract: serialized row order matches batch row order.
+	s := serializeFullSchema()
+	b := vectorized.NewRecordBatch(s, 5)
+	for i := range 5 {
+		mkSerializeRow(b, int64(i*100), 1, int64(i+1), "svc", 0, 0, "", nil)
+	}
+
+	out := serializeBatchToProto(b, nil)
+	// Without this check a short (or empty) result would pass the loop below
+	// vacuously.
+	if len(out) != 5 {
+		t.Fatalf("len: want 5, got %d", len(out))
+	}
+	for i, idp := range out {
+		want := uint64(i + 1)
+		if idp.DataPoint.Sid != want {
+			t.Fatalf("row %d: sid want %d, got %d", i, want, idp.DataPoint.Sid)
+		}
+	}
+}
+
+func TestSerializeBatchToProto_ReusesDestinationSlice(t *testing.T) {
+	s := serializeFullSchema()
+	b := vectorized.NewRecordBatch(s, 1)
+	mkSerializeRow(b, 100, 1, 7, "x", 0, 0, "", nil)
+
+	dst := make([]*measurev1.InternalDataPoint, 0, 16)
+	beforeCap := cap(dst)
+	out := serializeBatchToProto(b, dst[:0])
+	if len(out) != 1 {
+		t.Fatalf("len: want 1, got %d", len(out))
+	}
+	if cap(out) != beforeCap {
+		t.Fatalf("cap must be reused: want %d, got %d", beforeCap, cap(out))
+	}
+}
diff --git a/pkg/query/vectorized/measure/top.go b/pkg/query/vectorized/measure/top.go
new file mode 100644
index 000000000..191724715
--- /dev/null
+++ b/pkg/query/vectorized/measure/top.go
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+	"cmp"
+	"container/heap"
+	"context"
+
+	"github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+)
+
+// BatchTop is a BreakerOperator that retains the top-N rows by a designated
+// field column. asc=true keeps the lowest N; asc=false keeps the highest N.
+//
+// Tie-break is stable on insertion order — earlier rows win. Nulls in the key
+// column are treated as the lowest value (kept first in asc, evicted first in desc).
+// For float64 keys, NaN sorts just above nulls and below every other number.
+// NOTE(review): confirm the row path's top-N orders NaN the same way.
+type BatchTop struct {
+	schema     *vectorized.BatchSchema
+	pool       *vectorized.BatchPool
+	heapState  *topHeap
+	sorted     []*topRow
+	fieldCol   int
+	n          int
+	batchSize  int
+	inputCount int
+	cursor     int
+	asc        bool
+	closed     bool
+}
+
+// topRow materializes one input row plus its sort key for heap comparisons.
+type topRow struct {
+	cols     []vectorized.Column // schema-shaped, exactly 1 row each
+	floatVal float64
+	intVal   int64
+	seq      int
+	isNull   bool
+	isFloat  bool
+}
+
+// topHeap adapts a slice of topRow pointers to container/heap. The root is the
+// row that would be evicted first when the bound is exceeded.
+type topHeap struct {
+	rows []*topRow
+	asc  bool
+}
+
+func (h *topHeap) Len() int { return len(h.rows) }
+
+func (h *topHeap) Less(i, j int) bool {
+	c := cmpTopVal(h.rows[i], h.rows[j])
+	if c != 0 {
+		// asc → max-heap (largest at root, evict on overflow);
+		// desc → min-heap (smallest at root).
+		if h.asc {
+			return c > 0
+		}
+		return c < 0
+	}
+	// Tie: prefer the later (larger seq) row at root so it gets evicted /
+	// popped first. After Finalize reverses the pop order, ties surface in
+	// insertion order — earlier-row-wins.
+	return h.rows[i].seq > h.rows[j].seq
+}
+
+func (h *topHeap) Swap(i, j int) { h.rows[i], h.rows[j] = h.rows[j], h.rows[i] }
+
+func (h *topHeap) Push(x any) { h.rows = append(h.rows, x.(*topRow)) }
+
+func (h *topHeap) Pop() any {
+	n := len(h.rows)
+	x := h.rows[n-1]
+	h.rows = h.rows[:n-1]
+	return x
+}
+
+// cmpTopVal returns -1 / 0 / +1 for a < b / == / >. Nulls sort lowest.
+//
+// Float keys are compared with cmp.Compare, which orders NaN below every
+// non-NaN value and equal to other NaNs. Plain < / > would report a NaN as
+// "equal" to every value; that relation is not transitive and lets a single
+// NaN corrupt the heap ordering and therefore the selected top-N.
+func cmpTopVal(a, b *topRow) int {
+	switch {
+	case a.isNull && b.isNull:
+		return 0
+	case a.isNull:
+		return -1
+	case b.isNull:
+		return 1
+	case a.isFloat:
+		return cmp.Compare(a.floatVal, b.floatVal)
+	default:
+		return cmp.Compare(a.intVal, b.intVal)
+	}
+}
+
+// NewBatchTop constructs a BatchTop. fieldCol is the index of the int64 or
+// float64 column to sort by; n is the bound; asc selects ascending or descending order.
+func NewBatchTop(schema *vectorized.BatchSchema, fieldCol, n int, asc bool, batchSize int) *BatchTop {
+	return &BatchTop{
+		schema:    schema,
+		pool:      vectorized.NewBatchPool(schema, batchSize),
+		fieldCol:  fieldCol,
+		n:         n,
+		batchSize: batchSize,
+		asc:       asc,
+	}
+}
+
+// Init initialises the heap.
+func (t *BatchTop) Init(_ context.Context) error {
+	t.heapState = &topHeap{asc: t.asc}
+	return nil
+}
+
+// OutputSchema returns the unchanged input schema.
+func (t *BatchTop) OutputSchema() *vectorized.BatchSchema { return t.schema }
+
+// Consume considers each active row for inclusion in the top-N heap.
+//
+// n <= 0 is a no-op (matches the row-path's top-N convention) — without this
+// guard the bounded-heap logic would dereference an empty heap on the first row.
+func (t *BatchTop) Consume(_ context.Context, b *vectorized.RecordBatch) error {
+	if t.n <= 0 {
+		return nil
+	}
+	isFloat := t.schema.Columns[t.fieldCol].Type == vectorized.ColumnTypeFloat64
+	for _, rowIdx := range activeIndices(b) {
+		// Every active row consumes a sequence number, admitted or not, so
+		// the tie-break reflects the row's position in the input stream.
+		seq := t.inputCount
+		t.inputCount++
+		if t.heapState.Len() < t.n {
+			candidate := t.materialize(b, int(rowIdx), isFloat)
+			candidate.seq = seq
+			heap.Push(t.heapState, candidate)
+			continue
+		}
+		// Heap is full: compare on the sort key alone before copying the
+		// row's columns, so rows that cannot displace the root cost no
+		// per-column allocations.
+		key := t.readKey(b, int(rowIdx), isFloat)
+		if !t.shouldReplace(&key, t.heapState.rows[0]) {
+			continue
+		}
+		candidate := t.materialize(b, int(rowIdx), isFloat)
+		candidate.seq = seq
+		t.heapState.rows[0] = candidate
+		heap.Fix(t.heapState, 0)
+	}
+	return nil
+}
+
+// shouldReplace returns true iff candidate is strictly better than root for
+// the configured order. Strict comparison is what gives us stable tie-break:
+// equal values do not replace. Only the key fields of candidate are read.
+func (t *BatchTop) shouldReplace(candidate, root *topRow) bool {
+	c := cmpTopVal(candidate, root)
+	if t.asc {
+		return c < 0
+	}
+	return c > 0
+}
+
+// Finalize drains the heap into a sorted slice in user-facing order.
+func (t *BatchTop) Finalize(_ context.Context) error {
+	out := make([]*topRow, 0, t.heapState.Len())
+	for t.heapState.Len() > 0 {
+		out = append(out, heap.Pop(t.heapState).(*topRow))
+	}
+	// Pop order is reverse of desired: max-heap pops largest first (asc wants
+	// smallest first); min-heap pops smallest first (desc wants largest first).
+	for i, j := 0, len(out)-1; i < j; i, j = i+1, j-1 {
+		out[i], out[j] = out[j], out[i]
+	}
+	t.sorted = out
+	t.heapState = &topHeap{asc: t.asc} // free heap backing slice
+	return nil
+}
+
+// NextBatch emits the sorted rows in batches of batchSize.
+//
+// It returns (nil, nil) once every sorted row has been emitted. The fill loop
+// assumes t.pool.Get returns a batch with Len == 0 — NOTE(review): confirm
+// against BatchPool. The trailing Len == 0 check only triggers when
+// batchSize <= 0, in which case the batch is returned to the pool.
+func (t *BatchTop) NextBatch(_ context.Context) (*vectorized.RecordBatch, error) {
+	if t.cursor >= len(t.sorted) {
+		return nil, nil
+	}
+	out := t.pool.Get()
+	for out.Len < t.batchSize && t.cursor < len(t.sorted) {
+		row := t.sorted[t.cursor]
+		for colIdx := range out.Columns {
+			copyOneValue(out.Columns[colIdx], row.cols[colIdx], 0)
+		}
+		out.Len++
+		t.cursor++
+	}
+	if out.Len == 0 {
+		t.pool.Put(out)
+		return nil, nil
+	}
+	return out, nil
+}
+
+// Close releases the heap and sorted buffer. Idempotent.
+func (t *BatchTop) Close() error {
+	if t.closed {
+		return nil
+	}
+	t.closed = true
+	t.heapState = nil
+	t.sorted = nil
+	return nil
+}
+
+// readKey returns a topRow carrying only the sort key of row rowIdx (isNull,
+// isFloat and the int or float value read from the configured field column);
+// cols and seq are left unset.
+func (t *BatchTop) readKey(b *vectorized.RecordBatch, rowIdx int, isFloat bool) topRow {
+	key := topRow{isFloat: isFloat}
+	keyCol := b.Columns[t.fieldCol]
+	switch {
+	case keyCol.IsNull(rowIdx):
+		key.isNull = true
+	case isFloat:
+		key.floatVal = keyCol.(*vectorized.TypedColumn[float64]).Data()[rowIdx]
+	default:
+		key.intVal = keyCol.(*vectorized.TypedColumn[int64]).Data()[rowIdx]
+	}
+	return key
+}
+
+// materialize copies row rowIdx of b into a new topRow, reading the sort key
+// from the configured field column. Each schema column is copied into its own
+// freshly allocated one-row column.
+func (t *BatchTop) materialize(b *vectorized.RecordBatch, rowIdx int, isFloat bool) *topRow {
+	row := t.readKey(b, rowIdx, isFloat)
+	row.cols = make([]vectorized.Column, len(t.schema.Columns))
+	for i, def := range t.schema.Columns {
+		row.cols[i] = vectorized.NewColumnForType(def.Type, 1)
+		copyOneValue(row.cols[i], b.Columns[i], rowIdx)
+	}
+	return &row
+}
diff --git a/pkg/query/vectorized/measure/top_test.go b/pkg/query/vectorized/measure/top_test.go
new file mode 100644
index 000000000..3515d2203
--- /dev/null
+++ b/pkg/query/vectorized/measure/top_test.go
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership.
Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+	"context"
+	"testing"
+
+	"github.com/apache/skywalking-banyandb/pkg/query/vectorized"
+)
+
+// topTestSchema is a one-column schema: a single int64 field "v".
+func topTestSchema() *vectorized.BatchSchema {
+	return vectorized.NewBatchSchema([]vectorized.ColumnDef{
+		{Role: vectorized.RoleField, Name: "v", Type: vectorized.ColumnTypeInt64},
+	})
+}
+
+// mkTopBatch builds a single-column int64 batch. nullMask[i] true → AppendNull.
+func mkTopBatch(s *vectorized.BatchSchema, vals []int64, nullMask []bool) *vectorized.RecordBatch {
+	b := vectorized.NewRecordBatch(s, len(vals))
+	col := b.Columns[0].(*vectorized.TypedColumn[int64])
+	for i, v := range vals {
+		if i < len(nullMask) && nullMask[i] {
+			col.AppendNull()
+		} else {
+			col.Append(v)
+		}
+	}
+	b.Len = len(vals)
+	return b
+}
+
+// drainTop pulls NextBatch until it returns nil and flattens column 0 of every
+// batch into a slice, substituting -999 for null entries.
+func drainTop(t *testing.T, top *BatchTop) []int64 {
+	t.Helper()
+	var out []int64
+	for {
+		b, err := top.NextBatch(context.Background())
+		if err != nil {
+			t.Fatal(err)
+		}
+		if b == nil {
+			break
+		}
+		col := b.Columns[0].(*vectorized.TypedColumn[int64])
+		for i := range b.Len {
+			if col.IsNull(i) {
+				out = append(out, -999) // sentinel for null in test
+			} else {
+				out = append(out, col.Data()[i])
+			}
+		}
+	}
+	return out
+}
+
+func TestBatchTop_AscendingHeap_KeepsLowestN(t *testing.T) {
+	s := topTestSchema()
+	top := NewBatchTop(s, 0, 3, true, 8)
+	_ = top.Init(context.Background())
+	defer top.Close()
+	_ = top.Consume(context.Background(), mkTopBatch(s, []int64{5, 2, 8, 1, 7, 3}, nil))
+	_ = top.Finalize(context.Background())
+	got := drainTop(t, top)
+	want := []int64{1, 2, 3}
+	if len(got) != len(want) {
+		t.Fatalf("len: want %d, got %d (%v)", len(want), len(got), got)
+	}
+	for i := range want {
+		if got[i] != want[i] {
+			t.Fatalf("row %d: want %d, got %d", i, want[i], got[i])
+		}
+	}
+}
+
+func TestBatchTop_DescendingHeap_KeepsHighestN(t *testing.T) {
+	s := topTestSchema()
+	top := NewBatchTop(s, 0, 3, false, 8)
+	_ = top.Init(context.Background())
+	defer top.Close()
+	_ = top.Consume(context.Background(), mkTopBatch(s, []int64{5, 2, 8, 1, 7, 3}, nil))
+	_ = top.Finalize(context.Background())
+	got := drainTop(t, top)
+	want := []int64{8, 7, 5}
+	// Check the length first so a short result fails cleanly instead of
+	// panicking on got[i] below.
+	if len(got) != len(want) {
+		t.Fatalf("len: want %d, got %d (%v)", len(want), len(got), got)
+	}
+	for i := range want {
+		if got[i] != want[i] {
+			t.Fatalf("row %d: want %d, got %d (full: %v)", i, want[i], got[i], got)
+		}
+	}
+}
+
+func TestBatchTop_FewerInputThanN_ReturnsAll_InOrder(t *testing.T) {
+	s := topTestSchema()
+	top := NewBatchTop(s, 0, 5, true, 8)
+	_ = top.Init(context.Background())
+	defer top.Close()
+	_ = top.Consume(context.Background(), mkTopBatch(s, []int64{3, 1, 2}, nil))
+	_ = top.Finalize(context.Background())
+	got := drainTop(t, top)
+	want := []int64{1, 2, 3}
+	if len(got) != len(want) {
+		t.Fatalf("len: want %d, got %d (%v)", len(want), len(got), got)
+	}
+	for i := range want {
+		if got[i] != want[i] {
+			t.Fatalf("row %d: want %d, got %d", i, want[i], got[i])
+		}
+	}
+}
+
+func TestBatchTop_TieBreaker_Stable(t *testing.T) {
+	// All values equal; only first 2 should be retained — earlier-row-wins.
+	s := vectorized.NewBatchSchema([]vectorized.ColumnDef{
+		{Role: vectorized.RoleField, Name: "v", Type: vectorized.ColumnTypeInt64},
+		{Role: vectorized.RoleField, Name: "seq", Type: vectorized.ColumnTypeInt64},
+	})
+	top := NewBatchTop(s, 0, 2, true, 8)
+	_ = top.Init(context.Background())
+	defer top.Close()
+
+	b := vectorized.NewRecordBatch(s, 4)
+	v := b.Columns[0].(*vectorized.TypedColumn[int64])
+	seq := b.Columns[1].(*vectorized.TypedColumn[int64])
+	for i := range 4 {
+		v.Append(5)
+		seq.Append(int64(i))
+	}
+	b.Len = 4
+	_ = top.Consume(context.Background(), b)
+	_ = top.Finalize(context.Background())
+
+	out, err := top.NextBatch(context.Background())
+	if err != nil {
+		t.Fatal(err)
+	}
+	if out == nil {
+		t.Fatal("want a batch with 2 rows, got nil")
+	}
+	if out.Len != 2 {
+		t.Fatalf("Len: want 2, got %d", out.Len)
+	}
+	seqs := out.Columns[1].(*vectorized.TypedColumn[int64]).Data()
+	// Expected: rows with seq 0 and 1 — the earlier ones.
+	want := []int64{0, 1}
+	for i := range want {
+		if seqs[i] != want[i] {
+			t.Fatalf("seq[%d]: want %d, got %d", i, want[i], seqs[i])
+		}
+	}
+}
+
+func TestBatchTop_NullField_TreatedAsLowest(t *testing.T) {
+	s := topTestSchema()
+	top := NewBatchTop(s, 0, 3, true, 8)
+	_ = top.Init(context.Background())
+	defer top.Close()
+	// Values: [5, null, 3, 8, 1]. Asc top-3 should keep lowest: [null, 1, 3].
+	_ = top.Consume(context.Background(), mkTopBatch(s,
+		[]int64{5, 0, 3, 8, 1},
+		[]bool{false, true, false, false, false},
+	))
+	_ = top.Finalize(context.Background())
+	got := drainTop(t, top)
+	// drainTop replaces null with -999 sentinel.
+	if len(got) != 3 || got[0] != -999 {
+		t.Fatalf("null must come first in asc order; got %v", got)
+	}
+	if got[1] != 1 || got[2] != 3 {
+		t.Fatalf("rest of top-3: want [1, 3], got %v", got[1:])
+	}
+}
+
+// Pins the regression flagged by Copilot: BatchTop with n <= 0 must be a
+// no-op rather than panicking on first row.
+func TestBatchTop_ZeroN_NoOp(t *testing.T) {
+	s := topTestSchema()
+	top := NewBatchTop(s, 0, 0, true, 8)
+	_ = top.Init(context.Background())
+	defer top.Close()
+	defer func() {
+		if r := recover(); r != nil {
+			t.Fatalf("BatchTop with n=0 must not panic; got %v", r)
+		}
+	}()
+	if err := top.Consume(context.Background(), mkTopBatch(s, []int64{1, 2, 3}, nil)); err != nil {
+		t.Fatal(err)
+	}
+	if err := top.Finalize(context.Background()); err != nil {
+		t.Fatal(err)
+	}
+	out, err := top.NextBatch(context.Background())
+	if err != nil {
+		t.Fatal(err)
+	}
+	if out != nil {
+		t.Fatalf("BatchTop with n=0 must yield no output; got batch with Len=%d", out.Len)
+	}
+}
+
+func TestBatchTop_Close_AfterPartialConsume_ReleasesHeapAllocation(t *testing.T) {
+	s := topTestSchema()
+	top := NewBatchTop(s, 0, 3, true, 8)
+	_ = top.Init(context.Background())
+	_ = top.Consume(context.Background(), mkTopBatch(s, []int64{5, 2, 8}, nil))
+	if err := top.Close(); err != nil {
+		t.Fatal(err)
+	}
+	// The test lives in package measure, so it can check the unexported
+	// buffers directly: Close must drop both of them.
+	if top.heapState != nil || top.sorted != nil {
+		t.Fatal("Close must release the heap and sorted buffers")
+	}
+	// Calling Close again must be a no-op and not panic.
+	if err := top.Close(); err != nil {
+		t.Fatalf("second Close must be no-op, got %v", err)
+	}
+}
