hanahmily commented on code in PR #1110:
URL: https://github.com/apache/skywalking-banyandb/pull/1110#discussion_r3177975535
##########
pkg/flow/streaming/topn.go:
##########
@@ -77,26 +76,27 @@ func (s *windowedFlow) TopN(topNum int, opts ...any) flow.Flow {
type topNAggregatorGroup struct {
aggregatorGroup map[string]*topNAggregator
keyExtractor func(flow.StreamRecord) uint64
- sortKeyExtractor func(flow.StreamRecord) int64
+ sortKeyExtractor func(flow.StreamRecord) interface{}
groupKeyExtractor func(flow.StreamRecord) string
comparator utils.Comparator
l *logger.Logger
cacheSize int
sort TopNSort
+ fieldType databasev1.FieldType
}
type topNAggregator struct {
*topNAggregatorGroup
treeMap *treemap.Map
- dict map[uint64]int64
+ dict map[uint64]interface{}
Review Comment:
The generic `K TopSortKey` only reaches the measure layer; the streaming
aggregator stops at `interface{}`:
```go
sortKeyExtractor func(flow.StreamRecord) interface{}
dict map[uint64]interface{}
treeMap *treemap.Map // gods, interface{} keys
comparator utils.Comparator // func(any, any) int
```
For every record this costs:
- 1 box on the extractor return,
- 1 box on `dict[key] = sortKey`,
- 2 boxes per comparator call inside `treemap.Put`/`Floor`/`Get`,
- 1 box per `treeMap.Put(sortKey, ...)`.
For INT-only deployments this is **a strict regression** vs. the pre-PR `int64`
typing, and the whole point of the generics work is lost on the hot path. It
also forces the runtime field-type check in `setComparatorFromFieldType`, whose
`default` arm is a no-op comparator that silently returns wrong answers.
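The boxing cost is easy to observe in isolation. Below is a minimal sketch (not the PR's code) using `testing.AllocsPerRun`; `boxedSink` and `typedSink` are hypothetical stand-ins for an `interface{}`-typed slot and a `K`-typed slot. The sample key must be 256 or larger, because the Go runtime serves smaller integers from a static table without allocating.

```go
package main

import "testing"

// Package-level sinks force the stored value to escape, so the compiler
// cannot elide the conversion.
var (
	boxedSink any   // models an interface{}-typed dict/treemap slot
	typedSink int64 // models a K-typed slot with K = int64
)

// allocsPerStore reports the average heap allocations for one store of key
// into each sink. Use key >= 256: smaller integers are boxed without
// allocating.
func allocsPerStore(key int64) (boxed, typed float64) {
	boxed = testing.AllocsPerRun(1000, func() { boxedSink = key })
	typed = testing.AllocsPerRun(1000, func() { typedSink = key })
	return boxed, typed
}
```

On a standard gc build, `allocsPerStore(1 << 20)` should report one allocation for the boxed store and none for the typed one.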
### Suggestion
Push `K` end-to-end:
```go
type topNAggregatorGroup[K TopSortKey] struct {
sortKeyExtractor func(flow.StreamRecord) K
comparator func(a, b K) int // typed, monomorphized per K
aggregatorGroup map[string]*topNAggregator[K]
...
}
type topNAggregator[K TopSortKey] struct {
*topNAggregatorGroup[K]
buffer *topNBuffer[K] // replaces gods/treemap
dict map[uint64]K
}
```
`gods/treemap` is non-generic, which is what blocks this today. Two cheap
replacements:
1. **Generic min/max heap** wrapping `container/heap`. TopN only keeps
`cacheSize` items (typically ≤100), so `O(log N)` is ~7 comparisons;
`doCleanUp` becomes one `heap.Pop`.
2. **Sorted `[]K`** with binary insert. For these sizes the memmove cost is
lower than treemap's pointer chasing, and there are zero allocations per insert.
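A minimal sketch of option 2 under stated assumptions: `sortKey` stands in for the PR's `TopSortKey` (assumed to be `~int64 | ~float64`), `topNBuffer` and `Put` are hypothetical names, and the buffer keeps the largest `capacity` keys (a DESC TopN). It needs Go 1.21 for the `cmp` and `slices` packages.

```go
package main

import (
	"cmp"
	"slices"
)

// sortKey is an assumed stand-in for the PR's TopSortKey constraint.
type sortKey interface {
	~int64 | ~float64
}

// topNBuffer (hypothetical) keeps at most capacity keys sorted in
// descending order, so the last element is always the next one to evict.
type topNBuffer[K sortKey] struct {
	keys     []K
	capacity int
}

func newTopNBuffer[K sortKey](capacity int) *topNBuffer[K] {
	return &topNBuffer[K]{keys: make([]K, 0, capacity), capacity: capacity}
}

// desc orders larger keys first. cmp.Compare is instantiated for K, so the
// comparison never goes through an interface.
func desc[K sortKey](a, b K) int { return cmp.Compare(b, a) }

// Put inserts k if it belongs in the top capacity keys and reports whether
// it was kept. After the initial make, inserts never allocate: the tail is
// shifted in place with copy (a memmove).
func (b *topNBuffer[K]) Put(k K) bool {
	if len(b.keys) == b.capacity {
		if b.capacity == 0 || desc(k, b.keys[len(b.keys)-1]) >= 0 {
			return false // not larger than the current minimum
		}
		b.keys = b.keys[:len(b.keys)-1] // evict the current minimum
	}
	i, _ := slices.BinarySearchFunc(b.keys, k, desc[K])
	b.keys = append(b.keys, k)     // grow by one within capacity
	copy(b.keys[i+1:], b.keys[i:]) // shift the tail right by one
	b.keys[i] = k
	return true
}
```

For ASC ordering, replace `desc` with an ascending comparator (`cmp.Compare(a, b)`); the eviction logic stays the same because it always drops the last element.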
With K threaded through, `WithFieldType` and `setComparatorFromFieldType` go
away entirely — the int/float dispatch already happened in
`topNProcessorManager.start` when it instantiated
`topNStreamingProcessor[int64]` vs `[float64]`. Don't re-route inside the
streaming flow.
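The public TopN entry point can either become a package-level generic function
(`func TopN[K TopSortKey](...)`; Go methods cannot declare their own type
parameters, and `TopN` is currently a method on `windowedFlow`) or split into
`TopNInt` / `TopNFloat`. Either way the int64 and float64 paths get separate
compiled code (their underlying types differ, so Go's GC-shape stenciling does
not share an instantiation between them), with no shared interface dispatch.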
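A sketch of doing the int/float dispatch exactly once, at construction. Everything here is hypothetical: `fieldType` stands in for `databasev1.FieldType`, `record` for `flow.StreamRecord.Data`, and `processor` for whatever non-generic handle the manager keeps. Because Go methods cannot take their own type parameters, the interface boundary is where `K` gets pinned. Note that the unsupported-type arm returns an error instead of installing a no-op comparator.

```go
package main

import (
	"cmp"
	"fmt"
)

// fieldType is a hypothetical stand-in for databasev1.FieldType.
type fieldType int

const (
	fieldTypeInt fieldType = iota
	fieldTypeFloat
)

// record mimics flow.StreamRecord.Data ([]any), which stays boxed upstream.
type record []any

// processor is the only non-generic surface the manager sees.
type processor interface {
	Process(r record)
	Best() any
}

// typedProcessor does all per-record work on a concrete K: one type
// assertion in the extractor, then typed comparisons.
type typedProcessor[K int64 | float64] struct {
	extract func(record) K
	best    K
	seen    bool
}

func (p *typedProcessor[K]) Process(r record) {
	k := p.extract(r)
	if !p.seen || cmp.Compare(k, p.best) > 0 {
		p.best, p.seen = k, true
	}
}

// Best boxes only on read, not on the per-record path.
func (p *typedProcessor[K]) Best() any { return p.best }

// newProcessor performs the single int/float dispatch, at construction time.
func newProcessor(ft fieldType, col int) (processor, error) {
	switch ft {
	case fieldTypeInt:
		return &typedProcessor[int64]{extract: func(r record) int64 { return r[col].(int64) }}, nil
	case fieldTypeFloat:
		return &typedProcessor[float64]{extract: func(r record) float64 { return r[col].(float64) }}, nil
	default:
		return nil, fmt.Errorf("unsupported field type %d", ft)
	}
}
```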
The `flow.StreamRecord.Data []any` upstream is a separate (broader) source
of boxing that this PR shouldn't tackle, but the sort-key path and comparator
path are the dominant hot-path cost and they can be made box-free without
touching the flow framework.
##########
banyand/measure/topn.go:
##########
@@ -888,8 +1036,162 @@ func (t *TopNValue) marshal(dst []byte) ([]byte, error) {
return dst, nil
}
+func (t *TopNValue[K]) marshalFloat64(dst []byte) ([]byte, error) {
+ dst = encoding.EncodeBytes(dst, convert.StringToBytes(t.valueName))
+ dst = encoding.VarUint64ToBytes(dst, uint64(len(t.entityTagNames)))
+ for _, entityTagName := range t.entityTagNames {
+ dst = encoding.EncodeBytes(dst, convert.StringToBytes(entityTagName))
+ }
+
+ valuesCount := uint64(len(t.values)) | (uint64(1) << 63)
+ dst = encoding.VarUint64ToBytes(dst, valuesCount)
+
+ floatValues := make([]float64, len(t.values))
+ for i, v := range t.values {
+ floatValues[i] = float64(v)
+ }
+
+ intValues, exponent, err := encoding.Float64ListToDecimalIntList(nil, floatValues)
+ if err != nil {
+ return nil, fmt.Errorf("failed to convert float64 to decimal int: %w", err)
+ }
+ t.exponent = exponent
+
+ t.buf, t.encodeType, t.firstValue = encoding.Int64ListToBytes(t.buf, intValues)
+ dst = append(dst, byte(t.encodeType))
+ dst = encoding.VarInt64ToBytes(dst, t.firstValue)
+ dst = encoding.VarInt64ToBytes(dst, int64(t.exponent))
+ dst = encoding.VarUint64ToBytes(dst, uint64(len(t.buf)))
+ dst = append(dst, t.buf...)
+
+ evv := t.resizeEntityValues(len(t.entities))
+ for i, tvv := range t.entities {
+ ev := evv[i]
+ ev, err = pbv1.MarshalTagValues(ev[:0], tvv)
+ if err != nil {
+ return nil, err
+ }
+ evv[i] = ev
+ }
+ dst = encoding.EncodeBytesBlock(dst, evv)
+ return dst, nil
+}
Review Comment:
The float marshal path has three layered problems and they should be fixed
together in this PR.
## What's wrong with the current algorithm
`encoding.Float64ListToDecimalIntList` is **lossy by design**, not a bug in
any single line:
1. `countDecimalPlaces` uses a global `1e-9` rounding tolerance. Any
fractional component below that threshold is silently truncated. `0.1234567891`
and `0.1234567892` collapse to the same int. The tolerance is hard-coded and
global — there is no signal back to the caller that precision was lost.
2. `countDecimalPlaces` caps at `maxDecimalPlaces = 15`. Floats whose
canonical decimal form needs 16 or more decimal places (still within
`float64`'s ~17-significant-digit precision) are silently rounded to 15.
3. **Single shared exponent for the whole list.** All values are scaled to
one common decimal factor. Heterogeneous magnitudes in the same TopN window,
such as `[1e-9, 1e10]`, force one of them to lose precision when rescaled
(keeping `1e-9` exact means scaling `1e10` to `10^19`, past int64's ~9.22e18).
Currently this manifests as a `value overflows int64` error and the whole row is
dropped, but the underlying issue is that the algorithm assumes a
homogeneous-magnitude list.
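A simplified model of item 1 makes the collapse concrete. `roundTo9` only rounds to 9 fractional digits; it is an illustration, not the actual `countDecimalPlaces` code. Go's shortest round-trip formatting, by contrast, keeps the two inputs distinct.

```go
package main

import (
	"math"
	"strconv"
)

// roundTo9 models a fixed 1e-9 tolerance: it keeps 9 fractional digits and
// drops everything below that threshold.
func roundTo9(f float64) int64 {
	return int64(math.Round(f * 1e9))
}

// shortest returns Go's shortest decimal string that parses back to f.
func shortest(f float64) string {
	return strconv.FormatFloat(f, 'g', -1, 64)
}
```

Both `0.1234567891` and `0.1234567892` map to `123456789` under `roundTo9`, while `shortest` returns `"0.1234567891"` and `"0.1234567892"`.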
On top of that, the marshal/unmarshal code adds two unnecessary costs:
4. **Pool defeated by per-call copies.** `make([]float64, len(t.values))`
plus a `K → float64` loop runs on every marshal, and a parallel `make([]int64,
...)` runs inside the encoder. The whole point of `topNValueFloatPool` is to
reuse `TopNValue` scratch buffers; the current code throws those buffers away
every call.
5. **Three redundant type discriminators.** `valuesCount`'s bit 63 (this
file), `DetectFieldTypeFromBinary` (binary peek), and the existing `EncodeType`
byte all answer the same question: "is this an int payload or a float payload?"
The first two should go.
## How to fix it (three steps in this PR)
### Step 1 — `pkg/encoding/float.go`: rewrite `Float64ListToDecimalIntList` to be strictly lossless
Replace the global-tolerance approach with a two-tier per-value canonical
conversion plus minimum-exponent calibration.
Per-value canonical decimal:
```go
// floatToDecimal returns (mantissa, exponent, ok) such that
// mantissa * 10^exponent == f exactly (round-trip safe).
func floatToDecimal(f float64) (int64, int16, bool) {
if math.IsNaN(f) || math.IsInf(f, 0) {
return 0, 0, false
}
if f == 0 {
return 0, 0, true
}
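// Fast path: integer-valued float that round-trips through int64.
// Hits for counters, byte counts, integer latencies (empirically dominant).
// Range-check first: converting an out-of-range float64 to int64 is
// implementation-dependent in Go (some architectures saturate), which could
// let 2^63 appear to round-trip as math.MaxInt64.
if f >= math.MinInt64 && f < math.MaxInt64 {
if u := int64(f); float64(u) == f {
e := int16(0)
for u%10 == 0 {
u /= 10
e++
}
return u, e, true
}
}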
// Slow path: shortest round-trip via strconv.
return floatToDecimalSlow(f)
}
```
`floatToDecimalSlow` calls `strconv.AppendFloat(buf[:0], f, 'e', -1, 64)`
(Go's shortest round-trip representation), parses out mantissa + scientific
exponent, removes the decimal point with a corresponding exponent shift, strips
trailing zeros, and returns `false` if the mantissa overflows int64 or the
final exponent overflows int16. With a pooled scratch buffer it is
allocation-free.
List-level helper:
```go
func Float64ListToDecimalIntList(dst []int64, src []float64) ([]int64,
int16, error) {
if len(src) == 0 {
return dst[:0], 0, nil
}
decimals := dst[:0]
exps := getInt16Scratch(len(src))
defer putInt16Scratch(exps)
minExp := int16(math.MaxInt16)
for i, f := range src {
d, e, ok := floatToDecimal(f)
if !ok {
return nil, 0, errCannotEncodeLossless
}
decimals = append(decimals, d)
exps[i] = e
if e < minExp {
minExp = e
}
}
for i := range decimals {
diff := exps[i] - minExp
if diff == 0 {
continue
}
scaled, ok := mulPow10(decimals[i], diff) // overflow-checked
if !ok {
return nil, 0, errCannotEncodeLossless
}
decimals[i] = scaled
}
return decimals, minExp, nil
}
```
Properties: lossless when it succeeds, explicit error when it can't. No
silent truncation, no `1e-9` tolerance, no 15-digit cap. The fast path is ~5
ops/value (vs. ~30 FP ops in the current implementation); the slow path is
comparable to current cost but lossless.
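`mulPow10` is not defined in the snippet above; a minimal overflow-checked version could look like this (the name and signature are taken from the call site, and `n` is assumed non-negative since it is `exps[i] - minExp`). As a worked example, for `[1.5, 2000]` the per-value pairs are `(15, -1)` and `(2, 3)`, so `minExp = -1` and the second value becomes `mulPow10(2, 4) = 20000`, giving `[15, 20000]` with exponent `-1`.

```go
package main

import "math"

// mulPow10 returns v * 10^n, or ok=false if any step would overflow int64.
// n is assumed to be non-negative.
func mulPow10(v int64, n int16) (int64, bool) {
	for ; n > 0; n-- {
		// Check before multiplying: v*10 overflows exactly when v is
		// outside [MinInt64/10, MaxInt64/10].
		if v > math.MaxInt64/10 || v < math.MinInt64/10 {
			return 0, false
		}
		v *= 10
	}
	return v, true
}
```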
### Step 2 — drop the bit-63 trick; reuse `EncodeType` for type framing
`banyand/measure/column.go:202-219` and
`banyand/internal/encoding/tag_encoder.go:184-205` already establish the
codebase's pattern for float columns: prepend a 1-byte `EncodeType`, where
`EncodeTypePlain` means "raw IEEE-754 8 bytes per value, encoder couldn't
compress this". Follow that pattern here:
```go
func (t *TopNValue[K]) marshalFloat64(dst []byte) ([]byte, error) {
// ...header (valueName, entityTagNames, valuesCount; no bit-63 trick)...
floats := t.floatValues() // typed view; see Step 3
ints, exp, err := encoding.Float64ListToDecimalIntList(t.intScratch[:0],
floats)
if err != nil {
// Lossless-fails fallback: raw IEEE-754, 8 bytes/value.
dst = append(dst, byte(encoding.EncodeTypePlain))
for _, f := range floats {
dst = convert.AppendFloat64Bytes(dst, f)
}
return appendEntityValues(dst, t), nil
}
t.intScratch = ints
t.buf, t.encodeType, t.firstValue = encoding.Int64ListToBytes(t.buf[:0],
ints)
dst = append(dst, byte(t.encodeType))
dst = encoding.VarInt64ToBytes(dst, t.firstValue)
dst = encoding.VarInt64ToBytes(dst, int64(exp))
dst = encoding.VarUint64ToBytes(dst, uint64(len(t.buf)))
dst = append(dst, t.buf...)
return appendEntityValues(dst, t), nil
}
```
`unmarshalFloat64` switches on `EncodeType`: `EncodeTypePlain` reads
`valuesCount * 8` raw bytes; everything else takes the existing decimal-int +
exponent decode path.
`DetectFieldTypeFromBinary` becomes unnecessary — the field type is already
known to every caller via the schema. Its only caller (`MergeTopNBinaryValues`)
is invoked from sites where K is statically known. Thread K through and delete
the function, the bit-63 mask, and the binary peek.
Net result: one type discriminator instead of three, and the on-disk framing
matches the rest of the codebase.
### Step 3 — eliminate the per-call slice copies
Replace:
```go
floatValues := make([]float64, len(t.values))
for i, v := range t.values { floatValues[i] = float64(v) }
```
with a typed view at the K dispatch site, since `t.values` *is* `[]float64`
when `K = float64`:
**Option A — fork the marshal entry at the K switch into non-generic
specializations:**
```go
func (t *TopNValue[K]) marshal(dst []byte) ([]byte, error) {
var k K
switch any(k).(type) {
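case float64:
return marshalFloat64Concrete(dst, t.valueName, t.entityTagNames,
any(t.values).([]float64), t.entities, &t.intScratch, &t.buf)
case int64:
return marshalInt64Concrete(dst, t.valueName, t.entityTagNames,
any(t.values).([]int64), t.entities, &t.buf)
}
// Unreachable while TopSortKey is int64 | float64, but Go requires a
// terminating statement here.
return nil, fmt.Errorf("unsupported TopN sort key type %T", k)
}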
```
No `unsafe`, no slice copies, and the helper signatures are concrete and
easy to test.
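To confirm Option A really avoids copying: asserting `any(t.values)` to `[]float64` copies only the slice header, so the result shares the backing array with the original. The reduced `values[K]` type below is a hypothetical stand-in for `TopNValue[K]`.

```go
package main

// values is a hypothetical reduction of TopNValue[K] to its values field.
type values[K int64 | float64] struct {
	vals []K
}

// floatView returns the float64 slice when K is float64. The assertion
// unwraps the slice header only; no elements are copied.
func (v *values[K]) floatView() ([]float64, bool) {
	f, ok := any(v.vals).([]float64)
	return f, ok
}
```

Writing through the returned view is visible in `v.vals`, and for `K = int64` the assertion simply reports `ok == false`.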
**Option B — `unsafe` reinterpret** (faster than A only if profiling demands
it): both `int64` and `float64` are 8-byte types so the slice header is
identical:
```go
func (t *TopNValue[K]) floatValues() []float64 {
return *(*[]float64)(unsafe.Pointer(&t.values))
}
```
Prefer Option A unless benchmarks show a meaningful gap.
The `intScratch []int64` field lives on `TopNValue[K]`. It is truncated with
`[:0]` before each encode and reassigned (`t.intScratch = ints`) after the
encoder returns, so capacity grown on earlier calls is reused and the pool's
intended benefit is restored.
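The reuse pattern can be checked with `testing.AllocsPerRun`, which runs the function once as a warm-up before counting. `encoderState` is a hypothetical reduction of `TopNValue[K]`, and the `int64(f * 10)` conversion is a placeholder, not the real decimal encoding.

```go
package main

import "testing"

// encoderState owns a scratch slice that survives between encode calls.
type encoderState struct {
	intScratch []int64
}

// encode truncates the scratch with [:0], refills it, and stores it back,
// so capacity grown on the first call is reused on later calls.
func (s *encoderState) encode(src []float64) []int64 {
	ints := s.intScratch[:0]
	for _, f := range src {
		ints = append(ints, int64(f*10)) // placeholder conversion
	}
	s.intScratch = ints
	return ints
}

// steadyStateAllocs reports allocations per encode call after the warm-up.
func steadyStateAllocs(src []float64) float64 {
	var s encoderState
	return testing.AllocsPerRun(100, func() { s.encode(src) })
}
```

After the warm-up call grows the scratch, later calls of the same size should report zero allocations.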
---
These three steps share one goal: the float path becomes lossless when it
succeeds, fast when it does, explicit when it can't, and uses the same on-disk
framing as the rest of the codebase. The diff is largely deletion:
`countDecimalPlaces` goes, the bit-63 mask goes, `DetectFieldTypeFromBinary`
goes, the throwaway slices go.
--
This is an automated message from the Apache Git Service.