hanahmily commented on code in PR #1110:
URL: 
https://github.com/apache/skywalking-banyandb/pull/1110#discussion_r3177975535


##########
pkg/flow/streaming/topn.go:
##########
@@ -77,26 +76,27 @@ func (s *windowedFlow) TopN(topNum int, opts ...any) flow.Flow {
 type topNAggregatorGroup struct {
        aggregatorGroup   map[string]*topNAggregator
        keyExtractor      func(flow.StreamRecord) uint64
-       sortKeyExtractor  func(flow.StreamRecord) int64
+       sortKeyExtractor  func(flow.StreamRecord) interface{}
        groupKeyExtractor func(flow.StreamRecord) string
        comparator        utils.Comparator
        l                 *logger.Logger
        cacheSize         int
        sort              TopNSort
+       fieldType         databasev1.FieldType
 }
 
 type topNAggregator struct {
        *topNAggregatorGroup
        treeMap *treemap.Map
-       dict    map[uint64]int64
+       dict    map[uint64]interface{}

Review Comment:
   The generic `K TopSortKey` only reaches the measure layer; the streaming 
aggregator stops at `interface{}`:
   
   ```go
   sortKeyExtractor func(flow.StreamRecord) interface{}
   dict             map[uint64]interface{}
   treeMap          *treemap.Map      // gods, interface{} keys
   comparator       utils.Comparator  // func(any, any) int
   ```
   
   For every record this costs:
   - 1 box on the extractor return,
   - 1 box on `dict[key] = sortKey`,
   - 2 boxes per comparator call inside `treemap.Put`/`Floor`/`Get`,
   - 1 box per `treeMap.Put(sortKey, ...)`.
   
   For INT-only deployments this is **a strict regression** vs. pre-PR `int64` 
typing, and the whole point of the generics work is lost on the hot path. It 
also forces a runtime field-type check in `setComparatorFromFieldType`, whose 
`default` arm installs a no-op comparator that silently returns wrong results 
instead of failing.
   
   ### Suggestion
   
   Push `K` end-to-end:
   
   ```go
   type topNAggregatorGroup[K TopSortKey] struct {
       sortKeyExtractor func(flow.StreamRecord) K
       comparator       func(a, b K) int      // typed, monomorphized per K
       aggregatorGroup  map[string]*topNAggregator[K]
       ...
   }
   type topNAggregator[K TopSortKey] struct {
       *topNAggregatorGroup[K]
       buffer *topNBuffer[K]                   // replaces gods/treemap
       dict   map[uint64]K
   }
   ```
   
   `gods/treemap` is non-generic, which is what blocks this today. Two cheap 
replacements:
   
   1. **Generic min/max heap** wrapping `container/heap`. TopN only keeps 
`cacheSize` items (typically ≤100), so `O(log N)` is ~7 comparisons; 
`doCleanUp` becomes one `heap.Pop`.
   2. **Sorted `[]K`** with binary insert. For these sizes the memmove cost is 
lower than treemap's pointer chasing, and there are zero allocations per insert.
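   
   To make option 2 concrete, here is a minimal sketch of a sorted-slice 
buffer. The name `topNBuffer` mirrors the `buffer` field in the suggestion 
above; the `cmp.Ordered` constraint (standing in for `TopSortKey`), the method 
set, and the "keep the largest keys" policy are all assumptions for 
illustration, not existing BanyanDB code.
   
   ```go
   package main
   
   import (
       "cmp"
       "slices"
   )
   
   // topNBuffer is a hypothetical bounded buffer that keeps the `limit`
   // largest keys in ascending order. It is not an existing BanyanDB type.
   type topNBuffer[K cmp.Ordered] struct {
       keys  []K
       limit int
   }
   
   func newTopNBuffer[K cmp.Ordered](limit int) *topNBuffer[K] {
       // One spare slot so an insert into a full buffer never reallocates.
       return &topNBuffer[K]{keys: make([]K, 0, limit+1), limit: limit}
   }
   
   // Put inserts k at its sorted position and evicts the smallest key once
   // the buffer exceeds its limit. It reports whether k was retained.
   func (b *topNBuffer[K]) Put(k K) bool {
       if b.limit <= 0 {
           return false
       }
       if len(b.keys) == b.limit && cmp.Compare(k, b.keys[0]) <= 0 {
           return false // k would be evicted immediately
       }
       i, _ := slices.BinarySearch(b.keys, k)
       b.keys = slices.Insert(b.keys, i, k)
       if len(b.keys) > b.limit {
           b.keys = slices.Delete(b.keys, 0, 1) // drop the smallest key
       }
       return true
   }
   ```
   
   Keys stay typed as `K` from insert to eviction, so nothing on this path goes 
through `interface{}`. A real replacement would also need to carry the 
per-key payload that `treemap` stores today.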
   
   With K threaded through, `WithFieldType` and `setComparatorFromFieldType` go 
away entirely — the int/float dispatch already happened in 
`topNProcessorManager.start` when it instantiated 
`topNStreamingProcessor[int64]` vs `[float64]`. Don't re-route inside the 
streaming flow.
   
   The public TopN entry point can either stay generic (`func TopN[K](...)`) or 
split into `TopNInt` / `TopNFloat`; either way the int64 and float64 paths get 
separate concrete code with no shared interface dispatch, because Go 
instantiates generic code separately for types with different underlying 
types, such as `int64` and `float64`.
   
   The `flow.StreamRecord.Data []any` upstream is a separate (broader) source 
of boxing that this PR shouldn't tackle, but the sort-key path and comparator 
path are the dominant hot-path cost and they can be made box-free without 
touching the flow framework.



##########
banyand/measure/topn.go:
##########
@@ -888,8 +1036,162 @@ func (t *TopNValue) marshal(dst []byte) ([]byte, error) {
        return dst, nil
 }
 
+func (t *TopNValue[K]) marshalFloat64(dst []byte) ([]byte, error) {
+       dst = encoding.EncodeBytes(dst, convert.StringToBytes(t.valueName))
+       dst = encoding.VarUint64ToBytes(dst, uint64(len(t.entityTagNames)))
+       for _, entityTagName := range t.entityTagNames {
+               dst = encoding.EncodeBytes(dst, convert.StringToBytes(entityTagName))
+       }
+
+       valuesCount := uint64(len(t.values)) | (uint64(1) << 63)
+       dst = encoding.VarUint64ToBytes(dst, valuesCount)
+
+       floatValues := make([]float64, len(t.values))
+       for i, v := range t.values {
+               floatValues[i] = float64(v)
+       }
+
+       intValues, exponent, err := encoding.Float64ListToDecimalIntList(nil, floatValues)
+       if err != nil {
+               return nil, fmt.Errorf("failed to convert float64 to decimal int: %w", err)
+       }
+       t.exponent = exponent
+
+       t.buf, t.encodeType, t.firstValue = encoding.Int64ListToBytes(t.buf, intValues)
+       dst = append(dst, byte(t.encodeType))
+       dst = encoding.VarInt64ToBytes(dst, t.firstValue)
+       dst = encoding.VarInt64ToBytes(dst, int64(t.exponent))
+       dst = encoding.VarUint64ToBytes(dst, uint64(len(t.buf)))
+       dst = append(dst, t.buf...)
+
+       evv := t.resizeEntityValues(len(t.entities))
+       for i, tvv := range t.entities {
+               ev := evv[i]
+               ev, err = pbv1.MarshalTagValues(ev[:0], tvv)
+               if err != nil {
+                       return nil, err
+               }
+               evv[i] = ev
+       }
+       dst = encoding.EncodeBytesBlock(dst, evv)
+       return dst, nil
+}

Review Comment:
   The float marshal path has three layered problems and they should be fixed 
together in this PR.
   
   ## What's wrong with the current algorithm
   
   `encoding.Float64ListToDecimalIntList` is **lossy by design**, not a bug in 
any single line:
   
   1. `countDecimalPlaces` uses a global `1e-9` rounding tolerance. Any 
fractional component below that threshold is silently truncated. `0.1234567891` 
and `0.1234567892` collapse to the same int. The tolerance is hard-coded and 
global — there is no signal back to the caller that precision was lost.
   2. `countDecimalPlaces` caps at `maxDecimalPlaces = 15`. Floats whose 
canonical decimal form needs 16+ significant digits (which is within 
`float64`'s ~17-digit precision envelope) are silently rounded to 15.
   3. **Single shared exponent for the whole list.** All values are scaled to 
one common decimal factor. Heterogeneous magnitudes — e.g. `[1e-9, 1e9]` in the 
same TopN window — force one of them to lose precision when rescaled. Currently 
this manifests as a `value overflows int64` error and the whole row is dropped, 
but the underlying issue is that the algorithm assumes a homogeneous-magnitude 
list.
   
   On top of that, the marshal/unmarshal code adds two unnecessary costs:
   
   4. **Pool defeated by per-call copies.** `make([]float64, len(t.values))` 
plus a `K → float64` loop runs on every marshal, and a parallel `make([]int64, 
...)` runs inside the encoder. The whole point of `topNValueFloatPool` is to 
reuse `TopNValue` scratch buffers; the current code throws those buffers away 
every call.
   5. **Three redundant type discriminators.** `valuesCount`'s bit 63 (this 
file), `DetectFieldTypeFromBinary` (binary peek), and the existing `EncodeType` 
byte all answer the same question: "is this an int payload or a float payload?" 
The first two should go.
   
   ## How to fix it (three steps in this PR)
   
   ### Step 1 — `pkg/encoding/float.go`: rewrite `Float64ListToDecimalIntList` 
to strict-lossless
   
   Replace the global-tolerance approach with a two-tier per-value canonical 
conversion plus minimum-exponent calibration.
   
   Per-value canonical decimal:
   
   ```go
   // floatToDecimal returns (mantissa, exponent, ok) such that parsing
   // mantissa * 10^exponent as a decimal yields f again (round-trip safe).
   func floatToDecimal(f float64) (int64, int16, bool) {
       if math.IsNaN(f) || math.IsInf(f, 0) {
           return 0, 0, false
       }
       if f == 0 {
           return 0, 0, true
       }
       // Fast path: integer-valued float that round-trips through int64.
       // Hits for counters, byte counts, integer latencies; empirically dominant.
       // The range guard matters: int64(f) is implementation-defined when f
       // lies outside the int64 range.
       if f >= -(1<<63) && f < 1<<63 {
           if u := int64(f); float64(u) == f {
               e := int16(0)
               for u%10 == 0 {
                   u /= 10
                   e++
               }
               return u, e, true
           }
       }
       // Slow path: shortest round-trip via strconv.
       return floatToDecimalSlow(f)
   }
   ```
   
   `floatToDecimalSlow` calls `strconv.AppendFloat(buf[:0], f, 'e', -1, 64)` 
(Go's shortest round-trip representation), parses out mantissa + scientific 
exponent, removes the decimal point with a corresponding exponent shift, strips 
trailing zeros, and returns `false` if the mantissa overflows int64 or the 
final exponent overflows int16. With a pooled scratch buffer it is 
allocation-free.
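   
   The slow path described above could be sketched as follows. This is an 
illustration under stated assumptions, not the final implementation: the local 
array stands in for the pooled scratch buffer, and the result is the shortest 
decimal that parses back to `f`, which is a round-trip guarantee rather than 
exact binary equality.
   
   ```go
   package main
   
   import (
       "math"
       "strconv"
   )
   
   // floatToDecimalSlow converts f to (mantissa, exponent) using Go's
   // shortest round-trip formatting. Sketch only; see the assumptions above.
   func floatToDecimalSlow(f float64) (int64, int16, bool) {
       if math.IsNaN(f) || math.IsInf(f, 0) {
           return 0, 0, false
       }
       var scratch [32]byte // stand-in for the pooled scratch buffer
       s := strconv.AppendFloat(scratch[:0], f, 'e', -1, 64) // e.g. "1.25e-01"
   
       neg := s[0] == '-'
       if neg {
           s = s[1:]
       }
       // Collect mantissa digits, counting those after the decimal point.
       var mant uint64
       frac, i, seenPoint := 0, 0, false
       for ; s[i] != 'e'; i++ {
           if s[i] == '.' {
               seenPoint = true
               continue
           }
           mant = mant*10 + uint64(s[i]-'0')
           if seenPoint {
               frac++
           }
       }
       if mant == 0 {
           return 0, 0, true
       }
       // Parse the scientific exponent; the 'e' format always emits a sign.
       i++ // skip 'e'
       expNeg := s[i] == '-'
       sci := 0
       for i++; i < len(s); i++ {
           sci = sci*10 + int(s[i]-'0')
       }
       if expNeg {
           sci = -sci
       }
       // Removing the decimal point shifts the exponent by the fraction length.
       exp := sci - frac
       for mant%10 == 0 { // strip trailing zeros
           mant /= 10
           exp++
       }
       if mant > math.MaxInt64 || exp < math.MinInt16 || exp > math.MaxInt16 {
           return 0, 0, false
       }
       m := int64(mant)
       if neg {
           m = -m
       }
       return m, int16(exp), true
   }
   ```
   
   For example, `0.125` formats as `1.25e-01`, so the mantissa is `125` and the 
exponent is `-1 - 2 = -3`.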
   
   List-level helper:
   
   ```go
   func Float64ListToDecimalIntList(dst []int64, src []float64) ([]int64, 
int16, error) {
       if len(src) == 0 {
           return dst[:0], 0, nil
       }
       decimals := dst[:0]
       exps := getInt16Scratch(len(src)); defer putInt16Scratch(exps)
   
       minExp := int16(math.MaxInt16)
       for i, f := range src {
           d, e, ok := floatToDecimal(f)
           if !ok {
               return nil, 0, errCannotEncodeLossless
           }
           decimals = append(decimals, d)
           exps[i] = e
           if e < minExp {
               minExp = e
           }
       }
       for i := range decimals {
           diff := exps[i] - minExp
           if diff == 0 {
               continue
           }
           scaled, ok := mulPow10(decimals[i], diff)  // overflow-checked
           if !ok {
               return nil, 0, errCannotEncodeLossless
           }
           decimals[i] = scaled
       }
       return decimals, minExp, nil
   }
   ```
   
   Properties: lossless when it succeeds, explicit error when it can't. No 
silent truncation, no `1e-9` tolerance, no 15-digit cap. The fast path is ~5 
ops/value (vs. ~30 FP ops in the current implementation); the slow path is 
comparable to current cost but lossless.
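   
   The list-level helper relies on an overflow-checked `mulPow10` that is not 
shown above. One possible shape, assuming the signature implied by the call 
site (`int64` value, non-negative `int16` power), is:
   
   ```go
   package main
   
   import "math"
   
   // mulPow10 returns v * 10^n for n >= 0 and reports false if the result
   // would overflow int64. Hypothetical helper; the name comes from the
   // sketch above, not from an existing package.
   func mulPow10(v int64, n int16) (int64, bool) {
       if n < 0 {
           return 0, false
       }
       for ; n > 0 && v != 0; n-- {
           // Check before multiplying so the overflow never happens.
           if v > math.MaxInt64/10 || v < math.MinInt64/10 {
               return 0, false
           }
           v *= 10
       }
       return v, true
   }
   ```
   
   The loop runs at most about 19 times for any non-zero input because a 
larger power of ten cannot fit in `int64`, so a lookup table is an optional 
optimization rather than a requirement.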
   
   ### Step 2 — drop the bit-63 trick; reuse `EncodeType` for type framing
   
   `banyand/measure/column.go:202-219` and 
`banyand/internal/encoding/tag_encoder.go:184-205` already establish the 
codebase's pattern for float columns: prepend a 1-byte `EncodeType`, where 
`EncodeTypePlain` means "raw IEEE-754 8 bytes per value, encoder couldn't 
compress this". Follow that pattern here:
   
   ```go
   func (t *TopNValue[K]) marshalFloat64(dst []byte) ([]byte, error) {
       // ...header (valueName, entityTagNames, valuesCount; no bit-63 trick)...
   
       floats := t.floatValues()  // typed view; see Step 3
       ints, exp, err := encoding.Float64ListToDecimalIntList(t.intScratch[:0], 
floats)
       if err != nil {
           // Lossless-fails fallback: raw IEEE-754, 8 bytes/value.
           dst = append(dst, byte(encoding.EncodeTypePlain))
           for _, f := range floats {
               dst = convert.AppendFloat64Bytes(dst, f)
           }
           return appendEntityValues(dst, t), nil
       }
       t.intScratch = ints
       t.buf, t.encodeType, t.firstValue = encoding.Int64ListToBytes(t.buf[:0], 
ints)
       dst = append(dst, byte(t.encodeType))
       dst = encoding.VarInt64ToBytes(dst, t.firstValue)
       dst = encoding.VarInt64ToBytes(dst, int64(exp))
       dst = encoding.VarUint64ToBytes(dst, uint64(len(t.buf)))
       dst = append(dst, t.buf...)
       return appendEntityValues(dst, t), nil
   }
   ```
   
   `unmarshalFloat64` switches on `EncodeType`: `EncodeTypePlain` reads 
`valuesCount * 8` raw bytes; everything else takes the existing decimal-int + 
exponent decode path.
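   
   The `EncodeTypePlain` branch of the decoder might look like the sketch 
below. The helper names and the big-endian byte order are assumptions made for 
illustration; the real code should use whatever layout the existing `convert` 
helpers produce so that both sides agree.
   
   ```go
   package main
   
   import (
       "encoding/binary"
       "errors"
       "math"
   )
   
   var errShortPlainBlock = errors.New("plain float block shorter than count*8 bytes")
   
   // appendFloat64 writes the raw IEEE-754 bits of f (byte order assumed).
   func appendFloat64(dst []byte, f float64) []byte {
       return binary.BigEndian.AppendUint64(dst, math.Float64bits(f))
   }
   
   // decodePlainFloat64s reads count raw 8-byte values from src into dst and
   // returns the decoded values plus the unread tail of src.
   func decodePlainFloat64s(dst []float64, src []byte, count int) ([]float64, []byte, error) {
       if count < 0 || len(src)/8 < count {
           return dst, src, errShortPlainBlock
       }
       dst = dst[:0]
       for i := 0; i < count; i++ {
           bits := binary.BigEndian.Uint64(src[i*8:])
           dst = append(dst, math.Float64frombits(bits))
       }
       return dst, src[count*8:], nil
   }
   ```
   
   Because the bits are copied verbatim, this branch is lossless for every 
finite value, which is what makes it a safe fallback when the decimal 
conversion fails.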
   
   `DetectFieldTypeFromBinary` becomes unnecessary — the field type is already 
known to every caller via the schema. Its only caller (`MergeTopNBinaryValues`) 
is invoked from sites where K is statically known. Thread K through and delete 
the function, the bit-63 mask, and the binary peek.
   
   Net result: one type discriminator instead of three, and the on-disk framing 
matches the rest of the codebase.
   
   ### Step 3 — eliminate the per-call slice copies
   
   Replace:
   
   ```go
   floatValues := make([]float64, len(t.values))
   for i, v := range t.values { floatValues[i] = float64(v) }
   ```
   
   with a typed view at the K dispatch site, since `t.values` *is* `[]float64` 
when `K = float64`:
   
   **Option A — fork the marshal entry at the K switch into non-generic 
specializations:**
   
   ```go
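   func (t *TopNValue[K]) marshal(dst []byte) ([]byte, error) {
       var k K
       switch any(k).(type) {
       case float64:
           return marshalFloat64Concrete(dst, t.valueName, t.entityTagNames,
               any(t.values).([]float64), t.entities, &t.intScratch, &t.buf)
       case int64:
           return marshalInt64Concrete(dst, t.valueName, t.entityTagNames,
               any(t.values).([]int64), t.entities, &t.buf)
       default:
           // Required for the function to compile (every path must return);
           // also guards against a K that neither helper handles.
           return nil, fmt.Errorf("unsupported TopN sort key type %T", k)
       }
   }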
   
   No `unsafe`, no slice copies, and the helper signatures are concrete and 
easy to test.
   
   **Option B — `unsafe` reinterpret** (faster than A only if profiling demands 
it): both `int64` and `float64` are 8-byte types so the slice header is 
identical:
   
   ```go
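   // Only valid when K is float64; with K = int64 this would expose raw
   // integer bit patterns as floats.
   func (t *TopNValue[K]) floatValues() []float64 {
       return *(*[]float64)(unsafe.Pointer(&t.values))
   }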
   ```
   
   Prefer Option A unless benchmarks show a meaningful gap.
   
   The `intScratch []int64` field lives on `TopNValue[K]` and is reused across 
calls: the encoder receives `t.intScratch[:0]`, and `t.intScratch = ints` 
stores the possibly grown slice back afterwards. That restores the pool's 
intended benefit.
   
   ---
   
   These three steps share one goal: the float path becomes lossless when it 
succeeds, fast when it does, explicit when it can't, and uses the same on-disk 
framing as the rest of the codebase. The diff is largely deletion: 
`countDecimalPlaces` goes, the bit-63 mask goes, `DetectFieldTypeFromBinary` 
goes, the throwaway slices go.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

