This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 4655b735 Support for recovery measure/buffer using wal (#313)
4655b735 is described below
commit 4655b7359f86303a282a471ec166cc68826382d1
Author: hailin0 <[email protected]>
AuthorDate: Wed Aug 9 20:24:12 2023 +0800
Support for recovery measure/buffer using wal (#313)
* Update wal seriesID datatype to bytes
* Integrate wal into measure/buffer for memory recovery
* update bytes/string convert
---------
Co-authored-by: 吴晟 Wu Sheng <[email protected]>
---
CHANGES.md | 1 +
api/common/id.go | 26 ------
banyand/measure/tstable.go | 24 ++---
banyand/stream/tstable.go | 6 +-
banyand/tsdb/buffer.go | 207 ++++++++++++++++++++++++++++++++++++++----
banyand/tsdb/buffer_test.go | 155 ++++++++++++++++++++++++++++++-
pkg/convert/string.go | 38 ++++++++
pkg/wal/wal.go | 127 ++++++++++++++------------
pkg/wal/wal_benchmark_test.go | 88 +++++++++---------
pkg/wal/wal_test.go | 30 ++----
10 files changed, 518 insertions(+), 184 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index bbdec150..e537ba38 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -10,6 +10,7 @@ Release Notes.
- Implement Write-ahead Logging
- Document the clustering.
- Support multiple roles for banyand server.
+- Support recovering the buffer using the WAL.
### Bugs
diff --git a/api/common/id.go b/api/common/id.go
index 0d8c5890..6cc879ca 100644
--- a/api/common/id.go
+++ b/api/common/id.go
@@ -40,32 +40,6 @@ func (s SeriesID) Marshal() []byte {
return convert.Uint64ToBytes(uint64(s))
}
-// GlobalSeriesID identities a series in a shard.
-type GlobalSeriesID struct {
- Name string
- SeriesID SeriesID
-}
-
-// Marshal encodes global series id to bytes.
-func (s GlobalSeriesID) Marshal() []byte {
- seriesIDBytes := convert.Uint64ToBytes(uint64(s.SeriesID))
- nameBytes := []byte(s.Name)
- return append(seriesIDBytes, nameBytes...)
-}
-
-// Volume returns the estimated bytes volume of global series id.
-func (s GlobalSeriesID) Volume() int {
- return 8 + len(s.Name)
-}
-
-// ParseGlobalSeriesID parses global series id from bytes.
-func ParseGlobalSeriesID(b []byte) GlobalSeriesID {
- return GlobalSeriesID{
- SeriesID: SeriesID(convert.BytesToUint64(b[:8])),
- Name: string(b[8:]),
- }
-}
-
// positionKey is a context key to store the module position.
var positionKey = contextPositionKey{}
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index 08765a5d..462d5066 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -41,6 +41,7 @@ import (
const (
defaultNumBufferShards = 2
defaultWriteConcurrency = 1000
+ defaultWriteWal = true
plain = "tst"
encoded = "encoded"
)
@@ -56,6 +57,7 @@ type tsTable struct {
buffer *tsdb.Buffer
closeBufferTimer *time.Timer
position common.Position
+ path string
bufferSize int64
encoderBufferSize int64
lock sync.Mutex
@@ -72,13 +74,13 @@ func (t *tsTable) openBuffer() (err error) {
return nil
}
bufferSize := int(t.encoderBufferSize / defaultNumBufferShards)
- if t.encoderBuffer, err = tsdb.NewBuffer(t.l, t.position, bufferSize,
- defaultWriteConcurrency, defaultNumBufferShards,
t.encoderFlush); err != nil {
+ if t.encoderBuffer, err = tsdb.NewBufferWithWal(t.l, t.position,
bufferSize,
+ defaultWriteConcurrency, defaultNumBufferShards,
t.encoderFlush, defaultWriteWal, &t.path); err != nil {
return fmt.Errorf("failed to create encoder buffer: %w", err)
}
bufferSize = int(t.bufferSize / defaultNumBufferShards)
- if t.buffer, err = tsdb.NewBuffer(t.l, t.position, bufferSize,
- defaultWriteConcurrency, defaultNumBufferShards, t.flush); err
!= nil {
+ if t.buffer, err = tsdb.NewBufferWithWal(t.l, t.position, bufferSize,
+ defaultWriteConcurrency, defaultNumBufferShards, t.flush,
defaultWriteWal, &t.path); err != nil {
return fmt.Errorf("failed to create buffer: %w", err)
}
end := t.EndTime()
@@ -153,22 +155,19 @@ func (t *tsTable) Get(key []byte, ts time.Time) ([]byte,
error) {
func (t *tsTable) Put(key []byte, val []byte, ts time.Time) error {
if t.encoderBuffer != nil {
- t.writeToBuffer(key, val, ts)
- return nil
+ return t.writeToBuffer(key, val, ts)
}
if err := t.openBuffer(); err != nil {
return err
}
- t.writeToBuffer(key, val, ts)
- return nil
+ return t.writeToBuffer(key, val, ts)
}
-func (t *tsTable) writeToBuffer(key []byte, val []byte, ts time.Time) {
+func (t *tsTable) writeToBuffer(key []byte, val []byte, ts time.Time) error {
if t.toEncode(key) {
- t.encoderBuffer.Write(key, val, ts)
- } else {
- t.buffer.Write(key, val, ts)
+ return t.encoderBuffer.Write(key, val, ts)
}
+ return t.buffer.Write(key, val, ts)
}
func (t *tsTable) encoderFlush(shardIndex int, skl *skl.Skiplist) error {
@@ -228,6 +227,7 @@ func (ttf *tsTableFactory) NewTSTable(blockExpiryTracker
tsdb.BlockExpiryTracker
encoderSST: encoderSST,
sst: sst,
BlockExpiryTracker: &blockExpiryTracker,
+ path: root,
}
if table.IsActive() {
if err := table.openBuffer(); err != nil {
diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go
index 924f53f6..6af8bf2f 100644
--- a/banyand/stream/tstable.go
+++ b/banyand/stream/tstable.go
@@ -113,15 +113,13 @@ func (t *tsTable) Get(key []byte, ts time.Time) ([]byte,
error) {
func (t *tsTable) Put(key []byte, val []byte, ts time.Time) error {
if t.buffer != nil {
- t.buffer.Write(key, val, ts)
- return nil
+ return t.buffer.Write(key, val, ts)
}
if err := t.openBuffer(); err != nil {
return err
}
- t.buffer.Write(key, val, ts)
- return nil
+ return t.buffer.Write(key, val, ts)
}
func (t *tsTable) flush(shardIndex int, skl *skl.Skiplist) error {
diff --git a/banyand/tsdb/buffer.go b/banyand/tsdb/buffer.go
index c586659b..1fc94283 100644
--- a/banyand/tsdb/buffer.go
+++ b/banyand/tsdb/buffer.go
@@ -25,6 +25,7 @@ import (
"github.com/dgraph-io/badger/v3/skl"
"github.com/dgraph-io/badger/v3/y"
+ "github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/observability"
@@ -32,11 +33,13 @@ import (
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/meter"
"github.com/apache/skywalking-banyandb/pkg/run"
+ "github.com/apache/skywalking-banyandb/pkg/wal"
)
const (
- defaultSize = 1 << 20 // 1MB
- nodeAlign = int(unsafe.Sizeof(uint64(0))) - 1
+ defaultSize = 1 << 20 // 1MB
+ nodeAlign = int(unsafe.Sizeof(uint64(0))) - 1
+ defaultWalSyncMode = false
)
var (
@@ -53,13 +56,15 @@ func init() {
}
type operation struct {
- key []byte
- value []byte
- epoch uint64
+ recoveryDoneFn func()
+ key []byte
+ value []byte
+ epoch uint64
}
type flushEvent struct {
- data *skl.Skiplist
+ data *skl.Skiplist
+ walSegmentID wal.SegmentID
}
type onFlush func(shardIndex int, skl *skl.Skiplist) error
@@ -71,12 +76,15 @@ type bufferShardBucket struct {
writeWaitGroup *sync.WaitGroup
flushWaitGroup *sync.WaitGroup
log *logger.Logger
+ wal wal.WAL
immutables []*skl.Skiplist
labelValues []string
shardLabelValues []string
index int
capacity int
mutex sync.RWMutex
+ walSyncMode bool
+ enableWal bool
}
// Buffer is an exported struct that represents a buffer composed of multiple shard buckets.
@@ -88,11 +96,18 @@ type Buffer struct {
writeWaitGroup sync.WaitGroup
flushWaitGroup sync.WaitGroup
numShards int
+ enableWal bool
closerOnce sync.Once
}
// NewBuffer creates a new Buffer instance with the given parameters.
func NewBuffer(log *logger.Logger, position common.Position, flushSize,
writeConcurrency, numShards int, onFlushFn onFlush) (*Buffer, error) {
+ return NewBufferWithWal(log, position, flushSize, writeConcurrency,
numShards, onFlushFn, false, nil)
+}
+
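+// NewBufferWithWal creates a new Buffer instance with the given parameters; when enableWal is true, each shard starts a write-ahead log under walPath, which must then be non-nil.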
+func NewBufferWithWal(log *logger.Logger, position common.Position, flushSize,
writeConcurrency, numShards int, onFlushFn onFlush, enableWal bool, walPath
*string,
+) (*Buffer, error) {
buckets := make([]bufferShardBucket, numShards)
buffer := &Buffer{
buckets: buckets,
@@ -100,6 +115,7 @@ func NewBuffer(log *logger.Logger, position
common.Position, flushSize, writeCon
onFlushFn: onFlushFn,
entryCloser: run.NewCloser(1),
log: log.Named("buffer"),
+ enableWal: enableWal,
}
buffer.writeWaitGroup.Add(numShards)
buffer.flushWaitGroup.Add(numShards)
@@ -115,17 +131,27 @@ func NewBuffer(log *logger.Logger, position
common.Position, flushSize, writeCon
log:
buffer.log.Named(fmt.Sprintf("shard-%d", i)),
labelValues: append(position.LabelValues(),
fmt.Sprintf("%d", i)),
shardLabelValues: position.ShardLabelValues(),
+ enableWal: enableWal,
}
buckets[i].start(onFlushFn)
+ if enableWal {
+ if walPath == nil {
+ return nil, errors.New("wal path is required")
+ }
+ shardWalPath := fmt.Sprintf("%s/buffer-%d", *walPath, i)
+ if err := buckets[i].startWal(shardWalPath,
defaultWalSyncMode); err != nil {
+ return nil, errors.Wrap(err, "failed to start
wal")
+ }
+ }
maxBytes.Set(float64(flushSize), buckets[i].labelValues...)
}
return buffer, nil
}
// Write adds a key-value pair with a timestamp to the appropriate shard bucket in the buffer.
-func (b *Buffer) Write(key, value []byte, timestamp time.Time) {
+func (b *Buffer) Write(key, value []byte, timestamp time.Time) error {
if b == nil || !b.entryCloser.AddRunning() {
- return
+ return errors.New("buffer is invalid")
}
defer b.entryCloser.Done()
index := b.getShardIndex(key)
@@ -133,7 +159,15 @@ func (b *Buffer) Write(key, value []byte, timestamp
time.Time) {
b.log.Debug().Uint64("shard", index).Bytes("key", key).
Time("ts", timestamp).Msg("route a shard")
}
+
+ if b.enableWal {
+ if err := b.buckets[index].writeWal(key, value, timestamp); err
!= nil {
+ return errors.Wrap(err, "failed to write wal")
+ }
+ }
+
b.buckets[index].writeCh <- operation{key: key, value: value, epoch:
uint64(timestamp.UnixNano())}
+ return nil
}
// Read retrieves the value associated with the given key and timestamp from the appropriate shard bucket in the buffer.
@@ -179,6 +213,11 @@ func (b *Buffer) Close() error {
}
for i := 0; i < b.numShards; i++ {
close(b.buckets[i].flushCh)
+ if b.enableWal {
+ if err := b.buckets[i].wal.Close(); err != nil {
+ b.buckets[i].log.Err(err).Msg("closing
buffer shard wal failed")
+ }
+ }
}
b.flushWaitGroup.Wait()
})
@@ -218,13 +257,24 @@ func (bsb *bufferShardBucket) start(onFlushFn onFlush) {
for event := range bsb.flushCh {
oldSkipList := event.data
memSize := oldSkipList.MemSize()
+ onFlushFnDone := false
t1 := time.Now()
for {
- if err := onFlushFn(bsb.index, oldSkipList);
err != nil {
- bsb.log.Err(err).Msg("flushing
immutable buffer failed. Retrying...")
- flushNum.Inc(1,
append(bsb.labelValues[:2], "true")...)
- time.Sleep(time.Second)
- continue
+ if !onFlushFnDone {
+ if err := onFlushFn(bsb.index,
oldSkipList); err != nil {
+ bsb.log.Err(err).Msg("flushing
immutable buffer failed. Retrying...")
+ flushNum.Inc(1,
append(bsb.labelValues[:2], "true")...)
+ time.Sleep(time.Second)
+ continue
+ }
+ onFlushFnDone = true
+ }
+ if bsb.enableWal {
+ if err :=
bsb.wal.Delete(event.walSegmentID); err != nil {
+ bsb.log.Err(err).Msg("delete
wal segment file failed. Retrying...")
+ time.Sleep(time.Second)
+ continue
+ }
}
break
}
@@ -249,23 +299,38 @@ func (bsb *bufferShardBucket) start(onFlushFn onFlush) {
volume += len(k) + int(v.EncodedSize()) +
skl.MaxNodeSize + nodeAlign
memSize := bsb.mutable.MemSize()
mutableBytes.Set(float64(memSize), bsb.labelValues...)
- if volume >= bsb.capacity || memSize >=
int64(bsb.capacity) {
- bsb.triggerFlushing()
- volume = 0
+ if op.recoveryDoneFn == nil && (volume >= bsb.capacity
|| memSize >= int64(bsb.capacity)) {
+ if err := bsb.triggerFlushing(); err != nil {
+ bsb.log.Err(err).Msg("triggering
flushing failed")
+ } else {
+ volume = 0
+ }
}
bsb.mutable.Put(k, v)
+ if bsb.enableWal && op.recoveryDoneFn != nil {
+ op.recoveryDoneFn()
+ }
}
}()
}
-func (bsb *bufferShardBucket) triggerFlushing() {
+func (bsb *bufferShardBucket) triggerFlushing() error {
+ var walSegmentID wal.SegmentID
+ if bsb.enableWal {
+ segment, err := bsb.wal.Rotate()
+ if err != nil {
+ return errors.Wrap(err, "rotating wal failed")
+ }
+ walSegmentID = segment.GetSegmentID()
+ }
+
for {
select {
- case bsb.flushCh <- flushEvent{data: bsb.mutable}:
+ case bsb.flushCh <- flushEvent{data: bsb.mutable, walSegmentID:
walSegmentID}:
bsb.mutex.Lock()
defer bsb.mutex.Unlock()
bsb.swap()
- return
+ return nil
default:
}
time.Sleep(10 * time.Second)
@@ -276,3 +341,107 @@ func (bsb *bufferShardBucket) swap() {
bsb.immutables = append(bsb.immutables, bsb.mutable)
bsb.mutable = skl.NewSkiplist(int64(bsb.capacity))
}
+
+func (bsb *bufferShardBucket) startWal(path string, syncMode bool) error {
+ wal, err := wal.New(path, wal.DefaultOptions)
+ if err != nil {
+ return errors.Wrap(err, fmt.Sprintf("failed to create wal: %s",
path))
+ }
+ bsb.wal = wal
+ bsb.walSyncMode = syncMode
+ bsb.log.Info().Msg(fmt.Sprintf(
+ "wal started with path: %s, sync mode: %v", path, syncMode))
+
+ if err = bsb.recoveryWal(); err != nil {
+ return errors.Wrap(err, "failed to recovery wal")
+ }
+ return nil
+}
+
+func (bsb *bufferShardBucket) recoveryWal() error {
+ segments, err := bsb.wal.ReadAllSegments()
+ if err != nil {
+ return errors.Wrap(err, "failed to load wal segments")
+ }
+
+ recoveredRecords := 0
+ for index, segment := range segments {
+ recoveredRecords += len(segment.GetLogEntries())
+ isWorkSegment := index == len(segments)-1
+ if isWorkSegment {
+ bsb.recoveryWorkSegment(segment)
+ } else {
+ bsb.recoveryStableSegment(segment)
+ }
+ bsb.log.Info().Msg(fmt.Sprintf(
+ "recovered %d log records from wal segment %d",
+ len(segment.GetLogEntries()),
+ segment.GetSegmentID()))
+ }
+ bsb.log.Info().Msg(fmt.Sprintf(
+ "recovered %d log records from wal", recoveredRecords))
+ return nil
+}
+
+func (bsb *bufferShardBucket) recoveryWorkSegment(segment wal.Segment) {
+ var wg sync.WaitGroup
+ wg.Add(len(segment.GetLogEntries()))
+ for _, logEntry := range segment.GetLogEntries() {
+ timestamps := logEntry.GetTimestamps()
+ values := logEntry.GetValues()
+ elementIndex := 0
+ for element := values.Front(); element != nil; element =
element.Next() {
+ timestamp := timestamps[elementIndex]
+ bsb.writeCh <- operation{
+ key: logEntry.GetSeriesID(),
+ value: element.Value.([]byte),
+ epoch: uint64(timestamp.UnixNano()),
+ recoveryDoneFn: func() {
+ wg.Done()
+ if bsb.log.Trace().Enabled() {
+
bsb.log.Trace().Msg(fmt.Sprintf("recovered key: %v, ts: %v",
+ logEntry.GetSeriesID(),
timestamp.UnixNano()))
+ }
+ },
+ }
+ elementIndex++
+ }
+ }
+ wg.Wait()
+}
+
+func (bsb *bufferShardBucket) recoveryStableSegment(segment wal.Segment) {
+ for _, logEntries := range segment.GetLogEntries() {
+ timestamps := logEntries.GetTimestamps()
+ values := logEntries.GetValues()
+ elementIndex := 0
+ for element := values.Front(); element != nil; element =
element.Next() {
+ timestamp := timestamps[elementIndex]
+ k := y.KeyWithTs(logEntries.GetSeriesID(),
uint64(timestamp.UnixNano()))
+ v := y.ValueStruct{Value: element.Value.([]byte)}
+ bsb.mutable.Put(k, v)
+ elementIndex++
+ }
+ }
+ bsb.flushCh <- flushEvent{data: bsb.mutable, walSegmentID:
segment.GetSegmentID()}
+ // Move the recovered skiplist into immutables and start a fresh mutable one.
+ bsb.swap()
+}
+
+func (bsb *bufferShardBucket) writeWal(key, value []byte, timestamp time.Time)
error {
+ if !bsb.walSyncMode {
+ bsb.wal.Write(key, timestamp, value, nil)
+ return nil
+ }
+
+ var walErr error
+ var wg sync.WaitGroup
+ wg.Add(1)
+ walCallback := func(key []byte, t time.Time, value []byte, err error) {
+ walErr = err
+ wg.Done()
+ }
+ bsb.wal.Write(key, timestamp, value, walCallback)
+ wg.Wait()
+ return walErr
+}
diff --git a/banyand/tsdb/buffer_test.go b/banyand/tsdb/buffer_test.go
index bb948372..7068e972 100644
--- a/banyand/tsdb/buffer_test.go
+++ b/banyand/tsdb/buffer_test.go
@@ -21,6 +21,9 @@ import (
"crypto/rand"
"fmt"
"math/big"
+ "os"
+ "path/filepath"
+ "strconv"
"sync"
"time"
@@ -118,7 +121,8 @@ var _ = Describe("Buffer", func() {
}(ch)
}
- buffer, err := tsdb.NewBuffer(log, common.Position{},
1024, 16, numShards, onFlushFn)
+ var err error
+ buffer, err = tsdb.NewBuffer(log, common.Position{},
1024, 16, numShards, onFlushFn)
defer func() {
_ = buffer.Close()
}()
@@ -147,4 +151,153 @@ var _ = Describe("Buffer", func() {
}
})
})
+
+ Context("Write and Recover of wal correctly", func() {
+ writeConcurrency := 2
+ numShards := 2
+ flushSize := 1024
+ baseTime := time.Now()
+ var path string
+
+ BeforeEach(func() {
+ var err error
+ path, err = os.MkdirTemp("",
"banyandb-test-buffer-wal-*")
+ Expect(err).ToNot(HaveOccurred())
+ })
+
+ AfterEach(func() {
+ err := os.RemoveAll(path)
+ Expect(err).ToNot(HaveOccurred())
+ })
+
+ It("should write and rotate wal file correctly", func() {
+ var err error
+ var flushMutex sync.Mutex
+
+ shardWalFileHistory := make(map[int][]string)
+ buffer, err = tsdb.NewBufferWithWal(
+ log,
+ common.Position{},
+ flushSize,
+ writeConcurrency,
+ numShards,
+ func(shardIndex int, skl *skl.Skiplist) error {
+ flushMutex.Lock()
+ defer flushMutex.Unlock()
+
+ shardWalDir := filepath.Join(path,
"buffer-"+strconv.Itoa(shardIndex))
+ var shardWalList []os.DirEntry
+ shardWalList, err =
os.ReadDir(shardWalDir)
+ Expect(err).ToNot(HaveOccurred())
+ for _, shardWalFile := range
shardWalList {
+
Expect(shardWalFile.IsDir()).To(BeFalse())
+
Expect(shardWalFile.Name()).To(HaveSuffix(".wal"))
+ shardWalFileHistory[shardIndex]
= append(shardWalFileHistory[shardIndex], shardWalFile.Name())
+ }
+ return nil
+ },
+ true,
+ &path)
+ Expect(err).ToNot(HaveOccurred())
+ defer buffer.Close()
+
+ // Write buffer & wal
+ var wg sync.WaitGroup
+ wg.Add(writeConcurrency)
+ for i := 0; i < writeConcurrency; i++ {
+ go func(writerIndex int) {
+ for j := 0; j < numShards; j++ {
+ for k := 0; k < flushSize; k++ {
+ buffer.Write(
+
[]byte(fmt.Sprintf("writer-%d-shard-%d-key-%d", writerIndex, j, k)),
+
[]byte(fmt.Sprintf("writer-%d-shard-%d-value-%d", writerIndex, j, k)),
+
time.UnixMilli(baseTime.UnixMilli()+int64(writerIndex+j+k)))
+ }
+ }
+ wg.Done()
+ }(i)
+ }
+ wg.Wait()
+
+ flushMutex.Lock()
+ defer flushMutex.Unlock()
+
+ // Check wal
+ Expect(len(shardWalFileHistory) ==
numShards).To(BeTrue())
+ for shardIndex := 0; shardIndex < numShards;
shardIndex++ {
+ // Check wal rotate
+ Expect(len(shardWalFileHistory[shardIndex]) >
1).To(BeTrue())
+
+ shardWalDir := filepath.Join(path,
"buffer-"+strconv.Itoa(shardIndex))
+ currentShardWalFiles, err :=
os.ReadDir(shardWalDir)
+ Expect(err).ToNot(HaveOccurred())
+ Expect(len(currentShardWalFiles) <=
2).To(BeTrue())
+ // Check wal delete
+ Expect(len(shardWalFileHistory[shardIndex]) >
len(currentShardWalFiles)).To(BeTrue())
+ }
+ })
+
+ It("should recover buffer from wal file correctly", func() {
+ var err error
+ var flushMutex sync.Mutex
+ var bufferFlushed bool
+
+ buffer, err = tsdb.NewBufferWithWal(
+ log,
+ common.Position{},
+ flushSize,
+ writeConcurrency,
+ numShards,
+ func(shardIndex int, skl *skl.Skiplist) error {
+ flushMutex.Lock()
+ defer flushMutex.Unlock()
+
+ if !bufferFlushed {
+ bufferFlushed = true
+ }
+ return nil
+ },
+ true,
+ &path)
+ Expect(err).ToNot(HaveOccurred())
+
+ // Write buffer & wal
+ for i := 0; i < numShards; i++ {
+ buffer.Write(
+ []byte(fmt.Sprintf("shard-%d-key-1",
i)),
+ []byte(fmt.Sprintf("shard-%d-value-1",
i)),
+
time.UnixMilli(baseTime.UnixMilli()+int64(i)))
+ }
+
+ flushMutex.Lock()
+ Expect(bufferFlushed).To(BeFalse())
+ flushMutex.Unlock()
+
+ // Restart buffer
+ buffer.Close()
+ buffer, err = tsdb.NewBufferWithWal(
+ log,
+ common.Position{},
+ flushSize,
+ writeConcurrency,
+ numShards,
+ func(shardIndex int, skl *skl.Skiplist) error {
+ return nil
+ },
+ true,
+ &path)
+ Expect(err).ToNot(HaveOccurred())
+ defer buffer.Close()
+
+ // Check buffer was recovered from wal
+ for i := 0; i < numShards; i++ {
+ expectValue :=
[]byte(fmt.Sprintf("shard-%d-value-1", i))
+ value, exist := buffer.Read(
+ []byte(fmt.Sprintf("shard-%d-key-1",
i)),
+
time.UnixMilli(baseTime.UnixMilli()+int64(i)))
+ Expect(exist).To(BeTrue())
+ Expect(bytes.Equal(expectValue,
value)).To(BeTrue())
+ }
+ })
+ })
})
diff --git a/pkg/convert/string.go b/pkg/convert/string.go
new file mode 100644
index 00000000..5a24db8d
--- /dev/null
+++ b/pkg/convert/string.go
@@ -0,0 +1,38 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package convert
+
+import (
+ "reflect"
+ "unsafe"
+)
+
+// StringToBytes converts a string to a byte slice without copying; the result aliases the string's memory and must not be modified.
+func StringToBytes(s string) (b []byte) {
+ bh := (*reflect.SliceHeader)(unsafe.Pointer(&b))
+ sh := (*reflect.StringHeader)(unsafe.Pointer(&s))
+ bh.Data = sh.Data
+ bh.Cap = sh.Len
+ bh.Len = sh.Len
+ return b
+}
+
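+// BytesToString converts a byte slice to a string without copying; the string aliases the slice, so the slice must not be modified afterwards.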
+func BytesToString(b []byte) string {
+ return *(*string)(unsafe.Pointer(&b))
+}
diff --git a/pkg/wal/wal.go b/pkg/wal/wal.go
index 39b02282..db8f416d 100644
--- a/pkg/wal/wal.go
+++ b/pkg/wal/wal.go
@@ -26,6 +26,7 @@ import (
"math"
"os"
"path/filepath"
+ "sort"
"strconv"
"strings"
"sync"
@@ -35,7 +36,7 @@ import (
"github.com/pkg/errors"
"go.uber.org/multierr"
- "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
@@ -55,6 +56,7 @@ const (
parseTimeStr = "2006-01-02 15:04:05"
maxRetries = 3
maxSegmentID = uint64(math.MaxUint64) - 1
+ defaultSyncFlush = false
)
// DefaultOptions for Open().
@@ -62,7 +64,7 @@ var DefaultOptions = &Options{
FileSize: 67108864, // 64MB
BufferSize: 65535, // 16KB
BufferBatchInterval: 3 * time.Second,
- NoSync: false,
+ SyncFlush: defaultSyncFlush,
}
// Options for creating Write-ahead Logging.
@@ -71,7 +73,7 @@ type Options struct {
BufferSize int
BufferBatchInterval time.Duration
FlushQueueSize int
- NoSync bool
+ SyncFlush bool
}
// WAL denotes a Write-ahead logging.
@@ -82,7 +84,7 @@ type WAL interface {
// Write a logging entity.
// It will return immediately when the data is written in the buffer,
// The callback function will be called when the entity is flushed to persistent storage.
- Write(seriesID common.GlobalSeriesID, timestamp time.Time, data []byte,
callback func(common.GlobalSeriesID, time.Time, []byte, error))
+ Write(seriesID []byte, timestamp time.Time, data []byte, callback
func([]byte, time.Time, []byte, error))
// Read specified segment by SegmentID.
Read(segmentID SegmentID) (Segment, error)
// ReadAllSegments reads all segments sorted by their creation time in ascending order.
@@ -106,7 +108,7 @@ type Segment interface {
// LogEntry used for attain detail value of WAL entry.
type LogEntry interface {
- GetSeriesID() common.GlobalSeriesID
+ GetSeriesID() []byte
GetTimestamps() []time.Time
GetValues() *list.List
}
@@ -137,35 +139,56 @@ type segment struct {
}
type logRequest struct {
- seriesID common.GlobalSeriesID
+ seriesID []byte
timestamp time.Time
- callback func(common.GlobalSeriesID, time.Time, []byte, error)
+ callback func([]byte, time.Time, []byte, error)
data []byte
}
type logEntry struct {
timestamps []time.Time
values *list.List
- seriesID common.GlobalSeriesID
+ seriesID []byte
entryLength uint64
count uint32
}
type buffer struct {
- timestampMap map[common.GlobalSeriesID][]time.Time
- valueMap map[common.GlobalSeriesID][]byte
- callbackMap map[common.GlobalSeriesID][]func(common.GlobalSeriesID,
time.Time, []byte, error)
+ timestampMap map[logSeriesID][]time.Time
+ valueMap map[logSeriesID][]byte
+ callbackMap map[logSeriesID][]func([]byte, time.Time, []byte, error)
count int
}
type bufferWriter struct {
buf *bytes.Buffer
- seriesIDBuf *bytes.Buffer
timestampsBuf *bytes.Buffer
+ seriesID *logSeriesID
dataBuf []byte
- dataLen int
- seriesCount uint32
batchLen uint64
+ seriesCount uint32
+ dataLen int
+}
+
+type logSeriesID struct {
+ key string
+ byteLen int
+}
+
+func newLogSeriesID(b []byte) logSeriesID {
+ return logSeriesID{key: convert.BytesToString(b), byteLen: len(b)}
+}
+
+func (s logSeriesID) string() string {
+ return s.key
+}
+
+func (s logSeriesID) bytes() []byte {
+ return convert.StringToBytes(s.key)
+}
+
+func (s logSeriesID) len() int {
+ return s.byteLen
}
// New creates a WAL instance in the specified path.
@@ -189,7 +212,7 @@ func New(path string, options *Options) (WAL, error) {
FileSize: fileSize,
BufferSize: bufferSize,
BufferBatchInterval: bufferBatchInterval,
- NoSync: options.NoSync,
+ SyncFlush: options.SyncFlush,
}
}
@@ -216,9 +239,9 @@ func New(path string, options *Options) (WAL, error) {
flushCloser: flushCloser,
chanGroupCloser: chanGroupCloser,
buffer: buffer{
- timestampMap:
make(map[common.GlobalSeriesID][]time.Time),
- valueMap: make(map[common.GlobalSeriesID][]byte),
- callbackMap:
make(map[common.GlobalSeriesID][]func(common.GlobalSeriesID, time.Time, []byte,
error)),
+ timestampMap: make(map[logSeriesID][]time.Time),
+ valueMap: make(map[logSeriesID][]byte),
+ callbackMap: make(map[logSeriesID][]func([]byte,
time.Time, []byte, error)),
count: 0,
},
}
@@ -234,7 +257,7 @@ func New(path string, options *Options) (WAL, error) {
// Write a logging entity.
// It will return immediately when the data is written in the buffer,
// The callback function will be called when the entity is flushed to persistent storage.
-func (log *log) Write(seriesID common.GlobalSeriesID, timestamp time.Time,
data []byte, callback func(common.GlobalSeriesID, time.Time, []byte, error)) {
+func (log *log) Write(seriesID []byte, timestamp time.Time, data []byte,
callback func([]byte, time.Time, []byte, error)) {
if !log.writeCloser.AddSender() {
return
}
@@ -266,6 +289,7 @@ func (log *log) ReadAllSegments() ([]Segment, error) {
for _, segment := range log.segmentMap {
segments = append(segments, segment)
}
+ sort.Slice(segments, func(i, j int) bool { return
segments[i].GetSegmentID() < segments[j].GetSegmentID() })
return segments, nil
}
@@ -365,7 +389,7 @@ func (log *log) start() {
log.logger.Debug().Msg("Write request
to buffer. elements: " + strconv.Itoa(log.buffer.count))
}
- bufferVolume += request.seriesID.Volume() +
timestampVolumeLength + len(request.data)
+ bufferVolume += len(request.seriesID) +
timestampVolumeLength + len(request.data)
if bufferVolume > log.options.BufferSize {
log.triggerFlushing()
bufferVolume = 0
@@ -440,9 +464,9 @@ func (log *log) triggerFlushing() {
func (log *log) newBuffer() {
log.buffer = buffer{
- timestampMap: make(map[common.GlobalSeriesID][]time.Time),
- valueMap: make(map[common.GlobalSeriesID][]byte),
- callbackMap:
make(map[common.GlobalSeriesID][]func(common.GlobalSeriesID, time.Time, []byte,
error)),
+ timestampMap: make(map[logSeriesID][]time.Time),
+ valueMap: make(map[logSeriesID][]byte),
+ callbackMap: make(map[logSeriesID][]func([]byte, time.Time,
[]byte, error)),
count: 0,
}
}
@@ -481,7 +505,7 @@ func (log *log) writeWorkSegment(data []byte) error {
if _, err := log.workSegment.file.Write(data); err != nil {
return errors.Wrap(err, "Write WAL segment file error, file:
"+log.workSegment.path)
}
- if !log.options.NoSync {
+ if log.options.SyncFlush {
if err := log.workSegment.file.Sync(); err != nil {
log.logger.Warn().Msg("Sync WAL segment file to disk
failed, file: " + log.workSegment.path)
}
@@ -541,7 +565,6 @@ func (log *log) load() error {
func newBufferWriter() *bufferWriter {
return &bufferWriter{
buf: bytes.NewBuffer([]byte{}),
- seriesIDBuf: bytes.NewBuffer([]byte{}),
timestampsBuf: bytes.NewBuffer([]byte{}),
dataBuf: make([]byte, 128),
}
@@ -558,14 +581,14 @@ func (w *bufferWriter) Reset() error {
}
func (w *bufferWriter) ResetSeries() {
- w.seriesIDBuf.Reset()
w.timestampsBuf.Reset()
w.dataLen = 0
+ w.seriesID = nil
w.seriesCount = 0
}
func (w *bufferWriter) AddSeries() error {
- seriesIDBytesLen := uint16(w.seriesIDBuf.Len())
+ seriesIDBytesLen := uint16(w.seriesID.len())
timestampsBytesLen := uint16(w.timestampsBuf.Len())
entryLen := seriesIDLength + uint64(seriesIDBytesLen) +
seriesCountLength + timestampsBinaryLength + uint64(timestampsBytesLen) +
uint64(w.dataLen)
@@ -576,7 +599,7 @@ func (w *bufferWriter) AddSeries() error {
if err = w.writeSeriesIDLength(seriesIDBytesLen); err != nil {
return err
}
- if err = w.writeSeriesID(w.seriesIDBuf.Bytes()); err != nil {
+ if err = w.writeSeriesID(w.seriesID); err != nil {
return err
}
if err = w.writeSeriesCount(w.seriesCount); err != nil {
@@ -601,13 +624,8 @@ func (w *bufferWriter) Bytes() []byte {
return w.rewriteBatchLength(batchBytes, batchLen)
}
-func (w *bufferWriter) WriteSeriesID(s common.GlobalSeriesID) error {
- if err := writeUint64(w.seriesIDBuf, uint64(s.SeriesID)); err != nil {
- return err
- }
- if _, err := w.seriesIDBuf.WriteString(s.Name); err != nil {
- return err
- }
+func (w *bufferWriter) WriteSeriesID(seriesID logSeriesID) error {
+ w.seriesID = &seriesID
return nil
}
@@ -661,8 +679,8 @@ func (w *bufferWriter) writeSeriesIDLength(data uint16)
error {
return writeUint16(w.buf, data)
}
-func (w *bufferWriter) writeSeriesID(data []byte) error {
- _, err := w.buf.Write(data)
+func (w *bufferWriter) writeSeriesID(data *logSeriesID) error {
+ _, err := w.buf.WriteString(data.string())
return err
}
@@ -713,7 +731,7 @@ func (segment *segment) parseLogEntries() error {
var batchLen uint64
var entryLen uint64
var seriesIDLen uint16
- var seriesID common.GlobalSeriesID
+ var seriesID []byte
var seriesCount uint32
var timestampsBinaryLen uint16
var entryEndPosition uint64
@@ -853,11 +871,8 @@ func (segment *segment) parseSeriesIDLength(data []byte)
(uint16, error) {
return seriesIDLen, nil
}
-func (segment *segment) parseSeriesID(data []byte) common.GlobalSeriesID {
- return common.GlobalSeriesID{
- SeriesID: common.SeriesID(bytesToUint64(data[:8])),
- Name: string(data[8:]),
- }
+func (segment *segment) parseSeriesID(data []byte) []byte {
+ return newLogSeriesID(data).bytes()
}
func (segment *segment) parseSeriesCountLength(data []byte) (uint32, error) {
@@ -910,7 +925,7 @@ func (segment *segment) parseValuesBinary(data []byte)
(*list.List, error) {
return values, nil
}
-func (logEntry *logEntry) GetSeriesID() common.GlobalSeriesID {
+func (logEntry *logEntry) GetSeriesID() []byte {
return logEntry.seriesID
}
@@ -923,15 +938,15 @@ func (logEntry *logEntry) GetValues() *list.List {
}
func (buffer *buffer) write(request logRequest) {
- seriesID := request.seriesID
- buffer.timestampMap[seriesID] = append(buffer.timestampMap[seriesID],
request.timestamp)
+ key := newLogSeriesID(request.seriesID)
+ buffer.timestampMap[key] = append(buffer.timestampMap[key],
request.timestamp)
// Value item: binary-length(2-bytes) + binary data(n-bytes)
binaryLen := uint16(len(request.data))
- buffer.valueMap[seriesID] = append(buffer.valueMap[seriesID],
byte(binaryLen), byte(binaryLen>>8))
- buffer.valueMap[seriesID] = append(buffer.valueMap[seriesID],
request.data...)
+ buffer.valueMap[key] = append(buffer.valueMap[key], byte(binaryLen),
byte(binaryLen>>8))
+ buffer.valueMap[key] = append(buffer.valueMap[key], request.data...)
- buffer.callbackMap[seriesID] = append(buffer.callbackMap[seriesID],
request.callback)
+ buffer.callbackMap[key] = append(buffer.callbackMap[key],
request.callback)
buffer.count++
}
@@ -946,9 +961,11 @@ func (buffer *buffer) notifyRequests(err error) {
valuePos = 0
for index, callback := range callbacks {
valuePos, valueItem = readValuesBinary(values, valuePos, valuesBinaryLength)
- buffer.runningCallback(func() {
- callback(seriesID, timestamps[index], valueItem, err)
- })
+ if callback != nil {
+ buffer.runningCallback(func() {
+ callback(seriesID.bytes(), timestamps[index], valueItem, err)
+ })
+ }
}
}
}
@@ -973,7 +990,7 @@ func parseSegmentID(segmentName string) (uint64, error) {
if !strings.HasSuffix(segmentName, segmentNameSuffix) {
return 0, errors.New("Invalid segment name: " + segmentName)
}
- return strconv.ParseUint(segmentName[3:19], 10, 64)
+ return strconv.ParseUint(segmentName[3:19], 16, 64)
}
func readValuesBinary(raw []byte, position int, offsetLen int) (int, []byte) {
@@ -1051,10 +1068,6 @@ func bytesToUint16(buf []byte) uint16 {
return binary.LittleEndian.Uint16(buf)
}
-func bytesToUint64(buf []byte) uint64 {
- return binary.LittleEndian.Uint64(buf)
-}
-
func timeToUnixNano(time time.Time) uint64 {
return uint64(time.UnixNano())
}
diff --git a/pkg/wal/wal_benchmark_test.go b/pkg/wal/wal_benchmark_test.go
index d6eccc39..ba9570f2 100644
--- a/pkg/wal/wal_benchmark_test.go
+++ b/pkg/wal/wal_benchmark_test.go
@@ -27,7 +27,6 @@ import (
"testing"
"time"
- "github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
@@ -41,11 +40,11 @@ var (
seriesID100 = newSeriesIDList(100)
seriesID500 = newSeriesIDList(500)
seriesID1000 = newSeriesIDList(1000)
- callback = func(seriesID common.GlobalSeriesID, t time.Time, bytes []byte, err error) {}
+ callback = func(seriesID []byte, t time.Time, bytes []byte, err error) {}
)
func Benchmark_SeriesID_1(b *testing.B) {
- wal := newWAL(nil)
+ wal := newWAL(&Options{SyncFlush: true})
defer closeWAL(wal)
seriesID := seriesID1
@@ -53,13 +52,13 @@ func Benchmark_SeriesID_1(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
- wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+ wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
}
b.StopTimer()
}
func Benchmark_SeriesID_20(b *testing.B) {
- wal := newWAL(nil)
+ wal := newWAL(&Options{SyncFlush: true})
defer closeWAL(wal)
seriesID := seriesID20
@@ -67,13 +66,13 @@ func Benchmark_SeriesID_20(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
- wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+ wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
}
b.StopTimer()
}
func Benchmark_SeriesID_100(b *testing.B) {
- wal := newWAL(nil)
+ wal := newWAL(&Options{SyncFlush: true})
defer closeWAL(wal)
seriesID := seriesID100
@@ -81,13 +80,13 @@ func Benchmark_SeriesID_100(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
- wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+ wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
}
b.StopTimer()
}
func Benchmark_SeriesID_500(b *testing.B) {
- wal := newWAL(nil)
+ wal := newWAL(&Options{SyncFlush: true})
defer closeWAL(wal)
seriesID := seriesID500
@@ -95,13 +94,13 @@ func Benchmark_SeriesID_500(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
- wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+ wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
}
b.StopTimer()
}
func Benchmark_SeriesID_1000(b *testing.B) {
- wal := newWAL(nil)
+ wal := newWAL(&Options{SyncFlush: true})
defer closeWAL(wal)
seriesID := seriesID1000
@@ -109,13 +108,13 @@ func Benchmark_SeriesID_1000(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
- wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+ wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
}
b.StopTimer()
}
func Benchmark_SeriesID_1000_Buffer_64K(b *testing.B) {
- wal := newWAL(&Options{BufferSize: 1024 * 64})
+ wal := newWAL(&Options{SyncFlush: true, BufferSize: 1024 * 64})
defer closeWAL(wal)
seriesID := seriesID1000
@@ -123,13 +122,13 @@ func Benchmark_SeriesID_1000_Buffer_64K(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
- wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+ wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
}
b.StopTimer()
}
func Benchmark_SeriesID_1000_Buffer_128K(b *testing.B) {
- wal := newWAL(&Options{BufferSize: 1024 * 128})
+ wal := newWAL(&Options{SyncFlush: true, BufferSize: 1024 * 128})
defer closeWAL(wal)
seriesID := seriesID1000
@@ -137,13 +136,13 @@ func Benchmark_SeriesID_1000_Buffer_128K(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
- wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+ wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
}
b.StopTimer()
}
func Benchmark_SeriesID_1000_Buffer_512K(b *testing.B) {
- wal := newWAL(&Options{BufferSize: 1024 * 512})
+ wal := newWAL(&Options{SyncFlush: true, BufferSize: 1024 * 512})
defer closeWAL(wal)
seriesID := seriesID1000
@@ -151,13 +150,13 @@ func Benchmark_SeriesID_1000_Buffer_512K(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
- wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+ wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
}
b.StopTimer()
}
func Benchmark_SeriesID_1000_Buffer_1MB(b *testing.B) {
- wal := newWAL(&Options{BufferSize: 1024 * 1024})
+ wal := newWAL(&Options{SyncFlush: true, BufferSize: 1024 * 1024})
defer closeWAL(wal)
seriesID := seriesID1000
@@ -165,13 +164,13 @@ func Benchmark_SeriesID_1000_Buffer_1MB(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
- wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+ wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
}
b.StopTimer()
}
func Benchmark_SeriesID_1000_Buffer_2MB(b *testing.B) {
- wal := newWAL(&Options{BufferSize: 1024 * 1024 * 2})
+ wal := newWAL(&Options{SyncFlush: true, BufferSize: 1024 * 1024 * 2})
defer closeWAL(wal)
seriesID := seriesID1000
@@ -179,13 +178,13 @@ func Benchmark_SeriesID_1000_Buffer_2MB(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
- wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+ wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
}
b.StopTimer()
}
func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush(b *testing.B) {
- wal := newWAL(&Options{BufferSize: 1024 * 64, NoSync: true})
+ wal := newWAL(&Options{BufferSize: 1024 * 64, SyncFlush: false})
defer closeWAL(wal)
seriesID := seriesID1000
@@ -193,13 +192,13 @@ func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
- wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+ wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
}
b.StopTimer()
}
func Benchmark_SeriesID_1000_Buffer_128K_NoSyncFlush(b *testing.B) {
- wal := newWAL(&Options{BufferSize: 1024 * 128, NoSync: true})
+ wal := newWAL(&Options{BufferSize: 1024 * 128, SyncFlush: false})
defer closeWAL(wal)
seriesID := seriesID1000
@@ -207,13 +206,13 @@ func Benchmark_SeriesID_1000_Buffer_128K_NoSyncFlush(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
- wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+ wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
}
b.StopTimer()
}
func Benchmark_SeriesID_1000_Buffer_512K_NoSyncFlush(b *testing.B) {
- wal := newWAL(&Options{BufferSize: 1024 * 512, NoSync: true})
+ wal := newWAL(&Options{BufferSize: 1024 * 512, SyncFlush: false})
defer closeWAL(wal)
seriesID := seriesID1000
@@ -221,13 +220,13 @@ func Benchmark_SeriesID_1000_Buffer_512K_NoSyncFlush(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
- wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+ wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
}
b.StopTimer()
}
func Benchmark_SeriesID_1000_Buffer_1MB_NoSyncFlush(b *testing.B) {
- wal := newWAL(&Options{BufferSize: 1024 * 1024, NoSync: true})
+ wal := newWAL(&Options{BufferSize: 1024 * 1024, SyncFlush: false})
defer closeWAL(wal)
seriesID := seriesID1000
@@ -235,13 +234,13 @@ func Benchmark_SeriesID_1000_Buffer_1MB_NoSyncFlush(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
- wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+ wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
}
b.StopTimer()
}
func Benchmark_SeriesID_1000_Buffer_2MB_NoSyncFlush(b *testing.B) {
- wal := newWAL(&Options{BufferSize: 1024 * 1024 * 2, NoSync: true})
+ wal := newWAL(&Options{BufferSize: 1024 * 1024 * 2, SyncFlush: false})
defer closeWAL(wal)
seriesID := seriesID1000
@@ -249,13 +248,13 @@ func Benchmark_SeriesID_1000_Buffer_2MB_NoSyncFlush(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
- wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
+ wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
}
b.StopTimer()
}
func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_16MB(b *testing.B) {
- wal := newWAL(&Options{BufferSize: 1024 * 64, NoSync: true})
+ wal := newWAL(&Options{BufferSize: 1024 * 64, SyncFlush: false})
defer closeWAL(wal)
seriesID := seriesID1000
@@ -281,7 +280,7 @@ func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_16MB(b *testing.B
}()
for i := 0; i < b.N; i++ {
binaryData = data[i%dataLen].binary
- wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), binaryData, callback)
+ wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), binaryData, callback)
logVolume += seriesIDVolume + timeVolume + len(binaryData)
if logVolume >= rotateSize {
@@ -293,7 +292,7 @@ func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_16MB(b *testing.B
}
func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_32MB(b *testing.B) {
- wal := newWAL(&Options{BufferSize: 1024 * 64, NoSync: true})
+ wal := newWAL(&Options{BufferSize: 1024 * 64, SyncFlush: false})
defer closeWAL(wal)
seriesID := seriesID1000
@@ -319,7 +318,7 @@ func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_32MB(b *testing.B
}()
for i := 0; i < b.N; i++ {
binaryData = data[i%dataLen].binary
- wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), binaryData, callback)
+ wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), binaryData, callback)
logVolume += seriesIDVolume + timeVolume + len(binaryData)
if logVolume >= rotateSize {
@@ -331,7 +330,7 @@ func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_32MB(b *testing.B
}
func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_64MB(b *testing.B) {
- wal := newWAL(&Options{BufferSize: 1024 * 64, NoSync: true})
+ wal := newWAL(&Options{BufferSize: 1024 * 64, SyncFlush: false})
defer closeWAL(wal)
seriesID := seriesID1000
@@ -357,7 +356,7 @@ func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_64MB(b *testing.B
}()
for i := 0; i < b.N; i++ {
binaryData = data[i%dataLen].binary
- wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), binaryData, callback)
+ wal.Write(seriesID[i%seriesIDLen].key, time.UnixMilli(baseTime+1), binaryData, callback)
logVolume += seriesIDVolume + timeVolume + len(binaryData)
if logVolume >= rotateSize {
@@ -398,13 +397,14 @@ func closeWAL(wal WAL) {
}
}
-func newSeriesIDList(series int) []common.GlobalSeriesID {
- var seriesIDSet []common.GlobalSeriesID
+type SeriesID struct {
+ key []byte
+}
+
+func newSeriesIDList(series int) []SeriesID {
+ var seriesIDSet []SeriesID
for i := 0; i < series; i++ {
- seriesID := common.GlobalSeriesID{
- SeriesID: common.SeriesID(i),
- Name: fmt.Sprintf("series-%d", i),
- }
+ seriesID := SeriesID{key: []byte(fmt.Sprintf("series-%d", i))}
seriesIDSet = append(seriesIDSet, seriesID)
}
return seriesIDSet
diff --git a/pkg/wal/wal_test.go b/pkg/wal/wal_test.go
index f21cda56..d807f970 100644
--- a/pkg/wal/wal_test.go
+++ b/pkg/wal/wal_test.go
@@ -31,7 +31,6 @@ import (
"github.com/onsi/gomega"
"github.com/onsi/gomega/gleak"
- "github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/test/flags"
"github.com/apache/skywalking-banyandb/pkg/wal"
)
@@ -83,20 +82,17 @@ var _ = ginkgo.Describe("WAL", func() {
wg.Add(writeLogCount)
baseTime := time.Now()
for i := 0; i < seriesIDCount; i++ {
- seriesID := &common.GlobalSeriesID{
- SeriesID: common.SeriesID(i),
- Name: fmt.Sprintf("series-%d", i),
- }
+ seriesID := []byte(fmt.Sprintf("series-%d", i))
go func() {
for j := 0; j < seriesIDElementCount; j++ {
timestamp := time.UnixMilli(baseTime.UnixMilli() + int64(j))
value := []byte(fmt.Sprintf("value-%d", j))
- callback := func(seriesID common.GlobalSeriesID, t time.Time, bytes []byte, err error) {
+ callback := func(seriesID []byte, t time.Time, bytes []byte, err error) {
gomega.Expect(err).ToNot(gomega.HaveOccurred())
wg.Done()
}
- log.Write(*seriesID, timestamp, value, callback)
+ log.Write(seriesID, timestamp, value, callback)
}
}()
}
@@ -114,13 +110,8 @@ var _ = ginkgo.Describe("WAL", func() {
entries := segment.GetLogEntries()
for _, entity := range entries {
seriesID := entity.GetSeriesID()
- seriesIDSequence := seriesID.SeriesID
- expectSeriesID := common.GlobalSeriesID{
- SeriesID: seriesIDSequence,
- Name: fmt.Sprintf("series-%d", seriesIDSequence),
- }
// Check seriesID
- gomega.Expect(expectSeriesID == seriesID).To(gomega.BeTrue())
+ gomega.Expect(seriesID != nil).To(gomega.BeTrue())
timestamps := entity.GetTimestamps()
values := entity.GetValues()
@@ -171,15 +162,12 @@ var _ = ginkgo.Describe("WAL", func() {
writeLogCount := 3
wg.Add(writeLogCount)
- expectSegments := make(map[wal.SegmentID]common.GlobalSeriesID)
+ expectSegments := make(map[wal.SegmentID][]byte)
for i := 0; i < writeLogCount; i++ {
- seriesID := &common.GlobalSeriesID{
- SeriesID: common.SeriesID(i),
- Name: fmt.Sprintf("series-%d", i),
- }
+ seriesID := []byte(fmt.Sprintf("series-%d", i))
timestamp := time.Now()
value := []byte(fmt.Sprintf("value-%d", i))
- callback := func(seriesID common.GlobalSeriesID, t time.Time, bytes []byte, err error) {
+ callback := func(seriesID []byte, t time.Time, bytes []byte, err error) {
gomega.Expect(err).ToNot(gomega.HaveOccurred())
// Rotate
@@ -189,7 +177,7 @@ var _ = ginkgo.Describe("WAL", func() {
wg.Done()
}
- log.Write(*seriesID, timestamp, value, callback)
+ log.Write(seriesID, timestamp, value, callback)
}
wg.Wait()
@@ -205,7 +193,7 @@ var _ = ginkgo.Describe("WAL", func() {
gomega.Expect(err).ToNot(gomega.HaveOccurred())
entries := segment.GetLogEntries()
gomega.Expect(len(entries) == 1).To(gomega.BeTrue())
- gomega.Expect(entries[0].GetSeriesID() == seriesID).To(gomega.BeTrue())
+ gomega.Expect(bytes.Equal(entries[0].GetSeriesID(), seriesID)).To(gomega.BeTrue())
}
})
})