This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 27cfc5d0e Refactor TopN Aggregation result merge process (#961)
27cfc5d0e is described below
commit 27cfc5d0eed6891df69eacdf524611cb9fe241e3
Author: peachisai <[email protected]>
AuthorDate: Sun Feb 8 19:26:12 2026 +0800
Refactor TopN Aggregation result merge process (#961)
* Refactor TopN Aggregation result merge process
---------
Co-authored-by: Gao Hongtao <[email protected]>
---
banyand/measure/block.go | 145 ++++++--
banyand/measure/merger.go | 57 ++-
banyand/measure/merger_test.go | 636 +++++++++++++++++++++++++++++++++
banyand/measure/metadata.go | 33 +-
banyand/measure/topn.go | 13 +
banyand/measure/topn_post_processor.go | 13 +-
6 files changed, 862 insertions(+), 35 deletions(-)
diff --git a/banyand/measure/block.go b/banyand/measure/block.go
index 286f4bbf6..202e05245 100644
--- a/banyand/measure/block.go
+++ b/banyand/measure/block.go
@@ -731,13 +731,7 @@ func (bc *blockCursor) mergeTopNResult(r
*model.MeasureResult, storedIndexValue
valueName := topNValue.valueName
entityTagNames := topNValue.entityTagNames
- for j, entityList := range topNValue.entities {
- entityValues := make(pbv1.EntityValues, 0,
len(entityList))
- for _, e := range entityList {
- entityValues = append(entityValues, e)
- }
- topNPostAggregator.Put(entityValues,
topNValue.values[j], uTimestamps, r.Versions[len(r.Versions)-1])
- }
+ putEntitiesToAggregator(topNValue, topNPostAggregator,
uTimestamps, r.Versions[len(r.Versions)-1])
topNValue.Reset()
if err := topNValue.Unmarshal(destFieldValue.GetBinaryData(),
decoder); err != nil {
@@ -745,18 +739,11 @@ func (bc *blockCursor) mergeTopNResult(r
*model.MeasureResult, storedIndexValue
continue
}
- for j, entityList := range topNValue.entities {
- entityValues := make(pbv1.EntityValues, 0,
len(entityList))
- for _, e := range entityList {
- entityValues = append(entityValues, e)
- }
- topNPostAggregator.Put(entityValues,
topNValue.values[j], uTimestamps, bc.versions[bc.idx])
- }
+ putEntitiesToAggregator(topNValue, topNPostAggregator,
uTimestamps, bc.versions[bc.idx])
items, err := topNPostAggregator.Flush()
if err != nil {
- log.Error().Err(err).Msg("failed to flush aggregator,
skip current batch")
- continue
+ log.Error().Err(err).Msg("failed to flush aggregator")
}
topNValue.Reset()
@@ -768,8 +755,7 @@ func (bc *blockCursor) mergeTopNResult(r
*model.MeasureResult, storedIndexValue
buf, err := topNValue.marshal(make([]byte, 0, 128))
if err != nil {
- log.Error().Err(err).Msg("failed to marshal topN value,
skip current batch")
- continue
+ log.Error().Err(err).Msg("failed to marshal topN value")
}
r.Fields[i].Values[len(r.Fields[i].Values)-1] =
&modelv1.FieldValue{
@@ -779,7 +765,7 @@ func (bc *blockCursor) mergeTopNResult(r
*model.MeasureResult, storedIndexValue
}
}
- if bc.versions[bc.idx] >= r.Versions[len(r.Versions)-1] {
+ if bc.versions[bc.idx] > r.Versions[len(r.Versions)-1] {
r.Versions[len(r.Versions)-1] = bc.versions[bc.idx]
}
}
@@ -955,16 +941,8 @@ func (bi *blockPointer) append(b *blockPointer, offset
int) {
if offset <= b.idx {
return
}
- if len(bi.tagFamilies) == 0 && len(b.tagFamilies) > 0 {
- fullTagAppend(bi, b, offset)
- } else {
- if err := fastTagAppend(bi, b, offset); err != nil {
- if log.Debug().Enabled() {
- log.Debug().Msgf("fastTagMerge failed: %v;
falling back to fullTagMerge", err)
- }
- fullTagAppend(bi, b, offset)
- }
- }
+
+ bi.appendTagFamilies(b, offset)
if len(bi.field.columns) == 0 && len(b.field.columns) > 0 {
fullFieldAppend(bi, b, offset)
@@ -1151,6 +1129,115 @@ func fullFieldAppend(bi, b *blockPointer, offset int) {
}
}
+func (bi *blockPointer) mergeAndAppendTopN(left *blockPointer, leftIdx int,
right *blockPointer, rightIdx int, topNPostAggregator PostProcessor) {
+ topNValue := GenerateTopNValue()
+ defer ReleaseTopNValue(topNValue)
+ decoder := GenerateTopNValuesDecoder()
+ defer ReleaseTopNValuesDecoder(decoder)
+
+ topNPostAggregator.Reset()
+
+ uTimestamp := uint64(right.timestamps[rightIdx])
+ marshalBuf := make([]byte, 0, 128)
+ leftVer, rightVer := left.versions[leftIdx], right.versions[rightIdx]
+
+ if len(bi.field.columns) == 0 {
+ for _, c := range right.field.columns {
+ bi.field.columns = append(bi.field.columns, column{
+ name: c.name,
+ valueType: c.valueType,
+ values: make([][]byte, 0),
+ })
+ }
+ }
+
+ for idx := range right.field.columns {
+ topNValue.Reset()
+
+ var valueName string
+ var entityTagNames []string
+ hasValidData := false
+
+ if err :=
topNValue.Unmarshal(left.field.columns[idx].values[leftIdx], decoder); err !=
nil {
+ log.Warn().Err(err).Msg("failed to unmarshal left topN
value, ignoring left side")
+ } else {
+ valueName = topNValue.valueName
+ entityTagNames = topNValue.entityTagNames
+ putEntitiesToAggregator(topNValue, topNPostAggregator,
uTimestamp, leftVer)
+ hasValidData = true
+ }
+
+ topNValue.Reset()
+ if err :=
topNValue.Unmarshal(right.field.columns[idx].values[rightIdx], decoder); err !=
nil {
+ log.Warn().Err(err).Msg("failed to unmarshal right topN
value, ignoring right side")
+ } else {
+ if !hasValidData {
+ valueName = topNValue.valueName
+ entityTagNames = topNValue.entityTagNames
+ }
+ putEntitiesToAggregator(topNValue, topNPostAggregator,
uTimestamp, rightVer)
+ hasValidData = true
+ }
+
+ if !hasValidData {
+ log.Error().Msg("both sides of topN value are
malformed, append empty value")
+ bi.field.columns[idx].values =
append(bi.field.columns[idx].values, []byte{})
+ continue
+ }
+
+ topNValue.Reset()
+ topNValue.setMetadata(valueName, entityTagNames)
+
+ items, err := topNPostAggregator.Flush()
+ if err != nil {
+ log.Panic().Err(err).Msg("failed to flush aggregator")
+ }
+
+ for _, item := range items {
+ topNValue.addValue(item.val, item.values)
+ }
+
+ marshalBuf = marshalBuf[:0]
+ buf, err := topNValue.marshal(marshalBuf)
+ if err != nil {
+ log.Panic().Err(err).Msg("failed to marshal merged topN
value")
+ }
+
+ bi.field.columns[idx].values =
append(bi.field.columns[idx].values, buf)
+ }
+
+ if rightVer >= leftVer {
+ bi.appendTagFamilies(right, rightIdx+1)
+ bi.versions = append(bi.versions, rightVer)
+ } else {
+ bi.appendTagFamilies(left, leftIdx+1)
+ bi.versions = append(bi.versions, leftVer)
+ }
+ bi.timestamps = append(bi.timestamps, right.timestamps[rightIdx])
+}
+
+func (bi *blockPointer) appendTagFamilies(b *blockPointer, offset int) {
+ if len(bi.tagFamilies) == 0 && len(b.tagFamilies) > 0 {
+ fullTagAppend(bi, b, offset)
+ return
+ }
+
+ if err := fastTagAppend(bi, b, offset); err != nil {
+ if log.Debug().Enabled() {
+ log.Debug().Msgf("fastTagAppend failed: %v; falling
back to fullTagAppend", err)
+ }
+ fullTagAppend(bi, b, offset)
+ }
+}
+
+func putEntitiesToAggregator(topNValue *TopNValue, aggregator PostProcessor,
timestamp uint64, version int64) {
+ for i, entityList := range topNValue.entities {
+ entityValues := make(pbv1.EntityValues, len(entityList))
+ copy(entityValues, entityList)
+ aggregator.Put(entityValues, topNValue.values[i], timestamp,
version)
+ }
+}
+
func assertIdxAndOffset(name string, length int, idx int, offset int) {
if idx >= offset {
logger.Panicf("%q idx %d must be less than offset %d", name,
idx, offset)
diff --git a/banyand/measure/merger.go b/banyand/measure/merger.go
index 1337dbcab..ef359288c 100644
--- a/banyand/measure/merger.go
+++ b/banyand/measure/merger.go
@@ -25,6 +25,7 @@ import (
"github.com/dustin/go-humanize"
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/pkg/cgroups"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/fs"
@@ -390,6 +391,20 @@ func mergeTwoBlocks(target, left, right *blockPointer) {
left, right = right, left
}
+ var topNProcessor PostProcessor
+ var isTopN bool
+
+ if isTopNBlock(left) {
+ sort, limit, err := parseTopNMeta(left)
+ if err != nil {
+ log.Error().Err(err).Msg("failed to parse TopN
metadata, falling back to normal merge")
+ isTopN = false
+ } else {
+ isTopN = true
+ topNProcessor = CreateTopNPostProcessor(limit,
modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED, sort)
+ }
+ }
+
for {
i := left.idx
ts2 := right.timestamps[right.idx]
@@ -397,12 +412,18 @@ func mergeTwoBlocks(target, left, right *blockPointer) {
i++
}
if i > left.idx && left.timestamps[i-1] == ts2 {
- if left.versions[i-1] >= right.versions[right.idx] {
- target.append(left, i)
+ if isTopN {
+ target.append(left, i-1)
+ target.mergeAndAppendTopN(left, i-1, right,
right.idx, topNProcessor)
} else {
- target.append(left, i-1) // skip left
- target.append(right, right.idx+1)
+ if left.versions[i-1] >=
right.versions[right.idx] {
+ target.append(left, i)
+ } else {
+ target.append(left, i-1) // skip left
+ target.append(right, right.idx+1)
+ }
}
+
left.idx = i
right.idx++ // skip right
if appendIfEmpty(right, left) {
@@ -418,3 +439,31 @@ func mergeTwoBlocks(target, left, right *blockPointer) {
left, right = right, left
}
}
+
+func isTopNBlock(b *blockPointer) bool {
+ families := b.tagFamilies
+ if len(families) == 0 {
+ return false
+ }
+
+ if families[0].name == TopNTagFamily {
+ return true
+ }
+
+ return false
+}
+
+func parseTopNMeta(b *blockPointer) (modelv1.Sort, int32, error) {
+ tf := b.tagFamilies[0]
+
+ sortVal := mustDecodeTagValue(tf.columns[1].valueType,
tf.columns[1].values[b.idx])
+
+ paramsVal := mustDecodeTagValue(tf.columns[3].valueType,
tf.columns[3].values[b.idx])
+ paramStr := paramsVal.GetStr().Value
+ params, err := ParseTopNParameters(paramStr)
+ if err != nil {
+ return modelv1.Sort_SORT_UNSPECIFIED, 0, fmt.Errorf("failed to
parse topN parameters: %w", err)
+ }
+
+ return modelv1.Sort(sortVal.GetInt().Value), int32(params.Limit), nil
+}
diff --git a/banyand/measure/merger_test.go b/banyand/measure/merger_test.go
index 5e56fd674..7cdf97b98 100644
--- a/banyand/measure/merger_test.go
+++ b/banyand/measure/merger_test.go
@@ -28,6 +28,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/apache/skywalking-banyandb/api/common"
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/protector"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/fs"
@@ -211,6 +212,390 @@ func Test_mergeTwoBlocks(t *testing.T) {
},
want: &blockPointer{block: mergedBlock, bm:
blockMetadata{timestamps: timestampsMetadata{min: 1, max: 4}}},
},
+ {
+ name: "Merge two topN blocks",
+ left: &blockPointer{
+ block: block{
+ timestamps: []int64{1, 2},
+ versions: []int64{1, 2},
+ tagFamilies: []columnFamily{
+ {
+ name: "_topN",
+ columns: []column{
+ {
+ name:
"name", valueType: pbv1.ValueTypeStr,
+ values:
[][]byte{
+
[]byte("value1"),
+
[]byte("value2"),
+ },
+ },
+ {
+ name:
"direction", valueType: pbv1.ValueTypeInt64,
+ values:
[][]byte{
+
convert.Int64ToBytes(2),
+
convert.Int64ToBytes(2),
+ },
+ },
+ {
+ name:
"group", valueType: pbv1.ValueTypeStr,
+ values:
[][]byte{
+
[]byte("value3"),
+
[]byte("value4"),
+ },
+ },
+ {
+ name:
"parameters", valueType: pbv1.ValueTypeStr,
+ values:
[][]byte{
+
[]byte("1000"),
+
[]byte("2000"),
+ },
+ },
+ },
+ },
+ },
+ field: columnFamily{
+ columns: []column{
+ {name: "value",
valueType: pbv1.ValueTypeStr, values: [][]byte{[]byte("field1"),
leftTopNBinaryData}},
+ },
+ },
+ },
+ },
+ right: &blockPointer{
+ block: block{
+ timestamps: []int64{2, 3},
+ versions: []int64{3, 4},
+ tagFamilies: []columnFamily{
+ {
+ name: "_topN",
+ columns: []column{
+ {
+ name:
"name", valueType: pbv1.ValueTypeStr,
+ values:
[][]byte{
+
[]byte("value5"),
+
[]byte("value6"),
+ },
+ },
+ {
+ name:
"direction", valueType: pbv1.ValueTypeInt64,
+ values:
[][]byte{
+
convert.Int64ToBytes(2),
+
convert.Int64ToBytes(2),
+ },
+ },
+ {
+ name:
"group", valueType: pbv1.ValueTypeStr,
+ values:
[][]byte{
+
[]byte("value7"),
+
[]byte("value8"),
+ },
+ },
+ {
+ name:
"parameters", valueType: pbv1.ValueTypeStr,
+ values:
[][]byte{
+
[]byte("1000"),
+
[]byte("2000"),
+ },
+ },
+ },
+ },
+ },
+ field: columnFamily{
+ columns: []column{
+ {name: "value",
valueType: pbv1.ValueTypeStr, values: [][]byte{rightTopNBinaryData,
[]byte("field3")}},
+ },
+ },
+ },
+ },
+ want: &blockPointer{block: mergedTopNBlock, bm:
blockMetadata{timestamps: timestampsMetadata{min: 1, max: 3}}},
+ },
+ {
+ name: "Merging TopN blocks where the first timestamps
are equal",
+ left: &blockPointer{
+ block: block{
+ timestamps: []int64{1, 2},
+ versions: []int64{1, 2},
+ tagFamilies: []columnFamily{
+ {
+ name: "_topN",
+ columns: []column{
+ {
+ name:
"name", valueType: pbv1.ValueTypeStr,
+ values:
[][]byte{
+
[]byte("duplicated1"),
+
[]byte("value1"),
+ },
+ },
+ {
+ name:
"direction", valueType: pbv1.ValueTypeInt64,
+ values:
[][]byte{
+
convert.Int64ToBytes(2),
+
convert.Int64ToBytes(2),
+ },
+ },
+ {
+ name:
"group", valueType: pbv1.ValueTypeStr,
+ values:
[][]byte{
+
[]byte("duplicated2"),
+
[]byte("value3"),
+ },
+ },
+ {
+ name:
"parameters", valueType: pbv1.ValueTypeStr,
+ values:
[][]byte{
+
[]byte("1000"),
+
[]byte("1000"),
+ },
+ },
+ },
+ },
+ },
+ field: columnFamily{
+ columns: []column{
+ {name: "value",
valueType: pbv1.ValueTypeStr, values: [][]byte{leftTopNBinaryData,
[]byte("field1")}},
+ },
+ },
+ },
+ },
+ right: &blockPointer{
+ block: block{
+ timestamps: []int64{1, 3},
+ versions: []int64{2, 3},
+ tagFamilies: []columnFamily{
+ {
+ name: "_topN",
+ columns: []column{
+ {
+ name:
"name", valueType: pbv1.ValueTypeStr,
+ values:
[][]byte{
+
[]byte("value5"),
+
[]byte("value6"),
+ },
+ },
+ {
+ name:
"direction", valueType: pbv1.ValueTypeInt64,
+ values:
[][]byte{
+
convert.Int64ToBytes(2),
+
convert.Int64ToBytes(2),
+ },
+ },
+ {
+ name:
"group", valueType: pbv1.ValueTypeStr,
+ values:
[][]byte{
+
[]byte("value7"),
+
[]byte("value8"),
+ },
+ },
+ {
+ name:
"parameters", valueType: pbv1.ValueTypeStr,
+ values:
[][]byte{
+
[]byte("1000"),
+
[]byte("1000"),
+ },
+ },
+ },
+ },
+ },
+ field: columnFamily{
+ columns: []column{
+ {name: "value",
valueType: pbv1.ValueTypeStr, values: [][]byte{rightTopNBinaryData,
[]byte("field3")}},
+ },
+ },
+ },
+ },
+ want: &blockPointer{block: mergedTopNBlock2, bm:
blockMetadata{timestamps: timestampsMetadata{min: 1, max: 3}}},
+ },
+ {
+ name: "Merging TopN blocks where Left is malformed",
+ left: &blockPointer{
+ block: block{
+ timestamps: []int64{1, 2},
+ versions: []int64{1, 2},
+ tagFamilies: []columnFamily{
+ {
+ name: "_topN",
+ columns: []column{
+ {
+ name:
"name", valueType: pbv1.ValueTypeStr,
+ values:
[][]byte{
+
[]byte("duplicated1"),
+
[]byte("value1"),
+ },
+ },
+ {
+ name:
"direction", valueType: pbv1.ValueTypeInt64,
+ values:
[][]byte{
+
convert.Int64ToBytes(2),
+
convert.Int64ToBytes(2),
+ },
+ },
+ {
+ name:
"group", valueType: pbv1.ValueTypeStr,
+ values:
[][]byte{
+
[]byte("duplicated2"),
+
[]byte("value3"),
+ },
+ },
+ {
+ name:
"parameters", valueType: pbv1.ValueTypeStr,
+ values:
[][]byte{
+
[]byte("1000"),
+
[]byte("1000"),
+ },
+ },
+ },
+ },
+ },
+ field: columnFamily{
+ columns: []column{
+ {name: "value",
valueType: pbv1.ValueTypeStr, values: [][]byte{nil, []byte("field1")}},
+ },
+ },
+ },
+ },
+ right: &blockPointer{
+ block: block{
+ timestamps: []int64{1, 3},
+ versions: []int64{2, 3},
+ tagFamilies: []columnFamily{
+ {
+ name: "_topN",
+ columns: []column{
+ {
+ name:
"name", valueType: pbv1.ValueTypeStr,
+ values:
[][]byte{
+
[]byte("value5"),
+
[]byte("value6"),
+ },
+ },
+ {
+ name:
"direction", valueType: pbv1.ValueTypeInt64,
+ values:
[][]byte{
+
convert.Int64ToBytes(2),
+
convert.Int64ToBytes(2),
+ },
+ },
+ {
+ name:
"group", valueType: pbv1.ValueTypeStr,
+ values:
[][]byte{
+
[]byte("value7"),
+
[]byte("value8"),
+ },
+ },
+ {
+ name:
"parameters", valueType: pbv1.ValueTypeStr,
+ values:
[][]byte{
+
[]byte("1000"),
+
[]byte("1000"),
+ },
+ },
+ },
+ },
+ },
+ field: columnFamily{
+ columns: []column{
+ {name: "value",
valueType: pbv1.ValueTypeStr, values: [][]byte{rightTopNBinaryData,
[]byte("field3")}},
+ },
+ },
+ },
+ },
+ want: &blockPointer{block: mergedTopNBlock3, bm:
blockMetadata{timestamps: timestampsMetadata{min: 1, max: 3}}},
+ },
+ {
+ name: "Merging TopN blocks where both sides are
malformed",
+ left: &blockPointer{
+ block: block{
+ timestamps: []int64{1, 2},
+ versions: []int64{1, 2},
+ tagFamilies: []columnFamily{
+ {
+ name: "_topN",
+ columns: []column{
+ {
+ name:
"name", valueType: pbv1.ValueTypeStr,
+ values:
[][]byte{
+
[]byte("duplicated1"),
+
[]byte("value1"),
+ },
+ },
+ {
+ name:
"direction", valueType: pbv1.ValueTypeInt64,
+ values:
[][]byte{
+
convert.Int64ToBytes(2),
+
convert.Int64ToBytes(2),
+ },
+ },
+ {
+ name:
"group", valueType: pbv1.ValueTypeStr,
+ values:
[][]byte{
+
[]byte("duplicated2"),
+
[]byte("value3"),
+ },
+ },
+ {
+ name:
"parameters", valueType: pbv1.ValueTypeStr,
+ values:
[][]byte{
+
[]byte("1000"),
+
[]byte("1000"),
+ },
+ },
+ },
+ },
+ },
+ field: columnFamily{
+ columns: []column{
+ {name: "value",
valueType: pbv1.ValueTypeStr, values: [][]byte{nil, []byte("field1")}},
+ },
+ },
+ },
+ },
+ right: &blockPointer{
+ block: block{
+ timestamps: []int64{1, 3},
+ versions: []int64{2, 3},
+ tagFamilies: []columnFamily{
+ {
+ name: "_topN",
+ columns: []column{
+ {
+ name:
"name", valueType: pbv1.ValueTypeStr,
+ values:
[][]byte{
+
[]byte("value5"),
+
[]byte("value6"),
+ },
+ },
+ {
+ name:
"direction", valueType: pbv1.ValueTypeInt64,
+ values:
[][]byte{
+
convert.Int64ToBytes(2),
+
convert.Int64ToBytes(2),
+ },
+ },
+ {
+ name:
"group", valueType: pbv1.ValueTypeStr,
+ values:
[][]byte{
+
[]byte("value7"),
+
[]byte("value8"),
+ },
+ },
+ {
+ name:
"parameters", valueType: pbv1.ValueTypeStr,
+ values:
[][]byte{
+
[]byte("1000"),
+
[]byte("1000"),
+ },
+ },
+ },
+ },
+ },
+ field: columnFamily{
+ columns: []column{
+ {name: "value",
valueType: pbv1.ValueTypeStr, values: [][]byte{nil, []byte("field3")}},
+ },
+ },
+ },
+ },
+ want: &blockPointer{block: mergedTopNBlock4, bm:
blockMetadata{timestamps: timestampsMetadata{min: 1, max: 3}}},
+ },
}
for _, tt := range tests {
@@ -224,6 +609,61 @@ func Test_mergeTwoBlocks(t *testing.T) {
}
}
+var (
+ leftTopNValue = &TopNValue{
+ valueName: "value",
+ entityTagNames: []string{"entity_id"},
+ values: []int64{1000, 200, 300, 400, 500},
+ entities: [][]*modelv1.TagValue{
+ {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value:
"entity_1"}}}},
+ {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value:
"entity_2"}}}},
+ {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value:
"entity_3"}}}},
+ {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value:
"entity_4"}}}},
+ {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value:
"entity_5"}}}},
+ },
+ }
+ rightTopNValue = &TopNValue{
+ valueName: "value",
+ entityTagNames: []string{"entity_id"},
+ values: []int64{600, 550, 530, 400, 300},
+ entities: [][]*modelv1.TagValue{
+ {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value:
"entity_6"}}}},
+ {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value:
"entity_3"}}}},
+ {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value:
"entity_5"}}}},
+ {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value:
"entity_7"}}}},
+ {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value:
"entity_4"}}}},
+ },
+ }
+
+ mergedTopNValue = &TopNValue{
+ valueName: "value",
+ entityTagNames: []string{"entity_id"},
+ values: []int64{1000, 600, 550, 530, 400, 300, 200},
+ entities: [][]*modelv1.TagValue{
+ {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value:
"entity_1"}}}},
+ {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value:
"entity_6"}}}},
+ {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value:
"entity_3"}}}},
+ {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value:
"entity_5"}}}},
+ {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value:
"entity_7"}}}},
+ {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value:
"entity_4"}}}},
+ {{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value:
"entity_2"}}}},
+ },
+ }
+
+ leftTopNBinaryData = func() []byte {
+ b, _ := leftTopNValue.marshal(make([]byte, 0, 256))
+ return b
+ }()
+ rightTopNBinaryData = func() []byte {
+ b, _ := rightTopNValue.marshal(make([]byte, 0, 256))
+ return b
+ }()
+ mergedTopNBinaryData = func() []byte {
+ b, _ := mergedTopNValue.marshal(make([]byte, 0, 256))
+ return b
+ }()
+)
+
// Test_mergeTwoBlocks_edgeCase tests the edge case that previously caused
panic:
// runtime error: index out of range [-1] at merger.go:394.
// This occurs when left.idx = 0 and left.timestamps[0] >
right.timestamps[right.idx].
@@ -330,6 +770,202 @@ var mergedBlock = block{
},
}
+var mergedTopNBlock = block{
+ timestamps: []int64{1, 2, 3},
+ versions: []int64{1, 3, 4},
+ tagFamilies: []columnFamily{
+ {
+ name: "_topN",
+ columns: []column{
+ {
+ name: "name", valueType:
pbv1.ValueTypeStr,
+ values: [][]byte{
+ []byte("value1"),
+ []byte("value5"),
+ []byte("value6"),
+ },
+ },
+ {
+ name: "direction", valueType:
pbv1.ValueTypeInt64,
+ values: [][]byte{
+ convert.Int64ToBytes(2),
+ convert.Int64ToBytes(2),
+ convert.Int64ToBytes(2),
+ },
+ },
+ {
+ name: "group", valueType:
pbv1.ValueTypeStr,
+ values: [][]byte{
+ []byte("value3"),
+ []byte("value7"),
+ []byte("value8"),
+ },
+ },
+ {
+ name: "parameters", valueType:
pbv1.ValueTypeStr,
+ values: [][]byte{
+ []byte("1000"),
+ []byte("1000"),
+ []byte("2000"),
+ },
+ },
+ },
+ },
+ },
+ field: columnFamily{
+ columns: []column{
+ {name: "value", valueType: pbv1.ValueTypeStr, values:
[][]byte{[]byte("field1"), mergedTopNBinaryData, []byte("field3")}},
+ },
+ },
+}
+
+var mergedTopNBlock2 = block{
+ timestamps: []int64{1, 2, 3},
+ versions: []int64{2, 2, 3},
+ tagFamilies: []columnFamily{
+ {
+ name: "_topN",
+ columns: []column{
+ {
+ name: "name", valueType:
pbv1.ValueTypeStr,
+ values: [][]byte{
+ []byte("value5"),
+ []byte("value1"),
+ []byte("value6"),
+ },
+ },
+ {
+ name: "direction", valueType:
pbv1.ValueTypeInt64,
+ values: [][]byte{
+ convert.Int64ToBytes(2),
+ convert.Int64ToBytes(2),
+ convert.Int64ToBytes(2),
+ },
+ },
+ {
+ name: "group", valueType:
pbv1.ValueTypeStr,
+ values: [][]byte{
+ []byte("value7"),
+ []byte("value3"),
+ []byte("value8"),
+ },
+ },
+ {
+ name: "parameters", valueType:
pbv1.ValueTypeStr,
+ values: [][]byte{
+ []byte("1000"),
+ []byte("1000"),
+ []byte("1000"),
+ },
+ },
+ },
+ },
+ },
+ field: columnFamily{
+ columns: []column{
+ {name: "value", valueType: pbv1.ValueTypeStr, values:
[][]byte{mergedTopNBinaryData, []byte("field1"), []byte("field3")}},
+ },
+ },
+}
+
+var mergedTopNBlock3 = block{
+ timestamps: []int64{1, 2, 3},
+ versions: []int64{2, 2, 3},
+ tagFamilies: []columnFamily{
+ {
+ name: "_topN",
+ columns: []column{
+ {
+ name: "name", valueType:
pbv1.ValueTypeStr,
+ values: [][]byte{
+ []byte("value5"),
+ []byte("value1"),
+ []byte("value6"),
+ },
+ },
+ {
+ name: "direction", valueType:
pbv1.ValueTypeInt64,
+ values: [][]byte{
+ convert.Int64ToBytes(2),
+ convert.Int64ToBytes(2),
+ convert.Int64ToBytes(2),
+ },
+ },
+ {
+ name: "group", valueType:
pbv1.ValueTypeStr,
+ values: [][]byte{
+ []byte("value7"),
+ []byte("value3"),
+ []byte("value8"),
+ },
+ },
+ {
+ name: "parameters", valueType:
pbv1.ValueTypeStr,
+ values: [][]byte{
+ []byte("1000"),
+ []byte("1000"),
+ []byte("1000"),
+ },
+ },
+ },
+ },
+ },
+ field: columnFamily{
+ columns: []column{
+ {name: "value", valueType: pbv1.ValueTypeStr, values:
[][]byte{rightTopNBinaryData, []byte("field1"), []byte("field3")}},
+ },
+ },
+}
+
+var mergedTopNBlock4 = block{
+ timestamps: []int64{1, 2, 3},
+ versions: []int64{2, 2, 3},
+ tagFamilies: []columnFamily{
+ {
+ name: "_topN",
+ columns: []column{
+ {
+ name: "name", valueType:
pbv1.ValueTypeStr,
+ values: [][]byte{
+ []byte("value5"),
+ []byte("value1"),
+ []byte("value6"),
+ },
+ },
+ {
+ name: "direction", valueType:
pbv1.ValueTypeInt64,
+ values: [][]byte{
+ convert.Int64ToBytes(2),
+ convert.Int64ToBytes(2),
+ convert.Int64ToBytes(2),
+ },
+ },
+ {
+ name: "group", valueType:
pbv1.ValueTypeStr,
+ values: [][]byte{
+ []byte("value7"),
+ []byte("value3"),
+ []byte("value8"),
+ },
+ },
+ {
+ name: "parameters", valueType:
pbv1.ValueTypeStr,
+ values: [][]byte{
+ []byte("1000"),
+ []byte("1000"),
+ []byte("1000"),
+ },
+ },
+ },
+ },
+ },
+ field: columnFamily{
+ columns: []column{
+ {name: "value", valueType: pbv1.ValueTypeStr, values:
[][]byte{{}, []byte("field1"), []byte("field3")}},
+ },
+ },
+}
+
func Test_mergeParts(t *testing.T) {
tests := []struct {
wantErr error
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 3482b0964..fd117050c 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -21,6 +21,7 @@ import (
"context"
"fmt"
"path"
+ "strconv"
"strings"
"sync"
"time"
@@ -67,7 +68,7 @@ var (
CompressionMethod:
databasev1.CompressionMethod_COMPRESSION_METHOD_ZSTD,
}}
// TopNTagNames is the tag names of the topN result measure.
- TopNTagNames = []string{"name", "direction", "group", "source"}
+ TopNTagNames = []string{"name", "direction", "group", "parameters"}
)
// SchemaService allows querying schema information.
@@ -896,3 +897,33 @@ func GetTopNSchemaMetadata(group string)
*commonv1.Metadata {
func getKey(metadata *commonv1.Metadata) string {
return path.Join(metadata.GetGroup(), metadata.GetName())
}
+
+// TopNParameters defines the structure for the "parameters" tag value (a decimal string).
+type TopNParameters struct {
+ // Limit defines the number of top items to be kept.
+ Limit int64
+}
+
+// String implements the fmt.Stringer interface.
+func (p *TopNParameters) String() string {
+ if p == nil {
+ return ""
+ }
+ return strconv.FormatInt(p.Limit, 10)
+}
+
+// ParseTopNParameters decodes the "parameters" tag value, a decimal limit string.
+func ParseTopNParameters(val string) (*TopNParameters, error) {
+ if val == "" {
+ return &TopNParameters{}, nil
+ }
+
+ limit, err := strconv.ParseInt(val, 10, 64)
+ if err != nil {
+ return nil, err
+ }
+
+ return &TopNParameters{
+ Limit: limit,
+ }, nil
+}
diff --git a/banyand/measure/topn.go b/banyand/measure/topn.go
index 3c042017f..c09340a6b 100644
--- a/banyand/measure/topn.go
+++ b/banyand/measure/topn.go
@@ -364,6 +364,12 @@ func (t *topNStreamingProcessor) writeStreamRecord(record
flow.StreamRecord, buf
)
shardID = data[3].(uint32)
}
+
+ params := &TopNParameters{
+ Limit: int64(t.topNSchema.CountersNumber),
+ }
+ paramsStr := params.String()
+
entityValues := []*modelv1.TagValue{
{
Value: &modelv1.TagValue_Str{
@@ -386,6 +392,13 @@ func (t *topNStreamingProcessor) writeStreamRecord(record
flow.StreamRecord, buf
},
},
},
+ {
+ Value: &modelv1.TagValue_Str{
+ Str: &modelv1.Str{
+ Value: paramsStr,
+ },
+ },
+ },
}
buf = buf[:0]
if buf, err = topNValue.marshal(buf); err != nil {
diff --git a/banyand/measure/topn_post_processor.go
b/banyand/measure/topn_post_processor.go
index fde0f5a44..4d219d73c 100644
--- a/banyand/measure/topn_post_processor.go
+++ b/banyand/measure/topn_post_processor.go
@@ -36,6 +36,7 @@ type PostProcessor interface {
Put(entityValues pbv1.EntityValues, val int64, timestampMillis uint64,
version int64)
Flush() ([]*topNAggregatorItem, error)
Val([]string) ([]*measurev1.TopNList, error)
+ Reset()
}
// CreateTopNPostProcessor creates a Top-N post processor with or without
aggregation.
@@ -240,7 +241,6 @@ func (taggr *topNPostProcessor) Flush()
([]*topNAggregatorItem, error) {
result = append(result, item)
}
}
- clear(taggr.timelines)
} else {
for _, timeline := range taggr.timelines {
for _, item := range timeline.items {
@@ -272,6 +272,7 @@ func (taggr *topNPostProcessor) Flush()
([]*topNAggregatorItem, error) {
result = append(result, item)
}
}
+ taggr.Reset()
return result, nil
}
@@ -342,3 +343,13 @@ func (taggr *topNPostProcessor)
valWithoutAggregation(tagNames []string) []*meas
return topNLists
}
+
+func (taggr *topNPostProcessor) Reset() {
+ clear(taggr.timelines)
+
+ if taggr.aggrFunc !=
modelv1.AggregationFunction_AGGREGATION_FUNCTION_UNSPECIFIED {
+ clear(taggr.cache)
+
+ taggr.items = taggr.items[:0]
+ }
+}
