This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push: new ac9aa9e2 Fix memory leak in measure package: resolve cache-related memory issues in block metadata, part iteration, query processing, and table operations (#726) ac9aa9e2 is described below commit ac9aa9e25798e2867039d3f66022ddcb96f295dd Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Sat Aug 16 07:32:56 2025 +0800 Fix memory leak in measure package: resolve cache-related memory issues in block metadata, part iteration, query processing, and table operations (#726) --- banyand/measure/block_metadata.go | 22 ---------------------- banyand/measure/part_iter.go | 36 ++++++++++++++---------------------- banyand/measure/part_iter_test.go | 4 +--- banyand/measure/query.go | 4 +--- banyand/measure/query_test.go | 4 +--- banyand/measure/tstable.go | 4 ++-- banyand/measure/tstable_test.go | 4 +--- 7 files changed, 20 insertions(+), 58 deletions(-) diff --git a/banyand/measure/block_metadata.go b/banyand/measure/block_metadata.go index a046586a..0e3efe3c 100644 --- a/banyand/measure/block_metadata.go +++ b/banyand/measure/block_metadata.go @@ -192,28 +192,6 @@ type blockMetadataArray struct { arr []blockMetadata } -func (bma *blockMetadataArray) reset() { - for i := range bma.arr { - bma.arr[i].reset() - } - bma.arr = bma.arr[:0] -} - -var blockMetadataArrayPool = pool.Register[*blockMetadataArray]("measure-blockMetadataArray") - -func generateBlockMetadataArray() *blockMetadataArray { - v := blockMetadataArrayPool.Get() - if v == nil { - return &blockMetadataArray{} - } - return v -} - -func releaseBlockMetadataArray(bma *blockMetadataArray) { - bma.reset() - blockMetadataArrayPool.Put(bma) -} - type timestampsMetadata struct { dataBlock min int64 diff --git a/banyand/measure/part_iter.go b/banyand/measure/part_iter.go index 2a0ad791..2985501f 100644 --- a/banyand/measure/part_iter.go +++ b/banyand/measure/part_iter.go @@ -61,13 +61,12 @@ func (pi *partIter) reset() { pi.err = nil } -func (pi *partIter) init(bma *blockMetadataArray, p *part, sids []common.SeriesID, minTimestamp, maxTimestamp int64) { +func (pi *partIter) init(p *part, sids []common.SeriesID, minTimestamp, maxTimestamp int64) { pi.reset() pi.curBlock = &blockMetadata{} pi.p = p pi.c = p.cache - pi.bms = bma.arr pi.sids = sids pi.minTimestamp = minTimestamp pi.maxTimestamp = maxTimestamp @@ -151,8 +150,7 @@ func (pi *partIter) loadNextBlockMetadata() bool { continue } - var err error - pi.bms, err = pi.readPrimaryBlock(pi.bms[:0], pbm) + err := pi.readPrimaryBlock(pbm) if err != nil { pi.err = fmt.Errorf("cannot read primary block for part %q at offset %d with size %d: %w", &pi.p.partMetadata, pbm.offset, pbm.size, err) @@ -183,14 +181,12 @@ func searchPBM(pbmIndex []primaryBlockMetadata, sid common.SeriesID) []primaryBl return pbmIndex[n-1:] } -func (pi *partIter) readPrimaryBlock(bms []blockMetadata, mr *primaryBlockMetadata) ([]blockMetadata, error) { +func (pi *partIter) readPrimaryBlock(mr *primaryBlockMetadata) error { value := pi.c.Get(storage.NewEntryKey(pi.p.partMetadata.ID, mr.offset)) if value != nil { - bmPtrs := value.([]*blockMetadata) - for _, bmsPtr := range bmPtrs { - bms = append(bms, *bmsPtr) - } - return bms, nil + bma := value.(*blockMetadataArray) + pi.bms = bma.arr + return nil } pi.compressedPrimaryBuf = bytes.ResizeOver(pi.compressedPrimaryBuf, int(mr.size)) @@ -199,20 +195,16 @@ func (pi *partIter) readPrimaryBlock(bms []blockMetadata, mr *primaryBlockMetada var err error pi.primaryBuf, err = zstd.Decompress(pi.primaryBuf[:0], pi.compressedPrimaryBuf) if err != nil { - return nil, fmt.Errorf("cannot decompress index block: %w", err) + return fmt.Errorf("cannot decompress index block: %w", err) } - bms, err = unmarshalBlockMetadata(bms, pi.primaryBuf) + bma := &blockMetadataArray{} + bma.arr, err = unmarshalBlockMetadata(bma.arr[:0], pi.primaryBuf) if err != nil { - return nil, fmt.Errorf("cannot unmarshal index block: %w", err) + return fmt.Errorf("cannot unmarshal index block: %w", err) } - bmPtrs := make([]*blockMetadata, 0, len(bms)) - for _, bm := range bms { - bmCopy := &blockMetadata{} - bmCopy.copyFrom(&bm) - bmPtrs = append(bmPtrs, bmCopy) - } - pi.c.Put(storage.NewEntryKey(pi.p.partMetadata.ID, mr.offset), bmPtrs) - return bms, nil + pi.c.Put(storage.NewEntryKey(pi.p.partMetadata.ID, mr.offset), bma) + pi.bms = bma.arr + return nil } func (pi *partIter) findBlock() bool { @@ -248,7 +240,7 @@ func (pi *partIter) findBlock() bool { continue } - pi.curBlock = bm + pi.curBlock.copyFrom(bm) pi.bms = bhs[1:] return true diff --git a/banyand/measure/part_iter_test.go b/banyand/measure/part_iter_test.go index 45e6e7de..8f4fc674 100644 --- a/banyand/measure/part_iter_test.go +++ b/banyand/measure/part_iter_test.go @@ -110,14 +110,12 @@ func Test_partIter_nextBlock(t *testing.T) { }, } - bma := generateBlockMetadataArray() - defer releaseBlockMetadataArray(bma) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { verifyPart := func(p *part) { defer p.close() pi := partIter{} - pi.init(bma, p, tt.sids, tt.opt.minTimestamp, tt.opt.maxTimestamp) + pi.init(p, tt.sids, tt.opt.minTimestamp, tt.opt.maxTimestamp) var got []blockMetadata for pi.nextBlock() { diff --git a/banyand/measure/query.go b/banyand/measure/query.go index 44a47927..066f732a 100644 --- a/banyand/measure/query.go +++ b/banyand/measure/query.go @@ -361,8 +361,6 @@ func (m *measure) buildIndexQueryResult(ctx context.Context, mqo model.MeasureQu } func (m *measure) searchBlocks(ctx context.Context, result *queryResult, sids []common.SeriesID, parts []*part, qo queryOptions) error { - bma := generateBlockMetadataArray() - defer releaseBlockMetadataArray(bma) defFn := startBlockScanSpan(ctx, len(sids), parts, result) defer defFn() tstIter := generateTstIter() @@ -370,7 +368,7 @@ func (m *measure) searchBlocks(ctx context.Context, result *queryResult, sids [] originalSids := make([]common.SeriesID, len(sids)) copy(originalSids, sids) sort.Slice(sids, func(i, j int) bool { return sids[i] < sids[j] }) - tstIter.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp) + tstIter.init(parts, sids, qo.minTimestamp, qo.maxTimestamp) if tstIter.Error() != nil { return fmt.Errorf("cannot init tstIter: %w", tstIter.Error()) } diff --git a/banyand/measure/query_test.go b/banyand/measure/query_test.go index bb76959f..1f2d24a8 100644 --- a/banyand/measure/query_test.go +++ b/banyand/measure/query_test.go @@ -1232,8 +1232,6 @@ func TestQueryResult(t *testing.T) { }, } - bma := generateBlockMetadataArray() - defer releaseBlockMetadataArray(bma) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { verify := func(t *testing.T, tst *tsTable) { @@ -1253,7 +1251,7 @@ func TestQueryResult(t *testing.T) { return sids[i] < tt.sids[j] }) ti := &tstIter{} - ti.init(bma, pp, sids, tt.minTimestamp, tt.maxTimestamp) + ti.init(pp, sids, tt.minTimestamp, tt.maxTimestamp) var result queryResult result.ctx = context.TODO() diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go index 703ad639..2f71eddd 100644 --- a/banyand/measure/tstable.go +++ b/banyand/measure/tstable.go @@ -340,7 +340,7 @@ func (ti *tstIter) reset() { ti.nextBlockNoop = false } -func (ti *tstIter) init(bma *blockMetadataArray, parts []*part, sids []common.SeriesID, minTimestamp, maxTimestamp int64) { +func (ti *tstIter) init(parts []*part, sids []common.SeriesID, minTimestamp, maxTimestamp int64) { ti.reset() ti.parts = parts @@ -349,7 +349,7 @@ func (ti *tstIter) init(bma *blockMetadataArray, parts []*part, sids []common.Se } ti.piPool = ti.piPool[:len(ti.parts)] for i, p := range ti.parts { - ti.piPool[i].init(bma, p, sids, minTimestamp, maxTimestamp) + ti.piPool[i].init(p, sids, minTimestamp, maxTimestamp) } ti.piHeap = ti.piHeap[:0] diff --git a/banyand/measure/tstable_test.go b/banyand/measure/tstable_test.go index 75ca5b2d..c5c3956f 100644 --- a/banyand/measure/tstable_test.go +++ b/banyand/measure/tstable_test.go @@ -138,8 +138,6 @@ func Test_tstIter(t *testing.T) { minTimestamp int64 maxTimestamp int64 } - bma := generateBlockMetadataArray() - defer releaseBlockMetadataArray(bma) verify := func(t *testing.T, tt testCtx, tst *tsTable) uint64 { defer tst.Close() @@ -152,7 +150,7 @@ func Test_tstIter(t *testing.T) { pp, n := s.getParts(nil, shardCache, tt.minTimestamp, tt.maxTimestamp) require.Equal(t, len(s.parts), n) ti := &tstIter{} - ti.init(bma, pp, tt.sids, tt.minTimestamp, tt.maxTimestamp) + ti.init(pp, tt.sids, tt.minTimestamp, tt.maxTimestamp) var got []blockMetadata for ti.nextBlock() { if ti.piHeap[0].curBlock.seriesID == 0 {