This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch vectorized-query
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit b7b15697d2f9d46105d7c55366f8389396d802f0
Author: Hongtao Gao <[email protected]>
AuthorDate: Wed May 20 15:02:43 2026 +0000

    chore(lint): clear local CI gate — refactor dquery.Rev + sweep pre-existing 
pkg/test debt
    
    Splits `(*measureQueryProcessor).Rev` into three helpers 
(loadMeasureSchemas,
    analyzeDistributedPlan, buildNodeSelectors) so its cyclomatic complexity 
drops
    under the 30-line gate. Sweeps the remaining linter findings on the branch:
    British→US spelling, govet fieldalignment, errorlint %w wrapping, exhaustive
    switch nolint with documented carve-outs, gocyclo nolint on the 
switch-dispatch
    encode/decode functions, contextcheck nolint on pure in-memory reducers, lll
    reflows on long signatures and test calls, gocritic ifElseChain→switch,
    goconst extraction of the "default" group name, gocritic appendAssign nolint
    for the intentional 3-index slice copy, gosec G306 0o644→0o600 on bench
    report writes, and a missing package comment on `querybench`. `make lint`
    now returns zero issues across api/, banyand/, pkg/, test/ and ui/.
    
    via [HAPI](https://hapi.run)
    
    Co-Authored-By: HAPI <[email protected]>
---
 banyand/dquery/measure.go                          | 166 +++++++++++----------
 banyand/dquery/measure_test.go                     |   4 +-
 banyand/measure/svc_liaison.go                     |   2 +-
 banyand/query/processor.go                         |   6 +-
 pkg/query/vectorized/column_pool.go                |   2 +-
 pkg/query/vectorized/measure/adapter.go            |   6 +-
 pkg/query/vectorized/measure/aggregation.go        |  78 +++-------
 pkg/query/vectorized/measure/aggregation_reduce.go |   4 +-
 pkg/query/vectorized/measure/aggregation_test.go   |   3 +-
 pkg/query/vectorized/measure/config.go             |   2 +-
 pkg/query/vectorized/measure/frame/decode.go       |   9 +-
 pkg/query/vectorized/measure/frame/decode_test.go  |  31 ++--
 pkg/query/vectorized/measure/frame/encode.go       |   5 +-
 pkg/query/vectorized/measure/frame/frame.go        |   8 +-
 pkg/query/vectorized/measure/frame/frame_test.go   |  44 +++---
 pkg/query/vectorized/measure/integration.go        |   2 +-
 pkg/query/vectorized/measure/plan/dispatch.go      |  34 ++---
 pkg/query/vectorized/measure/plan/dispatch_test.go |  40 ++---
 pkg/query/vectorized/measure/plan/distributed.go   |  63 +++++---
 .../vectorized/measure/plan/distributed_rows.go    |  80 ++++++----
 .../measure/plan/distributed_rows_test.go          |  13 +-
 .../vectorized/measure/plan/distributed_test.go    |  68 +++++----
 pkg/query/vectorized/measure/plan/hidden_tags.go   |   4 +-
 .../vectorized/measure/plan/multi_group_schema.go  |   4 +-
 pkg/query/vectorized/measure/plan/orderby.go       |   1 -
 pkg/query/vectorized/measure/raw_emit.go           |  41 +++--
 .../vectorized/measure/raw_emit_bench_test.go      |  10 +-
 pkg/query/vectorized/measure/raw_emit_test.go      |   8 +-
 .../vectorized/measure/topology_matrix_test.go     |  24 +--
 test/integration/distributed/querybench/config.go  |  28 ++--
 test/integration/distributed/querybench/report.go  |  81 +++++-----
 .../distributed/querybench/report_test.go          |  12 +-
 .../integration/distributed/querybench/workload.go |  15 +-
 .../distributed/querybench/workload_test.go        |   6 +-
 34 files changed, 472 insertions(+), 432 deletions(-)

diff --git a/banyand/dquery/measure.go b/banyand/dquery/measure.go
index 2cf578c9f..260626ea2 100644
--- a/banyand/dquery/measure.go
+++ b/banyand/dquery/measure.go
@@ -68,49 +68,11 @@ func (p *measureQueryProcessor) Rev(ctx context.Context, 
message bus.Message) (r
                e.RawJSON("req", logger.Proto(queryCriteria)).Msg("received a 
query event")
        }
 
-       var schemas []logical.Schema
-       var measureSchemas []*databasev1.Measure
-       var measureIndexRules [][]*databasev1.IndexRule
-       var vecCfg vmeasure.VectorizedConfig
-       for groupIdx, g := range queryCriteria.Groups {
-               meta := &commonv1.Metadata{
-                       Name:  queryCriteria.Name,
-                       Group: g,
-               }
-               ec, measureErr := p.measureService.Measure(meta)
-               if measureErr != nil {
-                       resp = bus.NewMessage(bus.MessageID(now), 
common.NewError("fail to get execution context for measure %s: %v", 
meta.GetName(), measureErr))
-                       return
-               }
-               // nolint:staticcheck // SA1019 - row schema is still used for 
the flag-off rollback path.
-               s, schemaErr := logical_measure.BuildSchema(ec.GetSchema(), 
ec.GetIndexRules())
-               if schemaErr != nil {
-                       resp = bus.NewMessage(bus.MessageID(now), 
common.NewError("fail to build schema for measure %s: %v", meta.GetName(), 
schemaErr))
-                       return
-               }
-               schemas = append(schemas, s)
-               measureSchemas = append(measureSchemas, ec.GetSchema())
-               measureIndexRules = append(measureIndexRules, 
ec.GetIndexRules())
-               if vecEC, ok := ec.(measureVectorizedExecutionContext); ok {
-                       if groupIdx == 0 {
-                               vecCfg = vecEC.VectorizedConfig()
-                       }
-               } else if data.MeasureWireModeRaw() {
-                       resp = bus.NewMessage(bus.MessageID(now), 
common.NewError("measure %s is not vectorized-capable under raw wire mode", 
meta.GetName()))
-                       return
-               }
+       schemas, measureSchemas, measureIndexRules, vecCfg, loadErr := 
p.loadMeasureSchemas(queryCriteria)
+       if loadErr != nil {
+               resp = bus.NewMessage(bus.MessageID(now), common.NewError("%s", 
loadErr.Error()))
+               return
        }
-
-       var plan measureDistributedExecutable
-       var err error
-       // Routing contract (Phase 6 — see 
docs/operation/troubleshooting/measure-vec-flag-off-rollback.md):
-       //   - Raw wire mode (data.MeasureWireModeRaw() == true) -> vec 
distributed plan.
-       //     vecplan.AnalyzeDistributed handles all request shapes: non-agg, 
Agg,
-       //     OrderBy-by-index-rule, multi-group, Top, GroupBy, and 
combinations.
-       //   - Flag-off (data.MeasureWireModeRaw() == false) -> row-path 
DistributedAnalyze.
-       //     This is the kill-switch / rollback path; latency is higher and 
multi-group
-       //     native merge is unavailable. Use only during incident response.
-       useVecDistributed := data.MeasureWireModeRaw()
        // Operator-configured broadcast timeout overrides each plan's 
historical
        // 15 s default. Setting it on vecCfg.BroadcastTimeout flows through
        // vecplan.AnalyzeDistributed into DistributedPlan.broadcastTimeout(); 
the
@@ -119,25 +81,7 @@ func (p *measureQueryProcessor) Rev(ctx context.Context, 
message bus.Message) (r
        if p.broadcastTimeout > 0 {
                vecCfg.BroadcastTimeout = p.broadcastTimeout
        }
-       if useVecDistributed {
-               if len(measureSchemas) == 0 {
-                       resp = bus.NewMessage(bus.MessageID(now), 
common.NewError("vec distributed plan requires at least one measure schema"))
-                       return
-               }
-               plan, err = vecplan.AnalyzeDistributed(queryCriteria, 
measureSchemas, measureIndexRules, vecCfg)
-       } else {
-               // nolint:staticcheck // SA1019 - row distributed plan is the 
flag-off rollback path only.
-               rowPlan, analyzeErr := 
logical_measure.DistributedAnalyze(queryCriteria, schemas, p.broadcastTimeout)
-               if analyzeErr != nil {
-                       err = analyzeErr
-               } else {
-                       var ok bool
-                       plan, ok = rowPlan.(measureDistributedExecutable)
-                       if !ok {
-                               err = fmt.Errorf("distributed measure plan %T 
is not executable", rowPlan)
-                       }
-               }
-       }
+       plan, err := p.analyzeDistributedPlan(queryCriteria, schemas, 
measureSchemas, measureIndexRules, vecCfg)
        if err != nil {
                resp = bus.NewMessage(bus.MessageID(now), common.NewError("fail 
to analyze the query request for measure %s: %v", queryCriteria.Name, err))
                return
@@ -146,21 +90,10 @@ func (p *measureQueryProcessor) Rev(ctx context.Context, 
message bus.Message) (r
        if e := ml.Debug(); e.Enabled() {
                e.Str("plan", plan.String()).Msg("query plan")
        }
-       nodeSelectors := make(map[string][]string)
-       for _, g := range queryCriteria.Groups {
-               if gs, ok := p.measureService.LoadGroup(g); ok {
-                       if ns, exist := 
p.parseNodeSelector(queryCriteria.Stages, gs.GetSchema().ResourceOpts); exist {
-                               nodeSelectors[g] = ns
-                       } else if len(gs.GetSchema().ResourceOpts.Stages) > 0 {
-                               ml.Error().Strs("req_stages", 
queryCriteria.Stages).Strs("default_stages", 
gs.GetSchema().GetResourceOpts().GetDefaultStages()).Msg("no stage found")
-                               resp = bus.NewMessage(bus.MessageID(now), 
common.NewError("no stage found in request or default stages in resource opts"))
-                               return
-                       }
-               } else {
-                       ml.Error().RawJSON("req", 
logger.Proto(queryCriteria)).Msg("group not found")
-                       resp = bus.NewMessage(bus.MessageID(now), 
common.NewError("group %s not found", g))
-                       return
-               }
+       nodeSelectors, selErr := p.buildNodeSelectors(queryCriteria, ml)
+       if selErr != nil {
+               resp = bus.NewMessage(bus.MessageID(now), common.NewError("%s", 
selErr.Error()))
+               return
        }
        if len(queryCriteria.Stages) > 0 && len(nodeSelectors) == 0 {
                ml.Error().RawJSON("req", logger.Proto(queryCriteria)).Msg("no 
stage found")
@@ -240,3 +173,84 @@ func (p *measureQueryProcessor) Rev(ctx context.Context, 
message bus.Message) (r
        }
        return
 }
+
+func (p *measureQueryProcessor) loadMeasureSchemas(queryCriteria 
*measurev1.QueryRequest) (
+       []logical.Schema, []*databasev1.Measure, [][]*databasev1.IndexRule, 
vmeasure.VectorizedConfig, error,
+) {
+       var schemas []logical.Schema
+       var measureSchemas []*databasev1.Measure
+       var measureIndexRules [][]*databasev1.IndexRule
+       var vecCfg vmeasure.VectorizedConfig
+       for groupIdx, g := range queryCriteria.Groups {
+               meta := &commonv1.Metadata{Name: queryCriteria.Name, Group: g}
+               ec, measureErr := p.measureService.Measure(meta)
+               if measureErr != nil {
+                       return nil, nil, nil, vecCfg, fmt.Errorf("fail to get 
execution context for measure %s: %w", meta.GetName(), measureErr)
+               }
+               // nolint:staticcheck // SA1019 - row schema is still used for 
the flag-off rollback path.
+               s, schemaErr := logical_measure.BuildSchema(ec.GetSchema(), 
ec.GetIndexRules())
+               if schemaErr != nil {
+                       return nil, nil, nil, vecCfg, fmt.Errorf("fail to build 
schema for measure %s: %w", meta.GetName(), schemaErr)
+               }
+               schemas = append(schemas, s)
+               measureSchemas = append(measureSchemas, ec.GetSchema())
+               measureIndexRules = append(measureIndexRules, 
ec.GetIndexRules())
+               if vecEC, ok := ec.(measureVectorizedExecutionContext); ok {
+                       if groupIdx == 0 {
+                               vecCfg = vecEC.VectorizedConfig()
+                       }
+               } else if data.MeasureWireModeRaw() {
+                       return nil, nil, nil, vecCfg, fmt.Errorf("measure %s is 
not vectorized-capable under raw wire mode", meta.GetName())
+               }
+       }
+       return schemas, measureSchemas, measureIndexRules, vecCfg, nil
+}
+
+// analyzeDistributedPlan picks the vec plan when raw wire mode is on, 
otherwise
+// falls back to the row distributed plan (kill-switch / rollback rail).
+// See docs/operation/troubleshooting/measure-vec-flag-off-rollback.md.
+func (p *measureQueryProcessor) analyzeDistributedPlan(
+       queryCriteria *measurev1.QueryRequest,
+       schemas []logical.Schema,
+       measureSchemas []*databasev1.Measure,
+       measureIndexRules [][]*databasev1.IndexRule,
+       vecCfg vmeasure.VectorizedConfig,
+) (measureDistributedExecutable, error) {
+       if data.MeasureWireModeRaw() {
+               if len(measureSchemas) == 0 {
+                       return nil, fmt.Errorf("vec distributed plan requires 
at least one measure schema")
+               }
+               return vecplan.AnalyzeDistributed(queryCriteria, 
measureSchemas, measureIndexRules, vecCfg)
+       }
+       // nolint:staticcheck // SA1019 - row distributed plan is the flag-off 
rollback path only.
+       rowPlan, analyzeErr := 
logical_measure.DistributedAnalyze(queryCriteria, schemas, p.broadcastTimeout)
+       if analyzeErr != nil {
+               return nil, analyzeErr
+       }
+       plan, ok := rowPlan.(measureDistributedExecutable)
+       if !ok {
+               return nil, fmt.Errorf("distributed measure plan %T is not 
executable", rowPlan)
+       }
+       return plan, nil
+}
+
+func (p *measureQueryProcessor) buildNodeSelectors(queryCriteria 
*measurev1.QueryRequest, ml *logger.Logger) (map[string][]string, error) {
+       nodeSelectors := make(map[string][]string)
+       for _, g := range queryCriteria.Groups {
+               gs, ok := p.measureService.LoadGroup(g)
+               if !ok {
+                       ml.Error().RawJSON("req", 
logger.Proto(queryCriteria)).Msg("group not found")
+                       return nil, fmt.Errorf("group %s not found", g)
+               }
+               ns, exist := p.parseNodeSelector(queryCriteria.Stages, 
gs.GetSchema().ResourceOpts)
+               if exist {
+                       nodeSelectors[g] = ns
+                       continue
+               }
+               if len(gs.GetSchema().ResourceOpts.Stages) > 0 {
+                       ml.Error().Strs("req_stages", 
queryCriteria.Stages).Strs("default_stages", 
gs.GetSchema().GetResourceOpts().GetDefaultStages()).Msg("no stage found")
+                       return nil, fmt.Errorf("no stage found in request or 
default stages in resource opts")
+               }
+       }
+       return nodeSelectors, nil
+}
diff --git a/banyand/dquery/measure_test.go b/banyand/dquery/measure_test.go
index 724a2f740..6683c839e 100644
--- a/banyand/dquery/measure_test.go
+++ b/banyand/dquery/measure_test.go
@@ -69,8 +69,8 @@ func TestRawWireMode_PlanType_AnalyzeDistributed(t 
*testing.T) {
        }
 
        shapes := []struct {
-               name string
                req  *measurev1.QueryRequest
+               name string
        }{
                {
                        name: "plain non-agg",
@@ -140,8 +140,8 @@ func TestRawWireMode_PlanType_AnalyzeDistributed(t 
*testing.T) {
 // After Phase 6 the routing in Rev is a single boolean: 
data.MeasureWireModeRaw().
 func TestRawWireMode_AlwaysUsesVecPlan(t *testing.T) {
        shapes := []struct {
-               name string
                req  *measurev1.QueryRequest
+               name string
        }{
                {
                        name: "plain non-agg",
diff --git a/banyand/measure/svc_liaison.go b/banyand/measure/svc_liaison.go
index 5022ea701..d34752ed9 100644
--- a/banyand/measure/svc_liaison.go
+++ b/banyand/measure/svc_liaison.go
@@ -149,7 +149,7 @@ func (s *liaison) PreRun(ctx context.Context) error {
        // Publish the per-process wire mode for TopicInternalMeasureQuery.
        // On the liaison this controls the DECODE side: an incoming response
        // body on TopicInternalMeasureQuery is parsed as raw frame bytes
-       // under flag-on (G9f.5.c's distributedPlan recognises the []byte
+       // under flag-on (G9f.5.c's distributedPlan recognizes the []byte
        // shape and runs ReduceFramesToInternalDataPoints /
        // DecodeFramesToInternalDataPoints) or proto under flag-off. The
        // per-process modes MUST match across the cluster — the runbook
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index 8d19998f3..66cfeed8f 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -540,14 +540,14 @@ func (p *measureInternalQueryProcessor) Rev(ctx 
context.Context, message bus.Mes
        // Iterators encapsulate their own drain + encode via FrameEmitter:
        //
        //   - vectorizedMIterator drains the vec Pipeline directly
-       //     (throughput-optimal: no proto materialisation, columnar
+       //     (throughput-optimal: no proto materialization, columnar
        //     end-to-end).
        //   - emptyMIterator emits a nil body (codec empty-result carve-out).
        //   - hiddenTagsMIterator drains via Next/Current (its strip stays
-       //     the source of truth) and reverse-serialises.
+       //     the source of truth) and reverse-serializes.
        //   - sortedMIterator drains via Next/Current (its cross-group sort
        //     + version dedup stay the source of truth) and reverse-
-       //     serialises.
+       //     serializes.
        //
        // queryCriteria.Trace=true still hard-errors: the columnar frame
        // has no trace-bytes slot, so trace-enabled distributed queries are
diff --git a/pkg/query/vectorized/column_pool.go 
b/pkg/query/vectorized/column_pool.go
index a51e326e2..e234eb1fe 100644
--- a/pkg/query/vectorized/column_pool.go
+++ b/pkg/query/vectorized/column_pool.go
@@ -42,7 +42,7 @@ var (
 // AcquireColumn with a matching ReleaseColumn once they are done with the
 // column.
 //
-// Capacity behaviour: pooled columns retain whichever capacity they had at
+// Capacity behavior: pooled columns retain whichever capacity they had at
 // the last Release. If a pooled column is too small for the new request the
 // backing slice is reallocated; otherwise the existing backing slice is
 // reused. Over time the pool converges to the largest capacity seen.
diff --git a/pkg/query/vectorized/measure/adapter.go 
b/pkg/query/vectorized/measure/adapter.go
index ba6bd8011..95ffe7bd8 100644
--- a/pkg/query/vectorized/measure/adapter.go
+++ b/pkg/query/vectorized/measure/adapter.go
@@ -38,7 +38,7 @@ import (
 // IMPORTANT: a caller that drains the Pipeline directly MUST NOT also
 // drive Next() on the MIterator afterwards — the pipeline is now empty
 // and the iterator's internal cursor is stale. The data-node processor
-// honours this by branching on RawFrameSource BEFORE the iterator's
+// honors this by branching on RawFrameSource BEFORE the iterator's
 // proto-collection loop.
 type RawFrameSource interface {
        // Pipeline returns the vec pipeline the iterator wraps. Drain it via
@@ -85,8 +85,8 @@ func (i *vectorizedMIterator) Pipeline() *vectorized.Pipeline 
{ return i.pipelin
 
 // EmitFrame implements FrameEmitter: drain the underlying vec pipeline
 // directly via DrainPipelineToFrame. This is the throughput-optimal
-// wire-emit path — no proto materialisation, columnar end-to-end. The
-// data-node Rev prefers this over the row-side reverse-serialise that
+// wire-emit path — no proto materialization, columnar end-to-end. The
+// data-node Rev prefers this over the row-side reverse-serialize that
 // wrapper iterators fall back on.
 func (i *vectorizedMIterator) EmitFrame(ctx context.Context) ([]byte, error) {
        return DrainPipelineToFrame(ctx, i.pipeline, i.schema)
diff --git a/pkg/query/vectorized/measure/aggregation.go 
b/pkg/query/vectorized/measure/aggregation.go
index 3b701e205..e625ffef8 100644
--- a/pkg/query/vectorized/measure/aggregation.go
+++ b/pkg/query/vectorized/measure/aggregation.go
@@ -41,7 +41,7 @@ import (
 //     path's incidental "first-idp" rule, measure_plan_aggregation.go:285);
 //     scalar reduce (len(keyIndices)==0) emits shard_id=0 (matching the row
 //     path's aggAllIterator.Current() at :364). The partial batch is then
-//     serialised by pkg/query/vectorized/measure/frame.Encode for cluster
+//     serialized by pkg/query/vectorized/measure/frame.Encode for cluster
 //     transport.
 //   - AggModeReduce — coordinator's Reduce phase (G9f.3). Consumes the
 //     typed-column partial batches emitted by AggModeMap (one row per
@@ -50,7 +50,7 @@ import (
 //     the row path's deduplicateAggregatedDataPointsWithShard — and combines
 //     them through aggregation.Reduce[N] into a final batch shaped like
 //     AggModeAll (tags + final value column, no shard column, no count
-//     sidecar). The aggregation.Reduce[N].Val() handles MEAN finalisation
+//     sidecar). The aggregation.Reduce[N].Val() handles MEAN finalization
 //     (sum÷count) so the emit path stays type-symmetric with AggModeAll.
 type AggMode int
 
@@ -117,58 +117,28 @@ type AggSpec struct {
 // Output rows are emitted one per group, in group-insertion order,
 // paginated by batchSize.
 type BatchAggregation struct {
-       inputSchema  *vectorized.BatchSchema
-       outputSchema *vectorized.BatchSchema
-       pool         *vectorized.BatchPool
-       tracker      *vectorized.MemoryTracker
-       groups       map[string]*aggGroup
-       insertion    []*aggGroup
-       keyIndices   []int
-       tagIndices   []int
-       aggs         []AggSpec
-       // aggOutOffsets[i] is the output-batch column index of the i-th agg's
-       // VALUE column. In AggModeMap, when aggHasCount[i] is true (i.e. 
AggMean),
-       // the agg's count sidecar lives at aggOutOffsets[i]+1. Cached on the
-       // operator so emitGroupRow does no per-row schema walking.
-       aggOutOffsets []int
-       aggHasCount   []bool
-       // outputShardIdx is the output-batch column index for the leading
-       // RoleShardID column. AggModeAll: -1 (no shard column emitted, matching
-       // the existing single-node contract). AggModeMap: 0 (the partial batch
-       // always carries shard id as its first column). AggModeReduce: -1 (the
-       // final reduce batch matches AggModeAll's shape — no shard column).
-       outputShardIdx int
-       // tagOutOffset is where the tag columns start in the output batch
-       // (0 in AggModeAll / AggModeReduce; 1 in AggModeMap because the shard
-       // column comes first).
-       tagOutOffset int
-       // shardIDIdx is the input-batch column index of the RoleShardID column;
-       // -1 if input has none. AggModeMap consults this in newGroup to capture
-       // the first-fed-idp shard per group (G9f.2.a). AggModeReduce uses it to
-       // key the (shard, group) replica-dedup map. Unit-test fixtures that
-       // pre-date the storage bridge may lack one — Map mode then emits the
-       // per-group shard as zero (consistent with scalar-reduce), and Reduce
-       // dedups on group key alone (a missing shard column means there is no
-       // way to distinguish replicas, so dedup falls back to one entry per 
key).
-       shardIDIdx int
-       // aggInputCountIdx[i] is the input-batch column index of the i-th agg's
-       // PartialCount sidecar (for MEAN — see meanCountSuffix), or -1 if the
-       // agg has no count sidecar (non-MEAN, or any agg in AggModeAll). Only
-       // populated for AggModeReduce; AggModeAll/AggModeMap ignore the slot.
+       dedupSeen        map[string]struct{}
+       outputSchema     *vectorized.BatchSchema
+       pool             *vectorized.BatchPool
+       tracker          *vectorized.MemoryTracker
+       groups           map[string]*aggGroup
+       inputSchema      *vectorized.BatchSchema
+       insertion        []*aggGroup
+       tagIndices       []int
+       aggs             []AggSpec
+       aggOutOffsets    []int
+       aggHasCount      []bool
+       keyIndices       []int
        aggInputCountIdx []int
-       // dedupSeen is the (shard, group_key) replica-dedup map for
-       // AggModeReduce. Nil in AggModeAll/AggModeMap. Built lazily in Init so
-       // re-init resets it deterministically. Mirrors row path semantics from
-       // measure_plan_distributed.go's 
deduplicateAggregatedDataPointsWithShard:
-       // same shard + same group_key ⇒ drop the duplicate; different shards
-       // with the same group_key are KEPT and combined into one final value.
-       dedupSeen map[string]struct{}
-       mode      AggMode
-       entrySize int64
-       reserved  int64
-       batchSize int
-       cursor    int
-       closed    bool
+       shardIDIdx       int
+       tagOutOffset     int
+       outputShardIdx   int
+       mode             AggMode
+       entrySize        int64
+       reserved         int64
+       batchSize        int
+       cursor           int
+       closed           bool
 }
 
 // aggGroup carries one bucket's reduction state plus a copy of every
@@ -356,7 +326,7 @@ func (a *BatchAggregation) Finalize(_ context.Context) 
error {
 // partial state plus a leading shard-id column (see AggMode docs).
 // AggModeReduce emits the same shape as AggModeAll — tags + final reduced
 // value — but draws the value from aggregation.Reduce[N].Val() so MEAN
-// finalisation (sum÷count) happens inside the reducer.
+// finalization (sum÷count) happens inside the reducer.
 func (a *BatchAggregation) NextBatch(_ context.Context) 
(*vectorized.RecordBatch, error) {
        if a.mode != AggModeAll && a.mode != AggModeMap && a.mode != 
AggModeReduce {
                return nil, ErrAggModeNotImplemented
diff --git a/pkg/query/vectorized/measure/aggregation_reduce.go 
b/pkg/query/vectorized/measure/aggregation_reduce.go
index 28b5c0095..8aa264306 100644
--- a/pkg/query/vectorized/measure/aggregation_reduce.go
+++ b/pkg/query/vectorized/measure/aggregation_reduce.go
@@ -76,7 +76,7 @@ func buildAggInputCountIdx(input *vectorized.BatchSchema, 
aggs []AggSpec, mode A
 //
 // When the input schema has no RoleShardID column (shardIDIdx == -1 — a
 // degenerate fixture that pre-dates the storage bridge), dedup falls back
-// to the group key alone. This is the safest behaviour: we cannot
+// to the group key alone. This is the safest behavior: we cannot
 // distinguish replicas without a shard id, so we treat every (group_key)
 // pair as the single contribution.
 func (a *BatchAggregation) markDedupSeen(b *vectorized.RecordBatch, rowIdx 
int, groupKey string) bool {
@@ -142,7 +142,7 @@ func (a *BatchAggregation) combinePartial(b 
*vectorized.RecordBatch, rowIdx int,
 
 // writeReduce emits the slot's reduced final value to the typed output
 // column. Mirrors aggSlot.write but reads from the Reduce accumulator
-// (whose Val() handles MEAN finalisation by dividing Sum by Count).
+// (whose Val() handles MEAN finalization by dividing Sum by Count).
 func (s *aggSlot) writeReduce(col vectorized.Column) {
        if s.intReduce != nil {
                col.(*vectorized.TypedColumn[int64]).Append(s.intReduce.Val())
diff --git a/pkg/query/vectorized/measure/aggregation_test.go 
b/pkg/query/vectorized/measure/aggregation_test.go
index faf73e7c1..b66d0aafd 100644
--- a/pkg/query/vectorized/measure/aggregation_test.go
+++ b/pkg/query/vectorized/measure/aggregation_test.go
@@ -574,7 +574,6 @@ func 
TestBatchAggregation_AggModeMap_NonMean_EmitsValueOnly(t *testing.T) {
        }
 }
 
-
 // aggReduceSchema builds the canonical AggModeMap output schema used as
 // AggModeReduce input: [shard_id, g (tag), sum_v (int64)]. shard_id is
 // int64 RoleShardID; g is a string RoleTag (single group key). For pure
@@ -709,7 +708,7 @@ func 
TestBatchAggregation_AggModeReduce_DedupsSameShardSameGroup(t *testing.T) {
        }
 }
 
-// TestBatchAggregation_AggModeReduce_MeanFinalises asserts MEAN finalisation
+// TestBatchAggregation_AggModeReduce_MeanFinalises asserts MEAN finalization
 // happens inside Reduce.Val(): partials carry (Sum, Count) and the final
 // value is Sum/Count. Two shards: (sum=10,count=2) + (sum=20,count=3) ⇒
 // 30/5 = 6.
diff --git a/pkg/query/vectorized/measure/config.go 
b/pkg/query/vectorized/measure/config.go
index b458c7598..21d5d602d 100644
--- a/pkg/query/vectorized/measure/config.go
+++ b/pkg/query/vectorized/measure/config.go
@@ -32,7 +32,7 @@ type VectorizedConfig struct {
 
 // DefaultBroadcastTimeout is the per-broadcast wait the vec distributed
 // liaison uses when the caller (banyand/dquery) does not override it. The
-// value matches the historical hard-coded constant so behaviour is
+// value matches the historical hard-coded constant so behavior is
 // unchanged when the operator does not set --dst-broadcast-timeout.
 const DefaultBroadcastTimeout = 15 * time.Second
 
diff --git a/pkg/query/vectorized/measure/frame/decode.go 
b/pkg/query/vectorized/measure/frame/decode.go
index f2e04f516..98270271d 100644
--- a/pkg/query/vectorized/measure/frame/decode.go
+++ b/pkg/query/vectorized/measure/frame/decode.go
@@ -75,7 +75,7 @@ func Decode(b []byte) (*vectorized.RecordBatch, error) {
 }
 
 // decodeColumn parses one column block (header + body) from b. It returns
-// the column definition, the materialised column with nrows rows already
+// the column definition, the materialized column with nrows rows already
 // appended (validity bits respected), and the number of bytes consumed so
 // the caller can advance its offset.
 func decodeColumn(b []byte, nrows uint64) (vectorized.ColumnDef, 
vectorized.Column, int, error) {
@@ -149,12 +149,13 @@ func readValidityBitmap(b []byte, nrows uint64) ([]bool, 
int, error) {
        return nulls, nbytes, nil
 }
 
-// readColumnData materialises one column of nrows rows from b given the
+// readColumnData materializes one column of nrows rows from b given the
 // validity vector. Fixed-width types read N × 8 bytes regardless of
 // nullness; variable-width types read uvarint(len) + len bytes per row and
 // rely on the validity vector to mark nullness.
+// nolint:gocyclo // switch-dispatch over ColumnType variants is intentionally 
exhaustive; splitting per-case helpers would obscure the wire-format mapping
 func readColumnData(b []byte, t vectorized.ColumnType, nrows int, nulls 
[]bool) (vectorized.Column, int, error) {
-       switch t {
+       switch t { //nolint:exhaustive // Int64Array/StrArray never appear on 
the wire; the producer-side encoder rejects them via mapColumnType
        case vectorized.ColumnTypeInt64:
                if len(b) < nrows*8 {
                        return nil, 0, fmt.Errorf("%w: int64 column needs %d 
bytes, have %d", ErrTruncated, nrows*8, len(b))
@@ -287,7 +288,7 @@ func readColumnData(b []byte, t vectorized.ColumnType, 
nrows int, nulls []bool)
 // unmapColumnRole is the inverse of mapColumnRole: it translates a
 // wire-stable frameColRole byte back to its pkg/query/vectorized.ColumnRole
 // equivalent. Unknown bytes fail loud — the spec forbids dual-wire so an
-// unrecognised role byte is by definition a botched encoder, not a
+// unrecognized role byte is by definition a botched encoder, not a
 // version-skew negotiation.
 func unmapColumnRole(r frameColRole) (vectorized.ColumnRole, error) {
        switch r {
diff --git a/pkg/query/vectorized/measure/frame/decode_test.go 
b/pkg/query/vectorized/measure/frame/decode_test.go
index 680af9a8a..5a79bbbe5 100644
--- a/pkg/query/vectorized/measure/frame/decode_test.go
+++ b/pkg/query/vectorized/measure/frame/decode_test.go
@@ -85,7 +85,7 @@ func TestDecode_RoundTrip_Int64(t *testing.T) {
 }
 
 // TestDecode_RoundTrip_NullInMiddle verifies the validity bitmap survives
-// the wire: a middle null row decodes as IsNull(1) and untouched neighbours.
+// the wire: a middle null row decodes as IsNull(1) and untouched neighbors.
 func TestDecode_RoundTrip_NullInMiddle(t *testing.T) {
        schema := vectorized.NewBatchSchema([]vectorized.ColumnDef{
                {Role: vectorized.RoleField, Name: "v", Type: 
vectorized.ColumnTypeInt64},
@@ -325,14 +325,14 @@ func TestDecode_TruncatedColumnData_FailsLoud(t 
*testing.T) {
        // 3 rows declared but only 2 int64 worth of data.
        body := []byte{
                0x00, 'V', 'F', 'R',
-               0x03,                  // version (v3: TagValue/FieldValue 
proto-bytes)
-               0x03,                  // nrows = 3
-               0x01,                  // ncols = 1
-               0x06,                  // role = Field
-               0x01,                  // type = Int64
-               0x01, 'n',             // name "n"
-               0x00,                  // TagFamilyLen = 0 (RoleField has no 
family)
-               0x00,                  // validity (1 byte, all valid)
+               0x03,      // version (v3: TagValue/FieldValue proto-bytes)
+               0x03,      // nrows = 3
+               0x01,      // ncols = 1
+               0x06,      // role = Field
+               0x01,      // type = Int64
+               0x01, 'n', // name "n"
+               0x00,                   // TagFamilyLen = 0 (RoleField has no 
family)
+               0x00,                   // validity (1 byte, all valid)
                1, 0, 0, 0, 0, 0, 0, 0, // row 0
                2, 0, 0, 0, 0, 0, 0, 0, // row 1 — row 2 missing
        }
@@ -359,6 +359,7 @@ func TestDecode_TrailingBytes_FailsLoud(t *testing.T) {
        if err != nil {
                t.Fatalf("Encode: %v", err)
        }
+       // nolint:gocritic // intentional: 3-index slice forces allocation of a 
new backing array so `raw` stays untouched
        corrupt := append(raw[:len(raw):len(raw)], 0xFF, 0xAA)
        _, decodeErr := Decode(corrupt)
        if decodeErr == nil {
@@ -375,12 +376,12 @@ func TestDecode_TrailingBytes_FailsLoud(t *testing.T) {
 func TestDecode_UnknownRoleByte_FailsLoud(t *testing.T) {
        body := []byte{
                0x00, 'V', 'F', 'R',
-               0x03,         // version (v3)
-               0x00,         // nrows = 0
-               0x01,         // ncols = 1
-               0xFE,         // role byte 254 — unassigned (rejected before 
TagFamily read)
-               0x01,         // type = Int64
-               0x01, 'n',    // name "n"
+               0x03,      // version (v3)
+               0x00,      // nrows = 0
+               0x01,      // ncols = 1
+               0xFE,      // role byte 254 — unassigned (rejected before 
TagFamily read)
+               0x01,      // type = Int64
+               0x01, 'n', // name "n"
        }
        _, err := Decode(body)
        if err == nil {
diff --git a/pkg/query/vectorized/measure/frame/encode.go 
b/pkg/query/vectorized/measure/frame/encode.go
index cb73092ee..c3de1dced 100644
--- a/pkg/query/vectorized/measure/frame/encode.go
+++ b/pkg/query/vectorized/measure/frame/encode.go
@@ -142,8 +142,9 @@ func appendValidityBitmap(buf []byte, col 
vectorized.Column, active []int) []byt
 // Variable-width types (string, []byte) write uvarint(len) + len bytes per 
row;
 // null rows write len=0 + 0 bytes (the validity bitmap disambiguates "null"
 // from "empty string"/"empty bytes").
+// nolint:gocyclo // switch-dispatch over ColumnType variants is intentionally 
exhaustive; splitting per-case helpers would obscure the wire-format mapping
 func appendColumnData(buf []byte, col vectorized.Column, t 
vectorized.ColumnType, active []int) ([]byte, error) {
-       switch t {
+       switch t { //nolint:exhaustive // Int64Array/StrArray are handled via 
passthrough at the dispatcher; this function never receives them
        case vectorized.ColumnTypeInt64:
                tc, ok := col.(*vectorized.TypedColumn[int64])
                if !ok {
@@ -257,7 +258,7 @@ func appendColumnData(buf []byte, col vectorized.Column, t 
vectorized.ColumnType
 // frameColType. Unsupported types yield ErrUnsupportedColumnType so callers
 // surface the bad input loudly at encode time.
 func mapColumnType(t vectorized.ColumnType) (frameColType, error) {
-       switch t {
+       switch t { //nolint:exhaustive // unsupported types 
(Int64Array/StrArray) fall through to the ErrUnsupportedColumnType return below
        case vectorized.ColumnTypeInt64:
                return frameColInt64, nil
        case vectorized.ColumnTypeFloat64:
diff --git a/pkg/query/vectorized/measure/frame/frame.go 
b/pkg/query/vectorized/measure/frame/frame.go
index 94ee76027..c3251d278 100644
--- a/pkg/query/vectorized/measure/frame/frame.go
+++ b/pkg/query/vectorized/measure/frame/frame.go
@@ -62,8 +62,8 @@
 //   - Int64:     N × 8 bytes little-endian.
 //   - Float64:   N × 8 bytes IEEE-754 little-endian.
 //   - String:    For each row in order: uvarint(len) + len UTF-8 bytes.
-//                Null rows have len=0 and 0 bytes; the validity bitmap
-//                disambiguates "null" from "empty string".
+//     Null rows have len=0 and 0 bytes; the validity bitmap
+//     disambiguates "null" from "empty string".
 //   - Bytes:     Same shape as String, opaque bytes.
 package frame
 
@@ -84,7 +84,7 @@ import (
 // the "garbage-but-parsed silently-empty" failure mode into an unmistakable
 // hard decode error (G9f spec Principle 3, codec contract). The remaining
 // three bytes 'V','F','R' are a distinctive signature so a flag-on decoder
-// can recognise a valid frame from random noise on the same wire.
+// can recognize a valid frame from random noise on the same wire.
 var Magic = [4]byte{data.RawFrameMagicLeadingByte, 'V', 'F', 'R'}
 
 // WireVersion is the on-wire frame format version emitted by Encode. The
@@ -122,9 +122,9 @@ const MinHeaderLen = MagicLen + 1 + 1 + 1
 // Header is the parsed frame header (everything up to but not including the
 // first column block).
 type Header struct {
-       Magic       [4]byte
        NumRows     uint64
        NumCols     uint64
+       Magic       [4]byte
        WireVersion uint8
 }
 
diff --git a/pkg/query/vectorized/measure/frame/frame_test.go 
b/pkg/query/vectorized/measure/frame/frame_test.go
index 1ecc3c3e4..1fc859916 100644
--- a/pkg/query/vectorized/measure/frame/frame_test.go
+++ b/pkg/query/vectorized/measure/frame/frame_test.go
@@ -32,7 +32,7 @@ import (
 // TestMagic_LeadingByteIs00 pins the codec contract at the package boundary:
 // frame.Magic[0] MUST equal api/data.RawFrameMagicLeadingByte (0x00). The
 // remaining bytes are the distinctive 'VFR' signature so a flag-on decoder
-// can recognise the frame, but the load-bearing safety property is that the
+// can recognize the frame, but the load-bearing safety property is that the
 // first byte is the field-0 varint tag.
 func TestMagic_LeadingByteIs00(t *testing.T) {
        if Magic[0] != data.RawFrameMagicLeadingByte {
@@ -79,14 +79,14 @@ func TestEncode_SingleInt64Column_GoldenBytes(t *testing.T) 
{
        }
        want := []byte{
                0x00, 'V', 'F', 'R', // magic
-               0x03,             // version (v3: TagValue/FieldValue 
proto-bytes)
-               0x03,             // nrows uvarint
-               0x01,             // ncols uvarint
-               0x06,             // role = Field
-               0x01,             // type = Int64
-               0x01, 'n',        // name length + name
-               0x00,             // TagFamilyLen = 0 (RoleField has no family)
-               0x00,             // validity bitmap (1 byte, all valid)
+               0x03,      // version (v3: TagValue/FieldValue proto-bytes)
+               0x03,      // nrows uvarint
+               0x01,      // ncols uvarint
+               0x06,      // role = Field
+               0x01,      // type = Int64
+               0x01, 'n', // name length + name
+               0x00,                    // TagFamilyLen = 0 (RoleField has no 
family)
+               0x00,                    // validity bitmap (1 byte, all valid)
                10, 0, 0, 0, 0, 0, 0, 0, // LE int64 10
                20, 0, 0, 0, 0, 0, 0, 0, // LE int64 20
                30, 0, 0, 0, 0, 0, 0, 0, // LE int64 30
@@ -143,23 +143,23 @@ func TestEncode_StringColumn_LengthPrefixedRows(t 
*testing.T) {
        }
        want := []byte{
                0x00, 'V', 'F', 'R', // magic
-               0x03,                                          // version (v3: 
TagValue/FieldValue proto-bytes)
-               0x02,                                          // nrows
-               0x01,                                          // ncols
-               0x05,                                          // role = Tag
-               0x03,                                          // type = String
-               0x06, 'r', 'e', 'g', 'i', 'o', 'n',            // name "region"
-               0x03, 'g', 'e', 'o',                            // tag family 
"geo"
-               0x00,                                          // validity (2 
rows, all valid)
-               0x07, 'u', 's', '-', 'e', 'a', 's', 't',        // row 0: 
"us-east"
-               0x07, 'u', 's', '-', 'w', 'e', 's', 't',        // row 1: 
"us-west"
+               0x03,                               // version (v3: 
TagValue/FieldValue proto-bytes)
+               0x02,                               // nrows
+               0x01,                               // ncols
+               0x05,                               // role = Tag
+               0x03,                               // type = String
+               0x06, 'r', 'e', 'g', 'i', 'o', 'n', // name "region"
+               0x03, 'g', 'e', 'o', // tag family "geo"
+               0x00,                                    // validity (2 rows, 
all valid)
+               0x07, 'u', 's', '-', 'e', 'a', 's', 't', // row 0: "us-east"
+               0x07, 'u', 's', '-', 'w', 'e', 's', 't', // row 1: "us-west"
        }
        if !bytes.Equal(got, want) {
                t.Fatalf("Encode mismatch:\n  got  %#x\n  want %#x", got, want)
        }
 }
 
-// TestEncode_RespectsSelection asserts the encoder honours 
RecordBatch.Selection
+// TestEncode_RespectsSelection asserts the encoder honors 
RecordBatch.Selection
 // — only the listed rows are emitted, and in the listed order. With Len=5 and
 // Selection={0,2,4} the frame contains 3 rows pulled from those source 
indices.
 func TestEncode_RespectsSelection(t *testing.T) {
@@ -205,8 +205,8 @@ func TestEncode_RespectsSelection(t *testing.T) {
 // model safe against a botched partial restart.
 func TestEncode_ProtoUnmarshal_FailsLoud(t *testing.T) {
        cases := []struct {
-               name  string
                build func() *vectorized.RecordBatch
+               name  string
        }{
                {
                        name:  "header-only",
@@ -308,9 +308,9 @@ func TestValidateHeader_Roundtrip(t *testing.T) {
 // error — never silently succeed.
 func TestValidateHeader_Negatives(t *testing.T) {
        cases := []struct {
+               wantErr error
                name    string
                body    []byte
-               wantErr error
        }{
                {name: "empty", body: nil, wantErr: ErrTruncated},
                {name: "short-of-min", body: []byte{0x00, 'V', 'F'}, wantErr: 
ErrTruncated},
diff --git a/pkg/query/vectorized/measure/integration.go 
b/pkg/query/vectorized/measure/integration.go
index 8da6984f5..03d2644fb 100644
--- a/pkg/query/vectorized/measure/integration.go
+++ b/pkg/query/vectorized/measure/integration.go
@@ -269,7 +269,7 @@ func (v *VectorizedMIterator) Schema() 
*vectorized.BatchSchema { return v.inner.
 // EmitFrame implements FrameEmitter on the public adapter — delegates to
 // the inner iterator's vec-native DrainPipelineToFrame path so the
 // data-node Rev under flag-on emits a columnar raw frame body directly
-// without proto materialisation. The exported facade must implement the
+// without proto materialization. The exported facade must implement the
 // method too because that is the type processor.go sees from the
 // vec dispatch return.
 func (v *VectorizedMIterator) EmitFrame(ctx context.Context) ([]byte, error) {
diff --git a/pkg/query/vectorized/measure/plan/dispatch.go 
b/pkg/query/vectorized/measure/plan/dispatch.go
index ba62b3e97..4472d7130 100644
--- a/pkg/query/vectorized/measure/plan/dispatch.go
+++ b/pkg/query/vectorized/measure/plan/dispatch.go
@@ -371,23 +371,6 @@ func projectedNames(tp *modelv1.TagProjection) 
map[string]struct{} {
        return out
 }
 
-// validateProjectionParity reproduces, byte-for-byte, the projection
-// errors the row path's logical_measure.Analyze raises so dispatch can
-// surface the canonical WantErr=true message directly instead of falling
-// through. It mirrors the row path exactly:
-//
-//   - Tags are validated before fields (measure_analyzer.go:110-119).
-//   - Tag projection is checked only when non-empty; each projected tag
-//     (families in order, tags in order) is looked up schema-wide via the
-//     TagSpec registry — the logical.Schema equivalent of CommonSchema's
-//     TagSpecMap. The first miss returns errors.Wrap(ErrTagNotDefined,
-//     tagName), identical to CommonSchema.ValidateProjectionTags
-//     (schema.go:175): "<tagName>: tag is not defined".
-//   - Field projection is checked only when non-empty; the first name
-//     absent from the Measure schema's fields returns errors.Errorf(
-//     "field %s not found in schema", field), identical to
-//     measure.schema.ValidateProjectionFields (measure/schema.go:77).
-//
 // ValidateMultiGroupProjection is the multi-measure counterpart of the
 // single-group validation Dispatch runs inline. A projected tag/field is
 // accepted if it resolves in ANY group's schema/measure — mirroring the
@@ -433,6 +416,23 @@ func ValidateMultiGroupProjection(req 
*measurev1.QueryRequest, schemas []logical
        return nil
 }
 
+// validateProjectionParity reproduces, byte-for-byte, the projection
+// errors the row path's logical_measure.Analyze raises so dispatch can
+// surface the canonical WantErr=true message directly instead of falling
+// through. It mirrors the row path exactly:
+//
+//   - Tags are validated before fields (measure_analyzer.go:110-119).
+//   - Tag projection is checked only when non-empty; each projected tag
+//     (families in order, tags in order) is looked up schema-wide via the
+//     TagSpec registry — the logical.Schema equivalent of CommonSchema's
+//     TagSpecMap. The first miss returns errors.Wrap(ErrTagNotDefined,
+//     tagName), identical to CommonSchema.ValidateProjectionTags
+//     (schema.go:175): "<tagName>: tag is not defined".
+//   - Field projection is checked only when non-empty; the first name
+//     absent from the Measure schema's fields returns errors.Errorf(
+//     "field %s not found in schema", field), identical to
+//     measure.schema.ValidateProjectionFields (measure/schema.go:77).
+//
 // A nil error means every projected name resolves, so dispatch proceeds.
 func validateProjectionParity(req *measurev1.QueryRequest, logicalSchema 
logical.Schema, m *databasev1.Measure) error {
        if tp := req.GetTagProjection(); tp != nil {
diff --git a/pkg/query/vectorized/measure/plan/dispatch_test.go 
b/pkg/query/vectorized/measure/plan/dispatch_test.go
index 61dd31cce..d0dac8412 100644
--- a/pkg/query/vectorized/measure/plan/dispatch_test.go
+++ b/pkg/query/vectorized/measure/plan/dispatch_test.go
@@ -34,6 +34,8 @@ import (
        measure 
"github.com/apache/skywalking-banyandb/pkg/query/vectorized/measure"
 )
 
+const defaultName = "default"
+
 func dispatchCfg(enabled bool) measure.VectorizedConfig {
        return measure.VectorizedConfig{Enabled: enabled, BatchSize: 1024, 
QueryMemoryMiB: 16}
 }
@@ -41,7 +43,7 @@ func dispatchCfg(enabled bool) measure.VectorizedConfig {
 func bareReq() *measurev1.QueryRequest {
        return &measurev1.QueryRequest{
                Name:            "demo",
-               Groups:          []string{"default"},
+               Groups:          []string{defaultName},
                TagProjection:   projTagProj(),
                FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: 
[]string{fieldValue}},
                TimeRange: &modelv1.TimeRange{
@@ -80,7 +82,7 @@ func dispatchSchemaFixture(t *testing.T) 
(*databasev1.Measure, logical.Schema, *
        if schemaErr != nil {
                t.Fatalf("BuildSchema: %v", schemaErr)
        }
-       return measureSchema, logicalSchema, &commonv1.Metadata{Name: "demo", 
Group: "default"}, &fakeEC{}
+       return measureSchema, logicalSchema, &commonv1.Metadata{Name: "demo", 
Group: defaultName}, &fakeEC{}
 }
 
 // TestDispatch_RawGroupBy_ReachesEcQuery confirms G9b: GroupBy without
@@ -145,7 +147,7 @@ func 
TestDispatch_GroupByAggUncoveredProjection_ReachesEcQuery(t *testing.T) {
                                // GroupBy references region but TagProjection 
only carries svc.
                                req.GroupBy = &measurev1.QueryRequest_GroupBy{
                                        TagProjection: 
&modelv1.TagProjection{TagFamilies: []*modelv1.TagProjection_TagFamily{
-                                               {Name: "default", Tags: 
[]string{"region"}},
+                                               {Name: defaultName, Tags: 
[]string{"region"}},
                                        }},
                                        FieldName: fieldValue,
                                }
@@ -204,7 +206,7 @@ func TestDispatch_Top_ReachesEcQuery(t *testing.T) {
        if schemaErr != nil {
                t.Fatalf("BuildSchema: %v", schemaErr)
        }
-       metadata := &commonv1.Metadata{Name: "demo", Group: "default"}
+       metadata := &commonv1.Metadata{Name: "demo", Group: defaultName}
        ec := &fakeEC{wantResult: nil, wantErr: nil}
 
        req := bareReq()
@@ -238,7 +240,7 @@ func TestDispatch_OrderBy_ReachesEcQuery(t *testing.T) {
        if schemaErr != nil {
                t.Fatalf("BuildSchema: %v", schemaErr)
        }
-       metadata := &commonv1.Metadata{Name: "demo", Group: "default"}
+       metadata := &commonv1.Metadata{Name: "demo", Group: defaultName}
        ec := &fakeEC{wantResult: nil, wantErr: nil}
 
        req := bareReq()
@@ -269,7 +271,7 @@ func TestDispatch_OrderBy_UnknownIndexRule_BubblesUpError(t 
*testing.T) {
        if schemaErr != nil {
                t.Fatalf("BuildSchema: %v", schemaErr)
        }
-       metadata := &commonv1.Metadata{Name: "demo", Group: "default"}
+       metadata := &commonv1.Metadata{Name: "demo", Group: defaultName}
        ec := &fakeEC{wantResult: nil, wantErr: nil}
 
        req := bareReq()
@@ -303,12 +305,12 @@ func 
TestDispatch_UnknownTagProjection_SurfacesCanonicalError(t *testing.T) {
        if schemaErr != nil {
                t.Fatalf("BuildSchema: %v", schemaErr)
        }
-       metadata := &commonv1.Metadata{Name: "demo", Group: "default"}
+       metadata := &commonv1.Metadata{Name: "demo", Group: defaultName}
        ec := &fakeEC{wantResult: nil, wantErr: nil}
 
        req := bareReq()
        req.TagProjection = &modelv1.TagProjection{TagFamilies: 
[]*modelv1.TagProjection_TagFamily{
-               {Name: "default", Tags: []string{"ghost"}},
+               {Name: defaultName, Tags: []string{"ghost"}},
        }}
        _, _, handled, err := Dispatch(context.Background(),
                req, metadata, measureSchema, logicalSchema, ec, 
dispatchCfg(true), false, false)
@@ -339,7 +341,7 @@ func 
TestDispatch_UnknownFieldProjection_SurfacesCanonicalError(t *testing.T) {
        if schemaErr != nil {
                t.Fatalf("BuildSchema: %v", schemaErr)
        }
-       metadata := &commonv1.Metadata{Name: "demo", Group: "default"}
+       metadata := &commonv1.Metadata{Name: "demo", Group: defaultName}
        ec := &fakeEC{wantResult: nil, wantErr: nil}
 
        req := bareReq()
@@ -371,12 +373,12 @@ func TestDispatch_TagValidatedBeforeField(t *testing.T) {
        if schemaErr != nil {
                t.Fatalf("BuildSchema: %v", schemaErr)
        }
-       metadata := &commonv1.Metadata{Name: "demo", Group: "default"}
+       metadata := &commonv1.Metadata{Name: "demo", Group: defaultName}
        ec := &fakeEC{}
 
        req := bareReq()
        req.TagProjection = &modelv1.TagProjection{TagFamilies: 
[]*modelv1.TagProjection_TagFamily{
-               {Name: "default", Tags: []string{"ghost"}},
+               {Name: defaultName, Tags: []string{"ghost"}},
        }}
        req.FieldProjection = &measurev1.QueryRequest_FieldProjection{Names: 
[]string{"phantom"}}
        _, _, handled, err := Dispatch(context.Background(),
@@ -403,7 +405,7 @@ func TestDispatch_NoTimeRange_EmptyResultParity(t 
*testing.T) {
        if schemaErr != nil {
                t.Fatalf("BuildSchema: %v", schemaErr)
        }
-       metadata := &commonv1.Metadata{Name: "demo", Group: "default"}
+       metadata := &commonv1.Metadata{Name: "demo", Group: defaultName}
        ec := &fakeEC{wantResult: nil, wantErr: nil}
 
        req := bareReq()
@@ -476,7 +478,7 @@ func TestDispatch_EmptyResult_CanonicalEmptyIterator(t 
*testing.T) {
        if schemaErr != nil {
                t.Fatalf("BuildSchema: %v", schemaErr)
        }
-       metadata := &commonv1.Metadata{Name: "demo", Group: "default"}
+       metadata := &commonv1.Metadata{Name: "demo", Group: defaultName}
        ec := &fakeEC{wantResult: nil, wantErr: nil}
 
        iter, _, handled, err := Dispatch(context.Background(),
@@ -552,7 +554,7 @@ func TestDispatch_GroupByAggCovered_ReachesEcQuery(t 
*testing.T) {
        if schemaErr != nil {
                t.Fatalf("BuildSchema: %v", schemaErr)
        }
-       metadata := &commonv1.Metadata{Name: "demo", Group: "default"}
+       metadata := &commonv1.Metadata{Name: "demo", Group: defaultName}
        ec := &fakeEC{wantResult: nil, wantErr: nil}
 
        req := bareReq()
@@ -587,7 +589,7 @@ func TestDispatch_GroupByAggCovered_ReachesEcQuery(t 
*testing.T) {
 func TestAugmentRequestWithHiddenTags_AppendsFamiliesAfterVisible(t 
*testing.T) {
        req := bareReq() // TagProjection: default=[svc]
        extras := [][]*logical.Tag{
-               {logical.NewTag("default", "region")},
+               {logical.NewTag(defaultName, "region")},
                {logical.NewTag("extra", "zone")},
        }
        got := augmentRequestWithHiddenTags(req, extras)
@@ -602,10 +604,10 @@ func 
TestAugmentRequestWithHiddenTags_AppendsFamiliesAfterVisible(t *testing.T)
        if len(fams) != 3 {
                t.Fatalf("want 3 families (1 visible + 2 hidden), got %d: %+v", 
len(fams), fams)
        }
-       if fams[0].GetName() != "default" || fams[0].GetTags()[0] != tagSvc {
+       if fams[0].GetName() != defaultName || fams[0].GetTags()[0] != tagSvc {
                t.Fatalf("visible family must stay first, got %+v", fams[0])
        }
-       if fams[1].GetName() != "default" || fams[1].GetTags()[0] != "region" {
+       if fams[1].GetName() != defaultName || fams[1].GetTags()[0] != "region" 
{
                t.Fatalf("first hidden family wrong, got %+v", fams[1])
        }
        if fams[2].GetName() != "extra" || fams[2].GetTags()[0] != "zone" {
@@ -625,7 +627,7 @@ func TestHiddenTagsMIterator_StripsHiddenTagsFromCurrent(t 
*testing.T) {
                return []*measurev1.InternalDataPoint{{
                        DataPoint: &measurev1.DataPoint{
                                TagFamilies: []*modelv1.TagFamily{{
-                                       Name: "default",
+                                       Name: defaultName,
                                        Tags: []*modelv1.Tag{
                                                {Key: tagSvc, Value: 
&modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: "a"}}}},
                                                {Key: "region", Value: 
&modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
"us"}}}},
@@ -684,7 +686,7 @@ func TestDispatch_QueryError_BubblesUp(t *testing.T) {
        if schemaErr != nil {
                t.Fatalf("BuildSchema: %v", schemaErr)
        }
-       metadata := &commonv1.Metadata{Name: "demo", Group: "default"}
+       metadata := &commonv1.Metadata{Name: "demo", Group: defaultName}
        wantErr := context.DeadlineExceeded
        ec := &fakeEC{wantErr: wantErr}
 
diff --git a/pkg/query/vectorized/measure/plan/distributed.go 
b/pkg/query/vectorized/measure/plan/distributed.go
index 93708c890..d379707e2 100644
--- a/pkg/query/vectorized/measure/plan/distributed.go
+++ b/pkg/query/vectorized/measure/plan/distributed.go
@@ -43,7 +43,7 @@ import (
 // distributedQueryTimeout is the historical hard-coded broadcast deadline.
 // Retained as a fallback when DistributedPlan.cfg.BroadcastTimeout is zero
 // so call sites that build a VectorizedConfig by hand (most existing
-// tests) keep their prior behaviour. Production deployments thread the
+// tests) keep their prior behavior. Production deployments thread the
 // operator's --dst-broadcast-timeout through cfg.BroadcastTimeout.
 const distributedQueryTimeout = 15 * time.Second
 
@@ -79,20 +79,20 @@ func SupportsDistributedRows(req *measurev1.QueryRequest) 
bool {
 // decodes them into vectorized batches, and runs the liaison operators without
 // routing through the row-compatible logical distributedPlan.
 type DistributedPlan struct {
-       queryTemplate   *measurev1.QueryRequest
-       nodeTemplate    *measurev1.QueryRequest
-       measureSchemas  []*databasev1.Measure
-       indexRules      [][]*databasev1.IndexRule
-       orderByTag      *resolvedOrderByTag
-       hiddenOrderBy   logical.HiddenTagSet
+       queryTemplate  *measurev1.QueryRequest
+       nodeTemplate   *measurev1.QueryRequest
+       orderByTag     *resolvedOrderByTag
+       hiddenOrderBy  logical.HiddenTagSet
+       measureSchemas []*databasev1.Measure
        // hiddenTopField is the field name appended to the nodeTemplate's
        // FieldProjection for Top-without-Agg queries when the Top.FieldName
        // is not already in the user-visible FieldProjection. Data nodes
-       // materialise the column so BatchTop can sort on it; the egress
+       // materialize the column so BatchTop can sort on it; the egress
        // hiddenFieldsMIterator strips it so the wire bytes match a query
        // without the extra projection.
-       hiddenTopField  string
-       cfg             vmeasure.VectorizedConfig
+       hiddenTopField string
+       indexRules     [][]*databasev1.IndexRule
+       cfg            vmeasure.VectorizedConfig
 }
 
 // AnalyzeDistributed builds the vectorized distributed liaison plan.
@@ -100,12 +100,17 @@ type DistributedPlan struct {
 // req.Groups element). indexRules is the corresponding per-group slice of
 // index rule sets (ec.GetIndexRules() for each group). Both slices must be
 // in request-group order. Single-group callers may pass a length-1 slice for
-// each (the existing behaviour is preserved byte-for-byte).
+// each (the existing behavior is preserved byte-for-byte).
 //
 // When indexRules is nil or empty and req.OrderBy.IndexRuleName is non-empty,
 // the resolver surfaces an "index rule X not found" error byte-equivalent to
 // the row path.
-func AnalyzeDistributed(req *measurev1.QueryRequest, measureSchemas 
[]*databasev1.Measure, indexRules [][]*databasev1.IndexRule, cfg 
vmeasure.VectorizedConfig) (*DistributedPlan, error) {
+func AnalyzeDistributed(
+       req *measurev1.QueryRequest,
+       measureSchemas []*databasev1.Measure,
+       indexRules [][]*databasev1.IndexRule,
+       cfg vmeasure.VectorizedConfig,
+) (*DistributedPlan, error) {
        if req == nil {
                return nil, fmt.Errorf("vec distributed analyze: nil request")
        }
@@ -135,7 +140,7 @@ func AnalyzeDistributed(req *measurev1.QueryRequest, 
measureSchemas []*databasev
        nodeTemplate.Top = nil
        // Phase 5: GroupBy without Agg (raw GroupBy) keeps GroupBy on the
        // nodeTemplate so each data node runs its per-node BatchGroupByFirst 
pass
-       // and emits at most one row per group. This minimises wire bytes: the
+       // and emits at most one row per group. This minimizes wire bytes: the
        // node sends one representative row per group instead of all matching 
rows.
        // GroupBy+Agg continues to keep both on the node for partial 
aggregation
        // (unchanged from prior phases). The old `nodeTemplate.GroupBy = nil` 
when
@@ -166,9 +171,10 @@ func AnalyzeDistributed(req *measurev1.QueryRequest, 
measureSchemas []*databasev
                // For Top-without-Agg without GroupBy (Phase 4): MaxUint32 — 
the
                // per-node row count is unbounded and any finite cap risks 
dropping
                // global winners that haven't been deduplicated by 
GroupByFirst yet.
-               if nodeTemplate.GetAgg() != nil {
+               switch {
+               case nodeTemplate.GetAgg() != nil:
                        nodeTemplate.Limit = math.MaxUint32
-               } else if req.GetGroupBy() != nil {
+               case req.GetGroupBy() != nil:
                        perNodeLimit := calibratedTopWithoutAggLimit(origTop, 
len(req.GetGroups()))
                        nodeTemplate.Limit = perNodeLimit
                        // Push the Top down to the data node: after 
BatchGroupByFirst emits
@@ -178,7 +184,7 @@ func AnalyzeDistributed(req *measurev1.QueryRequest, 
measureSchemas []*databasev
                        // the true top-N representatives from every node.
                        nodeTemplate.Top = 
proto.Clone(origTop).(*measurev1.QueryRequest_Top)
                        nodeTemplate.Top.Number = int32(perNodeLimit)
-               } else {
+               default:
                        nodeTemplate.Limit = math.MaxUint32
                }
        }
@@ -225,7 +231,7 @@ func AnalyzeDistributed(req *measurev1.QueryRequest, 
measureSchemas []*databasev
                }
                // Hidden-projection augmentation: when the Top field is not in 
the
                // user-visible FieldProjection, append it to the nodeTemplate 
so data
-               // nodes materialise the column for BatchTop sorting. The egress
+               // nodes materialize the column for BatchTop sorting. The egress
                // hiddenFieldsMIterator strips it so the response matches a 
query
                // without the extra projection.
                if !topFieldProjectionVisible(req.GetFieldProjection(), 
topFieldName) {
@@ -299,7 +305,7 @@ func appendOrderByToProjection(projection 
*modelv1.TagProjection, want resolvedO
 
 // topFieldProjectionVisible reports whether fieldName is already present in 
the
 // user-facing FieldProjection. When false, the analyzer augments the node
-// template so data nodes materialise the column on the wire for BatchTop 
sorting.
+// template so data nodes materialize the column on the wire for BatchTop 
sorting.
 func topFieldProjectionVisible(fp *measurev1.QueryRequest_FieldProjection, 
fieldName string) bool {
        if fp == nil {
                return false
@@ -394,7 +400,7 @@ func intersectFieldProjection(proj 
*measurev1.QueryRequest_FieldProjection, ms *
 
 // Execute broadcasts the internal query and executes the liaison-side 
vectorized plan.
 // For single-group requests, one broadcast is issued for all groups (existing
-// behaviour). For multi-group requests, one broadcast is issued per group,
+// behavior). For multi-group requests, one broadcast is issued per group,
 // each carrying a single-element Groups slice, so data nodes can answer with
 // a schema that matches only their local group's columns.
 func (p *DistributedPlan) Execute(ctx context.Context) (executor.MIterator, 
error) {
@@ -407,7 +413,7 @@ func (p *DistributedPlan) Execute(ctx context.Context) 
(executor.MIterator, erro
 
        groups := queryRequest.GetGroups()
        if len(groups) <= 1 {
-               // Single-group fast path — unchanged behaviour.
+               // Single-group fast path — unchanged behavior.
                nodeRequest := 
proto.Clone(p.nodeTemplate).(*measurev1.QueryRequest)
                nodeRequest.TimeRange = dctx.TimeRange()
                internalRequest := &measurev1.InternalQueryRequest{Request: 
nodeRequest, AggReturnPartial: queryRequest.GetAgg() != nil}
@@ -519,11 +525,13 @@ func (p *DistributedPlan) executeAgg(ctx context.Context, 
frames [][]byte, req *
                topSpec = &vmeasure.ReduceTopSpec{FieldName: 
top.GetFieldName(), N: int(top.GetNumber()), Asc: top.GetFieldValueSort() == 
modelv1.Sort_SORT_ASC}
        }
        tracker := vectorized.NewMemoryTracker(int64(p.cfg.QueryMemoryMiB) * 
1024 * 1024)
+       // nolint:contextcheck // pure in-memory reducer; no cancelable I/O 
downstream
        batches, reduceErr := vmeasure.ReduceRawFrames(frames, keyTagNames, 
aggSpecs, p.cfg.BatchSize, tracker)
        if reduceErr != nil {
                return nil, fmt.Errorf("vec distributed plan: reduce raw 
frames: %w", reduceErr)
        }
        if topSpec != nil && topSpec.N > 0 && len(batches) > 0 {
+               // nolint:contextcheck // pure in-memory top selection; no 
cancelable I/O downstream
                topped, topErr := vmeasure.ApplyTopToReduce(batches, *topSpec, 
p.cfg.BatchSize)
                if topErr != nil {
                        return nil, fmt.Errorf("vec distributed plan: top 
reduced frames: %w", topErr)
@@ -573,6 +581,7 @@ func (p *DistributedPlan) executeRows(ctx context.Context, 
frames [][]byte, req
        // series per window.
        if groupBy := req.GetGroupBy(); groupBy != nil {
                var gbErr error
+               // nolint:contextcheck // pure in-memory dedup; no cancelable 
I/O downstream
                batches, gbErr = applyBatchGroupByFirstToRows(batches, groupBy, 
p.cfg.BatchSize, tracker)
                if gbErr != nil {
                        return nil, fmt.Errorf("vec distributed plan: apply 
group-by to rows: %w", gbErr)
@@ -580,6 +589,7 @@ func (p *DistributedPlan) executeRows(ctx context.Context, 
frames [][]byte, req
        }
        if top := req.GetTop(); top != nil {
                var topErr error
+               // nolint:contextcheck // pure in-memory top selection; no 
cancelable I/O downstream
                batches, topErr = applyBatchTopToRows(batches, top, 
p.cfg.BatchSize)
                if topErr != nil {
                        return nil, fmt.Errorf("vec distributed plan: apply top 
to rows: %w", topErr)
@@ -627,6 +637,7 @@ func (p *DistributedPlan) executeRowsMultiGroup(ctx 
context.Context, groupFrames
        // to the global Top ranking.
        if groupBy := req.GetGroupBy(); groupBy != nil {
                var gbErr error
+               // nolint:contextcheck // pure in-memory dedup; no cancelable 
I/O downstream
                batches, gbErr = applyBatchGroupByFirstToRows(batches, groupBy, 
p.cfg.BatchSize, tracker)
                if gbErr != nil {
                        return nil, fmt.Errorf("vec distributed plan: apply 
group-by to multi-group rows: %w", gbErr)
@@ -634,6 +645,7 @@ func (p *DistributedPlan) executeRowsMultiGroup(ctx 
context.Context, groupFrames
        }
        if top := req.GetTop(); top != nil {
                var topErr error
+               // nolint:contextcheck // pure in-memory top selection; no 
cancelable I/O downstream
                batches, topErr = applyBatchTopToRows(batches, top, 
p.cfg.BatchSize)
                if topErr != nil {
                        return nil, fmt.Errorf("vec distributed plan: apply top 
to multi-group rows: %w", topErr)
@@ -842,7 +854,12 @@ func (p *DistributedPlan) iteratorFromBatches(ctx 
context.Context, batches []*ve
 // (already computed by BuildMultiGroupBatchSchema) so this function does not
 // fall back to Analyze on empty output — an empty multi-group result simply
 // returns no rows rather than re-deriving a schema from a single group.
-func (p *DistributedPlan) iteratorFromBatchesWithSchema(ctx context.Context, 
batches []*vectorized.RecordBatch, req *measurev1.QueryRequest, schema 
*vectorized.BatchSchema) (executor.MIterator, error) {
+func (p *DistributedPlan) iteratorFromBatchesWithSchema(
+       ctx context.Context,
+       batches []*vectorized.RecordBatch,
+       req *measurev1.QueryRequest,
+       schema *vectorized.BatchSchema,
+) (executor.MIterator, error) {
        builder := 
vectorized.NewPipelineBuilder().WithMemoryTracker(vectorized.NewMemoryTracker(int64(p.cfg.QueryMemoryMiB)
 * 1024 * 1024))
        builder.From(&batchSliceSource{batches: batches, schema: schema})
        limit := req.GetLimit()
@@ -896,7 +913,7 @@ func (p *DistributedPlan) iteratorFromBatchesWithSchema(ctx 
context.Context, bat
 //
 // Engineering ceiling: the formula output is capped at perNodeSafetyBound
 // (500_000 rows) before the uint32 overflow guard. Rationale: each data node
-// in a typical 4 GB deployment can safely materialise ~500K int64 rows
+// in a typical 4 GB deployment can safely materialize ~500K int64 rows
 // (~4 MB) per GroupBy+Top response without memory pressure. Any N large
 // enough to push 3*N beyond 500_000 (i.e. N > ~166_667) is far outside
 // normal operational use; capping at 500_000 prevents a pathological
@@ -973,8 +990,8 @@ func (p *DistributedPlan) String() string {
 }
 
 type batchSliceSource struct {
-       batches []*vectorized.RecordBatch
        schema  *vectorized.BatchSchema
+       batches []*vectorized.RecordBatch
        idx     int
 }
 
diff --git a/pkg/query/vectorized/measure/plan/distributed_rows.go 
b/pkg/query/vectorized/measure/plan/distributed_rows.go
index 208a8615e..39e63780b 100644
--- a/pkg/query/vectorized/measure/plan/distributed_rows.go
+++ b/pkg/query/vectorized/measure/plan/distributed_rows.go
@@ -52,7 +52,7 @@ type seenSIDKey struct {
 // OrderByFamily / OrderByTagName select the sort column when the request
 // carries an OrderBy.IndexRuleName (Phase 2). Both empty means time-sort:
 // the comparator uses the timestamp column encoded as 8-byte big-endian,
-// matching the Phase 1.5 behaviour byte-for-byte. The merger looks up the
+// matching the Phase 1.5 behavior byte-for-byte. The merger looks up the
 // column index from the merged-batch schema after frames have been decoded,
 // so the spec only carries the names — not a column index.
 //
@@ -61,9 +61,9 @@ type seenSIDKey struct {
 // column without going through TagIndex lookup. The merger prefers the
 // explicit index when >= 0; otherwise it resolves from the names.
 type distributedRowsSpec struct {
+       Tracker        *vectorized.MemoryTracker
        OrderByFamily  string
        OrderByTagName string
-       Tracker        *vectorized.MemoryTracker
        BatchSize      int
        OrderByColIdx  int
        Desc           bool
@@ -106,20 +106,20 @@ func (r *distributedRowItem) SortedField() []byte { 
return r.sortField }
 // sources is built by handing N of these to itersort.NewItemIter. When the
 // merger is configured for OrderBy-by-index-rule, sortKeys caches the
 // per-row encoded sort bytes so the local sort and Next()'s SortedField
-// both reuse the same encoding without re-marshalling per call.
+// both reuse the same encoding without re-marshaling per call.
 type distributedRowSourceIter struct {
-       batch        *vectorized.RecordBatch
-       cur          *distributedRowItem
-       sortKeys     [][]byte
-       indices      []int
-       source       int
-       group        int
-       sidIdx       int
-       tsIdx        int
-       verIdx       int
-       sortColIdx   int
-       pos          int
-       seq          int
+       batch      *vectorized.RecordBatch
+       cur        *distributedRowItem
+       sortKeys   [][]byte
+       indices    []int
+       source     int
+       group      int
+       sidIdx     int
+       tsIdx      int
+       verIdx     int
+       sortColIdx int
+       pos        int
+       seq        int
 }
 
 // newDistributedRowSourceIter constructs the per-source iterator. When
@@ -128,7 +128,13 @@ type distributedRowSourceIter struct {
 // pre-encodes each active row's sort key via encodeSortKey, stable-sorts
 // indices by that encoding (respecting desc), and reuses the cached keys
 // in Next() to avoid re-encoding per heap pop.
-func newDistributedRowSourceIter(batch *vectorized.RecordBatch, schema 
*vectorized.BatchSchema, source, group int, desc bool, sortColIdx int) 
(*distributedRowSourceIter, error) {
+func newDistributedRowSourceIter(
+       batch *vectorized.RecordBatch,
+       schema *vectorized.BatchSchema,
+       source, group int,
+       desc bool,
+       sortColIdx int,
+) (*distributedRowSourceIter, error) {
        tsIdx := schema.TimestampIndex()
        indices := activeDistributedRows(batch)
        iter := &distributedRowSourceIter{
@@ -155,7 +161,7 @@ func newDistributedRowSourceIter(batch 
*vectorized.RecordBatch, schema *vectoriz
                // heap's lex compare gives the right global order. Data nodes 
emit
                // per-shard pre-sorted on this same column, but tests and 
unusual
                // scan modes may not respect that, so the defensive O(n log n) 
sort
-               // keeps the heap invariant intact regardless of producer 
behaviour.
+               // keeps the heap invariant intact regardless of producer 
behavior.
                sort.SliceStable(indices, func(i, j int) bool {
                        cmp := bytes.Compare(sortKeys[indices[i]], 
sortKeys[indices[j]])
                        if desc {
@@ -280,7 +286,12 @@ func mergeDistributedRows(frames [][]byte, spec 
distributedRowsSpec) ([]*vectori
 // batches. Extracted so both the single-group path (mergeDistributedRows) and
 // the multi-group path (mergeDistributedRowsMulti) share the same merge loop
 // without duplication.
-func runDistributedRowMerge(iters []itersort.Iterator[*distributedRowItem], 
schema *vectorized.BatchSchema, spec distributedRowsSpec, batchSize int) 
([]*vectorized.RecordBatch, error) {
+func runDistributedRowMerge(
+       iters []itersort.Iterator[*distributedRowItem],
+       schema *vectorized.BatchSchema,
+       spec distributedRowsSpec,
+       batchSize int,
+) ([]*vectorized.RecordBatch, error) {
        merger := itersort.NewItemIter(iters, spec.Desc)
        defer func() { _ = merger.Close() }()
 
@@ -334,7 +345,12 @@ type decodedBatchSource struct {
 // heap expects (largest-first for desc, smallest-first for asc). sortColIdx
 // selects the comparator: < 0 means time-sort (Phase 1.5), >= 0 means the
 // OrderBy column at that index in the merged-batch schema (Phase 2).
-func buildDistributedRowSourceIters(sources []decodedBatchSource, schema 
*vectorized.BatchSchema, desc bool, sortColIdx int) 
([]itersort.Iterator[*distributedRowItem], error) {
+func buildDistributedRowSourceIters(
+       sources []decodedBatchSource,
+       schema *vectorized.BatchSchema,
+       desc bool,
+       sortColIdx int,
+) ([]itersort.Iterator[*distributedRowItem], error) {
        iters := make([]itersort.Iterator[*distributedRowItem], 0, len(sources))
        for _, src := range sources {
                iter, buildErr := newDistributedRowSourceIter(src.batch, 
schema, src.source, src.group, desc, sortColIdx)
@@ -373,17 +389,17 @@ func resolveDistributedSortColumn(schema 
*vectorized.BatchSchema, spec distribut
 // dedup map, the current in-progress output batch, the cross-window seenSID
 // guard for index-mode queries, and the returned batch list.
 type distributedRowEmitter struct {
-       pool         *vectorized.BatchPool
-       schema       *vectorized.BatchSchema
-       tracker      *vectorized.MemoryTracker
-       seenSID      map[seenSIDKey]struct{}
-       window       map[distributedRowKey]*distributedRowItem
-       current      *vectorized.RecordBatch
-       windowKey    []byte
-       output       []*vectorized.RecordBatch
-       batchSize    int
-       rowWidth     int64
-       indexMode    bool
+       pool      *vectorized.BatchPool
+       schema    *vectorized.BatchSchema
+       tracker   *vectorized.MemoryTracker
+       seenSID   map[seenSIDKey]struct{}
+       window    map[distributedRowKey]*distributedRowItem
+       current   *vectorized.RecordBatch
+       windowKey []byte
+       output    []*vectorized.RecordBatch
+       batchSize int
+       rowWidth  int64
+       indexMode bool
 }
 
 // accept adds an incoming row to the current sort-group dedup map. The window
@@ -404,7 +420,7 @@ func (e *distributedRowEmitter) accept(item 
*distributedRowItem) {
 //
 // Emit-order note: sortedMIterator.loadOneGroup in the row-path baseline
 // (pkg/query/logical/measure/measure_plan_distributed.go) iterates its
-// uniqueData map in Go map order, which is intentionally randomised by the
+// uniqueData map in Go map order, which is intentionally randomized by the
 // runtime. The vec path here imposes a deterministic (source, seq) order via
 // sort.SliceStable on the window's surviving rows so equal-sort-field output
 // is reproducible across reruns and process restarts. This is strictly
@@ -442,7 +458,7 @@ func (e *distributedRowEmitter) flushWindow() error {
 }
 
 // appendRowToCurrent zero-copies one source row into the active output batch
-// via measure.AppendColumnRange. When the batch fills, it is finalised (which
+// via measure.AppendColumnRange. When the batch fills, it is finalized (which
 // reserves memory tracker bytes, appends to the output, then releases the
 // reservation so the tracker bounds in-transit batches without
 // double-counting once they reach the caller).
diff --git a/pkg/query/vectorized/measure/plan/distributed_rows_test.go 
b/pkg/query/vectorized/measure/plan/distributed_rows_test.go
index 9bbb62706..fc8139c04 100644
--- a/pkg/query/vectorized/measure/plan/distributed_rows_test.go
+++ b/pkg/query/vectorized/measure/plan/distributed_rows_test.go
@@ -315,6 +315,7 @@ func collectOrderByRows(batches []*vectorized.RecordBatch) 
[]orderByRow {
 // instead of the timestamp when OrderByFamily/OrderByTagName are set:
 //   - ascending: the surviving rows are emitted in ascending tag value;
 //   - descending: same set, reversed.
+//
 // Two sources contribute rows that arrive interleaved in input order to
 // exercise the cross-source ordering invariant.
 func TestMergeDistributedRows_OrderByString_AscDescAcrossSources(t *testing.T) 
{
@@ -457,9 +458,9 @@ func 
TestMergeDistributedRows_RawGroupBy_FirstSeenPerGroup(t *testing.T) {
 
        // Collect grouped rows.
        type groupedRow struct {
+               svc   string
                ts    int64
                value int64
-               svc   string
        }
        var rows []groupedRow
        for _, batch := range grouped {
@@ -497,7 +498,7 @@ func 
TestMergeDistributedRows_RawGroupBy_FirstSeenPerGroup(t *testing.T) {
 // Scenario: perNodeLimit = 2 (Top.N = 1, nGroups = 1 → calibrated limit = 2).
 // Source A has 4 groups (svc-a1 value=900, svc-a2 value=100, svc-a3 
value=800, svc-a4 value=50).
 // After BatchGroupByFirst: 4 rows, one per group (already deduplicated).
-// Without per-node BatchTop (old behaviour): Limit(2) would keep svc-a1 and
+// Without per-node BatchTop (old behavior): Limit(2) would keep svc-a1 and
 // svc-a2 (value=900 and 100) in insertion order, discarding svc-a3 
(value=800).
 // The global top-1 is svc-a1 (value=900) — correct by accident, but svc-a3
 // (the second-highest) is lost.
@@ -715,8 +716,8 @@ func encodeMultiGroupRows(t *testing.T, schema 
*vectorized.BatchSchema, rows ...
 // has the base 5 columns), the merged output has IsNull==true for that column
 // for all group-0 rows, while group-1 rows (which carry the column) are 
non-null.
 func TestMergeDistributedRows_MultiGroup_NullFillsMissingColumn(t *testing.T) {
-       baseSchema := distributedRowsTestSchema()   // 5 cols, no extra_tag
-       mergedSchema := multiGroupMergedSchema()    // 6 cols, with extra_tag
+       baseSchema := distributedRowsTestSchema() // 5 cols, no extra_tag
+       mergedSchema := multiGroupMergedSchema()  // 6 cols, with extra_tag
 
        // Group 0: uses base schema — no extra_tag column.
        frameGroup0 := encodeMultiGroupRows(t, baseSchema,
@@ -750,11 +751,11 @@ func 
TestMergeDistributedRows_MultiGroup_NullFillsMissingColumn(t *testing.T) {
                t.Fatal("expected non-empty output batches")
        }
 
-       // Collect rows and verify null-fill behaviour.
+       // Collect rows and verify null-fill behavior.
        type mergedRow struct {
+               extraTag string
                ts       int64
                sid      int64
-               extraTag string
                nullTag  bool
        }
        var rows []mergedRow
diff --git a/pkg/query/vectorized/measure/plan/distributed_test.go 
b/pkg/query/vectorized/measure/plan/distributed_test.go
index 95c4dc637..a4dc9acec 100644
--- a/pkg/query/vectorized/measure/plan/distributed_test.go
+++ b/pkg/query/vectorized/measure/plan/distributed_test.go
@@ -35,7 +35,7 @@ import (
 func TestAnalyzeDistributed_AllowsGroupByTopWithoutAgg(t *testing.T) {
        req := &measurev1.QueryRequest{
                Name:            "demo",
-               Groups:          []string{"default"},
+               Groups:          []string{defaultName},
                TagProjection:   projTagProj(),
                FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: 
[]string{fieldValue}},
                GroupBy: &measurev1.QueryRequest_GroupBy{
@@ -58,7 +58,7 @@ func TestAnalyzeDistributed_AllowsGroupByTopWithoutAgg(t 
*testing.T) {
 func TestAnalyzeDistributed_AllowsSupportedNonAggRows(t *testing.T) {
        req := &measurev1.QueryRequest{
                Name:            "demo",
-               Groups:          []string{"default"},
+               Groups:          []string{defaultName},
                TagProjection:   projTagProj(),
                FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: 
[]string{fieldValue}},
                Limit:           7,
@@ -81,11 +81,11 @@ func TestAnalyzeDistributed_AllowsSupportedNonAggRows(t 
*testing.T) {
 
 func TestSupportsDistributedRows(t *testing.T) {
        base := func() *measurev1.QueryRequest {
-               return &measurev1.QueryRequest{Name: "demo", Groups: 
[]string{"default"}}
+               return &measurev1.QueryRequest{Name: "demo", Groups: 
[]string{defaultName}}
        }
        cases := []struct {
-               name string
                req  *measurev1.QueryRequest
+               name string
                want bool
        }{
                {name: "plain", req: base(), want: true},
@@ -281,7 +281,7 @@ func 
TestAnalyzeDistributed_TopWithoutAgg_NodeLimitIsUnbounded(t *testing.T) {
                        }
                        req := &measurev1.QueryRequest{
                                Name:            "demo",
-                               Groups:          []string{"default"},
+                               Groups:          []string{defaultName},
                                TagProjection:   projTagProj(),
                                FieldProjection: 
&measurev1.QueryRequest_FieldProjection{Names: []string{fieldValue}},
                                Top: &measurev1.QueryRequest_Top{
@@ -314,15 +314,15 @@ func 
TestAnalyzeDistributed_TopWithoutAgg_NodeLimitIsUnbounded(t *testing.T) {
 // TestAnalyzeDistributed_TopWithoutAgg_HiddenFieldProjectionAdded covers
 // the Phase 4 augmentation path: when Top.FieldName is NOT in the request's
 // FieldProjection, AnalyzeDistributed must append it to the nodeTemplate so
-// data nodes materialise the column, and record hiddenTopField so the egress
+// data nodes materialize the column, and record hiddenTopField so the egress
 // strip removes it before the response.
 func TestAnalyzeDistributed_TopWithoutAgg_HiddenFieldProjectionAdded(t 
*testing.T) {
        // Visible field projection: only "total" — "value" (Top.FieldName) is 
absent.
        req := &measurev1.QueryRequest{
                Name:   "demo",
-               Groups: []string{"default"},
+               Groups: []string{defaultName},
                TagProjection: &modelv1.TagProjection{TagFamilies: 
[]*modelv1.TagProjection_TagFamily{
-                       {Name: "default", Tags: []string{tagSvc}},
+                       {Name: defaultName, Tags: []string{tagSvc}},
                }},
                FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: 
[]string{"total"}},
                Top: &measurev1.QueryRequest_Top{
@@ -334,9 +334,9 @@ func 
TestAnalyzeDistributed_TopWithoutAgg_HiddenFieldProjectionAdded(t *testing.
        }
        // Schema must have both "total" and "value" fields.
        ms := &databasev1.Measure{
-               Metadata: &commonv1.Metadata{Name: "demo", Group: "default"},
+               Metadata: &commonv1.Metadata{Name: "demo", Group: defaultName},
                TagFamilies: []*databasev1.TagFamilySpec{
-                       {Name: "default", Tags: []*databasev1.TagSpec{
+                       {Name: defaultName, Tags: []*databasev1.TagSpec{
                                {Name: tagSvc, Type: 
databasev1.TagType_TAG_TYPE_STRING},
                        }},
                },
@@ -356,7 +356,7 @@ func 
TestAnalyzeDistributed_TopWithoutAgg_HiddenFieldProjectionAdded(t *testing.
        if p.hiddenTopField != fieldValue {
                t.Fatalf("hiddenTopField: got %q, want %q", p.hiddenTopField, 
fieldValue)
        }
-       // Node template must project the Top field so data nodes materialise 
it.
+       // Node template must project the Top field so data nodes materialize 
it.
        nodeNames := p.nodeTemplate.GetFieldProjection().GetNames()
        sawTotal, sawValue := false, false
        for _, n := range nodeNames {
@@ -383,7 +383,7 @@ func 
TestAnalyzeDistributed_TopWithoutAgg_HiddenFieldProjectionAdded(t *testing.
 func TestAnalyzeDistributed_TopWithoutAgg_FieldNotInSchema(t *testing.T) {
        req := &measurev1.QueryRequest{
                Name:            "demo",
-               Groups:          []string{"default"},
+               Groups:          []string{defaultName},
                TagProjection:   projTagProj(),
                FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: 
[]string{fieldValue}},
                Top: &measurev1.QueryRequest_Top{
@@ -408,7 +408,7 @@ func 
TestAnalyzeDistributed_TopWithoutAgg_FieldNotInSchema(t *testing.T) {
 // AnalyzeDistributed as the indexRules parameter.
 func testIndexRuleOnTag(ruleName, tagName string) *databasev1.IndexRule {
        return &databasev1.IndexRule{
-               Metadata: &commonv1.Metadata{Name: ruleName, Group: "default"},
+               Metadata: &commonv1.Metadata{Name: ruleName, Group: 
defaultName},
                Tags:     []string{tagName},
        }
 }
@@ -421,7 +421,7 @@ func testIndexRuleOnTag(ruleName, tagName string) 
*databasev1.IndexRule {
 func TestAnalyzeDistributed_OrderByByIndexRule_AcceptedNatively(t *testing.T) {
        req := &measurev1.QueryRequest{
                Name:            "demo",
-               Groups:          []string{"default"},
+               Groups:          []string{defaultName},
                TagProjection:   projTagProj(), // already projects "svc"
                FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: 
[]string{fieldValue}},
                OrderBy:         &modelv1.QueryOrder{IndexRuleName: "svc_idx", 
Sort: modelv1.Sort_SORT_ASC},
@@ -431,14 +431,19 @@ func 
TestAnalyzeDistributed_OrderByByIndexRule_AcceptedNatively(t *testing.T) {
        if !SupportsDistributedRows(req) {
                t.Fatal("Phase 2: SupportsDistributedRows must accept OrderBy 
by index rule")
        }
-       p, analyzeErr := AnalyzeDistributed(req, 
[]*databasev1.Measure{testMeasureSchema()}, 
[][]*databasev1.IndexRule{indexRules}, vmeasure.VectorizedConfig{Enabled: true, 
BatchSize: 4, QueryMemoryMiB: 1})
+       p, analyzeErr := AnalyzeDistributed(
+               req,
+               []*databasev1.Measure{testMeasureSchema()},
+               [][]*databasev1.IndexRule{indexRules},
+               vmeasure.VectorizedConfig{Enabled: true, BatchSize: 4, 
QueryMemoryMiB: 1},
+       )
        if analyzeErr != nil {
                t.Fatalf("AnalyzeDistributed: %v", analyzeErr)
        }
        if p.orderByTag == nil {
                t.Fatal("DistributedPlan.orderByTag must be populated when an 
index rule resolves")
        }
-       if p.orderByTag.family != "default" || p.orderByTag.tag != tagSvc {
+       if p.orderByTag.family != defaultName || p.orderByTag.tag != tagSvc {
                t.Fatalf("orderByTag got %+v want default/%s", *p.orderByTag, 
tagSvc)
        }
        if !p.hiddenOrderBy.IsEmpty() {
@@ -460,22 +465,27 @@ func 
TestAnalyzeDistributed_OrderByByIndexRule_HiddenProjectionAdded(t *testing.
        // Projection visible to the user: only "region" — the OrderBy tag
        // "svc" is NOT projected, so the analyzer must hide-project it.
        visibleProjection := &modelv1.TagProjection{TagFamilies: 
[]*modelv1.TagProjection_TagFamily{
-               {Name: "default", Tags: []string{"region"}},
+               {Name: defaultName, Tags: []string{"region"}},
        }}
        req := &measurev1.QueryRequest{
                Name:            "demo",
-               Groups:          []string{"default"},
+               Groups:          []string{defaultName},
                TagProjection:   visibleProjection,
                FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: 
[]string{fieldValue}},
                OrderBy:         &modelv1.QueryOrder{IndexRuleName: "svc_idx", 
Sort: modelv1.Sort_SORT_DESC},
                Limit:           5,
        }
        indexRules := []*databasev1.IndexRule{testIndexRuleOnTag("svc_idx", 
tagSvc)}
-       p, analyzeErr := AnalyzeDistributed(req, 
[]*databasev1.Measure{testMeasureSchema()}, 
[][]*databasev1.IndexRule{indexRules}, vmeasure.VectorizedConfig{Enabled: true, 
BatchSize: 4, QueryMemoryMiB: 1})
+       p, analyzeErr := AnalyzeDistributed(
+               req,
+               []*databasev1.Measure{testMeasureSchema()},
+               [][]*databasev1.IndexRule{indexRules},
+               vmeasure.VectorizedConfig{Enabled: true, BatchSize: 4, 
QueryMemoryMiB: 1},
+       )
        if analyzeErr != nil {
                t.Fatalf("AnalyzeDistributed: %v", analyzeErr)
        }
-       if p.orderByTag == nil || p.orderByTag.family != "default" || 
p.orderByTag.tag != tagSvc {
+       if p.orderByTag == nil || p.orderByTag.family != defaultName || 
p.orderByTag.tag != tagSvc {
                t.Fatalf("orderByTag got %+v want default/%s", p.orderByTag, 
tagSvc)
        }
        if p.hiddenOrderBy.IsEmpty() {
@@ -492,7 +502,7 @@ func 
TestAnalyzeDistributed_OrderByByIndexRule_HiddenProjectionAdded(t *testing.
        }
        // The node template's TagProjection must include the OrderBy tag.
        nodeFams := p.nodeTemplate.GetTagProjection().GetTagFamilies()
-       if len(nodeFams) != 1 || nodeFams[0].GetName() != "default" {
+       if len(nodeFams) != 1 || nodeFams[0].GetName() != defaultName {
                t.Fatalf("node template projection wrong shape, got %+v", 
nodeFams)
        }
        saw := map[string]bool{}
@@ -511,7 +521,7 @@ func 
TestAnalyzeDistributed_OrderByByIndexRule_HiddenProjectionAdded(t *testing.
 func TestAnalyzeDistributed_OrderByByIndexRule_UnknownRuleErrors(t *testing.T) 
{
        req := &measurev1.QueryRequest{
                Name:            "demo",
-               Groups:          []string{"default"},
+               Groups:          []string{defaultName},
                TagProjection:   projTagProj(),
                FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: 
[]string{fieldValue}},
                OrderBy:         &modelv1.QueryOrder{IndexRuleName: "nope"},
@@ -533,7 +543,7 @@ func testMeasureSchemaForGroup(group string) 
*databasev1.Measure {
                Metadata: &commonv1.Metadata{Name: "demo", Group: group},
                TagFamilies: []*databasev1.TagFamilySpec{
                        {
-                               Name: "default",
+                               Name: defaultName,
                                Tags: []*databasev1.TagSpec{
                                        {Name: tagSvc, Type: 
databasev1.TagType_TAG_TYPE_STRING},
                                        {Name: "region", Type: 
databasev1.TagType_TAG_TYPE_STRING},
@@ -590,7 +600,7 @@ func 
TestAnalyzeDistributed_MultiGroup_UnionsSchemaAcrossGroups(t *testing.T) {
                Metadata: &commonv1.Metadata{Name: "demo", Group: "groupB"},
                TagFamilies: []*databasev1.TagFamilySpec{
                        {
-                               Name: "default",
+                               Name: defaultName,
                                Tags: []*databasev1.TagSpec{
                                        {Name: tagSvc, Type: 
databasev1.TagType_TAG_TYPE_STRING},
                                        {Name: "region", Type: 
databasev1.TagType_TAG_TYPE_STRING},
@@ -693,8 +703,8 @@ func TestHiddenFieldsMIterator_StripsHiddenTopField(t 
*testing.T) {
 func TestAnalyzeDistributed_TopNonAggUnboundsNodeLimit_MultiGroup(t 
*testing.T) {
        cases := []struct {
                name      string
-               topN      int32
                groups    []string
+               topN      int32
                wantLimit uint32
        }{
                {
@@ -712,7 +722,7 @@ func 
TestAnalyzeDistributed_TopNonAggUnboundsNodeLimit_MultiGroup(t *testing.T)
                {
                        name:      "SingleGroup_Top5_WithGroupBy",
                        topN:      5,
-                       groups:    []string{"default"},
+                       groups:    []string{defaultName},
                        wantLimit: 
calibratedTopWithoutAggLimit(&measurev1.QueryRequest_Top{Number: 5, FieldName: 
fieldValue}, 1),
                },
        }
@@ -774,13 +784,13 @@ func 
TestAnalyzeDistributed_TopNonAggUnboundsNodeLimit_MultiGroup(t *testing.T)
 // TestAnalyzeDistributed_RawGroupBy_NodeTemplateKeepsGroupBy verifies that
 // Phase 5 propagates GroupBy to data nodes for raw GroupBy requests (GroupBy
 // without Agg). Data nodes must receive the GroupBy so they run their per-node
-// BatchGroupByFirst pass and emit at most one row per group, minimising wire
-// bytes. The old behaviour (clearing nodeTemplate.GroupBy when Agg is nil) is
+// BatchGroupByFirst pass and emit at most one row per group, minimizing wire
+// bytes. The old behavior (clearing nodeTemplate.GroupBy when Agg is nil) is
 // intentionally dropped in Phase 5.
 func TestAnalyzeDistributed_RawGroupBy_NodeTemplateKeepsGroupBy(t *testing.T) {
        req := &measurev1.QueryRequest{
                Name:            "demo",
-               Groups:          []string{"default"},
+               Groups:          []string{defaultName},
                TagProjection:   projTagProj(),
                FieldProjection: &measurev1.QueryRequest_FieldProjection{Names: 
[]string{fieldValue}},
                GroupBy: &measurev1.QueryRequest_GroupBy{
diff --git a/pkg/query/vectorized/measure/plan/hidden_tags.go 
b/pkg/query/vectorized/measure/plan/hidden_tags.go
index 47862f9c6..dc70a70a3 100644
--- a/pkg/query/vectorized/measure/plan/hidden_tags.go
+++ b/pkg/query/vectorized/measure/plan/hidden_tags.go
@@ -99,7 +99,7 @@ func (h *hiddenTagsMIterator) Close() error { return 
h.inner.Close() }
 // EmitFrame implements vmeasure.FrameEmitter. Draining via the wrapper's
 // own Next / Current keeps the hidden-tag strip as the single source of
 // truth — Current already removes the hidden criteria tags from each
-// emitted DataPoint, so the reverse-serialised RecordBatch carries only
+// emitted DataPoint, so the reverse-serialized RecordBatch carries only
 // the projected columns. The columnar wire then matches what a query
 // without hidden criteria tags would have emitted, byte-identical to
 // the row path.
@@ -119,7 +119,7 @@ func (h *hiddenTagsMIterator) EmitFrame(_ context.Context) 
([]byte, error) {
 // hiddenFieldsMIterator wraps an MIterator and strips a single hidden field
 // from each Current() result. It is the Phase 4 parallel of 
hiddenTagsMIterator:
 // when Top.FieldName is not in the user-visible FieldProjection, the analyzer
-// appends it to the nodeTemplate so data nodes materialise it for BatchTop
+// appends it to the nodeTemplate so data nodes materialize it for BatchTop
 // sorting. This wrapper removes it at egress so the wire bytes match a query
 // without the extra field projection — byte-identical to the row path's
 // hidden-projection strip for criteria tags.
diff --git a/pkg/query/vectorized/measure/plan/multi_group_schema.go 
b/pkg/query/vectorized/measure/plan/multi_group_schema.go
index bb0c30624..1b665b24a 100644
--- a/pkg/query/vectorized/measure/plan/multi_group_schema.go
+++ b/pkg/query/vectorized/measure/plan/multi_group_schema.go
@@ -173,7 +173,7 @@ func BuildMultiGroupBatchSchema(measureSchemas 
[]*databasev1.Measure, req *measu
                                orderedFields = append(orderedFields, name)
                        } else {
                                // Field not in any group's schema — include as 
passthrough
-                               // (BuildBatchSchema's null-slot behaviour for 
unknown fields).
+                               // (BuildBatchSchema's null-slot behavior for 
unknown fields).
                                orderedFields = append(orderedFields, name)
                                fieldColType[name] = 
vectorized.ColumnTypeFieldValue
                        }
@@ -204,7 +204,7 @@ func nativeKeyMulti(family, name string) string { return 
family + "\x00" + name
 // for types that have no direct vec mapping (UNSPECIFIED, TIMESTAMP) so the
 // caller can fall back to ColumnTypeTagValue.
 func tagTypeToColumnTypeMG(t databasev1.TagType) (vectorized.ColumnType, 
error) {
-       switch t {
+       switch t { //nolint:exhaustive // TAG_TYPE_TIMESTAMP and UNSPECIFIED 
fall through to the error return below
        case databasev1.TagType_TAG_TYPE_INT:
                return vectorized.ColumnTypeInt64, nil
        case databasev1.TagType_TAG_TYPE_STRING:
diff --git a/pkg/query/vectorized/measure/plan/orderby.go 
b/pkg/query/vectorized/measure/plan/orderby.go
index 01b94a26e..4fd1e1409 100644
--- a/pkg/query/vectorized/measure/plan/orderby.go
+++ b/pkg/query/vectorized/measure/plan/orderby.go
@@ -78,4 +78,3 @@ func resolveOrderByTag(measureSchema *databasev1.Measure, 
indexRules []*database
        }
        return resolvedOrderByTag{}, fmt.Errorf("tag %s not found", tagName)
 }
-
diff --git a/pkg/query/vectorized/measure/raw_emit.go 
b/pkg/query/vectorized/measure/raw_emit.go
index c6a4cdc5c..9f6fedbee 100644
--- a/pkg/query/vectorized/measure/raw_emit.go
+++ b/pkg/query/vectorized/measure/raw_emit.go
@@ -37,18 +37,18 @@ import (
 //
 //   - VectorizedMIterator: drains the underlying vec Pipeline directly
 //     via DrainPipelineToFrame — the throughput-optimal path that
-//     never materialises proto datapoints.
+//     never materializes proto datapoints.
 //   - emptyMIterator: returns a nil body (matches the codec layer's
 //     RawFrameCodec carve-out for empty distributed results).
 //   - hiddenTagsMIterator: drains via Next / Current (which already
 //     strips hidden criteria tags from the egress datapoints), then
-//     reverse-serialises the surviving rows into a passthrough
+//     reverse-serializes the surviving rows into a passthrough
 //     RecordBatch through SerializeDataPointsToFrame.
 //   - sortedMIterator: drains via Next / Current (which already
 //     applies cross-group merge + version dedup), then reverse-
-//     serialises through SerializeDataPointsToFrame.
+//     serializes through SerializeDataPointsToFrame.
 //
-// The reverse-serialise path is less efficient than draining a vec
+// The reverse-serialize path is less efficient than draining a vec
 // Pipeline (one allocation per cell during passthrough rebuild) but
 // keeps the wrapper's row-side semantics — hidden-tag strip, sort,
 // dedup — as the single source of truth on the wire.
@@ -120,7 +120,7 @@ func DrainPipelineToFrame(ctx context.Context, p 
*vectorized.Pipeline, schema *v
        // typed columns; the typed-cell → TagValue/FieldValue reconstruction
        // happens at the row-egress (one allocation per surviving row, vs the
        // scan-time decode the storage avoided via passthrough — but the
-       // trade is favourable when the wire crossing in between would
+       // trade is favorable when the wire crossing in between would
        // otherwise dominate).
        converted, convErr := convertPassthroughForFrame(out)
        if convErr != nil {
@@ -134,15 +134,15 @@ func DrainPipelineToFrame(ctx context.Context, p 
*vectorized.Pipeline, schema *v
 // strip / cross-group merge logic operates on []*InternalDataPoint
 // rather than on a vec Pipeline. The wrapper drains itself via the
 // row-side Next / Current API (so its existing strip / merge / dedup
-// logic still runs); the resulting rows are reverse-serialised into a
+// logic still runs); the resulting rows are reverse-serialized into a
 // passthrough RecordBatch, convertPassthroughForFrame decodes the
 // passthrough columns to typed wire columns, and frame.Encode produces
 // the body.
 //
 // This path is less efficient than the vec native pipeline drain — it
 // allocates a *modelv1.TagValue / *FieldValue per cell during reverse-
-// serialise — but it keeps the wrapper's egress semantics intact end-
-// to-end on the wire (hidden tags stripped, cross-group order honoured,
+// serialize — but it keeps the wrapper's egress semantics intact end-
+// to-end on the wire (hidden tags stripped, cross-group order honored,
 // version dedup applied) without re-implementing each one in columnar
 // form.
 //
@@ -285,7 +285,7 @@ func buildPassthroughBatchFromDataPoints(idps 
[]*measurev1.InternalDataPoint) (*
 // so this is a no-op.
 //
 // Wire-type selection is variant-driven: the first non-null cell decides
-// what typed column to materialise (a Str variant ⇒ string column, Int ⇒
+// what typed column to materialize (a Str variant ⇒ string column, Int ⇒
 // int64, BinaryData ⇒ bytes). All-null columns default to bytes — the
 // receiver's reconstruction uses the validity bitmap, not the data, so
 // the chosen wire type is irrelevant for purely-null columns and bytes
@@ -295,7 +295,7 @@ func buildPassthroughBatchFromDataPoints(idps 
[]*measurev1.InternalDataPoint) (*
 // frame.Encode (the format lacks array column types); the helper returns
 // a typed error so the caller can surface it as a hard failure rather
 // than silently mis-encoding. Adding native array column types to the
-// frame format is the natural follow-up; until then, scans that materialise
+// frame format is the natural follow-up; until then, scans that materialize
 // array-typed tags on the cluster wire are unsupported.
 func convertPassthroughForFrame(b *vectorized.RecordBatch) 
(*vectorized.RecordBatch, error) {
        if !hasPassthroughColumn(b.Schema) {
@@ -356,10 +356,7 @@ func convertTagValueColumn(def vectorized.ColumnDef, col 
vectorized.Column, n in
        if !ok {
                return vectorized.ColumnDef{}, nil, fmt.Errorf("declared 
TagValue passthrough but column is %T", col)
        }
-       wireType, err := inferTagValueWireType(tc, n)
-       if err != nil {
-               return vectorized.ColumnDef{}, nil, err
-       }
+       wireType := inferTagValueWireType(tc, n)
        if wireType == vectorized.ColumnTypeTagValue {
                return def, col, nil
        }
@@ -419,7 +416,7 @@ func convertTagValueColumn(def vectorized.ColumnDef, col 
vectorized.Column, n in
 // (frame v3 ColumnTypeTagValue) which preserves the oneof intact. The
 // receive side reconstructs them as TagValue passthrough so
 // serializeBatchToProto handles them via the pointer-return fast path.
-func inferTagValueWireType(tc *vectorized.TypedColumn[*modelv1.TagValue], n 
int) (vectorized.ColumnType, error) {
+func inferTagValueWireType(tc *vectorized.TypedColumn[*modelv1.TagValue], n 
int) vectorized.ColumnType {
        for i := range n {
                if tc.IsNull(i) {
                        continue
@@ -430,18 +427,18 @@ func inferTagValueWireType(tc 
*vectorized.TypedColumn[*modelv1.TagValue], n int)
                }
                switch v.GetValue().(type) {
                case *modelv1.TagValue_Str:
-                       return vectorized.ColumnTypeString, nil
+                       return vectorized.ColumnTypeString
                case *modelv1.TagValue_Int:
-                       return vectorized.ColumnTypeInt64, nil
+                       return vectorized.ColumnTypeInt64
                case *modelv1.TagValue_BinaryData:
-                       return vectorized.ColumnTypeBytes, nil
+                       return vectorized.ColumnTypeBytes
                case *modelv1.TagValue_IntArray, *modelv1.TagValue_StrArray:
-                       return vectorized.ColumnTypeTagValue, nil
+                       return vectorized.ColumnTypeTagValue
                case *modelv1.TagValue_Null:
                        continue
                }
        }
-       return vectorized.ColumnTypeTagValue, nil
+       return vectorized.ColumnTypeTagValue
 }
 
 // convertFieldValueColumn is the FieldValue counterpart of
@@ -567,7 +564,7 @@ func appendActive(dst, src *vectorized.RecordBatch) {
 // scope for G9f.5), this round trip drops out.
 //
 // Empty input (no non-empty frames) returns an empty slice with no
-// error — matches the row path's behaviour when every data node returned
+// error — matches the row path's behavior when every data node returned
 // an empty distributed result.
 func ReduceFramesToInternalDataPoints(
        frames [][]byte,
@@ -639,7 +636,7 @@ func DecodeFramesToInternalDataPoints(frames [][]byte) 
([]*measurev1.InternalDat
 // end up in different sortField groups across the flat sequence.
 //
 // nil / empty frame bodies produce no slice in the output — the codec
-// carve-out for empty bodies is honoured here too.
+// carve-out for empty bodies is honored here too.
 func DecodeFramesPerSource(frames [][]byte) ([][]*measurev1.InternalDataPoint, 
error) {
        out := make([][]*measurev1.InternalDataPoint, 0, len(frames))
        for i, body := range frames {
diff --git a/pkg/query/vectorized/measure/raw_emit_bench_test.go 
b/pkg/query/vectorized/measure/raw_emit_bench_test.go
index 2bc4bcfc9..c8084e2c9 100644
--- a/pkg/query/vectorized/measure/raw_emit_bench_test.go
+++ b/pkg/query/vectorized/measure/raw_emit_bench_test.go
@@ -30,12 +30,12 @@ import (
 // distributed soak/bench. It simulates a fan-out of N data nodes each
 // running AggModeMap → frame.Encode, the cluster wire (just the byte
 // slice handoff), and the liaison's frame.Decode → AggModeReduce +
-// (shard, group) dedup → serialise to InternalDataPoint.
+// (shard, group) dedup → serialize to InternalDataPoint.
 //
 // Compared against BenchmarkG9f_SingleNodeAggModeAll on the same row
 // set, this measures the *full path overhead* of the throughout-vec
-// distributed loop: per-shard Map fold + columnar serialise + bytes +
-// columnar deserialise + Reduce + final proto materialise. The ratio
+// distributed loop: per-shard Map fold + columnar serialize + bytes +
+// columnar deserialise + Reduce + final proto materialize. The ratio
 // of the two numbers is the load-bearing efficiency claim of G9f.
 //
 // Real-cluster soak (gRPC + retry + network latency) is a follow-up
@@ -73,7 +73,7 @@ func BenchmarkG9f_DistributedFanout(b *testing.B) {
 // BenchmarkG9f_SingleNodeAggModeAll is the AggModeAll oracle: full
 // dataset processed in one BatchAggregation pass with no wire crossing.
 // The number this produces is the lower bound — any distributed path
-// adding wire / serialise / decode steps must stay close to it.
+// adding wire / serialize / decode steps must stay close to it.
 func BenchmarkG9f_SingleNodeAggModeAll(b *testing.B) {
        rows := generateBenchRows(1000)
        schema := topologyRawSchema()
@@ -108,7 +108,7 @@ func BenchmarkG9f_SingleNodeAggModeAll(b *testing.B) {
 
 // BenchmarkG9f_DataNodeEmit measures the per-data-node Map + Encode
 // step alone — the upstream cost of the distributed path. Useful when
-// regressions appear in BenchmarkG9f_DistributedFanout to localise
+// regressions appear in BenchmarkG9f_DistributedFanout to localize
 // whether they're on the encode side or the decode/reduce side.
 func BenchmarkG9f_DataNodeEmit(b *testing.B) {
        rows := generateBenchRows(1000)
diff --git a/pkg/query/vectorized/measure/raw_emit_test.go 
b/pkg/query/vectorized/measure/raw_emit_test.go
index 27fb30e92..c95886e0c 100644
--- a/pkg/query/vectorized/measure/raw_emit_test.go
+++ b/pkg/query/vectorized/measure/raw_emit_test.go
@@ -108,7 +108,7 @@ func TestDrainPipelineToFrame_AggModeMap(t *testing.T) {
 // counterpart of TestDrainPipelineToFrame_AggModeMap: it pipes a vec
 // pipeline's frame body through the liaison's vmeasure receive path
 // (frame.Decode → AggModeReduce + (shard,group) dedup → optional Top →
-// serialise to InternalDataPoint) and asserts the final values match an
+// serialize to InternalDataPoint) and asserts the final values match an
 // AggModeAll oracle.
 //
 // Replica duplication (the frame is appended twice for shard=1) verifies
@@ -334,9 +334,9 @@ func TestConvertPassthroughForFrame_NoOpForNativeColumns(t 
*testing.T) {
 // distributed integration scenario where MEAN over an int field returned
 // 0 instead of the computed mean. The wire flow:
 //
-//   data-node: BatchAggregation(AggModeMap, MEAN) → frame.Encode →
-//   liaison: frame.Decode → BatchAggregation(AggModeReduce, MEAN) →
-//   serializeBatchToProto → InternalDataPoint
+//     data-node: BatchAggregation(AggModeMap, MEAN) → frame.Encode →
+//     liaison: frame.Decode → BatchAggregation(AggModeReduce, MEAN) →
+//     serializeBatchToProto → InternalDataPoint
 //
 // Sample: group "a" with values {1, 2, 3} → mean 2; group "b" with {4,
 // 5, 6} → mean 5. After the round trip the final InternalDataPoint must
diff --git a/pkg/query/vectorized/measure/topology_matrix_test.go 
b/pkg/query/vectorized/measure/topology_matrix_test.go
index 43c5e5345..0a86170fe 100644
--- a/pkg/query/vectorized/measure/topology_matrix_test.go
+++ b/pkg/query/vectorized/measure/topology_matrix_test.go
@@ -34,7 +34,7 @@ import (
 // harness specified in the G9f spec (Principle 4): the row path / vec
 // AggModeAll defines the truth, and the distributed vec pipeline must
 // agree byte-for-byte (well, value-for-value) under any topology a
-// production cluster can realise.
+// production cluster can realize.
 //
 // Per cell:
 //   - Split the source rows across S shards (rows i → shard i%S).
@@ -49,7 +49,7 @@ import (
 // times. With dedup, the answer must be identical regardless of R.
 //
 // Aggs covered: SUM, COUNT, MIN, MAX, MEAN. Each runs as its own subtest;
-// MEAN exercises the count sidecar + Reduce.Val finalisation.
+// MEAN exercises the count sidecar + Reduce.Val finalization.
 func TestTopologyMatrix(t *testing.T) {
        rows := []topologyRow{
                {group: "a", value: 10},
@@ -246,7 +246,7 @@ func flattenReduce(batches []*vectorized.RecordBatch) 
map[string]string {
        out := make(map[string]string)
        for _, b := range batches {
                // First RoleTag column is "g"; first RoleField column is "out".
-               var gIdx, outIdx = -1, -1
+               gIdx, outIdx := -1, -1
                for i, def := range b.Schema.Columns {
                        if def.Role == vectorized.RoleTag && gIdx < 0 {
                                gIdx = i
@@ -309,12 +309,18 @@ func numericallyEqual(a, b string) bool {
 // ApplyTopToReduce's bind-by-name + heap consistency.
 func TestTopologyMatrix_WithTop(t *testing.T) {
        rows := []topologyRow{
-               {group: "a", value: 10}, {group: "b", value: 20},
-               {group: "c", value: 30}, {group: "d", value: 40},
-               {group: "e", value: 50}, {group: "f", value: 60},
-               {group: "a", value: 100}, {group: "b", value: 100},
-               {group: "c", value: 100}, {group: "d", value: 100},
-               {group: "e", value: 100}, {group: "f", value: 100},
+               {group: "a", value: 10},
+               {group: "b", value: 20},
+               {group: "c", value: 30},
+               {group: "d", value: 40},
+               {group: "e", value: 50},
+               {group: "f", value: 60},
+               {group: "a", value: 100},
+               {group: "b", value: 100},
+               {group: "c", value: 100},
+               {group: "d", value: 100},
+               {group: "e", value: 100},
+               {group: "f", value: 100},
        }
        cases := []struct {
                name string
diff --git a/test/integration/distributed/querybench/config.go 
b/test/integration/distributed/querybench/config.go
index dfd58d011..edf8af924 100644
--- a/test/integration/distributed/querybench/config.go
+++ b/test/integration/distributed/querybench/config.go
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+// Package querybench is the docker-gated distributed-query benchmark harness
+// that drives row vs vec mode comparisons across scenarios and cardinalities.
 package querybench
 
 import (
@@ -97,10 +99,10 @@ type Config struct {
 // LoadConfig reads benchmark settings from environment variables.
 func LoadConfig() Config {
        return Config{
-               RunBench:         getBool(envRunBench, false),
-               InContainer:      getBool(envInContainer, false),
-               Profile:          getBool(envProfile, false),
-               Merge:            getBool(envMerge, false),
+               RunBench:         getBool(envRunBench),
+               InContainer:      getBool(envInContainer),
+               Profile:          getBool(envProfile),
+               Merge:            getBool(envMerge),
                ReportDir:        getString(envReportDir, defaultReportDir),
                DockerImage:      getString(envDockerImage, ""),
                CPULimit:         getString(envCPULimit, ""),
@@ -184,16 +186,16 @@ func getString(key, def string) string {
        return def
 }
 
-func getBool(key string, def bool) bool {
-       if value := os.Getenv(key); value != "" {
-               switch strings.ToLower(strings.TrimSpace(value)) {
-               case "1", "true", "yes", "on":
-                       return true
-               case "0", "false", "no", "off":
-                       return false
-               }
+func getBool(key string) bool {
+       value := os.Getenv(key)
+       if value == "" {
+               return false
        }
-       return def
+       switch strings.ToLower(strings.TrimSpace(value)) {
+       case "1", "true", "yes", "on":
+               return true
+       }
+       return false
 }
 
 func getInt(key string, def int) int {
diff --git a/test/integration/distributed/querybench/report.go 
b/test/integration/distributed/querybench/report.go
index 9b57d89ec..db54dc246 100644
--- a/test/integration/distributed/querybench/report.go
+++ b/test/integration/distributed/querybench/report.go
@@ -33,8 +33,8 @@ import (
 type Report struct {
        GeneratedAt time.Time   `json:"generated_at"`
        Environment Environment `json:"environment"`
-       Config      ConfigView  `json:"config"`
        Results     []Result    `json:"results"`
+       Config      ConfigView  `json:"config"`
 }
 
 // ConfigView is a JSON-friendly copy of Config.
@@ -54,68 +54,63 @@ type Environment struct {
        GoVersion    string `json:"go_version"`
        GOOS         string `json:"goos"`
        GOARCH       string `json:"goarch"`
-       NumCPU       int    `json:"num_cpu"`
        DockerImage  string `json:"docker_image,omitempty"`
        CPULimit     string `json:"cpu_limit,omitempty"`
        MemoryLimit  string `json:"memory_limit,omitempty"`
        Cgroup       string `json:"cgroup,omitempty"`
        ContainerID  string `json:"container_id,omitempty"`
        ResourceNote string `json:"resource_note,omitempty"`
+       NumCPU       int    `json:"num_cpu"`
 }
 
 // Result records one mode/scenario/cardinality benchmark outcome.
 type Result struct {
-       Mode             string            `json:"mode"`
-       Scenario         Scenario          `json:"scenario"`
-       Cardinality      int               `json:"cardinality"`
-       Entities         int               `json:"entities"`
-       PointsEach       int               `json:"points_each"`
-       ResponseRows     int               `json:"response_rows"`
-       Correctness      string            `json:"correctness"`
-       QueryIterations  int               `json:"query_iterations"`
-       QueryWorkers     int               `json:"query_workers"`
-       Latency          LatencyStats      `json:"latency"`
-       QPS              float64           `json:"qps"`
-       Resources        ResourceStats     `json:"resources"`
-       Allocations      AllocationStats   `json:"allocations"`
-       Profiles         map[string]string `json:"profiles,omitempty"`
-       Error            string            `json:"error,omitempty"`
-       ApproxResultHash uint64            `json:"approx_result_hash,omitempty"`
-       // SampleDataPointText is the prototext of the first DataPoint of the
-       // first response. Captured so the merge pass can dump row vs vec
-       // shapes side by side when the correctness gate fires on a hash
-       // mismatch — the row counts can match while the proto byte layout
-       // diverges (TagFamily order, oneof variant choice, etc.).
-       SampleDataPointText string `json:"sample_data_point_text,omitempty"`
+       Profiles            map[string]string `json:"profiles,omitempty"`
+       Scenario            Scenario          `json:"scenario"`
+       SampleDataPointText string            
`json:"sample_data_point_text,omitempty"`
+       Error               string            `json:"error,omitempty"`
+       Mode                string            `json:"mode"`
+       Correctness         string            `json:"correctness"`
+       Allocations         AllocationStats   `json:"allocations"`
+       Resources           ResourceStats     `json:"resources"`
+       Latency             LatencyStats      `json:"latency"`
+       PointsEach          int               `json:"points_each"`
+       QPS                 float64           `json:"qps"`
+       QueryWorkers        int               `json:"query_workers"`
+       QueryIterations     int               `json:"query_iterations"`
+       ResponseRows        int               `json:"response_rows"`
+       Entities            int               `json:"entities"`
+       ApproxResultHash    uint64            
`json:"approx_result_hash,omitempty"`
+       Cardinality         int               `json:"cardinality"`
 }
 
 // LatencyStats contains latency percentiles in milliseconds.
 type LatencyStats struct {
-       P50Ms float64 `json:"p50_ms"`
-       P90Ms float64 `json:"p90_ms"`
-       P95Ms float64 `json:"p95_ms"`
-       P99Ms float64 `json:"p99_ms"`
-       MaxMs float64 `json:"max_ms"`
+       P50Ms  float64 `json:"p50_ms"`
+       P90Ms  float64 `json:"p90_ms"`
+       P95Ms  float64 `json:"p95_ms"`
+       P99Ms  float64 `json:"p99_ms"`
+       MaxMs  float64 `json:"max_ms"`
        MeanMs float64 `json:"mean_ms"`
 }
 
 // ResourceStats records process-level resource deltas for the in-process 
cluster harness.
 type ResourceStats struct {
+       MetricSource    string  `json:"metric_source"`
        CPUSecondsDelta float64 `json:"cpu_seconds_delta,omitempty"`
        RSSBytes        uint64  `json:"rss_bytes,omitempty"`
        HeapAllocBytes  uint64  `json:"heap_alloc_bytes,omitempty"`
        HeapSysBytes    uint64  `json:"heap_sys_bytes,omitempty"`
        NumGC           uint32  `json:"num_gc,omitempty"`
-       MetricSource    string  `json:"metric_source"`
 }
 
 // AllocationStats records allocation counters for the timed read phase.
 type AllocationStats struct {
-       MallocsDelta      uint64  `json:"mallocs_delta"`
-       TotalAllocDelta   uint64  `json:"total_alloc_delta"`
-       MallocsPerQuery   float64 `json:"mallocs_per_query"`
+       MetricSource       string  `json:"metric_source"`
+       MallocsDelta       uint64  `json:"mallocs_delta"`
+       TotalAllocDelta    uint64  `json:"total_alloc_delta"`
+       MallocsPerQuery    float64 `json:"mallocs_per_query"`
        AllocBytesPerQuery float64 `json:"alloc_bytes_per_query"`
-       MetricSource      string  `json:"metric_source"`
 }
 
 // newReportFromShards builds the unified Report from the shard set the
@@ -178,7 +173,7 @@ func newReportFromShards(cfg Config, results []Result) 
Report {
        }
 }
 
-// writeShard serialises a single single-shot Result to ReportDir/shards/.
+// writeShard serializes a single single-shot Result to ReportDir/shards/.
 // The filename encodes mode_scenario_cardinality so merge can pair vec
 // shards with their row counterparts without parsing the JSON.
 func writeShard(result Result, reportDir string) (string, error) {
@@ -192,7 +187,7 @@ func writeShard(result Result, reportDir string) (string, 
error) {
        if marshalErr != nil {
                return "", fmt.Errorf("marshal shard: %w", marshalErr)
        }
-       if writeErr := os.WriteFile(shardPath, append(body, '\n'), 0o644); 
writeErr != nil {
+       if writeErr := os.WriteFile(shardPath, append(body, '\n'), 0o600); 
writeErr != nil {
                return "", fmt.Errorf("write shard: %w", writeErr)
        }
        return shardPath, nil
@@ -235,11 +230,11 @@ func summarizeLatencies(latencies []time.Duration, 
elapsed time.Duration) (Laten
                total += latency
        }
        stats := LatencyStats{
-               P50Ms: percentile(sorted, 0.50),
-               P90Ms: percentile(sorted, 0.90),
-               P95Ms: percentile(sorted, 0.95),
-               P99Ms: percentile(sorted, 0.99),
-               MaxMs: float64(sorted[len(sorted)-1].Microseconds()) / 1000,
+               P50Ms:  percentile(sorted, 0.50),
+               P90Ms:  percentile(sorted, 0.90),
+               P95Ms:  percentile(sorted, 0.95),
+               P99Ms:  percentile(sorted, 0.99),
+               MaxMs:  float64(sorted[len(sorted)-1].Microseconds()) / 1000,
                MeanMs: float64(total.Microseconds()) / 1000 / 
float64(len(sorted)),
        }
        qps := 0.0
@@ -272,11 +267,11 @@ func writeReport(report Report, dir string) (string, 
string, error) {
        if marshalErr != nil {
                return "", "", marshalErr
        }
-       if writeErr := os.WriteFile(jsonPath, append(jsonBody, '\n'), 0o644); 
writeErr != nil {
+       if writeErr := os.WriteFile(jsonPath, append(jsonBody, '\n'), 0o600); 
writeErr != nil {
                return "", "", writeErr
        }
        mdPath := filepath.Join(dir, "distributed-querybench.md")
-       if writeErr := os.WriteFile(mdPath, []byte(renderMarkdown(report)), 
0o644); writeErr != nil {
+       if writeErr := os.WriteFile(mdPath, []byte(renderMarkdown(report)), 
0o600); writeErr != nil {
                return "", "", writeErr
        }
        return jsonPath, mdPath, nil
diff --git a/test/integration/distributed/querybench/report_test.go 
b/test/integration/distributed/querybench/report_test.go
index 6b896f14c..48aa6271a 100644
--- a/test/integration/distributed/querybench/report_test.go
+++ b/test/integration/distributed/querybench/report_test.go
@@ -39,8 +39,16 @@ func TestWriteReportFromShards(t *testing.T) {
        dir := t.TempDir()
        cfg := Config{ReportDir: dir, QueryWorkers: 1, QueryIterations: 5, 
WarmupIterations: 0, Writers: 1}
        results := []Result{
-               {Mode: modeRow, Scenario: ScenarioScanAll, Cardinality: 1024, 
ResponseRows: 1024, Latency: LatencyStats{P50Ms: 1}, Allocations: 
AllocationStats{MallocsPerQuery: 2}, QueryIterations: 5, QueryWorkers: 1},
-               {Mode: modeVec, Scenario: ScenarioScanAll, Cardinality: 1024, 
ResponseRows: 1024, Latency: LatencyStats{P50Ms: 2}, Allocations: 
AllocationStats{MallocsPerQuery: 4}, QueryIterations: 5, QueryWorkers: 1, 
Correctness: "matched"},
+               {
+                       Mode: modeRow, Scenario: ScenarioScanAll, Cardinality: 
1024, ResponseRows: 1024,
+                       Latency: LatencyStats{P50Ms: 1}, Allocations: 
AllocationStats{MallocsPerQuery: 2},
+                       QueryIterations: 5, QueryWorkers: 1,
+               },
+               {
+                       Mode: modeVec, Scenario: ScenarioScanAll, Cardinality: 
1024, ResponseRows: 1024,
+                       Latency: LatencyStats{P50Ms: 2}, Allocations: 
AllocationStats{MallocsPerQuery: 4},
+                       QueryIterations: 5, QueryWorkers: 1, Correctness: 
"matched",
+               },
        }
        report := newReportFromShards(cfg, results)
        if len(report.Config.Cardinalities) != 1 || 
report.Config.Cardinalities[0] != 1024 {
diff --git a/test/integration/distributed/querybench/workload.go 
b/test/integration/distributed/querybench/workload.go
index a01e73523..18a95223c 100644
--- a/test/integration/distributed/querybench/workload.go
+++ b/test/integration/distributed/querybench/workload.go
@@ -57,11 +57,11 @@ type writeSummary struct {
 }
 
 type queryRunSummary struct {
-       Latencies     []time.Duration
-       SampleDPText  string
-       Rows          int
-       Hash          uint64
-       Elapsed       time.Duration
+       SampleDPText string
+       Latencies    []time.Duration
+       Rows         int
+       Hash         uint64
+       Elapsed      time.Duration
 }
 
 func writeBenchmarkData(ctx context.Context, conn *grpc.ClientConn, cfg 
Config, cardinality int, base time.Time) (writeSummary, error) {
@@ -146,10 +146,7 @@ func writeEntityRange(ctx context.Context, client 
measurev1.MeasureServiceClient
        if closeErr := stream.CloseSend(); closeErr != nil {
                return fmt.Errorf("close measure write stream: %w", closeErr)
        }
-       if recvErr := <-recvErrCh; recvErr != nil {
-               return recvErr
-       }
-       return nil
+       return <-recvErrCh
 }
 
 func benchmarkDataPointSpec() *measurev1.DataPointSpec {
diff --git a/test/integration/distributed/querybench/workload_test.go 
b/test/integration/distributed/querybench/workload_test.go
index 47763576d..f3827e8e6 100644
--- a/test/integration/distributed/querybench/workload_test.go
+++ b/test/integration/distributed/querybench/workload_test.go
@@ -49,7 +49,11 @@ func 
TestBuildScenarioQueryTopWithFilterMatchesFixtureShape(t *testing.T) {
        if condition.GetName() != benchTagID || condition.GetOp() != 
modelv1.Condition_BINARY_OP_NE || condition.GetValue().GetStr().GetValue() != 
"svc3" {
                t.Fatalf("top-with-filter condition does not match fixture: 
%+v", condition)
        }
-       if req.GetAgg().GetFunction() != 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_MEAN || 
req.GetTop().GetNumber() != 2 || req.GetTop().GetFieldValueSort() != 
modelv1.Sort_SORT_DESC {
+       agg := req.GetAgg()
+       top := req.GetTop()
+       if agg.GetFunction() != 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_MEAN ||
+               top.GetNumber() != 2 ||
+               top.GetFieldValueSort() != modelv1.Sort_SORT_DESC {
                t.Fatalf("top-with-filter agg/top does not match fixture: %+v", 
req)
        }
 }

Reply via email to