eye-gu commented on code in PR #1110:
URL: 
https://github.com/apache/skywalking-banyandb/pull/1110#discussion_r3199228651


##########
banyand/measure/topn.go:
##########
@@ -888,8 +1036,162 @@ func (t *TopNValue) marshal(dst []byte) ([]byte, error) {
        return dst, nil
 }
 
+func (t *TopNValue[K]) marshalFloat64(dst []byte) ([]byte, error) {
+       dst = encoding.EncodeBytes(dst, convert.StringToBytes(t.valueName))
+       dst = encoding.VarUint64ToBytes(dst, uint64(len(t.entityTagNames)))
+       for _, entityTagName := range t.entityTagNames {
+               dst = encoding.EncodeBytes(dst, 
convert.StringToBytes(entityTagName))
+       }
+
+       valuesCount := uint64(len(t.values)) | (uint64(1) << 63)
+       dst = encoding.VarUint64ToBytes(dst, valuesCount)
+
+       floatValues := make([]float64, len(t.values))
+       for i, v := range t.values {
+               floatValues[i] = float64(v)
+       }
+
+       intValues, exponent, err := encoding.Float64ListToDecimalIntList(nil, 
floatValues)
+       if err != nil {
+               return nil, fmt.Errorf("failed to convert float64 to decimal 
int: %w", err)
+       }
+       t.exponent = exponent
+
+       t.buf, t.encodeType, t.firstValue = encoding.Int64ListToBytes(t.buf, 
intValues)
+       dst = append(dst, byte(t.encodeType))
+       dst = encoding.VarInt64ToBytes(dst, t.firstValue)
+       dst = encoding.VarInt64ToBytes(dst, int64(t.exponent))
+       dst = encoding.VarUint64ToBytes(dst, uint64(len(t.buf)))
+       dst = append(dst, t.buf...)
+
+       evv := t.resizeEntityValues(len(t.entities))
+       for i, tvv := range t.entities {
+               ev := evv[i]
+               ev, err = pbv1.MarshalTagValues(ev[:0], tvv)
+               if err != nil {
+                       return nil, err
+               }
+               evv[i] = ev
+       }
+       dst = encoding.EncodeBytesBlock(dst, evv)
+       return dst, nil
+}

Review Comment:
   @hanahmily The float64 encoding has been fixed, and pool-based memory reuse 
is now applied. PTAL



##########
pkg/flow/streaming/topn.go:
##########
@@ -77,26 +76,27 @@ func (s *windowedFlow) TopN(topNum int, opts ...any) 
flow.Flow {
 type topNAggregatorGroup struct {
        aggregatorGroup   map[string]*topNAggregator
        keyExtractor      func(flow.StreamRecord) uint64
-       sortKeyExtractor  func(flow.StreamRecord) int64
+       sortKeyExtractor  func(flow.StreamRecord) interface{}
        groupKeyExtractor func(flow.StreamRecord) string
        comparator        utils.Comparator
        l                 *logger.Logger
        cacheSize         int
        sort              TopNSort
+       fieldType         databasev1.FieldType
 }
 
 type topNAggregator struct {
        *topNAggregatorGroup
        treeMap *treemap.Map
-       dict    map[uint64]int64
+       dict    map[uint64]interface{}

Review Comment:
   @hanahmily Thanks for the thorough review. All the interface{} boxing issues 
in the streaming layer have been addressed:
   
   1. gods/treemap replaced with a generic heap — A new topNHeap[K TopSortKey] 
(pkg/flow/streaming/topn_heap.go) wraps container/heap.
   2. FieldType is now persisted in TopNParameters for merge-time type recovery — 
the merge path (banyand/measure/merger.go) cannot recover the type parameter K 
from serialized block data.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to