This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch vectorized-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit b39ed477b99ee1a9d75638d0fb7550b53b075138
Author: Hongtao Gao <[email protected]>
AuthorDate: Sun May 3 02:05:24 2026 +0000

    feat(query/vectorized): add vectorized pipeline foundation (gate G1)
    
    First gate of a multi-gate implementation of the columnar query pipeline
    for local Measure queries. This commit lands the foundation primitives
    under pkg/query/vectorized/, with no edits to existing code (zero-regression
    guaranteed structurally — nothing imports this yet).
    
    Modules:
      - schema.go      BatchSchema with O(1) lookups; tagKey struct (no string
                       concatenation, no separator collision)
      - column.go      Column interface, ColumnType enum, validityBitmap
      - typed_column.go TypedColumn[T] for int64/float64/string/[]byte/[]int64/[]string
      - batch.go       RecordBatch + Reset + ActiveLen (handles nil vs []
                       selection footgun explicitly)
      - pool.go        BatchPool with Reset-on-Get and foreign-schema rejection
      - operator.go    PullOperator/FusibleOperator/BreakerOperator interfaces +
                       ErrLimitExhausted sentinel
      - memory.go      MemoryTracker (CAS-based, panics on negative bytes)
      - fused.go       fusedStage with discard-on-error and idempotent Close
      - pipeline.go    Pipeline + PipelineBuilder + breakerStage with sticky
                       error and idempotent Close
    
    Risks closed at this gate (per the design spec):
      R3 fused-stage error discard, R5 memory tracker semantics,
      R6 uint16 selection constraint, R8 Reset semantics,
      R9 nil vs empty batch.
    
    52 tests, all green under -race in ~1s. go vet clean.
---
 pkg/query/vectorized/batch.go             |  66 +++++++++
 pkg/query/vectorized/batch_test.go        | 101 +++++++++++++
 pkg/query/vectorized/column.go            |  94 ++++++++++++
 pkg/query/vectorized/column_test.go       |  81 +++++++++++
 pkg/query/vectorized/doc.go               |  20 +++
 pkg/query/vectorized/fused.go             | 109 ++++++++++++++
 pkg/query/vectorized/fused_test.go        | 225 +++++++++++++++++++++++++++++
 pkg/query/vectorized/memory.go            |  79 ++++++++++
 pkg/query/vectorized/memory_test.go       | 119 +++++++++++++++
 pkg/query/vectorized/operator.go          |  70 +++++++++
 pkg/query/vectorized/operator_test.go     |  34 +++++
 pkg/query/vectorized/pipeline.go          | 154 ++++++++++++++++++++
 pkg/query/vectorized/pipeline_test.go     | 231 ++++++++++++++++++++++++++++++
 pkg/query/vectorized/pool.go              |  54 +++++++
 pkg/query/vectorized/pool_test.go         |  69 +++++++++
 pkg/query/vectorized/schema.go            | 114 +++++++++++++++
 pkg/query/vectorized/schema_test.go       | 127 ++++++++++++++++
 pkg/query/vectorized/typed_column.go      | 110 ++++++++++++++
 pkg/query/vectorized/typed_column_test.go | 142 ++++++++++++++++++
 19 files changed, 1999 insertions(+)

diff --git a/pkg/query/vectorized/batch.go b/pkg/query/vectorized/batch.go
new file mode 100644
index 000000000..4e345ab30
--- /dev/null
+++ b/pkg/query/vectorized/batch.go
@@ -0,0 +1,66 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package vectorized
+
+// DefaultBatchSize is the canonical batch row capacity.
+// Selection vectors hold uint16 row indices, which caps batch size at 65536;
+// the default stays well below that ceiling.
+const DefaultBatchSize = 1024
+
+// RecordBatch is the unit of work flowing through the pipeline.
+// All columns share Len logical rows. Selection optionally narrows that to a subset.
+//
+// Selection contract:
+//   - nil  → all rows in [0, Len) are active
+//   - []   → zero rows active (post-fusible-filter empty case)
+//   - non-empty → only the listed row indices are active
+//
+// Operators must use ActiveLen() to compute the effective row count.
+type RecordBatch struct {
+       Schema    *BatchSchema // column layout; left untouched by Reset
+       Columns   []Column     // one Column per Schema.Columns entry, same order
+       Selection []uint16     // active row indices; nil and empty differ (see contract)
+       Len       int          // logical row count shared by all columns
+}
+
+// NewRecordBatch builds a batch for schema with one column per schema entry,
+// each created by NewColumnForType with the requested row capacity.
+// Len and Selection are left at their zero values.
+func NewRecordBatch(schema *BatchSchema, capacity int) *RecordBatch {
+       batch := &RecordBatch{
+               Schema:  schema,
+               Columns: make([]Column, len(schema.Columns)),
+       }
+       for idx := range schema.Columns {
+               batch.Columns[idx] = NewColumnForType(schema.Columns[idx].Type, capacity)
+       }
+       return batch
+}
+
+// Reset returns the batch to an empty state for reuse: Len becomes 0,
+// Selection becomes nil, and every column is reset. Schema is kept.
+func (b *RecordBatch) Reset() {
+       b.Len, b.Selection = 0, nil
+       for idx := range b.Columns {
+               b.Columns[idx].Reset()
+       }
+}
+
+// ActiveLen reports how many rows operators should process.
+//
+// A non-nil Selection wins, even when it is empty; only a nil Selection
+// falls back to Len.
+func (b *RecordBatch) ActiveLen() int {
+       if b.Selection != nil {
+               return len(b.Selection)
+       }
+       return b.Len
+}
diff --git a/pkg/query/vectorized/batch_test.go b/pkg/query/vectorized/batch_test.go
new file mode 100644
index 000000000..fcf6c8e23
--- /dev/null
+++ b/pkg/query/vectorized/batch_test.go
@@ -0,0 +1,101 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package vectorized
+
+import "testing"
+
+// TestRecordBatch_New_AllocatesColumnPerSchemaEntry checks that NewRecordBatch
+// creates one column per schema entry and that each column reports the type
+// declared for it in the schema.
+func TestRecordBatch_New_AllocatesColumnPerSchemaEntry(t *testing.T) {
+       defs := []ColumnDef{
+               {Role: RoleTimestamp, Type: ColumnTypeInt64},
+               {Role: RoleField, Name: "v", Type: ColumnTypeFloat64},
+               {Role: RoleTag, TagFamily: "default", Name: "service", Type: ColumnTypeString},
+       }
+       batch := NewRecordBatch(NewBatchSchema(defs), 8)
+       if n := len(batch.Columns); n != 3 {
+               t.Fatalf("Columns: want 3, got %d", n)
+       }
+       if typ := batch.Columns[0].Type(); typ != ColumnTypeInt64 {
+               t.Fatalf("col 0 type: want int64, got %v", typ)
+       }
+       if typ := batch.Columns[1].Type(); typ != ColumnTypeFloat64 {
+               t.Fatalf("col 1 type: want float64, got %v", typ)
+       }
+       if typ := batch.Columns[2].Type(); typ != ColumnTypeString {
+               t.Fatalf("col 2 type: want string, got %v", typ)
+       }
+}
+
+// TestRecordBatch_Reset_ClearsLenSelectionAndColumns fills a batch, resets
+// it, and checks that Len, Selection and the column contents are all cleared.
+func TestRecordBatch_Reset_ClearsLenSelectionAndColumns(t *testing.T) {
+       schema := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}})
+       batch := NewRecordBatch(schema, 8)
+       ts := batch.Columns[0].(*TypedColumn[int64])
+       for _, v := range []int64{100, 200} {
+               ts.Append(v)
+       }
+       batch.Len, batch.Selection = 2, []uint16{0, 1}
+       batch.Reset()
+       switch {
+       case batch.Len != 0:
+               t.Fatalf("Len: want 0, got %d", batch.Len)
+       case batch.Selection != nil:
+               t.Fatalf("Selection: want nil, got %v", batch.Selection)
+       case batch.Columns[0].Len() != 0:
+               t.Fatalf("column not reset: Len=%d", batch.Columns[0].Len())
+       }
+}
+
+// TestRecordBatch_Reset_PreservesSchemaPointer checks that Reset leaves the
+// batch's Schema pointer exactly as it was.
+func TestRecordBatch_Reset_PreservesSchemaPointer(t *testing.T) {
+       schema := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}})
+       batch := NewRecordBatch(schema, 8)
+       want := batch.Schema
+       batch.Reset()
+       if got := batch.Schema; got != want {
+               t.Fatal("Reset must preserve Schema pointer")
+       }
+}
+
+// TestRecordBatch_ActiveLen_NilSelection_ReturnsLen checks that a nil
+// Selection makes ActiveLen fall back to Len.
+func TestRecordBatch_ActiveLen_NilSelection_ReturnsLen(t *testing.T) {
+       schema := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}})
+       batch := NewRecordBatch(schema, 8)
+       batch.Len, batch.Selection = 5, nil
+       active := batch.ActiveLen()
+       if active != 5 {
+               t.Fatalf("ActiveLen with nil Selection: want 5, got %d", active)
+       }
+}
+
+// TestRecordBatch_ActiveLen_NonNilSelection_ReturnsSelectionLen checks that a
+// populated Selection overrides Len in ActiveLen.
+func TestRecordBatch_ActiveLen_NonNilSelection_ReturnsSelectionLen(t *testing.T) {
+       schema := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}})
+       batch := NewRecordBatch(schema, 8)
+       batch.Len, batch.Selection = 5, []uint16{0, 2, 4}
+       active := batch.ActiveLen()
+       if active != 3 {
+               t.Fatalf("ActiveLen with selection: want 3, got %d", active)
+       }
+}
+
+// TestRecordBatch_ActiveLen_EmptyNonNilSelection_ReturnsZero checks that an
+// empty but non-nil Selection yields zero active rows rather than Len.
+func TestRecordBatch_ActiveLen_EmptyNonNilSelection_ReturnsZero(t *testing.T) {
+       schema := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}})
+       batch := NewRecordBatch(schema, 8)
+       batch.Len, batch.Selection = 5, []uint16{}
+       active := batch.ActiveLen()
+       if active != 0 {
+               t.Fatalf("ActiveLen with empty non-nil Selection (post-fusible-filter footgun): want 0, got %d", active)
+       }
+}
diff --git a/pkg/query/vectorized/column.go b/pkg/query/vectorized/column.go
new file mode 100644
index 000000000..117163979
--- /dev/null
+++ b/pkg/query/vectorized/column.go
@@ -0,0 +1,94 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package vectorized
+
+// ColumnType tags the element type stored by a Column at runtime.
+type ColumnType int
+
+// Supported column types, numbered from zero in declaration order. Each value
+// corresponds to a TypedColumn[T] specialization.
+const (
+       ColumnTypeInt64 ColumnType = iota
+       ColumnTypeFloat64
+       ColumnTypeString
+       ColumnTypeBytes
+       ColumnTypeInt64Array
+       ColumnTypeStrArray
+)
+
+// String returns the label used for the type in diagnostics and error
+// messages; values outside the declared set yield "unknown".
+func (c ColumnType) String() string {
+       labels := [...]string{
+               ColumnTypeInt64:      "int64",
+               ColumnTypeFloat64:    "float64",
+               ColumnTypeString:     "string",
+               ColumnTypeBytes:      "bytes",
+               ColumnTypeInt64Array: "int64[]",
+               ColumnTypeStrArray:   "string[]",
+       }
+       if c < 0 || int(c) >= len(labels) {
+               return "unknown"
+       }
+       return labels[c]
+}
+
+// Column is the storage-agnostic view of one column in a RecordBatch.
+type Column interface {
+       // Type reports the column's runtime ColumnType tag.
+       Type() ColumnType
+       // Len reports the number of rows currently held by the column.
+       Len() int
+       // IsNull reports whether row i is null.
+       IsNull(i int) bool
+       // Reset empties the column so it can be reused.
+       Reset()
+       // AppendNull presumably appends one null row — implementations are not
+       // visible here; TODO confirm.
+       AppendNull()
+       // MarkNullAt presumably flags existing row i as null — implementations
+       // are not visible here; TODO confirm.
+       MarkNullAt(i int)
+}
+
+// validityBitmap tracks per-row null state, one bit per row packed into
+// 64-bit words. A set bit means the row is null; rows beyond the allocated
+// words are valid, so nil bits = all valid.
+// Allocation is lazy on first MarkNull.
+type validityBitmap struct {
+       bits []uint64
+}
+
+// IsNull reports whether bit i is null. Negative indices and indices past
+// the allocated words are reported as not null.
+func (v *validityBitmap) IsNull(i int) bool {
+       if i < 0 {
+               return false
+       }
+       word := i / 64
+       if word >= len(v.bits) {
+               return false
+       }
+       return (v.bits[word]>>uint(i%64))&1 == 1
+}
+
+// MarkNull sets bit i to null, growing the bitmap as needed.
+//
+// A negative i is ignored, mirroring IsNull, which never reports a negative
+// index as null. Without this guard, i in [-63, -1] allocated a word and set
+// no bit, while i <= -64 indexed the slice with a negative word and panicked.
+func (v *validityBitmap) MarkNull(i int) {
+       if i < 0 {
+               return
+       }
+       word := i / 64
+       if missing := word + 1 - len(v.bits); missing > 0 {
+               // Extend with zeroed words in one append instead of one per word.
+               v.bits = append(v.bits, make([]uint64, missing)...)
+       }
+       v.bits[word] |= 1 << uint(i%64)
+}
+
+// Reset clears every null mark but keeps the underlying slice for reuse.
+func (v *validityBitmap) Reset() {
+       clear(v.bits)
+}
diff --git a/pkg/query/vectorized/column_test.go b/pkg/query/vectorized/column_test.go
new file mode 100644
index 000000000..6f400aa0b
--- /dev/null
+++ b/pkg/query/vectorized/column_test.go
@@ -0,0 +1,81 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package vectorized
+
+import "testing"
+
+// TestValidityBitmap_Default_AllValid checks that a zero-value bitmap reports
+// none of its first 64 bits as null.
+func TestValidityBitmap_Default_AllValid(t *testing.T) {
+       bitmap := new(validityBitmap)
+       for bit := 0; bit < 64; bit++ {
+               if !bitmap.IsNull(bit) {
+                       continue
+               }
+               t.Fatalf("default bitmap: bit %d unexpectedly null", bit)
+       }
+}
+
+// TestValidityBitmap_MarkNull_SetsExpectedBit checks that MarkNull(3) flags
+// bit 3 and leaves its immediate neighbours valid.
+func TestValidityBitmap_MarkNull_SetsExpectedBit(t *testing.T) {
+       bitmap := new(validityBitmap)
+       bitmap.MarkNull(3)
+       if !bitmap.IsNull(3) {
+               t.Fatal("bit 3 should be null after MarkNull(3)")
+       }
+       below, above := bitmap.IsNull(2), bitmap.IsNull(4)
+       if below || above {
+               t.Fatalf("only bit 3 should be null; got 2=%v 4=%v", below, above)
+       }
+}
+
+// TestValidityBitmap_GrowthBeyondInitialWord checks that marking a bit past
+// the first 64-bit word grows the bitmap and flags only that bit.
+func TestValidityBitmap_GrowthBeyondInitialWord(t *testing.T) {
+       bitmap := new(validityBitmap)
+       bitmap.MarkNull(150)
+       if !bitmap.IsNull(150) {
+               t.Fatal("bit 150 should be null after growth")
+       }
+       for _, neighbor := range []int{149, 151} {
+               if bitmap.IsNull(neighbor) {
+                       t.Fatal("only bit 150 should be null after growth")
+               }
+       }
+}
+
+// TestValidityBitmap_Reset_ClearsAllBits_RetainsSlice checks that Reset
+// removes every null mark while keeping the backing slice's capacity.
+func TestValidityBitmap_Reset_ClearsAllBits_RetainsSlice(t *testing.T) {
+       bitmap := new(validityBitmap)
+       marked := []int{10, 200}
+       for _, bit := range marked {
+               bitmap.MarkNull(bit)
+       }
+       capBefore := cap(bitmap.bits)
+       bitmap.Reset()
+       for _, bit := range marked {
+               if bitmap.IsNull(bit) {
+                       t.Fatal("Reset should clear all nulls")
+               }
+       }
+       if capAfter := cap(bitmap.bits); capAfter != capBefore {
+               t.Fatalf("Reset must retain slice capacity: before=%d after=%d", capBefore, capAfter)
+       }
+}
+
+// TestColumnType_String_NotEmpty checks that every declared ColumnType has a
+// non-empty String label.
+func TestColumnType_String_NotEmpty(t *testing.T) {
+       for _, ct := range []ColumnType{
+               ColumnTypeInt64,
+               ColumnTypeFloat64,
+               ColumnTypeString,
+               ColumnTypeBytes,
+               ColumnTypeInt64Array,
+               ColumnTypeStrArray,
+       } {
+               if label := ct.String(); label == "" {
+                       t.Fatalf("ColumnType(%d).String() must not be empty", int(ct))
+               }
+       }
+}
diff --git a/pkg/query/vectorized/doc.go b/pkg/query/vectorized/doc.go
new file mode 100644
index 000000000..f2bb49d15
--- /dev/null
+++ b/pkg/query/vectorized/doc.go
@@ -0,0 +1,20 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package vectorized provides RecordBatch-based columnar execution primitives:
+// schema, columns, operator interfaces, pipeline composition, and memory tracking.
+package vectorized
diff --git a/pkg/query/vectorized/fused.go b/pkg/query/vectorized/fused.go
new file mode 100644
index 000000000..c1b2617e0
--- /dev/null
+++ b/pkg/query/vectorized/fused.go
@@ -0,0 +1,109 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package vectorized
+
+import (
+       "context"
+       "errors"
+)
+
+// fusedStage combines a PullOperator source with a list of FusibleOperators
+// applied in order to each pulled batch. On any operator error the batch is
+// discarded (R3: not returned to the caller's pool) to prevent corrupted
+// batches from leaking into subsequent queries.
+//
+// When a fusible returns ErrLimitExhausted, the current batch is emitted and
+// subsequent NextBatch calls return (nil, nil).
+type fusedStage struct {
+       source    PullOperator      // upstream operator that batches are pulled from
+       fused     []FusibleOperator // applied to each pulled batch, in slice order
+       limitDone bool              // set once a fusible reports ErrLimitExhausted
+       closed    bool              // set by the first Close; later calls are no-ops
+}
+
+// newFusedStage wraps source and the fusible operators, kept in the order
+// given, into a single stage. Nothing is initialised until Init is called.
+func newFusedStage(source PullOperator, fused []FusibleOperator) *fusedStage {
+       stage := new(fusedStage)
+       stage.source = source
+       stage.fused = fused
+       return stage
+}
+
+// Init initialises the source first and then each fused operator in order,
+// stopping at and returning the first error encountered.
+func (s *fusedStage) Init(ctx context.Context) error {
+       err := s.source.Init(ctx)
+       for i := 0; err == nil && i < len(s.fused); i++ {
+               err = s.fused[i].Init(ctx)
+       }
+       return err
+}
+
+// OutputSchema reports the source operator's schema unchanged; fusible
+// operators must not change column layout.
+func (s *fusedStage) OutputSchema() *BatchSchema {
+       return s.source.OutputSchema()
+}
+
+// NextBatch pulls the next batch from the source and runs it through each
+// fused operator in order.
+//
+// Once a fusible has reported ErrLimitExhausted, every call returns (nil, nil).
+// A source error or a nil source batch (EOF) is passed through as-is without
+// invoking any fusible. If a fusible fails with any other error the batch is
+// dropped and (nil, err) is returned; it is deliberately NOT handed back to a
+// pool (R3 in the design spec). On ErrLimitExhausted the batch is emitted
+// immediately, so fusibles later in the list do not see that batch.
+func (s *fusedStage) NextBatch(ctx context.Context) (*RecordBatch, error) {
+       if s.limitDone {
+               return nil, nil
+       }
+       batch, err := s.source.NextBatch(ctx)
+       if err != nil || batch == nil {
+               return batch, err
+       }
+       for _, op := range s.fused {
+               opErr := op.Process(ctx, batch)
+               switch {
+               case opErr == nil:
+                       // Operator succeeded; move on to the next one.
+               case errors.Is(opErr, ErrLimitExhausted):
+                       s.limitDone = true
+                       return batch, nil
+               default:
+                       // R3: leave the batch to the GC. Do NOT pool.Put it.
+                       return nil, opErr
+               }
+       }
+       return batch, nil
+}
+
+// Close closes the source and then every fused operator, returning the first
+// error seen while still closing the rest. Only the first call does any work;
+// repeat calls return nil without touching the children again.
+func (s *fusedStage) Close() error {
+       if s.closed {
+               return nil
+       }
+       s.closed = true
+       firstErr := s.source.Close()
+       for _, op := range s.fused {
+               // Always close op; keep its error only if none was recorded yet.
+               if err := op.Close(); firstErr == nil {
+                       firstErr = err
+               }
+       }
+       return firstErr
+}
diff --git a/pkg/query/vectorized/fused_test.go b/pkg/query/vectorized/fused_test.go
new file mode 100644
index 000000000..d077057d7
--- /dev/null
+++ b/pkg/query/vectorized/fused_test.go
@@ -0,0 +1,225 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package vectorized
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       "testing"
+)
+
+// fakePull is a test PullOperator that replays a hand-built batch sequence.
+// A non-nil pullErr makes every NextBatch call fail; initCnt and closeCnt
+// count the Init and Close calls.
+type fakePull struct {
+       pullErr  error
+       schema   *BatchSchema
+       batches  []*RecordBatch
+       idx      int
+       closeCnt int
+       initCnt  int
+}
+
+// Init records the call and always succeeds.
+func (f *fakePull) Init(_ context.Context) error {
+       f.initCnt++
+       return nil
+}
+
+// OutputSchema returns the schema the fake was built with.
+func (f *fakePull) OutputSchema() *BatchSchema {
+       return f.schema
+}
+
+// Close records the call and always succeeds.
+func (f *fakePull) Close() error {
+       f.closeCnt++
+       return nil
+}
+
+// NextBatch fails with pullErr when set; otherwise it hands out the queued
+// batches one per call and then reports EOF as (nil, nil).
+func (f *fakePull) NextBatch(_ context.Context) (*RecordBatch, error) {
+       switch {
+       case f.pullErr != nil:
+               return nil, f.pullErr
+       case f.idx >= len(f.batches):
+               return nil, nil
+       }
+       next := f.batches[f.idx]
+       f.idx++
+       return next, nil
+}
+
+// fakeFusible is a test FusibleOperator. By default Process adds 1 to every
+// value of the first column; processFn, when set, replaces that behaviour.
+// A non-nil processErr makes Process fail before it counts or mutates anything.
+type fakeFusible struct {
+       processErr error
+       processFn  func(b *RecordBatch) error
+       schema     *BatchSchema
+       processed  int
+       closeCnt   int
+}
+
+// Init always succeeds.
+func (f *fakeFusible) Init(_ context.Context) error {
+       return nil
+}
+
+// OutputSchema returns the schema the fake was built with.
+func (f *fakeFusible) OutputSchema() *BatchSchema {
+       return f.schema
+}
+
+// Close records the call and always succeeds.
+func (f *fakeFusible) Close() error {
+       f.closeCnt++
+       return nil
+}
+
+// Process returns processErr if set; otherwise it counts the call and either
+// delegates to processFn or increments each value in column 0.
+func (f *fakeFusible) Process(_ context.Context, b *RecordBatch) error {
+       if f.processErr != nil {
+               return f.processErr
+       }
+       f.processed++
+       if f.processFn != nil {
+               return f.processFn(b)
+       }
+       values := b.Columns[0].(*TypedColumn[int64]).Data()
+       for i := range values {
+               values[i]++
+       }
+       return nil
+}
+
+// mkInt64Batch builds a batch for s whose first column, which must be an
+// int64 column, holds vals; Len is set to the number of values.
+func mkInt64Batch(s *BatchSchema, vals ...int64) *RecordBatch {
+       n := len(vals)
+       batch := NewRecordBatch(s, n)
+       ts := batch.Columns[0].(*TypedColumn[int64])
+       for i := 0; i < n; i++ {
+               ts.Append(vals[i])
+       }
+       batch.Len = n
+       return batch
+}
+
+// TestFusedStage_HappyPath_AppliesEveryOperatorInOrder runs one batch through
+// two incrementing fusibles and checks each value grew by 2 and each fusible
+// ran exactly once.
+func TestFusedStage_HappyPath_AppliesEveryOperatorInOrder(t *testing.T) {
+       ctx := context.Background()
+       schema := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}})
+       source := &fakePull{schema: schema, batches: []*RecordBatch{mkInt64Batch(schema, 1, 2, 3)}}
+       first := &fakeFusible{schema: schema}  // adds 1 to every row
+       second := &fakeFusible{schema: schema} // adds 1 again
+       stage := newFusedStage(source, []FusibleOperator{first, second})
+       if err := stage.Init(ctx); err != nil {
+               t.Fatal(err)
+       }
+       out, err := stage.NextBatch(ctx)
+       if err != nil || out == nil {
+               t.Fatalf("happy path: err=%v out=%v", err, out)
+       }
+       got := out.Columns[0].(*TypedColumn[int64]).Data()
+       for i, want := range []int64{3, 4, 5} {
+               if got[i] != want {
+                       t.Fatalf("row %d: want %d, got %d", i, want, got[i])
+               }
+       }
+       if first.processed != 1 || second.processed != 1 {
+               t.Fatalf("each fusible processed once: f1=%d f2=%d", first.processed, second.processed)
+       }
+}
+
+// TestFusedStage_FirstFusibleErrors_ReturnsNilBatchAndError checks that an
+// error from the first fusible is returned with a nil batch and that the
+// second fusible never runs.
+func TestFusedStage_FirstFusibleErrors_ReturnsNilBatchAndError(t *testing.T) {
+       ctx := context.Background()
+       schema := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}})
+       source := &fakePull{schema: schema, batches: []*RecordBatch{mkInt64Batch(schema, 1)}}
+       boom := errors.New("first fusible boom")
+       failing := &fakeFusible{schema: schema, processErr: boom}
+       after := &fakeFusible{schema: schema}
+       stage := newFusedStage(source, []FusibleOperator{failing, after})
+       _ = stage.Init(ctx) // the fakes' Init never fails
+       out, err := stage.NextBatch(ctx)
+       if !errors.Is(err, boom) {
+               t.Fatalf("want first-fusible error, got %v", err)
+       }
+       if out != nil {
+               t.Fatal("error path must return nil batch (R3: discard, do not pool)")
+       }
+       if after.processed != 0 {
+               t.Fatalf("second fusible must not run after first errors: processed=%d", after.processed)
+       }
+}
+
+// TestFusedStage_SecondFusibleErrors_StillReturnsNilBatch checks that the
+// batch is still dropped when the failure happens after an earlier fusible
+// has already mutated it.
+func TestFusedStage_SecondFusibleErrors_StillReturnsNilBatch(t *testing.T) {
+       ctx := context.Background()
+       schema := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}})
+       source := &fakePull{schema: schema, batches: []*RecordBatch{mkInt64Batch(schema, 1)}}
+       boom := errors.New("second fusible boom")
+       mutating := &fakeFusible{schema: schema}
+       failing := &fakeFusible{schema: schema, processErr: boom}
+       stage := newFusedStage(source, []FusibleOperator{mutating, failing})
+       _ = stage.Init(ctx) // the fakes' Init never fails
+       out, err := stage.NextBatch(ctx)
+       if !errors.Is(err, boom) {
+               t.Fatalf("want second-fusible error, got %v", err)
+       }
+       if out != nil {
+               t.Fatal("error path must return nil batch even after first fusible mutated")
+       }
+}
+
+// TestFusedStage_LimitExhausted_EmitsCurrentBatchThenEOF checks that a
+// wrapped ErrLimitExhausted lets the current batch through and that the next
+// call reports EOF even though the source still has a batch queued.
+func TestFusedStage_LimitExhausted_EmitsCurrentBatchThenEOF(t *testing.T) {
+       ctx := context.Background()
+       schema := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}})
+       queued := []*RecordBatch{
+               mkInt64Batch(schema, 1, 2, 3),
+               mkInt64Batch(schema, 4, 5, 6),
+       }
+       source := &fakePull{schema: schema, batches: queued}
+       limiter := &fakeFusible{schema: schema}
+       limiter.processFn = func(_ *RecordBatch) error {
+               return fmt.Errorf("limit done: %w", ErrLimitExhausted)
+       }
+       stage := newFusedStage(source, []FusibleOperator{limiter})
+       _ = stage.Init(ctx) // the fakes' Init never fails
+       emitted, err := stage.NextBatch(ctx)
+       if err != nil {
+               t.Fatalf("first call after limit-exhausted: want (b, nil), got err=%v", err)
+       }
+       if emitted == nil {
+               t.Fatal("first call must emit the current batch with truncated selection")
+       }
+       afterLimit, err := stage.NextBatch(ctx)
+       if err != nil || afterLimit != nil {
+               t.Fatalf("second call after limit-exhausted: want (nil, nil), got (%v, %v)", afterLimit, err)
+       }
+}
+
+// TestFusedStage_SourceEOF_NoFusibleInvoked checks that an exhausted source
+// yields (nil, nil) without any fusible being invoked.
+func TestFusedStage_SourceEOF_NoFusibleInvoked(t *testing.T) {
+       ctx := context.Background()
+       schema := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}})
+       source := &fakePull{schema: schema, batches: nil} // no batches: EOF on the first pull
+       fusible := &fakeFusible{schema: schema}
+       stage := newFusedStage(source, []FusibleOperator{fusible})
+       _ = stage.Init(ctx) // the fakes' Init never fails
+       out, err := stage.NextBatch(ctx)
+       if err != nil || out != nil {
+               t.Fatalf("source EOF: want (nil, nil), got (%v, %v)", out, err)
+       }
+       if fusible.processed != 0 {
+               t.Fatalf("fusible must not run on EOF: processed=%d", fusible.processed)
+       }
+}
+
+// TestFusedStage_SourceError_PropagatesUnchanged checks that a source failure
+// is returned as-is with a nil batch and that no fusible runs.
+func TestFusedStage_SourceError_PropagatesUnchanged(t *testing.T) {
+       ctx := context.Background()
+       schema := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}})
+       boom := errors.New("source boom")
+       source := &fakePull{schema: schema, pullErr: boom}
+       fusible := &fakeFusible{schema: schema}
+       stage := newFusedStage(source, []FusibleOperator{fusible})
+       _ = stage.Init(ctx) // the fakes' Init never fails
+       out, err := stage.NextBatch(ctx)
+       if !errors.Is(err, boom) {
+               t.Fatalf("want source error, got %v", err)
+       }
+       if out != nil {
+               t.Fatal("source error must return nil batch")
+       }
+       if fusible.processed != 0 {
+               t.Fatal("fusible must not run on source error")
+       }
+}
+
+// TestFusedStage_Close_Idempotent_CallsChildrenOnce closes the stage twice
+// and checks that the source and both fusibles were each closed exactly once.
+func TestFusedStage_Close_Idempotent_CallsChildrenOnce(t *testing.T) {
+       schema := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}})
+       source := &fakePull{schema: schema}
+       first := &fakeFusible{schema: schema}
+       second := &fakeFusible{schema: schema}
+       stage := newFusedStage(source, []FusibleOperator{first, second})
+       for i := 0; i < 2; i++ {
+               _ = stage.Close() // the fakes' Close never fails
+       }
+       if source.closeCnt != 1 || first.closeCnt != 1 || second.closeCnt != 1 {
+               t.Fatalf("Close must be idempotent (children invoked once): src=%d f1=%d f2=%d",
+                       source.closeCnt, first.closeCnt, second.closeCnt)
+       }
+}
diff --git a/pkg/query/vectorized/memory.go b/pkg/query/vectorized/memory.go
new file mode 100644
index 000000000..95543cd9f
--- /dev/null
+++ b/pkg/query/vectorized/memory.go
@@ -0,0 +1,79 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package vectorized
+
+import (
+       "fmt"
+       "sync/atomic"
+)
+
+// MemoryTracker is a lock-free per-query memory budget.
+//
+// Reserve grows used iff used+bytes <= limit; otherwise it returns an error and
+// leaves used unchanged. Release shrinks used. Used reads the current value.
+// All state lives in a single atomic counter, so the methods may be called
+// from multiple goroutines.
+type MemoryTracker struct {
+       used  atomic.Int64
+       limit int64
+}
+
+// NewMemoryTracker returns a tracker with the given byte limit and nothing
+// reserved.
+func NewMemoryTracker(limit int64) *MemoryTracker {
+       return &MemoryTracker{limit: limit}
+}
+
+// Reserve attempts to allocate bytes from the budget.
+// On success used grows by bytes; on failure used is unchanged and an error
+// describing the current usage, the request, and the limit is returned.
+//
+// A negative bytes value indicates a programmer error (Reserve must not be
+// used to decrement) and panics. Zero bytes is a valid no-op.
+func (m *MemoryTracker) Reserve(bytes int64) error {
+       if bytes < 0 {
+               panic("vectorized: MemoryTracker.Reserve called with negative bytes")
+       }
+       if bytes == 0 {
+               return nil
+       }
+       for {
+               current := m.used.Load()
+               next := current + bytes
+               // bytes is strictly positive here, so next < current can only mean
+               // the int64 addition wrapped around. Without this guard a huge
+               // request would wrap to a negative value, slip under the limit
+               // check, and corrupt the counter.
+               if next < current || next > m.limit {
+                       return fmt.Errorf("memory budget exceeded: used %d + requested %d > limit %d",
+                               current, bytes, m.limit)
+               }
+               // Retry when another goroutine changed used between Load and CAS.
+               if m.used.CompareAndSwap(current, next) {
+                       return nil
+               }
+       }
+}
+
+// Release returns bytes to the budget.
+// A negative bytes value indicates a programmer error and panics. Zero bytes
+// is a no-op.
+//
+// Release does not compare bytes against the outstanding reservation:
+// releasing more than was reserved drives Used negative.
+func (m *MemoryTracker) Release(bytes int64) {
+       if bytes < 0 {
+               panic("vectorized: MemoryTracker.Release called with negative bytes")
+       }
+       if bytes == 0 {
+               return
+       }
+       m.used.Add(-bytes)
+}
+
+// Used returns the current outstanding reservation.
+func (m *MemoryTracker) Used() int64 {
+       return m.used.Load()
+}
diff --git a/pkg/query/vectorized/memory_test.go 
b/pkg/query/vectorized/memory_test.go
new file mode 100644
index 000000000..171720937
--- /dev/null
+++ b/pkg/query/vectorized/memory_test.go
@@ -0,0 +1,119 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package vectorized
+
+import (
+       "sync"
+       "testing"
+)
+
+// TestMemoryTracker_Reserve_SucceedsWithinLimit makes two reservations that
+// fit the budget and checks the running total after each.
+func TestMemoryTracker_Reserve_SucceedsWithinLimit(t *testing.T) {
+       tracker := NewMemoryTracker(1024)
+       err := tracker.Reserve(512)
+       if err != nil {
+               t.Fatalf("Reserve(512): %v", err)
+       }
+       if got := tracker.Used(); got != 512 {
+               t.Fatalf("Used after Reserve(512): want 512, got %d", got)
+       }
+       err = tracker.Reserve(256)
+       if err != nil {
+               t.Fatalf("Reserve(256): %v", err)
+       }
+       if got := tracker.Used(); got != 768 {
+               t.Fatalf("Used after second reserve: want 768, got %d", got)
+       }
+}
+
+// TestMemoryTracker_Reserve_FailsAtLimit_UsedUnchanged checks that a request
+// exceeding the remaining budget is rejected without touching the counter.
+func TestMemoryTracker_Reserve_FailsAtLimit_UsedUnchanged(t *testing.T) {
+       tracker := NewMemoryTracker(100)
+       err := tracker.Reserve(50)
+       if err != nil {
+               t.Fatalf("first reserve: %v", err)
+       }
+       err = tracker.Reserve(60)
+       if err == nil {
+               t.Fatal("Reserve(60) on top of 50/100 must fail")
+       }
+       if got := tracker.Used(); got != 50 {
+               t.Fatalf("failed Reserve must not advance used: got %d", got)
+       }
+}
+
+// TestMemoryTracker_Release_DecreasesUsed reserves 512 bytes, releases 200,
+// and expects 312 to remain outstanding.
+func TestMemoryTracker_Release_DecreasesUsed(t *testing.T) {
+       tracker := NewMemoryTracker(1024)
+       _ = tracker.Reserve(512)
+       tracker.Release(200)
+       if got := tracker.Used(); got != 312 {
+               t.Fatalf("after Release(200): want 312, got %d", got)
+       }
+}
+
+// TestMemoryTracker_Used_ReturnsCurrent checks Used on a fresh tracker and
+// again after a single reservation.
+func TestMemoryTracker_Used_ReturnsCurrent(t *testing.T) {
+       tracker := NewMemoryTracker(1024)
+       if got := tracker.Used(); got != 0 {
+               t.Fatalf("initial Used: want 0, got %d", got)
+       }
+       _ = tracker.Reserve(7)
+       if got := tracker.Used(); got != 7 {
+               t.Fatalf("after Reserve(7): want 7, got %d", got)
+       }
+}
+
+// TestMemoryTracker_Reserve_NegativeBytes_Panics asserts that a negative
+// reservation is treated as a programmer error.
+func TestMemoryTracker_Reserve_NegativeBytes_Panics(t *testing.T) {
+       tracker := NewMemoryTracker(1024)
+       panicked := func() (recovered bool) {
+               defer func() { recovered = recover() != nil }()
+               _ = tracker.Reserve(-1)
+               return
+       }()
+       if !panicked {
+               t.Fatal("Reserve(-1) must panic — programmer error, not a recoverable runtime condition")
+       }
+}
+
+// TestMemoryTracker_Release_NegativeBytes_Panics asserts that a negative
+// release is treated as a programmer error.
+func TestMemoryTracker_Release_NegativeBytes_Panics(t *testing.T) {
+       tracker := NewMemoryTracker(1024)
+       if err := tracker.Reserve(100); err != nil {
+               t.Fatal(err)
+       }
+       panicked := func() (recovered bool) {
+               defer func() { recovered = recover() != nil }()
+               tracker.Release(-1)
+               return
+       }()
+       if !panicked {
+               t.Fatal("Release(-1) must panic — programmer error")
+       }
+}
+
+// TestMemoryTracker_Concurrent_TotalExact hammers Reserve(1) from many
+// goroutines and checks that no increment is lost.
+func TestMemoryTracker_Concurrent_TotalExact(t *testing.T) {
+       const (
+               goroutines   = 100
+               perGoroutine = 100
+       )
+       tracker := NewMemoryTracker(100000)
+       reserveMany := func(done func()) {
+               defer done()
+               for range perGoroutine {
+                       if err := tracker.Reserve(1); err != nil {
+                               t.Errorf("unexpected reserve failure: %v", err)
+                               return
+                       }
+               }
+       }
+       var wg sync.WaitGroup
+       for range goroutines {
+               wg.Add(1)
+               go reserveMany(wg.Done)
+       }
+       wg.Wait()
+       if got, want := tracker.Used(), int64(goroutines*perGoroutine); got != want {
+               t.Fatalf("concurrent Used: want %d, got %d", want, got)
+       }
+}
diff --git a/pkg/query/vectorized/operator.go b/pkg/query/vectorized/operator.go
new file mode 100644
index 000000000..2e6690f79
--- /dev/null
+++ b/pkg/query/vectorized/operator.go
@@ -0,0 +1,70 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package vectorized
+
+import (
+       "context"
+       "errors"
+)
+
+// ErrLimitExhausted is the sentinel returned by FusibleOperators that have
+// finished their work and want the fused stage to translate the signal into
+// "emit current batch + EOF on next call". Callers should match it with
+// errors.Is so that wrapped instances are recognized too.
+var ErrLimitExhausted = errors.New("vectorized: limit exhausted")
+
+// BatchOperator is the lifecycle base. All operators implement it.
+//
+// Close is idempotent and safe to call at any phase — after Init, mid-Consume,
+// after Finalize, or after any error. It must release every
+// MemoryTracker.Reserve the operator made during its lifetime.
+type BatchOperator interface {
+       // Init prepares the operator for use. The stage wrappers in this
+       // package forward Init to the operators they wrap.
+       Init(ctx context.Context) error
+       // OutputSchema reports the schema of the batches the operator yields.
+       OutputSchema() *BatchSchema
+       // Close releases the operator's resources; see the interface comment
+       // for the idempotence requirement.
+       Close() error
+}
+
+// PullOperator produces batches. It is the source of a pipeline.
+//
+// NextBatch contract:
+//   - (non-nil, nil)  → valid batch with Len > 0
+//   - (nil, nil)      → EOF; caller must not call NextBatch again
+//   - (nil, non-nil)  → error; pipeline stops
+//
+// NextBatch may block — channel-backed implementations are explicitly
+// supported for future distributed remote-scan operators.
+type PullOperator interface {
+       BatchOperator
+       // NextBatch returns the next batch, EOF, or an error as described in
+       // the contract above.
+       NextBatch(ctx context.Context) (*RecordBatch, error)
+}
+
+// FusibleOperator transforms a batch in place. No state across batches.
+//
+// Process must not retain references to the batch beyond the call. Returning
+// ErrLimitExhausted signals "emit this batch then EOF on the next pull".
+//
+// NOTE(review): an operator that returns ErrLimitExhausted presumably has to
+// count rows across calls, which seems at odds with "No state across batches"
+// — confirm whether that rule refers to batch data only.
+type FusibleOperator interface {
+       BatchOperator
+       // Process mutates b in place; a non-nil error other than
+       // ErrLimitExhausted is a failure.
+       Process(ctx context.Context, b *RecordBatch) error
+}
+
+// BreakerOperator buffers all input via Consume, then produces output via
+// NextBatch after Finalize is called.
+//
+// breakerStage drives the methods in this order: Consume once per upstream
+// batch, Finalize once after upstream EOF, then NextBatch until it reports
+// EOF. Finalize and NextBatch are skipped if an earlier step failed.
+type BreakerOperator interface {
+       BatchOperator
+       // Consume hands one upstream batch to the breaker.
+       Consume(ctx context.Context, b *RecordBatch) error
+       // Finalize marks the end of input.
+       Finalize(ctx context.Context) error
+       // NextBatch yields the breaker's output; breakerStage returns its
+       // result directly as a PullOperator result, so (nil, nil) means EOF.
+       NextBatch(ctx context.Context) (*RecordBatch, error)
+}
diff --git a/pkg/query/vectorized/operator_test.go 
b/pkg/query/vectorized/operator_test.go
new file mode 100644
index 000000000..44d4dcafa
--- /dev/null
+++ b/pkg/query/vectorized/operator_test.go
@@ -0,0 +1,34 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package vectorized
+
+import (
+       "errors"
+       "fmt"
+       "testing"
+)
+
+// TestErrLimitExhausted_IsSentinel_MatchableViaErrorsIs checks that the
+// sentinel survives %w wrapping and does not match unrelated errors.
+func TestErrLimitExhausted_IsSentinel_MatchableViaErrorsIs(t *testing.T) {
+       viaWrap := fmt.Errorf("limit operator finished: %w", ErrLimitExhausted)
+       matchesWrapped := errors.Is(viaWrap, ErrLimitExhausted)
+       if !matchesWrapped {
+               t.Fatal("ErrLimitExhausted must be matchable via errors.Is on a wrapped error")
+       }
+       other := errors.New("unrelated")
+       if errors.Is(other, ErrLimitExhausted) {
+               t.Fatal("unrelated errors must not match ErrLimitExhausted")
+       }
+}
diff --git a/pkg/query/vectorized/pipeline.go b/pkg/query/vectorized/pipeline.go
new file mode 100644
index 000000000..b12221555
--- /dev/null
+++ b/pkg/query/vectorized/pipeline.go
@@ -0,0 +1,154 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package vectorized
+
+import (
+       "context"
+       "errors"
+)
+
+// Pipeline is the composed sequence of stages from source to final breaker.
+// It exposes PullOperator-shaped Init, Next, and Close methods to the driver.
+//
+// The closed flag is a plain bool, so a Pipeline must not be used from
+// multiple goroutines without external synchronization.
+type Pipeline struct {
+       head   PullOperator
+       closed bool
+}
+
+// Init initializes the head stage. The head field is unexported, so this is
+// the only way for a driver outside the package to initialize the pipeline
+// before calling Next.
+func (p *Pipeline) Init(ctx context.Context) error {
+       return p.head.Init(ctx)
+}
+
+// Next returns the next batch from the head stage. The result follows the
+// PullOperator.NextBatch contract because it is returned unchanged.
+func (p *Pipeline) Next(ctx context.Context) (*RecordBatch, error) {
+       return p.head.NextBatch(ctx)
+}
+
+// Close closes the head stage. Idempotent — repeat calls are no-ops and
+// return nil, even if the first call returned an error.
+func (p *Pipeline) Close() error {
+       if p.closed {
+               return nil
+       }
+       p.closed = true
+       return p.head.Close()
+}
+
+// pipelineSegment records one breaker together with the fusible operators
+// that were queued via Apply before it, so Build can preserve call order.
+type pipelineSegment struct {
+       breaker BreakerOperator
+       fused   []FusibleOperator
+}
+
+// PipelineBuilder fluently composes a Pipeline.
+//
+// Operators run in the order the builder methods were called: fusibles
+// applied before a Break run upstream of that breaker, and fusibles applied
+// after it run on the breaker's output.
+type PipelineBuilder struct {
+       source       PullOperator
+       pendingFused []FusibleOperator
+       segments     []pipelineSegment
+}
+
+// NewPipelineBuilder starts a builder.
+func NewPipelineBuilder() *PipelineBuilder { return &PipelineBuilder{} }
+
+// From sets the leaf source.
+func (b *PipelineBuilder) From(p PullOperator) *PipelineBuilder {
+       b.source = p
+       return b
+}
+
+// Apply queues a FusibleOperator to fold into the next stage.
+func (b *PipelineBuilder) Apply(f FusibleOperator) *PipelineBuilder {
+       b.pendingFused = append(b.pendingFused, f)
+       return b
+}
+
+// Break appends a breaker, finalizing the current fused-stage prefix: the
+// fusibles queued so far are bound to run before br, and later Apply calls
+// start a new fused stage downstream of it.
+func (b *PipelineBuilder) Break(br BreakerOperator) *PipelineBuilder {
+       b.segments = append(b.segments, pipelineSegment{breaker: br, fused: b.pendingFused})
+       // Hand the slice over to the segment so later Apply calls cannot alias it.
+       b.pendingFused = nil
+       return b
+}
+
+// Build validates and constructs the Pipeline.
+//
+// It returns an error when no source was set or when a nil operator was
+// passed to Apply or Break. The source is always wrapped in a fused stage
+// (possibly with no fusibles); later fused stages are created only when
+// fusibles were applied at that position.
+func (b *PipelineBuilder) Build() (*Pipeline, error) {
+       if b.source == nil {
+               return nil, errors.New("vectorized: pipeline missing source (use From)")
+       }
+       for _, seg := range b.segments {
+               if seg.breaker == nil {
+                       return nil, errors.New("vectorized: pipeline has a nil breaker (check Break)")
+               }
+               for _, f := range seg.fused {
+                       if f == nil {
+                               return nil, errors.New("vectorized: pipeline has a nil fusible operator (check Apply)")
+                       }
+               }
+       }
+       for _, f := range b.pendingFused {
+               if f == nil {
+                       return nil, errors.New("vectorized: pipeline has a nil fusible operator (check Apply)")
+               }
+       }
+
+       head := b.source
+       for i, seg := range b.segments {
+               // The first stage always wraps the source, matching the
+               // From-only layout; downstream stages skip empty fused wrappers.
+               if i == 0 || len(seg.fused) > 0 {
+                       head = newFusedStage(head, seg.fused)
+               }
+               head = newBreakerStage(head, seg.breaker)
+       }
+       if len(b.segments) == 0 || len(b.pendingFused) > 0 {
+               head = newFusedStage(head, b.pendingFused)
+       }
+       return &Pipeline{head: head}, nil
+}
+
+// breakerStage wraps a BreakerOperator so it acts as a PullOperator for the
+// next stage. It drains the upstream PullOperator into the breaker's Consume,
+// calls Finalize once, then serves the breaker's output via NextBatch.
+//
+// Error stickiness: if upstream Pull, Consume, or Finalize fails, the error is
+// stored and returned on every subsequent NextBatch call. drained is set as
+// soon as the consume loop is entered, so a retry never re-runs Consume or
+// Finalize. This guarantees Finalize executes at most once per stage.
+type breakerStage struct {
+       // err is the sticky failure from the drain/finalize phase, if any.
+       err      error
+       // upstream is the operator whose batches are fed to breaker.Consume.
+       upstream PullOperator
+       // breaker is the wrapped operator whose output this stage serves.
+       breaker  BreakerOperator
+       // drained is set once the first NextBatch call has started draining.
+       drained  bool
+       // closed makes Close a no-op after its first call.
+       closed   bool
+}
+
+// newBreakerStage returns a stage that feeds upstream's batches into br.
+// Nothing is pulled or initialized until the stage's own methods are called.
+func newBreakerStage(upstream PullOperator, br BreakerOperator) *breakerStage {
+       stage := new(breakerStage)
+       stage.upstream, stage.breaker = upstream, br
+       return stage
+}
+
+// Init initializes the upstream operator first and the breaker second. The
+// breaker is not initialized when the upstream Init fails.
+func (s *breakerStage) Init(ctx context.Context) error {
+       err := s.upstream.Init(ctx)
+       if err == nil {
+               err = s.breaker.Init(ctx)
+       }
+       return err
+}
+
+// OutputSchema reports the wrapped breaker's output schema.
+func (s *breakerStage) OutputSchema() *BatchSchema {
+       return s.breaker.OutputSchema()
+}
+
+// NextBatch drains upstream into the breaker on the first call, then serves
+// the breaker's output. A failure while draining or finalizing is remembered
+// and returned again by every later call.
+func (s *breakerStage) NextBatch(ctx context.Context) (*RecordBatch, error) {
+       if s.err != nil {
+               return nil, s.err
+       }
+       if s.drained {
+               return s.breaker.NextBatch(ctx)
+       }
+       // Flip the flag before doing any work so a retry never drains twice.
+       s.drained = true
+       if s.err = s.drainAndFinalize(ctx); s.err != nil {
+               return nil, s.err
+       }
+       return s.breaker.NextBatch(ctx)
+}
+
+// drainAndFinalize feeds every upstream batch to the breaker's Consume and
+// calls Finalize once upstream reports EOF. It stops at the first error.
+func (s *breakerStage) drainAndFinalize(ctx context.Context) error {
+       for {
+               batch, err := s.upstream.NextBatch(ctx)
+               if err != nil {
+                       return err
+               }
+               if batch == nil {
+                       return s.breaker.Finalize(ctx)
+               }
+               if err = s.breaker.Consume(ctx, batch); err != nil {
+                       return err
+               }
+       }
+}
+
+// Close closes the upstream operator and then the breaker, exactly once no
+// matter how often it is called. Both children are always closed; when both
+// fail, the upstream error is the one returned.
+func (s *breakerStage) Close() error {
+       if s.closed {
+               return nil
+       }
+       s.closed = true
+       upstreamErr := s.upstream.Close()
+       breakerErr := s.breaker.Close()
+       if upstreamErr != nil {
+               return upstreamErr
+       }
+       return breakerErr
+}
diff --git a/pkg/query/vectorized/pipeline_test.go 
b/pkg/query/vectorized/pipeline_test.go
new file mode 100644
index 000000000..2628a4e1d
--- /dev/null
+++ b/pkg/query/vectorized/pipeline_test.go
@@ -0,0 +1,231 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package vectorized
+
+import (
+       "context"
+       "errors"
+       "testing"
+)
+
+// fakeBreaker is a BreakerOperator test double. It counts lifecycle calls,
+// can be told to fail Consume or Finalize, and replays a preset list of
+// output batches from NextBatch.
+type fakeBreaker struct {
+       consumeErr  error
+       finalizeErr error
+       schema      *BatchSchema
+       consumed    []*RecordBatch
+       output      []*RecordBatch
+       outIdx      int
+       consumeCnt  int
+       finalizeCnt int
+       emitCnt     int
+       closeCnt    int
+}
+
+// Init always succeeds.
+func (fb *fakeBreaker) Init(context.Context) error { return nil }
+
+// OutputSchema returns the schema the fake was built with.
+func (fb *fakeBreaker) OutputSchema() *BatchSchema { return fb.schema }
+
+// Close counts the call and succeeds.
+func (fb *fakeBreaker) Close() error {
+       fb.closeCnt++
+       return nil
+}
+
+// Consume counts the call, then either fails with consumeErr or records the
+// batch.
+func (fb *fakeBreaker) Consume(_ context.Context, batch *RecordBatch) error {
+       fb.consumeCnt++
+       if err := fb.consumeErr; err != nil {
+               return err
+       }
+       fb.consumed = append(fb.consumed, batch)
+       return nil
+}
+
+// Finalize counts the call and returns finalizeErr.
+func (fb *fakeBreaker) Finalize(context.Context) error {
+       fb.finalizeCnt++
+       return fb.finalizeErr
+}
+
+// NextBatch replays output in order and reports EOF once it is exhausted.
+func (fb *fakeBreaker) NextBatch(context.Context) (*RecordBatch, error) {
+       if fb.outIdx < len(fb.output) {
+               next := fb.output[fb.outIdx]
+               fb.outIdx++
+               fb.emitCnt++
+               return next, nil
+       }
+       return nil, nil
+}
+
+// TestPipelineBuilder_BuildWithoutSource_ReturnsError checks that Build
+// refuses to produce a pipeline when From was never called.
+func TestPipelineBuilder_BuildWithoutSource_ReturnsError(t *testing.T) {
+       _, err := NewPipelineBuilder().Build()
+       if err == nil {
+               t.Fatal("Build without From() should return an error")
+       }
+}
+
+// TestPipelineBuilder_FromOnly_BuildsSingleFusedStage builds a pipeline from
+// a source alone and expects the source's single batch followed by EOF.
+func TestPipelineBuilder_FromOnly_BuildsSingleFusedStage(t *testing.T) {
+       ctx := context.Background()
+       schema := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}})
+       source := &fakePull{schema: schema, batches: []*RecordBatch{mkInt64Batch(schema, 1)}}
+       pipe, err := NewPipelineBuilder().From(source).Build()
+       if err != nil {
+               t.Fatal(err)
+       }
+       if initErr := pipe.head.Init(ctx); initErr != nil {
+               t.Fatal(initErr)
+       }
+       first, err := pipe.Next(ctx)
+       if first == nil || err != nil {
+               t.Fatalf("From-only pipeline should yield the source's batch: out=%v err=%v", first, err)
+       }
+       second, err := pipe.Next(ctx)
+       if second != nil || err != nil {
+               t.Fatalf("expected EOF: out=%v err=%v", second, err)
+       }
+}
+
+// TestPipelineBuilder_FromApplyApply_BuildsFusedStageWithTwoFusibles applies
+// two fusibles and expects the first row to have been incremented twice.
+func TestPipelineBuilder_FromApplyApply_BuildsFusedStageWithTwoFusibles(t *testing.T) {
+       ctx := context.Background()
+       schema := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}})
+       source := &fakePull{schema: schema, batches: []*RecordBatch{mkInt64Batch(schema, 1, 2)}}
+       first := &fakeFusible{schema: schema}
+       second := &fakeFusible{schema: schema}
+       pipe, err := NewPipelineBuilder().From(source).Apply(first).Apply(second).Build()
+       if err != nil {
+               t.Fatal(err)
+       }
+       _ = pipe.head.Init(ctx)
+       batch, err := pipe.Next(ctx)
+       if err != nil {
+               t.Fatal(err)
+       }
+       if got := batch.Columns[0].(*TypedColumn[int64]).Data()[0]; got != 3 {
+               t.Fatalf("two fusibles should each +1 the row: got %d", got)
+       }
+}
+
+// TestPipelineBuilder_FromBreak_BuildsBreakerStageOnTopOfSource checks that
+// a source followed by a breaker yields the breaker's output batch.
+func TestPipelineBuilder_FromBreak_BuildsBreakerStageOnTopOfSource(t *testing.T) {
+       ctx := context.Background()
+       schema := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}})
+       source := &fakePull{schema: schema, batches: []*RecordBatch{mkInt64Batch(schema, 1)}}
+       breaker := &fakeBreaker{schema: schema, output: []*RecordBatch{mkInt64Batch(schema, 99)}}
+       pipe, err := NewPipelineBuilder().From(source).Break(breaker).Build()
+       if err != nil {
+               t.Fatal(err)
+       }
+       _ = pipe.head.Init(ctx)
+       batch, err := pipe.Next(ctx)
+       if batch == nil || err != nil {
+               t.Fatalf("breaker stage should emit output after draining: err=%v out=%v", err, batch)
+       }
+       if got := batch.Columns[0].(*TypedColumn[int64]).Data()[0]; got != 99 {
+               t.Fatalf("expected breaker output 99, got %d", got)
+       }
+}
+
+// TestBreakerStage_DrainsUpstreamBeforeEmittingOutput checks that Init does
+// no draining and that the first Next consumes every upstream batch and
+// finalizes exactly once.
+func TestBreakerStage_DrainsUpstreamBeforeEmittingOutput(t *testing.T) {
+       ctx := context.Background()
+       schema := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}})
+       upstream := []*RecordBatch{
+               mkInt64Batch(schema, 1),
+               mkInt64Batch(schema, 2),
+               mkInt64Batch(schema, 3),
+       }
+       source := &fakePull{schema: schema, batches: upstream}
+       breaker := &fakeBreaker{schema: schema, output: []*RecordBatch{mkInt64Batch(schema, 100)}}
+       pipe, _ := NewPipelineBuilder().From(source).Break(breaker).Build()
+       _ = pipe.head.Init(ctx)
+       untouched := breaker.consumeCnt == 0 && breaker.finalizeCnt == 0
+       if !untouched {
+               t.Fatal("Init must not invoke Consume/Finalize")
+       }
+       _, _ = pipe.Next(ctx)
+       if got := breaker.consumeCnt; got != 3 {
+               t.Fatalf("breaker should Consume every upstream batch: got %d", got)
+       }
+       if got := breaker.finalizeCnt; got != 1 {
+               t.Fatalf("breaker should Finalize once: got %d", got)
+       }
+}
+
+// TestBreakerStage_UpstreamError_ShortCircuitsBeforeFinalize checks that an
+// upstream failure is returned as-is and that Finalize is never reached.
+func TestBreakerStage_UpstreamError_ShortCircuitsBeforeFinalize(t *testing.T) {
+       ctx := context.Background()
+       schema := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}})
+       upstreamErr := errors.New("upstream boom")
+       source := &fakePull{schema: schema, pullErr: upstreamErr}
+       breaker := &fakeBreaker{schema: schema}
+       pipe, _ := NewPipelineBuilder().From(source).Break(breaker).Build()
+       _ = pipe.head.Init(ctx)
+       _, err := pipe.Next(ctx)
+       if !errors.Is(err, upstreamErr) {
+               t.Fatalf("want upstream error, got %v", err)
+       }
+       if got := breaker.finalizeCnt; got != 0 {
+               t.Fatalf("Finalize must not run on upstream error: got %d", got)
+       }
+}
+
+// TestBreakerStage_FinalizeError_StickyOnRetry checks that a Finalize failure
+// is returned by every subsequent Next without Finalize being re-run.
+func TestBreakerStage_FinalizeError_StickyOnRetry(t *testing.T) {
+       ctx := context.Background()
+       schema := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}})
+       source := &fakePull{schema: schema} // no batches: EOF on the first pull
+       finalizeErr := errors.New("finalize boom")
+       breaker := &fakeBreaker{schema: schema, finalizeErr: finalizeErr}
+       pipe, _ := NewPipelineBuilder().From(source).Break(breaker).Build()
+       _ = pipe.head.Init(ctx)
+
+       _, firstErr := pipe.Next(ctx)
+       if !errors.Is(firstErr, finalizeErr) {
+               t.Fatalf("first Next: want finalize error, got %v", firstErr)
+       }
+       _, retryErr := pipe.Next(ctx)
+       if !errors.Is(retryErr, finalizeErr) {
+               t.Fatalf("second Next must return sticky finalize error, got %v", retryErr)
+       }
+       if got := breaker.finalizeCnt; got != 1 {
+               t.Fatalf("Finalize must be called exactly once across retries; got %d", got)
+       }
+}
+
+// TestBreakerStage_ConsumeError_StickyOnRetry checks that a Consume failure
+// is returned by every subsequent Next and that Finalize never runs.
+func TestBreakerStage_ConsumeError_StickyOnRetry(t *testing.T) {
+       ctx := context.Background()
+       schema := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}})
+       source := &fakePull{schema: schema, batches: []*RecordBatch{mkInt64Batch(schema, 1)}}
+       consumeErr := errors.New("consume boom")
+       breaker := &fakeBreaker{schema: schema, consumeErr: consumeErr}
+       pipe, _ := NewPipelineBuilder().From(source).Break(breaker).Build()
+       _ = pipe.head.Init(ctx)
+
+       _, firstErr := pipe.Next(ctx)
+       if !errors.Is(firstErr, consumeErr) {
+               t.Fatalf("first Next: want consume error, got %v", firstErr)
+       }
+       _, retryErr := pipe.Next(ctx)
+       if !errors.Is(retryErr, consumeErr) {
+               t.Fatalf("second Next must return sticky consume error, got %v", retryErr)
+       }
+       if got := breaker.finalizeCnt; got != 0 {
+               t.Fatalf("Finalize must not run after consume error; got %d", got)
+       }
+}
+
+// TestBreakerStage_Close_Idempotent_CallsChildrenOnce closes the stage twice
+// and checks that upstream and breaker each saw exactly one Close.
+func TestBreakerStage_Close_Idempotent_CallsChildrenOnce(t *testing.T) {
+       schema := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}})
+       source := &fakePull{schema: schema}
+       breaker := &fakeBreaker{schema: schema}
+       stage := newBreakerStage(source, breaker)
+       for range 2 {
+               _ = stage.Close()
+       }
+       closedOnce := source.closeCnt == 1 && breaker.closeCnt == 1
+       if !closedOnce {
+               t.Fatalf("breakerStage.Close must be idempotent: src=%d br=%d", source.closeCnt, breaker.closeCnt)
+       }
+}
+
+// TestPipeline_Close_Idempotent closes a pipeline twice and checks that the
+// second call succeeds without reaching the source again.
+func TestPipeline_Close_Idempotent(t *testing.T) {
+       schema := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}})
+       source := &fakePull{schema: schema}
+       pipe, _ := NewPipelineBuilder().From(source).Build()
+       firstErr := pipe.Close()
+       if firstErr != nil {
+               t.Fatal(firstErr)
+       }
+       secondErr := pipe.Close()
+       if secondErr != nil {
+               t.Fatalf("second Close should be no-op, got %v", secondErr)
+       }
+       if got := source.closeCnt; got != 1 {
+               t.Fatalf("second Close must not propagate to source: got %d", got)
+       }
+}
diff --git a/pkg/query/vectorized/pool.go b/pkg/query/vectorized/pool.go
new file mode 100644
index 000000000..a080bde51
--- /dev/null
+++ b/pkg/query/vectorized/pool.go
@@ -0,0 +1,54 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package vectorized
+
+import "sync"
+
+// BatchPool reuses RecordBatches across pipeline iterations.
+// All batches in a pool share the same schema and capacity.
+//
+// Caller contract: Put a batch only when it is consistent. Error paths must
+// discard rather than Put — see fusedStage.NextBatch.
+type BatchPool struct {
+       // schema is the layout every pooled batch must carry; Put compares it
+       // by pointer identity.
+       schema   *BatchSchema
+       // pool holds the idle batches; its New func builds a batch from the
+       // schema and capacity given to NewBatchPool.
+       pool     sync.Pool
+       // capacity is the row capacity the pool was created with.
+       capacity int
+}
+
+// NewBatchPool builds a pool for the given schema and capacity. When the pool
+// is empty, Get falls back to NewRecordBatch(schema, capacity).
+func NewBatchPool(schema *BatchSchema, capacity int) *BatchPool {
+       return &BatchPool{
+               schema:   schema,
+               capacity: capacity,
+               pool: sync.Pool{
+                       New: func() any { return NewRecordBatch(schema, capacity) },
+               },
+       }
+}
+
+// Get hands out a batch from the pool, calling Reset on it first so the
+// caller never sees rows left by a previous user. Caller may write rows up to
+// capacity.
+func (p *BatchPool) Get() *RecordBatch {
+       batch := p.pool.Get().(*RecordBatch)
+       batch.Reset()
+       return batch
+}
+
+// Put hands a batch back for reuse. A nil batch, or one whose Schema pointer
+// is not the pool's own schema, is silently dropped.
+func (p *BatchPool) Put(b *RecordBatch) {
+       if b != nil && b.Schema == p.schema {
+               p.pool.Put(b)
+       }
+}
diff --git a/pkg/query/vectorized/pool_test.go 
b/pkg/query/vectorized/pool_test.go
new file mode 100644
index 000000000..f20507de0
--- /dev/null
+++ b/pkg/query/vectorized/pool_test.go
@@ -0,0 +1,69 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package vectorized
+
+import "testing"
+
+// TestBatchPool_Get_ReturnsResetBatch dirties a batch, returns it to the
+// pool, and checks that the next Get yields a batch with no rows, no
+// selection, and empty columns.
+func TestBatchPool_Get_ReturnsResetBatch(t *testing.T) {
+       schema := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}})
+       pool := NewBatchPool(schema, DefaultBatchSize)
+       used := pool.Get()
+       used.Columns[0].(*TypedColumn[int64]).Append(99)
+       used.Len = 1
+       used.Selection = []uint16{0}
+       pool.Put(used)
+       fresh := pool.Get()
+       // t.Fatalf aborts the test, so only the first failing case is reported.
+       switch {
+       case fresh.Len != 0:
+               t.Fatalf("Get must return Reset batch: Len=%d", fresh.Len)
+       case fresh.Selection != nil:
+               t.Fatalf("Get must return Reset batch: Selection=%v", fresh.Selection)
+       case fresh.Columns[0].Len() != 0:
+               t.Fatalf("Get must return Reset columns: col Len=%d", fresh.Columns[0].Len())
+       }
+}
+
+// TestBatchPool_Put_NilBatch_NoOp checks that putting a nil batch does not
+// panic.
+func TestBatchPool_Put_NilBatch_NoOp(t *testing.T) {
+       schema := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}})
+       pool := NewBatchPool(schema, 8)
+       failOnPanic := func() {
+               if recovered := recover(); recovered != nil {
+                       t.Fatalf("Put(nil) should not panic; got %v", recovered)
+               }
+       }
+       defer failOnPanic()
+       pool.Put(nil)
+}
+
+// TestBatchPool_Put_ForeignSchema_NoOp checks that a batch built from a
+// different schema instance is rejected without panicking and never comes
+// back out of the pool.
+func TestBatchPool_Put_ForeignSchema_NoOp(t *testing.T) {
+       own := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}})
+       other := NewBatchSchema([]ColumnDef{{Role: RoleTimestamp, Type: ColumnTypeInt64}})
+       pool := NewBatchPool(own, 8)
+       outsider := NewRecordBatch(other, 8)
+       failOnPanic := func() {
+               if recovered := recover(); recovered != nil {
+                       t.Fatalf("Put(foreign) should not panic; got %v", recovered)
+               }
+       }
+       defer failOnPanic()
+       pool.Put(outsider)
+       // The pool must still hand out batches of its own schema afterwards.
+       if next := pool.Get(); next.Schema != own {
+               t.Fatal("pool returned a batch with foreign schema after rejection")
+       }
+}
diff --git a/pkg/query/vectorized/schema.go b/pkg/query/vectorized/schema.go
new file mode 100644
index 000000000..2b604177c
--- /dev/null
+++ b/pkg/query/vectorized/schema.go
@@ -0,0 +1,114 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package vectorized
+
+// ColumnRole identifies the semantic role of a column within a RecordBatch.
+type ColumnRole int
+
+// Column roles. Each batch schema may include at most one column per metadata role.
+//
+// RoleTimestamp, RoleVersion, RoleSeriesID and RoleShardID are the metadata
+// roles: NewBatchSchema records a single column index for each of them.
+// RoleTag columns are indexed by (family, name) and RoleField columns by name,
+// so a schema may hold several of each.
+const (
+       RoleTimestamp ColumnRole = iota
+       RoleVersion
+       RoleSeriesID
+       RoleShardID
+       RoleTag
+       RoleField
+)
+
+// ColumnDef describes one column in a BatchSchema.
+//
+// Name is the lookup key for RoleField columns and the name half of the lookup
+// key for RoleTag columns. TagFamily is the family half of the tag key;
+// NewBatchSchema reads it only for RoleTag columns. Role selects how
+// NewBatchSchema indexes the column, and Type is the column's ColumnType.
+type ColumnDef struct {
+       Name      string
+       TagFamily string
+       Role      ColumnRole
+       Type      ColumnType
+}
+
+// tagKey is the composite key used to index tag columns by (family, name).
+// Using a struct key eliminates two issues with a string-concatenated key:
+//   - it cannot collide when family or name contains the separator character;
+//   - the lookup avoids allocating a fresh string on every hot-path call.
+//
+// NewBatchSchema builds one key per RoleTag column from its TagFamily and
+// Name; TagIndex builds the same key from its arguments to probe the map.
+type tagKey struct {
+       family string
+       name   string
+}
+
+// BatchSchema is the immutable column layout shared by every RecordBatch in a pipeline.
+//
+// Columns holds the definitions in positional order; every index stored in the
+// other fields is a position in Columns. tagByPath maps a tag's (family, name)
+// to its position and fieldByName maps a field name to its position. The four
+// *Idx fields hold the position of the matching metadata column;
+// NewBatchSchema initializes them to -1, meaning "absent".
+//
+// Nothing in the type itself enforces immutability (Columns is exported), so
+// treat a schema as read-only after construction.
+type BatchSchema struct {
+       tagByPath    map[tagKey]int
+       fieldByName  map[string]int
+       Columns      []ColumnDef
+       timestampIdx int
+       versionIdx   int
+       seriesIDIdx  int
+       shardIDIdx   int
+}
+
+// NewBatchSchema builds a BatchSchema and precomputes lookup indices.
+//
+// The schema keeps its own copy of cols, so later writes to the caller's slice
+// cannot put Columns out of step with the precomputed indices. Metadata
+// indices start at -1 (absent). When a metadata role, a tag (family, name)
+// pair, or a field name occurs more than once, the last occurrence wins.
+// Columns whose Role is not one of the declared constants stay in Columns but
+// get no index entry.
+func NewBatchSchema(cols []ColumnDef) *BatchSchema {
+       // Copy only when there is an input slice, so a nil cols still yields nil
+       // Columns and an empty non-nil cols still yields an empty non-nil slice.
+       var owned []ColumnDef
+       if cols != nil {
+               owned = make([]ColumnDef, len(cols))
+               copy(owned, cols)
+       }
+       s := &BatchSchema{
+               Columns:      owned,
+               timestampIdx: -1,
+               versionIdx:   -1,
+               seriesIDIdx:  -1,
+               shardIDIdx:   -1,
+               tagByPath:    make(map[tagKey]int),
+               fieldByName:  make(map[string]int),
+       }
+       for i, c := range owned {
+               switch c.Role {
+               case RoleTimestamp:
+                       s.timestampIdx = i
+               case RoleVersion:
+                       s.versionIdx = i
+               case RoleSeriesID:
+                       s.seriesIDIdx = i
+               case RoleShardID:
+                       s.shardIDIdx = i
+               case RoleTag:
+                       s.tagByPath[tagKey{family: c.TagFamily, name: c.Name}] = i
+               case RoleField:
+                       s.fieldByName[c.Name] = i
+               }
+       }
+       return s
+}
+
+// TimestampIndex reports the position of the RoleTimestamp column in Columns.
+// For a schema built by NewBatchSchema the result is -1 when no such column exists.
+func (s *BatchSchema) TimestampIndex() int {
+       return s.timestampIdx
+}
+
+// VersionIndex reports the position of the RoleVersion column in Columns.
+// For a schema built by NewBatchSchema the result is -1 when no such column exists.
+func (s *BatchSchema) VersionIndex() int {
+       return s.versionIdx
+}
+
+// SeriesIDIndex reports the position of the RoleSeriesID column in Columns.
+// For a schema built by NewBatchSchema the result is -1 when no such column exists.
+func (s *BatchSchema) SeriesIDIndex() int {
+       return s.seriesIDIdx
+}
+
+// ShardIDIndex reports the position of the RoleShardID column in Columns.
+// For a schema built by NewBatchSchema the result is -1 when no such column exists.
+func (s *BatchSchema) ShardIDIndex() int {
+       return s.shardIDIdx
+}
+
+// TagIndex looks up the tag column registered under (family, name).
+// It returns the column's position and true on a hit, or 0 and false on a
+// miss. The probe key is a struct value rather than a concatenated string.
+func (s *BatchSchema) TagIndex(family, name string) (int, bool) {
+       key := tagKey{family: family, name: name}
+       if idx, found := s.tagByPath[key]; found {
+               return idx, true
+       }
+       return 0, false
+}
+
+// FieldIndex looks up the field column registered under name.
+// It returns the column's position and true on a hit, or 0 and false on a miss.
+func (s *BatchSchema) FieldIndex(name string) (int, bool) {
+       if idx, found := s.fieldByName[name]; found {
+               return idx, true
+       }
+       return 0, false
+}
diff --git a/pkg/query/vectorized/schema_test.go b/pkg/query/vectorized/schema_test.go
new file mode 100644
index 000000000..e753fc3f9
--- /dev/null
+++ b/pkg/query/vectorized/schema_test.go
@@ -0,0 +1,127 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package vectorized
+
+import "testing"
+
+// TestBatchSchema_LookupHelpers_TimestampVersionSeriesIDShardID builds a
+// schema holding one column per metadata role and checks that every accessor
+// reports the position of its role in the column list.
+func TestBatchSchema_LookupHelpers_TimestampVersionSeriesIDShardID(t *testing.T) {
+       cols := []ColumnDef{
+               {Role: RoleTimestamp, Type: ColumnTypeInt64},
+               {Role: RoleVersion, Type: ColumnTypeInt64},
+               {Role: RoleSeriesID, Type: ColumnTypeInt64},
+               {Role: RoleShardID, Type: ColumnTypeInt64},
+       }
+       schema := NewBatchSchema(cols)
+
+       tsIdx := schema.TimestampIndex()
+       if tsIdx != 0 {
+               t.Fatalf("TimestampIndex: want 0, got %d", tsIdx)
+       }
+       verIdx := schema.VersionIndex()
+       if verIdx != 1 {
+               t.Fatalf("VersionIndex: want 1, got %d", verIdx)
+       }
+       seriesIdx := schema.SeriesIDIndex()
+       if seriesIdx != 2 {
+               t.Fatalf("SeriesIDIndex: want 2, got %d", seriesIdx)
+       }
+       shardIdx := schema.ShardIDIndex()
+       if shardIdx != 3 {
+               t.Fatalf("ShardIDIndex: want 3, got %d", shardIdx)
+       }
+}
+
+// TestBatchSchema_TagIndex_FoundAndMissing checks that TagIndex resolves two
+// registered tags to their positions and reports a miss for an unknown name.
+func TestBatchSchema_TagIndex_FoundAndMissing(t *testing.T) {
+       schema := NewBatchSchema([]ColumnDef{
+               {Role: RoleTimestamp, Type: ColumnTypeInt64},
+               {Role: RoleTag, TagFamily: "default", Name: "service", Type: ColumnTypeString},
+               {Role: RoleTag, TagFamily: "default", Name: "endpoint", Type: ColumnTypeString},
+       })
+
+       serviceIdx, serviceOK := schema.TagIndex("default", "service")
+       if !(serviceOK && serviceIdx == 1) {
+               t.Fatalf("TagIndex(default,service): want (1,true), got (%d,%v)", serviceIdx, serviceOK)
+       }
+
+       endpointIdx, endpointOK := schema.TagIndex("default", "endpoint")
+       if !(endpointOK && endpointIdx == 2) {
+               t.Fatalf("TagIndex(default,endpoint): want (2,true), got (%d,%v)", endpointIdx, endpointOK)
+       }
+
+       _, missingOK := schema.TagIndex("default", "missing")
+       if missingOK {
+               t.Fatalf("TagIndex(default,missing): expected miss")
+       }
+}
+
+// TestBatchSchema_FieldIndex_FoundAndMissing checks that FieldIndex resolves
+// two registered fields to their positions and reports a miss for an unknown
+// name.
+func TestBatchSchema_FieldIndex_FoundAndMissing(t *testing.T) {
+       schema := NewBatchSchema([]ColumnDef{
+               {Role: RoleTimestamp, Type: ColumnTypeInt64},
+               {Role: RoleField, Name: "value", Type: ColumnTypeFloat64},
+               {Role: RoleField, Name: "count", Type: ColumnTypeInt64},
+       })
+
+       valueIdx, valueOK := schema.FieldIndex("value")
+       if !(valueOK && valueIdx == 1) {
+               t.Fatalf("FieldIndex(value): want (1,true), got (%d,%v)", valueIdx, valueOK)
+       }
+
+       countIdx, countOK := schema.FieldIndex("count")
+       if !(countOK && countIdx == 2) {
+               t.Fatalf("FieldIndex(count): want (2,true), got (%d,%v)", countIdx, countOK)
+       }
+
+       _, missingOK := schema.FieldIndex("missing")
+       if missingOK {
+               t.Fatalf("FieldIndex(missing): expected miss")
+       }
+}
+
+// TestBatchSchema_MultipleTagFamilies_NoCollision registers the same tag name
+// under two families and checks that each lookup succeeds and that the two
+// resolve to different columns.
+func TestBatchSchema_MultipleTagFamilies_NoCollision(t *testing.T) {
+       schema := NewBatchSchema([]ColumnDef{
+               {Role: RoleTag, TagFamily: "a", Name: "id", Type: ColumnTypeString},
+               {Role: RoleTag, TagFamily: "b", Name: "id", Type: ColumnTypeString},
+       })
+       idxA, okA := schema.TagIndex("a", "id")
+       idxB, okB := schema.TagIndex("b", "id")
+       switch {
+       case !okA || !okB:
+               t.Fatalf("both lookups should succeed: aOK=%v bOK=%v", okA, okB)
+       case idxA == idxB:
+               t.Fatalf("same tag name in different families must map to distinct columns: a=%d b=%d", idxA, idxB)
+       }
+}
+
+// TestBatchSchema_TagFamilyDotInName_NoCollision uses (family, name) pairs
+// that would be identical under a dot-joined string key and checks that they
+// still resolve to different columns.
+func TestBatchSchema_TagFamilyDotInName_NoCollision(t *testing.T) {
+       schema := NewBatchSchema([]ColumnDef{
+               {Role: RoleTag, TagFamily: "a.b", Name: "c", Type: ColumnTypeString},
+               {Role: RoleTag, TagFamily: "a", Name: "b.c", Type: ColumnTypeString},
+       })
+       dottedFamilyIdx, dottedFamilyOK := schema.TagIndex("a.b", "c")
+       dottedNameIdx, dottedNameOK := schema.TagIndex("a", "b.c")
+       switch {
+       case !dottedFamilyOK || !dottedNameOK:
+               t.Fatalf("both lookups must succeed: ok1=%v ok2=%v", dottedFamilyOK, dottedNameOK)
+       case dottedFamilyIdx == dottedNameIdx:
+               t.Fatalf("collision: (a.b,c) and (a,b.c) resolve to the same index %d", dottedFamilyIdx)
+       }
+}
+
+// TestBatchSchema_NoMetadata_ReturnsMinusOne builds a schema with a single tag
+// column and checks that every metadata accessor reports -1.
+func TestBatchSchema_NoMetadata_ReturnsMinusOne(t *testing.T) {
+       const absent = -1
+       schema := NewBatchSchema([]ColumnDef{
+               {Role: RoleTag, TagFamily: "default", Name: "service", Type: ColumnTypeString},
+       })
+       if idx := schema.TimestampIndex(); idx != absent {
+               t.Fatalf("absent timestamp: want -1, got %d", idx)
+       }
+       if idx := schema.VersionIndex(); idx != absent {
+               t.Fatalf("absent version: want -1, got %d", idx)
+       }
+       if idx := schema.SeriesIDIndex(); idx != absent {
+               t.Fatalf("absent seriesID: want -1, got %d", idx)
+       }
+       if idx := schema.ShardIDIndex(); idx != absent {
+               t.Fatalf("absent shardID: want -1, got %d", idx)
+       }
+}
diff --git a/pkg/query/vectorized/typed_column.go b/pkg/query/vectorized/typed_column.go
new file mode 100644
index 000000000..e323aa7a9
--- /dev/null
+++ b/pkg/query/vectorized/typed_column.go
@@ -0,0 +1,110 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package vectorized
+
+// TypedColumn[T] is a generic Column with element type T.
+// One instance per supported T. Use the typed constructors below.
+//
+// data holds one element per row, null rows included (AppendNull stores the
+// zero value of T as a placeholder). validity records which rows are null.
+// zero is never assigned, so it always holds the zero value of T. typ is the
+// ColumnType tag set by the constructor.
+type TypedColumn[T any] struct {
+       data     []T
+       validity validityBitmap
+       zero     T
+       typ      ColumnType
+}
+
+// Type returns the static ColumnType this column was built with.
+func (c *TypedColumn[T]) Type() ColumnType { return c.typ }
+
+// Len returns the current row count, null rows included.
+func (c *TypedColumn[T]) Len() int { return len(c.data) }
+
+// IsNull reports whether row i is null, as recorded in the validity bitmap.
+func (c *TypedColumn[T]) IsNull(i int) bool { return c.validity.IsNull(i) }
+
+// Data returns the backing slice for bulk access. The slice is not a copy:
+// it shares storage with the column, and null rows hold the zero value of T.
+func (c *TypedColumn[T]) Data() []T { return c.data }
+
+// Append adds v as a new row. It does not write to the validity bitmap; the
+// row counts as valid because MarkNullAt only accepts indices of existing
+// rows, so no null mark can be waiting at an index that is yet to be appended.
+func (c *TypedColumn[T]) Append(v T) { c.data = append(c.data, v) }
+
+// AppendNull adds a zero-value placeholder and marks it null.
+func (c *TypedColumn[T]) AppendNull() {
+       idx := len(c.data)
+       c.data = append(c.data, c.zero)
+       c.validity.MarkNull(idx)
+}
+
+// MarkNullAt marks an existing row at index i as null. Length is unchanged.
+//
+// It panics with the runtime's index-out-of-range error when i is negative or
+// not less than Len(): such a call is a programmer error.
+func (c *TypedColumn[T]) MarkNullAt(i int) {
+       // Indexing data forces the runtime bounds check. Without it, a mark at
+       // i >= Len() could linger in the bitmap and make a row appended later
+       // at that index read as null.
+       _ = c.data[i]
+       c.validity.MarkNull(i)
+}
+
+// Reset clears length and validity. Capacity is retained.
+// The old elements are not zeroed, so values they reference stay reachable
+// through the backing array until overwritten by later appends.
+func (c *TypedColumn[T]) Reset() {
+       c.data = c.data[:0]
+       c.validity.Reset()
+}
+
+// NewInt64Column returns an empty TypedColumn[int64] tagged ColumnTypeInt64
+// whose backing slice is preallocated for capacity rows.
+func NewInt64Column(capacity int) *TypedColumn[int64] {
+       col := new(TypedColumn[int64])
+       col.typ = ColumnTypeInt64
+       col.data = make([]int64, 0, capacity)
+       return col
+}
+
+// NewFloat64Column returns an empty TypedColumn[float64] tagged
+// ColumnTypeFloat64 whose backing slice is preallocated for capacity rows.
+func NewFloat64Column(capacity int) *TypedColumn[float64] {
+       col := new(TypedColumn[float64])
+       col.typ = ColumnTypeFloat64
+       col.data = make([]float64, 0, capacity)
+       return col
+}
+
+// NewStringColumn returns an empty TypedColumn[string] tagged ColumnTypeString
+// whose backing slice is preallocated for capacity rows.
+func NewStringColumn(capacity int) *TypedColumn[string] {
+       col := new(TypedColumn[string])
+       col.typ = ColumnTypeString
+       col.data = make([]string, 0, capacity)
+       return col
+}
+
+// NewBytesColumn returns an empty TypedColumn[[]byte] tagged ColumnTypeBytes
+// whose backing slice is preallocated for capacity rows.
+func NewBytesColumn(capacity int) *TypedColumn[[]byte] {
+       col := new(TypedColumn[[]byte])
+       col.typ = ColumnTypeBytes
+       col.data = make([][]byte, 0, capacity)
+       return col
+}
+
+// NewInt64ArrayColumn returns an empty TypedColumn[[]int64] tagged
+// ColumnTypeInt64Array whose backing slice is preallocated for capacity rows.
+func NewInt64ArrayColumn(capacity int) *TypedColumn[[]int64] {
+       col := new(TypedColumn[[]int64])
+       col.typ = ColumnTypeInt64Array
+       col.data = make([][]int64, 0, capacity)
+       return col
+}
+
+// NewStrArrayColumn returns an empty TypedColumn[[]string] tagged
+// ColumnTypeStrArray whose backing slice is preallocated for capacity rows.
+func NewStrArrayColumn(capacity int) *TypedColumn[[]string] {
+       col := new(TypedColumn[[]string])
+       col.typ = ColumnTypeStrArray
+       col.data = make([][]string, 0, capacity)
+       return col
+}
+
+// NewColumnForType dispatches on t to the matching typed constructor and
+// returns the new column, preallocated for capacity rows, as a Column.
+// An unrecognized t is treated as a programmer error and panics.
+func NewColumnForType(t ColumnType, capacity int) Column {
+       var col Column
+       switch t {
+       case ColumnTypeInt64:
+               col = NewInt64Column(capacity)
+       case ColumnTypeFloat64:
+               col = NewFloat64Column(capacity)
+       case ColumnTypeString:
+               col = NewStringColumn(capacity)
+       case ColumnTypeBytes:
+               col = NewBytesColumn(capacity)
+       case ColumnTypeInt64Array:
+               col = NewInt64ArrayColumn(capacity)
+       case ColumnTypeStrArray:
+               col = NewStrArrayColumn(capacity)
+       default:
+               panic("vectorized: unknown ColumnType " + t.String())
+       }
+       return col
+}
diff --git a/pkg/query/vectorized/typed_column_test.go b/pkg/query/vectorized/typed_column_test.go
new file mode 100644
index 000000000..5e13a1896
--- /dev/null
+++ b/pkg/query/vectorized/typed_column_test.go
@@ -0,0 +1,142 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package vectorized
+
+import (
+       "bytes"
+       "testing"
+)
+
+// TestTypedColumnInt64_AppendAndData appends three values and checks the
+// reported length, the type tag, and the contents exposed through Data.
+func TestTypedColumnInt64_AppendAndData(t *testing.T) {
+       col := NewInt64Column(8)
+       expected := []int64{1, 2, 3}
+       for _, v := range expected {
+               col.Append(v)
+       }
+       if n := col.Len(); n != 3 {
+               t.Fatalf("Len: want 3, got %d", n)
+       }
+       if typ := col.Type(); typ != ColumnTypeInt64 {
+               t.Fatalf("Type: want Int64, got %v", typ)
+       }
+       for i := range expected {
+               if got := col.Data()[i]; got != expected[i] {
+                       t.Fatalf("Data[%d]: want %d, got %d", i, expected[i], got)
+               }
+       }
+}
+
+// TestTypedColumnInt64_AppendNull_GrowsLengthAndMarksNull interleaves a null
+// between two values and checks that the null counts toward Len and that only
+// the middle row reads as null.
+func TestTypedColumnInt64_AppendNull_GrowsLengthAndMarksNull(t *testing.T) {
+       col := NewInt64Column(8)
+       col.Append(10)
+       col.AppendNull()
+       col.Append(20)
+       if n := col.Len(); n != 3 {
+               t.Fatalf("Len: want 3, got %d", n)
+       }
+       first, second, third := col.IsNull(0), col.IsNull(1), col.IsNull(2)
+       if first || !second || third {
+               t.Fatalf("validity: want [valid,null,valid], got [%v,%v,%v]", first, second, third)
+       }
+}
+
+// TestTypedColumnInt64_MarkNullAt_TogglesPreExistingRow marks the middle of
+// three appended rows as null and checks that only that row reads as null and
+// that the length is unchanged.
+func TestTypedColumnInt64_MarkNullAt_TogglesPreExistingRow(t *testing.T) {
+       col := NewInt64Column(4)
+       for i := 0; i < 3; i++ {
+               col.Append(0)
+       }
+       col.MarkNullAt(1)
+       first, second, third := col.IsNull(0), col.IsNull(1), col.IsNull(2)
+       if first || !second || third {
+               t.Fatalf("MarkNullAt(1): want [valid,null,valid], got [%v,%v,%v]", first, second, third)
+       }
+       if n := col.Len(); n != 3 {
+               t.Fatalf("MarkNullAt must not grow length: got Len=%d", n)
+       }
+}
+
+// TestTypedColumnInt64_Reset_LengthZero_CapacityRetained fills a column, marks
+// one row null, resets it, and checks that the length drops to zero, the
+// backing capacity is unchanged, and the null mark is gone.
+func TestTypedColumnInt64_Reset_LengthZero_CapacityRetained(t *testing.T) {
+       col := NewInt64Column(64)
+       for v := int64(0); v < 64; v++ {
+               col.Append(v)
+       }
+       col.MarkNullAt(10)
+       capBefore := cap(col.Data())
+       col.Reset()
+       if n := col.Len(); n != 0 {
+               t.Fatalf("Reset Len: want 0, got %d", n)
+       }
+       if capAfter := cap(col.Data()); capAfter != capBefore {
+               t.Fatalf("Reset must retain capacity: before=%d after=%d", capBefore, capAfter)
+       }
+       if col.IsNull(10) {
+               t.Fatal("Reset must clear validity")
+       }
+}
+
+// TestTypedColumn_AllSixTypes_RoundTrip appends one value through each typed
+// constructor and checks the type tag and the stored value (or, for the array
+// types, the stored element count).
+func TestTypedColumn_AllSixTypes_RoundTrip(t *testing.T) {
+       t.Run("int64", func(t *testing.T) {
+               col := NewInt64Column(2)
+               col.Append(42)
+               intact := col.Type() == ColumnTypeInt64 && col.Data()[0] == 42
+               if !intact {
+                       t.Fatalf("int64 roundtrip failed")
+               }
+       })
+       t.Run("float64", func(t *testing.T) {
+               col := NewFloat64Column(2)
+               col.Append(3.14)
+               intact := col.Type() == ColumnTypeFloat64 && col.Data()[0] == 3.14
+               if !intact {
+                       t.Fatalf("float64 roundtrip failed")
+               }
+       })
+       t.Run("string", func(t *testing.T) {
+               col := NewStringColumn(2)
+               col.Append("hello")
+               intact := col.Type() == ColumnTypeString && col.Data()[0] == "hello"
+               if !intact {
+                       t.Fatalf("string roundtrip failed")
+               }
+       })
+       t.Run("bytes", func(t *testing.T) {
+               col := NewBytesColumn(2)
+               col.Append([]byte("hi"))
+               intact := col.Type() == ColumnTypeBytes && bytes.Equal(col.Data()[0], []byte("hi"))
+               if !intact {
+                       t.Fatalf("bytes roundtrip failed")
+               }
+       })
+       t.Run("int64-array", func(t *testing.T) {
+               col := NewInt64ArrayColumn(2)
+               col.Append([]int64{1, 2, 3})
+               intact := col.Type() == ColumnTypeInt64Array && len(col.Data()[0]) == 3
+               if !intact {
+                       t.Fatalf("int64[] roundtrip failed")
+               }
+       })
+       t.Run("string-array", func(t *testing.T) {
+               col := NewStrArrayColumn(2)
+               col.Append([]string{"a", "b"})
+               intact := col.Type() == ColumnTypeStrArray && len(col.Data()[0]) == 2
+               if !intact {
+                       t.Fatalf("string[] roundtrip failed")
+               }
+       })
+}
+
+// TestNewColumnForType_UnknownType_Panics passes a ColumnType value with no
+// matching constructor and fails unless NewColumnForType panics.
+func TestNewColumnForType_UnknownType_Panics(t *testing.T) {
+       defer func() {
+               if recover() == nil {
+                       t.Fatal("expected panic on unknown ColumnType")
+               }
+       }()
+       const unknown = ColumnType(999)
+       _ = NewColumnForType(unknown, 4)
+}

Reply via email to