eye-gu commented on code in PR #1110:
URL:
https://github.com/apache/skywalking-banyandb/pull/1110#discussion_r3199228651
##########
banyand/measure/topn.go:
##########
@@ -888,8 +1036,162 @@ func (t *TopNValue) marshal(dst []byte) ([]byte, error) {
return dst, nil
}
+func (t *TopNValue[K]) marshalFloat64(dst []byte) ([]byte, error) {
+ dst = encoding.EncodeBytes(dst, convert.StringToBytes(t.valueName))
+ dst = encoding.VarUint64ToBytes(dst, uint64(len(t.entityTagNames)))
+ for _, entityTagName := range t.entityTagNames {
+ dst = encoding.EncodeBytes(dst,
convert.StringToBytes(entityTagName))
+ }
+
+ valuesCount := uint64(len(t.values)) | (uint64(1) << 63)
+ dst = encoding.VarUint64ToBytes(dst, valuesCount)
+
+ floatValues := make([]float64, len(t.values))
+ for i, v := range t.values {
+ floatValues[i] = float64(v)
+ }
+
+ intValues, exponent, err := encoding.Float64ListToDecimalIntList(nil,
floatValues)
+ if err != nil {
+ return nil, fmt.Errorf("failed to convert float64 to decimal
int: %w", err)
+ }
+ t.exponent = exponent
+
+ t.buf, t.encodeType, t.firstValue = encoding.Int64ListToBytes(t.buf,
intValues)
+ dst = append(dst, byte(t.encodeType))
+ dst = encoding.VarInt64ToBytes(dst, t.firstValue)
+ dst = encoding.VarInt64ToBytes(dst, int64(t.exponent))
+ dst = encoding.VarUint64ToBytes(dst, uint64(len(t.buf)))
+ dst = append(dst, t.buf...)
+
+ evv := t.resizeEntityValues(len(t.entities))
+ for i, tvv := range t.entities {
+ ev := evv[i]
+ ev, err = pbv1.MarshalTagValues(ev[:0], tvv)
+ if err != nil {
+ return nil, err
+ }
+ evv[i] = ev
+ }
+ dst = encoding.EncodeBytesBlock(dst, evv)
+ return dst, nil
+}
Review Comment:
@hanahmily The float64 encoding has been fixed, and pool-based memory reuse
is now applied. PTAL
##########
pkg/flow/streaming/topn.go:
##########
@@ -77,26 +76,27 @@ func (s *windowedFlow) TopN(topNum int, opts ...any)
flow.Flow {
type topNAggregatorGroup struct {
aggregatorGroup map[string]*topNAggregator
keyExtractor func(flow.StreamRecord) uint64
- sortKeyExtractor func(flow.StreamRecord) int64
+ sortKeyExtractor func(flow.StreamRecord) interface{}
groupKeyExtractor func(flow.StreamRecord) string
comparator utils.Comparator
l *logger.Logger
cacheSize int
sort TopNSort
+ fieldType databasev1.FieldType
}
type topNAggregator struct {
*topNAggregatorGroup
treeMap *treemap.Map
- dict map[uint64]int64
+ dict map[uint64]interface{}
Review Comment:
@hanahmily Thanks for the thorough review. All the interface{} boxing issues
in the streaming layer have been addressed:
1. gods/treemap has been replaced with a generic heap: a new
topNHeap[K TopSortKey] (pkg/flow/streaming/topn_heap.go) wraps container/heap.
2. FieldType is now persisted in TopNParameters for merge-time type recovery,
because the merge path (banyand/measure/merger.go) cannot recover the type
parameter K from serialized block data.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]