hanahmily commented on code in PR #1001:
URL: 
https://github.com/apache/skywalking-banyandb/pull/1001#discussion_r2978606124


##########
banyand/dquery/topn.go:
##########
@@ -103,6 +131,10 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, 
message bus.Message) (resp
                        span.Stop()
                }()
        }
+       originalTopN := request.GetTopN()
+       // Set topN to 0 to disable truncation on data nodes for all 
aggregation functions.
+       // This ensures coordinator-side aggregation can see all relevant 
candidates.
+       request.TopN = 0

Review Comment:
   For high-cardinality entities, this could OOM data nodes during distributed 
TopN queries.



##########
pkg/flow/streaming/topn.go:
##########


Review Comment:
   Introduce a generic type:
   
   ```
   // TopSortKey is the constraint for TopN sort keys (int64 or float64).
   type TopSortKey interface {
        int64 | float64
   }
   ```
   
   topNAggregatorGroup, TopNOption, etc. should use this type.



##########
banyand/dquery/topn.go:
##########
@@ -46,6 +49,31 @@ type topNQueryProcessor struct {
        *bus.UnImplementedHealthyListener
 }
 
+func (t *topNQueryProcessor) getTopNFieldType(ctx context.Context, group, 
topNName string) databasev1.FieldType {

Review Comment:
   It silently falls back to FIELD_TYPE_UNSPECIFIED, which is then treated as 
int64. That is not correct.



##########
pkg/query/logical/measure/topn_plan_localscan.go:
##########
@@ -207,49 +207,78 @@ func (ei *topNMIterator) Next() bool {
                        return false
                }
                ts := timestamppb.New(time.Unix(0, r.Timestamps[i]))
-               topNValue.Reset()
-               err := topNValue.Unmarshal(bd, decoder)
-               if err != nil {
-                       ei.err = multierr.Append(ei.err, 
errors.WithMessagef(err, "failed to unmarshal topN values[%d]:[%s]%s", i, ts, 
hex.EncodeToString(fv.GetBinaryData())))
+
+               fieldType, detectErr := measure.DetectFieldTypeFromBinary(bd)
+               if detectErr != nil {
+                       ei.err = multierr.Append(ei.err, 
errors.WithMessagef(detectErr,
+                               "failed to detect field type for topN 
values[%d]:[%s]%s", i, ts, hex.EncodeToString(fv.GetBinaryData())))
                        continue
                }
-               shardID := uint32(0)
-               if i < len(r.ShardIDs) {
-                       shardID = uint32(r.ShardIDs[i])
-               }
-               fieldName, entityNames, values, entities := topNValue.Values()
-               for j := range entities {
-                       dp := &measurev1.DataPoint{
-                               Timestamp: ts,
-                               Sid:       uint64(r.SID),
-                               Version:   r.Versions[i],
+
+               if fieldType == databasev1.FieldType_FIELD_TYPE_FLOAT {
+                       topNValue := measure.GenerateTopNValueFloat()
+                       defer measure.ReleaseTopNValueFloat(topNValue)
+                       topNValue.Reset()
+                       unmarshalErr := topNValue.Unmarshal(bd, decoder)
+                       if unmarshalErr != nil {
+                               ei.err = multierr.Append(ei.err, 
errors.WithMessagef(unmarshalErr, "failed to unmarshal topN values[%d]:[%s]%s", 
i, ts, hex.EncodeToString(fv.GetBinaryData())))
+                               continue
                        }
-                       tagFamily := &modelv1.TagFamily{
-                               Name: measure.TopNTagFamily,
+                       if procErr := processTopNValue(ei, topNValue, r, i, 
ts); procErr != nil {
+                               ei.err = multierr.Append(ei.err, procErr)
                        }
-                       dp.TagFamilies = append(dp.TagFamilies, tagFamily)
-                       for k, entityName := range entityNames {
-                               tagFamily.Tags = append(tagFamily.Tags, 
&modelv1.Tag{
-                                       Key:   entityName,
-                                       Value: entities[j][k],
-                               })
+               } else {
+                       topNValue := measure.GenerateTopNValueInt()
+                       defer measure.ReleaseTopNValueInt(topNValue)

Review Comment:
   Move them outside the loop and reuse the topNValue across iterations. A 
deferred call runs when the enclosing function returns, not at the end of each 
loop iteration.



##########
banyand/dquery/topn.go:
##########
@@ -207,6 +215,53 @@ func (s *sortedTopNList) Val() *comparableTopNItem {
        return &comparableTopNItem{s.Items[s.index-1]}
 }
 
+func processTopNResponse[N aggregation.Number](

Review Comment:
   In the old code, partial results from successful nodes were still used even 
when some nodes errored. Now, if 1 out of 10 nodes fails, the entire query 
returns an error. This is a regression for distributed scenarios.



##########
banyand/measure/topn.go:
##########
@@ -881,8 +1021,157 @@ func (t *TopNValue) marshal(dst []byte) ([]byte, error) {
        return dst, nil
 }
 
+func (t *TopNValue[N]) marshalFloat64(dst []byte) ([]byte, error) {
+       dst = encoding.EncodeBytes(dst, convert.StringToBytes(t.valueName))
+       dst = encoding.VarUint64ToBytes(dst, uint64(len(t.entityTagNames)))
+       for _, entityTagName := range t.entityTagNames {
+               dst = encoding.EncodeBytes(dst, 
convert.StringToBytes(entityTagName))
+       }
+
+       valuesCount := uint64(len(t.values))
+       if valuesCount&(1<<63) != 0 {
+               return nil, errors.New("float values count overflow")
+       }
+       valuesCount |= (1 << 63)
+       dst = encoding.VarUint64ToBytes(dst, valuesCount)
+
+       floatValues := make([]float64, len(t.values))
+       for i, v := range t.values {
+               floatValues[i] = float64(v)
+       }
+
+       intValues, exponent, err := encoding.Float64ListToDecimalIntList(nil, 
floatValues)
+       if err != nil {
+               return nil, fmt.Errorf("failed to convert float64 to decimal 
int: %w", err)
+       }
+       t.exponent = exponent
+
+       t.buf, t.encodeType, t.firstValue = encoding.Int64ListToBytes(t.buf, 
intValues)
+       dst = append(dst, byte(t.encodeType))
+       dst = encoding.VarInt64ToBytes(dst, t.firstValue)
+       dst = encoding.VarInt64ToBytes(dst, int64(t.exponent))
+       dst = encoding.VarUint64ToBytes(dst, uint64(len(t.buf)))
+       dst = append(dst, t.buf...)
+
+       evv := t.resizeEntityValues(len(t.entities))
+       for i, tvv := range t.entities {
+               ev := evv[i]
+               ev, err = pbv1.MarshalTagValues(ev[:0], tvv)
+               if err != nil {
+                       return nil, err
+               }
+               evv[i] = ev
+       }
+       dst = encoding.EncodeBytesBlock(dst, evv)
+       return dst, nil
+}
+
+// DetectFieldTypeFromBinary detects the field type from binary data.
+func DetectFieldTypeFromBinary(src []byte) (databasev1.FieldType, error) {
+       var err error
+       src, _, err = encoding.DecodeBytes(src)
+       if err != nil {
+               return databasev1.FieldType_FIELD_TYPE_UNSPECIFIED, err
+       }
+       var count uint64
+       src, count = encoding.BytesToVarUint64(src)
+       for i := uint64(0); i < count; i++ {
+               src, _, err = encoding.DecodeBytes(src)
+               if err != nil {
+                       return databasev1.FieldType_FIELD_TYPE_UNSPECIFIED, err
+               }
+       }
+       _, count = encoding.BytesToVarUint64(src)
+       if (count & (1 << 63)) != 0 {
+               return databasev1.FieldType_FIELD_TYPE_FLOAT, nil
+       }
+       return databasev1.FieldType_FIELD_TYPE_INT, nil
+}
+
+// MergeTopNBinaryValues merges two TopN binary values and returns the merged 
binary value.
+func MergeTopNBinaryValues(
+       left, right []byte, topN int32, sort modelv1.Sort, decoder 
*encoding.BytesBlockDecoder,
+       timestamp uint64, leftVersion, rightVersion int64,
+) ([]byte, error) {
+       fieldType, detectErr := DetectFieldTypeFromBinary(left)
+       if detectErr != nil {

Review Comment:
   If left fails detection but right succeeds and returns FLOAT, then left 
(which might be int64 data from an older format) will be deserialized as 
float64 — causing data corruption.



##########
banyand/measure/topn_post_processor.go:
##########
@@ -207,24 +253,34 @@ func (taggr *topNPostProcessor) Put(entityValues 
pbv1.EntityValues, val int64, t
                return
        }
 
-       newItem := &topNAggregatorItem{
+       newItem := &topNAggregatorItem[N]{
                val:     val,
                key:     key,
                values:  entityValues,
                version: version,
        }
 
+       if taggr.topN <= 0 {

Review Comment:
   This will cause an OOM.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to