Copilot commented on code in PR #1001:
URL:
https://github.com/apache/skywalking-banyandb/pull/1001#discussion_r2915242698
##########
pkg/flow/streaming/topn.go:
##########
@@ -194,11 +200,30 @@ func (t *topNAggregatorGroup) getOrCreateGroup(group
string) *topNAggregator {
t.aggregatorGroup[group] = &topNAggregator{
topNAggregatorGroup: t,
treeMap: treemap.NewWith(t.comparator),
- dict: make(map[uint64]int64),
+ dict: make(map[uint64]interface{}),
}
return t.aggregatorGroup[group]
}
+func (t *topNAggregatorGroup) setComparatorFromFieldType() {
+ var baseComparator utils.Comparator
+ switch t.fieldType {
+ case databasev1.FieldType_FIELD_TYPE_INT:
+ baseComparator = utils.Int64Comparator
+ case databasev1.FieldType_FIELD_TYPE_FLOAT:
+ baseComparator = utils.Float64Comparator
+ default:
+ panic("unsupported field type: must be ValueTypeInt64 or
ValueTypeFloat64")
Review Comment:
`setComparatorFromFieldType` panics on an unsupported `FieldType`. Since
`TopN()` is part of the streaming pipeline setup, a panic here can crash the
whole process due to a simple misconfiguration or schema evolution. Consider
converting this to an error path (e.g., via `drainErr`) and updating the
message to reference actual `databasev1.FieldType` values (the current text
mentions `ValueTypeInt64/Float64`, which do not match that enum's naming).
```suggestion
// Unsupported field type: log and fall back to a no-op
comparator instead of panicking.
if t.l != nil {
t.l.Error().
Interface("fieldType", t.fieldType).
Msg("unsupported field type for TopN;
defaulting to no-op comparator")
}
baseComparator = func(a, b interface{}) int {
// Treat all values as equal to avoid type assumptions
and panics.
return 0
}
```
##########
banyand/dquery/topn.go:
##########
@@ -180,7 +182,17 @@ type comparableTopNItem struct {
*measurev1.TopNList_Item
}
+func truncationDataNode(request *measurev1.TopNRequest) {
+ // Set topN to 0 to disable truncation on data nodes
+ if request.Agg != modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX {
+ request.TopN = 0
+ }
+}
+
Review Comment:
`truncationDataNode` keeps per-node truncation for
`AGGREGATION_FUNCTION_MAX`, but coordinator-side aggregation still depends on
seeing candidates that may fall outside any single node's local TopN. This can
produce incorrect global TopN results: an entity can be dropped on every node
due to local competition even though it belongs in the global result. If the
goal is correctness for distributed aggregation, consider disabling truncation
for *all* aggregation functions (or over-fetching more than `TopN` entries per
node).
```suggestion
// Set topN to 0 to disable truncation on data nodes for all
aggregation functions.
// This ensures coordinator-side aggregation can see all relevant
candidates.
request.TopN = 0
}
```
##########
banyand/measure/topn.go:
##########
@@ -678,26 +717,27 @@ func (manager *topNProcessorManager) buildFilter(criteria
*modelv1.Criteria) (fl
}, nil
}
-func (manager *topNProcessorManager) buildMapper(fieldName string,
groupByNames ...string) (flow.UnaryFunc[any], error) {
+func (manager *topNProcessorManager) buildMapper(fieldName string,
groupByNames ...string) (flow.UnaryFunc[any], databasev1.FieldType, error) {
fieldIdx := slices.IndexFunc(manager.m.GetFields(), func(spec
*databasev1.FieldSpec) bool {
return spec.GetName() == fieldName
})
if fieldIdx == -1 {
manager.l.Warn().Str("fieldName", fieldName).Str("measure",
manager.m.Metadata.GetName()).
Msg("TopNAggregation references removed field which no
longer exists in schema, ignoring this TopNAggregation")
- return nil, fmt.Errorf("field %s is not found in %s schema",
fieldName, manager.m.Metadata.GetName())
+ return nil, databasev1.FieldType_FIELD_TYPE_UNSPECIFIED,
fmt.Errorf("field %s is not found in %s schema", fieldName,
manager.m.Metadata.GetName())
}
+ fieldType := manager.m.GetFields()[fieldIdx].GetFieldType()
if len(groupByNames) == 0 {
return func(_ context.Context, request any) any {
dpWithEvs := request.(*dataPointWithEntityValues)
return flow.Data{
dpWithEvs.entityValues,
"",
- dpWithEvs.intFieldValue(fieldName, manager.l),
+ dpWithEvs.fieldValue(fieldName, fieldType,
manager.l),
Review Comment:
`buildMapper` returns the field's `FieldType` but does not validate that it is
supported by the streaming TopN comparator (currently INT and FLOAT only). If
the field has another type (or the enum is extended later), this will either
silently fall back to `intFieldValue` (returning 0) or trigger a panic
downstream when the comparator is selected. Consider explicitly validating
`fieldType` here and returning an error for unsupported types.
##########
banyand/measure/topn.go:
##########
@@ -956,6 +1102,19 @@ func (t *TopNValue) Unmarshal(src []byte, decoder
*encoding.BytesBlockDecoder) e
return nil
}
+func (t *TopNValue) bytesToFloat64List(dst []float64, src []byte, count int)
[]float64 {
+ dst = dst[:0]
+ for i := 0; i < count; i++ {
+ if len(src) < 8 {
+ break
+ }
+ v := convert.BytesToFloat64(src[:8])
+ dst = append(dst, v)
+ src = src[8:]
Review Comment:
`bytesToFloat64List` silently truncates when `src` is shorter than `count*8`
(it `break`s out of the loop), which can mask data corruption and leave
`valuesCount` inconsistent with the number of decoded values/entities.
Consider validating that `valueLen == valuesCount*8` (i.e., that enough bytes
exist) and returning an error instead of returning a shorter slice.
##########
banyand/measure/topn_post_processor.go:
##########
@@ -160,26 +298,28 @@ type topNPostProcessor struct {
topN int32
}
-func (taggr *topNPostProcessor) Put(entityValues pbv1.EntityValues, val int64,
timestampMillis uint64, version int64) {
+func (taggr *topNPostProcessor) Put(entityValues pbv1.EntityValues, val
SortableValue, timestampMillis uint64, version int64) {
timeline, ok := taggr.timelines[timestampMillis]
key := entityValues.String()
if !ok {
timeline = &topNTimelineItem{
Review Comment:
`Put` currently doesn't treat `topN <= 0` as "no truncation": with `topN==0`
the first item is still pushed and subsequent inserts are effectively limited
to 1 element (because `Len() < int(taggr.topN)` is always false). This breaks
the distributed-query workaround that sets `TopN=0` to disable truncation.
Consider adding an early branch for `taggr.topN <= 0` to always accept all
items (no ReplaceLowest/eviction) or define a separate explicit sentinel for
"unbounded".
##########
banyand/measure/topn.go:
##########
@@ -881,6 +932,50 @@ func (t *TopNValue) marshal(dst []byte) ([]byte, error) {
return dst, nil
}
+func (t *TopNValue) marshalFloat64(dst []byte) ([]byte, error) {
+ dst = encoding.EncodeBytes(dst, convert.StringToBytes(t.valueName))
+ dst = encoding.VarUint64ToBytes(dst, uint64(len(t.entityTagNames)))
+ for _, entityTagName := range t.entityTagNames {
+ dst = encoding.EncodeBytes(dst,
convert.StringToBytes(entityTagName))
+ }
+
+ valuesCount := uint64(len(t.values))
+ if valuesCount&(1<<63) != 0 {
+ return nil, errors.New("float values count overflow")
+ }
+ valuesCount |= (1 << 63)
+ dst = encoding.VarUint64ToBytes(dst, valuesCount)
+
+ floatValues := make([]float64, len(t.values))
+ for i, v := range t.values {
+ floatValues[i] = float64(v.(FloatValue))
+ }
+ t.buf = t.float64ListToBytes(t.buf, floatValues)
+ dst = encoding.VarUint64ToBytes(dst, uint64(len(t.buf)))
+ dst = append(dst, t.buf...)
+
+ var err error
+ evv := t.resizeEntityValues(len(t.entities))
+ for i, tvv := range t.entities {
+ ev := evv[i]
+ ev, err = pbv1.MarshalTagValues(ev[:0], tvv)
+ if err != nil {
+ return nil, err
+ }
+ evv[i] = ev
+ }
+ dst = encoding.EncodeBytesBlock(dst, evv)
+ return dst, nil
+}
+
+func (t *TopNValue) float64ListToBytes(dst []byte, values []float64) []byte {
+ dst = dst[:0]
+ for _, v := range values {
+ dst = append(dst, convert.Float64ToBytes(v)...)
+ }
Review Comment:
`float64ListToBytes` currently appends `convert.Float64ToBytes(v)...` for
each value, which allocates a new 8-byte slice per element and can be hot in
TopN streaming. Consider pre-growing `dst` to `len(values)*8` and writing
in-place with `binary.BigEndian.PutUint64` to avoid per-value allocations.
##########
pkg/flow/streaming/topn.go:
##########
@@ -61,13 +62,10 @@ func (s *windowedFlow) TopN(topNum int, opts ...any)
flow.Flow {
if topNAggrFunc.sortKeyExtractor == nil {
s.f.drainErr(errors.New("sortKeyExtractor must be
specified"))
}
- if topNAggrFunc.sort == ASC {
- topNAggrFunc.comparator = utils.Int64Comparator
- } else { // DESC
- topNAggrFunc.comparator = func(a, b interface{}) int {
- return utils.Int64Comparator(b, a)
- }
+ if topNAggrFunc.fieldType ==
databasev1.FieldType_FIELD_TYPE_UNSPECIFIED {
+ s.f.drainErr(errors.New("fieldType must be specified"))
}
+ topNAggrFunc.setComparatorFromFieldType()
Review Comment:
In `windowedFlow.TopN`, after calling `drainErr` for missing
`sortKeyExtractor` / `fieldType`, the code continues and unconditionally calls
`setComparatorFromFieldType()`. If `fieldType` is still
`FIELD_TYPE_UNSPECIFIED`, this will panic. Consider returning early after
draining the error (or guarding `setComparatorFromFieldType` so it never panics
on invalid config).
```suggestion
} else {
topNAggrFunc.setComparatorFromFieldType()
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]