This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 768baa12 Shard-level buffer (#339)
768baa12 is described below
commit 768baa127e08283109b8f26cab0abf3f18fe9292
Author: Gao Hongtao <[email protected]>
AuthorDate: Tue Oct 10 17:11:44 2023 +0800
Shard-level buffer (#339)
---
CHANGES.md | 1 +
banyand/kv/badger.go | 46 +++++++++++++-
banyand/kv/badger_test.go | 72 ++++++++++++++++++++++
banyand/kv/kv.go | 10 +++
banyand/measure/tstable.go | 67 ++++++---------------
banyand/stream/tstable.go | 60 ++++++------------
banyand/tsdb/block.go | 8 ++-
banyand/tsdb/buffer.go | 144 ++++++++++++++++++++++++++++++++++++++++----
banyand/tsdb/buffer_test.go | 135 ++++++++++++++++++++++++++++++-----------
banyand/tsdb/shard.go | 30 ++++++---
banyand/tsdb/shard_test.go | 3 +
banyand/tsdb/tsdb.go | 8 ++-
banyand/tsdb/tsdb_test.go | 2 +-
banyand/tsdb/tstable.go | 26 +-------
14 files changed, 433 insertions(+), 179 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 8edd1e3e..fd2e3767 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -18,6 +18,7 @@ Release Notes.
- Add mod revision check to write requests.
- Add TTL to the property.
- Implement node selector (e.g. PickFirst Selector, Maglev Selector).
+- Unify the buffers separated in blocks into a single buffer in the shard.
### Bugs
diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index d1f998ad..09f1d385 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -31,6 +31,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
)
var (
@@ -45,12 +46,16 @@ var (
type badgerTSS struct {
badger.TSet
- db *badger.DB
- dbOpts badger.Options
+ timeRange timestamp.TimeRange
+ db *badger.DB
+ dbOpts badger.Options
}
func (b *badgerTSS) Handover(skl *skl.Skiplist) error {
- return b.db.HandoverIterator(skl.NewUniIterator(false))
+ return b.db.HandoverIterator(&timeRangeIterator{
+ timeRange: b.timeRange,
+ UniIterator: skl.NewUniIterator(false),
+ })
}
func (b *badgerTSS) Close() error {
@@ -119,6 +124,41 @@ func (i mergedIter) Value() y.ValueStruct {
}
}
+// timeRangeIterator wraps a skiplist UniIterator and skips every entry whose
+// key timestamp is not contained in timeRange.
+type timeRangeIterator struct {
+ *skl.UniIterator
+ timeRange timestamp.TimeRange
+}
+
+// Next advances the underlying iterator at least once and keeps advancing
+// until validTime reports that iteration may stop.
+func (i *timeRangeIterator) Next() {
+	for {
+		i.UniIterator.Next()
+		if i.validTime() {
+			return
+		}
+	}
+}
+
+// Rewind rewinds the underlying iterator and, when the entry it lands on is
+// outside the time range, moves forward to the first entry that is inside.
+func (i *timeRangeIterator) Rewind() {
+	i.UniIterator.Rewind()
+	if i.validTime() {
+		return
+	}
+	i.Next()
+}
+
+// Seek positions the underlying iterator at key and, when the entry it lands
+// on is outside the time range, moves forward to the next entry that is inside.
+func (i *timeRangeIterator) Seek(key []byte) {
+	i.UniIterator.Seek(key)
+	if i.validTime() {
+		return
+	}
+	i.Next()
+}
+
+// validTime reports whether the skipping loops may stop at the current
+// position: either the current key's timestamp is inside timeRange, or the
+// underlying iterator is no longer valid.
+func (i *timeRangeIterator) validTime() bool {
+	if i.Valid() {
+		return i.timeRange.Contains(y.ParseTs(i.Key()))
+	}
+	// An invalid underlying iterator yields true so that callers stop iterating.
+	return true
+}
+
type badgerDB struct {
db *badger.DB
dbOpts badger.Options
diff --git a/banyand/kv/badger_test.go b/banyand/kv/badger_test.go
new file mode 100644
index 00000000..5313bbfa
--- /dev/null
+++ b/banyand/kv/badger_test.go
@@ -0,0 +1,72 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package kv
+
+import (
+ "reflect"
+ "testing"
+ "time"
+
+ "github.com/dgraph-io/badger/v3/skl"
+ "github.com/dgraph-io/badger/v3/y"
+
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+var arenaSize = int64(4 << 20)
+
+// TestTimeRangeIterator iterates four versions of one key through
+// timeRangeIterator, first with the default time range and then with a
+// section range that only admits timestamp 2.
+func TestTimeRangeIterator(t *testing.T) {
+	// Populate a skiplist with versions 1..4 of the same key.
+	sl := skl.NewSkiplist(arenaSize)
+	for idx, val := range []string{"value1", "value2", "value3", "value4"} {
+		sl.Put(y.KeyWithTs([]byte("key"), uint64(idx+1)), y.ValueStruct{Value: []byte(val), Meta: 0})
+	}
+
+	cases := []struct {
+		timeRange timestamp.TimeRange
+		want      []string
+	}{
+		{
+			// Unfiltered: every version is returned.
+			timeRange: timestamp.NewInclusiveTimeRange(
+				timestamp.DefaultTimeRange.Begin.AsTime(),
+				timestamp.DefaultTimeRange.End.AsTime()),
+			want: []string{"value4", "value3", "value2", "value1"},
+		},
+		{
+			// Filtered: only the version stamped 2 is returned.
+			timeRange: timestamp.NewSectionTimeRange(time.Unix(0, 2), time.Unix(0, 3)),
+			want:      []string{"value2"},
+		},
+	}
+
+	for _, tc := range cases {
+		iter := &timeRangeIterator{
+			timeRange:   tc.timeRange,
+			UniIterator: sl.NewUniIterator(false),
+		}
+		var got []string
+		for iter.Rewind(); iter.Valid(); iter.Next() {
+			got = append(got, string(iter.Value().Value))
+		}
+		if !reflect.DeepEqual(got, tc.want) {
+			t.Errorf("unexpected values: %v, expected: %v", got, tc.want)
+		}
+	}
+}
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index 49b105c3..28675730 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -30,6 +30,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
)
var (
@@ -147,6 +148,15 @@ func TSSWithMemTableSize(sizeInBytes int64)
TimeSeriesOptions {
}
}
+// TSSWithTimeRange returns an option that assigns timeRange to the store.
+// Stores that are not a *badgerTSS are left untouched.
+func TSSWithTimeRange(timeRange timestamp.TimeRange) TimeSeriesOptions {
+	return func(store TimeSeriesStore) {
+		btss, isBadger := store.(*badgerTSS)
+		if !isBadger {
+			return
+		}
+		btss.timeRange = timeRange
+	}
+}
+
// Iterator allows iterating the kv tables.
// TODO: use generic to provide a unique iterator.
type Iterator interface {
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index a52b2a78..28ca9561 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -21,6 +21,7 @@ import (
"fmt"
"io"
"path"
+ "strings"
"sync"
"sync/atomic"
"time"
@@ -49,15 +50,14 @@ const (
var _ tsdb.TSTable = (*tsTable)(nil)
type tsTable struct {
- encoderSST kv.TimeSeriesStore
- sst kv.TimeSeriesStore
- *tsdb.BlockExpiryTracker
+ encoderSST kv.TimeSeriesStore
+ sst kv.TimeSeriesStore
+ bufferSupplier *tsdb.BufferSupplier
l *logger.Logger
encoderBuffer *tsdb.Buffer
buffer *tsdb.Buffer
- closeBufferTimer *time.Timer
position common.Position
- path string
+ id string
bufferSize int64
encoderBufferSize int64
lock sync.RWMutex
@@ -74,40 +74,13 @@ func (t *tsTable) openBuffer() (err error) {
return nil
}
bufferSize := int(t.encoderBufferSize / defaultNumBufferShards)
- if t.encoderBuffer, err = tsdb.NewBufferWithWal(t.l, t.position,
bufferSize,
- defaultWriteConcurrency, defaultNumBufferShards,
t.encoderFlush, defaultWriteWal, &t.path); err != nil {
+ if t.encoderBuffer, err = t.bufferSupplier.Borrow(encoded, t.id,
bufferSize, t.encoderFlush); err != nil {
return fmt.Errorf("failed to create encoder buffer: %w", err)
}
bufferSize = int(t.bufferSize / defaultNumBufferShards)
- if t.buffer, err = tsdb.NewBufferWithWal(t.l, t.position, bufferSize,
- defaultWriteConcurrency, defaultNumBufferShards, t.flush,
defaultWriteWal, &t.path); err != nil {
+ if t.buffer, err = t.bufferSupplier.Borrow(plain, t.id, bufferSize,
t.flush); err != nil {
return fmt.Errorf("failed to create buffer: %w", err)
}
- end := t.EndTime()
- now := time.Now()
- closeAfter := end.Sub(now)
- if now.After(end) {
- closeAfter = t.BlockExpiryDuration()
- }
- t.closeBufferTimer = time.AfterFunc(closeAfter, func() {
- if t.l.Debug().Enabled() {
- t.l.Debug().Msg("closing buffer")
- }
- t.lock.Lock()
- defer t.lock.Unlock()
- if t.encoderBuffer != nil {
- if err := t.encoderBuffer.Close(); err != nil {
- t.l.Error().Err(err).Msg("close encoder buffer
error")
- }
- t.encoderBuffer = nil
- }
- if t.buffer != nil {
- if err := t.buffer.Close(); err != nil {
- t.l.Error().Err(err).Msg("close buffer error")
- }
- t.buffer = nil
- }
- })
return nil
}
@@ -119,6 +92,9 @@ func (t *tsTable) Close() (err error) {
err = multierr.Append(err, b.Close())
}
}
+ for _, b := range []string{encoded, plain} {
+ t.bufferSupplier.Return(b, t.id)
+ }
return err
}
@@ -203,7 +179,7 @@ type tsTableFactory struct {
compressionMethod databasev1.CompressionMethod
}
-func (ttf *tsTableFactory) NewTSTable(blockExpiryTracker
tsdb.BlockExpiryTracker, root string, position common.Position, l
*logger.Logger) (tsdb.TSTable, error) {
+func (ttf *tsTableFactory) NewTSTable(bufferSupplier *tsdb.BufferSupplier,
root string, position common.Position, l *logger.Logger) (tsdb.TSTable, error) {
encoderSST, err := kv.OpenTimeSeriesStore(
path.Join(root, encoded),
kv.TSSWithMemTableSize(ttf.bufferSize),
@@ -223,19 +199,14 @@ func (ttf *tsTableFactory) NewTSTable(blockExpiryTracker
tsdb.BlockExpiryTracker
return nil, fmt.Errorf("failed to create time series table:
%w", err)
}
table := &tsTable{
- bufferSize: ttf.bufferSize,
- encoderBufferSize: ttf.encoderBufferSize,
- l: l,
- position: position,
- encoderSST: encoderSST,
- sst: sst,
- BlockExpiryTracker: &blockExpiryTracker,
- path: root,
- }
- if table.IsActive() {
- if err := table.openBuffer(); err != nil {
- return nil, fmt.Errorf("failed to open buffer: %w", err)
- }
+ bufferSize: ttf.bufferSize,
+ encoderBufferSize: ttf.encoderBufferSize,
+ l: l,
+ position: position,
+ encoderSST: encoderSST,
+ sst: sst,
+ id: strings.Join([]string{position.Segment,
position.Block}, "-"),
+ bufferSupplier: bufferSupplier,
}
return table, nil
}
diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go
index 6dca480c..1cdc4fa7 100644
--- a/banyand/stream/tstable.go
+++ b/banyand/stream/tstable.go
@@ -20,6 +20,7 @@ package stream
import (
"fmt"
"path"
+ "strings"
"sync"
"time"
@@ -43,14 +44,14 @@ const (
var _ tsdb.TSTable = (*tsTable)(nil)
type tsTable struct {
- sst kv.TimeSeriesStore
- *tsdb.BlockExpiryTracker
- l *logger.Logger
- buffer *tsdb.Buffer
- closeBufferTimer *time.Timer
- position common.Position
- bufferSize int64
- lock sync.RWMutex
+ sst kv.TimeSeriesStore
+ l *logger.Logger
+ buffer *tsdb.Buffer
+ bufferSupplier *tsdb.BufferSupplier
+ position common.Position
+ id string
+ bufferSize int64
+ lock sync.RWMutex
}
func (t *tsTable) SizeOnDisk() int64 {
@@ -64,30 +65,9 @@ func (t *tsTable) openBuffer() (err error) {
return nil
}
bufferSize := int(t.bufferSize / defaultNumBufferShards)
- if t.buffer, err = tsdb.NewBuffer(t.l, t.position, bufferSize,
- defaultWriteConcurrency, defaultNumBufferShards, t.flush); err
!= nil {
+ if t.buffer, err = t.bufferSupplier.Borrow(id, t.id, bufferSize,
t.flush); err != nil {
return fmt.Errorf("failed to create buffer: %w", err)
}
- end := t.EndTime()
- now := time.Now()
- closeAfter := end.Sub(now)
- if now.After(end) {
- closeAfter = t.BlockExpiryDuration()
- }
- t.closeBufferTimer = time.AfterFunc(closeAfter, func() {
- if t.l.Debug().Enabled() {
- t.l.Debug().Msg("closing buffer")
- }
- t.lock.Lock()
- defer t.lock.Unlock()
- if t.buffer == nil {
- return
- }
- if err := t.buffer.Close(); err != nil {
- t.l.Error().Err(err).Msg("close buffer error")
- }
- t.buffer = nil
- })
return nil
}
@@ -95,7 +75,7 @@ func (t *tsTable) Close() (err error) {
t.lock.Lock()
defer t.lock.Unlock()
if t.buffer != nil {
- err = multierr.Append(err, t.buffer.Close())
+ t.bufferSupplier.Return(id, t.id)
}
return multierr.Combine(err, t.sst.Close())
}
@@ -137,23 +117,19 @@ type tsTableFactory struct {
chunkSize int
}
-func (ttf *tsTableFactory) NewTSTable(blockExpiryTracker
tsdb.BlockExpiryTracker, root string, position common.Position, l
*logger.Logger) (tsdb.TSTable, error) {
+func (ttf *tsTableFactory) NewTSTable(bufferSupplier *tsdb.BufferSupplier,
root string, position common.Position, l *logger.Logger) (tsdb.TSTable, error) {
sst, err := kv.OpenTimeSeriesStore(path.Join(root, id),
kv.TSSWithMemTableSize(ttf.bufferSize), kv.TSSWithLogger(l.Named(id)),
kv.TSSWithZSTDCompression(ttf.chunkSize))
if err != nil {
return nil, fmt.Errorf("failed to create time series table:
%w", err)
}
table := &tsTable{
- bufferSize: ttf.bufferSize,
- l: l,
- position: position,
- sst: sst,
- BlockExpiryTracker: &blockExpiryTracker,
- }
- if table.IsActive() {
- if err := table.openBuffer(); err != nil {
- return nil, fmt.Errorf("failed to open buffer: %w", err)
- }
+ bufferSupplier: bufferSupplier,
+ bufferSize: ttf.bufferSize,
+ l: l,
+ position: position,
+ sst: sst,
+ id: strings.Join([]string{position.Segment,
position.Block}, "-"),
}
return table, nil
}
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 4f67416b..32e4d521 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -108,6 +108,7 @@ type block struct {
type openOpts struct {
tsTableFactory TSTableFactory
+ bufferSupplier *BufferSupplier
inverted *inverted.StoreOpts
lsm lsm.StoreOpts
}
@@ -179,6 +180,11 @@ func options(ctx context.Context, root string, l
*logger.Logger) (openOpts, erro
if opts.tsTableFactory == nil {
return opts, errors.New("ts table factory is nil")
}
+ bs := ctx.Value(bufferSupplierKey)
+ if bs == nil {
+ return opts, errors.New("buffer supplier not found")
+ }
+ opts.bufferSupplier = bs.(*BufferSupplier)
return opts, nil
}
@@ -195,7 +201,7 @@ func (b *block) openSafely() (err error) {
}
func (b *block) open() (err error) {
- if b.tsTable, err =
b.openOpts.tsTableFactory.NewTSTable(BlockExpiryTracker{ttl: b.End, clock:
b.clock},
+ if b.tsTable, err =
b.openOpts.tsTableFactory.NewTSTable(b.openOpts.bufferSupplier,
b.path, b.position, b.l); err != nil {
return err
}
diff --git a/banyand/tsdb/buffer.go b/banyand/tsdb/buffer.go
index e1fe1c50..49dbf4d7 100644
--- a/banyand/tsdb/buffer.go
+++ b/banyand/tsdb/buffer.go
@@ -89,7 +89,7 @@ type bufferShardBucket struct {
// Buffer is an exported struct that represents a buffer composed of multiple
shard buckets.
type Buffer struct {
- onFlushFn onFlush
+ onFlushFn sync.Map
entryCloser *run.Closer
log *logger.Logger
buckets []bufferShardBucket
@@ -101,18 +101,17 @@ type Buffer struct {
}
// NewBuffer creates a new Buffer instance with the given parameters.
-func NewBuffer(log *logger.Logger, position common.Position, flushSize,
writeConcurrency, numShards int, onFlushFn onFlush) (*Buffer, error) {
- return NewBufferWithWal(log, position, flushSize, writeConcurrency,
numShards, onFlushFn, false, nil)
+func NewBuffer(log *logger.Logger, position common.Position, flushSize,
writeConcurrency, numShards int) (*Buffer, error) {
+ return NewBufferWithWal(log, position, flushSize, writeConcurrency,
numShards, false, "")
}
// NewBufferWithWal creates a new Buffer instance with the given parameters.
-func NewBufferWithWal(log *logger.Logger, position common.Position, flushSize,
writeConcurrency, numShards int, onFlushFn onFlush, enableWal bool, walPath
*string,
+func NewBufferWithWal(log *logger.Logger, position common.Position, flushSize,
writeConcurrency, numShards int, enableWal bool, walPath string,
) (*Buffer, error) {
buckets := make([]bufferShardBucket, numShards)
buffer := &Buffer{
buckets: buckets,
numShards: numShards,
- onFlushFn: onFlushFn,
entryCloser: run.NewCloser(1),
log: log.Named("buffer"),
enableWal: enableWal,
@@ -133,12 +132,12 @@ func NewBufferWithWal(log *logger.Logger, position
common.Position, flushSize, w
shardLabelValues: position.ShardLabelValues(),
enableWal: enableWal,
}
- buckets[i].start(onFlushFn)
+ buckets[i].start(buffer.flushers)
if enableWal {
- if walPath == nil {
+ if walPath == "" {
return nil, errors.New("wal path is required")
}
- shardWalPath := fmt.Sprintf("%s/buffer-%d", *walPath, i)
+ shardWalPath := fmt.Sprintf("%s/buffer-%d", walPath, i)
if err := buckets[i].startWal(shardWalPath,
defaultWalSyncMode); err != nil {
return nil, errors.Wrap(err, "failed to start
wal")
}
@@ -148,6 +147,29 @@ func NewBufferWithWal(log *logger.Logger, position
common.Position, flushSize, w
return buffer, nil
}
+// Register registers a callback function that will be called when a shard bucket is flushed.
+// The callback is stored under id with LoadOrStore: if id is already
+// registered, the existing callback is kept and onFlushFn is ignored.
+func (b *Buffer) Register(id string, onFlushFn onFlush) {
+ b.onFlushFn.LoadOrStore(id, onFlushFn)
+}
+
+// Unregister removes the flush callback stored under id, so it is no longer
+// returned by flushers.
+func (b *Buffer) Unregister(id string) {
+ b.onFlushFn.Delete(id)
+}
+
+// flushers collects every currently registered flush callback into a slice.
+// The result is nil when nothing is registered.
+func (b *Buffer) flushers() []onFlush {
+	var fns []onFlush
+	collect := func(_, fn interface{}) bool {
+		fns = append(fns, fn.(onFlush))
+		return true
+	}
+	b.onFlushFn.Range(collect)
+	return fns
+}
+
+// isEmpty reports whether no flush callback is registered on the buffer.
+func (b *Buffer) isEmpty() bool {
+	empty := true
+	b.onFlushFn.Range(func(_, _ interface{}) bool {
+		// One entry is enough to know the answer; stop ranging.
+		empty = false
+		return false
+	})
+	return empty
+}
+
// Write adds a key-value pair with a timestamp to the appropriate shard
bucket in the buffer.
func (b *Buffer) Write(key, value []byte, timestamp time.Time) error {
if b == nil || !b.entryCloser.AddRunning() {
@@ -206,8 +228,11 @@ func (b *Buffer) Close() error {
}
b.writeWaitGroup.Wait()
for i := 0; i < b.numShards; i++ {
- if err := b.onFlushFn(i, b.buckets[i].mutable); err !=
nil {
- b.buckets[i].log.Err(err).Msg("flushing mutable
buffer failed")
+ ff := b.flushers()
+ for _, fn := range ff {
+ if err := fn(i, b.buckets[i].mutable); err !=
nil {
+ b.buckets[i].log.Err(err).Msg("flushing
mutable buffer failed")
+ }
}
b.buckets[i].mutable.DecrRef()
}
@@ -246,7 +271,7 @@ func (bsb *bufferShardBucket) getAll() ([]*skl.Skiplist,
func()) {
}
}
-func (bsb *bufferShardBucket) start(onFlushFn onFlush) {
+func (bsb *bufferShardBucket) start(flushers func() []onFlush) {
go func() {
defer func() {
for _, g := range []meter.Gauge{maxBytes, mutableBytes}
{
@@ -259,12 +284,21 @@ func (bsb *bufferShardBucket) start(onFlushFn onFlush) {
memSize := oldSkipList.MemSize()
onFlushFnDone := false
t1 := time.Now()
+ ff := flushers()
for {
if !onFlushFnDone {
- if err := onFlushFn(bsb.index,
oldSkipList); err != nil {
- bsb.log.Err(err).Msg("flushing
immutable buffer failed. Retrying...")
+ failedFns := make([]onFlush, 0)
+ for i := 0; i < len(ff); i++ {
+ fn := ff[i]
+ if err := fn(bsb.index,
oldSkipList); err != nil {
+
bsb.log.Err(err).Msg("flushing immutable buffer failed. Retrying...")
+ failedFns =
append(failedFns, fn)
+ }
+ }
+ if len(failedFns) > 0 {
flushNum.Inc(1,
append(bsb.labelValues[:2], "true")...)
time.Sleep(time.Second)
+ ff = failedFns
continue
}
onFlushFnDone = true
@@ -445,3 +479,87 @@ func (bsb *bufferShardBucket) writeWal(key, value []byte,
timestamp time.Time) e
wg.Wait()
return walErr
}
+
+// BufferSupplier lends a Buffer to a caller and returns it when the caller is done with it.
+// Buffers are shared: callers that borrow the same buffer name receive the same Buffer.
+type BufferSupplier struct {
+ // l is the supplier's logger; created Buffers get a child logger named "buffer-<bufferName>".
+ l *logger.Logger
+ // p is the position handed to every Buffer the supplier creates.
+ p common.Position
+ // buffers maps a buffer name to the shared *Buffer created for it.
+ buffers sync.Map
+ // path is passed to NewBufferWithWal as the WAL path.
+ path string
+ // writeConcurrency is passed to NewBufferWithWal for every created Buffer.
+ writeConcurrency int
+ // numShards is passed to NewBufferWithWal for every created Buffer.
+ numShards int
+ // enableWAL is passed to NewBufferWithWal for every created Buffer.
+ enableWAL bool
+}
+
+// NewBufferSupplier builds a BufferSupplier whose Buffers will all be created
+// with the given position, write concurrency, shard count, WAL switch and path.
+// The supplier logs under the "buffer-supplier" child of l.
+func NewBufferSupplier(l *logger.Logger, p common.Position, writeConcurrency, numShards int, enableWAL bool, path string) *BufferSupplier {
+	supplier := &BufferSupplier{l: l.Named("buffer-supplier")}
+	supplier.p = p
+	supplier.writeConcurrency = writeConcurrency
+	supplier.numShards = numShards
+	supplier.enableWAL = enableWAL
+	supplier.path = path
+	return supplier
+}
+
+// Borrow hands out the shared Buffer stored under bufferName, creating it on
+// first use, and registers onFlushFn on it under name.
+//
+// bufferName and name must be non-empty and onFlushFn must be non-nil;
+// otherwise an error is returned. An error from creating the Buffer is
+// returned as is.
+func (b *BufferSupplier) Borrow(bufferName, name string, bufferSize int, onFlushFn onFlush) (buffer *Buffer, err error) {
+	switch {
+	case bufferName == "" || name == "":
+		return nil, errors.New("bufferName and name are required")
+	case onFlushFn == nil:
+		return nil, errors.New("onFlushFn is required")
+	}
+	// lend registers the borrower's callback right before the buffer is handed out.
+	lend := func(shared *Buffer) (*Buffer, error) {
+		if shared != nil {
+			shared.Register(name, onFlushFn)
+		}
+		return shared, nil
+	}
+	if existing, found := b.buffers.Load(bufferName); found {
+		return lend(existing.(*Buffer))
+	}
+	created, createErr := NewBufferWithWal(b.l.Named("buffer-"+bufferName), b.p,
+		bufferSize, b.writeConcurrency, b.numShards, b.enableWAL, b.path)
+	if createErr != nil {
+		return nil, createErr
+	}
+	if winner, loaded := b.buffers.LoadOrStore(bufferName, created); loaded {
+		// Another borrower stored a buffer first: drop ours and share theirs.
+		_ = created.Close()
+		return lend(winner.(*Buffer))
+	}
+	return lend(created)
+}
+
+// Return unregisters name from the Buffer stored under bufferName. When that
+// leaves the Buffer without any registered callback, the Buffer is removed
+// from the supplier and closed. Unknown buffer names are ignored.
+func (b *BufferSupplier) Return(bufferName, name string) {
+	v, ok := b.buffers.Load(bufferName)
+	if !ok {
+		return
+	}
+	buffer := v.(*Buffer)
+	buffer.Unregister(name)
+	if !buffer.isEmpty() {
+		return
+	}
+	b.buffers.Delete(bufferName)
+	_ = buffer.Close()
+}
+
+// Volume counts the Buffers currently held by the BufferSupplier.
+func (b *BufferSupplier) Volume() int {
+	var count int
+	b.buffers.Range(func(_, _ interface{}) bool {
+		count++
+		return true
+	})
+	return count
+}
+
+// Close closes all Buffers in the BufferSupplier and removes them from it, so
+// a repeated Close or a later Return does not close the same Buffer again.
+// Every Buffer is closed even if an earlier one fails; the first error
+// reported by a Buffer is returned.
+func (b *BufferSupplier) Close() error {
+	var firstErr error
+	b.buffers.Range(func(key, value interface{}) bool {
+		if err := value.(*Buffer).Close(); err != nil && firstErr == nil {
+			firstErr = err
+		}
+		// Deleting while ranging is permitted by sync.Map.
+		b.buffers.Delete(key)
+		return true
+	})
+	return firstErr
+}
diff --git a/banyand/tsdb/buffer_test.go b/banyand/tsdb/buffer_test.go
index 6b46eea3..82df4251 100644
--- a/banyand/tsdb/buffer_test.go
+++ b/banyand/tsdb/buffer_test.go
@@ -38,6 +38,10 @@ import (
"github.com/apache/skywalking-banyandb/pkg/test/flags"
)
+// emptyFn is a flush callback that does nothing and always succeeds; the
+// specs register it when the flush itself is not under test.
+var emptyFn = func(shardIndex int, skl *skl.Skiplist) error {
+ return nil
+}
+
var _ = Describe("Buffer", func() {
var (
buffer *tsdb.Buffer
@@ -54,10 +58,9 @@ var _ = Describe("Buffer", func() {
Context("Write and Read", func() {
BeforeEach(func() {
var err error
- buffer, err = tsdb.NewBuffer(log, common.Position{},
1024*1024, 16, 4, func(shardIndex int, skl *skl.Skiplist) error {
- return nil
- })
+ buffer, err = tsdb.NewBuffer(log, common.Position{},
1024*1024, 16, 4)
Expect(err).ToNot(HaveOccurred())
+ buffer.Register("test", emptyFn)
})
AfterEach(func() {
@@ -122,11 +125,12 @@ var _ = Describe("Buffer", func() {
}
var err error
- buffer, err = tsdb.NewBuffer(log, common.Position{},
1024, 16, numShards, onFlushFn)
+ buffer, err = tsdb.NewBuffer(log, common.Position{},
1024, 16, numShards)
defer func() {
_ = buffer.Close()
}()
Expect(err).ToNot(HaveOccurred())
+ buffer.Register("test", onFlushFn)
randInt := func() int {
n, err := rand.Int(rand.Reader,
big.NewInt(1000))
@@ -181,25 +185,24 @@ var _ = Describe("Buffer", func() {
flushSize,
writeConcurrency,
numShards,
- func(shardIndex int, skl *skl.Skiplist) error {
- flushMutex.Lock()
- defer flushMutex.Unlock()
-
- shardWalDir := filepath.Join(path,
"buffer-"+strconv.Itoa(shardIndex))
- var shardWalList []os.DirEntry
- shardWalList, err =
os.ReadDir(shardWalDir)
- Expect(err).ToNot(HaveOccurred())
- for _, shardWalFile := range
shardWalList {
-
Expect(shardWalFile.IsDir()).To(BeFalse())
-
Expect(shardWalFile.Name()).To(HaveSuffix(".wal"))
- shardWalFileHistory[shardIndex]
= append(shardWalFileHistory[shardIndex], shardWalFile.Name())
- }
- return nil
- },
true,
- &path)
+ path)
Expect(err).ToNot(HaveOccurred())
+ buffer.Register("test", func(shardIndex int, skl
*skl.Skiplist) error {
+ flushMutex.Lock()
+ defer flushMutex.Unlock()
+ shardWalDir := filepath.Join(path,
"buffer-"+strconv.Itoa(shardIndex))
+ var shardWalList []os.DirEntry
+ shardWalList, err = os.ReadDir(shardWalDir)
+ Expect(err).ToNot(HaveOccurred())
+ for _, shardWalFile := range shardWalList {
+
Expect(shardWalFile.IsDir()).To(BeFalse())
+
Expect(shardWalFile.Name()).To(HaveSuffix(".wal"))
+ shardWalFileHistory[shardIndex] =
append(shardWalFileHistory[shardIndex], shardWalFile.Name())
+ }
+ return nil
+ })
// Write buffer & wal
var wg sync.WaitGroup
wg.Add(writeConcurrency)
@@ -245,18 +248,18 @@ var _ = Describe("Buffer", func() {
flushSize,
writeConcurrency,
numShards,
- func(shardIndex int, skl *skl.Skiplist) error {
- flushMutex.Lock()
- defer flushMutex.Unlock()
-
- if !bufferFlushed {
- bufferFlushed = true
- }
- return nil
- },
true,
- &path)
+ path)
Expect(err).ToNot(HaveOccurred())
+ buffer.Register("test", func(shardIndex int, skl
*skl.Skiplist) error {
+ flushMutex.Lock()
+ defer flushMutex.Unlock()
+
+ if !bufferFlushed {
+ bufferFlushed = true
+ }
+ return nil
+ })
// Write buffer & wal
for i := 0; i < numShards; i++ {
@@ -278,13 +281,11 @@ var _ = Describe("Buffer", func() {
flushSize,
writeConcurrency,
numShards,
- func(shardIndex int, skl *skl.Skiplist) error {
- return nil
- },
true,
- &path)
+ path)
Expect(err).ToNot(HaveOccurred())
defer buffer.Close()
+ buffer.Register("test", emptyFn)
// Check buffer was recovered from wal
for i := 0; i < numShards; i++ {
@@ -298,3 +299,69 @@ var _ = Describe("Buffer", func() {
})
})
})
+
+// The bufferSupplier specs exercise Borrow and Return and observe the result
+// through Volume.
+var _ = Describe("bufferSupplier", func() {
+ var (
+ b *tsdb.BufferSupplier
+ goods []gleak.Goroutine
+ )
+
+ BeforeEach(func() {
+ // Snapshot the running goroutines so AfterEach can detect leaks.
+ goods = gleak.Goroutines()
+ b =
tsdb.NewBufferSupplier(logger.GetLogger("buffer-supplier-test"),
common.Position{}, 16, 4, false, "")
+ })
+ AfterEach(func() {
+ b.Close()
+ Eventually(gleak.Goroutines,
flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+ })
+
+ Describe("Borrow", func() {
+ Context("when borrowing a buffer with a new name", func() {
+ It("should return a new buffer instance", func() {
+ buf, err := b.Borrow("buffer", "test",
1024*1024, emptyFn)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(buf).ToNot(BeNil())
+ Expect(b.Volume()).To(Equal(1))
+ b.Return("buffer", "test")
+ Expect(b.Volume()).To(Equal(0))
+ })
+ })
+
+ Context("when borrowing a buffer with an existing name", func()
{
+ It("should return the same buffer instance", func() {
+ buf1, err := b.Borrow("buffer", "test",
1024*1024, emptyFn)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(buf1).ToNot(BeNil())
+ Expect(b.Volume()).To(Equal(1))
+
+ buf2, err := b.Borrow("buffer", "test",
1024*1024, emptyFn)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(buf2).ToNot(BeNil())
+ Expect(b.Volume()).To(Equal(1))
+
+ Expect(buf2).To(Equal(buf1))
+ // Both borrows used the name "test", so a single Return is expected to empty the supplier.
+ b.Return("buffer", "test")
+ Expect(b.Volume()).To(Equal(0))
+ })
+ })
+
+ Context("when borrowing a buffer from different buffer pools",
func() {
+ It("should return different buffer instances", func() {
+ buf1, err := b.Borrow("buffer1", "test",
1024*1024, emptyFn)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(buf1).ToNot(BeNil())
+ Expect(b.Volume()).To(Equal(1))
+
+ buf2, err := b.Borrow("buffer2", "test",
1024*1024, emptyFn)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(buf2).ToNot(BeNil())
+ Expect(b.Volume()).To(Equal(2))
+
+ Expect(buf2).ToNot(Equal(buf1))
+ b.Return("buffer1", "test")
+ b.Return("buffer2", "test")
+ Expect(b.Volume()).To(Equal(0))
+ })
+ })
+ })
+})
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index af9369af..b7841837 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -41,6 +41,8 @@ const (
defaultBlockQueueSize = 2
defaultMaxBlockQueueSize = 64
defaultKVMemorySize = 4 << 20
+ defaultNumBufferShards = 2
+ defaultWriteConcurrency = 1000
)
var (
@@ -58,6 +60,10 @@ var (
onDiskBytesGauge meter.Gauge
)
+// contextBufferSupplierKey is the unexported type of the context key under
+// which a shard's *BufferSupplier is stored.
+type contextBufferSupplierKey struct{}
+
+// bufferSupplierKey is the context key used to store and look up the *BufferSupplier.
+var bufferSupplierKey = contextBufferSupplierKey{}
+
func init() {
labelNames := common.ShardLabelNames()
diskStateGauge = shardProvider.Gauge("disk_state", append(labelNames,
"kind")...)
@@ -76,6 +82,7 @@ type shard struct {
segmentController *segmentController
segmentManageStrategy *bucket.Strategy
scheduler *timestamp.Scheduler
+ bufferSupplier *BufferSupplier
position common.Position
closeOnce sync.Once
id common.ShardID
@@ -83,7 +90,7 @@ type shard struct {
// OpenShard returns an existed Shard or create a new one if not existed.
func OpenShard(ctx context.Context, id common.ShardID,
- root string, segmentSize, blockSize, ttl IntervalRule, openedBlockSize,
maxOpenedBlockSize int,
+ root string, segmentSize, blockSize, ttl IntervalRule, openedBlockSize,
maxOpenedBlockSize int, enableWAL bool,
) (Shard, error) {
path, err := mkdir(shardTemplate, root, int(id))
if err != nil {
@@ -100,17 +107,20 @@ func OpenShard(ctx context.Context, id common.ShardID,
return p
})
clock, _ := timestamp.GetClock(shardCtx)
+
scheduler := timestamp.NewScheduler(l, clock)
- sc, err := newSegmentController(shardCtx, path, segmentSize, blockSize,
openedBlockSize, maxOpenedBlockSize, l, scheduler)
+ s := &shard{
+ id: id,
+ l: l,
+ scheduler: scheduler,
+ position: common.GetPosition(shardCtx),
+ }
+ s.bufferSupplier = NewBufferSupplier(l, s.position,
defaultWriteConcurrency, defaultNumBufferShards, enableWAL, path)
+ shardCtx = context.WithValue(shardCtx, bufferSupplierKey,
s.bufferSupplier)
+ s.segmentController, err = newSegmentController(shardCtx, path,
segmentSize, blockSize, openedBlockSize, maxOpenedBlockSize, l, scheduler)
if err != nil {
return nil, errors.Wrapf(err, "create the segment controller of
the shard %d", int(id))
}
- s := &shard{
- id: id,
- segmentController: sc,
- l: l,
- scheduler: scheduler,
- }
err = s.segmentController.open()
if err != nil {
return nil, err
@@ -131,7 +141,6 @@ func OpenShard(ctx context.Context, id common.ShardID,
return nil, err
}
s.segmentManageStrategy.Run()
- s.position = common.GetPosition(shardCtx)
retentionTask := newRetentionTask(s.segmentController, ttl)
if err := scheduler.Register("retention", retentionTask.option,
retentionTask.expr, retentionTask.run); err != nil {
return nil, err
@@ -235,11 +244,12 @@ func (s *shard) TriggerSchedule(task string) bool {
func (s *shard) Close() (err error) {
s.closeOnce.Do(func() {
+ // Close the shared buffers exactly once, before the scheduler and the
+ // segments are stopped, and keep the result so it is reported to the caller.
+ bufferErr := s.bufferSupplier.Close()
s.scheduler.Close()
s.segmentManageStrategy.Close()
ctx, cancel := context.WithTimeout(context.Background(),
5*time.Second)
defer cancel()
- err = multierr.Combine(s.segmentController.close(ctx),
s.seriesDatabase.Close())
+ err = multierr.Combine(bufferErr, s.segmentController.close(ctx), s.seriesDatabase.Close())
})
return err
}
diff --git a/banyand/tsdb/shard_test.go b/banyand/tsdb/shard_test.go
index d952f85c..ae9abfd8 100644
--- a/banyand/tsdb/shard_test.go
+++ b/banyand/tsdb/shard_test.go
@@ -90,6 +90,7 @@ var _ = Describe("Shard", func() {
},
2,
3,
+ false,
)
Expect(err).NotTo(HaveOccurred())
started("BlockID-19700101-1970010100-1",
"SegID-19700101-1")
@@ -433,6 +434,7 @@ var _ = Describe("Shard", func() {
},
10,
15,
+ false,
)
Expect(err).NotTo(HaveOccurred())
started("BlockID-19700101-1970010100-1",
"SegID-19700101-1", "retention")
@@ -556,6 +558,7 @@ var _ = Describe("Shard", func() {
},
2,
3,
+ false,
)
Expect(err).NotTo(HaveOccurred())
started("BlockID-19700101-1970010101-1",
"SegID-19700101-1", "retention")
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index c47f2d72..e8b8a311 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -121,9 +121,10 @@ type DatabaseOpts struct {
BlockInvertedIndex InvertedIndexOpts
SeriesMemSize run.Bytes
GlobalIndexMemSize run.Bytes
+ IndexGranularity IndexGranularity
ShardNum uint32
EnableGlobalIndex bool
- IndexGranularity IndexGranularity
+ EnableWAL bool
}
// InvertedIndexOpts wraps options to create the block inverted index.
@@ -212,6 +213,7 @@ type database struct {
sync.RWMutex
shardNum uint32
shardCreationState uint32
+ enableWAL bool
}
func (d *database) CreateShardsAndGetByID(id common.ShardID) (Shard, error) {
@@ -292,6 +294,7 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts)
(Database, error) {
segmentSize: opts.SegmentInterval,
blockSize: opts.BlockInterval,
ttl: opts.TTL,
+ enableWAL: opts.EnableWAL,
}
db.logger.Info().Str("path", opts.Location).Msg("initialized")
var entries []os.DirEntry
@@ -313,7 +316,7 @@ func createDatabase(db *database, startID int) (Database,
error) {
for i := startID; i < int(db.shardNum); i++ {
db.logger.Info().Int("shard_id", i).Msg("creating a shard")
so, errNewShard := OpenShard(db.shardCreationCtx,
common.ShardID(i),
- db.location, db.segmentSize, db.blockSize, db.ttl,
defaultBlockQueueSize, defaultMaxBlockQueueSize)
+ db.location, db.segmentSize, db.blockSize, db.ttl,
defaultBlockQueueSize, defaultMaxBlockQueueSize, db.enableWAL)
if errNewShard != nil {
err = multierr.Append(err, errNewShard)
continue
@@ -346,6 +349,7 @@ func loadDatabase(ctx context.Context, db *database)
(Database, error) {
db.ttl,
defaultBlockQueueSize,
defaultMaxBlockQueueSize,
+ db.enableWAL,
)
if errOpenShard != nil {
return errOpenShard
diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go
index 9046b8c7..ebe299b7 100644
--- a/banyand/tsdb/tsdb_test.go
+++ b/banyand/tsdb/tsdb_test.go
@@ -141,7 +141,7 @@ func NewByPassTSTableFactory() TSTableFactory {
return bypassTSTableFactory{}
}
-func (bypassTSTableFactory) NewTSTable(_ BlockExpiryTracker, _ string, _
common.Position, _ *logger.Logger) (TSTable, error) {
+func (bypassTSTableFactory) NewTSTable(_ *BufferSupplier, _ string, _
common.Position, _ *logger.Logger) (TSTable, error) {
return newBypassTSTable()
}
diff --git a/banyand/tsdb/tstable.go b/banyand/tsdb/tstable.go
index a10d1de9..9aab9bb0 100644
--- a/banyand/tsdb/tstable.go
+++ b/banyand/tsdb/tstable.go
@@ -25,11 +25,8 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/logger"
- "github.com/apache/skywalking-banyandb/pkg/timestamp"
)
-const maxBlockAge = time.Hour
-
// TSTable is time series table.
type TSTable interface {
// Put a value with a timestamp/version
@@ -46,26 +43,5 @@ type TSTable interface {
// TSTableFactory is the factory of TSTable.
type TSTableFactory interface {
// NewTSTable creates a new TSTable.
- NewTSTable(bufferLifecycle BlockExpiryTracker, root string, position
common.Position, l *logger.Logger) (TSTable, error)
-}
-
-// BlockExpiryTracker tracks the expiry of the buffer.
-type BlockExpiryTracker struct {
- clock timestamp.Clock
- ttl time.Time
-}
-
-// IsActive checks if the buffer is active.
-func (bl *BlockExpiryTracker) IsActive() bool {
- return !bl.clock.Now().After(bl.EndTime())
-}
-
-// EndTime returns the end time of the buffer.
-func (bl *BlockExpiryTracker) EndTime() time.Time {
- return bl.ttl.Add(maxBlockAge)
-}
-
-// BlockExpiryDuration returns the expiry duration of the buffer.
-func (bl *BlockExpiryTracker) BlockExpiryDuration() time.Duration {
- return maxBlockAge
+ NewTSTable(bufferSupplier *BufferSupplier, root string, position
common.Position, l *logger.Logger) (TSTable, error)
}