Copilot commented on code in PR #1110:
URL:
https://github.com/apache/skywalking-banyandb/pull/1110#discussion_r3176460559
##########
pkg/query/logical/measure/topn_analyzer.go:
##########
@@ -109,7 +109,15 @@ func TopNAnalyze(criteria *measurev1.TopNRequest,
sourceMeasureSchemaList []*dat
return nil, err
}
- plan = topNDistinct(plan, criteria.GetTopN(),
criteria.GetFieldValueSort(), topNAggSchema.FieldName)
+ var fieldType databasev1.FieldType
+ for _, field := range sourceMeasureSchema.GetFields() {
+ if field.GetName() == topNAggSchema.FieldName {
+ fieldType = field.GetFieldType()
+ break
+ }
+ }
+
+ plan = topNDistinct(plan, criteria.GetTopN(),
criteria.GetFieldValueSort(), topNAggSchema.FieldName, fieldType)
Review Comment:
In multi-group TopN queries we only read the field type from
`sourceMeasureSchemaList[0]`. Unlike the entity and field-name checks above,
there is no verification that the same field has the same type in every source
measure, so a mixed INT/FLOAT setup will be decoded through the wrong accessor
and silently produce zero/incorrect rankings for some groups.
##########
banyand/dquery/topn.go:
##########
@@ -111,36 +140,19 @@ func (t *topNQueryProcessor) Rev(ctx context.Context,
message bus.Message) (resp
return
}
var allErr error
- aggregator := measure.CreateTopNPostProcessor(request.GetTopN(),
- agg, request.GetFieldValueSort())
- var tags []string
+ fieldType, fieldTypeErr := t.getTopNFieldType(ctx, request.Groups[0],
request.GetName())
+ if fieldTypeErr != nil {
+ resp = bus.NewMessage(now, common.NewError("failed to determine
field type for topN query %s: %v", request.GetName(), fieldTypeErr))
+ return
+ }
+ var lists []*measurev1.TopNList
var responseCount int
- for _, f := range ff {
- if m, getErr := f.Get(); getErr != nil {
- allErr = multierr.Append(allErr, getErr)
- } else {
- d := m.Data()
- if d == nil {
- continue
- }
- responseCount++
- topNResp := d.(*measurev1.TopNResponse)
- for _, l := range topNResp.Lists {
- for _, tn := range l.Items {
- if tags == nil {
- tags = make([]string, 0,
len(tn.Entity))
- for _, e := range tn.Entity {
- tags = append(tags,
e.Key)
- }
- }
- entityValues := make(pbv1.EntityValues,
0, len(tn.Entity))
- for _, e := range tn.Entity {
- entityValues =
append(entityValues, e.Value)
- }
- aggregator.Put(entityValues,
tn.Value.GetInt().GetValue(), uint64(tn.Timestamp.AsTime().UnixMilli()),
tn.Version)
- }
- }
- }
+ if fieldType == databasev1.FieldType_FIELD_TYPE_FLOAT {
+ lists, responseCount, allErr = processTopNResponse(ff,
request.GetTopN(), agg, request.GetFieldValueSort(),
+ measure.CreateTopNPostProcessorFloat,
measure.FieldValueToFloat)
+ } else {
+ lists, responseCount, allErr = processTopNResponse(ff,
request.GetTopN(), agg, request.GetFieldValueSort(),
+ measure.CreateTopNPostProcessorInt,
measure.FieldValueToInt)
Review Comment:
This distributed path also assumes the first requested group determines the
value type for all responses. If two groups expose the same TopN name over
different field types, we will instantiate the wrong post-processor here and
misread the other groups' items through `GetInt`/`GetFloat` instead of
rejecting the mixed schema combination.
##########
pkg/query/logical/measure/topn_plan_distinct.go:
##########
@@ -116,24 +118,21 @@ func (h *entityDedupTopN) Put(key string, val int64, idp
*measurev1.InternalData
h.tryAddToHeap(key, val, idp)
}
-func (h *entityDedupTopN) tryAddToHeap(key string, val int64, idp
*measurev1.InternalDataPoint) {
- if h.topN <= 0 {
- return
- }
+func (h *entityDedupTopN[K]) tryAddToHeap(key string, val K, idp
*measurev1.InternalDataPoint) {
if len(h.items) < h.topN {
- heap.Push(h, &dedupHeapItem{key: key, val: val, idp: idp})
+ heap.Push(h, &dedupHeapItem[K]{key: key, val: val, idp: idp})
return
}
root := h.items[0]
if h.isBetter(val, root.val) {
Review Comment:
The old zero/negative `topN` guard was removed here. If a request reaches
this operator with `topN <= 0` (the standalone TopN path still doesn't validate
it), `len(h.items) < h.topN` is false even though the heap is still empty, so
`h.items[0]` panics with an index-out-of-range error on the first row instead
of returning an empty result.
##########
banyand/measure/topn.go:
##########
@@ -888,8 +1036,158 @@ func (t *TopNValue) marshal(dst []byte) ([]byte, error) {
return dst, nil
}
+func (t *TopNValue[K]) marshalFloat64(dst []byte) ([]byte, error) {
+ dst = encoding.EncodeBytes(dst, convert.StringToBytes(t.valueName))
+ dst = encoding.VarUint64ToBytes(dst, uint64(len(t.entityTagNames)))
+ for _, entityTagName := range t.entityTagNames {
+ dst = encoding.EncodeBytes(dst,
convert.StringToBytes(entityTagName))
+ }
+
+ valuesCount := uint64(len(t.values)) | (uint64(1) << 63)
+ dst = encoding.VarUint64ToBytes(dst, valuesCount)
+
+ floatValues := make([]float64, len(t.values))
+ for i, v := range t.values {
+ floatValues[i] = float64(v)
+ }
+
+ intValues, exponent, err := encoding.Float64ListToDecimalIntList(nil,
floatValues)
+ if err != nil {
+ return nil, fmt.Errorf("failed to convert float64 to decimal
int: %w", err)
+ }
+ t.exponent = exponent
+
+ t.buf, t.encodeType, t.firstValue = encoding.Int64ListToBytes(t.buf,
intValues)
+ dst = append(dst, byte(t.encodeType))
+ dst = encoding.VarInt64ToBytes(dst, t.firstValue)
+ dst = encoding.VarInt64ToBytes(dst, int64(t.exponent))
+ dst = encoding.VarUint64ToBytes(dst, uint64(len(t.buf)))
+ dst = append(dst, t.buf...)
+
+ evv := t.resizeEntityValues(len(t.entities))
+ for i, tvv := range t.entities {
+ ev := evv[i]
+ ev, err = pbv1.MarshalTagValues(ev[:0], tvv)
+ if err != nil {
+ return nil, err
+ }
+ evv[i] = ev
+ }
+ dst = encoding.EncodeBytesBlock(dst, evv)
+ return dst, nil
+}
+
+// DetectFieldTypeFromBinary detects the field type from binary data.
+func DetectFieldTypeFromBinary(src []byte) (databasev1.FieldType, error) {
+ var err error
+ src, _, err = encoding.DecodeBytes(src)
+ if err != nil {
+ return databasev1.FieldType_FIELD_TYPE_UNSPECIFIED, err
+ }
+ var count uint64
+ src, count = encoding.BytesToVarUint64(src)
+ for i := uint64(0); i < count; i++ {
+ src, _, err = encoding.DecodeBytes(src)
+ if err != nil {
+ return databasev1.FieldType_FIELD_TYPE_UNSPECIFIED, err
+ }
+ }
+ _, count = encoding.BytesToVarUint64(src)
+ if (count & (uint64(1) << 63)) != 0 {
+ return databasev1.FieldType_FIELD_TYPE_FLOAT, nil
+ }
+ return databasev1.FieldType_FIELD_TYPE_INT, nil
+}
+
+// MergeTopNBinaryValues merges two TopN binary values and returns the merged
binary value.
+func MergeTopNBinaryValues(
+ left, right []byte, topN int32, sort modelv1.Sort, decoder
*encoding.BytesBlockDecoder,
+ timestamp uint64, leftVersion, rightVersion int64,
+) ([]byte, error) {
+ leftFieldType, leftErr := DetectFieldTypeFromBinary(left)
+ if leftErr != nil {
+ return nil, fmt.Errorf("failed to detect left field type: %w",
leftErr)
+ }
+ rightFieldType, rightErr := DetectFieldTypeFromBinary(right)
+ if rightErr != nil {
+ return nil, fmt.Errorf("failed to detect right field type: %w",
rightErr)
+ }
+
+ if leftFieldType != rightFieldType {
+ return nil, fmt.Errorf("field type mismatch between left (%s)
and right (%s)", leftFieldType.String(), rightFieldType.String())
+ }
+
+ if leftFieldType == databasev1.FieldType_FIELD_TYPE_FLOAT {
Review Comment:
This now fails the whole merge as soon as either side's binary blob is
malformed enough that field-type detection cannot parse it. The previous merge
path tolerated a broken left or right value and still kept the valid side, so
this change can turn single-sided corruption into complete data loss during
query/compaction.
##########
banyand/measure/topn_post_processor.go:
##########
@@ -249,11 +277,10 @@ func (taggr *topNPostProcessor) Flush()
([]*topNAggregatorItem, error) {
for _, item := range timeline.items {
if exist, found := taggr.cache[item.key]; found
{
exist.mapFunc.In(item.val)
Review Comment:
Updating an existing aggregated entity changes its heap key, but this branch
no longer re-heapifies `taggr.items` (e.g. with `heap.Fix`). Depending on the
aggregation (`MEAN` can move in either direction, `MIN` only decreases and
`MAX` only increases), the new value can violate the heap ordering, so
subsequent `tryEnqueue` decisions can be made against a stale root and return
the wrong top-N set.
##########
banyand/query/processor_topn.go:
##########
@@ -188,33 +190,36 @@ func (t *topNQueryProcessor) Rev(ctx context.Context,
message bus.Message) (resp
resp = bus.NewMessage(bus.MessageID(now),
&measurev1.TopNResponse{})
return
}
- aggregator := measure.CreateTopNPostProcessor(
- request.GetTopN(),
- request.GetAgg(),
- request.GetFieldValueSort(),
- )
- var tags []string
- for _, dp := range result {
- dpTags := dp.GetTagFamilies()[0].GetTags()
- if tags == nil {
- tags = make([]string, len(dpTags))
- for i, tag := range dpTags {
- tags[i] = tag.Key
- }
- }
- entityValues := make(pbv1.EntityValues, 0, len(dpTags))
- for _, tag := range dpTags {
- entityValues = append(entityValues, tag.Value)
- }
- aggregator.Put(entityValues,
-
dp.GetFields()[0].GetValue().GetInt().GetValue(),
- uint64(dp.GetTimestamp().AsTime().UnixMilli()),
- dp.GetVersion())
+ fieldName := qc.topNSchemas[0].GetFieldName()
+ isFloatField := false
+ fieldIdx :=
slices.IndexFunc(qc.sourceMeasureSchemas[0].GetFields(), func(spec
*databasev1.FieldSpec) bool {
+ return spec.GetName() == fieldName
+ })
+ if fieldIdx >= 0 {
+ isFloatField =
qc.sourceMeasureSchemas[0].GetFields()[fieldIdx].GetFieldType() ==
databasev1.FieldType_FIELD_TYPE_FLOAT
}
Review Comment:
Aggregation type selection is based only on `qc.sourceMeasureSchemas[0]`.
For multi-group requests that share the same TopN name but not the same
underlying field type, later groups will be post-aggregated with the wrong
extractor (`GetInt` vs `GetFloat`). Their values are then read as zero or
otherwise incorrect, and the query does not fail fast.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]