This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch sidx/interface
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit e580e7d204fbffb4945718178f06b8e0df72dd7b
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Tue Aug 19 18:32:10 2025 +0700

    Enhance tag encoding and decoding with compression type prefix
    
    - Updated `encodeInt64TagValues` and `encodeFloat64TagValues` functions to 
prepend the encoding type byte to the compressed data, ensuring proper 
identification during decoding.
    - Modified `decodeInt64TagValues` and `decodeFloat64TagValues` functions to 
handle the new encoding type prefix, improving data integrity and error 
handling.
    - Refactored cache implementation in `serviceCache` to streamline entry 
removal and size management, enhancing performance and maintainability.
---
 banyand/internal/encoding/tag_encoder.go |  71 +++----
 banyand/internal/storage/cache.go        |  66 +++++--
 banyand/measure/block_metadata.go        |  29 ++-
 banyand/measure/block_metadata_test.go   | 314 +++++++++++++++++++++++++++++++
 4 files changed, 427 insertions(+), 53 deletions(-)

diff --git a/banyand/internal/encoding/tag_encoder.go 
b/banyand/internal/encoding/tag_encoder.go
index fe5160a0..7859dd1d 100644
--- a/banyand/internal/encoding/tag_encoder.go
+++ b/banyand/internal/encoding/tag_encoder.go
@@ -147,10 +147,11 @@ func encodeInt64TagValues(bb *bytes.Buffer, values 
[][]byte) ([]byte, error) {
                if v == nil || string(v) == "null" {
                        // Handle null values by falling back to default 
encoding
                        bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], values)
-                       bb.Buf = append([]byte{byte(encoding.EncodeTypePlain)}, 
bb.Buf...)
                        // Apply zstd compression for plain encoding
                        compressed := zstd.Compress(nil, bb.Buf, 
defaultCompressionLevel)
-                       return compressed, nil
+                       // Prepend EncodeTypePlain at the head of compressed 
data
+                       result := 
append([]byte{byte(encoding.EncodeTypePlain)}, compressed...)
+                       return result, nil
                }
                if len(v) != 8 {
                        logger.Panicf("invalid value length at index %d: 
expected 8 bytes, got %d", i, len(v))
@@ -188,10 +189,11 @@ func encodeFloat64TagValues(bb *bytes.Buffer, values 
[][]byte) ([]byte, error) {
                if v == nil || string(v) == "null" {
                        // Handle null values by falling back to default 
encoding
                        bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], values)
-                       bb.Buf = append([]byte{byte(encoding.EncodeTypePlain)}, 
bb.Buf...)
                        // Apply zstd compression for plain encoding
                        compressed := zstd.Compress(nil, bb.Buf, 
defaultCompressionLevel)
-                       return compressed, nil
+                       // Prepend EncodeTypePlain at the head of compressed 
data
+                       result := 
append([]byte{byte(encoding.EncodeTypePlain)}, compressed...)
+                       return result, nil
                }
                if len(v) != 8 {
                        logger.Panicf("invalid value length at index %d: 
expected 8 bytes, got %d", i, len(v))
@@ -204,10 +206,11 @@ func encodeFloat64TagValues(bb *bytes.Buffer, values 
[][]byte) ([]byte, error) {
                logger.Errorf("cannot convert Float64List to DecimalIntList: 
%v", err)
                // Handle error by falling back to default encoding
                bb.Buf = encoding.EncodeBytesBlock(bb.Buf[:0], values)
-               bb.Buf = append([]byte{byte(encoding.EncodeTypePlain)}, 
bb.Buf...)
                // Apply zstd compression for plain encoding
                compressed := zstd.Compress(nil, bb.Buf, 
defaultCompressionLevel)
-               return compressed, nil
+               // Prepend EncodeTypePlain at the head of compressed data
+               result := append([]byte{byte(encoding.EncodeTypePlain)}, 
compressed...)
+               return result, nil
        }
 
        var firstValue int64
@@ -256,28 +259,29 @@ func decodeInt64TagValues(decoder 
*encoding.BytesBlockDecoder, bb *bytes.Buffer,
                logger.Panicf("bb.Buf length too short: expect at least %d 
bytes, but got %d bytes", 1, len(bb.Buf))
        }
 
-       // Check if this is zstd compressed data (no encode type prefix)
-       decompressed, err := zstd.Decompress(nil, bb.Buf)
-       if err == nil {
-               // Successfully decompressed, this is compressed data
-               bb.Buf = decompressed
-               if len(bb.Buf) < 1 {
-                       logger.Panicf("decompressed data too short: expect at 
least %d bytes, but got %d bytes", 1, len(bb.Buf))
+       // Check the first byte to determine the encoding type
+       firstByte := encoding.EncodeType(bb.Buf[0])
+
+       if firstByte == encoding.EncodeTypePlain {
+               // This is compressed data with EncodeTypePlain at the head
+               // Skip the EncodeTypePlain byte and decompress the rest
+               compressedData := bb.Buf[1:]
+               decompressed, err := zstd.Decompress(nil, compressedData)
+               if err != nil {
+                       logger.Panicf("cannot decompress data: %v", err)
                }
-       }
 
-       encodeType := encoding.EncodeType(bb.Buf[0])
-       if encodeType == encoding.EncodeTypePlain {
-               bb.Buf = bb.Buf[1:]
-               // Data is already decompressed, decode directly without trying 
to decompress again
+               // Decode the decompressed data
                values := make([][]byte, 0, count)
-               values, decodeErr := decoder.Decode(values[:0], bb.Buf, count)
+               values, decodeErr := decoder.Decode(values[:0], decompressed, 
count)
                if decodeErr != nil {
                        logger.Panicf("cannot decode values: %v", decodeErr)
                }
                return values, nil
        }
 
+       // Otherwise, this is int list data with EncodeType at the beginning
+       encodeType := firstByte
        const expectedLen = 9
        if len(bb.Buf) < expectedLen {
                logger.Panicf("bb.Buf length too short: expect at least %d 
bytes, but got %d bytes", expectedLen, len(bb.Buf))
@@ -285,6 +289,7 @@ func decodeInt64TagValues(decoder 
*encoding.BytesBlockDecoder, bb *bytes.Buffer,
        firstValue := convert.BytesToInt64(bb.Buf[1:9])
        bb.Buf = bb.Buf[9:]
 
+       var err error
        intValues, err = encoding.BytesToInt64List(intValues[:0], bb.Buf, 
encodeType, firstValue, int(count))
        if err != nil {
                logger.Panicf("cannot decode int values: %v", err)
@@ -311,28 +316,29 @@ func decodeFloat64TagValues(decoder 
*encoding.BytesBlockDecoder, bb *bytes.Buffe
                logger.Panicf("bb.Buf length too short: expect at least %d 
bytes, but got %d bytes", 1, len(bb.Buf))
        }
 
-       // Check if this is zstd compressed data (no encode type prefix)
-       decompressed, err := zstd.Decompress(nil, bb.Buf)
-       if err == nil {
-               // Successfully decompressed, this is compressed data
-               bb.Buf = decompressed
-               if len(bb.Buf) < 1 {
-                       logger.Panicf("decompressed data too short: expect at 
least %d bytes, but got %d bytes", 1, len(bb.Buf))
+       // Check the first byte to determine the encoding type
+       firstByte := encoding.EncodeType(bb.Buf[0])
+
+       if firstByte == encoding.EncodeTypePlain {
+               // This is compressed data with EncodeTypePlain at the head
+               // Skip the EncodeTypePlain byte and decompress the rest
+               compressedData := bb.Buf[1:]
+               decompressed, err := zstd.Decompress(nil, compressedData)
+               if err != nil {
+                       logger.Panicf("cannot decompress data: %v", err)
                }
-       }
 
-       encodeType := encoding.EncodeType(bb.Buf[0])
-       if encodeType == encoding.EncodeTypePlain {
-               bb.Buf = bb.Buf[1:]
-               // Data is already decompressed, decode directly without trying 
to decompress again
+               // Decode the decompressed data
                values := make([][]byte, 0, count)
-               values, decodeErr := decoder.Decode(values[:0], bb.Buf, count)
+               values, decodeErr := decoder.Decode(values[:0], decompressed, 
count)
                if decodeErr != nil {
                        logger.Panicf("cannot decode values: %v", decodeErr)
                }
                return values, nil
        }
 
+       // Otherwise, this is float64 int list data with EncodeType at the 
beginning
+       encodeType := firstByte
        const expectedLen = 11
        if len(bb.Buf) < expectedLen {
                logger.Panicf("bb.Buf length too short: expect at least %d 
bytes, but got %d bytes", expectedLen, len(bb.Buf))
@@ -341,6 +347,7 @@ func decodeFloat64TagValues(decoder 
*encoding.BytesBlockDecoder, bb *bytes.Buffe
        firstValue := convert.BytesToInt64(bb.Buf[3:11])
        bb.Buf = bb.Buf[11:]
 
+       var err error
        intValues, err = encoding.BytesToInt64List(intValues[:0], bb.Buf, 
encodeType, firstValue, int(count))
        if err != nil {
                logger.Panicf("cannot decode int values: %v", err)
diff --git a/banyand/internal/storage/cache.go 
b/banyand/internal/storage/cache.go
index 7a890d82..2ca868e0 100644
--- a/banyand/internal/storage/cache.go
+++ b/banyand/internal/storage/cache.go
@@ -169,12 +169,12 @@ func (sc *serviceCache) startCleaner() {
 
 func (sc *serviceCache) removeEntry(key EntryKey) {
        if entry, exists := sc.entry[key]; exists {
-               atomic.AddUint64(&sc.currentSize, ^(entry.size - 1))
+               sc.atomicSubtract(entry.size)
                delete(sc.entry, key)
-               if ei, exists := sc.entryIndex[key]; exists && ei.index >= 0 && 
ei.index < sc.entryIndexHeap.Len() {
-                       heap.Remove(sc.entryIndexHeap, ei.index)
-                       delete(sc.entryIndex, key)
-               }
+       }
+       if ei, exists := sc.entryIndex[key]; exists && ei.index >= 0 && 
ei.index < sc.entryIndexHeap.Len() {
+               heap.Remove(sc.entryIndexHeap, ei.index)
+               delete(sc.entryIndex, key)
        }
 }
 
@@ -186,11 +186,17 @@ func (sc *serviceCache) clean() {
                case <-ticker.C:
                        now := uint64(time.Now().UnixNano())
                        sc.mu.Lock()
+                       // Collect keys to remove to avoid modifying map during 
iteration
+                       var keysToRemove []EntryKey
                        for key, entry := range sc.entry {
                                if now-atomic.LoadUint64(&entry.lastAccess) > 
uint64(sc.idleTimeout.Nanoseconds()) {
-                                       sc.removeEntry(key)
+                                       keysToRemove = append(keysToRemove, key)
                                }
                        }
+                       // Remove expired entries
+                       for _, key := range keysToRemove {
+                               sc.removeEntry(key)
+                       }
                        sc.mu.Unlock()
                case <-sc.stopCh:
                        return
@@ -211,15 +217,19 @@ func (sc *serviceCache) Get(key EntryKey) Sizable {
 
        sc.mu.RLock()
        entry := sc.entry[key]
+       ei := sc.entryIndex[key]
        sc.mu.RUnlock()
 
        if entry != nil {
                now := uint64(time.Now().UnixNano())
                if atomic.LoadUint64(&entry.lastAccess) != now {
                        sc.mu.Lock()
-                       atomic.StoreUint64(&entry.lastAccess, now)
-                       if ei := sc.entryIndex[key]; ei != nil {
-                               heap.Fix(sc.entryIndexHeap, ei.index)
+                       // Verify entry still exists and update access time
+                       if currentEntry := sc.entry[key]; currentEntry == entry 
{
+                               atomic.StoreUint64(&entry.lastAccess, now)
+                               if ei != nil && ei.index >= 0 && ei.index < 
sc.entryIndexHeap.Len() {
+                                       heap.Fix(sc.entryIndexHeap, ei.index)
+                               }
                        }
                        sc.mu.Unlock()
                }
@@ -238,14 +248,26 @@ func (sc *serviceCache) Put(key EntryKey, value Sizable) {
        entryOverhead := uint64(unsafe.Sizeof(entry{}) + 
unsafe.Sizeof(entryIndex{}) + unsafe.Sizeof(key))
        totalSize := valueSize + entryOverhead
 
+       // Remove existing entry if present
        if existing, exists := sc.entry[key]; exists {
-               atomic.AddUint64(&sc.currentSize, ^(existing.size - 1))
-               sc.removeEntry(key)
+               sc.atomicSubtract(existing.size)
+               delete(sc.entry, key)
+               if ei, exists := sc.entryIndex[key]; exists {
+                       if ei.index >= 0 && ei.index < sc.entryIndexHeap.Len() {
+                               heap.Remove(sc.entryIndexHeap, ei.index)
+                       }
+                       delete(sc.entryIndex, key)
+               }
        }
 
-       for atomic.LoadUint64(&sc.currentSize)+totalSize > sc.maxCacheSize && 
sc.len() > 0 {
+       // Evict entries until there's space
+       for atomic.LoadUint64(&sc.currentSize)+totalSize > sc.maxCacheSize && 
sc.entryIndexHeap.Len() > 0 {
                ei := heap.Pop(sc.entryIndexHeap).(*entryIndex)
-               sc.removeEntry(ei.key)
+               if entry, exists := sc.entry[ei.key]; exists {
+                       sc.atomicSubtract(entry.size)
+                       delete(sc.entry, ei.key)
+               }
+               delete(sc.entryIndex, ei.key)
        }
 
        now := uint64(time.Now().UnixNano())
@@ -283,8 +305,6 @@ func (sc *serviceCache) len() uint64 {
 }
 
 func (sc *serviceCache) Size() uint64 {
-       sc.mu.RLock()
-       defer sc.mu.RUnlock()
        return sc.size()
 }
 
@@ -292,6 +312,22 @@ func (sc *serviceCache) size() uint64 {
        return atomic.LoadUint64(&sc.currentSize)
 }
 
+// atomicSubtract safely subtracts a value from currentSize.
+func (sc *serviceCache) atomicSubtract(size uint64) {
+       for {
+               current := atomic.LoadUint64(&sc.currentSize)
+               var newSize uint64
+               if current >= size {
+                       newSize = current - size
+               } else {
+                       newSize = 0
+               }
+               if atomic.CompareAndSwapUint64(&sc.currentSize, current, 
newSize) {
+                       break
+               }
+       }
+}
+
 var _ Cache = (*bypassCache)(nil)
 
 type bypassCache struct{}
diff --git a/banyand/measure/block_metadata.go 
b/banyand/measure/block_metadata.go
index b4796a63..65253dcd 100644
--- a/banyand/measure/block_metadata.go
+++ b/banyand/measure/block_metadata.go
@@ -194,21 +194,38 @@ type blockMetadataArray struct {
 }
 
 func (bma *blockMetadataArray) Size() uint64 {
+       // Size of the struct itself (contains slice header)
        size := uint64(unsafe.Sizeof(*bma))
-       size += uint64(len(bma.arr)) * uint64(unsafe.Sizeof(blockMetadata{}))
+
+       // Size of each blockMetadata struct (just the static parts)
        for i := range bma.arr {
                bm := &bma.arr[i]
+               // Base struct size (fixed fields only)
+               size += uint64(unsafe.Sizeof(blockMetadata{}))
+
                if bm.tagFamilies != nil {
-                       size += uint64(len(bm.tagFamilies)) * 
uint64(unsafe.Sizeof(dataBlock{}))
-                       for name := range bm.tagFamilies {
-                               size += uint64(unsafe.Sizeof("")) + 
uint64(len(name))
+                       // Map overhead + entries (key string header + key data 
+ pointer to dataBlock + actual dataBlock)
+                       mapOverhead := uint64(48) // Approximate Go map overhead
+                       size += mapOverhead
+                       for name, db := range bm.tagFamilies {
+                               if db != nil {
+                                       size += uint64(unsafe.Sizeof("")) + 
uint64(len(name)) // key
+                                       size += 
uint64(unsafe.Sizeof((*dataBlock)(nil)))      // pointer
+                                       size += uint64(unsafe.Sizeof(*db))      
              // actual dataBlock value
+                               }
                        }
                }
                if bm.tagProjection != nil {
-                       size += uint64(len(bm.tagProjection)) * 
uint64(unsafe.Sizeof(model.TagProjection{}))
+                       // Account for slice header
+                       size += uint64(unsafe.Sizeof(bm.tagProjection))
                        for j := range bm.tagProjection {
                                tp := &bm.tagProjection[j]
-                               size += uint64(unsafe.Sizeof("")) + 
uint64(len(tp.Family)) // Family string header + data
+                               // TagProjection struct size
+                               size += 
uint64(unsafe.Sizeof(model.TagProjection{}))
+                               // Family string header + data
+                               size += uint64(unsafe.Sizeof("")) + 
uint64(len(tp.Family))
+                               // Names slice header + entries
+                               size += uint64(unsafe.Sizeof(tp.Names))
                                for _, name := range tp.Names {
                                        size += uint64(unsafe.Sizeof("")) + 
uint64(len(name)) // Each name string header + data
                                }
diff --git a/banyand/measure/block_metadata_test.go 
b/banyand/measure/block_metadata_test.go
index 8ca919d8..450764e5 100644
--- a/banyand/measure/block_metadata_test.go
+++ b/banyand/measure/block_metadata_test.go
@@ -18,14 +18,20 @@
 package measure
 
 import (
+       "fmt"
+       "runtime"
        "testing"
+       "time"
 
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/query/model"
+       "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
 func Test_dataBlock_reset(t *testing.T) {
@@ -406,3 +412,311 @@ func Test_unmarshalBlockMetadata(t *testing.T) {
                require.Error(t, err)
        })
 }
+
+func TestBlockMetadataArrayCacheLimit(t *testing.T) {
+       // Test that blockMetadataArray objects respect cache size limits
+       // and that our size calculation fixes work correctly
+
+       // Create a cache with a small limit (1KB) to test eviction
+       cacheConfig := storage.CacheConfig{
+               MaxCacheSize:    run.Bytes(1024), // 1KB limit
+               CleanupInterval: 100 * time.Millisecond,
+               IdleTimeout:     500 * time.Millisecond,
+       }
+       cache := storage.NewServiceCacheWithConfig(cacheConfig)
+       defer cache.Close()
+
+       // Helper function to create a blockMetadataArray with controlled size
+       createBlockMetadataArray := func(numBlocks int, tagFamilyCount int, 
tagNameLen int) *blockMetadataArray {
+               bma := &blockMetadataArray{
+                       arr: make([]blockMetadata, numBlocks),
+               }
+
+               for i := 0; i < numBlocks; i++ {
+                       bm := &bma.arr[i]
+                       bm.seriesID = common.SeriesID(i)
+                       bm.uncompressedSizeBytes = uint64(i * 1000)
+                       bm.count = uint64(i * 10)
+
+                       // Add tag families to increase size
+                       if tagFamilyCount > 0 {
+                               bm.tagFamilies = make(map[string]*dataBlock)
+                               for j := 0; j < tagFamilyCount; j++ {
+                                       // Create tag names with controlled 
length
+                                       tagName := ""
+                                       for k := 0; k < tagNameLen; k++ {
+                                               tagName += "x"
+                                       }
+                                       tagName += string(rune('0' + j%10)) // 
Make unique
+
+                                       bm.tagFamilies[tagName] = &dataBlock{
+                                               offset: uint64(j * 100),
+                                               size:   uint64(j * 50),
+                                       }
+                               }
+                       }
+
+                       // Add tag projections to increase size further
+                       bm.tagProjection = []model.TagProjection{
+                               {
+                                       Family: "test_family",
+                                       Names:  []string{"name1", "name2", 
"name3"},
+                               },
+                       }
+               }
+
+               return bma
+       }
+
+       t.Run("Small objects fit within cache limit", func(t *testing.T) {
+               // Create small blockMetadataArray objects that should fit
+               for i := 0; i < 3; i++ {
+                       key := storage.NewEntryKey(uint64(i), 0)
+                       bma := createBlockMetadataArray(1, 2, 5) // Small: 1 
block, 2 tag families, 5-char names
+
+                       cache.Put(key, bma)
+
+                       // Verify we can retrieve it
+                       retrieved := cache.Get(key)
+                       assert.NotNil(t, retrieved)
+                       assert.Equal(t, bma, retrieved)
+               }
+
+               // Cache should not be at limit yet
+               assert.Less(t, cache.Size(), uint64(1024))
+       })
+
+       t.Run("Large objects trigger eviction", func(t *testing.T) {
+               // Create a separate cache for this test to avoid double-close
+               testCache := storage.NewServiceCacheWithConfig(cacheConfig)
+               defer testCache.Close()
+
+               // Create objects that will exceed cache limit
+               key1 := storage.NewEntryKey(1, 0)
+               bma1 := createBlockMetadataArray(2, 5, 10) // Smaller objects: 
2 blocks, 5 tag families, 10-char names
+               size1 := bma1.Size()
+               t.Logf("Object 1 size: %d bytes", size1)
+
+               testCache.Put(key1, bma1)
+               assert.NotNil(t, testCache.Get(key1))
+
+               key2 := storage.NewEntryKey(2, 0)
+               bma2 := createBlockMetadataArray(2, 5, 10) // Another object of 
similar size
+               size2 := bma2.Size()
+               t.Logf("Object 2 size: %d bytes", size2)
+
+               testCache.Put(key2, bma2)
+
+               // Cache should have evicted the first object if size limit is 
enforced
+               cacheSize := testCache.Size()
+               t.Logf("Cache size after adding 2 objects: %d bytes (limit: 
1024)", cacheSize)
+
+               // The cache eviction should prevent unlimited growth
+               // Objects are larger than cache limit, so only one should fit
+               entries := testCache.Entries()
+               t.Logf("Cache entries: %d", entries)
+
+               // If objects are larger than cache limit, cache should try to 
stay reasonable
+               // At least one object should be retrievable
+               obj1Present := testCache.Get(key1) != nil
+               obj2Present := testCache.Get(key2) != nil
+               t.Logf("Object 1 present: %v, Object 2 present: %v", 
obj1Present, obj2Present)
+
+               // Since both objects are larger than the cache limit 
individually,
+               // the cache behavior depends on the implementation details.
+               // The important thing is that it doesn't crash and tries to 
manage memory
+               assert.True(t, obj1Present || obj2Present, "At least one object 
should remain in cache")
+       })
+
+       t.Run("Size calculation is accurate", func(t *testing.T) {
+               // Test that our size calculation is reasonable and consistent
+               bma := createBlockMetadataArray(2, 5, 10)
+               calculatedSize := bma.Size()
+
+               t.Logf("Calculated size for test object: %d bytes", 
calculatedSize)
+
+               // Size should be greater than just the struct size
+               minExpectedSize := uint64(200) // Much larger than just struct 
headers
+               assert.Greater(t, calculatedSize, minExpectedSize, "Size should 
account for all data")
+
+               // Size shouldn't be unreasonably huge (old bug would make it 
massive)
+               maxReasonableSize := uint64(50000) // Reasonable upper bound
+               assert.Less(t, calculatedSize, maxReasonableSize, "Size 
shouldn't be unreasonably large")
+
+               // Test empty object
+               emptyBma := &blockMetadataArray{}
+               emptySize := emptyBma.Size()
+               assert.Greater(t, emptySize, uint64(0))
+               assert.Less(t, emptySize, uint64(1000)) // Empty should be small
+
+               // Size with data should be larger than empty
+               assert.Greater(t, calculatedSize, emptySize)
+       })
+
+       t.Run("Cache respects max size over time", func(t *testing.T) {
+               // Create a separate cache for this test
+               testCache2 := storage.NewServiceCacheWithConfig(cacheConfig)
+               defer testCache2.Close()
+
+               // Add many small objects to test gradual eviction
+               for i := range 10 {
+                       key := storage.NewEntryKey(uint64(i), 0)
+                       bma := createBlockMetadataArray(1, 2, 5) // Small 
objects: 1 block, 2 tag families, 5-char names
+                       objSize := bma.Size()
+                       testCache2.Put(key, bma)
+
+                       cacheSize := testCache2.Size()
+                       entries := testCache2.Entries()
+                       t.Logf("Iteration %d: Object size: %d, Cache size: %d 
bytes, Entries: %d", i, objSize, cacheSize, entries)
+               }
+
+               // Final verification
+               finalSize := testCache2.Size()
+               finalEntries := testCache2.Entries()
+               t.Logf("Final cache size: %d bytes (limit: 1024), entries: %d", 
finalSize, finalEntries)
+
+               // Cache should have some entries but not be wildly over the 
limit
+               assert.Greater(t, finalEntries, uint64(0), "Cache should 
contain some entries")
+
+               // The exact size depends on eviction behavior, but it 
shouldn't be completely unbounded
+               // We'll be more lenient here since small objects might 
accumulate
+               assert.LessOrEqual(t, finalSize, uint64(5120), "Cache shouldn't 
grow completely unbounded")
+       })
+}
+
+func TestBlockMetadataArrayGarbageCollection(t *testing.T) {
+       // Test that blockMetadataArray objects can be garbage collected
+       // even when they're referenced by partIter objects
+
+       // Create a cache with a small limit to force eviction
+       cacheConfig := storage.CacheConfig{
+               MaxCacheSize:    run.Bytes(512), // Very small limit
+               CleanupInterval: 10 * time.Millisecond,
+               IdleTimeout:     50 * time.Millisecond,
+       }
+       cache := storage.NewServiceCacheWithConfig(cacheConfig)
+       defer cache.Close()
+
+       // Helper to create large objects that will be evicted
+       createLargeObject := func(id uint64) *blockMetadataArray {
+               bma := &blockMetadataArray{
+                       arr: make([]blockMetadata, 3),
+               }
+
+               for i := range bma.arr {
+                       bm := &bma.arr[i]
+                       bm.seriesID = common.SeriesID(id*10 + uint64(i))
+                       bm.uncompressedSizeBytes = uint64(i * 1000)
+                       bm.count = uint64(i * 100)
+
+                       // Create large tag families
+                       bm.tagFamilies = make(map[string]*dataBlock)
+                       for j := range 10 {
+                               tagName := 
fmt.Sprintf("large_tag_family_%d_%d", id, j)
+                               bm.tagFamilies[tagName] = &dataBlock{
+                                       offset: uint64(j * 100),
+                                       size:   uint64(j * 50),
+                               }
+                       }
+
+                       // Add large tag projections
+                       bm.tagProjection = []model.TagProjection{
+                               {
+                                       Family: fmt.Sprintf("large_family_%d", 
id),
+                                       Names:  []string{"name1", "name2", 
"name3", "name4", "name5"},
+                               },
+                       }
+               }
+
+               return bma
+       }
+
+       t.Run("References don't prevent garbage collection", func(t *testing.T) 
{
+               // Simulate partIter usage pattern
+               var sliceRefs [][]blockMetadata
+
+               // Add objects to cache and keep slice references (like 
partIter does)
+               for i := range 5 {
+                       key := storage.NewEntryKey(uint64(i), 0)
+                       bma := createLargeObject(uint64(i))
+
+                       cache.Put(key, bma)
+
+                       // Simulate what partIter.readPrimaryBlock does:
+                       // Keep a reference to the internal slice
+                       if retrieved := cache.Get(key); retrieved != nil {
+                               retrievedBma := retrieved.(*blockMetadataArray)
+                               sliceRefs = append(sliceRefs, retrievedBma.arr) 
// This is the potential leak
+                       }
+
+                       t.Logf("Added object %d, cache size: %d bytes, entries: 
%d",
+                               i, cache.Size(), cache.Entries())
+               }
+
+               // Force garbage collection
+               runtime.GC()
+               runtime.GC() // Run twice to ensure cleanup
+
+               // Cache should have evicted most objects due to size limit
+               cacheSize := cache.Size()
+               cacheEntries := cache.Entries()
+               t.Logf("Final cache size: %d bytes, entries: %d", cacheSize, 
cacheEntries)
+
+               // Cache should not be holding many objects due to eviction
+               assert.LessOrEqual(t, cacheEntries, uint64(2), "Cache should 
have evicted most objects")
+
+               // Even though we have slice references, the original objects 
should be GC'able
+               // This test verifies that holding slice references doesn't 
prevent GC of the parent object
+               assert.Len(t, sliceRefs, 5, "Should have collected all slice 
references")
+
+               // Verify slice references are still valid (they should be 
separate from cached objects)
+               for i, sliceRef := range sliceRefs {
+                       assert.Len(t, sliceRef, 3, "Slice reference %d should 
still be valid", i)
+                       if len(sliceRef) > 0 {
+                               assert.Equal(t, common.SeriesID(uint64(i)*10), 
sliceRef[0].seriesID)
+                       }
+               }
+       })
+
+       t.Run("partIter reset clears references", func(t *testing.T) {
+               // Test the actual partIter reset behavior
+               // We can't easily test partIter directly due to dependencies,
+               // but we can verify the reference pattern
+
+               key := storage.NewEntryKey(100, 0)
+               bma := createLargeObject(100)
+               cache.Put(key, bma)
+
+               // Simulate partIter.readPrimaryBlock
+               var bmSlice []blockMetadata
+               if retrieved := cache.Get(key); retrieved != nil {
+                       retrievedBma := retrieved.(*blockMetadataArray)
+                       bmSlice = retrievedBma.arr // pi.bms = bma.arr
+               }
+
+               assert.NotNil(t, bmSlice)
+               assert.Len(t, bmSlice, 3)
+
+               // Force cache eviction by adding a large object
+               key2 := storage.NewEntryKey(101, 0)
+               bma2 := createLargeObject(101)
+               cache.Put(key2, bma2)
+
+               // Original object should be evicted from cache
+               assert.Nil(t, cache.Get(key), "Original object should be 
evicted")
+
+               // But slice reference should still be valid
+               assert.Len(t, bmSlice, 3, "Slice reference should still be 
valid")
+
+               // Simulate partIter.reset()
+               bmSlice = nil // pi.bms = nil
+
+               // Force GC
+               runtime.GC()
+               runtime.GC()
+
+               // Now the blockMetadataArray should be eligible for GC
+               // This is mainly a documentation test - we can't easily verify 
GC directly
+               t.Log("partIter.reset() should make blockMetadataArray eligible 
for GC")
+       })
+}

Reply via email to