hanahmily commented on code in PR #1110:
URL:
https://github.com/apache/skywalking-banyandb/pull/1110#discussion_r3208331888
##########
banyand/measure/topn.go:
##########
@@ -935,32 +1225,96 @@ func (t *TopNValue) Unmarshal(src []byte, decoder *encoding.BytesBlockDecoder) e
 		return fmt.Errorf("src is too short for reading string with size %d; len(src)=%d", valueLen, len(src))
 	}
-	t.values, err = encoding.BytesToInt64List(t.values, src[:valueLen], t.encodeType, t.firstValue, int(valuesCount))
+	intValues, err := encoding.BytesToInt64List(nil, src[:valueLen], t.encodeType, t.firstValue, int(valuesCount))
Review Comment:
### Unmarshal allocation defeats the `TopNValue` pool
Both the int64 (this line) and float64 (line 1299) unmarshal paths drop the
pooled scratch by passing `nil`:
```go
intValues, err := encoding.BytesToInt64List(nil, src[:valueLen], t.encodeType, t.firstValue, int(valuesCount))
```
`BytesToInt64List` then allocates `make([]int64, valuesCount)` on every
call. The `TopNValue` pool retains `t.intScratch` (line 933) and the marshal
path correctly recycles it (lines 1052, 1062), but the unmarshal path throws
that buffer away. On merge-heavy workloads — `MergeTopNBinaryValues` calls
`Unmarshal` twice per overlapping timestamp pair — this is a steady GC-pressure
source the pool was designed to avoid.
Additionally, `int(valuesCount)` is unbounded: a corrupt block declaring
`valuesCount = 10_000_000` with `valueLen = 8` triggers an 80 MB allocation.
The `len(src) < valueLen` check guards the byte slice but not the element count.
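To make the size concrete: 10,000,000 elements at 8 bytes each is 80,000,000 bytes. The sketch below shows one way to reject a declared count before any slice is sized from it. `maxValuesCount` and `checkValuesCount` are hypothetical names and the limit is an arbitrary illustration, not a value taken from the codebase.

```go
package main

import "fmt"

// maxValuesCount is a hypothetical cap chosen only for illustration.
const maxValuesCount = 1 << 16

// int64Size is the size in bytes of one decoded element.
const int64Size = 8

// checkValuesCount rejects a declared element count before it is used to
// size an allocation.
func checkValuesCount(valuesCount uint64) error {
	if valuesCount > maxValuesCount {
		return fmt.Errorf("topNValue.valuesCount %d exceeds limit %d", valuesCount, maxValuesCount)
	}
	return nil
}

func main() {
	declared := uint64(10_000_000)
	fmt.Printf("unchecked allocation: %d bytes\n", declared*int64Size)
	fmt.Println(checkValuesCount(declared)) // rejected
	fmt.Println(checkValuesCount(10))       // accepted
}
```

A fixed cap is only one option. A bound tied to the payload length is tighter, but it assumes every encode type spends at least one byte per value; if the encoder has a mode whose payload does not grow with the element count (a constant run, for example), that bound would reject valid blocks. That is worth checking against the encoder before picking either form.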
**Suggested fix**
```go
if int(valuesCount) > len(src[:valueLen]) {
	return fmt.Errorf("topNValue.valuesCount %d exceeds payload bytes %d", valuesCount, valueLen)
}
intValues, err := encoding.BytesToInt64List(t.intScratch[:0], src[:valueLen], t.encodeType, t.firstValue, int(valuesCount))
if err != nil {
	return fmt.Errorf("cannot unmarshal topNValue.values: %w", err)
}
t.intScratch = intValues
```
Then assign `t.values` from `intValues` as today, and apply the same change
to the float64 path at line 1299.
##########
banyand/dquery/topn.go:
##########
@@ -111,36 +141,19 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp
 		return
 	}
 	var allErr error
-	aggregator := measure.CreateTopNPostProcessor(request.GetTopN(),
-		agg, request.GetFieldValueSort())
-	var tags []string
+	fieldType, fieldTypeErr := t.getTopNFieldType(ctx, request.Groups[0], request.GetName())
Review Comment:
### Broadcast futures leak when field-type lookup fails
```go
ff, err := t.broadcaster.Broadcast(defaultTopNQueryTimeout, data.TopicTopNQuery, ...)
if err != nil { ... return }
var allErr error
fieldType, fieldTypeErr := t.getTopNFieldType(ctx, request.Groups[0], request.GetName())
if fieldTypeErr != nil {
	resp = bus.NewMessage(now, common.NewError("failed to determine field type for topN query %s: %v", request.GetName(), fieldTypeErr))
	return // ff is never drained
}
```
If `getTopNFieldType` fails after a successful `Broadcast`, the futures in
`ff` are abandoned. The success path drains them via `processTopNResponse(ff,
...)`; this error path doesn't. Each abandoned future holds onto its goroutine,
timeout timer, and any pooled response buffers attached to incoming messages
until the broadcast timeout elapses.
**Suggested fix** — `getTopNFieldType` only depends on `request.Groups[0]`
and `request.GetName()`, neither of which depends on the broadcast. Move the
lookup ahead of `Broadcast`:
```go
fieldType, fieldTypeErr := t.getTopNFieldType(ctx, request.Groups[0], request.GetName())
if fieldTypeErr != nil {
	resp = bus.NewMessage(now, common.NewError("failed to determine field type for topN query %s: %v", request.GetName(), fieldTypeErr))
	return
}
ff, err := t.broadcaster.Broadcast(...)
if err != nil { ... return }
```
That avoids the leak entirely and fails fast before fanning out a query
whose response we can't post-process. If a structural reason forces the
broadcast to come first, add a `defer drainFutures(ff)` right after the
broadcast and disarm it on the success path, where `processTopNResponse`
already consumes the futures.