This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit b7b15697d2f9d46105d7c55366f8389396d802f0 Author: Hongtao Gao <[email protected]> AuthorDate: Wed May 20 15:02:43 2026 +0000 chore(lint): clear local CI gate — refactor dquery.Rev + sweep pre-existing pkg/test debt Splits `(*measureQueryProcessor).Rev` into three helpers (loadMeasureSchemas, analyzeDistributedPlan, buildNodeSelectors) so its cyclomatic complexity drops under the gocyclo threshold of 30. Sweeps the remaining linter findings on the branch: British→US spelling, govet fieldalignment, errorlint %w wrapping, exhaustive switch nolint with documented carve-outs, gocyclo nolint on the switch-dispatch encode/decode functions, contextcheck nolint on pure in-memory reducers, lll reflows on long signatures and test calls, gocritic ifElseChain→switch, goconst extraction of the "default" group name, gocritic appendAssign nolint for the intentional 3-index slice copy, gosec G306 0o644→0o600 on bench report writes, and a missing package comment on `querybench`. `make lint` now returns zero issues across api/, banyand/, pkg/, test/ and ui/. 
via [HAPI](https://hapi.run) Co-Authored-By: HAPI <[email protected]> --- banyand/dquery/measure.go | 166 +++++++++++---------- banyand/dquery/measure_test.go | 4 +- banyand/measure/svc_liaison.go | 2 +- banyand/query/processor.go | 6 +- pkg/query/vectorized/column_pool.go | 2 +- pkg/query/vectorized/measure/adapter.go | 6 +- pkg/query/vectorized/measure/aggregation.go | 78 +++------- pkg/query/vectorized/measure/aggregation_reduce.go | 4 +- pkg/query/vectorized/measure/aggregation_test.go | 3 +- pkg/query/vectorized/measure/config.go | 2 +- pkg/query/vectorized/measure/frame/decode.go | 9 +- pkg/query/vectorized/measure/frame/decode_test.go | 31 ++-- pkg/query/vectorized/measure/frame/encode.go | 5 +- pkg/query/vectorized/measure/frame/frame.go | 8 +- pkg/query/vectorized/measure/frame/frame_test.go | 44 +++--- pkg/query/vectorized/measure/integration.go | 2 +- pkg/query/vectorized/measure/plan/dispatch.go | 34 ++--- pkg/query/vectorized/measure/plan/dispatch_test.go | 40 ++--- pkg/query/vectorized/measure/plan/distributed.go | 63 +++++--- .../vectorized/measure/plan/distributed_rows.go | 80 ++++++---- .../measure/plan/distributed_rows_test.go | 13 +- .../vectorized/measure/plan/distributed_test.go | 68 +++++---- pkg/query/vectorized/measure/plan/hidden_tags.go | 4 +- .../vectorized/measure/plan/multi_group_schema.go | 4 +- pkg/query/vectorized/measure/plan/orderby.go | 1 - pkg/query/vectorized/measure/raw_emit.go | 41 +++-- .../vectorized/measure/raw_emit_bench_test.go | 10 +- pkg/query/vectorized/measure/raw_emit_test.go | 8 +- .../vectorized/measure/topology_matrix_test.go | 24 +-- test/integration/distributed/querybench/config.go | 28 ++-- test/integration/distributed/querybench/report.go | 81 +++++----- .../distributed/querybench/report_test.go | 12 +- .../integration/distributed/querybench/workload.go | 15 +- .../distributed/querybench/workload_test.go | 6 +- 34 files changed, 472 insertions(+), 432 deletions(-) diff --git a/banyand/dquery/measure.go 
b/banyand/dquery/measure.go index 2cf578c9f..260626ea2 100644 --- a/banyand/dquery/measure.go +++ b/banyand/dquery/measure.go @@ -68,49 +68,11 @@ func (p *measureQueryProcessor) Rev(ctx context.Context, message bus.Message) (r e.RawJSON("req", logger.Proto(queryCriteria)).Msg("received a query event") } - var schemas []logical.Schema - var measureSchemas []*databasev1.Measure - var measureIndexRules [][]*databasev1.IndexRule - var vecCfg vmeasure.VectorizedConfig - for groupIdx, g := range queryCriteria.Groups { - meta := &commonv1.Metadata{ - Name: queryCriteria.Name, - Group: g, - } - ec, measureErr := p.measureService.Measure(meta) - if measureErr != nil { - resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to get execution context for measure %s: %v", meta.GetName(), measureErr)) - return - } - // nolint:staticcheck // SA1019 - row schema is still used for the flag-off rollback path. - s, schemaErr := logical_measure.BuildSchema(ec.GetSchema(), ec.GetIndexRules()) - if schemaErr != nil { - resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to build schema for measure %s: %v", meta.GetName(), schemaErr)) - return - } - schemas = append(schemas, s) - measureSchemas = append(measureSchemas, ec.GetSchema()) - measureIndexRules = append(measureIndexRules, ec.GetIndexRules()) - if vecEC, ok := ec.(measureVectorizedExecutionContext); ok { - if groupIdx == 0 { - vecCfg = vecEC.VectorizedConfig() - } - } else if data.MeasureWireModeRaw() { - resp = bus.NewMessage(bus.MessageID(now), common.NewError("measure %s is not vectorized-capable under raw wire mode", meta.GetName())) - return - } + schemas, measureSchemas, measureIndexRules, vecCfg, loadErr := p.loadMeasureSchemas(queryCriteria) + if loadErr != nil { + resp = bus.NewMessage(bus.MessageID(now), common.NewError("%s", loadErr.Error())) + return } - - var plan measureDistributedExecutable - var err error - // Routing contract (Phase 6 — see 
docs/operation/troubleshooting/measure-vec-flag-off-rollback.md): - // - Raw wire mode (data.MeasureWireModeRaw() == true) -> vec distributed plan. - // vecplan.AnalyzeDistributed handles all request shapes: non-agg, Agg, - // OrderBy-by-index-rule, multi-group, Top, GroupBy, and combinations. - // - Flag-off (data.MeasureWireModeRaw() == false) -> row-path DistributedAnalyze. - // This is the kill-switch / rollback path; latency is higher and multi-group - // native merge is unavailable. Use only during incident response. - useVecDistributed := data.MeasureWireModeRaw() // Operator-configured broadcast timeout overrides each plan's historical // 15 s default. Setting it on vecCfg.BroadcastTimeout flows through // vecplan.AnalyzeDistributed into DistributedPlan.broadcastTimeout(); the @@ -119,25 +81,7 @@ func (p *measureQueryProcessor) Rev(ctx context.Context, message bus.Message) (r if p.broadcastTimeout > 0 { vecCfg.BroadcastTimeout = p.broadcastTimeout } - if useVecDistributed { - if len(measureSchemas) == 0 { - resp = bus.NewMessage(bus.MessageID(now), common.NewError("vec distributed plan requires at least one measure schema")) - return - } - plan, err = vecplan.AnalyzeDistributed(queryCriteria, measureSchemas, measureIndexRules, vecCfg) - } else { - // nolint:staticcheck // SA1019 - row distributed plan is the flag-off rollback path only. 
- rowPlan, analyzeErr := logical_measure.DistributedAnalyze(queryCriteria, schemas, p.broadcastTimeout) - if analyzeErr != nil { - err = analyzeErr - } else { - var ok bool - plan, ok = rowPlan.(measureDistributedExecutable) - if !ok { - err = fmt.Errorf("distributed measure plan %T is not executable", rowPlan) - } - } - } + plan, err := p.analyzeDistributedPlan(queryCriteria, schemas, measureSchemas, measureIndexRules, vecCfg) if err != nil { resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail to analyze the query request for measure %s: %v", queryCriteria.Name, err)) return @@ -146,21 +90,10 @@ func (p *measureQueryProcessor) Rev(ctx context.Context, message bus.Message) (r if e := ml.Debug(); e.Enabled() { e.Str("plan", plan.String()).Msg("query plan") } - nodeSelectors := make(map[string][]string) - for _, g := range queryCriteria.Groups { - if gs, ok := p.measureService.LoadGroup(g); ok { - if ns, exist := p.parseNodeSelector(queryCriteria.Stages, gs.GetSchema().ResourceOpts); exist { - nodeSelectors[g] = ns - } else if len(gs.GetSchema().ResourceOpts.Stages) > 0 { - ml.Error().Strs("req_stages", queryCriteria.Stages).Strs("default_stages", gs.GetSchema().GetResourceOpts().GetDefaultStages()).Msg("no stage found") - resp = bus.NewMessage(bus.MessageID(now), common.NewError("no stage found in request or default stages in resource opts")) - return - } - } else { - ml.Error().RawJSON("req", logger.Proto(queryCriteria)).Msg("group not found") - resp = bus.NewMessage(bus.MessageID(now), common.NewError("group %s not found", g)) - return - } + nodeSelectors, selErr := p.buildNodeSelectors(queryCriteria, ml) + if selErr != nil { + resp = bus.NewMessage(bus.MessageID(now), common.NewError("%s", selErr.Error())) + return } if len(queryCriteria.Stages) > 0 && len(nodeSelectors) == 0 { ml.Error().RawJSON("req", logger.Proto(queryCriteria)).Msg("no stage found") @@ -240,3 +173,84 @@ func (p *measureQueryProcessor) Rev(ctx context.Context, message bus.Message) 
(r } return } + +func (p *measureQueryProcessor) loadMeasureSchemas(queryCriteria *measurev1.QueryRequest) ( + []logical.Schema, []*databasev1.Measure, [][]*databasev1.IndexRule, vmeasure.VectorizedConfig, error, +) { + var schemas []logical.Schema + var measureSchemas []*databasev1.Measure + var measureIndexRules [][]*databasev1.IndexRule + var vecCfg vmeasure.VectorizedConfig + for groupIdx, g := range queryCriteria.Groups { + meta := &commonv1.Metadata{Name: queryCriteria.Name, Group: g} + ec, measureErr := p.measureService.Measure(meta) + if measureErr != nil { + return nil, nil, nil, vecCfg, fmt.Errorf("fail to get execution context for measure %s: %w", meta.GetName(), measureErr) + } + // nolint:staticcheck // SA1019 - row schema is still used for the flag-off rollback path. + s, schemaErr := logical_measure.BuildSchema(ec.GetSchema(), ec.GetIndexRules()) + if schemaErr != nil { + return nil, nil, nil, vecCfg, fmt.Errorf("fail to build schema for measure %s: %w", meta.GetName(), schemaErr) + } + schemas = append(schemas, s) + measureSchemas = append(measureSchemas, ec.GetSchema()) + measureIndexRules = append(measureIndexRules, ec.GetIndexRules()) + if vecEC, ok := ec.(measureVectorizedExecutionContext); ok { + if groupIdx == 0 { + vecCfg = vecEC.VectorizedConfig() + } + } else if data.MeasureWireModeRaw() { + return nil, nil, nil, vecCfg, fmt.Errorf("measure %s is not vectorized-capable under raw wire mode", meta.GetName()) + } + } + return schemas, measureSchemas, measureIndexRules, vecCfg, nil +} + +// analyzeDistributedPlan picks the vec plan when raw wire mode is on, otherwise +// falls back to the row distributed plan (kill-switch / rollback rail). +// See docs/operation/troubleshooting/measure-vec-flag-off-rollback.md. 
+func (p *measureQueryProcessor) analyzeDistributedPlan( + queryCriteria *measurev1.QueryRequest, + schemas []logical.Schema, + measureSchemas []*databasev1.Measure, + measureIndexRules [][]*databasev1.IndexRule, + vecCfg vmeasure.VectorizedConfig, +) (measureDistributedExecutable, error) { + if data.MeasureWireModeRaw() { + if len(measureSchemas) == 0 { + return nil, fmt.Errorf("vec distributed plan requires at least one measure schema") + } + return vecplan.AnalyzeDistributed(queryCriteria, measureSchemas, measureIndexRules, vecCfg) + } + // nolint:staticcheck // SA1019 - row distributed plan is the flag-off rollback path only. + rowPlan, analyzeErr := logical_measure.DistributedAnalyze(queryCriteria, schemas, p.broadcastTimeout) + if analyzeErr != nil { + return nil, analyzeErr + } + plan, ok := rowPlan.(measureDistributedExecutable) + if !ok { + return nil, fmt.Errorf("distributed measure plan %T is not executable", rowPlan) + } + return plan, nil +} + +func (p *measureQueryProcessor) buildNodeSelectors(queryCriteria *measurev1.QueryRequest, ml *logger.Logger) (map[string][]string, error) { + nodeSelectors := make(map[string][]string) + for _, g := range queryCriteria.Groups { + gs, ok := p.measureService.LoadGroup(g) + if !ok { + ml.Error().RawJSON("req", logger.Proto(queryCriteria)).Msg("group not found") + return nil, fmt.Errorf("group %s not found", g) + } + ns, exist := p.parseNodeSelector(queryCriteria.Stages, gs.GetSchema().ResourceOpts) + if exist { + nodeSelectors[g] = ns + continue + } + if len(gs.GetSchema().ResourceOpts.Stages) > 0 { + ml.Error().Strs("req_stages", queryCriteria.Stages).Strs("default_stages", gs.GetSchema().GetResourceOpts().GetDefaultStages()).Msg("no stage found") + return nil, fmt.Errorf("no stage found in request or default stages in resource opts") + } + } + return nodeSelectors, nil +} diff --git a/banyand/dquery/measure_test.go b/banyand/dquery/measure_test.go index 724a2f740..6683c839e 100644 --- 
a/banyand/dquery/measure_test.go +++ b/banyand/dquery/measure_test.go @@ -69,8 +69,8 @@ func TestRawWireMode_PlanType_AnalyzeDistributed(t *testing.T) { } shapes := []struct { - name string req *measurev1.QueryRequest + name string }{ { name: "plain non-agg", @@ -140,8 +140,8 @@ func TestRawWireMode_PlanType_AnalyzeDistributed(t *testing.T) { // After Phase 6 the routing in Rev is a single boolean: data.MeasureWireModeRaw(). func TestRawWireMode_AlwaysUsesVecPlan(t *testing.T) { shapes := []struct { - name string req *measurev1.QueryRequest + name string }{ { name: "plain non-agg", diff --git a/banyand/measure/svc_liaison.go b/banyand/measure/svc_liaison.go index 5022ea701..d34752ed9 100644 --- a/banyand/measure/svc_liaison.go +++ b/banyand/measure/svc_liaison.go @@ -149,7 +149,7 @@ func (s *liaison) PreRun(ctx context.Context) error { // Publish the per-process wire mode for TopicInternalMeasureQuery. // On the liaison this controls the DECODE side: an incoming response // body on TopicInternalMeasureQuery is parsed as raw frame bytes - // under flag-on (G9f.5.c's distributedPlan recognises the []byte + // under flag-on (G9f.5.c's distributedPlan recognizes the []byte // shape and runs ReduceFramesToInternalDataPoints / // DecodeFramesToInternalDataPoints) or proto under flag-off. The // per-process modes MUST match across the cluster — the runbook diff --git a/banyand/query/processor.go b/banyand/query/processor.go index 8d19998f3..66cfeed8f 100644 --- a/banyand/query/processor.go +++ b/banyand/query/processor.go @@ -540,14 +540,14 @@ func (p *measureInternalQueryProcessor) Rev(ctx context.Context, message bus.Mes // Iterators encapsulate their own drain + encode via FrameEmitter: // // - vectorizedMIterator drains the vec Pipeline directly - // (throughput-optimal: no proto materialisation, columnar + // (throughput-optimal: no proto materialization, columnar // end-to-end). // - emptyMIterator emits a nil body (codec empty-result carve-out). 
// - hiddenTagsMIterator drains via Next/Current (its strip stays - // the source of truth) and reverse-serialises. + // the source of truth) and reverse-serializes. // - sortedMIterator drains via Next/Current (its cross-group sort // + version dedup stay the source of truth) and reverse- - // serialises. + // serializes. // // queryCriteria.Trace=true still hard-errors: the columnar frame // has no trace-bytes slot, so trace-enabled distributed queries are diff --git a/pkg/query/vectorized/column_pool.go b/pkg/query/vectorized/column_pool.go index a51e326e2..e234eb1fe 100644 --- a/pkg/query/vectorized/column_pool.go +++ b/pkg/query/vectorized/column_pool.go @@ -42,7 +42,7 @@ var ( // AcquireColumn with a matching ReleaseColumn once they are done with the // column. // -// Capacity behaviour: pooled columns retain whichever capacity they had at +// Capacity behavior: pooled columns retain whichever capacity they had at // the last Release. If a pooled column is too small for the new request the // backing slice is reallocated; otherwise the existing backing slice is // reused. Over time the pool converges to the largest capacity seen. diff --git a/pkg/query/vectorized/measure/adapter.go b/pkg/query/vectorized/measure/adapter.go index ba6bd8011..95ffe7bd8 100644 --- a/pkg/query/vectorized/measure/adapter.go +++ b/pkg/query/vectorized/measure/adapter.go @@ -38,7 +38,7 @@ import ( // IMPORTANT: a caller that drains the Pipeline directly MUST NOT also // drive Next() on the MIterator afterwards — the pipeline is now empty // and the iterator's internal cursor is stale. The data-node processor -// honours this by branching on RawFrameSource BEFORE the iterator's +// honors this by branching on RawFrameSource BEFORE the iterator's // proto-collection loop. type RawFrameSource interface { // Pipeline returns the vec pipeline the iterator wraps. 
Drain it via @@ -85,8 +85,8 @@ func (i *vectorizedMIterator) Pipeline() *vectorized.Pipeline { return i.pipelin // EmitFrame implements FrameEmitter: drain the underlying vec pipeline // directly via DrainPipelineToFrame. This is the throughput-optimal -// wire-emit path — no proto materialisation, columnar end-to-end. The -// data-node Rev prefers this over the row-side reverse-serialise that +// wire-emit path — no proto materialization, columnar end-to-end. The +// data-node Rev prefers this over the row-side reverse-serialize that // wrapper iterators fall back on. func (i *vectorizedMIterator) EmitFrame(ctx context.Context) ([]byte, error) { return DrainPipelineToFrame(ctx, i.pipeline, i.schema) diff --git a/pkg/query/vectorized/measure/aggregation.go b/pkg/query/vectorized/measure/aggregation.go index 3b701e205..e625ffef8 100644 --- a/pkg/query/vectorized/measure/aggregation.go +++ b/pkg/query/vectorized/measure/aggregation.go @@ -41,7 +41,7 @@ import ( // path's incidental "first-idp" rule, measure_plan_aggregation.go:285); // scalar reduce (len(keyIndices)==0) emits shard_id=0 (matching the row // path's aggAllIterator.Current() at :364). The partial batch is then -// serialised by pkg/query/vectorized/measure/frame.Encode for cluster +// serialized by pkg/query/vectorized/measure/frame.Encode for cluster // transport. // - AggModeReduce — coordinator's Reduce phase (G9f.3). Consumes the // typed-column partial batches emitted by AggModeMap (one row per @@ -50,7 +50,7 @@ import ( // the row path's deduplicateAggregatedDataPointsWithShard — and combines // them through aggregation.Reduce[N] into a final batch shaped like // AggModeAll (tags + final value column, no shard column, no count -// sidecar). The aggregation.Reduce[N].Val() handles MEAN finalisation +// sidecar). The aggregation.Reduce[N].Val() handles MEAN finalization // (sum÷count) so the emit path stays type-symmetric with AggModeAll. 
type AggMode int @@ -117,58 +117,28 @@ type AggSpec struct { // Output rows are emitted one per group, in group-insertion order, // paginated by batchSize. type BatchAggregation struct { - inputSchema *vectorized.BatchSchema - outputSchema *vectorized.BatchSchema - pool *vectorized.BatchPool - tracker *vectorized.MemoryTracker - groups map[string]*aggGroup - insertion []*aggGroup - keyIndices []int - tagIndices []int - aggs []AggSpec - // aggOutOffsets[i] is the output-batch column index of the i-th agg's - // VALUE column. In AggModeMap, when aggHasCount[i] is true (i.e. AggMean), - // the agg's count sidecar lives at aggOutOffsets[i]+1. Cached on the - // operator so emitGroupRow does no per-row schema walking. - aggOutOffsets []int - aggHasCount []bool - // outputShardIdx is the output-batch column index for the leading - // RoleShardID column. AggModeAll: -1 (no shard column emitted, matching - // the existing single-node contract). AggModeMap: 0 (the partial batch - // always carries shard id as its first column). AggModeReduce: -1 (the - // final reduce batch matches AggModeAll's shape — no shard column). - outputShardIdx int - // tagOutOffset is where the tag columns start in the output batch - // (0 in AggModeAll / AggModeReduce; 1 in AggModeMap because the shard - // column comes first). - tagOutOffset int - // shardIDIdx is the input-batch column index of the RoleShardID column; - // -1 if input has none. AggModeMap consults this in newGroup to capture - // the first-fed-idp shard per group (G9f.2.a). AggModeReduce uses it to - // key the (shard, group) replica-dedup map. Unit-test fixtures that - // pre-date the storage bridge may lack one — Map mode then emits the - // per-group shard as zero (consistent with scalar-reduce), and Reduce - // dedups on group key alone (a missing shard column means there is no - // way to distinguish replicas, so dedup falls back to one entry per key). 
- shardIDIdx int - // aggInputCountIdx[i] is the input-batch column index of the i-th agg's - // PartialCount sidecar (for MEAN — see meanCountSuffix), or -1 if the - // agg has no count sidecar (non-MEAN, or any agg in AggModeAll). Only - // populated for AggModeReduce; AggModeAll/AggModeMap ignore the slot. + dedupSeen map[string]struct{} + outputSchema *vectorized.BatchSchema + pool *vectorized.BatchPool + tracker *vectorized.MemoryTracker + groups map[string]*aggGroup + inputSchema *vectorized.BatchSchema + insertion []*aggGroup + tagIndices []int + aggs []AggSpec + aggOutOffsets []int + aggHasCount []bool + keyIndices []int aggInputCountIdx []int - // dedupSeen is the (shard, group_key) replica-dedup map for - // AggModeReduce. Nil in AggModeAll/AggModeMap. Built lazily in Init so - // re-init resets it deterministically. Mirrors row path semantics from - // measure_plan_distributed.go's deduplicateAggregatedDataPointsWithShard: - // same shard + same group_key ⇒ drop the duplicate; different shards - // with the same group_key are KEPT and combined into one final value. - dedupSeen map[string]struct{} - mode AggMode - entrySize int64 - reserved int64 - batchSize int - cursor int - closed bool + shardIDIdx int + tagOutOffset int + outputShardIdx int + mode AggMode + entrySize int64 + reserved int64 + batchSize int + cursor int + closed bool } // aggGroup carries one bucket's reduction state plus a copy of every @@ -356,7 +326,7 @@ func (a *BatchAggregation) Finalize(_ context.Context) error { // partial state plus a leading shard-id column (see AggMode docs). // AggModeReduce emits the same shape as AggModeAll — tags + final reduced // value — but draws the value from aggregation.Reduce[N].Val() so MEAN -// finalisation (sum÷count) happens inside the reducer. +// finalization (sum÷count) happens inside the reducer. 
func (a *BatchAggregation) NextBatch(_ context.Context) (*vectorized.RecordBatch, error) { if a.mode != AggModeAll && a.mode != AggModeMap && a.mode != AggModeReduce { return nil, ErrAggModeNotImplemented diff --git a/pkg/query/vectorized/measure/aggregation_reduce.go b/pkg/query/vectorized/measure/aggregation_reduce.go index 28b5c0095..8aa264306 100644 --- a/pkg/query/vectorized/measure/aggregation_reduce.go +++ b/pkg/query/vectorized/measure/aggregation_reduce.go @@ -76,7 +76,7 @@ func buildAggInputCountIdx(input *vectorized.BatchSchema, aggs []AggSpec, mode A // // When the input schema has no RoleShardID column (shardIDIdx == -1 — a // degenerate fixture that pre-dates the storage bridge), dedup falls back -// to the group key alone. This is the safest behaviour: we cannot +// to the group key alone. This is the safest behavior: we cannot // distinguish replicas without a shard id, so we treat every (group_key) // pair as the single contribution. func (a *BatchAggregation) markDedupSeen(b *vectorized.RecordBatch, rowIdx int, groupKey string) bool { @@ -142,7 +142,7 @@ func (a *BatchAggregation) combinePartial(b *vectorized.RecordBatch, rowIdx int, // writeReduce emits the slot's reduced final value to the typed output // column. Mirrors aggSlot.write but reads from the Reduce accumulator -// (whose Val() handles MEAN finalisation by dividing Sum by Count). +// (whose Val() handles MEAN finalization by dividing Sum by Count). 
func (s *aggSlot) writeReduce(col vectorized.Column) { if s.intReduce != nil { col.(*vectorized.TypedColumn[int64]).Append(s.intReduce.Val()) diff --git a/pkg/query/vectorized/measure/aggregation_test.go b/pkg/query/vectorized/measure/aggregation_test.go index faf73e7c1..b66d0aafd 100644 --- a/pkg/query/vectorized/measure/aggregation_test.go +++ b/pkg/query/vectorized/measure/aggregation_test.go @@ -574,7 +574,6 @@ func TestBatchAggregation_AggModeMap_NonMean_EmitsValueOnly(t *testing.T) { } } - // aggReduceSchema builds the canonical AggModeMap output schema used as // AggModeReduce input: [shard_id, g (tag), sum_v (int64)]. shard_id is // int64 RoleShardID; g is a string RoleTag (single group key). For pure @@ -709,7 +708,7 @@ func TestBatchAggregation_AggModeReduce_DedupsSameShardSameGroup(t *testing.T) { } } -// TestBatchAggregation_AggModeReduce_MeanFinalises asserts MEAN finalisation +// TestBatchAggregation_AggModeReduce_MeanFinalises asserts MEAN finalization // happens inside Reduce.Val(): partials carry (Sum, Count) and the final // value is Sum/Count. Two shards: (sum=10,count=2) + (sum=20,count=3) ⇒ // 30/5 = 6. diff --git a/pkg/query/vectorized/measure/config.go b/pkg/query/vectorized/measure/config.go index b458c7598..21d5d602d 100644 --- a/pkg/query/vectorized/measure/config.go +++ b/pkg/query/vectorized/measure/config.go @@ -32,7 +32,7 @@ type VectorizedConfig struct { // DefaultBroadcastTimeout is the per-broadcast wait the vec distributed // liaison uses when the caller (banyand/dquery) does not override it. The -// value matches the historical hard-coded constant so behaviour is +// value matches the historical hard-coded constant so behavior is // unchanged when the operator does not set --dst-broadcast-timeout. 
const DefaultBroadcastTimeout = 15 * time.Second diff --git a/pkg/query/vectorized/measure/frame/decode.go b/pkg/query/vectorized/measure/frame/decode.go index f2e04f516..98270271d 100644 --- a/pkg/query/vectorized/measure/frame/decode.go +++ b/pkg/query/vectorized/measure/frame/decode.go @@ -75,7 +75,7 @@ func Decode(b []byte) (*vectorized.RecordBatch, error) { } // decodeColumn parses one column block (header + body) from b. It returns -// the column definition, the materialised column with nrows rows already +// the column definition, the materialized column with nrows rows already // appended (validity bits respected), and the number of bytes consumed so // the caller can advance its offset. func decodeColumn(b []byte, nrows uint64) (vectorized.ColumnDef, vectorized.Column, int, error) { @@ -149,12 +149,13 @@ func readValidityBitmap(b []byte, nrows uint64) ([]bool, int, error) { return nulls, nbytes, nil } -// readColumnData materialises one column of nrows rows from b given the +// readColumnData materializes one column of nrows rows from b given the // validity vector. Fixed-width types read N × 8 bytes regardless of // nullness; variable-width types read uvarint(len) + len bytes per row and // rely on the validity vector to mark nullness. 
+// nolint:gocyclo // switch-dispatch over ColumnType variants is intentionally exhaustive; splitting per-case helpers would obscure the wire-format mapping func readColumnData(b []byte, t vectorized.ColumnType, nrows int, nulls []bool) (vectorized.Column, int, error) { - switch t { + switch t { //nolint:exhaustive // Int64Array/StrArray never appear on the wire; the producer-side encoder rejects them via mapColumnType case vectorized.ColumnTypeInt64: if len(b) < nrows*8 { return nil, 0, fmt.Errorf("%w: int64 column needs %d bytes, have %d", ErrTruncated, nrows*8, len(b)) @@ -287,7 +288,7 @@ func readColumnData(b []byte, t vectorized.ColumnType, nrows int, nulls []bool) // unmapColumnRole is the inverse of mapColumnRole: it translates a // wire-stable frameColRole byte back to its pkg/query/vectorized.ColumnRole // equivalent. Unknown bytes fail loud — the spec forbids dual-wire so an -// unrecognised role byte is by definition a botched encoder, not a +// unrecognized role byte is by definition a botched encoder, not a // version-skew negotiation. func unmapColumnRole(r frameColRole) (vectorized.ColumnRole, error) { switch r { diff --git a/pkg/query/vectorized/measure/frame/decode_test.go b/pkg/query/vectorized/measure/frame/decode_test.go index 680af9a8a..5a79bbbe5 100644 --- a/pkg/query/vectorized/measure/frame/decode_test.go +++ b/pkg/query/vectorized/measure/frame/decode_test.go @@ -85,7 +85,7 @@ func TestDecode_RoundTrip_Int64(t *testing.T) { } // TestDecode_RoundTrip_NullInMiddle verifies the validity bitmap survives -// the wire: a middle null row decodes as IsNull(1) and untouched neighbours. +// the wire: a middle null row decodes as IsNull(1) and untouched neighbors. 
func TestDecode_RoundTrip_NullInMiddle(t *testing.T) { schema := vectorized.NewBatchSchema([]vectorized.ColumnDef{ {Role: vectorized.RoleField, Name: "v", Type: vectorized.ColumnTypeInt64}, @@ -325,14 +325,14 @@ func TestDecode_TruncatedColumnData_FailsLoud(t *testing.T) { // 3 rows declared but only 2 int64 worth of data. body := []byte{ 0x00, 'V', 'F', 'R', - 0x03, // version (v3: TagValue/FieldValue proto-bytes) - 0x03, // nrows = 3 - 0x01, // ncols = 1 - 0x06, // role = Field - 0x01, // type = Int64 - 0x01, 'n', // name "n" - 0x00, // TagFamilyLen = 0 (RoleField has no family) - 0x00, // validity (1 byte, all valid) + 0x03, // version (v3: TagValue/FieldValue proto-bytes) + 0x03, // nrows = 3 + 0x01, // ncols = 1 + 0x06, // role = Field + 0x01, // type = Int64 + 0x01, 'n', // name "n" + 0x00, // TagFamilyLen = 0 (RoleField has no family) + 0x00, // validity (1 byte, all valid) 1, 0, 0, 0, 0, 0, 0, 0, // row 0 2, 0, 0, 0, 0, 0, 0, 0, // row 1 — row 2 missing } @@ -359,6 +359,7 @@ func TestDecode_TrailingBytes_FailsLoud(t *testing.T) { if err != nil { t.Fatalf("Encode: %v", err) } + // nolint:gocritic // intentional: 3-index slice forces allocation of a new backing array so `raw` stays untouched corrupt := append(raw[:len(raw):len(raw)], 0xFF, 0xAA) _, decodeErr := Decode(corrupt) if decodeErr == nil { @@ -375,12 +376,12 @@ func TestDecode_TrailingBytes_FailsLoud(t *testing.T) { func TestDecode_UnknownRoleByte_FailsLoud(t *testing.T) { body := []byte{ 0x00, 'V', 'F', 'R', - 0x03, // version (v3) - 0x00, // nrows = 0 - 0x01, // ncols = 1 - 0xFE, // role byte 254 — unassigned (rejected before TagFamily read) - 0x01, // type = Int64 - 0x01, 'n', // name "n" + 0x03, // version (v3) + 0x00, // nrows = 0 + 0x01, // ncols = 1 + 0xFE, // role byte 254 — unassigned (rejected before TagFamily read) + 0x01, // type = Int64 + 0x01, 'n', // name "n" } _, err := Decode(body) if err == nil { diff --git a/pkg/query/vectorized/measure/frame/encode.go 
b/pkg/query/vectorized/measure/frame/encode.go index cb73092ee..c3de1dced 100644 --- a/pkg/query/vectorized/measure/frame/encode.go +++ b/pkg/query/vectorized/measure/frame/encode.go @@ -142,8 +142,9 @@ func appendValidityBitmap(buf []byte, col vectorized.Column, active []int) []byt // Variable-width types (string, []byte) write uvarint(len) + len bytes per row; // null rows write len=0 + 0 bytes (the validity bitmap disambiguates "null" // from "empty string"/"empty bytes"). +// nolint:gocyclo // switch-dispatch over ColumnType variants is intentionally exhaustive; splitting per-case helpers would obscure the wire-format mapping func appendColumnData(buf []byte, col vectorized.Column, t vectorized.ColumnType, active []int) ([]byte, error) { - switch t { + switch t { //nolint:exhaustive // Int64Array/StrArray are handled via passthrough at the dispatcher; this function never receives them case vectorized.ColumnTypeInt64: tc, ok := col.(*vectorized.TypedColumn[int64]) if !ok { @@ -257,7 +258,7 @@ func appendColumnData(buf []byte, col vectorized.Column, t vectorized.ColumnType // frameColType. Unsupported types yield ErrUnsupportedColumnType so callers // surface the bad input loudly at encode time. func mapColumnType(t vectorized.ColumnType) (frameColType, error) { - switch t { + switch t { //nolint:exhaustive // unsupported types (Int64Array/StrArray) fall through to the ErrUnsupportedColumnType return below case vectorized.ColumnTypeInt64: return frameColInt64, nil case vectorized.ColumnTypeFloat64: diff --git a/pkg/query/vectorized/measure/frame/frame.go b/pkg/query/vectorized/measure/frame/frame.go index 94ee76027..c3251d278 100644 --- a/pkg/query/vectorized/measure/frame/frame.go +++ b/pkg/query/vectorized/measure/frame/frame.go @@ -62,8 +62,8 @@ // - Int64: N × 8 bytes little-endian. // - Float64: N × 8 bytes IEEE-754 little-endian. // - String: For each row in order: uvarint(len) + len UTF-8 bytes. 
-// Null rows have len=0 and 0 bytes; the validity bitmap -// disambiguates "null" from "empty string". +// Null rows have len=0 and 0 bytes; the validity bitmap +// disambiguates "null" from "empty string". // - Bytes: Same shape as String, opaque bytes. package frame @@ -84,7 +84,7 @@ import ( // the "garbage-but-parsed silently-empty" failure mode into an unmistakable // hard decode error (G9f spec Principle 3, codec contract). The remaining // three bytes 'V','F','R' are a distinctive signature so a flag-on decoder -// can recognise a valid frame from random noise on the same wire. +// can recognize a valid frame from random noise on the same wire. var Magic = [4]byte{data.RawFrameMagicLeadingByte, 'V', 'F', 'R'} // WireVersion is the on-wire frame format version emitted by Encode. The @@ -122,9 +122,9 @@ const MinHeaderLen = MagicLen + 1 + 1 + 1 // Header is the parsed frame header (everything up to but not including the // first column block). type Header struct { - Magic [4]byte NumRows uint64 NumCols uint64 + Magic [4]byte WireVersion uint8 } diff --git a/pkg/query/vectorized/measure/frame/frame_test.go b/pkg/query/vectorized/measure/frame/frame_test.go index 1ecc3c3e4..1fc859916 100644 --- a/pkg/query/vectorized/measure/frame/frame_test.go +++ b/pkg/query/vectorized/measure/frame/frame_test.go @@ -32,7 +32,7 @@ import ( // TestMagic_LeadingByteIs00 pins the codec contract at the package boundary: // frame.Magic[0] MUST equal api/data.RawFrameMagicLeadingByte (0x00). The // remaining bytes are the distinctive 'VFR' signature so a flag-on decoder -// can recognise the frame, but the load-bearing safety property is that the +// can recognize the frame, but the load-bearing safety property is that the // first byte is the field-0 varint tag. 
func TestMagic_LeadingByteIs00(t *testing.T) { if Magic[0] != data.RawFrameMagicLeadingByte { @@ -79,14 +79,14 @@ func TestEncode_SingleInt64Column_GoldenBytes(t *testing.T) { } want := []byte{ 0x00, 'V', 'F', 'R', // magic - 0x03, // version (v3: TagValue/FieldValue proto-bytes) - 0x03, // nrows uvarint - 0x01, // ncols uvarint - 0x06, // role = Field - 0x01, // type = Int64 - 0x01, 'n', // name length + name - 0x00, // TagFamilyLen = 0 (RoleField has no family) - 0x00, // validity bitmap (1 byte, all valid) + 0x03, // version (v3: TagValue/FieldValue proto-bytes) + 0x03, // nrows uvarint + 0x01, // ncols uvarint + 0x06, // role = Field + 0x01, // type = Int64 + 0x01, 'n', // name length + name + 0x00, // TagFamilyLen = 0 (RoleField has no family) + 0x00, // validity bitmap (1 byte, all valid) 10, 0, 0, 0, 0, 0, 0, 0, // LE int64 10 20, 0, 0, 0, 0, 0, 0, 0, // LE int64 20 30, 0, 0, 0, 0, 0, 0, 0, // LE int64 30 @@ -143,23 +143,23 @@ func TestEncode_StringColumn_LengthPrefixedRows(t *testing.T) { } want := []byte{ 0x00, 'V', 'F', 'R', // magic - 0x03, // version (v3: TagValue/FieldValue proto-bytes) - 0x02, // nrows - 0x01, // ncols - 0x05, // role = Tag - 0x03, // type = String - 0x06, 'r', 'e', 'g', 'i', 'o', 'n', // name "region" - 0x03, 'g', 'e', 'o', // tag family "geo" - 0x00, // validity (2 rows, all valid) - 0x07, 'u', 's', '-', 'e', 'a', 's', 't', // row 0: "us-east" - 0x07, 'u', 's', '-', 'w', 'e', 's', 't', // row 1: "us-west" + 0x03, // version (v3: TagValue/FieldValue proto-bytes) + 0x02, // nrows + 0x01, // ncols + 0x05, // role = Tag + 0x03, // type = String + 0x06, 'r', 'e', 'g', 'i', 'o', 'n', // name "region" + 0x03, 'g', 'e', 'o', // tag family "geo" + 0x00, // validity (2 rows, all valid) + 0x07, 'u', 's', '-', 'e', 'a', 's', 't', // row 0: "us-east" + 0x07, 'u', 's', '-', 'w', 'e', 's', 't', // row 1: "us-west" } if !bytes.Equal(got, want) { t.Fatalf("Encode mismatch:\n got %#x\n want %#x", got, want) } } -// TestEncode_RespectsSelection 
asserts the encoder honours RecordBatch.Selection +// TestEncode_RespectsSelection asserts the encoder honors RecordBatch.Selection // — only the listed rows are emitted, and in the listed order. With Len=5 and // Selection={0,2,4} the frame contains 3 rows pulled from those source indices. func TestEncode_RespectsSelection(t *testing.T) { @@ -205,8 +205,8 @@ func TestEncode_RespectsSelection(t *testing.T) { // model safe against a botched partial restart. func TestEncode_ProtoUnmarshal_FailsLoud(t *testing.T) { cases := []struct { - name string build func() *vectorized.RecordBatch + name string }{ { name: "header-only", @@ -308,9 +308,9 @@ func TestValidateHeader_Roundtrip(t *testing.T) { // error — never silently succeed. func TestValidateHeader_Negatives(t *testing.T) { cases := []struct { + wantErr error name string body []byte - wantErr error }{ {name: "empty", body: nil, wantErr: ErrTruncated}, {name: "short-of-min", body: []byte{0x00, 'V', 'F'}, wantErr: ErrTruncated}, diff --git a/pkg/query/vectorized/measure/integration.go b/pkg/query/vectorized/measure/integration.go index 8da6984f5..03d2644fb 100644 --- a/pkg/query/vectorized/measure/integration.go +++ b/pkg/query/vectorized/measure/integration.go @@ -269,7 +269,7 @@ func (v *VectorizedMIterator) Schema() *vectorized.BatchSchema { return v.inner. // EmitFrame implements FrameEmitter on the public adapter — delegates to // the inner iterator's vec-native DrainPipelineToFrame path so the // data-node Rev under flag-on emits a columnar raw frame body directly -// without proto materialisation. The exported facade must implement the +// without proto materialization. The exported facade must implement the // method too because that is the type processor.go sees from the // vec dispatch return. 
func (v *VectorizedMIterator) EmitFrame(ctx context.Context) ([]byte, error) { diff --git a/pkg/query/vectorized/measure/plan/dispatch.go b/pkg/query/vectorized/measure/plan/dispatch.go index ba62b3e97..4472d7130 100644 --- a/pkg/query/vectorized/measure/plan/dispatch.go +++ b/pkg/query/vectorized/measure/plan/dispatch.go @@ -371,23 +371,6 @@ func projectedNames(tp *modelv1.TagProjection) map[string]struct{} { return out } -// validateProjectionParity reproduces, byte-for-byte, the projection -// errors the row path's logical_measure.Analyze raises so dispatch can -// surface the canonical WantErr=true message directly instead of falling -// through. It mirrors the row path exactly: -// -// - Tags are validated before fields (measure_analyzer.go:110-119). -// - Tag projection is checked only when non-empty; each projected tag -// (families in order, tags in order) is looked up schema-wide via the -// TagSpec registry — the logical.Schema equivalent of CommonSchema's -// TagSpecMap. The first miss returns errors.Wrap(ErrTagNotDefined, -// tagName), identical to CommonSchema.ValidateProjectionTags -// (schema.go:175): "<tagName>: tag is not defined". -// - Field projection is checked only when non-empty; the first name -// absent from the Measure schema's fields returns errors.Errorf( -// "field %s not found in schema", field), identical to -// measure.schema.ValidateProjectionFields (measure/schema.go:77). -// // ValidateMultiGroupProjection is the multi-measure counterpart of the // single-group validation Dispatch runs inline. A projected tag/field is // accepted if it resolves in ANY group's schema/measure — mirroring the @@ -433,6 +416,23 @@ func ValidateMultiGroupProjection(req *measurev1.QueryRequest, schemas []logical return nil } +// validateProjectionParity reproduces, byte-for-byte, the projection +// errors the row path's logical_measure.Analyze raises so dispatch can +// surface the canonical WantErr=true message directly instead of falling +// through. 
It mirrors the row path exactly: +// +// - Tags are validated before fields (measure_analyzer.go:110-119). +// - Tag projection is checked only when non-empty; each projected tag +// (families in order, tags in order) is looked up schema-wide via the +// TagSpec registry — the logical.Schema equivalent of CommonSchema's +// TagSpecMap. The first miss returns errors.Wrap(ErrTagNotDefined, +// tagName), identical to CommonSchema.ValidateProjectionTags +// (schema.go:175): "<tagName>: tag is not defined". +// - Field projection is checked only when non-empty; the first name +// absent from the Measure schema's fields returns errors.Errorf( +// "field %s not found in schema", field), identical to +// measure.schema.ValidateProjectionFields (measure/schema.go:77). +// // A nil error means every projected name resolves, so dispatch proceeds. func validateProjectionParity(req *measurev1.QueryRequest, logicalSchema logical.Schema, m *databasev1.Measure) error { if tp := req.GetTagProjection(); tp != nil { diff --git a/pkg/query/vectorized/measure/plan/dispatch_test.go b/pkg/query/vectorized/measure/plan/dispatch_test.go index 61dd31cce..d0dac8412 100644 --- a/pkg/query/vectorized/measure/plan/dispatch_test.go +++ b/pkg/query/vectorized/measure/plan/dispatch_test.go @@ -34,6 +34,8 @@ import ( measure "github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure" ) +const defaultName = "default" + func dispatchCfg(enabled bool) measure.VectorizedConfig { return measure.VectorizedConfig{Enabled: enabled, BatchSize: 1024, QueryMemoryMiB: 16} } @@ -41,7 +43,7 @@ func dispatchCfg(enabled bool) measure.VectorizedConfig { func bareReq() *measurev1.QueryRequest { return &measurev1.QueryRequest{ Name: "demo", - Groups: []string{"default"}, + Groups: []string{defaultName}, TagProjection: projTagProj(), FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: []string{fieldValue}}, TimeRange: &modelv1.TimeRange{ @@ -80,7 +82,7 @@ func dispatchSchemaFixture(t *testing.T) 
(*databasev1.Measure, logical.Schema, * if schemaErr != nil { t.Fatalf("BuildSchema: %v", schemaErr) } - return measureSchema, logicalSchema, &commonv1.Metadata{Name: "demo", Group: "default"}, &fakeEC{} + return measureSchema, logicalSchema, &commonv1.Metadata{Name: "demo", Group: defaultName}, &fakeEC{} } // TestDispatch_RawGroupBy_ReachesEcQuery confirms G9b: GroupBy without @@ -145,7 +147,7 @@ func TestDispatch_GroupByAggUncoveredProjection_ReachesEcQuery(t *testing.T) { // GroupBy references region but TagProjection only carries svc. req.GroupBy = &measurev1.QueryRequest_GroupBy{ TagProjection: &modelv1.TagProjection{TagFamilies: []*modelv1.TagProjection_TagFamily{ - {Name: "default", Tags: []string{"region"}}, + {Name: defaultName, Tags: []string{"region"}}, }}, FieldName: fieldValue, } @@ -204,7 +206,7 @@ func TestDispatch_Top_ReachesEcQuery(t *testing.T) { if schemaErr != nil { t.Fatalf("BuildSchema: %v", schemaErr) } - metadata := &commonv1.Metadata{Name: "demo", Group: "default"} + metadata := &commonv1.Metadata{Name: "demo", Group: defaultName} ec := &fakeEC{wantResult: nil, wantErr: nil} req := bareReq() @@ -238,7 +240,7 @@ func TestDispatch_OrderBy_ReachesEcQuery(t *testing.T) { if schemaErr != nil { t.Fatalf("BuildSchema: %v", schemaErr) } - metadata := &commonv1.Metadata{Name: "demo", Group: "default"} + metadata := &commonv1.Metadata{Name: "demo", Group: defaultName} ec := &fakeEC{wantResult: nil, wantErr: nil} req := bareReq() @@ -269,7 +271,7 @@ func TestDispatch_OrderBy_UnknownIndexRule_BubblesUpError(t *testing.T) { if schemaErr != nil { t.Fatalf("BuildSchema: %v", schemaErr) } - metadata := &commonv1.Metadata{Name: "demo", Group: "default"} + metadata := &commonv1.Metadata{Name: "demo", Group: defaultName} ec := &fakeEC{wantResult: nil, wantErr: nil} req := bareReq() @@ -303,12 +305,12 @@ func TestDispatch_UnknownTagProjection_SurfacesCanonicalError(t *testing.T) { if schemaErr != nil { t.Fatalf("BuildSchema: %v", schemaErr) } - metadata := 
&commonv1.Metadata{Name: "demo", Group: "default"} + metadata := &commonv1.Metadata{Name: "demo", Group: defaultName} ec := &fakeEC{wantResult: nil, wantErr: nil} req := bareReq() req.TagProjection = &modelv1.TagProjection{TagFamilies: []*modelv1.TagProjection_TagFamily{ - {Name: "default", Tags: []string{"ghost"}}, + {Name: defaultName, Tags: []string{"ghost"}}, }} _, _, handled, err := Dispatch(context.Background(), req, metadata, measureSchema, logicalSchema, ec, dispatchCfg(true), false, false) @@ -339,7 +341,7 @@ func TestDispatch_UnknownFieldProjection_SurfacesCanonicalError(t *testing.T) { if schemaErr != nil { t.Fatalf("BuildSchema: %v", schemaErr) } - metadata := &commonv1.Metadata{Name: "demo", Group: "default"} + metadata := &commonv1.Metadata{Name: "demo", Group: defaultName} ec := &fakeEC{wantResult: nil, wantErr: nil} req := bareReq() @@ -371,12 +373,12 @@ func TestDispatch_TagValidatedBeforeField(t *testing.T) { if schemaErr != nil { t.Fatalf("BuildSchema: %v", schemaErr) } - metadata := &commonv1.Metadata{Name: "demo", Group: "default"} + metadata := &commonv1.Metadata{Name: "demo", Group: defaultName} ec := &fakeEC{} req := bareReq() req.TagProjection = &modelv1.TagProjection{TagFamilies: []*modelv1.TagProjection_TagFamily{ - {Name: "default", Tags: []string{"ghost"}}, + {Name: defaultName, Tags: []string{"ghost"}}, }} req.FieldProjection = &measurev1.QueryRequest_FieldProjection{Names: []string{"phantom"}} _, _, handled, err := Dispatch(context.Background(), @@ -403,7 +405,7 @@ func TestDispatch_NoTimeRange_EmptyResultParity(t *testing.T) { if schemaErr != nil { t.Fatalf("BuildSchema: %v", schemaErr) } - metadata := &commonv1.Metadata{Name: "demo", Group: "default"} + metadata := &commonv1.Metadata{Name: "demo", Group: defaultName} ec := &fakeEC{wantResult: nil, wantErr: nil} req := bareReq() @@ -476,7 +478,7 @@ func TestDispatch_EmptyResult_CanonicalEmptyIterator(t *testing.T) { if schemaErr != nil { t.Fatalf("BuildSchema: %v", schemaErr) } - 
metadata := &commonv1.Metadata{Name: "demo", Group: "default"} + metadata := &commonv1.Metadata{Name: "demo", Group: defaultName} ec := &fakeEC{wantResult: nil, wantErr: nil} iter, _, handled, err := Dispatch(context.Background(), @@ -552,7 +554,7 @@ func TestDispatch_GroupByAggCovered_ReachesEcQuery(t *testing.T) { if schemaErr != nil { t.Fatalf("BuildSchema: %v", schemaErr) } - metadata := &commonv1.Metadata{Name: "demo", Group: "default"} + metadata := &commonv1.Metadata{Name: "demo", Group: defaultName} ec := &fakeEC{wantResult: nil, wantErr: nil} req := bareReq() @@ -587,7 +589,7 @@ func TestDispatch_GroupByAggCovered_ReachesEcQuery(t *testing.T) { func TestAugmentRequestWithHiddenTags_AppendsFamiliesAfterVisible(t *testing.T) { req := bareReq() // TagProjection: default=[svc] extras := [][]*logical.Tag{ - {logical.NewTag("default", "region")}, + {logical.NewTag(defaultName, "region")}, {logical.NewTag("extra", "zone")}, } got := augmentRequestWithHiddenTags(req, extras) @@ -602,10 +604,10 @@ func TestAugmentRequestWithHiddenTags_AppendsFamiliesAfterVisible(t *testing.T) if len(fams) != 3 { t.Fatalf("want 3 families (1 visible + 2 hidden), got %d: %+v", len(fams), fams) } - if fams[0].GetName() != "default" || fams[0].GetTags()[0] != tagSvc { + if fams[0].GetName() != defaultName || fams[0].GetTags()[0] != tagSvc { t.Fatalf("visible family must stay first, got %+v", fams[0]) } - if fams[1].GetName() != "default" || fams[1].GetTags()[0] != "region" { + if fams[1].GetName() != defaultName || fams[1].GetTags()[0] != "region" { t.Fatalf("first hidden family wrong, got %+v", fams[1]) } if fams[2].GetName() != "extra" || fams[2].GetTags()[0] != "zone" { @@ -625,7 +627,7 @@ func TestHiddenTagsMIterator_StripsHiddenTagsFromCurrent(t *testing.T) { return []*measurev1.InternalDataPoint{{ DataPoint: &measurev1.DataPoint{ TagFamilies: []*modelv1.TagFamily{{ - Name: "default", + Name: defaultName, Tags: []*modelv1.Tag{ {Key: tagSvc, Value: &modelv1.TagValue{Value: 
&modelv1.TagValue_Str{Str: &modelv1.Str{Value: "a"}}}}, {Key: "region", Value: &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "us"}}}}, @@ -684,7 +686,7 @@ func TestDispatch_QueryError_BubblesUp(t *testing.T) { if schemaErr != nil { t.Fatalf("BuildSchema: %v", schemaErr) } - metadata := &commonv1.Metadata{Name: "demo", Group: "default"} + metadata := &commonv1.Metadata{Name: "demo", Group: defaultName} wantErr := context.DeadlineExceeded ec := &fakeEC{wantErr: wantErr} diff --git a/pkg/query/vectorized/measure/plan/distributed.go b/pkg/query/vectorized/measure/plan/distributed.go index 93708c890..d379707e2 100644 --- a/pkg/query/vectorized/measure/plan/distributed.go +++ b/pkg/query/vectorized/measure/plan/distributed.go @@ -43,7 +43,7 @@ import ( // distributedQueryTimeout is the historical hard-coded broadcast deadline. // Retained as a fallback when DistributedPlan.cfg.BroadcastTimeout is zero // so call sites that build a VectorizedConfig by hand (most existing -// tests) keep their prior behaviour. Production deployments thread the +// tests) keep their prior behavior. Production deployments thread the // operator's --dst-broadcast-timeout through cfg.BroadcastTimeout. const distributedQueryTimeout = 15 * time.Second @@ -79,20 +79,20 @@ func SupportsDistributedRows(req *measurev1.QueryRequest) bool { // decodes them into vectorized batches, and runs the liaison operators without // routing through the row-compatible logical distributedPlan. 
type DistributedPlan struct { - queryTemplate *measurev1.QueryRequest - nodeTemplate *measurev1.QueryRequest - measureSchemas []*databasev1.Measure - indexRules [][]*databasev1.IndexRule - orderByTag *resolvedOrderByTag - hiddenOrderBy logical.HiddenTagSet + queryTemplate *measurev1.QueryRequest + nodeTemplate *measurev1.QueryRequest + orderByTag *resolvedOrderByTag + hiddenOrderBy logical.HiddenTagSet + measureSchemas []*databasev1.Measure // hiddenTopField is the field name appended to the nodeTemplate's // FieldProjection for Top-without-Agg queries when the Top.FieldName // is not already in the user-visible FieldProjection. Data nodes - // materialise the column so BatchTop can sort on it; the egress + // materialize the column so BatchTop can sort on it; the egress // hiddenFieldsMIterator strips it so the wire bytes match a query // without the extra projection. - hiddenTopField string - cfg vmeasure.VectorizedConfig + hiddenTopField string + indexRules [][]*databasev1.IndexRule + cfg vmeasure.VectorizedConfig } // AnalyzeDistributed builds the vectorized distributed liaison plan. @@ -100,12 +100,17 @@ type DistributedPlan struct { // req.Groups element). indexRules is the corresponding per-group slice of // index rule sets (ec.GetIndexRules() for each group). Both slices must be // in request-group order. Single-group callers may pass a length-1 slice for -// each (the existing behaviour is preserved byte-for-byte). +// each (the existing behavior is preserved byte-for-byte). // // When indexRules is nil or empty and req.OrderBy.IndexRuleName is non-empty, // the resolver surfaces an "index rule X not found" error byte-equivalent to // the row path. 
-func AnalyzeDistributed(req *measurev1.QueryRequest, measureSchemas []*databasev1.Measure, indexRules [][]*databasev1.IndexRule, cfg vmeasure.VectorizedConfig) (*DistributedPlan, error) { +func AnalyzeDistributed( + req *measurev1.QueryRequest, + measureSchemas []*databasev1.Measure, + indexRules [][]*databasev1.IndexRule, + cfg vmeasure.VectorizedConfig, +) (*DistributedPlan, error) { if req == nil { return nil, fmt.Errorf("vec distributed analyze: nil request") } @@ -135,7 +140,7 @@ func AnalyzeDistributed(req *measurev1.QueryRequest, measureSchemas []*databasev nodeTemplate.Top = nil // Phase 5: GroupBy without Agg (raw GroupBy) keeps GroupBy on the // nodeTemplate so each data node runs its per-node BatchGroupByFirst pass - // and emits at most one row per group. This minimises wire bytes: the + // and emits at most one row per group. This minimizes wire bytes: the // node sends one representative row per group instead of all matching rows. // GroupBy+Agg continues to keep both on the node for partial aggregation // (unchanged from prior phases). The old `nodeTemplate.GroupBy = nil` when @@ -166,9 +171,10 @@ func AnalyzeDistributed(req *measurev1.QueryRequest, measureSchemas []*databasev // For Top-without-Agg without GroupBy (Phase 4): MaxUint32 — the // per-node row count is unbounded and any finite cap risks dropping // global winners that haven't been deduplicated by GroupByFirst yet. - if nodeTemplate.GetAgg() != nil { + switch { + case nodeTemplate.GetAgg() != nil: nodeTemplate.Limit = math.MaxUint32 - } else if req.GetGroupBy() != nil { + case req.GetGroupBy() != nil: perNodeLimit := calibratedTopWithoutAggLimit(origTop, len(req.GetGroups())) nodeTemplate.Limit = perNodeLimit // Push the Top down to the data node: after BatchGroupByFirst emits @@ -178,7 +184,7 @@ func AnalyzeDistributed(req *measurev1.QueryRequest, measureSchemas []*databasev // the true top-N representatives from every node. 
nodeTemplate.Top = proto.Clone(origTop).(*measurev1.QueryRequest_Top) nodeTemplate.Top.Number = int32(perNodeLimit) - } else { + default: nodeTemplate.Limit = math.MaxUint32 } } @@ -225,7 +231,7 @@ func AnalyzeDistributed(req *measurev1.QueryRequest, measureSchemas []*databasev } // Hidden-projection augmentation: when the Top field is not in the // user-visible FieldProjection, append it to the nodeTemplate so data - // nodes materialise the column for BatchTop sorting. The egress + // nodes materialize the column for BatchTop sorting. The egress // hiddenFieldsMIterator strips it so the response matches a query // without the extra projection. if !topFieldProjectionVisible(req.GetFieldProjection(), topFieldName) { @@ -299,7 +305,7 @@ func appendOrderByToProjection(projection *modelv1.TagProjection, want resolvedO // topFieldProjectionVisible reports whether fieldName is already present in the // user-facing FieldProjection. When false, the analyzer augments the node -// template so data nodes materialise the column on the wire for BatchTop sorting. +// template so data nodes materialize the column on the wire for BatchTop sorting. func topFieldProjectionVisible(fp *measurev1.QueryRequest_FieldProjection, fieldName string) bool { if fp == nil { return false @@ -394,7 +400,7 @@ func intersectFieldProjection(proj *measurev1.QueryRequest_FieldProjection, ms * // Execute broadcasts the internal query and executes the liaison-side vectorized plan. // For single-group requests, one broadcast is issued for all groups (existing -// behaviour). For multi-group requests, one broadcast is issued per group, +// behavior). For multi-group requests, one broadcast is issued per group, // each carrying a single-element Groups slice, so data nodes can answer with // a schema that matches only their local group's columns. 
func (p *DistributedPlan) Execute(ctx context.Context) (executor.MIterator, error) { @@ -407,7 +413,7 @@ func (p *DistributedPlan) Execute(ctx context.Context) (executor.MIterator, erro groups := queryRequest.GetGroups() if len(groups) <= 1 { - // Single-group fast path — unchanged behaviour. + // Single-group fast path — unchanged behavior. nodeRequest := proto.Clone(p.nodeTemplate).(*measurev1.QueryRequest) nodeRequest.TimeRange = dctx.TimeRange() internalRequest := &measurev1.InternalQueryRequest{Request: nodeRequest, AggReturnPartial: queryRequest.GetAgg() != nil} @@ -519,11 +525,13 @@ func (p *DistributedPlan) executeAgg(ctx context.Context, frames [][]byte, req * topSpec = &vmeasure.ReduceTopSpec{FieldName: top.GetFieldName(), N: int(top.GetNumber()), Asc: top.GetFieldValueSort() == modelv1.Sort_SORT_ASC} } tracker := vectorized.NewMemoryTracker(int64(p.cfg.QueryMemoryMiB) * 1024 * 1024) + // nolint:contextcheck // pure in-memory reducer; no cancelable I/O downstream batches, reduceErr := vmeasure.ReduceRawFrames(frames, keyTagNames, aggSpecs, p.cfg.BatchSize, tracker) if reduceErr != nil { return nil, fmt.Errorf("vec distributed plan: reduce raw frames: %w", reduceErr) } if topSpec != nil && topSpec.N > 0 && len(batches) > 0 { + // nolint:contextcheck // pure in-memory top selection; no cancelable I/O downstream topped, topErr := vmeasure.ApplyTopToReduce(batches, *topSpec, p.cfg.BatchSize) if topErr != nil { return nil, fmt.Errorf("vec distributed plan: top reduced frames: %w", topErr) @@ -573,6 +581,7 @@ func (p *DistributedPlan) executeRows(ctx context.Context, frames [][]byte, req // series per window. 
if groupBy := req.GetGroupBy(); groupBy != nil { var gbErr error + // nolint:contextcheck // pure in-memory dedup; no cancelable I/O downstream batches, gbErr = applyBatchGroupByFirstToRows(batches, groupBy, p.cfg.BatchSize, tracker) if gbErr != nil { return nil, fmt.Errorf("vec distributed plan: apply group-by to rows: %w", gbErr) @@ -580,6 +589,7 @@ func (p *DistributedPlan) executeRows(ctx context.Context, frames [][]byte, req } if top := req.GetTop(); top != nil { var topErr error + // nolint:contextcheck // pure in-memory top selection; no cancelable I/O downstream batches, topErr = applyBatchTopToRows(batches, top, p.cfg.BatchSize) if topErr != nil { return nil, fmt.Errorf("vec distributed plan: apply top to rows: %w", topErr) @@ -627,6 +637,7 @@ func (p *DistributedPlan) executeRowsMultiGroup(ctx context.Context, groupFrames // to the global Top ranking. if groupBy := req.GetGroupBy(); groupBy != nil { var gbErr error + // nolint:contextcheck // pure in-memory dedup; no cancelable I/O downstream batches, gbErr = applyBatchGroupByFirstToRows(batches, groupBy, p.cfg.BatchSize, tracker) if gbErr != nil { return nil, fmt.Errorf("vec distributed plan: apply group-by to multi-group rows: %w", gbErr) @@ -634,6 +645,7 @@ func (p *DistributedPlan) executeRowsMultiGroup(ctx context.Context, groupFrames } if top := req.GetTop(); top != nil { var topErr error + // nolint:contextcheck // pure in-memory top selection; no cancelable I/O downstream batches, topErr = applyBatchTopToRows(batches, top, p.cfg.BatchSize) if topErr != nil { return nil, fmt.Errorf("vec distributed plan: apply top to multi-group rows: %w", topErr) @@ -842,7 +854,12 @@ func (p *DistributedPlan) iteratorFromBatches(ctx context.Context, batches []*ve // (already computed by BuildMultiGroupBatchSchema) so this function does not // fall back to Analyze on empty output — an empty multi-group result simply // returns no rows rather than re-deriving a schema from a single group. 
-func (p *DistributedPlan) iteratorFromBatchesWithSchema(ctx context.Context, batches []*vectorized.RecordBatch, req *measurev1.QueryRequest, schema *vectorized.BatchSchema) (executor.MIterator, error) { +func (p *DistributedPlan) iteratorFromBatchesWithSchema( + ctx context.Context, + batches []*vectorized.RecordBatch, + req *measurev1.QueryRequest, + schema *vectorized.BatchSchema, +) (executor.MIterator, error) { builder := vectorized.NewPipelineBuilder().WithMemoryTracker(vectorized.NewMemoryTracker(int64(p.cfg.QueryMemoryMiB) * 1024 * 1024)) builder.From(&batchSliceSource{batches: batches, schema: schema}) limit := req.GetLimit() @@ -896,7 +913,7 @@ func (p *DistributedPlan) iteratorFromBatchesWithSchema(ctx context.Context, bat // // Engineering ceiling: the formula output is capped at perNodeSafetyBound // (500_000 rows) before the uint32 overflow guard. Rationale: each data node -// in a typical 4 GB deployment can safely materialise ~500K int64 rows +// in a typical 4 GB deployment can safely materialize ~500K int64 rows // (~4 MB) per GroupBy+Top response without memory pressure. Any N large // enough to push 3*N beyond 500_000 (i.e. N > ~166_667) is far outside // normal operational use; capping at 500_000 prevents a pathological @@ -973,8 +990,8 @@ func (p *DistributedPlan) String() string { } type batchSliceSource struct { - batches []*vectorized.RecordBatch schema *vectorized.BatchSchema + batches []*vectorized.RecordBatch idx int } diff --git a/pkg/query/vectorized/measure/plan/distributed_rows.go b/pkg/query/vectorized/measure/plan/distributed_rows.go index 208a8615e..39e63780b 100644 --- a/pkg/query/vectorized/measure/plan/distributed_rows.go +++ b/pkg/query/vectorized/measure/plan/distributed_rows.go @@ -52,7 +52,7 @@ type seenSIDKey struct { // OrderByFamily / OrderByTagName select the sort column when the request // carries an OrderBy.IndexRuleName (Phase 2). 
Both empty means time-sort: // the comparator uses the timestamp column encoded as 8-byte big-endian, -// matching the Phase 1.5 behaviour byte-for-byte. The merger looks up the +// matching the Phase 1.5 behavior byte-for-byte. The merger looks up the // column index from the merged-batch schema after frames have been decoded, // so the spec only carries the names — not a column index. // @@ -61,9 +61,9 @@ type seenSIDKey struct { // column without going through TagIndex lookup. The merger prefers the // explicit index when >= 0; otherwise it resolves from the names. type distributedRowsSpec struct { + Tracker *vectorized.MemoryTracker OrderByFamily string OrderByTagName string - Tracker *vectorized.MemoryTracker BatchSize int OrderByColIdx int Desc bool @@ -106,20 +106,20 @@ func (r *distributedRowItem) SortedField() []byte { return r.sortField } // sources is built by handing N of these to itersort.NewItemIter. When the // merger is configured for OrderBy-by-index-rule, sortKeys caches the // per-row encoded sort bytes so the local sort and Next()'s SortedField -// both reuse the same encoding without re-marshalling per call. +// both reuse the same encoding without re-marshaling per call. type distributedRowSourceIter struct { - batch *vectorized.RecordBatch - cur *distributedRowItem - sortKeys [][]byte - indices []int - source int - group int - sidIdx int - tsIdx int - verIdx int - sortColIdx int - pos int - seq int + batch *vectorized.RecordBatch + cur *distributedRowItem + sortKeys [][]byte + indices []int + source int + group int + sidIdx int + tsIdx int + verIdx int + sortColIdx int + pos int + seq int } // newDistributedRowSourceIter constructs the per-source iterator. When @@ -128,7 +128,13 @@ type distributedRowSourceIter struct { // pre-encodes each active row's sort key via encodeSortKey, stable-sorts // indices by that encoding (respecting desc), and reuses the cached keys // in Next() to avoid re-encoding per heap pop. 
-func newDistributedRowSourceIter(batch *vectorized.RecordBatch, schema *vectorized.BatchSchema, source, group int, desc bool, sortColIdx int) (*distributedRowSourceIter, error) { +func newDistributedRowSourceIter( + batch *vectorized.RecordBatch, + schema *vectorized.BatchSchema, + source, group int, + desc bool, + sortColIdx int, +) (*distributedRowSourceIter, error) { tsIdx := schema.TimestampIndex() indices := activeDistributedRows(batch) iter := &distributedRowSourceIter{ @@ -155,7 +161,7 @@ func newDistributedRowSourceIter(batch *vectorized.RecordBatch, schema *vectoriz // heap's lex compare gives the right global order. Data nodes emit // per-shard pre-sorted on this same column, but tests and unusual // scan modes may not respect that, so the defensive O(n log n) sort - // keeps the heap invariant intact regardless of producer behaviour. + // keeps the heap invariant intact regardless of producer behavior. sort.SliceStable(indices, func(i, j int) bool { cmp := bytes.Compare(sortKeys[indices[i]], sortKeys[indices[j]]) if desc { @@ -280,7 +286,12 @@ func mergeDistributedRows(frames [][]byte, spec distributedRowsSpec) ([]*vectori // batches. Extracted so both the single-group path (mergeDistributedRows) and // the multi-group path (mergeDistributedRowsMulti) share the same merge loop // without duplication. -func runDistributedRowMerge(iters []itersort.Iterator[*distributedRowItem], schema *vectorized.BatchSchema, spec distributedRowsSpec, batchSize int) ([]*vectorized.RecordBatch, error) { +func runDistributedRowMerge( + iters []itersort.Iterator[*distributedRowItem], + schema *vectorized.BatchSchema, + spec distributedRowsSpec, + batchSize int, +) ([]*vectorized.RecordBatch, error) { merger := itersort.NewItemIter(iters, spec.Desc) defer func() { _ = merger.Close() }() @@ -334,7 +345,12 @@ type decodedBatchSource struct { // heap expects (largest-first for desc, smallest-first for asc). 
sortColIdx // selects the comparator: < 0 means time-sort (Phase 1.5), >= 0 means the // OrderBy column at that index in the merged-batch schema (Phase 2). -func buildDistributedRowSourceIters(sources []decodedBatchSource, schema *vectorized.BatchSchema, desc bool, sortColIdx int) ([]itersort.Iterator[*distributedRowItem], error) { +func buildDistributedRowSourceIters( + sources []decodedBatchSource, + schema *vectorized.BatchSchema, + desc bool, + sortColIdx int, +) ([]itersort.Iterator[*distributedRowItem], error) { iters := make([]itersort.Iterator[*distributedRowItem], 0, len(sources)) for _, src := range sources { iter, buildErr := newDistributedRowSourceIter(src.batch, schema, src.source, src.group, desc, sortColIdx) @@ -373,17 +389,17 @@ func resolveDistributedSortColumn(schema *vectorized.BatchSchema, spec distribut // dedup map, the current in-progress output batch, the cross-window seenSID // guard for index-mode queries, and the returned batch list. type distributedRowEmitter struct { - pool *vectorized.BatchPool - schema *vectorized.BatchSchema - tracker *vectorized.MemoryTracker - seenSID map[seenSIDKey]struct{} - window map[distributedRowKey]*distributedRowItem - current *vectorized.RecordBatch - windowKey []byte - output []*vectorized.RecordBatch - batchSize int - rowWidth int64 - indexMode bool + pool *vectorized.BatchPool + schema *vectorized.BatchSchema + tracker *vectorized.MemoryTracker + seenSID map[seenSIDKey]struct{} + window map[distributedRowKey]*distributedRowItem + current *vectorized.RecordBatch + windowKey []byte + output []*vectorized.RecordBatch + batchSize int + rowWidth int64 + indexMode bool } // accept adds an incoming row to the current sort-group dedup map. 
The window @@ -404,7 +420,7 @@ func (e *distributedRowEmitter) accept(item *distributedRowItem) { // // Emit-order note: sortedMIterator.loadOneGroup in the row-path baseline // (pkg/query/logical/measure/measure_plan_distributed.go) iterates its -// uniqueData map in Go map order, which is intentionally randomised by the +// uniqueData map in Go map order, which is intentionally randomized by the // runtime. The vec path here imposes a deterministic (source, seq) order via // sort.SliceStable on the window's surviving rows so equal-sort-field output // is reproducible across reruns and process restarts. This is strictly @@ -442,7 +458,7 @@ func (e *distributedRowEmitter) flushWindow() error { } // appendRowToCurrent zero-copies one source row into the active output batch -// via measure.AppendColumnRange. When the batch fills, it is finalised (which +// via measure.AppendColumnRange. When the batch fills, it is finalized (which // reserves memory tracker bytes, appends to the output, then releases the // reservation so the tracker bounds in-transit batches without // double-counting once they reach the caller). diff --git a/pkg/query/vectorized/measure/plan/distributed_rows_test.go b/pkg/query/vectorized/measure/plan/distributed_rows_test.go index 9bbb62706..fc8139c04 100644 --- a/pkg/query/vectorized/measure/plan/distributed_rows_test.go +++ b/pkg/query/vectorized/measure/plan/distributed_rows_test.go @@ -315,6 +315,7 @@ func collectOrderByRows(batches []*vectorized.RecordBatch) []orderByRow { // instead of the timestamp when OrderByFamily/OrderByTagName are set: // - ascending: the surviving rows are emitted in ascending tag value; // - descending: same set, reversed. +// // Two sources contribute rows that arrive interleaved in input order to // exercise the cross-source ordering invariant. 
func TestMergeDistributedRows_OrderByString_AscDescAcrossSources(t *testing.T) { @@ -457,9 +458,9 @@ func TestMergeDistributedRows_RawGroupBy_FirstSeenPerGroup(t *testing.T) { // Collect grouped rows. type groupedRow struct { + svc string ts int64 value int64 - svc string } var rows []groupedRow for _, batch := range grouped { @@ -497,7 +498,7 @@ func TestMergeDistributedRows_RawGroupBy_FirstSeenPerGroup(t *testing.T) { // Scenario: perNodeLimit = 2 (Top.N = 1, nGroups = 1 → calibrated limit = 2). // Source A has 4 groups (svc-a1 value=900, svc-a2 value=100, svc-a3 value=800, svc-a4 value=50). // After BatchGroupByFirst: 4 rows, one per group (already deduplicated). -// Without per-node BatchTop (old behaviour): Limit(2) would keep svc-a1 and +// Without per-node BatchTop (old behavior): Limit(2) would keep svc-a1 and // svc-a2 (value=900 and 100) in insertion order, discarding svc-a3 (value=800). // The global top-1 is svc-a1 (value=900) — correct by accident, but svc-a3 // (the second-highest) is lost. @@ -715,8 +716,8 @@ func encodeMultiGroupRows(t *testing.T, schema *vectorized.BatchSchema, rows ... // has the base 5 columns), the merged output has IsNull==true for that column // for all group-0 rows, while group-1 rows (which carry the column) are non-null. func TestMergeDistributedRows_MultiGroup_NullFillsMissingColumn(t *testing.T) { - baseSchema := distributedRowsTestSchema() // 5 cols, no extra_tag - mergedSchema := multiGroupMergedSchema() // 6 cols, with extra_tag + baseSchema := distributedRowsTestSchema() // 5 cols, no extra_tag + mergedSchema := multiGroupMergedSchema() // 6 cols, with extra_tag // Group 0: uses base schema — no extra_tag column. frameGroup0 := encodeMultiGroupRows(t, baseSchema, @@ -750,11 +751,11 @@ func TestMergeDistributedRows_MultiGroup_NullFillsMissingColumn(t *testing.T) { t.Fatal("expected non-empty output batches") } - // Collect rows and verify null-fill behaviour. + // Collect rows and verify null-fill behavior. 
type mergedRow struct { + extraTag string ts int64 sid int64 - extraTag string nullTag bool } var rows []mergedRow diff --git a/pkg/query/vectorized/measure/plan/distributed_test.go b/pkg/query/vectorized/measure/plan/distributed_test.go index 95c4dc637..a4dc9acec 100644 --- a/pkg/query/vectorized/measure/plan/distributed_test.go +++ b/pkg/query/vectorized/measure/plan/distributed_test.go @@ -35,7 +35,7 @@ import ( func TestAnalyzeDistributed_AllowsGroupByTopWithoutAgg(t *testing.T) { req := &measurev1.QueryRequest{ Name: "demo", - Groups: []string{"default"}, + Groups: []string{defaultName}, TagProjection: projTagProj(), FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: []string{fieldValue}}, GroupBy: &measurev1.QueryRequest_GroupBy{ @@ -58,7 +58,7 @@ func TestAnalyzeDistributed_AllowsGroupByTopWithoutAgg(t *testing.T) { func TestAnalyzeDistributed_AllowsSupportedNonAggRows(t *testing.T) { req := &measurev1.QueryRequest{ Name: "demo", - Groups: []string{"default"}, + Groups: []string{defaultName}, TagProjection: projTagProj(), FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: []string{fieldValue}}, Limit: 7, @@ -81,11 +81,11 @@ func TestAnalyzeDistributed_AllowsSupportedNonAggRows(t *testing.T) { func TestSupportsDistributedRows(t *testing.T) { base := func() *measurev1.QueryRequest { - return &measurev1.QueryRequest{Name: "demo", Groups: []string{"default"}} + return &measurev1.QueryRequest{Name: "demo", Groups: []string{defaultName}} } cases := []struct { - name string req *measurev1.QueryRequest + name string want bool }{ {name: "plain", req: base(), want: true}, @@ -281,7 +281,7 @@ func TestAnalyzeDistributed_TopWithoutAgg_NodeLimitIsUnbounded(t *testing.T) { } req := &measurev1.QueryRequest{ Name: "demo", - Groups: []string{"default"}, + Groups: []string{defaultName}, TagProjection: projTagProj(), FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: []string{fieldValue}}, Top: &measurev1.QueryRequest_Top{ @@ -314,15 
+314,15 @@ func TestAnalyzeDistributed_TopWithoutAgg_NodeLimitIsUnbounded(t *testing.T) { // TestAnalyzeDistributed_TopWithoutAgg_HiddenFieldProjectionAdded covers // the Phase 4 augmentation path: when Top.FieldName is NOT in the request's // FieldProjection, AnalyzeDistributed must append it to the nodeTemplate so -// data nodes materialise the column, and record hiddenTopField so the egress +// data nodes materialize the column, and record hiddenTopField so the egress // strip removes it before the response. func TestAnalyzeDistributed_TopWithoutAgg_HiddenFieldProjectionAdded(t *testing.T) { // Visible field projection: only "total" — "value" (Top.FieldName) is absent. req := &measurev1.QueryRequest{ Name: "demo", - Groups: []string{"default"}, + Groups: []string{defaultName}, TagProjection: &modelv1.TagProjection{TagFamilies: []*modelv1.TagProjection_TagFamily{ - {Name: "default", Tags: []string{tagSvc}}, + {Name: defaultName, Tags: []string{tagSvc}}, }}, FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: []string{"total"}}, Top: &measurev1.QueryRequest_Top{ @@ -334,9 +334,9 @@ func TestAnalyzeDistributed_TopWithoutAgg_HiddenFieldProjectionAdded(t *testing. } // Schema must have both "total" and "value" fields. ms := &databasev1.Measure{ - Metadata: &commonv1.Metadata{Name: "demo", Group: "default"}, + Metadata: &commonv1.Metadata{Name: "demo", Group: defaultName}, TagFamilies: []*databasev1.TagFamilySpec{ - {Name: "default", Tags: []*databasev1.TagSpec{ + {Name: defaultName, Tags: []*databasev1.TagSpec{ {Name: tagSvc, Type: databasev1.TagType_TAG_TYPE_STRING}, }}, }, @@ -356,7 +356,7 @@ func TestAnalyzeDistributed_TopWithoutAgg_HiddenFieldProjectionAdded(t *testing. if p.hiddenTopField != fieldValue { t.Fatalf("hiddenTopField: got %q, want %q", p.hiddenTopField, fieldValue) } - // Node template must project the Top field so data nodes materialise it. + // Node template must project the Top field so data nodes materialize it. 
nodeNames := p.nodeTemplate.GetFieldProjection().GetNames() sawTotal, sawValue := false, false for _, n := range nodeNames { @@ -383,7 +383,7 @@ func TestAnalyzeDistributed_TopWithoutAgg_HiddenFieldProjectionAdded(t *testing. func TestAnalyzeDistributed_TopWithoutAgg_FieldNotInSchema(t *testing.T) { req := &measurev1.QueryRequest{ Name: "demo", - Groups: []string{"default"}, + Groups: []string{defaultName}, TagProjection: projTagProj(), FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: []string{fieldValue}}, Top: &measurev1.QueryRequest_Top{ @@ -408,7 +408,7 @@ func TestAnalyzeDistributed_TopWithoutAgg_FieldNotInSchema(t *testing.T) { // AnalyzeDistributed as the indexRules parameter. func testIndexRuleOnTag(ruleName, tagName string) *databasev1.IndexRule { return &databasev1.IndexRule{ - Metadata: &commonv1.Metadata{Name: ruleName, Group: "default"}, + Metadata: &commonv1.Metadata{Name: ruleName, Group: defaultName}, Tags: []string{tagName}, } } @@ -421,7 +421,7 @@ func testIndexRuleOnTag(ruleName, tagName string) *databasev1.IndexRule { func TestAnalyzeDistributed_OrderByByIndexRule_AcceptedNatively(t *testing.T) { req := &measurev1.QueryRequest{ Name: "demo", - Groups: []string{"default"}, + Groups: []string{defaultName}, TagProjection: projTagProj(), // already projects "svc" FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: []string{fieldValue}}, OrderBy: &modelv1.QueryOrder{IndexRuleName: "svc_idx", Sort: modelv1.Sort_SORT_ASC}, @@ -431,14 +431,19 @@ func TestAnalyzeDistributed_OrderByByIndexRule_AcceptedNatively(t *testing.T) { if !SupportsDistributedRows(req) { t.Fatal("Phase 2: SupportsDistributedRows must accept OrderBy by index rule") } - p, analyzeErr := AnalyzeDistributed(req, []*databasev1.Measure{testMeasureSchema()}, [][]*databasev1.IndexRule{indexRules}, vmeasure.VectorizedConfig{Enabled: true, BatchSize: 4, QueryMemoryMiB: 1}) + p, analyzeErr := AnalyzeDistributed( + req, + []*databasev1.Measure{testMeasureSchema()}, + 
[][]*databasev1.IndexRule{indexRules}, + vmeasure.VectorizedConfig{Enabled: true, BatchSize: 4, QueryMemoryMiB: 1}, + ) if analyzeErr != nil { t.Fatalf("AnalyzeDistributed: %v", analyzeErr) } if p.orderByTag == nil { t.Fatal("DistributedPlan.orderByTag must be populated when an index rule resolves") } - if p.orderByTag.family != "default" || p.orderByTag.tag != tagSvc { + if p.orderByTag.family != defaultName || p.orderByTag.tag != tagSvc { t.Fatalf("orderByTag got %+v want default/%s", *p.orderByTag, tagSvc) } if !p.hiddenOrderBy.IsEmpty() { @@ -460,22 +465,27 @@ func TestAnalyzeDistributed_OrderByByIndexRule_HiddenProjectionAdded(t *testing. // Projection visible to the user: only "region" — the OrderBy tag // "svc" is NOT projected, so the analyzer must hide-project it. visibleProjection := &modelv1.TagProjection{TagFamilies: []*modelv1.TagProjection_TagFamily{ - {Name: "default", Tags: []string{"region"}}, + {Name: defaultName, Tags: []string{"region"}}, }} req := &measurev1.QueryRequest{ Name: "demo", - Groups: []string{"default"}, + Groups: []string{defaultName}, TagProjection: visibleProjection, FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: []string{fieldValue}}, OrderBy: &modelv1.QueryOrder{IndexRuleName: "svc_idx", Sort: modelv1.Sort_SORT_DESC}, Limit: 5, } indexRules := []*databasev1.IndexRule{testIndexRuleOnTag("svc_idx", tagSvc)} - p, analyzeErr := AnalyzeDistributed(req, []*databasev1.Measure{testMeasureSchema()}, [][]*databasev1.IndexRule{indexRules}, vmeasure.VectorizedConfig{Enabled: true, BatchSize: 4, QueryMemoryMiB: 1}) + p, analyzeErr := AnalyzeDistributed( + req, + []*databasev1.Measure{testMeasureSchema()}, + [][]*databasev1.IndexRule{indexRules}, + vmeasure.VectorizedConfig{Enabled: true, BatchSize: 4, QueryMemoryMiB: 1}, + ) if analyzeErr != nil { t.Fatalf("AnalyzeDistributed: %v", analyzeErr) } - if p.orderByTag == nil || p.orderByTag.family != "default" || p.orderByTag.tag != tagSvc { + if p.orderByTag == nil || 
p.orderByTag.family != defaultName || p.orderByTag.tag != tagSvc { t.Fatalf("orderByTag got %+v want default/%s", p.orderByTag, tagSvc) } if p.hiddenOrderBy.IsEmpty() { @@ -492,7 +502,7 @@ func TestAnalyzeDistributed_OrderByByIndexRule_HiddenProjectionAdded(t *testing. } // The node template's TagProjection must include the OrderBy tag. nodeFams := p.nodeTemplate.GetTagProjection().GetTagFamilies() - if len(nodeFams) != 1 || nodeFams[0].GetName() != "default" { + if len(nodeFams) != 1 || nodeFams[0].GetName() != defaultName { t.Fatalf("node template projection wrong shape, got %+v", nodeFams) } saw := map[string]bool{} @@ -511,7 +521,7 @@ func TestAnalyzeDistributed_OrderByByIndexRule_HiddenProjectionAdded(t *testing. func TestAnalyzeDistributed_OrderByByIndexRule_UnknownRuleErrors(t *testing.T) { req := &measurev1.QueryRequest{ Name: "demo", - Groups: []string{"default"}, + Groups: []string{defaultName}, TagProjection: projTagProj(), FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: []string{fieldValue}}, OrderBy: &modelv1.QueryOrder{IndexRuleName: "nope"}, @@ -533,7 +543,7 @@ func testMeasureSchemaForGroup(group string) *databasev1.Measure { Metadata: &commonv1.Metadata{Name: "demo", Group: group}, TagFamilies: []*databasev1.TagFamilySpec{ { - Name: "default", + Name: defaultName, Tags: []*databasev1.TagSpec{ {Name: tagSvc, Type: databasev1.TagType_TAG_TYPE_STRING}, {Name: "region", Type: databasev1.TagType_TAG_TYPE_STRING}, @@ -590,7 +600,7 @@ func TestAnalyzeDistributed_MultiGroup_UnionsSchemaAcrossGroups(t *testing.T) { Metadata: &commonv1.Metadata{Name: "demo", Group: "groupB"}, TagFamilies: []*databasev1.TagFamilySpec{ { - Name: "default", + Name: defaultName, Tags: []*databasev1.TagSpec{ {Name: tagSvc, Type: databasev1.TagType_TAG_TYPE_STRING}, {Name: "region", Type: databasev1.TagType_TAG_TYPE_STRING}, @@ -693,8 +703,8 @@ func TestHiddenFieldsMIterator_StripsHiddenTopField(t *testing.T) { func 
TestAnalyzeDistributed_TopNonAggUnboundsNodeLimit_MultiGroup(t *testing.T) { cases := []struct { name string - topN int32 groups []string + topN int32 wantLimit uint32 }{ { @@ -712,7 +722,7 @@ func TestAnalyzeDistributed_TopNonAggUnboundsNodeLimit_MultiGroup(t *testing.T) { name: "SingleGroup_Top5_WithGroupBy", topN: 5, - groups: []string{"default"}, + groups: []string{defaultName}, wantLimit: calibratedTopWithoutAggLimit(&measurev1.QueryRequest_Top{Number: 5, FieldName: fieldValue}, 1), }, } @@ -774,13 +784,13 @@ func TestAnalyzeDistributed_TopNonAggUnboundsNodeLimit_MultiGroup(t *testing.T) // TestAnalyzeDistributed_RawGroupBy_NodeTemplateKeepsGroupBy verifies that // Phase 5 propagates GroupBy to data nodes for raw GroupBy requests (GroupBy // without Agg). Data nodes must receive the GroupBy so they run their per-node -// BatchGroupByFirst pass and emit at most one row per group, minimising wire -// bytes. The old behaviour (clearing nodeTemplate.GroupBy when Agg is nil) is +// BatchGroupByFirst pass and emit at most one row per group, minimizing wire +// bytes. The old behavior (clearing nodeTemplate.GroupBy when Agg is nil) is // intentionally dropped in Phase 5. func TestAnalyzeDistributed_RawGroupBy_NodeTemplateKeepsGroupBy(t *testing.T) { req := &measurev1.QueryRequest{ Name: "demo", - Groups: []string{"default"}, + Groups: []string{defaultName}, TagProjection: projTagProj(), FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: []string{fieldValue}}, GroupBy: &measurev1.QueryRequest_GroupBy{ diff --git a/pkg/query/vectorized/measure/plan/hidden_tags.go b/pkg/query/vectorized/measure/plan/hidden_tags.go index 47862f9c6..dc70a70a3 100644 --- a/pkg/query/vectorized/measure/plan/hidden_tags.go +++ b/pkg/query/vectorized/measure/plan/hidden_tags.go @@ -99,7 +99,7 @@ func (h *hiddenTagsMIterator) Close() error { return h.inner.Close() } // EmitFrame implements vmeasure.FrameEmitter. 
Draining via the wrapper's // own Next / Current keeps the hidden-tag strip as the single source of // truth — Current already removes the hidden criteria tags from each -// emitted DataPoint, so the reverse-serialised RecordBatch carries only +// emitted DataPoint, so the reverse-serialized RecordBatch carries only // the projected columns. The columnar wire then matches what a query // without hidden criteria tags would have emitted, byte-identical to // the row path. @@ -119,7 +119,7 @@ func (h *hiddenTagsMIterator) EmitFrame(_ context.Context) ([]byte, error) { // hiddenFieldsMIterator wraps an MIterator and strips a single hidden field // from each Current() result. It is the Phase 4 parallel of hiddenTagsMIterator: // when Top.FieldName is not in the user-visible FieldProjection, the analyzer -// appends it to the nodeTemplate so data nodes materialise it for BatchTop +// appends it to the nodeTemplate so data nodes materialize it for BatchTop // sorting. This wrapper removes it at egress so the wire bytes match a query // without the extra field projection — byte-identical to the row path's // hidden-projection strip for criteria tags. diff --git a/pkg/query/vectorized/measure/plan/multi_group_schema.go b/pkg/query/vectorized/measure/plan/multi_group_schema.go index bb0c30624..1b665b24a 100644 --- a/pkg/query/vectorized/measure/plan/multi_group_schema.go +++ b/pkg/query/vectorized/measure/plan/multi_group_schema.go @@ -173,7 +173,7 @@ func BuildMultiGroupBatchSchema(measureSchemas []*databasev1.Measure, req *measu orderedFields = append(orderedFields, name) } else { // Field not in any group's schema — include as passthrough - // (BuildBatchSchema's null-slot behaviour for unknown fields). + // (BuildBatchSchema's null-slot behavior for unknown fields). 
orderedFields = append(orderedFields, name) fieldColType[name] = vectorized.ColumnTypeFieldValue } @@ -204,7 +204,7 @@ func nativeKeyMulti(family, name string) string { return family + "\x00" + name // for types that have no direct vec mapping (UNSPECIFIED, TIMESTAMP) so the // caller can fall back to ColumnTypeTagValue. func tagTypeToColumnTypeMG(t databasev1.TagType) (vectorized.ColumnType, error) { - switch t { + switch t { //nolint:exhaustive // TAG_TYPE_TIMESTAMP and UNSPECIFIED fall through to the error return below case databasev1.TagType_TAG_TYPE_INT: return vectorized.ColumnTypeInt64, nil case databasev1.TagType_TAG_TYPE_STRING: diff --git a/pkg/query/vectorized/measure/plan/orderby.go b/pkg/query/vectorized/measure/plan/orderby.go index 01b94a26e..4fd1e1409 100644 --- a/pkg/query/vectorized/measure/plan/orderby.go +++ b/pkg/query/vectorized/measure/plan/orderby.go @@ -78,4 +78,3 @@ func resolveOrderByTag(measureSchema *databasev1.Measure, indexRules []*database } return resolvedOrderByTag{}, fmt.Errorf("tag %s not found", tagName) } - diff --git a/pkg/query/vectorized/measure/raw_emit.go b/pkg/query/vectorized/measure/raw_emit.go index c6a4cdc5c..9f6fedbee 100644 --- a/pkg/query/vectorized/measure/raw_emit.go +++ b/pkg/query/vectorized/measure/raw_emit.go @@ -37,18 +37,18 @@ import ( // // - VectorizedMIterator: drains the underlying vec Pipeline directly // via DrainPipelineToFrame — the throughput-optimal path that -// never materialises proto datapoints. +// never materializes proto datapoints. // - emptyMIterator: returns a nil body (matches the codec layer's // RawFrameCodec carve-out for empty distributed results). // - hiddenTagsMIterator: drains via Next / Current (which already // strips hidden criteria tags from the egress datapoints), then -// reverse-serialises the surviving rows into a passthrough +// reverse-serializes the surviving rows into a passthrough // RecordBatch through SerializeDataPointsToFrame. 
// - sortedMIterator: drains via Next / Current (which already // applies cross-group merge + version dedup), then reverse- -// serialises through SerializeDataPointsToFrame. +// serializes through SerializeDataPointsToFrame. // -// The reverse-serialise path is less efficient than draining a vec +// The reverse-serialize path is less efficient than draining a vec // Pipeline (one allocation per cell during passthrough rebuild) but // keeps the wrapper's row-side semantics — hidden-tag strip, sort, // dedup — as the single source of truth on the wire. @@ -120,7 +120,7 @@ func DrainPipelineToFrame(ctx context.Context, p *vectorized.Pipeline, schema *v // typed columns; the typed-cell → TagValue/FieldValue reconstruction // happens at the row-egress (one allocation per surviving row, vs the // scan-time decode the storage avoided via passthrough — but the - // trade is favourable when the wire crossing in between would + // trade is favorable when the wire crossing in between would // otherwise dominate). converted, convErr := convertPassthroughForFrame(out) if convErr != nil { @@ -134,15 +134,15 @@ func DrainPipelineToFrame(ctx context.Context, p *vectorized.Pipeline, schema *v // strip / cross-group merge logic operates on []*InternalDataPoint // rather than on a vec Pipeline. The wrapper drains itself via the // row-side Next / Current API (so its existing strip / merge / dedup -// logic still runs); the resulting rows are reverse-serialised into a +// logic still runs); the resulting rows are reverse-serialized into a // passthrough RecordBatch, convertPassthroughForFrame decodes the // passthrough columns to typed wire columns, and frame.Encode produces // the body. 
// // This path is less efficient than the vec native pipeline drain — it // allocates a *modelv1.TagValue / *FieldValue per cell during reverse- -// serialise — but it keeps the wrapper's egress semantics intact end- -// to-end on the wire (hidden tags stripped, cross-group order honoured, +// serialize — but it keeps the wrapper's egress semantics intact end- +// to-end on the wire (hidden tags stripped, cross-group order honored, // version dedup applied) without re-implementing each one in columnar // form. // @@ -285,7 +285,7 @@ func buildPassthroughBatchFromDataPoints(idps []*measurev1.InternalDataPoint) (* // so this is a no-op. // // Wire-type selection is variant-driven: the first non-null cell decides -// what typed column to materialise (a Str variant ⇒ string column, Int ⇒ +// what typed column to materialize (a Str variant ⇒ string column, Int ⇒ // int64, BinaryData ⇒ bytes). All-null columns default to bytes — the // receiver's reconstruction uses the validity bitmap, not the data, so // the chosen wire type is irrelevant for purely-null columns and bytes @@ -295,7 +295,7 @@ func buildPassthroughBatchFromDataPoints(idps []*measurev1.InternalDataPoint) (* // frame.Encode (the format lacks array column types); the helper returns // a typed error so the caller can surface it as a hard failure rather // than silently mis-encoding. Adding native array column types to the -// frame format is the natural follow-up; until then, scans that materialise +// frame format is the natural follow-up; until then, scans that materialize // array-typed tags on the cluster wire are unsupported. 
func convertPassthroughForFrame(b *vectorized.RecordBatch) (*vectorized.RecordBatch, error) { if !hasPassthroughColumn(b.Schema) { @@ -356,10 +356,7 @@ func convertTagValueColumn(def vectorized.ColumnDef, col vectorized.Column, n in if !ok { return vectorized.ColumnDef{}, nil, fmt.Errorf("declared TagValue passthrough but column is %T", col) } - wireType, err := inferTagValueWireType(tc, n) - if err != nil { - return vectorized.ColumnDef{}, nil, err - } + wireType := inferTagValueWireType(tc, n) if wireType == vectorized.ColumnTypeTagValue { return def, col, nil } @@ -419,7 +416,7 @@ func convertTagValueColumn(def vectorized.ColumnDef, col vectorized.Column, n in // (frame v3 ColumnTypeTagValue) which preserves the oneof intact. The // receive side reconstructs them as TagValue passthrough so // serializeBatchToProto handles them via the pointer-return fast path. -func inferTagValueWireType(tc *vectorized.TypedColumn[*modelv1.TagValue], n int) (vectorized.ColumnType, error) { +func inferTagValueWireType(tc *vectorized.TypedColumn[*modelv1.TagValue], n int) vectorized.ColumnType { for i := range n { if tc.IsNull(i) { continue @@ -430,18 +427,18 @@ func inferTagValueWireType(tc *vectorized.TypedColumn[*modelv1.TagValue], n int) } switch v.GetValue().(type) { case *modelv1.TagValue_Str: - return vectorized.ColumnTypeString, nil + return vectorized.ColumnTypeString case *modelv1.TagValue_Int: - return vectorized.ColumnTypeInt64, nil + return vectorized.ColumnTypeInt64 case *modelv1.TagValue_BinaryData: - return vectorized.ColumnTypeBytes, nil + return vectorized.ColumnTypeBytes case *modelv1.TagValue_IntArray, *modelv1.TagValue_StrArray: - return vectorized.ColumnTypeTagValue, nil + return vectorized.ColumnTypeTagValue case *modelv1.TagValue_Null: continue } } - return vectorized.ColumnTypeTagValue, nil + return vectorized.ColumnTypeTagValue } // convertFieldValueColumn is the FieldValue counterpart of @@ -567,7 +564,7 @@ func appendActive(dst, src 
*vectorized.RecordBatch) { // scope for G9f.5), this round trip drops out. // // Empty input (no non-empty frames) returns an empty slice with no -// error — matches the row path's behaviour when every data node returned +// error — matches the row path's behavior when every data node returned // an empty distributed result. func ReduceFramesToInternalDataPoints( frames [][]byte, @@ -639,7 +636,7 @@ func DecodeFramesToInternalDataPoints(frames [][]byte) ([]*measurev1.InternalDat // end up in different sortField groups across the flat sequence. // // nil / empty frame bodies produce no slice in the output — the codec -// carve-out for empty bodies is honoured here too. +// carve-out for empty bodies is honored here too. func DecodeFramesPerSource(frames [][]byte) ([][]*measurev1.InternalDataPoint, error) { out := make([][]*measurev1.InternalDataPoint, 0, len(frames)) for i, body := range frames { diff --git a/pkg/query/vectorized/measure/raw_emit_bench_test.go b/pkg/query/vectorized/measure/raw_emit_bench_test.go index 2bc4bcfc9..c8084e2c9 100644 --- a/pkg/query/vectorized/measure/raw_emit_bench_test.go +++ b/pkg/query/vectorized/measure/raw_emit_bench_test.go @@ -30,12 +30,12 @@ import ( // distributed soak/bench. It simulates a fan-out of N data nodes each // running AggModeMap → frame.Encode, the cluster wire (just the byte // slice handoff), and the liaison's frame.Decode → AggModeReduce + -// (shard, group) dedup → serialise to InternalDataPoint. +// (shard, group) dedup → serialize to InternalDataPoint. // // Compared against BenchmarkG9f_SingleNodeAggModeAll on the same row // set, this measures the *full path overhead* of the throughout-vec -// distributed loop: per-shard Map fold + columnar serialise + bytes + -// columnar deserialise + Reduce + final proto materialise. The ratio +// distributed loop: per-shard Map fold + columnar serialize + bytes + +// columnar deserialise + Reduce + final proto materialize. 
The ratio // of the two numbers is the load-bearing efficiency claim of G9f. // // Real-cluster soak (gRPC + retry + network latency) is a follow-up @@ -73,7 +73,7 @@ func BenchmarkG9f_DistributedFanout(b *testing.B) { // BenchmarkG9f_SingleNodeAggModeAll is the AggModeAll oracle: full // dataset processed in one BatchAggregation pass with no wire crossing. // The number this produces is the lower bound — any distributed path -// adding wire / serialise / decode steps must stay close to it. +// adding wire / serialize / decode steps must stay close to it. func BenchmarkG9f_SingleNodeAggModeAll(b *testing.B) { rows := generateBenchRows(1000) schema := topologyRawSchema() @@ -108,7 +108,7 @@ func BenchmarkG9f_SingleNodeAggModeAll(b *testing.B) { // BenchmarkG9f_DataNodeEmit measures the per-data-node Map + Encode // step alone — the upstream cost of the distributed path. Useful when -// regressions appear in BenchmarkG9f_DistributedFanout to localise +// regressions appear in BenchmarkG9f_DistributedFanout to localize // whether they're on the encode side or the decode/reduce side. func BenchmarkG9f_DataNodeEmit(b *testing.B) { rows := generateBenchRows(1000) diff --git a/pkg/query/vectorized/measure/raw_emit_test.go b/pkg/query/vectorized/measure/raw_emit_test.go index 27fb30e92..c95886e0c 100644 --- a/pkg/query/vectorized/measure/raw_emit_test.go +++ b/pkg/query/vectorized/measure/raw_emit_test.go @@ -108,7 +108,7 @@ func TestDrainPipelineToFrame_AggModeMap(t *testing.T) { // counterpart of TestDrainPipelineToFrame_AggModeMap: it pipes a vec // pipeline's frame body through the liaison's vmeasure receive path // (frame.Decode → AggModeReduce + (shard,group) dedup → optional Top → -// serialise to InternalDataPoint) and asserts the final values match an +// serialize to InternalDataPoint) and asserts the final values match an // AggModeAll oracle. 
// // Replica duplication (the frame is appended twice for shard=1) verifies @@ -334,9 +334,9 @@ func TestConvertPassthroughForFrame_NoOpForNativeColumns(t *testing.T) { // distributed integration scenario where MEAN over an int field returned // 0 instead of the computed mean. The wire flow: // -// data-node: BatchAggregation(AggModeMap, MEAN) → frame.Encode → -// liaison: frame.Decode → BatchAggregation(AggModeReduce, MEAN) → -// serializeBatchToProto → InternalDataPoint +// data-node: BatchAggregation(AggModeMap, MEAN) → frame.Encode → +// liaison: frame.Decode → BatchAggregation(AggModeReduce, MEAN) → +// serializeBatchToProto → InternalDataPoint // // Sample: group "a" with values {1, 2, 3} → mean 2; group "b" with {4, // 5, 6} → mean 5. After the round trip the final InternalDataPoint must diff --git a/pkg/query/vectorized/measure/topology_matrix_test.go b/pkg/query/vectorized/measure/topology_matrix_test.go index 43c5e5345..0a86170fe 100644 --- a/pkg/query/vectorized/measure/topology_matrix_test.go +++ b/pkg/query/vectorized/measure/topology_matrix_test.go @@ -34,7 +34,7 @@ import ( // harness specified in the G9f spec (Principle 4): the row path / vec // AggModeAll defines the truth, and the distributed vec pipeline must // agree byte-for-byte (well, value-for-value) under any topology a -// production cluster can realise. +// production cluster can realize. // // Per cell: // - Split the source rows across S shards (rows i → shard i%S). @@ -49,7 +49,7 @@ import ( // times. With dedup, the answer must be identical regardless of R. // // Aggs covered: SUM, COUNT, MIN, MAX, MEAN. Each runs as its own subtest; -// MEAN exercises the count sidecar + Reduce.Val finalisation. +// MEAN exercises the count sidecar + Reduce.Val finalization. 
func TestTopologyMatrix(t *testing.T) { rows := []topologyRow{ {group: "a", value: 10}, @@ -246,7 +246,7 @@ func flattenReduce(batches []*vectorized.RecordBatch) map[string]string { out := make(map[string]string) for _, b := range batches { // First RoleTag column is "g"; first RoleField column is "out". - var gIdx, outIdx = -1, -1 + gIdx, outIdx := -1, -1 for i, def := range b.Schema.Columns { if def.Role == vectorized.RoleTag && gIdx < 0 { gIdx = i @@ -309,12 +309,18 @@ func numericallyEqual(a, b string) bool { // ApplyTopToReduce's bind-by-name + heap consistency. func TestTopologyMatrix_WithTop(t *testing.T) { rows := []topologyRow{ - {group: "a", value: 10}, {group: "b", value: 20}, - {group: "c", value: 30}, {group: "d", value: 40}, - {group: "e", value: 50}, {group: "f", value: 60}, - {group: "a", value: 100}, {group: "b", value: 100}, - {group: "c", value: 100}, {group: "d", value: 100}, - {group: "e", value: 100}, {group: "f", value: 100}, + {group: "a", value: 10}, + {group: "b", value: 20}, + {group: "c", value: 30}, + {group: "d", value: 40}, + {group: "e", value: 50}, + {group: "f", value: 60}, + {group: "a", value: 100}, + {group: "b", value: 100}, + {group: "c", value: 100}, + {group: "d", value: 100}, + {group: "e", value: 100}, + {group: "f", value: 100}, } cases := []struct { name string diff --git a/test/integration/distributed/querybench/config.go b/test/integration/distributed/querybench/config.go index dfd58d011..edf8af924 100644 --- a/test/integration/distributed/querybench/config.go +++ b/test/integration/distributed/querybench/config.go @@ -15,6 +15,8 @@ // specific language governing permissions and limitations // under the License. +// Package querybench is the docker-gated distributed-query benchmark harness +// that drives row vs vec mode comparisons across scenarios and cardinalities. package querybench import ( @@ -97,10 +99,10 @@ type Config struct { // LoadConfig reads benchmark settings from environment variables. 
func LoadConfig() Config { return Config{ - RunBench: getBool(envRunBench, false), - InContainer: getBool(envInContainer, false), - Profile: getBool(envProfile, false), - Merge: getBool(envMerge, false), + RunBench: getBool(envRunBench), + InContainer: getBool(envInContainer), + Profile: getBool(envProfile), + Merge: getBool(envMerge), ReportDir: getString(envReportDir, defaultReportDir), DockerImage: getString(envDockerImage, ""), CPULimit: getString(envCPULimit, ""), @@ -184,16 +186,16 @@ func getString(key, def string) string { return def } -func getBool(key string, def bool) bool { - if value := os.Getenv(key); value != "" { - switch strings.ToLower(strings.TrimSpace(value)) { - case "1", "true", "yes", "on": - return true - case "0", "false", "no", "off": - return false - } +func getBool(key string) bool { + value := os.Getenv(key) + if value == "" { + return false } - return def + switch strings.ToLower(strings.TrimSpace(value)) { + case "1", "true", "yes", "on": + return true + } + return false } func getInt(key string, def int) int { diff --git a/test/integration/distributed/querybench/report.go b/test/integration/distributed/querybench/report.go index 9b57d89ec..db54dc246 100644 --- a/test/integration/distributed/querybench/report.go +++ b/test/integration/distributed/querybench/report.go @@ -33,8 +33,8 @@ import ( type Report struct { GeneratedAt time.Time `json:"generated_at"` Environment Environment `json:"environment"` - Config ConfigView `json:"config"` Results []Result `json:"results"` + Config ConfigView `json:"config"` } // ConfigView is a JSON-friendly copy of Config. 
@@ -54,68 +54,63 @@ type Environment struct { GoVersion string `json:"go_version"` GOOS string `json:"goos"` GOARCH string `json:"goarch"` - NumCPU int `json:"num_cpu"` DockerImage string `json:"docker_image,omitempty"` CPULimit string `json:"cpu_limit,omitempty"` MemoryLimit string `json:"memory_limit,omitempty"` Cgroup string `json:"cgroup,omitempty"` ContainerID string `json:"container_id,omitempty"` ResourceNote string `json:"resource_note,omitempty"` + NumCPU int `json:"num_cpu"` } // Result records one mode/scenario/cardinality benchmark outcome. type Result struct { - Mode string `json:"mode"` - Scenario Scenario `json:"scenario"` - Cardinality int `json:"cardinality"` - Entities int `json:"entities"` - PointsEach int `json:"points_each"` - ResponseRows int `json:"response_rows"` - Correctness string `json:"correctness"` - QueryIterations int `json:"query_iterations"` - QueryWorkers int `json:"query_workers"` - Latency LatencyStats `json:"latency"` - QPS float64 `json:"qps"` - Resources ResourceStats `json:"resources"` - Allocations AllocationStats `json:"allocations"` - Profiles map[string]string `json:"profiles,omitempty"` - Error string `json:"error,omitempty"` - ApproxResultHash uint64 `json:"approx_result_hash,omitempty"` - // SampleDataPointText is the prototext of the first DataPoint of the - // first response. Captured so the merge pass can dump row vs vec - // shapes side by side when the correctness gate fires on a hash - // mismatch — the row counts can match while the proto byte layout - // diverges (TagFamily order, oneof variant choice, etc.). 
- SampleDataPointText string `json:"sample_data_point_text,omitempty"` + Profiles map[string]string `json:"profiles,omitempty"` + Scenario Scenario `json:"scenario"` + SampleDataPointText string `json:"sample_data_point_text,omitempty"` + Error string `json:"error,omitempty"` + Mode string `json:"mode"` + Correctness string `json:"correctness"` + Allocations AllocationStats `json:"allocations"` + Resources ResourceStats `json:"resources"` + Latency LatencyStats `json:"latency"` + PointsEach int `json:"points_each"` + QPS float64 `json:"qps"` + QueryWorkers int `json:"query_workers"` + QueryIterations int `json:"query_iterations"` + ResponseRows int `json:"response_rows"` + Entities int `json:"entities"` + ApproxResultHash uint64 `json:"approx_result_hash,omitempty"` + Cardinality int `json:"cardinality"` } // LatencyStats contains latency percentiles in milliseconds. type LatencyStats struct { - P50Ms float64 `json:"p50_ms"` - P90Ms float64 `json:"p90_ms"` - P95Ms float64 `json:"p95_ms"` - P99Ms float64 `json:"p99_ms"` - MaxMs float64 `json:"max_ms"` + P50Ms float64 `json:"p50_ms"` + P90Ms float64 `json:"p90_ms"` + P95Ms float64 `json:"p95_ms"` + P99Ms float64 `json:"p99_ms"` + MaxMs float64 `json:"max_ms"` MeanMs float64 `json:"mean_ms"` } // ResourceStats records process-level resource deltas for the in-process cluster harness. type ResourceStats struct { + MetricSource string `json:"metric_source"` CPUSecondsDelta float64 `json:"cpu_seconds_delta,omitempty"` RSSBytes uint64 `json:"rss_bytes,omitempty"` HeapAllocBytes uint64 `json:"heap_alloc_bytes,omitempty"` HeapSysBytes uint64 `json:"heap_sys_bytes,omitempty"` NumGC uint32 `json:"num_gc,omitempty"` - MetricSource string `json:"metric_source"` } // AllocationStats records allocation counters for the timed read phase. 
type AllocationStats struct { - MallocsDelta uint64 `json:"mallocs_delta"` - TotalAllocDelta uint64 `json:"total_alloc_delta"` - MallocsPerQuery float64 `json:"mallocs_per_query"` + MetricSource string `json:"metric_source"` + MallocsDelta uint64 `json:"mallocs_delta"` + TotalAllocDelta uint64 `json:"total_alloc_delta"` + MallocsPerQuery float64 `json:"mallocs_per_query"` AllocBytesPerQuery float64 `json:"alloc_bytes_per_query"` - MetricSource string `json:"metric_source"` } // newReportFromShards builds the unified Report from the shard set the @@ -178,7 +173,7 @@ func newReportFromShards(cfg Config, results []Result) Report { } } -// writeShard serialises a single single-shot Result to ReportDir/shards/. +// writeShard serializes a single single-shot Result to ReportDir/shards/. // The filename encodes mode_scenario_cardinality so merge can pair vec // shards with their row counterparts without parsing the JSON. func writeShard(result Result, reportDir string) (string, error) { @@ -192,7 +187,7 @@ func writeShard(result Result, reportDir string) (string, error) { if marshalErr != nil { return "", fmt.Errorf("marshal shard: %w", marshalErr) } - if writeErr := os.WriteFile(shardPath, append(body, '\n'), 0o644); writeErr != nil { + if writeErr := os.WriteFile(shardPath, append(body, '\n'), 0o600); writeErr != nil { return "", fmt.Errorf("write shard: %w", writeErr) } return shardPath, nil @@ -235,11 +230,11 @@ func summarizeLatencies(latencies []time.Duration, elapsed time.Duration) (Laten total += latency } stats := LatencyStats{ - P50Ms: percentile(sorted, 0.50), - P90Ms: percentile(sorted, 0.90), - P95Ms: percentile(sorted, 0.95), - P99Ms: percentile(sorted, 0.99), - MaxMs: float64(sorted[len(sorted)-1].Microseconds()) / 1000, + P50Ms: percentile(sorted, 0.50), + P90Ms: percentile(sorted, 0.90), + P95Ms: percentile(sorted, 0.95), + P99Ms: percentile(sorted, 0.99), + MaxMs: float64(sorted[len(sorted)-1].Microseconds()) / 1000, MeanMs: float64(total.Microseconds()) 
/ 1000 / float64(len(sorted)), } qps := 0.0 @@ -272,11 +267,11 @@ func writeReport(report Report, dir string) (string, string, error) { if marshalErr != nil { return "", "", marshalErr } - if writeErr := os.WriteFile(jsonPath, append(jsonBody, '\n'), 0o644); writeErr != nil { + if writeErr := os.WriteFile(jsonPath, append(jsonBody, '\n'), 0o600); writeErr != nil { return "", "", writeErr } mdPath := filepath.Join(dir, "distributed-querybench.md") - if writeErr := os.WriteFile(mdPath, []byte(renderMarkdown(report)), 0o644); writeErr != nil { + if writeErr := os.WriteFile(mdPath, []byte(renderMarkdown(report)), 0o600); writeErr != nil { return "", "", writeErr } return jsonPath, mdPath, nil diff --git a/test/integration/distributed/querybench/report_test.go b/test/integration/distributed/querybench/report_test.go index 6b896f14c..48aa6271a 100644 --- a/test/integration/distributed/querybench/report_test.go +++ b/test/integration/distributed/querybench/report_test.go @@ -39,8 +39,16 @@ func TestWriteReportFromShards(t *testing.T) { dir := t.TempDir() cfg := Config{ReportDir: dir, QueryWorkers: 1, QueryIterations: 5, WarmupIterations: 0, Writers: 1} results := []Result{ - {Mode: modeRow, Scenario: ScenarioScanAll, Cardinality: 1024, ResponseRows: 1024, Latency: LatencyStats{P50Ms: 1}, Allocations: AllocationStats{MallocsPerQuery: 2}, QueryIterations: 5, QueryWorkers: 1}, - {Mode: modeVec, Scenario: ScenarioScanAll, Cardinality: 1024, ResponseRows: 1024, Latency: LatencyStats{P50Ms: 2}, Allocations: AllocationStats{MallocsPerQuery: 4}, QueryIterations: 5, QueryWorkers: 1, Correctness: "matched"}, + { + Mode: modeRow, Scenario: ScenarioScanAll, Cardinality: 1024, ResponseRows: 1024, + Latency: LatencyStats{P50Ms: 1}, Allocations: AllocationStats{MallocsPerQuery: 2}, + QueryIterations: 5, QueryWorkers: 1, + }, + { + Mode: modeVec, Scenario: ScenarioScanAll, Cardinality: 1024, ResponseRows: 1024, + Latency: LatencyStats{P50Ms: 2}, Allocations: 
AllocationStats{MallocsPerQuery: 4}, + QueryIterations: 5, QueryWorkers: 1, Correctness: "matched", + }, } report := newReportFromShards(cfg, results) if len(report.Config.Cardinalities) != 1 || report.Config.Cardinalities[0] != 1024 { diff --git a/test/integration/distributed/querybench/workload.go b/test/integration/distributed/querybench/workload.go index a01e73523..18a95223c 100644 --- a/test/integration/distributed/querybench/workload.go +++ b/test/integration/distributed/querybench/workload.go @@ -57,11 +57,11 @@ type writeSummary struct { } type queryRunSummary struct { - Latencies []time.Duration - SampleDPText string - Rows int - Hash uint64 - Elapsed time.Duration + SampleDPText string + Latencies []time.Duration + Rows int + Hash uint64 + Elapsed time.Duration } func writeBenchmarkData(ctx context.Context, conn *grpc.ClientConn, cfg Config, cardinality int, base time.Time) (writeSummary, error) { @@ -146,10 +146,7 @@ func writeEntityRange(ctx context.Context, client measurev1.MeasureServiceClient if closeErr := stream.CloseSend(); closeErr != nil { return fmt.Errorf("close measure write stream: %w", closeErr) } - if recvErr := <-recvErrCh; recvErr != nil { - return recvErr - } - return nil + return <-recvErrCh } func benchmarkDataPointSpec() *measurev1.DataPointSpec { diff --git a/test/integration/distributed/querybench/workload_test.go b/test/integration/distributed/querybench/workload_test.go index 47763576d..f3827e8e6 100644 --- a/test/integration/distributed/querybench/workload_test.go +++ b/test/integration/distributed/querybench/workload_test.go @@ -49,7 +49,11 @@ func TestBuildScenarioQueryTopWithFilterMatchesFixtureShape(t *testing.T) { if condition.GetName() != benchTagID || condition.GetOp() != modelv1.Condition_BINARY_OP_NE || condition.GetValue().GetStr().GetValue() != "svc3" { t.Fatalf("top-with-filter condition does not match fixture: %+v", condition) } - if req.GetAgg().GetFunction() != modelv1.AggregationFunction_AGGREGATION_FUNCTION_MEAN 
|| req.GetTop().GetNumber() != 2 || req.GetTop().GetFieldValueSort() != modelv1.Sort_SORT_DESC { + agg := req.GetAgg() + top := req.GetTop() + if agg.GetFunction() != modelv1.AggregationFunction_AGGREGATION_FUNCTION_MEAN || + top.GetNumber() != 2 || + top.GetFieldValueSort() != modelv1.Sort_SORT_DESC { t.Fatalf("top-with-filter agg/top does not match fixture: %+v", req) } }
