Copilot commented on code in PR #961:
URL:
https://github.com/apache/skywalking-banyandb/pull/961#discussion_r2749821112
##########
banyand/measure/block.go:
##########
@@ -1151,6 +1131,95 @@ func fullFieldAppend(bi, b *blockPointer, offset int) {
}
}
+func (bi *blockPointer) mergeAndAppendTopN(left *blockPointer, leftIdx int,
right *blockPointer, rightIdx int, topNPostAggregator PostProcessor) {
+ topNValue := GenerateTopNValue()
+ defer ReleaseTopNValue(topNValue)
+ decoder := GenerateTopNValuesDecoder()
+ defer ReleaseTopNValuesDecoder(decoder)
+
+ uTimestamp := uint64(right.timestamps[rightIdx])
+ marshalBuf := make([]byte, 0, 128)
+ leftVer, rightVer := left.versions[leftIdx], right.versions[rightIdx]
+
+ for idx := range right.field.columns {
+ topNValue.Reset()
+ if err :=
topNValue.Unmarshal(left.field.columns[idx].values[leftIdx], decoder); err !=
nil {
+ log.Error().Err(err).Msg("failed to unmarshal topN
value, skip current batch")
+ // avoid index out-of-bounds issues
+ bi.field.columns[idx].values =
append(bi.field.columns[idx].values, []byte{})
+ continue
+ }
+
+ valueName := topNValue.valueName
+ entityTagNames := topNValue.entityTagNames
+
+ putEntitiesToAggregator(topNValue, topNPostAggregator,
uTimestamp, leftVer)
+
+ topNValue.Reset()
+ if err :=
topNValue.Unmarshal(right.field.columns[idx].values[rightIdx], decoder); err !=
nil {
+ log.Error().Err(err).Msg("failed to unmarshal topN
value, skip current batch")
+ bi.field.columns[idx].values =
append(bi.field.columns[idx].values, []byte{})
+ continue
+ }
+
+ putEntitiesToAggregator(topNValue, topNPostAggregator,
uTimestamp, rightVer)
+
+ topNValue.Reset()
+ topNValue.setMetadata(valueName, entityTagNames)
+
+ items, err := topNPostAggregator.Flush()
+ if err != nil {
+ log.Error().Err(err).Msg("failed to flush aggregator,
skip current batch")
+ continue
+ }
+
+ for _, item := range items {
+ topNValue.addValue(item.val, item.values)
+ }
+
+ marshalBuf = marshalBuf[:0]
+ buf, err := topNValue.marshal(marshalBuf)
+ if err != nil {
+ log.Error().Err(err).Msg("failed to marshal merged topN
value")
+ continue
+ }
+
+ bi.field.columns[idx].values =
append(bi.field.columns[idx].values, buf)
+ }
Review Comment:
In the TopN-specific merge path, `mergeAndAppendTopN` assumes that
`bi.field.columns` is already initialized, but in the equal-timestamp case
where the very first elements of `left` and `right` share the same timestamp,
`mergeTwoBlocks` calls `target.append(left, i-1)` with `offset == left.idx`,
which is a no-op, so `bi.field.columns` remains empty. `mergeAndAppendTopN`
then iterates over `right.field.columns` and writes to
`bi.field.columns[idx].values`, which will panic with an index-out-of-range
error when `len(bi.field.columns) == 0`. To avoid this, `mergeAndAppendTopN`
should initialize `bi.field.columns` (and possibly `bi.field.name`) from
`left.field` or `right.field` when they are empty before entering the `for idx
:= range right.field.columns` loop, so that merging also works when the
duplicated timestamp is the first record in the blocks.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]