Copilot commented on code in PR #1063:
URL: 
https://github.com/apache/skywalking-banyandb/pull/1063#discussion_r3067925266


##########
banyand/dquery/reduce_topn_aggregator.go:
##########
@@ -0,0 +1,213 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package dquery
+
+import (
+       "container/heap"
+       "fmt"
+
+       measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/aggregation"
+)
+
+type entityAccumulator struct {
+       reduceFunc   aggregation.Reduce[int64]
+       entityValues pbv1.EntityValues
+}
+
+type heapEntry struct {
+       acc *entityAccumulator
+       key string
+}
+
+var _ heap.Interface = (*topNEntityHeap)(nil)
+
+type topNEntityHeap struct {
+       keyToIdx map[string]int
+       items    []*heapEntry
+       sortDir  modelv1.Sort
+}
+
+func (h *topNEntityHeap) Len() int { return len(h.items) }
+
+func (h *topNEntityHeap) Less(i, j int) bool {
+       vi := h.items[i].acc.reduceFunc.Val()
+       vj := h.items[j].acc.reduceFunc.Val()
+       if h.sortDir == modelv1.Sort_SORT_DESC {
+               return vi < vj
+       }
+       return vi > vj
+}
+
+func (h *topNEntityHeap) Swap(i, j int) {
+       h.items[i], h.items[j] = h.items[j], h.items[i]
+       h.keyToIdx[h.items[i].key] = i
+       h.keyToIdx[h.items[j].key] = j
+}
+
+func (h *topNEntityHeap) Push(x any) {
+       entry := x.(*heapEntry)
+       h.keyToIdx[entry.key] = len(h.items)
+       h.items = append(h.items, entry)
+}
+
+func (h *topNEntityHeap) Pop() any {
+       old := h.items
+       n := len(old)
+       entry := old[n-1]
+       old[n-1] = nil
+       h.items = old[:n-1]
+       delete(h.keyToIdx, entry.key)
+       return entry
+}
+
+func (h *topNEntityHeap) tryInsert(key string, acc *entityAccumulator, topN 
int32) {
+       if h.Len() < int(topN) {
+               heap.Push(h, &heapEntry{key: key, acc: acc})
+               return
+       }
+       rootVal := h.items[0].acc.reduceFunc.Val()
+       newVal := acc.reduceFunc.Val()
+       sortDir := h.sortDir
+       shouldReplace := (sortDir == modelv1.Sort_SORT_DESC && newVal > 
rootVal) ||
+               (sortDir == modelv1.Sort_SORT_ASC && newVal < rootVal)
+       if shouldReplace {
+               heap.Pop(h)
+               heap.Push(h, &heapEntry{key: key, acc: acc})
+       }
+}

Review Comment:
   `topNEntityHeap.tryInsert` assumes `topN > 0`. If `topN` is 0 or negative, the 
initial `h.Len() < int(topN)` check is false, so execution falls through to 
`h.items[0]`, which panics with an index-out-of-range error because the heap is 
still empty. Please guard `topN <= 0` explicitly (e.g., treat it as "no limit" 
and always push, or skip the heap logic and return).



##########
banyand/dquery/topn.go:
##########
@@ -103,16 +93,69 @@ func (t *topNQueryProcessor) Rev(ctx context.Context, 
message bus.Message) (resp
                        span.Stop()
                }()
        }
-       agg := request.Agg
-       request.Agg = 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED
+       topn := request.GetTopN()
+       if request.Agg != modelv1.AggregationFunction_AGGREGATION_FUNCTION_MIN 
&& request.Agg != modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX {
+               request.TopN = 0
+       }
+       request.EmitPartial = true
        ff, err := t.broadcaster.Broadcast(defaultTopNQueryTimeout, 
data.TopicTopNQuery, bus.NewMessageWithNodeSelectors(now, nodeSelectors, 
request.TimeRange, request))

Review Comment:
   Rev mutates the incoming request in-place (`request.TopN = 0`, 
`request.EmitPartial = true`) before broadcasting. Because the same request 
object is also used for logging/tracing/error messages, this can make 
observability misleading and risks unexpected side effects if the request is 
reused elsewhere. Prefer creating a shallow copy (or a new request) for the 
internal broadcast and keep the original request unchanged for logs/trace tags.
   ```suggestion
        broadcastRequest := *request
        if request.Agg != modelv1.AggregationFunction_AGGREGATION_FUNCTION_MIN 
&& request.Agg != modelv1.AggregationFunction_AGGREGATION_FUNCTION_MAX {
                broadcastRequest.TopN = 0
        }
        broadcastRequest.EmitPartial = true
        ff, err := t.broadcaster.Broadcast(defaultTopNQueryTimeout, 
data.TopicTopNQuery, bus.NewMessageWithNodeSelectors(now, nodeSelectors, 
request.TimeRange, &broadcastRequest))
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to