Copilot commented on code in PR #1001:
URL: 
https://github.com/apache/skywalking-banyandb/pull/1001#discussion_r2915242698


##########
pkg/flow/streaming/topn.go:
##########
@@ -194,11 +200,30 @@ func (t *topNAggregatorGroup) getOrCreateGroup(group 
string) *topNAggregator {
        t.aggregatorGroup[group] = &topNAggregator{
                topNAggregatorGroup: t,
                treeMap:             treemap.NewWith(t.comparator),
-               dict:                make(map[uint64]int64),
+               dict:                make(map[uint64]interface{}),
        }
        return t.aggregatorGroup[group]
 }
 
+func (t *topNAggregatorGroup) setComparatorFromFieldType() {
+       var baseComparator utils.Comparator
+       switch t.fieldType {
+       case databasev1.FieldType_FIELD_TYPE_INT:
+               baseComparator = utils.Int64Comparator
+       case databasev1.FieldType_FIELD_TYPE_FLOAT:
+               baseComparator = utils.Float64Comparator
+       default:
+               panic("unsupported field type: must be ValueTypeInt64 or 
ValueTypeFloat64")

Review Comment:
   `setComparatorFromFieldType` panics on unsupported `FieldType`. Since 
`TopN()` is part of the streaming pipeline setup, a panic here can crash the 
process because of a misconfiguration or schema evolution. Consider converting 
this to an error path (e.g., `drainErr`) and making the message reflect the 
actual `databasev1.FieldType` values (the current text mentions 
`ValueTypeInt64/Float64`).
   ```suggestion
                // Unsupported field type: log and fall back to a no-op 
comparator instead of panicking.
                if t.l != nil {
                        t.l.Error().
                                Interface("fieldType", t.fieldType).
                                Msg("unsupported field type for TopN; 
defaulting to no-op comparator")
                }
                baseComparator = func(a, b interface{}) int {
                        // Treat all values as equal to avoid type assumptions 
and panics.
                        return 0
                }
   ```



##########
banyand/dquery/topn.go:
##########
@@ -180,7 +182,17 @@ type comparableTopNItem struct {
        *measurev1.TopNList_Item
 }
 
+func truncationDataNode(request *measurev1.TopNRequest) {
+       // Set topN to 0 to disable truncation on data nodes
+       if request.Agg != modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX {
+               request.TopN = 0
+       }
+}
+

Review Comment:
   `truncationDataNode` keeps per-node truncation for 
`AGGREGATION_FUNCTION_MAX`, but coordinator-side aggregation still depends on 
seeing candidates that may be outside any single node’s local TopN. This can 
produce incorrect global TopN results (entities can be dropped on all nodes due 
to local competition). If the goal is correctness for distributed aggregation, 
consider disabling truncation for *all* aggregation functions (or over-fetching 
more than `TopN` per node).
   ```suggestion
        // Set topN to 0 to disable truncation on data nodes for all 
aggregation functions.
        // This ensures coordinator-side aggregation can see all relevant 
candidates.
        request.TopN = 0
   }
   ```



##########
banyand/measure/topn.go:
##########
@@ -678,26 +717,27 @@ func (manager *topNProcessorManager) buildFilter(criteria 
*modelv1.Criteria) (fl
        }, nil
 }
 
-func (manager *topNProcessorManager) buildMapper(fieldName string, 
groupByNames ...string) (flow.UnaryFunc[any], error) {
+func (manager *topNProcessorManager) buildMapper(fieldName string, 
groupByNames ...string) (flow.UnaryFunc[any], databasev1.FieldType, error) {
        fieldIdx := slices.IndexFunc(manager.m.GetFields(), func(spec 
*databasev1.FieldSpec) bool {
                return spec.GetName() == fieldName
        })
        if fieldIdx == -1 {
                manager.l.Warn().Str("fieldName", fieldName).Str("measure", 
manager.m.Metadata.GetName()).
                        Msg("TopNAggregation references removed field which no 
longer exists in schema, ignoring this TopNAggregation")
-               return nil, fmt.Errorf("field %s is not found in %s schema", 
fieldName, manager.m.Metadata.GetName())
+               return nil, databasev1.FieldType_FIELD_TYPE_UNSPECIFIED, 
fmt.Errorf("field %s is not found in %s schema", fieldName, 
manager.m.Metadata.GetName())
        }
+       fieldType := manager.m.GetFields()[fieldIdx].GetFieldType()
        if len(groupByNames) == 0 {
                return func(_ context.Context, request any) any {
                        dpWithEvs := request.(*dataPointWithEntityValues)
                        return flow.Data{
                                dpWithEvs.entityValues,
                                "",
-                               dpWithEvs.intFieldValue(fieldName, manager.l),
+                               dpWithEvs.fieldValue(fieldName, fieldType, 
manager.l),

Review Comment:
   `buildMapper` returns the field’s `FieldType` but doesn’t validate that it’s 
supported by the streaming TopN comparator (currently INT/FLOAT only). If the 
field has another type (or the set of field types is later extended), this will 
either silently map to `intFieldValue` (returning 0) or trigger a panic 
downstream when selecting the comparator. Consider explicitly validating 
`fieldType` here and returning an error for unsupported types.



##########
banyand/measure/topn.go:
##########
@@ -956,6 +1102,19 @@ func (t *TopNValue) Unmarshal(src []byte, decoder 
*encoding.BytesBlockDecoder) e
        return nil
 }
 
+func (t *TopNValue) bytesToFloat64List(dst []float64, src []byte, count int) 
[]float64 {
+       dst = dst[:0]
+       for i := 0; i < count; i++ {
+               if len(src) < 8 {
+                       break
+               }
+               v := convert.BytesToFloat64(src[:8])
+               dst = append(dst, v)
+               src = src[8:]

Review Comment:
   `bytesToFloat64List` silently truncates if `src` is shorter than `count*8` 
(it `break`s), which can mask data corruption and lead to mismatched 
`valuesCount` vs decoded values/entities. Consider validating `valueLen == 
valuesCount*8` (and that enough bytes exist) and returning an error instead of 
returning a shorter slice.



##########
banyand/measure/topn_post_processor.go:
##########
@@ -160,26 +298,28 @@ type topNPostProcessor struct {
        topN      int32
 }
 
-func (taggr *topNPostProcessor) Put(entityValues pbv1.EntityValues, val int64, 
timestampMillis uint64, version int64) {
+func (taggr *topNPostProcessor) Put(entityValues pbv1.EntityValues, val 
SortableValue, timestampMillis uint64, version int64) {
        timeline, ok := taggr.timelines[timestampMillis]
        key := entityValues.String()
        if !ok {
                timeline = &topNTimelineItem{

Review Comment:
   `Put` currently doesn't treat `topN <= 0` as "no truncation": with `topN==0` 
the first item is still pushed and subsequent inserts are effectively limited 
to 1 element (because `Len() < int(taggr.topN)` is always false). This breaks 
the distributed-query workaround that sets `TopN=0` to disable truncation. 
Consider adding an early branch for `taggr.topN <= 0` to always accept all 
items (no ReplaceLowest/eviction) or define a separate explicit sentinel for 
"unbounded".



##########
banyand/measure/topn.go:
##########
@@ -881,6 +932,50 @@ func (t *TopNValue) marshal(dst []byte) ([]byte, error) {
        return dst, nil
 }
 
+func (t *TopNValue) marshalFloat64(dst []byte) ([]byte, error) {
+       dst = encoding.EncodeBytes(dst, convert.StringToBytes(t.valueName))
+       dst = encoding.VarUint64ToBytes(dst, uint64(len(t.entityTagNames)))
+       for _, entityTagName := range t.entityTagNames {
+               dst = encoding.EncodeBytes(dst, 
convert.StringToBytes(entityTagName))
+       }
+
+       valuesCount := uint64(len(t.values))
+       if valuesCount&(1<<63) != 0 {
+               return nil, errors.New("float values count overflow")
+       }
+       valuesCount |= (1 << 63)
+       dst = encoding.VarUint64ToBytes(dst, valuesCount)
+
+       floatValues := make([]float64, len(t.values))
+       for i, v := range t.values {
+               floatValues[i] = float64(v.(FloatValue))
+       }
+       t.buf = t.float64ListToBytes(t.buf, floatValues)
+       dst = encoding.VarUint64ToBytes(dst, uint64(len(t.buf)))
+       dst = append(dst, t.buf...)
+
+       var err error
+       evv := t.resizeEntityValues(len(t.entities))
+       for i, tvv := range t.entities {
+               ev := evv[i]
+               ev, err = pbv1.MarshalTagValues(ev[:0], tvv)
+               if err != nil {
+                       return nil, err
+               }
+               evv[i] = ev
+       }
+       dst = encoding.EncodeBytesBlock(dst, evv)
+       return dst, nil
+}
+
+func (t *TopNValue) float64ListToBytes(dst []byte, values []float64) []byte {
+       dst = dst[:0]
+       for _, v := range values {
+               dst = append(dst, convert.Float64ToBytes(v)...)
+       }

Review Comment:
   `float64ListToBytes` currently appends `convert.Float64ToBytes(v)...` for 
each value, which allocates a new 8-byte slice per element and can be hot in 
TopN streaming. Consider pre-growing `dst` to `len(values)*8` and writing 
in-place with `binary.BigEndian.PutUint64` to avoid per-value allocations.



##########
pkg/flow/streaming/topn.go:
##########
@@ -61,13 +62,10 @@ func (s *windowedFlow) TopN(topNum int, opts ...any) 
flow.Flow {
                if topNAggrFunc.sortKeyExtractor == nil {
                        s.f.drainErr(errors.New("sortKeyExtractor must be 
specified"))
                }
-               if topNAggrFunc.sort == ASC {
-                       topNAggrFunc.comparator = utils.Int64Comparator
-               } else { // DESC
-                       topNAggrFunc.comparator = func(a, b interface{}) int {
-                               return utils.Int64Comparator(b, a)
-                       }
+               if topNAggrFunc.fieldType == 
databasev1.FieldType_FIELD_TYPE_UNSPECIFIED {
+                       s.f.drainErr(errors.New("fieldType must be specified"))
                }
+               topNAggrFunc.setComparatorFromFieldType()

Review Comment:
   In `windowedFlow.TopN`, after calling `drainErr` for missing 
`sortKeyExtractor` / `fieldType`, the code continues and unconditionally calls 
`setComparatorFromFieldType()`. If `fieldType` is still 
`FIELD_TYPE_UNSPECIFIED`, this will panic. Consider returning early after 
draining the error (or guarding `setComparatorFromFieldType` so it never panics 
on invalid config).
   ```suggestion
                } else {
                        topNAggrFunc.setComparatorFromFieldType()
                }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to