This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 4655b735 Support for recovery measure/buffer using wal (#313)
4655b735 is described below

commit 4655b7359f86303a282a471ec166cc68826382d1
Author: hailin0 <[email protected]>
AuthorDate: Wed Aug 9 20:24:12 2023 +0800

    Support for recovery measure/buffer using wal (#313)
    
    * Update wal seriesID datatype to bytes
    * Integrate wal into measure/buffer for memory recovery
    * update bytes/string convert
    
    ---------
    
    Co-authored-by: 吴晟 Wu Sheng <[email protected]>
---
 CHANGES.md                    |   1 +
 api/common/id.go              |  26 ------
 banyand/measure/tstable.go    |  24 ++---
 banyand/stream/tstable.go     |   6 +-
 banyand/tsdb/buffer.go        | 207 ++++++++++++++++++++++++++++++++++++++----
 banyand/tsdb/buffer_test.go   | 155 ++++++++++++++++++++++++++++++-
 pkg/convert/string.go         |  38 ++++++++
 pkg/wal/wal.go                | 127 ++++++++++++++------------
 pkg/wal/wal_benchmark_test.go |  88 +++++++++---------
 pkg/wal/wal_test.go           |  30 ++----
 10 files changed, 518 insertions(+), 184 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index bbdec150..e537ba38 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -10,6 +10,7 @@ Release Notes.
 - Implement Write-ahead Logging
 - Document the clustering.
 - Support multiple roles for banyand server.
+- Support for recovery buffer using wal.
 
 ### Bugs
 
diff --git a/api/common/id.go b/api/common/id.go
index 0d8c5890..6cc879ca 100644
--- a/api/common/id.go
+++ b/api/common/id.go
@@ -40,32 +40,6 @@ func (s SeriesID) Marshal() []byte {
        return convert.Uint64ToBytes(uint64(s))
 }
 
-// GlobalSeriesID identities a series in a shard.
-type GlobalSeriesID struct {
-       Name     string
-       SeriesID SeriesID
-}
-
-// Marshal encodes global series id to bytes.
-func (s GlobalSeriesID) Marshal() []byte {
-       seriesIDBytes := convert.Uint64ToBytes(uint64(s.SeriesID))
-       nameBytes := []byte(s.Name)
-       return append(seriesIDBytes, nameBytes...)
-}
-
-// Volume returns the estimated bytes volume of global series id.
-func (s GlobalSeriesID) Volume() int {
-       return 8 + len(s.Name)
-}
-
-// ParseGlobalSeriesID parses global series id from bytes.
-func ParseGlobalSeriesID(b []byte) GlobalSeriesID {
-       return GlobalSeriesID{
-               SeriesID: SeriesID(convert.BytesToUint64(b[:8])),
-               Name:     string(b[8:]),
-       }
-}
-
 // positionKey is a context key to store the module position.
 var positionKey = contextPositionKey{}
 
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index 08765a5d..462d5066 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -41,6 +41,7 @@ import (
 const (
        defaultNumBufferShards  = 2
        defaultWriteConcurrency = 1000
+       defaultWriteWal         = true
        plain                   = "tst"
        encoded                 = "encoded"
 )
@@ -56,6 +57,7 @@ type tsTable struct {
        buffer            *tsdb.Buffer
        closeBufferTimer  *time.Timer
        position          common.Position
+       path              string
        bufferSize        int64
        encoderBufferSize int64
        lock              sync.Mutex
@@ -72,13 +74,13 @@ func (t *tsTable) openBuffer() (err error) {
                return nil
        }
        bufferSize := int(t.encoderBufferSize / defaultNumBufferShards)
-       if t.encoderBuffer, err = tsdb.NewBuffer(t.l, t.position, bufferSize,
-               defaultWriteConcurrency, defaultNumBufferShards, t.encoderFlush); err != nil {
+       if t.encoderBuffer, err = tsdb.NewBufferWithWal(t.l, t.position, bufferSize,
+               defaultWriteConcurrency, defaultNumBufferShards, t.encoderFlush, defaultWriteWal, &t.path); err != nil {
                return fmt.Errorf("failed to create encoder buffer: %w", err)
        }
        bufferSize = int(t.bufferSize / defaultNumBufferShards)
-       if t.buffer, err = tsdb.NewBuffer(t.l, t.position, bufferSize,
-               defaultWriteConcurrency, defaultNumBufferShards, t.flush); err != nil {
+       if t.buffer, err = tsdb.NewBufferWithWal(t.l, t.position, bufferSize,
+               defaultWriteConcurrency, defaultNumBufferShards, t.flush, defaultWriteWal, &t.path); err != nil {
                return fmt.Errorf("failed to create buffer: %w", err)
        }
        end := t.EndTime()
@@ -153,22 +155,19 @@ func (t *tsTable) Get(key []byte, ts time.Time) ([]byte, error) {
 
 func (t *tsTable) Put(key []byte, val []byte, ts time.Time) error {
        if t.encoderBuffer != nil {
-               t.writeToBuffer(key, val, ts)
-               return nil
+               return t.writeToBuffer(key, val, ts)
        }
        if err := t.openBuffer(); err != nil {
                return err
        }
-       t.writeToBuffer(key, val, ts)
-       return nil
+       return t.writeToBuffer(key, val, ts)
 }
 
-func (t *tsTable) writeToBuffer(key []byte, val []byte, ts time.Time) {
+func (t *tsTable) writeToBuffer(key []byte, val []byte, ts time.Time) error {
        if t.toEncode(key) {
-               t.encoderBuffer.Write(key, val, ts)
-       } else {
-               t.buffer.Write(key, val, ts)
+               return t.encoderBuffer.Write(key, val, ts)
        }
+       return t.buffer.Write(key, val, ts)
 }
 
 func (t *tsTable) encoderFlush(shardIndex int, skl *skl.Skiplist) error {
@@ -228,6 +227,7 @@ func (ttf *tsTableFactory) NewTSTable(blockExpiryTracker tsdb.BlockExpiryTracker
                encoderSST:         encoderSST,
                sst:                sst,
                BlockExpiryTracker: &blockExpiryTracker,
+               path:               root,
        }
        if table.IsActive() {
                if err := table.openBuffer(); err != nil {
diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go
index 924f53f6..6af8bf2f 100644
--- a/banyand/stream/tstable.go
+++ b/banyand/stream/tstable.go
@@ -113,15 +113,13 @@ func (t *tsTable) Get(key []byte, ts time.Time) ([]byte, error) {
 
 func (t *tsTable) Put(key []byte, val []byte, ts time.Time) error {
        if t.buffer != nil {
-               t.buffer.Write(key, val, ts)
-               return nil
+               return t.buffer.Write(key, val, ts)
        }
 
        if err := t.openBuffer(); err != nil {
                return err
        }
-       t.buffer.Write(key, val, ts)
-       return nil
+       return t.buffer.Write(key, val, ts)
 }
 
 func (t *tsTable) flush(shardIndex int, skl *skl.Skiplist) error {
diff --git a/banyand/tsdb/buffer.go b/banyand/tsdb/buffer.go
index c586659b..1fc94283 100644
--- a/banyand/tsdb/buffer.go
+++ b/banyand/tsdb/buffer.go
@@ -25,6 +25,7 @@ import (
 
        "github.com/dgraph-io/badger/v3/skl"
        "github.com/dgraph-io/badger/v3/y"
+       "github.com/pkg/errors"
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/banyand/observability"
@@ -32,11 +33,13 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/meter"
        "github.com/apache/skywalking-banyandb/pkg/run"
+       "github.com/apache/skywalking-banyandb/pkg/wal"
 )
 
 const (
-       defaultSize = 1 << 20 // 1MB
-       nodeAlign   = int(unsafe.Sizeof(uint64(0))) - 1
+       defaultSize        = 1 << 20 // 1MB
+       nodeAlign          = int(unsafe.Sizeof(uint64(0))) - 1
+       defaultWalSyncMode = false
 )
 
 var (
@@ -53,13 +56,15 @@ func init() {
 }
 
 type operation struct {
-       key   []byte
-       value []byte
-       epoch uint64
+       recoveryDoneFn func()
+       key            []byte
+       value          []byte
+       epoch          uint64
 }
 
 type flushEvent struct {
-       data *skl.Skiplist
+       data         *skl.Skiplist
+       walSegmentID wal.SegmentID
 }
 
 type onFlush func(shardIndex int, skl *skl.Skiplist) error
@@ -71,12 +76,15 @@ type bufferShardBucket struct {
        writeWaitGroup   *sync.WaitGroup
        flushWaitGroup   *sync.WaitGroup
        log              *logger.Logger
+       wal              wal.WAL
        immutables       []*skl.Skiplist
        labelValues      []string
        shardLabelValues []string
        index            int
        capacity         int
        mutex            sync.RWMutex
+       walSyncMode      bool
+       enableWal        bool
 }
 
 // Buffer is an exported struct that represents a buffer composed of multiple shard buckets.
@@ -88,11 +96,18 @@ type Buffer struct {
        writeWaitGroup sync.WaitGroup
        flushWaitGroup sync.WaitGroup
        numShards      int
+       enableWal      bool
        closerOnce     sync.Once
 }
 
 // NewBuffer creates a new Buffer instance with the given parameters.
 func NewBuffer(log *logger.Logger, position common.Position, flushSize, writeConcurrency, numShards int, onFlushFn onFlush) (*Buffer, error) {
+       return NewBufferWithWal(log, position, flushSize, writeConcurrency, numShards, onFlushFn, false, nil)
+}
+
+// NewBufferWithWal creates a new Buffer instance with the given parameters.
+func NewBufferWithWal(log *logger.Logger, position common.Position, flushSize, writeConcurrency, numShards int, onFlushFn onFlush, enableWal bool, walPath *string,
+) (*Buffer, error) {
        buckets := make([]bufferShardBucket, numShards)
        buffer := &Buffer{
                buckets:     buckets,
@@ -100,6 +115,7 @@ func NewBuffer(log *logger.Logger, position common.Position, flushSize, writeCon
                onFlushFn:   onFlushFn,
                entryCloser: run.NewCloser(1),
                log:         log.Named("buffer"),
+               enableWal:   enableWal,
        }
        buffer.writeWaitGroup.Add(numShards)
        buffer.flushWaitGroup.Add(numShards)
@@ -115,17 +131,27 @@ func NewBuffer(log *logger.Logger, position common.Position, flushSize, writeCon
                        log:              buffer.log.Named(fmt.Sprintf("shard-%d", i)),
                        labelValues:      append(position.LabelValues(), fmt.Sprintf("%d", i)),
                        shardLabelValues: position.ShardLabelValues(),
+                       enableWal:        enableWal,
                }
                buckets[i].start(onFlushFn)
+               if enableWal {
+                       if walPath == nil {
+                               return nil, errors.New("wal path is required")
+                       }
+                       shardWalPath := fmt.Sprintf("%s/buffer-%d", *walPath, i)
+                       if err := buckets[i].startWal(shardWalPath, defaultWalSyncMode); err != nil {
+                               return nil, errors.Wrap(err, "failed to start wal")
+                       }
+               }
                maxBytes.Set(float64(flushSize), buckets[i].labelValues...)
        }
        return buffer, nil
 }
 
 // Write adds a key-value pair with a timestamp to the appropriate shard bucket in the buffer.
-func (b *Buffer) Write(key, value []byte, timestamp time.Time) {
+func (b *Buffer) Write(key, value []byte, timestamp time.Time) error {
        if b == nil || !b.entryCloser.AddRunning() {
-               return
+               return errors.New("buffer is invalid")
        }
        defer b.entryCloser.Done()
        index := b.getShardIndex(key)
@@ -133,7 +159,15 @@ func (b *Buffer) Write(key, value []byte, timestamp time.Time) {
                b.log.Debug().Uint64("shard", index).Bytes("key", key).
                        Time("ts", timestamp).Msg("route a shard")
        }
+
+       if b.enableWal {
+               if err := b.buckets[index].writeWal(key, value, timestamp); err != nil {
+                       return errors.Wrap(err, "failed to write wal")
+               }
+       }
+
        b.buckets[index].writeCh <- operation{key: key, value: value, epoch: uint64(timestamp.UnixNano())}
+       return nil
 }
 
 // Read retrieves the value associated with the given key and timestamp from the appropriate shard bucket in the buffer.
@@ -179,6 +213,11 @@ func (b *Buffer) Close() error {
                }
                for i := 0; i < b.numShards; i++ {
                        close(b.buckets[i].flushCh)
+                       if b.enableWal {
+                               if err := b.buckets[i].wal.Close(); err != nil {
+                                       b.buckets[i].log.Err(err).Msg("closing buffer shard wal failed")
+                               }
+                       }
                }
                b.flushWaitGroup.Wait()
        })
@@ -218,13 +257,24 @@ func (bsb *bufferShardBucket) start(onFlushFn onFlush) {
                for event := range bsb.flushCh {
                        oldSkipList := event.data
                        memSize := oldSkipList.MemSize()
+                       onFlushFnDone := false
                        t1 := time.Now()
                        for {
-                               if err := onFlushFn(bsb.index, oldSkipList); err != nil {
-                                       bsb.log.Err(err).Msg("flushing immutable buffer failed. Retrying...")
-                                       flushNum.Inc(1, append(bsb.labelValues[:2], "true")...)
-                                       time.Sleep(time.Second)
-                                       continue
+                               if !onFlushFnDone {
+                                       if err := onFlushFn(bsb.index, oldSkipList); err != nil {
+                                               bsb.log.Err(err).Msg("flushing immutable buffer failed. Retrying...")
+                                               flushNum.Inc(1, append(bsb.labelValues[:2], "true")...)
+                                               time.Sleep(time.Second)
+                                               continue
+                                       }
+                                       onFlushFnDone = true
+                               }
+                               if bsb.enableWal {
+                                       if err := bsb.wal.Delete(event.walSegmentID); err != nil {
+                                               bsb.log.Err(err).Msg("delete wal segment file failed. Retrying...")
+                                               time.Sleep(time.Second)
+                                               continue
+                                       }
                                }
                                break
                        }
@@ -249,23 +299,38 @@ func (bsb *bufferShardBucket) start(onFlushFn onFlush) {
                        volume += len(k) + int(v.EncodedSize()) + skl.MaxNodeSize + nodeAlign
                        memSize := bsb.mutable.MemSize()
                        mutableBytes.Set(float64(memSize), bsb.labelValues...)
-                       if volume >= bsb.capacity || memSize >= int64(bsb.capacity) {
-                               bsb.triggerFlushing()
-                               volume = 0
+                       if op.recoveryDoneFn == nil && (volume >= bsb.capacity || memSize >= int64(bsb.capacity)) {
+                               if err := bsb.triggerFlushing(); err != nil {
+                                       bsb.log.Err(err).Msg("triggering flushing failed")
+                               } else {
+                                       volume = 0
+                               }
                        }
                        bsb.mutable.Put(k, v)
+                       if bsb.enableWal && op.recoveryDoneFn != nil {
+                               op.recoveryDoneFn()
+                       }
                }
        }()
 }
 
-func (bsb *bufferShardBucket) triggerFlushing() {
+func (bsb *bufferShardBucket) triggerFlushing() error {
+       var walSegmentID wal.SegmentID
+       if bsb.enableWal {
+               segment, err := bsb.wal.Rotate()
+               if err != nil {
+                       return errors.Wrap(err, "rotating wal failed")
+               }
+               walSegmentID = segment.GetSegmentID()
+       }
+
        for {
                select {
-               case bsb.flushCh <- flushEvent{data: bsb.mutable}:
+               case bsb.flushCh <- flushEvent{data: bsb.mutable, walSegmentID: walSegmentID}:
                        bsb.mutex.Lock()
                        defer bsb.mutex.Unlock()
                        bsb.swap()
-                       return
+                       return nil
                default:
                }
                time.Sleep(10 * time.Second)
@@ -276,3 +341,107 @@ func (bsb *bufferShardBucket) swap() {
        bsb.immutables = append(bsb.immutables, bsb.mutable)
        bsb.mutable = skl.NewSkiplist(int64(bsb.capacity))
 }
+
+func (bsb *bufferShardBucket) startWal(path string, syncMode bool) error {
+       wal, err := wal.New(path, wal.DefaultOptions)
+       if err != nil {
+               return errors.Wrap(err, fmt.Sprintf("failed to create wal: %s", path))
+       }
+       bsb.wal = wal
+       bsb.walSyncMode = syncMode
+       bsb.log.Info().Msg(fmt.Sprintf(
+               "wal started with path: %s, sync mode: %v", path, syncMode))
+
+       if err = bsb.recoveryWal(); err != nil {
+               return errors.Wrap(err, "failed to recovery wal")
+       }
+       return nil
+}
+
+func (bsb *bufferShardBucket) recoveryWal() error {
+       segments, err := bsb.wal.ReadAllSegments()
+       if err != nil {
+               return errors.Wrap(err, "failed to load wal segments")
+       }
+
+       recoveredRecords := 0
+       for index, segment := range segments {
+               recoveredRecords += len(segment.GetLogEntries())
+               isWorkSegment := index == len(segments)-1
+               if isWorkSegment {
+                       bsb.recoveryWorkSegment(segment)
+               } else {
+                       bsb.recoveryStableSegment(segment)
+               }
+               bsb.log.Info().Msg(fmt.Sprintf(
+                       "recovered %d log records from wal segment %d",
+                       len(segment.GetLogEntries()),
+                       segment.GetSegmentID()))
+       }
+       bsb.log.Info().Msg(fmt.Sprintf(
+               "recovered %d log records from wal", recoveredRecords))
+       return nil
+}
+
+func (bsb *bufferShardBucket) recoveryWorkSegment(segment wal.Segment) {
+       var wg sync.WaitGroup
+       wg.Add(len(segment.GetLogEntries()))
+       for _, logEntry := range segment.GetLogEntries() {
+               timestamps := logEntry.GetTimestamps()
+               values := logEntry.GetValues()
+               elementIndex := 0
+               for element := values.Front(); element != nil; element = element.Next() {
+                       timestamp := timestamps[elementIndex]
+                       bsb.writeCh <- operation{
+                               key:   logEntry.GetSeriesID(),
+                               value: element.Value.([]byte),
+                               epoch: uint64(timestamp.UnixNano()),
+                               recoveryDoneFn: func() {
+                                       wg.Done()
+                                       if bsb.log.Trace().Enabled() {
+                                               bsb.log.Trace().Msg(fmt.Sprintf("recovered key: %v, ts: %v",
+                                                       logEntry.GetSeriesID(), timestamp.UnixNano()))
+                                       }
+                               },
+                       }
+                       elementIndex++
+               }
+       }
+       wg.Wait()
+}
+
+func (bsb *bufferShardBucket) recoveryStableSegment(segment wal.Segment) {
+       for _, logEntries := range segment.GetLogEntries() {
+               timestamps := logEntries.GetTimestamps()
+               values := logEntries.GetValues()
+               elementIndex := 0
+               for element := values.Front(); element != nil; element = element.Next() {
+                       timestamp := timestamps[elementIndex]
+                       k := y.KeyWithTs(logEntries.GetSeriesID(), uint64(timestamp.UnixNano()))
+                       v := y.ValueStruct{Value: element.Value.([]byte)}
+                       bsb.mutable.Put(k, v)
+                       elementIndex++
+               }
+       }
+       bsb.flushCh <- flushEvent{data: bsb.mutable, walSegmentID: segment.GetSegmentID()}
+       // Sync recover data to immutables
+       bsb.swap()
+}
+
+func (bsb *bufferShardBucket) writeWal(key, value []byte, timestamp time.Time) error {
+       if !bsb.walSyncMode {
+               bsb.wal.Write(key, timestamp, value, nil)
+               return nil
+       }
+
+       var walErr error
+       var wg sync.WaitGroup
+       wg.Add(1)
+       walCallback := func(key []byte, t time.Time, value []byte, err error) {
+               walErr = err
+               wg.Done()
+       }
+       bsb.wal.Write(key, timestamp, value, walCallback)
+       wg.Wait()
+       return walErr
+}
diff --git a/banyand/tsdb/buffer_test.go b/banyand/tsdb/buffer_test.go
index bb948372..7068e972 100644
--- a/banyand/tsdb/buffer_test.go
+++ b/banyand/tsdb/buffer_test.go
@@ -21,6 +21,9 @@ import (
        "crypto/rand"
        "fmt"
        "math/big"
+       "os"
+       "path/filepath"
+       "strconv"
        "sync"
        "time"
 
@@ -118,7 +121,8 @@ var _ = Describe("Buffer", func() {
                                }(ch)
                        }
 
-                       buffer, err := tsdb.NewBuffer(log, common.Position{}, 1024, 16, numShards, onFlushFn)
+                       var err error
+                       buffer, err = tsdb.NewBuffer(log, common.Position{}, 1024, 16, numShards, onFlushFn)
                        defer func() {
                                _ = buffer.Close()
                        }()
@@ -147,4 +151,153 @@ var _ = Describe("Buffer", func() {
                        }
                })
        })
+
+       Context("Write and Recover of wal correctly", func() {
+               writeConcurrency := 2
+               numShards := 2
+               flushSize := 1024
+               baseTime := time.Now()
+               var path string
+
+               BeforeEach(func() {
+                       var err error
+                       path, err = os.MkdirTemp("", "banyandb-test-buffer-wal-*")
+                       Expect(err).ToNot(HaveOccurred())
+               })
+
+               AfterEach(func() {
+                       err := os.RemoveAll(path)
+                       Expect(err).ToNot(HaveOccurred())
+               })
+
+               It("should write and rotate wal file correctly", func() {
+                       var err error
+                       var flushMutex sync.Mutex
+
+                       shardWalFileHistory := make(map[int][]string)
+                       buffer, err = tsdb.NewBufferWithWal(
+                               log,
+                               common.Position{},
+                               flushSize,
+                               writeConcurrency,
+                               numShards,
+                               func(shardIndex int, skl *skl.Skiplist) error {
+                                       flushMutex.Lock()
+                                       defer flushMutex.Unlock()
+
+                                       shardWalDir := filepath.Join(path, "buffer-"+strconv.Itoa(shardIndex))
+                                       var shardWalList []os.DirEntry
+                                       shardWalList, err = os.ReadDir(shardWalDir)
+                                       Expect(err).ToNot(HaveOccurred())
+                                       for _, shardWalFile := range shardWalList {
+                                               Expect(shardWalFile.IsDir()).To(BeFalse())
+                                               Expect(shardWalFile.Name()).To(HaveSuffix(".wal"))
+                                               shardWalFileHistory[shardIndex] = append(shardWalFileHistory[shardIndex], shardWalFile.Name())
+                                       }
+                                       return nil
+                               },
+                               true,
+                               &path)
+                       Expect(err).ToNot(HaveOccurred())
+                       defer buffer.Close()
+
+                       // Write buffer & wal
+                       var wg sync.WaitGroup
+                       wg.Add(writeConcurrency)
+                       for i := 0; i < writeConcurrency; i++ {
+                               go func(writerIndex int) {
+                                       for j := 0; j < numShards; j++ {
+                                               for k := 0; k < flushSize; k++ {
+                                                       buffer.Write(
+                                                               []byte(fmt.Sprintf("writer-%d-shard-%d-key-%d", writerIndex, j, k)),
+                                                               []byte(fmt.Sprintf("writer-%d-shard-%d-value-%d", writerIndex, j, k)),
+                                                               time.UnixMilli(baseTime.UnixMilli()+int64(writerIndex+j+k)))
+                                               }
+                                       }
+                                       wg.Done()
+                               }(i)
+                       }
+                       wg.Wait()
+
+                       flushMutex.Lock()
+                       defer flushMutex.Unlock()
+
+                       // Check wal
+                       Expect(len(shardWalFileHistory) == numShards).To(BeTrue())
+                       for shardIndex := 0; shardIndex < numShards; shardIndex++ {
+                               // Check wal rotate
+                               Expect(len(shardWalFileHistory[shardIndex]) > 1).To(BeTrue())
+
+                               shardWalDir := filepath.Join(path, "buffer-"+strconv.Itoa(shardIndex))
+                               currentShardWalFiles, err := os.ReadDir(shardWalDir)
+                               Expect(err).ToNot(HaveOccurred())
+                               Expect(len(currentShardWalFiles) <= 2).To(BeTrue())
+                               // Check wal delete
+                               Expect(len(shardWalFileHistory[shardIndex]) > len(currentShardWalFiles)).To(BeTrue())
+                       }
+               })
+
+               It("should recover buffer from wal file correctly", func() {
+                       var err error
+                       var flushMutex sync.Mutex
+                       var bufferFlushed bool
+
+                       buffer, err = tsdb.NewBufferWithWal(
+                               log,
+                               common.Position{},
+                               flushSize,
+                               writeConcurrency,
+                               numShards,
+                               func(shardIndex int, skl *skl.Skiplist) error {
+                                       flushMutex.Lock()
+                                       defer flushMutex.Unlock()
+
+                                       if !bufferFlushed {
+                                               bufferFlushed = true
+                                       }
+                                       return nil
+                               },
+                               true,
+                               &path)
+                       Expect(err).ToNot(HaveOccurred())
+
+                       // Write buffer & wal
+                       for i := 0; i < numShards; i++ {
+                               buffer.Write(
+                                       []byte(fmt.Sprintf("shard-%d-key-1", i)),
+                                       []byte(fmt.Sprintf("shard-%d-value-1", i)),
+                                       time.UnixMilli(baseTime.UnixMilli()+int64(i)))
+                       }
+
+                       flushMutex.Lock()
+                       Expect(bufferFlushed).To(BeFalse())
+                       flushMutex.Unlock()
+
+                       // Restart buffer
+                       buffer.Close()
+                       buffer, err = tsdb.NewBufferWithWal(
+                               log,
+                               common.Position{},
+                               flushSize,
+                               writeConcurrency,
+                               numShards,
+                               func(shardIndex int, skl *skl.Skiplist) error {
+                                       return nil
+                               },
+                               true,
+                               &path)
+                       Expect(err).ToNot(HaveOccurred())
+                       defer buffer.Close()
+
+                       // Check buffer was recovered from wal
+                       for i := 0; i < numShards; i++ {
+                               expectValue := []byte(fmt.Sprintf("shard-%d-value-1", i))
+                               value, exist := buffer.Read(
+                                       []byte(fmt.Sprintf("shard-%d-key-1", i)),
+                                       time.UnixMilli(baseTime.UnixMilli()+int64(i)))
+                               Expect(exist).To(BeTrue())
+                               Expect(bytes.Equal(expectValue, value)).To(BeTrue())
+                       }
+               })
+       })
 })
diff --git a/pkg/convert/string.go b/pkg/convert/string.go
new file mode 100644
index 00000000..5a24db8d
--- /dev/null
+++ b/pkg/convert/string.go
@@ -0,0 +1,38 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package convert
+
+import (
+       "reflect"
+       "unsafe"
+)
+
+// StringToBytes converts string to bytes.
+func StringToBytes(s string) (b []byte) {
+       bh := (*reflect.SliceHeader)(unsafe.Pointer(&b))
+       sh := (*reflect.StringHeader)(unsafe.Pointer(&s))
+       bh.Data = sh.Data
+       bh.Cap = sh.Len
+       bh.Len = sh.Len
+       return b
+}
+
+// BytesToString converts bytes to string.
+func BytesToString(b []byte) string {
+       return *(*string)(unsafe.Pointer(&b))
+}
diff --git a/pkg/wal/wal.go b/pkg/wal/wal.go
index 39b02282..db8f416d 100644
--- a/pkg/wal/wal.go
+++ b/pkg/wal/wal.go
@@ -26,6 +26,7 @@ import (
        "math"
        "os"
        "path/filepath"
+       "sort"
        "strconv"
        "strings"
        "sync"
@@ -35,7 +36,7 @@ import (
        "github.com/pkg/errors"
        "go.uber.org/multierr"
 
-       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
@@ -55,6 +56,7 @@ const (
        parseTimeStr           = "2006-01-02 15:04:05"
        maxRetries             = 3
        maxSegmentID           = uint64(math.MaxUint64) - 1
+       defaultSyncFlush       = false
 )
 
 // DefaultOptions for Open().
@@ -62,7 +64,7 @@ var DefaultOptions = &Options{
        FileSize:            67108864, // 64MB
        BufferSize:          65535,    // 16KB
        BufferBatchInterval: 3 * time.Second,
-       NoSync:              false,
+       SyncFlush:           defaultSyncFlush,
 }
 
 // Options for creating Write-ahead Logging.
@@ -71,7 +73,7 @@ type Options struct {
        BufferSize          int
        BufferBatchInterval time.Duration
        FlushQueueSize      int
-       NoSync              bool
+       SyncFlush           bool
 }
 
 // WAL denotes a Write-ahead logging.
@@ -82,7 +84,7 @@ type WAL interface {
        // Write a logging entity.
        // It will return immediately when the data is written in the buffer,
        // The callback function will be called when the entity is flushed on the persistent storage.
-       Write(seriesID common.GlobalSeriesID, timestamp time.Time, data []byte, 
callback func(common.GlobalSeriesID, time.Time, []byte, error))
+       Write(seriesID []byte, timestamp time.Time, data []byte, callback 
func([]byte, time.Time, []byte, error))
        // Read specified segment by SegmentID.
        Read(segmentID SegmentID) (Segment, error)
        // ReadAllSegments reads all segments sorted by their creation time in ascending order.
@@ -106,7 +108,7 @@ type Segment interface {
 
 // LogEntry used for attain detail value of WAL entry.
 type LogEntry interface {
-       GetSeriesID() common.GlobalSeriesID
+       GetSeriesID() []byte
        GetTimestamps() []time.Time
        GetValues() *list.List
 }
@@ -137,35 +139,56 @@ type segment struct {
 }
 
 type logRequest struct {
-       seriesID  common.GlobalSeriesID
+       seriesID  []byte
        timestamp time.Time
-       callback  func(common.GlobalSeriesID, time.Time, []byte, error)
+       callback  func([]byte, time.Time, []byte, error)
        data      []byte
 }
 
 type logEntry struct {
        timestamps  []time.Time
        values      *list.List
-       seriesID    common.GlobalSeriesID
+       seriesID    []byte
        entryLength uint64
        count       uint32
 }
 
 type buffer struct {
-       timestampMap map[common.GlobalSeriesID][]time.Time
-       valueMap     map[common.GlobalSeriesID][]byte
-       callbackMap  map[common.GlobalSeriesID][]func(common.GlobalSeriesID, 
time.Time, []byte, error)
+       timestampMap map[logSeriesID][]time.Time
+       valueMap     map[logSeriesID][]byte
+       callbackMap  map[logSeriesID][]func([]byte, time.Time, []byte, error)
        count        int
 }
 
 type bufferWriter struct {
        buf           *bytes.Buffer
-       seriesIDBuf   *bytes.Buffer
        timestampsBuf *bytes.Buffer
+       seriesID      *logSeriesID
        dataBuf       []byte
-       dataLen       int
-       seriesCount   uint32
        batchLen      uint64
+       seriesCount   uint32
+       dataLen       int
+}
+
+type logSeriesID struct {
+       key     string // series ID bytes viewed as a string by convert.BytesToString
+       byteLen int    // length in bytes of the original series ID
+}
+
+func newLogSeriesID(b []byte) logSeriesID {
+       return logSeriesID{key: convert.BytesToString(b), byteLen: len(b)}
+}
+
+func (s logSeriesID) string() string {
+       return s.key
+}
+
+func (s logSeriesID) bytes() []byte {
+       return convert.StringToBytes(s.key)
+}
+
+func (s logSeriesID) len() int {
+       return s.byteLen
 }
 
 // New creates a WAL instance in the specified path.
@@ -189,7 +212,7 @@ func New(path string, options *Options) (WAL, error) {
                        FileSize:            fileSize,
                        BufferSize:          bufferSize,
                        BufferBatchInterval: bufferBatchInterval,
-                       NoSync:              options.NoSync,
+                       SyncFlush:           options.SyncFlush,
                }
        }
 
@@ -216,9 +239,9 @@ func New(path string, options *Options) (WAL, error) {
                flushCloser:     flushCloser,
                chanGroupCloser: chanGroupCloser,
                buffer: buffer{
-                       timestampMap: 
make(map[common.GlobalSeriesID][]time.Time),
-                       valueMap:     make(map[common.GlobalSeriesID][]byte),
-                       callbackMap:  
make(map[common.GlobalSeriesID][]func(common.GlobalSeriesID, time.Time, []byte, 
error)),
+                       timestampMap: make(map[logSeriesID][]time.Time),
+                       valueMap:     make(map[logSeriesID][]byte),
+                       callbackMap:  make(map[logSeriesID][]func([]byte, 
time.Time, []byte, error)),
                        count:        0,
                },
        }
@@ -234,7 +257,7 @@ func New(path string, options *Options) (WAL, error) {
 // Write a logging entity.
 // It will return immediately when the data is written in the buffer,
 // The callback function will be called when the entity is flushed on the persistent storage.
-func (log *log) Write(seriesID common.GlobalSeriesID, timestamp time.Time, 
data []byte, callback func(common.GlobalSeriesID, time.Time, []byte, error)) {
+func (log *log) Write(seriesID []byte, timestamp time.Time, data []byte, 
callback func([]byte, time.Time, []byte, error)) {
        if !log.writeCloser.AddSender() {
                return
        }
@@ -266,6 +289,7 @@ func (log *log) ReadAllSegments() ([]Segment, error) {
        for _, segment := range log.segmentMap {
                segments = append(segments, segment)
        }
+       sort.Slice(segments, func(i, j int) bool { return 
segments[i].GetSegmentID() < segments[j].GetSegmentID() })
        return segments, nil
 }
 
@@ -365,7 +389,7 @@ func (log *log) start() {
                                        log.logger.Debug().Msg("Write request 
to buffer. elements: " + strconv.Itoa(log.buffer.count))
                                }
 
-                               bufferVolume += request.seriesID.Volume() + 
timestampVolumeLength + len(request.data)
+                               bufferVolume += len(request.seriesID) + 
timestampVolumeLength + len(request.data)
                                if bufferVolume > log.options.BufferSize {
                                        log.triggerFlushing()
                                        bufferVolume = 0
@@ -440,9 +464,9 @@ func (log *log) triggerFlushing() {
 
 func (log *log) newBuffer() {
        log.buffer = buffer{
-               timestampMap: make(map[common.GlobalSeriesID][]time.Time),
-               valueMap:     make(map[common.GlobalSeriesID][]byte),
-               callbackMap:  
make(map[common.GlobalSeriesID][]func(common.GlobalSeriesID, time.Time, []byte, 
error)),
+               timestampMap: make(map[logSeriesID][]time.Time),
+               valueMap:     make(map[logSeriesID][]byte),
+               callbackMap:  make(map[logSeriesID][]func([]byte, time.Time, 
[]byte, error)),
                count:        0,
        }
 }
@@ -481,7 +505,7 @@ func (log *log) writeWorkSegment(data []byte) error {
        if _, err := log.workSegment.file.Write(data); err != nil {
                return errors.Wrap(err, "Write WAL segment file error, file: 
"+log.workSegment.path)
        }
-       if !log.options.NoSync {
+       if log.options.SyncFlush {
                if err := log.workSegment.file.Sync(); err != nil {
                        log.logger.Warn().Msg("Sync WAL segment file to disk 
failed, file: " + log.workSegment.path)
                }
@@ -541,7 +565,6 @@ func (log *log) load() error {
 func newBufferWriter() *bufferWriter {
        return &bufferWriter{
                buf:           bytes.NewBuffer([]byte{}),
-               seriesIDBuf:   bytes.NewBuffer([]byte{}),
                timestampsBuf: bytes.NewBuffer([]byte{}),
                dataBuf:       make([]byte, 128),
        }
@@ -558,14 +581,14 @@ func (w *bufferWriter) Reset() error {
 }
 
 func (w *bufferWriter) ResetSeries() {
-       w.seriesIDBuf.Reset()
        w.timestampsBuf.Reset()
        w.dataLen = 0
+       w.seriesID = nil
        w.seriesCount = 0
 }
 
 func (w *bufferWriter) AddSeries() error {
-       seriesIDBytesLen := uint16(w.seriesIDBuf.Len())
+       seriesIDBytesLen := uint16(w.seriesID.len())
        timestampsBytesLen := uint16(w.timestampsBuf.Len())
        entryLen := seriesIDLength + uint64(seriesIDBytesLen) + 
seriesCountLength + timestampsBinaryLength + uint64(timestampsBytesLen) + 
uint64(w.dataLen)
 
@@ -576,7 +599,7 @@ func (w *bufferWriter) AddSeries() error {
        if err = w.writeSeriesIDLength(seriesIDBytesLen); err != nil {
                return err
        }
-       if err = w.writeSeriesID(w.seriesIDBuf.Bytes()); err != nil {
+       if err = w.writeSeriesID(w.seriesID); err != nil {
                return err
        }
        if err = w.writeSeriesCount(w.seriesCount); err != nil {
@@ -601,13 +624,8 @@ func (w *bufferWriter) Bytes() []byte {
        return w.rewriteBatchLength(batchBytes, batchLen)
 }
 
-func (w *bufferWriter) WriteSeriesID(s common.GlobalSeriesID) error {
-       if err := writeUint64(w.seriesIDBuf, uint64(s.SeriesID)); err != nil {
-               return err
-       }
-       if _, err := w.seriesIDBuf.WriteString(s.Name); err != nil {
-               return err
-       }
+func (w *bufferWriter) WriteSeriesID(seriesID logSeriesID) error {
+       w.seriesID = &seriesID
        return nil
 }
 
@@ -661,8 +679,8 @@ func (w *bufferWriter) writeSeriesIDLength(data uint16) 
error {
        return writeUint16(w.buf, data)
 }
 
-func (w *bufferWriter) writeSeriesID(data []byte) error {
-       _, err := w.buf.Write(data)
+func (w *bufferWriter) writeSeriesID(data *logSeriesID) error {
+       _, err := w.buf.WriteString(data.string())
        return err
 }
 
@@ -713,7 +731,7 @@ func (segment *segment) parseLogEntries() error {
        var batchLen uint64
        var entryLen uint64
        var seriesIDLen uint16
-       var seriesID common.GlobalSeriesID
+       var seriesID []byte
        var seriesCount uint32
        var timestampsBinaryLen uint16
        var entryEndPosition uint64
@@ -853,11 +871,8 @@ func (segment *segment) parseSeriesIDLength(data []byte) 
(uint16, error) {
        return seriesIDLen, nil
 }
 
-func (segment *segment) parseSeriesID(data []byte) common.GlobalSeriesID {
-       return common.GlobalSeriesID{
-               SeriesID: common.SeriesID(bytesToUint64(data[:8])),
-               Name:     string(data[8:]),
-       }
+func (segment *segment) parseSeriesID(data []byte) []byte {
+       return newLogSeriesID(data).bytes()
 }
 
 func (segment *segment) parseSeriesCountLength(data []byte) (uint32, error) {
@@ -910,7 +925,7 @@ func (segment *segment) parseValuesBinary(data []byte) 
(*list.List, error) {
        return values, nil
 }
 
-func (logEntry *logEntry) GetSeriesID() common.GlobalSeriesID {
+func (logEntry *logEntry) GetSeriesID() []byte {
        return logEntry.seriesID
 }
 
@@ -923,15 +938,15 @@ func (logEntry *logEntry) GetValues() *list.List {
 }
 
 func (buffer *buffer) write(request logRequest) {
-       seriesID := request.seriesID
-       buffer.timestampMap[seriesID] = append(buffer.timestampMap[seriesID], 
request.timestamp)
+       key := newLogSeriesID(request.seriesID)
+       buffer.timestampMap[key] = append(buffer.timestampMap[key], 
request.timestamp)
 
        // Value item: binary-length(2-bytes) + binary data(n-bytes)
        binaryLen := uint16(len(request.data))
-       buffer.valueMap[seriesID] = append(buffer.valueMap[seriesID], 
byte(binaryLen), byte(binaryLen>>8))
-       buffer.valueMap[seriesID] = append(buffer.valueMap[seriesID], 
request.data...)
+       buffer.valueMap[key] = append(buffer.valueMap[key], byte(binaryLen), 
byte(binaryLen>>8))
+       buffer.valueMap[key] = append(buffer.valueMap[key], request.data...)
 
-       buffer.callbackMap[seriesID] = append(buffer.callbackMap[seriesID], 
request.callback)
+       buffer.callbackMap[key] = append(buffer.callbackMap[key], 
request.callback)
        buffer.count++
 }
 
@@ -946,9 +961,11 @@ func (buffer *buffer) notifyRequests(err error) {
                valuePos = 0
                for index, callback := range callbacks {
                        valuePos, valueItem = readValuesBinary(values, 
valuePos, valuesBinaryLength)
-                       buffer.runningCallback(func() {
-                               callback(seriesID, timestamps[index], 
valueItem, err)
-                       })
+                       if callback != nil {
+                               buffer.runningCallback(func() {
+                                       callback(seriesID.bytes(), 
timestamps[index], valueItem, err)
+                               })
+                       }
                }
        }
 }
@@ -973,7 +990,7 @@ func parseSegmentID(segmentName string) (uint64, error) {
        if !strings.HasSuffix(segmentName, segmentNameSuffix) {
                return 0, errors.New("Invalid segment name: " + segmentName)
        }
-       return strconv.ParseUint(segmentName[3:19], 10, 64)
+       return strconv.ParseUint(segmentName[3:19], 16, 64)
 }
 
 func readValuesBinary(raw []byte, position int, offsetLen int) (int, []byte) {
@@ -1051,10 +1068,6 @@ func bytesToUint16(buf []byte) uint16 {
        return binary.LittleEndian.Uint16(buf)
 }
 
-func bytesToUint64(buf []byte) uint64 {
-       return binary.LittleEndian.Uint64(buf)
-}
-
 func timeToUnixNano(time time.Time) uint64 {
        return uint64(time.UnixNano())
 }
diff --git a/pkg/wal/wal_benchmark_test.go b/pkg/wal/wal_benchmark_test.go
index d6eccc39..ba9570f2 100644
--- a/pkg/wal/wal_benchmark_test.go
+++ b/pkg/wal/wal_benchmark_test.go
@@ -27,7 +27,6 @@ import (
        "testing"
        "time"
 
-       "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
@@ -41,11 +40,11 @@ var (
        seriesID100  = newSeriesIDList(100)
        seriesID500  = newSeriesIDList(500)
        seriesID1000 = newSeriesIDList(1000)
-       callback     = func(seriesID common.GlobalSeriesID, t time.Time, bytes 
[]byte, err error) {}
+       callback     = func(seriesID []byte, t time.Time, bytes []byte, err 
error) {}
 )
 
 func Benchmark_SeriesID_1(b *testing.B) {
-       wal := newWAL(nil)
+       wal := newWAL(&Options{SyncFlush: true})
        defer closeWAL(wal)
 
        seriesID := seriesID1
@@ -53,13 +52,13 @@ func Benchmark_SeriesID_1(b *testing.B) {
 
        b.ResetTimer()
        for i := 0; i < b.N; i++ {
-               wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), 
data[i%dataLen].binary, callback)
+               wal.Write(seriesID[i%seriesIDLen].key, 
time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
        }
        b.StopTimer()
 }
 
 func Benchmark_SeriesID_20(b *testing.B) {
-       wal := newWAL(nil)
+       wal := newWAL(&Options{SyncFlush: true})
        defer closeWAL(wal)
 
        seriesID := seriesID20
@@ -67,13 +66,13 @@ func Benchmark_SeriesID_20(b *testing.B) {
 
        b.ResetTimer()
        for i := 0; i < b.N; i++ {
-               wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), 
data[i%dataLen].binary, callback)
+               wal.Write(seriesID[i%seriesIDLen].key, 
time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
        }
        b.StopTimer()
 }
 
 func Benchmark_SeriesID_100(b *testing.B) {
-       wal := newWAL(nil)
+       wal := newWAL(&Options{SyncFlush: true})
        defer closeWAL(wal)
 
        seriesID := seriesID100
@@ -81,13 +80,13 @@ func Benchmark_SeriesID_100(b *testing.B) {
 
        b.ResetTimer()
        for i := 0; i < b.N; i++ {
-               wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), 
data[i%dataLen].binary, callback)
+               wal.Write(seriesID[i%seriesIDLen].key, 
time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
        }
        b.StopTimer()
 }
 
 func Benchmark_SeriesID_500(b *testing.B) {
-       wal := newWAL(nil)
+       wal := newWAL(&Options{SyncFlush: true})
        defer closeWAL(wal)
 
        seriesID := seriesID500
@@ -95,13 +94,13 @@ func Benchmark_SeriesID_500(b *testing.B) {
 
        b.ResetTimer()
        for i := 0; i < b.N; i++ {
-               wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), 
data[i%dataLen].binary, callback)
+               wal.Write(seriesID[i%seriesIDLen].key, 
time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
        }
        b.StopTimer()
 }
 
 func Benchmark_SeriesID_1000(b *testing.B) {
-       wal := newWAL(nil)
+       wal := newWAL(&Options{SyncFlush: true})
        defer closeWAL(wal)
 
        seriesID := seriesID1000
@@ -109,13 +108,13 @@ func Benchmark_SeriesID_1000(b *testing.B) {
 
        b.ResetTimer()
        for i := 0; i < b.N; i++ {
-               wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), 
data[i%dataLen].binary, callback)
+               wal.Write(seriesID[i%seriesIDLen].key, 
time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
        }
        b.StopTimer()
 }
 
 func Benchmark_SeriesID_1000_Buffer_64K(b *testing.B) {
-       wal := newWAL(&Options{BufferSize: 1024 * 64})
+       wal := newWAL(&Options{SyncFlush: true, BufferSize: 1024 * 64})
        defer closeWAL(wal)
 
        seriesID := seriesID1000
@@ -123,13 +122,13 @@ func Benchmark_SeriesID_1000_Buffer_64K(b *testing.B) {
 
        b.ResetTimer()
        for i := 0; i < b.N; i++ {
-               wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), 
data[i%dataLen].binary, callback)
+               wal.Write(seriesID[i%seriesIDLen].key, 
time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
        }
        b.StopTimer()
 }
 
 func Benchmark_SeriesID_1000_Buffer_128K(b *testing.B) {
-       wal := newWAL(&Options{BufferSize: 1024 * 128})
+       wal := newWAL(&Options{SyncFlush: true, BufferSize: 1024 * 128})
        defer closeWAL(wal)
 
        seriesID := seriesID1000
@@ -137,13 +136,13 @@ func Benchmark_SeriesID_1000_Buffer_128K(b *testing.B) {
 
        b.ResetTimer()
        for i := 0; i < b.N; i++ {
-               wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), 
data[i%dataLen].binary, callback)
+               wal.Write(seriesID[i%seriesIDLen].key, 
time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
        }
        b.StopTimer()
 }
 
 func Benchmark_SeriesID_1000_Buffer_512K(b *testing.B) {
-       wal := newWAL(&Options{BufferSize: 1024 * 512})
+       wal := newWAL(&Options{SyncFlush: true, BufferSize: 1024 * 512})
        defer closeWAL(wal)
 
        seriesID := seriesID1000
@@ -151,13 +150,13 @@ func Benchmark_SeriesID_1000_Buffer_512K(b *testing.B) {
 
        b.ResetTimer()
        for i := 0; i < b.N; i++ {
-               wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), 
data[i%dataLen].binary, callback)
+               wal.Write(seriesID[i%seriesIDLen].key, 
time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
        }
        b.StopTimer()
 }
 
 func Benchmark_SeriesID_1000_Buffer_1MB(b *testing.B) {
-       wal := newWAL(&Options{BufferSize: 1024 * 1024})
+       wal := newWAL(&Options{SyncFlush: true, BufferSize: 1024 * 1024})
        defer closeWAL(wal)
 
        seriesID := seriesID1000
@@ -165,13 +164,13 @@ func Benchmark_SeriesID_1000_Buffer_1MB(b *testing.B) {
 
        b.ResetTimer()
        for i := 0; i < b.N; i++ {
-               wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), 
data[i%dataLen].binary, callback)
+               wal.Write(seriesID[i%seriesIDLen].key, 
time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
        }
        b.StopTimer()
 }
 
 func Benchmark_SeriesID_1000_Buffer_2MB(b *testing.B) {
-       wal := newWAL(&Options{BufferSize: 1024 * 1024 * 2})
+       wal := newWAL(&Options{SyncFlush: true, BufferSize: 1024 * 1024 * 2})
        defer closeWAL(wal)
 
        seriesID := seriesID1000
@@ -179,13 +178,13 @@ func Benchmark_SeriesID_1000_Buffer_2MB(b *testing.B) {
 
        b.ResetTimer()
        for i := 0; i < b.N; i++ {
-               wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), 
data[i%dataLen].binary, callback)
+               wal.Write(seriesID[i%seriesIDLen].key, 
time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
        }
        b.StopTimer()
 }
 
 func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush(b *testing.B) {
-       wal := newWAL(&Options{BufferSize: 1024 * 64, NoSync: true})
+       wal := newWAL(&Options{BufferSize: 1024 * 64, SyncFlush: false})
        defer closeWAL(wal)
 
        seriesID := seriesID1000
@@ -193,13 +192,13 @@ func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush(b 
*testing.B) {
 
        b.ResetTimer()
        for i := 0; i < b.N; i++ {
-               wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), 
data[i%dataLen].binary, callback)
+               wal.Write(seriesID[i%seriesIDLen].key, 
time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
        }
        b.StopTimer()
 }
 
 func Benchmark_SeriesID_1000_Buffer_128K_NoSyncFlush(b *testing.B) {
-       wal := newWAL(&Options{BufferSize: 1024 * 128, NoSync: true})
+       wal := newWAL(&Options{BufferSize: 1024 * 128, SyncFlush: false})
        defer closeWAL(wal)
 
        seriesID := seriesID1000
@@ -207,13 +206,13 @@ func Benchmark_SeriesID_1000_Buffer_128K_NoSyncFlush(b 
*testing.B) {
 
        b.ResetTimer()
        for i := 0; i < b.N; i++ {
-               wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), 
data[i%dataLen].binary, callback)
+               wal.Write(seriesID[i%seriesIDLen].key, 
time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
        }
        b.StopTimer()
 }
 
 func Benchmark_SeriesID_1000_Buffer_512K_NoSyncFlush(b *testing.B) {
-       wal := newWAL(&Options{BufferSize: 1024 * 512, NoSync: true})
+       wal := newWAL(&Options{BufferSize: 1024 * 512, SyncFlush: false})
        defer closeWAL(wal)
 
        seriesID := seriesID1000
@@ -221,13 +220,13 @@ func Benchmark_SeriesID_1000_Buffer_512K_NoSyncFlush(b 
*testing.B) {
 
        b.ResetTimer()
        for i := 0; i < b.N; i++ {
-               wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), 
data[i%dataLen].binary, callback)
+               wal.Write(seriesID[i%seriesIDLen].key, 
time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
        }
        b.StopTimer()
 }
 
 func Benchmark_SeriesID_1000_Buffer_1MB_NoSyncFlush(b *testing.B) {
-       wal := newWAL(&Options{BufferSize: 1024 * 1024, NoSync: true})
+       wal := newWAL(&Options{BufferSize: 1024 * 1024, SyncFlush: false})
        defer closeWAL(wal)
 
        seriesID := seriesID1000
@@ -235,13 +234,13 @@ func Benchmark_SeriesID_1000_Buffer_1MB_NoSyncFlush(b 
*testing.B) {
 
        b.ResetTimer()
        for i := 0; i < b.N; i++ {
-               wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), 
data[i%dataLen].binary, callback)
+               wal.Write(seriesID[i%seriesIDLen].key, 
time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
        }
        b.StopTimer()
 }
 
 func Benchmark_SeriesID_1000_Buffer_2MB_NoSyncFlush(b *testing.B) {
-       wal := newWAL(&Options{BufferSize: 1024 * 1024 * 2, NoSync: true})
+       wal := newWAL(&Options{BufferSize: 1024 * 1024 * 2, SyncFlush: false})
        defer closeWAL(wal)
 
        seriesID := seriesID1000
@@ -249,13 +248,13 @@ func Benchmark_SeriesID_1000_Buffer_2MB_NoSyncFlush(b 
*testing.B) {
 
        b.ResetTimer()
        for i := 0; i < b.N; i++ {
-               wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), 
data[i%dataLen].binary, callback)
+               wal.Write(seriesID[i%seriesIDLen].key, 
time.UnixMilli(baseTime+1), data[i%dataLen].binary, callback)
        }
        b.StopTimer()
 }
 
 func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_16MB(b 
*testing.B) {
-       wal := newWAL(&Options{BufferSize: 1024 * 64, NoSync: true})
+       wal := newWAL(&Options{BufferSize: 1024 * 64, SyncFlush: false})
        defer closeWAL(wal)
 
        seriesID := seriesID1000
@@ -281,7 +280,7 @@ func 
Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_16MB(b *testing.B
        }()
        for i := 0; i < b.N; i++ {
                binaryData = data[i%dataLen].binary
-               wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), 
binaryData, callback)
+               wal.Write(seriesID[i%seriesIDLen].key, 
time.UnixMilli(baseTime+1), binaryData, callback)
 
                logVolume += seriesIDVolume + timeVolume + len(binaryData)
                if logVolume >= rotateSize {
@@ -293,7 +292,7 @@ func 
Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_16MB(b *testing.B
 }
 
 func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_32MB(b 
*testing.B) {
-       wal := newWAL(&Options{BufferSize: 1024 * 64, NoSync: true})
+       wal := newWAL(&Options{BufferSize: 1024 * 64, SyncFlush: false})
        defer closeWAL(wal)
 
        seriesID := seriesID1000
@@ -319,7 +318,7 @@ func 
Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_32MB(b *testing.B
        }()
        for i := 0; i < b.N; i++ {
                binaryData = data[i%dataLen].binary
-               wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), 
binaryData, callback)
+               wal.Write(seriesID[i%seriesIDLen].key, 
time.UnixMilli(baseTime+1), binaryData, callback)
 
                logVolume += seriesIDVolume + timeVolume + len(binaryData)
                if logVolume >= rotateSize {
@@ -331,7 +330,7 @@ func 
Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_32MB(b *testing.B
 }
 
 func Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_64MB(b 
*testing.B) {
-       wal := newWAL(&Options{BufferSize: 1024 * 64, NoSync: true})
+       wal := newWAL(&Options{BufferSize: 1024 * 64, SyncFlush: false})
        defer closeWAL(wal)
 
        seriesID := seriesID1000
@@ -357,7 +356,7 @@ func 
Benchmark_SeriesID_1000_Buffer_64K_NoSyncFlush_And_Rotate_64MB(b *testing.B
        }()
        for i := 0; i < b.N; i++ {
                binaryData = data[i%dataLen].binary
-               wal.Write(seriesID[i%seriesIDLen], time.UnixMilli(baseTime+1), 
binaryData, callback)
+               wal.Write(seriesID[i%seriesIDLen].key, 
time.UnixMilli(baseTime+1), binaryData, callback)
 
                logVolume += seriesIDVolume + timeVolume + len(binaryData)
                if logVolume >= rotateSize {
@@ -398,13 +397,14 @@ func closeWAL(wal WAL) {
        }
 }
 
-func newSeriesIDList(series int) []common.GlobalSeriesID {
-       var seriesIDSet []common.GlobalSeriesID
+type SeriesID struct {
+       key []byte
+}
+
+func newSeriesIDList(series int) []SeriesID {
+       var seriesIDSet []SeriesID
        for i := 0; i < series; i++ {
-               seriesID := common.GlobalSeriesID{
-                       SeriesID: common.SeriesID(i),
-                       Name:     fmt.Sprintf("series-%d", i),
-               }
+               seriesID := SeriesID{key: []byte(fmt.Sprintf("series-%d", i))}
                seriesIDSet = append(seriesIDSet, seriesID)
        }
        return seriesIDSet
diff --git a/pkg/wal/wal_test.go b/pkg/wal/wal_test.go
index f21cda56..d807f970 100644
--- a/pkg/wal/wal_test.go
+++ b/pkg/wal/wal_test.go
@@ -31,7 +31,6 @@ import (
        "github.com/onsi/gomega"
        "github.com/onsi/gomega/gleak"
 
-       "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
        "github.com/apache/skywalking-banyandb/pkg/wal"
 )
@@ -83,20 +82,17 @@ var _ = ginkgo.Describe("WAL", func() {
                        wg.Add(writeLogCount)
                        baseTime := time.Now()
                        for i := 0; i < seriesIDCount; i++ {
-                               seriesID := &common.GlobalSeriesID{
-                                       SeriesID: common.SeriesID(i),
-                                       Name:     fmt.Sprintf("series-%d", i),
-                               }
+                               seriesID := []byte(fmt.Sprintf("series-%d", i))
                                go func() {
                                        for j := 0; j < seriesIDElementCount; 
j++ {
                                                timestamp := 
time.UnixMilli(baseTime.UnixMilli() + int64(j))
                                                value := 
[]byte(fmt.Sprintf("value-%d", j))
-                                               callback := func(seriesID 
common.GlobalSeriesID, t time.Time, bytes []byte, err error) {
+                                               callback := func(seriesID 
[]byte, t time.Time, bytes []byte, err error) {
                                                        
gomega.Expect(err).ToNot(gomega.HaveOccurred())
 
                                                        wg.Done()
                                                }
-                                               log.Write(*seriesID, timestamp, 
value, callback)
+                                               log.Write(seriesID, timestamp, 
value, callback)
                                        }
                                }()
                        }
@@ -114,13 +110,8 @@ var _ = ginkgo.Describe("WAL", func() {
                                entries := segment.GetLogEntries()
                                for _, entity := range entries {
                                        seriesID := entity.GetSeriesID()
-                                       seriesIDSequence := seriesID.SeriesID
-                                       expectSeriesID := common.GlobalSeriesID{
-                                               SeriesID: seriesIDSequence,
-                                               Name:     
fmt.Sprintf("series-%d", seriesIDSequence),
-                                       }
                                        // Check seriesID
-                                       gomega.Expect(expectSeriesID == 
seriesID).To(gomega.BeTrue())
+                                       gomega.Expect(seriesID != 
nil).To(gomega.BeTrue())
 
                                        timestamps := entity.GetTimestamps()
                                        values := entity.GetValues()
@@ -171,15 +162,12 @@ var _ = ginkgo.Describe("WAL", func() {
                        writeLogCount := 3
 
                        wg.Add(writeLogCount)
-                       expectSegments := 
make(map[wal.SegmentID]common.GlobalSeriesID)
+                       expectSegments := make(map[wal.SegmentID][]byte)
                        for i := 0; i < writeLogCount; i++ {
-                               seriesID := &common.GlobalSeriesID{
-                                       SeriesID: common.SeriesID(i),
-                                       Name:     fmt.Sprintf("series-%d", i),
-                               }
+                               seriesID := []byte(fmt.Sprintf("series-%d", i))
                                timestamp := time.Now()
                                value := []byte(fmt.Sprintf("value-%d", i))
-                               callback := func(seriesID 
common.GlobalSeriesID, t time.Time, bytes []byte, err error) {
+                               callback := func(seriesID []byte, t time.Time, 
bytes []byte, err error) {
                                        
gomega.Expect(err).ToNot(gomega.HaveOccurred())
 
                                        // Rotate
@@ -189,7 +177,7 @@ var _ = ginkgo.Describe("WAL", func() {
 
                                        wg.Done()
                                }
-                               log.Write(*seriesID, timestamp, value, callback)
+                               log.Write(seriesID, timestamp, value, callback)
                        }
                        wg.Wait()
 
@@ -205,7 +193,7 @@ var _ = ginkgo.Describe("WAL", func() {
                                gomega.Expect(err).ToNot(gomega.HaveOccurred())
                                entries := segment.GetLogEntries()
                                gomega.Expect(len(entries) == 
1).To(gomega.BeTrue())
-                               gomega.Expect(entries[0].GetSeriesID() == 
seriesID).To(gomega.BeTrue())
+                               
gomega.Expect(bytes.Equal(entries[0].GetSeriesID(), 
seriesID)).To(gomega.BeTrue())
                        }
                })
        })

Reply via email to