This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit a421813e12e9bfa75b53682bf4a07b14c5415648 Author: Hongtao Gao <[email protected]> AuthorDate: Fri May 8 07:50:37 2026 +0000 feat(query/model): add MeasureBatchResult interface; storage dual-emits (G5b foundation) First two stories of G5b — adds the columnar storage→executor interface and a wrapper-style implementation on the existing storage layer. Pull() remains byte-identical for legacy consumers; PullBatch is purely additive. US-001 — MeasureBatchResult interface + MeasureBatch struct - pkg/query/model/batch.go defines: * MeasureBatchResult { PullBatch(ctx) (*MeasureBatch, error); Release() } * MeasureBatch { Schema, Timestamps, Versions, ShardIDs, SeriesIDs, Tags, Fields, SeriesBoundaries } - pkg/query/model/batch_test.go covers nil receiver, empty/single/mixed column shapes, multi-series boundary semantics, sticky-error contract, and idempotent Release. US-002 — *queryResult and *indexSortResult dual-emit - pkg/query/vectorized/measure/build_batch.go provides BuildMeasureBatchFromResult — wrapper-style conversion of the existing *model.MeasureResult into *model.MeasureBatch with passthrough columns (TypedColumn[*modelv1.TagValue] / [*modelv1.FieldValue]). Multi-group null fill matches scan.go::fillTags / fillFields. Tests cover full row, missing-tag null fill, missing-field null fill, length-invariant violations, nil result/schema. - banyand/measure/query.go gains a *vectorized.BatchSchema field on queryResult and indexSortResult, populated once per Query() / buildIndexQueryResult() via vmeasure.BuildBatchSchema. Builds are best-effort — schema-build errors leave batchSchema nil; PullBatch returns a clean error rather than degrading Pull(). - banyand/measure/query_batch.go implements PullBatch on both result types as a wrapper around Pull() + BuildMeasureBatchFromResult. Compile-time assertions confirm both types satisfy model.MeasureBatchResult. Mixing Pull() and PullBatch() on the same result is undefined; each advances the same underlying cursors. 
Documented on each PullBatch method. This is the lazy/wrapper design — the *modelv1.TagValue intermediate still exists, just transiently. The architectural decode-elimination is US-003 (block_cursor emits typed columns natively), which removes the intermediate from the storage hot path. Verification: - go test ./pkg/query/model/... -count=1 -race OK - go test ./pkg/query/vectorized/... -count=1 -race OK - go test ./banyand/measure/... -count=1 -short OK (28s) - go test ./test/integration/standalone/query/ -timeout=15m OK (245s) - make -s lint clean - go build ./... clean Pull() byte-identical at the gRPC layer (integration suite parity). PullBatch is unwired — no consumer calls it yet (US-007 wires NewMIterator). G5a bench gates green at 5de3e0eb; this commit keeps them green by being purely additive. --- banyand/measure/query.go | 17 +- banyand/measure/query_batch.go | 81 ++++++++ pkg/query/model/batch.go | 92 +++++++++ pkg/query/model/batch_test.go | 186 +++++++++++++++++++ pkg/query/vectorized/measure/build_batch.go | 137 ++++++++++++++ pkg/query/vectorized/measure/build_batch_test.go | 226 +++++++++++++++++++++++ 6 files changed, 737 insertions(+), 2 deletions(-) diff --git a/banyand/measure/query.go b/banyand/measure/query.go index 398e059f6..fd5f9ee70 100644 --- a/banyand/measure/query.go +++ b/banyand/measure/query.go @@ -39,6 +39,7 @@ import ( pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/query/model" + "github.com/apache/skywalking-banyandb/pkg/query/vectorized" vmeasure "github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure" resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema" "github.com/apache/skywalking-banyandb/pkg/timestamp" @@ -217,6 +218,11 @@ func (m *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions) (mqr m.queryMetrics.resultPoints.Observe(float64(len(result.data))) } + // Build the columnar BatchSchema 
once for PullBatch consumers (G5b). + // Falls back to nil on schema-build failure; PullBatch checks for nil + // and returns a clean error rather than degrading the row-path Pull(). + result.batchSchema, _ = vmeasure.BuildBatchSchema(m.schema, mqo) + return &result, nil } @@ -535,6 +541,11 @@ func (m *measure) buildIndexQueryResult(ctx context.Context, mqo model.MeasureQu r.segResults.sortDesc = true } + // G5b — build the columnar BatchSchema once for PullBatch consumers. + // Errors here are non-fatal for the row-path Pull(); PullBatch will + // return a clean error if batchSchema is nil at call time. + r.batchSchema, _ = vmeasure.BuildBatchSchema(m.schema, mqo) + heap.Init(&r.segResults) return r, nil } @@ -743,6 +754,7 @@ type queryResult struct { topNQueryOptions *topNQueryOptions sidToIndex map[common.SeriesID]int storedIndexValue map[common.SeriesID]map[string]*modelv1.TagValue + batchSchema *vectorized.BatchSchema tagProjection []model.TagProjection data []*blockCursor snapshots []*snapshot @@ -959,8 +971,9 @@ func (qr *queryResult) merge(storedIndexValue map[common.SeriesID]map[string]*mo } type indexSortResult struct { - tfl []tagFamilyLocation - segResults segResultHeap + batchSchema *vectorized.BatchSchema + tfl []tagFamilyLocation + segResults segResultHeap } // Pull implements model.MeasureQueryResult. diff --git a/banyand/measure/query_batch.go b/banyand/measure/query_batch.go new file mode 100644 index 000000000..08a7b1647 --- /dev/null +++ b/banyand/measure/query_batch.go @@ -0,0 +1,81 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package measure + +import ( + "context" + "fmt" + + "github.com/apache/skywalking-banyandb/pkg/query/model" + vmeasure "github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure" +) + +// Compile-time assertions that both result types satisfy the columnar +// interface alongside MeasureQueryResult. +var ( + _ model.MeasureBatchResult = (*queryResult)(nil) + _ model.MeasureBatchResult = (*indexSortResult)(nil) +) + +// PullBatch implements model.MeasureBatchResult for queryResult. +// +// G5b "dual-emit" wrapper: the implementation calls Pull() to obtain the +// next *model.MeasureResult and converts it to a *model.MeasureBatch using +// vmeasure.BuildMeasureBatchFromResult. Pull()'s row-path output stays +// byte-identical for legacy consumers; PullBatch is purely additive. +// +// Mixing Pull() and PullBatch() on the same queryResult is undefined — +// each call advances the same underlying cursors. Callers must pick one. +// +// Error propagation: if Pull() returns a *MeasureResult with non-nil +// Error, PullBatch returns (nil, that error). Repeating it on later calls +// relies on Pull() itself being sticky — NOTE(review): confirm against Pull().
+func (qr *queryResult) PullBatch(_ context.Context) (*model.MeasureBatch, error) { + if qr.batchSchema == nil { + return nil, fmt.Errorf("queryResult.PullBatch: batchSchema not initialized; " + + "the underlying query did not record a vectorized BatchSchema (likely a schema-build error at Query time)") + } + r := qr.Pull() + if r == nil { + return nil, nil + } + if r.Error != nil { + return nil, r.Error + } + return vmeasure.BuildMeasureBatchFromResult(r, qr.batchSchema) +} + +// PullBatch implements model.MeasureBatchResult for indexSortResult. +// +// Same wrapper as queryResult.PullBatch: each call consumes one Pull() result, +// so mixing Pull() and PullBatch() on the same result is undefined — pick one. +// Pull() is described as yielding single-row MeasureResults (NOTE(review): confirm). +func (iqr *indexSortResult) PullBatch(_ context.Context) (*model.MeasureBatch, error) { + if iqr.batchSchema == nil { + return nil, fmt.Errorf("indexSortResult.PullBatch: batchSchema not initialized; " + + "the underlying query did not record a vectorized BatchSchema (likely a schema-build error at Query time)") + } + r := iqr.Pull() + if r == nil { + return nil, nil + } + if r.Error != nil { + return nil, r.Error + } + return vmeasure.BuildMeasureBatchFromResult(r, iqr.batchSchema) +} diff --git a/pkg/query/model/batch.go b/pkg/query/model/batch.go new file mode 100644 index 000000000..aaad3a191 --- /dev/null +++ b/pkg/query/model/batch.go @@ -0,0 +1,92 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package model + +import ( + "context" + + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/pkg/query/vectorized" +) + +// MeasureBatchResult is the columnar counterpart to MeasureQueryResult. +// Implementations return a *MeasureBatch of parallel per-column slices instead +// of row-shaped results (the G5b wrapper's cells may still be *modelv1 values). +// +// The contract is independent of MeasureQueryResult: a single underlying +// query may satisfy both interfaces (dual-emit) or only one. Callers that +// want the columnar path call PullBatch; callers that still want the row +// path call MeasureQueryResult.Pull. Release has shared semantics — the +// caller invokes it exactly once after iteration is complete; implementations +// must still tolerate a second call (e.g. once via each interface). +type MeasureBatchResult interface { + // PullBatch returns the next batch from the underlying scan. It returns + // (nil, nil) on EOF; (nil, err) on storage error. Once an error is + // returned, subsequent calls must continue to return that same error + // (sticky-error contract — matches MeasureQueryResult.Pull's + // MeasureResult.Error semantics). + PullBatch(ctx context.Context) (*MeasureBatch, error) + + // Release frees any resources held by the result. Must be idempotent. + Release() +} + +// MeasureBatch is a single columnar batch flowing out of the storage layer. +// +// Row count is the length of Timestamps. Versions, ShardIDs and SeriesIDs +// have the same length.
Tags and Fields are parallel column slices whose +// indices correspond to Schema.Columns entries with RoleTag / RoleField +// respectively (the column at Tags[i] aligns with the i-th tag slot in +// Schema, in declaration order; same for Fields[i]). +// +// SeriesBoundaries records the exclusive end-of-series row indices within +// the batch. For example, a batch holding 100 rows of series A followed by +// 200 rows of series B reports SeriesBoundaries = [100, 300]. Empty (nil +// or zero-length) means "single series" — every row in the batch belongs +// to the same series. +type MeasureBatch struct { + // Schema describes the column layout. Schema.TagIndex / FieldIndex return + // positions in Schema.Columns, not direct indices into Tags / Fields. + Schema *vectorized.BatchSchema + + // Per-row metadata columns. Length equals the row count. + Timestamps []int64 + Versions []int64 + ShardIDs []common.ShardID + SeriesIDs []common.SeriesID + + // Typed tag and field columns. Tags[i] corresponds to the i-th + // RoleTag entry in Schema.Columns (in declaration order); same for + // Fields. Each column reports the same Len() as len(Timestamps). + Tags []vectorized.Column + Fields []vectorized.Column + + // SeriesBoundaries records exclusive end-of-series row indices for + // multi-series batches. nil or empty when the batch contains a single + // series. + SeriesBoundaries []int +} + +// RowCount returns the number of rows in the batch. It reads len(Timestamps); +// callers must keep the parallel column lengths in sync. +func (b *MeasureBatch) RowCount() int { + if b == nil { + return 0 + } + return len(b.Timestamps) +} diff --git a/pkg/query/model/batch_test.go b/pkg/query/model/batch_test.go new file mode 100644 index 000000000..eb6081177 --- /dev/null +++ b/pkg/query/model/batch_test.go @@ -0,0 +1,186 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements.
See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package model + +import ( + "context" + "errors" + "testing" + + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/pkg/query/vectorized" +) + +func TestMeasureBatch_RowCount_Nil(t *testing.T) { + var b *MeasureBatch + if got := b.RowCount(); got != 0 { + t.Fatalf("nil batch RowCount: want 0, got %d", got) + } +} + +func TestMeasureBatch_RowCount_Empty(t *testing.T) { + b := &MeasureBatch{} + if got := b.RowCount(); got != 0 { + t.Fatalf("empty batch RowCount: want 0, got %d", got) + } +} + +func TestMeasureBatch_SingleTagColumn(t *testing.T) { + schema := vectorized.NewBatchSchema([]vectorized.ColumnDef{ + {Role: vectorized.RoleTimestamp, Type: vectorized.ColumnTypeInt64}, + {Role: vectorized.RoleTag, TagFamily: "default", Name: "svc", Type: vectorized.ColumnTypeString}, + }) + tagCol := vectorized.NewStringColumn(4) + tagCol.Append("alpha") + tagCol.Append("beta") + b := &MeasureBatch{ + Schema: schema, + Timestamps: []int64{1, 2}, + Versions: []int64{1, 1}, + ShardIDs: []common.ShardID{0, 0}, + SeriesIDs: []common.SeriesID{7, 7}, + Tags: []vectorized.Column{tagCol}, + } + if got := b.RowCount(); got != 2 { + t.Fatalf("RowCount: want 2, got %d", got) + } + if got := len(b.Tags); got != 1 { +
t.Fatalf("len(Tags): want 1, got %d", got) + } + if got := b.Tags[0].Len(); got != 2 { + t.Fatalf("Tags[0].Len: want 2, got %d", got) + } +} + +func TestMeasureBatch_MixedTagAndField(t *testing.T) { + schema := vectorized.NewBatchSchema([]vectorized.ColumnDef{ + {Role: vectorized.RoleTimestamp, Type: vectorized.ColumnTypeInt64}, + {Role: vectorized.RoleTag, TagFamily: "default", Name: "svc", Type: vectorized.ColumnTypeString}, + {Role: vectorized.RoleField, Name: "value", Type: vectorized.ColumnTypeInt64}, + }) + tagCol := vectorized.NewStringColumn(4) + tagCol.Append("alpha") + tagCol.Append("beta") + tagCol.Append("gamma") + fieldCol := vectorized.NewInt64Column(4) + fieldCol.Append(10) + fieldCol.Append(20) + fieldCol.Append(30) + b := &MeasureBatch{ + Schema: schema, + Timestamps: []int64{1, 2, 3}, + Versions: []int64{1, 1, 1}, + ShardIDs: []common.ShardID{0, 0, 0}, + SeriesIDs: []common.SeriesID{1, 1, 1}, + Tags: []vectorized.Column{tagCol}, + Fields: []vectorized.Column{fieldCol}, + } + if got := b.RowCount(); got != 3 { + t.Fatalf("RowCount: want 3, got %d", got) + } + if got := len(b.Fields); got != 1 { + t.Fatalf("len(Fields): want 1, got %d", got) + } + if got := b.Fields[0].Len(); got != 3 { + t.Fatalf("Fields[0].Len: want 3, got %d", got) + } + // TagIndex / FieldIndex report Schema.Columns positions (1, 2), not Tags/Fields indices. + if idx, ok := schema.TagIndex("default", "svc"); !ok || idx != 1 { + t.Fatalf("TagIndex: want (1, true), got (%d, %v)", idx, ok) + } + if idx, ok := schema.FieldIndex("value"); !ok || idx != 2 { + t.Fatalf("FieldIndex: want (2, true), got (%d, %v)", idx, ok) + } +} + +func TestMeasureBatch_SeriesBoundariesMultiSeries(t *testing.T) { + // A batch holding 2 rows of series A followed by 3 rows of series B + // reports SeriesBoundaries = [2, 5].
+ b := &MeasureBatch{ + Timestamps: []int64{1, 2, 3, 4, 5}, + Versions: []int64{1, 1, 1, 1, 1}, + ShardIDs: []common.ShardID{0, 0, 0, 0, 0}, + SeriesIDs: []common.SeriesID{1, 1, 2, 2, 2}, + SeriesBoundaries: []int{2, 5}, + } + if got := b.RowCount(); got != 5 { + t.Fatalf("RowCount: want 5, got %d", got) + } + if got := b.SeriesBoundaries; len(got) != 2 || got[0] != 2 || got[1] != 5 { + t.Fatalf("SeriesBoundaries: want [2 5], got %v", got) + } +} + +// fakeMeasureBatchResult is a minimal in-test MeasureBatchResult modelling the +// contract's shape (sticky errors, EOF, repeated Release calls); no storage types are exercised. +type fakeMeasureBatchResult struct { + err error + seq []*MeasureBatch + idx int + releaseCnt int +} + +func (f *fakeMeasureBatchResult) PullBatch(_ context.Context) (*MeasureBatch, error) { + if f.err != nil { + return nil, f.err + } + if f.idx >= len(f.seq) { + return nil, nil + } + b := f.seq[f.idx] + f.idx++ + return b, nil +} + +func (f *fakeMeasureBatchResult) Release() { + f.releaseCnt++ +} + +func TestMeasureBatchResult_PullBatchEOF(t *testing.T) { + r := &fakeMeasureBatchResult{seq: nil} + b, err := r.PullBatch(context.Background()) + if err != nil { + t.Fatalf("EOF must not return an error; got %v", err) + } + if b != nil { + t.Fatalf("EOF must return nil batch; got %v", b) + } +} + +func TestMeasureBatchResult_PullBatchStickyError(t *testing.T) { + boom := errors.New("scan failed") + r := &fakeMeasureBatchResult{err: boom} + for i := range 3 { + b, err := r.PullBatch(context.Background()) + if !errors.Is(err, boom) { + t.Fatalf("call %d: want sticky error %v, got %v", i, boom, err) + } + if b != nil { + t.Fatalf("call %d: error must yield nil batch, got %v", i, b) + } + } +} + +func TestMeasureBatchResult_ReleaseIdempotent(t *testing.T) { + r := &fakeMeasureBatchResult{} + r.Release() + r.Release() + if r.releaseCnt != 2 { + t.Fatalf("Release calls counted: want 2, got %d", r.releaseCnt) + } +} diff --git a/pkg/query/vectorized/measure/build_batch.go
b/pkg/query/vectorized/measure/build_batch.go new file mode 100644 index 000000000..94702e574 --- /dev/null +++ b/pkg/query/vectorized/measure/build_batch.go @@ -0,0 +1,137 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package measure + +import ( + "fmt" + + "github.com/apache/skywalking-banyandb/api/common" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/query/model" + "github.com/apache/skywalking-banyandb/pkg/query/vectorized" +) + +// BuildMeasureBatchFromResult converts a *model.MeasureResult produced by a +// row-shaped MeasureQueryResult.Pull() call into a *model.MeasureBatch whose +// columns match the supplied BatchSchema. Every row carries r.SID, so SeriesBoundaries stays nil. +// +// This is the G5b "dual-emit" wrapper helper: the storage layer can +// implement MeasureBatchResult.PullBatch by calling its existing Pull() and +// passing the result through this converter. It preserves the existing +// row-path decode pipeline; the architectural decode-elimination is left to +// G5c/G5d (block_cursor native column emit).
+// +// Schema-declared tag/field columns missing from the result are null-filled +// using pbv1.Null{Tag,Field}Value singletons — matching the multi-group +// projection behavior in fillTags / fillFields when one group's schema +// lacks a tag the other has. +// +// Returns (nil, nil) when r is nil; (nil, err) when schema is nil or a tag/field +// Values slice is shorter than Timestamps. Shorter Versions/ShardIDs yield all-zero columns, not an error. +func BuildMeasureBatchFromResult(r *model.MeasureResult, schema *vectorized.BatchSchema) (*model.MeasureBatch, error) { + if r == nil { + return nil, nil + } + if schema == nil { + return nil, fmt.Errorf("BuildMeasureBatchFromResult: nil schema") + } + n := len(r.Timestamps) + + timestamps := make([]int64, n) + copy(timestamps, r.Timestamps) + versions := make([]int64, n) + if len(r.Versions) >= n { + copy(versions, r.Versions[:n]) + } + shardIDs := make([]common.ShardID, n) + if len(r.ShardIDs) >= n { + copy(shardIDs, r.ShardIDs[:n]) + } + seriesIDs := make([]common.SeriesID, n) + for i := range seriesIDs { + seriesIDs[i] = r.SID + } + + resultTags := make(map[string]*model.Tag) + for i := range r.TagFamilies { + tf := &r.TagFamilies[i] + for j := range tf.Tags { + tag := &tf.Tags[j] + resultTags[tf.Name+"\x00"+tag.Name] = tag + } + } + resultFields := make(map[string]*model.Field, len(r.Fields)) + for i := range r.Fields { + resultFields[r.Fields[i].Name] = &r.Fields[i] + } + + tagCols := make([]vectorized.Column, 0, len(schema.Columns)) + fieldCols := make([]vectorized.Column, 0, len(schema.Columns)) + for _, def := range schema.Columns { + switch def.Role { + case vectorized.RoleTag: + col := vectorized.NewTagValueColumn(n) + tag, present := resultTags[def.TagFamily+"\x00"+def.Name] + if !present { + for range n { + col.Append(pbv1.NullTagValue) + } + } else { + if len(tag.Values) < n { + return nil, fmt.Errorf("BuildMeasureBatchFromResult: tag %s.%s has %d values, expected %d", + def.TagFamily, def.Name, len(tag.Values), n) + } + for k := range n { +
col.Append(tag.Values[k]) + } + } + tagCols = append(tagCols, col) + case vectorized.RoleField: + col := vectorized.NewFieldValueColumn(n) + fld, present := resultFields[def.Name] + if !present { + for range n { + col.Append(pbv1.NullFieldValue) + } + } else { + if len(fld.Values) < n { + return nil, fmt.Errorf("BuildMeasureBatchFromResult: field %s has %d values, expected %d", + def.Name, len(fld.Values), n) + } + for k := range n { + col.Append(fld.Values[k]) + } + } + fieldCols = append(fieldCols, col) + case vectorized.RoleTimestamp, vectorized.RoleVersion, + vectorized.RoleSeriesID, vectorized.RoleShardID: + // Metadata roles are populated via the parallel slices on the + // MeasureBatch itself; no per-column entry is needed. + } + } + + return &model.MeasureBatch{ + Schema: schema, + Timestamps: timestamps, + Versions: versions, + ShardIDs: shardIDs, + SeriesIDs: seriesIDs, + Tags: tagCols, + Fields: fieldCols, + }, nil +} diff --git a/pkg/query/vectorized/measure/build_batch_test.go b/pkg/query/vectorized/measure/build_batch_test.go new file mode 100644 index 000000000..0dcbf63cb --- /dev/null +++ b/pkg/query/vectorized/measure/build_batch_test.go @@ -0,0 +1,226 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied.
See the License for the +// specific language governing permissions and limitations +// under the License. + +package measure + +import ( + "testing" + + "github.com/apache/skywalking-banyandb/api/common" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/query/model" + "github.com/apache/skywalking-banyandb/pkg/query/vectorized" +) + +// schemaSingleTagSingleField is the canonical mini-schema used across the +// build_batch tests: timestamp + version + sid metadata + one tag (svc) + +// one field (value). Passthrough column types match what the storage-side +// converter emits in the G5b wrapper-style PullBatch. +func schemaSingleTagSingleField() *vectorized.BatchSchema { + return vectorized.NewBatchSchema([]vectorized.ColumnDef{ + {Role: vectorized.RoleTimestamp, Type: vectorized.ColumnTypeInt64}, + {Role: vectorized.RoleVersion, Type: vectorized.ColumnTypeInt64}, + {Role: vectorized.RoleSeriesID, Type: vectorized.ColumnTypeInt64}, + {Role: vectorized.RoleTag, TagFamily: "default", Name: "svc", Type: vectorized.ColumnTypeTagValue}, + {Role: vectorized.RoleField, Name: "value", Type: vectorized.ColumnTypeFieldValue}, + }) +} + +func TestBuildMeasureBatchFromResult_NilResult(t *testing.T) { + batch, err := BuildMeasureBatchFromResult(nil, schemaSingleTagSingleField()) + if err != nil { + t.Fatalf("nil result must return (nil, nil); got err %v", err) + } + if batch != nil { + t.Fatalf("nil result must return (nil, nil); got batch %v", batch) + } +} + +func TestBuildMeasureBatchFromResult_NilSchema(t *testing.T) { + r := &model.MeasureResult{SID: 1, Timestamps: []int64{1}} + if _, err := BuildMeasureBatchFromResult(r, nil); err == nil { + t.Fatal("nil schema must return an error") + } +} + +func TestBuildMeasureBatchFromResult_FullRow(t *testing.T) { + svcVal := &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value:
"alpha"}}} + valueFld := &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 42}}} + r := &model.MeasureResult{ + SID: 7, + Timestamps: []int64{100, 200, 300}, + Versions: []int64{1, 1, 1}, + ShardIDs: []common.ShardID{2, 2, 2}, + TagFamilies: []model.TagFamily{ + {Name: "default", Tags: []model.Tag{ + {Name: "svc", Values: []*modelv1.TagValue{svcVal, svcVal, svcVal}}, + }}, + }, + Fields: []model.Field{ + {Name: "value", Values: []*modelv1.FieldValue{valueFld, valueFld, valueFld}}, + }, + } + batch, err := BuildMeasureBatchFromResult(r, schemaSingleTagSingleField()) + if err != nil { + t.Fatalf("BuildMeasureBatchFromResult: %v", err) + } + if got := batch.RowCount(); got != 3 { + t.Fatalf("RowCount: want 3, got %d", got) + } + for i, ts := range []int64{100, 200, 300} { + if batch.Timestamps[i] != ts { + t.Fatalf("Timestamps[%d]: want %d, got %d", i, ts, batch.Timestamps[i]) + } + } + for i, sid := range batch.SeriesIDs { + if sid != 7 { + t.Fatalf("SeriesIDs[%d]: want 7, got %d", i, sid) + } + } + for i, sh := range batch.ShardIDs { + if sh != 2 { + t.Fatalf("ShardIDs[%d]: want 2, got %d", i, sh) + } + } + if len(batch.Tags) != 1 { + t.Fatalf("len(Tags): want 1, got %d", len(batch.Tags)) + } + tagCol, ok := batch.Tags[0].(*vectorized.TypedColumn[*modelv1.TagValue]) + if !ok { + t.Fatalf("Tags[0] is not TypedColumn[*modelv1.TagValue]; got %T", batch.Tags[0]) + } + if tagCol.Len() != 3 { + t.Fatalf("Tags[0].Len: want 3, got %d", tagCol.Len()) + } + for i, v := range tagCol.Data() { + if v != svcVal { + t.Fatalf("Tags[0][%d]: want passthrough pointer to svcVal, got %v", i, v) + } + } + if len(batch.Fields) != 1 { + t.Fatalf("len(Fields): want 1, got %d", len(batch.Fields)) + } + fldCol, ok := batch.Fields[0].(*vectorized.TypedColumn[*modelv1.FieldValue]) + if !ok { + t.Fatalf("Fields[0] is not TypedColumn[*modelv1.FieldValue]; got %T", batch.Fields[0]) + } + for i, v := range fldCol.Data() { + if v != valueFld { + t.Fatalf("Fields[0][%d]:
want passthrough pointer to valueFld, got %v", i, v) + } + } +} + +func TestBuildMeasureBatchFromResult_MissingTagNullFilled(t *testing.T) { + // Schema declares tag "svc" + field "value", but the result omits "svc". + // Output column for svc must be null-filled with pbv1.NullTagValue per + // the multi-group projection contract. + valueFld := &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 42}}} + r := &model.MeasureResult{ + SID: 1, + Timestamps: []int64{10, 20}, + Versions: []int64{1, 1}, + ShardIDs: []common.ShardID{0, 0}, + Fields: []model.Field{ + {Name: "value", Values: []*modelv1.FieldValue{valueFld, valueFld}}, + }, + } + batch, err := BuildMeasureBatchFromResult(r, schemaSingleTagSingleField()) + if err != nil { + t.Fatalf("BuildMeasureBatchFromResult: %v", err) + } + if batch.RowCount() != 2 { + t.Fatalf("RowCount: want 2, got %d", batch.RowCount()) + } + tagCol, ok := batch.Tags[0].(*vectorized.TypedColumn[*modelv1.TagValue]) + if !ok { + t.Fatalf("Tags[0] type") + } + for i, v := range tagCol.Data() { + if v != pbv1.NullTagValue { + t.Fatalf("Tags[0][%d]: want pbv1.NullTagValue (singleton), got %v", i, v) + } + } +} + +func TestBuildMeasureBatchFromResult_MissingFieldNullFilled(t *testing.T) { + svcVal := &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "alpha"}}} + r := &model.MeasureResult{ + SID: 1, + Timestamps: []int64{10, 20}, + Versions: []int64{1, 1}, + ShardIDs: []common.ShardID{0, 0}, + TagFamilies: []model.TagFamily{ + {Name: "default", Tags: []model.Tag{ + {Name: "svc", Values: []*modelv1.TagValue{svcVal, svcVal}}, + }}, + }, + } + batch, err := BuildMeasureBatchFromResult(r, schemaSingleTagSingleField()) + if err != nil { + t.Fatalf("BuildMeasureBatchFromResult: %v", err) + } + fldCol, ok := batch.Fields[0].(*vectorized.TypedColumn[*modelv1.FieldValue]) + if !ok { + t.Fatalf("Fields[0] type") + } + for i, v := range fldCol.Data() { + if v != pbv1.NullFieldValue { +
t.Fatalf("Fields[0][%d]: want pbv1.NullFieldValue (singleton), got %v", i, v) + } + } +} + +func TestBuildMeasureBatchFromResult_TagValueLengthMismatchErrors(t *testing.T) { + // The result carries 2 timestamps but the tag's Values slice holds + // only one cell — the converter must surface the length invariant + // rather than silently truncating. + r := &model.MeasureResult{ + SID: 1, + Timestamps: []int64{1, 2}, + Versions: []int64{1, 1}, + ShardIDs: []common.ShardID{0, 0}, + TagFamilies: []model.TagFamily{ + {Name: "default", Tags: []model.Tag{ + {Name: "svc", Values: []*modelv1.TagValue{ + {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "x"}}}, + }}, + }}, + } + if _, err := BuildMeasureBatchFromResult(r, schemaSingleTagSingleField()); err == nil { + t.Fatal("length-invariant violation must return an error") + } +} + +func TestBuildMeasureBatchFromResult_FieldValueLengthMismatchErrors(t *testing.T) { + r := &model.MeasureResult{ + SID: 1, + Timestamps: []int64{1, 2}, + Versions: []int64{1, 1}, + ShardIDs: []common.ShardID{0, 0}, + Fields: []model.Field{ + {Name: "value", Values: []*modelv1.FieldValue{ + {Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 1}}}, + }}, + }, + } + if _, err := BuildMeasureBatchFromResult(r, schemaSingleTagSingleField()); err == nil { + t.Fatal("length-invariant violation must return an error") + } +}
