This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new e0977afc Allocate Bloom filter from memory pool (#802)
e0977afc is described below
commit e0977afc84d12252737966e10d7eebca60e2360a
Author: Huang Youliang <[email protected]>
AuthorDate: Fri Oct 10 07:03:23 2025 +0800
Allocate Bloom filter from memory pool (#802)
---
banyand/trace/block_writer.go | 2 +-
banyand/trace/bloom_filter.go | 19 +++++++++++++++++++
banyand/trace/bloom_filter_test.go | 4 +++-
banyand/trace/flusher.go | 1 +
banyand/trace/part.go | 3 +--
banyand/trace/part_metadata.go | 3 ++-
banyand/trace/write_data.go | 3 +--
7 files changed, 28 insertions(+), 7 deletions(-)
diff --git a/banyand/trace/block_writer.go b/banyand/trace/block_writer.go
index 88b1c858..732685e0 100644
--- a/banyand/trace/block_writer.go
+++ b/banyand/trace/block_writer.go
@@ -282,7 +282,7 @@ func (bw *blockWriter) Flush(pm *partMetadata, tf
*traceIDFilter, tt *tagType) {
if len(bw.traceIDs) > 0 {
if tf.filter == nil {
- tf.filter = filter.NewBloomFilter(0)
+ tf.filter = generateBloomFilter()
}
tf.filter.SetN(len(bw.traceIDs))
tf.filter.ResizeBits((len(bw.traceIDs)*filter.B + 63) / 64)
diff --git a/banyand/trace/bloom_filter.go b/banyand/trace/bloom_filter.go
index 67e182b7..928ece1e 100644
--- a/banyand/trace/bloom_filter.go
+++ b/banyand/trace/bloom_filter.go
@@ -21,6 +21,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/filter"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
)
func encodeBloomFilter(dst []byte, bf *filter.BloomFilter) []byte {
@@ -43,3 +44,21 @@ func decodeBloomFilter(src []byte, bf *filter.BloomFilter)
*filter.BloomFilter {
return bf
}
+
+func generateBloomFilter() *filter.BloomFilter {
+ v := bloomFilterPool.Get()
+ if v == nil {
+ return filter.NewBloomFilter(0)
+ }
+ return v
+}
+
+func releaseBloomFilter(bf *filter.BloomFilter) {
+ if bf == nil {
+ return
+ }
+ bf.Reset()
+ bloomFilterPool.Put(bf)
+}
+
+var bloomFilterPool = pool.Register[*filter.BloomFilter]("trace-bloomFilter")
diff --git a/banyand/trace/bloom_filter_test.go
b/banyand/trace/bloom_filter_test.go
index 3970655b..92047731 100644
--- a/banyand/trace/bloom_filter_test.go
+++ b/banyand/trace/bloom_filter_test.go
@@ -29,6 +29,7 @@ func TestEncodeAndDecodeBloomFilter(t *testing.T) {
assert := assert.New(t)
bf := filter.NewBloomFilter(3)
+ defer releaseBloomFilter(bf)
items := [][]byte{
[]byte("skywalking"),
@@ -44,7 +45,8 @@ func TestEncodeAndDecodeBloomFilter(t *testing.T) {
buf := make([]byte, 0)
buf = encodeBloomFilter(buf, bf)
- bf2 := filter.NewBloomFilter(0)
+ bf2 := generateBloomFilter()
+ defer releaseBloomFilter(bf2)
bf2 = decodeBloomFilter(buf, bf2)
for i := 0; i < 3; i++ {
diff --git a/banyand/trace/flusher.go b/banyand/trace/flusher.go
index 194f8a0d..f8fcb5ed 100644
--- a/banyand/trace/flusher.go
+++ b/banyand/trace/flusher.go
@@ -219,6 +219,7 @@ func (tst *tsTable) flush(snapshot *snapshot, flushCh chan
*flusherIntroduction)
partPath := partPath(tst.root, pw.ID())
pw.mp.mustFlush(tst.fileSystem, partPath)
newPW := newPartWrapper(nil, mustOpenFilePart(pw.ID(),
tst.root, tst.fileSystem))
+ defer newPW.p.traceIDFilter.reset()
newPW.p.partMetadata.ID = pw.ID()
ind.flushed[newPW.ID()] = newPW
partIDMap[newPW.ID()] = struct{}{}
diff --git a/banyand/trace/part.go b/banyand/trace/part.go
index 213a15ff..973f5d95 100644
--- a/banyand/trace/part.go
+++ b/banyand/trace/part.go
@@ -30,7 +30,6 @@ import (
"github.com/apache/skywalking-banyandb/pkg/bytes"
"github.com/apache/skywalking-banyandb/pkg/compress/zstd"
"github.com/apache/skywalking-banyandb/pkg/encoding"
- "github.com/apache/skywalking-banyandb/pkg/filter"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/pool"
@@ -277,7 +276,7 @@ func (mp *memPart) Unmarshal(data []byte) error {
}
filterBytes := tail[:filterLen]
tail = tail[filterLen:]
- bf := filter.NewBloomFilter(0)
+ bf := generateBloomFilter()
mp.traceIDFilter.filter = decodeBloomFilter(filterBytes, bf)
} else {
mp.traceIDFilter.filter = nil
diff --git a/banyand/trace/part_metadata.go b/banyand/trace/part_metadata.go
index f6e5c3dc..5dd7bcf3 100644
--- a/banyand/trace/part_metadata.go
+++ b/banyand/trace/part_metadata.go
@@ -204,6 +204,7 @@ type traceIDFilter struct {
}
func (tf *traceIDFilter) reset() {
+ releaseBloomFilter(tf.filter)
tf.filter = nil
}
@@ -225,7 +226,7 @@ func (tf *traceIDFilter) mustReadTraceIDFilter(fileSystem
fs.FileSystem, partPat
return
}
- bf := filter.NewBloomFilter(0)
+ bf := generateBloomFilter()
tf.filter = decodeBloomFilter(data, bf)
}
diff --git a/banyand/trace/write_data.go b/banyand/trace/write_data.go
index 2e691cef..b41b62f3 100644
--- a/banyand/trace/write_data.go
+++ b/banyand/trace/write_data.go
@@ -25,7 +25,6 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/internal/sidx"
"github.com/apache/skywalking-banyandb/banyand/queue"
- "github.com/apache/skywalking-banyandb/pkg/filter"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
@@ -71,7 +70,7 @@ func (s *syncPartContext) NewPartType(ctx
*queue.ChunkedSyncPartContext) error {
func (s *syncPartContext) FinishSync() error {
if len(s.traceIDFilterBuffer) > 0 && s.memPart != nil {
- bf := filter.NewBloomFilter(0)
+ bf := generateBloomFilter()
s.memPart.traceIDFilter.filter =
decodeBloomFilter(s.traceIDFilterBuffer, bf)
}
if len(s.tagTypeBuffer) > 0 && s.memPart != nil {