Copilot commented on code in PR #937:
URL: https://github.com/apache/skywalking-banyandb/pull/937#discussion_r2686486413
##########
banyand/measure/block.go:
##########
@@ -704,8 +706,80 @@ func (bc *blockCursor) replace(r *model.MeasureResult, storedIndexValue map[comm
 			}
 		}
 	}
-	for i, c := range bc.fields.columns {
-		r.Fields[i].Values[len(r.Fields[i].Values)-1] = mustDecodeFieldValue(c.valueType, c.values[bc.idx])
+
+	if topNPostAggregator != nil {
+		topNValue := GenerateTopNValue()
+		defer ReleaseTopNValue(topNValue)
+		decoder := GenerateTopNValuesDecoder()
+		defer ReleaseTopNValuesDecoder(decoder)
+
+		uTimestamps := uint64(bc.timestamps[bc.idx])
+
+		for i, c := range bc.fields.columns {
+			srcFieldValue := r.Fields[i].Values[len(r.Fields[i].Values)-1]
+			destFieldValue := mustDecodeFieldValue(c.valueType, c.values[bc.idx])
+
+			topNValue.Reset()
+
+			if err := topNValue.Unmarshal(srcFieldValue.GetBinaryData(), decoder); err != nil {
+				log.Error().Err(err).Msg("failed to unmarshal topN value, skip current batch")
+				continue
+			}
+
+			valueName := topNValue.valueName
+			entityTagNames := topNValue.entityTagNames
+
+			for j, entityList := range topNValue.entities {
+				entityValues := make(pbv1.EntityValues, 0, len(topNValue.entityValues))
Review Comment:
The code creates an EntityValues slice with capacity based on
`len(topNValue.entityValues)`, but then populates it by iterating over
`entityList`. This is inconsistent and could lead to incorrect capacity
allocation. The capacity should be `len(entityList)` to match the actual data
being added to the slice.
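
If the reviewer's reading is correct, the fix is a one-token change. A minimal
sketch of the corrected loop, under the assumption that `entityList` holds
exactly the values being copied (the manual copy can also collapse into a
variadic append):

```go
for j, entityList := range topNValue.entities {
	// Size the slice by the data actually appended: entityList,
	// not the unrelated topNValue.entityValues field.
	entityValues := make(pbv1.EntityValues, 0, len(entityList))
	entityValues = append(entityValues, entityList...)
	topNPostAggregator.Put(entityValues, topNValue.values[j], uTimestamps, bc.versions[bc.idx])
}
```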
##########
banyand/measure/block.go:
##########
@@ -704,8 +706,80 @@ func (bc *blockCursor) replace(r *model.MeasureResult, storedIndexValue map[comm
 			}
 		}
 	}
-	for i, c := range bc.fields.columns {
-		r.Fields[i].Values[len(r.Fields[i].Values)-1] = mustDecodeFieldValue(c.valueType, c.values[bc.idx])
+
+	if topNPostAggregator != nil {
+		topNValue := GenerateTopNValue()
+		defer ReleaseTopNValue(topNValue)
+		decoder := GenerateTopNValuesDecoder()
+		defer ReleaseTopNValuesDecoder(decoder)
+
+		uTimestamps := uint64(bc.timestamps[bc.idx])
+
+		for i, c := range bc.fields.columns {
+			srcFieldValue := r.Fields[i].Values[len(r.Fields[i].Values)-1]
+			destFieldValue := mustDecodeFieldValue(c.valueType, c.values[bc.idx])
+
+			topNValue.Reset()
+
+			if err := topNValue.Unmarshal(srcFieldValue.GetBinaryData(), decoder); err != nil {
+				log.Error().Err(err).Msg("failed to unmarshal topN value, skip current batch")
+				continue
+			}
+
+			valueName := topNValue.valueName
+			entityTagNames := topNValue.entityTagNames
+
+			for j, entityList := range topNValue.entities {
+				entityValues := make(pbv1.EntityValues, 0, len(topNValue.entityValues))
+				for _, e := range entityList {
+					entityValues = append(entityValues, e)
+				}
+				topNPostAggregator.Put(entityValues, topNValue.values[j], uTimestamps, bc.versions[bc.idx])
+			}
+
+			topNValue.Reset()
+			if err := topNValue.Unmarshal(destFieldValue.GetBinaryData(), decoder); err != nil {
+				log.Error().Err(err).Msg("failed to unmarshal topN value, skip current batch")
+				continue
+			}
+
+			for j, entityList := range topNValue.entities {
+				entityValues := make(pbv1.EntityValues, 0, len(topNValue.entityValues))
Review Comment:
The code creates an EntityValues slice with capacity based on
`len(topNValue.entityValues)`, but then populates it by iterating over
`entityList`. This is inconsistent and could lead to incorrect capacity
allocation. The capacity should be `len(entityList)` to match the actual data
being added to the slice.
##########
banyand/measure/topn_aggregator.go:
##########
@@ -0,0 +1,343 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+ "container/heap"
+ "slices"
+ "time"
+
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ "github.com/apache/skywalking-banyandb/pkg/flow"
+ pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/query/aggregation"
+)
+
+// PostProcessor defines necessary methods for Top-N post processor with or without aggregation.
+type PostProcessor interface {
+	Put(entityValues pbv1.EntityValues, val int64, timestampMillis uint64, version int64)
+ Flush() ([]*topNAggregatorItem, error)
+ Val([]string) []*measurev1.TopNList
+}
+
+// CreateTopNPostAggregator creates a Top-N post processor with or without aggregation.
+func CreateTopNPostAggregator(topN int32, aggrFunc modelv1.AggregationFunction, sort modelv1.Sort) PostProcessor {
+	if aggrFunc == modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED {
+		// if aggregation is not specified, we have to keep all timelines
+ return &topNPostAggregationProcessor{
+ topN: topN,
+ sort: sort,
+ timelines: make(map[uint64]*topNTimelineItem),
+ }
+ }
+ aggregator := &topNPostAggregationProcessor{
+ topN: topN,
+ sort: sort,
+ aggrFunc: aggrFunc,
+ cache: make(map[string]*topNAggregatorItem),
+ timelines: make(map[uint64]*topNTimelineItem),
+ items: make([]*topNAggregatorItem, 0, topN),
+ }
+ heap.Init(aggregator)
+ return aggregator
+}
+
+func (taggr *topNPostAggregationProcessor) Len() int {
+ return len(taggr.items)
+}
+
+// Less reports whether min/max heap has to be built.
+// For DESC, a min heap has to be built,
+// while for ASC, a max heap has to be built.
+func (taggr *topNPostAggregationProcessor) Less(i, j int) bool {
+ if taggr.sort == modelv1.Sort_SORT_DESC {
+		return taggr.items[i].int64Func.Val() < taggr.items[j].int64Func.Val()
+ }
+ return taggr.items[i].int64Func.Val() > taggr.items[j].int64Func.Val()
+}
+
+func (taggr *topNPostAggregationProcessor) Swap(i, j int) {
+ taggr.items[i], taggr.items[j] = taggr.items[j], taggr.items[i]
+ taggr.items[i].index = i
+ taggr.items[j].index = j
+}
+
+func (taggr *topNPostAggregationProcessor) Push(x any) {
+ n := len(taggr.items)
+ item := x.(*topNAggregatorItem)
+ item.index = n
+ taggr.items = append(taggr.items, item)
+}
+
+func (taggr *topNPostAggregationProcessor) Pop() any {
+ old := taggr.items
+ n := len(old)
+ item := old[n-1]
+ old[n-1] = nil
+ item.index = -1
+ taggr.items = old[0 : n-1]
+ return item
+}
+
+func (taggr *topNPostAggregationProcessor) tryEnqueue(key string, item *topNAggregatorItem) {
+	if lowest := taggr.items[0]; lowest != nil {
+		shouldReplace := (taggr.sort == modelv1.Sort_SORT_DESC && lowest.int64Func.Val() < item.int64Func.Val()) ||
+			(taggr.sort != modelv1.Sort_SORT_DESC && lowest.int64Func.Val() > item.int64Func.Val())
+
+ if shouldReplace {
+ delete(taggr.cache, lowest.key)
+ taggr.cache[key] = item
+ taggr.items[0] = item
+ item.index = 0
+ heap.Fix(taggr, 0)
+ }
+ }
+}
+
+var _ flow.Element = (*topNAggregatorItem)(nil)
+
+type topNAggregatorItem struct {
+ int64Func aggregation.Func[int64]
+ key string
+ values pbv1.EntityValues
+ val int64
+ version int64
+ index int
+}
+
+func (n *topNAggregatorItem) GetTags(tagNames []string) []*modelv1.Tag {
+ tags := make([]*modelv1.Tag, len(n.values))
+ for i := 0; i < len(tags); i++ {
+ tags[i] = &modelv1.Tag{
+ Key: tagNames[i],
+ Value: n.values[i],
+ }
+ }
+ return tags
+}
+
+func (n *topNAggregatorItem) GetIndex() int {
+ return n.index
+}
+
+func (n *topNAggregatorItem) SetIndex(i int) {
+ n.index = i
+}
+
+type topNTimelineItem struct {
+ queue *flow.DedupPriorityQueue
+ items map[string]*topNAggregatorItem
+}
+
+type topNPostAggregationProcessor struct {
+ cache map[string]*topNAggregatorItem
+ timelines map[uint64]*topNTimelineItem
+ items []*topNAggregatorItem
+ sort modelv1.Sort
+ aggrFunc modelv1.AggregationFunction
+ topN int32
+}
+
+func (taggr *topNPostAggregationProcessor) Put(entityValues pbv1.EntityValues, val int64, timestampMillis uint64, version int64) {
+ timeline, ok := taggr.timelines[timestampMillis]
+ key := entityValues.String()
+ if !ok {
+ timeline = &topNTimelineItem{
+			queue: flow.NewPriorityQueue(func(a, b interface{}) int {
+				if taggr.sort == modelv1.Sort_SORT_DESC {
+					if a.(*topNAggregatorItem).val < b.(*topNAggregatorItem).val {
+						return -1
+					} else if a.(*topNAggregatorItem).val == b.(*topNAggregatorItem).val {
+						return 0
+					}
+					return 1
+				}
+				if a.(*topNAggregatorItem).val < b.(*topNAggregatorItem).val {
+					return 1
+				} else if a.(*topNAggregatorItem).val == b.(*topNAggregatorItem).val {
+					return 0
+				}
+				return -1
+			}, false),
+ items: make(map[string]*topNAggregatorItem),
+ }
+
+ newItem := &topNAggregatorItem{
+ val: val,
+ key: key,
+ values: entityValues,
+ version: version,
+ }
+
+ timeline.items[key] = newItem
+ heap.Push(timeline.queue, newItem)
+ taggr.timelines[timestampMillis] = timeline
+ return
+ }
+
+ if item, exist := timeline.items[key]; exist {
+ if version >= item.version {
+ item.val = val
+ item.version = version
+ heap.Fix(timeline.queue, item.index)
+ }
+
+ return
+ }
+
+ newItem := &topNAggregatorItem{
+ val: val,
+ key: key,
+ values: entityValues,
+ version: version,
+ }
+
+ if timeline.queue.Len() < int(taggr.topN) {
+ heap.Push(timeline.queue, newItem)
+ timeline.items[key] = newItem
+ return
+ }
+
+ if lowest := timeline.queue.Peek(); lowest != nil {
+ lowestItem := lowest.(*topNAggregatorItem)
+
+		shouldReplace := (taggr.sort == modelv1.Sort_SORT_DESC && lowestItem.val < val) ||
+			(taggr.sort != modelv1.Sort_SORT_DESC && lowestItem.val > val)
+
+ if shouldReplace {
+ delete(timeline.items, lowestItem.key)
+ timeline.items[key] = newItem
+ timeline.queue.ReplaceLowest(newItem)
+ }
+ }
+}
+
+func (taggr *topNPostAggregationProcessor) Flush() ([]*topNAggregatorItem, error) {
+ var result []*topNAggregatorItem
+
+	if taggr.aggrFunc == modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED {
+ for _, timeline := range taggr.timelines {
+ for _, nonAggItem := range timeline.items {
+ result = append(result, nonAggItem)
+ }
+ }
+ clear(taggr.timelines)
+ } else {
+ for _, timeline := range taggr.timelines {
+ for _, item := range timeline.items {
+				if exist, found := taggr.cache[item.key]; found {
+ exist.int64Func.In(item.val)
+ heap.Fix(taggr, exist.index)
+ continue
+ }
+
+				aggrFunc, err := aggregation.NewFunc[int64](taggr.aggrFunc)
+ if err != nil {
+ return nil, err
+ }
+
+ item.int64Func = aggrFunc
+ item.int64Func.In(item.val)
+
+ if taggr.Len() < int(taggr.topN) {
+ taggr.cache[item.key] = item
+ heap.Push(taggr, item)
+ } else {
+ taggr.tryEnqueue(item.key, item)
+ }
+ }
+ }
+ result = make([]*topNAggregatorItem, 0, taggr.Len())
+ for taggr.Len() > 0 {
+ item := heap.Pop(taggr).(*topNAggregatorItem)
+ result = append(result, item)
+ }
+ }
+
+ return result, nil
+}
+
+func (taggr *topNPostAggregationProcessor) Val(tagNames []string) []*measurev1.TopNList {
+	if taggr.aggrFunc != modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED {
+ return taggr.valWithAggregation(tagNames)
+ }
+
+ return taggr.valWithoutAggregation(tagNames)
+}
+
+func (taggr *topNPostAggregationProcessor) valWithAggregation(tagNames []string) []*measurev1.TopNList {
+ topNAggregatorItems, err := taggr.Flush()
+ if err != nil {
+ return []*measurev1.TopNList{}
Review Comment:
When Flush returns an error, an empty TopNList slice is returned silently
without logging or propagating the error. This could hide important failures
from the caller. Consider logging the error or handling it more explicitly to
ensure failures are visible and can be diagnosed.
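
A minimal sketch of one remedy, assuming the zerolog-style `log` package used
in `block.go` is also available here (the logger choice is an assumption, not
part of the PR):

```go
topNAggregatorItems, err := taggr.Flush()
if err != nil {
	// Log before degrading to an empty result so the failure stays diagnosable.
	log.Error().Err(err).Msg("failed to flush topN post aggregator")
	return []*measurev1.TopNList{}
}
```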
##########
banyand/measure/topn_aggregator.go:
##########
@@ -0,0 +1,343 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+ "container/heap"
+ "slices"
+ "time"
+
+ "google.golang.org/protobuf/types/known/timestamppb"
+
+	measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
+	modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ "github.com/apache/skywalking-banyandb/pkg/flow"
+ pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/query/aggregation"
+)
+
+// PostProcessor defines necessary methods for Top-N post processor with or without aggregation.
+type PostProcessor interface {
+	Put(entityValues pbv1.EntityValues, val int64, timestampMillis uint64, version int64)
+ Flush() ([]*topNAggregatorItem, error)
+ Val([]string) []*measurev1.TopNList
+}
+
+// CreateTopNPostAggregator creates a Top-N post processor with or without aggregation.
+func CreateTopNPostAggregator(topN int32, aggrFunc modelv1.AggregationFunction, sort modelv1.Sort) PostProcessor {
+	if aggrFunc == modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED {
+		// if aggregation is not specified, we have to keep all timelines
+ return &topNPostAggregationProcessor{
+ topN: topN,
+ sort: sort,
+ timelines: make(map[uint64]*topNTimelineItem),
+ }
+ }
+ aggregator := &topNPostAggregationProcessor{
+ topN: topN,
+ sort: sort,
+ aggrFunc: aggrFunc,
+ cache: make(map[string]*topNAggregatorItem),
+ timelines: make(map[uint64]*topNTimelineItem),
+ items: make([]*topNAggregatorItem, 0, topN),
+ }
+ heap.Init(aggregator)
+ return aggregator
+}
+
+func (taggr *topNPostAggregationProcessor) Len() int {
+ return len(taggr.items)
+}
+
+// Less reports whether min/max heap has to be built.
+// For DESC, a min heap has to be built,
+// while for ASC, a max heap has to be built.
+func (taggr *topNPostAggregationProcessor) Less(i, j int) bool {
+ if taggr.sort == modelv1.Sort_SORT_DESC {
+		return taggr.items[i].int64Func.Val() < taggr.items[j].int64Func.Val()
+ }
+ return taggr.items[i].int64Func.Val() > taggr.items[j].int64Func.Val()
+}
+
+func (taggr *topNPostAggregationProcessor) Swap(i, j int) {
+ taggr.items[i], taggr.items[j] = taggr.items[j], taggr.items[i]
+ taggr.items[i].index = i
+ taggr.items[j].index = j
+}
+
+func (taggr *topNPostAggregationProcessor) Push(x any) {
+ n := len(taggr.items)
+ item := x.(*topNAggregatorItem)
+ item.index = n
+ taggr.items = append(taggr.items, item)
+}
+
+func (taggr *topNPostAggregationProcessor) Pop() any {
+ old := taggr.items
+ n := len(old)
+ item := old[n-1]
+ old[n-1] = nil
+ item.index = -1
+ taggr.items = old[0 : n-1]
+ return item
+}
+
+func (taggr *topNPostAggregationProcessor) tryEnqueue(key string, item *topNAggregatorItem) {
+	if lowest := taggr.items[0]; lowest != nil {
+		shouldReplace := (taggr.sort == modelv1.Sort_SORT_DESC && lowest.int64Func.Val() < item.int64Func.Val()) ||
+			(taggr.sort != modelv1.Sort_SORT_DESC && lowest.int64Func.Val() > item.int64Func.Val())
+
+ if shouldReplace {
+ delete(taggr.cache, lowest.key)
+ taggr.cache[key] = item
+ taggr.items[0] = item
+ item.index = 0
+ heap.Fix(taggr, 0)
+ }
+ }
+}
+
+var _ flow.Element = (*topNAggregatorItem)(nil)
+
+type topNAggregatorItem struct {
+ int64Func aggregation.Func[int64]
+ key string
+ values pbv1.EntityValues
+ val int64
+ version int64
+ index int
+}
+
+func (n *topNAggregatorItem) GetTags(tagNames []string) []*modelv1.Tag {
+ tags := make([]*modelv1.Tag, len(n.values))
+ for i := 0; i < len(tags); i++ {
+ tags[i] = &modelv1.Tag{
+ Key: tagNames[i],
+ Value: n.values[i],
+ }
+ }
+ return tags
+}
+
+func (n *topNAggregatorItem) GetIndex() int {
+ return n.index
+}
+
+func (n *topNAggregatorItem) SetIndex(i int) {
+ n.index = i
+}
+
+type topNTimelineItem struct {
+ queue *flow.DedupPriorityQueue
+ items map[string]*topNAggregatorItem
+}
+
+type topNPostAggregationProcessor struct {
Review Comment:
The type name `topNPostAggregationProcessor` is misleading because it's used
for both aggregation and non-aggregation cases. When aggrFunc is UNSPECIFIED
(line 43), this processor doesn't perform aggregation but is still named as if
it does. Consider renaming to something more neutral like `topNPostProcessor`
to accurately reflect its dual purpose.
##########
banyand/measure/query.go:
##########
@@ -842,6 +855,13 @@ func (qr *queryResult) merge(storedIndexValue map[common.SeriesID]map[string]*mo
var lastVersion int64
var lastSid common.SeriesID
+ var topNPostAggregator PostProcessor
+
+ if qr.topNQueryOptions != nil {
+		topNPostAggregator = CreateTopNPostAggregator(qr.topNQueryOptions.number, modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED,
+			qr.topNQueryOptions.sortDirection)
+ }
Review Comment:
A new PostProcessor is created for each query result merge operation. If
there are many concurrent queries, this could lead to excessive allocations.
Consider reusing aggregators through object pooling if this becomes a
performance bottleneck. However, this may be acceptable depending on query
frequency and should be validated through profiling.
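
A hedged sketch of what such pooling could look like, mirroring the
`Generate*/Release*` helpers the PR already uses for `TopNValue`; the helper
names and reset logic below are illustrative assumptions, not part of the PR:

```go
var postProcessorPool = sync.Pool{
	New: func() any {
		return &topNPostAggregationProcessor{
			cache:     make(map[string]*topNAggregatorItem),
			timelines: make(map[uint64]*topNTimelineItem),
		}
	},
}

// generatePostProcessor borrows a processor and configures it for one query.
func generatePostProcessor(topN int32, sort modelv1.Sort) *topNPostAggregationProcessor {
	p := postProcessorPool.Get().(*topNPostAggregationProcessor)
	p.topN, p.sort = topN, sort
	return p
}

// releasePostProcessor clears per-query state before returning it to the pool.
func releasePostProcessor(p *topNPostAggregationProcessor) {
	clear(p.cache)
	clear(p.timelines)
	p.items = p.items[:0]
	postProcessorPool.Put(p)
}
```

As the comment itself notes, this should only be adopted if profiling shows the
per-merge allocation is actually a bottleneck.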
##########
banyand/measure/topn.go:
##########
@@ -842,7 +838,8 @@ func (t *TopNValue) resizeEntities(size, entitySize int) [][]*modelv1.TagValue {
return t.entities
}
-func (t *TopNValue) marshal(dst []byte) ([]byte, error) {
+// Marshal marshal the topNValue to the dst.
Review Comment:
The documentation comment should follow Go conventions with proper grammar.
It should be "Marshal marshals the topNValue to the dst" instead of "Marshal
marshal the topNValue to the dst". The verb should be in third-person singular
present tense.
```suggestion
// Marshal marshals the topNValue to the dst.
```
##########
banyand/measure/topn.go:
##########
@@ -784,12 +778,14 @@ type TopNValue struct {
firstValue int64
}
-func (t *TopNValue) setMetadata(valueName string, entityTagNames []string) {
+// SetMetadata set valueName and entityTagNames.
+func (t *TopNValue) SetMetadata(valueName string, entityTagNames []string) {
t.valueName = valueName
t.entityTagNames = entityTagNames
}
-func (t *TopNValue) addValue(value int64, entityValues []*modelv1.TagValue) {
+// AddValue add value and entityValues.
Review Comment:
The documentation comment should follow Go conventions with proper grammar.
It should be "AddValue adds value and entityValues" instead of "AddValue add
value and entityValues". The verb should be in third-person singular present
tense.
```suggestion
// AddValue adds value and entityValues.
```
##########
banyand/measure/topn.go:
##########
@@ -784,12 +778,14 @@ type TopNValue struct {
firstValue int64
}
-func (t *TopNValue) setMetadata(valueName string, entityTagNames []string) {
+// SetMetadata set valueName and entityTagNames.
Review Comment:
The documentation comment should follow Go conventions with proper grammar.
It should be "SetMetadata sets valueName and entityTagNames" instead of
"SetMetadata set valueName and entityTagNames". The verb should be in
third-person singular present tense.
```suggestion
// SetMetadata sets valueName and entityTagNames.
```
##########
banyand/measure/topn.go:
##########
@@ -414,6 +407,7 @@ func (t *topNStreamingProcessor) writeStreamRecord(record flow.StreamRecord, buf
},
},
},
+ Version: time.Now().UnixNano(),
Review Comment:
Using `time.Now().UnixNano()` for version generation could lead to version
conflicts in high-throughput scenarios where multiple operations occur within
the same nanosecond. Consider using a more robust versioning mechanism such as
a monotonically increasing counter or a combination of timestamp and sequence
number to ensure uniqueness.
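
A hedged sketch of the counter-based alternative: an atomic last-issued value
seeded from the clock, so versions stay strictly increasing even when several
writes land in the same nanosecond (`nextVersion` is an illustrative name):

```go
var lastVersion atomic.Int64

// nextVersion returns the wall-clock nanoseconds when the clock has advanced,
// otherwise the previously issued version plus one.
func nextVersion() int64 {
	for {
		now := time.Now().UnixNano()
		prev := lastVersion.Load()
		if now <= prev {
			now = prev + 1
		}
		if lastVersion.CompareAndSwap(prev, now) {
			return now
		}
	}
}
```

The write site would then set `Version: nextVersion()` in place of
`Version: time.Now().UnixNano()`.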