This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
     new 1afd6ec6 Enhance block reading and writing with raw block handling (#804)
1afd6ec6 is described below
commit 1afd6ec68667e66e22ad9945ed8341df3102145c
Author: Gao Hongtao <[email protected]>
AuthorDate: Sat Oct 11 16:06:11 2025 +0800
Enhance block reading and writing with raw block handling (#804)
---
banyand/trace/block_reader.go | 54 ++
banyand/trace/block_writer.go | 80 +++
banyand/trace/merger.go | 24 +
banyand/trace/merger_bench_test.go | 387 ++++++++++++++
banyand/trace/merger_test.go | 1030 ++++++++++++++++++++++++++++++++++++
banyand/trace/part_iter.go | 80 +++
banyand/trace/tstable_test.go | 63 +++
7 files changed, 1718 insertions(+)
diff --git a/banyand/trace/block_reader.go b/banyand/trace/block_reader.go
index 9fdf4145..72bc2d8c 100644
--- a/banyand/trace/block_reader.go
+++ b/banyand/trace/block_reader.go
@@ -29,6 +29,13 @@ import (
"github.com/apache/skywalking-banyandb/pkg/pool"
)
+type rawBlock struct {
+ bm *blockMetadata
+ tags map[string][]byte
+ tagMetadata map[string][]byte
+ spans []byte
+}
+
type seqReader struct {
sr fs.SeqReader
r fs.Reader
@@ -127,6 +134,7 @@ type blockReader struct {
err error
block *blockPointer
pih partMergeIterHeap
+ peekBlock blockPointer
nextBlockNoop bool
}
@@ -138,6 +146,7 @@ func (br *blockReader) reset() {
br.pih = br.pih[:0]
br.nextBlockNoop = false
br.err = nil
+ br.peekBlock.reset()
}
func (br *blockReader) init(pii []*partMergeIter) {
@@ -209,6 +218,51 @@ func (br *blockReader) loadBlockData(decoder *encoding.BytesBlockDecoder) {
br.pih[0].mustLoadBlockData(decoder, br.block)
}
+func (br *blockReader) peek() *blockPointer {
+	// Peek at the next block that will be processed to determine if the current traceID is unique.
+	// We need to check two things:
+	// 1. Does the current partMergeIter (pih[0]) have another block with the same traceID?
+	// 2. Do any other partMergeIters have a block with the same traceID?
+
+	if len(br.pih) == 0 {
+		return nil
+	}
+
+	currentTraceID := br.block.bm.traceID
+
+	// First check if pih[0] has another block
+	if nextBM, ok := br.pih[0].peekBlockMetadata(); ok {
+		// If the next block in the same part has the same traceID, we can't use the fast path
+		if nextBM.traceID == currentTraceID {
+			// Reuse peekBlock to avoid allocation
+			br.peekBlock.reset()
+			br.peekBlock.bm = *nextBM
+			releaseBlockMetadata(nextBM)
+			return &br.peekBlock
+		}
+		releaseBlockMetadata(nextBM)
+		// The next block in pih[0] has a different traceID,
+		// but we still need to check the other parts in the heap.
+	}
+
+	// Check if any other part (pih[1], pih[2], etc.) has the same traceID.
+	// Note: The heap is a min-heap, not fully sorted, so we must check ALL elements.
+	for i := 1; i < len(br.pih); i++ {
+		if br.pih[i].block.bm.traceID == currentTraceID {
+			// Another part has the same traceID
+			return &br.pih[i].block
+		}
+	}
+
+	// No other blocks with this traceID - it's unique
+	return nil
+}
+
+func (br *blockReader) mustReadRaw(r *rawBlock, bm *blockMetadata) {
+	// Delegate to the current partMergeIter to read the raw block
+	br.pih[0].mustReadRaw(r, bm)
+}
+
+
func (br *blockReader) error() error {
if errors.Is(br.err, io.EOF) {
return nil
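
For orientation, the peek()/mustReadRaw pair above exists so the merge loop in merger.go (below) can cheaply answer one question: is the current block the only one for its traceID? A minimal sketch of that check, assuming only the blockReader and blockPointer types this patch defines (the helper itself is hypothetical, not part of the commit):

    // traceIsUnique reports whether the reader's current block is the sole
    // block for its traceID, i.e. whether the raw fast path may be taken.
    func traceIsUnique(br *blockReader) bool {
        next := br.peek()
        return next == nil || next.bm.traceID != br.block.bm.traceID
    }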
diff --git a/banyand/trace/block_writer.go b/banyand/trace/block_writer.go
index 264cade8..8bfb06f8 100644
--- a/banyand/trace/block_writer.go
+++ b/banyand/trace/block_writer.go
@@ -264,6 +264,86 @@ func (bw *blockWriter) mustFlushPrimaryBlock(data []byte) {
bw.maxTimestamp = 0
}
+func (bw *blockWriter) mustWriteRawBlock(r *rawBlock) {
+ bm := r.bm
+ if bm.count == 0 {
+ return
+ }
+ hasWrittenBlocks := len(bw.traceIDs) > 0
+ var tidLast string
+ var isSeenTid bool
+
+ if hasWrittenBlocks {
+ tidLast = bw.traceIDs[len(bw.traceIDs)-1]
+ if bm.traceID < tidLast {
+ logger.Panicf("the tid=%s cannot be smaller than the
previously written tid=%s", bm.traceID, tidLast)
+ }
+ isSeenTid = bm.traceID == tidLast
+ }
+ bw.traceIDs = append(bw.traceIDs, bm.traceID)
+ bw.tagType.copyFrom(bm.tagType)
+
+ tm := &bm.timestamps
+ if bw.totalCount == 0 || tm.min < bw.totalMinTimestamp {
+ bw.totalMinTimestamp = tm.min
+ }
+ if bw.totalCount == 0 || tm.max > bw.totalMaxTimestamp {
+ bw.totalMaxTimestamp = tm.max
+ }
+ if !hasWrittenBlocks || tm.min < bw.minTimestamp {
+ bw.minTimestamp = tm.min
+ }
+ if !hasWrittenBlocks || tm.max > bw.maxTimestamp {
+ bw.maxTimestamp = tm.max
+ }
+ if isSeenTid && tm.min < bw.minTimestampLast {
+ logger.Panicf("the block for tid=%s cannot contain timestamp
smaller than %d, but it contains timestamp %d", bm.traceID,
bw.minTimestampLast, tm.min)
+ }
+ bw.minTimestampLast = tm.min
+
+ bw.totalUncompressedSpanSizeBytes += bm.uncompressedSpanSizeBytes
+ bw.totalCount += bm.count
+ bw.totalBlocksCount++
+
+ newBm := generateBlockMetadata()
+ newBm.copyFrom(bm)
+
+ newBm.spans.offset = bw.writers.spanWriter.bytesWritten
+ bw.writers.spanWriter.MustWrite(r.spans)
+
+ for name := range bm.tags {
+ tmw, tw := bw.writers.getWriters(name)
+ newDB := newBm.getTagMetadata(name)
+
+ // Unmarshal the tag metadata to update the tag value offset
+ tm := generateTagMetadata()
+ if err := tm.unmarshal(r.tagMetadata[name]); err != nil {
+ logger.Panicf("cannot unmarshal tag metadata for %s:
%v", name, err)
+ }
+
+		// Update the tag value offset to point to the correct location in the new file
+ tm.offset = tw.bytesWritten
+ tw.MustWrite(r.tags[name])
+
+ // Now marshal the updated metadata and write it
+ bb := bigValuePool.Generate()
+ bb.Buf = tm.marshal(bb.Buf[:0])
+ newDB.offset = tmw.bytesWritten
+ newDB.size = uint64(len(bb.Buf))
+ tmw.MustWrite(bb.Buf)
+ bigValuePool.Release(bb)
+ releaseTagMetadata(tm)
+ }
+
+ bw.primaryBlockData = newBm.marshal(bw.primaryBlockData)
+ releaseBlockMetadata(newBm)
+
+ if len(bw.primaryBlockData) > maxUncompressedPrimaryBlockSize {
+ bw.mustFlushPrimaryBlock(bw.primaryBlockData)
+ bw.primaryBlockData = bw.primaryBlockData[:0]
+ }
+}
+
 func (bw *blockWriter) Flush(pm *partMetadata, tf *traceIDFilter, tt *tagType) {
pm.UncompressedSpanSizeBytes = bw.totalUncompressedSpanSizeBytes
pm.TotalCount = bw.totalCount
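
Note the shape of mustWriteRawBlock above: span and tag payloads are byte-copied verbatim, and the only values rewritten are offsets, which are rebased onto the destination writers' bytesWritten counters while sizes stay unchanged. A tiny sketch of that invariant, using a hypothetical dataBlock stand-in for the offset/size pairs carried in block metadata:

    // Sketch only: rebase moves a block reference to its new file position.
    // The payload bytes and the size are untouched, so no decompression,
    // decoding, or re-encoding is needed on this path.
    type dataBlock struct{ offset, size uint64 }

    func rebase(db dataBlock, bytesWritten uint64) dataBlock {
        return dataBlock{offset: bytesWritten, size: db.size}
    }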
diff --git a/banyand/trace/merger.go b/banyand/trace/merger.go
index 3892f045..0c5b5ae2 100644
--- a/banyand/trace/merger.go
+++ b/banyand/trace/merger.go
@@ -328,12 +328,16 @@ func (tst *tsTable) mergeParts(fileSystem fs.FileSystem, closeCh <-chan struct{}
var errClosed = fmt.Errorf("the merger is closed")
+// forceSlowMerge is used for testing to disable the fast raw merge path.
+var forceSlowMerge = false
+
 func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, br *blockReader) (*partMetadata, *traceIDFilter, *tagType, error) {
pendingBlockIsEmpty := true
pendingBlock := generateBlockPointer()
defer releaseBlockPointer(pendingBlock)
var tmpBlock *blockPointer
var decoder *encoding.BytesBlockDecoder
+ var rawBlk rawBlock
getDecoder := func() *encoding.BytesBlockDecoder {
if decoder == nil {
decoder = generateColumnValuesDecoder()
@@ -353,6 +357,15 @@ func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, br *blockReader) (*pa
default:
}
b := br.block
+		// Fast path: if this is the only block for this traceID AND we have no pending block,
+		// copy it raw without unmarshaling
+		nextB := br.peek()
+		if !forceSlowMerge && pendingBlockIsEmpty && (nextB == nil || nextB.bm.traceID != b.bm.traceID) {
+			// fast path: only a single block for the trace id and no pending data
+			br.mustReadRaw(&rawBlk, &b.bm)
+			bw.mustWriteRawBlock(&rawBlk)
+			continue
+		}
if pendingBlockIsEmpty {
br.loadBlockData(getDecoder())
@@ -365,8 +378,19 @@ func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, br *blockReader) (*pa
 			bw.mustWriteBlock(pendingBlock.bm.traceID, &pendingBlock.block)
releaseDecoder()
pendingBlock.reset()
+			// After writing the pending block, check if the new block can be copied raw.
+			// This is the same fast path check as at the beginning of the loop.
+			nextB = br.peek()
+			if !forceSlowMerge && (nextB == nil || nextB.bm.traceID != b.bm.traceID) {
+				// fast path: only a single block for this new trace id
+				br.mustReadRaw(&rawBlk, &b.bm)
+				bw.mustWriteRawBlock(&rawBlk)
+				continue
+			}
+			// Slow path: start accumulating the new block
br.loadBlockData(getDecoder())
pendingBlock.copyFrom(b)
+ pendingBlockIsEmpty = false
continue
}
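
Condensed, the routing logic added to mergeBlocks is: raw-copy whenever nothing is pending and no other part in the heap holds the same traceID, otherwise fall back to decode-and-merge. An illustrative restatement of the first guard (the second call site drops pendingBlockIsEmpty because the pending block has just been flushed):

    // Illustrative only; mirrors the committed condition above.
    fastPathOK := !forceSlowMerge && pendingBlockIsEmpty &&
        (nextB == nil || nextB.bm.traceID != b.bm.traceID)
    if fastPathOK {
        br.mustReadRaw(&rawBlk, &b.bm) // byte-copy the block, no decode
        bw.mustWriteRawBlock(&rawBlk)  // rewrite offsets only
    }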
diff --git a/banyand/trace/merger_bench_test.go b/banyand/trace/merger_bench_test.go
new file mode 100644
index 00000000..607c5d94
--- /dev/null
+++ b/banyand/trace/merger_bench_test.go
@@ -0,0 +1,387 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/require"
+
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+// Benchmark_mergeBlocks_FastRawMerge benchmarks the fast path where trace IDs don't overlap.
+// This uses br.mustReadRaw and bw.mustWriteRawBlock to copy blocks without unmarshaling.
+//
+// Performance characteristics:
+// - Lower memory usage (up to 1400x less for large datasets)
+// - Fewer allocations
+// - Slightly slower or comparable time performance
+// - Optimal when trace IDs are unique across parts (common case).
+func Benchmark_mergeBlocks_FastRawMerge(b *testing.B) {
+ benchmarkCases := []struct {
+ tracesGen func() *traces
+ name string
+ numParts int
+ }{
+ {
+ name: "small_parts_non_overlapping",
+ numParts: 10,
+			tracesGen: func() *traces {
+				return &traces{
+					traceIDs:   []string{"trace1", "trace2", "trace3"},
+					timestamps: []int64{1, 2, 3},
+					tags: [][]*tagValue{
+						{{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val1"), valueArr: nil}},
+						{{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val2"), valueArr: nil}},
+						{{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val3"), valueArr: nil}},
+					},
+					spans:   [][]byte{[]byte("span1"), []byte("span2"), []byte("span3")},
+					spanIDs: []string{"span1", "span2", "span3"},
+				}
+			},
+		},
+ {
+ name: "medium_parts_non_overlapping",
+ numParts: 50,
+ tracesGen: func() *traces {
+ return generateHugeTraces(100)
+ },
+ },
+ {
+ name: "large_parts_non_overlapping",
+ numParts: 100,
+ tracesGen: func() *traces {
+ return generateHugeTraces(500)
+ },
+ },
+ }
+
+ for _, bc := range benchmarkCases {
+ b.Run(bc.name, func(b *testing.B) {
+ tmpPath, defFn := test.Space(require.New(b))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+
+			// Create parts with non-overlapping trace IDs (fast path)
+ var parts []*part
+ for i := 0; i < bc.numParts; i++ {
+ mp := generateMemPart()
+ traces := bc.tracesGen()
+				// Make trace IDs unique per part to trigger the fast path
+				for j := range traces.traceIDs {
+					traces.traceIDs[j] = traces.traceIDs[j] + "_part" + string(rune(i))
+ }
+ mp.mustInitFromTraces(traces)
+				mp.mustFlush(fileSystem, partPath(tmpPath, uint64(i)))
+				p := mustOpenFilePart(uint64(i), tmpPath, fileSystem)
+ parts = append(parts, p)
+ releaseMemPart(mp)
+ }
+
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ for i := 0; i < b.N; i++ {
+				// Create new iterators for each benchmark iteration
+ var pmi []*partMergeIter
+ for _, p := range parts {
+ iter := generatePartMergeIter()
+ iter.mustInitFromPart(p)
+ pmi = append(pmi, iter)
+ }
+
+ br := generateBlockReader()
+ br.init(pmi)
+ bw := generateBlockWriter()
+ dstPath := partPath(tmpPath, uint64(10000+i))
+				bw.mustInitForFilePart(fileSystem, dstPath, false)
+
+ closeCh := make(chan struct{})
+
+ _, _, _, err := mergeBlocks(closeCh, bw, br)
+ if err != nil {
+ b.Fatal(err)
+ }
+
+ close(closeCh)
+ releaseBlockWriter(bw)
+ releaseBlockReader(br)
+
+ // Release iterators
+ for _, iter := range pmi {
+ releasePartMergeIter(iter)
+ }
+ }
+ })
+ }
+}
+
+// Benchmark_mergeBlocks_OriginalMerge benchmarks the slow path where trace IDs overlap.
+// This requires unmarshaling, merging blocks, and then writing them.
+//
+// Performance characteristics:
+// - Higher memory usage (up to 1400x more for large datasets)
+// - More allocations
+// - Can be slightly faster for some workloads due to batching
+// - Required when trace IDs overlap across parts (needs merging).
+func Benchmark_mergeBlocks_OriginalMerge(b *testing.B) {
+ benchmarkCases := []struct {
+ tracesGen func() *traces
+ name string
+ numParts int
+ }{
+ {
+ name: "small_parts_overlapping",
+ numParts: 10,
+			tracesGen: func() *traces {
+				return &traces{
+					traceIDs:   []string{"trace1", "trace2", "trace3"},
+					timestamps: []int64{1, 2, 3},
+					tags: [][]*tagValue{
+						{{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val1"), valueArr: nil}},
+						{{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val2"), valueArr: nil}},
+						{{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val3"), valueArr: nil}},
+					},
+					spans:   [][]byte{[]byte("span1"), []byte("span2"), []byte("span3")},
+					spanIDs: []string{"span1", "span2", "span3"},
+				}
+			},
+		},
+ {
+ name: "medium_parts_overlapping",
+ numParts: 50,
+ tracesGen: func() *traces {
+ return generateHugeTraces(100)
+ },
+ },
+ {
+ name: "large_parts_overlapping",
+ numParts: 100,
+ tracesGen: func() *traces {
+ return generateHugeTraces(500)
+ },
+ },
+ }
+
+ for _, bc := range benchmarkCases {
+ b.Run(bc.name, func(b *testing.B) {
+ tmpPath, defFn := test.Space(require.New(b))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+
+ // Create parts with overlapping trace IDs (slow path)
+ var parts []*part
+ for i := 0; i < bc.numParts; i++ {
+ mp := generateMemPart()
+ traces := bc.tracesGen()
+				// Keep the same trace IDs across parts to trigger the slow path (merging)
+				mp.mustInitFromTraces(traces)
+				mp.mustFlush(fileSystem, partPath(tmpPath, uint64(i)))
+				p := mustOpenFilePart(uint64(i), tmpPath, fileSystem)
+ parts = append(parts, p)
+ releaseMemPart(mp)
+ }
+
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ for i := 0; i < b.N; i++ {
+				// Create new iterators for each benchmark iteration
+ var pmi []*partMergeIter
+ for _, p := range parts {
+ iter := generatePartMergeIter()
+ iter.mustInitFromPart(p)
+ pmi = append(pmi, iter)
+ }
+
+ br := generateBlockReader()
+ br.init(pmi)
+ bw := generateBlockWriter()
+ dstPath := partPath(tmpPath, uint64(20000+i))
+				bw.mustInitForFilePart(fileSystem, dstPath, false)
+
+ closeCh := make(chan struct{})
+
+ _, _, _, err := mergeBlocks(closeCh, bw, br)
+ if err != nil {
+ b.Fatal(err)
+ }
+
+ close(closeCh)
+ releaseBlockWriter(bw)
+ releaseBlockReader(br)
+
+ // Release iterators
+ for _, iter := range pmi {
+ releasePartMergeIter(iter)
+ }
+ }
+ })
+ }
+}
+
+// Benchmark_mergeBlocks_Comparison provides a direct comparison benchmark
+// between fast raw merge and original merge with the SAME test data.
+//
+// This properly compares the two merge strategies by using identical data
+// (non-overlapping trace IDs) and toggling the forceSlowMerge flag.
+//
+// Uses generateRealisticTraces with:
+// - 10,000 traces per part
+// - Each trace has 3-5 spans
+// - Each span is more than 100KB
+//
+// Usage:
+//
+//	go test -bench=Benchmark_mergeBlocks_Comparison -run='^$' ./banyand/trace -benchtime=10x
+//
+// This benchmark clearly demonstrates the memory vs. time tradeoff:
+// - Fast raw merge: Lower memory, uses raw block copy
+// - Slow merge (forced): Higher memory, decompresses and processes blocks.
+func Benchmark_mergeBlocks_Comparison(b *testing.B) {
+ numParts := 20
+	tracesPerPart := 10000 // 10,000 traces per part
+
+ // Create shared test data with non-overlapping trace IDs
+ tmpPath, defFn := test.Space(require.New(b))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+
+ b.Logf("Creating %d parts with %d traces each (3-5 spans per trace,
>100KB per span)", numParts, tracesPerPart)
+
+ var parts []*part
+ for i := 0; i < numParts; i++ {
+ mp := generateMemPart()
+ // Use new realistic trace generator
+ traces := generateRealisticTraces(tracesPerPart)
+ // Make trace IDs unique per part to avoid merging
+ for j := range traces.traceIDs {
+			traces.traceIDs[j] = traces.traceIDs[j] + "_part" + string(rune(i))
+ }
+ mp.mustInitFromTraces(traces)
+ mp.mustFlush(fileSystem, partPath(tmpPath, uint64(i)))
+ p := mustOpenFilePart(uint64(i), tmpPath, fileSystem)
+ parts = append(parts, p)
+ releaseMemPart(mp)
+
+ if i == 0 {
+ // Log stats for first part
+ totalSpans := len(traces.spans)
+ totalSize := int64(0)
+ for _, span := range traces.spans {
+ totalSize += int64(len(span))
+ }
+ b.Logf("Part 0 stats: %d spans, total size: %.2f MB,
avg span size: %.2f KB",
+ totalSpans, float64(totalSize)/(1024*1024),
float64(totalSize)/float64(totalSpans)/1024)
+ }
+ }
+
+ b.Run("fast_raw_merge", func(b *testing.B) {
+ forceSlowMerge = false // Enable fast path
+ defer func() { forceSlowMerge = false }()
+
+ // Create separate temp dir for fast merge output
+ tmpPathFast, defFnFast := test.Space(require.New(b))
+ defer defFnFast()
+
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ for i := 0; i < b.N; i++ {
+ // Create new iterators for each benchmark iteration
+ var pmi []*partMergeIter
+ for _, p := range parts {
+ iter := generatePartMergeIter()
+ iter.mustInitFromPart(p)
+ pmi = append(pmi, iter)
+ }
+
+ br := generateBlockReader()
+ br.init(pmi)
+ bw := generateBlockWriter()
+ dstPath := partPath(tmpPathFast, uint64(30000+i))
+ bw.mustInitForFilePart(fileSystem, dstPath, false)
+
+ closeCh := make(chan struct{})
+
+ _, _, _, err := mergeBlocks(closeCh, bw, br)
+ if err != nil {
+ b.Fatal(err)
+ }
+
+ close(closeCh)
+ releaseBlockWriter(bw)
+ releaseBlockReader(br)
+
+ // Release iterators
+ for _, iter := range pmi {
+ releasePartMergeIter(iter)
+ }
+ }
+ })
+
+ b.Run("slow_merge_forced", func(b *testing.B) {
+ forceSlowMerge = true // Force slow path (original merge)
+ defer func() { forceSlowMerge = false }()
+
+ // Create separate temp dir for slow merge output
+ tmpPathSlow, defFnSlow := test.Space(require.New(b))
+ defer defFnSlow()
+
+ b.ResetTimer()
+ b.ReportAllocs()
+
+ for i := 0; i < b.N; i++ {
+ // Create new iterators for each benchmark iteration
+ var pmi []*partMergeIter
+ for _, p := range parts {
+ iter := generatePartMergeIter()
+ iter.mustInitFromPart(p)
+ pmi = append(pmi, iter)
+ }
+
+ br := generateBlockReader()
+ br.init(pmi)
+ bw := generateBlockWriter()
+ dstPath := partPath(tmpPathSlow, uint64(40000+i))
+ bw.mustInitForFilePart(fileSystem, dstPath, false)
+
+ closeCh := make(chan struct{})
+
+ _, _, _, err := mergeBlocks(closeCh, bw, br)
+ if err != nil {
+ b.Fatal(err)
+ }
+
+ close(closeCh)
+ releaseBlockWriter(bw)
+ releaseBlockReader(br)
+
+ // Release iterators
+ for _, iter := range pmi {
+ releasePartMergeIter(iter)
+ }
+ }
+ })
+}
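
To turn these benchmarks into a statistically robust comparison, one option (a tooling suggestion, not something this patch ships) is benchstat from golang.org/x/perf:

	go test -bench=Benchmark_mergeBlocks_Comparison -run='^$' ./banyand/trace -count=10 > result.txt
	benchstat result.txt

benchstat summarizes per-sub-benchmark time, B/op, and allocs/op across runs, which is where the fast path's allocation savings should be visible.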
diff --git a/banyand/trace/merger_test.go b/banyand/trace/merger_test.go
index 33f1209d..7196f85a 100644
--- a/banyand/trace/merger_test.go
+++ b/banyand/trace/merger_test.go
@@ -27,6 +27,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/apache/skywalking-banyandb/banyand/protector"
+ "github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/fs"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
"github.com/apache/skywalking-banyandb/pkg/test"
@@ -98,6 +99,213 @@ func Test_mergeTwoBlocks(t *testing.T) {
}
}
+func Test_mergeBlocks_fastPath(t *testing.T) {
+ tests := []struct {
+ name string
+ description string
+ parts []*traces
+ wantBlocks []blockMetadata
+ }{
+		{
+			name: "Fast path with completely different trace IDs",
+			parts: []*traces{
+				{
+					traceIDs:   []string{"traceA", "traceB"},
+					timestamps: []int64{1, 2},
+					tags: [][]*tagValue{
+						{
+							{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val1"), valueArr: nil},
+						},
+						{
+							{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val2"), valueArr: nil},
+						},
+					},
+					spans:   [][]byte{[]byte("spanA"), []byte("spanB")},
+					spanIDs: []string{"spanA", "spanB"},
+				},
+				{
+					traceIDs:   []string{"traceC", "traceD"},
+					timestamps: []int64{3, 4},
+					tags: [][]*tagValue{
+						{
+							{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val3"), valueArr: nil},
+						},
+						{
+							{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val4"), valueArr: nil},
+						},
+					},
+					spans:   [][]byte{[]byte("spanC"), []byte("spanD")},
+					spanIDs: []string{"spanC", "spanD"},
+				},
+			},
+			wantBlocks: []blockMetadata{
+				{traceID: "traceA", count: 1},
+				{traceID: "traceB", count: 1},
+				{traceID: "traceC", count: 1},
+				{traceID: "traceD", count: 1},
+			},
+			description: "Each block should use readRaw/WriteRawBlock as there's only one block per trace ID",
+		},
+		{
+			name: "Fast path with single trace ID per part",
+			parts: []*traces{
+				{
+					traceIDs:   []string{"trace1"},
+					timestamps: []int64{1},
+					tags: [][]*tagValue{
+						{
+							{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val1"), valueArr: nil},
+						},
+					},
+					spans:   [][]byte{[]byte("span1")},
+					spanIDs: []string{"span1"},
+				},
+				{
+					traceIDs:   []string{"trace2"},
+					timestamps: []int64{2},
+					tags: [][]*tagValue{
+						{
+							{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val2"), valueArr: nil},
+						},
+					},
+					spans:   [][]byte{[]byte("span2")},
+					spanIDs: []string{"span2"},
+				},
+				{
+					traceIDs:   []string{"trace3"},
+					timestamps: []int64{3},
+					tags: [][]*tagValue{
+						{
+							{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val3"), valueArr: nil},
+						},
+					},
+					spans:   [][]byte{[]byte("span3")},
+					spanIDs: []string{"span3"},
+				},
+			},
+			wantBlocks: []blockMetadata{
+				{traceID: "trace1", count: 1},
+				{traceID: "trace2", count: 1},
+				{traceID: "trace3", count: 1},
+			},
+			description: "All blocks should use the fast path as each trace ID appears only once",
+		},
+		{
+			name: "Mixed fast and slow path - some trace IDs overlap",
+			parts: []*traces{
+				{
+					traceIDs:   []string{"trace1", "trace2"},
+					timestamps: []int64{1, 2},
+					tags: [][]*tagValue{
+						{
+							{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val1"), valueArr: nil},
+						},
+						{
+							{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val2"), valueArr: nil},
+						},
+					},
+					spans:   [][]byte{[]byte("span1"), []byte("span2")},
+					spanIDs: []string{"span1", "span2"},
+				},
+				{
+					traceIDs:   []string{"trace2", "trace3"},
+					timestamps: []int64{3, 4},
+					tags: [][]*tagValue{
+						{
+							{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val2b"), valueArr: nil},
+						},
+						{
+							{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val3"), valueArr: nil},
+						},
+					},
+					spans:   [][]byte{[]byte("span2b"), []byte("span3")},
+					spanIDs: []string{"span2b", "span3"},
+				},
+			},
+			wantBlocks: []blockMetadata{
+				{traceID: "trace1", count: 1},
+				{traceID: "trace2", count: 2},
+				{traceID: "trace3", count: 1},
+			},
+			description: "trace1 and trace3 use the fast path, trace2 uses the slow path (needs merging)",
+		},
+	}
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ tmpPath, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ fileSystem := fs.NewLocalFileSystem()
+
+ // Create parts
+ var pmi []*partMergeIter
+ for i, traces := range tt.parts {
+ mp := generateMemPart()
+ mp.mustInitFromTraces(traces)
+				mp.mustFlush(fileSystem, partPath(tmpPath, uint64(i)))
+				p := mustOpenFilePart(uint64(i), tmpPath, fileSystem)
+ iter := generatePartMergeIter()
+ iter.mustInitFromPart(p)
+ pmi = append(pmi, iter)
+ releaseMemPart(mp)
+ }
+
+ // Test mergeBlocks
+ br := generateBlockReader()
+ br.init(pmi)
+ bw := generateBlockWriter()
+ dstPath := partPath(tmpPath, 9999)
+ bw.mustInitForFilePart(fileSystem, dstPath, false)
+
+ closeCh := make(chan struct{})
+ defer close(closeCh)
+
+ pm, tf, tagTypes, err := mergeBlocks(closeCh, bw, br)
+ require.NoError(t, err)
+ require.NotNil(t, pm)
+ require.NotNil(t, tf)
+ require.NotNil(t, tagTypes)
+
+ releaseBlockWriter(bw)
+ releaseBlockReader(br)
+ for _, iter := range pmi {
+ releasePartMergeIter(iter)
+ }
+
+ // Write metadata to disk so we can read it back
+ pm.mustWriteMetadata(fileSystem, dstPath)
+ tf.mustWriteTraceIDFilter(fileSystem, dstPath)
+ tagTypes.mustWriteTagType(fileSystem, dstPath)
+ fileSystem.SyncPath(dstPath)
+
+ // Verify merged results by reading back
+			mergedPart := mustOpenFilePart(9999, tmpPath, fileSystem)
+ mergedIter := generatePartMergeIter()
+ mergedIter.mustInitFromPart(mergedPart)
+
+ reader := generateBlockReader()
+ reader.init([]*partMergeIter{mergedIter})
+
+ var gotBlocks []blockMetadata
+ for reader.nextBlockMetadata() {
+ gotBlocks = append(gotBlocks, reader.block.bm)
+ }
+ require.NoError(t, reader.error())
+
+ releaseBlockReader(reader)
+ releasePartMergeIter(mergedIter)
+
+ // Verify block counts and trace IDs
+			require.Len(t, gotBlocks, len(tt.wantBlocks), tt.description)
+			for i, want := range tt.wantBlocks {
+				require.Equal(t, want.traceID, gotBlocks[i].traceID, "Block %d trace ID mismatch", i)
+				require.Equal(t, want.count, gotBlocks[i].count, "Block %d count mismatch for trace %s", i, want.traceID)
+ }
+ })
+ }
+}
+
var mergedBlock = block{
spans: [][]byte{[]byte("span1"), []byte("span2"), []byte("span3"),
[]byte("span4")},
spanIDs: []string{"span1", "span2", "span3", "span4"},
@@ -112,6 +320,657 @@ var mergedBlock = block{
},
}
+// Helper function to verify that a merged part contains expected traces.
+func verifyPartContainsTraces(t *testing.T, partID uint64, tmpPath string, fileSystem fs.FileSystem, expectedTraces map[string]int) {
+ t.Helper()
+
+ // Open the part
+ p := mustOpenFilePart(partID, tmpPath, fileSystem)
+ defer func() {
+ for _, r := range p.tags {
+ fs.MustClose(r)
+ }
+ for _, r := range p.tagMetadata {
+ fs.MustClose(r)
+ }
+ fs.MustClose(p.primary)
+ fs.MustClose(p.spans)
+ }()
+
+ // Create iterator to read blocks
+ pmi := generatePartMergeIter()
+ pmi.mustInitFromPart(p)
+ defer releasePartMergeIter(pmi)
+
+ reader := generateBlockReader()
+ reader.init([]*partMergeIter{pmi})
+ defer releaseBlockReader(reader)
+
+ // Read all blocks and verify detailed content
+ foundTraces := make(map[string]int)
+ decoder := &encoding.BytesBlockDecoder{}
+ defer decoder.Reset()
+
+	for reader.nextBlockMetadata() {
+		bm := reader.block.bm
+		traceID := bm.traceID
+		spanCount := int(bm.count)
+
+		foundTraces[traceID] += spanCount
+
+		// Verify the block was read successfully
+		require.NotNil(t, reader.block, "Block should not be nil for trace %s", traceID)
+		require.NotEmpty(t, traceID, "Trace ID should not be empty")
+		require.Greater(t, spanCount, 0, "Trace %s: span count should be greater than 0", traceID)
+
+		// Load the actual block data to verify spans and tags
+		reader.loadBlockData(decoder)
+
+		// Verify span count matches
+		require.Equal(t, spanCount, len(reader.block.spanIDs),
+			"Trace %s: span count mismatch between metadata (%d) and actual span IDs (%d)",
+			traceID, spanCount, len(reader.block.spanIDs))
+		require.Equal(t, spanCount, len(reader.block.spans),
+			"Trace %s: span count mismatch between span IDs (%d) and span data (%d)",
+			traceID, len(reader.block.spanIDs), len(reader.block.spans))
+
+		// Verify each span has valid data
+		for i, span := range reader.block.spans {
+			require.NotEmpty(t, span, "Trace %s: span %d should not be empty", traceID, i)
+			require.NotEmpty(t, reader.block.spanIDs[i], "Trace %s: span ID %d should not be empty", traceID, i)
+		}
+
+		// Verify tags
+		require.NotNil(t, reader.block.tags, "Trace %s: tags should not be nil", traceID)
+		for _, tag := range reader.block.tags {
+			require.NotEmpty(t, tag.name, "Trace %s: tag name should not be empty", traceID)
+			require.Equal(t, spanCount, len(tag.values),
+				"Trace %s: tag %s should have %d values, got %d",
+				traceID, tag.name, spanCount, len(tag.values))
+
+			// Verify each tag value is valid
+			for i, value := range tag.values {
+				require.NotNil(t, value, "Trace %s: tag %s value %d should not be nil", traceID, tag.name, i)
+			}
+		}
+
+		// Verify timestamps metadata is valid
+		require.True(t, bm.timestamps.min <= bm.timestamps.max,
+			"Trace %s: timestamp min (%d) should be <= max (%d)",
+			traceID, bm.timestamps.min, bm.timestamps.max)
+
+		// Verify tag metadata consistency
+		require.NotNil(t, bm.tags, "Trace %s: tag metadata should not be nil", traceID)
+		require.Equal(t, len(bm.tags), len(reader.block.tags),
+			"Trace %s: tag metadata count (%d) should match block tags count (%d)",
+			traceID, len(bm.tags), len(reader.block.tags))
+	}
+	require.NoError(t, reader.error())
+
+	// Verify all expected traces are present with correct counts
+	for traceID, expectedCount := range expectedTraces {
+		actualCount, found := foundTraces[traceID]
+		require.True(t, found, "Trace ID %s not found in merged part %d", traceID, partID)
+		require.Equal(t, expectedCount, actualCount, "Trace ID %s has incorrect span count in part %d", traceID, partID)
+	}
+
+	// Verify no unexpected traces exist
+	require.Equal(t, len(expectedTraces), len(foundTraces), "Part %d has unexpected number of trace IDs", partID)
+}
+
+func Test_multipleRoundMerges(t *testing.T) {
+ tests := []struct {
+ name string
+ rounds []struct {
+ expectedTraces map[string]int
+ tsList []*traces
+ }
+ want []blockMetadata
+ }{
+		{
+			name: "Two rounds with overlapping trace IDs in both rounds",
+			rounds: []struct {
+				expectedTraces map[string]int
+				tsList         []*traces
+			}{
+				{
+					tsList: []*traces{
+						{
+							traceIDs:   []string{"trace1", "trace2"},
+							timestamps: []int64{1, 2},
+							tags: [][]*tagValue{
+								{
+									{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val1"), valueArr: nil},
+								},
+								{
+									{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val2"), valueArr: nil},
+								},
+							},
+							spans:   [][]byte{[]byte("span1"), []byte("span2")},
+							spanIDs: []string{"span1", "span2"},
+						},
+						{
+							traceIDs:   []string{"trace1", "trace3"},
+							timestamps: []int64{3, 4},
+							tags: [][]*tagValue{
+								{
+									{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val1b"), valueArr: nil},
+								},
+								{
+									{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val3"), valueArr: nil},
+								},
+							},
+							spans:   [][]byte{[]byte("span1b"), []byte("span3")},
+							spanIDs: []string{"span1b", "span3"},
+						},
+					},
+					expectedTraces: map[string]int{
+						"trace1": 2, // 1 from first part + 1 from second part
+						"trace2": 1, // 1 from first part
+						"trace3": 1, // 1 from second part
+					},
+				},
+				{
+					tsList: []*traces{
+						{
+							traceIDs:   []string{"trace1", "trace2"},
+							timestamps: []int64{5, 6},
+							tags: [][]*tagValue{
+								{
+									{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val1c"), valueArr: nil},
+								},
+								{
+									{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val2b"), valueArr: nil},
+								},
+							},
+							spans:   [][]byte{[]byte("span1c"), []byte("span2b")},
+							spanIDs: []string{"span1c", "span2b"},
+						},
+					},
+					expectedTraces: map[string]int{
+						"trace1": 3, // 2 from previous round + 1 new
+						"trace2": 2, // 1 from previous round + 1 new
+						"trace3": 1, // 1 from previous round (unchanged)
+					},
+				},
+			},
+			want: []blockMetadata{
+				{traceID: "trace1", count: 3, uncompressedSpanSizeBytes: 17},
+				{traceID: "trace2", count: 2, uncompressedSpanSizeBytes: 11},
+				{traceID: "trace3", count: 1, uncompressedSpanSizeBytes: 5},
+			},
+		},
+		{
+			name: "Two rounds with non-overlapping trace IDs",
+			rounds: []struct {
+				expectedTraces map[string]int
+				tsList         []*traces
+			}{
+				{
+					tsList: []*traces{
+						{
+							traceIDs:   []string{"traceA", "traceB"},
+							timestamps: []int64{1, 2},
+							tags: [][]*tagValue{
+								{
+									{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("valA"), valueArr: nil},
+								},
+								{
+									{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("valB"), valueArr: nil},
+								},
+							},
+							spans:   [][]byte{[]byte("spanA"), []byte("spanB")},
+							spanIDs: []string{"spanA", "spanB"},
+						},
+						{
+							traceIDs:   []string{"traceC"},
+							timestamps: []int64{3},
+							tags: [][]*tagValue{
+								{
+									{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("valC"), valueArr: nil},
+								},
+							},
+							spans:   [][]byte{[]byte("spanC")},
+							spanIDs: []string{"spanC"},
+						},
+					},
+					expectedTraces: map[string]int{
+						"traceA": 1,
+						"traceB": 1,
+						"traceC": 1,
+					},
+				},
+				{
+					tsList: []*traces{
+						{
+							traceIDs:   []string{"traceD", "traceE"},
+							timestamps: []int64{4, 5},
+							tags: [][]*tagValue{
+								{
+									{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("valD"), valueArr: nil},
+								},
+								{
+									{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("valE"), valueArr: nil},
+								},
+							},
+							spans:   [][]byte{[]byte("spanD"), []byte("spanE")},
+							spanIDs: []string{"spanD", "spanE"},
+						},
+					},
+					expectedTraces: map[string]int{
+						"traceA": 1, // from previous round
+						"traceB": 1, // from previous round
+						"traceC": 1, // from previous round
+						"traceD": 1, // new
+						"traceE": 1, // new
+					},
+				},
+			},
+			want: []blockMetadata{
+				{traceID: "traceA", count: 1, uncompressedSpanSizeBytes: 5},
+				{traceID: "traceB", count: 1, uncompressedSpanSizeBytes: 5},
+				{traceID: "traceC", count: 1, uncompressedSpanSizeBytes: 5},
+				{traceID: "traceD", count: 1, uncompressedSpanSizeBytes: 5},
+				{traceID: "traceE", count: 1, uncompressedSpanSizeBytes: 5},
+			},
+		},
+		{
+			name: "Three rounds - first round overlapping, subsequent rounds non-overlapping",
+			rounds: []struct {
+				expectedTraces map[string]int
+				tsList         []*traces
+			}{
+				{
+					tsList: []*traces{
+						{
+							traceIDs:   []string{"trace1", "trace2"},
+							timestamps: []int64{1, 2},
+							tags: [][]*tagValue{
+								{
+									{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val1"), valueArr: nil},
+								},
+								{
+									{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val2"), valueArr: nil},
+								},
+							},
+							spans:   [][]byte{[]byte("span1"), []byte("span2")},
+							spanIDs: []string{"span1", "span2"},
+						},
+						{
+							traceIDs:   []string{"trace1", "trace2"},
+							timestamps: []int64{3, 4},
+							tags: [][]*tagValue{
+								{
+									{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val1b"), valueArr: nil},
+								},
+								{
+									{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val2b"), valueArr: nil},
+								},
+							},
+							spans:   [][]byte{[]byte("span1b"), []byte("span2b")},
+							spanIDs: []string{"span1b", "span2b"},
+						},
+					},
+					expectedTraces: map[string]int{
+						"trace1": 2, // overlapping from 2 parts
+						"trace2": 2, // overlapping from 2 parts
+					},
+				},
+				{
+					tsList: []*traces{
+						{
+							traceIDs:   []string{"trace3"},
+							timestamps: []int64{5},
+							tags: [][]*tagValue{
+								{
+									{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val3"), valueArr: nil},
+								},
+							},
+							spans:   [][]byte{[]byte("span3")},
+							spanIDs: []string{"span3"},
+						},
+					},
+					expectedTraces: map[string]int{
+						"trace1": 2, // from previous round
+						"trace2": 2, // from previous round
+						"trace3": 1, // new, non-overlapping
+					},
+				},
+				{
+					tsList: []*traces{
+						{
+							traceIDs:   []string{"trace4"},
+							timestamps: []int64{6},
+							tags: [][]*tagValue{
+								{
+									{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val4"), valueArr: nil},
+								},
+							},
+							spans:   [][]byte{[]byte("span4")},
+							spanIDs: []string{"span4"},
+						},
+					},
+					expectedTraces: map[string]int{
+						"trace1": 2, // from previous rounds
+						"trace2": 2, // from previous rounds
+						"trace3": 1, // from previous round
+						"trace4": 1, // new, non-overlapping
+					},
+				},
+			},
+			want: []blockMetadata{
+				{traceID: "trace1", count: 2, uncompressedSpanSizeBytes: 11},
+				{traceID: "trace2", count: 2, uncompressedSpanSizeBytes: 11},
+				{traceID: "trace3", count: 1, uncompressedSpanSizeBytes: 5},
+				{traceID: "trace4", count: 1, uncompressedSpanSizeBytes: 5},
+			},
+		},
+		{
+			name: "Three rounds - overlapping in first, overlapping in second, non-overlapping in third",
+			rounds: []struct {
+				expectedTraces map[string]int
+				tsList         []*traces
+			}{
+				{
+					tsList: []*traces{
+						{
+							traceIDs:   []string{"trace1"},
+							timestamps: []int64{1},
+							tags: [][]*tagValue{
+								{
+									{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val1"), valueArr: nil},
+								},
+							},
+							spans:   [][]byte{[]byte("span1")},
+							spanIDs: []string{"span1"},
+						},
+						{
+							traceIDs:   []string{"trace1", "trace2"},
+							timestamps: []int64{2, 3},
+							tags: [][]*tagValue{
+								{
+									{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val1b"), valueArr: nil},
+								},
+								{
+									{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val2"), valueArr: nil},
+								},
+							},
+							spans:   [][]byte{[]byte("span1b"), []byte("span2")},
+							spanIDs: []string{"span1b", "span2"},
+						},
+					},
+					expectedTraces: map[string]int{
+						"trace1": 2, // overlapping from 2 parts
+						"trace2": 1, // from second part
+					},
+				},
+				{
+					tsList: []*traces{
+						{
+							traceIDs:   []string{"trace1", "trace3"},
+							timestamps: []int64{4, 5},
+							tags: [][]*tagValue{
+								{
+									{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val1c"), valueArr: nil},
+								},
+								{
+									{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val3"), valueArr: nil},
+								},
+							},
+							spans:   [][]byte{[]byte("span1c"), []byte("span3")},
+							spanIDs: []string{"span1c", "span3"},
+						},
+					},
+					expectedTraces: map[string]int{
+						"trace1": 3, // 2 from previous + 1 new (overlapping)
+						"trace2": 1, // from previous round
+						"trace3": 1, // new
+					},
+				},
+				{
+					tsList: []*traces{
+						{
+							traceIDs:   []string{"trace4"},
+							timestamps: []int64{6},
+							tags: [][]*tagValue{
+								{
+									{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("val4"), valueArr: nil},
+								},
+							},
+							spans:   [][]byte{[]byte("span4")},
+							spanIDs: []string{"span4"},
+						},
+					},
+					expectedTraces: map[string]int{
+						"trace1": 3, // from previous rounds
+						"trace2": 1, // from previous rounds
+						"trace3": 1, // from previous round
+						"trace4": 1, // new, non-overlapping
+					},
+				},
+			},
+			want: []blockMetadata{
+				{traceID: "trace1", count: 3, uncompressedSpanSizeBytes: 17},
+				{traceID: "trace2", count: 1, uncompressedSpanSizeBytes: 5},
+				{traceID: "trace3", count: 1, uncompressedSpanSizeBytes: 5},
+				{traceID: "trace4", count: 1, uncompressedSpanSizeBytes: 5},
+			},
+		},
+		{
+			name: "Four rounds - complex overlapping pattern",
+			rounds: []struct {
+				expectedTraces map[string]int
+				tsList         []*traces
+			}{
+				{
+					// Round 1: merge traceA and traceB with overlaps
+					tsList: []*traces{
+						{
+							traceIDs:   []string{"traceA"},
+							timestamps: []int64{1},
+							tags: [][]*tagValue{
+								{
+									{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("valA1"), valueArr: nil},
+								},
+							},
+							spans:   [][]byte{[]byte("spanA1")},
+							spanIDs: []string{"spanA1"},
+						},
+						{
+							traceIDs:   []string{"traceA", "traceB"},
+							timestamps: []int64{2, 3},
+							tags: [][]*tagValue{
+								{
+									{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("valA2"), valueArr: nil},
+								},
+								{
+									{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("valB1"), valueArr: nil},
+								},
+							},
+							spans:   [][]byte{[]byte("spanA2"), []byte("spanB1")},
+							spanIDs: []string{"spanA2", "spanB1"},
+						},
+					},
+					expectedTraces: map[string]int{
+						"traceA": 2, // overlapping from 2 parts
+						"traceB": 1, // from second part
+					},
+				},
+				{
+					// Round 2: merge with traceB (overlapping) and traceC (new)
+					tsList: []*traces{
+						{
+							traceIDs:   []string{"traceB", "traceC"},
+							timestamps: []int64{4, 5},
+							tags: [][]*tagValue{
+								{
+									{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("valB2"), valueArr: nil},
+								},
+								{
+									{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("valC1"), valueArr: nil},
+								},
+							},
+							spans:   [][]byte{[]byte("spanB2"), []byte("spanC1")},
+							spanIDs: []string{"spanB2", "spanC1"},
+						},
+					},
+					expectedTraces: map[string]int{
+						"traceA": 2, // from previous round
+						"traceB": 2, // 1 from previous + 1 new (overlapping)
+						"traceC": 1, // new
+					},
+				},
+				{
+					// Round 3: non-overlapping traces
+					tsList: []*traces{
+						{
+							traceIDs:   []string{"traceD"},
+							timestamps: []int64{6},
+							tags: [][]*tagValue{
+								{
+									{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("valD1"), valueArr: nil},
+								},
+							},
+							spans:   [][]byte{[]byte("spanD1")},
+							spanIDs: []string{"spanD1"},
+						},
+					},
+					expectedTraces: map[string]int{
+						"traceA": 2, // from previous rounds
+						"traceB": 2, // from previous rounds
+						"traceC": 1, // from previous round
+						"traceD": 1, // new, non-overlapping
+					},
+				},
+				{
+					// Round 4: non-overlapping traces
+					tsList: []*traces{
+						{
+							traceIDs:   []string{"traceE"},
+							timestamps: []int64{7},
+							tags: [][]*tagValue{
+								{
+									{tag: "tag1", valueType: pbv1.ValueTypeStr, value: []byte("valE1"), valueArr: nil},
+								},
+							},
+							spans:   [][]byte{[]byte("spanE1")},
+							spanIDs: []string{"spanE1"},
+						},
+					},
+					expectedTraces: map[string]int{
+						"traceA": 2, // from previous rounds
+						"traceB": 2, // from previous rounds
+						"traceC": 1, // from previous rounds
+						"traceD": 1, // from previous round
+						"traceE": 1, // new, non-overlapping
+					},
+				},
+			},
+			want: []blockMetadata{
+				{traceID: "traceA", count: 2, uncompressedSpanSizeBytes: 12},
+				{traceID: "traceB", count: 2, uncompressedSpanSizeBytes: 12},
+				{traceID: "traceC", count: 1, uncompressedSpanSizeBytes: 6},
+				{traceID: "traceD", count: 1, uncompressedSpanSizeBytes: 6},
+				{traceID: "traceE", count: 1, uncompressedSpanSizeBytes: 6},
+			},
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			tmpPath, defFn := test.Space(require.New(t))
+			defer defFn()
+			fileSystem := fs.NewLocalFileSystem()
+			tst := &tsTable{pm: protector.Nop{}, fileSystem: fileSystem, root: tmpPath}
+
+			var mergedPartID uint64
+			partID := uint64(1)
+
+			// Execute multiple merge rounds
+			for roundIdx, round := range tt.rounds {
+				t.Logf("Round %d: merging %d parts", roundIdx+1, len(round.tsList))
+
+				var partsToMerge []*partWrapper
+
+				// If we have a merged part from the previous round, include it
+				if roundIdx > 0 && mergedPartID != 0 {
+					// Reopen the merged part from the previous round
+					prevMergedPart := mustOpenFilePart(mergedPartID, tmpPath, fileSystem)
+					prevMergedPart.partMetadata.ID = mergedPartID
+					partsToMerge = append(partsToMerge, newPartWrapper(nil, prevMergedPart))
+					t.Logf("  Including previous merged part ID %d", mergedPartID)
+				}
+
+				// Create new parts for this round
+				for i, traces := range round.tsList {
+					mp := generateMemPart()
+					mp.mustInitFromTraces(traces)
+					mp.mustFlush(fileSystem, partPath(tmpPath, partID))
+					p := mustOpenFilePart(partID, tmpPath, fileSystem)
+					p.partMetadata.ID = partID
+					partsToMerge = append(partsToMerge, newPartWrapper(nil, p))
+					releaseMemPart(mp)
+					partID++
+					t.Logf("  Created part %d with %d trace(s)", i+1, len(traces.traceIDs))
+				}
+
+				// Merge all parts for this round
+				closeCh := make(chan struct{})
+				mergedPart, err := tst.mergeParts(fileSystem, closeCh, partsToMerge, partID, tmpPath)
+				close(closeCh)
+				require.NoError(t, err, "Round %d merge failed", roundIdx+1)
+				require.NotNil(t, mergedPart, "Round %d produced nil merged part", roundIdx+1)
+
+				// Release all parts that were merged
+				for _, pw := range partsToMerge {
+					pw.decRef()
+				}
+
+				// Store the merged part ID and release the wrapper
+				mergedPartID = mergedPart.ID()
+				mergedPart.decRef()
+				partID++
+				t.Logf("  Merged into part ID %d", mergedPartID)
+
+				// Verify the merged part contains expected traces
+				if round.expectedTraces != nil {
+					t.Logf("  Verifying merged part %d contains expected traces", mergedPartID)
+					verifyPartContainsTraces(t, mergedPartID, tmpPath, fileSystem, round.expectedTraces)
+					t.Logf("  ✓ Verification passed for round %d", roundIdx+1)
+				}
+			}
+
+			// Verify final merged results by reopening the final merged part
+			require.NotZero(t, mergedPartID, "No merged part produced")
+			finalMergedPart := mustOpenFilePart(mergedPartID, tmpPath, fileSystem)
+			finalPW := newPartWrapper(nil, finalMergedPart)
+			defer finalPW.decRef()
+
+			pmi := generatePartMergeIter()
+			pmi.mustInitFromPart(finalPW.p)
+			reader := generateBlockReader()
+			reader.init([]*partMergeIter{pmi})
+
+			var got []blockMetadata
+			for reader.nextBlockMetadata() {
+				got = append(got, reader.block.bm)
+			}
+			require.NoError(t, reader.error())
+			releaseBlockReader(reader)
+			releasePartMergeIter(pmi)
+
+			// Verify block counts and trace IDs
+			require.Len(t, got, len(tt.want), "Final result should have %d blocks", len(tt.want))
+			for i, want := range tt.want {
+				require.Equal(t, want.traceID, got[i].traceID, "Block %d trace ID mismatch", i)
+				require.Equal(t, want.count, got[i].count, "Block %d count mismatch for trace %s", i, want.traceID)
+				require.Equal(t, want.uncompressedSpanSizeBytes, got[i].uncompressedSpanSizeBytes,
+					"Block %d uncompressed size mismatch for trace %s", i, want.traceID)
+			}
+		})
+	}
+}
+
func Test_mergeParts(t *testing.T) {
tests := []struct {
wantErr error
@@ -151,6 +1010,177 @@ func Test_mergeParts(t *testing.T) {
{traceID: "trace3", count: 2,
uncompressedSpanSizeBytes: 10},
},
},
+		{
+			name: "Test with different trace IDs - should use fast path (readRaw/WriteRawBlock)",
+			tsList: []*traces{
+				{
+					traceIDs:   []string{"trace1", "trace2"},
+					timestamps: []int64{1, 1},
+					tags: [][]*tagValue{
+						{
+							{tag: "strTag", valueType: pbv1.ValueTypeStr, value: []byte("value1"), valueArr: nil},
+						},
+						{
+							{tag: "strTag", valueType: pbv1.ValueTypeStr, value: []byte("value2"), valueArr: nil},
+						},
+					},
+					spans:   [][]byte{[]byte("span1"), []byte("span2")},
+					spanIDs: []string{"span1", "span2"},
+				},
+				{
+					traceIDs:   []string{"trace3", "trace4"},
+					timestamps: []int64{2, 2},
+					tags: [][]*tagValue{
+						{
+							{tag: "strTag", valueType: pbv1.ValueTypeStr, value: []byte("value3"), valueArr: nil},
+						},
+						{
+							{tag: "strTag", valueType: pbv1.ValueTypeStr, value: []byte("value4"), valueArr: nil},
+						},
+					},
+					spans:   [][]byte{[]byte("span3"), []byte("span4")},
+					spanIDs: []string{"span3", "span4"},
+				},
+			},
+			want: []blockMetadata{
+				{traceID: "trace1", count: 1, uncompressedSpanSizeBytes: 5},
+				{traceID: "trace2", count: 1, uncompressedSpanSizeBytes: 5},
+				{traceID: "trace3", count: 1, uncompressedSpanSizeBytes: 5},
+				{traceID: "trace4", count: 1, uncompressedSpanSizeBytes: 5},
+			},
+		},
+		{
+			name: "Test with completely non-overlapping trace IDs across 3 parts - fast path",
+			tsList: []*traces{
+				{
+					traceIDs:   []string{"traceA"},
+					timestamps: []int64{1},
+					tags: [][]*tagValue{
+						{
+							{tag: "strTag", valueType: pbv1.ValueTypeStr, value: []byte("valueA"), valueArr: nil},
+						},
+					},
+					spans:   [][]byte{[]byte("spanA")},
+					spanIDs: []string{"spanA"},
+				},
+				{
+					traceIDs:   []string{"traceB"},
+					timestamps: []int64{2},
+					tags: [][]*tagValue{
+						{
+							{tag: "strTag", valueType: pbv1.ValueTypeStr, value: []byte("valueB"), valueArr: nil},
+						},
+					},
+					spans:   [][]byte{[]byte("spanB")},
+					spanIDs: []string{"spanB"},
+				},
+				{
+					traceIDs:   []string{"traceC"},
+					timestamps: []int64{3},
+					tags: [][]*tagValue{
+						{
+							{tag: "strTag", valueType: pbv1.ValueTypeStr, value: []byte("valueC"), valueArr: nil},
+						},
+					},
+					spans:   [][]byte{[]byte("spanC")},
+					spanIDs: []string{"spanC"},
+				},
+			},
+			want: []blockMetadata{
+				{traceID: "traceA", count: 1, uncompressedSpanSizeBytes: 5},
+				{traceID: "traceB", count: 1, uncompressedSpanSizeBytes: 5},
+				{traceID: "traceC", count: 1, uncompressedSpanSizeBytes: 5},
+			},
+		},
+		{
+			name: "Test with interleaved non-overlapping trace IDs - fast path",
+			tsList: []*traces{
+				{
+					traceIDs:   []string{"trace1", "trace3", "trace5"},
+					timestamps: []int64{1, 1, 1},
+					tags: [][]*tagValue{
+						{
+							{tag: "strTag", valueType: pbv1.ValueTypeStr, value: []byte("value1"), valueArr: nil},
+						},
+						{
+							{tag: "strTag", valueType: pbv1.ValueTypeStr, value: []byte("value3"), valueArr: nil},
+						},
+						{
+							{tag: "strTag", valueType: pbv1.ValueTypeStr, value: []byte("value5"), valueArr: nil},
+						},
+					},
+					spans:   [][]byte{[]byte("span1"), []byte("span3"), []byte("span5")},
+					spanIDs: []string{"span1", "span3", "span5"},
+				},
+				{
+					traceIDs:   []string{"trace2", "trace4", "trace6"},
+					timestamps: []int64{2, 2, 2},
+					tags: [][]*tagValue{
+						{
+							{tag: "strTag", valueType: pbv1.ValueTypeStr, value: []byte("value2"), valueArr: nil},
+						},
+						{
+							{tag: "strTag", valueType: pbv1.ValueTypeStr, value: []byte("value4"), valueArr: nil},
+						},
+						{
+							{tag: "strTag", valueType: pbv1.ValueTypeStr, value: []byte("value6"), valueArr: nil},
+						},
+					},
+					spans:   [][]byte{[]byte("span2"), []byte("span4"), []byte("span6")},
+					spanIDs: []string{"span2", "span4", "span6"},
+				},
+			},
+			want: []blockMetadata{
+				{traceID: "trace1", count: 1, uncompressedSpanSizeBytes: 5},
+				{traceID: "trace2", count: 1, uncompressedSpanSizeBytes: 5},
+				{traceID: "trace3", count: 1, uncompressedSpanSizeBytes: 5},
+				{traceID: "trace4", count: 1, uncompressedSpanSizeBytes: 5},
+				{traceID: "trace5", count: 1, uncompressedSpanSizeBytes: 5},
+				{traceID: "trace6", count: 1, uncompressedSpanSizeBytes: 5},
+			},
+		},
+		{
+			name: "Test with mixed same and different trace IDs - both fast and slow paths",
+			tsList: []*traces{
+				{
+					traceIDs:   []string{"trace1", "trace2", "trace3"},
+					timestamps: []int64{1, 1, 1},
+					tags: [][]*tagValue{
+						{
+							{tag: "strTag", valueType: pbv1.ValueTypeStr, value: []byte("value1"), valueArr: nil},
+						},
+						{
+							{tag: "strTag", valueType: pbv1.ValueTypeStr, value: []byte("value2"), valueArr: nil},
+						},
+						{
+							{tag: "strTag", valueType: pbv1.ValueTypeStr, value: []byte("value3"), valueArr: nil},
+						},
+					},
+					spans:   [][]byte{[]byte("span1"), []byte("span2"), []byte("span3")},
+					spanIDs: []string{"span1", "span2", "span3"},
+				},
+				{
+					traceIDs:   []string{"trace1", "trace4"},
+					timestamps: []int64{2, 2},
+					tags: [][]*tagValue{
+						{
+							{tag: "strTag", valueType: pbv1.ValueTypeStr, value: []byte("value1b"), valueArr: nil},
+						},
+						{
+							{tag: "strTag", valueType: pbv1.ValueTypeStr, value: []byte("value4"), valueArr: nil},
+						},
+					},
+					spans:   [][]byte{[]byte("span1b"), []byte("span4")},
+					spanIDs: []string{"span1b", "span4"},
+				},
+			},
+			want: []blockMetadata{
+				{traceID: "trace1", count: 2, uncompressedSpanSizeBytes: 11},
+				{traceID: "trace2", count: 1, uncompressedSpanSizeBytes: 5},
+				{traceID: "trace3", count: 1, uncompressedSpanSizeBytes: 5},
+				{traceID: "trace4", count: 1, uncompressedSpanSizeBytes: 5},
+			},
+		},
}
for _, tt := range tests {
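
As a quick arithmetic check on the expected metadata in the tables above, uncompressedSpanSizeBytes is simply the summed length of a trace's span payloads; for example, in the mixed case trace1 accumulates span1 and span1b:

    // len("span1") + len("span1b") = 5 + 6 = 11, matching the want entry
    total := len("span1") + len("span1b")
    _ = total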
diff --git a/banyand/trace/part_iter.go b/banyand/trace/part_iter.go
index d10b7620..c2cb3cee 100644
--- a/banyand/trace/part_iter.go
+++ b/banyand/trace/part_iter.go
@@ -334,10 +334,90 @@ func (pmi *partMergeIter) loadBlockMetadata() error {
return nil
}
+func (pmi *partMergeIter) peekBlockMetadata() (*blockMetadata, bool) {
+ if len(pmi.primaryBuf) == 0 {
+ return nil, false
+ }
+ bm := generateBlockMetadata()
+ _, err := bm.unmarshal(pmi.primaryBuf, pmi.tagType)
+	if err != nil {
+		// Return the pooled metadata before bailing out to avoid leaking it
+		releaseBlockMetadata(bm)
+		return nil, false
+	}
+ return bm, true
+}
+
 func (pmi *partMergeIter) mustLoadBlockData(decoder *encoding.BytesBlockDecoder, block *blockPointer) {
block.block.mustSeqReadFrom(decoder, &pmi.seqReaders, pmi.block.bm)
}
+func (pmi *partMergeIter) mustReadRaw(r *rawBlock, bm *blockMetadata) {
+ r.bm = bm
+ // spans
+ if bm.spans != nil && bm.spans.size > 0 {
+ // Validate the reader is aligned to the expected offset
+ if bm.spans.offset != pmi.seqReaders.spans.bytesRead {
+ logger.Panicf("offset %d must be equal to bytesRead
%d", bm.spans.offset, pmi.seqReaders.spans.bytesRead)
+ }
+ r.spans = bytes.ResizeOver(r.spans[:0], int(bm.spans.size))
+ pmi.seqReaders.spans.mustReadFull(r.spans)
+ } else {
+ r.spans = r.spans[:0]
+ }
+ // tags
+ if len(bm.tags) > 0 {
+ if r.tags == nil {
+ r.tags = make(map[string][]byte, len(bm.tags))
+ }
+ if r.tagMetadata == nil {
+ r.tagMetadata = make(map[string][]byte, len(bm.tags))
+ }
+ for name, db := range bm.tags {
+ // read tag metadata block
+ // Validate the tag metadata reader alignment
+			if db.offset != pmi.seqReaders.tagMetadata[name].bytesRead {
+				logger.Panicf("offset %d must be equal to bytesRead %d", db.offset, pmi.seqReaders.tagMetadata[name].bytesRead)
+ }
+ mb := r.tagMetadata[name]
+ mb = bytes.ResizeOver(mb[:0], int(db.size))
+ pmi.seqReaders.tagMetadata[name].mustReadFull(mb)
+ r.tagMetadata[name] = mb
+ // parse to locate tag values range
+ tm := generateTagMetadata()
+ if err := tm.unmarshal(mb); err != nil {
+ logger.Panicf("cannot unmarshal tag metadata:
%v", err)
+ }
+ // read tag values
+ // Validate the tag values reader alignment
+ if tm.offset != pmi.seqReaders.tags[name].bytesRead {
+ logger.Panicf("offset %d must be equal to
bytesRead %d", tm.offset, pmi.seqReaders.tags[name].bytesRead)
+ }
+ vb := r.tags[name]
+ vb = bytes.ResizeOver(vb[:0], int(tm.size))
+ pmi.seqReaders.tags[name].mustReadFull(vb)
+ r.tags[name] = vb
+ releaseTagMetadata(tm)
+ }
+ for k := range r.tags {
+ if _, ok := bm.tags[k]; !ok {
+ delete(r.tags, k)
+ }
+ }
+ for k := range r.tagMetadata {
+ if _, ok := bm.tags[k]; !ok {
+ delete(r.tagMetadata, k)
+ }
+ }
+ } else {
+		// The block has no tags; clear any leftover entries from a previous block
+ for k := range r.tags {
+ delete(r.tags, k)
+ }
+ for k := range r.tagMetadata {
+ delete(r.tagMetadata, k)
+ }
+ }
+}
+
func generatePartMergeIter() *partMergeIter {
v := pmiPool.Get()
if v == nil {
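
The panics in mustReadRaw above encode a layout invariant: part files are written in block order, so a sequential reader can hand back a raw block only when the block's recorded offset equals the bytes the reader has already consumed. A minimal illustration, with a hypothetical seqState standing in for the patch's seqReader:

    // Sketch: a sequential reader is consumed front-to-back, so a block is
    // readable raw iff its offset equals the bytes already read.
    type seqState struct{ bytesRead uint64 }

    func canReadRaw(offset uint64, s seqState) bool {
        return offset == s.bytesRead
    }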
diff --git a/banyand/trace/tstable_test.go b/banyand/trace/tstable_test.go
index 6652aa9c..c9ac7b5b 100644
--- a/banyand/trace/tstable_test.go
+++ b/banyand/trace/tstable_test.go
@@ -19,6 +19,7 @@ package trace
import (
"errors"
+ "fmt"
"testing"
"time"
@@ -305,3 +306,65 @@ func generateHugeTraces(num int) *traces {
traces.spanIDs = append(traces.spanIDs, []string{"span2", "span3"}...)
return traces
}
+
+// generateRealisticTraces creates a more realistic dataset for benchmarking:
+// - Each trace has 3-5 spans
+// - Each span is more than 100KB
+// - Creates the specified number of unique traces.
+func generateRealisticTraces(numTraces int) *traces {
+ traces := &traces{
+ traceIDs: []string{},
+ timestamps: []int64{},
+ tags: [][]*tagValue{},
+ spans: [][]byte{},
+ spanIDs: []string{},
+ }
+
+ // Create a large span payload (>100 KiB)
+ // Using a mix of realistic data: stack traces, error messages, metadata
+ const spanPayloadSizeKiB = 110 // 110 KiB (1 KiB = 1024 bytes)
+ spanPayloadTemplate := make([]byte, spanPayloadSizeKiB*1024)
+ for i := range spanPayloadTemplate {
+ // Fill with semi-realistic data
+ spanPayloadTemplate[i] = byte('A' + (i % 26))
+ }
+
+ timestamp := int64(1000000)
+
+ for traceIdx := 0; traceIdx < numTraces; traceIdx++ {
+ traceID := fmt.Sprintf("trace_%d", traceIdx)
+
+ // Each trace has 3-5 spans
+ numSpans := 3 + (traceIdx % 3) // Will give 3, 4, or 5 spans
+
+ for spanIdx := 0; spanIdx < numSpans; spanIdx++ {
+ spanID := fmt.Sprintf("%s_span_%d", traceID, spanIdx)
+
+			// Create a unique span payload by appending span-specific data
+ spanPayload := make([]byte, len(spanPayloadTemplate))
+ copy(spanPayload, spanPayloadTemplate)
+ // Add span-specific suffix to ensure uniqueness
+ suffix := []byte(fmt.Sprintf("_trace_%d_span_%d",
traceIdx, spanIdx))
+ copy(spanPayload[len(spanPayload)-len(suffix):], suffix)
+
+ traces.traceIDs = append(traces.traceIDs, traceID)
+ traces.timestamps = append(traces.timestamps, timestamp)
+ timestamp++
+
+ // Add realistic tags
+ traces.tags = append(traces.tags, []*tagValue{
+ {tag: "http.method", valueType:
pbv1.ValueTypeStr, value: []byte("POST"), valueArr: nil},
+ {tag: "http.status", valueType:
pbv1.ValueTypeInt64, value: convert.Int64ToBytes(200), valueArr: nil},
+ {tag: "http.url", valueType: pbv1.ValueTypeStr,
value: []byte(fmt.Sprintf("/api/v1/trace/%d", traceIdx)), valueArr: nil},
+ {tag: "service.name", valueType:
pbv1.ValueTypeStr, value: []byte("test-service"), valueArr: nil},
+ {tag: "span.kind", valueType:
pbv1.ValueTypeStr, value: []byte("server"), valueArr: nil},
+ {tag: "error", valueType:
pbv1.ValueTypeBinaryData, value: spanPayload[:1024], valueArr: nil}, // Use
part of payload for error
+ })
+
+ traces.spans = append(traces.spans, spanPayload)
+ traces.spanIDs = append(traces.spanIDs, spanID)
+ }
+ }
+
+ return traces
+}
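
A hypothetical usage sketch for the new generator, following the same pattern the benchmarks above use (all identifiers besides the literal values come from this patch or its test files):

    ts := generateRealisticTraces(100) // 100 traces, 3-5 spans each, ~110 KiB per span
    mp := generateMemPart()
    mp.mustInitFromTraces(ts)
    mp.mustFlush(fileSystem, partPath(tmpPath, 1))
    releaseMemPart(mp)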