This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new f354d0a9 Add Tracker To Object Pool (#508)
f354d0a9 is described below
commit f354d0a9e98b435947feda713eda0b1d9a8b2772
Author: Gao Hongtao <[email protected]>
AuthorDate: Tue Aug 6 15:31:29 2024 +0800
Add Tracker To Object Pool (#508)
* Fix duplicated measure data in a single part
* Add the tracked pool to fix leak issues
Signed-off-by: Gao Hongtao <[email protected]>
---------
Signed-off-by: Gao Hongtao <[email protected]>
---
CHANGES.md | 7 +
banyand/internal/storage/index_test.go | 10 +-
banyand/liaison/grpc/node.go | 10 +
banyand/liaison/grpc/server.go | 1 -
banyand/liaison/http/server.go | 2 -
banyand/measure/block.go | 14 +-
banyand/measure/block_metadata.go | 11 +-
banyand/measure/block_reader.go | 10 +-
banyand/measure/block_reader_test.go | 15 +-
banyand/measure/block_writer.go | 6 +-
banyand/measure/column.go | 2 +-
banyand/measure/column_metadata.go | 6 +-
banyand/measure/datapoints.go | 18 +-
banyand/measure/introducer.go | 15 +-
banyand/measure/part.go | 27 +-
banyand/measure/part_iter.go | 10 +-
banyand/measure/query.go | 5 +-
banyand/measure/tstable.go | 16 +
banyand/measure/tstable_test.go | 62 ++++
banyand/observability/meter_prom.go | 12 +-
banyand/observability/metrics_system.go | 16 +-
banyand/observability/service.go | 16 +-
banyand/observability/system.go | 162 ---------
banyand/queue/sub/server.go | 1 -
banyand/stream/block.go | 14 +-
banyand/stream/block_metadata.go | 11 +-
banyand/stream/block_reader.go | 10 +-
banyand/stream/block_writer.go | 6 +-
banyand/stream/introducer.go | 15 +-
banyand/stream/part.go | 6 +-
banyand/stream/part_iter.go | 10 +-
banyand/stream/query.go | 7 +-
banyand/stream/tag.go | 2 +-
banyand/stream/tag_metadata.go | 6 +-
banyand/stream/tstable.go | 16 +
pkg/bytes/buffer.go | 13 +-
pkg/encoding/bytes.go | 2 +-
pkg/encoding/encoder.go | 292 ---------------
pkg/encoding/encoder_test.go | 392 ---------------------
pkg/encoding/int.go | 11 +-
pkg/fs/local_file_system.go | 10 +-
pkg/index/inverted/inverted.go | 6 +-
pkg/meter/native/collection.go | 2 +
pkg/node/interface.go | 13 +
pkg/node/maglev.go | 13 +
pkg/node/round_robin.go | 69 +++-
pkg/node/round_robin_test.go | 22 ++
pkg/pb/v1/series.go | 21 --
pkg/pool/pool.go | 81 +++++
pkg/test/gmatcher/gmatcher.go | 59 ++++
pkg/test/measure/testdata/groups/exception.json | 18 +
pkg/test/measure/testdata/measures/duplicated.json | 42 +++
test/cases/init.go | 1 +
test/cases/measure/data/input/duplicated_part.yaml | 25 ++
test/cases/measure/data/testdata/duplicated.json | 182 ++++++++++
test/cases/measure/data/want/duplicated_part.yaml | 38 ++
test/cases/measure/measure.go | 1 +
.../distributed/query/query_suite_test.go | 3 +
test/integration/etcd/client_test.go | 3 +
test/integration/load/load_suite_test.go | 3 +
.../standalone/cold_query/query_suite_test.go | 3 +
test/integration/standalone/other/measure_test.go | 3 +
test/integration/standalone/other/property_test.go | 3 +
.../standalone/query/query_suite_test.go | 3 +
.../query_ondisk/query_ondisk_suite_test.go | 3 +
65 files changed, 879 insertions(+), 1015 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index ba07da6f..7389aaae 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -35,12 +35,19 @@ Release Notes.
- Fix a bug that the Stream module didn't support duplicated in index-based
filtering and sorting
- Fix the bug that segment's reference count is increased twice when the
controller try to create an existing segment.
- Fix a bug where a distributed query would return an empty result if the
"limit" was set much lower than the "offset".
+- Fix duplicated measure data in a single part.
+- Fix several "sync.Pool" leak issues by adding a tracker to the pool.
### Documentation
+
- Introduce new doc menu structure.
- Add installation on Docker and Kubernetes.
- Add quick-start guide.
+### Chores
+
+- Bump up the version of infra e2e framework.
+
## 0.6.1
### Features
diff --git a/banyand/internal/storage/index_test.go
b/banyand/internal/storage/index_test.go
index 0e61b623..d73886a3 100644
--- a/banyand/internal/storage/index_test.go
+++ b/banyand/internal/storage/index_test.go
@@ -33,8 +33,6 @@ import (
"github.com/apache/skywalking-banyandb/pkg/test/flags"
)
-var testSeriesPool pbv1.SeriesPool
-
func TestSeriesIndex_Primary(t *testing.T) {
ctx := context.Background()
path, fn := setUp(require.New(t))
@@ -46,7 +44,7 @@ func TestSeriesIndex_Primary(t *testing.T) {
}()
var docs index.Documents
for i := 0; i < 100; i++ {
- series := testSeriesPool.Generate()
+ var series pbv1.Series
series.Subject = "service_instance_latency"
series.EntityValues = []*modelv1.TagValue{
{Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value:
fmt.Sprintf("svc_%d", i)}}},
@@ -64,7 +62,6 @@ func TestSeriesIndex_Primary(t *testing.T) {
}
copy(doc.EntityValues, series.Buffer)
docs = append(docs, doc)
- testSeriesPool.Release(series)
}
require.NoError(t, si.Write(docs))
// Restart the index
@@ -155,11 +152,10 @@ func TestSeriesIndex_Primary(t *testing.T) {
t.Run(tt.name, func(t *testing.T) {
var seriesQueries []*pbv1.Series
for i := range tt.entityValues {
- seriesQuery := testSeriesPool.Generate()
- defer testSeriesPool.Release(seriesQuery)
+ var seriesQuery pbv1.Series
seriesQuery.Subject = tt.subject
seriesQuery.EntityValues = tt.entityValues[i]
- seriesQueries = append(seriesQueries,
seriesQuery)
+ seriesQueries = append(seriesQueries,
&seriesQuery)
}
sl, _, err := si.searchPrimary(ctx, seriesQueries, nil)
require.NoError(t, err)
diff --git a/banyand/liaison/grpc/node.go b/banyand/liaison/grpc/node.go
index b43a3bdb..9f307fc2 100644
--- a/banyand/liaison/grpc/node.go
+++ b/banyand/liaison/grpc/node.go
@@ -18,6 +18,7 @@
package grpc
import (
+ "fmt"
"sync"
"github.com/pkg/errors"
@@ -37,6 +38,7 @@ var (
// together with the shardID calculated from the incoming data.
type NodeRegistry interface {
Locate(group, name string, shardID uint32) (string, error)
+ fmt.Stringer
}
type clusterNodeService struct {
@@ -94,8 +96,16 @@ func (n *clusterNodeService) OnDelete(metadata
schema.Metadata) {
}
}
+func (n *clusterNodeService) String() string {
+ return n.sel.String()
+}
+
type localNodeService struct{}
+func (l localNodeService) String() string {
+ return "local"
+}
+
// NewLocalNodeRegistry creates a local(fake) node registry.
func NewLocalNodeRegistry() NodeRegistry {
return localNodeService{}
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index a4d48bb5..2cc0dd73 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -191,7 +191,6 @@ func (s *server) Validate() error {
if s.enableIngestionAccessLog && s.accessLogRootPath == "" {
return errAccessLogRootPath
}
- observability.UpdateAddress("grpc", s.addr)
if !s.tls {
return nil
}
diff --git a/banyand/liaison/http/server.go b/banyand/liaison/http/server.go
index f1b10053..f7b797b2 100644
--- a/banyand/liaison/http/server.go
+++ b/banyand/liaison/http/server.go
@@ -40,7 +40,6 @@ import (
measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
propertyv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
streamv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
- "github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/ui"
@@ -102,7 +101,6 @@ func (p *server) Validate() error {
if p.listenAddr == ":" {
return errNoAddr
}
- observability.UpdateAddress("http", p.listenAddr)
if p.grpcCert != "" {
creds, errTLS := credentials.NewClientTLSFromFile(p.grpcCert,
"")
if errTLS != nil {
diff --git a/banyand/measure/block.go b/banyand/measure/block.go
index 001940cc..ae5006e6 100644
--- a/banyand/measure/block.go
+++ b/banyand/measure/block.go
@@ -20,7 +20,6 @@ package measure
import (
"slices"
"sort"
- "sync"
"github.com/apache/skywalking-banyandb/api/common"
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
@@ -29,6 +28,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
"github.com/apache/skywalking-banyandb/pkg/query/model"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
@@ -403,7 +403,7 @@ func generateBlock() *block {
if v == nil {
return &block{}
}
- return v.(*block)
+ return v
}
func releaseBlock(b *block) {
@@ -411,7 +411,7 @@ func releaseBlock(b *block) {
blockPool.Put(b)
}
-var blockPool sync.Pool
+var blockPool = pool.Register[*block]("measure-block")
type blockCursor struct {
p *part
@@ -705,14 +705,14 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool {
return true
}
-var blockCursorPool sync.Pool
+var blockCursorPool = pool.Register[*blockCursor]("measure-blockCursor")
func generateBlockCursor() *blockCursor {
v := blockCursorPool.Get()
if v == nil {
return &blockCursor{}
}
- return v.(*blockCursor)
+ return v
}
func releaseBlockCursor(bc *blockCursor) {
@@ -832,7 +832,7 @@ func generateBlockPointer() *blockPointer {
if v == nil {
return &blockPointer{}
}
- return v.(*blockPointer)
+ return v
}
func releaseBlockPointer(bi *blockPointer) {
@@ -840,4 +840,4 @@ func releaseBlockPointer(bi *blockPointer) {
blockPointerPool.Put(bi)
}
-var blockPointerPool sync.Pool
+var blockPointerPool = pool.Register[*blockPointer]("measure-blockPointer")
diff --git a/banyand/measure/block_metadata.go
b/banyand/measure/block_metadata.go
index 41093282..ff23c4af 100644
--- a/banyand/measure/block_metadata.go
+++ b/banyand/measure/block_metadata.go
@@ -21,11 +21,11 @@ import (
"errors"
"fmt"
"sort"
- "sync"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/encoding"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
"github.com/apache/skywalking-banyandb/pkg/query/model"
)
@@ -170,7 +170,6 @@ func (bm *blockMetadata) unmarshal(src []byte) ([]byte,
error) {
if err != nil {
return nil, fmt.Errorf("cannot unmarshal
tagFamily name: %w", err)
}
- // TODO: cache dataBlock
tf := &dataBlock{}
src, err = tf.unmarshal(src)
if err != nil {
@@ -198,7 +197,7 @@ func generateBlockMetadata() *blockMetadata {
if v == nil {
return &blockMetadata{}
}
- return v.(*blockMetadata)
+ return v
}
func releaseBlockMetadata(bm *blockMetadata) {
@@ -206,7 +205,7 @@ func releaseBlockMetadata(bm *blockMetadata) {
blockMetadataPool.Put(bm)
}
-var blockMetadataPool sync.Pool
+var blockMetadataPool = pool.Register[*blockMetadata]("measure-blockMetadata")
type blockMetadataArray struct {
arr []blockMetadata
@@ -219,14 +218,14 @@ func (bma *blockMetadataArray) reset() {
bma.arr = bma.arr[:0]
}
-var blockMetadataArrayPool sync.Pool
+var blockMetadataArrayPool =
pool.Register[*blockMetadataArray]("measure-blockMetadataArray")
func generateBlockMetadataArray() *blockMetadataArray {
v := blockMetadataArrayPool.Get()
if v == nil {
return &blockMetadataArray{}
}
- return v.(*blockMetadataArray)
+ return v
}
func releaseBlockMetadataArray(bma *blockMetadataArray) {
diff --git a/banyand/measure/block_reader.go b/banyand/measure/block_reader.go
index eb221eef..238b6833 100644
--- a/banyand/measure/block_reader.go
+++ b/banyand/measure/block_reader.go
@@ -22,11 +22,11 @@ import (
"errors"
"fmt"
"io"
- "sync"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
)
type seqReader struct {
@@ -70,7 +70,7 @@ func (sr *seqReader) mustReadFull(data []byte) {
func generateSeqReader() *seqReader {
if v := seqReaderPool.Get(); v != nil {
- return v.(*seqReader)
+ return v
}
return &seqReader{}
}
@@ -80,7 +80,7 @@ func releaseSeqReader(sr *seqReader) {
seqReaderPool.Put(sr)
}
-var seqReaderPool sync.Pool
+var seqReaderPool = pool.Register[*seqReader]("measure-seqReader")
type seqReaders struct {
tagFamilyMetadata map[string]*seqReader
@@ -219,11 +219,11 @@ func (br *blockReader) error() error {
return br.err
}
-var blockReaderPool sync.Pool
+var blockReaderPool = pool.Register[*blockReader]("measure-blockReader")
func generateBlockReader() *blockReader {
if v := blockReaderPool.Get(); v != nil {
- return v.(*blockReader)
+ return v
}
return &blockReader{}
}
diff --git a/banyand/measure/block_reader_test.go
b/banyand/measure/block_reader_test.go
index 63f3184e..cd576b78 100644
--- a/banyand/measure/block_reader_test.go
+++ b/banyand/measure/block_reader_test.go
@@ -62,6 +62,13 @@ func Test_blockReader_nextBlock(t *testing.T) {
{seriesID: 3, count: 1, uncompressedSizeBytes:
24},
},
},
+ {
+ name: "Test with a single part with same ts",
+ dpsList: []*dataPoints{duplicatedDps},
+ want: []blockMetadata{
+ {seriesID: 1, count: 1, uncompressedSizeBytes:
24},
+ },
+ },
{
name: "Test with multiple parts with same ts",
dpsList: []*dataPoints{dpsTS1, dpsTS1},
@@ -77,7 +84,7 @@ func Test_blockReader_nextBlock(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- verify := func(pp []*part) {
+ verify := func(t *testing.T, pp []*part) {
var pii []*partMergeIter
for _, p := range pp {
pmi := &partMergeIter{}
@@ -116,7 +123,7 @@ func Test_blockReader_nextBlock(t *testing.T) {
}
}
- t.Run("memory parts", func(_ *testing.T) {
+ t.Run("memory parts", func(t *testing.T) {
var mpp []*memPart
defer func() {
for _, mp := range mpp {
@@ -130,7 +137,7 @@ func Test_blockReader_nextBlock(t *testing.T) {
mp.mustInitFromDataPoints(dps)
pp = append(pp, openMemPart(mp))
}
- verify(pp)
+ verify(t, pp)
})
t.Run("file parts", func(t *testing.T) {
@@ -158,7 +165,7 @@ func Test_blockReader_nextBlock(t *testing.T) {
fpp = append(fpp, filePW)
pp = append(pp, filePW.p)
}
- verify(pp)
+ verify(t, pp)
})
})
}
diff --git a/banyand/measure/block_writer.go b/banyand/measure/block_writer.go
index fd065f1a..4a07291c 100644
--- a/banyand/measure/block_writer.go
+++ b/banyand/measure/block_writer.go
@@ -19,12 +19,12 @@ package measure
import (
"path/filepath"
- "sync"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/compress/zstd"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
)
type writer struct {
@@ -285,7 +285,7 @@ func generateBlockWriter() *blockWriter {
},
}
}
- return v.(*blockWriter)
+ return v
}
func releaseBlockWriter(bsw *blockWriter) {
@@ -293,4 +293,4 @@ func releaseBlockWriter(bsw *blockWriter) {
blockWriterPool.Put(bsw)
}
-var blockWriterPool sync.Pool
+var blockWriterPool = pool.Register[*blockWriter]("measure-blockWriter")
diff --git a/banyand/measure/column.go b/banyand/measure/column.go
index eafa0134..35f34ab8 100644
--- a/banyand/measure/column.go
+++ b/banyand/measure/column.go
@@ -112,7 +112,7 @@ func (c *column) mustSeqReadValues(decoder
*encoding.BytesBlockDecoder, reader *
}
}
-var bigValuePool bytes.BufferPool
+var bigValuePool = bytes.NewBufferPool("measure-big-value")
type columnFamily struct {
name string
diff --git a/banyand/measure/column_metadata.go
b/banyand/measure/column_metadata.go
index 3f1eb957..3f7102af 100644
--- a/banyand/measure/column_metadata.go
+++ b/banyand/measure/column_metadata.go
@@ -36,11 +36,11 @@ package measure
import (
"fmt"
- "sync"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/encoding"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
)
type columnMetadata struct {
@@ -148,7 +148,7 @@ func generateColumnFamilyMetadata() *columnFamilyMetadata {
if v == nil {
return &columnFamilyMetadata{}
}
- return v.(*columnFamilyMetadata)
+ return v
}
func releaseColumnFamilyMetadata(cfm *columnFamilyMetadata) {
@@ -156,4 +156,4 @@ func releaseColumnFamilyMetadata(cfm *columnFamilyMetadata)
{
columnFamilyMetadataPool.Put(cfm)
}
-var columnFamilyMetadataPool sync.Pool
+var columnFamilyMetadataPool =
pool.Register[*columnFamilyMetadata]("measure-columnFamilyMetadata")
diff --git a/banyand/measure/datapoints.go b/banyand/measure/datapoints.go
index 03b07093..156d4b45 100644
--- a/banyand/measure/datapoints.go
+++ b/banyand/measure/datapoints.go
@@ -123,6 +123,19 @@ type dataPoints struct {
fields []nameValues
}
+func (d *dataPoints) skip(i int) {
+ if len(d.timestamps) <= i {
+ return
+ }
+ d.seriesIDs = append(d.seriesIDs[:i], d.seriesIDs[i+1:]...)
+ d.timestamps = append(d.timestamps[:i], d.timestamps[i+1:]...)
+ d.versions = append(d.versions[:i], d.versions[i+1:]...)
+ d.tagFamilies = append(d.tagFamilies[:i], d.tagFamilies[i+1:]...)
+ if len(d.fields) > 0 {
+ d.fields = append(d.fields[:i], d.fields[i+1:]...)
+ }
+}
+
func (d *dataPoints) Len() int {
return len(d.seriesIDs)
}
@@ -131,7 +144,10 @@ func (d *dataPoints) Less(i, j int) bool {
if d.seriesIDs[i] != d.seriesIDs[j] {
return d.seriesIDs[i] < d.seriesIDs[j]
}
- return d.timestamps[i] < d.timestamps[j]
+ if d.timestamps[i] != d.timestamps[j] {
+ return d.timestamps[i] < d.timestamps[j]
+ }
+ return d.versions[i] > d.versions[j]
}
func (d *dataPoints) Swap(i, j int) {
diff --git a/banyand/measure/introducer.go b/banyand/measure/introducer.go
index 7fce11d1..0bfb2e23 100644
--- a/banyand/measure/introducer.go
+++ b/banyand/measure/introducer.go
@@ -18,8 +18,7 @@
package measure
import (
- "sync"
-
+ "github.com/apache/skywalking-banyandb/pkg/pool"
"github.com/apache/skywalking-banyandb/pkg/watcher"
)
@@ -33,14 +32,14 @@ func (i *introduction) reset() {
i.applied = nil
}
-var introductionPool = sync.Pool{}
+var introductionPool = pool.Register[*introduction]("measure-introduction")
func generateIntroduction() *introduction {
v := introductionPool.Get()
if v == nil {
return &introduction{}
}
- i := v.(*introduction)
+ i := v
i.reset()
return i
}
@@ -61,7 +60,7 @@ func (i *flusherIntroduction) reset() {
i.applied = nil
}
-var flusherIntroductionPool = sync.Pool{}
+var flusherIntroductionPool =
pool.Register[*flusherIntroduction]("measure-flusher-introduction")
func generateFlusherIntroduction() *flusherIntroduction {
v := flusherIntroductionPool.Get()
@@ -70,7 +69,7 @@ func generateFlusherIntroduction() *flusherIntroduction {
flushed: make(map[uint64]*partWrapper),
}
}
- i := v.(*flusherIntroduction)
+ i := v
i.reset()
return i
}
@@ -95,14 +94,14 @@ func (i *mergerIntroduction) reset() {
i.creator = 0
}
-var mergerIntroductionPool = sync.Pool{}
+var mergerIntroductionPool =
pool.Register[*mergerIntroduction]("measure-merger-introduction")
func generateMergerIntroduction() *mergerIntroduction {
v := mergerIntroductionPool.Get()
if v == nil {
return &mergerIntroduction{}
}
- i := v.(*mergerIntroduction)
+ i := v
i.reset()
return i
}
diff --git a/banyand/measure/part.go b/banyand/measure/part.go
index c6383814..41baf29b 100644
--- a/banyand/measure/part.go
+++ b/banyand/measure/part.go
@@ -22,7 +22,6 @@ import (
"path"
"path/filepath"
"sort"
- "sync"
"sync/atomic"
"time"
@@ -30,6 +29,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/bytes"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
)
const (
@@ -150,16 +150,29 @@ func (mp *memPart) mustInitFromDataPoints(dps
*dataPoints) {
var sidPrev common.SeriesID
uncompressedBlockSizeBytes := uint64(0)
var indexPrev int
- for i := range dps.timestamps {
+ var tsPrev int64
+ for i := 0; i < len(dps.timestamps); i++ {
sid := dps.seriesIDs[i]
if sidPrev == 0 {
sidPrev = sid
}
+ if sid == sidPrev {
+ if tsPrev == dps.timestamps[i] {
+ dps.skip(i)
+ i--
+ continue
+ }
+ tsPrev = dps.timestamps[i]
+ } else {
+ tsPrev = 0
+ }
+
if uncompressedBlockSizeBytes >= maxUncompressedBlockSize ||
(i-indexPrev) > maxBlockLength || sid != sidPrev {
bsw.MustWriteDataPoints(sidPrev,
dps.timestamps[indexPrev:i], dps.versions[indexPrev:i],
dps.tagFamilies[indexPrev:i], dps.fields[indexPrev:i])
sidPrev = sid
+ tsPrev = 0
indexPrev = i
uncompressedBlockSizeBytes = 0
}
@@ -209,7 +222,7 @@ func generateMemPart() *memPart {
if v == nil {
return &memPart{}
}
- return v.(*memPart)
+ return v
}
func releaseMemPart(mp *memPart) {
@@ -217,7 +230,7 @@ func releaseMemPart(mp *memPart) {
memPartPool.Put(mp)
}
-var memPartPool sync.Pool
+var memPartPool = pool.Register[*memPart]("measure-memPart")
type partWrapper struct {
mp *memPart
@@ -239,6 +252,12 @@ func (pw *partWrapper) decRef() {
if n > 0 {
return
}
+ if pw.mp != nil {
+ releaseMemPart(pw.mp)
+ pw.mp = nil
+ pw.p = nil
+ return
+ }
pw.p.close()
if pw.removable.Load() && pw.p.fileSystem != nil {
go func(pw *partWrapper) {
diff --git a/banyand/measure/part_iter.go b/banyand/measure/part_iter.go
index 0c916334..cf67d320 100644
--- a/banyand/measure/part_iter.go
+++ b/banyand/measure/part_iter.go
@@ -22,7 +22,6 @@ import (
"fmt"
"io"
"sort"
- "sync"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/bytes"
@@ -30,6 +29,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
)
type partIter struct {
@@ -327,7 +327,7 @@ func generatePartMergeIter() *partMergeIter {
if v == nil {
return &partMergeIter{}
}
- return v.(*partMergeIter)
+ return v
}
func releasePartMergeIter(pmi *partMergeIter) {
@@ -335,7 +335,7 @@ func releasePartMergeIter(pmi *partMergeIter) {
pmiPool.Put(pmi)
}
-var pmiPool sync.Pool
+var pmiPool = pool.Register[*partMergeIter]("measure-partMergeIter")
type partMergeIterHeap []*partMergeIter
@@ -369,7 +369,7 @@ func generateColumnValuesDecoder()
*encoding.BytesBlockDecoder {
if v == nil {
return &encoding.BytesBlockDecoder{}
}
- return v.(*encoding.BytesBlockDecoder)
+ return v
}
func releaseColumnValuesDecoder(d *encoding.BytesBlockDecoder) {
@@ -377,4 +377,4 @@ func releaseColumnValuesDecoder(d
*encoding.BytesBlockDecoder) {
columnValuesDecoderPool.Put(d)
}
-var columnValuesDecoderPool sync.Pool
+var columnValuesDecoderPool =
pool.Register[*encoding.BytesBlockDecoder]("measure-columnValuesDecoder")
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index fa94091e..c9ac2117 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -242,9 +242,8 @@ func (s *measure) searchBlocks(ctx context.Context, result
*queryResult, sids []
defer releaseBlockMetadataArray(bma)
defFn := startBlockScanSpan(ctx, len(sids), parts, result)
defer defFn()
- // TODO: cache tstIter
- var tstIter tstIter
- defer tstIter.reset()
+ tstIter := generateTstIter()
+ defer releaseTstIter(tstIter)
originalSids := make([]common.SeriesID, len(sids))
copy(originalSids, sids)
sort.Slice(sids, func(i, j int) bool { return sids[i] < sids[j] })
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index d2a20b4a..033979a8 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -33,6 +33,7 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
"github.com/apache/skywalking-banyandb/pkg/watcher"
@@ -376,6 +377,21 @@ func (ti *tstIter) Error() error {
return ti.err
}
+func generateTstIter() *tstIter {
+ v := tstIterPool.Get()
+ if v == nil {
+ return &tstIter{}
+ }
+ return v
+}
+
+func releaseTstIter(ti *tstIter) {
+ ti.reset()
+ tstIterPool.Put(ti)
+}
+
+var tstIterPool = pool.Register[*tstIter]("measure-tstIter")
+
type partIterHeap []*partIter
func (pih *partIterHeap) Len() int {
diff --git a/banyand/measure/tstable_test.go b/banyand/measure/tstable_test.go
index 57ea15a4..386f57b1 100644
--- a/banyand/measure/tstable_test.go
+++ b/banyand/measure/tstable_test.go
@@ -214,6 +214,16 @@ func Test_tstIter(t *testing.T) {
{seriesID: 3, count: 1,
uncompressedSizeBytes: 24},
},
},
+ {
+ name: "Test with a single part with
same ts",
+ dpsList: []*dataPoints{duplicatedDps},
+ sids: []common.SeriesID{1},
+ minTimestamp: 1,
+ maxTimestamp: 1,
+ want: []blockMetadata{
+ {seriesID: 1, count: 1,
uncompressedSizeBytes: 24},
+ },
+ },
{
name: "Test with multiple parts with
same ts",
dpsList: []*dataPoints{dpsTS1, dpsTS1},
@@ -568,6 +578,58 @@ var dpsTS2 = &dataPoints{
},
}
+var duplicatedDps = &dataPoints{
+ seriesIDs: []common.SeriesID{1, 1, 1},
+ timestamps: []int64{1, 1, 1},
+ versions: []int64{1, 2, 3},
+ tagFamilies: [][]nameValues{
+ {
+ {
+ name: "arrTag", values: []*nameValue{
+ {name: "strArrTag", valueType:
pbv1.ValueTypeStrArr, value: nil, valueArr: [][]byte{[]byte("value1"),
[]byte("value2")}},
+ {name: "intArrTag", valueType:
pbv1.ValueTypeInt64Arr, value: nil, valueArr:
[][]byte{convert.Int64ToBytes(25), convert.Int64ToBytes(30)}},
+ },
+ },
+ {
+ name: "binaryTag", values: []*nameValue{
+ {name: "binaryTag", valueType:
pbv1.ValueTypeBinaryData, value: longText, valueArr: nil},
+ },
+ },
+ {
+ name: "singleTag", values: []*nameValue{
+ {name: "strTag", valueType:
pbv1.ValueTypeStr, value: []byte("value1"), valueArr: nil},
+ {name: "intTag", valueType:
pbv1.ValueTypeInt64, value: convert.Int64ToBytes(10), valueArr: nil},
+ },
+ },
+ },
+ {
+ {
+ name: "singleTag", values: []*nameValue{
+ {name: "strTag1", valueType:
pbv1.ValueTypeStr, value: []byte("tag1"), valueArr: nil},
+ {name: "strTag2", valueType:
pbv1.ValueTypeStr, value: []byte("tag2"), valueArr: nil},
+ },
+ },
+ },
+ {}, // empty tagFamilies for the third data point (seriesID 1)
+ },
+ fields: []nameValues{
+ {
+ name: "skipped", values: []*nameValue{
+ {name: "strField", valueType:
pbv1.ValueTypeStr, value: []byte("field1"), valueArr: nil},
+ {name: "intField", valueType:
pbv1.ValueTypeInt64, value: convert.Int64ToBytes(1110), valueArr: nil},
+ {name: "floatField", valueType:
pbv1.ValueTypeFloat64, value: convert.Float64ToBytes(1221233.343), valueArr:
nil},
+ {name: "binaryField", valueType:
pbv1.ValueTypeBinaryData, value: longText, valueArr: nil},
+ },
+ },
+ {}, // empty fields for the second data point (seriesID 1)
+ {
+ name: "onlyFields", values: []*nameValue{
+ {name: "intField", valueType:
pbv1.ValueTypeInt64, value: convert.Int64ToBytes(1110), valueArr: nil},
+ },
+ },
+ },
+}
+
func generateHugeDps(startTimestamp, endTimestamp, timestamp int64)
*dataPoints {
hugeDps := &dataPoints{
seriesIDs: []common.SeriesID{},
diff --git a/banyand/observability/meter_prom.go
b/banyand/observability/meter_prom.go
index 260b442f..ea5d2d8a 100644
--- a/banyand/observability/meter_prom.go
+++ b/banyand/observability/meter_prom.go
@@ -18,6 +18,7 @@
package observability
import (
+ "net/http"
"sync"
grpcprom
"github.com/grpc-ecosystem/go-grpc-middleware/providers/prometheus"
@@ -42,10 +43,6 @@ var (
func init() {
reg.MustRegister(collectors.NewGoCollector())
reg.MustRegister(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{}))
- metricsMux.Handle("/metrics", promhttp.HandlerFor(
- reg,
- promhttp.HandlerOpts{},
- ))
}
// NewMeterProvider returns a meter.Provider based on the given scope.
@@ -53,6 +50,13 @@ func newPromMeterProvider() meter.Provider {
return prom.NewProvider(SystemScope, reg)
}
+func registerMetricsEndpoint(metricsMux *http.ServeMux) {
+ metricsMux.Handle("/metrics", promhttp.HandlerFor(
+ reg,
+ promhttp.HandlerOpts{},
+ ))
+}
+
// MetricsServerInterceptor returns a server interceptor for metrics.
func promMetricsServerInterceptor() (grpc.UnaryServerInterceptor,
grpc.StreamServerInterceptor) {
once.Do(func() {
diff --git a/banyand/observability/metrics_system.go
b/banyand/observability/metrics_system.go
index ae391b73..7717db5d 100644
--- a/banyand/observability/metrics_system.go
+++ b/banyand/observability/metrics_system.go
@@ -55,8 +55,22 @@ var (
upTimeGauge meter.Gauge
diskStateGauge meter.Gauge
initMetricsOnce sync.Once
+ diskMap = sync.Map{}
)
+// UpdatePath registers a path so that its disk usage is monitored.
+func UpdatePath(path string) {
+ diskMap.Store(path, nil)
+}
+
+func getPath() (paths []string) {
+ diskMap.Range(func(key, _ any) bool {
+ paths = append(paths, key.(string))
+ return true
+ })
+ return paths
+}
+
func init() {
MetricsCollector.Register("cpu", collectCPU)
MetricsCollector.Register("memory", collectMemory)
@@ -169,7 +183,7 @@ func collectUpTime() {
}
func collectDisk() {
- for path := range getPath() {
+ for _, path := range getPath() {
usage, err := disk.Usage(path)
if err != nil {
if _, err = os.Stat(path); err != nil {
diff --git a/banyand/observability/service.go b/banyand/observability/service.go
index 8e41c5ed..4170b09f 100644
--- a/banyand/observability/service.go
+++ b/banyand/observability/service.go
@@ -42,9 +42,8 @@ const (
)
var (
- _ run.Service = (*metricService)(nil)
- _ run.Config = (*metricService)(nil)
- metricsMux = http.NewServeMux()
+ _ run.Service = (*metricService)(nil)
+ _ run.Config = (*metricService)(nil)
// MetricsServerInterceptor is the function to obtain grpc metrics
interceptor.
MetricsServerInterceptor func() (grpc.UnaryServerInterceptor,
grpc.StreamServerInterceptor) = emptyMetricsServerInterceptor
)
@@ -146,6 +145,11 @@ func (p *metricService) Serve() run.StopNotify {
if err != nil {
p.l.Fatal().Err(err).Msg("Failed to register metrics collector")
}
+ metricsMux := http.NewServeMux()
+ metricsMux.HandleFunc("/_route", p.routeTableHandler)
+ if containsMode(p.modes, flagPromethusMode) {
+ registerMetricsEndpoint(metricsMux)
+ }
if containsMode(p.modes, flagNativeMode) {
err = p.scheduler.Register("native-metric-collection",
cron.Descriptor, "@every 5s", func(_ time.Time, _ *logger.Logger) bool {
NativeMetricCollection.FlushMetrics()
@@ -180,6 +184,12 @@ func (p *metricService) GracefulStop() {
p.closer.CloseThenWait()
}
+func (p *metricService) routeTableHandler(w http.ResponseWriter, _
*http.Request) {
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusOK)
+ _, _ = w.Write([]byte(p.nodeSelector.String()))
+}
+
func containsMode(modes []string, mode string) bool {
for _, item := range modes {
if item == mode {
diff --git a/banyand/observability/system.go b/banyand/observability/system.go
deleted file mode 100644
index 20aa68b6..00000000
--- a/banyand/observability/system.go
+++ /dev/null
@@ -1,162 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package observability
-
-import (
- "encoding/json"
- "net/http"
- "os"
- "sync"
-
- "github.com/shirou/gopsutil/v3/cpu"
- "github.com/shirou/gopsutil/v3/disk"
- "github.com/shirou/gopsutil/v3/host"
- "github.com/shirou/gopsutil/v3/mem"
-
- "github.com/apache/skywalking-banyandb/pkg/logger"
-)
-
-var systemInfoInstance *SystemInfo
-
-func init() {
- systemInfoInstance = &SystemInfo{
- Addresses: make(map[string]string),
- DiskUsages: make(map[string]*DiskUsage),
- }
-}
-
-// UpdateAddress updates the address of the given name.
-func UpdateAddress(name, address string) {
- systemInfoInstance.Lock()
- defer systemInfoInstance.Unlock()
- systemInfoInstance.Addresses[name] = address
-}
-
-func getAddresses() map[string]string {
- systemInfoInstance.RLock()
- defer systemInfoInstance.RUnlock()
- return systemInfoInstance.Addresses
-}
-
-// UpdatePath updates a path to monitoring its disk usage.
-func UpdatePath(path string) {
- systemInfoInstance.Lock()
- defer systemInfoInstance.Unlock()
- systemInfoInstance.DiskUsages[path] = nil
-}
-
-func getPath() map[string]*DiskUsage {
- systemInfoInstance.RLock()
- defer systemInfoInstance.RUnlock()
- return systemInfoInstance.DiskUsages
-}
-
-// SystemInfo represents the system information of a node.
-type SystemInfo struct {
- Addresses map[string]string `json:"addresses"`
- DiskUsages map[string]*DiskUsage `json:"disk_usages"`
- NodeID string `json:"node_id"`
- Hostname string `json:"hostname"`
- Roles []string `json:"roles"`
- Uptime uint64 `json:"uptime"`
- CPUUsage float64 `json:"cpu_usage"`
- MemoryUsage float64 `json:"memory_usage"`
- sync.RWMutex
-}
-
-// DiskUsage represents the disk usage for a given path.
-type DiskUsage struct {
- Capacity uint64 `json:"capacity"`
- Used uint64 `json:"used"`
-}
-
-// ErrorResponse represents the error response.
-type ErrorResponse struct {
- Message string `json:"message"`
- OriginalError string `json:"original_error,omitempty"`
-}
-
-func init() {
- metricsMux.HandleFunc("/system", systemInfoHandler)
-}
-
-func systemInfoHandler(w http.ResponseWriter, _ *http.Request) {
- hostname, _ := os.Hostname()
- uptime, _ := getUptime()
- cpuUsage, _ := getCPUUsage()
- memoryUsage, _ := getMemoryUsage()
-
- systemInfo := &SystemInfo{
- NodeID: "1",
- Roles: []string{"meta", "ingest", "query", "data"},
- Hostname: hostname,
- Uptime: uptime,
- CPUUsage: cpuUsage,
- MemoryUsage: memoryUsage,
- Addresses: getAddresses(),
- DiskUsages: make(map[string]*DiskUsage),
- }
- for k := range getPath() {
- usage, _ := getDiskUsage(k)
- systemInfo.DiskUsages[k] = &usage
- }
- w.Header().Set("Content-Type", "application/json")
- w.WriteHeader(http.StatusOK)
- if err := json.NewEncoder(w).Encode([]*SystemInfo{systemInfo}); err !=
nil {
- w.WriteHeader(http.StatusInternalServerError)
- errorResponse := &ErrorResponse{
- Message: "Error encoding JSON response",
- OriginalError: err.Error(),
- }
- if err := json.NewEncoder(w).Encode(errorResponse); err != nil {
- logger.GetLogger().Error().Err(err).Msg("Error encoding
JSON response")
- }
- }
-}
-
-func getUptime() (uint64, error) {
- uptime, err := host.Uptime()
- if err != nil {
- return 0, err
- }
- return uptime, nil
-}
-
-func getCPUUsage() (float64, error) {
- percentages, err := cpu.Percent(0, false)
- if err != nil {
- return 0, err
- }
- return percentages[0], nil
-}
-
-func getMemoryUsage() (float64, error) {
- vm, err := mem.VirtualMemory()
- if err != nil {
- return 0, err
- }
- return vm.UsedPercent, nil
-}
-
-func getDiskUsage(path string) (DiskUsage, error) {
- usage, err := disk.Usage(path)
- if err != nil {
- return DiskUsage{}, err
- }
- return DiskUsage{Capacity: usage.Total, Used: usage.Used}, nil
-}
diff --git a/banyand/queue/sub/server.go b/banyand/queue/sub/server.go
index 2b880f9f..829e460a 100644
--- a/banyand/queue/sub/server.go
+++ b/banyand/queue/sub/server.go
@@ -112,7 +112,6 @@ func (s *server) Validate() error {
if s.addr == ":" {
return errNoAddr
}
- observability.UpdateAddress("grpc", s.addr)
if !s.tls {
return nil
}
diff --git a/banyand/stream/block.go b/banyand/stream/block.go
index bd0db065..e6e7b964 100644
--- a/banyand/stream/block.go
+++ b/banyand/stream/block.go
@@ -19,7 +19,6 @@ package stream
import (
"sort"
- "sync"
"golang.org/x/exp/slices"
@@ -31,6 +30,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/index/posting"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
"github.com/apache/skywalking-banyandb/pkg/query/model"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
@@ -351,7 +351,7 @@ func generateBlock() *block {
if v == nil {
return &block{}
}
- return v.(*block)
+ return v
}
func releaseBlock(b *block) {
@@ -359,7 +359,7 @@ func releaseBlock(b *block) {
blockPool.Put(b)
}
-var blockPool sync.Pool
+var blockPool = pool.Register[*block]("stream-block")
type blockCursor struct {
p *part
@@ -559,14 +559,14 @@ func (bc *blockCursor) loadData(tmpBlock *block) bool {
return true
}
-var blockCursorPool sync.Pool
+var blockCursorPool = pool.Register[*blockCursor]("stream-blockCursor")
func generateBlockCursor() *blockCursor {
v := blockCursorPool.Get()
if v == nil {
return &blockCursor{}
}
- return v.(*blockCursor)
+ return v
}
func releaseBlockCursor(bc *blockCursor) {
@@ -668,7 +668,7 @@ func generateBlockPointer() *blockPointer {
if v == nil {
return &blockPointer{}
}
- return v.(*blockPointer)
+ return v
}
func releaseBlockPointer(bi *blockPointer) {
@@ -676,4 +676,4 @@ func releaseBlockPointer(bi *blockPointer) {
blockPointerPool.Put(bi)
}
-var blockPointerPool sync.Pool
+var blockPointerPool = pool.Register[*blockPointer]("stream-blockPointer")
diff --git a/banyand/stream/block_metadata.go b/banyand/stream/block_metadata.go
index 9f0091ee..4f410cd5 100644
--- a/banyand/stream/block_metadata.go
+++ b/banyand/stream/block_metadata.go
@@ -21,11 +21,11 @@ import (
"errors"
"fmt"
"sort"
- "sync"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/encoding"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
"github.com/apache/skywalking-banyandb/pkg/query/model"
)
@@ -175,7 +175,6 @@ func (bm *blockMetadata) unmarshal(src []byte) ([]byte,
error) {
if err != nil {
return nil, fmt.Errorf("cannot unmarshal
tagFamily name: %w", err)
}
- // TODO: cache dataBlock
tf := &dataBlock{}
src, err = tf.unmarshal(src)
if err != nil {
@@ -202,7 +201,7 @@ func generateBlockMetadata() *blockMetadata {
if v == nil {
return &blockMetadata{}
}
- return v.(*blockMetadata)
+ return v
}
func releaseBlockMetadata(bm *blockMetadata) {
@@ -210,7 +209,7 @@ func releaseBlockMetadata(bm *blockMetadata) {
blockMetadataPool.Put(bm)
}
-var blockMetadataPool sync.Pool
+var blockMetadataPool = pool.Register[*blockMetadata]("stream-blockMetadata")
type blockMetadataArray struct {
arr []blockMetadata
@@ -223,14 +222,14 @@ func (bma *blockMetadataArray) reset() {
bma.arr = bma.arr[:0]
}
-var blockMetadataArrayPool sync.Pool
+var blockMetadataArrayPool =
pool.Register[*blockMetadataArray]("stream-blockMetadataArray")
func generateBlockMetadataArray() *blockMetadataArray {
v := blockMetadataArrayPool.Get()
if v == nil {
return &blockMetadataArray{}
}
- return v.(*blockMetadataArray)
+ return v
}
func releaseBlockMetadataArray(bma *blockMetadataArray) {
diff --git a/banyand/stream/block_reader.go b/banyand/stream/block_reader.go
index 60701515..1c8a987c 100644
--- a/banyand/stream/block_reader.go
+++ b/banyand/stream/block_reader.go
@@ -22,11 +22,11 @@ import (
"errors"
"fmt"
"io"
- "sync"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
)
type seqReader struct {
@@ -70,7 +70,7 @@ func (sr *seqReader) mustReadFull(data []byte) {
func generateSeqReader() *seqReader {
if v := seqReaderPool.Get(); v != nil {
- return v.(*seqReader)
+ return v
}
return &seqReader{}
}
@@ -80,7 +80,7 @@ func releaseSeqReader(sr *seqReader) {
seqReaderPool.Put(sr)
}
-var seqReaderPool sync.Pool
+var seqReaderPool = pool.Register[*seqReader]("stream-seqReader")
type seqReaders struct {
tagFamilyMetadata map[string]*seqReader
@@ -216,11 +216,11 @@ func (br *blockReader) error() error {
return br.err
}
-var blockReaderPool sync.Pool
+var blockReaderPool = pool.Register[*blockReader]("stream-blockReader")
func generateBlockReader() *blockReader {
if v := blockReaderPool.Get(); v != nil {
- return v.(*blockReader)
+ return v
}
return &blockReader{}
}
diff --git a/banyand/stream/block_writer.go b/banyand/stream/block_writer.go
index 5125bd85..f55f9ebf 100644
--- a/banyand/stream/block_writer.go
+++ b/banyand/stream/block_writer.go
@@ -19,12 +19,12 @@ package stream
import (
"path/filepath"
- "sync"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/compress/zstd"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
)
type writer struct {
@@ -279,7 +279,7 @@ func generateBlockWriter() *blockWriter {
},
}
}
- return v.(*blockWriter)
+ return v
}
func releaseBlockWriter(bsw *blockWriter) {
@@ -287,4 +287,4 @@ func releaseBlockWriter(bsw *blockWriter) {
blockWriterPool.Put(bsw)
}
-var blockWriterPool sync.Pool
+var blockWriterPool = pool.Register[*blockWriter]("stream-blockWriter")
diff --git a/banyand/stream/introducer.go b/banyand/stream/introducer.go
index 76c6e659..072e8b39 100644
--- a/banyand/stream/introducer.go
+++ b/banyand/stream/introducer.go
@@ -18,8 +18,7 @@
package stream
import (
- "sync"
-
+ "github.com/apache/skywalking-banyandb/pkg/pool"
"github.com/apache/skywalking-banyandb/pkg/watcher"
)
@@ -33,14 +32,14 @@ func (i *introduction) reset() {
i.applied = nil
}
-var introductionPool = sync.Pool{}
+var introductionPool = pool.Register[*introduction]("stream-introduction")
func generateIntroduction() *introduction {
v := introductionPool.Get()
if v == nil {
return &introduction{}
}
- intro := v.(*introduction)
+ intro := v
intro.reset()
return intro
}
@@ -61,7 +60,7 @@ func (i *flusherIntroduction) reset() {
i.applied = nil
}
-var flusherIntroductionPool = sync.Pool{}
+var flusherIntroductionPool =
pool.Register[*flusherIntroduction]("stream-flusher-introduction")
func generateFlusherIntroduction() *flusherIntroduction {
v := flusherIntroductionPool.Get()
@@ -70,7 +69,7 @@ func generateFlusherIntroduction() *flusherIntroduction {
flushed: make(map[uint64]*partWrapper),
}
}
- fi := v.(*flusherIntroduction)
+ fi := v
fi.reset()
return fi
}
@@ -95,14 +94,14 @@ func (i *mergerIntroduction) reset() {
i.creator = 0
}
-var mergerIntroductionPool = sync.Pool{}
+var mergerIntroductionPool =
pool.Register[*mergerIntroduction]("stream-merger-introduction")
func generateMergerIntroduction() *mergerIntroduction {
v := mergerIntroductionPool.Get()
if v == nil {
return &mergerIntroduction{}
}
- mi := v.(*mergerIntroduction)
+ mi := v
mi.reset()
return mi
}
diff --git a/banyand/stream/part.go b/banyand/stream/part.go
index b6ad05c7..228e607c 100644
--- a/banyand/stream/part.go
+++ b/banyand/stream/part.go
@@ -22,7 +22,6 @@ import (
"path"
"path/filepath"
"sort"
- "sync"
"sync/atomic"
"time"
@@ -30,6 +29,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/bytes"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
)
const (
@@ -198,7 +198,7 @@ func generateMemPart() *memPart {
if v == nil {
return &memPart{}
}
- return v.(*memPart)
+ return v
}
func releaseMemPart(mp *memPart) {
@@ -206,7 +206,7 @@ func releaseMemPart(mp *memPart) {
memPartPool.Put(mp)
}
-var memPartPool sync.Pool
+var memPartPool = pool.Register[*memPart]("stream-memPart")
type partWrapper struct {
mp *memPart
diff --git a/banyand/stream/part_iter.go b/banyand/stream/part_iter.go
index e9c24b98..f4277032 100644
--- a/banyand/stream/part_iter.go
+++ b/banyand/stream/part_iter.go
@@ -22,7 +22,6 @@ import (
"fmt"
"io"
"sort"
- "sync"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/bytes"
@@ -30,6 +29,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
)
type partIter struct {
@@ -324,7 +324,7 @@ func generatePartMergeIter() *partMergeIter {
if v == nil {
return &partMergeIter{}
}
- return v.(*partMergeIter)
+ return v
}
func releasePartMergeIter(pmi *partMergeIter) {
@@ -332,7 +332,7 @@ func releasePartMergeIter(pmi *partMergeIter) {
pmiPool.Put(pmi)
}
-var pmiPool sync.Pool
+var pmiPool = pool.Register[*partMergeIter]("stream-partMergeIter")
type partMergeIterHeap []*partMergeIter
@@ -366,7 +366,7 @@ func generateColumnValuesDecoder()
*encoding.BytesBlockDecoder {
if v == nil {
return &encoding.BytesBlockDecoder{}
}
- return v.(*encoding.BytesBlockDecoder)
+ return v
}
func releaseColumnValuesDecoder(d *encoding.BytesBlockDecoder) {
@@ -374,4 +374,4 @@ func releaseColumnValuesDecoder(d
*encoding.BytesBlockDecoder) {
columnValuesDecoderPool.Put(d)
}
-var columnValuesDecoderPool sync.Pool
+var columnValuesDecoderPool =
pool.Register[*encoding.BytesBlockDecoder]("stream-columnValuesDecoder")
diff --git a/banyand/stream/query.go b/banyand/stream/query.go
index 0a5ebdc3..7c766136 100644
--- a/banyand/stream/query.go
+++ b/banyand/stream/query.go
@@ -190,9 +190,8 @@ func (qr *queryResult) scanParts(ctx context.Context, qo
queryOptions) error {
defer releaseBlockMetadataArray(bma)
defFn := startBlockScanSpan(ctx, len(qo.sortedSids), parts, qr)
defer defFn()
- // TODO: cache tstIter
- var ti tstIter
- defer ti.reset()
+ ti := generateTstIter()
+ defer releaseTstIter(ti)
sids := qo.sortedSids
ti.init(bma, parts, sids, qo.minTimestamp, qo.maxTimestamp)
if ti.Error() != nil {
@@ -288,6 +287,7 @@ func (qr *queryResult) load(ctx context.Context, qo
queryOptions) *model.StreamR
return blankCursorList[i] > blankCursorList[j]
})
for _, index := range blankCursorList {
+ releaseBlockCursor(qr.data[index])
qr.data = append(qr.data[:index], qr.data[index+1:]...)
}
qr.loaded = true
@@ -610,6 +610,7 @@ func mustDecodeTagValue(valueType pbv1.ValueType, value
[]byte) *modelv1.TagValu
case pbv1.ValueTypeStrArr:
var values []string
bb := bigValuePool.Generate()
+ defer bigValuePool.Release(bb)
var err error
for len(value) > 0 {
bb.Buf, value, err = unmarshalVarArray(bb.Buf[:0],
value)
diff --git a/banyand/stream/tag.go b/banyand/stream/tag.go
index 7f5459c0..b7954810 100644
--- a/banyand/stream/tag.go
+++ b/banyand/stream/tag.go
@@ -112,7 +112,7 @@ func (t *tag) mustSeqReadValues(decoder
*encoding.BytesBlockDecoder, reader *seq
}
}
-var bigValuePool bytes.BufferPool
+var bigValuePool = bytes.NewBufferPool("stream-big-value")
type tagFamily struct {
name string
diff --git a/banyand/stream/tag_metadata.go b/banyand/stream/tag_metadata.go
index d044f0ec..d470b6f9 100644
--- a/banyand/stream/tag_metadata.go
+++ b/banyand/stream/tag_metadata.go
@@ -19,11 +19,11 @@ package stream
import (
"fmt"
- "sync"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/encoding"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
)
type tagMetadata struct {
@@ -131,7 +131,7 @@ func generateTagFamilyMetadata() *tagFamilyMetadata {
if v == nil {
return &tagFamilyMetadata{}
}
- return v.(*tagFamilyMetadata)
+ return v
}
func releaseTagFamilyMetadata(tfm *tagFamilyMetadata) {
@@ -139,4 +139,4 @@ func releaseTagFamilyMetadata(tfm *tagFamilyMetadata) {
tagFamilyMetadataPool.Put(tfm)
}
-var tagFamilyMetadataPool sync.Pool
+var tagFamilyMetadataPool =
pool.Register[*tagFamilyMetadata]("stream-tagFamilyMetadata")
diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go
index dff93b39..ea329ff1 100644
--- a/banyand/stream/tstable.go
+++ b/banyand/stream/tstable.go
@@ -34,6 +34,7 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
"github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
"github.com/apache/skywalking-banyandb/pkg/watcher"
@@ -388,6 +389,21 @@ func (ti *tstIter) Error() error {
return ti.err
}
+func generateTstIter() *tstIter {
+ v := tstIterPool.Get()
+ if v == nil {
+ return &tstIter{}
+ }
+ return v
+}
+
+func releaseTstIter(ti *tstIter) {
+ ti.reset()
+ tstIterPool.Put(ti)
+}
+
+var tstIterPool = pool.Register[*tstIter]("stream-tstIter")
+
type partIterHeap []*partIter
func (pih *partIterHeap) Len() int {
diff --git a/pkg/bytes/buffer.go b/pkg/bytes/buffer.go
index 595ce54c..c657cde9 100644
--- a/pkg/bytes/buffer.go
+++ b/pkg/bytes/buffer.go
@@ -21,9 +21,9 @@ package bytes
import (
"fmt"
"io"
- "sync"
"github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
)
var (
@@ -107,9 +107,16 @@ func (r *reader) Close() error {
return nil
}
+// NewBufferPool creates a new BufferPool.
+func NewBufferPool(name string) *BufferPool {
+ return &BufferPool{
+ p: pool.Register[*Buffer](name),
+ }
+}
+
// BufferPool is a pool of Buffer.
type BufferPool struct {
- p sync.Pool
+ p *pool.Synced[*Buffer]
}
// Generate generates a Buffer.
@@ -118,7 +125,7 @@ func (bp *BufferPool) Generate() *Buffer {
if bbv == nil {
return &Buffer{}
}
- return bbv.(*Buffer)
+ return bbv
}
// Release releases a Buffer.
diff --git a/pkg/encoding/bytes.go b/pkg/encoding/bytes.go
index 6aab3f89..25ece6fa 100644
--- a/pkg/encoding/bytes.go
+++ b/pkg/encoding/bytes.go
@@ -297,4 +297,4 @@ func decompressBlock(dst, src []byte) ([]byte, []byte,
error) {
}
}
-var bbPool bytes.BufferPool
+var bbPool = bytes.NewBufferPool("encoding.bytesBlock")
diff --git a/pkg/encoding/encoder.go b/pkg/encoding/encoder.go
deleted file mode 100644
index 78add92c..00000000
--- a/pkg/encoding/encoder.go
+++ /dev/null
@@ -1,292 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package encoding
-
-import (
- "bytes"
- "encoding/binary"
- "io"
- "sync"
- "time"
-
- "github.com/pkg/errors"
-
- "github.com/apache/skywalking-banyandb/pkg/convert"
-)
-
-var (
- encoderPool = sync.Pool{
- New: newEncoder,
- }
- decoderPool = sync.Pool{
- New: func() interface{} {
- return &decoder{}
- },
- }
-
- errInvalidValue = errors.New("invalid encoded value")
- errNoData = errors.New("there is no data")
-)
-
-type encoderPoolDelegator struct {
- pool *sync.Pool
- fn ParseInterval
- name string
- size int
-}
-
-// NewEncoderPool returns a SeriesEncoderPool which provides int-based xor
encoders.
-func NewEncoderPool(name string, size int, fn ParseInterval) SeriesEncoderPool
{
- return &encoderPoolDelegator{
- name: name,
- pool: &encoderPool,
- size: size,
- fn: fn,
- }
-}
-
-func (b *encoderPoolDelegator) Get(metadata []byte, buffer BufferWriter)
SeriesEncoder {
- encoder := b.pool.Get().(*encoder)
- encoder.name = b.name
- encoder.size = b.size
- encoder.fn = b.fn
- encoder.Reset(metadata, buffer)
- return encoder
-}
-
-func (b *encoderPoolDelegator) Put(seriesEncoder SeriesEncoder) {
- _, ok := seriesEncoder.(*encoder)
- if ok {
- b.pool.Put(seriesEncoder)
- }
-}
-
-type decoderPoolDelegator struct {
- pool *sync.Pool
- fn ParseInterval
- name string
- size int
-}
-
-// NewDecoderPool returns a SeriesDecoderPool which provides int-based xor
decoders.
-func NewDecoderPool(name string, size int, fn ParseInterval) SeriesDecoderPool
{
- return &decoderPoolDelegator{
- name: name,
- pool: &decoderPool,
- size: size,
- fn: fn,
- }
-}
-
-func (b *decoderPoolDelegator) Get(_ []byte) SeriesDecoder {
- decoder := b.pool.Get().(*decoder)
- decoder.name = b.name
- decoder.size = b.size
- decoder.fn = b.fn
- return decoder
-}
-
-func (b *decoderPoolDelegator) Put(seriesDecoder SeriesDecoder) {
- _, ok := seriesDecoder.(*decoder)
- if ok {
- b.pool.Put(seriesDecoder)
- }
-}
-
-var _ SeriesEncoder = (*encoder)(nil)
-
-// ParseInterval parses the interval rule from the key in a kv pair.
-type ParseInterval = func(key []byte) time.Duration
-
-type encoder struct {
- buff BufferWriter
- bw *Writer
- values *XOREncoder
- fn ParseInterval
- name string
- interval time.Duration
- startTime uint64
- prevTime uint64
- num int
- size int
-}
-
-func newEncoder() interface{} {
- bw := NewWriter()
- return &encoder{
- bw: bw,
- values: NewXOREncoder(bw),
- }
-}
-
-func (ie *encoder) Append(ts uint64, value []byte) {
- if len(value) > 8 {
- return
- }
- if ie.startTime == 0 {
- ie.startTime = ts
- ie.prevTime = ts
- } else if ie.startTime > ts {
- ie.startTime = ts
- }
- gap := int(ie.prevTime) - int(ts)
- if gap < 0 {
- return
- }
- zeroNum := gap/int(ie.interval) - 1
- for i := 0; i < zeroNum; i++ {
- ie.bw.WriteBool(false)
- ie.num++
- }
- ie.prevTime = ts
- l := len(value)
- ie.bw.WriteBool(l > 0)
- ie.values.Write(convert.BytesToUint64(value))
- ie.num++
-}
-
-func (ie *encoder) IsFull() bool {
- return ie.num >= ie.size
-}
-
-func (ie *encoder) Reset(key []byte, buffer BufferWriter) {
- ie.buff = buffer
- ie.bw.Reset(buffer)
- ie.interval = ie.fn(key)
- ie.startTime = 0
- ie.prevTime = 0
- ie.num = 0
- ie.values = NewXOREncoder(ie.bw)
-}
-
-func (ie *encoder) Encode() error {
- ie.bw.Flush()
- buffWriter := NewPacker(ie.buff)
- buffWriter.PutUint64(ie.startTime)
- buffWriter.PutUint16(uint16(ie.num))
- return nil
-}
-
-func (ie *encoder) StartTime() uint64 {
- return ie.startTime
-}
-
-var _ SeriesDecoder = (*decoder)(nil)
-
-type decoder struct {
- fn ParseInterval
- name string
- area []byte
- size int
- interval time.Duration
- startTime uint64
- num int
-}
-
-func (i *decoder) Decode(key, data []byte) error {
- if len(data) < 10 {
- return errInvalidValue
- }
- i.interval = i.fn(key)
- i.startTime = binary.LittleEndian.Uint64(data[len(data)-10 :
len(data)-2])
- i.num = int(binary.LittleEndian.Uint16(data[len(data)-2:]))
- i.area = data[:len(data)-10]
- return nil
-}
-
-func (i decoder) Len() int {
- return i.num
-}
-
-func (i decoder) IsFull() bool {
- return i.num >= i.size
-}
-
-func (i decoder) Get(ts uint64) ([]byte, error) {
- for iter := i.Iterator(); iter.Next(); {
- if iter.Time() == ts {
- return iter.Val(), nil
- }
- }
- return nil, errors.WithMessagef(errNoData, "ts:%d", ts)
-}
-
-func (i decoder) Range() (start, end uint64) {
- return i.startTime, i.startTime + uint64(i.num-1)*uint64(i.interval)
-}
-
-func (i decoder) Iterator() SeriesIterator {
- br := NewReader(bytes.NewReader(i.area))
- return &intIterator{
- endTime: i.startTime + uint64(i.num*int(i.interval)),
- interval: int(i.interval),
- br: br,
- values: NewXORDecoder(br),
- size: i.num,
- }
-}
-
-var _ SeriesIterator = (*intIterator)(nil)
-
-type intIterator struct {
- err error
- br *Reader
- values *XORDecoder
- endTime uint64
- interval int
- size int
- currVal uint64
- currTime uint64
- index int
-}
-
-func (i *intIterator) Next() bool {
- if i.index >= i.size {
- return false
- }
- var b bool
- var err error
- for !b {
- b, err = i.br.ReadBool()
- if errors.Is(err, io.EOF) {
- return false
- }
- if err != nil {
- i.err = err
- return false
- }
- i.index++
- i.currTime = i.endTime - uint64(i.interval*i.index)
- }
- if i.values.Next() {
- i.currVal = i.values.Value()
- }
- return true
-}
-
-func (i *intIterator) Val() []byte {
- return convert.Uint64ToBytes(i.currVal)
-}
-
-func (i *intIterator) Time() uint64 {
- return i.currTime
-}
-
-func (i *intIterator) Error() error {
- return i.err
-}
diff --git a/pkg/encoding/encoder_test.go b/pkg/encoding/encoder_test.go
deleted file mode 100644
index 840a883f..00000000
--- a/pkg/encoding/encoder_test.go
+++ /dev/null
@@ -1,392 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package encoding
-
-import (
- "bytes"
- "testing"
- "time"
-
- "github.com/stretchr/testify/assert"
-
- "github.com/apache/skywalking-banyandb/pkg/convert"
-)
-
-func TestNewEncoderAndDecoder(t *testing.T) {
- type tsData struct {
- ts []uint64
- data []any
- start uint64
- end uint64
- }
- tests := []struct {
- name string
- args tsData
- want tsData
- }{
- {
- name: "int golden path",
- args: tsData{
- ts: []uint64{uint64(4 * time.Minute),
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
- data: []any{7, 8, 7, 9},
- },
- want: tsData{
- ts: []uint64{uint64(4 * time.Minute),
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
- data: []any{7, 8, 7, 9},
- start: uint64(time.Minute),
- end: uint64(4 * time.Minute),
- },
- },
- {
- name: "int more than the size",
- args: tsData{
- ts: []uint64{uint64(4 * time.Minute),
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute),
uint64(1 * time.Minute)},
- data: []any{7, 8, 7, 9, 6},
- },
- want: tsData{
- ts: []uint64{uint64(4 * time.Minute),
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
- data: []any{7, 8, 7, 9},
- start: uint64(time.Minute),
- end: uint64(4 * time.Minute),
- },
- },
- {
- name: "int less than the size",
- args: tsData{
- ts: []uint64{uint64(3 * time.Minute),
uint64(2 * time.Minute), uint64(time.Minute)},
- data: []any{7, 8, 7},
- },
- want: tsData{
- ts: []uint64{uint64(3 * time.Minute),
uint64(2 * time.Minute), uint64(time.Minute)},
- data: []any{7, 8, 7},
- start: uint64(time.Minute),
- end: uint64(3 * time.Minute),
- },
- },
- {
- name: "int empty slot in the middle",
- args: tsData{
- ts: []uint64{uint64(4 * time.Minute),
uint64(time.Minute)},
- data: []any{7, 9},
- },
- want: tsData{
- ts: []uint64{uint64(4 * time.Minute),
uint64(1 * time.Minute)},
- data: []any{7, 9},
- start: uint64(time.Minute),
- end: uint64(4 * time.Minute),
- },
- },
- {
- name: "float64 golden path",
- args: tsData{
- ts: []uint64{uint64(4 * time.Minute),
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
- data: []any{7.0, 8.0, 7.0, 9.0},
- },
- want: tsData{
- ts: []uint64{uint64(4 * time.Minute),
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
- data: []any{7.0, 8.0, 7.0, 9.0},
- start: uint64(time.Minute),
- end: uint64(4 * time.Minute),
- },
- },
- {
- name: "float64 more than the size",
- args: tsData{
- ts: []uint64{uint64(4 * time.Minute),
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute),
uint64(1 * time.Minute)},
- data: []any{0.7, 0.8, 0.7, 0.9, 0.6},
- },
- want: tsData{
- ts: []uint64{uint64(4 * time.Minute),
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
- data: []any{0.7, 0.8, 0.7, 0.9},
- start: uint64(time.Minute),
- end: uint64(4 * time.Minute),
- },
- },
- {
- name: "float64 less than the size",
- args: tsData{
- ts: []uint64{uint64(3 * time.Minute),
uint64(2 * time.Minute), uint64(time.Minute)},
- data: []any{1.7, 1.8, 1.7},
- },
- want: tsData{
- ts: []uint64{uint64(3 * time.Minute),
uint64(2 * time.Minute), uint64(time.Minute)},
- data: []any{1.7, 1.8, 1.7},
- start: uint64(time.Minute),
- end: uint64(3 * time.Minute),
- },
- },
- {
- name: "float64 empty slot in the middle",
- args: tsData{
- ts: []uint64{uint64(4 * time.Minute),
uint64(time.Minute)},
- data: []any{0.700033, 0.988822},
- },
- want: tsData{
- ts: []uint64{uint64(4 * time.Minute),
uint64(1 * time.Minute)},
- data: []any{0.700033, 0.988822},
- start: uint64(time.Minute),
- end: uint64(4 * time.Minute),
- },
- },
- }
- key := []byte("foo")
- fn := func(k []byte) time.Duration {
- assert.Equal(t, key, k)
- return 1 * time.Minute
- }
- encoderPool := NewEncoderPool("minute", 4, fn)
- decoderPool := NewDecoderPool("minute", 4, fn)
-
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- at := assert.New(t)
- var buffer bytes.Buffer
- encoder := encoderPool.Get(key, &buffer)
- defer encoderPool.Put(encoder)
- decoder := decoderPool.Get(key)
- defer decoderPool.Put(decoder)
- isFull := false
- for i, v := range tt.args.ts {
- encoder.Append(v, ToBytes(tt.args.data[i]))
- if encoder.IsFull() {
- isFull = true
- break
- }
- }
- err := encoder.Encode()
- at.NoError(err)
-
- at.Equal(tt.want.start, encoder.StartTime())
- at.NoError(decoder.Decode(key, buffer.Bytes()))
- start, end := decoder.Range()
- at.Equal(tt.want.start, start)
- at.Equal(tt.want.end, end)
- if isFull {
- at.True(decoder.IsFull())
- }
- i := 0
- for iter := decoder.Iterator(); iter.Next(); i++ {
- at.NoError(iter.Error())
- at.Equal(tt.want.ts[i], iter.Time())
- at.Equal(tt.want.data[i],
BytesTo(tt.want.data[i], iter.Val()))
- v, err := decoder.Get(iter.Time())
- at.NoError(err)
- at.Equal(tt.want.data[i],
BytesTo(tt.want.data[i], v))
- }
- at.Equal(len(tt.want.ts), i)
- })
- }
-}
-
-func ToBytes(v any) []byte {
- switch d := v.(type) {
- case int:
- return convert.Int64ToBytes(int64(d))
- case float64:
- return convert.Float64ToBytes(d)
- }
- return nil
-}
-
-func BytesTo(t any, b []byte) any {
- switch t.(type) {
- case int:
- return int(convert.BytesToInt64(b))
- case float64:
- return convert.BytesToFloat64(b)
- }
- return nil
-}
-
-func TestNewDecoderGet(t *testing.T) {
- type tsData struct {
- ts []uint64
- data []any
- }
- type wantData struct {
- ts []uint64
- data []any
- wantErr []bool
- start uint64
- end uint64
- }
- tests := []struct {
- name string
- args tsData
- want wantData
- }{
- {
- name: "int golden path",
- args: tsData{
- ts: []uint64{uint64(4 * time.Minute),
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
- data: []any{7, 8, 7, 9},
- },
- want: wantData{
- ts: []uint64{uint64(4 * time.Minute),
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
- data: []any{7, 8, 7, 9},
- start: uint64(time.Minute),
- end: uint64(4 * time.Minute),
- },
- },
- {
- name: "int more than the size",
- args: tsData{
- ts: []uint64{uint64(4 * time.Minute),
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute),
uint64(1 * time.Minute)},
- data: []any{7, 8, 7, 9, 6},
- },
- want: wantData{
- ts: []uint64{uint64(4 * time.Minute),
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), 0},
- data: []any{7, 8, 7, 9, nil},
- wantErr: []bool{false, false, false, false,
true},
- start: uint64(time.Minute),
- end: uint64(4 * time.Minute),
- },
- },
- {
- name: "int less than the size",
- args: tsData{
- ts: []uint64{uint64(3 * time.Minute),
uint64(2 * time.Minute), uint64(time.Minute)},
- data: []any{7, 8, 7},
- },
- want: wantData{
- ts: []uint64{uint64(3 * time.Minute),
uint64(2 * time.Minute), uint64(time.Minute)},
- data: []any{7, 8, 7},
- start: uint64(time.Minute),
- end: uint64(3 * time.Minute),
- },
- },
- {
- name: "int empty slot in the middle",
- args: tsData{
- ts: []uint64{uint64(4 * time.Minute),
uint64(time.Minute)},
- data: []any{7, 9},
- },
- want: wantData{
- ts: []uint64{uint64(4 * time.Minute),
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
- data: []any{7, nil, nil, 9},
- wantErr: []bool{false, true, true, false},
- start: uint64(time.Minute),
- end: uint64(4 * time.Minute),
- },
- },
- {
- name: "float golden path",
- args: tsData{
- ts: []uint64{uint64(4 * time.Minute),
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
- data: []any{7.0, 8.0, 7.0, 9.0},
- },
- want: wantData{
- ts: []uint64{uint64(4 * time.Minute),
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
- data: []any{7.0, 8.0, 7.0, 9.0},
- start: uint64(time.Minute),
- end: uint64(4 * time.Minute),
- },
- },
- {
- name: "float more than the size",
- args: tsData{
- ts: []uint64{uint64(4 * time.Minute),
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute),
uint64(1 * time.Minute)},
- data: []any{1.7, 1.8, 1.7, 1.9, 1.6},
- },
- want: wantData{
- ts: []uint64{uint64(4 * time.Minute),
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute), 0},
- data: []any{1.7, 1.8, 1.7, 1.9, nil},
- wantErr: []bool{false, false, false, false,
true},
- start: uint64(time.Minute),
- end: uint64(4 * time.Minute),
- },
- },
- {
- name: "float less than the size",
- args: tsData{
- ts: []uint64{uint64(3 * time.Minute),
uint64(2 * time.Minute), uint64(time.Minute)},
- data: []any{0.71, 0.833, 0.709},
- },
- want: wantData{
- ts: []uint64{uint64(3 * time.Minute),
uint64(2 * time.Minute), uint64(time.Minute)},
- data: []any{0.71, 0.833, 0.709},
- start: uint64(time.Minute),
- end: uint64(3 * time.Minute),
- },
- },
- {
- name: "float empty slot in the middle",
- args: tsData{
- ts: []uint64{uint64(4 * time.Minute),
uint64(time.Minute)},
- data: []any{1.7, 1.9},
- },
- want: wantData{
- ts: []uint64{uint64(4 * time.Minute),
uint64(3 * time.Minute), uint64(2 * time.Minute), uint64(1 * time.Minute)},
- data: []any{1.7, nil, nil, 1.9},
- wantErr: []bool{false, true, true, false},
- start: uint64(time.Minute),
- end: uint64(4 * time.Minute),
- },
- },
- }
- key := []byte("foo")
- fn := func(k []byte) time.Duration {
- assert.Equal(t, key, k)
- return 1 * time.Minute
- }
- encoderPool := NewEncoderPool("minute", 4, fn)
- decoderPool := NewDecoderPool("minute", 4, fn)
-
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- at := assert.New(t)
- var buffer bytes.Buffer
- encoder := encoderPool.Get(key, &buffer)
- defer encoderPool.Put(encoder)
- decoder := decoderPool.Get(key)
- defer decoderPool.Put(decoder)
- isFull := false
- for i, v := range tt.args.ts {
- encoder.Append(v, ToBytes(tt.args.data[i]))
- if encoder.IsFull() {
- isFull = true
- break
- }
- }
- err := encoder.Encode()
- at.NoError(err)
-
- at.Equal(tt.want.start, encoder.StartTime())
- at.NoError(decoder.Decode(key, buffer.Bytes()))
- start, end := decoder.Range()
- at.Equal(tt.want.start, start)
- at.Equal(tt.want.end, end)
- if isFull {
- at.True(decoder.IsFull())
- }
- for i, t := range tt.want.ts {
- wantErr := false
- if tt.want.wantErr != nil {
- wantErr = tt.want.wantErr[i]
- }
- v, err := decoder.Get(t)
- if wantErr {
- at.ErrorIs(err, errNoData)
- } else {
- at.NoError(err)
- at.Equal(tt.want.data[i],
BytesTo(tt.want.data[i], v))
- }
- }
- })
- }
-}
diff --git a/pkg/encoding/int.go b/pkg/encoding/int.go
index 21b77482..05a227ab 100644
--- a/pkg/encoding/int.go
+++ b/pkg/encoding/int.go
@@ -20,7 +20,8 @@ package encoding
import (
"encoding/binary"
"fmt"
- "sync"
+
+ "github.com/apache/skywalking-banyandb/pkg/pool"
)
// Uint16ToBytes appends the bytes of the given uint16 to the given byte slice.
@@ -224,7 +225,7 @@ func GenerateInt64List(size int) *Int64List {
L: make([]int64, size),
}
}
- is := v.(*Int64List)
+ is := v
if n := size - cap(is.L); n > 0 {
is.L = append(is.L[:cap(is.L)], make([]int64, n)...)
}
@@ -243,7 +244,7 @@ type Int64List struct {
L []int64
}
-var int64ListPool sync.Pool
+var int64ListPool = pool.Register[*Int64List]("encoding-int64List")
// GenerateUint64List generates a list of uint64 with the given size.
// The returned list may be from a pool and should be released after use.
@@ -254,7 +255,7 @@ func GenerateUint64List(size int) *Uint64List {
L: make([]uint64, size),
}
}
- is := v.(*Uint64List)
+ is := v
if n := size - cap(is.L); n > 0 {
is.L = append(is.L[:cap(is.L)], make([]uint64, n)...)
}
@@ -273,4 +274,4 @@ type Uint64List struct {
L []uint64
}
-var uint64ListPool sync.Pool
+// uint64ListPool is the tracked pool of *Uint64List values. The registered
+// name is what shows up in pool.AllRefsCount, so it is spelled out in full.
+var uint64ListPool = pool.Register[*Uint64List]("encoding-uint64List")
diff --git a/pkg/fs/local_file_system.go b/pkg/fs/local_file_system.go
index ff7dac5e..39fcb22a 100644
--- a/pkg/fs/local_file_system.go
+++ b/pkg/fs/local_file_system.go
@@ -25,12 +25,12 @@ import (
"io"
"os"
"path/filepath"
- "sync"
"time"
"github.com/shirou/gopsutil/v3/disk"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
)
const defaultIOSize = 256 * 1024
@@ -465,7 +465,7 @@ func generateReader(f *os.File) *bufio.Reader {
if v == nil {
return bufio.NewReaderSize(f, defaultIOSize)
}
- br := v.(*bufio.Reader)
+ br := v
br.Reset(f)
return br
}
@@ -475,14 +475,14 @@ func releaseReader(br *bufio.Reader) {
bufReaderPool.Put(br)
}
-var bufReaderPool sync.Pool
+var bufReaderPool = pool.Register[*bufio.Reader]("fs-bufReader")
func generateWriter(f *os.File) *bufio.Writer {
v := bufWriterPool.Get()
if v == nil {
return bufio.NewWriterSize(f, defaultIOSize)
}
- bw := v.(*bufio.Writer)
+ bw := v
bw.Reset(f)
return bw
}
@@ -492,4 +492,4 @@ func releaseWriter(bw *bufio.Writer) {
bufWriterPool.Put(bw)
}
-var bufWriterPool sync.Pool
+var bufWriterPool = pool.Register[*bufio.Writer]("fs-bufWriter")
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index a481b7f3..81af6e20 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -25,7 +25,6 @@ import (
"io"
"log"
"math"
- "sync"
"time"
"github.com/blugelabs/bluge"
@@ -43,6 +42,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/index/posting"
"github.com/apache/skywalking-banyandb/pkg/index/posting/roaring"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
"github.com/apache/skywalking-banyandb/pkg/run"
)
@@ -88,14 +88,14 @@ type store struct {
l *logger.Logger
}
-var batchPool sync.Pool
+var batchPool = pool.Register[*blugeIndex.Batch]("index-bluge-batch")
func generateBatch() *blugeIndex.Batch {
b := batchPool.Get()
if b == nil {
return bluge.NewBatch()
}
- return b.(*blugeIndex.Batch)
+ return b
}
func releaseBatch(b *blugeIndex.Batch) {
diff --git a/pkg/meter/native/collection.go b/pkg/meter/native/collection.go
index 9904e11a..209211fd 100644
--- a/pkg/meter/native/collection.go
+++ b/pkg/meter/native/collection.go
@@ -19,6 +19,7 @@
package native
import (
+ "fmt"
"time"
"google.golang.org/protobuf/types/known/timestamppb"
@@ -34,6 +35,7 @@ import (
// NodeSelector has Locate method to select a nodeId.
type NodeSelector interface {
Locate(group, name string, shardID uint32) (string, error)
+ fmt.Stringer
}
type collector interface {
diff --git a/pkg/node/interface.go b/pkg/node/interface.go
index 0f41aab5..c4341b7d 100644
--- a/pkg/node/interface.go
+++ b/pkg/node/interface.go
@@ -20,6 +20,7 @@ package node
import (
"context"
+ "fmt"
"sync"
"github.com/pkg/errors"
@@ -42,6 +43,7 @@ type Selector interface {
RemoveNode(node *databasev1.Node)
Pick(group, name string, shardID uint32) (string, error)
run.PreRunner
+ fmt.Stringer
}
// NewPickFirstSelector returns a simple selector that always returns the
first node if exists.
@@ -58,6 +60,17 @@ type pickFirstSelector struct {
mu sync.RWMutex
}
+// String implements Selector. It reports the node that Pick currently
+// returns together with the contents of p.nodeIDs, or the Pick error text
+// when Pick fails.
+func (p *pickFirstSelector) String() string {
+	n, err := p.Pick("", "", 0)
+	if err != nil {
+		return fmt.Sprintf("%v", err)
+	}
+	// NOTE(review): the lock is taken only after Pick has returned,
+	// presumably because Pick locks p.mu itself — confirm.
+	// nodeIDs is only read here, so the shared read lock is enough and does
+	// not stall other readers of the selector.
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+	return fmt.Sprintf("pick [%s] from %s", n, p.nodeIDs)
+}
+
func (p *pickFirstSelector) PreRun(context.Context) error {
return nil
}
diff --git a/pkg/node/maglev.go b/pkg/node/maglev.go
index 34a855f6..3efcaab6 100644
--- a/pkg/node/maglev.go
+++ b/pkg/node/maglev.go
@@ -19,6 +19,7 @@ package node
import (
"context"
+ "fmt"
"sort"
"strconv"
"sync"
@@ -38,6 +39,18 @@ type maglevSelector struct {
mutex sync.RWMutex
}
+// String implements Selector. It lists m.nodes and the string keys currently
+// stored in m.routers (collected as groups).
+func (m *maglevSelector) String() string {
+	var groups []string
+	m.routers.Range(func(key, _ any) bool {
+		groups = append(groups, key.(string))
+		return true
+	})
+	m.mutex.RLock()
+	// Must pair with the RLock above: calling Unlock on an RWMutex that is
+	// only read-locked is a fatal runtime error.
+	defer m.mutex.RUnlock()
+	return fmt.Sprintf("nodes:%s groups:%s", m.nodes, groups)
+}
+
func (m *maglevSelector) Name() string {
return "maglev-selector"
}
diff --git a/pkg/node/round_robin.go b/pkg/node/round_robin.go
index 2815b19c..d0bb37e1 100644
--- a/pkg/node/round_robin.go
+++ b/pkg/node/round_robin.go
@@ -19,6 +19,7 @@ package node
import (
"context"
+ "encoding/json"
"fmt"
"slices"
"sort"
@@ -32,22 +33,47 @@ import (
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
)
type roundRobinSelector struct {
schemaRegistry metadata.Repo
closeCh chan struct{}
- lookupTable sync.Map
+ lookupTable []key
nodes []string
mu sync.RWMutex
}
+// String renders the current routing as a JSON object that maps every
+// "<group>-<shardID>" entry of the lookup table to the node Pick returns for
+// it, or to the Pick error text when Pick fails. It returns "" when the
+// lookup table has no entries, and the marshal error text if encoding fails.
+func (r *roundRobinSelector) String() string {
+ r.mu.RLock()
+ defer r.mu.RUnlock()
+ result := make(map[string]string)
+ for _, entry := range r.lookupTable {
+ // NOTE(review): Pick is invoked while r.mu is read-locked. If Pick
+ // acquires r.mu.RLock itself this is a recursive read lock, which
+ // sync.RWMutex prohibits because it can deadlock once a writer is
+ // waiting — confirm against Pick's implementation.
+ n, err := r.Pick(entry.group, "", entry.shardID)
+ // key shadows the package-level key type for the rest of this iteration.
+ key := fmt.Sprintf("%s-%d", entry.group, entry.shardID)
+ if err != nil {
+ result[key] = fmt.Sprintf("%v", err)
+ continue
+ }
+ result[key] = n
+ }
+ if len(result) < 1 {
+ return ""
+ }
+ jsonBytes, err := json.Marshal(result)
+ if err != nil {
+ return fmt.Sprintf("%v", err)
+ }
+ return convert.BytesToString(jsonBytes)
+}
+
// NewRoundRobinSelector creates a new round-robin selector.
func NewRoundRobinSelector(schemaRegistry metadata.Repo) Selector {
rrs := &roundRobinSelector{
nodes: make([]string, 0),
closeCh: make(chan struct{}),
schemaRegistry: schemaRegistry,
+ lookupTable: make([]key, 0),
}
return rrs
}
@@ -69,9 +95,11 @@ func (r *roundRobinSelector) OnAddOrUpdate(schemaMetadata
schema.Metadata) {
if !ok || !validateGroup(group) {
return
}
+ r.mu.Lock()
+ defer r.mu.Unlock()
for i := uint32(0); i < group.ResourceOpts.ShardNum; i++ {
k := key{group: group.Metadata.Name, shardID: i}
- r.lookupTable.Store(k, 0)
+ r.lookupTable = append(r.lookupTable, k)
}
r.sortEntries()
}
@@ -80,12 +108,18 @@ func (r *roundRobinSelector) OnDelete(schemaMetadata
schema.Metadata) {
if schemaMetadata.Kind != schema.KindGroup {
return
}
+ r.mu.Lock()
+ defer r.mu.Unlock()
group := schemaMetadata.Spec.(*commonv1.Group)
for i := uint32(0); i < group.ResourceOpts.ShardNum; i++ {
k := key{group: group.Metadata.Name, shardID: i}
- r.lookupTable.Delete(k)
+ for j := range r.lookupTable {
+ if r.lookupTable[j] == k {
+ r.lookupTable = append(r.lookupTable[:j],
r.lookupTable[j+1:]...)
+ break
+ }
+ }
}
- r.sortEntries()
}
func (r *roundRobinSelector) OnInit(kinds []schema.Kind) (bool, []int64) {
@@ -101,8 +135,10 @@ func (r *roundRobinSelector) OnInit(kinds []schema.Kind)
(bool, []int64) {
if err != nil {
panic(fmt.Sprintf("failed to list group: %v", err))
}
+ r.mu.Lock()
+ defer r.mu.Unlock()
var revision int64
- r.lookupTable = sync.Map{}
+ r.lookupTable = r.lookupTable[:0]
for _, g := range gg {
if !validateGroup(g) {
continue
@@ -112,7 +148,7 @@ func (r *roundRobinSelector) OnInit(kinds []schema.Kind)
(bool, []int64) {
}
for i := uint32(0); i < g.ResourceOpts.ShardNum; i++ {
k := key{group: g.Metadata.Name, shardID: i}
- r.lookupTable.Store(k, 0)
+ r.lookupTable = append(r.lookupTable, k)
}
}
r.sortEntries()
@@ -144,29 +180,26 @@ func (r *roundRobinSelector) Pick(group, _ string,
shardID uint32) (string, erro
if len(r.nodes) == 0 {
return "", errors.New("no nodes available")
}
- entry, ok := r.lookupTable.Load(k)
- if ok {
- return r.selectNode(entry), nil
+ i := sort.Search(len(r.lookupTable), func(i int) bool {
+ if r.lookupTable[i].group == group {
+ return r.lookupTable[i].shardID >= shardID
+ }
+ return r.lookupTable[i].group > group
+ })
+ if i < len(r.lookupTable) && r.lookupTable[i] == k {
+ return r.selectNode(i), nil
}
return "", fmt.Errorf("%s-%d is a unknown shard", group, shardID)
}
func (r *roundRobinSelector) sortEntries() {
- var keys []key
- r.lookupTable.Range(func(k, _ any) bool {
- keys = append(keys, k.(key))
- return true
- })
- slices.SortFunc(keys, func(a, b key) int {
+ slices.SortFunc(r.lookupTable, func(a, b key) int {
n := strings.Compare(a.group, b.group)
if n != 0 {
return n
}
return int(a.shardID) - int(b.shardID)
})
- for i := range keys {
- r.lookupTable.Store(keys[i], i)
- }
}
func (r *roundRobinSelector) Close() {
diff --git a/pkg/node/round_robin_test.go b/pkg/node/round_robin_test.go
index 9e8037c5..223e1908 100644
--- a/pkg/node/round_robin_test.go
+++ b/pkg/node/round_robin_test.go
@@ -111,6 +111,28 @@ func TestCleanupGroup(t *testing.T) {
assert.Error(t, err)
}
+// TestSortNodeEntries adds nodes out of order and asserts that selector.nodes
+// ends up as node1, node2, node3.
+func TestSortNodeEntries(t *testing.T) {
+	selector := &roundRobinSelector{
+		nodes: []string{},
+	}
+	setupGroup(selector)
+	for _, name := range []string{"node3", "node1", "node2"} {
+		selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: name}})
+	}
+	assert.EqualValues(t, []string{"node1", "node2", "node3"}, selector.nodes)
+}
+
+// TestStringer asserts that String is empty for a fresh selector and
+// non-empty after setupGroup, both before and after nodes are added.
+func TestStringer(t *testing.T) {
+	selector := NewRoundRobinSelector(nil)
+	assert.Empty(t, selector.String())
+	setupGroup(selector)
+	assert.NotEmpty(t, selector.String())
+	for _, name := range []string{"node3", "node1", "node2"} {
+		selector.AddNode(&databasev1.Node{Metadata: &commonv1.Metadata{Name: name}})
+	}
+	assert.NotEmpty(t, selector.String())
+}
+
var groupSchema = schema.Metadata{
TypeMeta: schema.TypeMeta{
Kind: schema.KindGroup,
diff --git a/pkg/pb/v1/series.go b/pkg/pb/v1/series.go
index 754d155e..a0e76827 100644
--- a/pkg/pb/v1/series.go
+++ b/pkg/pb/v1/series.go
@@ -19,7 +19,6 @@ package v1
import (
"sort"
- "sync"
"github.com/pkg/errors"
@@ -97,26 +96,6 @@ func (s *Series) reset() {
s.Buffer = s.Buffer[:0]
}
-// SeriesPool is a pool of Series.
-type SeriesPool struct {
- pool sync.Pool
-}
-
-// Generate creates a new Series or gets one from the pool.
-func (sp *SeriesPool) Generate() *Series {
- sv := sp.pool.Get()
- if sv == nil {
- return &Series{}
- }
- return sv.(*Series)
-}
-
-// Release puts a Series back to the pool.
-func (sp *SeriesPool) Release(s *Series) {
- s.reset()
- sp.pool.Put(s)
-}
-
// SeriesList is a collection of Series.
type SeriesList []*Series
diff --git a/pkg/pool/pool.go b/pkg/pool/pool.go
new file mode 100644
index 00000000..323cb81c
--- /dev/null
+++ b/pkg/pool/pool.go
@@ -0,0 +1,81 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package pool provides a pool for reusing objects.
+package pool
+
+import (
+ "fmt"
+ "sync"
+ "sync/atomic"
+)
+
+// poolMap holds every pool created through Register, keyed by its name.
+var poolMap = sync.Map{}
+
+// Register creates a tracked pool for values of type T and records it under
+// name. It panics if a pool with the same name was registered before.
+func Register[T any](name string) *Synced[T] {
+	tracked := &Synced[T]{}
+	_, exists := poolMap.LoadOrStore(name, tracked)
+	if exists {
+		panic(fmt.Sprintf("duplicated pool: %s", name))
+	}
+	return tracked
+}
+
+// AllRefsCount snapshots the reference count of every registered pool and
+// returns the counts keyed by pool name.
+func AllRefsCount() map[string]int {
+	counts := map[string]int{}
+	poolMap.Range(func(k, v any) bool {
+		name := k.(string)
+		tracked := v.(Trackable)
+		counts[name] = tracked.RefsCount()
+		return true
+	})
+	return counts
+}
+
+// Trackable is the interface that wraps the RefsCount method.
+// AllRefsCount asserts every value stored in poolMap to this interface.
+type Trackable interface {
+ // RefsCount returns the reference count of the pool.
+ RefsCount() int
+}
+
+// Synced is a pool that is safe for concurrent use.
+// It embeds sync.Pool and adds typed Get/Put methods that maintain a counter
+// of objects handed out by Get and not yet returned through Put.
+type Synced[T any] struct {
+ sync.Pool
+ // refs is incremented by Get and decremented by Put.
+ refs atomic.Int32
+}
+
+// Get takes an object out of the pool and counts it as an outstanding
+// reference. When the underlying pool has nothing to hand out, the zero value
+// of T is returned (nil for pointer types) and the reference is still counted.
+func (p *Synced[T]) Get() T {
+	item := p.Pool.Get()
+	p.refs.Add(1)
+	if item != nil {
+		return item.(T)
+	}
+	var zero T
+	return zero
+}
+
+// Put puts an object back to the pool and decrements the reference count.
+// The count is decremented unconditionally, so a Put without a preceding Get
+// drives it below zero.
+func (p *Synced[T]) Put(v T) {
+ p.Pool.Put(v)
+ p.refs.Add(-1)
+}
+
+// RefsCount reports how many Get calls have not yet been balanced by a Put.
+func (p *Synced[T]) RefsCount() int {
+	outstanding := p.refs.Load()
+	return int(outstanding)
+}
diff --git a/pkg/test/gmatcher/gmatcher.go b/pkg/test/gmatcher/gmatcher.go
new file mode 100644
index 00000000..93b6fb1a
--- /dev/null
+++ b/pkg/test/gmatcher/gmatcher.go
@@ -0,0 +1,59 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package gmatcher provides custom Gomega matchers.
+package gmatcher
+
+import (
+ "fmt"
+
+ "github.com/onsi/gomega"
+)
+
+// HaveZeroRef builds a matcher that succeeds when no pool reports references.
+func HaveZeroRef() gomega.OmegaMatcher {
+	return new(ZeroRefMatcher)
+}
+
+// Compile-time check that *ZeroRefMatcher satisfies gomega.OmegaMatcher.
+var _ gomega.OmegaMatcher = &ZeroRefMatcher{}
+
+// ZeroRefMatcher is a matcher that checks if all pools have 0 references.
+// It expects a map[string]int of per-pool counts and matches when no count
+// is greater than zero.
+type ZeroRefMatcher struct{}
+
+// FailureMessage implements types.GomegaMatcher.
+func (p *ZeroRefMatcher) FailureMessage(actual interface{}) (message string) {
+ return fmt.Sprintf("expected all pools to have 0 references, got %v",
actual)
+}
+
+// Match implements types.GomegaMatcher. actual must be a map[string]int of
+// pool name to reference count; the match fails with an error naming the
+// first pool found whose count is greater than zero.
+func (p *ZeroRefMatcher) Match(actual interface{}) (success bool, err error) {
+	counts, isCounts := actual.(map[string]int)
+	if !isCounts {
+		return false, fmt.Errorf("expected map[string]int, got %T", actual)
+	}
+	for poolName, count := range counts {
+		if count <= 0 {
+			continue
+		}
+		return false, fmt.Errorf("pool %s has %d references", poolName, count)
+	}
+	return true, nil
+}
+
+// NegatedFailureMessage implements types.GomegaMatcher. It describes the
+// actual value when the negated expectation is not met.
+func (p *ZeroRefMatcher) NegatedFailureMessage(actual interface{}) (message string) {
+	const format = "expected at least one pool to have references, got %v"
+	return fmt.Sprintf(format, actual)
+}
diff --git a/pkg/test/measure/testdata/groups/exception.json
b/pkg/test/measure/testdata/groups/exception.json
new file mode 100644
index 00000000..fadbd5a5
--- /dev/null
+++ b/pkg/test/measure/testdata/groups/exception.json
@@ -0,0 +1,18 @@
+{
+ "metadata": {
+ "name": "exception"
+ },
+ "catalog": "CATALOG_MEASURE",
+ "resource_opts": {
+ "shard_num": 2,
+ "segment_interval": {
+ "unit": "UNIT_DAY",
+ "num": 1
+ },
+ "ttl": {
+ "unit": "UNIT_DAY",
+ "num": 7
+ }
+ },
+ "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/measure/testdata/measures/duplicated.json
b/pkg/test/measure/testdata/measures/duplicated.json
new file mode 100644
index 00000000..e25f0a37
--- /dev/null
+++ b/pkg/test/measure/testdata/measures/duplicated.json
@@ -0,0 +1,42 @@
+{
+ "metadata": {
+ "name": "duplicated",
+ "group": "exception"
+ },
+ "tag_families": [
+ {
+ "name": "default",
+ "tags": [
+ {
+ "name": "id",
+ "type": "TAG_TYPE_STRING"
+ },
+ {
+ "name": "entity_id",
+ "type": "TAG_TYPE_STRING"
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "name": "total",
+ "field_type": "FIELD_TYPE_INT",
+ "encoding_method": "ENCODING_METHOD_GORILLA",
+ "compression_method": "COMPRESSION_METHOD_ZSTD"
+ },
+ {
+ "name": "value",
+ "field_type": "FIELD_TYPE_INT",
+ "encoding_method": "ENCODING_METHOD_GORILLA",
+ "compression_method": "COMPRESSION_METHOD_ZSTD"
+ }
+ ],
+ "entity": {
+ "tag_names": [
+ "entity_id"
+ ]
+ },
+ "interval": "1m",
+ "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/test/cases/init.go b/test/cases/init.go
index a39e09f6..7341a96d 100644
--- a/test/cases/init.go
+++ b/test/cases/init.go
@@ -54,4 +54,5 @@ func Initialize(addr string, now time.Time) {
casesmeasuredata.Write(conn, "service_latency_minute", "sw_metric",
"service_latency_minute_data.json", now, interval)
casesmeasuredata.Write(conn, "service_instance_latency_minute",
"sw_metric", "service_instance_latency_minute_data.json", now, interval)
casesmeasuredata.Write(conn, "service_instance_latency_minute",
"sw_metric", "service_instance_latency_minute_data1.json",
now.Add(1*time.Minute), interval)
+ casesmeasuredata.Write(conn, "duplicated", "exception",
"duplicated.json", now, 0)
}
diff --git a/test/cases/measure/data/input/duplicated_part.yaml
b/test/cases/measure/data/input/duplicated_part.yaml
new file mode 100644
index 00000000..91900820
--- /dev/null
+++ b/test/cases/measure/data/input/duplicated_part.yaml
@@ -0,0 +1,25 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+name: "duplicated"
+groups: ["exception"]
+tagProjection:
+ tagFamilies:
+ - name: "default"
+ tags: ["id", "entity_id"]
+fieldProjection:
+ names: ["total", "value"]
diff --git a/test/cases/measure/data/testdata/duplicated.json
b/test/cases/measure/data/testdata/duplicated.json
new file mode 100644
index 00000000..d67feaaf
--- /dev/null
+++ b/test/cases/measure/data/testdata/duplicated.json
@@ -0,0 +1,182 @@
+[
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "svc1"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_1"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 100
+ }
+ },
+ {
+ "int": {
+ "value": 1
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "svc1"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_1"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 100
+ }
+ },
+ {
+ "int": {
+ "value": 2
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "svc1"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_1"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 100
+ }
+ },
+ {
+ "int": {
+ "value": 3
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "svc2"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_1"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 100
+ }
+ },
+ {
+ "int": {
+ "value": 5
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "svc2"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_1"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 50
+ }
+ },
+ {
+ "int": {
+ "value": 4
+ }
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "svc3"
+ }
+ },
+ {
+ "str": {
+ "value": "entity_1"
+ }
+ }
+ ]
+ }
+ ],
+ "fields": [
+ {
+ "int": {
+ "value": 300
+ }
+ },
+ {
+ "int": {
+ "value": 6
+ }
+ }
+ ]
+ }
+]
diff --git a/test/cases/measure/data/want/duplicated_part.yaml
b/test/cases/measure/data/want/duplicated_part.yaml
new file mode 100644
index 00000000..10a57e59
--- /dev/null
+++ b/test/cases/measure/data/want/duplicated_part.yaml
@@ -0,0 +1,38 @@
+# Licensed to Apache Software Foundation (ASF) under one or more contributor
+# license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright
+# ownership. Apache Software Foundation (ASF) licenses this file to you under
+# the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+dataPoints:
+- fields:
+ - name: total
+ value:
+ int:
+ value: "300"
+ - name: value
+ value:
+ int:
+ value: "6"
+ tagFamilies:
+ - name: default
+ tags:
+ - key: id
+ value:
+ str:
+ value: svc3
+ - key: entity_id
+ value:
+ str:
+ value: entity_1
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index 960401ca..06b24498 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -70,4 +70,5 @@ var _ = g.DescribeTable("Scanning Measures", verify,
g.Entry("float64 value", helpers.Args{Input: "float", Duration: 25 *
time.Minute, Offset: -20 * time.Minute}),
g.Entry("float64 aggregation:min", helpers.Args{Input: "float_agg_min",
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
g.Entry("all_latency", helpers.Args{Input: "all_latency", Duration: 25
* time.Minute, Offset: -20 * time.Minute}),
+ g.Entry("duplicated in a part", helpers.Args{Input: "duplicated_part",
Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
)
diff --git a/test/integration/distributed/query/query_suite_test.go
b/test/integration/distributed/query/query_suite_test.go
index 0c78a14f..3d8eaad3 100644
--- a/test/integration/distributed/query/query_suite_test.go
+++ b/test/integration/distributed/query/query_suite_test.go
@@ -35,8 +35,10 @@ import (
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
"github.com/apache/skywalking-banyandb/pkg/grpchelper"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
"github.com/apache/skywalking-banyandb/pkg/test"
"github.com/apache/skywalking-banyandb/pkg/test/flags"
+ "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
"github.com/apache/skywalking-banyandb/pkg/test/helpers"
test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure"
"github.com/apache/skywalking-banyandb/pkg/test/setup"
@@ -132,4 +134,5 @@ var _ = SynchronizedAfterSuite(func() {
}, func() {
deferFunc()
Eventually(gleak.Goroutines,
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+ Eventually(pool.AllRefsCount,
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
})
diff --git a/test/integration/etcd/client_test.go
b/test/integration/etcd/client_test.go
index a78d8345..ddfb4797 100644
--- a/test/integration/etcd/client_test.go
+++ b/test/integration/etcd/client_test.go
@@ -36,8 +36,10 @@ import (
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
"github.com/apache/skywalking-banyandb/pkg/test"
"github.com/apache/skywalking-banyandb/pkg/test/flags"
+ "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
"github.com/apache/skywalking-banyandb/pkg/test/helpers"
"github.com/apache/skywalking-banyandb/pkg/test/setup"
)
@@ -76,6 +78,7 @@ var _ = Describe("Client Test", func() {
AfterEach(func() {
dirSpaceDef()
Eventually(gleak.Goroutines,
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+ Eventually(pool.AllRefsCount,
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
})
It("should be using user/password connect etcd server successfully",
func() {
diff --git a/test/integration/load/load_suite_test.go
b/test/integration/load/load_suite_test.go
index 1858e380..84dccc38 100644
--- a/test/integration/load/load_suite_test.go
+++ b/test/integration/load/load_suite_test.go
@@ -36,8 +36,10 @@ import (
streamv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/pkg/grpchelper"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
"github.com/apache/skywalking-banyandb/pkg/test"
"github.com/apache/skywalking-banyandb/pkg/test/flags"
+ "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
"github.com/apache/skywalking-banyandb/pkg/test/helpers"
"github.com/apache/skywalking-banyandb/pkg/test/setup"
cases_stream_data
"github.com/apache/skywalking-banyandb/test/cases/stream/data"
@@ -157,6 +159,7 @@ var _ = Describe("Load Test Suit", func() {
}
deferFunc()
Eventually(gleak.Goroutines,
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+ Eventually(pool.AllRefsCount,
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
})
})
diff --git a/test/integration/standalone/cold_query/query_suite_test.go
b/test/integration/standalone/cold_query/query_suite_test.go
index 18e832ad..a28c145e 100644
--- a/test/integration/standalone/cold_query/query_suite_test.go
+++ b/test/integration/standalone/cold_query/query_suite_test.go
@@ -29,7 +29,9 @@ import (
"github.com/apache/skywalking-banyandb/pkg/grpchelper"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
"github.com/apache/skywalking-banyandb/pkg/test/flags"
+ "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
"github.com/apache/skywalking-banyandb/pkg/test/helpers"
"github.com/apache/skywalking-banyandb/pkg/test/setup"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -89,4 +91,5 @@ var _ = SynchronizedAfterSuite(func() {
}, func() {
deferFunc()
Eventually(gleak.Goroutines,
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+ Eventually(pool.AllRefsCount,
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
})
diff --git a/test/integration/standalone/other/measure_test.go b/test/integration/standalone/other/measure_test.go
index a2ab6e81..a0ced9ac 100644
--- a/test/integration/standalone/other/measure_test.go
+++ b/test/integration/standalone/other/measure_test.go
@@ -27,7 +27,9 @@ import (
"google.golang.org/grpc/credentials/insecure"
"github.com/apache/skywalking-banyandb/pkg/grpchelper"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
"github.com/apache/skywalking-banyandb/pkg/test/flags"
+ "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
"github.com/apache/skywalking-banyandb/pkg/test/helpers"
"github.com/apache/skywalking-banyandb/pkg/test/setup"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -57,6 +59,7 @@ var _ = g.Describe("Query service_cpm_minute", func() {
gm.Expect(conn.Close()).To(gm.Succeed())
deferFn()
gm.Eventually(gleak.Goroutines,
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+ gm.Eventually(pool.AllRefsCount,
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
})
g.It("queries service_cpm_minute by id after updating", func() {
casesMeasureData.Write(conn, "service_cpm_minute", "sw_metric",
"service_cpm_minute_data1.json", baseTime, interval)
diff --git a/test/integration/standalone/other/property_test.go b/test/integration/standalone/other/property_test.go
index ab217059..164b3ce3 100644
--- a/test/integration/standalone/other/property_test.go
+++ b/test/integration/standalone/other/property_test.go
@@ -32,7 +32,9 @@ import (
modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
"github.com/apache/skywalking-banyandb/pkg/grpchelper"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
"github.com/apache/skywalking-banyandb/pkg/test/flags"
+ "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
"github.com/apache/skywalking-banyandb/pkg/test/setup"
)
@@ -57,6 +59,7 @@ var _ = Describe("Property application", func() {
Expect(conn.Close()).To(Succeed())
deferFn()
Eventually(gleak.Goroutines,
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+ Eventually(pool.AllRefsCount,
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
})
It("applies properties", func() {
md := &propertyv1.Metadata{
diff --git a/test/integration/standalone/query/query_suite_test.go b/test/integration/standalone/query/query_suite_test.go
index a1e684b2..765cd893 100644
--- a/test/integration/standalone/query/query_suite_test.go
+++ b/test/integration/standalone/query/query_suite_test.go
@@ -29,7 +29,9 @@ import (
"github.com/apache/skywalking-banyandb/pkg/grpchelper"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
"github.com/apache/skywalking-banyandb/pkg/test/flags"
+ "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
"github.com/apache/skywalking-banyandb/pkg/test/helpers"
"github.com/apache/skywalking-banyandb/pkg/test/setup"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -90,4 +92,5 @@ var _ = SynchronizedAfterSuite(func() {
}, func() {
deferFunc()
Eventually(gleak.Goroutines,
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+ Eventually(pool.AllRefsCount,
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
})
diff --git a/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go b/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go
index f6fcb14c..346b8918 100644
--- a/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go
+++ b/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go
@@ -29,8 +29,10 @@ import (
"github.com/apache/skywalking-banyandb/pkg/grpchelper"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/pool"
"github.com/apache/skywalking-banyandb/pkg/test"
"github.com/apache/skywalking-banyandb/pkg/test/flags"
+ "github.com/apache/skywalking-banyandb/pkg/test/gmatcher"
"github.com/apache/skywalking-banyandb/pkg/test/helpers"
"github.com/apache/skywalking-banyandb/pkg/test/setup"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -103,4 +105,5 @@ var _ = SynchronizedAfterSuite(func() {
}, func() {
deferFunc()
Eventually(gleak.Goroutines,
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+ Eventually(pool.AllRefsCount,
flags.EventuallyTimeout).Should(gmatcher.HaveZeroRef())
})