hailin0 commented on code in PR #261:
URL: 
https://github.com/apache/skywalking-banyandb/pull/261#discussion_r1249551940


##########
pkg/wal/wal.go:
##########
@@ -42,19 +75,759 @@ type Segment interface {
 type WAL interface {
        // Write a logging entity.
        // It will return immediately when the data is written in the buffer,
-       // The returned function will be called when the entity is flushed on 
the persistent storage.
-       Write(seriesID common.SeriesID, timestamp time.Time, data []byte) 
(func(), error)
+       // The callback function will be called when the entity is flushed on 
the persistent storage.
+       Write(seriesID common.SeriesIDV2, timestamp time.Time, data []byte, 
callback func(common.SeriesIDV2, time.Time, []byte, error))
        // Read specified segment by SegmentID.
-       Read(segmentID SegmentID) (*Segment, error)
+       Read(segmentID SegmentID) (Segment, error)
        // ReadAllSegments reads all segments sorted by their creation time in 
ascending order.
-       ReadAllSegments() ([]*Segment, error)
+       ReadAllSegments() ([]Segment, error)
        // Rotate closes the open segment and opens a new one, returning the 
closed segment details.
-       Rotate() (*Segment, error)
+       Rotate() (Segment, error)
        // Delete the specified segment.
        Delete(segmentID SegmentID) error
+       // Close all of segments and stop WAL work.
+       Close() error
+}
+
+// SegmentID identities a segment in a WAL.
+type SegmentID uint64
+
+// Segment allows reading underlying segments that hold WAl entities.
+type Segment interface {
+       GetSegmentID() SegmentID
+       GetLogEntries() []LogEntry
+}
+
+// LogEntry used for attain detail value of WAL entry.
+type LogEntry interface {
+       GetSeriesID() common.SeriesIDV2
+       GetTimestamps() []time.Time
+       GetValues() *list.List
+}
+
+// log implements the WAL interface.
+type log struct {
+       entryCloser      *run.Closer
+       buffer           buffer
+       logger           *logger.Logger
+       bytesBuffer      *bytes.Buffer
+       timestampsBuffer *bytes.Buffer
+       segmentMap       map[SegmentID]*segment
+       workSegment      *segment
+       writeChannel     chan logRequest
+       flushChannel     chan buffer
+       path             string
+       options          Options
+       writeWaitGroup   sync.WaitGroup
+       flushWaitGroup   sync.WaitGroup
+       workSegmentMutex sync.Mutex
+       segmentMapMutex  sync.RWMutex
+       closerOnce       sync.Once
+}
+
+type segment struct {
+       file       *os.File
+       path       string
+       logEntries []LogEntry
+       segmentID  SegmentID
+}
+
+type logRequest struct {
+       seriesID  common.SeriesIDV2
+       timestamp time.Time
+       callback  func(common.SeriesIDV2, time.Time, []byte, error)
+       data      []byte
+}
+
+type logEntry struct {
+       timestamps  []time.Time
+       values      *list.List
+       seriesID    common.SeriesIDV2
+       entryLength int64
+       count       int32
+}
+
+type buffer struct {
+       timestampMap map[common.SeriesIDV2][]time.Time
+       valueMap     map[common.SeriesIDV2][]byte
+       callbackMap  map[common.SeriesIDV2][]func(common.SeriesIDV2, time.Time, 
[]byte, error)
+       count        int
 }
 
 // New creates a WAL instance in the specified path.
-func New(_ string, _ Options) (WAL, error) {
-       return nil, nil
+func New(path string, options *Options) (WAL, error) {
+       //  Check configuration options.
+       walOptions := DefaultOptions
+       if options != nil {
+               fileSize := options.FileSize
+               if fileSize <= 0 {
+                       fileSize = DefaultOptions.FileSize
+               }
+               bufferSize := options.BufferSize
+               if bufferSize <= 0 {
+                       bufferSize = DefaultOptions.BufferSize
+               }
+               bufferBatchInterval := options.BufferBatchInterval
+               if bufferBatchInterval <= 0 {
+                       bufferBatchInterval = DefaultOptions.BufferBatchInterval
+               }
+               walOptions = &Options{
+                       FileSize:            fileSize,
+                       BufferSize:          bufferSize,
+                       BufferBatchInterval: bufferBatchInterval,
+               }
+       }
+
+       // Initial WAL path.
+       path, err := filepath.Abs(path)
+       if err != nil {
+               return nil, errors.Wrap(err, "Can not get absolute path: "+path)
+       }
+       if err := os.MkdirAll(path, os.ModePerm); err != nil {
+               return nil, err
+       }
+
+       log := &log{
+               path:             path,
+               options:          *walOptions,
+               logger:           logger.GetLogger(moduleName),
+               writeChannel:     make(chan logRequest),
+               flushChannel:     make(chan buffer, walOptions.FlushQueueSize),
+               bytesBuffer:      bytes.NewBuffer([]byte{}),
+               timestampsBuffer: bytes.NewBuffer([]byte{}),
+               entryCloser:      run.NewCloser(1),
+               buffer: buffer{
+                       timestampMap: make(map[common.SeriesIDV2][]time.Time),
+                       valueMap:     make(map[common.SeriesIDV2][]byte),
+                       callbackMap:  
make(map[common.SeriesIDV2][]func(common.SeriesIDV2, time.Time, []byte, error)),
+                       count:        0,
+               },
+       }
+       if err := log.load(); err != nil {
+               return nil, err
+       }
+
+       log.writeWaitGroup.Add(1)
+       log.flushWaitGroup.Add(1)
+       log.start()
+
+       log.logger.Info().Str("path", path).Msg("WAL has be initialized")
+       return log, nil
+}
+
+// Write a logging entity.
+// It will return immediately when the data is written in the buffer,
+// The callback function will be called when the entity is flushed on the 
persistent storage.
+func (log *log) Write(seriesID common.SeriesIDV2, timestamp time.Time, data 
[]byte, callback func(common.SeriesIDV2, time.Time, []byte, error)) {
+       if !log.entryCloser.AddRunning() {
+               return
+       }
+       defer log.entryCloser.Done()
+
+       log.writeChannel <- logRequest{
+               seriesID:  seriesID,
+               timestamp: timestamp,
+               data:      data,
+               callback:  callback,
+       }
+}
+
+// Read specified segment by SegmentID.
+func (log *log) Read(segmentID SegmentID) (Segment, error) {
+       if !log.entryCloser.AddRunning() {
+               return nil, errors.New("WAL is closed")
+       }
+       defer log.entryCloser.Done()
+
+       log.segmentMapMutex.RLock()
+       defer log.segmentMapMutex.RUnlock()
+
+       segment := log.segmentMap[segmentID]
+       return segment, nil
+}
+
+// ReadAllSegments reads all segments sorted by their creation time in 
ascending order.
+func (log *log) ReadAllSegments() ([]Segment, error) {
+       if !log.entryCloser.AddRunning() {
+               return nil, errors.New("WAL is closed")
+       }
+       defer log.entryCloser.Done()
+
+       log.segmentMapMutex.RLock()
+       defer log.segmentMapMutex.RUnlock()
+
+       segments := make([]Segment, 0)
+       for _, segment := range log.segmentMap {
+               segments = append(segments, segment)
+       }
+       return segments, nil
+}
+
+// Rotate closes the open segment and opens a new one, returning the closed 
segment details.
+func (log *log) Rotate() (Segment, error) {
+       if !log.entryCloser.AddRunning() {
+               return nil, errors.New("WAL is closed")
+       }
+       defer log.entryCloser.Done()
+
+       oldWorkSegment, err := log.swapWorkSegment()
+       if err != nil {
+               return nil, err
+       }
+
+       log.segmentMapMutex.Lock()
+       defer log.segmentMapMutex.Unlock()
+
+       // Update segment information.
+       workSegment := log.workSegment
+       log.segmentMap[workSegment.segmentID] = workSegment
+       return oldWorkSegment, nil
+}
+
+// Delete the specified segment.
+func (log *log) Delete(segmentID SegmentID) error {
+       if !log.entryCloser.AddRunning() {
+               return errors.New("WAL is closed")
+       }
+       defer log.entryCloser.Done()
+
+       log.segmentMapMutex.Lock()
+       defer log.segmentMapMutex.Unlock()
+
+       // Segment which will be deleted must be closed.
+       if segmentID == log.workSegment.segmentID {
+               return errors.New("Can not delete the segment which is working")
+       }
+
+       err := os.Remove(log.segmentMap[segmentID].path)
+       if err != nil {
+               return errors.Wrap(err, "Delete WAL segment error")
+       }
+       delete(log.segmentMap, segmentID)
+       return nil
+}
+
+// Close all of segments and stop WAL work.
+func (log *log) Close() error {
+       log.closerOnce.Do(func() {
+               log.logger.Info().Msg("Closing WAL...")
+
+               log.entryCloser.Done()
+               log.entryCloser.CloseThenWait()
+
+               close(log.writeChannel)
+               log.writeWaitGroup.Wait()
+
+               close(log.flushChannel)
+               log.flushWaitGroup.Wait()
+
+               if err := log.flushBuffer(log.buffer); err != nil {
+                       log.logger.Err(err).Msg("Flushing buffer failed")
+               }
+               if err := log.workSegment.file.Close(); err != nil {
+                       log.logger.Err(err).Msg("Close work segment file error")
+               }
+               log.logger.Info().Msg("Closed WAL")
+       })
+       return nil
+}
+
+func (log *log) start() {
+       go func() {
+               log.logger.Info().Msg("Start batch task...")
+
+               defer log.writeWaitGroup.Done()
+
+               bufferVolume := 0
+               for {
+                       timer := time.NewTimer(log.options.BufferBatchInterval)
+                       select {
+                       case request, chOpen := <-log.writeChannel:
+                               if !chOpen {
+                                       log.logger.Info().Msg("Stop batch task 
when write-channel closed!")
+                                       return
+                               }
+
+                               log.buffer.write(request)
+                               if log.logger.Debug().Enabled() {
+                                       log.logger.Debug().Msg("Write request 
to buffer. elements: " + strconv.Itoa(log.buffer.count))
+                               }
+
+                               bufferVolume += len(request.seriesID.Marshal()) 
+ timestampVolumeLength + len(request.data)
+                               if bufferVolume > log.options.BufferSize {
+                                       log.triggerFlushing()
+                                       bufferVolume = 0
+                               }
+                               continue
+                       case <-timer.C:
+                               if bufferVolume == 0 {
+                                       continue
+                               }
+                               log.triggerFlushing()
+                               bufferVolume = 0
+                               continue

Review Comment:
   fixed



##########
pkg/wal/wal.go:
##########
@@ -42,19 +75,759 @@ type Segment interface {
 type WAL interface {
        // Write a logging entity.
        // It will return immediately when the data is written in the buffer,
-       // The returned function will be called when the entity is flushed on 
the persistent storage.
-       Write(seriesID common.SeriesID, timestamp time.Time, data []byte) 
(func(), error)
+       // The callback function will be called when the entity is flushed on 
the persistent storage.
+       Write(seriesID common.SeriesIDV2, timestamp time.Time, data []byte, 
callback func(common.SeriesIDV2, time.Time, []byte, error))
        // Read specified segment by SegmentID.
-       Read(segmentID SegmentID) (*Segment, error)
+       Read(segmentID SegmentID) (Segment, error)
        // ReadAllSegments reads all segments sorted by their creation time in 
ascending order.
-       ReadAllSegments() ([]*Segment, error)
+       ReadAllSegments() ([]Segment, error)
        // Rotate closes the open segment and opens a new one, returning the 
closed segment details.
-       Rotate() (*Segment, error)
+       Rotate() (Segment, error)
        // Delete the specified segment.
        Delete(segmentID SegmentID) error
+       // Close all of segments and stop WAL work.
+       Close() error
+}
+
+// SegmentID identities a segment in a WAL.
+type SegmentID uint64
+
+// Segment allows reading underlying segments that hold WAl entities.
+type Segment interface {
+       GetSegmentID() SegmentID
+       GetLogEntries() []LogEntry
+}
+
+// LogEntry used for attain detail value of WAL entry.
+type LogEntry interface {
+       GetSeriesID() common.SeriesIDV2
+       GetTimestamps() []time.Time
+       GetValues() *list.List
+}
+
+// log implements the WAL interface.
+type log struct {
+       entryCloser      *run.Closer
+       buffer           buffer
+       logger           *logger.Logger
+       bytesBuffer      *bytes.Buffer
+       timestampsBuffer *bytes.Buffer
+       segmentMap       map[SegmentID]*segment
+       workSegment      *segment
+       writeChannel     chan logRequest
+       flushChannel     chan buffer
+       path             string
+       options          Options
+       writeWaitGroup   sync.WaitGroup
+       flushWaitGroup   sync.WaitGroup
+       workSegmentMutex sync.Mutex
+       segmentMapMutex  sync.RWMutex
+       closerOnce       sync.Once
+}
+
+type segment struct {
+       file       *os.File
+       path       string
+       logEntries []LogEntry
+       segmentID  SegmentID
+}
+
+type logRequest struct {
+       seriesID  common.SeriesIDV2
+       timestamp time.Time
+       callback  func(common.SeriesIDV2, time.Time, []byte, error)
+       data      []byte
+}
+
+type logEntry struct {
+       timestamps  []time.Time
+       values      *list.List
+       seriesID    common.SeriesIDV2
+       entryLength int64
+       count       int32
+}
+
+type buffer struct {
+       timestampMap map[common.SeriesIDV2][]time.Time
+       valueMap     map[common.SeriesIDV2][]byte
+       callbackMap  map[common.SeriesIDV2][]func(common.SeriesIDV2, time.Time, 
[]byte, error)
+       count        int
 }
 
 // New creates a WAL instance in the specified path.
-func New(_ string, _ Options) (WAL, error) {
-       return nil, nil
+func New(path string, options *Options) (WAL, error) {
+       //  Check configuration options.
+       walOptions := DefaultOptions
+       if options != nil {
+               fileSize := options.FileSize
+               if fileSize <= 0 {
+                       fileSize = DefaultOptions.FileSize
+               }
+               bufferSize := options.BufferSize
+               if bufferSize <= 0 {
+                       bufferSize = DefaultOptions.BufferSize
+               }
+               bufferBatchInterval := options.BufferBatchInterval
+               if bufferBatchInterval <= 0 {
+                       bufferBatchInterval = DefaultOptions.BufferBatchInterval
+               }
+               walOptions = &Options{
+                       FileSize:            fileSize,
+                       BufferSize:          bufferSize,
+                       BufferBatchInterval: bufferBatchInterval,
+               }
+       }
+
+       // Initial WAL path.
+       path, err := filepath.Abs(path)
+       if err != nil {
+               return nil, errors.Wrap(err, "Can not get absolute path: "+path)
+       }
+       if err := os.MkdirAll(path, os.ModePerm); err != nil {
+               return nil, err
+       }
+
+       log := &log{
+               path:             path,
+               options:          *walOptions,
+               logger:           logger.GetLogger(moduleName),
+               writeChannel:     make(chan logRequest),
+               flushChannel:     make(chan buffer, walOptions.FlushQueueSize),
+               bytesBuffer:      bytes.NewBuffer([]byte{}),
+               timestampsBuffer: bytes.NewBuffer([]byte{}),
+               entryCloser:      run.NewCloser(1),
+               buffer: buffer{
+                       timestampMap: make(map[common.SeriesIDV2][]time.Time),
+                       valueMap:     make(map[common.SeriesIDV2][]byte),
+                       callbackMap:  
make(map[common.SeriesIDV2][]func(common.SeriesIDV2, time.Time, []byte, error)),
+                       count:        0,
+               },
+       }
+       if err := log.load(); err != nil {
+               return nil, err
+       }
+
+       log.writeWaitGroup.Add(1)
+       log.flushWaitGroup.Add(1)
+       log.start()
+
+       log.logger.Info().Str("path", path).Msg("WAL has be initialized")
+       return log, nil
+}
+
+// Write a logging entity.
+// It will return immediately when the data is written in the buffer,
+// The callback function will be called when the entity is flushed on the 
persistent storage.
+func (log *log) Write(seriesID common.SeriesIDV2, timestamp time.Time, data 
[]byte, callback func(common.SeriesIDV2, time.Time, []byte, error)) {
+       if !log.entryCloser.AddRunning() {
+               return
+       }
+       defer log.entryCloser.Done()
+
+       log.writeChannel <- logRequest{
+               seriesID:  seriesID,
+               timestamp: timestamp,
+               data:      data,
+               callback:  callback,
+       }
+}
+
+// Read specified segment by SegmentID.
+func (log *log) Read(segmentID SegmentID) (Segment, error) {
+       if !log.entryCloser.AddRunning() {
+               return nil, errors.New("WAL is closed")
+       }
+       defer log.entryCloser.Done()
+
+       log.segmentMapMutex.RLock()
+       defer log.segmentMapMutex.RUnlock()
+
+       segment := log.segmentMap[segmentID]
+       return segment, nil
+}
+
+// ReadAllSegments reads all segments sorted by their creation time in 
ascending order.
+func (log *log) ReadAllSegments() ([]Segment, error) {
+       if !log.entryCloser.AddRunning() {
+               return nil, errors.New("WAL is closed")
+       }
+       defer log.entryCloser.Done()
+
+       log.segmentMapMutex.RLock()
+       defer log.segmentMapMutex.RUnlock()
+
+       segments := make([]Segment, 0)
+       for _, segment := range log.segmentMap {
+               segments = append(segments, segment)
+       }
+       return segments, nil
+}
+
+// Rotate closes the open segment and opens a new one, returning the closed 
segment details.
+func (log *log) Rotate() (Segment, error) {
+       if !log.entryCloser.AddRunning() {
+               return nil, errors.New("WAL is closed")
+       }
+       defer log.entryCloser.Done()
+
+       oldWorkSegment, err := log.swapWorkSegment()
+       if err != nil {
+               return nil, err
+       }
+
+       log.segmentMapMutex.Lock()
+       defer log.segmentMapMutex.Unlock()
+
+       // Update segment information.
+       workSegment := log.workSegment
+       log.segmentMap[workSegment.segmentID] = workSegment
+       return oldWorkSegment, nil
+}
+
+// Delete the specified segment.
+func (log *log) Delete(segmentID SegmentID) error {
+       if !log.entryCloser.AddRunning() {
+               return errors.New("WAL is closed")
+       }
+       defer log.entryCloser.Done()
+
+       log.segmentMapMutex.Lock()
+       defer log.segmentMapMutex.Unlock()
+
+       // Segment which will be deleted must be closed.
+       if segmentID == log.workSegment.segmentID {
+               return errors.New("Can not delete the segment which is working")
+       }
+
+       err := os.Remove(log.segmentMap[segmentID].path)
+       if err != nil {
+               return errors.Wrap(err, "Delete WAL segment error")
+       }
+       delete(log.segmentMap, segmentID)
+       return nil
+}
+
+// Close all of segments and stop WAL work.
+func (log *log) Close() error {
+       log.closerOnce.Do(func() {
+               log.logger.Info().Msg("Closing WAL...")
+
+               log.entryCloser.Done()
+               log.entryCloser.CloseThenWait()
+
+               close(log.writeChannel)
+               log.writeWaitGroup.Wait()
+
+               close(log.flushChannel)
+               log.flushWaitGroup.Wait()
+
+               if err := log.flushBuffer(log.buffer); err != nil {
+                       log.logger.Err(err).Msg("Flushing buffer failed")
+               }
+               if err := log.workSegment.file.Close(); err != nil {
+                       log.logger.Err(err).Msg("Close work segment file error")
+               }
+               log.logger.Info().Msg("Closed WAL")
+       })
+       return nil
+}
+
+func (log *log) start() {
+       go func() {
+               log.logger.Info().Msg("Start batch task...")
+
+               defer log.writeWaitGroup.Done()
+
+               bufferVolume := 0
+               for {
+                       timer := time.NewTimer(log.options.BufferBatchInterval)
+                       select {
+                       case request, chOpen := <-log.writeChannel:
+                               if !chOpen {
+                                       log.logger.Info().Msg("Stop batch task 
when write-channel closed!")
+                                       return
+                               }
+
+                               log.buffer.write(request)
+                               if log.logger.Debug().Enabled() {
+                                       log.logger.Debug().Msg("Write request 
to buffer. elements: " + strconv.Itoa(log.buffer.count))
+                               }
+
+                               bufferVolume += len(request.seriesID.Marshal()) 
+ timestampVolumeLength + len(request.data)
+                               if bufferVolume > log.options.BufferSize {
+                                       log.triggerFlushing()
+                                       bufferVolume = 0
+                               }
+                               continue
+                       case <-timer.C:
+                               if bufferVolume == 0 {
+                                       continue
+                               }
+                               log.triggerFlushing()
+                               bufferVolume = 0
+                               continue
+                       }
+               }
+       }()
+
+       go func() {
+               log.logger.Info().Msg("Start flush task...")
+
+               defer log.flushWaitGroup.Done()
+
+               for batch := range log.flushChannel {
+                       startTime := time.Now()
+
+                       var err error
+                       for i := 0; i < maxRetries; i++ {
+                               if err = log.flushBuffer(batch); err != nil {
+                                       log.logger.Err(err).Msg("Flushing 
buffer failed. Retrying...")
+                                       time.Sleep(100 * time.Millisecond)

Review Comment:
   fixed



##########
pkg/wal/wal.go:
##########
@@ -42,19 +75,759 @@ type Segment interface {
 type WAL interface {
        // Write a logging entity.
        // It will return immediately when the data is written in the buffer,
-       // The returned function will be called when the entity is flushed on 
the persistent storage.
-       Write(seriesID common.SeriesID, timestamp time.Time, data []byte) 
(func(), error)
+       // The callback function will be called when the entity is flushed on 
the persistent storage.
+       Write(seriesID common.SeriesIDV2, timestamp time.Time, data []byte, 
callback func(common.SeriesIDV2, time.Time, []byte, error))
        // Read specified segment by SegmentID.
-       Read(segmentID SegmentID) (*Segment, error)
+       Read(segmentID SegmentID) (Segment, error)
        // ReadAllSegments reads all segments sorted by their creation time in 
ascending order.
-       ReadAllSegments() ([]*Segment, error)
+       ReadAllSegments() ([]Segment, error)
        // Rotate closes the open segment and opens a new one, returning the 
closed segment details.
-       Rotate() (*Segment, error)
+       Rotate() (Segment, error)
        // Delete the specified segment.
        Delete(segmentID SegmentID) error
+       // Close all of segments and stop WAL work.
+       Close() error
+}
+
+// SegmentID identities a segment in a WAL.
+type SegmentID uint64
+
+// Segment allows reading underlying segments that hold WAl entities.
+type Segment interface {
+       GetSegmentID() SegmentID
+       GetLogEntries() []LogEntry
+}
+
+// LogEntry used for attain detail value of WAL entry.
+type LogEntry interface {
+       GetSeriesID() common.SeriesIDV2
+       GetTimestamps() []time.Time
+       GetValues() *list.List
+}
+
+// log implements the WAL interface.
+type log struct {
+       entryCloser      *run.Closer
+       buffer           buffer
+       logger           *logger.Logger
+       bytesBuffer      *bytes.Buffer
+       timestampsBuffer *bytes.Buffer
+       segmentMap       map[SegmentID]*segment
+       workSegment      *segment
+       writeChannel     chan logRequest
+       flushChannel     chan buffer
+       path             string
+       options          Options
+       writeWaitGroup   sync.WaitGroup
+       flushWaitGroup   sync.WaitGroup
+       workSegmentMutex sync.Mutex
+       segmentMapMutex  sync.RWMutex
+       closerOnce       sync.Once
+}
+
+type segment struct {
+       file       *os.File
+       path       string
+       logEntries []LogEntry
+       segmentID  SegmentID
+}
+
+type logRequest struct {
+       seriesID  common.SeriesIDV2
+       timestamp time.Time
+       callback  func(common.SeriesIDV2, time.Time, []byte, error)
+       data      []byte
+}
+
+type logEntry struct {
+       timestamps  []time.Time
+       values      *list.List
+       seriesID    common.SeriesIDV2
+       entryLength int64
+       count       int32
+}
+
+type buffer struct {
+       timestampMap map[common.SeriesIDV2][]time.Time
+       valueMap     map[common.SeriesIDV2][]byte
+       callbackMap  map[common.SeriesIDV2][]func(common.SeriesIDV2, time.Time, 
[]byte, error)
+       count        int
 }
 
 // New creates a WAL instance in the specified path.
-func New(_ string, _ Options) (WAL, error) {
-       return nil, nil
+func New(path string, options *Options) (WAL, error) {
+       //  Check configuration options.
+       walOptions := DefaultOptions
+       if options != nil {
+               fileSize := options.FileSize
+               if fileSize <= 0 {
+                       fileSize = DefaultOptions.FileSize
+               }
+               bufferSize := options.BufferSize
+               if bufferSize <= 0 {
+                       bufferSize = DefaultOptions.BufferSize
+               }
+               bufferBatchInterval := options.BufferBatchInterval
+               if bufferBatchInterval <= 0 {
+                       bufferBatchInterval = DefaultOptions.BufferBatchInterval
+               }
+               walOptions = &Options{
+                       FileSize:            fileSize,
+                       BufferSize:          bufferSize,
+                       BufferBatchInterval: bufferBatchInterval,
+               }
+       }
+
+       // Initial WAL path.
+       path, err := filepath.Abs(path)
+       if err != nil {
+               return nil, errors.Wrap(err, "Can not get absolute path: "+path)
+       }
+       if err := os.MkdirAll(path, os.ModePerm); err != nil {
+               return nil, err
+       }
+
+       log := &log{
+               path:             path,
+               options:          *walOptions,
+               logger:           logger.GetLogger(moduleName),
+               writeChannel:     make(chan logRequest),
+               flushChannel:     make(chan buffer, walOptions.FlushQueueSize),
+               bytesBuffer:      bytes.NewBuffer([]byte{}),
+               timestampsBuffer: bytes.NewBuffer([]byte{}),
+               entryCloser:      run.NewCloser(1),
+               buffer: buffer{
+                       timestampMap: make(map[common.SeriesIDV2][]time.Time),
+                       valueMap:     make(map[common.SeriesIDV2][]byte),
+                       callbackMap:  
make(map[common.SeriesIDV2][]func(common.SeriesIDV2, time.Time, []byte, error)),
+                       count:        0,
+               },
+       }
+       if err := log.load(); err != nil {
+               return nil, err
+       }
+
+       log.writeWaitGroup.Add(1)
+       log.flushWaitGroup.Add(1)
+       log.start()
+
+       log.logger.Info().Str("path", path).Msg("WAL has be initialized")
+       return log, nil
+}
+
+// Write a logging entity.
+// It will return immediately when the data is written in the buffer,
+// The callback function will be called when the entity is flushed on the 
persistent storage.
+func (log *log) Write(seriesID common.SeriesIDV2, timestamp time.Time, data 
[]byte, callback func(common.SeriesIDV2, time.Time, []byte, error)) {
+       if !log.entryCloser.AddRunning() {
+               return
+       }
+       defer log.entryCloser.Done()
+
+       log.writeChannel <- logRequest{
+               seriesID:  seriesID,
+               timestamp: timestamp,
+               data:      data,
+               callback:  callback,
+       }
+}
+
+// Read specified segment by SegmentID.
+func (log *log) Read(segmentID SegmentID) (Segment, error) {
+       if !log.entryCloser.AddRunning() {
+               return nil, errors.New("WAL is closed")
+       }
+       defer log.entryCloser.Done()
+
+       log.segmentMapMutex.RLock()
+       defer log.segmentMapMutex.RUnlock()
+
+       segment := log.segmentMap[segmentID]
+       return segment, nil
+}
+
+// ReadAllSegments reads all segments sorted by their creation time in 
ascending order.
+func (log *log) ReadAllSegments() ([]Segment, error) {
+       if !log.entryCloser.AddRunning() {
+               return nil, errors.New("WAL is closed")
+       }
+       defer log.entryCloser.Done()
+
+       log.segmentMapMutex.RLock()
+       defer log.segmentMapMutex.RUnlock()
+
+       segments := make([]Segment, 0)
+       for _, segment := range log.segmentMap {
+               segments = append(segments, segment)
+       }
+       return segments, nil
+}
+
+// Rotate closes the open segment and opens a new one, returning the closed 
segment details.
+func (log *log) Rotate() (Segment, error) {
+       if !log.entryCloser.AddRunning() {
+               return nil, errors.New("WAL is closed")
+       }
+       defer log.entryCloser.Done()
+
+       oldWorkSegment, err := log.swapWorkSegment()
+       if err != nil {
+               return nil, err
+       }
+
+       log.segmentMapMutex.Lock()
+       defer log.segmentMapMutex.Unlock()
+
+       // Update segment information.
+       workSegment := log.workSegment
+       log.segmentMap[workSegment.segmentID] = workSegment
+       return oldWorkSegment, nil
+}
+
+// Delete the specified segment.
+func (log *log) Delete(segmentID SegmentID) error {
+       if !log.entryCloser.AddRunning() {
+               return errors.New("WAL is closed")
+       }
+       defer log.entryCloser.Done()
+
+       log.segmentMapMutex.Lock()
+       defer log.segmentMapMutex.Unlock()
+
+       // Segment which will be deleted must be closed.
+       if segmentID == log.workSegment.segmentID {
+               return errors.New("Can not delete the segment which is working")
+       }
+
+       err := os.Remove(log.segmentMap[segmentID].path)
+       if err != nil {
+               return errors.Wrap(err, "Delete WAL segment error")
+       }
+       delete(log.segmentMap, segmentID)
+       return nil
+}
+
+// Close all of segments and stop WAL work.
+func (log *log) Close() error {
+       log.closerOnce.Do(func() {
+               log.logger.Info().Msg("Closing WAL...")
+
+               log.entryCloser.Done()
+               log.entryCloser.CloseThenWait()
+
+               close(log.writeChannel)
+               log.writeWaitGroup.Wait()
+
+               close(log.flushChannel)
+               log.flushWaitGroup.Wait()
+
+               if err := log.flushBuffer(log.buffer); err != nil {
+                       log.logger.Err(err).Msg("Flushing buffer failed")
+               }
+               if err := log.workSegment.file.Close(); err != nil {
+                       log.logger.Err(err).Msg("Close work segment file error")
+               }
+               log.logger.Info().Msg("Closed WAL")
+       })
+       return nil
+}
+
+func (log *log) start() {
+       go func() {
+               log.logger.Info().Msg("Start batch task...")
+
+               defer log.writeWaitGroup.Done()
+
+               bufferVolume := 0
+               for {
+                       timer := time.NewTimer(log.options.BufferBatchInterval)
+                       select {
+                       case request, chOpen := <-log.writeChannel:
+                               if !chOpen {
+                                       log.logger.Info().Msg("Stop batch task 
when write-channel closed!")

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to