hanahmily commented on code in PR #261:
URL: 
https://github.com/apache/skywalking-banyandb/pull/261#discussion_r1241173838


##########
pkg/wal/wal.go:
##########
@@ -42,19 +72,642 @@ type Segment interface {
 type WAL interface {
        // Write a logging entity.
        // It will return immediately when the data is written in the buffer,
-       // The returned function will be called when the entity is flushed on 
the persistent storage.
-       Write(seriesID common.SeriesID, timestamp time.Time, data []byte) 
(func(), error)
+       // The callback function will be called when the entity is flushed on 
the persistent storage.
+       Write(seriesID common.SeriesIDV2, timestamp time.Time, data []byte, 
callback func(common.SeriesIDV2, time.Time, []byte, error))
        // Read specified segment by SegmentID.
-       Read(segmentID SegmentID) (*Segment, error)
+       Read(segmentID SegmentID) (Segment, error)
        // ReadAllSegments reads all segments sorted by their creation time in 
ascending order.
-       ReadAllSegments() ([]*Segment, error)
+       ReadAllSegments() ([]Segment, error)
        // Rotate closes the open segment and opens a new one, returning the 
closed segment details.
-       Rotate() (*Segment, error)
+       Rotate() (Segment, error)
        // Delete the specified segment.
        Delete(segmentID SegmentID) error
+       // Close all of segments and stop WAL work.
+       Close() error
+}
+
+// SegmentID identities a segment in a WAL.
+type SegmentID uint64
+
+// Segment allows reading underlying segments that hold WAl entities.
+type Segment interface {
+       GetSegmentID() SegmentID
+       GetLogEntries() []LogEntry
+}
+
+// LogEntry used for attain detail value of WAL entry.
+type LogEntry interface {
+       GetSeriesID() common.SeriesIDV2
+       GetTimestamps() []time.Time
+       GetValues() *list.List
+}
+
+// Log implements the WAL interface.
+type Log struct {
+       buffer           buffer
+       logger           *logger.Logger
+       bytesBuffer      *bytes.Buffer
+       timestampsBuffer *bytes.Buffer
+       segmentIndexMap  map[SegmentID]*segment

Review Comment:
   A data race occurs when multiple goroutines access it. Add a `mutex` to 
protect it.



##########
pkg/wal/wal.go:
##########
@@ -42,19 +72,642 @@ type Segment interface {
 type WAL interface {
        // Write a logging entity.
        // It will return immediately when the data is written in the buffer,
-       // The returned function will be called when the entity is flushed on 
the persistent storage.
-       Write(seriesID common.SeriesID, timestamp time.Time, data []byte) 
(func(), error)
+       // The callback function will be called when the entity is flushed on 
the persistent storage.
+       Write(seriesID common.SeriesIDV2, timestamp time.Time, data []byte, 
callback func(common.SeriesIDV2, time.Time, []byte, error))
        // Read specified segment by SegmentID.
-       Read(segmentID SegmentID) (*Segment, error)
+       Read(segmentID SegmentID) (Segment, error)
        // ReadAllSegments reads all segments sorted by their creation time in 
ascending order.
-       ReadAllSegments() ([]*Segment, error)
+       ReadAllSegments() ([]Segment, error)
        // Rotate closes the open segment and opens a new one, returning the 
closed segment details.
-       Rotate() (*Segment, error)
+       Rotate() (Segment, error)
        // Delete the specified segment.
        Delete(segmentID SegmentID) error
+       // Close all of segments and stop WAL work.
+       Close() error
+}
+
+// SegmentID identities a segment in a WAL.
+type SegmentID uint64
+
+// Segment allows reading underlying segments that hold WAl entities.
+type Segment interface {
+       GetSegmentID() SegmentID
+       GetLogEntries() []LogEntry
+}
+
+// LogEntry used for attain detail value of WAL entry.
+type LogEntry interface {
+       GetSeriesID() common.SeriesIDV2
+       GetTimestamps() []time.Time
+       GetValues() *list.List
+}
+
+// Log implements the WAL interface.
+type Log struct {
+       buffer           buffer
+       logger           *logger.Logger
+       bytesBuffer      *bytes.Buffer
+       timestampsBuffer *bytes.Buffer
+       segmentIndexMap  map[SegmentID]*segment
+       workSegment      *segment
+       writeChannel     chan logRequest
+       flushChannel     chan buffer
+       path             string
+       options          Options
+}
+
+type segment struct {
+       file       *os.File
+       path       string
+       logEntries []LogEntry
+       segmentID  SegmentID
+}
+
+type logRequest struct {
+       seriesID  common.SeriesIDV2
+       timestamp time.Time
+       callback  func(common.SeriesIDV2, time.Time, []byte, error)
+       data      []byte
+}
+
+type logEntry struct {
+       timestamps  []time.Time
+       values      *list.List
+       seriesID    common.SeriesIDV2
+       entryLength int64
+       count       int32
+}
+
+type buffer struct {
+       timestampMap map[common.SeriesIDV2][]time.Time
+       valueMap     map[common.SeriesIDV2][]byte
+       callbackMap  map[common.SeriesIDV2][]func(common.SeriesIDV2, time.Time, 
[]byte, error)
+       count        int
 }
 
 // New creates a WAL instance in the specified path.
-func New(_ string, _ Options) (WAL, error) {
-       return nil, nil
+func New(path string, options *Options) (WAL, error) {
+       //  Check configuration options.
+       if options == nil {
+               options = DefaultOptions
+       }
+       if options.FileSize <= 0 {
+               options.FileSize = DefaultOptions.FileSize

Review Comment:
   Instead of modifying the parameter, kindly create a local copy of `options`.



##########
pkg/wal/wal.go:
##########
@@ -42,19 +72,642 @@ type Segment interface {
 type WAL interface {
        // Write a logging entity.
        // It will return immediately when the data is written in the buffer,
-       // The returned function will be called when the entity is flushed on 
the persistent storage.
-       Write(seriesID common.SeriesID, timestamp time.Time, data []byte) 
(func(), error)
+       // The callback function will be called when the entity is flushed on 
the persistent storage.
+       Write(seriesID common.SeriesIDV2, timestamp time.Time, data []byte, 
callback func(common.SeriesIDV2, time.Time, []byte, error))
        // Read specified segment by SegmentID.
-       Read(segmentID SegmentID) (*Segment, error)
+       Read(segmentID SegmentID) (Segment, error)
        // ReadAllSegments reads all segments sorted by their creation time in 
ascending order.
-       ReadAllSegments() ([]*Segment, error)
+       ReadAllSegments() ([]Segment, error)
        // Rotate closes the open segment and opens a new one, returning the 
closed segment details.
-       Rotate() (*Segment, error)
+       Rotate() (Segment, error)
        // Delete the specified segment.
        Delete(segmentID SegmentID) error
+       // Close all of segments and stop WAL work.
+       Close() error
+}
+
+// SegmentID identities a segment in a WAL.
+type SegmentID uint64
+
+// Segment allows reading underlying segments that hold WAl entities.
+type Segment interface {
+       GetSegmentID() SegmentID
+       GetLogEntries() []LogEntry
+}
+
+// LogEntry used for attain detail value of WAL entry.
+type LogEntry interface {
+       GetSeriesID() common.SeriesIDV2
+       GetTimestamps() []time.Time
+       GetValues() *list.List
+}
+
+// Log implements the WAL interface.
+type Log struct {

Review Comment:
   Convert it into an unexported `log`. As you have already defined an 
interface `WAL` to carry out necessary operations, there is no need to export 
this struct.



##########
pkg/wal/wal.go:
##########
@@ -42,19 +72,642 @@ type Segment interface {
 type WAL interface {
        // Write a logging entity.
        // It will return immediately when the data is written in the buffer,
-       // The returned function will be called when the entity is flushed on 
the persistent storage.
-       Write(seriesID common.SeriesID, timestamp time.Time, data []byte) 
(func(), error)
+       // The callback function will be called when the entity is flushed on 
the persistent storage.
+       Write(seriesID common.SeriesIDV2, timestamp time.Time, data []byte, 
callback func(common.SeriesIDV2, time.Time, []byte, error))
        // Read specified segment by SegmentID.
-       Read(segmentID SegmentID) (*Segment, error)
+       Read(segmentID SegmentID) (Segment, error)
        // ReadAllSegments reads all segments sorted by their creation time in 
ascending order.
-       ReadAllSegments() ([]*Segment, error)
+       ReadAllSegments() ([]Segment, error)
        // Rotate closes the open segment and opens a new one, returning the 
closed segment details.
-       Rotate() (*Segment, error)
+       Rotate() (Segment, error)
        // Delete the specified segment.
        Delete(segmentID SegmentID) error
+       // Close all of segments and stop WAL work.
+       Close() error
+}
+
+// SegmentID identities a segment in a WAL.
+type SegmentID uint64
+
+// Segment allows reading underlying segments that hold WAl entities.
+type Segment interface {
+       GetSegmentID() SegmentID
+       GetLogEntries() []LogEntry
+}
+
+// LogEntry used for attain detail value of WAL entry.
+type LogEntry interface {
+       GetSeriesID() common.SeriesIDV2
+       GetTimestamps() []time.Time
+       GetValues() *list.List
+}
+
+// Log implements the WAL interface.
+type Log struct {
+       buffer           buffer
+       logger           *logger.Logger
+       bytesBuffer      *bytes.Buffer
+       timestampsBuffer *bytes.Buffer
+       segmentIndexMap  map[SegmentID]*segment
+       workSegment      *segment
+       writeChannel     chan logRequest
+       flushChannel     chan buffer
+       path             string
+       options          Options
+}
+
+type segment struct {
+       file       *os.File
+       path       string
+       logEntries []LogEntry
+       segmentID  SegmentID
+}
+
+type logRequest struct {
+       seriesID  common.SeriesIDV2
+       timestamp time.Time
+       callback  func(common.SeriesIDV2, time.Time, []byte, error)
+       data      []byte
+}
+
+type logEntry struct {
+       timestamps  []time.Time
+       values      *list.List
+       seriesID    common.SeriesIDV2
+       entryLength int64
+       count       int32
+}
+
+type buffer struct {
+       timestampMap map[common.SeriesIDV2][]time.Time
+       valueMap     map[common.SeriesIDV2][]byte
+       callbackMap  map[common.SeriesIDV2][]func(common.SeriesIDV2, time.Time, 
[]byte, error)
+       count        int
 }
 
 // New creates a WAL instance in the specified path.
-func New(_ string, _ Options) (WAL, error) {
-       return nil, nil
+func New(path string, options *Options) (WAL, error) {
+       //  Check configuration options.
+       if options == nil {
+               options = DefaultOptions
+       }
+       if options.FileSize <= 0 {
+               options.FileSize = DefaultOptions.FileSize
+       }
+       if options.BufferSize <= 0 {
+               options.BufferSize = DefaultOptions.BufferSize
+       }
+       if options.BufferBatchInterval <= 0 {
+               options.BufferBatchInterval = DefaultOptions.BufferBatchInterval
+       }
+
+       // Initial WAL path.
+       path, err := filepath.Abs(path)
+       if err != nil {
+               return nil, errors.Wrap(err, "Can not get absolute path: "+path)
+       }
+       if err := os.MkdirAll(path, os.ModePerm); err != nil {
+               return nil, err
+       }
+
+       log := &Log{path: path, options: *options, logger: 
logger.GetLogger(moduleName)}
+
+       if err := log.load(); err != nil {
+               return nil, err
+       }
+       log.startAsyncFlushTask()
+
+       log.logger.Info().Msgf("WAL initialized at %s", path)
+       return log, nil
+}
+
+// Write a logging entity.
+// It will return immediately when the data is written in the buffer,
+// The callback function will be called when the entity is flushed on the 
persistent storage.
+func (log *Log) Write(seriesID common.SeriesIDV2, timestamp time.Time, data 
[]byte, callback func(common.SeriesIDV2, time.Time, []byte, error)) {
+       log.writeChannel <- logRequest{
+               seriesID:  seriesID,
+               timestamp: timestamp,
+               data:      data,
+               callback:  callback,
+       }
+}
+
+// Read specified segment by SegmentID.
+func (log *Log) Read(segmentID SegmentID) (Segment, error) {
+       segment := log.segmentIndexMap[segmentID]
+       return segment, nil
+}
+
+// ReadAllSegments reads all segments sorted by their creation time in 
ascending order.
+func (log *Log) ReadAllSegments() ([]Segment, error) {
+       segments := make([]Segment, 0)
+       for _, segment := range log.segmentIndexMap {
+               segments = append(segments, segment)
+       }
+       return segments, nil
+}
+
+// Rotate closes the open segment and opens a new one, returning the closed 
segment details.
+func (log *Log) Rotate() (Segment, error) {
+       if err := log.workSegment.file.Close(); err != nil {
+               return nil, errors.Wrap(err, "Close WAL segment error")
+       }
+       oldSegment := log.workSegment
+       // Create new segment.
+       segmentID := log.workSegment.segmentID + 1
+       segment := &segment{
+               segmentID: segmentID,
+               path:      filepath.Join(log.path, 
segmentName(uint64(segmentID))),
+       }
+       var err error
+       segment.file, err = os.OpenFile(segment.path, 
os.O_CREATE|os.O_RDWR|os.O_TRUNC, os.ModePerm)
+       if err != nil {
+               return nil, errors.Wrap(err, "Open WAL segment error")
+       }
+
+       // Update segment information.
+       log.segmentIndexMap[segmentID] = segment
+       log.workSegment = segment
+       return oldSegment, nil
+}
+
+// Delete the specified segment.
+func (log *Log) Delete(segmentID SegmentID) error {
+       // Segment which will be deleted must be closed.
+       err := os.Remove(log.segmentIndexMap[segmentID].path)
+       if err != nil {
+               return errors.Wrap(err, "Delete WAL segment error")
+       }
+       delete(log.segmentIndexMap, segmentID)
+       return nil
+}
+
+// Close all of segments and stop WAL work.
+func (log *Log) Close() error {
+       log.logger.Info().Msg("Closing WAL...")
+       close(log.writeChannel)
+       close(log.flushChannel)
+       err := log.workSegment.file.Close()
+       if err != nil {
+               return errors.Wrap(err, "Close WAL error")
+       }
+       log.logger.Info().Msg("Closed WAL")
+       return nil
+}
+
+func (log *Log) startAsyncFlushTask() {
+       go func() {
+               log.logger.Info().Msg("Start batch task...")
+               bufferSize := 0
+               for {
+                       timer := time.NewTimer(log.options.BufferBatchInterval)
+                       select {
+                       case request, ok := <-log.writeChannel:
+                               if !ok {
+                                       log.logger.Info().Msg("Exit selector 
when write-channel closed!")
+                                       return
+                               }
+                               log.buffer.write(request)
+                               if log.logger.Debug().Enabled() {
+                                       log.logger.Debug().Msg("Write request 
to buffer. elements: " + strconv.Itoa(log.buffer.count))
+                               }
+
+                               bufferSize += len(request.seriesID.Marshal()) + 
timestampLength + len(request.data)
+                               if bufferSize > log.options.BufferSize {
+                                       // Clone buffer,avoiding to block write.
+                                       buf := log.buffer
+                                       // Clear buffer to receive Log request.
+                                       log.newBuffer()
+                                       bufferSize = 0
+                                       // Send buffer to flushBuffer channel.
+                                       log.flushChannel <- buf
+                                       if log.logger.Debug().Enabled() {
+                                               log.logger.Debug().Msg("Send 
buffer to flush-channel. elements: " + strconv.Itoa(buf.count))
+                                       }
+                               }
+                       case <-timer.C:
+                               if bufferSize == 0 {
+                                       continue
+                               }
+                               // Clone buffer,avoiding to block write.
+                               buf := log.buffer
+                               // Clear buffer to receive Log request.
+                               log.newBuffer()
+                               bufferSize = 0
+                               // Send buffer to flushBuffer channel.
+                               log.flushChannel <- buf
+                               if log.logger.Debug().Enabled() {
+                                       log.logger.Debug().Msg("Send buffer to 
flush-channel. elements: " + strconv.Itoa(buf.count))
+                               }
+                       }
+               }
+       }()
+
+       go func() {
+               log.logger.Info().Msg("Start flush task...")
+
+               for batch := range log.flushChannel {
+                       log.flushBuffer(batch)
+                       if log.logger.Debug().Enabled() {
+                               log.logger.Debug().Msg("Flushed buffer to WAL 
file. elements: " + strconv.Itoa(batch.count))
+                       }
+               }
+
+               log.logger.Info().Msg("Exit flush task when flush-channel 
closed!")
+       }()
+
+       log.logger.Info().Msg("Started WAL async flush task.")
+}
+
+func (log *Log) flushBuffer(buffer buffer) {
+       log.bytesBuffer.Reset()
+       // placeholder, preset batch length value is 0
+       batchLength := 0
+       log.writeToBytesBuffer(log.bytesBuffer, binary.LittleEndian, 
int64(batchLength))
+
+       for seriesID, timestamps := range buffer.timestampMap {
+               // Generate seriesID binary
+               seriesIDBytes := seriesID.Marshal()
+               seriesIDBytesLen := len(seriesIDBytes)
+
+               // Generate timestamps compression binary
+               log.timestampsBuffer.Reset()
+               timestampWriter := encoding.NewWriter()
+               timestampEncoder := encoding.NewXOREncoder(timestampWriter)
+               timestampWriter.Reset(log.timestampsBuffer)
+               for _, timestamp := range timestamps {
+                       timestampEncoder.Write(timeTouUnixNano(timestamp))
+               }
+               timestampWriter.Flush()
+               timestampsBytes := log.timestampsBuffer.Bytes()
+               timestampsBytesLen := len(timestampsBytes)
+
+               // Generate values compression binary
+               valuesBytes := snappy.Encode(nil, buffer.valueMap[seriesID])
+
+               // Write entry data
+               entryLength := seriesIDLength + seriesIDBytesLen + 
seriesCountLength + timestampsBinaryLength + timestampsBytesLen + 
len(valuesBytes)
+               log.writeToBytesBuffer(log.bytesBuffer, binary.LittleEndian, 
int64(entryLength))
+               log.writeToBytesBuffer(log.bytesBuffer, binary.LittleEndian, 
int16(seriesIDBytesLen))
+               log.writeToBytesBuffer(log.bytesBuffer, binary.LittleEndian, 
seriesIDBytes)
+               log.writeToBytesBuffer(log.bytesBuffer, binary.LittleEndian, 
int32(len(timestamps)))
+               log.writeToBytesBuffer(log.bytesBuffer, binary.LittleEndian, 
int16(timestampsBytesLen))
+               log.writeToBytesBuffer(log.bytesBuffer, binary.LittleEndian, 
timestampsBytes)
+               log.writeToBytesBuffer(log.bytesBuffer, binary.LittleEndian, 
valuesBytes)
+       }
+       // Rewrite batch length
+       batchBytes := log.bytesBuffer.Bytes()
+       batchLength = len(batchBytes) - batchWriteLength
+       rewriteInt64InBuf(batchBytes, int64(batchLength), 0, 
binary.LittleEndian)
+       log.bytesBuffer.Reset()
+
+       // Flush
+       _, err := log.workSegment.file.Write(batchBytes)
+       if err != nil {
+               log.logger.Error().Err(err).Msg("Write WAL segment file error, 
file: " + log.workSegment.path)
+               buffer.notifyRequests(err)
+               return
+       }
+       err = log.workSegment.file.Sync()
+       if err != nil {
+               log.logger.Error().Err(err).Msg("Sync WAL segment file to disk 
error, file: " + log.workSegment.path)
+               buffer.notifyRequests(err)
+               return
+       }
+       buffer.notifyRequests(nil)
+       if log.logger.Debug().Enabled() {
+               log.logger.Debug().Msg("Flushed buffer to WAL. file: " + 
log.workSegment.path +
+                       ", elements: " + strconv.Itoa(buffer.count) +
+                       ", bytes: " + strconv.Itoa(len(batchBytes)))
+       }
+}
+
+func (log *Log) newBuffer() {
+       log.buffer.timestampMap = make(map[common.SeriesIDV2][]time.Time)
+       log.buffer.valueMap = make(map[common.SeriesIDV2][]byte)
+       log.buffer.callbackMap = 
make(map[common.SeriesIDV2][]func(common.SeriesIDV2, time.Time, []byte, error))
+       log.buffer.count = 0
+}
+
+func (log *Log) load() error {
+       files, err := os.ReadDir(log.path)
+       if err != nil {
+               return errors.Wrap(err, "Can not read dir: "+log.path)
+       }
+       // Load all of WAL segments.
+       var workSegmentID SegmentID
+       log.segmentIndexMap = make(map[SegmentID]*segment)
+       for _, file := range files {

Review Comment:
   During the booting process, it is necessary to remove all files except for 
the `workSegment`. 
   
   To create a new `workSegment`, the user should rotate the WAL and delete the 
previous one. In case the deletion fails due to a process crash, the `load` 
function will take care of cleaning up all non-work segments.



##########
pkg/wal/wal.go:
##########
@@ -42,19 +72,642 @@ type Segment interface {
 type WAL interface {
        // Write a logging entity.
        // It will return immediately when the data is written in the buffer,
-       // The returned function will be called when the entity is flushed on 
the persistent storage.
-       Write(seriesID common.SeriesID, timestamp time.Time, data []byte) 
(func(), error)
+       // The callback function will be called when the entity is flushed on 
the persistent storage.
+       Write(seriesID common.SeriesIDV2, timestamp time.Time, data []byte, 
callback func(common.SeriesIDV2, time.Time, []byte, error))
        // Read specified segment by SegmentID.
-       Read(segmentID SegmentID) (*Segment, error)
+       Read(segmentID SegmentID) (Segment, error)
        // ReadAllSegments reads all segments sorted by their creation time in 
ascending order.
-       ReadAllSegments() ([]*Segment, error)
+       ReadAllSegments() ([]Segment, error)
        // Rotate closes the open segment and opens a new one, returning the 
closed segment details.
-       Rotate() (*Segment, error)
+       Rotate() (Segment, error)
        // Delete the specified segment.
        Delete(segmentID SegmentID) error
+       // Close all of segments and stop WAL work.
+       Close() error
+}
+
+// SegmentID identities a segment in a WAL.
+type SegmentID uint64
+
+// Segment allows reading underlying segments that hold WAl entities.
+type Segment interface {
+       GetSegmentID() SegmentID
+       GetLogEntries() []LogEntry
+}
+
+// LogEntry used for attain detail value of WAL entry.
+type LogEntry interface {
+       GetSeriesID() common.SeriesIDV2
+       GetTimestamps() []time.Time
+       GetValues() *list.List
+}
+
+// Log implements the WAL interface.
+type Log struct {
+       buffer           buffer
+       logger           *logger.Logger
+       bytesBuffer      *bytes.Buffer
+       timestampsBuffer *bytes.Buffer
+       segmentIndexMap  map[SegmentID]*segment
+       workSegment      *segment
+       writeChannel     chan logRequest
+       flushChannel     chan buffer
+       path             string
+       options          Options
+}
+
+type segment struct {
+       file       *os.File
+       path       string
+       logEntries []LogEntry
+       segmentID  SegmentID
+}
+
+type logRequest struct {
+       seriesID  common.SeriesIDV2
+       timestamp time.Time
+       callback  func(common.SeriesIDV2, time.Time, []byte, error)
+       data      []byte
+}
+
+type logEntry struct {
+       timestamps  []time.Time
+       values      *list.List
+       seriesID    common.SeriesIDV2
+       entryLength int64
+       count       int32
+}
+
+type buffer struct {
+       timestampMap map[common.SeriesIDV2][]time.Time
+       valueMap     map[common.SeriesIDV2][]byte
+       callbackMap  map[common.SeriesIDV2][]func(common.SeriesIDV2, time.Time, 
[]byte, error)
+       count        int
 }
 
 // New creates a WAL instance in the specified path.
-func New(_ string, _ Options) (WAL, error) {
-       return nil, nil
+func New(path string, options *Options) (WAL, error) {
+       //  Check configuration options.
+       if options == nil {
+               options = DefaultOptions
+       }
+       if options.FileSize <= 0 {
+               options.FileSize = DefaultOptions.FileSize
+       }
+       if options.BufferSize <= 0 {
+               options.BufferSize = DefaultOptions.BufferSize
+       }
+       if options.BufferBatchInterval <= 0 {
+               options.BufferBatchInterval = DefaultOptions.BufferBatchInterval
+       }
+
+       // Initial WAL path.
+       path, err := filepath.Abs(path)
+       if err != nil {
+               return nil, errors.Wrap(err, "Can not get absolute path: "+path)
+       }
+       if err := os.MkdirAll(path, os.ModePerm); err != nil {
+               return nil, err
+       }
+
+       log := &Log{path: path, options: *options, logger: 
logger.GetLogger(moduleName)}
+
+       if err := log.load(); err != nil {
+               return nil, err
+       }
+       log.startAsyncFlushTask()
+
+       log.logger.Info().Msgf("WAL initialized at %s", path)

Review Comment:
   ```suggestion
        log.logger.Info().Str("path", path).Msg("WAL has been initialized")
   ```



##########
pkg/wal/wal.go:
##########
@@ -42,19 +72,642 @@ type Segment interface {
 type WAL interface {
        // Write a logging entity.
        // It will return immediately when the data is written in the buffer,
-       // The returned function will be called when the entity is flushed on 
the persistent storage.
-       Write(seriesID common.SeriesID, timestamp time.Time, data []byte) 
(func(), error)
+       // The callback function will be called when the entity is flushed on 
the persistent storage.
+       Write(seriesID common.SeriesIDV2, timestamp time.Time, data []byte, 
callback func(common.SeriesIDV2, time.Time, []byte, error))
        // Read specified segment by SegmentID.
-       Read(segmentID SegmentID) (*Segment, error)
+       Read(segmentID SegmentID) (Segment, error)
        // ReadAllSegments reads all segments sorted by their creation time in 
ascending order.
-       ReadAllSegments() ([]*Segment, error)
+       ReadAllSegments() ([]Segment, error)
        // Rotate closes the open segment and opens a new one, returning the 
closed segment details.
-       Rotate() (*Segment, error)
+       Rotate() (Segment, error)
        // Delete the specified segment.
        Delete(segmentID SegmentID) error
+       // Close all of segments and stop WAL work.
+       Close() error
+}
+
+// SegmentID identities a segment in a WAL.
+type SegmentID uint64
+
+// Segment allows reading underlying segments that hold WAl entities.
+type Segment interface {
+       GetSegmentID() SegmentID
+       GetLogEntries() []LogEntry
+}
+
+// LogEntry used for attain detail value of WAL entry.
+type LogEntry interface {
+       GetSeriesID() common.SeriesIDV2
+       GetTimestamps() []time.Time
+       GetValues() *list.List
+}
+
+// Log implements the WAL interface.
+type Log struct {
+       buffer           buffer
+       logger           *logger.Logger
+       bytesBuffer      *bytes.Buffer
+       timestampsBuffer *bytes.Buffer
+       segmentIndexMap  map[SegmentID]*segment
+       workSegment      *segment

Review Comment:
   Please use a separate `mutex` to safeguard the field.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to