This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new f6f1f6ce Improve sidx scan efficiency (#844)
f6f1f6ce is described below

commit f6f1f6cee1230d00ecc06a879ced2e8e140c5035
Author: Gao Hongtao <[email protected]>
AuthorDate: Fri Nov 14 09:34:23 2025 +0800

    Improve sidx scan efficiency (#844)
---
 banyand/internal/encoding/tag_encoder.go    |  27 ++
 banyand/internal/sidx/block.go              | 157 ++++++---
 banyand/internal/sidx/block_scanner.go      | 146 +++++----
 banyand/internal/sidx/block_test.go         |  15 +-
 banyand/internal/sidx/element.go            |  40 ++-
 banyand/internal/sidx/iter.go               | 205 ++++++++----
 banyand/internal/sidx/iter_test.go          | 311 ++++++++++--------
 banyand/internal/sidx/merge_test.go         |  83 ++---
 banyand/internal/sidx/part_key_iter.go      | 489 ++++++++++++++++++++++++++++
 banyand/internal/sidx/part_key_iter_test.go | 483 +++++++++++++++++++++++++++
 banyand/internal/sidx/query.go              |  49 ++-
 banyand/internal/sidx/query_result.go       |  49 ++-
 banyand/internal/sidx/query_test.go         |  41 ++-
 banyand/internal/sidx/sidx.go               | 276 ++++++++++------
 banyand/internal/sidx/tag.go                | 118 ++++---
 banyand/internal/sidx/tag_filter_op.go      |   4 +-
 banyand/internal/sidx/tag_test.go           |  98 ++++++
 banyand/stream/block.go                     |   2 +-
 banyand/stream/tag_filter.go                |   4 +-
 banyand/trace/block_writer.go               |   4 +-
 banyand/trace/bloom_filter.go               |   4 +-
 banyand/trace/query.go                      |   5 +-
 banyand/trace/snapshot_test.go              |   6 +-
 banyand/trace/traces.go                     |  27 --
 pkg/filter/bloom_filter.go                  |  41 ++-
 pkg/filter/bloom_filter_test.go             |  16 +
 26 files changed, 2080 insertions(+), 620 deletions(-)

diff --git a/banyand/internal/encoding/tag_encoder.go b/banyand/internal/encoding/tag_encoder.go
index 4800572d..2e375367 100644
--- a/banyand/internal/encoding/tag_encoder.go
+++ b/banyand/internal/encoding/tag_encoder.go
@@ -22,6 +22,7 @@ package encoding
 
 import (
        stdbytes "bytes"
+       "errors"
 
        "github.com/apache/skywalking-banyandb/pkg/bytes"
        "github.com/apache/skywalking-banyandb/pkg/convert"
@@ -113,6 +114,32 @@ func MarshalVarArray(dest, src []byte) []byte {
        return dest
 }
 
+// UnmarshalVarArray unmarshals a variable-length array into a byte slice.
+func UnmarshalVarArray(dest, src []byte) ([]byte, []byte, error) {
+       if len(src) == 0 {
+               return nil, nil, errors.New("empty entity value")
+       }
+       if src[0] == EntityDelimiter {
+               return dest, src[1:], nil
+       }
+       for len(src) > 0 {
+               switch {
+               case src[0] == Escape:
+                       if len(src) < 2 {
+                               return nil, nil, errors.New("invalid escape character")
+                       }
+                       src = src[1:]
+                       dest = append(dest, src[0])
+               case src[0] == EntityDelimiter:
+                       return dest, src[1:], nil
+               default:
+                       dest = append(dest, src[0])
+               }
+               src = src[1:]
+       }
+       return nil, nil, errors.New("invalid variable array")
+}
+
 // EncodeTagValues encodes tag values based on the value type with optimal compression.
 // For int64: uses delta encoding with first value storage.
 // For float64: converts to decimal integers with exponent, then delta encoding.
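
For readers skimming the diff, the new UnmarshalVarArray is the inverse of the existing MarshalVarArray: each entry is terminated by a delimiter byte, and occurrences of the delimiter or escape byte inside a value are escaped. Below is a minimal, self-contained sketch of that round trip. The concrete byte values and the lower-case helper names are assumptions for illustration only; the real EntityDelimiter/Escape constants live in banyand/internal/encoding:

    package main

    import (
        "errors"
        "fmt"
    )

    const (
        entityDelimiter = 0x00 // assumed value, not the package's actual constant
        escape          = 0x01 // assumed value
    )

    // marshalVarArray escapes special bytes and terminates the entry with the delimiter.
    func marshalVarArray(dest, src []byte) []byte {
        for _, b := range src {
            if b == entityDelimiter || b == escape {
                dest = append(dest, escape)
            }
            dest = append(dest, b)
        }
        return append(dest, entityDelimiter)
    }

    // unmarshalVarArray consumes one entry and returns the decoded bytes plus the
    // unread tail, mirroring the (dest, src, error) shape of UnmarshalVarArray.
    func unmarshalVarArray(dest, src []byte) ([]byte, []byte, error) {
        for len(src) > 0 {
            switch src[0] {
            case escape:
                if len(src) < 2 {
                    return nil, nil, errors.New("invalid escape character")
                }
                src = src[1:]
                dest = append(dest, src[0])
            case entityDelimiter:
                return dest, src[1:], nil
            default:
                dest = append(dest, src[0])
            }
            src = src[1:]
        }
        return nil, nil, errors.New("invalid variable array")
    }

    func main() {
        encoded := marshalVarArray(nil, []byte("value1"))
        encoded = marshalVarArray(encoded, []byte("value2"))
        for len(encoded) > 0 {
            var entry []byte
            var err error
            entry, encoded, err = unmarshalVarArray(nil, encoded)
            if err != nil {
                panic(err)
            }
            fmt.Printf("%s\n", entry) // prints value1, then value2
        }
    }
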
diff --git a/banyand/internal/sidx/block.go b/banyand/internal/sidx/block.go
index bfd592b3..e5e77d2f 100644
--- a/banyand/internal/sidx/block.go
+++ b/banyand/internal/sidx/block.go
@@ -25,6 +25,7 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        internalencoding "github.com/apache/skywalking-banyandb/banyand/internal/encoding"
        "github.com/apache/skywalking-banyandb/pkg/bytes"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
@@ -109,7 +110,7 @@ func (b *block) mustInitFromTags(elementTags [][]*tag) {
 func (b *block) processTag(tagName string, elementTags [][]*tag) {
        td := generateTagData()
        td.name = tagName
-       td.values = make([][]byte, len(b.userKeys))
+       td.values = make([]tagRow, len(b.userKeys))
 
        var valueType pbv1.ValueType
        // Collect values for this tag across all elements
@@ -117,43 +118,25 @@ func (b *block) processTag(tagName string, elementTags [][]*tag) {
                found := false
                for _, tag := range tags {
                        if tag.name == tagName {
-                               td.values[i] = tag.marshal()
+                               // Store structured tagRow instead of marshaled bytes
+                               if tag.valueArr != nil {
+                                       td.values[i].valueArr = tag.valueArr
+                               } else {
+                                       td.values[i].value = tag.value
+                               }
                                valueType = tag.valueType
                                found = true
                                break
                        }
                }
                if !found {
-                       td.values[i] = nil // Missing tag value
+                       // Missing tag value - leave as zero value
+                       td.values[i] = tagRow{}
                }
        }
 
        td.valueType = valueType
 
-       // Create bloom filter for indexed tags
-       td.filter = generateBloomFilter(len(b.userKeys))
-       for _, tags := range elementTags {
-               for _, tag := range tags {
-                       if tag.name == tagName {
-                               if tag.valueArr != nil {
-                                       for _, v := range tag.valueArr {
-                                               if v != nil {
-                                                       td.filter.Add(v)
-                                               }
-                                       }
-                               } else if tag.value != nil {
-                                       td.filter.Add(tag.value)
-                               }
-                               break
-                       }
-               }
-       }
-
-       // Update min/max for int64 tags
-       if valueType == pbv1.ValueTypeInt64 {
-               td.updateMinMax()
-       }
-
        b.tags[tagName] = td
 }
 
@@ -197,9 +180,13 @@ func (b *block) uncompressedSizeBytes() uint64 {
        // Add tag data sizes
        for tagName, tagData := range b.tags {
                nameSize := uint64(len(tagName))
-               for _, value := range tagData.values {
-                       if value != nil {
-                               size += nameSize + uint64(len(value))
+               for _, row := range tagData.values {
+                       if row.valueArr != nil {
+                               for _, v := range row.valueArr {
+                                       size += nameSize + uint64(len(v))
+                               }
+                       } else if row.value != nil {
+                               size += nameSize + uint64(len(row.value))
                        }
                }
        }
@@ -267,6 +254,16 @@ func (b *block) mustWriteTag(tagName string, td *tagData, bm *blockMetadata, ww
        tm.name = tagName
        tm.valueType = td.valueType
 
+       // Marshal tagRow values to tmpBytes buffer for encoding
+       if cap(td.tmpBytes) < len(td.values) {
+               td.tmpBytes = make([][]byte, len(td.values))
+       } else {
+               td.tmpBytes = td.tmpBytes[:len(td.values)]
+       }
+       for i := range td.values {
+               td.tmpBytes[i] = marshalTagRow(&td.values[i], td.valueType)
+       }
+
        // Write tag values to data file
        bb := bigValuePool.Get()
        if bb == nil {
@@ -278,7 +275,7 @@ func (b *block) mustWriteTag(tagName string, td *tagData, bm *blockMetadata, ww
        }()
 
        // Encode tag values using the encoding module
-       err := internalencoding.EncodeTagValues(bb, td.values, td.valueType)
+       err := internalencoding.EncodeTagValues(bb, td.tmpBytes, td.valueType)
        if err != nil {
                panic(fmt.Sprintf("failed to encode tag values: %v", err))
        }
@@ -288,18 +285,82 @@ func (b *block) mustWriteTag(tagName string, td *tagData, bm *blockMetadata, ww
        tm.dataBlock.size = uint64(len(bb.Buf))
        tdw.MustWrite(bb.Buf)
 
-       // Write bloom filter
-       if td.filter != nil {
-               filterData := encodeBloomFilter(nil, td.filter)
-               tm.filterBlock.offset = tfw.bytesWritten
-               tm.filterBlock.size = uint64(len(filterData))
-               tfw.MustWrite(filterData)
+       // Create and write bloom filter at write time using unique values
+       uniqueValues := td.uniqueValues
+       if uniqueValues == nil {
+               uniqueValues = make(map[string]struct{})
+               td.uniqueValues = uniqueValues
+       } else {
+               for k := range uniqueValues {
+                       delete(uniqueValues, k)
+               }
        }
 
-       // Set min/max for int64 tags
-       if td.valueType == pbv1.ValueTypeInt64 {
-               tm.min = td.min
-               tm.max = td.max
+       var (
+               minVal    int64
+               maxVal    int64
+               hasMinMax bool
+       )
+
+       updateMinMax := func(v []byte) {
+               if td.valueType != pbv1.ValueTypeInt64 {
+                       return
+               }
+               if len(v) != 8 {
+                       return
+               }
+               val := encoding.BytesToInt64(v)
+               if !hasMinMax {
+                       minVal = val
+                       maxVal = val
+                       hasMinMax = true
+                       return
+               }
+               if val < minVal {
+                       minVal = val
+               }
+               if val > maxVal {
+                       maxVal = val
+               }
+       }
+
+       addUnique := func(v []byte) {
+               if v == nil {
+                       return
+               }
+               updateMinMax(v)
+               key := convert.BytesToString(v)
+               if _, exists := uniqueValues[key]; exists {
+                       return
+               }
+               uniqueValues[key] = struct{}{}
+       }
+
+       for i := range td.values {
+               if td.values[i].valueArr != nil {
+                       for _, v := range td.values[i].valueArr {
+                               addUnique(v)
+                       }
+                       continue
+               }
+               addUnique(td.values[i].value)
+       }
+
+       bf := generateBloomFilter(len(uniqueValues))
+       for v := range uniqueValues {
+               bf.Add(convert.StringToBytes(v))
+       }
+
+       bb.Buf = encodeBloomFilter(bb.Buf[:0], bf)
+       tm.filterBlock.offset = tfw.bytesWritten
+       tm.filterBlock.size = uint64(len(bb.Buf))
+       tfw.MustWrite(bb.Buf)
+       releaseBloomFilter(bf)
+
+       // Compute min/max for int64 tags during unique value iteration
+       if td.valueType == pbv1.ValueTypeInt64 && hasMinMax {
+               tm.min = encoding.Int64ToBytes(nil, minVal)
+               tm.max = encoding.Int64ToBytes(nil, maxVal)
        }
 
        // Marshal and write tag metadata
@@ -432,7 +493,7 @@ func fullTagAppend(bi, b *blockPointer, offset int) {
                for _, t := range b.tags {
                        newTagData := tagData{name: t.name, valueType: t.valueType}
                        for j := 0; j < existDataSize; j++ {
-                               newTagData.values = append(newTagData.values, nil)
+                               newTagData.values = append(newTagData.values, tagRow{})
                        }
                        assertIdxAndOffset(t.name, len(t.values), b.idx, offset)
                        newTagData.values = append(newTagData.values, t.values[b.idx:offset]...)
@@ -448,7 +509,7 @@ func fullTagAppend(bi, b *blockPointer, offset int) {
                } else {
                        newTagData := tagData{name: t.name, valueType: t.valueType}
                        for j := 0; j < existDataSize; j++ {
-                               newTagData.values = append(newTagData.values, nil)
+                               newTagData.values = append(newTagData.values, tagRow{})
                        }
                        assertIdxAndOffset(t.name, len(t.values), b.idx, offset)
                        newTagData.values = append(newTagData.values, t.values[b.idx:offset]...)
@@ -465,7 +526,7 @@ func fullTagAppend(bi, b *blockPointer, offset int) {
        for _, t := range bi.tags {
                if _, exists := sourceTags[t.name]; !exists {
                        for j := 0; j < emptySize; j++ {
-                               bi.tags[t.name].values = append(bi.tags[t.name].values, nil)
+                               bi.tags[t.name].values = append(bi.tags[t.name].values, tagRow{})
                        }
                }
        }
@@ -608,11 +669,13 @@ func (b *block) readSingleTag(decoder *encoding.BytesBlockDecoder, sr *seqReader
        td := generateTagData()
        td.name = tagName
        td.valueType = tm.valueType
-       td.values, err = internalencoding.DecodeTagValues(td.values[:0], decoder, bb, tm.valueType, count)
-       if err != nil {
+
+       // Decode and convert tag values using common helper
+       if err := decodeAndConvertTagValues(td, decoder, bb, tm.valueType, count); err != nil {
                releaseTagData(td)
-               return fmt.Errorf("cannot decode tag values: %w", err)
+               return err
        }
+
        b.tags[tagName] = td
        return nil
 }
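
A note on the mustWriteTag change above: the bloom filter is now built once per block at write time from deduplicated values, and the int64 min/max fall out of the same pass. Here is a rough, standalone sketch of that dedup-and-stats pass; the big-endian int64 encoding and the helper name are assumptions for illustration (the real code goes through encoding.BytesToInt64 and a pooled bloom filter):

    package main

    import (
        "encoding/binary"
        "fmt"
    )

    // uniqueInt64Stats deduplicates encoded values and tracks the int64 min/max
    // in one pass; only the distinct values would then be added to the bloom
    // filter, so duplicates cost a map lookup instead of a filter insert.
    func uniqueInt64Stats(values [][]byte) (unique [][]byte, minVal, maxVal int64, ok bool) {
        seen := make(map[string]struct{}, len(values))
        for _, v := range values {
            if len(v) != 8 { // skip nil/malformed values, as the diff's helper does
                continue
            }
            val := int64(binary.BigEndian.Uint64(v))
            if !ok {
                minVal, maxVal, ok = val, val, true
            } else if val < minVal {
                minVal = val
            } else if val > maxVal {
                maxVal = val
            }
            if _, dup := seen[string(v)]; !dup {
                seen[string(v)] = struct{}{}
                unique = append(unique, v)
            }
        }
        return unique, minVal, maxVal, ok
    }

    func main() {
        enc := func(v int64) []byte {
            b := make([]byte, 8)
            binary.BigEndian.PutUint64(b, uint64(v))
            return b
        }
        unique, minVal, maxVal, ok := uniqueInt64Stats([][]byte{enc(42), enc(7), enc(42), enc(100)})
        fmt.Println(len(unique), minVal, maxVal, ok) // 3 7 100 true
    }
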
diff --git a/banyand/internal/sidx/block_scanner.go b/banyand/internal/sidx/block_scanner.go
index 5cedd7fe..a7b39daa 100644
--- a/banyand/internal/sidx/block_scanner.go
+++ b/banyand/internal/sidx/block_scanner.go
@@ -84,6 +84,7 @@ type blockScanner struct {
        minKey     int64
        maxKey     int64
        asc        bool
+       batchSize  int
 }
 
 func (bsn *blockScanner) scan(ctx context.Context, blockCh chan *blockScanResultBatch) {
@@ -91,85 +92,59 @@ func (bsn *blockScanner) scan(ctx context.Context, blockCh chan *blockScanResult
                return
        }
 
-       // Check for context cancellation before starting expensive operations
-       select {
-       case <-ctx.Done():
+       if !bsn.checkContext(ctx) {
                return
-       default:
        }
 
-       bma := generateBlockMetadataArray()
-       defer releaseBlockMetadataArray(bma)
-
        it := generateIter()
        defer releaseIter(it)
 
-       it.init(bma, bsn.parts, bsn.seriesIDs, bsn.minKey, bsn.maxKey, bsn.filter)
+       it.init(bsn.parts, bsn.seriesIDs, bsn.minKey, bsn.maxKey, bsn.filter, bsn.asc)
 
        batch := generateBlockScanResultBatch()
        if it.Error() != nil {
                batch.err = fmt.Errorf("cannot init iter: %w", it.Error())
-               select {
-               case blockCh <- batch:
-               case <-ctx.Done():
-                       releaseBlockScanResultBatch(batch)
-               }
+               bsn.sendBatch(ctx, blockCh, batch)
                return
        }
 
+       batchThreshold := bsn.batchSize
+       if batchThreshold <= 0 {
+               batchThreshold = blockScannerBatchSize
+       }
+
        var totalBlockBytes uint64
        for it.nextBlock() {
-               // Check for context cancellation during iteration
-               select {
-               case <-ctx.Done():
+               if !bsn.checkContext(ctx) {
                        releaseBlockScanResultBatch(batch)
                        return
-               default:
                }
 
-               p := it.piHeap[0]
+               bm, p := it.current()
+               if err := bsn.validateBlockMetadata(bm, p, it); err != nil {
+                       batch.err = err
+                       bsn.sendBatch(ctx, blockCh, batch)
+                       return
+               }
 
-               // Get block size before adding to batch
-               blockSize := p.curBlock.uncompressedSize
+               blockSize := bm.uncompressedSize
 
                // Check if adding this block would exceed quota
-               quota := bsn.pm.AvailableBytes()
-               if quota >= 0 && totalBlockBytes+blockSize > uint64(quota) {
-                       if len(batch.bss) > 0 {
-                               // Send current batch without error
-                               select {
-                               case blockCh <- batch:
-                               case <-ctx.Done():
-                                       releaseBlockScanResultBatch(batch)
-                               }
-                               return
-                       }
-                       // Batch is empty, send error
-                       err := fmt.Errorf("sidx block scan quota exceeded: block size %s, quota is %s", humanize.Bytes(blockSize), humanize.Bytes(uint64(quota)))
-                       batch.err = err
-                       select {
-                       case blockCh <- batch:
-                       case <-ctx.Done():
-                               releaseBlockScanResultBatch(batch)
-                               return
+               if exceeded, err := bsn.checkQuotaExceeded(totalBlockBytes, blockSize, batch); exceeded {
+                       if err != nil {
+                               batch.err = err
                        }
+                       bsn.sendBatch(ctx, blockCh, batch)
                        return
                }
 
                // Quota OK, add block to batch
-               batch.bss = append(batch.bss, blockScanResult{
-                       p: p.p,
-               })
-               bs := &batch.bss[len(batch.bss)-1]
-               bs.bm.copyFrom(p.curBlock)
+               bsn.addBlockToBatch(batch, bm, p)
                totalBlockBytes += blockSize
 
                // Check if batch is full
-               if len(batch.bss) >= cap(batch.bss) {
-                       select {
-                       case blockCh <- batch:
-                       case <-ctx.Done():
-                               releaseBlockScanResultBatch(batch)
+               if len(batch.bss) >= batchThreshold || len(batch.bss) >= cap(batch.bss) {
+                       if !bsn.sendBatch(ctx, blockCh, batch) {
                                if dl := bsn.l.Debug(); dl.Enabled() {
                                        dl.Int("batch.len", len(batch.bss)).Msg("context canceled while sending block")
                                }
@@ -181,26 +156,79 @@ func (bsn *blockScanner) scan(ctx context.Context, blockCh chan *blockScanResult
 
        if it.Error() != nil {
                batch.err = fmt.Errorf("cannot iterate iter: %w", it.Error())
-               select {
-               case blockCh <- batch:
-               case <-ctx.Done():
-                       releaseBlockScanResultBatch(batch)
-               }
+               bsn.sendBatch(ctx, blockCh, batch)
                return
        }
 
        if len(batch.bss) > 0 {
-               select {
-               case blockCh <- batch:
-               case <-ctx.Done():
-                       releaseBlockScanResultBatch(batch)
-               }
+               bsn.sendBatch(ctx, blockCh, batch)
                return
        }
 
        releaseBlockScanResultBatch(batch)
 }
 
+// checkContext returns false if context is canceled.
+func (bsn *blockScanner) checkContext(ctx context.Context) bool {
+       select {
+       case <-ctx.Done():
+               return false
+       default:
+               return true
+       }
+}
+
+// sendBatch sends a batch to the channel, handling context cancellation.
+// Returns false if context was canceled, true otherwise.
+func (bsn *blockScanner) sendBatch(ctx context.Context, blockCh chan *blockScanResultBatch, batch *blockScanResultBatch) bool {
+       select {
+       case blockCh <- batch:
+               return true
+       case <-ctx.Done():
+               releaseBlockScanResultBatch(batch)
+               return false
+       }
+}
+
+// validateBlockMetadata checks if block metadata and part are valid.
+func (bsn *blockScanner) validateBlockMetadata(bm *blockMetadata, p *part, it *iter) error {
+       if bm == nil {
+               it.err = fmt.Errorf("sidx iterator returned nil block")
+               return it.err
+       }
+       if p == nil {
+               it.err = fmt.Errorf("block missing part reference")
+               return it.err
+       }
+       return nil
+}
+
+// checkQuotaExceeded checks if adding a block would exceed the memory quota.
+// Returns (exceeded, error) where exceeded is true if quota would be exceeded.
+func (bsn *blockScanner) checkQuotaExceeded(totalBlockBytes, blockSize uint64, batch *blockScanResultBatch) (bool, error) {
+       quota := bsn.pm.AvailableBytes()
+       if quota < 0 || totalBlockBytes+blockSize <= uint64(quota) {
+               return false, nil
+       }
+
+       // Quota would be exceeded
+       if len(batch.bss) > 0 {
+               // Send current batch without error
+               return true, nil
+       }
+
+       // Batch is empty, return error
+       return true, fmt.Errorf("sidx block scan quota exceeded: block size %s, quota is %s",
+               humanize.Bytes(blockSize), humanize.Bytes(uint64(quota)))
+}
+
+// addBlockToBatch adds a block to the batch.
+func (bsn *blockScanner) addBlockToBatch(batch *blockScanResultBatch, bm *blockMetadata, p *part) {
+       batch.bss = append(batch.bss, blockScanResult{p: p})
+       bs := &batch.bss[len(batch.bss)-1]
+       bs.bm.copyFrom(bm)
+}
+
 func (bsn *blockScanner) close() {
        for i := range bsn.finalizers {
                bsn.finalizers[i]()
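
The extracted sendBatch helper above is the usual Go pattern for handing work to a possibly departed consumer: select between the channel send and ctx.Done() so the scanner goroutine can never block forever. A minimal standalone version of the pattern, with a hypothetical batch type and the pool handling elided:

    package main

    import (
        "context"
        "fmt"
    )

    type batch struct{ id int }

    // send mirrors sendBatch: deliver the batch unless the context is canceled
    // first. On cancellation the real code returns the batch to its pool; here
    // the return value simply tells the caller whether it still owns the batch.
    func send(ctx context.Context, ch chan<- *batch, b *batch) bool {
        select {
        case ch <- b:
            return true
        case <-ctx.Done():
            return false
        }
    }

    func main() {
        ctx, cancel := context.WithCancel(context.Background())
        ch := make(chan *batch, 1)
        fmt.Println(send(ctx, ch, &batch{id: 1})) // true: buffered send succeeds
        cancel()
        fmt.Println(send(ctx, ch, &batch{id: 2})) // false: channel full and ctx done
    }
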
diff --git a/banyand/internal/sidx/block_test.go b/banyand/internal/sidx/block_test.go
index 64627ef4..7e96305f 100644
--- a/banyand/internal/sidx/block_test.go
+++ b/banyand/internal/sidx/block_test.go
@@ -158,9 +158,14 @@ func TestBlock_ProcessTag_WithArrValues(t *testing.T) {
 
        b.processTag("arr_tag", elementTags)
 
-       assert.Equal(t, "a|b|", string(b.tags["arr_tag"].values[0]))
-       assert.Equal(t, "c", string(b.tags["arr_tag"].values[1]))
-       assert.True(t, b.tags["arr_tag"].filter.MightContain([]byte("a")))
-       assert.True(t, b.tags["arr_tag"].filter.MightContain([]byte("b")))
-       assert.True(t, b.tags["arr_tag"].filter.MightContain([]byte("c")))
+       // Check the first element has valueArr
+       assert.NotNil(t, b.tags["arr_tag"].values[0].valueArr)
+       assert.Equal(t, 2, len(b.tags["arr_tag"].values[0].valueArr))
+       assert.Equal(t, "a", string(b.tags["arr_tag"].values[0].valueArr[0]))
+       assert.Equal(t, "b", string(b.tags["arr_tag"].values[0].valueArr[1]))
+
+       // Check the second element has value
+       assert.Equal(t, "c", string(b.tags["arr_tag"].values[1].value))
+
+       // Note: bloom filter is now created at write time, not during processTag
 }
diff --git a/banyand/internal/sidx/element.go b/banyand/internal/sidx/element.go
index d9987e7e..470174af 100644
--- a/banyand/internal/sidx/element.go
+++ b/banyand/internal/sidx/element.go
@@ -23,6 +23,7 @@ package sidx
 import (
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/banyand/internal/encoding"
+       "github.com/apache/skywalking-banyandb/pkg/bytes"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/pool"
 )
@@ -51,20 +52,37 @@ func (t *tag) reset() {
        t.valueType = pbv1.ValueTypeUnknown
 }
 
-// marshal marshals the tag value to a byte slice.
-func (t *tag) marshal() []byte {
-       if t.valueArr != nil {
-               var dst []byte
-               for i := range t.valueArr {
-                       if t.valueType == pbv1.ValueTypeInt64Arr {
-                               dst = append(dst, t.valueArr[i]...)
-                               continue
+func unmarshalTag(dest [][]byte, src []byte, valueType pbv1.ValueType) ([][]byte, error) {
+       if valueType == pbv1.ValueTypeInt64Arr {
+               for i := 0; i < len(src); i += 8 {
+                       dest = append(dest, src[i:i+8])
+               }
+               return dest, nil
+       }
+       if valueType == pbv1.ValueTypeStrArr {
+               bb := bigValuePool.Get()
+               if bb == nil {
+                       bb = &bytes.Buffer{}
+               }
+               defer func() {
+                       bb.Buf = bb.Buf[:0]
+                       bigValuePool.Put(bb)
+               }()
+               var err error
+               for len(src) > 0 {
+                       bb.Buf, src, err = encoding.UnmarshalVarArray(bb.Buf[:0], src)
+                       if err != nil {
+                               return nil, err
                        }
-                       dst = encoding.MarshalVarArray(dst, t.valueArr[i])
+                       // Make a copy since bb.Buf will be reused
+                       valueCopy := make([]byte, len(bb.Buf))
+                       copy(valueCopy, bb.Buf)
+                       dest = append(dest, valueCopy)
                }
-               return dst
+               return dest, nil
        }
-       return t.value
+       dest = append(dest, src)
+       return dest, nil
 }
 
 // reset elements collection for pooling.
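
One detail worth calling out in unmarshalTag above: int64 arrays need no delimiters at all, because every element is a fixed 8 bytes, so decoding is pure slicing at an 8-byte stride. A small standalone sketch; the big-endian choice here is an assumption for the demo, not necessarily the store's encoding:

    package main

    import (
        "encoding/binary"
        "fmt"
    )

    // splitInt64Arr mirrors the ValueTypeInt64Arr branch of unmarshalTag:
    // slice the packed buffer at an 8-byte stride, with no copies.
    func splitInt64Arr(dest [][]byte, src []byte) [][]byte {
        for i := 0; i+8 <= len(src); i += 8 {
            dest = append(dest, src[i:i+8])
        }
        return dest
    }

    func main() {
        var src []byte
        for _, v := range []int64{10, 20, 30} {
            src = binary.BigEndian.AppendUint64(src, uint64(v)) // endianness assumed
        }
        for _, chunk := range splitInt64Arr(nil, src) {
            fmt.Println(int64(binary.BigEndian.Uint64(chunk))) // 10, 20, 30
        }
    }
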
diff --git a/banyand/internal/sidx/iter.go b/banyand/internal/sidx/iter.go
index dba836f2..e48bd4a6 100644
--- a/banyand/internal/sidx/iter.go
+++ b/banyand/internal/sidx/iter.go
@@ -29,11 +29,21 @@ import (
 )
 
 type iter struct {
-       err           error
-       parts         []*part
-       piPool        []partIter
-       piHeap        partIterHeap
-       nextBlockNoop bool
+       err       error
+       curBlock  *blockMetadata
+       curPart   *part
+       parts     []*part
+       partIters []*partKeyIter
+       heap      partKeyIterHeap
+       asc       bool
+}
+
+func (it *iter) releaseCurBlock() {
+       if it.curBlock != nil {
+               releaseBlockMetadata(it.curBlock)
+               it.curBlock = nil
+       }
+       it.curPart = nil
 }
 
 func (it *iter) reset() {
@@ -42,88 +52,132 @@ func (it *iter) reset() {
        }
        it.parts = it.parts[:0]
 
-       for i := range it.piPool {
-               it.piPool[i].reset()
+       for i := range it.partIters {
+               if it.partIters[i] != nil {
+                       releasePartKeyIter(it.partIters[i])
+                       it.partIters[i] = nil
+               }
        }
-       it.piPool = it.piPool[:0]
+       it.partIters = it.partIters[:0]
 
-       for i := range it.piHeap {
-               it.piHeap[i] = nil
+       for i := range it.heap {
+               it.heap[i] = nil
        }
-       it.piHeap = it.piHeap[:0]
+       it.heap = it.heap[:0]
+
+       it.releaseCurBlock()
 
        it.err = nil
-       it.nextBlockNoop = false
+       it.asc = false
 }
 
-func (it *iter) init(bma *blockMetadataArray, parts []*part, sids []common.SeriesID, minKey, maxKey int64, blockFilter index.Filter) {
+func (it *iter) init(parts []*part, sids []common.SeriesID, minKey, maxKey int64, blockFilter index.Filter, asc bool) {
        it.reset()
-       it.parts = parts
+       it.parts = append(it.parts[:0], parts...)
+       it.asc = asc
 
-       if n := len(it.parts) - cap(it.piPool); n > 0 {
-               it.piPool = append(it.piPool[:cap(it.piPool)], make([]partIter, n)...)
-       }
-       it.piPool = it.piPool[:len(it.parts)]
-       for i, p := range it.parts {
-               it.piPool[i].init(bma, p, sids, minKey, maxKey, blockFilter)
+       if cap(it.partIters) < len(parts) {
+               it.partIters = make([]*partKeyIter, len(parts))
+       } else {
+               it.partIters = it.partIters[:len(parts)]
+               for i := range it.partIters {
+                       if it.partIters[i] != nil {
+                               releasePartKeyIter(it.partIters[i])
+                               it.partIters[i] = nil
+                       }
+               }
        }
 
-       it.piHeap = it.piHeap[:0]
-       for i := range it.piPool {
-               ps := &it.piPool[i]
-               if !ps.nextBlock() {
-                       if err := ps.error(); err != nil {
+       it.heap = it.heap[:0]
+
+       for i, p := range parts {
+               pki := generatePartKeyIter()
+               it.partIters[i] = pki
+
+               pki.init(p, sids, minKey, maxKey, blockFilter, asc)
+               if err := pki.error(); err != nil {
+                       if !errors.Is(err, io.EOF) {
+                               releasePartKeyIter(pki)
+                               it.partIters[i] = nil
                                it.err = fmt.Errorf("cannot initialize sidx 
iteration: %w", err)
                                return
                        }
+                       releasePartKeyIter(pki)
+                       it.partIters[i] = nil
                        continue
                }
-               it.piHeap = append(it.piHeap, ps)
+
+               if !pki.nextBlock() {
+                       if err := pki.error(); err != nil && !errors.Is(err, io.EOF) {
+                               releasePartKeyIter(pki)
+                               it.partIters[i] = nil
+                               it.err = fmt.Errorf("cannot initialize sidx iteration: %w", err)
+                               return
+                       }
+                       releasePartKeyIter(pki)
+                       it.partIters[i] = nil
+                       continue
+               }
+
+               bm, _ := pki.current()
+               if bm == nil {
+                       releasePartKeyIter(pki)
+                       it.partIters[i] = nil
+                       continue
+               }
+
+               it.heap = append(it.heap, pki)
        }
-       if len(it.piHeap) == 0 {
+
+       if len(it.heap) == 0 {
                it.err = io.EOF
                return
        }
-       heap.Init(&it.piHeap)
-       it.nextBlockNoop = true
+
+       heap.Init(&it.heap)
 }
 
 func (it *iter) nextBlock() bool {
        if it.err != nil {
                return false
        }
-       if it.nextBlockNoop {
-               it.nextBlockNoop = false
-               return true
-       }
 
-       it.err = it.next()
-       if it.err != nil {
-               if errors.Is(it.err, io.EOF) {
-                       it.err = fmt.Errorf("cannot obtain the next block to search in the partition: %w", it.err)
-               }
+       it.releaseCurBlock()
+
+       if len(it.heap) == 0 {
+               it.err = io.EOF
                return false
        }
-       return true
-}
 
-func (it *iter) next() error {
-       psMin := it.piHeap[0]
-       if psMin.nextBlock() {
-               heap.Fix(&it.piHeap, 0)
-               return nil
+       pki := heap.Pop(&it.heap).(*partKeyIter)
+       bm, p := pki.current()
+       if bm == nil {
+               it.releasePartIter(pki)
+               it.err = fmt.Errorf("partKeyIter has no current block")
+               return false
        }
 
-       if err := psMin.error(); err != nil {
-               return err
+       it.curBlock = generateBlockMetadata()
+       it.curBlock.copyFrom(bm)
+       it.curPart = p
+
+       if !pki.nextBlock() {
+               err := pki.error()
+               it.releasePartIter(pki)
+               if err != nil && !errors.Is(err, io.EOF) {
+                       it.releaseCurBlock()
+                       it.err = err
+                       return false
+               }
+       } else {
+               heap.Push(&it.heap, pki)
        }
 
-       heap.Pop(&it.piHeap)
+       return true
+}
 
-       if len(it.piHeap) == 0 {
-               return io.EOF
-       }
-       return nil
+func (it *iter) current() (*blockMetadata, *part) {
+       return it.curBlock, it.curPart
 }
 
 func (it *iter) Error() error {
@@ -148,29 +202,60 @@ func releaseIter(it *iter) {
 
 var iterPool = pool.Register[*iter]("sidx-iter")
 
-type partIterHeap []*partIter
+type partKeyIterHeap []*partKeyIter
 
-func (pih *partIterHeap) Len() int {
+func (pih *partKeyIterHeap) Len() int {
        return len(*pih)
 }
 
-func (pih *partIterHeap) Less(i, j int) bool {
+func (pih *partKeyIterHeap) Less(i, j int) bool {
        x := *pih
-       return x[i].curBlock.less(x[j].curBlock)
+       bmi, _ := x[i].current()
+       bmj, _ := x[j].current()
+       if bmi == nil {
+               return false
+       }
+       if bmj == nil {
+               return true
+       }
+       asc := true
+       if x[i] != nil {
+               asc = x[i].asc
+       } else if x[j] != nil {
+               asc = x[j].asc
+       }
+       if asc {
+               return bmi.lessByKey(bmj)
+       }
+       return bmj.lessByKey(bmi)
 }
 
-func (pih *partIterHeap) Swap(i, j int) {
+func (pih *partKeyIterHeap) Swap(i, j int) {
        x := *pih
        x[i], x[j] = x[j], x[i]
 }
 
-func (pih *partIterHeap) Push(x any) {
-       *pih = append(*pih, x.(*partIter))
+func (pih *partKeyIterHeap) Push(x any) {
+       *pih = append(*pih, x.(*partKeyIter))
 }
 
-func (pih *partIterHeap) Pop() any {
+func (pih *partKeyIterHeap) Pop() any {
        a := *pih
        v := a[len(a)-1]
        *pih = a[:len(a)-1]
        return v
 }
+
+func (it *iter) releasePartIter(p *partKeyIter) {
+       if p == nil {
+               return
+       }
+       for i := range it.partIters {
+               if it.partIters[i] == p {
+                       releasePartKeyIter(p)
+                       it.partIters[i] = nil
+                       return
+               }
+       }
+       releasePartKeyIter(p)
+}
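
Structurally, the reworked iter above is a classic k-way merge: each partKeyIter exposes its current block, a heap orders the iterators by key (ascending or descending), and nextBlock pops the winner, advances it, and pushes it back while blocks remain. A compact standalone sketch of the same pattern over plain int cursors; the names are illustrative, not the sidx types:

    package main

    import (
        "container/heap"
        "fmt"
    )

    // cursor stands in for partKeyIter: a position over pre-sorted keys.
    type cursor struct {
        keys []int
        pos  int
    }

    func (c *cursor) current() int { return c.keys[c.pos] }
    func (c *cursor) next() bool   { c.pos++; return c.pos < len(c.keys) }

    // mergeHeap orders cursors by their current key; asc flips the comparison,
    // much as partKeyIterHeap.Less consults the iterators' asc flag.
    type mergeHeap struct {
        cs  []*cursor
        asc bool
    }

    func (h *mergeHeap) Len() int { return len(h.cs) }
    func (h *mergeHeap) Less(i, j int) bool {
        if h.asc {
            return h.cs[i].current() < h.cs[j].current()
        }
        return h.cs[i].current() > h.cs[j].current()
    }
    func (h *mergeHeap) Swap(i, j int) { h.cs[i], h.cs[j] = h.cs[j], h.cs[i] }
    func (h *mergeHeap) Push(x any)    { h.cs = append(h.cs, x.(*cursor)) }
    func (h *mergeHeap) Pop() any {
        old := h.cs
        v := old[len(old)-1]
        h.cs = old[:len(old)-1]
        return v
    }

    // merge pops the best cursor, emits its key, advances it, and pushes it
    // back: the shape of the new iter.nextBlock. For asc=false the inputs must
    // already be sorted in descending order.
    func merge(asc bool, inputs ...*cursor) []int {
        h := &mergeHeap{asc: asc}
        for _, c := range inputs {
            if len(c.keys) > 0 {
                h.cs = append(h.cs, c)
            }
        }
        heap.Init(h)
        var out []int
        for h.Len() > 0 {
            c := heap.Pop(h).(*cursor)
            out = append(out, c.current())
            if c.next() {
                heap.Push(h, c)
            }
        }
        return out
    }

    func main() {
        a := &cursor{keys: []int{100, 200}}
        b := &cursor{keys: []int{150, 180}}
        fmt.Println(merge(true, a, b)) // [100 150 180 200]
    }
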
diff --git a/banyand/internal/sidx/iter_test.go b/banyand/internal/sidx/iter_test.go
index 24d61235..fe4388ff 100644
--- a/banyand/internal/sidx/iter_test.go
+++ b/banyand/internal/sidx/iter_test.go
@@ -37,16 +37,7 @@ func TestIterComprehensive(t *testing.T) {
        tempDir := t.TempDir()
 
        // Test cases for comprehensive iterator testing
-       testCases := []struct {
-               blockFilter  index.Filter
-               name         string
-               parts        [][]testElement
-               querySids    []common.SeriesID
-               expectOrder  []blockExpectation
-               minKey       int64
-               maxKey       int64
-               expectBlocks int
-       }{
+       testCases := []iterTestCase{
                {
                        name: "single_part_single_block",
                        parts: [][]testElement{
@@ -215,8 +206,10 @@ func TestIterComprehensive(t *testing.T) {
 
        // Test both file-based and memory-based parts
        for _, partType := range []string{"file_based", "memory_based"} {
+               partType := partType
                t.Run(partType, func(t *testing.T) {
                        for _, tc := range testCases {
+                               tc := tc
                                t.Run(tc.name, func(t *testing.T) {
                                        var parts []*part
 
@@ -241,8 +234,29 @@ func TestIterComprehensive(t *testing.T) {
                                                parts = append(parts, testPart)
                                        }
 
-                                       // Test the iterator
-                                       runIteratorTest(t, tc, parts)
+                                       ascBlocks := runIteratorPass(t, tc, parts, true)
+                                       descBlocks := runIteratorPass(t, tc, parts, false)
+
+                                       assertSameBlocksIgnoreOrder(t, ascBlocks, descBlocks)
+
+                                       require.True(t, sort.SliceIsSorted(ascBlocks, func(i, j int) bool {
+                                               if ascBlocks[i].minKey == ascBlocks[j].minKey {
+                                                       return ascBlocks[i].seriesID <= ascBlocks[j].seriesID
+                                               }
+                                               return ascBlocks[i].minKey <= ascBlocks[j].minKey
+                                       }), "ascending pass should be ordered by non-decreasing minKey")
+
+                                       require.True(t, sort.SliceIsSorted(descBlocks, func(i, j int) bool {
+                                               if descBlocks[i].minKey == descBlocks[j].minKey {
+                                                       return descBlocks[i].seriesID >= descBlocks[j].seriesID
+                                               }
+                                               return descBlocks[i].minKey >= descBlocks[j].minKey
+                                       }), "descending pass should be ordered by non-increasing minKey")
+
+                                       if len(tc.expectOrder) > 0 {
+                                               require.Equal(t, tc.expectOrder, ascBlocks, "ascending pass order should match expectation")
+                                               require.Equal(t, reverseExpectations(tc.expectOrder), descBlocks, "descending pass should be reverse of expectation")
+                                       }
+                                       }
                                })
                        }
                })
@@ -254,15 +268,13 @@ func TestIterEdgeCases(t *testing.T) {
        tempDir := t.TempDir()
 
        t.Run("empty_parts_list", func(t *testing.T) {
-               bma := generateBlockMetadataArray()
-               defer releaseBlockMetadataArray(bma)
-
-               it := generateIter()
-               defer releaseIter(it)
-
-               it.init(bma, nil, []common.SeriesID{1, 2, 3}, 100, 200, nil)
-               assert.False(t, it.nextBlock())
-               assert.Nil(t, it.Error())
+               for _, asc := range []bool{true, false} {
+                       it := generateIter()
+                       it.init(nil, []common.SeriesID{1, 2, 3}, 100, 200, nil, asc)
+                       assert.False(t, it.nextBlock())
+                       assert.Nil(t, it.Error())
+                       releaseIter(it)
+               }
        })
 
        t.Run("empty_series_list", func(t *testing.T) {
@@ -281,15 +293,13 @@ func TestIterEdgeCases(t *testing.T) {
                testPart := mustOpenPart(1, partDir, testFS)
                defer testPart.close()
 
-               bma := generateBlockMetadataArray()
-               defer releaseBlockMetadataArray(bma)
-
-               it := generateIter()
-               defer releaseIter(it)
-
-               it.init(bma, []*part{testPart}, []common.SeriesID{}, 0, 1000, nil)
-               assert.False(t, it.nextBlock())
-               assert.Nil(t, it.Error())
+               for _, asc := range []bool{true, false} {
+                       it := generateIter()
+                       it.init([]*part{testPart}, []common.SeriesID{}, 0, 1000, nil, asc)
+                       assert.False(t, it.nextBlock())
+                       assert.Nil(t, it.Error())
+                       releaseIter(it)
+               }
        })
 
        t.Run("no_matching_key_range", func(t *testing.T) {
@@ -322,16 +332,14 @@ func TestIterEdgeCases(t *testing.T) {
                testPart2 := mustOpenPart(2, partDir2, testFS)
                defer testPart2.close()
 
-               bma := generateBlockMetadataArray()
-               defer releaseBlockMetadataArray(bma)
-
-               it := generateIter()
-               defer releaseIter(it)
-
-               // Query range that doesn't overlap with any blocks
-               it.init(bma, []*part{testPart1, testPart2}, []common.SeriesID{1, 2}, 400, 500, nil)
-               assert.False(t, it.nextBlock())
-               assert.Nil(t, it.Error())
+               for _, asc := range []bool{true, false} {
+                       it := generateIter()
+                       // Query range that doesn't overlap with any blocks
+                       it.init([]*part{testPart1, testPart2}, []common.SeriesID{1, 2}, 400, 500, nil, asc)
+                       assert.False(t, it.nextBlock())
+                       assert.Nil(t, it.Error())
+                       releaseIter(it)
+               }
        })
 
        t.Run("single_part_single_block", func(t *testing.T) {
@@ -349,17 +357,15 @@ func TestIterEdgeCases(t *testing.T) {
                testPart := mustOpenPart(1, partDir, testFS)
                defer testPart.close()
 
-               bma := generateBlockMetadataArray()
-               defer releaseBlockMetadataArray(bma)
-
-               it := generateIter()
-               defer releaseIter(it)
-
-               it.init(bma, []*part{testPart}, []common.SeriesID{1}, 50, 150, nil)
+               for _, asc := range []bool{true, false} {
+                       it := generateIter()
+                       it.init([]*part{testPart}, []common.SeriesID{1}, 50, 150, nil, asc)
 
-               assert.True(t, it.nextBlock())
-               assert.False(t, it.nextBlock()) // Should be only one block
-               assert.Nil(t, it.Error())
+                       assert.True(t, it.nextBlock())
+                       assert.False(t, it.nextBlock()) // Should be only one block
+                       assert.Nil(t, it.Error())
+                       releaseIter(it)
+               }
        })
 
        t.Run("block_filter_error", func(t *testing.T) {
@@ -375,20 +381,18 @@ func TestIterEdgeCases(t *testing.T) {
                testPart := openMemPart(mp)
                defer testPart.close()
 
-               bma := generateBlockMetadataArray()
-               defer releaseBlockMetadataArray(bma)
-
-               it := generateIter()
-               defer releaseIter(it)
-
                expectedErr := fmt.Errorf("test filter error")
                mockFilter := &mockBlockFilter{err: expectedErr}
 
-               it.init(bma, []*part{testPart}, []common.SeriesID{1}, 0, 200, mockFilter)
+               for _, asc := range []bool{true, false} {
+                       it := generateIter()
+                       it.init([]*part{testPart}, []common.SeriesID{1}, 0, 200, mockFilter, asc)
 
-               assert.False(t, it.nextBlock())
-               assert.Error(t, it.Error())
-               assert.Contains(t, it.Error().Error(), "cannot initialize sidx iteration")
+                       assert.False(t, it.nextBlock())
+                       assert.Error(t, it.Error())
+                       assert.Contains(t, it.Error().Error(), "cannot initialize sidx iteration")
+                       releaseIter(it)
+               }
        })
 }
 
@@ -432,37 +436,41 @@ func TestIterOrdering(t *testing.T) {
                testPart2 := mustOpenPart(2, partDir2, testFS)
                defer testPart2.close()
 
-               bma := generateBlockMetadataArray()
-               defer releaseBlockMetadataArray(bma)
-
-               it := generateIter()
-               defer releaseIter(it)
-
-               it.init(bma, []*part{testPart1, testPart2}, []common.SeriesID{1, 2, 3, 4, 5, 6}, 0, 1000, nil)
-
-               // Blocks should come in series ID order: 1, 2, 3, 4, 5, 6
-               var foundSeries []common.SeriesID
-               for it.nextBlock() {
-                       // Access the current block from the heap - need to be careful about heap structure
-                       if len(it.piHeap) > 0 {
-                               foundSeries = append(foundSeries, it.piHeap[0].curBlock.seriesID)
-                       }
+               tc := iterTestCase{
+                       name:         "interleaved_series_ordering",
+                       parts:        nil, // not used by helper during verification
+                       querySids:    []common.SeriesID{1, 2, 3, 4, 5, 6},
+                       minKey:       0,
+                       maxKey:       1000,
+                       blockFilter:  nil,
+                       expectBlocks: 6,
+                       expectOrder: []blockExpectation{
+                               {seriesID: 1, minKey: 100, maxKey: 100},
+                               {seriesID: 2, minKey: 200, maxKey: 200},
+                               {seriesID: 3, minKey: 300, maxKey: 300},
+                               {seriesID: 4, minKey: 400, maxKey: 400},
+                               {seriesID: 5, minKey: 500, maxKey: 500},
+                               {seriesID: 6, minKey: 600, maxKey: 600},
+                       },
                }
 
-               assert.NoError(t, it.Error())
+               ascBlocks := runIteratorPass(t, tc, []*part{testPart1, testPart2}, true)
+               descBlocks := runIteratorPass(t, tc, []*part{testPart1, testPart2}, false)
+
+               assertSameBlocksIgnoreOrder(t, ascBlocks, descBlocks)
+               require.Len(t, ascBlocks, 6)
 
-               // We expect to find all 6 series
-               assert.Equal(t, 6, len(foundSeries))
+               require.True(t, sort.SliceIsSorted(ascBlocks, func(i, j int) bool {
+                       return ascBlocks[i].seriesID <= ascBlocks[j].seriesID
+               }), "ascending iteration should retain increasing series order")
 
-               // Verify ordering
-               expectedOrder := []common.SeriesID{1, 2, 3, 4, 5, 6}
-               assert.True(t, sort.SliceIsSorted(foundSeries, func(i, j int) bool {
-                       return foundSeries[i] < foundSeries[j]
-               }), "found series should be in ascending order: %v", foundSeries)
+               require.True(t, sort.SliceIsSorted(descBlocks, func(i, j int) bool {
+                       return descBlocks[i].seriesID >= descBlocks[j].seriesID
+               }), "descending iteration should retain decreasing series order")
 
-               // All expected series should be found
-               for _, expectedSeries := range expectedOrder {
-                       assert.Contains(t, foundSeries, expectedSeries, "should find series %d", expectedSeries)
+               for _, expected := range tc.expectOrder {
+                       assert.Contains(t, ascBlocks, expected)
+                       assert.Contains(t, descBlocks, expected)
                }
        })
 }
@@ -491,15 +499,13 @@ func TestIterPoolOperations(t *testing.T) {
 
                // Set some state
                it.err = fmt.Errorf("test error")
-               it.nextBlockNoop = true
 
                // Reset should clear everything
                it.reset()
                assert.Nil(t, it.err)
                assert.Equal(t, 0, len(it.parts))
-               assert.Equal(t, 0, len(it.piPool))
-               assert.Equal(t, 0, len(it.piHeap))
-               assert.False(t, it.nextBlockNoop)
+               assert.Equal(t, 0, len(it.partIters))
+               assert.Equal(t, 0, len(it.heap))
        })
 }
 
@@ -525,18 +531,60 @@ func TestBlockMetadataLess(t *testing.T) {
        })
 }
 
-// Helper types and functions.
+func TestIterOverlappingBlockGroups(t *testing.T) {
+       elementsPart1 := createTestElements([]testElement{
+               {seriesID: 1, userKey: 100, data: []byte("p1-a")},
+               {seriesID: 1, userKey: 200, data: []byte("p1-b")},
+       })
+       defer releaseElements(elementsPart1)
+
+       mp1 := GenerateMemPart()
+       defer ReleaseMemPart(mp1)
+       mp1.mustInitFromElements(elementsPart1)
+       part1 := openMemPart(mp1)
+       defer part1.close()
+
+       elementsPart2 := createTestElements([]testElement{
+               {seriesID: 2, userKey: 150, data: []byte("p2-a")},
+               {seriesID: 2, userKey: 180, data: []byte("p2-b")},
+       })
+       defer releaseElements(elementsPart2)
+
+       mp2 := GenerateMemPart()
+       defer ReleaseMemPart(mp2)
+       mp2.mustInitFromElements(elementsPart2)
+       part2 := openMemPart(mp2)
+       defer part2.close()
+
+       for _, asc := range []bool{true, false} {
+               it := generateIter()
+               it.init([]*part{part1, part2}, []common.SeriesID{1, 2}, 0, 500, nil, asc)
+
+               // Now we iterate individual blocks from both parts
+               partsSeen := make(map[*part]struct{})
+               seriesSeen := make(map[common.SeriesID]struct{})
+               totalBlocks := 0
+
+               for it.nextBlock() {
+                       bm, p := it.current()
+                       require.NotNil(t, bm, "current block should not be nil")
+                       require.NotNil(t, p, "current part should not be nil")
+                       partsSeen[p] = struct{}{}
+                       seriesSeen[bm.seriesID] = struct{}{}
+                       totalBlocks++
+               }
 
-type blockExpectation struct {
-       seriesID common.SeriesID
-       minKey   int64
-       maxKey   int64
+               require.Len(t, partsSeen, 2, "expected contributions from both parts")
+               require.Len(t, seriesSeen, 2, "expected blocks from both series")
+               require.GreaterOrEqual(t, totalBlocks, 2, "should iterate at least 2 blocks (one per part)")
+               assert.NoError(t, it.Error())
+               releaseIter(it)
+       }
 }
 
-// mockBlockFilter is already defined in part_iter_test.go.
+// Helper types and functions.
 
-// runIteratorTest runs the iterator test with the given test case and parts.
-func runIteratorTest(t *testing.T, tc struct {
+type iterTestCase struct {
        blockFilter  index.Filter
        name         string
        parts        [][]testElement
@@ -545,28 +593,24 @@ func runIteratorTest(t *testing.T, tc struct {
        minKey       int64
        maxKey       int64
        expectBlocks int
-}, parts []*part,
-) {
-       bma := generateBlockMetadataArray()
-       defer releaseBlockMetadataArray(bma)
+}
+
+// mockBlockFilter is already defined in part_iter_test.go.
+
+func runIteratorPass(t *testing.T, tc iterTestCase, parts []*part, asc bool) []blockExpectation {
+       t.Helper()
 
        it := generateIter()
        defer releaseIter(it)
 
-       it.init(bma, parts, tc.querySids, tc.minKey, tc.maxKey, tc.blockFilter)
+       it.init(parts, tc.querySids, tc.minKey, tc.maxKey, tc.blockFilter, asc)
 
        var foundBlocks []blockExpectation
-       blockCount := 0
 
        for it.nextBlock() {
-               blockCount++
-               // Get the minimum block from heap (the one currently being processed)
-               require.True(t, len(it.piHeap) > 0, "heap should not be empty when nextBlock returns true")
+               curBlock, _ := it.current()
+               require.NotNil(t, curBlock, "current block should not be nil when nextBlock returns true (order=%s)", orderName(asc))
 
-               curBlock := it.piHeap[0].curBlock
-               t.Logf("Found block for seriesID %d, key range [%d, %d]", curBlock.seriesID, curBlock.minKey, curBlock.maxKey)
-
-               // Verify the block overlaps with query range
                overlaps := curBlock.maxKey >= tc.minKey && curBlock.minKey <= tc.maxKey
                assert.True(t, overlaps, "block should overlap with query range [%d, %d], but got block range [%d, %d]",
                        tc.minKey, tc.maxKey, curBlock.minKey, curBlock.maxKey)
@@ -585,23 +629,34 @@ func runIteratorTest(t *testing.T, tc struct {
        // Verify the number of blocks found
        assert.Equal(t, tc.expectBlocks, len(foundBlocks), "should find expected number of blocks")
 
-       // Verify ordering - blocks should come out in sorted order by (seriesID, minKey)
-       assert.True(t, sort.SliceIsSorted(foundBlocks, func(i, j int) bool {
-               if foundBlocks[i].seriesID == foundBlocks[j].seriesID {
-                       return foundBlocks[i].minKey < foundBlocks[j].minKey
-               }
-               return foundBlocks[i].seriesID < foundBlocks[j].seriesID
-       }), "blocks should be in sorted order by (seriesID, minKey)")
-
-       // If specific order is expected, verify it
-       if len(tc.expectOrder) > 0 {
-               require.Equal(t, len(tc.expectOrder), len(foundBlocks), "number of found blocks should match expected")
-               for i, expected := range tc.expectOrder {
-                       assert.Equal(t, expected.seriesID, foundBlocks[i].seriesID, "block %d seriesID should match", i)
-                       assert.Equal(t, expected.minKey, foundBlocks[i].minKey, "block %d minKey should match", i)
-                       assert.Equal(t, expected.maxKey, foundBlocks[i].maxKey, "block %d maxKey should match", i)
-               }
+       return foundBlocks
+}
+
+func reverseExpectations(src []blockExpectation) []blockExpectation {
+       if len(src) == 0 {
+               return nil
        }
+       out := make([]blockExpectation, len(src))
+       for i := range src {
+               out[i] = src[len(src)-1-i]
+       }
+       return out
+}
 
-       t.Logf("Test %s completed: found %d blocks, expected %d", tc.name, blockCount, tc.expectBlocks)
+func assertSameBlocksIgnoreOrder(t *testing.T, left, right []blockExpectation) {
+       t.Helper()
+
+       require.Equal(t, len(left), len(right), "block counts should match across orders")
+
+       counts := make(map[blockExpectation]int, len(left))
+       for _, block := range left {
+               counts[block]++
+       }
+       for _, block := range right {
+               counts[block]--
+               require.GreaterOrEqual(t, counts[block], 0, "unexpected block encountered %+v", block)
+       }
+       for block, count := range counts {
+               require.Equal(t, 0, count, "missing block %+v in comparison", block)
+       }
 }
diff --git a/banyand/internal/sidx/merge_test.go b/banyand/internal/sidx/merge_test.go
index 3bdaa78e..0d63b5f4 100644
--- a/banyand/internal/sidx/merge_test.go
+++ b/banyand/internal/sidx/merge_test.go
@@ -18,7 +18,6 @@
 package sidx
 
 import (
-       "encoding/binary"
        "errors"
        "path/filepath"
        "reflect"
@@ -35,19 +34,6 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/test"
 )
 
-func marshalStrArr(strArr [][]byte) []byte {
-       if len(strArr) == 0 {
-               return []byte{}
-       }
-       var result []byte
-       result = binary.LittleEndian.AppendUint32(result, uint32(len(strArr)))
-       for _, str := range strArr {
-               result = binary.LittleEndian.AppendUint32(result, 
uint32(len(str)))
-               result = append(result, str...)
-       }
-       return result
-}
-
 var conventionalBlock = block{
        userKeys: []int64{1, 2},
        data:     [][]byte{[]byte("data1"), []byte("data2")},
@@ -55,7 +41,10 @@ var conventionalBlock = block{
                "service": {
                        name:      "service",
                        valueType: pbv1.ValueTypeStr,
-                       values:    [][]byte{[]byte("service1"), []byte("service2")},
+                       values: []tagRow{
+                               {value: []byte("service1")},
+                               {value: []byte("service2")},
+                       },
                },
        },
 }
@@ -67,11 +56,11 @@ var mergedBlock = block{
                "arrTag": {
                        name:      "arrTag",
                        valueType: pbv1.ValueTypeStrArr,
-                       values: [][]byte{
-                               marshalStrArr([][]byte{[]byte("value1"), []byte("value2")}),
-                               marshalStrArr([][]byte{[]byte("value3"), []byte("value4")}),
-                               marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
-                               marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+                       values: []tagRow{
+                               {valueArr: [][]byte{[]byte("value1"), 
[]byte("value2")}},
+                               {valueArr: [][]byte{[]byte("value3"), 
[]byte("value4")}},
+                               {valueArr: [][]byte{[]byte("value5"), 
[]byte("value6")}},
+                               {valueArr: [][]byte{[]byte("value7"), 
[]byte("value8")}},
                        },
                },
        },
@@ -84,13 +73,13 @@ var duplicatedMergedBlock = block{
                "arrTag": {
                        name:      "arrTag",
                        valueType: pbv1.ValueTypeStrArr,
-                       values: [][]byte{
-                               marshalStrArr([][]byte{[]byte("value1"), []byte("value2")}),
-                               marshalStrArr([][]byte{[]byte("duplicated1")}),
-                               marshalStrArr([][]byte{[]byte("value3"), []byte("value4")}),
-                               marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
-                               marshalStrArr([][]byte{[]byte("duplicated2")}),
-                               marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+                       values: []tagRow{
+                               {valueArr: [][]byte{[]byte("value1"), []byte("value2")}},
+                               {valueArr: [][]byte{[]byte("duplicated1")}},
+                               {valueArr: [][]byte{[]byte("value3"), []byte("value4")}},
+                               {valueArr: [][]byte{[]byte("value5"), []byte("value6")}},
+                               {valueArr: [][]byte{[]byte("duplicated2")}},
+                               {valueArr: [][]byte{[]byte("value7"), []byte("value8")}},
                        },
                },
        },
@@ -131,9 +120,9 @@ func Test_mergeTwoBlocks(t *testing.T) {
                                                "arrTag": {
                                                        name:      "arrTag",
                                                        valueType: pbv1.ValueTypeStrArr,
-                                                       values: [][]byte{
-                                                               marshalStrArr([][]byte{[]byte("value1"), []byte("value2")}),
-                                                               marshalStrArr([][]byte{[]byte("value3"), []byte("value4")}),
+                                                       values: []tagRow{
+                                                               {valueArr: [][]byte{[]byte("value1"), []byte("value2")}},
+                                                               {valueArr: [][]byte{[]byte("value3"), []byte("value4")}},
                                                        },
                                                },
                                        },
@@ -147,9 +136,9 @@ func Test_mergeTwoBlocks(t *testing.T) {
                                                "arrTag": {
                                                        name:      "arrTag",
                                                        valueType: pbv1.ValueTypeStrArr,
-                                                       values: [][]byte{
-                                                               marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
-                                                               marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+                                                       values: []tagRow{
+                                                               {valueArr: [][]byte{[]byte("value5"), []byte("value6")}},
+                                                               {valueArr: [][]byte{[]byte("value7"), []byte("value8")}},
                                                        },
                                                },
                                        },
@@ -167,9 +156,9 @@ func Test_mergeTwoBlocks(t *testing.T) {
                                                "arrTag": {
                                                        name:      "arrTag",
                                                        valueType: pbv1.ValueTypeStrArr,
-                                                       values: [][]byte{
-                                                               marshalStrArr([][]byte{[]byte("value1"), []byte("value2")}),
-                                                               marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
+                                                       values: []tagRow{
+                                                               {valueArr: [][]byte{[]byte("value1"), []byte("value2")}},
+                                                               {valueArr: [][]byte{[]byte("value5"), []byte("value6")}},
                                                        },
                                                },
                                        },
@@ -183,9 +172,9 @@ func Test_mergeTwoBlocks(t *testing.T) {
                                                "arrTag": {
                                                        name:      "arrTag",
                                                        valueType: pbv1.ValueTypeStrArr,
-                                                       values: [][]byte{
-                                                               marshalStrArr([][]byte{[]byte("value3"), []byte("value4")}),
-                                                               marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+                                                       values: []tagRow{
+                                                               {valueArr: [][]byte{[]byte("value3"), []byte("value4")}},
+                                                               {valueArr: [][]byte{[]byte("value7"), []byte("value8")}},
                                                        },
                                                },
                                        },
@@ -203,10 +192,10 @@ func Test_mergeTwoBlocks(t *testing.T) {
                                                "arrTag": {
                                                        name:      "arrTag",
                                                        valueType: pbv1.ValueTypeStrArr,
-                                                       values: [][]byte{
-                                                               marshalStrArr([][]byte{[]byte("value1"), []byte("value2")}),
-                                                               marshalStrArr([][]byte{[]byte("duplicated1")}),
-                                                               marshalStrArr([][]byte{[]byte("duplicated2")}),
+                                                       values: []tagRow{
+                                                               {valueArr: [][]byte{[]byte("value1"), []byte("value2")}},
+                                                               {valueArr: [][]byte{[]byte("duplicated1")}},
+                                                               {valueArr: [][]byte{[]byte("duplicated2")}},
                                                        },
                                                },
                                        },
@@ -220,10 +209,10 @@ func Test_mergeTwoBlocks(t *testing.T) {
                                                "arrTag": {
                                                        name:      "arrTag",
                                                        valueType: pbv1.ValueTypeStrArr,
-                                                       values: [][]byte{
-                                                               marshalStrArr([][]byte{[]byte("value3"), []byte("value4")}),
-                                                               marshalStrArr([][]byte{[]byte("value5"), []byte("value6")}),
-                                                               marshalStrArr([][]byte{[]byte("value7"), []byte("value8")}),
+                                                       values: []tagRow{
+                                                               {valueArr: [][]byte{[]byte("value3"), []byte("value4")}},
+                                                               {valueArr: [][]byte{[]byte("value5"), []byte("value6")}},
+                                                               {valueArr: [][]byte{[]byte("value7"), []byte("value8")}},
                                                        },
                                                },
                                        },
diff --git a/banyand/internal/sidx/part_key_iter.go b/banyand/internal/sidx/part_key_iter.go
new file mode 100644
index 00000000..edb7de9d
--- /dev/null
+++ b/banyand/internal/sidx/part_key_iter.go
@@ -0,0 +1,489 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sidx
+
+import (
+       "container/heap"
+       "errors"
+       "fmt"
+       "io"
+       "sort"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/pkg/bytes"
+       "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/index"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
+)
+
+// lessByKey compares two blockMetadata by key range, then by seriesID and finally by data block offset.
+func (bm *blockMetadata) lessByKey(other *blockMetadata) bool {
+       if bm.minKey != other.minKey {
+               return bm.minKey < other.minKey
+       }
+       if bm.maxKey != other.maxKey {
+               return bm.maxKey < other.maxKey
+       }
+       if bm.seriesID != other.seriesID {
+               return bm.seriesID < other.seriesID
+       }
+       return bm.dataBlock.offset < other.dataBlock.offset
+}
+
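+// blockRef identifies a candidate block by its primary block index and position
+// within that primary block, along with the series and key range used for ordering.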
+type blockRef struct {
+       primaryIdx int
+       blockIdx   int
+       seriesID   common.SeriesID
+       minKey     int64
+       maxKey     int64
+}
+
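+// seriesCursor walks one series' block references in scan order, keeping the
+// metadata of its current block loaded.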
+type seriesCursor struct {
+       iter      *partKeyIter
+       refs      []blockRef
+       curBlock  blockMetadata
+       seriesID  common.SeriesID
+       refIdx    int
+       curLoaded bool
+}
+
+func (sc *seriesCursor) less(other *seriesCursor, asc bool) bool {
+       cur := sc.current()
+       otherCur := other.current()
+       if cur == nil {
+               return false
+       }
+       if otherCur == nil {
+               return true
+       }
+       if asc {
+               return cur.lessByKey(otherCur)
+       }
+       return otherCur.lessByKey(cur)
+}
+
+func (sc *seriesCursor) init(iter *partKeyIter, sid common.SeriesID, refs []blockRef) {
+       sc.reset()
+       sc.iter = iter
+       sc.seriesID = sid
+       // Reuse underlying slice when possible
+       if cap(sc.refs) < len(refs) {
+               sc.refs = make([]blockRef, len(refs))
+               copy(sc.refs, refs)
+       } else {
+               sc.refs = sc.refs[:len(refs)]
+               copy(sc.refs, refs)
+       }
+       if iter != nil && !iter.asc && len(sc.refs) > 1 {
+               for i, j := 0, len(sc.refs)-1; i < j; i, j = i+1, j-1 {
+                       sc.refs[i], sc.refs[j] = sc.refs[j], sc.refs[i]
+               }
+       }
+}
+
+func (sc *seriesCursor) reset() {
+       sc.iter = nil
+       sc.seriesID = 0
+       sc.refIdx = 0
+       if sc.curLoaded {
+               sc.curBlock.reset()
+       }
+       sc.curLoaded = false
+       sc.refs = sc.refs[:0]
+}
+
+func (sc *seriesCursor) current() *blockMetadata {
+       if !sc.curLoaded {
+               return nil
+       }
+       return &sc.curBlock
+}
+
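+// advance steps to the cursor's next block reference that passes the key-range
+// and block-filter checks, copying its metadata into curBlock.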
+func (sc *seriesCursor) advance() (bool, error) {
+       if sc.iter == nil {
+               return false, nil
+       }
+       for sc.refIdx < len(sc.refs) {
+               ref := sc.refs[sc.refIdx]
+               sc.refIdx++
+               bma, err := sc.iter.ensurePrimaryBlocks(ref.primaryIdx)
+               if err != nil {
+                       return false, err
+               }
+               if ref.blockIdx >= len(bma.arr) {
+                       return false, fmt.Errorf("block index %d out of range for primary %d", ref.blockIdx, ref.primaryIdx)
+               }
+               bm := &bma.arr[ref.blockIdx]
+               if bm.maxKey < sc.iter.minKey || bm.minKey > sc.iter.maxKey {
+                       continue
+               }
+               if sc.iter.blockFilter != nil {
+                       shouldSkip, err := sc.iter.shouldSkipBlock(bm)
+                       if err != nil {
+                               return false, err
+                       }
+                       if shouldSkip {
+                               continue
+                       }
+               }
+               sc.curBlock.copyFrom(bm)
+               sc.curLoaded = true
+               return true, nil
+       }
+       sc.curLoaded = false
+       return false, nil
+}
+
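+// seriesCursorHeap orders series cursors by their current block; the comparison
+// direction follows the iterator's asc flag.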
+type seriesCursorHeap []*seriesCursor
+
+func (sch *seriesCursorHeap) Len() int {
+       return len(*sch)
+}
+
+func (sch *seriesCursorHeap) Less(i, j int) bool {
+       x := *sch
+       asc := true
+       if x[i] != nil && x[i].iter != nil {
+               asc = x[i].iter.asc
+       } else if x[j] != nil && x[j].iter != nil {
+               asc = x[j].iter.asc
+       }
+       return x[i].less(x[j], asc)
+}
+
+func (sch *seriesCursorHeap) Swap(i, j int) {
+       x := *sch
+       x[i], x[j] = x[j], x[i]
+}
+
+func (sch *seriesCursorHeap) Push(x any) {
+       *sch = append(*sch, x.(*seriesCursor))
+}
+
+func (sch *seriesCursorHeap) Pop() any {
+       a := *sch
+       v := a[len(a)-1]
+       *sch = a[:len(a)-1]
+       return v
+}
+
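+// partKeyIter merges the matching blocks of a single part into one key-ordered
+// stream, keeping one heap cursor per series and caching decoded primary blocks.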
+type partKeyIter struct {
+       err                  error
+       blockFilter          index.Filter
+       sidSet               map[common.SeriesID]struct{}
+       p                    *part
+       primaryCache         map[int]*blockMetadataArray
+       curBlock             *blockMetadata
+       cursorPool           []seriesCursor
+       cursorHeap           seriesCursorHeap
+       sids                 []common.SeriesID
+       primaryBuf           []byte
+       compressedPrimaryBuf []byte
+       minKey               int64
+       maxKey               int64
+       asc                  bool
+}
+
+func (pki *partKeyIter) releaseCurBlock() {
+       if pki.curBlock != nil {
+               releaseBlockMetadata(pki.curBlock)
+               pki.curBlock = nil
+       }
+}
+
+func (pki *partKeyIter) reset() {
+       pki.err = nil
+       pki.p = nil
+       pki.minKey = 0
+       pki.maxKey = 0
+       pki.blockFilter = nil
+
+       pki.releaseCurBlock()
+
+       for i := range pki.cursorHeap {
+               pki.cursorHeap[i].reset()
+       }
+       pki.cursorHeap = pki.cursorHeap[:0]
+
+       for i := range pki.cursorPool {
+               pki.cursorPool[i].reset()
+       }
+       pki.cursorPool = pki.cursorPool[:0]
+
+       for idx, cache := range pki.primaryCache {
+               if cache != nil {
+                       releaseBlockMetadataArray(cache)
+                       pki.primaryCache[idx] = nil
+               }
+       }
+       if pki.primaryCache != nil {
+               clear(pki.primaryCache)
+       }
+
+       if pki.sidSet != nil {
+               clear(pki.sidSet)
+       }
+       pki.sids = pki.sids[:0]
+
+       pki.compressedPrimaryBuf = pki.compressedPrimaryBuf[:0]
+       pki.primaryBuf = pki.primaryBuf[:0]
+}
+
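+// init resets the iterator, collects in-range block references for every
+// requested series, and seeds the cursor heap.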
+func (pki *partKeyIter) init(p *part, sids []common.SeriesID, minKey, maxKey int64, blockFilter index.Filter, asc bool) {
+       pki.reset()
+       pki.p = p
+       pki.minKey = minKey
+       pki.maxKey = maxKey
+       pki.blockFilter = blockFilter
+       pki.asc = asc
+
+       if len(sids) == 0 {
+               pki.err = io.EOF
+               return
+       }
+
+       pki.sids = append(pki.sids[:0], sids...)
+       sort.Slice(pki.sids, func(i, j int) bool {
+               return pki.sids[i] < pki.sids[j]
+       })
+
+       if pki.sidSet == nil {
+               pki.sidSet = make(map[common.SeriesID]struct{}, len(pki.sids))
+       } else {
+               clear(pki.sidSet)
+       }
+       for _, sid := range pki.sids {
+               pki.sidSet[sid] = struct{}{}
+       }
+
+       maxSID := pki.sids[len(pki.sids)-1]
+       minSID := pki.sids[0]
+
+       seriesRefs := make(map[common.SeriesID][]blockRef, len(pki.sids))
+
+       for idx := range p.primaryBlockMetadata {
+               pbm := &p.primaryBlockMetadata[idx]
+
+               if pbm.seriesID > maxSID {
+                       break
+               }
+
+               if pbm.maxKey < pki.minKey || pbm.minKey > pki.maxKey {
+                       continue
+               }
+
+               bma, err := pki.ensurePrimaryBlocks(idx)
+               if err != nil {
+                       pki.err = fmt.Errorf("cannot load primary block metadata: %w", err)
+                       return
+               }
+               if len(bma.arr) == 0 {
+                       continue
+               }
+
+               if bma.arr[len(bma.arr)-1].seriesID < minSID {
+                       continue
+               }
+
+               lastSeries := bma.arr[len(bma.arr)-1].seriesID
+               for _, sid := range pki.sids {
+                       if sid < pbm.seriesID {
+                               continue
+                       }
+                       if sid > lastSeries {
+                               continue
+                       }
+
+                       start := sort.Search(len(bma.arr), func(i int) bool {
+                               return bma.arr[i].seriesID >= sid
+                       })
+                       if start == len(bma.arr) || bma.arr[start].seriesID != sid {
+                               continue
+                       }
+
+                       for i := start; i < len(bma.arr) && bma.arr[i].seriesID == sid; i++ {
+                               bm := &bma.arr[i]
+                               if bm.maxKey < pki.minKey || bm.minKey > pki.maxKey {
+                                       continue
+                               }
+                               if _, ok := pki.sidSet[bm.seriesID]; !ok {
+                                       continue
+                               }
+                               seriesRefs[sid] = append(seriesRefs[sid], blockRef{
+                                       primaryIdx: idx,
+                                       blockIdx:   i,
+                                       seriesID:   sid,
+                                       minKey:     bm.minKey,
+                                       maxKey:     bm.maxKey,
+                               })
+                       }
+               }
+       }
+
+       activeSeries := 0
+       for _, sid := range pki.sids {
+               if refs := seriesRefs[sid]; len(refs) > 0 {
+                       activeSeries++
+               }
+       }
+
+       if activeSeries == 0 {
+               pki.err = io.EOF
+               return
+       }
+
+       if n := activeSeries - cap(pki.cursorPool); n > 0 {
+               pki.cursorPool = append(pki.cursorPool[:cap(pki.cursorPool)], make([]seriesCursor, n)...)
+       }
+       pki.cursorPool = pki.cursorPool[:activeSeries]
+
+       pki.cursorHeap = pki.cursorHeap[:0]
+       cursorIdx := 0
+       for _, sid := range pki.sids {
+               refs := seriesRefs[sid]
+               if len(refs) == 0 {
+                       continue
+               }
+               cursor := &pki.cursorPool[cursorIdx]
+               cursorIdx++
+               cursor.init(pki, sid, refs)
+               ok, err := cursor.advance()
+               if err != nil {
+                       pki.err = fmt.Errorf("cannot initialize cursor for series %d: %w", sid, err)
+                       return
+               }
+               if !ok {
+                       cursor.reset()
+                       continue
+               }
+               pki.cursorHeap = append(pki.cursorHeap, cursor)
+       }
+       pki.cursorPool = pki.cursorPool[:cursorIdx]
+
+       if len(pki.cursorHeap) == 0 {
+               pki.err = io.EOF
+               return
+       }
+       heap.Init(&pki.cursorHeap)
+}
+
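+// nextBlock pops the cursor whose current block comes first in scan order,
+// publishes that block, and re-queues the cursor if it still has blocks.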
+func (pki *partKeyIter) nextBlock() bool {
+       if pki.err != nil {
+               return false
+       }
+
+       pki.releaseCurBlock()
+
+       if len(pki.cursorHeap) == 0 {
+               pki.err = io.EOF
+               return false
+       }
+
+       cursor := heap.Pop(&pki.cursorHeap).(*seriesCursor)
+       current := cursor.current()
+       if current == nil {
+               cursor.reset()
+               pki.err = fmt.Errorf("series cursor %d has no current block", cursor.seriesID)
+               return false
+       }
+
+       pki.curBlock = generateBlockMetadata()
+       pki.curBlock.copyFrom(current)
+
+       ok, err := cursor.advance()
+       if err != nil {
+               cursor.reset()
+               pki.releaseCurBlock()
+               pki.err = err
+               return false
+       }
+       if ok {
+               heap.Push(&pki.cursorHeap, cursor)
+       } else {
+               cursor.reset()
+       }
+
+       return true
+}
+
+func (pki *partKeyIter) error() error {
+       if errors.Is(pki.err, io.EOF) {
+               return nil
+       }
+       return pki.err
+}
+
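+// ensurePrimaryBlocks reads, decompresses, and unmarshals the primary block at
+// primaryIdx, caching the result for subsequent lookups.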
+func (pki *partKeyIter) ensurePrimaryBlocks(primaryIdx int) (*blockMetadataArray, error) {
+       if pki.primaryCache == nil {
+               pki.primaryCache = make(map[int]*blockMetadataArray)
+       }
+       if bma, ok := pki.primaryCache[primaryIdx]; ok && bma != nil {
+               return bma, nil
+       }
+
+       pbm := &pki.p.primaryBlockMetadata[primaryIdx]
+       bma := generateBlockMetadataArray()
+
+       pki.compressedPrimaryBuf = bytes.ResizeOver(pki.compressedPrimaryBuf, int(pbm.size))
+       fs.MustReadData(pki.p.primary, int64(pbm.offset), pki.compressedPrimaryBuf)
+
+       var err error
+       pki.primaryBuf, err = zstd.Decompress(pki.primaryBuf[:0], pki.compressedPrimaryBuf)
+       if err != nil {
+               releaseBlockMetadataArray(bma)
+               return nil, fmt.Errorf("cannot decompress primary block: %w", err)
+       }
+
+       bma.arr, err = unmarshalBlockMetadata(bma.arr[:0], pki.primaryBuf)
+       if err != nil {
+               releaseBlockMetadataArray(bma)
+               return nil, fmt.Errorf("cannot unmarshal primary block metadata: %w", err)
+       }
+
+       pki.primaryCache[primaryIdx] = bma
+       return bma, nil
+}
+
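+// shouldSkipBlock evaluates the optional block filter against a block's tag metadata.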
+func (pki *partKeyIter) shouldSkipBlock(bm *blockMetadata) (bool, error) {
+       tfo := generateTagFilterOp(bm, pki.p)
+       defer releaseTagFilterOp(tfo)
+       return pki.blockFilter.ShouldSkip(tfo)
+}
+
+func (pki *partKeyIter) current() (*blockMetadata, *part) {
+       return pki.curBlock, pki.p
+}
+
+func generatePartKeyIter() *partKeyIter {
+       v := partKeyIterPool.Get()
+       if v == nil {
+               return &partKeyIter{}
+       }
+       return v
+}
+
+func releasePartKeyIter(pki *partKeyIter) {
+       if pki == nil {
+               return
+       }
+       pki.reset()
+       partKeyIterPool.Put(pki)
+}
+
+var partKeyIterPool = pool.Register[*partKeyIter]("sidx-partKeyIter")
diff --git a/banyand/internal/sidx/part_key_iter_test.go b/banyand/internal/sidx/part_key_iter_test.go
new file mode 100644
index 00000000..f31d7500
--- /dev/null
+++ b/banyand/internal/sidx/part_key_iter_test.go
@@ -0,0 +1,483 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sidx
+
+import (
+       "errors"
+       "sort"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/pkg/index"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+)
+
+func setupPartForKeyIter(t *testing.T, elems []testElement) (*part, func()) {
+       t.Helper()
+
+       elements := createTestElements(elems)
+
+       mp := GenerateMemPart()
+       mp.mustInitFromElements(elements)
+
+       part := openMemPart(mp)
+
+       cleanup := func() {
+               part.close()
+               ReleaseMemPart(mp)
+               releaseElements(elements)
+       }
+       return part, cleanup
+}
+
+type blockExpectation struct {
+       seriesID common.SeriesID
+       minKey   int64
+       maxKey   int64
+}
+
+func runPartKeyIterPass(t *testing.T, part *part, sids []common.SeriesID, minKey, maxKey int64, blockFilter index.Filter, asc bool) ([]blockExpectation, error) {
+       iter := generatePartKeyIter()
+       defer releasePartKeyIter(iter)
+
+       iter.init(part, sids, minKey, maxKey, blockFilter, asc)
+
+       var results []blockExpectation
+       for iter.nextBlock() {
+               block, _ := iter.current()
+               require.NotNil(t, block)
+               results = append(results, blockExpectation{
+                       seriesID: block.seriesID,
+                       minKey:   block.minKey,
+                       maxKey:   block.maxKey,
+               })
+       }
+
+       return results, iter.error()
+}
+
+func runPartKeyIterDoublePass(t *testing.T, part *part, sids []common.SeriesID, minKey, maxKey int64,
+       blockFilter index.Filter,
+) (ascBlocks, descBlocks []blockExpectation, ascErr, descErr error) {
+       ascBlocks, ascErr = runPartKeyIterPass(t, part, sids, minKey, maxKey, blockFilter, true)
+       descBlocks, descErr = runPartKeyIterPass(t, part, sids, minKey, maxKey, blockFilter, false)
+       return
+}
+
+func orderName(asc bool) string {
+       if asc {
+               return "asc"
+       }
+       return "desc"
+}
+
+func verifyDescendingOrder(t *testing.T, blocks []blockExpectation) {
+       require.True(t, sort.SliceIsSorted(blocks, func(i, j int) bool {
+               if blocks[i].minKey == blocks[j].minKey {
+                       return blocks[i].seriesID > blocks[j].seriesID
+               }
+               return blocks[i].minKey >= blocks[j].minKey
+       }), "blocks should be in non-increasing order by minKey")
+}
+
+func verifyAscendingOrder(t *testing.T, blocks []blockExpectation) {
+       require.True(t, sort.SliceIsSorted(blocks, func(i, j int) bool {
+               if blocks[i].minKey == blocks[j].minKey {
+                       return blocks[i].seriesID < blocks[j].seriesID
+               }
+               return blocks[i].minKey <= blocks[j].minKey
+       }), "blocks should be in non-decreasing order by minKey")
+}
+
+func TestPartKeyIterOrdersBlocksByMinKey(t *testing.T) {
+       part, cleanup := setupPartForKeyIter(t, []testElement{
+               {seriesID: 1, userKey: 100, data: []byte("s1")},
+               {seriesID: 2, userKey: 50, data: []byte("s2")},
+               {seriesID: 3, userKey: 150, data: []byte("s3")},
+       })
+       defer cleanup()
+
+       asc, desc, ascErr, descErr := runPartKeyIterDoublePass(t, part, []common.SeriesID{1, 2, 3}, 0, 200, nil)
+       require.NoError(t, ascErr)
+       require.NoError(t, descErr)
+
+       require.Len(t, asc, 3, "expected three blocks in ascending order")
+       verifyAscendingOrder(t, asc)
+       assert.Equal(t, []common.SeriesID{2, 1, 3}, []common.SeriesID{asc[0].seriesID, asc[1].seriesID, asc[2].seriesID})
+
+       require.Len(t, desc, 3, "expected three blocks in descending order")
+       verifyDescendingOrder(t, desc)
+       assert.Equal(t, []common.SeriesID{3, 1, 2}, []common.SeriesID{desc[0].seriesID, desc[1].seriesID, desc[2].seriesID})
+}
+
+func TestPartKeyIterFiltersSeriesIDs(t *testing.T) {
+       part, cleanup := setupPartForKeyIter(t, []testElement{
+               {seriesID: 1, userKey: 100, data: []byte("s1")},
+               {seriesID: 2, userKey: 50, data: []byte("s2")},
+               {seriesID: 3, userKey: 150, data: []byte("s3")},
+       })
+       defer cleanup()
+
+       asc, desc, ascErr, descErr := runPartKeyIterDoublePass(t, part, []common.SeriesID{1, 3}, 0, 200, nil)
+       require.NoError(t, ascErr)
+       require.NoError(t, descErr)
+       require.Len(t, asc, 2)
+       require.Len(t, desc, 2)
+       require.Equal(t, []common.SeriesID{1, 3}, []common.SeriesID{asc[0].seriesID, asc[1].seriesID})
+       require.Equal(t, []common.SeriesID{3, 1}, []common.SeriesID{desc[0].seriesID, desc[1].seriesID})
+}
+
+func TestPartKeyIterAppliesKeyRange(t *testing.T) {
+       part, cleanup := setupPartForKeyIter(t, []testElement{
+               {seriesID: 1, userKey: 100, data: []byte("s1")},
+               {seriesID: 2, userKey: 50, data: []byte("s2")},
+               {seriesID: 3, userKey: 150, data: []byte("s3")},
+       })
+       defer cleanup()
+
+       asc, desc, ascErr, descErr := runPartKeyIterDoublePass(t, part, []common.SeriesID{1, 2, 3}, 120, 200, nil)
+       require.NoError(t, ascErr)
+       require.NoError(t, descErr)
+       require.Len(t, asc, 1)
+       require.Len(t, desc, 1)
+       assert.Equal(t, common.SeriesID(3), asc[0].seriesID)
+       assert.Equal(t, common.SeriesID(3), desc[0].seriesID)
+}
+
+func TestPartKeyIterNoBlocksInRange(t *testing.T) {
+       part, cleanup := setupPartForKeyIter(t, []testElement{
+               {seriesID: 1, userKey: 100, data: []byte("s1")},
+               {seriesID: 2, userKey: 50, data: []byte("s2")},
+       })
+       defer cleanup()
+
+       asc, desc, ascErr, descErr := runPartKeyIterDoublePass(t, part, []common.SeriesID{1, 2}, 1000, 2000, nil)
+       require.NoError(t, ascErr)
+       require.NoError(t, descErr)
+       require.Empty(t, asc)
+       require.Empty(t, desc)
+}
+
+func TestPartKeyIterHandlesEmptySeries(t *testing.T) {
+       part, cleanup := setupPartForKeyIter(t, []testElement{
+               {seriesID: 1, userKey: 100, data: []byte("s1")},
+       })
+       defer cleanup()
+
+       asc, desc, ascErr, descErr := runPartKeyIterDoublePass(t, part, []common.SeriesID{}, 0, 200, nil)
+       require.NoError(t, ascErr)
+       require.NoError(t, descErr)
+       require.Len(t, asc, 0)
+       require.Len(t, desc, 0)
+}
+
+func TestPartKeyIterHonorsBlockFilter(t *testing.T) {
+       part, cleanup := setupPartForKeyIter(t, []testElement{
+               {seriesID: 1, userKey: 100, data: []byte("s1")},
+               {seriesID: 2, userKey: 150, data: []byte("s2")},
+       })
+       defer cleanup()
+
+       asc, desc, ascErr, descErr := runPartKeyIterDoublePass(t, part, []common.SeriesID{1, 2}, 0, 200, &mockBlockFilter{shouldSkip: true})
+       require.NoError(t, ascErr)
+       require.NoError(t, descErr)
+       require.Empty(t, asc)
+       require.Empty(t, desc)
+}
+
+func TestPartKeyIterPropagatesFilterError(t *testing.T) {
+       part, cleanup := setupPartForKeyIter(t, []testElement{
+               {seriesID: 1, userKey: 100, data: []byte("s1")},
+       })
+       defer cleanup()
+
+       expectedErr := errors.New("filter failure")
+       asc, desc, ascErr, descErr := runPartKeyIterDoublePass(t, part, []common.SeriesID{1}, 0, 200, &mockBlockFilter{err: expectedErr})
+       require.ErrorIs(t, ascErr, expectedErr)
+       require.ErrorIs(t, descErr, expectedErr)
+       require.Empty(t, asc)
+       require.Empty(t, desc)
+}
+
+func TestPartKeyIterBreaksTiesBySeriesID(t *testing.T) {
+       part, cleanup := setupPartForKeyIter(t, []testElement{
+               {seriesID: 1, userKey: 100, data: []byte("s1")},
+               {seriesID: 2, userKey: 100, data: []byte("s2")},
+       })
+       defer cleanup()
+
+       asc, desc, ascErr, descErr := runPartKeyIterDoublePass(t, part, []common.SeriesID{1, 2}, 0, 200, nil)
+       require.NoError(t, ascErr)
+       require.NoError(t, descErr)
+       require.Len(t, asc, 2)
+       require.Len(t, desc, 2)
+       assert.Equal(t, []common.SeriesID{1, 2}, []common.SeriesID{asc[0].seriesID, asc[1].seriesID})
+       assert.Equal(t, []common.SeriesID{2, 1}, []common.SeriesID{desc[0].seriesID, desc[1].seriesID})
+}
+
+func TestPartKeyIterGroupsOverlappingRanges(t *testing.T) {
+       part, cleanup := setupPartForKeyIter(t, []testElement{
+               {seriesID: 1, userKey: 100, data: []byte("s1a")},
+               {seriesID: 1, userKey: 180, data: []byte("s1b")},
+               {seriesID: 2, userKey: 120, data: []byte("s2a")},
+               {seriesID: 2, userKey: 220, data: []byte("s2b")},
+               {seriesID: 3, userKey: 400, data: []byte("s3")},
+       })
+       defer cleanup()
+
+       asc, desc, ascErr, descErr := runPartKeyIterDoublePass(t, part, []common.SeriesID{1, 2, 3}, 0, 500, nil)
+       require.NoError(t, ascErr)
+       require.NoError(t, descErr)
+       require.Len(t, asc, 3)
+       require.Len(t, desc, 3)
+
+       expectedAsc := []blockExpectation{
+               {seriesID: 1, minKey: 100, maxKey: 180},
+               {seriesID: 2, minKey: 120, maxKey: 220},
+               {seriesID: 3, minKey: 400, maxKey: 400},
+       }
+       assert.Equal(t, expectedAsc, asc)
+       for i := range desc {
+               assert.Equal(t, expectedAsc[len(expectedAsc)-1-i], desc[i])
+       }
+
+       iter := generatePartKeyIter()
+       defer releasePartKeyIter(iter)
+       iter.init(part, []common.SeriesID{1, 2, 3}, 0, 500, nil, true)
+       var ids []common.SeriesID
+       for iter.nextBlock() {
+               block, _ := iter.current()
+               require.NotNil(t, block)
+               ids = append(ids, block.seriesID)
+       }
+       require.NoError(t, iter.error())
+       require.GreaterOrEqual(t, len(ids), 3, "expected at least three blocks")
+       // Verify we get blocks from all three series
+       require.Contains(t, ids, common.SeriesID(1))
+       require.Contains(t, ids, common.SeriesID(2))
+       require.Contains(t, ids, common.SeriesID(3))
+}
+
+func TestPartKeyIterSelectiveFilterAllowsLaterBlocks(t *testing.T) {
+       const elementsPerBatch = maxBlockLength + 10
+
+       var elems []testElement
+       for i := 0; i < elementsPerBatch; i++ {
+               elems = append(elems, testElement{
+                       seriesID: 1,
+                       userKey:  int64(i),
+                       data:     []byte("pending"),
+                       tags: []tag{
+                               {
+                                       name:      "status",
+                                       value:     []byte("pending"),
+                                       valueType: pbv1.ValueTypeStr,
+                               },
+                       },
+               })
+       }
+       for i := 0; i < elementsPerBatch; i++ {
+               elems = append(elems, testElement{
+                       seriesID: 1,
+                       userKey:  int64(20000 + i),
+                       data:     []byte("success"),
+                       tags: []tag{
+                               {
+                                       name:      "status",
+                                       value:     []byte("success"),
+                                       valueType: pbv1.ValueTypeStr,
+                               },
+                       },
+               })
+       }
+       elems = append(elems, testElement{
+               seriesID: 2,
+               userKey:  5000,
+               data:     []byte("series2"),
+               tags: []tag{
+                       {
+                               name:      "status",
+                               value:     []byte("other"),
+                               valueType: pbv1.ValueTypeStr,
+                       },
+               },
+       })
+
+       part, cleanup := setupPartForKeyIter(t, elems)
+       defer cleanup()
+
+       filter := &selectiveMockBlockFilter{
+               tagName:   "status",
+               skipValue: "pending",
+       }
+
+       asc, desc, ascErr, descErr := runPartKeyIterDoublePass(t, part, []common.SeriesID{1, 2}, 0, 50000, filter)
+       require.NoError(t, ascErr)
+       require.NoError(t, descErr)
+       assert.Greater(t, filter.skipCallCount, 0, "filter should have been invoked")
+
+       require.NotEmpty(t, asc)
+       require.NotEmpty(t, desc)
+
+       seriesTwoCountAsc := 0
+       for _, block := range asc {
+               if block.seriesID == 1 {
+                       require.GreaterOrEqual(t, block.minKey, int64(20000), "pending block should be skipped in ascending order")
+               }
+               if block.seriesID == 2 {
+                       seriesTwoCountAsc++
+               }
+       }
+       require.Equal(t, 1, seriesTwoCountAsc, "series 2 block should appear once in ascending results")
+
+       seriesTwoCountDesc := 0
+       for _, block := range desc {
+               if block.seriesID == 1 {
+                       require.GreaterOrEqual(t, block.minKey, int64(20000), "pending block should be skipped in descending order")
+               }
+               if block.seriesID == 2 {
+                       seriesTwoCountDesc++
+               }
+       }
+       require.Equal(t, 1, seriesTwoCountDesc, "series 2 block should appear once in descending results")
+}
+
+func TestPartKeyIterExhaustion(t *testing.T) {
+       part, cleanup := setupPartForKeyIter(t, []testElement{
+               {seriesID: 1, userKey: 100, data: []byte("s1")},
+               {seriesID: 2, userKey: 120, data: []byte("s2")},
+       })
+       defer cleanup()
+
+       for _, asc := range []bool{true, false} {
+               t.Run(orderName(asc), func(t *testing.T) {
+                       iter := generatePartKeyIter()
+                       defer releasePartKeyIter(iter)
+
+                       iter.init(part, []common.SeriesID{1, 2}, 0, 200, nil, asc)
+
+                       blockCount := 0
+                       for iter.nextBlock() {
+                               block, _ := iter.current()
+                               require.NotNil(t, block)
+                               blockCount++
+                       }
+                       require.NoError(t, iter.error())
+                       require.Greater(t, blockCount, 0, "iterator should yield at least one block")
+
+                       assert.False(t, iter.nextBlock(), "iterator should report exhaustion")
+                       assert.NoError(t, iter.error())
+               })
+       }
+}
+
+func TestPartKeyIterSkipsPrimaryBeyondMaxSID(t *testing.T) {
+       part, cleanup := setupPartForKeyIter(t, []testElement{
+               {seriesID: 1, userKey: 100, data: []byte("s1")},
+               {seriesID: 50, userKey: 200, data: []byte("s50")},
+       })
+
+       defer cleanup()
+
+       require.NotEmpty(t, part.primaryBlockMetadata, "part should have at least one primary block")
+
+       first := part.primaryBlockMetadata[0]
+       part.primaryBlockMetadata = append(part.primaryBlockMetadata, primaryBlockMetadata{
+               seriesID: 50,
+               minKey:   300,
+               maxKey:   300,
+               dataBlock: dataBlock{
+                       offset: first.offset + first.size + 1,
+                       size:   1,
+               },
+       })
+
+       asc, desc, ascErr, descErr := runPartKeyIterDoublePass(t, part, []common.SeriesID{1}, 0, 500, nil)
+       require.NoError(t, ascErr)
+       require.NoError(t, descErr)
+       require.NotEmpty(t, asc)
+       require.NotEmpty(t, desc)
+       for _, block := range asc {
+               assert.Equal(t, common.SeriesID(1), block.seriesID)
+       }
+       for _, block := range desc {
+               assert.Equal(t, common.SeriesID(1), block.seriesID)
+       }
+}
+
+func TestPartKeyIterRequeuesOnGapBetweenBlocks(t *testing.T) {
+       const elementsPerBatch = maxBlockLength + 10
+
+       var elems []testElement
+       for i := 0; i < elementsPerBatch; i++ {
+               elems = append(elems, testElement{
+                       seriesID: 1,
+                       userKey:  int64(i),
+                       data:     []byte("batch1"),
+               })
+       }
+       for i := 0; i < 16; i++ {
+               elems = append(elems, testElement{
+                       seriesID: 1,
+                       userKey:  int64(50000 + i),
+                       data:     []byte("batch2"),
+               })
+       }
+
+       part, cleanup := setupPartForKeyIter(t, elems)
+       defer cleanup()
+
+       for _, asc := range []bool{true, false} {
+               t.Run(orderName(asc), func(t *testing.T) {
+                       iter := generatePartKeyIter()
+                       defer releasePartKeyIter(iter)
+
+                       iter.init(part, []common.SeriesID{1}, 0, 100000, nil, asc)
+
+                       var blocks []struct {
+                               min int64
+                               max int64
+                       }
+                       for iter.nextBlock() {
+                               block, _ := iter.current()
+                               require.NotNil(t, block)
+                               blocks = append(blocks, struct {
+                                       min int64
+                                       max int64
+                               }{min: block.minKey, max: block.maxKey})
+                       }
+
+                       require.NoError(t, iter.error())
+                       require.GreaterOrEqual(t, len(blocks), 2, "expected at least two blocks for the same series")
+
+                       // Verify blocks are in proper order
+                       for i := 1; i < len(blocks); i++ {
+                               prev := blocks[i-1]
+                               curr := blocks[i]
+                               if asc {
+                                       assert.LessOrEqual(t, prev.min, curr.min, "ascending iteration should maintain order")
+                               } else {
+                                       assert.GreaterOrEqual(t, prev.max, curr.max, "descending iteration should maintain order")
+                               }
+                       }
+               })
+       }
+}
diff --git a/banyand/internal/sidx/query.go b/banyand/internal/sidx/query.go
index c9618f58..36d2f80e 100644
--- a/banyand/internal/sidx/query.go
+++ b/banyand/internal/sidx/query.go
@@ -215,9 +215,10 @@ func (s *sidx) prepareStreamingResources(
                minKey:    minKey,
                maxKey:    maxKey,
                asc:       asc,
+               batchSize: req.MaxBatchSize,
        }
 
-       blockCh := make(chan *blockScanResultBatch, 1)
+       blockCh := make(chan *blockScanResultBatch)
        go func() {
                bs.scan(ctx, blockCh)
                close(blockCh)
@@ -259,38 +260,27 @@ func (s *sidx) processStreamingLoop(
        }
 
        scannerBatchCount := 0
-       for {
-               select {
-               case <-ctx.Done():
+       for batch := range resources.blockCh {
+               scannerBatchCount++
+               if err := s.handleStreamingBatch(ctx, batch, resources, req, resultsCh, metrics); err != nil {
                        if loopSpan != nil {
-                               loopSpan.Tag("termination_reason", "context_canceled")
-                               loopSpan.Tagf("scanner_batches_before_cancel", "%d", scannerBatchCount)
-                       }
-                       return ctx.Err()
-               case batch, ok := <-resources.blockCh:
-                       if !ok {
-                               if loopSpan != nil {
-                                       loopSpan.Tag("termination_reason", "channel_closed")
-                                       loopSpan.Tagf("total_scanner_batches", "%d", scannerBatchCount)
+                               if errors.Is(err, context.Canceled) {
+                                       loopSpan.Tag("termination_reason", "context_canceled")
+                                       loopSpan.Tagf("scanner_batches_before_cancel", "%d", scannerBatchCount)
+                               } else {
+                                       loopSpan.Tag("termination_reason", "batch_error")
+                                       loopSpan.Tagf("scanner_batches_before_error", "%d", scannerBatchCount)
+                                       loopSpan.Error(err)
                                }
-                               return resources.heap.merge(ctx, req.MaxBatchSize, resultsCh, metrics)
-                       }
-                       scannerBatchCount++
-                       if err := s.handleStreamingBatch(ctx, batch, resources, req, resultsCh, metrics); err != nil {
-                               if loopSpan != nil {
-                                       if errors.Is(err, context.Canceled) {
-                                               loopSpan.Tag("termination_reason", "context_canceled")
-                                               loopSpan.Tagf("scanner_batches_before_cancel", "%d", scannerBatchCount)
-                                       } else {
-                                               loopSpan.Tag("termination_reason", "batch_error")
-                                               loopSpan.Tagf("scanner_batches_before_error", "%d", scannerBatchCount)
-                                               loopSpan.Error(err)
-                                       }
-                               }
-                               return err
                        }
+                       return err
                }
        }
+       if loopSpan != nil {
+               loopSpan.Tag("termination_reason", "channel_closed")
+               loopSpan.Tagf("total_scanner_batches", "%d", scannerBatchCount)
+       }
+       return resources.heap.merge(ctx, req.MaxBatchSize, resultsCh, metrics)
 }
 
 func (s *sidx) handleStreamingBatch(
@@ -301,9 +291,9 @@ func (s *sidx) handleStreamingBatch(
        resultsCh chan<- *QueryResponse,
        metrics *batchMetrics,
 ) error {
+       defer releaseBlockScanResultBatch(batch)
        if batch.err != nil {
                err := batch.err
-               releaseBlockScanResultBatch(batch)
                return err
        }
 
@@ -314,7 +304,6 @@ func (s *sidx) handleStreamingBatch(
        }
 
        cursors, cursorsErr := s.buildCursorsForBatch(ctx, batch, resources.tagsToLoad, req, resources.asc, metrics)
-       releaseBlockScanResultBatch(batch)
        if cursorsErr != nil {
                return cursorsErr
        }
diff --git a/banyand/internal/sidx/query_result.go b/banyand/internal/sidx/query_result.go
index ab45ee88..3669a291 100644
--- a/banyand/internal/sidx/query_result.go
+++ b/banyand/internal/sidx/query_result.go
@@ -21,13 +21,11 @@ import (
        "container/heap"
 
        "github.com/apache/skywalking-banyandb/api/common"
-       internalencoding "github.com/apache/skywalking-banyandb/banyand/internal/encoding"
        "github.com/apache/skywalking-banyandb/banyand/protector"
        "github.com/apache/skywalking-banyandb/pkg/bytes"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
-       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 )
 
 // queryResult is used internally for processing logic only.
@@ -178,28 +176,13 @@ func (qr *queryResult) loadTagData(tmpBlock *block, p *part, tagName string, tag
 
        // Create tag data structure and populate block
        td := generateTagData()
-       // Decode tag values directly (no compression)
-       td.values, err = internalencoding.DecodeTagValues(td.values[:0], decoder, bb2, tm.valueType, count)
-       if err != nil {
-               logger.Panicf("cannot decode tag values: %v", err)
-               return false
-       }
-
        td.name = tagName
        td.valueType = tm.valueType
 
-       // Set min/max for int64 tags
-       if tm.valueType == pbv1.ValueTypeInt64 {
-               td.min = tm.min
-               td.max = tm.max
-       }
-
-       // Create bloom filter for indexed tags
-       td.filter = generateBloomFilter(count)
-       for _, value := range td.values {
-               if value != nil {
-                       td.filter.Add(value)
-               }
+       // Decode and convert tag values using common helper
+       if err := decodeAndConvertTagValues(td, decoder, bb2, tm.valueType, count); err != nil {
+               logger.Panicf("cannot decode tag values: %v", err)
+               return false
        }
 
        tmpBlock.tags[tagName] = td
@@ -247,11 +230,17 @@ func (qr *queryResult) extractElementTags(block *block, elemIndex int) []Tag {
                for _, proj := range qr.request.TagProjection {
                        for _, tagName := range proj.Names {
                                if tagData, exists := block.tags[tagName]; exists && elemIndex < len(tagData.values) {
-                                       elementTags = append(elementTags, Tag{
+                                       row := &tagData.values[elemIndex]
+                                       tag := Tag{
                                                Name:      tagName,
-                                               Value:     tagData.values[elemIndex],
                                                ValueType: tagData.valueType,
-                                       })
+                                       }
+                                       if len(row.valueArr) > 0 {
+                                               tag.ValueArr = row.valueArr
+                                       } else if len(row.value) > 0 {
+                                               tag.Value = row.value
+                                       }
+                                       elementTags = append(elementTags, tag)
                                }
                        }
                }
@@ -260,11 +249,17 @@ func (qr *queryResult) extractElementTags(block *block, elemIndex int) []Tag {
                elementTags = make([]Tag, 0, len(block.tags))
                for tagName, tagData := range block.tags {
                        if elemIndex < len(tagData.values) {
-                               elementTags = append(elementTags, Tag{
+                               row := &tagData.values[elemIndex]
+                               tag := Tag{
                                        Name:      tagName,
-                                       Value:     tagData.values[elemIndex],
                                        ValueType: tagData.valueType,
-                               })
+                               }
+                               if len(row.valueArr) > 0 {
+                                       tag.ValueArr = row.valueArr
+                               } else if len(row.value) > 0 {
+                                       tag.Value = row.value
+                               }
+                               elementTags = append(elementTags, tag)
                        }
                }
        }
diff --git a/banyand/internal/sidx/query_test.go b/banyand/internal/sidx/query_test.go
index 2c841250..225f2587 100644
--- a/banyand/internal/sidx/query_test.go
+++ b/banyand/internal/sidx/query_test.go
@@ -235,14 +235,37 @@ func TestSIDX_Query_Ordering(t *testing.T) {
                        }
 
                        if len(allKeys) > 1 {
-                               isSorted := sort.SliceIsSorted(allKeys, func(i, j int) bool {
-                                       if tt.ascending {
-                                               return allKeys[i] < allKeys[j]
+                               if len(tt.seriesIDs) == 1 {
+                                       // For single series, verify global sorting
+                                       isSorted := sort.SliceIsSorted(allKeys, func(i, j int) bool {
+                                               if tt.ascending {
+                                                       return allKeys[i] < allKeys[j]
+                                               }
+                                               return allKeys[i] > allKeys[j]
+                                       })
+                                       assert.True(t, isSorted, "Keys should be sorted in %s order. Keys: %v",
+                                               map[bool]string{true: "ascending", false: "descending"}[tt.ascending], allKeys)
+                               } else {
+                                       // For multiple series, verify sorting within each series group
+                                       seriesGroups := make(map[common.SeriesID][]int64)
+                                       for i, sid := range allSIDs {
+                                               if i < len(allKeys) {
+                                                       seriesGroups[sid] = append(seriesGroups[sid], allKeys[i])
+                                               }
+                                       }
+                                       for sid, keys := range seriesGroups {
+                                               if len(keys) > 1 {
+                                                       isSorted := sort.SliceIsSorted(keys, func(i, j int) bool {
+                                                               if tt.ascending {
+                                                                       return keys[i] < keys[j]
+                                                               }
+                                                               return keys[i] > keys[j]
+                                                       })
+                                                       assert.True(t, isSorted, "Keys for series %d should be sorted in %s order. Keys: %v",
+                                                               sid, map[bool]string{true: "ascending", false: "descending"}[tt.ascending], keys)
+                                               }
                                        }
-                                       return allKeys[i] > allKeys[j]
-                               })
-                               assert.True(t, isSorted, "Keys should be sorted in %s order. Keys: %v",
-                                       map[bool]string{true: "ascending", false: "descending"}[tt.ascending], allKeys)
+                               }
                        }
                })
        }
@@ -298,7 +321,9 @@ func TestSIDX_Query_WithArrValues(t *testing.T) {
        for i := 0; i < len(keys); i++ {
                if keys[i] == 100 {
                        assert.Equal(t, "arr_tag", tags[i][0].Name)
-                       assert.Equal(t, "a|b|", string(tags[i][0].Value))
+                       assert.Equal(t, 2, len(tags[i][0].ValueArr))
+                       assert.Equal(t, "a", string(tags[i][0].ValueArr[0]))
+                       assert.Equal(t, "b", string(tags[i][0].ValueArr[1]))
                }
        }
 }
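
The assertion change above captures the new contract for array tags: instead of a
single delimiter-joined Value such as "a|b|", query results now carry each element
separately in ValueArr. A minimal consumer sketch, assuming only the Tag fields
visible in this commit (Name, Value, ValueArr); tagStrings is a hypothetical
helper, not part of the patch:

    // tagStrings flattens a sidx Tag into printable strings.
    // Array tags carry their elements in ValueArr; scalar tags use Value.
    func tagStrings(tag Tag) []string {
            if len(tag.ValueArr) > 0 {
                    out := make([]string, 0, len(tag.ValueArr))
                    for _, v := range tag.ValueArr {
                            out = append(out, string(v))
                    }
                    return out
            }
            return []string{string(tag.Value)}
    }
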
diff --git a/banyand/internal/sidx/sidx.go b/banyand/internal/sidx/sidx.go
index f2cc93e9..370e4204 100644
--- a/banyand/internal/sidx/sidx.go
+++ b/banyand/internal/sidx/sidx.go
@@ -32,6 +32,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
        "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
@@ -251,6 +252,147 @@ type blockCursor struct {
        idx      int
 }
 
+type blockCursorBuilder struct {
+       bc      *blockCursor
+       block   *block
+       metrics *batchMetrics
+       seen    map[uint64][][]byte
+       minKey  int64
+       maxKey  int64
+       hasMin  bool
+       hasMax  bool
+}
+
+func (b *blockCursorBuilder) processWithFilter(req QueryRequest, log *logger.Logger) error {
+       if req.TagFilter == nil {
+               return nil
+       }
+
+       tags := make([]*modelv1.Tag, 0, len(b.block.tags))
+       decoder := req.TagFilter.GetDecoder()
+
+       for i := 0; i < len(b.block.userKeys); i++ {
+               dataBytes := b.block.data[i]
+               hash, duplicate := b.checkDuplicate(dataBytes, true)
+               if duplicate {
+                       continue
+               }
+
+               tags = b.collectTagsForFilter(tags, decoder, i)
+
+               matched, err := req.TagFilter.Match(tags)
+               if err != nil {
+                       log.Error().Err(err).Msg("tag filter match error")
+                       return err
+               }
+               if !matched {
+                       continue
+               }
+
+               b.appendElement(i, hash, dataBytes)
+       }
+
+       return nil
+}
+
+func (b *blockCursorBuilder) processWithoutFilter() {
+       for i := 0; i < len(b.block.userKeys); i++ {
+               dataBytes := b.block.data[i]
+               hash, duplicate := b.checkDuplicate(dataBytes, false)
+               if duplicate {
+                       continue
+               }
+
+               b.appendElement(i, hash, dataBytes)
+       }
+}
+
+func (b *blockCursorBuilder) collectTagsForFilter(buf []*modelv1.Tag, decoder func(pbv1.ValueType, []byte) *modelv1.TagValue, index int) []*modelv1.Tag {
+       buf = buf[:0]
+       for tagName, tagData := range b.block.tags {
+               if index >= len(tagData.values) {
+                       continue
+               }
+
+               row := &tagData.values[index]
+               var marshaledValue []byte
+               if row.valueArr != nil || row.value != nil {
+                       marshaledValue = marshalTagRow(row, tagData.valueType)
+               }
+               if marshaledValue == nil {
+                       continue
+               }
+
+               tagValue := decoder(tagData.valueType, marshaledValue)
+               if tagValue != nil {
+                       buf = append(buf, &modelv1.Tag{
+                               Key:   tagName,
+                               Value: tagValue,
+                       })
+               }
+       }
+       return buf
+}
+
+func (b *blockCursorBuilder) appendElement(index int, hash uint64, dataBytes []byte) {
+       key := b.block.userKeys[index]
+       if !b.keyInRange(key) {
+               return
+       }
+
+       b.markSeen(hash, dataBytes)
+
+       b.bc.userKeys = append(b.bc.userKeys, key)
+
+       dataCopy := make([]byte, len(dataBytes))
+       copy(dataCopy, dataBytes)
+       b.bc.data = append(b.bc.data, dataCopy)
+
+       for tagName, tagData := range b.block.tags {
+               if index < len(tagData.values) {
+                       row := &tagData.values[index]
+                       tag := Tag{
+                               Name:      tagName,
+                               ValueType: tagData.valueType,
+                       }
+                       if len(row.valueArr) > 0 {
+                               tag.ValueArr = row.valueArr
+                       } else if len(row.value) > 0 {
+                               tag.Value = row.value
+                       }
+                       b.bc.tags[tagName] = append(b.bc.tags[tagName], tag)
+               }
+       }
+}
+
+func (b *blockCursorBuilder) checkDuplicate(dataBytes []byte, recordMetric bool) (uint64, bool) {
+       hash := convert.Hash(dataBytes)
+       bucket := b.seen[hash]
+       for _, existing := range bucket {
+               if bytes.Equal(existing, dataBytes) {
+                       if recordMetric && b.metrics != nil {
+                               b.metrics.elementsDeduplicated.Add(1)
+                       }
+                       return hash, true
+               }
+       }
+       return hash, false
+}
+
+func (b *blockCursorBuilder) markSeen(hash uint64, dataBytes []byte) {
+       b.seen[hash] = append(b.seen[hash], dataBytes)
+}
+
+func (b *blockCursorBuilder) keyInRange(key int64) bool {
+       if b.hasMin && key < b.minKey {
+               return false
+       }
+       if b.hasMax && key > b.maxKey {
+               return false
+       }
+       return true
+}
+
 // init initializes the block cursor.
 func (bc *blockCursor) init(p *part, bm *blockMetadata, req QueryRequest) {
        bc.p = p
@@ -288,6 +430,19 @@ func (s *sidx) loadBlockCursor(bc *blockCursor, tmpBlock *block, bs blockScanRes
                return false
        }
 
+       var (
+               minKey int64
+               maxKey int64
+               hasMin = req.MinKey != nil
+               hasMax = req.MaxKey != nil
+       )
+       if hasMin {
+               minKey = *req.MinKey
+       }
+       if hasMax {
+               maxKey = *req.MaxKey
+       }
+
        // Pre-allocate slices for filtered data (optimize for common case where most elements match)
        bc.userKeys = make([]int64, 0, totalElements)
        bc.data = make([][]byte, 0, totalElements)
@@ -299,118 +454,23 @@ func (s *sidx) loadBlockCursor(bc *blockCursor, tmpBlock *block, bs blockScanRes
        }

        // Track seen data for deduplication using hash buckets with collision checks
-       seenData := make(map[uint64][][]byte)
+       builder := &blockCursorBuilder{
+               bc:      bc,
+               block:   tmpBlock,
+               hasMin:  hasMin,
+               minKey:  minKey,
+               hasMax:  hasMax,
+               maxKey:  maxKey,
+               metrics: metrics,
+               seen:    make(map[uint64][][]byte),
+       }

-       // Single loop: filter and copy data in one pass
        if req.TagFilter != nil {
-               tags := make([]*modelv1.Tag, 0, len(tmpBlock.tags))
-               decoder := req.TagFilter.GetDecoder()
-
-               for i := 0; i < totalElements; i++ {
-                       // Check for duplicate data before processing via hash + bytes.Equal on collisions
-                       dataBytes := tmpBlock.data[i]
-                       h := convert.Hash(dataBytes)
-                       bucket := seenData[h]
-                       duplicate := false
-                       for _, b := range bucket {
-                               if bytes.Equal(b, dataBytes) {
-                                       duplicate = true
-                                       break
-                               }
-                       }
-                       if duplicate {
-                               if metrics != nil {
-                                       metrics.elementsDeduplicated.Add(1)
-                               }
-                               continue
-                       }
-
-                       // Build tags slice for this element
-                       tags = tags[:0]
-                       for tagName, tagData := range tmpBlock.tags {
-                               if i < len(tagData.values) && tagData.values[i] != nil {
-                                       // Decode []byte to *modelv1.TagValue using the provided decoder
-                                       tagValue := decoder(tagData.valueType, tagData.values[i])
-                                       if tagValue != nil {
-                                               tags = append(tags, &modelv1.Tag{
-                                                       Key:   tagName,
-                                                       Value: tagValue,
-                                               })
-                                       }
-                               }
-                       }
-
-                       // Apply filter
-                       matched, err := req.TagFilter.Match(tags)
-                       if err != nil {
-                               s.l.Error().Err(err).Msg("tag filter match error")
-                               return false
-                       }
-
-                       if matched {
-                               // Mark data as seen
-                               seenData[h] = append(bucket, dataBytes)
-
-                               // Copy userKey
-                               bc.userKeys = append(bc.userKeys, tmpBlock.userKeys[i])
-
-                               // Copy data
-                               dataCopy := make([]byte, len(tmpBlock.data[i]))
-                               copy(dataCopy, tmpBlock.data[i])
-                               bc.data = append(bc.data, dataCopy)
-
-                               // Copy tags
-                               for tagName, tagData := range tmpBlock.tags {
-                                       if i < len(tagData.values) {
-                                               bc.tags[tagName] = append(bc.tags[tagName], Tag{
-                                                       Name:      tagName,
-                                                       Value:     tagData.values[i],
-                                                       ValueType: tagData.valueType,
-                                               })
-                                       }
-                               }
-                       }
+               if err := builder.processWithFilter(req, s.l); err != nil {
+                       return false
                }
        } else {
-               // No filter - copy all elements but skip duplicates
-               for i := 0; i < totalElements; i++ {
-                       // Check for duplicate data via hash + bytes.Equal on collisions
-                       dataBytes := tmpBlock.data[i]
-                       h := convert.Hash(dataBytes)
-                       bucket := seenData[h]
-                       duplicate := false
-                       for _, b := range bucket {
-                               if bytes.Equal(b, dataBytes) {
-                                       duplicate = true
-                                       break
-                               }
-                       }
-                       if duplicate {
-                               continue
-                       }
-
-                       // Mark data as seen
-                       seenData[h] = append(bucket, dataBytes)
-
-                       // Copy userKey
-                       bc.userKeys = append(bc.userKeys, tmpBlock.userKeys[i])
-
-                       // Copy data
-                       dataCopy := make([]byte, len(tmpBlock.data[i]))
-                       copy(dataCopy, tmpBlock.data[i])
-                       bc.data = append(bc.data, dataCopy)
-
-                       // Copy tags
-                       for tagName, tagData := range tmpBlock.tags {
-                               if i < len(tagData.values) {
-                                       bc.tags[tagName] = append(bc.tags[tagName], Tag{
-                                               Name:      tagName,
-                                               Value:     tagData.values[i],
-                                               ValueType: tagData.valueType,
-                                       })
-                               }
-                       }
-               }
+               builder.processWithoutFilter()
        }
 
        if metrics != nil {
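
The refactor keeps loadBlockCursor's deduplication scheme, now isolated in
checkDuplicate/markSeen: a map from 64-bit hash to every payload that produced
that hash, with bytes.Equal evaluated only on collisions. A self-contained sketch
of the pattern, assuming nothing from the repository (hash64 stands in for
convert.Hash):

    package main

    import (
            "bytes"
            "fmt"
            "hash/fnv"
    )

    // hash64 stands in for convert.Hash in this sketch.
    func hash64(b []byte) uint64 {
            h := fnv.New64a()
            h.Write(b)
            return h.Sum64()
    }

    func main() {
            seen := make(map[uint64][][]byte)
            for _, in := range [][]byte{[]byte("a"), []byte("b"), []byte("a")} {
                    h := hash64(in)
                    dup := false
                    for _, prev := range seen[h] { // bytes.Equal only on hash collisions
                            if bytes.Equal(prev, in) {
                                    dup = true
                                    break
                            }
                    }
                    if dup {
                            continue // the second "a" is skipped here
                    }
                    seen[h] = append(seen[h], in)
                    fmt.Printf("kept %q\n", in)
            }
    }
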
diff --git a/banyand/internal/sidx/tag.go b/banyand/internal/sidx/tag.go
index d02ebf24..0314500f 100644
--- a/banyand/internal/sidx/tag.go
+++ b/banyand/internal/sidx/tag.go
@@ -20,6 +20,8 @@ package sidx
 import (
        "fmt"
 
+       internalencoding "github.com/apache/skywalking-banyandb/banyand/internal/encoding"
+       "github.com/apache/skywalking-banyandb/pkg/bytes"
        pkgencoding "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/filter"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
@@ -44,12 +46,21 @@ type tagMetadata struct {
 
 // tagData represents the runtime data for a tag with filtering capabilities.
 type tagData struct {
-       values    [][]byte
-       filter    *filter.BloomFilter // For indexed tags
-       name      string
-       min       []byte // For int64 tags
-       max       []byte // For int64 tags
-       valueType pbv1.ValueType
+       uniqueValues map[string]struct{}
+       name         string
+       values       []tagRow
+       tmpBytes     [][]byte
+       valueType    pbv1.ValueType
+}
+
+type tagRow struct {
+       value    []byte
+       valueArr [][]byte
+}
+
+func (tr *tagRow) reset() {
+       tr.value = nil
+       tr.valueArr = tr.valueArr[:0]
 }
 
 var (
@@ -101,19 +112,20 @@ func (td *tagData) reset() {
 
        // Reset values slice
        for i := range td.values {
-               td.values[i] = nil
+               td.values[i].reset()
        }
        td.values = td.values[:0]
 
-       // Reset filter
-       if td.filter != nil {
-               releaseBloomFilter(td.filter)
-               td.filter = nil
+       // Reset tmpBytes slice
+       for i := range td.tmpBytes {
+               td.tmpBytes[i] = nil
        }
+       td.tmpBytes = td.tmpBytes[:0]
 
-       // Reset min/max
-       td.min = nil
-       td.max = nil
+       // Reset uniqueValues map for reuse
+       for k := range td.uniqueValues {
+               delete(td.uniqueValues, k)
+       }
 }
 
 // reset clears tagMetadata for reuse in object pool.
@@ -134,8 +146,7 @@ func generateBloomFilter(expectedElements int) *filter.BloomFilter {
        }
        // Reset and resize for new expected elements
        v.SetN(expectedElements)
-       m := expectedElements * filter.B
-       v.ResizeBits((m + 63) / 64)
+       v.ResizeBits(filter.OptimalBitsSize(expectedElements))
        return v
 }
 
@@ -166,11 +177,9 @@ func decodeBloomFilter(src []byte) (*filter.BloomFilter, error) {
 
        n := pkgencoding.BytesToInt64(src)
        bf := generateBloomFilter(int(n))
+       bitsLen := len(bf.Bits())
 
-       m := n * filter.B
-       bits := make([]uint64, 0, (m+63)/64)
-       var err error
-       bits, _, err = pkgencoding.DecodeUint64Block(bits, src[8:], uint64((m+63)/64))
+       bits, _, err := pkgencoding.DecodeUint64Block(bf.Bits()[:0], src[8:], uint64(bitsLen))
        if err != nil {
                releaseBloomFilter(bf)
                return nil, fmt.Errorf("failed to decode bloom filter bits: %w", err)
@@ -180,40 +189,59 @@ func decodeBloomFilter(src []byte) (*filter.BloomFilter, error) {
        return bf, nil
 }
 
-// updateMinMax updates min/max values for int64 tags.
-func (td *tagData) updateMinMax() {
-       if td.valueType != pbv1.ValueTypeInt64 || len(td.values) == 0 {
-               return
+// marshalTagRow marshals the tagRow value to a byte slice.
+func marshalTagRow(tr *tagRow, valueType pbv1.ValueType) []byte {
+       if tr.valueArr != nil {
+               var dst []byte
+               for i := range tr.valueArr {
+                       if valueType == pbv1.ValueTypeInt64Arr {
+                               dst = append(dst, tr.valueArr[i]...)
+                               continue
+                       }
+                       dst = internalencoding.MarshalVarArray(dst, tr.valueArr[i])
+               }
+               return dst
+       }
+       return tr.value
+}
+
+// decodeAndConvertTagValues decodes encoded tag values and converts them to tagRow format.
+// This is a common operation shared between reading from disk and querying.
+func decodeAndConvertTagValues(td *tagData, decoder *pkgencoding.BytesBlockDecoder, encodedData *bytes.Buffer, valueType pbv1.ValueType, count int) error {
+       var err error
+
+       // Decode to tmpBytes buffer, reusing the existing slice to avoid allocations
+       td.tmpBytes, err = internalencoding.DecodeTagValues(td.tmpBytes[:0], decoder, encodedData, valueType, count)
+       if err != nil {
+               return fmt.Errorf("cannot decode tag values: %w", err)
        }
 
-       var minVal, maxVal int64
-       first := true
+       // Convert [][]byte to []tagRow based on valueType
+       if cap(td.values) < len(td.tmpBytes) {
+               td.values = make([]tagRow, len(td.tmpBytes))
+       } else {
+               td.values = td.values[:len(td.tmpBytes)]
+       }
 
-       for _, value := range td.values {
-               if len(value) != 8 {
-                       continue // Skip invalid int64 values
+       for i, encodedValue := range td.tmpBytes {
+               if encodedValue == nil {
+                       td.values[i] = tagRow{}
+                       continue
                }
 
-               val := pkgencoding.BytesToInt64(value)
-
-               if first {
-                       minVal = val
-                       maxVal = val
-                       first = false
-               } else {
-                       if val < minVal {
-                               minVal = val
-                       }
-                       if val > maxVal {
-                               maxVal = val
+               if valueType == pbv1.ValueTypeStrArr || valueType == pbv1.ValueTypeInt64Arr {
+                       // For array types, unmarshal to valueArr
+                       td.values[i].valueArr, err = unmarshalTag(td.values[i].valueArr[:0], encodedValue, valueType)
+                       if err != nil {
+                               return fmt.Errorf("cannot unmarshal tag array: %w", err)
                        }
+               } else {
+                       // For scalar types, set value directly
+                       td.values[i].value = encodedValue
                }
        }
 
-       if !first {
-               td.min = pkgencoding.Int64ToBytes(nil, minVal)
-               td.max = pkgencoding.Int64ToBytes(nil, maxVal)
-       }
+       return nil
 }
 
 // marshal serializes tag metadata to bytes using encoding package.
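
marshalTagRow above re-encodes string-array elements with the delimiter/escape
scheme from banyand/internal/encoding, and mustDecodeTagValue in
banyand/trace/query.go walks the buffer back with the now-shared
UnmarshalVarArray. A round-trip sketch of that pairing; the import alias, the
function name, and the panic-on-error shortcut are assumptions for brevity:

    // import encoding "github.com/apache/skywalking-banyandb/banyand/internal/encoding"

    func demoVarArrayRoundTrip() [][]byte {
            values := [][]byte{[]byte("a"), []byte("b")}

            var buf []byte
            for _, v := range values {
                    buf = encoding.MarshalVarArray(buf, v) // appends v plus a delimiter, escaping as needed
            }

            var out [][]byte
            for len(buf) > 0 {
                    item, rest, err := encoding.UnmarshalVarArray(nil, buf)
                    if err != nil {
                            panic(err) // sketch only; production code returns the error
                    }
                    out = append(out, item)
                    buf = rest
            }
            return out // "a" and "b" as separate elements again
    }
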
diff --git a/banyand/internal/sidx/tag_filter_op.go b/banyand/internal/sidx/tag_filter_op.go
index 620f8aff..2f3318cc 100644
--- a/banyand/internal/sidx/tag_filter_op.go
+++ b/banyand/internal/sidx/tag_filter_op.go
@@ -246,9 +246,9 @@ func decodeBloomFilterFromBytes(src []byte, bf *filter.Bloo
        n := encoding.BytesToInt64(src)
        bf.SetN(int(n))
 
-       m := n * filter.B
+       // With B=16, the bits length reduces to a simple shift; see filter.OptimalBitsSize.
        bits := make([]uint64, 0)
-       bits, _, err := encoding.DecodeUint64Block(bits[:0], src[8:], uint64((m+63)/64))
+       bits, _, err := encoding.DecodeUint64Block(bits[:0], src[8:], uint64(filter.OptimalBitsSize(int(n))))
        if err != nil {
                logger.Panicf("failed to decode Bloom filter: %v", err)
        }
diff --git a/banyand/internal/sidx/tag_test.go b/banyand/internal/sidx/tag_test.go
index 2f96ff2e..e5eb113b 100644
--- a/banyand/internal/sidx/tag_test.go
+++ b/banyand/internal/sidx/tag_test.go
@@ -21,6 +21,7 @@ import (
        "testing"
 
        "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
 
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 )
@@ -115,3 +116,100 @@ func TestTagInWriteRequest(t *testing.T) {
        assert.Equal(t, "service", req.Tags[0].Name)
        assert.Equal(t, "environment", req.Tags[1].Name)
 }
+
+func TestEncodeDecodeBloomFilter_RoundTrip(t *testing.T) {
+       // Test round-trip encoding and decoding
+       testCases := []struct {
+               name          string
+               itemsToAdd    [][]byte
+               itemsToCheck  [][]byte
+               shouldContain []bool
+               expectedItems int
+       }{
+               {
+                       name:          "small filter",
+                       expectedItems: 5,
+                       itemsToAdd: [][]byte{
+                               []byte("item1"),
+                               []byte("item2"),
+                               []byte("item3"),
+                       },
+                       itemsToCheck: [][]byte{
+                               []byte("item1"),
+                               []byte("item2"),
+                               []byte("item3"),
+                               []byte("not-added"),
+                       },
+                       shouldContain: []bool{true, true, true, false},
+               },
+               {
+                       name:          "medium filter",
+                       expectedItems: 100,
+                       itemsToAdd: [][]byte{
+                               []byte("service1"),
+                               []byte("service2"),
+                               []byte("service3"),
+                       },
+                       itemsToCheck: [][]byte{
+                               []byte("service1"),
+                               []byte("service2"),
+                               []byte("service3"),
+                               []byte("not-added"),
+                       },
+                       shouldContain: []bool{true, true, true, false},
+               },
+               {
+                       name:          "empty filter",
+                       expectedItems: 0,
+                       itemsToAdd:    [][]byte{},
+                       itemsToCheck: [][]byte{
+                               []byte("any-item"),
+                       },
+                       shouldContain: []bool{false},
+               },
+       }
+
+       for _, tc := range testCases {
+               t.Run(tc.name, func(t *testing.T) {
+                       // Create and populate original filter
+                       original := generateBloomFilter(tc.expectedItems)
+                       defer releaseBloomFilter(original)
+
+                       for _, item := range tc.itemsToAdd {
+                               original.Add(item)
+                       }
+
+                       // Encode
+                       dst := make([]byte, 0)
+                       encoded := encodeBloomFilter(dst, original)
+                       require.Greater(t, len(encoded), 0, "encoded data should not be empty")
+
+                       // Decode
+                       decoded, err := decodeBloomFilter(encoded)
+                       require.NoError(t, err, "decoding should succeed")
+                       require.NotNil(t, decoded, "decoded filter should not be nil")
+                       defer releaseBloomFilter(decoded)
+
+                       // Verify N matches
+                       assert.Equal(t, original.N(), decoded.N(), "N should match")
+
+                       // Verify bits match
+                       originalBits := original.Bits()
+                       decodedBits := decoded.Bits()
+                       assert.Equal(t, len(originalBits), len(decodedBits), "bits length should match")
+                       for i := range originalBits {
+                               assert.Equal(t, originalBits[i], decodedBits[i], "bits[%d] should match", i)
+                       }
+
+                       // Verify filter behavior matches
+                       for i, item := range tc.itemsToCheck {
+                               originalResult := original.MightContain(item)
+                               decodedResult := decoded.MightContain(item)
+                               assert.Equal(t, originalResult, decodedResult, "MightContain for item %d should match", i)
+                               if len(tc.shouldContain) > i {
+                                       assert.Equal(t, tc.shouldContain[i], decodedResult, "MightContain result should match expected")
+                               }
+                       }
+               })
+       }
+}
diff --git a/banyand/stream/block.go b/banyand/stream/block.go
index 3cf8c5c3..ddaa6ec7 100644
--- a/banyand/stream/block.go
+++ b/banyand/stream/block.go
@@ -113,7 +113,7 @@ func (b *block) processTags(tf tagValues, tagFamilyIdx, i int, elementsLen int)
                        tags[j].filter = filter
                }
                tags[j].filter.SetN(elementsLen)
-               tags[j].filter.ResizeBits((elementsLen*filter.B + 63) / 64)
+               tags[j].filter.ResizeBits(filter.OptimalBitsSize(elementsLen))
                tags[j].filter.Add(t.value)
                if t.valueType == pbv1.ValueTypeInt64 {
                        if len(tags[j].min) == 0 {
diff --git a/banyand/stream/tag_filter.go b/banyand/stream/tag_filter.go
index 1f7925be..3a8748a5 100644
--- a/banyand/stream/tag_filter.go
+++ b/banyand/stream/tag_filter.go
@@ -41,9 +41,9 @@ func decodeBloomFilter(src []byte, bf *filter.BloomFilter) *filter.BloomFilter {
        n := encoding.BytesToInt64(src)
        bf.SetN(int(n))
 
-       m := n * filter.B
+       // With B=16, the bits length reduces to a simple shift; see filter.OptimalBitsSize.
        bits := make([]uint64, 0)
-       bits, _, err := encoding.DecodeUint64Block(bits[:0], src[8:], uint64((m+63)/64))
+       bits, _, err := encoding.DecodeUint64Block(bits[:0], src[8:], uint64(filter.OptimalBitsSize(int(n))))
        if err != nil {
                logger.Panicf("failed to decode Bloom filter: %v", err)
        }
diff --git a/banyand/trace/block_writer.go b/banyand/trace/block_writer.go
index a3993dde..46aef8c4 100644
--- a/banyand/trace/block_writer.go
+++ b/banyand/trace/block_writer.go
@@ -192,7 +192,7 @@ func (bw *blockWriter) MustInitForMemPart(mp *memPart, traceSize int) {
                        filter: generateTraceIDBloomFilter(),
                }
                bw.traceIDFilter.filter.SetN(traceSize)
-               bw.traceIDFilter.filter.ResizeBits((traceSize*filter.B + 63) / 64)
+               bw.traceIDFilter.filter.ResizeBits(filter.OptimalBitsSize(traceSize))
        }
 }
 
@@ -214,7 +214,7 @@ func (bw *blockWriter) mustInitForFilePart(fileSystem fs.FileSystem, path string
                        filter: generateTraceIDBloomFilter(),
                }
                bw.traceIDFilter.filter.SetN(traceSize)
-               bw.traceIDFilter.filter.ResizeBits((traceSize*filter.B + 63) / 64)
+               bw.traceIDFilter.filter.ResizeBits(filter.OptimalBitsSize(traceSize))
        }
 }
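
Both writer paths now size the pooled trace-ID filter the same way. Condensed
from the snapshot tests later in this patch, the reuse sequence looks roughly
like this (filter is pkg/filter, convert is pkg/convert):

    bf := filter.NewBloomFilter(0) // or an instance taken from a pool
    bf.SetN(traceSize)             // expected element count
    bf.ResizeBits(filter.OptimalBitsSize(traceSize)) // n>>2 uint64 words, minimum 1
    bf.Add(convert.StringToBytes("trace1"))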
 
diff --git a/banyand/trace/bloom_filter.go b/banyand/trace/bloom_filter.go
index 8bbbe26c..fb4da2ce 100644
--- a/banyand/trace/bloom_filter.go
+++ b/banyand/trace/bloom_filter.go
@@ -34,9 +34,9 @@ func decodeBloomFilter(src []byte, bf *filter.BloomFilter) *filter.BloomFilter {
        n := encoding.BytesToInt64(src)
        bf.SetN(int(n))
 
-       m := n * filter.B
+       // With B=16, the bits length reduces to a simple shift; see filter.OptimalBitsSize.
        bits := make([]uint64, 0)
-       bits, _, err := encoding.DecodeUint64Block(bits[:0], src[8:], uint64((m+63)/64))
+       bits, _, err := encoding.DecodeUint64Block(bits[:0], src[8:], uint64(filter.OptimalBitsSize(int(n))))
        if err != nil {
                logger.Panicf("failed to decode Bloom filter: %v", err)
        }
diff --git a/banyand/trace/query.go b/banyand/trace/query.go
index 2b0d9ded..6fa20fae 100644
--- a/banyand/trace/query.go
+++ b/banyand/trace/query.go
@@ -28,6 +28,7 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/common"
        modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/banyand/internal/encoding"
        "github.com/apache/skywalking-banyandb/banyand/internal/sidx"
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/pkg/convert"
@@ -522,9 +523,9 @@ func mustDecodeTagValue(valueType pbv1.ValueType, value []byte) *modelv1.TagValu
                defer bigValuePool.Release(bb)
                var err error
                for len(value) > 0 {
-                       bb.Buf, value, err = unmarshalVarArray(bb.Buf[:0], value)
+                       bb.Buf, value, err = encoding.UnmarshalVarArray(bb.Buf[:0], value)
                        if err != nil {
-                               logger.Panicf("unmarshalVarArray failed: %v", err)
+                               logger.Panicf("UnmarshalVarArray failed: %v", err)
                        }
                        values = append(values, string(bb.Buf))
                }
diff --git a/banyand/trace/snapshot_test.go b/banyand/trace/snapshot_test.go
index f028f66e..81f8c916 100644
--- a/banyand/trace/snapshot_test.go
+++ b/banyand/trace/snapshot_test.go
@@ -125,13 +125,13 @@ func TestSnapshotGetParts(t *testing.T) {
                        snapshot: func() *snapshot {
                                bf1 := filter.NewBloomFilter(0)
                                bf1.SetN(2)
-                               bf1.ResizeBits((2*filter.B + 63) / 64)
+                               bf1.ResizeBits(filter.OptimalBitsSize(2))
                                bf1.Add(convert.StringToBytes("trace1"))
                                bf1.Add(convert.StringToBytes("trace2"))
 
                                bf2 := filter.NewBloomFilter(0)
                                bf2.SetN(1)
-                               bf2.ResizeBits((1*filter.B + 63) / 64)
+                               bf2.ResizeBits(filter.OptimalBitsSize(1))
                                bf2.Add(convert.StringToBytes("trace3"))
 
                                return &snapshot{
@@ -182,7 +182,7 @@ func TestSnapshotGetParts(t *testing.T) {
                        snapshot: func() *snapshot {
                                bf := filter.NewBloomFilter(0)
                                bf.SetN(2)
-                               bf.ResizeBits((2*filter.B + 63) / 64)
+                               bf.ResizeBits(filter.OptimalBitsSize(2))
                                bf.Add(convert.StringToBytes("trace1"))
                                bf.Add(convert.StringToBytes("trace2"))
 
diff --git a/banyand/trace/traces.go b/banyand/trace/traces.go
index 1fc5b5ab..5b057af1 100644
--- a/banyand/trace/traces.go
+++ b/banyand/trace/traces.go
@@ -18,8 +18,6 @@
 package trace
 
 import (
-       "github.com/pkg/errors"
-
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/banyand/internal/encoding"
        "github.com/apache/skywalking-banyandb/banyand/internal/sidx"
@@ -74,31 +72,6 @@ func releaseTagValue(v *tagValue) {
 
 var tagValuePool = pool.Register[*tagValue]("trace-tagValue")
 
-func unmarshalVarArray(dest, src []byte) ([]byte, []byte, error) {
-       if len(src) == 0 {
-               return nil, nil, errors.New("empty entity value")
-       }
-       if src[0] == encoding.EntityDelimiter {
-               return dest, src[1:], nil
-       }
-       for len(src) > 0 {
-               switch {
-               case src[0] == encoding.Escape:
-                       if len(src) < 2 {
-                               return nil, nil, errors.New("invalid escape character")
-                       }
-                       src = src[1:]
-                       dest = append(dest, src[0])
-               case src[0] == encoding.EntityDelimiter:
-                       return dest, src[1:], nil
-               default:
-                       dest = append(dest, src[0])
-               }
-               src = src[1:]
-       }
-       return nil, nil, errors.New("invalid variable array")
-}
-
 type traces struct {
        traceIDs   []string
        timestamps []int64
diff --git a/pkg/filter/bloom_filter.go b/pkg/filter/bloom_filter.go
index 98ff47c4..ad53692d 100644
--- a/pkg/filter/bloom_filter.go
+++ b/pkg/filter/bloom_filter.go
@@ -26,11 +26,25 @@ import (
 )
 
 const (
-       k = 6
+       k = 10
        // B specifies the number of bits allocated for each item.
-       B = 15
+       // Using B=16 (power of 2) maintains memory alignment and enables shift-based math.
+       // With 8k items per block: memory = 8192 * 16 / 8 = 16KB per block.
+       // FPR with k=10, B=16: ~0.042%.
+       B = 16
+       // _bMustBePowerOf2 is a best-effort compile-time guard that B stays a power of 2.
+       // A number is a power of 2 if and only if B > 0 and B & (B - 1) == 0.
+       // This check uses: 1 / (1 - (B & (B - 1))).
+       // - For powers of 2: B & (B - 1) == 0, so 1 / (1 - 0) = 1 (valid).
+       // - The expression would fail to compile only when B & (B - 1) == 1, and since
+       //   B & (B - 1) is always even, no value of B actually triggers that; other
+       //   non-powers of 2 still compile. Treat this as documentation and keep B
+       //   explicitly set to a power of 2 (1, 2, 4, 8, 16, 32, 64, ...).
+       _bMustBePowerOf2 = 1 / (1 - (B & (B - 1)))
 )
 
+var _ = _bMustBePowerOf2 // reference the guard so it is not reported as unused
+
 // BloomFilter is a probabilistic data structure designed to test whether an element is a member of a set.
 type BloomFilter struct {
        bits []uint64
@@ -39,8 +53,14 @@ type BloomFilter struct {
 
 // NewBloomFilter creates a new Bloom filter sized for the expected number of items n.
 func NewBloomFilter(n int) *BloomFilter {
-       m := n * B
-       bits := make([]uint64, (m+63)/64)
+       // With B=16, we can optimize: m = n * 16 = n << 4
+       // Number of uint64s needed: (n * 16) / 64 = n / 4 (rounded down) = n >> 2
+       // Ensure at least 1 uint64 to avoid empty slice
+       numBits := n >> 2
+       if numBits == 0 {
+               numBits = 1
+       }
+       bits := make([]uint64, numBits)
        return &BloomFilter{
                bits,
                n,
@@ -49,6 +69,9 @@ func NewBloomFilter(n int) *BloomFilter {
 
 // Reset resets the Bloom filter.
 func (bf *BloomFilter) Reset() {
+       for i := range bf.bits {
+               bf.bits[i] = 0
+       }
        bf.bits = bf.bits[:0]
        bf.n = 0
 }
@@ -130,3 +153,13 @@ func (bf *BloomFilter) ResizeBits(n int) {
        bits = bits[:n]
        bf.bits = bits
 }
+
+// OptimalBitsSize returns the optimal number of uint64s needed for n items.
+// With B=16, this is simply n/4 (n >> 2), with a minimum of 1.
+func OptimalBitsSize(n int) int {
+       size := n >> 2
+       if size == 0 {
+               return 1
+       }
+       return size
+}
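
Two quick sanity checks on the new constants, as a sketch rather than part of the
patch. Sizing: at B=16 bits per item, n items need n*16/64 = n>>2 uint64 words,
so the 8k-item block quoted in the comment costs 8192>>2 = 2048 words = 16KB,
which is what OptimalBitsSize returns. False-positive rate: the standard
approximation FPR ~ (1 - e^(-k/B))^k gives (1 - e^(-10/16))^10, about 0.047%,
the same order as the ~0.042% stated above:

    package main

    import (
            "fmt"
            "math"
    )

    func main() {
            const k, bitsPerItem = 10.0, 16.0
            // Standard Bloom filter approximation: FPR ~ (1 - e^(-k/B))^k.
            fpr := math.Pow(1-math.Exp(-k/bitsPerItem), k)
            fmt.Printf("FPR ~ %.4f%%\n", fpr*100) // prints about 0.0471%

            // Sizing: n items * 16 bits / 64 bits per word = n >> 2 words.
            n := 8192
            fmt.Println("words:", n>>2, "bytes:", (n>>2)*8) // 2048 words, 16384 bytes
    }
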
diff --git a/pkg/filter/bloom_filter_test.go b/pkg/filter/bloom_filter_test.go
index e76204b7..82fa7525 100644
--- a/pkg/filter/bloom_filter_test.go
+++ b/pkg/filter/bloom_filter_test.go
@@ -55,6 +55,22 @@ func TestBloomFilter(t *testing.T) {
        }
 }
 
+func TestBloomFilterResetClearsBits(t *testing.T) {
+       assert := assert.New(t)
+
+       const expected = 16
+       key := []byte("reuse-key")
+
+       bf := NewBloomFilter(expected)
+       assert.True(bf.Add(key))
+
+       bf.Reset()
+       bf.SetN(expected)
+       bf.ResizeBits(OptimalBitsSize(expected))
+
+       assert.False(bf.MightContain(key))
+}
+
 func BenchmarkFilterAdd(b *testing.B) {
        for _, n := range []int{1e3, 1e4, 1e5, 1e6, 1e7} {
                data := generateTestData(n)
