This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 27cfc5d0e Refactor TopN Aggregation result merge process (#961)
27cfc5d0e is described below

commit 27cfc5d0eed6891df69eacdf524611cb9fe241e3
Author: peachisai <[email protected]>
AuthorDate: Sun Feb 8 19:26:12 2026 +0800

    Refactor TopN Aggregation result merge process (#961)
    
    * Refactor TopN Aggregation result merge process
    
    ---------
    
    Co-authored-by: Gao Hongtao <[email protected]>
---
 banyand/measure/block.go               | 145 ++++++--
 banyand/measure/merger.go              |  57 ++-
 banyand/measure/merger_test.go         | 636 +++++++++++++++++++++++++++++++++
 banyand/measure/metadata.go            |  33 +-
 banyand/measure/topn.go                |  13 +
 banyand/measure/topn_post_processor.go |  13 +-
 6 files changed, 862 insertions(+), 35 deletions(-)

diff --git a/banyand/measure/block.go b/banyand/measure/block.go
index 286f4bbf6..202e05245 100644
--- a/banyand/measure/block.go
+++ b/banyand/measure/block.go
@@ -731,13 +731,7 @@ func (bc *blockCursor) mergeTopNResult(r 
*model.MeasureResult, storedIndexValue
                valueName := topNValue.valueName
                entityTagNames := topNValue.entityTagNames
 
-               for j, entityList := range topNValue.entities {
-                       entityValues := make(pbv1.EntityValues, 0, 
len(entityList))
-                       for _, e := range entityList {
-                               entityValues = append(entityValues, e)
-                       }
-                       topNPostAggregator.Put(entityValues, 
topNValue.values[j], uTimestamps, r.Versions[len(r.Versions)-1])
-               }
+               putEntitiesToAggregator(topNValue, topNPostAggregator, 
uTimestamps, r.Versions[len(r.Versions)-1])
 
                topNValue.Reset()
                if err := topNValue.Unmarshal(destFieldValue.GetBinaryData(), 
decoder); err != nil {
@@ -745,18 +739,11 @@ func (bc *blockCursor) mergeTopNResult(r 
*model.MeasureResult, storedIndexValue
                        continue
                }
 
-               for j, entityList := range topNValue.entities {
-                       entityValues := make(pbv1.EntityValues, 0, 
len(entityList))
-                       for _, e := range entityList {
-                               entityValues = append(entityValues, e)
-                       }
-                       topNPostAggregator.Put(entityValues, 
topNValue.values[j], uTimestamps, bc.versions[bc.idx])
-               }
+               putEntitiesToAggregator(topNValue, topNPostAggregator, 
uTimestamps, bc.versions[bc.idx])
 
                items, err := topNPostAggregator.Flush()
                if err != nil {
-                       log.Error().Err(err).Msg("failed to flush aggregator, 
skip current batch")
-                       continue
+                       log.Error().Err(err).Msg("failed to flush aggregator")
                }
 
                topNValue.Reset()
@@ -768,8 +755,7 @@ func (bc *blockCursor) mergeTopNResult(r 
*model.MeasureResult, storedIndexValue
 
                buf, err := topNValue.marshal(make([]byte, 0, 128))
                if err != nil {
-                       log.Error().Err(err).Msg("failed to marshal topN value, 
skip current batch")
-                       continue
+                       log.Error().Err(err).Msg("failed to flush aggregator")
                }
 
                r.Fields[i].Values[len(r.Fields[i].Values)-1] = 
&modelv1.FieldValue{
@@ -779,7 +765,7 @@ func (bc *blockCursor) mergeTopNResult(r 
*model.MeasureResult, storedIndexValue
                }
        }
 
-       if bc.versions[bc.idx] >= r.Versions[len(r.Versions)-1] {
+       if bc.versions[bc.idx] > r.Versions[len(r.Versions)-1] {
                r.Versions[len(r.Versions)-1] = bc.versions[bc.idx]
        }
 }
@@ -955,16 +941,8 @@ func (bi *blockPointer) append(b *blockPointer, offset 
int) {
        if offset <= b.idx {
                return
        }
-       if len(bi.tagFamilies) == 0 && len(b.tagFamilies) > 0 {
-               fullTagAppend(bi, b, offset)
-       } else {
-               if err := fastTagAppend(bi, b, offset); err != nil {
-                       if log.Debug().Enabled() {
-                               log.Debug().Msgf("fastTagMerge failed: %v; 
falling back to fullTagMerge", err)
-                       }
-                       fullTagAppend(bi, b, offset)
-               }
-       }
+
+       bi.appendTagFamilies(b, offset)
 
        if len(bi.field.columns) == 0 && len(b.field.columns) > 0 {
                fullFieldAppend(bi, b, offset)
@@ -1151,6 +1129,115 @@ func fullFieldAppend(bi, b *blockPointer, offset int) {
        }
 }
 
+// mergeAndAppendTopN merges the TopN field values stored at left[leftIdx] and
+// right[rightIdx] into a single row and appends that row to bi.
+//
+// For every field column of right, the values of both sides are decoded and
+// their entities are fed into topNPostAggregator (which is reset once on
+// entry); the flushed items are then re-encoded as the merged value. A side
+// that fails to decode is skipped with a warning. When both sides fail, an
+// empty value is appended for that column so the column stays row-aligned.
+//
+// Tag families and the version are taken from the side with the higher
+// version (right wins ties). The timestamp is always taken from right.
+// Flush and marshal failures are reported through log.Panic.
+func (bi *blockPointer) mergeAndAppendTopN(left *blockPointer, leftIdx int, right *blockPointer, rightIdx int, topNPostAggregator PostProcessor) {
+       topNValue := GenerateTopNValue()
+       defer ReleaseTopNValue(topNValue)
+       decoder := GenerateTopNValuesDecoder()
+       defer ReleaseTopNValuesDecoder(decoder)
+
+       topNPostAggregator.Reset()
+
+       uTimestamp := uint64(right.timestamps[rightIdx])
+       leftVer, rightVer := left.versions[leftIdx], right.versions[rightIdx]
+
+       // Create the destination field columns on first use, mirroring the
+       // names and value types of right's field columns.
+       if len(bi.field.columns) == 0 {
+               for _, c := range right.field.columns {
+                       bi.field.columns = append(bi.field.columns, column{
+                               name:      c.name,
+                               valueType: c.valueType,
+                               values:    make([][]byte, 0),
+                       })
+               }
+       }
+
+       for idx := range right.field.columns {
+               topNValue.Reset()
+
+               var valueName string
+               var entityTagNames []string
+               hasValidData := false
+
+               if err := topNValue.Unmarshal(left.field.columns[idx].values[leftIdx], decoder); err != nil {
+                       log.Warn().Err(err).Msg("failed to unmarshal left topN value, ignoring left side")
+               } else {
+                       valueName = topNValue.valueName
+                       entityTagNames = topNValue.entityTagNames
+                       putEntitiesToAggregator(topNValue, topNPostAggregator, uTimestamp, leftVer)
+                       hasValidData = true
+               }
+
+               topNValue.Reset()
+               if err := topNValue.Unmarshal(right.field.columns[idx].values[rightIdx], decoder); err != nil {
+                       log.Warn().Err(err).Msg("failed to unmarshal right topN value, ignoring right side")
+               } else {
+                       // Metadata comes from the left side when it decoded; otherwise from right.
+                       if !hasValidData {
+                               valueName = topNValue.valueName
+                               entityTagNames = topNValue.entityTagNames
+                       }
+                       putEntitiesToAggregator(topNValue, topNPostAggregator, uTimestamp, rightVer)
+                       hasValidData = true
+               }
+
+               if !hasValidData {
+                       log.Error().Msg("both sides of topN value are malformed, append empty value")
+                       bi.field.columns[idx].values = append(bi.field.columns[idx].values, []byte{})
+                       continue
+               }
+
+               topNValue.Reset()
+               topNValue.setMetadata(valueName, entityTagNames)
+
+               items, err := topNPostAggregator.Flush()
+               if err != nil {
+                       log.Panic().Err(err).Msg("failed to flush aggregator")
+               }
+
+               for _, item := range items {
+                       topNValue.addValue(item.val, item.values)
+               }
+
+               // Marshal into a buffer owned by this value only. The encoded bytes are
+               // retained in bi, so a scratch buffer shared across columns would let a
+               // later column overwrite the bytes already stored for an earlier one.
+               buf, err := topNValue.marshal(make([]byte, 0, 128))
+               if err != nil {
+                       log.Panic().Err(err).Msg("failed to marshal merged topN value")
+               }
+
+               bi.field.columns[idx].values = append(bi.field.columns[idx].values, buf)
+       }
+
+       if rightVer >= leftVer {
+               bi.appendTagFamilies(right, rightIdx+1)
+               bi.versions = append(bi.versions, rightVer)
+       } else {
+               bi.appendTagFamilies(left, leftIdx+1)
+               bi.versions = append(bi.versions, leftVer)
+       }
+       bi.timestamps = append(bi.timestamps, right.timestamps[rightIdx])
+}
+
+// appendTagFamilies appends tag values from b to bi via fastTagAppend or
+// fullTagAppend, passing offset through unchanged.
+//
+// fullTagAppend is used directly when bi has no tag families yet but b does.
+// Otherwise fastTagAppend is tried first and fullTagAppend is the fallback
+// when it returns an error.
+func (bi *blockPointer) appendTagFamilies(b *blockPointer, offset int) {
+       targetIsEmpty := len(bi.tagFamilies) == 0 && len(b.tagFamilies) > 0
+       if !targetIsEmpty {
+               err := fastTagAppend(bi, b, offset)
+               if err == nil {
+                       return
+               }
+               if log.Debug().Enabled() {
+                       log.Debug().Msgf("fastTagAppend failed: %v; falling back to fullTagAppend", err)
+               }
+       }
+
+       fullTagAppend(bi, b, offset)
+}
+
+// putEntitiesToAggregator feeds every entity of topNValue, paired with the
+// value at the same index, into aggregator using the given timestamp and
+// version.
+//
+// Each entity list is copied before it is handed over.
+// NOTE(review): presumably so the aggregator never aliases storage that
+// topNValue reuses after Reset; confirm against TopNValue.
+func putEntitiesToAggregator(topNValue *TopNValue, aggregator PostProcessor, timestamp uint64, version int64) {
+       for idx := range topNValue.entities {
+               src := topNValue.entities[idx]
+               cloned := make(pbv1.EntityValues, len(src))
+               copy(cloned, src)
+               aggregator.Put(cloned, topNValue.values[idx], timestamp, version)
+       }
+}
+
 func assertIdxAndOffset(name string, length int, idx int, offset int) {
        if idx >= offset {
                logger.Panicf("%q idx %d must be less than offset %d", name, 
idx, offset)
diff --git a/banyand/measure/merger.go b/banyand/measure/merger.go
index 1337dbcab..ef359288c 100644
--- a/banyand/measure/merger.go
+++ b/banyand/measure/merger.go
@@ -25,6 +25,7 @@ import (
 
        "github.com/dustin/go-humanize"
 
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/pkg/cgroups"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/fs"
@@ -390,6 +391,20 @@ func mergeTwoBlocks(target, left, right *blockPointer) {
                left, right = right, left
        }
 
+       var topNProcessor PostProcessor
+       var isTopN bool
+
+       if isTopNBlock(left) {
+               sort, limit, err := parseTopNMeta(left)
+               if err != nil {
+                       log.Error().Err(err).Msg("failed to parse TopN 
metadata, falling back to normal merge")
+                       isTopN = false
+               } else {
+                       isTopN = true
+                       topNProcessor = CreateTopNPostProcessor(limit, 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED, sort)
+               }
+       }
+
        for {
                i := left.idx
                ts2 := right.timestamps[right.idx]
@@ -397,12 +412,18 @@ func mergeTwoBlocks(target, left, right *blockPointer) {
                        i++
                }
                if i > left.idx && left.timestamps[i-1] == ts2 {
-                       if left.versions[i-1] >= right.versions[right.idx] {
-                               target.append(left, i)
+                       if isTopN {
+                               target.append(left, i-1)
+                               target.mergeAndAppendTopN(left, i-1, right, 
right.idx, topNProcessor)
                        } else {
-                               target.append(left, i-1) // skip left
-                               target.append(right, right.idx+1)
+                               if left.versions[i-1] >= 
right.versions[right.idx] {
+                                       target.append(left, i)
+                               } else {
+                                       target.append(left, i-1) // skip left
+                                       target.append(right, right.idx+1)
+                               }
                        }
+
                        left.idx = i
                        right.idx++ // skip right
                        if appendIfEmpty(right, left) {
@@ -418,3 +439,31 @@ func mergeTwoBlocks(target, left, right *blockPointer) {
                left, right = right, left
        }
 }
+
+// isTopNBlock reports whether b has at least one tag family and the first
+// one is named TopNTagFamily.
+func isTopNBlock(b *blockPointer) bool {
+       return len(b.tagFamilies) > 0 && b.tagFamilies[0].name == TopNTagFamily
+}
+
+// parseTopNMeta reads the sort direction and the result limit of a TopN block
+// from the tag values at row b.idx of its first tag family.
+//
+// The direction is read from column 1 and the serialized parameters from
+// column 3 of that family. An error is returned, rather than a panic raised,
+// when the family, either column, or the row is absent, or when the
+// parameters cannot be parsed, so the caller can fall back to a normal merge.
+func parseTopNMeta(b *blockPointer) (modelv1.Sort, int32, error) {
+       const (
+               directionCol  = 1
+               parametersCol = 3
+       )
+
+       if len(b.tagFamilies) == 0 {
+               return modelv1.Sort_SORT_UNSPECIFIED, 0, fmt.Errorf("topN block has %d tag families", len(b.tagFamilies))
+       }
+       tf := b.tagFamilies[0]
+       if len(tf.columns) <= parametersCol {
+               return modelv1.Sort_SORT_UNSPECIFIED, 0, fmt.Errorf("topN tag family has %d columns, want at least %d", len(tf.columns), parametersCol+1)
+       }
+
+       sortCol, paramsCol := tf.columns[directionCol], tf.columns[parametersCol]
+       if b.idx < 0 || b.idx >= len(sortCol.values) || b.idx >= len(paramsCol.values) {
+               return modelv1.Sort_SORT_UNSPECIFIED, 0, fmt.Errorf("topN tag row %d is out of range", b.idx)
+       }
+
+       sortVal := mustDecodeTagValue(sortCol.valueType, sortCol.values[b.idx])
+       paramsVal := mustDecodeTagValue(paramsCol.valueType, paramsCol.values[b.idx])
+
+       // The generated GetValue getters tolerate a nil receiver, so a tag of an
+       // unexpected kind yields a zero value here instead of a nil dereference.
+       params, err := ParseTopNParameters(paramsVal.GetStr().GetValue())
+       if err != nil {
+               return modelv1.Sort_SORT_UNSPECIFIED, 0, fmt.Errorf("failed to parse topN parameters: %w", err)
+       }
+
+       return modelv1.Sort(sortVal.GetInt().GetValue()), int32(params.Limit), nil
+}
diff --git a/banyand/measure/merger_test.go b/banyand/measure/merger_test.go
index 5e56fd674..7cdf97b98 100644
--- a/banyand/measure/merger_test.go
+++ b/banyand/measure/merger_test.go
@@ -28,6 +28,7 @@ import (
        "github.com/stretchr/testify/require"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/protector"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/fs"
@@ -211,6 +212,390 @@ func Test_mergeTwoBlocks(t *testing.T) {
                        },
                        want: &blockPointer{block: mergedBlock, bm: 
blockMetadata{timestamps: timestampsMetadata{min: 1, max: 4}}},
                },
+               {
+                       name: "Merge two topN blocks",
+                       left: &blockPointer{
+                               block: block{
+                                       timestamps: []int64{1, 2},
+                                       versions:   []int64{1, 2},
+                                       tagFamilies: []columnFamily{
+                                               {
+                                                       name: "_topN",
+                                                       columns: []column{
+                                                               {
+                                                                       name: 
"name", valueType: pbv1.ValueTypeStr,
+                                                                       values: 
[][]byte{
+                                                                               
[]byte("value1"),
+                                                                               
[]byte("value2"),
+                                                                       },
+                                                               },
+                                                               {
+                                                                       name: 
"direction", valueType: pbv1.ValueTypeInt64,
+                                                                       values: 
[][]byte{
+                                                                               
convert.Int64ToBytes(2),
+                                                                               
convert.Int64ToBytes(2),
+                                                                       },
+                                                               },
+                                                               {
+                                                                       name: 
"group", valueType: pbv1.ValueTypeStr,
+                                                                       values: 
[][]byte{
+                                                                               
[]byte("value3"),
+                                                                               
[]byte("value4"),
+                                                                       },
+                                                               },
+                                                               {
+                                                                       name: 
"parameters", valueType: pbv1.ValueTypeStr,
+                                                                       values: 
[][]byte{
+                                                                               
[]byte("1000"),
+                                                                               
[]byte("2000"),
+                                                                       },
+                                                               },
+                                                       },
+                                               },
+                                       },
+                                       field: columnFamily{
+                                               columns: []column{
+                                                       {name: "value", 
valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field1"), 
leftTopNBinaryData}},
+                                               },
+                                       },
+                               },
+                       },
+                       right: &blockPointer{
+                               block: block{
+                                       timestamps: []int64{2, 3},
+                                       versions:   []int64{3, 4},
+                                       tagFamilies: []columnFamily{
+                                               {
+                                                       name: "_topN",
+                                                       columns: []column{
+                                                               {
+                                                                       name: 
"name", valueType: pbv1.ValueTypeStr,
+                                                                       values: 
[][]byte{
+                                                                               
[]byte("value5"),
+                                                                               
[]byte("value6"),
+                                                                       },
+                                                               },
+                                                               {
+                                                                       name: 
"direction", valueType: pbv1.ValueTypeInt64,
+                                                                       values: 
[][]byte{
+                                                                               
convert.Int64ToBytes(2),
+                                                                               
convert.Int64ToBytes(2),
+                                                                       },
+                                                               },
+                                                               {
+                                                                       name: 
"group", valueType: pbv1.ValueTypeStr,
+                                                                       values: 
[][]byte{
+                                                                               
[]byte("value7"),
+                                                                               
[]byte("value8"),
+                                                                       },
+                                                               },
+                                                               {
+                                                                       name: 
"parameters", valueType: pbv1.ValueTypeStr,
+                                                                       values: 
[][]byte{
+                                                                               
[]byte("1000"),
+                                                                               
[]byte("2000"),
+                                                                       },
+                                                               },
+                                                       },
+                                               },
+                                       },
+                                       field: columnFamily{
+                                               columns: []column{
+                                                       {name: "value", 
valueType: pbv1.ValueTypeStr, values: [][]byte{rightTopNBinaryData, 
[]byte("field3")}},
+                                               },
+                                       },
+                               },
+                       },
+                       want: &blockPointer{block: mergedTopNBlock, bm: 
blockMetadata{timestamps: timestampsMetadata{min: 1, max: 3}}},
+               },
+               {
+                       name: "Merging TopN blocks where the first timestamps 
are equal",
+                       left: &blockPointer{
+                               block: block{
+                                       timestamps: []int64{1, 2},
+                                       versions:   []int64{1, 2},
+                                       tagFamilies: []columnFamily{
+                                               {
+                                                       name: "_topN",
+                                                       columns: []column{
+                                                               {
+                                                                       name: 
"name", valueType: pbv1.ValueTypeStr,
+                                                                       values: 
[][]byte{
+                                                                               
[]byte("duplicated1"),
+                                                                               
[]byte("value1"),
+                                                                       },
+                                                               },
+                                                               {
+                                                                       name: 
"direction", valueType: pbv1.ValueTypeInt64,
+                                                                       values: 
[][]byte{
+                                                                               
convert.Int64ToBytes(2),
+                                                                               
convert.Int64ToBytes(2),
+                                                                       },
+                                                               },
+                                                               {
+                                                                       name: 
"group", valueType: pbv1.ValueTypeStr,
+                                                                       values: 
[][]byte{
+                                                                               
[]byte("duplicated2"),
+                                                                               
[]byte("value3"),
+                                                                       },
+                                                               },
+                                                               {
+                                                                       name: 
"parameters", valueType: pbv1.ValueTypeStr,
+                                                                       values: 
[][]byte{
+                                                                               
[]byte("1000"),
+                                                                               
[]byte("1000"),
+                                                                       },
+                                                               },
+                                                       },
+                                               },
+                                       },
+                                       field: columnFamily{
+                                               columns: []column{
+                                                       {name: "value", 
valueType: pbv1.ValueTypeStr, values: [][]byte{leftTopNBinaryData, 
[]byte("field1")}},
+                                               },
+                                       },
+                               },
+                       },
+                       right: &blockPointer{
+                               block: block{
+                                       timestamps: []int64{1, 3},
+                                       versions:   []int64{2, 3},
+                                       tagFamilies: []columnFamily{
+                                               {
+                                                       name: "_topN",
+                                                       columns: []column{
+                                                               {
+                                                                       name: 
"name", valueType: pbv1.ValueTypeStr,
+                                                                       values: 
[][]byte{
+                                                                               
[]byte("value5"),
+                                                                               
[]byte("value6"),
+                                                                       },
+                                                               },
+                                                               {
+                                                                       name: 
"direction", valueType: pbv1.ValueTypeInt64,
+                                                                       values: 
[][]byte{
+                                                                               
convert.Int64ToBytes(2),
+                                                                               
convert.Int64ToBytes(2),
+                                                                       },
+                                                               },
+                                                               {
+                                                                       name: 
"group", valueType: pbv1.ValueTypeStr,
+                                                                       values: 
[][]byte{
+                                                                               
[]byte("value7"),
+                                                                               
[]byte("value8"),
+                                                                       },
+                                                               },
+                                                               {
+                                                                       name: 
"parameters", valueType: pbv1.ValueTypeStr,
+                                                                       values: 
[][]byte{
+                                                                               
[]byte("1000"),
+                                                                               
[]byte("1000"),
+                                                                       },
+                                                               },
+                                                       },
+                                               },
+                                       },
+                                       field: columnFamily{
+                                               columns: []column{
+                                                       {name: "value", 
valueType: pbv1.ValueTypeStr, values: [][]byte{rightTopNBinaryData, 
[]byte("field3")}},
+                                               },
+                                       },
+                               },
+                       },
+                       want: &blockPointer{block: mergedTopNBlock2, bm: 
blockMetadata{timestamps: timestampsMetadata{min: 1, max: 3}}},
+               },
+               {
+                       name: "Merging TopN blocks where Left is malformed",
+                       left: &blockPointer{
+                               block: block{
+                                       timestamps: []int64{1, 2},
+                                       versions:   []int64{1, 2},
+                                       tagFamilies: []columnFamily{
+                                               {
+                                                       name: "_topN",
+                                                       columns: []column{
+                                                               {
+                                                                       name: 
"name", valueType: pbv1.ValueTypeStr,
+                                                                       values: 
[][]byte{
+                                                                               
[]byte("duplicated1"),
+                                                                               
[]byte("value1"),
+                                                                       },
+                                                               },
+                                                               {
+                                                                       name: 
"direction", valueType: pbv1.ValueTypeInt64,
+                                                                       values: 
[][]byte{
+                                                                               
convert.Int64ToBytes(2),
+                                                                               
convert.Int64ToBytes(2),
+                                                                       },
+                                                               },
+                                                               {
+                                                                       name: 
"group", valueType: pbv1.ValueTypeStr,
+                                                                       values: 
[][]byte{
+                                                                               
[]byte("duplicated2"),
+                                                                               
[]byte("value3"),
+                                                                       },
+                                                               },
+                                                               {
+                                                                       name: 
"parameters", valueType: pbv1.ValueTypeStr,
+                                                                       values: 
[][]byte{
+                                                                               
[]byte("1000"),
+                                                                               
[]byte("1000"),
+                                                                       },
+                                                               },
+                                                       },
+                                               },
+                                       },
+                                       field: columnFamily{
+                                               columns: []column{
+                                                       {name: "value", 
valueType: pbv1.ValueTypeStr, values: [][]byte{nil, []byte("field1")}},
+                                               },
+                                       },
+                               },
+                       },
+                       right: &blockPointer{
+                               block: block{
+                                       timestamps: []int64{1, 3},
+                                       versions:   []int64{2, 3},
+                                       tagFamilies: []columnFamily{
+                                               {
+                                                       name: "_topN",
+                                                       columns: []column{
+                                                               {
+                                                                       name: 
"name", valueType: pbv1.ValueTypeStr,
+                                                                       values: 
[][]byte{
+                                                                               
[]byte("value5"),
+                                                                               
[]byte("value6"),
+                                                                       },
+                                                               },
+                                                               {
+                                                                       name: 
"direction", valueType: pbv1.ValueTypeInt64,
+                                                                       values: 
[][]byte{
+                                                                               
convert.Int64ToBytes(2),
+                                                                               
convert.Int64ToBytes(2),
+                                                                       },
+                                                               },
+                                                               {
+                                                                       name: 
"group", valueType: pbv1.ValueTypeStr,
+                                                                       values: 
[][]byte{
+                                                                               
[]byte("value7"),
+                                                                               
[]byte("value8"),
+                                                                       },
+                                                               },
+                                                               {
+                                                                       name: 
"parameters", valueType: pbv1.ValueTypeStr,
+                                                                       values: 
[][]byte{
+                                                                               
[]byte("1000"),
+                                                                               
[]byte("1000"),
+                                                                       },
+                                                               },
+                                                       },
+                                               },
+                                       },
+                                       field: columnFamily{
+                                               columns: []column{
+                                                       {name: "value", 
valueType: pbv1.ValueTypeStr, values: [][]byte{rightTopNBinaryData, 
[]byte("field3")}},
+                                               },
+                                       },
+                               },
+                       },
+                       want: &blockPointer{block: mergedTopNBlock3, bm: 
blockMetadata{timestamps: timestampsMetadata{min: 1, max: 3}}},
+               },
+               {
+                       name: "Merging TopN blocks where both side are 
malformed",
+                       left: &blockPointer{
+                               block: block{
+                                       timestamps: []int64{1, 2},
+                                       versions:   []int64{1, 2},
+                                       tagFamilies: []columnFamily{
+                                               {
+                                                       name: "_topN",
+                                                       columns: []column{
+                                                               {
+                                                                       name: 
"name", valueType: pbv1.ValueTypeStr,
+                                                                       values: 
[][]byte{
+                                                                               
[]byte("duplicated1"),
+                                                                               
[]byte("value1"),
+                                                                       },
+                                                               },
+                                                               {
+                                                                       name: 
"direction", valueType: pbv1.ValueTypeInt64,
+                                                                       values: 
[][]byte{
+                                                                               
convert.Int64ToBytes(2),
+                                                                               
convert.Int64ToBytes(2),
+                                                                       },
+                                                               },
+                                                               {
+                                                                       name: 
"group", valueType: pbv1.ValueTypeStr,
+                                                                       values: 
[][]byte{
+                                                                               
[]byte("duplicated2"),
+                                                                               
[]byte("value3"),
+                                                                       },
+                                                               },
+                                                               {
+                                                                       name: 
"parameters", valueType: pbv1.ValueTypeStr,
+                                                                       values: 
[][]byte{
+                                                                               
[]byte("1000"),
+                                                                               
[]byte("1000"),
+                                                                       },
+                                                               },
+                                                       },
+                                               },
+                                       },
+                                       field: columnFamily{
+                                               columns: []column{
+                                                       {name: "value", 
valueType: pbv1.ValueTypeStr, values: [][]byte{nil, []byte("field1")}},
+                                               },
+                                       },
+                               },
+                       },
+                       right: &blockPointer{
+                               block: block{
+                                       timestamps: []int64{1, 3},
+                                       versions:   []int64{2, 3},
+                                       tagFamilies: []columnFamily{
+                                               {
+                                                       name: "_topN",
+                                                       columns: []column{
+                                                               {
+                                                                       name: 
"name", valueType: pbv1.ValueTypeStr,
+                                                                       values: 
[][]byte{
+                                                                               
[]byte("value5"),
+                                                                               
[]byte("value6"),
+                                                                       },
+                                                               },
+                                                               {
+                                                                       name: 
"direction", valueType: pbv1.ValueTypeInt64,
+                                                                       values: 
[][]byte{
+                                                                               
convert.Int64ToBytes(2),
+                                                                               
convert.Int64ToBytes(2),
+                                                                       },
+                                                               },
+                                                               {
+                                                                       name: 
"group", valueType: pbv1.ValueTypeStr,
+                                                                       values: 
[][]byte{
+                                                                               
[]byte("value7"),
+                                                                               
[]byte("value8"),
+                                                                       },
+                                                               },
+                                                               {
+                                                                       name: 
"parameters", valueType: pbv1.ValueTypeStr,
+                                                                       values: 
[][]byte{
+                                                                               
[]byte("1000"),
+                                                                               
[]byte("1000"),
+                                                                       },
+                                                               },
+                                                       },
+                                               },
+                                       },
+                                       field: columnFamily{
+                                               columns: []column{
+                                                       {name: "value", 
valueType: pbv1.ValueTypeStr, values: [][]byte{nil, []byte("field3")}},
+                                               },
+                                       },
+                               },
+                       },
+                       want: &blockPointer{block: mergedTopNBlock4, bm: 
blockMetadata{timestamps: timestampsMetadata{min: 1, max: 3}}},
+               },
        }
 
        for _, tt := range tests {
@@ -224,6 +609,61 @@ func Test_mergeTwoBlocks(t *testing.T) {
        }
 }
 
+var ( // TopN payload fixtures shared by the block merge fixtures in this file.
+       leftTopNValue = &TopNValue{
+               valueName:      "value",
+               entityTagNames: []string{"entity_id"},
+               values:         []int64{1000, 200, 300, 400, 500},
+               entities: [][]*modelv1.TagValue{
+                       {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
"entity_1"}}}},
+                       {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
"entity_2"}}}},
+                       {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
"entity_3"}}}},
+                       {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
"entity_4"}}}},
+                       {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
"entity_5"}}}},
+               },
+       }
+       rightTopNValue = &TopNValue{
+               valueName:      "value",
+               entityTagNames: []string{"entity_id"},
+               values:         []int64{600, 550, 530, 400, 300},
+               entities: [][]*modelv1.TagValue{
+                       {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
"entity_6"}}}},
+                       {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
"entity_3"}}}},
+                       {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
"entity_5"}}}},
+                       {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
"entity_7"}}}},
+                       {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
"entity_4"}}}},
+               },
+       }
+       // mergedTopNValue lists each entity once, values descending; for entities on both sides (3, 4, 5) the right-hand value is listed, even when smaller (entity_4: 400 vs 300).
+       mergedTopNValue = &TopNValue{
+               valueName:      "value",
+               entityTagNames: []string{"entity_id"},
+               values:         []int64{1000, 600, 550, 530, 400, 300, 200},
+               entities: [][]*modelv1.TagValue{
+                       {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
"entity_1"}}}},
+                       {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
"entity_6"}}}},
+                       {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
"entity_3"}}}},
+                       {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
"entity_5"}}}},
+                       {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
"entity_7"}}}},
+                       {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
"entity_4"}}}},
+                       {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: 
"entity_2"}}}},
+               },
+       }
+       // Marshaled forms of the three fixtures above; the error returned by marshal is discarded.
+       leftTopNBinaryData = func() []byte {
+               b, _ := leftTopNValue.marshal(make([]byte, 0, 256))
+               return b
+       }()
+       rightTopNBinaryData = func() []byte {
+               b, _ := rightTopNValue.marshal(make([]byte, 0, 256))
+               return b
+       }()
+       mergedTopNBinaryData = func() []byte {
+               b, _ := mergedTopNValue.marshal(make([]byte, 0, 256))
+               return b
+       }()
+)
+
 // Test_mergeTwoBlocks_edgeCase tests the edge case that previously caused 
panic:
 // runtime error: index out of range [-1] at merger.go:394.
 // This occurs when left.idx = 0 and left.timestamps[0] > 
right.timestamps[right.idx].
@@ -330,6 +770,202 @@ var mergedBlock = block{
        },
 }
 
+var mergedTopNBlock = block{ // fixture: the field value at timestamp 2 is the merged TopN payload (mergedTopNBinaryData).
+       timestamps: []int64{1, 2, 3},
+       versions:   []int64{1, 3, 4},
+       tagFamilies: []columnFamily{
+               {
+                       name: "_topN",
+                       columns: []column{
+                               {
+                                       name: "name", valueType: 
pbv1.ValueTypeStr,
+                                       values: [][]byte{
+                                               []byte("value1"),
+                                               []byte("value5"),
+                                               []byte("value6"),
+                                       },
+                               },
+                               {
+                                       name: "direction", valueType: 
pbv1.ValueTypeInt64,
+                                       values: [][]byte{
+                                               convert.Int64ToBytes(2),
+                                               convert.Int64ToBytes(2),
+                                               convert.Int64ToBytes(2),
+                                       },
+                               },
+                               {
+                                       name: "group", valueType: 
pbv1.ValueTypeStr,
+                                       values: [][]byte{
+                                               []byte("value3"),
+                                               []byte("value7"),
+                                               []byte("value8"),
+                                       },
+                               },
+                               {
+                                       name: "parameters", valueType: 
pbv1.ValueTypeStr,
+                                       values: [][]byte{
+                                               []byte("1000"),
+                                               []byte("1000"),
+                                               []byte("2000"),
+                                       },
+                               },
+                       },
+               },
+       },
+       field: columnFamily{
+               columns: []column{
+                       {name: "value", valueType: pbv1.ValueTypeStr, values: 
[][]byte{[]byte("field1"), mergedTopNBinaryData, []byte("field3")}},
+               },
+       },
+}
+
+var mergedTopNBlock2 = block{ // fixture: the field value at timestamp 1 is the merged TopN payload (mergedTopNBinaryData).
+       timestamps: []int64{1, 2, 3},
+       versions:   []int64{2, 2, 3},
+       tagFamilies: []columnFamily{
+               {
+                       name: "_topN",
+                       columns: []column{
+                               {
+                                       name: "name", valueType: 
pbv1.ValueTypeStr,
+                                       values: [][]byte{
+                                               []byte("value5"),
+                                               []byte("value1"),
+                                               []byte("value6"),
+                                       },
+                               },
+                               {
+                                       name: "direction", valueType: 
pbv1.ValueTypeInt64,
+                                       values: [][]byte{
+                                               convert.Int64ToBytes(2),
+                                               convert.Int64ToBytes(2),
+                                               convert.Int64ToBytes(2),
+                                       },
+                               },
+                               {
+                                       name: "group", valueType: 
pbv1.ValueTypeStr,
+                                       values: [][]byte{
+                                               []byte("value7"),
+                                               []byte("value3"),
+                                               []byte("value8"),
+                                       },
+                               },
+                               {
+                                       name: "parameters", valueType: 
pbv1.ValueTypeStr,
+                                       values: [][]byte{
+                                               []byte("1000"),
+                                               []byte("1000"),
+                                               []byte("1000"),
+                                       },
+                               },
+                       },
+               },
+       },
+       field: columnFamily{
+               columns: []column{
+                       {name: "value", valueType: pbv1.ValueTypeStr, values: 
[][]byte{mergedTopNBinaryData, []byte("field1"), []byte("field3")}},
+               },
+       },
+}
+
+var mergedTopNBlock3 = block{ // `want` fixture: the field value at timestamp 1 is rightTopNBinaryData, i.e. the right-hand payload unchanged.
+       timestamps: []int64{1, 2, 3},
+       versions:   []int64{2, 2, 3},
+       tagFamilies: []columnFamily{
+               {
+                       name: "_topN",
+                       columns: []column{
+                               {
+                                       name: "name", valueType: 
pbv1.ValueTypeStr,
+                                       values: [][]byte{
+                                               []byte("value5"),
+                                               []byte("value1"),
+                                               []byte("value6"),
+                                       },
+                               },
+                               {
+                                       name: "direction", valueType: 
pbv1.ValueTypeInt64,
+                                       values: [][]byte{
+                                               convert.Int64ToBytes(2),
+                                               convert.Int64ToBytes(2),
+                                               convert.Int64ToBytes(2),
+                                       },
+                               },
+                               {
+                                       name: "group", valueType: 
pbv1.ValueTypeStr,
+                                       values: [][]byte{
+                                               []byte("value7"),
+                                               []byte("value3"),
+                                               []byte("value8"),
+                                       },
+                               },
+                               {
+                                       name: "parameters", valueType: 
pbv1.ValueTypeStr,
+                                       values: [][]byte{
+                                               []byte("1000"),
+                                               []byte("1000"),
+                                               []byte("1000"),
+                                       },
+                               },
+                       },
+               },
+       },
+       field: columnFamily{
+               columns: []column{
+                       {name: "value", valueType: pbv1.ValueTypeStr, values: 
[][]byte{rightTopNBinaryData, []byte("field1"), []byte("field3")}},
+               },
+       },
+}
+
+var mergedTopNBlock4 = block{ // `want` for the "both side are malformed" case: the field value at timestamp 1 is an empty, non-nil slice.
+       timestamps: []int64{1, 2, 3},
+       versions:   []int64{2, 2, 3},
+       tagFamilies: []columnFamily{
+               {
+                       name: "_topN",
+                       columns: []column{
+                               {
+                                       name: "name", valueType: 
pbv1.ValueTypeStr,
+                                       values: [][]byte{
+                                               []byte("value5"),
+                                               []byte("value1"),
+                                               []byte("value6"),
+                                       },
+                               },
+                               {
+                                       name: "direction", valueType: 
pbv1.ValueTypeInt64,
+                                       values: [][]byte{
+                                               convert.Int64ToBytes(2),
+                                               convert.Int64ToBytes(2),
+                                               convert.Int64ToBytes(2),
+                                       },
+                               },
+                               {
+                                       name: "group", valueType: 
pbv1.ValueTypeStr,
+                                       values: [][]byte{
+                                               []byte("value7"),
+                                               []byte("value3"),
+                                               []byte("value8"),
+                                       },
+                               },
+                               {
+                                       name: "parameters", valueType: 
pbv1.ValueTypeStr,
+                                       values: [][]byte{
+                                               []byte("1000"),
+                                               []byte("1000"),
+                                               []byte("1000"),
+                                       },
+                               },
+                       },
+               },
+       },
+       field: columnFamily{
+               columns: []column{
+                       {name: "value", valueType: pbv1.ValueTypeStr, values: 
[][]byte{{}, []byte("field1"), []byte("field3")}},
+               },
+       },
+}
+
 func Test_mergeParts(t *testing.T) {
        tests := []struct {
                wantErr error
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 3482b0964..fd117050c 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -21,6 +21,7 @@ import (
        "context"
        "fmt"
        "path"
+       "strconv"
        "strings"
        "sync"
        "time"
@@ -67,7 +68,7 @@ var (
                CompressionMethod: 
databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD,
        }}
        // TopNTagNames is the tag names of the topN result measure.
-       TopNTagNames = []string{"name", "direction", "group", "source"}
+       TopNTagNames = []string{"name", "direction", "group", "parameters"}
 )
 
 // SchemaService allows querying schema information.
@@ -896,3 +897,33 @@ func GetTopNSchemaMetadata(group string) 
*commonv1.Metadata {
 func getKey(metadata *commonv1.Metadata) string {
        return path.Join(metadata.GetGroup(), metadata.GetName())
 }
+
+// TopNParameters defines the structure for the "parameters" tag value, which is encoded as a base-10 integer string (not JSON).
+type TopNParameters struct {
+       // Limit defines the number of top items to be kept.
+       Limit int64
+}
+
+// String implements the fmt.Stringer interface: it returns Limit in base 10, or "" for a nil receiver.
+func (p *TopNParameters) String() string {
+       if p == nil {
+               return ""
+       }
+       return strconv.FormatInt(p.Limit, 10)
+}
+
+// ParseTopNParameters parses val as a base-10 int64 Limit (not JSON); "" yields a zero-valued result and a strconv.ParseInt error is returned as-is.
+func ParseTopNParameters(val string) (*TopNParameters, error) {
+       if val == "" {
+               return &TopNParameters{}, nil
+       }
+
+       limit, err := strconv.ParseInt(val, 10, 64)
+       if err != nil {
+               return nil, err
+       }
+
+       return &TopNParameters{
+               Limit: limit,
+       }, nil
+}
diff --git a/banyand/measure/topn.go b/banyand/measure/topn.go
index 3c042017f..c09340a6b 100644
--- a/banyand/measure/topn.go
+++ b/banyand/measure/topn.go
@@ -364,6 +364,12 @@ func (t *topNStreamingProcessor) writeStreamRecord(record 
flow.StreamRecord, buf
                        )
                        shardID = data[3].(uint32)
                }
+
+               params := &TopNParameters{
+                       Limit: int64(t.topNSchema.CountersNumber),
+               }
+               paramsStr := params.String()
+
                entityValues := []*modelv1.TagValue{
                        {
                                Value: &modelv1.TagValue_Str{
@@ -386,6 +392,13 @@ func (t *topNStreamingProcessor) writeStreamRecord(record 
flow.StreamRecord, buf
                                        },
                                },
                        },
+                       {
+                               Value: &modelv1.TagValue_Str{
+                                       Str: &modelv1.Str{
+                                               Value: paramsStr,
+                                       },
+                               },
+                       },
                }
                buf = buf[:0]
                if buf, err = topNValue.marshal(buf); err != nil {
diff --git a/banyand/measure/topn_post_processor.go 
b/banyand/measure/topn_post_processor.go
index fde0f5a44..4d219d73c 100644
--- a/banyand/measure/topn_post_processor.go
+++ b/banyand/measure/topn_post_processor.go
@@ -36,6 +36,7 @@ type PostProcessor interface {
        Put(entityValues pbv1.EntityValues, val int64, timestampMillis uint64, 
version int64)
        Flush() ([]*topNAggregatorItem, error)
        Val([]string) ([]*measurev1.TopNList, error)
+       Reset()
 }
 
 // CreateTopNPostProcessor creates a Top-N post processor with or without 
aggregation.
@@ -240,7 +241,6 @@ func (taggr *topNPostProcessor) Flush() 
([]*topNAggregatorItem, error) {
                                result = append(result, item)
                        }
                }
-               clear(taggr.timelines)
        } else {
                for _, timeline := range taggr.timelines {
                        for _, item := range timeline.items {
@@ -272,6 +272,7 @@ func (taggr *topNPostProcessor) Flush() 
([]*topNAggregatorItem, error) {
                        result = append(result, item)
                }
        }
+       taggr.Reset()
 
        return result, nil
 }
@@ -342,3 +343,13 @@ func (taggr *topNPostProcessor) 
valWithoutAggregation(tagNames []string) []*meas
 
        return topNLists
 }
+
+func (taggr *topNPostProcessor) Reset() {
+       clear(taggr.timelines)
+       // cache and items are only reset when aggrFunc is not UNSPECIFIED.
+       if taggr.aggrFunc != 
modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED {
+               clear(taggr.cache)
+               // Truncate to length zero; the capacity of items is retained.
+               taggr.items = taggr.items[:0]
+       }
+}

Reply via email to