This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit fe9d38eb3bb87dbd20b173e6222d0dba33e18311 Author: Hongtao Gao <[email protected]> AuthorDate: Tue May 5 22:36:39 2026 +0000 feat(query/vectorized/measure): add integration & parity (gate G4) Fourth gate of the vectorized query pipeline. Wires VectorizedConfig through the measure module, branches on the flag in the logical layer, and adds the unit-level differential parity tests plus an integration suite that boots a flag-on standalone cluster. banyand/measure: VectorizedConfig field on option/measure, plumbed through openMeasure; bind --measure-vectorized-{enabled,batch-size, query-memory-mib} CLI flags on standalone/data/liaison services. pkg/query/logical/measure: localIndexScan.Execute type-asserts ec for VectorizedConfig and routes through vmeasure.NewMIterator when Enabled, wrapped in a hidden-tag stripping decorator. Release MeasureQueryResult on construction failure. pkg/query/vectorized/measure: - integration.go: public BuildBatchSchema + NewMIterator + facade. - adapter.go: one-DataPoint-per-Next contract matching the row path's resultMIterator (the gRPC collector reads only Current()[0]); recycle consumed batches into the pool; Close returns errors.Join(err, ...). - serialize.go: defensive-copy slice-typed values so pool reuse cannot alias prior DataPoints. - scan.go: fillTags/fillFields walk the schema (not the result) and null-fill projections missing from a multi-group result. - diff_test.go: parity tests over single-/multi-series, index-mode, batch-boundary, null variants, ShardID flow, interleaved series, version variations, mid-iteration cancel, storage-error propagation. test/integration/standalone/query/vectorized_test.go: Ginkgo Describe that boots --measure-vectorized-enabled=true and replays casesmeasure + casestopn -- every assertion is a row-path equality check. Default Enabled=false keeps the row path byte-identical pre-G4 (C1). 
--- banyand/measure/measure.go | 26 + banyand/measure/metadata.go | 4 +- banyand/measure/query.go | 3 + banyand/measure/svc_data.go | 1 + banyand/measure/svc_liaison.go | 1 + banyand/measure/svc_standalone.go | 1 + .../measure/measure_plan_indexscan_local.go | 100 +++- pkg/query/vectorized/measure/adapter.go | 97 +++- pkg/query/vectorized/measure/adapter_test.go | 47 +- pkg/query/vectorized/measure/diff_test.go | 592 +++++++++++++++++++++ pkg/query/vectorized/measure/integration.go | 203 +++++++ pkg/query/vectorized/measure/scan.go | 66 ++- pkg/query/vectorized/measure/serialize.go | 21 +- .../standalone/query/vectorized_test.go | 98 ++++ 14 files changed, 1187 insertions(+), 73 deletions(-) diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go index 4e36be0dd..a90df68ea 100644 --- a/banyand/measure/measure.go +++ b/banyand/measure/measure.go @@ -32,6 +32,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/meter" "github.com/apache/skywalking-banyandb/pkg/partition" + vmeasure "github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure" "github.com/apache/skywalking-banyandb/pkg/run" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) @@ -56,6 +57,7 @@ type option struct { flushTimeout time.Duration syncInterval time.Duration failedPartsMaxTotalSizeBytes uint64 + vectorized vmeasure.VectorizedConfig } type indexSchema struct { @@ -87,6 +89,7 @@ type measure struct { name string group string interval time.Duration + vectorized vmeasure.VectorizedConfig } func (m *measure) GetSchema() *databasev1.Measure { @@ -146,6 +149,7 @@ func newQueryMetrics(factory observability.Factory) *queryMetrics { func openMeasure(spec measureSpec, l *logger.Logger, c storage.Cache, pm protector.Memory, schemaRepo *schemaRepo, qm *queryMetrics, + vectorized vmeasure.VectorizedConfig, ) (*measure, error) { m := &measure{ schema: spec.schema, @@ -154,9 +158,31 @@ func openMeasure(spec measureSpec, pm: pm, 
schemaRepo: schemaRepo, queryMetrics: qm, + vectorized: vectorized, } if err := m.parseSpec(); err != nil { return nil, err } return m, nil } + +// VectorizedConfig returns the per-Measure vectorized query configuration. It +// is consumed by the logical query layer to decide whether to wrap the row +// MeasureQueryResult in a vectorized iterator. +func (m *measure) VectorizedConfig() vmeasure.VectorizedConfig { + return m.vectorized +} + +// bindVectorizedFlags wires VectorizedConfig fields to a run.FlagSet. The +// defaults match vmeasure.DefaultConfig — Enabled=false, BatchSize=1024, +// QueryMemoryMiB=256 — so config loaders that omit these flags retain the +// pre-G4 row-only behavior. +func bindVectorizedFlags(flagS *run.FlagSet, cfg *vmeasure.VectorizedConfig) { + defaults := vmeasure.DefaultConfig() + flagS.BoolVar(&cfg.Enabled, "measure-vectorized-enabled", defaults.Enabled, + "enable the vectorized measure query path (off by default; flip after soak)") + flagS.IntVar(&cfg.BatchSize, "measure-vectorized-batch-size", defaults.BatchSize, + "row count per vectorized batch") + flagS.IntVar(&cfg.QueryMemoryMiB, "measure-vectorized-query-memory-mib", defaults.QueryMemoryMiB, + "per-query memory budget for the vectorized path, in MiB") +} diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go index 8482eed9d..92ea69818 100644 --- a/banyand/measure/metadata.go +++ b/banyand/measure/metadata.go @@ -671,7 +671,7 @@ func (s *supplier) OpenResource(spec resourceSchema.Resource) (resourceSchema.In measureSchema := spec.Schema().(*databasev1.Measure) return openMeasure(measureSpec{ schema: measureSchema, - }, s.l, s.c, s.pm, s.schemaRepo, s.queryMetrics.Load()) + }, s.l, s.c, s.pm, s.schemaRepo, s.queryMetrics.Load(), s.option.vectorized) } func (s *supplier) ResourceSchema(md *commonv1.Metadata) (resourceSchema.ResourceSchema, error) { @@ -785,7 +785,7 @@ func (s *queueSupplier) OpenResource(spec resourceSchema.Resource) (resourceSche measureSchema := 
spec.Schema().(*databasev1.Measure) return openMeasure(measureSpec{ schema: measureSchema, - }, s.l, nil, s.pm, s.schemaRepo, nil) + }, s.l, nil, s.pm, s.schemaRepo, nil, s.option.vectorized) } func (s *queueSupplier) ResourceSchema(md *commonv1.Metadata) (resourceSchema.ResourceSchema, error) { diff --git a/banyand/measure/query.go b/banyand/measure/query.go index 35ddb6e09..398e059f6 100644 --- a/banyand/measure/query.go +++ b/banyand/measure/query.go @@ -39,6 +39,7 @@ import ( pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" "github.com/apache/skywalking-banyandb/pkg/pool" "github.com/apache/skywalking-banyandb/pkg/query/model" + vmeasure "github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure" resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) @@ -71,6 +72,7 @@ var _ Measure = (*measure)(nil) type queryOptions struct { schemaTagTypes map[string]pbv1.ValueType model.MeasureQueryOptions + vectorized vmeasure.VectorizedConfig minTimestamp int64 maxTimestamp int64 } @@ -183,6 +185,7 @@ func (m *measure) Query(ctx context.Context, mqo model.MeasureQueryOptions) (mqr schemaTagTypes: schemaTagTypes, minTimestamp: mqo.TimeRange.Start.UnixNano(), maxTimestamp: mqo.TimeRange.End.UnixNano(), + vectorized: m.vectorized, } var n int for i := range tables { diff --git a/banyand/measure/svc_data.go b/banyand/measure/svc_data.go index e4f776f9b..21e382d66 100644 --- a/banyand/measure/svc_data.go +++ b/banyand/measure/svc_data.go @@ -189,6 +189,7 @@ func (s *dataSVC) FlagSet() *run.FlagSet { flagS.VarP(&s.cc.MaxCacheSize, "service-cache-max-size", "", "maximum service cache size (e.g., 100M)") flagS.DurationVar(&s.cc.CleanupInterval, "service-cache-cleanup-interval", 30*time.Second, "service cache cleanup interval") flagS.DurationVar(&s.cc.IdleTimeout, "service-cache-idle-timeout", 2*time.Minute, "service cache entry idle timeout") + bindVectorizedFlags(flagS, &s.option.vectorized) return 
flagS } diff --git a/banyand/measure/svc_liaison.go b/banyand/measure/svc_liaison.go index 7212c3316..ba863456a 100644 --- a/banyand/measure/svc_liaison.go +++ b/banyand/measure/svc_liaison.go @@ -115,6 +115,7 @@ func (s *liaison) FlagSet() *run.FlagSet { "percentage of BanyanDB's allowed disk usage allocated to failed parts storage. "+ "Calculated as: totalDisk * measure-max-disk-usage-percent * failed-parts-max-size-percent / 10000. "+ "Set to 0 to disable copying failed parts. Valid range: 0-100") + bindVectorizedFlags(flagS, &s.option.vectorized) return flagS } diff --git a/banyand/measure/svc_standalone.go b/banyand/measure/svc_standalone.go index ebb4f9f7c..a46fc3039 100644 --- a/banyand/measure/svc_standalone.go +++ b/banyand/measure/svc_standalone.go @@ -196,6 +196,7 @@ func (s *standalone) FlagSet() *run.FlagSet { flagS.VarP(&s.cc.MaxCacheSize, "service-cache-max-size", "", "maximum service cache size (e.g., 100M)") flagS.DurationVar(&s.cc.CleanupInterval, "service-cache-cleanup-interval", 30*time.Second, "service cache cleanup interval") flagS.DurationVar(&s.cc.IdleTimeout, "service-cache-idle-timeout", 2*time.Minute, "service cache entry idle timeout") + bindVectorizedFlags(flagS, &s.option.vectorized) return flagS } diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go b/pkg/query/logical/measure/measure_plan_indexscan_local.go index f82dee2f9..7f9da3b83 100644 --- a/pkg/query/logical/measure/measure_plan_indexscan_local.go +++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go @@ -26,6 +26,7 @@ import ( "github.com/apache/skywalking-banyandb/api/common" commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/pkg/index" @@ -36,9 
+37,19 @@ import ( "github.com/apache/skywalking-banyandb/pkg/query/executor" "github.com/apache/skywalking-banyandb/pkg/query/logical" "github.com/apache/skywalking-banyandb/pkg/query/model" + vmeasure "github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) +// vectorizedExecutionContext is an optional capability the executor may +// expose. When set and Enabled, localIndexScan wraps the row-path +// MeasureQueryResult in a vectorized adapter instead of the legacy +// resultMIterator. The bool guard keeps the row path mechanically untouched +// when the flag is off — confirming C1 (zero regression). +type vectorizedExecutionContext interface { + VectorizedConfig() vmeasure.VectorizedConfig +} + var _ logical.UnresolvedPlan = (*unresolvedIndexScan)(nil) type unresolvedIndexScan struct { @@ -218,7 +229,7 @@ func (i *localIndexScan) Execute(ctx context.Context) (mit executor.MIterator, e } ctx, stop := i.startSpan(ctx, query.GetTracer(ctx), orderBy) defer stop(err) - result, err := i.ec.Query(ctx, model.MeasureQueryOptions{ + opts := model.MeasureQueryOptions{ Name: i.metadata.GetName(), TimeRange: &i.timeRange, Entities: i.entities, @@ -226,10 +237,16 @@ func (i *localIndexScan) Execute(ctx context.Context) (mit executor.MIterator, e Order: orderBy, TagProjection: i.projectionTags, FieldProjection: i.projectionFields, - }) + } + result, err := i.ec.Query(ctx, opts) if err != nil { return nil, fmt.Errorf("failed to query measure: %w", err) } + if vit, vok, vErr := i.maybeVectorized(ctx, result, opts); vErr != nil { + return nil, vErr + } else if vok { + return vit, nil + } return &resultMIterator{ result: result, projectionTags: i.projectionTags, @@ -238,6 +255,85 @@ func (i *localIndexScan) Execute(ctx context.Context) (mit executor.MIterator, e }, nil } +// maybeVectorized returns a vectorized MIterator iff every gate is met: +// - the executor exposes a VectorizedConfig. 
+// - the config is Enabled. +// - the underlying result is non-nil (the row path returns a typed-nil result +// for empty queries; routing that through the vectorized scan would NPE). +// - the logical schema exposes the *databasev1.Measure needed to type the +// batch columns. +// +// Any negative answer falls through to the row path, so flag-off behavior is +// byte-identical to the pre-G4 codepath. +func (i *localIndexScan) maybeVectorized(ctx context.Context, result model.MeasureQueryResult, + opts model.MeasureQueryOptions, +) (executor.MIterator, bool, error) { + if result == nil { + return nil, false, nil + } + src, ok := i.ec.(vectorizedExecutionContext) + if !ok { + return nil, false, nil + } + cfg := src.VectorizedConfig() + if !cfg.Enabled { + return nil, false, nil + } + measureSchema := i.measureSchema() + if measureSchema == nil { + return nil, false, nil + } + it, buildErr := vmeasure.NewMIterator(ctx, result, measureSchema, opts, cfg) + if buildErr != nil { + // NewMIterator does not take ownership of result on failure; the + // row-path code releases on Close, so without this Release the + // snapshots / segments behind result leak. + result.Release() + return nil, false, fmt.Errorf("failed to build vectorized iterator: %w", buildErr) + } + if i.hiddenTags.IsEmpty() { + return it, true, nil + } + return &hiddenTagsMIterator{inner: it, hiddenTags: i.hiddenTags}, true, nil +} + +// hiddenTagsMIterator wraps an MIterator and strips hidden criteria tags from +// each Current() result. The row path applies the same strip inside +// resultMIterator.Next; this decorator keeps the vectorized path's contract +// identical without duplicating filter logic into the vectorized package. 
+type hiddenTagsMIterator struct { + inner executor.MIterator + hiddenTags logical.HiddenTagSet +} + +func (h *hiddenTagsMIterator) Next() bool { return h.inner.Next() } + +func (h *hiddenTagsMIterator) Current() []*measurev1.InternalDataPoint { + dps := h.inner.Current() + for _, dp := range dps { + if dp == nil || dp.DataPoint == nil { + continue + } + dp.DataPoint.TagFamilies = h.hiddenTags.StripHiddenTags(dp.DataPoint.TagFamilies) + } + return dps +} + +func (h *hiddenTagsMIterator) Close() error { return h.inner.Close() } + +// measureSchema extracts the *databasev1.Measure from the logical schema. The +// vectorized path needs it to type each projected column. +func (i *localIndexScan) measureSchema() *databasev1.Measure { + if i.schema == nil { + return nil + } + ms, ok := i.schema.(*schema) + if !ok { + return nil + } + return ms.measure +} + func (i *localIndexScan) String() string { return fmt.Sprintf("IndexScan: startTime=%d,endTime=%d,Metadata{group=%s,name=%s},conditions=%s; projection=%s; order=%s;", i.timeRange.Start.Unix(), i.timeRange.End.Unix(), i.metadata.GetGroup(), i.metadata.GetName(), diff --git a/pkg/query/vectorized/measure/adapter.go b/pkg/query/vectorized/measure/adapter.go index 1e872f82d..80294adde 100644 --- a/pkg/query/vectorized/measure/adapter.go +++ b/pkg/query/vectorized/measure/adapter.go @@ -19,6 +19,7 @@ package measure import ( "context" + "errors" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" "github.com/apache/skywalking-banyandb/pkg/query/vectorized" @@ -28,45 +29,81 @@ import ( // interface so the existing gRPC handler can drive it without knowing it // is internally columnar. // -// Lifecycle: Next pulls a RecordBatch from the pipeline, serializes it via -// serializeBatchToProto, and exposes the result through Current. EOF and -// errors both terminate iteration; the latter is exposed via Err. 
+// Contract match: the row-path resultMIterator advances one DataPoint per +// Next() and exposes it as a single-element slice via Current(); the gRPC +// collector at banyand/query/processor.go::collectInternalDataPoints reads +// only current[0]. This adapter mirrors that semantics: Next() advances one +// row at a time, pulling a new batch from the pipeline whenever the +// previously-serialized batch is exhausted, returning the consumed batch to +// pool so allocations stay flat. type vectorizedMIterator struct { - ctx context.Context - pipeline *vectorized.Pipeline - err error - current []*measurev1.InternalDataPoint - done bool + ctx context.Context + pipeline *vectorized.Pipeline + pool *vectorized.BatchPool + prevBatch *vectorized.RecordBatch + err error + batch []*measurev1.InternalDataPoint + pos int + done bool } -// newVectorizedMIterator constructs an adapter bound to ctx. -func newVectorizedMIterator(ctx context.Context, p *vectorized.Pipeline) *vectorizedMIterator { - return &vectorizedMIterator{ctx: ctx, pipeline: p} +// newVectorizedMIterator constructs an adapter bound to ctx. The pool is the +// BatchPool that BatchScan draws from; consumed batches are returned there +// after serialization. +func newVectorizedMIterator(ctx context.Context, p *vectorized.Pipeline, pool *vectorized.BatchPool) *vectorizedMIterator { + return &vectorizedMIterator{ctx: ctx, pipeline: p, pool: pool, pos: -1} } -// Next pulls one batch from the pipeline and serializes it. Returns true while -// data remains. After EOF or error, returns false and stays in the terminal state. +// Next advances by exactly one DataPoint. Whenever the cached serialization +// is exhausted, the previous batch is returned to the pool and a new one is +// pulled from the pipeline. Empty-active batches (e.g., from BatchLimit +// emitting an empty selection) are silently recycled — they are not +// surfaced as zero-row Current() reads. EOF and errors are terminal. 
func (i *vectorizedMIterator) Next() bool { if i.done { return false } - b, err := i.pipeline.Next(i.ctx) - if err != nil { - i.err = err - i.done = true - return false - } - if b == nil { - i.done = true - return false + i.pos++ + for i.pos >= len(i.batch) { + i.recyclePrev() + b, pullErr := i.pipeline.Next(i.ctx) + if pullErr != nil { + i.err = pullErr + i.done = true + return false + } + if b == nil { + i.done = true + return false + } + i.prevBatch = b + i.batch = serializeBatchToProto(b, i.batch[:0]) + i.pos = 0 } - i.current = serializeBatchToProto(b, i.current[:0]) return true } -// Current returns the most recently serialized batch. +// recyclePrev returns the last consumed batch to the pool. Safe to call when +// no batch is held. The serializer defensive-copies slice-typed values +// (see columnValueToTagValue / columnValueToFieldValue) so reusing the batch +// cannot corrupt previously-emitted DataPoints. +func (i *vectorizedMIterator) recyclePrev() { + if i.prevBatch == nil || i.pool == nil { + i.prevBatch = nil + return + } + i.pool.Put(i.prevBatch) + i.prevBatch = nil +} + +// Current returns a single-element slice containing the row most recently +// advanced into via Next(). Matches the row-path contract; collectors that +// read only Current()[0] see every row. func (i *vectorizedMIterator) Current() []*measurev1.InternalDataPoint { - return i.current + if i.pos < 0 || i.pos >= len(i.batch) { + return nil + } + return i.batch[i.pos : i.pos+1] } // Err returns the storage error that terminated iteration, or nil. @@ -74,8 +111,12 @@ func (i *vectorizedMIterator) Err() error { return i.err } -// Close delegates to the underlying pipeline. Pipeline.Close is idempotent, -// so repeated Close calls on this adapter are safe. +// Close releases pooled batches and the pipeline (which closes the BatchScan +// and releases the underlying MeasureQueryResult). 
Returns the join of any +// sticky iteration error and the pipeline-close error, mirroring the row-path +// resultMIterator.Close contract that surfaces ei.err. func (i *vectorizedMIterator) Close() error { - return i.pipeline.Close() + i.recyclePrev() + closeErr := i.pipeline.Close() + return errors.Join(i.err, closeErr) } diff --git a/pkg/query/vectorized/measure/adapter_test.go b/pkg/query/vectorized/measure/adapter_test.go index 56d455ff8..cc0e0b942 100644 --- a/pkg/query/vectorized/measure/adapter_test.go +++ b/pkg/query/vectorized/measure/adapter_test.go @@ -27,8 +27,9 @@ import ( ) // buildAdapterPipeline wires a BatchScan into a minimal Pipeline against the -// supplied fake MeasureQueryResult. -func buildAdapterPipeline(t *testing.T, qr model.MeasureQueryResult) *vectorized.Pipeline { +// supplied fake MeasureQueryResult and returns both the pipeline and its pool +// so the adapter can recycle batches. +func buildAdapterPipeline(t *testing.T, qr model.MeasureQueryResult) (*vectorized.Pipeline, *vectorized.BatchPool) { t.Helper() schema := minimalSchema() pool := vectorized.NewBatchPool(schema, 4) @@ -40,28 +41,32 @@ func buildAdapterPipeline(t *testing.T, qr model.MeasureQueryResult) *vectorized if err := scan.Init(context.Background()); err != nil { t.Fatal(err) } - return p + return p, pool } func TestVectorizedMIterator_Next_PullsAndSerializes(t *testing.T) { qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResult(1, 100, 200)}} - p := buildAdapterPipeline(t, qr) - it := newVectorizedMIterator(context.Background(), p) + p, pool := buildAdapterPipeline(t, qr) + it := newVectorizedMIterator(context.Background(), p, pool) defer it.Close() - if !it.Next() { - t.Fatalf("Next must return true while pipeline has data; err=%v", it.Err()) + count := 0 + for it.Next() { + dps := it.Current() + if len(dps) != 1 { + t.Fatalf("Current length: want 1, got %d", len(dps)) + } + count++ } - dps := it.Current() - if len(dps) != 2 { - t.Fatalf("Current length: 
want 2, got %d", len(dps)) + if count != 2 { + t.Fatalf("iterations: want 2, got %d", count) } } func TestVectorizedMIterator_Next_ReturnsFalseOnEOF(t *testing.T) { qr := &fakeMeasureQueryResult{seq: nil} - p := buildAdapterPipeline(t, qr) - it := newVectorizedMIterator(context.Background(), p) + p, pool := buildAdapterPipeline(t, qr) + it := newVectorizedMIterator(context.Background(), p, pool) defer it.Close() if it.Next() { @@ -75,8 +80,8 @@ func TestVectorizedMIterator_Next_ReturnsFalseOnEOF(t *testing.T) { func TestVectorizedMIterator_Next_ReturnsFalseOnError_ErrExposedViaErr(t *testing.T) { boom := errors.New("storage boom") qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResultErr(boom)}} - p := buildAdapterPipeline(t, qr) - it := newVectorizedMIterator(context.Background(), p) + p, pool := buildAdapterPipeline(t, qr) + it := newVectorizedMIterator(context.Background(), p, pool) defer it.Close() if it.Next() { @@ -87,10 +92,10 @@ func TestVectorizedMIterator_Next_ReturnsFalseOnError_ErrExposedViaErr(t *testin } } -func TestVectorizedMIterator_Current_ReturnsLastSerializedBatch(t *testing.T) { +func TestVectorizedMIterator_Current_ReturnsCurrentRow(t *testing.T) { qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResult(1, 100)}} - p := buildAdapterPipeline(t, qr) - it := newVectorizedMIterator(context.Background(), p) + p, pool := buildAdapterPipeline(t, qr) + it := newVectorizedMIterator(context.Background(), p, pool) defer it.Close() _ = it.Next() @@ -98,16 +103,16 @@ func TestVectorizedMIterator_Current_ReturnsLastSerializedBatch(t *testing.T) { if len(first) != 1 { t.Fatalf("first Current len: want 1, got %d", len(first)) } - // Current called repeatedly should keep returning the last batch. + // Current called repeatedly without Next must keep returning the same row. 
if got := it.Current(); len(got) != 1 { - t.Fatalf("repeat Current must return same batch; got len %d", len(got)) + t.Fatalf("repeat Current must return same row; got len %d", len(got)) } } func TestVectorizedMIterator_Close_DelegatesToPipelineClose_Idempotent(t *testing.T) { qr := &fakeMeasureQueryResult{seq: []*model.MeasureResult{mkResult(1, 100)}} - p := buildAdapterPipeline(t, qr) - it := newVectorizedMIterator(context.Background(), p) + p, pool := buildAdapterPipeline(t, qr) + it := newVectorizedMIterator(context.Background(), p, pool) if err := it.Close(); err != nil { t.Fatal(err) } diff --git a/pkg/query/vectorized/measure/diff_test.go b/pkg/query/vectorized/measure/diff_test.go new file mode 100644 index 000000000..5891d35e2 --- /dev/null +++ b/pkg/query/vectorized/measure/diff_test.go @@ -0,0 +1,592 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package measure + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/timestamppb" + + "github.com/apache/skywalking-banyandb/api/common" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/query/model" +) + +// diffFixture is a single parity case: a Measure schema, the projection +// applied to it, and the deterministic MeasureResult sequence both paths +// consume. +type diffFixture struct { + schema *databasev1.Measure + name string + results []*model.MeasureResult + opts model.MeasureQueryOptions +} + +// runDiff drives both the row-path serializer and the vectorized adapter on +// fresh copies of fx.results, then returns the two output slices for +// proto-equality comparison. fx.results is *not* shared across paths because +// the vectorized adapter consumes (and may mutate referenced) values; each +// path needs an independent fake MeasureQueryResult. +func runDiff(t *testing.T, fx diffFixture) (rowOut, vecOut []*measurev1.InternalDataPoint) { + t.Helper() + rowQR := &fakeMeasureQueryResult{seq: cloneResults(fx.results)} + rowOut = rowSerialize(rowQR, fx.opts) + + vecQR := &fakeMeasureQueryResult{seq: cloneResults(fx.results)} + cfg := VectorizedConfig{Enabled: true, BatchSize: 4, QueryMemoryMiB: 64} + it, err := NewMIterator(context.Background(), vecQR, fx.schema, fx.opts, cfg) + if err != nil { + t.Fatalf("%s: NewMIterator: %v", fx.name, err) + } + defer it.Close() + for it.Next() { + // Adapter conforms to the row-path's "one DataPoint per Next" contract: + // the gRPC collector reads only Current()[0]. Mirror that here. 
+ current := it.Current() + if len(current) > 0 { + vecOut = append(vecOut, cloneIDPs(current[:1])...) + } + } + if itErr := it.Err(); itErr != nil { + t.Fatalf("%s: vectorized iterator err: %v", fx.name, itErr) + } + return rowOut, vecOut +} + +// cloneResults returns a fresh slice so the row and vectorized paths +// each own their MeasureResult sequence; both paths advance the cursor +// independently. +func cloneResults(in []*model.MeasureResult) []*model.MeasureResult { + out := make([]*model.MeasureResult, len(in)) + copy(out, in) // shallow — neither path writes back into MeasureResult. + return out +} + +func cloneIDPs(in []*measurev1.InternalDataPoint) []*measurev1.InternalDataPoint { + out := make([]*measurev1.InternalDataPoint, len(in)) + for i, dp := range in { + out[i] = proto.Clone(dp).(*measurev1.InternalDataPoint) + } + return out +} + +// rowSerialize replays the row-path serializer that pkg/query/logical/measure +// runs in resultMIterator.Next. Replicated here (not imported) because +// pkg/query/logical/measure already imports this package — importing it back +// would form a cycle. 
+func rowSerialize(qr model.MeasureQueryResult, opts model.MeasureQueryOptions) []*measurev1.InternalDataPoint { + var dps []*measurev1.InternalDataPoint + for { + r := qr.Pull() + if r == nil { + return dps + } + if r.Error != nil { + return dps + } + tagFamilyMap := make(map[string]*model.TagFamily, len(r.TagFamilies)) + for idx := range r.TagFamilies { + tagFamilyMap[r.TagFamilies[idx].Name] = &r.TagFamilies[idx] + } + for i := range r.Timestamps { + dp := &measurev1.DataPoint{ + Timestamp: timestamppb.New(time.Unix(0, r.Timestamps[i])), + Sid: uint64(r.SID), + Version: r.Versions[i], + } + for _, proj := range opts.TagProjection { + tf := &modelv1.TagFamily{Name: proj.Family} + dp.TagFamilies = append(dp.TagFamilies, tf) + resultTF := tagFamilyMap[proj.Family] + for _, tagName := range proj.Names { + var value *modelv1.TagValue + if resultTF != nil { + for _, t := range resultTF.Tags { + if t.Name == tagName { + value = t.Values[i] + break + } + } + } + if value == nil { + value = pbv1.NullTagValue + } + tf.Tags = append(tf.Tags, &modelv1.Tag{Key: tagName, Value: value}) + } + } + for _, pf := range opts.FieldProjection { + foundIdx := -1 + for idx := range r.Fields { + if r.Fields[idx].Name == pf { + foundIdx = idx + break + } + } + if foundIdx != -1 { + dp.Fields = append(dp.Fields, &measurev1.DataPoint_Field{ + Name: r.Fields[foundIdx].Name, + Value: r.Fields[foundIdx].Values[i], + }) + } else { + dp.Fields = append(dp.Fields, &measurev1.DataPoint_Field{ + Name: pf, + Value: pbv1.NullFieldValue, + }) + } + } + var shardID common.ShardID + if len(r.ShardIDs) > i { + shardID = r.ShardIDs[i] + } + dps = append(dps, &measurev1.InternalDataPoint{ + DataPoint: dp, + ShardId: uint32(shardID), + }) + } + } +} + +// assertDPEqual compares two slices of InternalDataPoint structurally up to +// the subset of fields the vectorized path is expected to set. Notes: +// - Tag/field projection ordering is identical across both paths. 
+// - The vectorized path may not populate TagFamilies whose entries do not +// appear in the source MeasureResult; the row path always emits a +// family-shaped entry. Tests below pin the input to materialize tags so +// this difference does not surface. +func assertDPEqual(t *testing.T, name string, want, got []*measurev1.InternalDataPoint) { + t.Helper() + if len(want) != len(got) { + t.Fatalf("%s: count mismatch: row=%d vec=%d", name, len(want), len(got)) + } + for i := range want { + if diff := cmp.Diff(want[i], got[i], + cmp.Comparer(proto.Equal), + ); diff != "" { + t.Fatalf("%s: dp[%d] mismatch:\n%s", name, i, diff) + } + } +} + +// Schemas / fixtures. + +func diffSchemaTagFieldMix() *databasev1.Measure { + return &databasev1.Measure{ + Metadata: nil, + TagFamilies: []*databasev1.TagFamilySpec{ + {Name: "default", Tags: []*databasev1.TagSpec{ + {Name: "svc", Type: databasev1.TagType_TAG_TYPE_STRING}, + {Name: "env_id", Type: databasev1.TagType_TAG_TYPE_INT}, + {Name: "blob", Type: databasev1.TagType_TAG_TYPE_DATA_BINARY}, + {Name: "ports", Type: databasev1.TagType_TAG_TYPE_INT_ARRAY}, + {Name: "labels", Type: databasev1.TagType_TAG_TYPE_STRING_ARRAY}, + }}, + }, + Fields: []*databasev1.FieldSpec{ + {Name: "v_int", FieldType: databasev1.FieldType_FIELD_TYPE_INT}, + {Name: "v_float", FieldType: databasev1.FieldType_FIELD_TYPE_FLOAT}, + {Name: "v_str", FieldType: databasev1.FieldType_FIELD_TYPE_STRING}, + {Name: "v_bytes", FieldType: databasev1.FieldType_FIELD_TYPE_DATA_BINARY}, + }, + } +} + +func diffOptsAllTagsAllFields() model.MeasureQueryOptions { + return model.MeasureQueryOptions{ + TagProjection: []model.TagProjection{ + {Family: "default", Names: []string{"svc", "env_id", "blob", "ports", "labels"}}, + }, + FieldProjection: []string{"v_int", "v_float", "v_str", "v_bytes"}, + } +} + +// rowSet is shorthand for the canonical "all variants set" row used by most +// fixtures. 
+func rowSet(sid common.SeriesID, ts int64) *model.MeasureResult { + return &model.MeasureResult{ + SID: sid, + Timestamps: []int64{ts}, + Versions: []int64{1}, + ShardIDs: []common.ShardID{0}, + TagFamilies: []model.TagFamily{ + {Name: "default", Tags: []model.Tag{ + {Name: "svc", Values: []*modelv1.TagValue{ + {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "alpha"}}}, + }}, + {Name: "env_id", Values: []*modelv1.TagValue{ + {Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: 7}}}, + }}, + {Name: "blob", Values: []*modelv1.TagValue{ + {Value: &modelv1.TagValue_BinaryData{BinaryData: []byte{0x01, 0x02, 0x03}}}, + }}, + {Name: "ports", Values: []*modelv1.TagValue{ + {Value: &modelv1.TagValue_IntArray{IntArray: &modelv1.IntArray{Value: []int64{80, 443}}}}, + }}, + {Name: "labels", Values: []*modelv1.TagValue{ + {Value: &modelv1.TagValue_StrArray{StrArray: &modelv1.StrArray{Value: []string{"a", "b"}}}}, + }}, + }}, + }, + Fields: []model.Field{ + {Name: "v_int", Values: []*modelv1.FieldValue{ + {Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: 42}}}, + }}, + {Name: "v_float", Values: []*modelv1.FieldValue{ + {Value: &modelv1.FieldValue_Float{Float: &modelv1.Float{Value: 3.14}}}, + }}, + {Name: "v_str", Values: []*modelv1.FieldValue{ + {Value: &modelv1.FieldValue_Str{Str: &modelv1.Str{Value: "ok"}}}, + }}, + {Name: "v_bytes", Values: []*modelv1.FieldValue{ + {Value: &modelv1.FieldValue_BinaryData{BinaryData: []byte{0xde, 0xad}}}, + }}, + }, + } +} + +// rowMulti returns one MeasureResult covering n consecutive rows of a single +// series — exercises the bulk fast path. 
+func rowMulti(sid common.SeriesID, baseTS int64, n int) *model.MeasureResult { + r := &model.MeasureResult{SID: sid} + tags := []model.Tag{ + {Name: "svc"}, + {Name: "env_id"}, + {Name: "blob"}, + {Name: "ports"}, + {Name: "labels"}, + } + fields := []model.Field{ + {Name: "v_int"}, + {Name: "v_float"}, + {Name: "v_str"}, + {Name: "v_bytes"}, + } + for i := range n { + r.Timestamps = append(r.Timestamps, baseTS+int64(i)) + r.Versions = append(r.Versions, 1) + r.ShardIDs = append(r.ShardIDs, 0) + tags[0].Values = append(tags[0].Values, &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "alpha"}}}) + tags[1].Values = append(tags[1].Values, &modelv1.TagValue{Value: &modelv1.TagValue_Int{Int: &modelv1.Int{Value: int64(i)}}}) + tags[2].Values = append(tags[2].Values, &modelv1.TagValue{Value: &modelv1.TagValue_BinaryData{BinaryData: []byte{byte(i)}}}) + tags[3].Values = append(tags[3].Values, &modelv1.TagValue{Value: &modelv1.TagValue_IntArray{IntArray: &modelv1.IntArray{Value: []int64{int64(i)}}}}) + tags[4].Values = append(tags[4].Values, &modelv1.TagValue{Value: &modelv1.TagValue_StrArray{StrArray: &modelv1.StrArray{Value: []string{"x"}}}}) + fields[0].Values = append(fields[0].Values, &modelv1.FieldValue{Value: &modelv1.FieldValue_Int{Int: &modelv1.Int{Value: int64(i)}}}) + fields[1].Values = append(fields[1].Values, &modelv1.FieldValue{Value: &modelv1.FieldValue_Float{Float: &modelv1.Float{Value: float64(i)}}}) + fields[2].Values = append(fields[2].Values, &modelv1.FieldValue{Value: &modelv1.FieldValue_Str{Str: &modelv1.Str{Value: "s"}}}) + fields[3].Values = append(fields[3].Values, &modelv1.FieldValue{Value: &modelv1.FieldValue_BinaryData{BinaryData: []byte{0xff}}}) + } + r.TagFamilies = []model.TagFamily{{Name: "default", Tags: tags}} + r.Fields = fields + return r +} + +// Tests. 
+ +func TestDiff_SingleSeries_AllVariants(t *testing.T) { + fx := diffFixture{ + name: "single-series-all-variants", + schema: diffSchemaTagFieldMix(), + opts: diffOptsAllTagsAllFields(), + results: []*model.MeasureResult{rowSet(1, 100)}, + } + rowOut, vecOut := runDiff(t, fx) + assertDPEqual(t, fx.name, rowOut, vecOut) +} + +func TestDiff_MultiSeries_TagOnlyProjection(t *testing.T) { + fx := diffFixture{ + name: "multi-series-tag-only", + schema: diffSchemaTagFieldMix(), + opts: model.MeasureQueryOptions{ + TagProjection: []model.TagProjection{ + {Family: "default", Names: []string{"svc", "env_id"}}, + }, + }, + results: []*model.MeasureResult{rowSet(1, 100), rowSet(2, 200), rowSet(3, 300)}, + } + rowOut, vecOut := runDiff(t, fx) + assertDPEqual(t, fx.name, rowOut, vecOut) +} + +func TestDiff_MultiSeries_MixedProjection(t *testing.T) { + fx := diffFixture{ + name: "multi-series-mixed", + schema: diffSchemaTagFieldMix(), + opts: model.MeasureQueryOptions{ + TagProjection: []model.TagProjection{ + {Family: "default", Names: []string{"svc"}}, + }, + FieldProjection: []string{"v_int", "v_float"}, + }, + results: []*model.MeasureResult{rowSet(1, 100), rowSet(2, 200)}, + } + rowOut, vecOut := runDiff(t, fx) + assertDPEqual(t, fx.name, rowOut, vecOut) +} + +func TestDiff_IndexMode_SingleRowResults(t *testing.T) { + // indexSortResult.Pull yields single-row MeasureResults; mimic that shape. 
+ fx := diffFixture{ + name: "index-mode-single-row", + schema: diffSchemaTagFieldMix(), + opts: diffOptsAllTagsAllFields(), + results: []*model.MeasureResult{ + rowSet(1, 100), rowSet(1, 200), rowSet(2, 300), + }, + } + rowOut, vecOut := runDiff(t, fx) + assertDPEqual(t, fx.name, rowOut, vecOut) +} + +func TestDiff_EmptyResultSet(t *testing.T) { + fx := diffFixture{ + name: "empty", + schema: diffSchemaTagFieldMix(), + opts: diffOptsAllTagsAllFields(), + results: nil, + } + rowOut, vecOut := runDiff(t, fx) + if len(rowOut) != 0 || len(vecOut) != 0 { + t.Fatalf("empty fixture produced rows: row=%d vec=%d", len(rowOut), len(vecOut)) + } +} + +func TestDiff_CrossesBatchBoundary(t *testing.T) { + // BatchSize=4 (set in runDiff). Use 9 rows so we cross at least two + // batch boundaries inside a single series. + fx := diffFixture{ + name: "batch-boundary", + schema: diffSchemaTagFieldMix(), + opts: diffOptsAllTagsAllFields(), + results: []*model.MeasureResult{rowMulti(1, 1000, 9)}, + } + rowOut, vecOut := runDiff(t, fx) + assertDPEqual(t, fx.name, rowOut, vecOut) +} + +func TestDiff_NullVariants(t *testing.T) { + r := &model.MeasureResult{ + SID: 1, + Timestamps: []int64{100}, + Versions: []int64{1}, + ShardIDs: []common.ShardID{0}, + TagFamilies: []model.TagFamily{ + {Name: "default", Tags: []model.Tag{ + {Name: "svc", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + {Name: "env_id", Values: []*modelv1.TagValue{pbv1.NullTagValue}}, + }}, + }, + Fields: []model.Field{ + {Name: "v_int", Values: []*modelv1.FieldValue{pbv1.NullFieldValue}}, + {Name: "v_float", Values: []*modelv1.FieldValue{pbv1.NullFieldValue}}, + }, + } + fx := diffFixture{ + name: "null-variants", + schema: diffSchemaTagFieldMix(), + opts: model.MeasureQueryOptions{ + TagProjection: []model.TagProjection{{Family: "default", Names: []string{"svc", "env_id"}}}, + FieldProjection: []string{"v_int", "v_float"}, + }, + results: []*model.MeasureResult{r}, + } + rowOut, vecOut := runDiff(t, fx) + 
assertDPEqual(t, fx.name, rowOut, vecOut) +} + +func TestDiff_CanceledContext(t *testing.T) { + // Build a result the iterator would normally walk through, then cancel + // the context before iteration. Both paths must surface the cancellation + // without producing partial output. The row path doesn't propagate + // cancellation through the dummy fakeMeasureQueryResult, so this test + // just asserts the vectorized path's Close is safe under cancel. + fx := diffFixture{ + name: "canceled-context", + schema: diffSchemaTagFieldMix(), + opts: diffOptsAllTagsAllFields(), + results: []*model.MeasureResult{rowSet(1, 100), rowSet(2, 200)}, + } + cfg := VectorizedConfig{Enabled: true, BatchSize: 4, QueryMemoryMiB: 64} + ctx, cancel := context.WithCancel(context.Background()) + cancel() + qr := &fakeMeasureQueryResult{seq: cloneResults(fx.results)} + it, err := NewMIterator(ctx, qr, fx.schema, fx.opts, cfg) + if err != nil { + t.Fatalf("%s: NewMIterator under canceled ctx: %v", fx.name, err) + } + if closeErr := it.Close(); closeErr != nil && !errors.Is(closeErr, context.Canceled) { + t.Fatalf("%s: Close after cancel: %v", fx.name, closeErr) + } +} + +// rowWithShard mirrors rowSet but pins both Version and ShardID to caller- +// specified values. Used by parity tests that exercise non-zero ShardIDs and +// per-row version variations. +func rowWithShard(sid common.SeriesID, ts, version int64, shard common.ShardID) *model.MeasureResult { + r := rowSet(sid, ts) + r.Versions = []int64{version} + r.ShardIDs = []common.ShardID{shard} + return r +} + +// TestDiff_NonZeroShardIDs pins ShardID column flow through the +// fillMetadata → serializeBatchToProto path: row path emits shard via +// dp.ShardId; vectorized path stores it as an int64 column then casts to +// uint32. Mismatched casts surface here. 
+func TestDiff_NonZeroShardIDs(t *testing.T) { + fx := diffFixture{ + name: "non-zero-shard-ids", + schema: diffSchemaTagFieldMix(), + opts: diffOptsAllTagsAllFields(), + results: []*model.MeasureResult{ + rowWithShard(1, 100, 1, 7), + rowWithShard(2, 200, 1, 0), + rowWithShard(3, 300, 1, 12345), + }, + } + rowOut, vecOut := runDiff(t, fx) + assertDPEqual(t, fx.name, rowOut, vecOut) +} + +// TestDiff_MixedShardIDsAcrossBatchBoundary keeps ShardID varying within a +// single MeasureResult so the bulk-fill metadata path copies a non-uniform +// shard column across a batch boundary. +func TestDiff_MixedShardIDsAcrossBatchBoundary(t *testing.T) { + r := rowMulti(1, 1000, 9) + for i := range r.ShardIDs { + r.ShardIDs[i] = common.ShardID(i % 3) + } + fx := diffFixture{ + name: "mixed-shard-ids-batch-boundary", + schema: diffSchemaTagFieldMix(), + opts: diffOptsAllTagsAllFields(), + results: []*model.MeasureResult{r}, + } + rowOut, vecOut := runDiff(t, fx) + assertDPEqual(t, fx.name, rowOut, vecOut) +} + +// TestDiff_VersionVariations advances per-row Version (the field that drives +// the row-path's queryResult.merge dedup, applied upstream of this adapter). +// At the adapter layer both paths must propagate every supplied row verbatim; +// dedup happens inside queryResult.Pull, not the iterator. +func TestDiff_VersionVariations(t *testing.T) { + fx := diffFixture{ + name: "version-variations", + schema: diffSchemaTagFieldMix(), + opts: diffOptsAllTagsAllFields(), + results: []*model.MeasureResult{ + rowWithShard(1, 100, 1, 0), + rowWithShard(1, 200, 5, 0), + rowWithShard(1, 300, 99, 0), + }, + } + rowOut, vecOut := runDiff(t, fx) + assertDPEqual(t, fx.name, rowOut, vecOut) +} + +// TestDiff_InterleavedSeriesAcrossResults verifies cross-MeasureResult +// ordering: each Pull yields a different series, and the adapter must emit +// rows in the order produced by the source (no internal reordering). 
+func TestDiff_InterleavedSeriesAcrossResults(t *testing.T) { + fx := diffFixture{ + name: "interleaved-multi-result", + schema: diffSchemaTagFieldMix(), + opts: diffOptsAllTagsAllFields(), + results: []*model.MeasureResult{ + rowWithShard(7, 100, 1, 0), + rowWithShard(3, 110, 1, 1), + rowWithShard(7, 120, 1, 0), + rowWithShard(11, 130, 1, 2), + rowWithShard(3, 140, 1, 1), + }, + } + rowOut, vecOut := runDiff(t, fx) + assertDPEqual(t, fx.name, rowOut, vecOut) +} + +// TestDiff_CanceledMidIteration starts iterating, cancels the context, then +// continues iterating. The vectorized adapter must safely terminate without +// returning partial garbage and Close must be safe. +func TestDiff_CanceledMidIteration(t *testing.T) { + fx := diffFixture{ + name: "canceled-mid-iteration", + schema: diffSchemaTagFieldMix(), + opts: diffOptsAllTagsAllFields(), + results: []*model.MeasureResult{rowMulti(1, 1000, 9)}, + } + cfg := VectorizedConfig{Enabled: true, BatchSize: 4, QueryMemoryMiB: 64} + ctx, cancel := context.WithCancel(context.Background()) + qr := &fakeMeasureQueryResult{seq: cloneResults(fx.results)} + it, err := NewMIterator(ctx, qr, fx.schema, fx.opts, cfg) + if err != nil { + t.Fatalf("%s: NewMIterator: %v", fx.name, err) + } + if !it.Next() { + t.Fatalf("%s: first Next must succeed before cancel", fx.name) + } + if got := it.Current(); len(got) != 1 { + t.Fatalf("%s: Current after first Next: want 1, got %d", fx.name, len(got)) + } + cancel() + // Subsequent Next calls may return true (the fake doesn't honor ctx) or + // false; either is acceptable. The contract under test is that Close + // after cancel cleans up without panicking and returns either nil or a + // context.Canceled error. 
+ for it.Next() { //nolint:revive // intentionally drains remaining cached batch + } + if closeErr := it.Close(); closeErr != nil && !errors.Is(closeErr, context.Canceled) { + t.Fatalf("%s: Close after mid-iteration cancel: %v", fx.name, closeErr) + } +} + +// TestDiff_StorageError_PropagatesViaErr confirms a Pull-time error from the +// MeasureQueryResult surfaces via Err() and is joined into Close(). +func TestDiff_StorageError_PropagatesViaErr(t *testing.T) { + boom := errors.New("simulated storage failure") + fx := diffFixture{ + name: "storage-error", + schema: diffSchemaTagFieldMix(), + opts: diffOptsAllTagsAllFields(), + results: []*model.MeasureResult{ + rowSet(1, 100), + {Error: boom}, + }, + } + cfg := VectorizedConfig{Enabled: true, BatchSize: 4, QueryMemoryMiB: 64} + qr := &fakeMeasureQueryResult{seq: cloneResults(fx.results)} + it, err := NewMIterator(context.Background(), qr, fx.schema, fx.opts, cfg) + if err != nil { + t.Fatalf("%s: NewMIterator: %v", fx.name, err) + } + for it.Next() { //nolint:revive // drain until error + } + if itErr := it.Err(); !errors.Is(itErr, boom) { + t.Fatalf("%s: Err want %v, got %v", fx.name, boom, itErr) + } + if closeErr := it.Close(); !errors.Is(closeErr, boom) { + t.Fatalf("%s: Close must surface sticky storage error; got %v", fx.name, closeErr) + } +} diff --git a/pkg/query/vectorized/measure/integration.go b/pkg/query/vectorized/measure/integration.go new file mode 100644 index 000000000..e1d8c6e20 --- /dev/null +++ b/pkg/query/vectorized/measure/integration.go @@ -0,0 +1,203 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package measure + +import ( + "context" + "fmt" + + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" + "github.com/apache/skywalking-banyandb/pkg/query/model" + "github.com/apache/skywalking-banyandb/pkg/query/vectorized" +) + +// BuildBatchSchema derives a BatchSchema from a Measure schema and a query's +// projection list. The output column order is fixed: +// +// timestamp, version, sid, shardID, then projected tags (in TagProjection +// order, family-by-family, name-by-name), then projected fields (in +// FieldProjection order). +// +// Projected tags and fields that are not present in the Measure schema are +// not dropped: each still gets a placeholder column (typed Int64) so the +// projection keeps a slot in the output, which the serializer can emit as a +// null value. A projected tag or field whose schema type has no column +// mapping makes BuildBatchSchema return an error. 
+func BuildBatchSchema(measureSchema *databasev1.Measure, opts model.MeasureQueryOptions) (*vectorized.BatchSchema, error) { + if measureSchema == nil { + return nil, fmt.Errorf("vectorized.measure: nil Measure schema") + } + cols := []vectorized.ColumnDef{ + {Role: vectorized.RoleTimestamp, Type: vectorized.ColumnTypeInt64}, + {Role: vectorized.RoleVersion, Type: vectorized.ColumnTypeInt64}, + {Role: vectorized.RoleSeriesID, Type: vectorized.ColumnTypeInt64}, + {Role: vectorized.RoleShardID, Type: vectorized.ColumnTypeInt64}, + } + + tagSpecs := make(map[string]map[string]*databasev1.TagSpec) + for _, tf := range measureSchema.GetTagFamilies() { + byName := make(map[string]*databasev1.TagSpec, len(tf.GetTags())) + for _, ts := range tf.GetTags() { + byName[ts.GetName()] = ts + } + tagSpecs[tf.GetName()] = byName + } + + // Projection entries whose names are absent from the Measure schema get a + // nullable placeholder column so the serializer can still emit a + // NullTagValue / NullFieldValue slot — matching the row path, which fills + // missing projections with pbv1.Null{Tag,Field}Value. The placeholder type + // is irrelevant: validity-bit-only access via MarkNullAt + IsNull means + // the value bytes are never read. 
+ for _, tp := range opts.TagProjection { + family := tagSpecs[tp.Family] + for _, name := range tp.Names { + ct := vectorized.ColumnTypeInt64 + if family != nil { + if spec, found := family[name]; found { + mapped, mapErr := tagTypeToColumnType(spec.GetType()) + if mapErr != nil { + return nil, fmt.Errorf("vectorized.measure: tag %s.%s: %w", tp.Family, name, mapErr) + } + ct = mapped + } + } + cols = append(cols, vectorized.ColumnDef{ + Role: vectorized.RoleTag, + TagFamily: tp.Family, + Name: name, + Type: ct, + }) + } + } + + fieldSpecs := make(map[string]*databasev1.FieldSpec, len(measureSchema.GetFields())) + for _, fs := range measureSchema.GetFields() { + fieldSpecs[fs.GetName()] = fs + } + for _, name := range opts.FieldProjection { + ct := vectorized.ColumnTypeInt64 + if spec, found := fieldSpecs[name]; found { + mapped, mapErr := fieldTypeToColumnType(spec.GetFieldType()) + if mapErr != nil { + return nil, fmt.Errorf("vectorized.measure: field %s: %w", name, mapErr) + } + ct = mapped + } + cols = append(cols, vectorized.ColumnDef{ + Role: vectorized.RoleField, + Name: name, + Type: ct, + }) + } + + return vectorized.NewBatchSchema(cols), nil +} + +func tagTypeToColumnType(t databasev1.TagType) (vectorized.ColumnType, error) { + switch t { + case databasev1.TagType_TAG_TYPE_INT: + return vectorized.ColumnTypeInt64, nil + case databasev1.TagType_TAG_TYPE_STRING: + return vectorized.ColumnTypeString, nil + case databasev1.TagType_TAG_TYPE_DATA_BINARY: + return vectorized.ColumnTypeBytes, nil + case databasev1.TagType_TAG_TYPE_INT_ARRAY: + return vectorized.ColumnTypeInt64Array, nil + case databasev1.TagType_TAG_TYPE_STRING_ARRAY: + return vectorized.ColumnTypeStrArray, nil + case databasev1.TagType_TAG_TYPE_UNSPECIFIED, databasev1.TagType_TAG_TYPE_TIMESTAMP: + return 0, fmt.Errorf("unsupported tag type %v", t) + } + return 0, fmt.Errorf("unsupported tag type %v", t) +} + +func fieldTypeToColumnType(t databasev1.FieldType) (vectorized.ColumnType, error) { + 
 switch t { + case databasev1.FieldType_FIELD_TYPE_INT: + return vectorized.ColumnTypeInt64, nil + case databasev1.FieldType_FIELD_TYPE_FLOAT: + return vectorized.ColumnTypeFloat64, nil + case databasev1.FieldType_FIELD_TYPE_STRING: + return vectorized.ColumnTypeString, nil + case databasev1.FieldType_FIELD_TYPE_DATA_BINARY: + return vectorized.ColumnTypeBytes, nil + case databasev1.FieldType_FIELD_TYPE_UNSPECIFIED: + return 0, fmt.Errorf("unsupported field type %v", t) + } + return 0, fmt.Errorf("unsupported field type %v", t) +} + +// NewMIterator builds a vectorized adapter that drives a Pipeline over qr and +// satisfies executor.MIterator. The returned VectorizedMIterator owns the qr +// lifetime through the pipeline: Close → pipeline.Close → BatchScan.Close → +// SeriesCursor.Close → qr.Release. Callers must NOT release qr themselves on +// the success path. +// +// On error the outcome for qr depends on where construction failed. If +// cfg.Validate or BuildBatchSchema fails, qr has not been touched and the +// caller must release it. If the pipeline build or scan.Init fails, +// NewMIterator closes the scan (directly or through the pipeline), which +// releases qr along the chain above; a caller that also releases qr on error +// then releases it a second time. +// NOTE(review): confirm qr.Release tolerates that, or settle ownership on +// failure in one place. +func NewMIterator(ctx context.Context, qr model.MeasureQueryResult, + measureSchema *databasev1.Measure, opts model.MeasureQueryOptions, cfg VectorizedConfig, +) (*VectorizedMIterator, error) { + if validateErr := cfg.Validate(); validateErr != nil { + return nil, validateErr + } + schema, schemaErr := BuildBatchSchema(measureSchema, opts) + if schemaErr != nil { + return nil, schemaErr + } + pool := vectorized.NewBatchPool(schema, cfg.BatchSize) + scan := NewBatchScan(qr, schema, pool, cfg.BatchSize) + pipeline, buildErr := vectorized.NewPipelineBuilder().From(scan).Build() + if buildErr != nil { + // scan was constructed but never wired into a Pipeline; close it + // directly to release qr through the cursor. 
+ _ = scan.Close() + return nil, buildErr + } + if initErr := scan.Init(ctx); initErr != nil { + _ = pipeline.Close() + return nil, initErr + } + return &VectorizedMIterator{inner: newVectorizedMIterator(ctx, pipeline, pool)}, nil +} + +// VectorizedMIterator is the public adapter exposed to other packages. It is +// a thin facade over the unexported vectorizedMIterator so the executor +// interface is satisfied without leaking the package-private type. +type VectorizedMIterator struct { + inner *vectorizedMIterator +} + +// Next advances one DataPoint. +func (v *VectorizedMIterator) Next() bool { return v.inner.Next() } + +// Current returns the current row as a single-element slice (matches row-path +// contract). +func (v *VectorizedMIterator) Current() []*measurev1.InternalDataPoint { return v.inner.Current() } + +// Err returns any sticky storage error that terminated iteration. +func (v *VectorizedMIterator) Err() error { return v.inner.Err() } + +// Close releases the pipeline (and through it the BatchScan, cursor, and +// underlying MeasureQueryResult). Returns the join of the sticky iteration +// error and the pipeline-close error, matching resultMIterator.Close. +func (v *VectorizedMIterator) Close() error { + return v.inner.Close() +} diff --git a/pkg/query/vectorized/measure/scan.go b/pkg/query/vectorized/measure/scan.go index 678346a83..c0c7cb1b7 100644 --- a/pkg/query/vectorized/measure/scan.go +++ b/pkg/query/vectorized/measure/scan.go @@ -153,38 +153,64 @@ func fillMetadata(b *vectorized.RecordBatch, schema *vectorized.BatchSchema, } // fillTags extracts every tag column for n rows starting at pos into the batch -// at offset. Tag families and tag names are matched against the BatchSchema; -// unmatched tags in cur are skipped. +// at offset. 
Schema-declared tag columns missing from cur are grown with +// explicit nulls so a downstream serializer sees the same shape the row path +// emits when the projected tag is absent (which the multi-group flow +// produces — one group's schema may lack a tag the other group has). func fillTags(b *vectorized.RecordBatch, schema *vectorized.BatchSchema, cur *model.MeasureResult, pos, offset, n int, ) error { - for _, tf := range cur.TagFamilies { - for _, tag := range tf.Tags { - colIdx, ok := schema.TagIndex(tf.Name, tag.Name) - if !ok { - continue - } - col := b.Columns[colIdx] - growColumn(col, n) - if extractErr := extractTagBulk(col, offset, tag.Values[pos:pos+n], n); extractErr != nil { - return extractErr + resultTags := make(map[int]*model.Tag, len(cur.TagFamilies)) + for tfIdx := range cur.TagFamilies { + tf := &cur.TagFamilies[tfIdx] + for tagIdx := range tf.Tags { + tag := &tf.Tags[tagIdx] + if colIdx, ok := schema.TagIndex(tf.Name, tag.Name); ok { + resultTags[colIdx] = tag } } } + for colIdx, def := range schema.Columns { + if def.Role != vectorized.RoleTag { + continue + } + col := b.Columns[colIdx] + growColumn(col, n) + tag, present := resultTags[colIdx] + if !present { + markRowsNull(col, offset, n) + continue + } + if extractErr := extractTagBulk(col, offset, tag.Values[pos:pos+n], n); extractErr != nil { + return extractErr + } + } return nil } -// fillFields is the field-side counterpart of fillTags. +// fillFields is the field-side counterpart of fillTags. Same null-fill rule +// applies for projection entries that the active result lacks. 
func fillFields(b *vectorized.RecordBatch, schema *vectorized.BatchSchema, cur *model.MeasureResult, pos, offset, n int, ) error { - for _, f := range cur.Fields { - colIdx, ok := schema.FieldIndex(f.Name) - if !ok { + resultFields := make(map[int]*model.Field, len(cur.Fields)) + for fIdx := range cur.Fields { + f := &cur.Fields[fIdx] + if colIdx, ok := schema.FieldIndex(f.Name); ok { + resultFields[colIdx] = f + } + } + for colIdx, def := range schema.Columns { + if def.Role != vectorized.RoleField { continue } col := b.Columns[colIdx] growColumn(col, n) + f, present := resultFields[colIdx] + if !present { + markRowsNull(col, offset, n) + continue + } if extractErr := extractFieldBulk(col, offset, f.Values[pos:pos+n], n); extractErr != nil { return extractErr } @@ -192,6 +218,14 @@ func fillFields(b *vectorized.RecordBatch, schema *vectorized.BatchSchema, return nil } +// markRowsNull marks rows [offset, offset+n) in col as null without otherwise +// touching the underlying data slice. +func markRowsNull(col vectorized.Column, offset, n int) { + for k := range n { + col.MarkNullAt(offset + k) + } +} + func appendInt64Zeros(c *vectorized.TypedColumn[int64], n int) { for range n { c.Append(0) diff --git a/pkg/query/vectorized/measure/serialize.go b/pkg/query/vectorized/measure/serialize.go index 6a20c0fee..f2e0c3e64 100644 --- a/pkg/query/vectorized/measure/serialize.go +++ b/pkg/query/vectorized/measure/serialize.go @@ -18,6 +18,7 @@ package measure import ( + "slices" "time" "google.golang.org/protobuf/types/known/timestamppb" @@ -94,6 +95,10 @@ func buildDataPoint(b *vectorized.RecordBatch, schema *vectorized.BatchSchema, r return dp } +// columnValueToTagValue materializes a *modelv1.TagValue from one row of col. +// Slice-typed values (BinaryData, IntArray, StrArray) are defensively copied +// so the produced TagValue does not alias the column's backing slice — pooled +// batches re-overwrite that slice on the next iteration. 
func columnValueToTagValue(col vectorized.Column, rowIdx int) *modelv1.TagValue { if col.IsNull(rowIdx) { return &modelv1.TagValue{Value: &modelv1.TagValue_Null{}} @@ -104,15 +109,20 @@ func columnValueToTagValue(col vectorized.Column, rowIdx int) *modelv1.TagValue case *vectorized.TypedColumn[string]: return &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: c.Data()[rowIdx]}}} case *vectorized.TypedColumn[[]byte]: - return &modelv1.TagValue{Value: &modelv1.TagValue_BinaryData{BinaryData: c.Data()[rowIdx]}} + src := c.Data()[rowIdx] + buf := make([]byte, len(src)) + copy(buf, src) + return &modelv1.TagValue{Value: &modelv1.TagValue_BinaryData{BinaryData: buf}} case *vectorized.TypedColumn[[]int64]: - return &modelv1.TagValue{Value: &modelv1.TagValue_IntArray{IntArray: &modelv1.IntArray{Value: c.Data()[rowIdx]}}} + return &modelv1.TagValue{Value: &modelv1.TagValue_IntArray{IntArray: &modelv1.IntArray{Value: slices.Clone(c.Data()[rowIdx])}}} case *vectorized.TypedColumn[[]string]: - return &modelv1.TagValue{Value: &modelv1.TagValue_StrArray{StrArray: &modelv1.StrArray{Value: c.Data()[rowIdx]}}} + return &modelv1.TagValue{Value: &modelv1.TagValue_StrArray{StrArray: &modelv1.StrArray{Value: slices.Clone(c.Data()[rowIdx])}}} } return &modelv1.TagValue{Value: &modelv1.TagValue_Null{}} } +// columnValueToFieldValue is the field-side counterpart. Same defensive copy +// rule for BinaryData. 
func columnValueToFieldValue(col vectorized.Column, rowIdx int) *modelv1.FieldValue { if col.IsNull(rowIdx) { return &modelv1.FieldValue{Value: &modelv1.FieldValue_Null{}} @@ -125,7 +135,10 @@ func columnValueToFieldValue(col vectorized.Column, rowIdx int) *modelv1.FieldVa case *vectorized.TypedColumn[string]: return &modelv1.FieldValue{Value: &modelv1.FieldValue_Str{Str: &modelv1.Str{Value: c.Data()[rowIdx]}}} case *vectorized.TypedColumn[[]byte]: - return &modelv1.FieldValue{Value: &modelv1.FieldValue_BinaryData{BinaryData: c.Data()[rowIdx]}} + src := c.Data()[rowIdx] + buf := make([]byte, len(src)) + copy(buf, src) + return &modelv1.FieldValue{Value: &modelv1.FieldValue_BinaryData{BinaryData: buf}} } return &modelv1.FieldValue{Value: &modelv1.FieldValue_Null{}} } diff --git a/test/integration/standalone/query/vectorized_test.go b/test/integration/standalone/query/vectorized_test.go new file mode 100644 index 000000000..15f530e67 --- /dev/null +++ b/test/integration/standalone/query/vectorized_test.go @@ -0,0 +1,98 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package query_test + +import ( + "time" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + + "github.com/apache/skywalking-banyandb/pkg/grpchelper" + "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/test/helpers" + "github.com/apache/skywalking-banyandb/pkg/test/setup" + "github.com/apache/skywalking-banyandb/pkg/timestamp" + test_cases "github.com/apache/skywalking-banyandb/test/cases" + casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure" + casestopn "github.com/apache/skywalking-banyandb/test/cases/topn" +) + +// Vectorized parity gate (G4 §"Integration Test Plan"). +// +// Boots a *separate* standalone with --measure-vectorized-enabled=true and +// replays the same Measure / TopN test entries the row-path integration suite +// already covers in suite_test.go. Each case asserts the row-path's expected +// output, so every passing case here is a parity check: the vectorized path +// produced the row-path's reference InternalDataPoints. The cluster is fresh +// and isolated so neither side observes the other's state. +// +// This block runs after the on-disk-data Describe in round2.go, which closes +// the original cluster. The integration suite remains a release-candidate +// gate; the unit-level differential tests in pkg/query/vectorized/measure +// gate every PR. 
+var _ = ginkgo.Describe("vectorized parity", ginkgo.Ordered, func() { + var ( + vectorizedConn *grpc.ClientConn + stopFn func() + ) + ginkgo.BeforeAll(func() { + path, diskCleanupFn, pathErr := test.NewSpace() + gomega.Expect(pathErr).NotTo(gomega.HaveOccurred()) + ports, portsErr := test.AllocateFreePorts(5) + gomega.Expect(portsErr).NotTo(gomega.HaveOccurred()) + tmpDir, tmpDirCleanup, tmpErr := test.NewSpace() + gomega.Expect(tmpErr).NotTo(gomega.HaveOccurred()) + dfWriter := setup.NewDiscoveryFileWriter(tmpDir) + config := setup.PropertyClusterConfig(dfWriter) + addr, _, closeFn := setup.ClosableStandalone(config, path, ports, + "--measure-vectorized-enabled=true", + ) + stopFn = func() { + closeFn() + diskCleanupFn() + tmpDirCleanup() + } + var connErr error + vectorizedConn, connErr = grpchelper.Conn(addr, 10*time.Second, + grpc.WithTransportCredentials(insecure.NewCredentials())) + gomega.Expect(connErr).NotTo(gomega.HaveOccurred()) + ns := timestamp.NowMilli().UnixNano() + now := time.Unix(0, ns-ns%int64(time.Minute)) + test_cases.Initialize(addr, now) + sharedCtx := helpers.SharedContext{ + Connection: vectorizedConn, + BaseTime: now, + } + casesmeasure.SharedContext = sharedCtx + casestopn.SharedContext = sharedCtx + }) + ginkgo.AfterAll(func() { + if vectorizedConn != nil { + gomega.Expect(vectorizedConn.Close()).To(gomega.Succeed()) + } + if stopFn != nil { + stopFn() + } + }) + + casesmeasure.RegisterTable("Vectorized: scanning measures") + casestopn.RegisterTable("Vectorized: TopN") +})
