This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new dbfcebcc fix: resolve panic in measure block merger for overlapping timestamps (#914)
dbfcebcc is described below
commit dbfcebcc65d401eb30e92db79f1457a5acc85856
Author: Gao Hongtao <[email protected]>
AuthorDate: Tue Dec 23 09:58:21 2025 +0800
fix: resolve panic in measure block merger for overlapping timestamps (#914)
Co-authored-by: 吴晟 Wu Sheng <[email protected]>
---
CHANGES.md | 1 +
banyand/measure/merger.go | 7 +-
banyand/measure/merger_test.go | 484 +++++++++++++++++++++++++++++++++++++++++
3 files changed, 491 insertions(+), 1 deletion(-)
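For context on the failure mode: the inner scan in mergeTwoBlocks advances i while left timestamps are <= the right cursor's timestamp. If every remaining left timestamp is greater, i never moves past left.idx; with left.idx == 0 the old code then read left.timestamps[i-1], i.e. index -1. A minimal, self-contained sketch of the pre-fix logic (simplified names, not the actual merger code):

package main

import "fmt"

func main() {
	leftTS := []int64{20, 30, 40}     // left block, cursor at index 0
	rightTS := []int64{5, 10, 15, 25} // right block, cursor at index 2 (ts 15)
	leftIdx, rightIdx := 0, 2

	i := leftIdx
	ts2 := rightTS[rightIdx]
	for i < len(leftTS) && leftTS[i] <= ts2 {
		i++ // never runs here: 20 > 15, so i stays at leftIdx
	}
	// The pre-fix code accessed leftTS[i-1] unconditionally; with
	// i == leftIdx == 0 that is index -1 and panics at runtime.
	if i > leftIdx && leftTS[i-1] == ts2 {
		fmt.Println("equal timestamps: keep the entry with the higher version")
	} else {
		fmt.Println("guard holds: no out-of-range access")
	}
}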
diff --git a/CHANGES.md b/CHANGES.md
index 969aca2a..bb5b4f96 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -21,6 +21,7 @@ Release Notes.
- Fix the wrong retention setting of each measure/stream/trace.
- Fix server got panic when create/update property with high dist usage.
- Fix incorrect key range update in sidx part metadata.
+- Fix panic in measure block merger when merging blocks with overlapping timestamps.
### Document
diff --git a/banyand/measure/merger.go b/banyand/measure/merger.go
index bde0eba0..1337dbca 100644
--- a/banyand/measure/merger.go
+++ b/banyand/measure/merger.go
@@ -385,13 +385,18 @@ func mergeTwoBlocks(target, left, right *blockPointer) {
return
}
+ // Optimization: ensure left starts with smaller or equal timestamp to reduce unnecessary swaps
+ if right.timestamps[right.idx] < left.timestamps[left.idx] {
+ left, right = right, left
+ }
+
for {
i := left.idx
ts2 := right.timestamps[right.idx]
for i < len(left.timestamps) && left.timestamps[i] <= ts2 {
i++
}
- if left.timestamps[i-1] == ts2 {
+ if i > left.idx && left.timestamps[i-1] == ts2 {
if left.versions[i-1] >= right.versions[right.idx] {
target.append(left, i)
} else {
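With the i > left.idx guard in place, the upfront swap is an optimization rather than a correctness requirement: it makes left start at the smaller-or-equal timestamp, so the first pass through the loop can append a run from left instead of immediately hitting the no-progress case. A hedged trace of the patched flow on the new test's inputs (illustrative only, not the full function body):

// left:  ts [20, 30, 40], idx 0; right: ts [5, 10, 15, 25], idx 2 (cursor at 15)
// 1. 15 < 20, so the two block pointers are swapped before the loop.
// 2. The scan then advances over 15 (<= 20), appends that run, and merging
//    proceeds to interleave the rest, yielding [15, 20, 25, 30, 40].
// 3. If a scan ever makes no progress, i > left.idx is false and the
//    timestamps[i-1] read is skipped, so no index -1 panic.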
diff --git a/banyand/measure/merger_test.go b/banyand/measure/merger_test.go
index b29555e5..5e56fd67 100644
--- a/banyand/measure/merger_test.go
+++ b/banyand/measure/merger_test.go
@@ -21,12 +21,15 @@ import (
"errors"
"reflect"
"testing"
+ "time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/require"
+ "github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/protector"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/fs"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/test"
@@ -221,6 +224,88 @@ func Test_mergeTwoBlocks(t *testing.T) {
}
}
+// Test_mergeTwoBlocks_edgeCase tests the edge case that previously caused panic:
+// runtime error: index out of range [-1] at merger.go:394.
+// This occurs when left.idx = 0 and left.timestamps[0] > right.timestamps[right.idx].
+// The fix includes:
+// 1. Check i > left.idx before accessing left.timestamps[i-1] to prevent panic.
+// 2. Optimization to swap blocks upfront if right starts with smaller timestamp.
+func Test_mergeTwoBlocks_edgeCase(t *testing.T) {
+ left := &blockPointer{
+ block: block{
+ timestamps: []int64{20, 30, 40},
+ versions: []int64{1, 2, 3},
+ tagFamilies: []columnFamily{
+ {
+ name: "arrTag",
+ columns: []column{
+ {
+ name: "strArrTag", valueType: pbv1.ValueTypeStrArr,
+ values: [][]byte{
+ marshalStrArr([][]byte{[]byte("left1")}),
+ marshalStrArr([][]byte{[]byte("left2")}),
+ marshalStrArr([][]byte{[]byte("left3")}),
+ },
+ },
+ },
+ },
+ },
+ field: columnFamily{
+ columns: []column{
+ {name: "strField", valueType:
pbv1.ValueTypeStr, values: [][]byte{[]byte("field20"), []byte("field30"),
[]byte("field40")}},
+ },
+ },
+ },
+ bm: blockMetadata{
+ timestamps: timestampsMetadata{min: 20, max: 40},
+ },
+ idx: 0, // Start at the beginning
+ }
+
+ right := &blockPointer{
+ block: block{
+ timestamps: []int64{5, 10, 15, 25},
+ versions: []int64{4, 5, 6, 7},
+ tagFamilies: []columnFamily{
+ {
+ name: "arrTag",
+ columns: []column{
+ {
+ name: "strArrTag", valueType: pbv1.ValueTypeStrArr,
+ values: [][]byte{
+ marshalStrArr([][]byte{[]byte("right1")}),
+ marshalStrArr([][]byte{[]byte("right2")}),
+ marshalStrArr([][]byte{[]byte("right3")}),
+ marshalStrArr([][]byte{[]byte("right4")}),
+ },
+ },
+ },
+ },
+ },
+ field: columnFamily{
+ columns: []column{
+ {name: "strField", valueType:
pbv1.ValueTypeStr, values: [][]byte{[]byte("field5"), []byte("field10"),
[]byte("field15"), []byte("field25")}},
+ },
+ },
+ },
+ bm: blockMetadata{
+ timestamps: timestampsMetadata{min: 5, max: 25}, // Overlaps with left (20 <= 25 and 5 <= 40)
+ },
+ idx: 2, // Point to timestamp 15, which is less than left.timestamps[0] = 20
+ }
+
+ // After the fix, this should NOT panic and should merge correctly
+ target := &blockPointer{}
+ mergeTwoBlocks(target, left, right)
+
+ // Verify the merge result: should have timestamps [15, 20, 25, 30, 40]
+ // Note: right.idx = 2, so merge starts from timestamp 15 (not 5 or 10)
+ expectedTimestamps := []int64{15, 20, 25, 30, 40}
+ require.Equal(t, expectedTimestamps, target.timestamps, "merged timestamps should be correct")
+ require.Equal(t, int64(15), target.bm.timestamps.min, "min timestamp should be 15")
+ require.Equal(t, int64(40), target.bm.timestamps.max, "max timestamp should be 40")
+}
+
var mergedBlock = block{
timestamps: []int64{1, 2, 3, 4},
versions: []int64{1, 4, 5, 6},
@@ -383,3 +468,402 @@ func Test_mergeParts(t *testing.T) {
})
}
}
+
+// generateDatapointsWithMultipleBlocks generates datapoints that will create multiple blocks
+// by using different seriesIDs. Each seriesID creates a separate block.
+func generateDatapointsWithMultipleBlocks(startTimestamp, countPerBlock, numBlocks int64) *dataPoints {
+ dataPoints := &dataPoints{
+ seriesIDs: []common.SeriesID{},
+ timestamps: []int64{},
+ versions: []int64{},
+ tagFamilies: [][]nameValues{},
+ fields: []nameValues{},
+ }
+ now := time.Now().UnixNano()
+ ts := startTimestamp
+ for blockIdx := int64(0); blockIdx < numBlocks; blockIdx++ {
+ seriesID := common.SeriesID(blockIdx + 1) // Different seriesID for each block
+ for i := int64(0); i < countPerBlock; i++ {
+ dataPoints.seriesIDs = append(dataPoints.seriesIDs, seriesID)
+ dataPoints.timestamps = append(dataPoints.timestamps, ts)
+ dataPoints.versions = append(dataPoints.versions, now+ts)
+ dataPoints.tagFamilies = append(dataPoints.tagFamilies, []nameValues{
+ {
+ name: "arrTag", values: []*nameValue{
+ {name: "strArrTag", valueType:
pbv1.ValueTypeStrArr, value: nil, valueArr: [][]byte{[]byte("value1"),
[]byte("value2")}},
+ },
+ },
+ })
+ dataPoints.fields = append(dataPoints.fields, nameValues{
+ name: "skipped", values: []*nameValue{
+ {name: "intField", valueType:
pbv1.ValueTypeInt64, value: convert.Int64ToBytes(1000 + ts), valueArr: nil},
+ },
+ })
+ ts++
+ }
+ }
+ return dataPoints
+}
+
+// generateDatapointsForBlockSplit generates datapoints that will create blocks
+// that when merged will exceed maxBlockLength, triggering the split logic at lines 338-348.
+func generateDatapointsForBlockSplit(startTimestamp, count int64) *dataPoints {
+ dataPoints := &dataPoints{
+ seriesIDs: []common.SeriesID{},
+ timestamps: []int64{},
+ versions: []int64{},
+ tagFamilies: [][]nameValues{},
+ fields: []nameValues{},
+ }
+ now := time.Now().UnixNano()
+ for i := int64(0); i < count; i++ {
+ dataPoints.seriesIDs = append(dataPoints.seriesIDs, 1)
+ dataPoints.timestamps = append(dataPoints.timestamps, startTimestamp+i)
+ dataPoints.versions = append(dataPoints.versions, now+i)
+ dataPoints.tagFamilies = append(dataPoints.tagFamilies, []nameValues{
+ {
+ name: "arrTag", values: []*nameValue{
+ {name: "strArrTag", valueType:
pbv1.ValueTypeStrArr, value: nil, valueArr: [][]byte{[]byte("value1"),
[]byte("value2")}},
+ },
+ },
+ })
+ dataPoints.fields = append(dataPoints.fields, nameValues{
+ name: "skipped", values: []*nameValue{
+ {name: "intField", valueType:
pbv1.ValueTypeInt64, value: convert.Int64ToBytes(1000 + i), valueArr: nil},
+ },
+ })
+ }
+ return dataPoints
+}
+
+// Test_mergeParts_fileBased tests file-based merging with various scenarios:
+// 1. Single seriesID exceeding maxBlockLength (triggers split at lines 338-348).
+// 2. Multiple blocks per part with overlapping timestamps.
+// 3. Multiple blocks per part exceeding maxBlockLength when merged.
+// 4. Multiple parts with multiple blocks.
+func Test_mergeParts_fileBased(t *testing.T) {
+ t.Run("exceeds maxBlockLength with single seriesID", func(t *testing.T)
{
+ // maxBlockLength is 8192, so we create parts that when merged will exceed this
+ part1Count := int64(5000)
+ part2Count := int64(5000)
+ expectedTotalCount := part1Count + part2Count
+
+ var fpp []*partWrapper
+ tmpPath, defFn := test.Space(require.New(t))
+ defer func() {
+ for _, pw := range fpp {
+ pw.decRef()
+ }
+ defFn()
+ }()
+
+ fileSystem := fs.NewLocalFileSystem()
+ dps1 := generateDatapointsForBlockSplit(1, part1Count)
+ dps2 := generateDatapointsForBlockSplit(part1Count+1, part2Count)
+
+ mp1 := generateMemPart()
+ mp1.mustInitFromDataPoints(dps1)
+ mp1.mustFlush(fileSystem, partPath(tmpPath, 1))
+ filePW1 := newPartWrapper(nil, mustOpenFilePart(1, tmpPath, fileSystem))
+ filePW1.p.partMetadata.ID = 1
+ fpp = append(fpp, filePW1)
+ releaseMemPart(mp1)
+
+ mp2 := generateMemPart()
+ mp2.mustInitFromDataPoints(dps2)
+ mp2.mustFlush(fileSystem, partPath(tmpPath, 2))
+ filePW2 := newPartWrapper(nil, mustOpenFilePart(2, tmpPath, fileSystem))
+ filePW2.p.partMetadata.ID = 2
+ fpp = append(fpp, filePW2)
+ releaseMemPart(mp2)
+
+ closeCh := make(chan struct{})
+ defer close(closeCh)
+ tst := &tsTable{pm: protector.Nop{}}
+ p, err := tst.mergeParts(fileSystem, closeCh, fpp, 3, tmpPath)
+ require.NoError(t, err)
+ defer p.decRef()
+
+ pmi := &partMergeIter{}
+ pmi.mustInitFromPart(p.p)
+ reader := &blockReader{}
+ reader.init([]*partMergeIter{pmi})
+ var totalCount uint64
+ var blockCount int
+ for reader.nextBlockMetadata() {
+ blockCount++
+ totalCount += reader.block.bm.count
+ require.LessOrEqual(t, reader.block.bm.count, uint64(maxBlockLength),
+ "block count should not exceed maxBlockLength")
+ }
+ require.NoError(t, reader.error())
+ require.Equal(t, uint64(expectedTotalCount), totalCount,
+ "total count should match sum of input parts")
+ require.Greater(t, blockCount, 1, "should have multiple blocks due to splitting")
+ })
+
+ t.Run("exceeds maxBlockLength with overlapping timestamps", func(t
*testing.T) {
+ part1Count := int64(6000)
+ part2Count := int64(6000)
+ overlapStart := part1Count / 2
+
+ var fpp []*partWrapper
+ tmpPath, defFn := test.Space(require.New(t))
+ defer func() {
+ for _, pw := range fpp {
+ pw.decRef()
+ }
+ defFn()
+ }()
+
+ fileSystem := fs.NewLocalFileSystem()
+ dps1 := generateDatapointsForBlockSplit(1, part1Count)
+ dps2 := generateDatapointsForBlockSplit(overlapStart, part2Count)
+
+ mp1 := generateMemPart()
+ mp1.mustInitFromDataPoints(dps1)
+ mp1.mustFlush(fileSystem, partPath(tmpPath, 1))
+ filePW1 := newPartWrapper(nil, mustOpenFilePart(1, tmpPath, fileSystem))
+ filePW1.p.partMetadata.ID = 1
+ fpp = append(fpp, filePW1)
+ releaseMemPart(mp1)
+
+ mp2 := generateMemPart()
+ mp2.mustInitFromDataPoints(dps2)
+ mp2.mustFlush(fileSystem, partPath(tmpPath, 2))
+ filePW2 := newPartWrapper(nil, mustOpenFilePart(2, tmpPath, fileSystem))
+ filePW2.p.partMetadata.ID = 2
+ fpp = append(fpp, filePW2)
+ releaseMemPart(mp2)
+
+ closeCh := make(chan struct{})
+ defer close(closeCh)
+ tst := &tsTable{pm: protector.Nop{}}
+ p, err := tst.mergeParts(fileSystem, closeCh, fpp, 3, tmpPath)
+ require.NoError(t, err)
+ defer p.decRef()
+
+ pmi := &partMergeIter{}
+ pmi.mustInitFromPart(p.p)
+ reader := &blockReader{}
+ reader.init([]*partMergeIter{pmi})
+ var totalCount uint64
+ var blockCount int
+ for reader.nextBlockMetadata() {
+ blockCount++
+ totalCount += reader.block.bm.count
+ require.LessOrEqual(t, reader.block.bm.count, uint64(maxBlockLength),
+ "block count should not exceed maxBlockLength")
+ }
+ require.NoError(t, reader.error())
+ overlapSize := (part1Count - overlapStart) + 1
+ expectedUniqueCount := part1Count + part2Count - overlapSize
+ require.Equal(t, uint64(expectedUniqueCount), totalCount,
+ "total count should match unique count after deduplication")
+ require.Greater(t, blockCount, 1, "should have multiple blocks due to splitting")
+ })
+
+ t.Run("multiple blocks per part", func(t *testing.T) {
+ blocksPerPart := int64(5)
+ countPerBlock := int64(2000)
+ totalPerPart := blocksPerPart * countPerBlock
+
+ var fpp []*partWrapper
+ tmpPath, defFn := test.Space(require.New(t))
+ defer func() {
+ for _, pw := range fpp {
+ pw.decRef()
+ }
+ defFn()
+ }()
+
+ fileSystem := fs.NewLocalFileSystem()
+ dps1 := generateDatapointsWithMultipleBlocks(1, countPerBlock, blocksPerPart)
+ dps2 := generateDatapointsWithMultipleBlocks(totalPerPart+1, countPerBlock, blocksPerPart)
+
+ mp1 := generateMemPart()
+ mp1.mustInitFromDataPoints(dps1)
+ mp1.mustFlush(fileSystem, partPath(tmpPath, 1))
+ filePW1 := newPartWrapper(nil, mustOpenFilePart(1, tmpPath, fileSystem))
+ filePW1.p.partMetadata.ID = 1
+ fpp = append(fpp, filePW1)
+ releaseMemPart(mp1)
+
+ mp2 := generateMemPart()
+ mp2.mustInitFromDataPoints(dps2)
+ mp2.mustFlush(fileSystem, partPath(tmpPath, 2))
+ filePW2 := newPartWrapper(nil, mustOpenFilePart(2, tmpPath, fileSystem))
+ filePW2.p.partMetadata.ID = 2
+ fpp = append(fpp, filePW2)
+ releaseMemPart(mp2)
+
+ closeCh := make(chan struct{})
+ defer close(closeCh)
+ tst := &tsTable{pm: protector.Nop{}}
+ p, err := tst.mergeParts(fileSystem, closeCh, fpp, 3, tmpPath)
+ require.NoError(t, err)
+ defer p.decRef()
+
+ pmi := &partMergeIter{}
+ pmi.mustInitFromPart(p.p)
+ reader := &blockReader{}
+ reader.init([]*partMergeIter{pmi})
+ var totalCount uint64
+ var blockCount int
+ seriesIDCounts := make(map[common.SeriesID]uint64)
+ for reader.nextBlockMetadata() {
+ blockCount++
+ seriesID := reader.block.bm.seriesID
+ count := reader.block.bm.count
+ totalCount += count
+ seriesIDCounts[seriesID] += count
+ require.LessOrEqual(t, count, uint64(maxBlockLength),
+ "block count should not exceed maxBlockLength")
+ }
+ require.NoError(t, reader.error())
+ expectedTotalCount := uint64(totalPerPart * int64(len(fpp)))
+ require.Equal(t, expectedTotalCount, totalCount,
+ "total count should match sum of all parts")
+ require.GreaterOrEqual(t, blockCount, int(blocksPerPart),
+ "should have at least one block per seriesID")
+ for sid := common.SeriesID(1); sid <= common.SeriesID(blocksPerPart); sid++ {
+ expectedCount := uint64(countPerBlock * int64(len(fpp)))
+ require.Equal(t, expectedCount, seriesIDCounts[sid],
+ "seriesID %d should have merged count from all parts", sid)
+ }
+ })
+
+ t.Run("multiple blocks exceeding maxBlockLength when merged", func(t
*testing.T) {
+ blocksPerPart := int64(3)
+ countPerBlock := int64(5000) // 5000 * 2 parts = 10000 per seriesID (exceeds 8192)
+ totalPerPart := blocksPerPart * countPerBlock
+
+ var fpp []*partWrapper
+ tmpPath, defFn := test.Space(require.New(t))
+ defer func() {
+ for _, pw := range fpp {
+ pw.decRef()
+ }
+ defFn()
+ }()
+
+ fileSystem := fs.NewLocalFileSystem()
+ dps1 := generateDatapointsWithMultipleBlocks(1, countPerBlock, blocksPerPart)
+ dps2 := generateDatapointsWithMultipleBlocks(totalPerPart+1, countPerBlock, blocksPerPart)
+
+ mp1 := generateMemPart()
+ mp1.mustInitFromDataPoints(dps1)
+ mp1.mustFlush(fileSystem, partPath(tmpPath, 1))
+ filePW1 := newPartWrapper(nil, mustOpenFilePart(1, tmpPath, fileSystem))
+ filePW1.p.partMetadata.ID = 1
+ fpp = append(fpp, filePW1)
+ releaseMemPart(mp1)
+
+ mp2 := generateMemPart()
+ mp2.mustInitFromDataPoints(dps2)
+ mp2.mustFlush(fileSystem, partPath(tmpPath, 2))
+ filePW2 := newPartWrapper(nil, mustOpenFilePart(2, tmpPath, fileSystem))
+ filePW2.p.partMetadata.ID = 2
+ fpp = append(fpp, filePW2)
+ releaseMemPart(mp2)
+
+ closeCh := make(chan struct{})
+ defer close(closeCh)
+ tst := &tsTable{pm: protector.Nop{}}
+ p, err := tst.mergeParts(fileSystem, closeCh, fpp, 3, tmpPath)
+ require.NoError(t, err)
+ defer p.decRef()
+
+ pmi := &partMergeIter{}
+ pmi.mustInitFromPart(p.p)
+ reader := &blockReader{}
+ reader.init([]*partMergeIter{pmi})
+ var totalCount uint64
+ var blockCount int
+ seriesIDCounts := make(map[common.SeriesID]uint64)
+ for reader.nextBlockMetadata() {
+ blockCount++
+ seriesID := reader.block.bm.seriesID
+ count := reader.block.bm.count
+ totalCount += count
+ seriesIDCounts[seriesID] += count
+ require.LessOrEqual(t, count, uint64(maxBlockLength),
+ "block count should not exceed maxBlockLength")
+ }
+ require.NoError(t, reader.error())
+ expectedTotalCount := uint64(totalPerPart * int64(len(fpp)))
+ require.Equal(t, expectedTotalCount, totalCount,
+ "total count should match sum of all parts")
+ expectedCountPerSeriesID := uint64(countPerBlock * int64(len(fpp)))
+ for sid := common.SeriesID(1); sid <= common.SeriesID(blocksPerPart); sid++ {
+ require.Equal(t, expectedCountPerSeriesID, seriesIDCounts[sid],
+ "seriesID %d should have merged count from all parts", sid)
+ }
+ require.Greater(t, blockCount, int(blocksPerPart),
+ "should have more blocks due to splitting when exceeding maxBlockLength")
+ })
+
+ t.Run("multiple parts with multiple blocks", func(t *testing.T) {
+ numParts := 3
+ blocksPerPart := int64(4)
+ countPerBlock := int64(1500)
+ totalPerPart := blocksPerPart * countPerBlock
+
+ var fpp []*partWrapper
+ tmpPath, defFn := test.Space(require.New(t))
+ defer func() {
+ for _, pw := range fpp {
+ pw.decRef()
+ }
+ defFn()
+ }()
+
+ fileSystem := fs.NewLocalFileSystem()
+ for i := 0; i < numParts; i++ {
+ startTS := int64(i)*totalPerPart + 1
+ dps := generateDatapointsWithMultipleBlocks(startTS, countPerBlock, blocksPerPart)
+ mp := generateMemPart()
+ mp.mustInitFromDataPoints(dps)
+ mp.mustFlush(fileSystem, partPath(tmpPath, uint64(i+1)))
+ filePW := newPartWrapper(nil, mustOpenFilePart(uint64(i+1), tmpPath, fileSystem))
+ filePW.p.partMetadata.ID = uint64(i + 1)
+ fpp = append(fpp, filePW)
+ releaseMemPart(mp)
+ }
+
+ closeCh := make(chan struct{})
+ defer close(closeCh)
+ tst := &tsTable{pm: protector.Nop{}}
+ p, err := tst.mergeParts(fileSystem, closeCh, fpp, uint64(numParts+1), tmpPath)
+ require.NoError(t, err)
+ defer p.decRef()
+
+ pmi := &partMergeIter{}
+ pmi.mustInitFromPart(p.p)
+ reader := &blockReader{}
+ reader.init([]*partMergeIter{pmi})
+ var totalCount uint64
+ var blockCount int
+ seriesIDCounts := make(map[common.SeriesID]uint64)
+ for reader.nextBlockMetadata() {
+ blockCount++
+ seriesID := reader.block.bm.seriesID
+ count := reader.block.bm.count
+ totalCount += count
+ seriesIDCounts[seriesID] += count
+ require.LessOrEqual(t, count, uint64(maxBlockLength),
+ "block count should not exceed maxBlockLength")
+ }
+ require.NoError(t, reader.error())
+ expectedTotalCount := uint64(totalPerPart * int64(len(fpp)))
+ require.Equal(t, expectedTotalCount, totalCount,
+ "total count should match sum of all parts")
+ for sid := common.SeriesID(1); sid <= common.SeriesID(blocksPerPart); sid++ {
+ expectedCount := uint64(countPerBlock * int64(len(fpp)))
+ require.Equal(t, expectedCount, seriesIDCounts[sid],
+ "seriesID %d should have merged count from all parts", sid)
+ }
+ require.GreaterOrEqual(t, blockCount, int(blocksPerPart),
+ "should have at least one block per seriesID")
+ })
+}
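To exercise the new coverage locally, something like the following should work from a checkout of the repository (standard go test flags; the package path follows the files above):

go test ./banyand/measure -run 'Test_mergeTwoBlocks_edgeCase|Test_mergeParts_fileBased'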