This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 3755984c43b58dbf469c2716a8286062bdaba012 Author: Hongtao Gao <[email protected]> AuthorDate: Sun May 3 11:41:42 2026 +0000 feat(query/vectorized/measure): add measure scan layer (gate G2) Second gate of the vectorized query pipeline. Implements the measure-specific extract/cursor/scan stack that consumes the existing MeasureQueryResult interface and produces RecordBatches. Modules: - config.go VectorizedConfig with Enabled/BatchSize/QueryMemoryMiB and Validate; defaults Enabled=false, 1024 rows, 256 MiB. - extract.go extractTagRow / extractFieldRow / extractTagBulk / extractFieldBulk. Reads already-decoded *modelv1.TagValue / *modelv1.FieldValue values from MeasureResult and writes them into TypedColumn[T] slices. Defensive copies for slice-typed values; hard-fail on unknown variants; handles all 7 TagValue cases including Timestamp (mapped to int64 UnixNano). - cursor.go SeriesCursor with explicit state machine, sticky-error propagation (R1 critical), and bulk-copy fast path exposed via Current/Advance accessors. - scan.go BatchScan PullOperator. Drains the cursor with a bulk fill of timestamp/version/sid/shardid + per-row tag/field extracts; discards the partial batch immediately on any cursor error so failures cannot be silently buried. Risks closed at this gate (per the design spec): R1 multi-series cursor with sticky-error semantics, R2 extract-not-decode with defensive slice copies and hard-fail on unknown variants. 47 tests, all green under -race in ~1s. go vet clean. --- pkg/query/vectorized/measure/config.go | 45 ++++ pkg/query/vectorized/measure/config_test.go | 52 ++++ pkg/query/vectorized/measure/cursor.go | 166 ++++++++++++ pkg/query/vectorized/measure/cursor_test.go | 206 +++++++++++++++ pkg/query/vectorized/measure/doc.go | 20 ++ pkg/query/vectorized/measure/extract.go | 171 +++++++++++++ pkg/query/vectorized/measure/extract_test.go | 353 ++++++++++++++++++++++++++ pkg/query/vectorized/measure/fixtures_test.go | 61 +++++ pkg/query/vectorized/measure/scan.go | 233 +++++++++++++++++ pkg/query/vectorized/measure/scan_test.go | 255 +++++++++++++++++++ 10 files changed, 1562 insertions(+) diff --git a/pkg/query/vectorized/measure/config.go b/pkg/query/vectorized/measure/config.go new file mode 100644 index 000000000..83d2eed7a --- /dev/null +++ b/pkg/query/vectorized/measure/config.go @@ -0,0 +1,45 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package measure + +import "fmt" + +// VectorizedConfig controls the v1 vectorized Measure query path. +type VectorizedConfig struct { + BatchSize int + QueryMemoryMiB int + Enabled bool +} + +// DefaultConfig returns the v1 default — disabled, 1024-row batches, 256 MiB +// per-query memory budget. v1 ships with Enabled false; the rollout PR flips +// the default after soak and bench gates pass. +func DefaultConfig() VectorizedConfig { + return VectorizedConfig{Enabled: false, BatchSize: 1024, QueryMemoryMiB: 256} +} + +// Validate rejects nonsense configurations. +func (c VectorizedConfig) Validate() error { + if c.BatchSize <= 0 { + return fmt.Errorf("vectorized.measure: BatchSize must be > 0, got %d", c.BatchSize) + } + if c.QueryMemoryMiB <= 0 { + return fmt.Errorf("vectorized.measure: QueryMemoryMiB must be > 0, got %d", c.QueryMemoryMiB) + } + return nil +} diff --git a/pkg/query/vectorized/measure/config_test.go b/pkg/query/vectorized/measure/config_test.go new file mode 100644 index 000000000..628bd77b2 --- /dev/null +++ b/pkg/query/vectorized/measure/config_test.go @@ -0,0 +1,52 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package measure + +import "testing" + +func TestVectorizedConfig_Default_Disabled_BatchSize1024_Memory256(t *testing.T) { + c := DefaultConfig() + if c.Enabled { + t.Fatal("default Enabled must be false (v1 ships disabled)") + } + if c.BatchSize != 1024 { + t.Fatalf("default BatchSize: want 1024, got %d", c.BatchSize) + } + if c.QueryMemoryMiB != 256 { + t.Fatalf("default QueryMemoryMiB: want 256, got %d", c.QueryMemoryMiB) + } + if err := c.Validate(); err != nil { + t.Fatalf("default config must Validate cleanly, got %v", err) + } +} + +func TestVectorizedConfig_Validate_ZeroBatchSize_ReturnsError(t *testing.T) { + c := DefaultConfig() + c.BatchSize = 0 + if err := c.Validate(); err == nil { + t.Fatal("BatchSize=0 must fail Validate") + } +} + +func TestVectorizedConfig_Validate_NegativeMemoryMiB_ReturnsError(t *testing.T) { + c := DefaultConfig() + c.QueryMemoryMiB = -1 + if err := c.Validate(); err == nil { + t.Fatal("negative QueryMemoryMiB must fail Validate") + } +} diff --git a/pkg/query/vectorized/measure/cursor.go b/pkg/query/vectorized/measure/cursor.go new file mode 100644 index 000000000..bf183c1d8 --- /dev/null +++ b/pkg/query/vectorized/measure/cursor.go @@ -0,0 +1,166 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package measure + +import ( + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/pkg/query/model" +) + +// seriesRow is the ephemeral per-row view exposed by SeriesCursor.NextRow. +// The slices and pointers reference data owned by the underlying MeasureResult; +// callers must not retain a row past the next NextRow call. +type seriesRow struct { + tagFamilies []model.TagFamily + fields []model.Field + timestamp int64 + version int64 + sid common.SeriesID + shardID common.ShardID + rowIdx int +} + +// SeriesCursor walks across MeasureResult instances yielded by a MeasureQueryResult, +// presenting either single-row access (NextRow) or bulk-copy access (Current+Advance) +// to BatchScan. +// +// Sticky-error contract: once Pull yields a result with Error != nil, the +// cursor stores the error and every subsequent NextRow returns it. Init is +// the only way to reset. +type SeriesCursor struct { + qr model.MeasureQueryResult + current *model.MeasureResult + err error + pos int + exhausted bool +} + +// Init resets the cursor and advances to the first non-empty series (or EOF/err). +func (c *SeriesCursor) Init(qr model.MeasureQueryResult) { + c.qr = qr + c.current = nil + c.pos = 0 + c.err = nil + c.exhausted = false + c.advanceSeries() +} + +// Err returns the sticky storage error, or nil. Once non-nil it remains so +// until Init is called again. +func (c *SeriesCursor) Err() error { return c.err } + +// Exhausted reports whether the cursor has run out of input (clean EOF or +// after an error). +func (c *SeriesCursor) Exhausted() bool { return c.exhausted } + +// RemainingInSeries returns how many rows are left in the current MeasureResult. +// 0 when at EOF, on error, or between series. +func (c *SeriesCursor) RemainingInSeries() int { + if c.exhausted || c.current == nil { + return 0 + } + return len(c.current.Timestamps) - c.pos +} + +// Current returns the underlying MeasureResult and the cursor's position within +// it. Used by BatchScan's bulk-copy path to read parallel arrays directly. +func (c *SeriesCursor) Current() (*model.MeasureResult, int) { + return c.current, c.pos +} + +// Advance moves the cursor n rows forward. Crosses to the next series if the +// new position reaches the end of the current MeasureResult. +func (c *SeriesCursor) Advance(n int) { + c.pos += n + if c.current != nil && c.pos >= len(c.current.Timestamps) { + c.advanceSeries() + } +} + +// NextRow returns the next row across series boundaries. +// +// Returns: +// - (row, nil) when a row is available; +// - (zero, nil) on clean EOF; +// - (zero, stickyErr) if a storage error has been observed; subsequent calls +// return the same error until Init is called again. +func (c *SeriesCursor) NextRow() (seriesRow, error) { + if c.err != nil { + return seriesRow{}, c.err + } + if c.exhausted { + return seriesRow{}, nil + } + if c.current == nil || c.pos >= len(c.current.Timestamps) { + c.advanceSeries() + if c.err != nil { + return seriesRow{}, c.err + } + if c.exhausted { + return seriesRow{}, nil + } + } + row := seriesRow{ + sid: c.current.SID, + timestamp: c.current.Timestamps[c.pos], + version: c.current.Versions[c.pos], + rowIdx: c.pos, + tagFamilies: c.current.TagFamilies, + fields: c.current.Fields, + } + if c.pos < len(c.current.ShardIDs) { + row.shardID = c.current.ShardIDs[c.pos] + } + c.pos++ + return row, nil +} + +// Close releases the underlying MeasureQueryResult exactly once. Idempotent +// and safe after EOF. +func (c *SeriesCursor) Close() { + if c.qr != nil { + c.qr.Release() + c.qr = nil + } + c.exhausted = true +} + +// advanceSeries pulls successive MeasureResults until one yields rows, EOF is +// reached, or an error is observed. Empty results (zero timestamps) are skipped. +func (c *SeriesCursor) advanceSeries() { + for { + next := c.qr.Pull() + if next == nil { + c.exhausted = true + c.current = nil + return + } + if next.Error != nil { + c.err = next.Error + c.exhausted = true + c.current = nil + return + } + if len(next.Timestamps) == 0 { + continue + } + c.current = next + c.pos = 0 + return + } +} diff --git a/pkg/query/vectorized/measure/cursor_test.go b/pkg/query/vectorized/measure/cursor_test.go new file mode 100644 index 000000000..eb6d64d8c --- /dev/null +++ b/pkg/query/vectorized/measure/cursor_test.go @@ -0,0 +1,206 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package measure + +import ( + "errors" + "testing" + + "github.com/apache/skywalking-banyandb/pkg/query/model" +) + +func TestSeriesCursor_Init_FirstSeriesActivated(t *testing.T) { + qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResult(1, 100, 200, 300)}} + var c SeriesCursor + c.Init(qr) + if got := c.RemainingInSeries(); got != 3 { + t.Fatalf("RemainingInSeries after Init: want 3, got %d", got) + } +} + +func TestSeriesCursor_NextRow_AdvancesWithinSeries(t *testing.T) { + qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResult(1, 10, 20, 30)}} + var c SeriesCursor + c.Init(qr) + r1, err := c.NextRow() + if err != nil { + t.Fatal(err) + } + if r1.timestamp != 10 || r1.sid != 1 { + t.Fatalf("row 1: ts=%d sid=%d", r1.timestamp, r1.sid) + } + r2, _ := c.NextRow() + if r2.timestamp != 20 { + t.Fatalf("row 2: ts=%d", r2.timestamp) + } + r3, _ := c.NextRow() + if r3.timestamp != 30 { + t.Fatalf("row 3: ts=%d", r3.timestamp) + } +} + +func TestSeriesCursor_NextRow_CrossesSeriesBoundary(t *testing.T) { + qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{ + mkResult(1, 100, 200), + mkResult(2, 300), + }} + var c SeriesCursor + c.Init(qr) + _, _ = c.NextRow() // ts=100, sid=1 + _, _ = c.NextRow() // ts=200, sid=1 + r, err := c.NextRow() + if err != nil { + t.Fatal(err) + } + if r.sid != 2 || r.timestamp != 300 { + t.Fatalf("expected (sid=2, ts=300), got (sid=%d, ts=%d)", r.sid, r.timestamp) + } +} + +func TestSeriesCursor_NextRow_SkipsEmptyMeasureResult(t *testing.T) { + qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{ + mkResult(1, 100), + mkResult(2), // empty — must be skipped + mkResult(3, 300), + }} + var c SeriesCursor + c.Init(qr) + _, _ = c.NextRow() // sid=1 + r, _ := c.NextRow() + if r.sid != 3 || r.timestamp != 300 { + t.Fatalf("expected to skip empty series 2; got sid=%d ts=%d", r.sid, r.timestamp) + } +} + +func TestSeriesCursor_NextRow_EOF_ReturnsZeroValueAndNilErr(t *testing.T) { + qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResult(1, 100)}} + var c SeriesCursor + c.Init(qr) + _, _ = c.NextRow() + r, err := c.NextRow() + if err != nil { + t.Fatalf("EOF must return nil err, got %v", err) + } + if r.timestamp != 0 || r.sid != 0 { + t.Fatalf("EOF must return zero seriesRow, got %+v", r) + } +} + +func TestSeriesCursor_StorageError_StickyOnFirstCall(t *testing.T) { + boom := errors.New("storage boom") + qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResultErr(boom)}} + var c SeriesCursor + c.Init(qr) + _, err := c.NextRow() + if !errors.Is(err, boom) { + t.Fatalf("first NextRow must surface storage error, got %v", err) + } +} + +// R1-critical — sticky-error contract. The previous draft of this code lost +// the error and silently returned EOF. This test ensures the regression cannot recur. +func TestSeriesCursor_StorageError_StickyOnSubsequentCalls(t *testing.T) { + boom := errors.New("storage boom") + qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResultErr(boom)}} + var c SeriesCursor + c.Init(qr) + for i := range 3 { + _, err := c.NextRow() + if !errors.Is(err, boom) { + t.Fatalf("call %d: storage error must remain sticky, got %v", i, err) + } + } +} + +func TestSeriesCursor_RemainingInSeries_DropsAsPosAdvances(t *testing.T) { + qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResult(1, 10, 20, 30)}} + var c SeriesCursor + c.Init(qr) + if c.RemainingInSeries() != 3 { + t.Fatalf("initial RemainingInSeries: want 3, got %d", c.RemainingInSeries()) + } + _, _ = c.NextRow() + if c.RemainingInSeries() != 2 { + t.Fatalf("after 1 NextRow: want 2, got %d", c.RemainingInSeries()) + } + _, _ = c.NextRow() + if c.RemainingInSeries() != 1 { + t.Fatalf("after 2 NextRows: want 1, got %d", c.RemainingInSeries()) + } +} + +func TestSeriesCursor_Current_ReturnsCurrentResultAndPos(t *testing.T) { + qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResult(7, 11, 22, 33)}} + var c SeriesCursor + c.Init(qr) + cur, pos := c.Current() + if cur == nil || cur.SID != 7 || pos != 0 { + t.Fatalf("Current after Init: got cur=%v pos=%d", cur, pos) + } + _, _ = c.NextRow() + cur, pos = c.Current() + if cur == nil || pos != 1 { + t.Fatalf("Current after 1 NextRow: pos=%d", pos) + } +} + +func TestSeriesCursor_Advance_CrossesSeriesWhenPosReachesEnd(t *testing.T) { + qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{ + mkResult(1, 100, 200, 300), + mkResult(2, 400), + }} + var c SeriesCursor + c.Init(qr) + c.Advance(3) // exhaust series 1 + r, err := c.NextRow() + if err != nil { + t.Fatal(err) + } + if r.sid != 2 || r.timestamp != 400 { + t.Fatalf("after Advance(3) we should be at series 2: got sid=%d ts=%d", r.sid, r.timestamp) + } +} + +func TestSeriesCursor_Close_Idempotent_SafeAfterEOF(t *testing.T) { + qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResult(1, 100)}} + var c SeriesCursor + c.Init(qr) + _, _ = c.NextRow() + _, _ = c.NextRow() // EOF + defer func() { + if r := recover(); r != nil { + t.Fatalf("Close must not panic: %v", r) + } + }() + c.Close() + c.Close() +} + +func TestSeriesCursor_Close_ReleasesUnderlyingMeasureQueryResult(t *testing.T) { + qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResult(1, 100)}} + var c SeriesCursor + c.Init(qr) + c.Close() + if qr.releaseCnt != 1 { + t.Fatalf("Release must be called exactly once on first Close: got %d", qr.releaseCnt) + } + c.Close() // idempotent — must not double-release + if qr.releaseCnt != 1 { + t.Fatalf("second Close must not re-release: got %d", qr.releaseCnt) + } +} diff --git a/pkg/query/vectorized/measure/doc.go b/pkg/query/vectorized/measure/doc.go new file mode 100644 index 000000000..7eb6e6575 --- /dev/null +++ b/pkg/query/vectorized/measure/doc.go @@ -0,0 +1,20 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package measure implements measure-specific vectorized operators (scan, +// cursor, extract, limit, group-by, aggregation, top, output serialization). +package measure diff --git a/pkg/query/vectorized/measure/extract.go b/pkg/query/vectorized/measure/extract.go new file mode 100644 index 000000000..5673b407c --- /dev/null +++ b/pkg/query/vectorized/measure/extract.go @@ -0,0 +1,171 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package measure + +import ( + "fmt" + "slices" + + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/pkg/query/vectorized" +) + +// extractTagRow writes one already-decoded *modelv1.TagValue into row rowIdx +// of col. The row must already exist in the column (length >= rowIdx+1); +// extract does not grow the column. +// +// Slice-typed values (BinaryData, IntArray, StrArray) are defensively copied +// because the storage layer may reuse the protobuf objects between Pull() +// calls; storing slices by reference would alias prior batches into later ones. +// +// An unknown TagValue oneof variant returns an error — silent null fallback +// would mask schema-evolution bugs. +func extractTagRow(col vectorized.Column, rowIdx int, tv *modelv1.TagValue) error { + if tv == nil { + return fmt.Errorf("vectorized.measure: nil TagValue at row %d", rowIdx) + } + switch v := tv.Value.(type) { + case *modelv1.TagValue_Null: + col.MarkNullAt(rowIdx) + return nil + case *modelv1.TagValue_Int: + c, ok := col.(*vectorized.TypedColumn[int64]) + if !ok { + return columnTypeMismatch(col, "int64", rowIdx) + } + c.Data()[rowIdx] = v.Int.GetValue() + return nil + case *modelv1.TagValue_Str: + c, ok := col.(*vectorized.TypedColumn[string]) + if !ok { + return columnTypeMismatch(col, "string", rowIdx) + } + c.Data()[rowIdx] = v.Str.GetValue() + return nil + case *modelv1.TagValue_BinaryData: + c, ok := col.(*vectorized.TypedColumn[[]byte]) + if !ok { + return columnTypeMismatch(col, "bytes", rowIdx) + } + buf := make([]byte, len(v.BinaryData)) + copy(buf, v.BinaryData) + c.Data()[rowIdx] = buf + return nil + case *modelv1.TagValue_IntArray: + c, ok := col.(*vectorized.TypedColumn[[]int64]) + if !ok { + return columnTypeMismatch(col, "int64[]", rowIdx) + } + c.Data()[rowIdx] = slices.Clone(v.IntArray.GetValue()) + return nil + case *modelv1.TagValue_StrArray: + c, ok := col.(*vectorized.TypedColumn[[]string]) + if !ok { + return columnTypeMismatch(col, "string[]", rowIdx) + } + c.Data()[rowIdx] = slices.Clone(v.StrArray.GetValue()) + return nil + case *modelv1.TagValue_Timestamp: + c, ok := col.(*vectorized.TypedColumn[int64]) + if !ok { + return columnTypeMismatch(col, "int64", rowIdx) + } + c.Data()[rowIdx] = v.Timestamp.AsTime().UnixNano() + return nil + default: + return fmt.Errorf("vectorized.measure: unsupported TagValue variant %T at row %d", tv.Value, rowIdx) + } +} + +// extractFieldRow is the field-side counterpart of extractTagRow. Same rules +// for defensive copies and unknown variants. +func extractFieldRow(col vectorized.Column, rowIdx int, fv *modelv1.FieldValue) error { + if fv == nil { + return fmt.Errorf("vectorized.measure: nil FieldValue at row %d", rowIdx) + } + switch v := fv.Value.(type) { + case *modelv1.FieldValue_Null: + col.MarkNullAt(rowIdx) + return nil + case *modelv1.FieldValue_Int: + c, ok := col.(*vectorized.TypedColumn[int64]) + if !ok { + return columnTypeMismatch(col, "int64", rowIdx) + } + c.Data()[rowIdx] = v.Int.GetValue() + return nil + case *modelv1.FieldValue_Float: + c, ok := col.(*vectorized.TypedColumn[float64]) + if !ok { + return columnTypeMismatch(col, "float64", rowIdx) + } + c.Data()[rowIdx] = v.Float.GetValue() + return nil + case *modelv1.FieldValue_Str: + c, ok := col.(*vectorized.TypedColumn[string]) + if !ok { + return columnTypeMismatch(col, "string", rowIdx) + } + c.Data()[rowIdx] = v.Str.GetValue() + return nil + case *modelv1.FieldValue_BinaryData: + c, ok := col.(*vectorized.TypedColumn[[]byte]) + if !ok { + return columnTypeMismatch(col, "bytes", rowIdx) + } + buf := make([]byte, len(v.BinaryData)) + copy(buf, v.BinaryData) + c.Data()[rowIdx] = buf + return nil + default: + return fmt.Errorf("vectorized.measure: unsupported FieldValue variant %T at row %d", fv.Value, rowIdx) + } +} + +// extractTagBulk extracts n tag values starting at src[0] into col[offset..offset+n). +// The v1 implementation is a tight per-row loop; future SIMD-friendly fast paths +// can specialize this signature without changing callers. +func extractTagBulk(col vectorized.Column, offset int, src []*modelv1.TagValue, n int) error { + if n > len(src) { + return fmt.Errorf("vectorized.measure: extractTagBulk n=%d > len(src)=%d", n, len(src)) + } + for i := range n { + if extractErr := extractTagRow(col, offset+i, src[i]); extractErr != nil { + return extractErr + } + } + return nil +} + +// extractFieldBulk is the field-side counterpart of extractTagBulk. +func extractFieldBulk(col vectorized.Column, offset int, src []*modelv1.FieldValue, n int) error { + if n > len(src) { + return fmt.Errorf("vectorized.measure: extractFieldBulk n=%d > len(src)=%d", n, len(src)) + } + for i := range n { + if extractErr := extractFieldRow(col, offset+i, src[i]); extractErr != nil { + return extractErr + } + } + return nil +} + +func columnTypeMismatch(col vectorized.Column, want string, row int) error { + return fmt.Errorf("vectorized.measure: column type %s, value type %s, row %d", + col.Type(), want, row) +} diff --git a/pkg/query/vectorized/measure/extract_test.go b/pkg/query/vectorized/measure/extract_test.go new file mode 100644 index 000000000..76b146a59 --- /dev/null +++ b/pkg/query/vectorized/measure/extract_test.go @@ -0,0 +1,353 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package measure + +import ( + "bytes" + "testing" + "time" + + "google.golang.org/protobuf/types/known/timestamppb" + + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/pkg/query/vectorized" +) + +// Test helpers — minimal constructors for *modelv1.TagValue / *modelv1.FieldValue. + +func tvNull() *modelv1.TagValue { return &modelv1.TagValue{Value: &modelv1.TagValue_Null{}} } + +func tvInt(v int64) *modelv1.TagValue { + return &modelv1.TagValue{Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: v}}} +} + +func tvStr(v string) *modelv1.TagValue { + return &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: v}}} +} + +func tvBytes(v []byte) *modelv1.TagValue { + return &modelv1.TagValue{Value: &modelv1.TagValue_BinaryData{BinaryData: v}} +} + +func tvIntArr(v []int64) *modelv1.TagValue { + return &modelv1.TagValue{Value: &modelv1.TagValue_IntArray{IntArray: &modelv1.IntArray{Value: v}}} +} + +func tvStrArr(v []string) *modelv1.TagValue { + return &modelv1.TagValue{Value: &modelv1.TagValue_StrArray{StrArray: &modelv1.StrArray{Value: v}}} +} + +func tvTimestamp(ts time.Time) *modelv1.TagValue { + return &modelv1.TagValue{Value: &modelv1.TagValue_Timestamp{Timestamp: timestamppb.New(ts)}} +} + +func fvNull() *modelv1.FieldValue { + return &modelv1.FieldValue{Value: &modelv1.FieldValue_Null{}} +} + +func fvInt(v int64) *modelv1.FieldValue { + return &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: v}}} +} + +func fvFloat(v float64) *modelv1.FieldValue { + return &modelv1.FieldValue{Value: &modelv1.FieldValue_Float{Float: &modelv1.Float{Value: v}}} +} + +func fvStr(v string) *modelv1.FieldValue { + return &modelv1.FieldValue{Value: &modelv1.FieldValue_Str{Str: &modelv1.Str{Value: v}}} +} + +func fvBytes(v []byte) *modelv1.FieldValue { + return &modelv1.FieldValue{Value: &modelv1.FieldValue_BinaryData{BinaryData: v}} +} + +// preallocInt64 grows an Int64 column by n zero rows so extractTagRow can +// write into pre-existing slots (mirrors how BatchScan pre-grows columns +// before calling per-row extractors at series boundaries). +func preallocInt64(c *vectorized.TypedColumn[int64], n int) { + for range n { + c.Append(0) + } +} + +// === extractTagRow positives === + +func TestExtractTagRow_Int_WritesValue(t *testing.T) { + col := vectorized.NewInt64Column(4) + preallocInt64(col, 3) + if err := extractTagRow(col, 1, tvInt(42)); err != nil { + t.Fatal(err) + } + if col.Data()[1] != 42 { + t.Fatalf("row 1: got %d, want 42", col.Data()[1]) + } + if col.IsNull(1) { + t.Fatal("row 1 must not be null after writing valid int") + } +} + +func TestExtractTagRow_Str_WritesValue(t *testing.T) { + col := vectorized.NewStringColumn(4) + col.Append("") + col.Append("") + if err := extractTagRow(col, 0, tvStr("hello")); err != nil { + t.Fatal(err) + } + if col.Data()[0] != "hello" { + t.Fatalf("row 0: got %q, want hello", col.Data()[0]) + } +} + +func TestExtractTagRow_Timestamp_WritesUnixNano(t *testing.T) { + ts := time.Date(2026, 5, 3, 12, 34, 56, 789, time.UTC) + col := vectorized.NewInt64Column(2) + preallocInt64(col, 1) + if err := extractTagRow(col, 0, tvTimestamp(ts)); err != nil { + t.Fatal(err) + } + if got := col.Data()[0]; got != ts.UnixNano() { + t.Fatalf("timestamp roundtrip: got %d, want %d", got, ts.UnixNano()) + } +} + +// === extractTagRow defensive-copy parity-killers === + +func TestExtractTagRow_BinaryData_DefensiveCopy(t *testing.T) { + src := []byte("abc") + tv := tvBytes(src) + col := vectorized.NewBytesColumn(2) + col.Append(nil) + if err := extractTagRow(col, 0, tv); err != nil { + t.Fatal(err) + } + src[0] = 'z' // mutate source AFTER extract + if !bytes.Equal(col.Data()[0], []byte("abc")) { + t.Fatalf("aliasing detected: column observed mutation of source, got %q", col.Data()[0]) + } +} + +func TestExtractTagRow_IntArray_DefensiveCopy(t *testing.T) { + src := []int64{1, 2, 3} + tv := tvIntArr(src) + col := vectorized.NewInt64ArrayColumn(2) + col.Append(nil) + if err := extractTagRow(col, 0, tv); err != nil { + t.Fatal(err) + } + src[0] = 99 + if col.Data()[0][0] != 1 { + t.Fatalf("aliasing detected: column observed source mutation, got %v", col.Data()[0]) + } +} + +func TestExtractTagRow_StrArray_DefensiveCopy(t *testing.T) { + src := []string{"a", "b"} + tv := tvStrArr(src) + col := vectorized.NewStrArrayColumn(2) + col.Append(nil) + if err := extractTagRow(col, 0, tv); err != nil { + t.Fatal(err) + } + src[0] = "z" + if col.Data()[0][0] != "a" { + t.Fatalf("aliasing detected: column observed source mutation, got %v", col.Data()[0]) + } +} + +// === extractTagRow Null === + +func TestExtractTagRow_Null_MarksValidityWithoutGrowingLen(t *testing.T) { + col := vectorized.NewInt64Column(4) + preallocInt64(col, 3) + beforeLen := col.Len() + if err := extractTagRow(col, 1, tvNull()); err != nil { + t.Fatal(err) + } + if col.Len() != beforeLen { + t.Fatalf("Len must not change on Null extract: before=%d after=%d", beforeLen, col.Len()) + } + if !col.IsNull(1) { + t.Fatal("row 1 must be null") + } +} + +// === extractTagRow error paths === + +func TestExtractTagRow_UnknownVariant_ReturnsError(t *testing.T) { + tv := &modelv1.TagValue{} // Value is nil — no oneof set + col := vectorized.NewInt64Column(1) + preallocInt64(col, 1) + if err := extractTagRow(col, 0, tv); err == nil { + t.Fatal("unknown TagValue variant must return error (silent null fallback would mask schema-evolution bugs)") + } +} + +func TestExtractTagRow_NilTagValue_ReturnsError(t *testing.T) { + col := vectorized.NewInt64Column(1) + preallocInt64(col, 1) + if err := extractTagRow(col, 0, nil); err == nil { + t.Fatal("nil TagValue must return error") + } +} + +func TestExtractTagRow_ColumnTypeMismatch_ReturnsError(t *testing.T) { + intCol := vectorized.NewInt64Column(1) + preallocInt64(intCol, 1) + if err := extractTagRow(intCol, 0, tvStr("oops")); err == nil { + t.Fatal("string TagValue into int64 column must return error") + } +} + +// === extractFieldRow positives === + +func TestExtractFieldRow_Int_WritesValue(t *testing.T) { + col := vectorized.NewInt64Column(2) + preallocInt64(col, 1) + if err := extractFieldRow(col, 0, fvInt(7)); err != nil { + t.Fatal(err) + } + if col.Data()[0] != 7 { + t.Fatalf("got %d, want 7", col.Data()[0]) + } +} + +func TestExtractFieldRow_Float_WritesValue(t *testing.T) { + col := vectorized.NewFloat64Column(1) + col.Append(0) + if err := extractFieldRow(col, 0, fvFloat(3.14)); err != nil { + t.Fatal(err) + } + if col.Data()[0] != 3.14 { + t.Fatalf("got %v, want 3.14", col.Data()[0]) + } +} + +func TestExtractFieldRow_Str_WritesValue(t *testing.T) { + col := vectorized.NewStringColumn(1) + col.Append("") + if err := extractFieldRow(col, 0, fvStr("hi")); err != nil { + t.Fatal(err) + } + if col.Data()[0] != "hi" { + t.Fatal("string field roundtrip failed") + } +} + +func TestExtractFieldRow_BinaryData_DefensiveCopy(t *testing.T) { + src := []byte("xyz") + fv := fvBytes(src) + col := vectorized.NewBytesColumn(1) + col.Append(nil) + if err := extractFieldRow(col, 0, fv); err != nil { + t.Fatal(err) + } + src[0] = 'q' + if !bytes.Equal(col.Data()[0], []byte("xyz")) { + t.Fatalf("aliasing detected on FieldValue_BinaryData: %q", col.Data()[0]) + } +} + +func TestExtractFieldRow_Null_MarksValidity(t *testing.T) { + col := vectorized.NewInt64Column(1) + preallocInt64(col, 1) + if err := extractFieldRow(col, 0, fvNull()); err != nil { + t.Fatal(err) + } + if !col.IsNull(0) { + t.Fatal("Null FieldValue must mark column null") + } +} + +// === extractFieldRow error paths === + +func TestExtractFieldRow_UnknownVariant_ReturnsError(t *testing.T) { + fv := &modelv1.FieldValue{} + col := vectorized.NewInt64Column(1) + preallocInt64(col, 1) + if err := extractFieldRow(col, 0, fv); err == nil { + t.Fatal("unknown FieldValue variant must return error") + } +} + +func TestExtractFieldRow_NilFieldValue_ReturnsError(t *testing.T) { + col := vectorized.NewInt64Column(1) + preallocInt64(col, 1) + if err := extractFieldRow(col, 0, nil); err == nil { + t.Fatal("nil FieldValue must return error") + } +} + +func TestExtractFieldRow_ColumnTypeMismatch_ReturnsError(t *testing.T) { + intCol := vectorized.NewInt64Column(1) + preallocInt64(intCol, 1) + if err := extractFieldRow(intCol, 0, fvFloat(1.5)); err == nil { + t.Fatal("Float FieldValue into int64 column must return error") + } +} + +// === extract*Bulk === + +func TestExtractTagBulk_HappyPath(t *testing.T) { + col := vectorized.NewStringColumn(4) + for range 4 { + col.Append("") + } + src := []*modelv1.TagValue{tvStr("a"), tvStr("b"), tvStr("c"), tvStr("d")} + if err := extractTagBulk(col, 0, src, 4); err != nil { + t.Fatal(err) + } + for i, want := range []string{"a", "b", "c", "d"} { + if col.Data()[i] != want { + t.Fatalf("row %d: got %q, want %q", i, col.Data()[i], want) + } + } +} + +func TestExtractTagBulk_NTooLarge_ReturnsError(t *testing.T) { + col := vectorized.NewStringColumn(4) + for range 4 { + col.Append("") + } + src := []*modelv1.TagValue{tvStr("a")} + if err := extractTagBulk(col, 0, src, 5); err == nil { + t.Fatal("n > len(src) must return error") + } +} + +func TestExtractFieldBulk_HappyPath(t *testing.T) { + col := vectorized.NewInt64Column(3) + preallocInt64(col, 3) + src := []*modelv1.FieldValue{fvInt(10), fvInt(20), fvInt(30)} + if err := extractFieldBulk(col, 0, src, 3); err != nil { + t.Fatal(err) + } + for i, want := range []int64{10, 20, 30} { + if col.Data()[i] != want { + t.Fatalf("row %d: got %d, want %d", i, col.Data()[i], want) + } + } +} + +func TestExtractFieldBulk_NTooLarge_ReturnsError(t *testing.T) { + col := vectorized.NewInt64Column(2) + preallocInt64(col, 2) + src := []*modelv1.FieldValue{fvInt(1)} + if err := extractFieldBulk(col, 0, src, 2); err == nil { + t.Fatal("n > len(src) must return error") + } +} diff --git a/pkg/query/vectorized/measure/fixtures_test.go b/pkg/query/vectorized/measure/fixtures_test.go new file mode 100644 index 000000000..6b15a2d2c --- /dev/null +++ b/pkg/query/vectorized/measure/fixtures_test.go @@ -0,0 +1,61 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package measure + +import ( + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/pkg/query/model" +) + +// fakeMeasureQueryResult yields a hand-built sequence of MeasureResults. +// Mirrors the shape produced by banyand/measure/query.go's queryResult.Pull. +type fakeMeasureQueryResult struct { + seq []*model.MeasureResult + idx int + releaseCnt int +} + +func (f *fakeMeasureQueryResult) Pull() *model.MeasureResult { + if f.idx >= len(f.seq) { + return nil + } + r := f.seq[f.idx] + f.idx++ + return r +} + +func (f *fakeMeasureQueryResult) Release() { + f.releaseCnt++ +} + +// mkResult constructs a minimal MeasureResult with parallel arrays sized to +// match ts. Versions and ShardIDs are zero-valued; tags and fields are empty. +func mkResult(sid common.SeriesID, ts ...int64) *model.MeasureResult { + return &model.MeasureResult{ + SID: sid, + Timestamps: ts, + Versions: make([]int64, len(ts)), + ShardIDs: make([]common.ShardID, len(ts)), + } +} + +// mkResultErr constructs a MeasureResult carrying a storage error. +// MeasureQueryResult.Pull may return such results to signal pull-time failures. +func mkResultErr(err error) *model.MeasureResult { + return &model.MeasureResult{Error: err} +} diff --git a/pkg/query/vectorized/measure/scan.go b/pkg/query/vectorized/measure/scan.go new file mode 100644 index 000000000..6f06b6d1e --- /dev/null +++ b/pkg/query/vectorized/measure/scan.go @@ -0,0 +1,233 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package measure + +import ( + "context" + + "github.com/apache/skywalking-banyandb/pkg/query/model" + "github.com/apache/skywalking-banyandb/pkg/query/vectorized" +) + +// BatchScan is the v1 PullOperator that wraps a MeasureQueryResult and +// produces RecordBatches. It uses a SeriesCursor to manage cross-series +// boundaries and bulk-extracts metadata, tags, and fields per series fill. +type BatchScan struct { + qr model.MeasureQueryResult + schema *vectorized.BatchSchema + pool *vectorized.BatchPool + cursor SeriesCursor + batchSize int + closed bool +} + +// NewBatchScan returns a BatchScan; call Init before NextBatch. +func NewBatchScan(qr model.MeasureQueryResult, schema *vectorized.BatchSchema, + pool *vectorized.BatchPool, batchSize int, +) *BatchScan { + return &BatchScan{qr: qr, schema: schema, pool: pool, batchSize: batchSize} +} + +// Init prepares the cursor and pulls the first non-empty MeasureResult. +func (s *BatchScan) Init(_ context.Context) error { + s.cursor.Init(s.qr) + return nil +} + +// OutputSchema returns the schema declared at construction. +func (s *BatchScan) OutputSchema() *vectorized.BatchSchema { return s.schema } + +// Close releases the underlying cursor exactly once. +func (s *BatchScan) Close() error { + if s.closed { + return nil + } + s.closed = true + s.cursor.Close() + return nil +} + +// NextBatch fills a fresh batch up to batchSize rows. Returns: +// - (batch, nil) for a valid batch with Len > 0; +// - (nil, nil) for clean EOF; +// - (nil, err) for storage error (the partial batch is dropped to GC). +func (s *BatchScan) NextBatch(_ context.Context) (*vectorized.RecordBatch, error) { + b := s.pool.Get() + for b.Len < s.batchSize && !s.cursor.Exhausted() && s.cursor.Err() == nil { + remaining := s.cursor.RemainingInSeries() + if remaining == 0 { + break + } + n := remaining + if avail := s.batchSize - b.Len; n > avail { + n = avail + } + if fillErr := s.fillFromCurrent(b, n); fillErr != nil { + // R3-style discard: do not return b to the pool. + return nil, fillErr + } + b.Len += n + s.cursor.Advance(n) + // Advance may trigger an internal advanceSeries that observes a + // storage error on the next MeasureResult. Discard the partial + // batch immediately so the error is never silently buried — the + // caller must learn about the failure on this call, not the next. + if cursorErr := s.cursor.Err(); cursorErr != nil { + return nil, cursorErr + } + } + if b.Len == 0 { + s.pool.Put(b) + if cursorErr := s.cursor.Err(); cursorErr != nil { + return nil, cursorErr + } + return nil, nil + } + return b, nil +} + +// fillFromCurrent copies n rows from the cursor's current MeasureResult into +// b starting at b.Len. Caller must ensure n <= cursor.RemainingInSeries(). +func (s *BatchScan) fillFromCurrent(b *vectorized.RecordBatch, n int) error { + cur, pos := s.cursor.Current() + offset := b.Len + if fillErr := fillMetadata(b, s.schema, cur, pos, n); fillErr != nil { + return fillErr + } + if fillErr := fillTags(b, s.schema, cur, pos, offset, n); fillErr != nil { + return fillErr + } + if fillErr := fillFields(b, s.schema, cur, pos, offset, n); fillErr != nil { + return fillErr + } + return nil +} + +// fillMetadata populates the timestamp, version, series-id, and shard-id +// columns for n rows starting at pos. Uses copy() for parallel int64 arrays; +// SID is a constant per series and is filled with n repetitions. +func fillMetadata(b *vectorized.RecordBatch, schema *vectorized.BatchSchema, + cur *model.MeasureResult, pos, n int, +) error { + offset := b.Len + if i := schema.TimestampIndex(); i >= 0 { + c := b.Columns[i].(*vectorized.TypedColumn[int64]) + appendInt64Zeros(c, n) + copy(c.Data()[offset:offset+n], cur.Timestamps[pos:pos+n]) + } + if i := schema.VersionIndex(); i >= 0 { + c := b.Columns[i].(*vectorized.TypedColumn[int64]) + appendInt64Zeros(c, n) + copy(c.Data()[offset:offset+n], cur.Versions[pos:pos+n]) + } + if i := schema.SeriesIDIndex(); i >= 0 { + c := b.Columns[i].(*vectorized.TypedColumn[int64]) + sid := int64(cur.SID) + for range n { + c.Append(sid) + } + } + if i := schema.ShardIDIndex(); i >= 0 { + c := b.Columns[i].(*vectorized.TypedColumn[int64]) + for k := range n { + var v int64 + if pos+k < len(cur.ShardIDs) { + v = int64(cur.ShardIDs[pos+k]) + } + c.Append(v) + } + } + return nil +} + +// fillTags extracts every tag column for n rows starting at pos into the batch +// at offset. Tag families and tag names are matched against the BatchSchema; +// unmatched tags in cur are skipped. +func fillTags(b *vectorized.RecordBatch, schema *vectorized.BatchSchema, + cur *model.MeasureResult, pos, offset, n int, +) error { + for _, tf := range cur.TagFamilies { + for _, tag := range tf.Tags { + colIdx, ok := schema.TagIndex(tf.Name, tag.Name) + if !ok { + continue + } + col := b.Columns[colIdx] + growColumn(col, n) + if extractErr := extractTagBulk(col, offset, tag.Values[pos:pos+n], n); extractErr != nil { + return extractErr + } + } + } + return nil +} + +// fillFields is the field-side counterpart of fillTags. +func fillFields(b *vectorized.RecordBatch, schema *vectorized.BatchSchema, + cur *model.MeasureResult, pos, offset, n int, +) error { + for _, f := range cur.Fields { + colIdx, ok := schema.FieldIndex(f.Name) + if !ok { + continue + } + col := b.Columns[colIdx] + growColumn(col, n) + if extractErr := extractFieldBulk(col, offset, f.Values[pos:pos+n], n); extractErr != nil { + return extractErr + } + } + return nil +} + +func appendInt64Zeros(c *vectorized.TypedColumn[int64], n int) { + for range n { + c.Append(0) + } +} + +// growColumn appends n zero-valued rows so subsequent extract* calls have +// pre-existing slots to overwrite. +func growColumn(col vectorized.Column, n int) { + switch c := col.(type) { + case *vectorized.TypedColumn[int64]: + for range n { + c.Append(0) + } + case *vectorized.TypedColumn[float64]: + for range n { + c.Append(0) + } + case *vectorized.TypedColumn[string]: + for range n { + c.Append("") + } + case *vectorized.TypedColumn[[]byte]: + for range n { + c.Append(nil) + } + case *vectorized.TypedColumn[[]int64]: + for range n { + c.Append(nil) + } + case *vectorized.TypedColumn[[]string]: + for range n { + c.Append(nil) + } + } +} diff --git a/pkg/query/vectorized/measure/scan_test.go b/pkg/query/vectorized/measure/scan_test.go new file mode 100644 index 000000000..76f67f1fc --- /dev/null +++ b/pkg/query/vectorized/measure/scan_test.go @@ -0,0 +1,255 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package measure + +import ( + "context" + "errors" + "testing" + + "github.com/apache/skywalking-banyandb/api/common" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/pkg/query/model" + "github.com/apache/skywalking-banyandb/pkg/query/vectorized" +) + +func minimalSchema() *vectorized.BatchSchema { + return vectorized.NewBatchSchema([]vectorized.ColumnDef{ + {Role: vectorized.RoleTimestamp, Type: vectorized.ColumnTypeInt64}, + {Role: vectorized.RoleVersion, Type: vectorized.ColumnTypeInt64}, + {Role: vectorized.RoleSeriesID, Type: vectorized.ColumnTypeInt64}, + }) +} + +func newScan(qr model.MeasureQueryResult, schema *vectorized.BatchSchema, batchSize int) *BatchScan { + pool := vectorized.NewBatchPool(schema, batchSize) + return NewBatchScan(qr, schema, pool, batchSize) +} + +func TestBatchScan_SingleSeries_FillsBatchViaBulkPath(t *testing.T) { + qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResult(1, 100, 200, 300)}} + schema := minimalSchema() + scan := newScan(qr, schema, 4) + if err := scan.Init(context.Background()); err != nil { + t.Fatal(err) + } + defer scan.Close() + b, err := scan.NextBatch(context.Background()) + if err != nil || b == nil { + t.Fatalf("err=%v b=%v", err, b) + } + if b.Len != 3 { + t.Fatalf("Len: want 3, got %d", b.Len) + } + ts := b.Columns[0].(*vectorized.TypedColumn[int64]).Data() + for i, want := range []int64{100, 200, 300} { + if ts[i] != want { + t.Fatalf("ts[%d]: got %d, want %d", i, ts[i], want) + } + } +} + +func TestBatchScan_MultiSeries_CrossingBoundaryUsesPerRowPath(t *testing.T) { + qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{ + mkResult(1, 1, 2), + mkResult(2, 3, 4, 5), + }} + schema := minimalSchema() + scan := newScan(qr, schema, 4) + _ = scan.Init(context.Background()) + defer scan.Close() + b, err := scan.NextBatch(context.Background()) + if err != nil { + t.Fatal(err) + } + if b.Len != 4 { + t.Fatalf("first batch Len: want 4, got %d", b.Len) + } + sids := b.Columns[2].(*vectorized.TypedColumn[int64]).Data() + want := []int64{1, 1, 2, 2} + for i := range want { + if sids[i] != want[i] { + t.Fatalf("sid[%d]: got %d, want %d", i, sids[i], want[i]) + } + } + b2, _ := scan.NextBatch(context.Background()) + if b2.Len != 1 || b2.Columns[2].(*vectorized.TypedColumn[int64]).Data()[0] != 2 { + t.Fatalf("second batch: Len=%d sid[0]=%d", b2.Len, b2.Columns[2].(*vectorized.TypedColumn[int64]).Data()[0]) + } +} + +func TestBatchScan_BatchSizeExceedsTotalRows_OneBatchThenEOF(t *testing.T) { + qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResult(1, 10, 20)}} + schema := minimalSchema() + scan := newScan(qr, schema, 16) + _ = scan.Init(context.Background()) + defer scan.Close() + b, _ := scan.NextBatch(context.Background()) + if b.Len != 2 { + t.Fatalf("Len: want 2, got %d", b.Len) + } + eof, err := scan.NextBatch(context.Background()) + if err != nil || eof != nil { + t.Fatalf("expected EOF, err=%v eof=%v", err, eof) + } +} + +func TestBatchScan_BatchSizeBelowSeriesSize_MultipleFullBatches(t *testing.T) { + qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResult(1, 1, 2, 3, 4, 5)}} + schema := minimalSchema() + scan := newScan(qr, schema, 2) + _ = scan.Init(context.Background()) + defer scan.Close() + b1, _ := scan.NextBatch(context.Background()) + if b1.Len != 2 { + t.Fatalf("first batch Len: want 2, got %d", b1.Len) + } + b2, _ := scan.NextBatch(context.Background()) + if b2.Len != 2 { + t.Fatalf("second batch Len: want 2, got %d", b2.Len) + } + b3, _ := scan.NextBatch(context.Background()) + if b3.Len != 1 { + t.Fatalf("third (tail) batch Len: want 1, got %d", b3.Len) + } + eof, _ := scan.NextBatch(context.Background()) + if eof != nil { + t.Fatal("expected EOF after tail") + } +} + +func TestBatchScan_EmptyResult_EOFImmediately(t *testing.T) { + qr := &fakeMeasureQueryResult{seq: nil} + schema := minimalSchema() + scan := newScan(qr, schema, 4) + _ = scan.Init(context.Background()) + defer scan.Close() + b, err := scan.NextBatch(context.Background()) + if err != nil || b != nil { + t.Fatalf("expected (nil, nil), got (%v, %v)", b, err) + } +} + +// Pins the regression flagged by Copilot: when a valid series precedes an +// errored MeasureResult, the partial batch must be discarded on the same +// NextBatch call — not returned as if successful with the error buried. +func TestBatchScan_CursorErrorAfterValidSeries_DiscardsPartialBatch(t *testing.T) { + boom := errors.New("storage boom mid-stream") + qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{ + mkResult(1, 100, 200), + mkResultErr(boom), + }} + schema := minimalSchema() + scan := newScan(qr, schema, 4) + _ = scan.Init(context.Background()) + defer scan.Close() + b, err := scan.NextBatch(context.Background()) + if !errors.Is(err, boom) { + t.Fatalf("want storage error, got err=%v", err) + } + if b != nil { + t.Fatalf("partial batch must be discarded on cursor error; got b.Len=%d", b.Len) + } +} + +func TestBatchScan_CursorError_PropagatesAsError_NoBatchEmitted(t *testing.T) { + boom := errors.New("storage boom") + qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResultErr(boom)}} + schema := minimalSchema() + scan := newScan(qr, schema, 4) + _ = scan.Init(context.Background()) + defer scan.Close() + b, err := scan.NextBatch(context.Background()) + if !errors.Is(err, boom) { + t.Fatalf("want storage error, got %v", err) + } + if b != nil { + t.Fatal("error path must return nil batch") + } +} + +func TestBatchScan_TagAndFieldColumns_PopulatedFromAlreadyDecodedValues(t *testing.T) { + schema := vectorized.NewBatchSchema([]vectorized.ColumnDef{ + {Role: vectorized.RoleTimestamp, Type: vectorized.ColumnTypeInt64}, + {Role: vectorized.RoleSeriesID, Type: vectorized.ColumnTypeInt64}, + {Role: vectorized.RoleTag, TagFamily: "default", Name: "service", Type: vectorized.ColumnTypeString}, + {Role: vectorized.RoleField, Name: "value", Type: vectorized.ColumnTypeInt64}, + }) + mr := &model.MeasureResult{ + SID: common.SeriesID(1), + Timestamps: []int64{100, 200}, + Versions: []int64{0, 0}, + ShardIDs: []common.ShardID{0, 0}, + TagFamilies: []model.TagFamily{ + {Name: "default", Tags: []model.Tag{ + {Name: "service", Values: []*modelv1.TagValue{tvStr("a"), tvStr("b")}}, + }}, + }, + Fields: []model.Field{ + {Name: "value", Values: []*modelv1.FieldValue{fvInt(10), fvInt(20)}}, + }, + } + qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mr}} + scan := newScan(qr, schema, 4) + _ = scan.Init(context.Background()) + defer scan.Close() + b, err := scan.NextBatch(context.Background()) + if err != nil { + t.Fatal(err) + } + tagIdx, _ := schema.TagIndex("default", "service") + tags := b.Columns[tagIdx].(*vectorized.TypedColumn[string]).Data() + if tags[0] != "a" || tags[1] != "b" { + t.Fatalf("tags: got %v", tags) + } + fieldIdx, _ := schema.FieldIndex("value") + fields := b.Columns[fieldIdx].(*vectorized.TypedColumn[int64]).Data() + if fields[0] != 10 || fields[1] != 20 { + t.Fatalf("fields: got %v", fields) + } +} + +func TestBatchScan_SeriesIDColumn_AlwaysPopulated(t *testing.T) { + qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{ + mkResult(common.SeriesID(42), 100, 200), + }} + schema := minimalSchema() + scan := newScan(qr, schema, 4) + _ = scan.Init(context.Background()) + defer scan.Close() + b, _ := scan.NextBatch(context.Background()) + sids := b.Columns[2].(*vectorized.TypedColumn[int64]).Data() + for i, sid := range sids { + if sid != 42 { + t.Fatalf("sid[%d]: got %d, want 42", i, sid) + } + } +} + +func TestBatchScan_Close_ReleasesCursor(t *testing.T) { + qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResult(1, 100)}} + schema := minimalSchema() + scan := newScan(qr, schema, 4) + _ = scan.Init(context.Background()) + if err := scan.Close(); err != nil { + t.Fatal(err) + } + if qr.releaseCnt != 1 { + t.Fatalf("Close must release the underlying MeasureQueryResult: got releaseCnt=%d", qr.releaseCnt) + } +}
