Copilot commented on code in PR #1063: URL: https://github.com/apache/skywalking-banyandb/pull/1063#discussion_r3067925266
########## banyand/dquery/reduce_topn_aggregator.go: ########## @@ -0,0 +1,213 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package dquery + +import ( + "container/heap" + "fmt" + + measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1" + modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/query/aggregation" +) + +type entityAccumulator struct { + reduceFunc aggregation.Reduce[int64] + entityValues pbv1.EntityValues +} + +type heapEntry struct { + acc *entityAccumulator + key string +} + +var _ heap.Interface = (*topNEntityHeap)(nil) + +type topNEntityHeap struct { + keyToIdx map[string]int + items []*heapEntry + sortDir modelv1.Sort +} + +func (h *topNEntityHeap) Len() int { return len(h.items) } + +func (h *topNEntityHeap) Less(i, j int) bool { + vi := h.items[i].acc.reduceFunc.Val() + vj := h.items[j].acc.reduceFunc.Val() + if h.sortDir == modelv1.Sort_SORT_DESC { + return vi < vj + } + return vi > vj +} + +func (h *topNEntityHeap) Swap(i, j int) { + h.items[i], h.items[j] = h.items[j], h.items[i] + h.keyToIdx[h.items[i].key] = i + 
h.keyToIdx[h.items[j].key] = j +} + +func (h *topNEntityHeap) Push(x any) { + entry := x.(*heapEntry) + h.keyToIdx[entry.key] = len(h.items) + h.items = append(h.items, entry) +} + +func (h *topNEntityHeap) Pop() any { + old := h.items + n := len(old) + entry := old[n-1] + old[n-1] = nil + h.items = old[:n-1] + delete(h.keyToIdx, entry.key) + return entry +} + +func (h *topNEntityHeap) tryInsert(key string, acc *entityAccumulator, topN int32) { + if h.Len() < int(topN) { + heap.Push(h, &heapEntry{key: key, acc: acc}) + return + } + rootVal := h.items[0].acc.reduceFunc.Val() + newVal := acc.reduceFunc.Val() + sortDir := h.sortDir + shouldReplace := (sortDir == modelv1.Sort_SORT_DESC && newVal > rootVal) || + (sortDir == modelv1.Sort_SORT_ASC && newVal < rootVal) + if shouldReplace { + heap.Pop(h) + heap.Push(h, &heapEntry{key: key, acc: acc}) + } +} Review Comment: topNEntityHeap.tryInsert assumes topN > 0. If topN is 0 (or negative after a cast), the initial `h.Len() < int(topN)` check is false and `h.items[0]` panics when the heap is still empty. Please guard `topN <= 0` explicitly (e.g., treat as "no limit" and always push, or skip heap logic and return). ########## banyand/dquery/topn.go: ########## @@ -103,16 +93,69 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, message bus.Message) (resp span.Stop() }() } - agg := request.Agg - request.Agg = modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED + topn := request.GetTopN() + if request.Agg != modelv1.AggregationFunction_AGGREGATION_FUNCTION_MIN && request.Agg != modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX { + request.TopN = 0 + } + request.EmitPartial = true ff, err := t.broadcaster.Broadcast(defaultTopNQueryTimeout, data.TopicTopNQuery, bus.NewMessageWithNodeSelectors(now, nodeSelectors, request.TimeRange, request)) Review Comment: Rev mutates the incoming request in-place (`request.TopN = 0`, `request.EmitPartial = true`) before broadcasting. 
Because the same request object is also used for logging/tracing/error messages, this can make observability misleading and risks unexpected side effects if the request is reused elsewhere. Prefer creating a shallow copy (or a new request) for the internal broadcast and keep the original request unchanged for logs/trace tags. If you adopt the dereference-copy shown below, note that copying a generated protobuf message by value (`*request`) also copies its internal no-copy state, which `go vet`'s copylocks check reports; `proto.Clone(request).(*measurev1.TopNRequest)` avoids that, at the cost of a deep copy. ```suggestion broadcastRequest := *request if request.Agg != modelv1.AggregationFunction_AGGREGATION_FUNCTION_MIN && request.Agg != modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX { broadcastRequest.TopN = 0 } broadcastRequest.EmitPartial = true ff, err := t.broadcaster.Broadcast(defaultTopNQueryTimeout, data.TopicTopNQuery, bus.NewMessageWithNodeSelectors(now, nodeSelectors, request.TimeRange, &broadcastRequest)) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
