hanahmily commented on code in PR #1001:
URL:
https://github.com/apache/skywalking-banyandb/pull/1001#discussion_r2978606124
##########
banyand/dquery/topn.go:
##########
@@ -103,6 +131,10 @@ func (t *topNQueryProcessor) Rev(ctx context.Context,
message bus.Message) (resp
span.Stop()
}()
}
+ originalTopN := request.GetTopN()
+ // Set topN to 0 to disable truncation on data nodes for all
aggregation functions.
+ // This ensures coordinator-side aggregation can see all relevant
candidates.
+ request.TopN = 0
Review Comment:
For high-cardinality entities, disabling truncation on the data nodes (TopN = 0) could cause them to run out of memory (OOM) during distributed
TopN queries.
##########
pkg/flow/streaming/topn.go:
##########
Review Comment:
Introduce a generic type:
```
// TopSortKey is the constraint for TopN sort keys (int64 or float64).
type TopSortKey interface {
int64 | float64
}
```
topNAggregatorGroup, TopNOption, etc. should use this type.
##########
banyand/dquery/topn.go:
##########
@@ -46,6 +49,31 @@ type topNQueryProcessor struct {
*bus.UnImplementedHealthyListener
}
+func (t *topNQueryProcessor) getTopNFieldType(ctx context.Context, group,
topNName string) databasev1.FieldType {
Review Comment:
It silently falls back to FIELD_TYPE_UNSPECIFIED, which is then treated as int64. That
is not correct.
##########
pkg/query/logical/measure/topn_plan_localscan.go:
##########
@@ -207,49 +207,78 @@ func (ei *topNMIterator) Next() bool {
return false
}
ts := timestamppb.New(time.Unix(0, r.Timestamps[i]))
- topNValue.Reset()
- err := topNValue.Unmarshal(bd, decoder)
- if err != nil {
- ei.err = multierr.Append(ei.err,
errors.WithMessagef(err, "failed to unmarshal topN values[%d]:[%s]%s", i, ts,
hex.EncodeToString(fv.GetBinaryData())))
+
+ fieldType, detectErr := measure.DetectFieldTypeFromBinary(bd)
+ if detectErr != nil {
+ ei.err = multierr.Append(ei.err,
errors.WithMessagef(detectErr,
+ "failed to detect field type for topN
values[%d]:[%s]%s", i, ts, hex.EncodeToString(fv.GetBinaryData())))
continue
}
- shardID := uint32(0)
- if i < len(r.ShardIDs) {
- shardID = uint32(r.ShardIDs[i])
- }
- fieldName, entityNames, values, entities := topNValue.Values()
- for j := range entities {
- dp := &measurev1.DataPoint{
- Timestamp: ts,
- Sid: uint64(r.SID),
- Version: r.Versions[i],
+
+ if fieldType == databasev1.FieldType_FIELD_TYPE_FLOAT {
+ topNValue := measure.GenerateTopNValueFloat()
+ defer measure.ReleaseTopNValueFloat(topNValue)
+ topNValue.Reset()
+ unmarshalErr := topNValue.Unmarshal(bd, decoder)
+ if unmarshalErr != nil {
+ ei.err = multierr.Append(ei.err,
errors.WithMessagef(unmarshalErr, "failed to unmarshal topN values[%d]:[%s]%s",
i, ts, hex.EncodeToString(fv.GetBinaryData())))
+ continue
}
- tagFamily := &modelv1.TagFamily{
- Name: measure.TopNTagFamily,
+ if procErr := processTopNValue(ei, topNValue, r, i,
ts); procErr != nil {
+ ei.err = multierr.Append(ei.err, procErr)
}
- dp.TagFamilies = append(dp.TagFamilies, tagFamily)
- for k, entityName := range entityNames {
- tagFamily.Tags = append(tagFamily.Tags,
&modelv1.Tag{
- Key: entityName,
- Value: entities[j][k],
- })
+ } else {
+ topNValue := measure.GenerateTopNValueInt()
+ defer measure.ReleaseTopNValueInt(topNValue)
Review Comment:
Move them outside the loop and reuse the topNValue across iterations. A `defer` runs
when the surrounding function returns, not at the end of each for-loop iteration.
##########
banyand/dquery/topn.go:
##########
@@ -207,6 +215,53 @@ func (s *sortedTopNList) Val() *comparableTopNItem {
return &comparableTopNItem{s.Items[s.index-1]}
}
+func processTopNResponse[N aggregation.Number](
Review Comment:
In the old code, partial results from successful nodes were still used even
when some nodes errored. Now, if 1 out of 10 nodes fails, the entire query
returns an error. This is a regression for distributed scenarios.
##########
banyand/measure/topn.go:
##########
@@ -881,8 +1021,157 @@ func (t *TopNValue) marshal(dst []byte) ([]byte, error) {
return dst, nil
}
+func (t *TopNValue[N]) marshalFloat64(dst []byte) ([]byte, error) {
+ dst = encoding.EncodeBytes(dst, convert.StringToBytes(t.valueName))
+ dst = encoding.VarUint64ToBytes(dst, uint64(len(t.entityTagNames)))
+ for _, entityTagName := range t.entityTagNames {
+ dst = encoding.EncodeBytes(dst,
convert.StringToBytes(entityTagName))
+ }
+
+ valuesCount := uint64(len(t.values))
+ if valuesCount&(1<<63) != 0 {
+ return nil, errors.New("float values count overflow")
+ }
+ valuesCount |= (1 << 63)
+ dst = encoding.VarUint64ToBytes(dst, valuesCount)
+
+ floatValues := make([]float64, len(t.values))
+ for i, v := range t.values {
+ floatValues[i] = float64(v)
+ }
+
+ intValues, exponent, err := encoding.Float64ListToDecimalIntList(nil,
floatValues)
+ if err != nil {
+ return nil, fmt.Errorf("failed to convert float64 to decimal
int: %w", err)
+ }
+ t.exponent = exponent
+
+ t.buf, t.encodeType, t.firstValue = encoding.Int64ListToBytes(t.buf,
intValues)
+ dst = append(dst, byte(t.encodeType))
+ dst = encoding.VarInt64ToBytes(dst, t.firstValue)
+ dst = encoding.VarInt64ToBytes(dst, int64(t.exponent))
+ dst = encoding.VarUint64ToBytes(dst, uint64(len(t.buf)))
+ dst = append(dst, t.buf...)
+
+ evv := t.resizeEntityValues(len(t.entities))
+ for i, tvv := range t.entities {
+ ev := evv[i]
+ ev, err = pbv1.MarshalTagValues(ev[:0], tvv)
+ if err != nil {
+ return nil, err
+ }
+ evv[i] = ev
+ }
+ dst = encoding.EncodeBytesBlock(dst, evv)
+ return dst, nil
+}
+
+// DetectFieldTypeFromBinary detects the field type from binary data.
+func DetectFieldTypeFromBinary(src []byte) (databasev1.FieldType, error) {
+ var err error
+ src, _, err = encoding.DecodeBytes(src)
+ if err != nil {
+ return databasev1.FieldType_FIELD_TYPE_UNSPECIFIED, err
+ }
+ var count uint64
+ src, count = encoding.BytesToVarUint64(src)
+ for i := uint64(0); i < count; i++ {
+ src, _, err = encoding.DecodeBytes(src)
+ if err != nil {
+ return databasev1.FieldType_FIELD_TYPE_UNSPECIFIED, err
+ }
+ }
+ _, count = encoding.BytesToVarUint64(src)
+ if (count & (1 << 63)) != 0 {
+ return databasev1.FieldType_FIELD_TYPE_FLOAT, nil
+ }
+ return databasev1.FieldType_FIELD_TYPE_INT, nil
+}
+
+// MergeTopNBinaryValues merges two TopN binary values and returns the merged
binary value.
+func MergeTopNBinaryValues(
+ left, right []byte, topN int32, sort modelv1.Sort, decoder
*encoding.BytesBlockDecoder,
+ timestamp uint64, leftVersion, rightVersion int64,
+) ([]byte, error) {
+ fieldType, detectErr := DetectFieldTypeFromBinary(left)
+ if detectErr != nil {
Review Comment:
If left fails detection but right succeeds and returns FLOAT, then left
(which might be int64 data from an older format) will be deserialized as
float64 — causing data corruption.
##########
banyand/measure/topn_post_processor.go:
##########
@@ -207,24 +253,34 @@ func (taggr *topNPostProcessor) Put(entityValues
pbv1.EntityValues, val int64, t
return
}
- newItem := &topNAggregatorItem{
+ newItem := &topNAggregatorItem[N]{
val: val,
key: key,
values: entityValues,
version: version,
}
+ if taggr.topN <= 0 {
Review Comment:
This will cause OOM: when topN <= 0, the number of retained items is unbounded.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]