Copilot commented on code in PR #1110:
URL: 
https://github.com/apache/skywalking-banyandb/pull/1110#discussion_r3176460559


##########
pkg/query/logical/measure/topn_analyzer.go:
##########
@@ -109,7 +109,15 @@ func TopNAnalyze(criteria *measurev1.TopNRequest, 
sourceMeasureSchemaList []*dat
                return nil, err
        }
 
-       plan = topNDistinct(plan, criteria.GetTopN(), 
criteria.GetFieldValueSort(), topNAggSchema.FieldName)
+       var fieldType databasev1.FieldType
+       for _, field := range sourceMeasureSchema.GetFields() {
+               if field.GetName() == topNAggSchema.FieldName {
+                       fieldType = field.GetFieldType()
+                       break
+               }
+       }
+
+       plan = topNDistinct(plan, criteria.GetTopN(), 
criteria.GetFieldValueSort(), topNAggSchema.FieldName, fieldType)

Review Comment:
   In multi-group TopN queries we only read the field type from 
`sourceMeasureSchemaList[0]`. Unlike the entity and field-name checks above, 
there is no verification that the same field has the same type in every source 
measure, so a mixed INT/FLOAT setup will be decoded through the wrong accessor 
and silently produce zero/incorrect rankings for some groups.



##########
banyand/dquery/topn.go:
##########
@@ -111,36 +140,19 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, 
message bus.Message) (resp
                return
        }
        var allErr error
-       aggregator := measure.CreateTopNPostProcessor(request.GetTopN(),
-               agg, request.GetFieldValueSort())
-       var tags []string
+       fieldType, fieldTypeErr := t.getTopNFieldType(ctx, request.Groups[0], 
request.GetName())
+       if fieldTypeErr != nil {
+               resp = bus.NewMessage(now, common.NewError("failed to determine 
field type for topN query %s: %v", request.GetName(), fieldTypeErr))
+               return
+       }
+       var lists []*measurev1.TopNList
        var responseCount int
-       for _, f := range ff {
-               if m, getErr := f.Get(); getErr != nil {
-                       allErr = multierr.Append(allErr, getErr)
-               } else {
-                       d := m.Data()
-                       if d == nil {
-                               continue
-                       }
-                       responseCount++
-                       topNResp := d.(*measurev1.TopNResponse)
-                       for _, l := range topNResp.Lists {
-                               for _, tn := range l.Items {
-                                       if tags == nil {
-                                               tags = make([]string, 0, 
len(tn.Entity))
-                                               for _, e := range tn.Entity {
-                                                       tags = append(tags, 
e.Key)
-                                               }
-                                       }
-                                       entityValues := make(pbv1.EntityValues, 
0, len(tn.Entity))
-                                       for _, e := range tn.Entity {
-                                               entityValues = 
append(entityValues, e.Value)
-                                       }
-                                       aggregator.Put(entityValues, 
tn.Value.GetInt().GetValue(), uint64(tn.Timestamp.AsTime().UnixMilli()), 
tn.Version)
-                               }
-                       }
-               }
+       if fieldType == databasev1.FieldType_FIELD_TYPE_FLOAT {
+               lists, responseCount, allErr = processTopNResponse(ff, 
request.GetTopN(), agg, request.GetFieldValueSort(),
+                       measure.CreateTopNPostProcessorFloat, 
measure.FieldValueToFloat)
+       } else {
+               lists, responseCount, allErr = processTopNResponse(ff, 
request.GetTopN(), agg, request.GetFieldValueSort(),
+                       measure.CreateTopNPostProcessorInt, 
measure.FieldValueToInt)

Review Comment:
   This distributed path also assumes the first requested group determines the 
value type for all responses. If two groups expose the same TopN name over 
different field types, we will instantiate the wrong post-processor here and 
misread the other group's items through `GetInt`/`GetFloat` instead of 
rejecting the mixed schema combination.



##########
pkg/query/logical/measure/topn_plan_distinct.go:
##########
@@ -116,24 +118,21 @@ func (h *entityDedupTopN) Put(key string, val int64, idp 
*measurev1.InternalData
        h.tryAddToHeap(key, val, idp)
 }
 
-func (h *entityDedupTopN) tryAddToHeap(key string, val int64, idp 
*measurev1.InternalDataPoint) {
-       if h.topN <= 0 {
-               return
-       }
+func (h *entityDedupTopN[K]) tryAddToHeap(key string, val K, idp 
*measurev1.InternalDataPoint) {
        if len(h.items) < h.topN {
-               heap.Push(h, &dedupHeapItem{key: key, val: val, idp: idp})
+               heap.Push(h, &dedupHeapItem[K]{key: key, val: val, idp: idp})
                return
        }
        root := h.items[0]
        if h.isBetter(val, root.val) {

Review Comment:
   The old zero/negative `topN` guard was removed here. If a request reaches 
this operator with `topN <= 0` (the standalone TopN path still doesn't validate 
it), `len(h.items) < h.topN` is false and `h.items[0]` panics on the first row 
instead of returning an empty result.



##########
banyand/measure/topn.go:
##########
@@ -888,8 +1036,158 @@ func (t *TopNValue) marshal(dst []byte) ([]byte, error) {
        return dst, nil
 }
 
+func (t *TopNValue[K]) marshalFloat64(dst []byte) ([]byte, error) {
+       dst = encoding.EncodeBytes(dst, convert.StringToBytes(t.valueName))
+       dst = encoding.VarUint64ToBytes(dst, uint64(len(t.entityTagNames)))
+       for _, entityTagName := range t.entityTagNames {
+               dst = encoding.EncodeBytes(dst, 
convert.StringToBytes(entityTagName))
+       }
+
+       valuesCount := uint64(len(t.values)) | (uint64(1) << 63)
+       dst = encoding.VarUint64ToBytes(dst, valuesCount)
+
+       floatValues := make([]float64, len(t.values))
+       for i, v := range t.values {
+               floatValues[i] = float64(v)
+       }
+
+       intValues, exponent, err := encoding.Float64ListToDecimalIntList(nil, 
floatValues)
+       if err != nil {
+               return nil, fmt.Errorf("failed to convert float64 to decimal 
int: %w", err)
+       }
+       t.exponent = exponent
+
+       t.buf, t.encodeType, t.firstValue = encoding.Int64ListToBytes(t.buf, 
intValues)
+       dst = append(dst, byte(t.encodeType))
+       dst = encoding.VarInt64ToBytes(dst, t.firstValue)
+       dst = encoding.VarInt64ToBytes(dst, int64(t.exponent))
+       dst = encoding.VarUint64ToBytes(dst, uint64(len(t.buf)))
+       dst = append(dst, t.buf...)
+
+       evv := t.resizeEntityValues(len(t.entities))
+       for i, tvv := range t.entities {
+               ev := evv[i]
+               ev, err = pbv1.MarshalTagValues(ev[:0], tvv)
+               if err != nil {
+                       return nil, err
+               }
+               evv[i] = ev
+       }
+       dst = encoding.EncodeBytesBlock(dst, evv)
+       return dst, nil
+}
+
+// DetectFieldTypeFromBinary detects the field type from binary data.
+func DetectFieldTypeFromBinary(src []byte) (databasev1.FieldType, error) {
+       var err error
+       src, _, err = encoding.DecodeBytes(src)
+       if err != nil {
+               return databasev1.FieldType_FIELD_TYPE_UNSPECIFIED, err
+       }
+       var count uint64
+       src, count = encoding.BytesToVarUint64(src)
+       for i := uint64(0); i < count; i++ {
+               src, _, err = encoding.DecodeBytes(src)
+               if err != nil {
+                       return databasev1.FieldType_FIELD_TYPE_UNSPECIFIED, err
+               }
+       }
+       _, count = encoding.BytesToVarUint64(src)
+       if (count & (uint64(1) << 63)) != 0 {
+               return databasev1.FieldType_FIELD_TYPE_FLOAT, nil
+       }
+       return databasev1.FieldType_FIELD_TYPE_INT, nil
+}
+
+// MergeTopNBinaryValues merges two TopN binary values and returns the merged 
binary value.
+func MergeTopNBinaryValues(
+       left, right []byte, topN int32, sort modelv1.Sort, decoder 
*encoding.BytesBlockDecoder,
+       timestamp uint64, leftVersion, rightVersion int64,
+) ([]byte, error) {
+       leftFieldType, leftErr := DetectFieldTypeFromBinary(left)
+       if leftErr != nil {
+               return nil, fmt.Errorf("failed to detect left field type: %w", 
leftErr)
+       }
+       rightFieldType, rightErr := DetectFieldTypeFromBinary(right)
+       if rightErr != nil {
+               return nil, fmt.Errorf("failed to detect right field type: %w", 
rightErr)
+       }
+
+       if leftFieldType != rightFieldType {
+               return nil, fmt.Errorf("field type mismatch between left (%s) 
and right (%s)", leftFieldType.String(), rightFieldType.String())
+       }
+
+       if leftFieldType == databasev1.FieldType_FIELD_TYPE_FLOAT {

Review Comment:
   This now fails the whole merge as soon as either side's binary blob is 
malformed enough that field-type detection cannot parse it. The previous merge 
path tolerated a broken left or right value and still kept the valid side, so 
this change can turn single-sided corruption into complete data loss during 
query/compaction.
   



##########
banyand/measure/topn_post_processor.go:
##########
@@ -249,11 +277,10 @@ func (taggr *topNPostProcessor) Flush() 
([]*topNAggregatorItem, error) {
                        for _, item := range timeline.items {
                                if exist, found := taggr.cache[item.key]; found 
{
                                        exist.mapFunc.In(item.val)

Review Comment:
   Updating an existing aggregated entity changes its heap key, but this branch 
no longer re-heapifies `taggr.items`. Any in-place key change can break the heap 
invariant. For `MEAN` the aggregated value can move in either direction, and for 
`MIN`/`MAX` it moves monotonically in a direction that may conflict with the 
heap order. Subsequent `tryEnqueue` decisions can then be made against a stale 
root and return the wrong top-N set.
   



##########
banyand/query/processor_topn.go:
##########
@@ -188,33 +190,36 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, 
message bus.Message) (resp
                        resp = bus.NewMessage(bus.MessageID(now), 
&measurev1.TopNResponse{})
                        return
                }
-               aggregator := measure.CreateTopNPostProcessor(
-                       request.GetTopN(),
-                       request.GetAgg(),
-                       request.GetFieldValueSort(),
-               )
-               var tags []string
-               for _, dp := range result {
-                       dpTags := dp.GetTagFamilies()[0].GetTags()
-                       if tags == nil {
-                               tags = make([]string, len(dpTags))
-                               for i, tag := range dpTags {
-                                       tags[i] = tag.Key
-                               }
-                       }
-                       entityValues := make(pbv1.EntityValues, 0, len(dpTags))
-                       for _, tag := range dpTags {
-                               entityValues = append(entityValues, tag.Value)
-                       }
-                       aggregator.Put(entityValues,
-                               
dp.GetFields()[0].GetValue().GetInt().GetValue(),
-                               uint64(dp.GetTimestamp().AsTime().UnixMilli()),
-                               dp.GetVersion())
+               fieldName := qc.topNSchemas[0].GetFieldName()
+               isFloatField := false
+               fieldIdx := 
slices.IndexFunc(qc.sourceMeasureSchemas[0].GetFields(), func(spec 
*databasev1.FieldSpec) bool {
+                       return spec.GetName() == fieldName
+               })
+               if fieldIdx >= 0 {
+                       isFloatField = 
qc.sourceMeasureSchemas[0].GetFields()[fieldIdx].GetFieldType() == 
databasev1.FieldType_FIELD_TYPE_FLOAT
                }

Review Comment:
   Aggregation type selection is based only on `qc.sourceMeasureSchemas[0]`. 
Consider a multi-group request where the groups share the same TopN name but not 
the same underlying field type. Later groups will be post-aggregated with the 
wrong extractor (`GetInt` vs `GetFloat`), so their values are silently misread 
(typically as zero) instead of the request failing fast.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to