This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new e0977afc Allocate Bloom filter from memory pool (#802)
e0977afc is described below

commit e0977afc84d12252737966e10d7eebca60e2360a
Author: Huang Youliang <[email protected]>
AuthorDate: Fri Oct 10 07:03:23 2025 +0800

    Allocate Bloom filter from memory pool (#802)
---
 banyand/trace/block_writer.go      |  2 +-
 banyand/trace/bloom_filter.go      | 19 +++++++++++++++++++
 banyand/trace/bloom_filter_test.go |  4 +++-
 banyand/trace/flusher.go           |  1 +
 banyand/trace/part.go              |  3 +--
 banyand/trace/part_metadata.go     |  3 ++-
 banyand/trace/write_data.go        |  3 +--
 7 files changed, 28 insertions(+), 7 deletions(-)

diff --git a/banyand/trace/block_writer.go b/banyand/trace/block_writer.go
index 88b1c858..732685e0 100644
--- a/banyand/trace/block_writer.go
+++ b/banyand/trace/block_writer.go
@@ -282,7 +282,7 @@ func (bw *blockWriter) Flush(pm *partMetadata, tf 
*traceIDFilter, tt *tagType) {
 
        if len(bw.traceIDs) > 0 {
                if tf.filter == nil {
-                       tf.filter = filter.NewBloomFilter(0)
+                       tf.filter = generateBloomFilter()
                }
                tf.filter.SetN(len(bw.traceIDs))
                tf.filter.ResizeBits((len(bw.traceIDs)*filter.B + 63) / 64)
diff --git a/banyand/trace/bloom_filter.go b/banyand/trace/bloom_filter.go
index 67e182b7..928ece1e 100644
--- a/banyand/trace/bloom_filter.go
+++ b/banyand/trace/bloom_filter.go
@@ -21,6 +21,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/filter"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/pool"
 )
 
 func encodeBloomFilter(dst []byte, bf *filter.BloomFilter) []byte {
@@ -43,3 +44,21 @@ func decodeBloomFilter(src []byte, bf *filter.BloomFilter) 
*filter.BloomFilter {
 
        return bf
 }
+
+func generateBloomFilter() *filter.BloomFilter {
+       v := bloomFilterPool.Get()
+       if v == nil {
+               return filter.NewBloomFilter(0)
+       }
+       return v
+}
+
+func releaseBloomFilter(bf *filter.BloomFilter) {
+       if bf == nil {
+               return
+       }
+       bf.Reset()
+       bloomFilterPool.Put(bf)
+}
+
+var bloomFilterPool = pool.Register[*filter.BloomFilter]("trace-bloomFilter")
diff --git a/banyand/trace/bloom_filter_test.go 
b/banyand/trace/bloom_filter_test.go
index 3970655b..92047731 100644
--- a/banyand/trace/bloom_filter_test.go
+++ b/banyand/trace/bloom_filter_test.go
@@ -29,6 +29,7 @@ func TestEncodeAndDecodeBloomFilter(t *testing.T) {
        assert := assert.New(t)
 
        bf := filter.NewBloomFilter(3)
+       defer releaseBloomFilter(bf)
 
        items := [][]byte{
                []byte("skywalking"),
@@ -44,7 +45,8 @@ func TestEncodeAndDecodeBloomFilter(t *testing.T) {
 
        buf := make([]byte, 0)
        buf = encodeBloomFilter(buf, bf)
-       bf2 := filter.NewBloomFilter(0)
+       bf2 := generateBloomFilter()
+       defer releaseBloomFilter(bf2)
        bf2 = decodeBloomFilter(buf, bf2)
 
        for i := 0; i < 3; i++ {
diff --git a/banyand/trace/flusher.go b/banyand/trace/flusher.go
index 194f8a0d..f8fcb5ed 100644
--- a/banyand/trace/flusher.go
+++ b/banyand/trace/flusher.go
@@ -219,6 +219,7 @@ func (tst *tsTable) flush(snapshot *snapshot, flushCh chan 
*flusherIntroduction)
                partPath := partPath(tst.root, pw.ID())
                pw.mp.mustFlush(tst.fileSystem, partPath)
                newPW := newPartWrapper(nil, mustOpenFilePart(pw.ID(), 
tst.root, tst.fileSystem))
+               defer newPW.p.traceIDFilter.reset()
                newPW.p.partMetadata.ID = pw.ID()
                ind.flushed[newPW.ID()] = newPW
                partIDMap[newPW.ID()] = struct{}{}
diff --git a/banyand/trace/part.go b/banyand/trace/part.go
index 213a15ff..973f5d95 100644
--- a/banyand/trace/part.go
+++ b/banyand/trace/part.go
@@ -30,7 +30,6 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/bytes"
        "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
-       "github.com/apache/skywalking-banyandb/pkg/filter"
        "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/pool"
@@ -277,7 +276,7 @@ func (mp *memPart) Unmarshal(data []byte) error {
                }
                filterBytes := tail[:filterLen]
                tail = tail[filterLen:]
-               bf := filter.NewBloomFilter(0)
+               bf := generateBloomFilter()
                mp.traceIDFilter.filter = decodeBloomFilter(filterBytes, bf)
        } else {
                mp.traceIDFilter.filter = nil
diff --git a/banyand/trace/part_metadata.go b/banyand/trace/part_metadata.go
index f6e5c3dc..5dd7bcf3 100644
--- a/banyand/trace/part_metadata.go
+++ b/banyand/trace/part_metadata.go
@@ -204,6 +204,7 @@ type traceIDFilter struct {
 }
 
 func (tf *traceIDFilter) reset() {
+       releaseBloomFilter(tf.filter)
        tf.filter = nil
 }
 
@@ -225,7 +226,7 @@ func (tf *traceIDFilter) mustReadTraceIDFilter(fileSystem 
fs.FileSystem, partPat
                return
        }
 
-       bf := filter.NewBloomFilter(0)
+       bf := generateBloomFilter()
        tf.filter = decodeBloomFilter(data, bf)
 }
 
diff --git a/banyand/trace/write_data.go b/banyand/trace/write_data.go
index 2e691cef..b41b62f3 100644
--- a/banyand/trace/write_data.go
+++ b/banyand/trace/write_data.go
@@ -25,7 +25,6 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/banyand/internal/sidx"
        "github.com/apache/skywalking-banyandb/banyand/queue"
-       "github.com/apache/skywalking-banyandb/pkg/filter"
        "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
@@ -71,7 +70,7 @@ func (s *syncPartContext) NewPartType(ctx 
*queue.ChunkedSyncPartContext) error {
 
 func (s *syncPartContext) FinishSync() error {
        if len(s.traceIDFilterBuffer) > 0 && s.memPart != nil {
-               bf := filter.NewBloomFilter(0)
+               bf := generateBloomFilter()
                s.memPart.traceIDFilter.filter = 
decodeBloomFilter(s.traceIDFilterBuffer, bf)
        }
        if len(s.tagTypeBuffer) > 0 && s.memPart != nil {

Reply via email to