This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit b39ed477b99ee1a9d75638d0fb7550b53b075138 Author: Hongtao Gao <[email protected]> AuthorDate: Sun May 3 02:05:24 2026 +0000 feat(query/vectorized): add vectorized pipeline foundation (gate G1) First gate of a multi-gate implementation of the columnar query pipeline for local Measure queries. This commit lands the foundation primitives under pkg/query/vectorized/, with no edits to existing code (zero-regression guaranteed structurally — nothing imports this yet). Modules: - schema.go BatchSchema with O(1) lookups; tagKey struct (no string concatenation, no separator collision) - column.go Column interface, ColumnType enum, validityBitmap - typed_column.go TypedColumn[T] for int64/float64/string/[]byte/[]int64/[]string - batch.go RecordBatch + Reset + ActiveLen (handles nil vs [] selection footgun explicitly) - pool.go BatchPool with Reset-on-Get and foreign-schema rejection - operator.go PullOperator/FusibleOperator/BreakerOperator interfaces + ErrLimitExhausted sentinel - memory.go MemoryTracker (CAS-based, panics on negative bytes) - fused.go fusedStage with discard-on-error and idempotent Close - pipeline.go Pipeline + PipelineBuilder + breakerStage with sticky error and idempotent Close Risks closed at this gate (per the design spec): R3 fused-stage error discard, R5 memory tracker semantics, R6 uint16 selection constraint, R8 Reset semantics, R9 nil vs empty batch. 52 tests, all green under -race in ~1s. go vet clean. 
--- pkg/query/vectorized/batch.go | 66 +++++++++ pkg/query/vectorized/batch_test.go | 101 +++++++++++++ pkg/query/vectorized/column.go | 94 ++++++++++++ pkg/query/vectorized/column_test.go | 81 +++++++++++ pkg/query/vectorized/doc.go | 20 +++ pkg/query/vectorized/fused.go | 109 ++++++++++++++ pkg/query/vectorized/fused_test.go | 225 +++++++++++++++++++++++++++++ pkg/query/vectorized/memory.go | 79 ++++++++++ pkg/query/vectorized/memory_test.go | 119 +++++++++++++++ pkg/query/vectorized/operator.go | 70 +++++++++ pkg/query/vectorized/operator_test.go | 34 +++++ pkg/query/vectorized/pipeline.go | 154 ++++++++++++++++++++ pkg/query/vectorized/pipeline_test.go | 231 ++++++++++++++++++++++++++++++ pkg/query/vectorized/pool.go | 54 +++++++ pkg/query/vectorized/pool_test.go | 69 +++++++++ pkg/query/vectorized/schema.go | 114 +++++++++++++++ pkg/query/vectorized/schema_test.go | 127 ++++++++++++++++ pkg/query/vectorized/typed_column.go | 110 ++++++++++++++ pkg/query/vectorized/typed_column_test.go | 142 ++++++++++++++++++ 19 files changed, 1999 insertions(+) diff --git a/pkg/query/vectorized/batch.go b/pkg/query/vectorized/batch.go new file mode 100644 index 000000000..4e345ab30 --- /dev/null +++ b/pkg/query/vectorized/batch.go @@ -0,0 +1,66 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package vectorized + +// DefaultBatchSize is the canonical batch row capacity. +// uint16 selection vectors (row indices 0..65535) cap batch size at 65536 rows. +const DefaultBatchSize = 1024 + +// RecordBatch is the unit of work flowing through the pipeline. +// All columns share Len logical rows. Selection optionally narrows that to a subset. +// +// Selection contract: +// - nil → all rows in [0, Len) are active +// - [] → zero rows active (post-fusible-filter empty case) +// - non-empty → only the listed row indices are active +// +// Operators must use ActiveLen() to compute the effective row count. +type RecordBatch struct { + Schema *BatchSchema + Columns []Column + Selection []uint16 + Len int +} + +// NewRecordBatch allocates a batch with column capacities sized to capacity rows. +func NewRecordBatch(schema *BatchSchema, capacity int) *RecordBatch { + cols := make([]Column, len(schema.Columns)) + for i, def := range schema.Columns { + cols[i] = NewColumnForType(def.Type, capacity) + } + return &RecordBatch{Schema: schema, Columns: cols} +} + +// Reset clears every column and the selection vector. Schema is preserved. +func (b *RecordBatch) Reset() { + b.Len = 0 + b.Selection = nil + for _, c := range b.Columns { + c.Reset() + } +} + +// ActiveLen returns the number of rows operators should process. +// +// nil Selection → Len. Non-nil Selection → len(Selection), even when zero. +func (b *RecordBatch) ActiveLen() int { + if b.Selection == nil { + return b.Len + } + return len(b.Selection) +} diff --git a/pkg/query/vectorized/batch_test.go b/pkg/query/vectorized/batch_test.go new file mode 100644 index 000000000..fcf6c8e23 --- /dev/null +++ b/pkg/query/vectorized/batch_test.go @@ -0,0 +1,101 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements.
See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package vectorized + +import "testing" + +func TestRecordBatch_New_AllocatesColumnPerSchemaEntry(t *testing.T) { + s := NewBatchSchema([]ColumnDef{ + {Role: RoleTimestamp, Type: ColumnTypeInt64}, + {Role: RoleField, Name: "v", Type: ColumnTypeFloat64}, + {Role: RoleTag, TagFamily: "default", Name: "service", Type: ColumnTypeString}, + }) + b := NewRecordBatch(s, 8) + if len(b.Columns) != 3 { + t.Fatalf("Columns: want 3, got %d", len(b.Columns)) + } + if b.Columns[0].Type() != ColumnTypeInt64 { + t.Fatalf("col 0 type: want int64, got %v", b.Columns[0].Type()) + } + if b.Columns[1].Type() != ColumnTypeFloat64 { + t.Fatalf("col 1 type: want float64, got %v", b.Columns[1].Type()) + } + if b.Columns[2].Type() != ColumnTypeString { + t.Fatalf("col 2 type: want string, got %v", b.Columns[2].Type()) + } +} + +func TestRecordBatch_Reset_ClearsLenSelectionAndColumns(t *testing.T) { + s := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}}) + b := NewRecordBatch(s, 8) + col := b.Columns[0].(*TypedColumn[int64]) + col.Append(100) + col.Append(200) + b.Len = 2 + b.Selection = []uint16{0, 1} + b.Reset() + if b.Len != 0 { + t.Fatalf("Len: want 0, got %d", b.Len) + } + if b.Selection != nil { + t.Fatalf("Selection: want nil, got %v",
b.Selection) + } + if b.Columns[0].Len() != 0 { + t.Fatalf("column not reset: Len=%d", b.Columns[0].Len()) + } +} + +func TestRecordBatch_Reset_PreservesSchemaPointer(t *testing.T) { + s := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}}) + b := NewRecordBatch(s, 8) + before := b.Schema + b.Reset() + if b.Schema != before { + t.Fatal("Reset must preserve Schema pointer") + } +} + +func TestRecordBatch_ActiveLen_NilSelection_ReturnsLen(t *testing.T) { + s := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}}) + b := NewRecordBatch(s, 8) + b.Len = 5 + b.Selection = nil + if got := b.ActiveLen(); got != 5 { + t.Fatalf("ActiveLen with nil Selection: want 5, got %d", got) + } +} + +func TestRecordBatch_ActiveLen_NonNilSelection_ReturnsSelectionLen(t *testing.T) { + s := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}}) + b := NewRecordBatch(s, 8) + b.Len = 5 + b.Selection = []uint16{0, 2, 4} + if got := b.ActiveLen(); got != 3 { + t.Fatalf("ActiveLen with selection: want 3, got %d", got) + } +} + +func TestRecordBatch_ActiveLen_EmptyNonNilSelection_ReturnsZero(t *testing.T) { + s := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}}) + b := NewRecordBatch(s, 8) + b.Len = 5 + b.Selection = []uint16{} // non-nil but empty: zero active rows, not Len + if got := b.ActiveLen(); got != 0 { + t.Fatalf("ActiveLen with empty non-nil Selection (post-fusible-filter footgun): want 0, got %d", got) + } +} diff --git a/pkg/query/vectorized/column.go b/pkg/query/vectorized/column.go new file mode 100644 index 000000000..117163979 --- /dev/null +++ b/pkg/query/vectorized/column.go @@ -0,0 +1,94 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership.
Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package vectorized + +// ColumnType is the runtime type tag for a Column. +type ColumnType int + +// ColumnType variants. Each value corresponds to a TypedColumn[T] specialization. +const ( + ColumnTypeInt64 ColumnType = iota + ColumnTypeFloat64 + ColumnTypeString + ColumnTypeBytes + ColumnTypeInt64Array + ColumnTypeStrArray +) + +// String returns a human label used in diagnostics and error messages; values outside the enum yield "unknown". +func (c ColumnType) String() string { + switch c { + case ColumnTypeInt64: + return "int64" + case ColumnTypeFloat64: + return "float64" + case ColumnTypeString: + return "string" + case ColumnTypeBytes: + return "bytes" + case ColumnTypeInt64Array: + return "int64[]" + case ColumnTypeStrArray: + return "string[]" + } + return "unknown" +} + +// Column is the storage-agnostic view of one column in a RecordBatch. +type Column interface { + Type() ColumnType + Len() int + IsNull(i int) bool + Reset() + AppendNull() + MarkNullAt(i int) +} + +// validityBitmap tracks per-row null state. nil bits = all valid. +// Allocation is lazy on first MarkNull. +type validityBitmap struct { + bits []uint64 +} + +// IsNull reports whether bit i is null. Negative or never-marked indices report false.
+func (v *validityBitmap) IsNull(i int) bool { + if i < 0 { + return false + } + word := i / 64 + if word >= len(v.bits) { + return false + } + return (v.bits[word]>>uint(i%64))&1 == 1 +} + +// MarkNull sets bit i to null, growing the bitmap one word at a time as needed. i must be non-negative: unlike IsNull it is not guarded. +func (v *validityBitmap) MarkNull(i int) { + word := i / 64 + for word >= len(v.bits) { + v.bits = append(v.bits, 0) + } + v.bits[word] |= 1 << uint(i%64) +} + +// Reset clears every null mark but keeps the underlying slice for reuse. +func (v *validityBitmap) Reset() { + for i := range v.bits { + v.bits[i] = 0 + } +} diff --git a/pkg/query/vectorized/column_test.go b/pkg/query/vectorized/column_test.go new file mode 100644 index 000000000..6f400aa0b --- /dev/null +++ b/pkg/query/vectorized/column_test.go @@ -0,0 +1,81 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +package vectorized + +import "testing" + +func TestValidityBitmap_Default_AllValid(t *testing.T) { + var v validityBitmap + for i := range 64 { + if v.IsNull(i) { + t.Fatalf("default bitmap: bit %d unexpectedly null", i) + } + } +} + +func TestValidityBitmap_MarkNull_SetsExpectedBit(t *testing.T) { + var v validityBitmap + v.MarkNull(3) + if !v.IsNull(3) { + t.Fatal("bit 3 should be null after MarkNull(3)") + } + if v.IsNull(2) || v.IsNull(4) { + t.Fatalf("only bit 3 should be null; got 2=%v 4=%v", v.IsNull(2), v.IsNull(4)) + } +} + +func TestValidityBitmap_GrowthBeyondInitialWord(t *testing.T) { + var v validityBitmap + v.MarkNull(150) // word 2 (150/64): forces growth past the first uint64 + if !v.IsNull(150) { + t.Fatal("bit 150 should be null after growth") + } + if v.IsNull(149) || v.IsNull(151) { + t.Fatal("only bit 150 should be null after growth") + } +} + +func TestValidityBitmap_Reset_ClearsAllBits_RetainsSlice(t *testing.T) { + var v validityBitmap + v.MarkNull(10) + v.MarkNull(200) + beforeCap := cap(v.bits) + v.Reset() + if v.IsNull(10) || v.IsNull(200) { + t.Fatal("Reset should clear all nulls") + } + if cap(v.bits) != beforeCap { + t.Fatalf("Reset must retain slice capacity: before=%d after=%d", beforeCap, cap(v.bits)) + } +} + +func TestColumnType_String_NotEmpty(t *testing.T) { + types := []ColumnType{ + ColumnTypeInt64, + ColumnTypeFloat64, + ColumnTypeString, + ColumnTypeBytes, + ColumnTypeInt64Array, + ColumnTypeStrArray, + } + for _, ct := range types { + if ct.String() == "" { + t.Fatalf("ColumnType(%d).String() must not be empty", int(ct)) + } + } +} diff --git a/pkg/query/vectorized/doc.go b/pkg/query/vectorized/doc.go new file mode 100644 index 000000000..f2bb49d15 --- /dev/null +++ b/pkg/query/vectorized/doc.go @@ -0,0 +1,20 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership.
Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Package vectorized provides RecordBatch-based columnar execution primitives: +// schema, columns, operator interfaces, pipeline composition, and memory tracking. +package vectorized diff --git a/pkg/query/vectorized/fused.go b/pkg/query/vectorized/fused.go new file mode 100644 index 000000000..c1b2617e0 --- /dev/null +++ b/pkg/query/vectorized/fused.go @@ -0,0 +1,109 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package vectorized + +import ( + "context" + "errors" +) + +// fusedStage combines a PullOperator source with a list of FusibleOperators +// applied in order to each pulled batch. On any operator error the batch is +// discarded (R3: not returned to the caller's pool) to prevent corrupted +// batches from leaking into subsequent queries. +// +// When a fusible returns ErrLimitExhausted, the current batch is emitted and +// subsequent NextBatch calls return (nil, nil). +type fusedStage struct { + source PullOperator + fused []FusibleOperator + limitDone bool // set once a fusible reports ErrLimitExhausted; NextBatch then returns EOF + closed bool // makes Close idempotent +} + +func newFusedStage(source PullOperator, fused []FusibleOperator) *fusedStage { + return &fusedStage{source: source, fused: fused} +} + +// Init initializes the source, then each fused operator in order, returning the first error. +func (s *fusedStage) Init(ctx context.Context) error { + if initErr := s.source.Init(ctx); initErr != nil { + return initErr + } + for _, op := range s.fused { + if initErr := op.Init(ctx); initErr != nil { + return initErr + } + } + return nil +} + +// OutputSchema is the source's schema; fusible operators must not change column layout. +func (s *fusedStage) OutputSchema() *BatchSchema { return s.source.OutputSchema() } + +// NextBatch pulls one batch and applies every fused operator in order. +// +// Error handling: if any fusible returns an error other than ErrLimitExhausted, +// the batch is dropped and (nil, err) is returned. The batch is NOT returned +// to the pool — see R3 in the design spec. +// +// Limit handling: if a fusible returns ErrLimitExhausted, the current batch +// is emitted (with whatever selection vector it has) and the next call +// returns (nil, nil) — EOF.
+func (s *fusedStage) NextBatch(ctx context.Context) (*RecordBatch, error) {
+	if s.limitDone {
+		return nil, nil
+	}
+	b, pullErr := s.source.NextBatch(ctx)
+	if pullErr != nil || b == nil {
+		return nil, pullErr // never hand a batch to the caller alongside an error
+	}
+	for _, op := range s.fused {
+		processErr := op.Process(ctx, b)
+		if processErr == nil {
+			continue
+		}
+		if errors.Is(processErr, ErrLimitExhausted) {
+			s.limitDone = true // EOF on the next call...
+			continue           // ...but later fusibles (e.g. projections) must still process this final batch
+		}
+		// R3: drop b to GC. Do NOT pool.Put(b).
+		return nil, processErr
+	}
+	return b, nil
+}
+
+// Close closes the source and every underlying operator. Idempotent — repeat
+// calls return without re-invoking children's Close so the BatchOperator
+// contract holds for the stage itself, not just its leaves.
+func (s *fusedStage) Close() error {
+	if s.closed {
+		return nil
+	}
+	s.closed = true
+	var firstErr error
+	if closeErr := s.source.Close(); closeErr != nil {
+		firstErr = closeErr
+	}
+	for _, op := range s.fused {
+		if closeErr := op.Close(); closeErr != nil && firstErr == nil {
+			firstErr = closeErr
+		}
+	}
+	return firstErr
+}
diff --git a/pkg/query/vectorized/fused_test.go b/pkg/query/vectorized/fused_test.go
new file mode 100644
index 000000000..d077057d7
--- /dev/null
+++ b/pkg/query/vectorized/fused_test.go
@@ -0,0 +1,225 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.
See the License for the +// specific language governing permissions and limitations +// under the License. + +package vectorized + +import ( + "context" + "errors" + "fmt" + "testing" +) + +// fakePull is a test PullOperator that yields a hand-built sequence of batches then EOF; a non-nil pullErr is returned on every NextBatch instead. +type fakePull struct { + pullErr error + schema *BatchSchema + batches []*RecordBatch + idx int + closeCnt int + initCnt int +} + +func (f *fakePull) Init(_ context.Context) error { f.initCnt++; return nil } +func (f *fakePull) OutputSchema() *BatchSchema { return f.schema } +func (f *fakePull) Close() error { f.closeCnt++; return nil } +func (f *fakePull) NextBatch(_ context.Context) (*RecordBatch, error) { + if f.pullErr != nil { + return nil, f.pullErr + } + if f.idx >= len(f.batches) { + return nil, nil + } + b := f.batches[f.idx] + f.idx++ + return b, nil +} + +// fakeFusible is a test FusibleOperator. If processFn is set, it overrides the default +// "increment first column by 1". processErr makes Process fail before any mutation.
+type fakeFusible struct { + processErr error + processFn func(b *RecordBatch) error + schema *BatchSchema + processed int + closeCnt int +} + +func (f *fakeFusible) Init(_ context.Context) error { return nil } +func (f *fakeFusible) OutputSchema() *BatchSchema { return f.schema } +func (f *fakeFusible) Close() error { f.closeCnt++; return nil } +func (f *fakeFusible) Process(_ context.Context, b *RecordBatch) error { + if f.processErr != nil { + return f.processErr + } + f.processed++ + if f.processFn != nil { + return f.processFn(b) + } + col := b.Columns[0].(*TypedColumn[int64]) + for i := range col.Data() { + col.Data()[i]++ + } + return nil +} + +func mkInt64Batch(s *BatchSchema, vals ...int64) *RecordBatch { // assumes column 0 of s is int64 (asserted below) + b := NewRecordBatch(s, len(vals)) + col := b.Columns[0].(*TypedColumn[int64]) + for _, v := range vals { + col.Append(v) + } + b.Len = len(vals) + return b +} + +func TestFusedStage_HappyPath_AppliesEveryOperatorInOrder(t *testing.T) { + s := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}}) + src := &fakePull{schema: s, batches: []*RecordBatch{mkInt64Batch(s, 1, 2, 3)}} + f1 := &fakeFusible{schema: s} // +1 + f2 := &fakeFusible{schema: s} // +1 + stage := newFusedStage(src, []FusibleOperator{f1, f2}) + if err := stage.Init(context.Background()); err != nil { + t.Fatal(err) + } + out, err := stage.NextBatch(context.Background()) + if err != nil || out == nil { + t.Fatalf("happy path: err=%v out=%v", err, out) + } + got := out.Columns[0].(*TypedColumn[int64]).Data() + want := []int64{3, 4, 5} + for i := range want { + if got[i] != want[i] { + t.Fatalf("row %d: want %d, got %d", i, want[i], got[i]) + } + } + if f1.processed != 1 || f2.processed != 1 { + t.Fatalf("each fusible processed once: f1=%d f2=%d", f1.processed, f2.processed) + } +} + +func TestFusedStage_FirstFusibleErrors_ReturnsNilBatchAndError(t *testing.T) { + s := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}}) + src := &fakePull{schema: s,
batches: []*RecordBatch{mkInt64Batch(s, 1)}} + boom := errors.New("first fusible boom") + f1 := &fakeFusible{schema: s, processErr: boom} + f2 := &fakeFusible{schema: s} + stage := newFusedStage(src, []FusibleOperator{f1, f2}) + _ = stage.Init(context.Background()) + out, err := stage.NextBatch(context.Background()) + if !errors.Is(err, boom) { + t.Fatalf("want first-fusible error, got %v", err) + } + if out != nil { + t.Fatal("error path must return nil batch (R3: discard, do not pool)") + } + if f2.processed != 0 { + t.Fatalf("second fusible must not run after first errors: processed=%d", f2.processed) + } +} + +func TestFusedStage_SecondFusibleErrors_StillReturnsNilBatch(t *testing.T) { + s := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}}) + src := &fakePull{schema: s, batches: []*RecordBatch{mkInt64Batch(s, 1)}} + boom := errors.New("second fusible boom") + f1 := &fakeFusible{schema: s} + f2 := &fakeFusible{schema: s, processErr: boom} + stage := newFusedStage(src, []FusibleOperator{f1, f2}) + _ = stage.Init(context.Background()) + out, err := stage.NextBatch(context.Background()) + if !errors.Is(err, boom) { + t.Fatalf("want second-fusible error, got %v", err) + } + if out != nil { + t.Fatal("error path must return nil batch even after first fusible mutated") + } +} + +func TestFusedStage_LimitExhausted_EmitsCurrentBatchThenEOF(t *testing.T) { + s := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}}) + src := &fakePull{schema: s, batches: []*RecordBatch{ + mkInt64Batch(s, 1, 2, 3), + mkInt64Batch(s, 4, 5, 6), + }} + limiter := &fakeFusible{ + schema: s, + processFn: func(_ *RecordBatch) error { + return fmt.Errorf("limit done: %w", ErrLimitExhausted) + }, + } + stage := newFusedStage(src, []FusibleOperator{limiter}) + _ = stage.Init(context.Background()) + first, err := stage.NextBatch(context.Background()) + if err != nil { + t.Fatalf("first call after limit-exhausted: want (b, nil), got err=%v", err) + } + if
first == nil { + t.Fatal("first call must emit the current batch with truncated selection") + } + second, err := stage.NextBatch(context.Background()) + if err != nil || second != nil { + t.Fatalf("second call after limit-exhausted: want (nil, nil), got (%v, %v)", second, err) + } +} + +func TestFusedStage_SourceEOF_NoFusibleInvoked(t *testing.T) { + s := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}}) + src := &fakePull{schema: s, batches: nil} // EOF immediately + f := &fakeFusible{schema: s} + stage := newFusedStage(src, []FusibleOperator{f}) + _ = stage.Init(context.Background()) + out, err := stage.NextBatch(context.Background()) + if err != nil || out != nil { + t.Fatalf("source EOF: want (nil, nil), got (%v, %v)", out, err) + } + if f.processed != 0 { + t.Fatalf("fusible must not run on EOF: processed=%d", f.processed) + } +} + +func TestFusedStage_SourceError_PropagatesUnchanged(t *testing.T) { + s := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}}) + boom := errors.New("source boom") + src := &fakePull{schema: s, pullErr: boom} + f := &fakeFusible{schema: s} + stage := newFusedStage(src, []FusibleOperator{f}) + _ = stage.Init(context.Background()) + out, err := stage.NextBatch(context.Background()) + if !errors.Is(err, boom) { + t.Fatalf("want source error, got %v", err) + } + if out != nil { + t.Fatal("source error must return nil batch") + } + if f.processed != 0 { + t.Fatal("fusible must not run on source error") + } +} + +func TestFusedStage_Close_Idempotent_CallsChildrenOnce(t *testing.T) { + s := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}}) + src := &fakePull{schema: s} + f1 := &fakeFusible{schema: s} + f2 := &fakeFusible{schema: s} + stage := newFusedStage(src, []FusibleOperator{f1, f2}) + _ = stage.Close() + _ = stage.Close() + if src.closeCnt != 1 || f1.closeCnt != 1 || f2.closeCnt != 1 { + t.Fatalf("Close must be idempotent (children invoked once): src=%d f1=%d
f2=%d", + src.closeCnt, f1.closeCnt, f2.closeCnt) + } +} diff --git a/pkg/query/vectorized/memory.go b/pkg/query/vectorized/memory.go new file mode 100644 index 000000000..95543cd9f --- /dev/null +++ b/pkg/query/vectorized/memory.go @@ -0,0 +1,79 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package vectorized + +import ( + "fmt" + "sync/atomic" +) + +// MemoryTracker is a lock-free per-query memory budget. +// +// Reserve grows used iff used+bytes <= limit; otherwise it returns an error and +// leaves used unchanged. Release shrinks used. Used reads the current value. +type MemoryTracker struct { + used atomic.Int64 + limit int64 // set only by NewMemoryTracker; read-only afterwards +} + +// NewMemoryTracker returns a tracker with the given byte limit. +func NewMemoryTracker(limit int64) *MemoryTracker { + return &MemoryTracker{limit: limit} +} + +// Reserve attempts to allocate bytes from the budget. +// On success used grows by bytes; on failure used is unchanged. +// +// A negative bytes value indicates a programmer error (Reserve must not be +// used to decrement) and panics. Zero bytes is a valid no-op.
+func (m *MemoryTracker) Reserve(bytes int64) error {
+	if bytes < 0 {
+		panic("vectorized: MemoryTracker.Reserve called with negative bytes")
+	}
+	if bytes == 0 {
+		return nil
+	}
+	for {
+		current := m.used.Load()
+		// Compare against remaining headroom: current+bytes could overflow int64 and wrap negative.
+		if bytes > m.limit-current {
+			return fmt.Errorf("memory budget exceeded: used %d + requested %d > limit %d",
+				current, bytes, m.limit)
+		}
+		if m.used.CompareAndSwap(current, current+bytes) {
+			return nil
+		}
+	}
+}
+
+// Release returns bytes to the budget. A negative bytes value, or releasing
+// more than is currently reserved, indicates a programmer error and panics.
+func (m *MemoryTracker) Release(bytes int64) {
+	if bytes < 0 {
+		panic("vectorized: MemoryTracker.Release called with negative bytes")
+	}
+	// bytes == 0 is a harmless Add(0); going negative would silently inflate the budget.
+	if m.used.Add(-bytes) < 0 {
+		panic("vectorized: MemoryTracker.Release called with more bytes than reserved")
+	}
+}
+
+// Used returns the current outstanding reservation.
+func (m *MemoryTracker) Used() int64 {
+	return m.used.Load()
+}
diff --git a/pkg/query/vectorized/memory_test.go b/pkg/query/vectorized/memory_test.go
new file mode 100644
index 000000000..171720937
--- /dev/null
+++ b/pkg/query/vectorized/memory_test.go
@@ -0,0 +1,119 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +package vectorized + +import ( + "sync" + "testing" +) + +func TestMemoryTracker_Reserve_SucceedsWithinLimit(t *testing.T) { + m := NewMemoryTracker(1024) + if err := m.Reserve(512); err != nil { + t.Fatalf("Reserve(512): %v", err) + } + if m.Used() != 512 { + t.Fatalf("Used after Reserve(512): want 512, got %d", m.Used()) + } + if err := m.Reserve(256); err != nil { + t.Fatalf("Reserve(256): %v", err) + } + if m.Used() != 768 { + t.Fatalf("Used after second reserve: want 768, got %d", m.Used()) + } +} + +func TestMemoryTracker_Reserve_FailsAtLimit_UsedUnchanged(t *testing.T) { + m := NewMemoryTracker(100) + if err := m.Reserve(50); err != nil { + t.Fatalf("first reserve: %v", err) + } + if err := m.Reserve(60); err == nil { + t.Fatal("Reserve(60) on top of 50/100 must fail") + } + if m.Used() != 50 { + t.Fatalf("failed Reserve must not advance used: got %d", m.Used()) + } +} + +func TestMemoryTracker_Release_DecreasesUsed(t *testing.T) { + m := NewMemoryTracker(1024) + _ = m.Reserve(512) + m.Release(200) + if m.Used() != 312 { + t.Fatalf("after Release(200): want 312, got %d", m.Used()) + } +} + +func TestMemoryTracker_Used_ReturnsCurrent(t *testing.T) { + m := NewMemoryTracker(1024) + if m.Used() != 0 { + t.Fatalf("initial Used: want 0, got %d", m.Used()) + } + _ = m.Reserve(7) + if m.Used() != 7 { + t.Fatalf("after Reserve(7): want 7, got %d", m.Used()) + } +} + +func TestMemoryTracker_Reserve_NegativeBytes_Panics(t *testing.T) { + m := NewMemoryTracker(1024) + defer func() { + if r := recover(); r == nil { + t.Fatal("Reserve(-1) must panic — programmer error, not a recoverable runtime condition") + } + }() + _ = m.Reserve(-1) +} + +func TestMemoryTracker_Release_NegativeBytes_Panics(t *testing.T) { + m := NewMemoryTracker(1024) + if err := m.Reserve(100); err != nil { + t.Fatal(err) + } + defer func() { + if r := recover(); r == nil { + t.Fatal("Release(-1) must panic — programmer error") + } + }() + m.Release(-1) +} + +func
TestMemoryTracker_Concurrent_TotalExact(t *testing.T) { + m := NewMemoryTracker(100000) + const goroutines = 100 + const perGoroutine = 100 // 100*100 = 10000 reservations, far below the 100000 limit + var wg sync.WaitGroup + wg.Add(goroutines) + for range goroutines { + go func() { + defer wg.Done() + for range perGoroutine { + if err := m.Reserve(1); err != nil { + t.Errorf("unexpected reserve failure: %v", err) + return + } + } + }() + } + wg.Wait() + want := int64(goroutines * perGoroutine) + if m.Used() != want { + t.Fatalf("concurrent Used: want %d, got %d", want, m.Used()) + } +} diff --git a/pkg/query/vectorized/operator.go b/pkg/query/vectorized/operator.go new file mode 100644 index 000000000..2e6690f79 --- /dev/null +++ b/pkg/query/vectorized/operator.go @@ -0,0 +1,70 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package vectorized + +import ( + "context" + "errors" +) + +// ErrLimitExhausted is returned by FusibleOperators that finished their work and +// want the fused stage to translate the signal into "emit current batch + EOF on next call". +var ErrLimitExhausted = errors.New("vectorized: limit exhausted") + +// BatchOperator is the lifecycle base. All operators implement it.
+// +// Close is idempotent and safe to call at any phase — after Init, mid-Consume, +// after Finalize, or after any error. It must release every MemoryTracker.Reserve +// the operator made during its lifetime. +type BatchOperator interface { + Init(ctx context.Context) error + OutputSchema() *BatchSchema + Close() error +} + +// PullOperator produces batches. It is the source of a pipeline. +// +// NextBatch contract: +// - (non-nil, nil) → valid batch with Len > 0 +// - (nil, nil) → EOF; caller must not call NextBatch again +// - (nil, non-nil) → error; pipeline stops +// +// NextBatch may block — channel-backed implementations are explicitly supported +// for future distributed remote-scan operators. +type PullOperator interface { + BatchOperator + NextBatch(ctx context.Context) (*RecordBatch, error) +} + +// FusibleOperator transforms a batch in place. No state across batches. +// +// Process must not retain references to the batch beyond the call. Returning +// ErrLimitExhausted signals "emit this batch then EOF on the next pull". +type FusibleOperator interface { + BatchOperator + Process(ctx context.Context, b *RecordBatch) error +} + +// BreakerOperator buffers all input via Consume, then produces output via NextBatch +// after Finalize is called. +type BreakerOperator interface { + BatchOperator + Consume(ctx context.Context, b *RecordBatch) error + Finalize(ctx context.Context) error + NextBatch(ctx context.Context) (*RecordBatch, error) +} diff --git a/pkg/query/vectorized/operator_test.go b/pkg/query/vectorized/operator_test.go new file mode 100644 index 000000000..44d4dcafa --- /dev/null +++ b/pkg/query/vectorized/operator_test.go @@ -0,0 +1,34 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. 
Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package vectorized + +import ( + "errors" + "fmt" + "testing" +) + +func TestErrLimitExhausted_IsSentinel_MatchableViaErrorsIs(t *testing.T) { + wrapped := fmt.Errorf("limit operator finished: %w", ErrLimitExhausted) + if !errors.Is(wrapped, ErrLimitExhausted) { + t.Fatal("ErrLimitExhausted must be matchable via errors.Is on a wrapped error") + } + if errors.Is(errors.New("unrelated"), ErrLimitExhausted) { + t.Fatal("unrelated errors must not match ErrLimitExhausted") + } +} diff --git a/pkg/query/vectorized/pipeline.go b/pkg/query/vectorized/pipeline.go new file mode 100644 index 000000000..b12221555 --- /dev/null +++ b/pkg/query/vectorized/pipeline.go @@ -0,0 +1,154 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package vectorized + +import ( + "context" + "errors" +) + +// Pipeline is the composed sequence of stages from source to final breaker. +// It exposes a single PullOperator-shaped Next method to the driver. +type Pipeline struct { + head PullOperator + closed bool +} + +// Next returns the next batch from the head stage. +func (p *Pipeline) Next(ctx context.Context) (*RecordBatch, error) { + return p.head.NextBatch(ctx) +} + +// Close closes the head stage. Idempotent — repeat calls are no-ops. +func (p *Pipeline) Close() error { + if p.closed { + return nil + } + p.closed = true + return p.head.Close() +} + +// PipelineBuilder fluently composes a Pipeline. +type PipelineBuilder struct { + source PullOperator + pendingFused []FusibleOperator + breakers []BreakerOperator +} + +// NewPipelineBuilder starts a builder. +func NewPipelineBuilder() *PipelineBuilder { return &PipelineBuilder{} } + +// From sets the leaf source. +func (b *PipelineBuilder) From(p PullOperator) *PipelineBuilder { b.source = p; return b } + +// Apply queues a FusibleOperator to fold into the next stage. +func (b *PipelineBuilder) Apply(f FusibleOperator) *PipelineBuilder { + b.pendingFused = append(b.pendingFused, f) + return b +} + +// Break appends a breaker, finalizing the current fused-stage prefix. +func (b *PipelineBuilder) Break(br BreakerOperator) *PipelineBuilder { + b.breakers = append(b.breakers, br) + return b +} + +// Build validates and constructs the Pipeline. 
+func (b *PipelineBuilder) Build() (*Pipeline, error) { + if b.source == nil { + return nil, errors.New("vectorized: pipeline missing source (use From)") + } + var head PullOperator = newFusedStage(b.source, b.pendingFused) + for _, br := range b.breakers { + head = newBreakerStage(head, br) + } + return &Pipeline{head: head}, nil +} + +// breakerStage wraps a BreakerOperator so it acts as a PullOperator for the next stage. +// It drains the upstream PullOperator into the breaker's Consume, calls Finalize once, +// then serves the breaker's output via NextBatch. +// +// Error stickiness: if upstream Pull, Consume, or Finalize fails, the error is +// stored and returned on every subsequent NextBatch call. drained is set as +// soon as the consume loop is entered, so a retry never re-runs Consume or +// Finalize. This guarantees Finalize executes at most once per stage. +type breakerStage struct { + err error + upstream PullOperator + breaker BreakerOperator + drained bool + closed bool +} + +func newBreakerStage(upstream PullOperator, br BreakerOperator) *breakerStage { + return &breakerStage{upstream: upstream, breaker: br} +} + +func (s *breakerStage) Init(ctx context.Context) error { + if initErr := s.upstream.Init(ctx); initErr != nil { + return initErr + } + return s.breaker.Init(ctx) +} + +func (s *breakerStage) OutputSchema() *BatchSchema { return s.breaker.OutputSchema() } + +func (s *breakerStage) NextBatch(ctx context.Context) (*RecordBatch, error) { + if s.err != nil { + return nil, s.err + } + if !s.drained { + s.drained = true + for { + b, pullErr := s.upstream.NextBatch(ctx) + if pullErr != nil { + s.err = pullErr + return nil, pullErr + } + if b == nil { + break + } + if consumeErr := s.breaker.Consume(ctx, b); consumeErr != nil { + s.err = consumeErr + return nil, consumeErr + } + } + if finalizeErr := s.breaker.Finalize(ctx); finalizeErr != nil { + s.err = finalizeErr + return nil, finalizeErr + } + } + return s.breaker.NextBatch(ctx) +} + +// Close 
is idempotent — children are invoked once across repeated calls. +func (s *breakerStage) Close() error { + if s.closed { + return nil + } + s.closed = true + var firstErr error + if closeErr := s.upstream.Close(); closeErr != nil { + firstErr = closeErr + } + if closeErr := s.breaker.Close(); closeErr != nil && firstErr == nil { + firstErr = closeErr + } + return firstErr +} diff --git a/pkg/query/vectorized/pipeline_test.go b/pkg/query/vectorized/pipeline_test.go new file mode 100644 index 000000000..2628a4e1d --- /dev/null +++ b/pkg/query/vectorized/pipeline_test.go @@ -0,0 +1,231 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package vectorized + +import ( + "context" + "errors" + "testing" +) + +// fakeBreaker is a test BreakerOperator that records its lifecycle and +// emits a hand-built sequence of output batches after Finalize. 
+type fakeBreaker struct { + consumeErr error + finalizeErr error + schema *BatchSchema + consumed []*RecordBatch + output []*RecordBatch + outIdx int + consumeCnt int + finalizeCnt int + emitCnt int + closeCnt int +} + +func (b *fakeBreaker) Init(_ context.Context) error { return nil } +func (b *fakeBreaker) OutputSchema() *BatchSchema { return b.schema } +func (b *fakeBreaker) Close() error { b.closeCnt++; return nil } +func (b *fakeBreaker) Consume(_ context.Context, batch *RecordBatch) error { + b.consumeCnt++ + if b.consumeErr != nil { + return b.consumeErr + } + b.consumed = append(b.consumed, batch) + return nil +} +func (b *fakeBreaker) Finalize(_ context.Context) error { b.finalizeCnt++; return b.finalizeErr } +func (b *fakeBreaker) NextBatch(_ context.Context) (*RecordBatch, error) { + if b.outIdx >= len(b.output) { + return nil, nil + } + batch := b.output[b.outIdx] + b.outIdx++ + b.emitCnt++ + return batch, nil +} + +func TestPipelineBuilder_BuildWithoutSource_ReturnsError(t *testing.T) { + if _, err := NewPipelineBuilder().Build(); err == nil { + t.Fatal("Build without From() should return an error") + } +} + +func TestPipelineBuilder_FromOnly_BuildsSingleFusedStage(t *testing.T) { + s := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}}) + src := &fakePull{schema: s, batches: []*RecordBatch{mkInt64Batch(s, 1)}} + p, err := NewPipelineBuilder().From(src).Build() + if err != nil { + t.Fatal(err) + } + if err := p.head.Init(context.Background()); err != nil { + t.Fatal(err) + } + out, err := p.Next(context.Background()) + if err != nil || out == nil { + t.Fatalf("From-only pipeline should yield the source's batch: out=%v err=%v", out, err) + } + eof, err := p.Next(context.Background()) + if err != nil || eof != nil { + t.Fatalf("expected EOF: out=%v err=%v", eof, err) + } +} + +func TestPipelineBuilder_FromApplyApply_BuildsFusedStageWithTwoFusibles(t *testing.T) { + s := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: 
ColumnTypeInt64}}) + src := &fakePull{schema: s, batches: []*RecordBatch{mkInt64Batch(s, 1, 2)}} + f1 := &fakeFusible{schema: s} + f2 := &fakeFusible{schema: s} + p, err := NewPipelineBuilder().From(src).Apply(f1).Apply(f2).Build() + if err != nil { + t.Fatal(err) + } + _ = p.head.Init(context.Background()) + out, err := p.Next(context.Background()) + if err != nil { + t.Fatal(err) + } + if out.Columns[0].(*TypedColumn[int64]).Data()[0] != 3 { + t.Fatalf("two fusibles should each +1 the row: got %d", out.Columns[0].(*TypedColumn[int64]).Data()[0]) + } +} + +func TestPipelineBuilder_FromBreak_BuildsBreakerStageOnTopOfSource(t *testing.T) { + s := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}}) + src := &fakePull{schema: s, batches: []*RecordBatch{mkInt64Batch(s, 1)}} + br := &fakeBreaker{schema: s, output: []*RecordBatch{mkInt64Batch(s, 99)}} + p, err := NewPipelineBuilder().From(src).Break(br).Build() + if err != nil { + t.Fatal(err) + } + _ = p.head.Init(context.Background()) + out, err := p.Next(context.Background()) + if err != nil || out == nil { + t.Fatalf("breaker stage should emit output after draining: err=%v out=%v", err, out) + } + if out.Columns[0].(*TypedColumn[int64]).Data()[0] != 99 { + t.Fatalf("expected breaker output 99, got %d", out.Columns[0].(*TypedColumn[int64]).Data()[0]) + } +} + +func TestBreakerStage_DrainsUpstreamBeforeEmittingOutput(t *testing.T) { + s := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}}) + src := &fakePull{schema: s, batches: []*RecordBatch{ + mkInt64Batch(s, 1), + mkInt64Batch(s, 2), + mkInt64Batch(s, 3), + }} + br := &fakeBreaker{schema: s, output: []*RecordBatch{mkInt64Batch(s, 100)}} + p, _ := NewPipelineBuilder().From(src).Break(br).Build() + _ = p.head.Init(context.Background()) + if br.consumeCnt != 0 || br.finalizeCnt != 0 { + t.Fatal("Init must not invoke Consume/Finalize") + } + _, _ = p.Next(context.Background()) + if br.consumeCnt != 3 { + t.Fatalf("breaker 
should Consume every upstream batch: got %d", br.consumeCnt) + } + if br.finalizeCnt != 1 { + t.Fatalf("breaker should Finalize once: got %d", br.finalizeCnt) + } +} + +func TestBreakerStage_UpstreamError_ShortCircuitsBeforeFinalize(t *testing.T) { + s := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}}) + boom := errors.New("upstream boom") + src := &fakePull{schema: s, pullErr: boom} + br := &fakeBreaker{schema: s} + p, _ := NewPipelineBuilder().From(src).Break(br).Build() + _ = p.head.Init(context.Background()) + _, err := p.Next(context.Background()) + if !errors.Is(err, boom) { + t.Fatalf("want upstream error, got %v", err) + } + if br.finalizeCnt != 0 { + t.Fatalf("Finalize must not run on upstream error: got %d", br.finalizeCnt) + } +} + +func TestBreakerStage_FinalizeError_StickyOnRetry(t *testing.T) { + s := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}}) + src := &fakePull{schema: s} // EOF immediately + boom := errors.New("finalize boom") + br := &fakeBreaker{schema: s, finalizeErr: boom} + p, _ := NewPipelineBuilder().From(src).Break(br).Build() + _ = p.head.Init(context.Background()) + + _, err1 := p.Next(context.Background()) + if !errors.Is(err1, boom) { + t.Fatalf("first Next: want finalize error, got %v", err1) + } + _, err2 := p.Next(context.Background()) + if !errors.Is(err2, boom) { + t.Fatalf("second Next must return sticky finalize error, got %v", err2) + } + if br.finalizeCnt != 1 { + t.Fatalf("Finalize must be called exactly once across retries; got %d", br.finalizeCnt) + } +} + +func TestBreakerStage_ConsumeError_StickyOnRetry(t *testing.T) { + s := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}}) + src := &fakePull{schema: s, batches: []*RecordBatch{mkInt64Batch(s, 1)}} + boom := errors.New("consume boom") + br := &fakeBreaker{schema: s, consumeErr: boom} + p, _ := NewPipelineBuilder().From(src).Break(br).Build() + _ = p.head.Init(context.Background()) + + _, err1 
:= p.Next(context.Background()) + if !errors.Is(err1, boom) { + t.Fatalf("first Next: want consume error, got %v", err1) + } + _, err2 := p.Next(context.Background()) + if !errors.Is(err2, boom) { + t.Fatalf("second Next must return sticky consume error, got %v", err2) + } + if br.finalizeCnt != 0 { + t.Fatalf("Finalize must not run after consume error; got %d", br.finalizeCnt) + } +} + +func TestBreakerStage_Close_Idempotent_CallsChildrenOnce(t *testing.T) { + s := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}}) + src := &fakePull{schema: s} + br := &fakeBreaker{schema: s} + stage := newBreakerStage(src, br) + _ = stage.Close() + _ = stage.Close() + if src.closeCnt != 1 || br.closeCnt != 1 { + t.Fatalf("breakerStage.Close must be idempotent: src=%d br=%d", src.closeCnt, br.closeCnt) + } +} + +func TestPipeline_Close_Idempotent(t *testing.T) { + s := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}}) + src := &fakePull{schema: s} + p, _ := NewPipelineBuilder().From(src).Build() + if err := p.Close(); err != nil { + t.Fatal(err) + } + if err := p.Close(); err != nil { + t.Fatalf("second Close should be no-op, got %v", err) + } + if src.closeCnt != 1 { + t.Fatalf("second Close must not propagate to source: got %d", src.closeCnt) + } +} diff --git a/pkg/query/vectorized/pool.go b/pkg/query/vectorized/pool.go new file mode 100644 index 000000000..a080bde51 --- /dev/null +++ b/pkg/query/vectorized/pool.go @@ -0,0 +1,54 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package vectorized + +import "sync" + +// BatchPool reuses RecordBatches across pipeline iterations. +// All batches in a pool share the same schema and capacity. +// +// Caller contract: Put a batch only when it is consistent. Error paths must +// discard rather than Put — see fusedStage.NextBatch. +type BatchPool struct { + schema *BatchSchema + pool sync.Pool + capacity int +} + +// NewBatchPool returns a pool whose Get yields freshly Reset batches. +func NewBatchPool(schema *BatchSchema, capacity int) *BatchPool { + p := &BatchPool{schema: schema, capacity: capacity} + p.pool.New = func() any { return NewRecordBatch(schema, capacity) } + return p +} + +// Get returns a Reset batch. Caller may write rows up to capacity. +func (p *BatchPool) Get() *RecordBatch { + b := p.pool.Get().(*RecordBatch) + b.Reset() + return b +} + +// Put returns a batch to the pool. Nil batches and batches with a foreign +// schema are silently dropped. +func (p *BatchPool) Put(b *RecordBatch) { + if b == nil || b.Schema != p.schema { + return + } + p.pool.Put(b) +} diff --git a/pkg/query/vectorized/pool_test.go b/pkg/query/vectorized/pool_test.go new file mode 100644 index 000000000..f20507de0 --- /dev/null +++ b/pkg/query/vectorized/pool_test.go @@ -0,0 +1,69 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. 
Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package vectorized + +import "testing" + +func TestBatchPool_Get_ReturnsResetBatch(t *testing.T) { + s := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}}) + p := NewBatchPool(s, DefaultBatchSize) + dirty := p.Get() + dirty.Columns[0].(*TypedColumn[int64]).Append(99) + dirty.Len = 1 + dirty.Selection = []uint16{0} + p.Put(dirty) + clean := p.Get() + if clean.Len != 0 { + t.Fatalf("Get must return Reset batch: Len=%d", clean.Len) + } + if clean.Selection != nil { + t.Fatalf("Get must return Reset batch: Selection=%v", clean.Selection) + } + if clean.Columns[0].Len() != 0 { + t.Fatalf("Get must return Reset columns: col Len=%d", clean.Columns[0].Len()) + } +} + +func TestBatchPool_Put_NilBatch_NoOp(t *testing.T) { + s := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}}) + p := NewBatchPool(s, 8) + defer func() { + if r := recover(); r != nil { + t.Fatalf("Put(nil) should not panic; got %v", r) + } + }() + p.Put(nil) +} + +func TestBatchPool_Put_ForeignSchema_NoOp(t *testing.T) { + s1 := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}}) + s2 := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}}) + p1 := NewBatchPool(s1, 8) + foreign := NewRecordBatch(s2, 8) + defer func() { + if r := recover(); r != nil { + t.Fatalf("Put(foreign) should not panic; got %v", r) + } + }() 
+ p1.Put(foreign) + // Confirm pool is still functional after rejecting the foreign batch. + got := p1.Get() + if got.Schema != s1 { + t.Fatal("pool returned a batch with foreign schema after rejection") + } +} diff --git a/pkg/query/vectorized/schema.go b/pkg/query/vectorized/schema.go new file mode 100644 index 000000000..2b604177c --- /dev/null +++ b/pkg/query/vectorized/schema.go @@ -0,0 +1,114 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package vectorized + +// ColumnRole identifies the semantic role of a column within a RecordBatch. +type ColumnRole int + +// Column roles. Each batch schema may include at most one column per metadata role. +const ( + RoleTimestamp ColumnRole = iota + RoleVersion + RoleSeriesID + RoleShardID + RoleTag + RoleField +) + +// ColumnDef describes one column in a BatchSchema. +type ColumnDef struct { + Name string + TagFamily string + Role ColumnRole + Type ColumnType +} + +// tagKey is the composite key used to index tag columns by (family, name). 
+// Using a struct key eliminates two issues with a string-concatenated key: +// - it cannot collide when family or name contains the separator character; +// - the lookup avoids allocating a fresh string on every hot-path call. +type tagKey struct { + family string + name string +} + +// BatchSchema is the immutable column layout shared by every RecordBatch in a pipeline. +type BatchSchema struct { + tagByPath map[tagKey]int + fieldByName map[string]int + Columns []ColumnDef + timestampIdx int + versionIdx int + seriesIDIdx int + shardIDIdx int +} + +// NewBatchSchema builds a BatchSchema and precomputes lookup indices. +func NewBatchSchema(cols []ColumnDef) *BatchSchema { + s := &BatchSchema{ + Columns: cols, + timestampIdx: -1, + versionIdx: -1, + seriesIDIdx: -1, + shardIDIdx: -1, + tagByPath: make(map[tagKey]int), + fieldByName: make(map[string]int), + } + for i, c := range cols { + switch c.Role { + case RoleTimestamp: + s.timestampIdx = i + case RoleVersion: + s.versionIdx = i + case RoleSeriesID: + s.seriesIDIdx = i + case RoleShardID: + s.shardIDIdx = i + case RoleTag: + s.tagByPath[tagKey{family: c.TagFamily, name: c.Name}] = i + case RoleField: + s.fieldByName[c.Name] = i + } + } + return s +} + +// TimestampIndex returns the timestamp column index, or -1 if absent. +func (s *BatchSchema) TimestampIndex() int { return s.timestampIdx } + +// VersionIndex returns the version column index, or -1 if absent. +func (s *BatchSchema) VersionIndex() int { return s.versionIdx } + +// SeriesIDIndex returns the series-id column index, or -1 if absent. +func (s *BatchSchema) SeriesIDIndex() int { return s.seriesIDIdx } + +// ShardIDIndex returns the shard-id column index, or -1 if absent. +func (s *BatchSchema) ShardIDIndex() int { return s.shardIDIdx } + +// TagIndex returns the column index for a (family, name) tag. +// Lookup uses a struct key, so it does not allocate. 
+func (s *BatchSchema) TagIndex(family, name string) (int, bool) { + i, ok := s.tagByPath[tagKey{family: family, name: name}] + return i, ok +} + +// FieldIndex returns the column index for a field name. +func (s *BatchSchema) FieldIndex(name string) (int, bool) { + i, ok := s.fieldByName[name] + return i, ok +} diff --git a/pkg/query/vectorized/schema_test.go b/pkg/query/vectorized/schema_test.go new file mode 100644 index 000000000..e753fc3f9 --- /dev/null +++ b/pkg/query/vectorized/schema_test.go @@ -0,0 +1,127 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package vectorized + +import "testing" + +func TestBatchSchema_LookupHelpers_TimestampVersionSeriesIDShardID(t *testing.T) { + s := NewBatchSchema([]ColumnDef{ + {Role: RoleTimestamp, Type: ColumnTypeInt64}, + {Role: RoleVersion, Type: ColumnTypeInt64}, + {Role: RoleSeriesID, Type: ColumnTypeInt64}, + {Role: RoleShardID, Type: ColumnTypeInt64}, + }) + if got := s.TimestampIndex(); got != 0 { + t.Fatalf("TimestampIndex: want 0, got %d", got) + } + if got := s.VersionIndex(); got != 1 { + t.Fatalf("VersionIndex: want 1, got %d", got) + } + if got := s.SeriesIDIndex(); got != 2 { + t.Fatalf("SeriesIDIndex: want 2, got %d", got) + } + if got := s.ShardIDIndex(); got != 3 { + t.Fatalf("ShardIDIndex: want 3, got %d", got) + } +} + +func TestBatchSchema_TagIndex_FoundAndMissing(t *testing.T) { + s := NewBatchSchema([]ColumnDef{ + {Role: RoleTimestamp, Type: ColumnTypeInt64}, + {Role: RoleTag, TagFamily: "default", Name: "service", Type: ColumnTypeString}, + {Role: RoleTag, TagFamily: "default", Name: "endpoint", Type: ColumnTypeString}, + }) + idx, ok := s.TagIndex("default", "service") + if !ok || idx != 1 { + t.Fatalf("TagIndex(default,service): want (1,true), got (%d,%v)", idx, ok) + } + idx, ok = s.TagIndex("default", "endpoint") + if !ok || idx != 2 { + t.Fatalf("TagIndex(default,endpoint): want (2,true), got (%d,%v)", idx, ok) + } + if _, ok := s.TagIndex("default", "missing"); ok { + t.Fatalf("TagIndex(default,missing): expected miss") + } +} + +func TestBatchSchema_FieldIndex_FoundAndMissing(t *testing.T) { + s := NewBatchSchema([]ColumnDef{ + {Role: RoleTimestamp, Type: ColumnTypeInt64}, + {Role: RoleField, Name: "value", Type: ColumnTypeFloat64}, + {Role: RoleField, Name: "count", Type: ColumnTypeInt64}, + }) + idx, ok := s.FieldIndex("value") + if !ok || idx != 1 { + t.Fatalf("FieldIndex(value): want (1,true), got (%d,%v)", idx, ok) + } + idx, ok = s.FieldIndex("count") + if !ok || idx != 2 { + t.Fatalf("FieldIndex(count): want (2,true), got (%d,%v)", idx, 
ok) + } + if _, ok := s.FieldIndex("missing"); ok { + t.Fatalf("FieldIndex(missing): expected miss") + } +} + +func TestBatchSchema_MultipleTagFamilies_NoCollision(t *testing.T) { + s := NewBatchSchema([]ColumnDef{ + {Role: RoleTag, TagFamily: "a", Name: "id", Type: ColumnTypeString}, + {Role: RoleTag, TagFamily: "b", Name: "id", Type: ColumnTypeString}, + }) + aIdx, aOK := s.TagIndex("a", "id") + bIdx, bOK := s.TagIndex("b", "id") + if !aOK || !bOK { + t.Fatalf("both lookups should succeed: aOK=%v bOK=%v", aOK, bOK) + } + if aIdx == bIdx { + t.Fatalf("same tag name in different families must map to distinct columns: a=%d b=%d", aIdx, bIdx) + } +} + +func TestBatchSchema_TagFamilyDotInName_NoCollision(t *testing.T) { + s := NewBatchSchema([]ColumnDef{ + {Role: RoleTag, TagFamily: "a.b", Name: "c", Type: ColumnTypeString}, + {Role: RoleTag, TagFamily: "a", Name: "b.c", Type: ColumnTypeString}, + }) + i1, ok1 := s.TagIndex("a.b", "c") + i2, ok2 := s.TagIndex("a", "b.c") + if !ok1 || !ok2 { + t.Fatalf("both lookups must succeed: ok1=%v ok2=%v", ok1, ok2) + } + if i1 == i2 { + t.Fatalf("collision: (a.b,c) and (a,b.c) resolve to the same index %d", i1) + } +} + +func TestBatchSchema_NoMetadata_ReturnsMinusOne(t *testing.T) { + s := NewBatchSchema([]ColumnDef{ + {Role: RoleTag, TagFamily: "default", Name: "service", Type: ColumnTypeString}, + }) + if got := s.TimestampIndex(); got != -1 { + t.Fatalf("absent timestamp: want -1, got %d", got) + } + if got := s.VersionIndex(); got != -1 { + t.Fatalf("absent version: want -1, got %d", got) + } + if got := s.SeriesIDIndex(); got != -1 { + t.Fatalf("absent seriesID: want -1, got %d", got) + } + if got := s.ShardIDIndex(); got != -1 { + t.Fatalf("absent shardID: want -1, got %d", got) + } +} diff --git a/pkg/query/vectorized/typed_column.go b/pkg/query/vectorized/typed_column.go new file mode 100644 index 000000000..e323aa7a9 --- /dev/null +++ b/pkg/query/vectorized/typed_column.go @@ -0,0 +1,110 @@ +// Licensed to Apache 
Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package vectorized + +// TypedColumn[T] is a generic Column with element type T. +// One instance per supported T. Use the typed constructors below. +type TypedColumn[T any] struct { + data []T + validity validityBitmap + zero T + typ ColumnType +} + +// Type returns the static ColumnType this column was built with. +func (c *TypedColumn[T]) Type() ColumnType { return c.typ } + +// Len returns the current row count. +func (c *TypedColumn[T]) Len() int { return len(c.data) } + +// IsNull reports whether row i is null. +func (c *TypedColumn[T]) IsNull(i int) bool { return c.validity.IsNull(i) } + +// Data returns the backing slice for bulk access. +func (c *TypedColumn[T]) Data() []T { return c.data } + +// Append adds a value, marking it valid. +func (c *TypedColumn[T]) Append(v T) { c.data = append(c.data, v) } + +// AppendNull adds a zero-value placeholder and marks it null. +func (c *TypedColumn[T]) AppendNull() { + idx := len(c.data) + c.data = append(c.data, c.zero) + c.validity.MarkNull(idx) +} + +// MarkNullAt marks an existing row at index i as null. Length is unchanged. 
+func (c *TypedColumn[T]) MarkNullAt(i int) {
+	// Only rows that already exist may be nulled. Marking an index >= Len()
+	// would leave a null bit behind that Append never clears, which could
+	// silently turn a later-appended value into a null.
+	if i < 0 || i >= len(c.data) {
+		panic("vectorized: MarkNullAt index out of range")
+	}
+	c.validity.MarkNull(i)
+}
+
+// Reset empties the column for reuse: the length drops to zero, the validity
+// bitmap is cleared and the allocated capacity is retained.
+//
+// The previously used elements are zeroed before truncation. For element
+// types that hold references (string, []byte, []int64, []string) this drops
+// them from the retained capacity, so a reused column does not keep the
+// previous batch's values reachable by the garbage collector.
+func (c *TypedColumn[T]) Reset() {
+	clear(c.data)
+	c.data = c.data[:0]
+	c.validity.Reset()
+}
+
+// NewInt64Column returns an empty ColumnTypeInt64 column whose backing slice
+// has the given capacity.
+func NewInt64Column(capacity int) *TypedColumn[int64] {
+	return &TypedColumn[int64]{typ: ColumnTypeInt64, data: make([]int64, 0, capacity)}
+}
+
+// NewFloat64Column returns an empty ColumnTypeFloat64 column whose backing
+// slice has the given capacity.
+func NewFloat64Column(capacity int) *TypedColumn[float64] {
+	return &TypedColumn[float64]{typ: ColumnTypeFloat64, data: make([]float64, 0, capacity)}
+}
+
+// NewStringColumn returns an empty ColumnTypeString column whose backing
+// slice has the given capacity.
+func NewStringColumn(capacity int) *TypedColumn[string] {
+	return &TypedColumn[string]{typ: ColumnTypeString, data: make([]string, 0, capacity)}
+}
+
+// NewBytesColumn returns an empty ColumnTypeBytes column whose backing slice
+// has the given capacity.
+func NewBytesColumn(capacity int) *TypedColumn[[]byte] {
+	return &TypedColumn[[]byte]{typ: ColumnTypeBytes, data: make([][]byte, 0, capacity)}
+}
+
+// NewInt64ArrayColumn returns an empty ColumnTypeInt64Array column whose
+// backing slice has the given capacity.
+func NewInt64ArrayColumn(capacity int) *TypedColumn[[]int64] {
+	return &TypedColumn[[]int64]{typ: ColumnTypeInt64Array, data: make([][]int64, 0, capacity)}
+}
+
+// NewStrArrayColumn returns an empty ColumnTypeStrArray column whose backing
+// slice has the given capacity.
+func NewStrArrayColumn(capacity int) *TypedColumn[[]string] {
+	return &TypedColumn[[]string]{typ: ColumnTypeStrArray, data: make([][]string, 0, capacity)}
+}
+
+// NewColumnForType constructs a Column with the given type and capacity.
+// Panics on unknown type — programmer error, not data error.
+func NewColumnForType(t ColumnType, capacity int) Column {
+	// Pick the typed constructor for t. Every supported type is listed
+	// explicitly; falling into default means the caller passed a ColumnType
+	// this package cannot build.
+	var col Column
+	switch t {
+	case ColumnTypeInt64:
+		col = NewInt64Column(capacity)
+	case ColumnTypeFloat64:
+		col = NewFloat64Column(capacity)
+	case ColumnTypeString:
+		col = NewStringColumn(capacity)
+	case ColumnTypeBytes:
+		col = NewBytesColumn(capacity)
+	case ColumnTypeInt64Array:
+		col = NewInt64ArrayColumn(capacity)
+	case ColumnTypeStrArray:
+		col = NewStrArrayColumn(capacity)
+	default:
+		panic("vectorized: unknown ColumnType " + t.String())
+	}
+	return col
+}
diff --git a/pkg/query/vectorized/typed_column_test.go b/pkg/query/vectorized/typed_column_test.go
new file mode 100644
index 000000000..5e13a1896
--- /dev/null
+++ b/pkg/query/vectorized/typed_column_test.go
@@ -0,0 +1,142 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +package vectorized + +import ( + "bytes" + "testing" +) + +func TestTypedColumnInt64_AppendAndData(t *testing.T) { + c := NewInt64Column(8) + c.Append(1) + c.Append(2) + c.Append(3) + if c.Len() != 3 { + t.Fatalf("Len: want 3, got %d", c.Len()) + } + if c.Type() != ColumnTypeInt64 { + t.Fatalf("Type: want Int64, got %v", c.Type()) + } + want := []int64{1, 2, 3} + for i, v := range want { + if c.Data()[i] != v { + t.Fatalf("Data[%d]: want %d, got %d", i, v, c.Data()[i]) + } + } +} + +func TestTypedColumnInt64_AppendNull_GrowsLengthAndMarksNull(t *testing.T) { + c := NewInt64Column(8) + c.Append(10) + c.AppendNull() + c.Append(20) + if c.Len() != 3 { + t.Fatalf("Len: want 3, got %d", c.Len()) + } + if c.IsNull(0) || !c.IsNull(1) || c.IsNull(2) { + t.Fatalf("validity: want [valid,null,valid], got [%v,%v,%v]", c.IsNull(0), c.IsNull(1), c.IsNull(2)) + } +} + +func TestTypedColumnInt64_MarkNullAt_TogglesPreExistingRow(t *testing.T) { + c := NewInt64Column(4) + c.Append(0) + c.Append(0) + c.Append(0) + c.MarkNullAt(1) + if c.IsNull(0) || !c.IsNull(1) || c.IsNull(2) { + t.Fatalf("MarkNullAt(1): want [valid,null,valid], got [%v,%v,%v]", c.IsNull(0), c.IsNull(1), c.IsNull(2)) + } + if c.Len() != 3 { + t.Fatalf("MarkNullAt must not grow length: got Len=%d", c.Len()) + } +} + +func TestTypedColumnInt64_Reset_LengthZero_CapacityRetained(t *testing.T) { + c := NewInt64Column(64) + for i := range 64 { + c.Append(int64(i)) + } + c.MarkNullAt(10) + beforeCap := cap(c.Data()) + c.Reset() + if c.Len() != 0 { + t.Fatalf("Reset Len: want 0, got %d", c.Len()) + } + if cap(c.Data()) != beforeCap { + t.Fatalf("Reset must retain capacity: before=%d after=%d", beforeCap, cap(c.Data())) + } + if c.IsNull(10) { + t.Fatal("Reset must clear validity") + } +} + +func TestTypedColumn_AllSixTypes_RoundTrip(t *testing.T) { + t.Run("int64", func(t *testing.T) { + c := NewInt64Column(2) + c.Append(42) + if c.Type() != ColumnTypeInt64 || c.Data()[0] != 42 { + t.Fatalf("int64 roundtrip failed") + } 
+ }) + t.Run("float64", func(t *testing.T) { + c := NewFloat64Column(2) + c.Append(3.14) + if c.Type() != ColumnTypeFloat64 || c.Data()[0] != 3.14 { + t.Fatalf("float64 roundtrip failed") + } + }) + t.Run("string", func(t *testing.T) { + c := NewStringColumn(2) + c.Append("hello") + if c.Type() != ColumnTypeString || c.Data()[0] != "hello" { + t.Fatalf("string roundtrip failed") + } + }) + t.Run("bytes", func(t *testing.T) { + c := NewBytesColumn(2) + c.Append([]byte("hi")) + if c.Type() != ColumnTypeBytes || !bytes.Equal(c.Data()[0], []byte("hi")) { + t.Fatalf("bytes roundtrip failed") + } + }) + t.Run("int64-array", func(t *testing.T) { + c := NewInt64ArrayColumn(2) + c.Append([]int64{1, 2, 3}) + if c.Type() != ColumnTypeInt64Array || len(c.Data()[0]) != 3 { + t.Fatalf("int64[] roundtrip failed") + } + }) + t.Run("string-array", func(t *testing.T) { + c := NewStrArrayColumn(2) + c.Append([]string{"a", "b"}) + if c.Type() != ColumnTypeStrArray || len(c.Data()[0]) != 2 { + t.Fatalf("string[] roundtrip failed") + } + }) +} + +func TestNewColumnForType_UnknownType_Panics(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Fatal("expected panic on unknown ColumnType") + } + }() + _ = NewColumnForType(ColumnType(999), 4) +}
