This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch vectorized-query in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 2078ed35ba5ac4ede41ee9f4414759a4dc32bc53
Author: Hongtao Gao <[email protected]>
AuthorDate: Mon May 25 01:08:54 2026 +0000

    chore(lint): sweep lint findings on the vec tracing changeset

    Follow-up to ad4638b7 to clear make lint:

    - gci: fix import grouping in api/data/codec_test.go and
      banyand/query/tracing_test.go (apache imports were split across two
      blocks; third-party google.golang.org/protobuf belongs in the second
      block, apache prefix in the third).
    - gci: drop the spurious blank line between two helpers in
      pkg/query/vectorized/measure/plan/distributed.go.
    - govet/fieldalignment: reorder BatchAggregation, BatchLimit, and
      rawFrameFuture struct fields by descending size to minimise padding.
    - govet/exhaustive: list AggModeAll explicitly in the two switches over
      a.mode so the default-case fallthrough is no longer implicit.
    - gocritic/stringXbytes: use bytes.Equal instead of string conversion in
      tracing_collect_test.go and tracing_rolling_upgrade_test.go.
    - gocritic/appendAssign: build the merged future slice with explicit
      allocation in tracing_rolling_upgrade_test.go.
    - gocritic/commentedOutCode: rephrase three comments whose ASCII math
      triggered the "commented-out code" heuristic.
    - misspell: normalise British → American (behaviour, summarise,
      prioritised, summarised) in tracing_summary.go and the matching test.
    - unparam: drop the minFactor argument from assertSpanIsBottleneck; the
      threshold is now a package-level const bottleneckMinFactor.
    - unconvert: drop the redundant int64() wrap around rows*8.

    No behaviour change. make lint and the tracing-related package tests
    remain green.
via [HAPI](https://hapi.run) Co-Authored-By: HAPI <[email protected]> --- api/data/codec_test.go | 2 +- banyand/query/tracing_test.go | 2 +- pkg/query/vectorized/measure/aggregation.go | 16 ++++++---- pkg/query/vectorized/measure/limit.go | 4 +-- pkg/query/vectorized/measure/plan/distributed.go | 1 - .../measure/plan/tracing_bottleneck_test.go | 29 ++++++++++-------- .../measure/plan/tracing_collect_test.go | 5 ++-- .../measure/plan/tracing_rolling_upgrade_test.go | 7 +++-- .../vectorized/measure/plan/tracing_summary.go | 35 +++++++++++----------- .../measure/plan/tracing_summary_test.go | 12 ++++---- 10 files changed, 61 insertions(+), 52 deletions(-) diff --git a/api/data/codec_test.go b/api/data/codec_test.go index 5e21c43e9..da49f99fa 100644 --- a/api/data/codec_test.go +++ b/api/data/codec_test.go @@ -21,9 +21,9 @@ import ( "bytes" "testing" - commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" "google.golang.org/protobuf/proto" + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" ) diff --git a/banyand/query/tracing_test.go b/banyand/query/tracing_test.go index ae99a0590..6e5fea6b9 100644 --- a/banyand/query/tracing_test.go +++ b/banyand/query/tracing_test.go @@ -31,8 +31,8 @@ import ( "errors" "testing" - commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" "github.com/apache/skywalking-banyandb/api/data" + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" query "github.com/apache/skywalking-banyandb/pkg/query" ) diff --git a/pkg/query/vectorized/measure/aggregation.go b/pkg/query/vectorized/measure/aggregation.go index 81488f039..b13fddada 100644 --- a/pkg/query/vectorized/measure/aggregation.go +++ b/pkg/query/vectorized/measure/aggregation.go @@ -126,18 +126,18 @@ type BatchAggregation struct { span *query.Span groups map[string]*aggGroup inputSchema 
*vectorized.BatchSchema - insertion []*aggGroup + aggValuePath AggValuePath tagIndices []int aggs []AggSpec aggOutOffsets []int aggHasCount []bool keyIndices []int aggInputCountIdx []int - shardIDIdx int - tagOutOffset int + insertion []*aggGroup outputShardIdx int + tagOutOffset int mode AggMode - aggValuePath AggValuePath + shardIDIdx int entrySize int64 reserved int64 rowsIn int64 @@ -275,8 +275,10 @@ func (a *BatchAggregation) Init(ctx context.Context) error { } tracer := query.GetTracer(ctx) if tracer != nil { - spanName := "groupby-agg-all" + var spanName string switch a.mode { + case AggModeAll: + spanName = "groupby-agg-all" case AggModeMap: spanName = "groupby-agg-map" case AggModeReduce: @@ -401,8 +403,10 @@ func (a *BatchAggregation) Close() error { a.reserved = 0 } if a.span != nil { - mode := "all" + var mode string switch a.mode { + case AggModeAll: + mode = "all" case AggModeMap: mode = "map" case AggModeReduce: diff --git a/pkg/query/vectorized/measure/limit.go b/pkg/query/vectorized/measure/limit.go index 779bc438a..2c0a37bdd 100644 --- a/pkg/query/vectorized/measure/limit.go +++ b/pkg/query/vectorized/measure/limit.go @@ -35,11 +35,11 @@ import ( type BatchLimit struct { schema *vectorized.BatchSchema span *query.Span + rowsIn int64 + rowsOut int64 offset uint32 limit uint32 seen uint32 - rowsIn int64 - rowsOut int64 closed bool } diff --git a/pkg/query/vectorized/measure/plan/distributed.go b/pkg/query/vectorized/measure/plan/distributed.go index 0eeeee9c0..75edfe33a 100644 --- a/pkg/query/vectorized/measure/plan/distributed.go +++ b/pkg/query/vectorized/measure/plan/distributed.go @@ -96,7 +96,6 @@ func nodeSelectorCount(nodeSelectors map[string][]string) int { return count } - func batchRows(batches []*vectorized.RecordBatch) int { rows := 0 for _, batch := range batches { diff --git a/pkg/query/vectorized/measure/plan/tracing_bottleneck_test.go b/pkg/query/vectorized/measure/plan/tracing_bottleneck_test.go index e942c222e..9c052f30a 100644 --- 
a/pkg/query/vectorized/measure/plan/tracing_bottleneck_test.go +++ b/pkg/query/vectorized/measure/plan/tracing_bottleneck_test.go @@ -81,11 +81,15 @@ func findSpanByMessage(root *commonv1.Span, msg string) *commonv1.Span { return nil } +// bottleneckMinFactor is the minimum skew factor (target.Duration / +// next-slowest.Duration) that assertSpanIsBottleneck enforces, per the plan +// spec. +const bottleneckMinFactor = 5.0 + // assertSpanIsBottleneck asserts that among all spans at the same level as -// target (i.e. the direct children of parent), target has the maximum Duration. -// It also asserts that target.Duration is at least minFactor times larger than -// the second-largest Duration (≥5× per spec). -func assertSpanIsBottleneck(t *testing.T, parent *commonv1.Span, targetMsg string, minFactor float64) { +// target (i.e. the direct children of parent), target has the maximum Duration +// and is at least bottleneckMinFactor× larger than the second-largest peer. +func assertSpanIsBottleneck(t *testing.T, parent *commonv1.Span, targetMsg string) { t.Helper() peers := parent.GetChildren() if len(peers) == 0 { @@ -108,7 +112,6 @@ func assertSpanIsBottleneck(t *testing.T, parent *commonv1.Span, targetMsg strin t.Fatalf("span %q duration %d is not the maximum (max=%d) among peers of %q", targetMsg, target.GetDuration(), maxDuration, parent.GetMessage()) } - // Find second-largest to verify the skew factor. 
secondMax := int64(0) for _, peer := range peers { if peer.GetMessage() == targetMsg { @@ -120,9 +123,9 @@ func assertSpanIsBottleneck(t *testing.T, parent *commonv1.Span, targetMsg strin } if secondMax > 0 { factor := float64(target.GetDuration()) / float64(secondMax) - if factor < minFactor { + if factor < bottleneckMinFactor { t.Fatalf("span %q duration %d is only %.2f× the next-slowest %d (want ≥%.1f×)", - targetMsg, target.GetDuration(), factor, secondMax, minFactor) + targetMsg, target.GetDuration(), factor, secondMax, bottleneckMinFactor) } } } @@ -161,7 +164,7 @@ func TestTracerBottleneckScan(t *testing.T) { limitSpan := bottleneckSpan("limit", 20_000_000) dataNode := bottleneckSpan("data-node1", 570_000_000, scanSpan, topSpan, limitSpan) - assertSpanIsBottleneck(t, dataNode, "scan", 5.0) + assertSpanIsBottleneck(t, dataNode, "scan") } // TestTracerBottleneckDecode validates that a decode/reduce-raw-frames span @@ -181,7 +184,7 @@ func TestTracerBottleneckDecode(t *testing.T) { topSpan := bottleneckSpan("apply-top-to-reduce", 40_000_000) root := bottleneckSpan("broadcast-rows", 900_000_000, reduceSpan, mergeSpan, topSpan) - assertSpanIsBottleneck(t, root, "reduce-raw-frames", 5.0) + assertSpanIsBottleneck(t, root, "reduce-raw-frames") } // TestTracerBottleneckReduce validates that a reduce-raw-frames span with many @@ -201,7 +204,7 @@ func TestTracerBottleneckReduce(t *testing.T) { schemaSpan := bottleneckSpan("build-multi-group-schema", 10_000_000) root := bottleneckSpan("broadcast-agg", 990_000_000, reduceSpan, topSpan, schemaSpan) - assertSpanIsBottleneck(t, root, "reduce-raw-frames", 5.0) + assertSpanIsBottleneck(t, root, "reduce-raw-frames") } // TestTracerBottleneckMerge validates that a merge-distributed-rows span @@ -222,7 +225,7 @@ func TestTracerBottleneckMerge(t *testing.T) { topSpan := bottleneckSpan("apply-top-to-reduce", 40_000_000) root := bottleneckSpan("broadcast-rows", 800_000_000, mergeSpan, reduceSpan, topSpan) - 
assertSpanIsBottleneck(t, root, "merge-distributed-rows", 5.0) + assertSpanIsBottleneck(t, root, "merge-distributed-rows") } // TestTracerBottleneckFrameEncode validates that a frame-encode span dominates @@ -242,7 +245,7 @@ func TestTracerBottleneckFrameEncode(t *testing.T) { topSpan := bottleneckSpan("top", 30_000_000) dataNode := bottleneckSpan("data-node1", 710_000_000, scanSpan, encodeSpan, topSpan) - assertSpanIsBottleneck(t, dataNode, "frame-encode", 5.0) + assertSpanIsBottleneck(t, dataNode, "frame-encode") } // TestTracerBottleneckBroadcastTail validates that in a 5-node fanout where one @@ -280,7 +283,7 @@ func TestTracerBottleneckBroadcastTail(t *testing.T) { ) // 1. Slow node has maximum duration among the node children. - assertSpanIsBottleneck(t, broadcastSpan, "data-node5", 5.0) + assertSpanIsBottleneck(t, broadcastSpan, "data-node5") // 2. data-node5.Duration > p50 of the other four nodes. otherDurations := []int64{ diff --git a/pkg/query/vectorized/measure/plan/tracing_collect_test.go b/pkg/query/vectorized/measure/plan/tracing_collect_test.go index 61f2dfa44..ee98ff3b9 100644 --- a/pkg/query/vectorized/measure/plan/tracing_collect_test.go +++ b/pkg/query/vectorized/measure/plan/tracing_collect_test.go @@ -16,6 +16,7 @@ package plan import ( + "bytes" "errors" "testing" @@ -25,8 +26,8 @@ import ( ) type rawFrameFuture struct { - message bus.Message err error + message bus.Message } func (f rawFrameFuture) Get() (bus.Message, error) { @@ -49,7 +50,7 @@ func TestCollectRawFrameResponsesTraceEnvelope(t *testing.T) { if collectErr != nil { t.Fatalf("collectRawFrameResponses returned error: %v", collectErr) } - if len(frames) != 1 || string(frames[0]) != string(rawBody) { + if len(frames) != 1 || !bytes.Equal(frames[0], rawBody) { t.Fatalf("frames = %v, want one raw body %v", frames, rawBody) } if len(traces) != 1 || traces[0].GetTraceId() != "trace-1" { diff --git a/pkg/query/vectorized/measure/plan/tracing_rolling_upgrade_test.go 
b/pkg/query/vectorized/measure/plan/tracing_rolling_upgrade_test.go index ca4e88aaf..bd0ae5829 100644 --- a/pkg/query/vectorized/measure/plan/tracing_rolling_upgrade_test.go +++ b/pkg/query/vectorized/measure/plan/tracing_rolling_upgrade_test.go @@ -22,6 +22,7 @@ package plan import ( + "bytes" "strings" "testing" @@ -77,7 +78,9 @@ func TestRawFrameTraceRequiresAllNodesUpgraded(t *testing.T) { rawFrameFuture{message: bus.NewMessage(5, common.NewError(preStory2TraceError))}, } - allFutures := append(upgradedFutures, laggardFutures...) + allFutures := make([]bus.Future, 0, len(upgradedFutures)+len(laggardFutures)) + allFutures = append(allFutures, upgradedFutures...) + allFutures = append(allFutures, laggardFutures...) frames, traces, collectErr := collectRawFrameResponses(allFutures) @@ -88,7 +91,7 @@ func TestRawFrameTraceRequiresAllNodesUpgraded(t *testing.T) { t.Fatalf("frames len = %d, want 3 (one per upgraded node)", len(frames)) } for idx, frame := range frames { - if string(frame) != string(validRawFrame) { + if !bytes.Equal(frame, validRawFrame) { t.Errorf("frames[%d] = %v, want %v", idx, frame, validRawFrame) } } diff --git a/pkg/query/vectorized/measure/plan/tracing_summary.go b/pkg/query/vectorized/measure/plan/tracing_summary.go index 4734bc91c..99545813e 100644 --- a/pkg/query/vectorized/measure/plan/tracing_summary.go +++ b/pkg/query/vectorized/measure/plan/tracing_summary.go @@ -82,12 +82,12 @@ func extractNodeTagInt64(trace *commonv1.Trace, key string) int64 { // span, so broadcastSpan is the current-span in that context). // // When len(nodes) ≤ maxIndividualSubTraces every sub-trace is attached -// individually via AddSubTrace — unchanged behaviour. +// individually via AddSubTrace — unchanged behavior. // // When len(nodes) > maxIndividualSubTraces: // - The first 19 individual slots are filled by priority order: // (1) error nodes, (2) zero-row nodes, (3) healthy nodes desc-by-latency. 
-// - The remaining (N−19) nodes are summarised into one "data-summary" child +// - The remaining (N−19) nodes are summarized into one "data-summary" child // span emitted via tracer.StartSpan on broadcastSpanCtx so the tracer // wires it as a child of broadcastSpan automatically. func applyFanoutCap(broadcastSpanCtx context.Context, broadcastSpan *query.Span, nodes []nodeInfo) { @@ -122,7 +122,7 @@ func applyFanoutCap(broadcastSpanCtx context.Context, broadcastSpan *query.Span, return healthyNodes[i].latencyNS > healthyNodes[j].latencyNS }) - // Build the prioritised ordering: error → zero-row → healthy(desc latency). + // Build the prioritized ordering: error → zero-row → healthy(desc latency). ordered := make([]nodeInfo, 0, len(nodes)) ordered = append(ordered, errorNodes...) ordered = append(ordered, zeroRowNodes...) @@ -135,17 +135,17 @@ func applyFanoutCap(broadcastSpanCtx context.Context, broadcastSpan *query.Span, } } - // Summarise the remaining nodes. - summarised := ordered[maxIndividualSubTraces:] - emitDataSummarySpan(broadcastSpanCtx, summarised) + // Summarize the remaining nodes. + summarized := ordered[maxIndividualSubTraces:] + emitDataSummarySpan(broadcastSpanCtx, summarized) } // emitDataSummarySpan opens a "data-summary" child span via tracer.StartSpan on // broadcastSpanCtx (where the broadcast span is the current span) so the tracer // automatically wires it as a direct child of the broadcast span. -// It tags the span with aggregate statistics over the summarised node slice. -func emitDataSummarySpan(broadcastSpanCtx context.Context, summarised []nodeInfo) { - if len(summarised) == 0 { +// It tags the span with aggregate statistics over the summarized node slice. 
+func emitDataSummarySpan(broadcastSpanCtx context.Context, summarized []nodeInfo) { + if len(summarized) == 0 { return } tracer := query.GetTracer(broadcastSpanCtx) @@ -155,8 +155,8 @@ func emitDataSummarySpan(broadcastSpanCtx context.Context, summarised []nodeInfo var errCount, zeroRowCount int64 var totalRows, totalBytes int64 - latencies := make([]int64, 0, len(summarised)) - for _, ni := range summarised { + latencies := make([]int64, 0, len(summarized)) + for _, ni := range summarized { if ni.hasError { errCount++ } @@ -171,7 +171,7 @@ func emitDataSummarySpan(broadcastSpanCtx context.Context, summarised []nodeInfo p50, p95, p99, nsMin, nsMax := percentilesInt64(latencies) summarySpan, _ := tracer.StartSpan(broadcastSpanCtx, "data-summary") - summarySpan.Tagf(tracelabels.TagAggregatedDataNodeSpans, "%d", len(summarised)) + summarySpan.Tagf(tracelabels.TagAggregatedDataNodeSpans, "%d", len(summarized)) summarySpan.Tagf(tracelabels.TagNodesWithErrors, "%d", errCount) summarySpan.Tagf(tracelabels.TagNodesWithZeroRows, "%d", zeroRowCount) summarySpan.Tagf(tracelabels.TagTotalRowsAcrossNodes, "%d", totalRows) @@ -199,7 +199,7 @@ func emitDataSummarySpan(broadcastSpanCtx context.Context, summarised []nodeInfo // spans (e.g. via streamed decode), the two tags should be re-anchored to // "individually emitted" vs "summarized only" semantics. // -// When frameCount ≤ maxIndividualSubTraces no span is emitted (no behaviour change). +// When frameCount ≤ maxIndividualSubTraces no span is emitted (no behavior change). func emitDecodeFrameSummarySpan(parentSpanCtx context.Context, frameDecodeDurations []time.Duration) { frameCount := len(frameDecodeDurations) if frameCount <= maxIndividualSubTraces { @@ -249,14 +249,13 @@ func percentilesInt64(vals []int64) (p50, p95, p99, nsMin, nsMax int64) { } // percentileIdx returns the 0-based index into a sorted slice of length n -// corresponding to the pth percentile (0–100). 
Uses nearest-rank rounding: -// idx = ceil(p/100 * n) − 1, clamped to [0, n−1]. +// corresponding to the pth percentile (0–100). Uses nearest-rank rounding, +// clamped to [0, n−1]. func percentileIdx(n, p int) int { if n <= 1 { return 0 } - idx := (p*n + 99) / 100 // ceil(p/100 * n) - idx-- // convert to 0-based + idx := (p*n+99)/100 - 1 if idx < 0 { return 0 } @@ -271,7 +270,7 @@ func percentileIdx(n, p int) int { // probe decode: the real decode happens inside ReduceRawFrames / // mergeDistributedRows; here we only measure the decode cost for tracing // purposes and discard the decoded batches immediately. -// Empty bodies are skipped (consistent with ReduceRawFrames behaviour). +// Empty bodies are skipped (consistent with ReduceRawFrames behavior). func collectFrameDecodeDurations(frames [][]byte) []time.Duration { durations := make([]time.Duration, 0, len(frames)) for _, body := range frames { diff --git a/pkg/query/vectorized/measure/plan/tracing_summary_test.go b/pkg/query/vectorized/measure/plan/tracing_summary_test.go index d633193df..8fbfaf9f1 100644 --- a/pkg/query/vectorized/measure/plan/tracing_summary_test.go +++ b/pkg/query/vectorized/measure/plan/tracing_summary_test.go @@ -142,7 +142,7 @@ func assertTagInt64Ordered(t *testing.T, span *commonv1.Span, keys ...string) { // - 5 zero-row nodes (nodeIDs 4–8, latency 80 ms each) // - 17 healthy nodes (nodeIDs 9–25, latencies 10–170 ms, 10 ms steps) // -// Expected behaviour: +// Expected behavior: // - Exactly 19 individual children on the broadcast span + 1 data-summary child. // - The 19 individual slots are filled by: 3 error + 5 zero-row + 11 highest-latency healthy nodes. // - data-summary.aggregated_data_node_spans = 6 (the remaining 6 lower-latency healthy nodes). 
@@ -167,7 +167,7 @@ func TestFanoutDataSummary_AtThreshold(t *testing.T) { for idx := range 17 { latencyMS := int64((17 - idx) * 10) // 170, 160, ..., 10 rows := int64((17 - idx) * 100) - nodes = append(nodes, makeNodeInfo(idx+9, latencyMS*int64(time.Millisecond), rows, int64(rows*8), false)) + nodes = append(nodes, makeNodeInfo(idx+9, latencyMS*int64(time.Millisecond), rows, rows*8, false)) } applyFanoutCap(broadcastSpanCtx, broadcastSpan, nodes) @@ -204,7 +204,7 @@ func TestFanoutDataSummary_AtThreshold(t *testing.T) { assertTagPresent(t, lastChild, tracelabels.TagNodeLatencyNSMin) assertTagPresent(t, lastChild, tracelabels.TagNodeLatencyNSMax) - // The 6 summarised nodes are all healthy (the 6 lowest-latency healthy ones: 10–60 ms). + // The 6 summarized nodes are all healthy (the 6 lowest-latency healthy ones: 10–60 ms). // So nodes_with_errors = 0 and nodes_with_zero_rows = 0 in the summary. assertTagValueEquals(t, lastChild, tracelabels.TagNodesWithErrors, "0") assertTagValueEquals(t, lastChild, tracelabels.TagNodesWithZeroRows, "0") @@ -289,11 +289,11 @@ func TestDecodeFrameSummary_AtThreshold(t *testing.T) { t.Fatalf("child message = %q, want decode-frame-summary", summarySpan.GetMessage()) } - // frames_total = 100. + // Every frame is reflected in frames_total; no individual frame spans + // are emitted so frames_emitted_individually is 0 and frames_skipped + // equals the total. assertTagValueEquals(t, summarySpan, tracelabels.TagFramesTotal, "100") - // frames_emitted_individually = 0 (no individual frame spans). assertTagValueEquals(t, summarySpan, tracelabels.TagFramesEmittedIndividually, "0") - // frames_skipped = 100. assertTagValueEquals(t, summarySpan, tracelabels.TagFramesSkipped, "100") // All decode NS tags present.
