hanahmily commented on code in PR #1110:
URL: https://github.com/apache/skywalking-banyandb/pull/1110#discussion_r3208331888


##########
banyand/measure/topn.go:
##########
@@ -935,32 +1225,96 @@ func (t *TopNValue) Unmarshal(src []byte, decoder *encoding.BytesBlockDecoder) e
                return fmt.Errorf("src is too short for reading string with size %d; len(src)=%d", valueLen, len(src))
        }
 
-       t.values, err = encoding.BytesToInt64List(t.values, src[:valueLen], t.encodeType, t.firstValue, int(valuesCount))
+       intValues, err := encoding.BytesToInt64List(nil, src[:valueLen], t.encodeType, t.firstValue, int(valuesCount))

Review Comment:
   ### Unmarshal allocation defeats the `TopNValue` pool
   
   Both the int64 (this line) and float64 (line 1299) unmarshal paths drop the 
pooled scratch by passing `nil`:
   
   ```go
   intValues, err := encoding.BytesToInt64List(nil, src[:valueLen], t.encodeType, t.firstValue, int(valuesCount))
   ```
   
   `BytesToInt64List` then allocates `make([]int64, valuesCount)` on every 
call. The `TopNValue` pool retains `t.intScratch` (line 933) and the marshal 
path correctly recycles it (lines 1052, 1062), but the unmarshal path throws 
that buffer away. On merge-heavy workloads — `MergeTopNBinaryValues` calls 
`Unmarshal` twice per overlapping timestamp pair — this is a steady GC-pressure 
source the pool was designed to avoid.
   
   Additionally, `int(valuesCount)` is unbounded: a corrupt block declaring 
`valuesCount = 10_000_000` with `valueLen = 8` triggers an 80 MB allocation. 
The `len(src) < valueLen` check guards the byte slice but not the element count.
   
   **Suggested fix**
   
   ```go
   if int(valuesCount) > len(src[:valueLen]) {
       return fmt.Errorf("topNValue.valuesCount %d exceeds payload bytes %d", valuesCount, valueLen)
   }
   intValues, err := encoding.BytesToInt64List(t.intScratch[:0], src[:valueLen], t.encodeType, t.firstValue, int(valuesCount))
   if err != nil {
       return fmt.Errorf("cannot unmarshal topNValue.values: %w", err)
   }
   t.intScratch = intValues
   ```
   
   Then assign `t.values` from `intValues` as today (and the same change at 
line 1299 for the float64 path).



##########
banyand/dquery/topn.go:
##########
@@ -111,36 +141,19 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp
                return
        }
        var allErr error
-       aggregator := measure.CreateTopNPostProcessor(request.GetTopN(),
-               agg, request.GetFieldValueSort())
-       var tags []string
+       fieldType, fieldTypeErr := t.getTopNFieldType(ctx, request.Groups[0], request.GetName())

Review Comment:
   ### Broadcast futures leak when field-type lookup fails
   
   ```go
   ff, err := t.broadcaster.Broadcast(defaultTopNQueryTimeout, data.TopicTopNQuery, ...)
   if err != nil { ... return }
   var allErr error
   fieldType, fieldTypeErr := t.getTopNFieldType(ctx, request.Groups[0], request.GetName())
   if fieldTypeErr != nil {
       resp = bus.NewMessage(now, common.NewError("failed to determine field type for topN query %s: %v", request.GetName(), fieldTypeErr))
       return                  // ff is never drained
   }
   ```
   
   If `getTopNFieldType` fails after a successful `Broadcast`, the futures in 
`ff` are abandoned. The success path drains them via `processTopNResponse(ff, 
...)`; this error path doesn't. Each abandoned future holds onto its goroutine, 
timeout timer, and any pooled response buffers attached to incoming messages 
until the broadcast timeout elapses.
   
   **Suggested fix**: `getTopNFieldType` reads only `request.Groups[0]` and 
`request.GetName()`, and neither is affected by the broadcast. Move the 
lookup ahead of `Broadcast`:
   
   ```go
   fieldType, fieldTypeErr := t.getTopNFieldType(ctx, request.Groups[0], request.GetName())
   if fieldTypeErr != nil {
       resp = bus.NewMessage(now, common.NewError("failed to determine field type for topN query %s: %v", request.GetName(), fieldTypeErr))
       return
   }
   ff, err := t.broadcaster.Broadcast(...)
   if err != nil { ... return }
   ```
   
   That avoids the leak entirely and fails fast before fanning out a query 
whose response we can't post-process. If a structural reason forces the 
broadcast to come first, register a deferred `drainFutures(ff)` right after 
the broadcast and make it a no-op on the success path (for example, by 
guarding it with a flag that the success path sets).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]