This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new ac9aa9e2 Fix memory leak in measure package: resolve cache-related memory issues in block metadata, part iteration, query processing, and table operations (#726)
ac9aa9e2 is described below

commit ac9aa9e25798e2867039d3f66022ddcb96f295dd
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Sat Aug 16 07:32:56 2025 +0800

    Fix memory leak in measure package: resolve cache-related memory issues in block metadata, part iteration, query processing, and table operations (#726)
---
 banyand/measure/block_metadata.go | 22 ----------------------
 banyand/measure/part_iter.go      | 36 ++++++++++++++----------------------
 banyand/measure/part_iter_test.go |  4 +---
 banyand/measure/query.go          |  4 +---
 banyand/measure/query_test.go     |  4 +---
 banyand/measure/tstable.go        |  4 ++--
 banyand/measure/tstable_test.go   |  4 +---
 7 files changed, 20 insertions(+), 58 deletions(-)

diff --git a/banyand/measure/block_metadata.go b/banyand/measure/block_metadata.go
index a046586a..0e3efe3c 100644
--- a/banyand/measure/block_metadata.go
+++ b/banyand/measure/block_metadata.go
@@ -192,28 +192,6 @@ type blockMetadataArray struct {
        arr []blockMetadata
 }
 
-func (bma *blockMetadataArray) reset() {
-       for i := range bma.arr {
-               bma.arr[i].reset()
-       }
-       bma.arr = bma.arr[:0]
-}
-
-var blockMetadataArrayPool = pool.Register[*blockMetadataArray]("measure-blockMetadataArray")
-
-func generateBlockMetadataArray() *blockMetadataArray {
-       v := blockMetadataArrayPool.Get()
-       if v == nil {
-               return &blockMetadataArray{}
-       }
-       return v
-}
-
-func releaseBlockMetadataArray(bma *blockMetadataArray) {
-       bma.reset()
-       blockMetadataArrayPool.Put(bma)
-}
-
 type timestampsMetadata struct {
        dataBlock
        min               int64
diff --git a/banyand/measure/part_iter.go b/banyand/measure/part_iter.go
index 2a0ad791..2985501f 100644
--- a/banyand/measure/part_iter.go
+++ b/banyand/measure/part_iter.go
@@ -61,13 +61,12 @@ func (pi *partIter) reset() {
        pi.err = nil
 }
 
-func (pi *partIter) init(bma *blockMetadataArray, p *part, sids []common.SeriesID, minTimestamp, maxTimestamp int64) {
+func (pi *partIter) init(p *part, sids []common.SeriesID, minTimestamp, maxTimestamp int64) {
        pi.reset()
        pi.curBlock = &blockMetadata{}
        pi.p = p
        pi.c = p.cache
 
-       pi.bms = bma.arr
        pi.sids = sids
        pi.minTimestamp = minTimestamp
        pi.maxTimestamp = maxTimestamp
@@ -151,8 +150,7 @@ func (pi *partIter) loadNextBlockMetadata() bool {
                        continue
                }
 
-               var err error
-               pi.bms, err = pi.readPrimaryBlock(pi.bms[:0], pbm)
+               err := pi.readPrimaryBlock(pbm)
                if err != nil {
                        pi.err = fmt.Errorf("cannot read primary block for part %q at offset %d with size %d: %w",
                                &pi.p.partMetadata, pbm.offset, pbm.size, err)
@@ -183,14 +181,12 @@ func searchPBM(pbmIndex []primaryBlockMetadata, sid common.SeriesID) []primaryBl
        return pbmIndex[n-1:]
 }
 
-func (pi *partIter) readPrimaryBlock(bms []blockMetadata, mr *primaryBlockMetadata) ([]blockMetadata, error) {
+func (pi *partIter) readPrimaryBlock(mr *primaryBlockMetadata) error {
        value := pi.c.Get(storage.NewEntryKey(pi.p.partMetadata.ID, mr.offset))
        if value != nil {
-               bmPtrs := value.([]*blockMetadata)
-               for _, bmsPtr := range bmPtrs {
-                       bms = append(bms, *bmsPtr)
-               }
-               return bms, nil
+               bma := value.(*blockMetadataArray)
+               pi.bms = bma.arr
+               return nil
        }
 
        pi.compressedPrimaryBuf = bytes.ResizeOver(pi.compressedPrimaryBuf, int(mr.size))
@@ -199,20 +195,16 @@ func (pi *partIter) readPrimaryBlock(bms []blockMetadata, mr *primaryBlockMetada
        var err error
        pi.primaryBuf, err = zstd.Decompress(pi.primaryBuf[:0], pi.compressedPrimaryBuf)
        if err != nil {
-               return nil, fmt.Errorf("cannot decompress index block: %w", err)
+               return fmt.Errorf("cannot decompress index block: %w", err)
        }
-       bms, err = unmarshalBlockMetadata(bms, pi.primaryBuf)
+       bma := &blockMetadataArray{}
+       bma.arr, err = unmarshalBlockMetadata(bma.arr[:0], pi.primaryBuf)
        if err != nil {
-               return nil, fmt.Errorf("cannot unmarshal index block: %w", err)
+               return fmt.Errorf("cannot unmarshal index block: %w", err)
        }
-       bmPtrs := make([]*blockMetadata, 0, len(bms))
-       for _, bm := range bms {
-               bmCopy := &blockMetadata{}
-               bmCopy.copyFrom(&bm)
-               bmPtrs = append(bmPtrs, bmCopy)
-       }
-       pi.c.Put(storage.NewEntryKey(pi.p.partMetadata.ID, mr.offset), bmPtrs)
-       return bms, nil
+       pi.c.Put(storage.NewEntryKey(pi.p.partMetadata.ID, mr.offset), bma)
+       pi.bms = bma.arr
+       return nil
 }
 
 func (pi *partIter) findBlock() bool {
@@ -248,7 +240,7 @@ func (pi *partIter) findBlock() bool {
                        continue
                }
 
-               pi.curBlock = bm
+               pi.curBlock.copyFrom(bm)
 
                pi.bms = bhs[1:]
                return true
diff --git a/banyand/measure/part_iter_test.go b/banyand/measure/part_iter_test.go
index 45e6e7de..8f4fc674 100644
--- a/banyand/measure/part_iter_test.go
+++ b/banyand/measure/part_iter_test.go
@@ -110,14 +110,12 @@ func Test_partIter_nextBlock(t *testing.T) {
                },
        }
 
-       bma := generateBlockMetadataArray()
-       defer releaseBlockMetadataArray(bma)
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
                        verifyPart := func(p *part) {
                                defer p.close()
                                pi := partIter{}
-                               pi.init(bma, p, tt.sids, tt.opt.minTimestamp, tt.opt.maxTimestamp)
+                               pi.init(p, tt.sids, tt.opt.minTimestamp, tt.opt.maxTimestamp)
 
                                var got []blockMetadata
                                for pi.nextBlock() {
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index 44a47927..066f732a 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -361,8 +361,6 @@ func (m *measure) buildIndexQueryResult(ctx context.Context, mqo model.MeasureQu
 }
 
 func (m *measure) searchBlocks(ctx context.Context, result *queryResult, sids []common.SeriesID, parts []*part, qo queryOptions) error {
-       bma := generateBlockMetadataArray()
-       defer releaseBlockMetadataArray(bma)
        defFn := startBlockScanSpan(ctx, len(sids), parts, result)
        defer defFn()
        tstIter := generateTstIter()
@@ -370,7 +368,7 @@ func (m *measure) searchBlocks(ctx context.Context, result *queryResult, sids []
        originalSids := make([]common.SeriesID, len(sids))
        copy(originalSids, sids)
        sort.Slice(sids, func(i, j int) bool { return sids[i] < sids[j] })
-       tstIter.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp)
+       tstIter.init(parts, sids, qo.minTimestamp, qo.maxTimestamp)
        if tstIter.Error() != nil {
                return fmt.Errorf("cannot init tstIter: %w", tstIter.Error())
        }
diff --git a/banyand/measure/query_test.go b/banyand/measure/query_test.go
index bb76959f..1f2d24a8 100644
--- a/banyand/measure/query_test.go
+++ b/banyand/measure/query_test.go
@@ -1232,8 +1232,6 @@ func TestQueryResult(t *testing.T) {
                },
        }
 
-       bma := generateBlockMetadataArray()
-       defer releaseBlockMetadataArray(bma)
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
                        verify := func(t *testing.T, tst *tsTable) {
@@ -1253,7 +1251,7 @@ func TestQueryResult(t *testing.T) {
                                        return sids[i] < tt.sids[j]
                                })
                                ti := &tstIter{}
-                               ti.init(bma, pp, sids, tt.minTimestamp, tt.maxTimestamp)
+                               ti.init(pp, sids, tt.minTimestamp, tt.maxTimestamp)
 
                                var result queryResult
                                result.ctx = context.TODO()
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index 703ad639..2f71eddd 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -340,7 +340,7 @@ func (ti *tstIter) reset() {
        ti.nextBlockNoop = false
 }
 
-func (ti *tstIter) init(bma *blockMetadataArray, parts []*part, sids []common.SeriesID, minTimestamp, maxTimestamp int64) {
+func (ti *tstIter) init(parts []*part, sids []common.SeriesID, minTimestamp, maxTimestamp int64) {
        ti.reset()
        ti.parts = parts
 
@@ -349,7 +349,7 @@ func (ti *tstIter) init(bma *blockMetadataArray, parts []*part, sids []common.Se
        }
        ti.piPool = ti.piPool[:len(ti.parts)]
        for i, p := range ti.parts {
-               ti.piPool[i].init(bma, p, sids, minTimestamp, maxTimestamp)
+               ti.piPool[i].init(p, sids, minTimestamp, maxTimestamp)
        }
 
        ti.piHeap = ti.piHeap[:0]
diff --git a/banyand/measure/tstable_test.go b/banyand/measure/tstable_test.go
index 75ca5b2d..c5c3956f 100644
--- a/banyand/measure/tstable_test.go
+++ b/banyand/measure/tstable_test.go
@@ -138,8 +138,6 @@ func Test_tstIter(t *testing.T) {
                minTimestamp int64
                maxTimestamp int64
        }
-       bma := generateBlockMetadataArray()
-       defer releaseBlockMetadataArray(bma)
 
        verify := func(t *testing.T, tt testCtx, tst *tsTable) uint64 {
                defer tst.Close()
@@ -152,7 +150,7 @@ func Test_tstIter(t *testing.T) {
                pp, n := s.getParts(nil, shardCache, tt.minTimestamp, tt.maxTimestamp)
                require.Equal(t, len(s.parts), n)
                ti := &tstIter{}
-               ti.init(bma, pp, tt.sids, tt.minTimestamp, tt.maxTimestamp)
+               ti.init(pp, tt.sids, tt.minTimestamp, tt.maxTimestamp)
                var got []blockMetadata
                for ti.nextBlock() {
                        if ti.piHeap[0].curBlock.seriesID == 0 {

Reply via email to