This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 1afd6ec6 Enhance block reading and writing with raw block handling (#804)
1afd6ec6 is described below

commit 1afd6ec68667e66e22ad9945ed8341df3102145c
Author: Gao Hongtao <[email protected]>
AuthorDate: Sat Oct 11 16:06:11 2025 +0800

    Enhance block reading and writing with raw block handling (#804)
---
 banyand/trace/block_reader.go      |   54 ++
 banyand/trace/block_writer.go      |   80 +++
 banyand/trace/merger.go            |   24 +
 banyand/trace/merger_bench_test.go |  387 ++++++++++++++
 banyand/trace/merger_test.go       | 1030 ++++++++++++++++++++++++++++++++++++
 banyand/trace/part_iter.go         |   80 +++
 banyand/trace/tstable_test.go      |   63 +++
 7 files changed, 1718 insertions(+)

diff --git a/banyand/trace/block_reader.go b/banyand/trace/block_reader.go
index 9fdf4145..72bc2d8c 100644
--- a/banyand/trace/block_reader.go
+++ b/banyand/trace/block_reader.go
@@ -29,6 +29,13 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
+type rawBlock struct {
+       bm          *blockMetadata
+       tags        map[string][]byte
+       tagMetadata map[string][]byte
+       spans       []byte
+}
+
 type seqReader struct {
        sr        fs.SeqReader
        r         fs.Reader
@@ -127,6 +134,7 @@ type blockReader struct {
        err           error
        block         *blockPointer
        pih           partMergeIterHeap
+       peekBlock     blockPointer
        nextBlockNoop bool
 }
 
@@ -138,6 +146,7 @@ func (br *blockReader) reset() {
        br.pih = br.pih[:0]
        br.nextBlockNoop = false
        br.err = nil
+       br.peekBlock.reset()
 }
 
 func (br *blockReader) init(pii []*partMergeIter) {
@@ -209,6 +218,51 @@ func (br *blockReader) loadBlockData(decoder *encoding.BytesBlockDecoder) {
        br.pih[0].mustLoadBlockData(decoder, br.block)
 }
 
+func (br *blockReader) peek() *blockPointer {
+       // Peek at the next block that will be processed to determine if 
current traceID is unique
+       // We need to check two things:
+       // 1. Does the current partMergeIter (pih[0]) have another block with 
the same traceID?
+       // 2. Do any other partMergeIters have a block with the same traceID?
+
+       if len(br.pih) == 0 {
+               return nil
+       }
+
+       currentTraceID := br.block.bm.traceID
+
+       // First check if pih[0] has another block
+       if nextBM, ok := br.pih[0].peekBlockMetadata(); ok {
+               // If the next block in the same part has the same traceID, we 
can't use fast path
+               if nextBM.traceID == currentTraceID {
+                       // Reuse peekBlock to avoid allocation
+                       br.peekBlock.reset()
+                       br.peekBlock.bm = *nextBM
+                       releaseBlockMetadata(nextBM)
+                       return &br.peekBlock
+               }
+               releaseBlockMetadata(nextBM)
+               // The next block in pih[0] has a different traceID
+               // But we still need to check other parts in the heap
+       }
+
+       // Check if any other part (pih[1], pih[2], etc.) has the same traceID
+       // Note: The heap is a min-heap, not fully sorted, so we must check ALL 
elements
+       for i := 1; i < len(br.pih); i++ {
+               if br.pih[i].block.bm.traceID == currentTraceID {
+                       // Another part has the same traceID
+                       return &br.pih[i].block
+               }
+       }
+
+       // No other blocks with this traceID - it's unique
+       return nil
+}
+
+func (br *blockReader) mustReadRaw(r *rawBlock, bm *blockMetadata) {
+       // Delegate to the current partMergeIter to read raw block
+       br.pih[0].mustReadRaw(r, bm)
+}
+
 func (br *blockReader) error() error {
        if errors.Is(br.err, io.EOF) {
                return nil
diff --git a/banyand/trace/block_writer.go b/banyand/trace/block_writer.go
index 264cade8..8bfb06f8 100644
--- a/banyand/trace/block_writer.go
+++ b/banyand/trace/block_writer.go
@@ -264,6 +264,86 @@ func (bw *blockWriter) mustFlushPrimaryBlock(data []byte) {
        bw.maxTimestamp = 0
 }
 
+func (bw *blockWriter) mustWriteRawBlock(r *rawBlock) {
+       bm := r.bm
+       if bm.count == 0 {
+               return
+       }
+       hasWrittenBlocks := len(bw.traceIDs) > 0
+       var tidLast string
+       var isSeenTid bool
+
+       if hasWrittenBlocks {
+               tidLast = bw.traceIDs[len(bw.traceIDs)-1]
+               if bm.traceID < tidLast {
+                       logger.Panicf("the tid=%s cannot be smaller than the 
previously written tid=%s", bm.traceID, tidLast)
+               }
+               isSeenTid = bm.traceID == tidLast
+       }
+       bw.traceIDs = append(bw.traceIDs, bm.traceID)
+       bw.tagType.copyFrom(bm.tagType)
+
+       tm := &bm.timestamps
+       if bw.totalCount == 0 || tm.min < bw.totalMinTimestamp {
+               bw.totalMinTimestamp = tm.min
+       }
+       if bw.totalCount == 0 || tm.max > bw.totalMaxTimestamp {
+               bw.totalMaxTimestamp = tm.max
+       }
+       if !hasWrittenBlocks || tm.min < bw.minTimestamp {
+               bw.minTimestamp = tm.min
+       }
+       if !hasWrittenBlocks || tm.max > bw.maxTimestamp {
+               bw.maxTimestamp = tm.max
+       }
+       if isSeenTid && tm.min < bw.minTimestampLast {
+               logger.Panicf("the block for tid=%s cannot contain timestamp 
smaller than %d, but it contains timestamp %d", bm.traceID, 
bw.minTimestampLast, tm.min)
+       }
+       bw.minTimestampLast = tm.min
+
+       bw.totalUncompressedSpanSizeBytes += bm.uncompressedSpanSizeBytes
+       bw.totalCount += bm.count
+       bw.totalBlocksCount++
+
+       newBm := generateBlockMetadata()
+       newBm.copyFrom(bm)
+
+       newBm.spans.offset = bw.writers.spanWriter.bytesWritten
+       bw.writers.spanWriter.MustWrite(r.spans)
+
+       for name := range bm.tags {
+               tmw, tw := bw.writers.getWriters(name)
+               newDB := newBm.getTagMetadata(name)
+
+               // Unmarshal the tag metadata to update the tag value offset
+               tm := generateTagMetadata()
+               if err := tm.unmarshal(r.tagMetadata[name]); err != nil {
+                       logger.Panicf("cannot unmarshal tag metadata for %s: 
%v", name, err)
+               }
+
+               // Update the tag value offset to point to the correct location 
in the new file
+               tm.offset = tw.bytesWritten
+               tw.MustWrite(r.tags[name])
+
+               // Now marshal the updated metadata and write it
+               bb := bigValuePool.Generate()
+               bb.Buf = tm.marshal(bb.Buf[:0])
+               newDB.offset = tmw.bytesWritten
+               newDB.size = uint64(len(bb.Buf))
+               tmw.MustWrite(bb.Buf)
+               bigValuePool.Release(bb)
+               releaseTagMetadata(tm)
+       }
+
+       bw.primaryBlockData = newBm.marshal(bw.primaryBlockData)
+       releaseBlockMetadata(newBm)
+
+       if len(bw.primaryBlockData) > maxUncompressedPrimaryBlockSize {
+               bw.mustFlushPrimaryBlock(bw.primaryBlockData)
+               bw.primaryBlockData = bw.primaryBlockData[:0]
+       }
+}
+
 func (bw *blockWriter) Flush(pm *partMetadata, tf *traceIDFilter, tt *tagType) 
{
        pm.UncompressedSpanSizeBytes = bw.totalUncompressedSpanSizeBytes
        pm.TotalCount = bw.totalCount
diff --git a/banyand/trace/merger.go b/banyand/trace/merger.go
index 3892f045..0c5b5ae2 100644
--- a/banyand/trace/merger.go
+++ b/banyand/trace/merger.go
@@ -328,12 +328,16 @@ func (tst *tsTable) mergeParts(fileSystem fs.FileSystem, closeCh <-chan struct{}
 
 var errClosed = fmt.Errorf("the merger is closed")
 
+// forceSlowMerge is used for testing to disable the fast raw merge path.
+var forceSlowMerge = false
+
 func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, br *blockReader) 
(*partMetadata, *traceIDFilter, *tagType, error) {
        pendingBlockIsEmpty := true
        pendingBlock := generateBlockPointer()
        defer releaseBlockPointer(pendingBlock)
        var tmpBlock *blockPointer
        var decoder *encoding.BytesBlockDecoder
+       var rawBlk rawBlock
        getDecoder := func() *encoding.BytesBlockDecoder {
                if decoder == nil {
                        decoder = generateColumnValuesDecoder()
@@ -353,6 +357,15 @@ func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, br *blockReader) (*pa
                default:
                }
                b := br.block
+               // Fast path: if this is the only block for this traceID AND we 
have no pending block,
+               // copy it raw without unmarshaling
+               nextB := br.peek()
+               if !forceSlowMerge && pendingBlockIsEmpty && (nextB == nil || 
nextB.bm.traceID != b.bm.traceID) {
+                       // fast path: only a single block for the trace id and 
no pending data
+                       br.mustReadRaw(&rawBlk, &b.bm)
+                       bw.mustWriteRawBlock(&rawBlk)
+                       continue
+               }
 
                if pendingBlockIsEmpty {
                        br.loadBlockData(getDecoder())
@@ -365,8 +378,19 @@ func mergeBlocks(closeCh <-chan struct{}, bw *blockWriter, br *blockReader) (*pa
                        bw.mustWriteBlock(pendingBlock.bm.traceID, 
&pendingBlock.block)
                        releaseDecoder()
                        pendingBlock.reset()
+                       // After writing the pending block, check if the new 
block can be copied raw
+                       // This is the same fast path check as at the beginning 
of the loop
+                       nextB = br.peek()
+                       if !forceSlowMerge && (nextB == nil || nextB.bm.traceID 
!= b.bm.traceID) {
+                               // fast path: only a single block for this new 
trace id
+                               br.mustReadRaw(&rawBlk, &b.bm)
+                               bw.mustWriteRawBlock(&rawBlk)
+                               continue
+                       }
+                       // Slow path: start accumulating the new block
                        br.loadBlockData(getDecoder())
                        pendingBlock.copyFrom(b)
+                       pendingBlockIsEmpty = false
                        continue
                }
 
diff --git a/banyand/trace/merger_bench_test.go b/banyand/trace/merger_bench_test.go
new file mode 100644
index 00000000..607c5d94
--- /dev/null
+++ b/banyand/trace/merger_bench_test.go
@@ -0,0 +1,387 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+// Benchmark_mergeBlocks_FastRawMerge benchmarks the fast path where trace IDs don't overlap.
+// This uses br.mustReadRaw and bw.mustWriteRawBlock to copy blocks without unmarshaling.
+//
+// Performance characteristics:
+// - Lower memory usage (up to 1400x less for large datasets)
+// - Fewer allocations
+// - Slightly slower or comparable time performance
+// - Optimal when trace IDs are unique across parts (common case).
+func Benchmark_mergeBlocks_FastRawMerge(b *testing.B) {
+       benchmarkCases := []struct {
+               tracesGen func() *traces
+               name      string
+               numParts  int
+       }{
+               {
+                       name:     "small_parts_non_overlapping",
+                       numParts: 10,
+                       tracesGen: func() *traces {
+                               return &traces{
+                                       traceIDs:   []string{"trace1", 
"trace2", "trace3"},
+                                       timestamps: []int64{1, 2, 3},
+                                       tags: [][]*tagValue{
+                                               {{tag: "tag1", valueType: 
pbv1.ValueTypeStr, value: []byte("val1"), valueArr: nil}},
+                                               {{tag: "tag1", valueType: 
pbv1.ValueTypeStr, value: []byte("val2"), valueArr: nil}},
+                                               {{tag: "tag1", valueType: 
pbv1.ValueTypeStr, value: []byte("val3"), valueArr: nil}},
+                                       },
+                                       spans:   [][]byte{[]byte("span1"), 
[]byte("span2"), []byte("span3")},
+                                       spanIDs: []string{"span1", "span2", 
"span3"},
+                               }
+                       },
+               },
+               {
+                       name:     "medium_parts_non_overlapping",
+                       numParts: 50,
+                       tracesGen: func() *traces {
+                               return generateHugeTraces(100)
+                       },
+               },
+               {
+                       name:     "large_parts_non_overlapping",
+                       numParts: 100,
+                       tracesGen: func() *traces {
+                               return generateHugeTraces(500)
+                       },
+               },
+       }
+
+       for _, bc := range benchmarkCases {
+               b.Run(bc.name, func(b *testing.B) {
+                       tmpPath, defFn := test.Space(require.New(b))
+                       defer defFn()
+
+                       fileSystem := fs.NewLocalFileSystem()
+
+                       // Create parts with non-overlapping trace IDs (fast 
path)
+                       var parts []*part
+                       for i := 0; i < bc.numParts; i++ {
+                               mp := generateMemPart()
+                               traces := bc.tracesGen()
+                               // Make trace IDs unique per part to trigger 
fast path
+                               for j := range traces.traceIDs {
+                                       traces.traceIDs[j] = traces.traceIDs[j] 
+ "_part" + string(rune(i))
+                               }
+                               mp.mustInitFromTraces(traces)
+                               mp.mustFlush(fileSystem, partPath(tmpPath, 
uint64(i)))
+                               p := mustOpenFilePart(uint64(i), tmpPath, 
fileSystem)
+                               parts = append(parts, p)
+                               releaseMemPart(mp)
+                       }
+
+                       b.ResetTimer()
+                       b.ReportAllocs()
+
+                       for i := 0; i < b.N; i++ {
+                               // Create new iterators for each benchmark 
iteration
+                               var pmi []*partMergeIter
+                               for _, p := range parts {
+                                       iter := generatePartMergeIter()
+                                       iter.mustInitFromPart(p)
+                                       pmi = append(pmi, iter)
+                               }
+
+                               br := generateBlockReader()
+                               br.init(pmi)
+                               bw := generateBlockWriter()
+                               dstPath := partPath(tmpPath, uint64(10000+i))
+                               bw.mustInitForFilePart(fileSystem, dstPath, 
false)
+
+                               closeCh := make(chan struct{})
+
+                               _, _, _, err := mergeBlocks(closeCh, bw, br)
+                               if err != nil {
+                                       b.Fatal(err)
+                               }
+
+                               close(closeCh)
+                               releaseBlockWriter(bw)
+                               releaseBlockReader(br)
+
+                               // Release iterators
+                               for _, iter := range pmi {
+                                       releasePartMergeIter(iter)
+                               }
+                       }
+               })
+       }
+}
+
+// Benchmark_mergeBlocks_OriginalMerge benchmarks the slow path where trace IDs overlap.
+// This requires unmarshaling, merging blocks, and then writing them.
+//
+// Performance characteristics:
+// - Higher memory usage (up to 1400x more for large datasets)
+// - More allocations
+// - Can be slightly faster for some workloads due to batching
+// - Required when trace IDs overlap across parts (needs merging).
+func Benchmark_mergeBlocks_OriginalMerge(b *testing.B) {
+       benchmarkCases := []struct {
+               tracesGen func() *traces
+               name      string
+               numParts  int
+       }{
+               {
+                       name:     "small_parts_overlapping",
+                       numParts: 10,
+                       tracesGen: func() *traces {
+                               return &traces{
+                                       traceIDs:   []string{"trace1", 
"trace2", "trace3"},
+                                       timestamps: []int64{1, 2, 3},
+                                       tags: [][]*tagValue{
+                                               {{tag: "tag1", valueType: 
pbv1.ValueTypeStr, value: []byte("val1"), valueArr: nil}},
+                                               {{tag: "tag1", valueType: 
pbv1.ValueTypeStr, value: []byte("val2"), valueArr: nil}},
+                                               {{tag: "tag1", valueType: 
pbv1.ValueTypeStr, value: []byte("val3"), valueArr: nil}},
+                                       },
+                                       spans:   [][]byte{[]byte("span1"), 
[]byte("span2"), []byte("span3")},
+                                       spanIDs: []string{"span1", "span2", 
"span3"},
+                               }
+                       },
+               },
+               {
+                       name:     "medium_parts_overlapping",
+                       numParts: 50,
+                       tracesGen: func() *traces {
+                               return generateHugeTraces(100)
+                       },
+               },
+               {
+                       name:     "large_parts_overlapping",
+                       numParts: 100,
+                       tracesGen: func() *traces {
+                               return generateHugeTraces(500)
+                       },
+               },
+       }
+
+       for _, bc := range benchmarkCases {
+               b.Run(bc.name, func(b *testing.B) {
+                       tmpPath, defFn := test.Space(require.New(b))
+                       defer defFn()
+
+                       fileSystem := fs.NewLocalFileSystem()
+
+                       // Create parts with overlapping trace IDs (slow path)
+                       var parts []*part
+                       for i := 0; i < bc.numParts; i++ {
+                               mp := generateMemPart()
+                               traces := bc.tracesGen()
+                               // Keep same trace IDs across parts to trigger 
slow path (merging)
+                               mp.mustInitFromTraces(traces)
+                               mp.mustFlush(fileSystem, partPath(tmpPath, 
uint64(i)))
+                               p := mustOpenFilePart(uint64(i), tmpPath, 
fileSystem)
+                               parts = append(parts, p)
+                               releaseMemPart(mp)
+                       }
+
+                       b.ResetTimer()
+                       b.ReportAllocs()
+
+                       for i := 0; i < b.N; i++ {
+                               // Create new iterators for each benchmark 
iteration
+                               var pmi []*partMergeIter
+                               for _, p := range parts {
+                                       iter := generatePartMergeIter()
+                                       iter.mustInitFromPart(p)
+                                       pmi = append(pmi, iter)
+                               }
+
+                               br := generateBlockReader()
+                               br.init(pmi)
+                               bw := generateBlockWriter()
+                               dstPath := partPath(tmpPath, uint64(20000+i))
+                               bw.mustInitForFilePart(fileSystem, dstPath, 
false)
+
+                               closeCh := make(chan struct{})
+
+                               _, _, _, err := mergeBlocks(closeCh, bw, br)
+                               if err != nil {
+                                       b.Fatal(err)
+                               }
+
+                               close(closeCh)
+                               releaseBlockWriter(bw)
+                               releaseBlockReader(br)
+
+                               // Release iterators
+                               for _, iter := range pmi {
+                                       releasePartMergeIter(iter)
+                               }
+                       }
+               })
+       }
+}
+
+// Benchmark_mergeBlocks_Comparison provides a direct comparison benchmark
+// between fast raw merge and original merge with the SAME test data.
+//
+// This properly compares the two merge strategies by using identical data
+// (non-overlapping trace IDs) and toggling the forceSlowMerge flag.
+//
+// Uses generateRealisticTraces with:
+// - 10,000 traces per part
+// - Each trace has 3-5 spans
+// - Each span is more than 100KB
+//
+// Usage:
+//
+//     go test -bench=Benchmark_mergeBlocks_Comparison -run='^$' ./banyand/trace -benchtime=10x
+//
+// This benchmark clearly demonstrates the memory vs. time tradeoff:
+// - Fast raw merge: Lower memory, uses raw block copy
+// - Slow merge (forced): Higher memory, decompresses and processes blocks.
+func Benchmark_mergeBlocks_Comparison(b *testing.B) {
+       numParts := 20
+       tracesPerPart := 10000 // More than 10,000 traces per part
+
+       // Create shared test data with non-overlapping trace IDs
+       tmpPath, defFn := test.Space(require.New(b))
+       defer defFn()
+
+       fileSystem := fs.NewLocalFileSystem()
+
+       b.Logf("Creating %d parts with %d traces each (3-5 spans per trace, 
>100KB per span)", numParts, tracesPerPart)
+
+       var parts []*part
+       for i := 0; i < numParts; i++ {
+               mp := generateMemPart()
+               // Use new realistic trace generator
+               traces := generateRealisticTraces(tracesPerPart)
+               // Make trace IDs unique per part to avoid merging
+               for j := range traces.traceIDs {
+                       traces.traceIDs[j] = traces.traceIDs[j] + "_part" + 
string(rune(i))
+               }
+               mp.mustInitFromTraces(traces)
+               mp.mustFlush(fileSystem, partPath(tmpPath, uint64(i)))
+               p := mustOpenFilePart(uint64(i), tmpPath, fileSystem)
+               parts = append(parts, p)
+               releaseMemPart(mp)
+
+               if i == 0 {
+                       // Log stats for first part
+                       totalSpans := len(traces.spans)
+                       totalSize := int64(0)
+                       for _, span := range traces.spans {
+                               totalSize += int64(len(span))
+                       }
+                       b.Logf("Part 0 stats: %d spans, total size: %.2f MB, 
avg span size: %.2f KB",
+                               totalSpans, float64(totalSize)/(1024*1024), 
float64(totalSize)/float64(totalSpans)/1024)
+               }
+       }
+
+       b.Run("fast_raw_merge", func(b *testing.B) {
+               forceSlowMerge = false // Enable fast path
+               defer func() { forceSlowMerge = false }()
+
+               // Create separate temp dir for fast merge output
+               tmpPathFast, defFnFast := test.Space(require.New(b))
+               defer defFnFast()
+
+               b.ResetTimer()
+               b.ReportAllocs()
+
+               for i := 0; i < b.N; i++ {
+                       // Create new iterators for each benchmark iteration
+                       var pmi []*partMergeIter
+                       for _, p := range parts {
+                               iter := generatePartMergeIter()
+                               iter.mustInitFromPart(p)
+                               pmi = append(pmi, iter)
+                       }
+
+                       br := generateBlockReader()
+                       br.init(pmi)
+                       bw := generateBlockWriter()
+                       dstPath := partPath(tmpPathFast, uint64(30000+i))
+                       bw.mustInitForFilePart(fileSystem, dstPath, false)
+
+                       closeCh := make(chan struct{})
+
+                       _, _, _, err := mergeBlocks(closeCh, bw, br)
+                       if err != nil {
+                               b.Fatal(err)
+                       }
+
+                       close(closeCh)
+                       releaseBlockWriter(bw)
+                       releaseBlockReader(br)
+
+                       // Release iterators
+                       for _, iter := range pmi {
+                               releasePartMergeIter(iter)
+                       }
+               }
+       })
+
+       b.Run("slow_merge_forced", func(b *testing.B) {
+               forceSlowMerge = true // Force slow path (original merge)
+               defer func() { forceSlowMerge = false }()
+
+               // Create separate temp dir for slow merge output
+               tmpPathSlow, defFnSlow := test.Space(require.New(b))
+               defer defFnSlow()
+
+               b.ResetTimer()
+               b.ReportAllocs()
+
+               for i := 0; i < b.N; i++ {
+                       // Create new iterators for each benchmark iteration
+                       var pmi []*partMergeIter
+                       for _, p := range parts {
+                               iter := generatePartMergeIter()
+                               iter.mustInitFromPart(p)
+                               pmi = append(pmi, iter)
+                       }
+
+                       br := generateBlockReader()
+                       br.init(pmi)
+                       bw := generateBlockWriter()
+                       dstPath := partPath(tmpPathSlow, uint64(40000+i))
+                       bw.mustInitForFilePart(fileSystem, dstPath, false)
+
+                       closeCh := make(chan struct{})
+
+                       _, _, _, err := mergeBlocks(closeCh, bw, br)
+                       if err != nil {
+                               b.Fatal(err)
+                       }
+
+                       close(closeCh)
+                       releaseBlockWriter(bw)
+                       releaseBlockReader(br)
+
+                       // Release iterators
+                       for _, iter := range pmi {
+                               releasePartMergeIter(iter)
+                       }
+               }
+       })
+}
diff --git a/banyand/trace/merger_test.go b/banyand/trace/merger_test.go
index 33f1209d..7196f85a 100644
--- a/banyand/trace/merger_test.go
+++ b/banyand/trace/merger_test.go
@@ -27,6 +27,7 @@ import (
        "github.com/stretchr/testify/require"
 
        "github.com/apache/skywalking-banyandb/banyand/protector"
+       "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/test"
@@ -98,6 +99,213 @@ func Test_mergeTwoBlocks(t *testing.T) {
        }
 }
 
+func Test_mergeBlocks_fastPath(t *testing.T) { // Table-driven: merges each case's parts with mergeBlocks, reopens the merged part and compares block trace IDs and counts.
+       tests := []struct {
+               name        string
+               description string // used only as the failure message of the block-count assertion below
+               parts       []*traces
+               wantBlocks  []blockMetadata // only traceID and count are compared
+       }{
+               {
+                       name: "Fast path with completely different trace IDs",
+                       parts: []*traces{
+                               {
+                                       traceIDs:   []string{"traceA", 
"traceB"},
+                                       timestamps: []int64{1, 2},
+                                       tags: [][]*tagValue{
+                                               {
+                                                       {tag: "tag1", 
valueType: pbv1.ValueTypeStr, value: []byte("val1"), valueArr: nil},
+                                               },
+                                               {
+                                                       {tag: "tag1", 
valueType: pbv1.ValueTypeStr, value: []byte("val2"), valueArr: nil},
+                                               },
+                                       },
+                                       spans:   [][]byte{[]byte("spanA"), 
[]byte("spanB")},
+                                       spanIDs: []string{"spanA", "spanB"},
+                               },
+                               {
+                                       traceIDs:   []string{"traceC", 
"traceD"},
+                                       timestamps: []int64{3, 4},
+                                       tags: [][]*tagValue{
+                                               {
+                                                       {tag: "tag1", 
valueType: pbv1.ValueTypeStr, value: []byte("val3"), valueArr: nil},
+                                               },
+                                               {
+                                                       {tag: "tag1", 
valueType: pbv1.ValueTypeStr, value: []byte("val4"), valueArr: nil},
+                                               },
+                                       },
+                                       spans:   [][]byte{[]byte("spanC"), 
[]byte("spanD")},
+                                       spanIDs: []string{"spanC", "spanD"},
+                               },
+                       },
+                       wantBlocks: []blockMetadata{
+                               {traceID: "traceA", count: 1},
+                               {traceID: "traceB", count: 1},
+                               {traceID: "traceC", count: 1},
+                               {traceID: "traceD", count: 1},
+                       },
+                       description: "Each block should use 
readRaw/WriteRawBlock as there's only one block per trace ID",
+               },
+               {
+                       name: "Fast path with single trace ID per part",
+                       parts: []*traces{
+                               {
+                                       traceIDs:   []string{"trace1"},
+                                       timestamps: []int64{1},
+                                       tags: [][]*tagValue{
+                                               {
+                                                       {tag: "tag1", 
valueType: pbv1.ValueTypeStr, value: []byte("val1"), valueArr: nil},
+                                               },
+                                       },
+                                       spans:   [][]byte{[]byte("span1")},
+                                       spanIDs: []string{"span1"},
+                               },
+                               {
+                                       traceIDs:   []string{"trace2"},
+                                       timestamps: []int64{2},
+                                       tags: [][]*tagValue{
+                                               {
+                                                       {tag: "tag1", 
valueType: pbv1.ValueTypeStr, value: []byte("val2"), valueArr: nil},
+                                               },
+                                       },
+                                       spans:   [][]byte{[]byte("span2")},
+                                       spanIDs: []string{"span2"},
+                               },
+                               {
+                                       traceIDs:   []string{"trace3"},
+                                       timestamps: []int64{3},
+                                       tags: [][]*tagValue{
+                                               {
+                                                       {tag: "tag1", 
valueType: pbv1.ValueTypeStr, value: []byte("val3"), valueArr: nil},
+                                               },
+                                       },
+                                       spans:   [][]byte{[]byte("span3")},
+                                       spanIDs: []string{"span3"},
+                               },
+                       },
+                       wantBlocks: []blockMetadata{
+                               {traceID: "trace1", count: 1},
+                               {traceID: "trace2", count: 1},
+                               {traceID: "trace3", count: 1},
+                       },
+                       description: "All blocks should use fast path as each 
trace ID appears only once",
+               },
+               {
+                       name: "Mixed fast and slow path - some trace IDs 
overlap",
+                       parts: []*traces{
+                               {
+                                       traceIDs:   []string{"trace1", 
"trace2"},
+                                       timestamps: []int64{1, 2},
+                                       tags: [][]*tagValue{
+                                               {
+                                                       {tag: "tag1", 
valueType: pbv1.ValueTypeStr, value: []byte("val1"), valueArr: nil},
+                                               },
+                                               {
+                                                       {tag: "tag1", 
valueType: pbv1.ValueTypeStr, value: []byte("val2"), valueArr: nil},
+                                               },
+                                       },
+                                       spans:   [][]byte{[]byte("span1"), 
[]byte("span2")},
+                                       spanIDs: []string{"span1", "span2"},
+                               },
+                               {
+                                       traceIDs:   []string{"trace2", 
"trace3"},
+                                       timestamps: []int64{3, 4},
+                                       tags: [][]*tagValue{
+                                               {
+                                                       {tag: "tag1", 
valueType: pbv1.ValueTypeStr, value: []byte("val2b"), valueArr: nil},
+                                               },
+                                               {
+                                                       {tag: "tag1", 
valueType: pbv1.ValueTypeStr, value: []byte("val3"), valueArr: nil},
+                                               },
+                                       },
+                                       spans:   [][]byte{[]byte("span2b"), 
[]byte("span3")},
+                                       spanIDs: []string{"span2b", "span3"},
+                               },
+                       },
+                       wantBlocks: []blockMetadata{
+                               {traceID: "trace1", count: 1},
+                               {traceID: "trace2", count: 2},
+                               {traceID: "trace3", count: 1},
+                       },
+                       description: "trace1 and trace3 use fast path, trace2 
uses slow path (needs merging)",
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       tmpPath, defFn := test.Space(require.New(t))
+                       defer defFn()
+
+                       fileSystem := fs.NewLocalFileSystem()
+
+                       // Flush each input to its own part (ID = slice index) under tmpPath and open a merge iterator over it
+                       var pmi []*partMergeIter
+                       for i, traces := range tt.parts {
+                               mp := generateMemPart()
+                               mp.mustInitFromTraces(traces)
+                               mp.mustFlush(fileSystem, partPath(tmpPath, 
uint64(i)))
+                               p := mustOpenFilePart(uint64(i), tmpPath, 
fileSystem) // NOTE(review): p is never explicitly closed in this test — confirm that is intentional
+                               iter := generatePartMergeIter()
+                               iter.mustInitFromPart(p)
+                               pmi = append(pmi, iter)
+                               releaseMemPart(mp)
+                       }
+
+                       // Merge every input part into a new part with ID 9999
+                       br := generateBlockReader()
+                       br.init(pmi)
+                       bw := generateBlockWriter()
+                       dstPath := partPath(tmpPath, 9999)
+                       bw.mustInitForFilePart(fileSystem, dstPath, false)
+
+                       closeCh := make(chan struct{})
+                       defer close(closeCh)
+
+                       pm, tf, tagTypes, err := mergeBlocks(closeCh, bw, br)
+                       require.NoError(t, err)
+                       require.NotNil(t, pm)
+                       require.NotNil(t, tf)
+                       require.NotNil(t, tagTypes)
+
+                       releaseBlockWriter(bw)
+                       releaseBlockReader(br)
+                       for _, iter := range pmi {
+                               releasePartMergeIter(iter)
+                       }
+
+                       // Write part metadata, trace ID filter and tag types to disk so the merged part can be read back
+                       pm.mustWriteMetadata(fileSystem, dstPath)
+                       tf.mustWriteTraceIDFilter(fileSystem, dstPath)
+                       tagTypes.mustWriteTagType(fileSystem, dstPath)
+                       fileSystem.SyncPath(dstPath)
+
+                       // Reopen the merged part and collect the metadata of every block in iteration order
+                       mergedPart := mustOpenFilePart(9999, tmpPath, 
fileSystem) // NOTE(review): mergedPart is not explicitly closed here either
+                       mergedIter := generatePartMergeIter()
+                       mergedIter.mustInitFromPart(mergedPart)
+
+                       reader := generateBlockReader()
+                       reader.init([]*partMergeIter{mergedIter})
+
+                       var gotBlocks []blockMetadata
+                       for reader.nextBlockMetadata() {
+                               gotBlocks = append(gotBlocks, reader.block.bm)
+                       }
+                       require.NoError(t, reader.error())
+
+                       releaseBlockReader(reader)
+                       releasePartMergeIter(mergedIter)
+
+                       // Verify the number of blocks, then traceID and count per block; which path (raw vs. decoded merge) produced a block is not asserted here
+                       require.Len(t, gotBlocks, len(tt.wantBlocks), 
tt.description)
+                       for i, want := range tt.wantBlocks {
+                               require.Equal(t, want.traceID, 
gotBlocks[i].traceID, "Block %d trace ID mismatch", i)
+                               require.Equal(t, want.count, 
gotBlocks[i].count, "Block %d count mismatch for trace %s", i, want.traceID)
+                       }
+               })
+       }
+}
+
 var mergedBlock = block{
        spans:   [][]byte{[]byte("span1"), []byte("span2"), []byte("span3"), 
[]byte("span4")},
        spanIDs: []string{"span1", "span2", "span3", "span4"},
@@ -112,6 +320,657 @@ var mergedBlock = block{
        },
 }
 
+// verifyPartContainsTraces opens part partID under tmpPath, validates the spans, tags and metadata of every block, and requires the per-trace span totals to match expectedTraces exactly (no missing and no extra trace IDs).
+func verifyPartContainsTraces(t *testing.T, partID uint64, tmpPath string, 
fileSystem fs.FileSystem, expectedTraces map[string]int) {
+       t.Helper()
+
+       // Open the part; its tag, tag-metadata, primary and spans readers are closed when this helper returns
+       p := mustOpenFilePart(partID, tmpPath, fileSystem)
+       defer func() {
+               for _, r := range p.tags {
+                       fs.MustClose(r)
+               }
+               for _, r := range p.tagMetadata {
+                       fs.MustClose(r)
+               }
+               fs.MustClose(p.primary)
+               fs.MustClose(p.spans)
+       }()
+
+       // Create iterator to read blocks
+       pmi := generatePartMergeIter()
+       pmi.mustInitFromPart(p)
+       defer releasePartMergeIter(pmi)
+
+       reader := generateBlockReader()
+       reader.init([]*partMergeIter{pmi})
+       defer releaseBlockReader(reader)
+
+       // Walk every block, summing span counts per trace ID and validating the block contents
+       foundTraces := make(map[string]int)
+       decoder := &encoding.BytesBlockDecoder{}
+       defer decoder.Reset()
+
+       for reader.nextBlockMetadata() {
+               bm := reader.block.bm
+               traceID := bm.traceID
+               spanCount := int(bm.count)
+
+               foundTraces[traceID] += spanCount // accumulated, in case a trace ID shows up in more than one block
+
+               // Sanity-check the block metadata (NOTE(review): reader.block was already dereferenced above, so the NotNil check cannot catch a nil block)
+               require.NotNil(t, reader.block, "Block should not be nil for 
trace %s", traceID)
+               require.NotEmpty(t, traceID, "Trace ID should not be empty")
+               require.Greater(t, spanCount, 0, "Trace %s: span count should 
be greater than 0", traceID)
+
+               // Load the actual block data to verify spans and tags
+               reader.loadBlockData(decoder)
+
+               // Metadata count must equal the number of span IDs and the number of span payloads (NOTE(review): the second message labels its left operand "span IDs" although spanCount is what is compared)
+               require.Equal(t, spanCount, len(reader.block.spanIDs),
+                       "Trace %s: span count mismatch between metadata (%d) 
and actual span IDs (%d)",
+                       traceID, spanCount, len(reader.block.spanIDs))
+               require.Equal(t, spanCount, len(reader.block.spans),
+                       "Trace %s: span count mismatch between span IDs (%d) 
and span data (%d)",
+                       traceID, len(reader.block.spanIDs), 
len(reader.block.spans))
+
+               // Verify each span has a non-empty payload and a non-empty span ID
+               for i, span := range reader.block.spans {
+                       require.NotEmpty(t, span, "Trace %s: span %d should not 
be empty", traceID, i)
+                       require.NotEmpty(t, reader.block.spanIDs[i], "Trace %s: 
span ID %d should not be empty", traceID, i)
+               }
+
+               // Verify tags: every tag is named and carries one value per span
+               require.NotNil(t, reader.block.tags, "Trace %s: tags should not 
be nil", traceID)
+               for _, tag := range reader.block.tags {
+                       require.NotEmpty(t, tag.name, "Trace %s: tag name 
should not be empty", traceID)
+                       require.Equal(t, spanCount, len(tag.values),
+                               "Trace %s: tag %s should have %d values, got 
%d",
+                               traceID, tag.name, spanCount, len(tag.values))
+
+                       // Verify each tag value is non-nil
+                       for i, value := range tag.values {
+                               require.NotNil(t, value, "Trace %s: tag %s 
value %d should not be nil", traceID, tag.name, i)
+                       }
+               }
+
+               // Verify the block's timestamp range is ordered (min <= max)
+               require.True(t, bm.timestamps.min <= bm.timestamps.max,
+                       "Trace %s: timestamp min (%d) should be <= max (%d)",
+                       traceID, bm.timestamps.min, bm.timestamps.max)
+
+               // Verify tag metadata consistency: one metadata entry per loaded tag
+               require.NotNil(t, bm.tags, "Trace %s: tag metadata should not 
be nil", traceID)
+               require.Equal(t, len(bm.tags), len(reader.block.tags),
+                       "Trace %s: tag metadata count (%d) should match block 
tags count (%d)",
+                       traceID, len(bm.tags), len(reader.block.tags))
+       }
+       require.NoError(t, reader.error())
+
+       // Verify all expected traces are present with correct total span counts
+       for traceID, expectedCount := range expectedTraces {
+               actualCount, found := foundTraces[traceID]
+               require.True(t, found, "Trace ID %s not found in merged part 
%d", traceID, partID)
+               require.Equal(t, expectedCount, actualCount, "Trace ID %s has 
incorrect span count in part %d", traceID, partID)
+       }
+
+       // Verify no unexpected traces exist (together with the loop above, equal sizes imply identical key sets)
+       require.Equal(t, len(expectedTraces), len(foundTraces), "Part %d has 
unexpected number of trace IDs", partID)
+}
+
+func Test_multipleRoundMerges(t *testing.T) {
+       tests := []struct {
+               name   string
+               rounds []struct {
+                       expectedTraces map[string]int
+                       tsList         []*traces
+               }
+               want []blockMetadata
+       }{
+               {
+                       name: "Two rounds with overlapping trace IDs in both 
rounds",
+                       rounds: []struct {
+                               expectedTraces map[string]int
+                               tsList         []*traces
+                       }{
+                               {
+                                       tsList: []*traces{
+                                               {
+                                                       traceIDs:   
[]string{"trace1", "trace2"},
+                                                       timestamps: []int64{1, 
2},
+                                                       tags: [][]*tagValue{
+                                                               {
+                                                                       {tag: 
"tag1", valueType: pbv1.ValueTypeStr, value: []byte("val1"), valueArr: nil},
+                                                               },
+                                                               {
+                                                                       {tag: 
"tag1", valueType: pbv1.ValueTypeStr, value: []byte("val2"), valueArr: nil},
+                                                               },
+                                                       },
+                                                       spans:   
[][]byte{[]byte("span1"), []byte("span2")},
+                                                       spanIDs: 
[]string{"span1", "span2"},
+                                               },
+                                               {
+                                                       traceIDs:   
[]string{"trace1", "trace3"},
+                                                       timestamps: []int64{3, 
4},
+                                                       tags: [][]*tagValue{
+                                                               {
+                                                                       {tag: 
"tag1", valueType: pbv1.ValueTypeStr, value: []byte("val1b"), valueArr: nil},
+                                                               },
+                                                               {
+                                                                       {tag: 
"tag1", valueType: pbv1.ValueTypeStr, value: []byte("val3"), valueArr: nil},
+                                                               },
+                                                       },
+                                                       spans:   
[][]byte{[]byte("span1b"), []byte("span3")},
+                                                       spanIDs: 
[]string{"span1b", "span3"},
+                                               },
+                                       },
+                                       expectedTraces: map[string]int{
+                                               "trace1": 2, // 1 from first 
part + 1 from second part
+                                               "trace2": 1, // 1 from first 
part
+                                               "trace3": 1, // 1 from second 
part
+                                       },
+                               },
+                               {
+                                       tsList: []*traces{
+                                               {
+                                                       traceIDs:   
[]string{"trace1", "trace2"},
+                                                       timestamps: []int64{5, 
6},
+                                                       tags: [][]*tagValue{
+                                                               {
+                                                                       {tag: 
"tag1", valueType: pbv1.ValueTypeStr, value: []byte("val1c"), valueArr: nil},
+                                                               },
+                                                               {
+                                                                       {tag: 
"tag1", valueType: pbv1.ValueTypeStr, value: []byte("val2b"), valueArr: nil},
+                                                               },
+                                                       },
+                                                       spans:   
[][]byte{[]byte("span1c"), []byte("span2b")},
+                                                       spanIDs: 
[]string{"span1c", "span2b"},
+                                               },
+                                       },
+                                       expectedTraces: map[string]int{
+                                               "trace1": 3, // 2 from previous 
round + 1 new
+                                               "trace2": 2, // 1 from previous 
round + 1 new
+                                               "trace3": 1, // 1 from previous 
round (unchanged)
+                                       },
+                               },
+                       },
+                       want: []blockMetadata{
+                               {traceID: "trace1", count: 3, 
uncompressedSpanSizeBytes: 17},
+                               {traceID: "trace2", count: 2, 
uncompressedSpanSizeBytes: 11},
+                               {traceID: "trace3", count: 1, 
uncompressedSpanSizeBytes: 5},
+                       },
+               },
+               {
+                       name: "Two rounds with non-overlapping trace IDs",
+                       rounds: []struct {
+                               expectedTraces map[string]int
+                               tsList         []*traces
+                       }{
+                               {
+                                       tsList: []*traces{
+                                               {
+                                                       traceIDs:   
[]string{"traceA", "traceB"},
+                                                       timestamps: []int64{1, 
2},
+                                                       tags: [][]*tagValue{
+                                                               {
+                                                                       {tag: 
"tag1", valueType: pbv1.ValueTypeStr, value: []byte("valA"), valueArr: nil},
+                                                               },
+                                                               {
+                                                                       {tag: 
"tag1", valueType: pbv1.ValueTypeStr, value: []byte("valB"), valueArr: nil},
+                                                               },
+                                                       },
+                                                       spans:   
[][]byte{[]byte("spanA"), []byte("spanB")},
+                                                       spanIDs: 
[]string{"spanA", "spanB"},
+                                               },
+                                               {
+                                                       traceIDs:   
[]string{"traceC"},
+                                                       timestamps: []int64{3},
+                                                       tags: [][]*tagValue{
+                                                               {
+                                                                       {tag: 
"tag1", valueType: pbv1.ValueTypeStr, value: []byte("valC"), valueArr: nil},
+                                                               },
+                                                       },
+                                                       spans:   
[][]byte{[]byte("spanC")},
+                                                       spanIDs: 
[]string{"spanC"},
+                                               },
+                                       },
+                                       expectedTraces: map[string]int{
+                                               "traceA": 1,
+                                               "traceB": 1,
+                                               "traceC": 1,
+                                       },
+                               },
+                               {
+                                       tsList: []*traces{
+                                               {
+                                                       traceIDs:   
[]string{"traceD", "traceE"},
+                                                       timestamps: []int64{4, 
5},
+                                                       tags: [][]*tagValue{
+                                                               {
+                                                                       {tag: 
"tag1", valueType: pbv1.ValueTypeStr, value: []byte("valD"), valueArr: nil},
+                                                               },
+                                                               {
+                                                                       {tag: 
"tag1", valueType: pbv1.ValueTypeStr, value: []byte("valE"), valueArr: nil},
+                                                               },
+                                                       },
+                                                       spans:   
[][]byte{[]byte("spanD"), []byte("spanE")},
+                                                       spanIDs: 
[]string{"spanD", "spanE"},
+                                               },
+                                       },
+                                       expectedTraces: map[string]int{
+                                               "traceA": 1, // from previous 
round
+                                               "traceB": 1, // from previous 
round
+                                               "traceC": 1, // from previous 
round
+                                               "traceD": 1, // new
+                                               "traceE": 1, // new
+                                       },
+                               },
+                       },
+                       want: []blockMetadata{
+                               {traceID: "traceA", count: 1, 
uncompressedSpanSizeBytes: 5},
+                               {traceID: "traceB", count: 1, 
uncompressedSpanSizeBytes: 5},
+                               {traceID: "traceC", count: 1, 
uncompressedSpanSizeBytes: 5},
+                               {traceID: "traceD", count: 1, 
uncompressedSpanSizeBytes: 5},
+                               {traceID: "traceE", count: 1, 
uncompressedSpanSizeBytes: 5},
+                       },
+               },
+               {
+                       name: "Three rounds - first round overlapping, 
subsequent rounds non-overlapping",
+                       rounds: []struct {
+                               expectedTraces map[string]int
+                               tsList         []*traces
+                       }{
+                               {
+                                       tsList: []*traces{
+                                               {
+                                                       traceIDs:   
[]string{"trace1", "trace2"},
+                                                       timestamps: []int64{1, 
2},
+                                                       tags: [][]*tagValue{
+                                                               {
+                                                                       {tag: 
"tag1", valueType: pbv1.ValueTypeStr, value: []byte("val1"), valueArr: nil},
+                                                               },
+                                                               {
+                                                                       {tag: 
"tag1", valueType: pbv1.ValueTypeStr, value: []byte("val2"), valueArr: nil},
+                                                               },
+                                                       },
+                                                       spans:   
[][]byte{[]byte("span1"), []byte("span2")},
+                                                       spanIDs: 
[]string{"span1", "span2"},
+                                               },
+                                               {
+                                                       traceIDs:   
[]string{"trace1", "trace2"},
+                                                       timestamps: []int64{3, 
4},
+                                                       tags: [][]*tagValue{
+                                                               {
+                                                                       {tag: 
"tag1", valueType: pbv1.ValueTypeStr, value: []byte("val1b"), valueArr: nil},
+                                                               },
+                                                               {
+                                                                       {tag: 
"tag1", valueType: pbv1.ValueTypeStr, value: []byte("val2b"), valueArr: nil},
+                                                               },
+                                                       },
+                                                       spans:   
[][]byte{[]byte("span1b"), []byte("span2b")},
+                                                       spanIDs: 
[]string{"span1b", "span2b"},
+                                               },
+                                       },
+                                       expectedTraces: map[string]int{
+                                               "trace1": 2, // overlapping 
from 2 parts
+                                               "trace2": 2, // overlapping 
from 2 parts
+                                       },
+                               },
+                               {
+                                       tsList: []*traces{
+                                               {
+                                                       traceIDs:   
[]string{"trace3"},
+                                                       timestamps: []int64{5},
+                                                       tags: [][]*tagValue{
+                                                               {
+                                                                       {tag: 
"tag1", valueType: pbv1.ValueTypeStr, value: []byte("val3"), valueArr: nil},
+                                                               },
+                                                       },
+                                                       spans:   
[][]byte{[]byte("span3")},
+                                                       spanIDs: 
[]string{"span3"},
+                                               },
+                                       },
+                                       expectedTraces: map[string]int{
+                                               "trace1": 2, // from previous 
round
+                                               "trace2": 2, // from previous 
round
+                                               "trace3": 1, // new, 
non-overlapping
+                                       },
+                               },
+                               {
+                                       tsList: []*traces{
+                                               {
+                                                       traceIDs:   
[]string{"trace4"},
+                                                       timestamps: []int64{6},
+                                                       tags: [][]*tagValue{
+                                                               {
+                                                                       {tag: 
"tag1", valueType: pbv1.ValueTypeStr, value: []byte("val4"), valueArr: nil},
+                                                               },
+                                                       },
+                                                       spans:   
[][]byte{[]byte("span4")},
+                                                       spanIDs: 
[]string{"span4"},
+                                               },
+                                       },
+                                       expectedTraces: map[string]int{
+                                               "trace1": 2, // from previous 
rounds
+                                               "trace2": 2, // from previous 
rounds
+                                               "trace3": 1, // from previous 
round
+                                               "trace4": 1, // new, 
non-overlapping
+                                       },
+                               },
+                       },
+                       want: []blockMetadata{
+                               {traceID: "trace1", count: 2, 
uncompressedSpanSizeBytes: 11},
+                               {traceID: "trace2", count: 2, 
uncompressedSpanSizeBytes: 11},
+                               {traceID: "trace3", count: 1, 
uncompressedSpanSizeBytes: 5},
+                               {traceID: "trace4", count: 1, 
uncompressedSpanSizeBytes: 5},
+                       },
+               },
+               {
+                       name: "Three rounds - overlapping in first, overlapping 
in second, non-overlapping in third",
+                       rounds: []struct {
+                               expectedTraces map[string]int
+                               tsList         []*traces
+                       }{
+                               {
+                                       tsList: []*traces{
+                                               {
+                                                       traceIDs:   
[]string{"trace1"},
+                                                       timestamps: []int64{1},
+                                                       tags: [][]*tagValue{
+                                                               {
+                                                                       {tag: 
"tag1", valueType: pbv1.ValueTypeStr, value: []byte("val1"), valueArr: nil},
+                                                               },
+                                                       },
+                                                       spans:   
[][]byte{[]byte("span1")},
+                                                       spanIDs: 
[]string{"span1"},
+                                               },
+                                               {
+                                                       traceIDs:   
[]string{"trace1", "trace2"},
+                                                       timestamps: []int64{2, 
3},
+                                                       tags: [][]*tagValue{
+                                                               {
+                                                                       {tag: 
"tag1", valueType: pbv1.ValueTypeStr, value: []byte("val1b"), valueArr: nil},
+                                                               },
+                                                               {
+                                                                       {tag: 
"tag1", valueType: pbv1.ValueTypeStr, value: []byte("val2"), valueArr: nil},
+                                                               },
+                                                       },
+                                                       spans:   
[][]byte{[]byte("span1b"), []byte("span2")},
+                                                       spanIDs: 
[]string{"span1b", "span2"},
+                                               },
+                                       },
+                                       expectedTraces: map[string]int{
+                                               "trace1": 2, // overlapping 
from 2 parts
+                                               "trace2": 1, // from second part
+                                       },
+                               },
+                               {
+                                       tsList: []*traces{
+                                               {
+                                                       traceIDs:   
[]string{"trace1", "trace3"},
+                                                       timestamps: []int64{4, 
5},
+                                                       tags: [][]*tagValue{
+                                                               {
+                                                                       {tag: 
"tag1", valueType: pbv1.ValueTypeStr, value: []byte("val1c"), valueArr: nil},
+                                                               },
+                                                               {
+                                                                       {tag: 
"tag1", valueType: pbv1.ValueTypeStr, value: []byte("val3"), valueArr: nil},
+                                                               },
+                                                       },
+                                                       spans:   
[][]byte{[]byte("span1c"), []byte("span3")},
+                                                       spanIDs: 
[]string{"span1c", "span3"},
+                                               },
+                                       },
+                                       expectedTraces: map[string]int{
+                                               "trace1": 3, // 2 from previous 
+ 1 new (overlapping)
+                                               "trace2": 1, // from previous 
round
+                                               "trace3": 1, // new
+                                       },
+                               },
+                               {
+                                       tsList: []*traces{
+                                               {
+                                                       traceIDs:   
[]string{"trace4"},
+                                                       timestamps: []int64{6},
+                                                       tags: [][]*tagValue{
+                                                               {
+                                                                       {tag: 
"tag1", valueType: pbv1.ValueTypeStr, value: []byte("val4"), valueArr: nil},
+                                                               },
+                                                       },
+                                                       spans:   
[][]byte{[]byte("span4")},
+                                                       spanIDs: 
[]string{"span4"},
+                                               },
+                                       },
+                                       expectedTraces: map[string]int{
+                                               "trace1": 3, // from previous 
rounds
+                                               "trace2": 1, // from previous 
rounds
+                                               "trace3": 1, // from previous 
round
+                                               "trace4": 1, // new, 
non-overlapping
+                                       },
+                               },
+                       },
+                       want: []blockMetadata{
+                               {traceID: "trace1", count: 3, 
uncompressedSpanSizeBytes: 17},
+                               {traceID: "trace2", count: 1, 
uncompressedSpanSizeBytes: 5},
+                               {traceID: "trace3", count: 1, 
uncompressedSpanSizeBytes: 5},
+                               {traceID: "trace4", count: 1, 
uncompressedSpanSizeBytes: 5},
+                       },
+               },
+               {
+                       name: "Four rounds - complex overlapping pattern",
+                       rounds: []struct {
+                               expectedTraces map[string]int
+                               tsList         []*traces
+                       }{
+                               {
+                                       // Round 1: merge traceA and traceB 
with overlaps
+                                       tsList: []*traces{
+                                               {
+                                                       traceIDs:   
[]string{"traceA"},
+                                                       timestamps: []int64{1},
+                                                       tags: [][]*tagValue{
+                                                               {
+                                                                       {tag: 
"tag1", valueType: pbv1.ValueTypeStr, value: []byte("valA1"), valueArr: nil},
+                                                               },
+                                                       },
+                                                       spans:   
[][]byte{[]byte("spanA1")},
+                                                       spanIDs: 
[]string{"spanA1"},
+                                               },
+                                               {
+                                                       traceIDs:   
[]string{"traceA", "traceB"},
+                                                       timestamps: []int64{2, 
3},
+                                                       tags: [][]*tagValue{
+                                                               {
+                                                                       {tag: 
"tag1", valueType: pbv1.ValueTypeStr, value: []byte("valA2"), valueArr: nil},
+                                                               },
+                                                               {
+                                                                       {tag: 
"tag1", valueType: pbv1.ValueTypeStr, value: []byte("valB1"), valueArr: nil},
+                                                               },
+                                                       },
+                                                       spans:   
[][]byte{[]byte("spanA2"), []byte("spanB1")},
+                                                       spanIDs: 
[]string{"spanA2", "spanB1"},
+                                               },
+                                       },
+                                       expectedTraces: map[string]int{
+                                               "traceA": 2, // overlapping 
from 2 parts
+                                               "traceB": 1, // from second part
+                                       },
+                               },
+                               {
+                                       // Round 2: merge with traceB 
(overlapping) and traceC (new)
+                                       tsList: []*traces{
+                                               {
+                                                       traceIDs:   
[]string{"traceB", "traceC"},
+                                                       timestamps: []int64{4, 
5},
+                                                       tags: [][]*tagValue{
+                                                               {
+                                                                       {tag: 
"tag1", valueType: pbv1.ValueTypeStr, value: []byte("valB2"), valueArr: nil},
+                                                               },
+                                                               {
+                                                                       {tag: 
"tag1", valueType: pbv1.ValueTypeStr, value: []byte("valC1"), valueArr: nil},
+                                                               },
+                                                       },
+                                                       spans:   
[][]byte{[]byte("spanB2"), []byte("spanC1")},
+                                                       spanIDs: 
[]string{"spanB2", "spanC1"},
+                                               },
+                                       },
+                                       expectedTraces: map[string]int{
+                                               "traceA": 2, // from previous 
round
+                                               "traceB": 2, // 1 from previous 
+ 1 new (overlapping)
+                                               "traceC": 1, // new
+                                       },
+                               },
+                               {
+                                       // Round 3: non-overlapping traces
+                                       tsList: []*traces{
+                                               {
+                                                       traceIDs:   
[]string{"traceD"},
+                                                       timestamps: []int64{6},
+                                                       tags: [][]*tagValue{
+                                                               {
+                                                                       {tag: 
"tag1", valueType: pbv1.ValueTypeStr, value: []byte("valD1"), valueArr: nil},
+                                                               },
+                                                       },
+                                                       spans:   
[][]byte{[]byte("spanD1")},
+                                                       spanIDs: 
[]string{"spanD1"},
+                                               },
+                                       },
+                                       expectedTraces: map[string]int{
+                                               "traceA": 2, // from previous 
rounds
+                                               "traceB": 2, // from previous 
rounds
+                                               "traceC": 1, // from previous 
round
+                                               "traceD": 1, // new, 
non-overlapping
+                                       },
+                               },
+                               {
+                                       // Round 4: non-overlapping traces
+                                       tsList: []*traces{
+                                               {
+                                                       traceIDs:   
[]string{"traceE"},
+                                                       timestamps: []int64{7},
+                                                       tags: [][]*tagValue{
+                                                               {
+                                                                       {tag: 
"tag1", valueType: pbv1.ValueTypeStr, value: []byte("valE1"), valueArr: nil},
+                                                               },
+                                                       },
+                                                       spans:   
[][]byte{[]byte("spanE1")},
+                                                       spanIDs: 
[]string{"spanE1"},
+                                               },
+                                       },
+                                       expectedTraces: map[string]int{
+                                               "traceA": 2, // from previous 
rounds
+                                               "traceB": 2, // from previous 
rounds
+                                               "traceC": 1, // from previous 
rounds
+                                               "traceD": 1, // from previous 
round
+                                               "traceE": 1, // new, 
non-overlapping
+                                       },
+                               },
+                       },
+                       want: []blockMetadata{
+                               {traceID: "traceA", count: 2, 
uncompressedSpanSizeBytes: 12},
+                               {traceID: "traceB", count: 2, 
uncompressedSpanSizeBytes: 12},
+                               {traceID: "traceC", count: 1, 
uncompressedSpanSizeBytes: 6},
+                               {traceID: "traceD", count: 1, 
uncompressedSpanSizeBytes: 6},
+                               {traceID: "traceE", count: 1, 
uncompressedSpanSizeBytes: 6},
+                       },
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       tmpPath, defFn := test.Space(require.New(t))
+                       defer defFn()
+                       fileSystem := fs.NewLocalFileSystem()
+                       tst := &tsTable{pm: protector.Nop{}, fileSystem: 
fileSystem, root: tmpPath}
+
+                       var mergedPartID uint64
+                       partID := uint64(1)
+
+                       // Execute multiple merge rounds
+                       for roundIdx, round := range tt.rounds {
+                               t.Logf("Round %d: merging %d parts", 
roundIdx+1, len(round.tsList))
+
+                               var partsToMerge []*partWrapper
+
+                               // If we have a merged part from previous 
round, include it
+                               if roundIdx > 0 && mergedPartID != 0 {
+                                       // Reopen the merged part from the 
previous round
+                                       prevMergedPart := 
mustOpenFilePart(mergedPartID, tmpPath, fileSystem)
+                                       prevMergedPart.partMetadata.ID = 
mergedPartID
+                                       partsToMerge = append(partsToMerge, 
newPartWrapper(nil, prevMergedPart))
+                                       t.Logf("  Including previous merged 
part ID %d", mergedPartID)
+                               }
+
+                               // Create new parts for this round
+                               for i, traces := range round.tsList {
+                                       mp := generateMemPart()
+                                       mp.mustInitFromTraces(traces)
+                                       mp.mustFlush(fileSystem, 
partPath(tmpPath, partID))
+                                       p := mustOpenFilePart(partID, tmpPath, 
fileSystem)
+                                       p.partMetadata.ID = partID
+                                       partsToMerge = append(partsToMerge, 
newPartWrapper(nil, p))
+                                       releaseMemPart(mp)
+                                       partID++
+                                       t.Logf("  Created part %d with %d 
trace(s)", i+1, len(traces.traceIDs))
+                               }
+
+                               // Merge all parts for this round
+                               closeCh := make(chan struct{})
+                               mergedPart, err := tst.mergeParts(fileSystem, 
closeCh, partsToMerge, partID, tmpPath)
+                               close(closeCh)
+                               require.NoError(t, err, "Round %d merge 
failed", roundIdx+1)
+                               require.NotNil(t, mergedPart, "Round %d 
produced nil merged part", roundIdx+1)
+
+                               // Release all parts that were merged
+                               for _, pw := range partsToMerge {
+                                       pw.decRef()
+                               }
+
+                               // Store the merged part ID and release the 
wrapper
+                               mergedPartID = mergedPart.ID()
+                               mergedPart.decRef()
+                               partID++
+                               t.Logf("  Merged into part ID %d", mergedPartID)
+
+                               // Verify the merged part contains expected 
traces
+                               if round.expectedTraces != nil {
+                                       t.Logf("  Verifying merged part %d 
contains expected traces", mergedPartID)
+                                       verifyPartContainsTraces(t, 
mergedPartID, tmpPath, fileSystem, round.expectedTraces)
+                                       t.Logf("  ✓ Verification passed for 
round %d", roundIdx+1)
+                               }
+                       }
+
+                       // Verify final merged results by reopening the final 
merged part
+                       require.NotZero(t, mergedPartID, "No merged part 
produced")
+                       finalMergedPart := mustOpenFilePart(mergedPartID, 
tmpPath, fileSystem)
+                       finalPW := newPartWrapper(nil, finalMergedPart)
+                       defer finalPW.decRef()
+
+                       pmi := generatePartMergeIter()
+                       pmi.mustInitFromPart(finalPW.p)
+                       reader := generateBlockReader()
+                       reader.init([]*partMergeIter{pmi})
+
+                       var got []blockMetadata
+                       for reader.nextBlockMetadata() {
+                               got = append(got, reader.block.bm)
+                       }
+                       require.NoError(t, reader.error())
+                       releaseBlockReader(reader)
+                       releasePartMergeIter(pmi)
+
+                       // Verify block counts and trace IDs
+                       require.Len(t, got, len(tt.want), "Final result should 
have %d blocks", len(tt.want))
+                       for i, want := range tt.want {
+                               require.Equal(t, want.traceID, got[i].traceID, 
"Block %d trace ID mismatch", i)
+                               require.Equal(t, want.count, got[i].count, 
"Block %d count mismatch for trace %s", i, want.traceID)
+                               require.Equal(t, 
want.uncompressedSpanSizeBytes, got[i].uncompressedSpanSizeBytes,
+                                       "Block %d uncompressed size mismatch 
for trace %s", i, want.traceID)
+                       }
+               })
+       }
+}
+
 func Test_mergeParts(t *testing.T) {
        tests := []struct {
                wantErr error
@@ -151,6 +1010,177 @@ func Test_mergeParts(t *testing.T) {
                                {traceID: "trace3", count: 2, 
uncompressedSpanSizeBytes: 10},
                        },
                },
+               {
+                       name: "Test with different trace IDs - should use fast 
path (readRaw/WriteRawBlock)",
+                       tsList: []*traces{
+                               {
+                                       traceIDs:   []string{"trace1", 
"trace2"},
+                                       timestamps: []int64{1, 1},
+                                       tags: [][]*tagValue{
+                                               {
+                                                       {tag: "strTag", 
valueType: pbv1.ValueTypeStr, value: []byte("value1"), valueArr: nil},
+                                               },
+                                               {
+                                                       {tag: "strTag", 
valueType: pbv1.ValueTypeStr, value: []byte("value2"), valueArr: nil},
+                                               },
+                                       },
+                                       spans:   [][]byte{[]byte("span1"), 
[]byte("span2")},
+                                       spanIDs: []string{"span1", "span2"},
+                               },
+                               {
+                                       traceIDs:   []string{"trace3", 
"trace4"},
+                                       timestamps: []int64{2, 2},
+                                       tags: [][]*tagValue{
+                                               {
+                                                       {tag: "strTag", 
valueType: pbv1.ValueTypeStr, value: []byte("value3"), valueArr: nil},
+                                               },
+                                               {
+                                                       {tag: "strTag", 
valueType: pbv1.ValueTypeStr, value: []byte("value4"), valueArr: nil},
+                                               },
+                                       },
+                                       spans:   [][]byte{[]byte("span3"), 
[]byte("span4")},
+                                       spanIDs: []string{"span3", "span4"},
+                               },
+                       },
+                       want: []blockMetadata{
+                               {traceID: "trace1", count: 1, 
uncompressedSpanSizeBytes: 5},
+                               {traceID: "trace2", count: 1, 
uncompressedSpanSizeBytes: 5},
+                               {traceID: "trace3", count: 1, 
uncompressedSpanSizeBytes: 5},
+                               {traceID: "trace4", count: 1, 
uncompressedSpanSizeBytes: 5},
+                       },
+               },
+               {
+                       name: "Test with completely non-overlapping trace IDs 
across 3 parts - fast path",
+                       tsList: []*traces{
+                               {
+                                       traceIDs:   []string{"traceA"},
+                                       timestamps: []int64{1},
+                                       tags: [][]*tagValue{
+                                               {
+                                                       {tag: "strTag", 
valueType: pbv1.ValueTypeStr, value: []byte("valueA"), valueArr: nil},
+                                               },
+                                       },
+                                       spans:   [][]byte{[]byte("spanA")},
+                                       spanIDs: []string{"spanA"},
+                               },
+                               {
+                                       traceIDs:   []string{"traceB"},
+                                       timestamps: []int64{2},
+                                       tags: [][]*tagValue{
+                                               {
+                                                       {tag: "strTag", 
valueType: pbv1.ValueTypeStr, value: []byte("valueB"), valueArr: nil},
+                                               },
+                                       },
+                                       spans:   [][]byte{[]byte("spanB")},
+                                       spanIDs: []string{"spanB"},
+                               },
+                               {
+                                       traceIDs:   []string{"traceC"},
+                                       timestamps: []int64{3},
+                                       tags: [][]*tagValue{
+                                               {
+                                                       {tag: "strTag", 
valueType: pbv1.ValueTypeStr, value: []byte("valueC"), valueArr: nil},
+                                               },
+                                       },
+                                       spans:   [][]byte{[]byte("spanC")},
+                                       spanIDs: []string{"spanC"},
+                               },
+                       },
+                       want: []blockMetadata{
+                               {traceID: "traceA", count: 1, 
uncompressedSpanSizeBytes: 5},
+                               {traceID: "traceB", count: 1, 
uncompressedSpanSizeBytes: 5},
+                               {traceID: "traceC", count: 1, 
uncompressedSpanSizeBytes: 5},
+                       },
+               },
+               {
+                       name: "Test with interleaved non-overlapping trace IDs 
- fast path",
+                       tsList: []*traces{
+                               {
+                                       traceIDs:   []string{"trace1", 
"trace3", "trace5"},
+                                       timestamps: []int64{1, 1, 1},
+                                       tags: [][]*tagValue{
+                                               {
+                                                       {tag: "strTag", 
valueType: pbv1.ValueTypeStr, value: []byte("value1"), valueArr: nil},
+                                               },
+                                               {
+                                                       {tag: "strTag", 
valueType: pbv1.ValueTypeStr, value: []byte("value3"), valueArr: nil},
+                                               },
+                                               {
+                                                       {tag: "strTag", 
valueType: pbv1.ValueTypeStr, value: []byte("value5"), valueArr: nil},
+                                               },
+                                       },
+                                       spans:   [][]byte{[]byte("span1"), 
[]byte("span3"), []byte("span5")},
+                                       spanIDs: []string{"span1", "span3", 
"span5"},
+                               },
+                               {
+                                       traceIDs:   []string{"trace2", 
"trace4", "trace6"},
+                                       timestamps: []int64{2, 2, 2},
+                                       tags: [][]*tagValue{
+                                               {
+                                                       {tag: "strTag", 
valueType: pbv1.ValueTypeStr, value: []byte("value2"), valueArr: nil},
+                                               },
+                                               {
+                                                       {tag: "strTag", 
valueType: pbv1.ValueTypeStr, value: []byte("value4"), valueArr: nil},
+                                               },
+                                               {
+                                                       {tag: "strTag", 
valueType: pbv1.ValueTypeStr, value: []byte("value6"), valueArr: nil},
+                                               },
+                                       },
+                                       spans:   [][]byte{[]byte("span2"), 
[]byte("span4"), []byte("span6")},
+                                       spanIDs: []string{"span2", "span4", 
"span6"},
+                               },
+                       },
+                       want: []blockMetadata{
+                               {traceID: "trace1", count: 1, 
uncompressedSpanSizeBytes: 5},
+                               {traceID: "trace2", count: 1, 
uncompressedSpanSizeBytes: 5},
+                               {traceID: "trace3", count: 1, 
uncompressedSpanSizeBytes: 5},
+                               {traceID: "trace4", count: 1, 
uncompressedSpanSizeBytes: 5},
+                               {traceID: "trace5", count: 1, 
uncompressedSpanSizeBytes: 5},
+                               {traceID: "trace6", count: 1, 
uncompressedSpanSizeBytes: 5},
+                       },
+               },
+               {
+                       name: "Test with mixed same and different trace IDs - 
both fast and slow paths",
+                       tsList: []*traces{
+                               {
+                                       traceIDs:   []string{"trace1", 
"trace2", "trace3"},
+                                       timestamps: []int64{1, 1, 1},
+                                       tags: [][]*tagValue{
+                                               {
+                                                       {tag: "strTag", 
valueType: pbv1.ValueTypeStr, value: []byte("value1"), valueArr: nil},
+                                               },
+                                               {
+                                                       {tag: "strTag", 
valueType: pbv1.ValueTypeStr, value: []byte("value2"), valueArr: nil},
+                                               },
+                                               {
+                                                       {tag: "strTag", 
valueType: pbv1.ValueTypeStr, value: []byte("value3"), valueArr: nil},
+                                               },
+                                       },
+                                       spans:   [][]byte{[]byte("span1"), 
[]byte("span2"), []byte("span3")},
+                                       spanIDs: []string{"span1", "span2", 
"span3"},
+                               },
+                               {
+                                       traceIDs:   []string{"trace1", 
"trace4"},
+                                       timestamps: []int64{2, 2},
+                                       tags: [][]*tagValue{
+                                               {
+                                                       {tag: "strTag", 
valueType: pbv1.ValueTypeStr, value: []byte("value1b"), valueArr: nil},
+                                               },
+                                               {
+                                                       {tag: "strTag", 
valueType: pbv1.ValueTypeStr, value: []byte("value4"), valueArr: nil},
+                                               },
+                                       },
+                                       spans:   [][]byte{[]byte("span1b"), 
[]byte("span4")},
+                                       spanIDs: []string{"span1b", "span4"},
+                               },
+                       },
+                       want: []blockMetadata{
+                               {traceID: "trace1", count: 2, 
uncompressedSpanSizeBytes: 11},
+                               {traceID: "trace2", count: 1, 
uncompressedSpanSizeBytes: 5},
+                               {traceID: "trace3", count: 1, 
uncompressedSpanSizeBytes: 5},
+                               {traceID: "trace4", count: 1, 
uncompressedSpanSizeBytes: 5},
+                       },
+               },
        }
 
        for _, tt := range tests {
diff --git a/banyand/trace/part_iter.go b/banyand/trace/part_iter.go
index d10b7620..c2cb3cee 100644
--- a/banyand/trace/part_iter.go
+++ b/banyand/trace/part_iter.go
@@ -334,10 +334,90 @@ func (pmi *partMergeIter) loadBlockMetadata() error {
        return nil
 }
 
+// peekBlockMetadata decodes one blockMetadata from pmi.primaryBuf and returns
+// it together with true. It returns (nil, false) when primaryBuf is empty or
+// when unmarshal reports an error; the error value itself is discarded.
+// This function does not reassign pmi.primaryBuf.
+func (pmi *partMergeIter) peekBlockMetadata() (*blockMetadata, bool) {
+       if len(pmi.primaryBuf) == 0 {
+               return nil, false
+       }
+       bm := generateBlockMetadata()
+       // The first result of unmarshal is intentionally ignored here.
+       _, err := bm.unmarshal(pmi.primaryBuf, pmi.tagType)
+       if err != nil {
+               // NOTE(review): bm is not handed back on this path; if
+               // generateBlockMetadata draws from a pool, confirm whether it
+               // should be released before returning.
+               return nil, false
+       }
+       return bm, true
+}
+
 // mustLoadBlockData fills block.block by delegating to mustSeqReadFrom,
 // passing decoder, pmi's seqReaders and the metadata held in pmi.block.bm.
 func (pmi *partMergeIter) mustLoadBlockData(decoder 
*encoding.BytesBlockDecoder, block *blockPointer) {
        block.block.mustSeqReadFrom(decoder, &pmi.seqReaders, pmi.block.bm)
 }
 
+// mustReadRaw fills r with the bytes of the block described by bm as they are
+// read from pmi.seqReaders, without decoding span or tag values:
+//   - r.bm is set to bm (the pointer is stored, not a copy);
+//   - r.spans receives bm.spans.size bytes from the spans reader, or is
+//     truncated to length 0 when bm has no span data;
+//   - for every tag in bm.tags, r.tagMetadata[name] receives the tag metadata
+//     bytes and r.tags[name] receives the tag value bytes.
+//
+// Existing buffers in r are passed to bytes.ResizeOver for resizing, and map
+// entries for tags that bm does not contain are deleted. Before each read the
+// expected offset is compared with the reader's bytesRead; on a mismatch, or
+// when tag metadata cannot be unmarshaled, logger.Panicf is called.
+func (pmi *partMergeIter) mustReadRaw(r *rawBlock, bm *blockMetadata) {
+       r.bm = bm
+       // spans
+       if bm.spans != nil && bm.spans.size > 0 {
+               // Validate the reader is aligned to the expected offset
+               if bm.spans.offset != pmi.seqReaders.spans.bytesRead {
+                       logger.Panicf("offset %d must be equal to bytesRead 
%d", bm.spans.offset, pmi.seqReaders.spans.bytesRead)
+               }
+               r.spans = bytes.ResizeOver(r.spans[:0], int(bm.spans.size))
+               pmi.seqReaders.spans.mustReadFull(r.spans)
+       } else {
+               // No span data for this block: keep the buffer but make it empty.
+               r.spans = r.spans[:0]
+       }
+       // tags
+       if len(bm.tags) > 0 {
+               // Lazily create the destination maps on first use.
+               if r.tags == nil {
+                       r.tags = make(map[string][]byte, len(bm.tags))
+               }
+               if r.tagMetadata == nil {
+                       r.tagMetadata = make(map[string][]byte, len(bm.tags))
+               }
+               // Readers are looked up per tag name, so each tag is read from
+               // its own pair of readers.
+               // NOTE(review): what happens when seqReaders has no entry for
+               // name depends on the map's value type, which is not visible
+               // here - confirm.
+               for name, db := range bm.tags {
+                       // read tag metadata block
+                       // Validate the tag metadata reader alignment
+                       if db.offset != 
pmi.seqReaders.tagMetadata[name].bytesRead {
+                               logger.Panicf("offset %d must be equal to 
bytesRead %d", db.offset, pmi.seqReaders.tagMetadata[name].bytesRead)
+                       }
+                       mb := r.tagMetadata[name]
+                       mb = bytes.ResizeOver(mb[:0], int(db.size))
+                       pmi.seqReaders.tagMetadata[name].mustReadFull(mb)
+                       r.tagMetadata[name] = mb
+                       // parse to locate tag values range
+                       // (the parsed metadata supplies tm.offset and tm.size
+                       // used for the values read below)
+                       tm := generateTagMetadata()
+                       if err := tm.unmarshal(mb); err != nil {
+                               logger.Panicf("cannot unmarshal tag metadata: 
%v", err)
+                       }
+                       // read tag values
+                       // Validate the tag values reader alignment
+                       if tm.offset != pmi.seqReaders.tags[name].bytesRead {
+                               logger.Panicf("offset %d must be equal to 
bytesRead %d", tm.offset, pmi.seqReaders.tags[name].bytesRead)
+                       }
+                       vb := r.tags[name]
+                       vb = bytes.ResizeOver(vb[:0], int(tm.size))
+                       pmi.seqReaders.tags[name].mustReadFull(vb)
+                       r.tags[name] = vb
+                       // tm was only needed for offset and size; hand it back.
+                       releaseTagMetadata(tm)
+               }
+               // Drop entries left in r for tags that this block does not
+               // have.
+               for k := range r.tags {
+                       if _, ok := bm.tags[k]; !ok {
+                               delete(r.tags, k)
+                       }
+               }
+               for k := range r.tagMetadata {
+                       if _, ok := bm.tags[k]; !ok {
+                               delete(r.tagMetadata, k)
+                       }
+               }
+       } else {
+               // Clear maps for tags that are not present
+               for k := range r.tags {
+                       delete(r.tags, k)
+               }
+               for k := range r.tagMetadata {
+                       delete(r.tagMetadata, k)
+               }
+       }
+}
+
 func generatePartMergeIter() *partMergeIter {
        v := pmiPool.Get()
        if v == nil {
diff --git a/banyand/trace/tstable_test.go b/banyand/trace/tstable_test.go
index 6652aa9c..c9ac7b5b 100644
--- a/banyand/trace/tstable_test.go
+++ b/banyand/trace/tstable_test.go
@@ -19,6 +19,7 @@ package trace
 
 import (
        "errors"
+       "fmt"
        "testing"
        "time"
 
@@ -305,3 +306,65 @@ func generateHugeTraces(num int) *traces {
        traces.spanIDs = append(traces.spanIDs, []string{"span2", "span3"}...)
        return traces
 }
+
+// generateRealisticTraces creates a more realistic dataset for benchmarking:
+// - Each trace has 3-5 spans (3 + traceIdx%3)
+// - Each span payload is 110 KiB, i.e. more than 100KB
+// - Creates the specified number of unique traces, with IDs "trace_<index>"
+// - Timestamps start at 1000000 and increase by one for every span
+// - Every span carries the same six tag names.
+func generateRealisticTraces(numTraces int) *traces {
+       traces := &traces{
+               traceIDs:   []string{},
+               timestamps: []int64{},
+               tags:       [][]*tagValue{},
+               spans:      [][]byte{},
+               spanIDs:    []string{},
+       }
+
+       // Create a large span payload (>100 KiB)
+       // The template is a repeating 'A'..'Z' byte pattern used as filler.
+       const spanPayloadSizeKiB = 110 // 110 KiB (1 KiB = 1024 bytes)
+       spanPayloadTemplate := make([]byte, spanPayloadSizeKiB*1024)
+       for i := range spanPayloadTemplate {
+               // Cycle through the 26 upper-case letters
+               spanPayloadTemplate[i] = byte('A' + (i % 26))
+       }
+
+       timestamp := int64(1000000)
+
+       for traceIdx := 0; traceIdx < numTraces; traceIdx++ {
+               traceID := fmt.Sprintf("trace_%d", traceIdx)
+
+               // Each trace has 3-5 spans
+               numSpans := 3 + (traceIdx % 3) // Will give 3, 4, or 5 spans
+
+               for spanIdx := 0; spanIdx < numSpans; spanIdx++ {
+                       spanID := fmt.Sprintf("%s_span_%d", traceID, spanIdx)
+
+                       // Create a unique span payload by overwriting its tail with 
span-specific data
+                       spanPayload := make([]byte, len(spanPayloadTemplate))
+                       copy(spanPayload, spanPayloadTemplate)
+                       // Write the span-specific suffix over the last len(suffix)
+                       // bytes; the payload length stays unchanged
+                       suffix := []byte(fmt.Sprintf("_trace_%d_span_%d", 
traceIdx, spanIdx))
+                       copy(spanPayload[len(spanPayload)-len(suffix):], suffix)
+
+                       traces.traceIDs = append(traces.traceIDs, traceID)
+                       traces.timestamps = append(traces.timestamps, timestamp)
+                       timestamp++
+
+                       // Add realistic tags
+                       traces.tags = append(traces.tags, []*tagValue{
+                               {tag: "http.method", valueType: 
pbv1.ValueTypeStr, value: []byte("POST"), valueArr: nil},
+                               {tag: "http.status", valueType: 
pbv1.ValueTypeInt64, value: convert.Int64ToBytes(200), valueArr: nil},
+                               {tag: "http.url", valueType: pbv1.ValueTypeStr, 
value: []byte(fmt.Sprintf("/api/v1/trace/%d", traceIdx)), valueArr: nil},
+                               {tag: "service.name", valueType: 
pbv1.ValueTypeStr, value: []byte("test-service"), valueArr: nil},
+                               {tag: "span.kind", valueType: 
pbv1.ValueTypeStr, value: []byte("server"), valueArr: nil},
+                               // The "error" value is a sub-slice of spanPayload,
+                               // so it shares the span's backing array. It is the
+                               // first 1 KiB, which the suffix never reaches, so
+                               // its content is the same for every span.
+                               {tag: "error", valueType: 
pbv1.ValueTypeBinaryData, value: spanPayload[:1024], valueArr: nil}, // Use 
part of payload for error
+                       })
+
+                       traces.spans = append(traces.spans, spanPayload)
+                       traces.spanIDs = append(traces.spanIDs, spanID)
+               }
+       }
+
+       return traces
+}

Reply via email to