hanahmily commented on code in PR #261:
URL: https://github.com/apache/skywalking-banyandb/pull/261#discussion_r1241182783
##########
pkg/wal/wal.go:
##########
@@ -42,19 +72,642 @@ type Segment interface {
type WAL interface {
// Write a logging entity.
// It will return immediately when the data is written in the buffer,
- // The returned function will be called when the entity is flushed on the persistent storage.
- Write(seriesID common.SeriesID, timestamp time.Time, data []byte) (func(), error)
+ // The callback function will be called when the entity is flushed on the persistent storage.
+ Write(seriesID common.SeriesIDV2, timestamp time.Time, data []byte, callback func(common.SeriesIDV2, time.Time, []byte, error))
// Read specified segment by SegmentID.
- Read(segmentID SegmentID) (*Segment, error)
+ Read(segmentID SegmentID) (Segment, error)
// ReadAllSegments reads all segments sorted by their creation time in ascending order.
- ReadAllSegments() ([]*Segment, error)
+ ReadAllSegments() ([]Segment, error)
// Rotate closes the open segment and opens a new one, returning the closed segment details.
- Rotate() (*Segment, error)
+ Rotate() (Segment, error)
// Delete the specified segment.
Delete(segmentID SegmentID) error
+ // Close all segments and stop the WAL.
+ Close() error
+}
+
+// SegmentID identifies a segment in a WAL.
+type SegmentID uint64
+
+// Segment allows reading underlying segments that hold WAL entities.
+type Segment interface {
+ GetSegmentID() SegmentID
+ GetLogEntries() []LogEntry
+}
+
+// LogEntry provides access to the details of a WAL entry.
+type LogEntry interface {
+ GetSeriesID() common.SeriesIDV2
+ GetTimestamps() []time.Time
+ GetValues() *list.List
+}
+
+// Log implements the WAL interface.
+type Log struct {
+ buffer buffer
+ logger *logger.Logger
+ bytesBuffer *bytes.Buffer
+ timestampsBuffer *bytes.Buffer
+ segmentIndexMap map[SegmentID]*segment
+ workSegment *segment
+ writeChannel chan logRequest
+ flushChannel chan buffer
+ path string
+ options Options
+}
+
+type segment struct {
+ file *os.File
+ path string
+ logEntries []LogEntry
+ segmentID SegmentID
+}
+
+type logRequest struct {
+ seriesID common.SeriesIDV2
+ timestamp time.Time
+ callback func(common.SeriesIDV2, time.Time, []byte, error)
+ data []byte
+}
+
+type logEntry struct {
+ timestamps []time.Time
+ values *list.List
+ seriesID common.SeriesIDV2
+ entryLength int64
+ count int32
+}
+
+type buffer struct {
+ timestampMap map[common.SeriesIDV2][]time.Time
+ valueMap map[common.SeriesIDV2][]byte
+ callbackMap map[common.SeriesIDV2][]func(common.SeriesIDV2, time.Time, []byte, error)
+ count int
}
// New creates a WAL instance in the specified path.
-func New(_ string, _ Options) (WAL, error) {
- return nil, nil
+func New(path string, options *Options) (WAL, error) {
+ // Check configuration options.
+ if options == nil {
+ options = DefaultOptions
+ }
+ if options.FileSize <= 0 {
+ options.FileSize = DefaultOptions.FileSize
+ }
+ if options.BufferSize <= 0 {
+ options.BufferSize = DefaultOptions.BufferSize
+ }
+ if options.BufferBatchInterval <= 0 {
+ options.BufferBatchInterval = DefaultOptions.BufferBatchInterval
+ }
+
+ // Initialize the WAL path.
+ path, err := filepath.Abs(path)
+ if err != nil {
+ return nil, errors.Wrap(err, "Can not get absolute path: "+path)
+ }
+ if err := os.MkdirAll(path, os.ModePerm); err != nil {
+ return nil, err
+ }
+
+ log := &Log{path: path, options: *options, logger: logger.GetLogger(moduleName)}
+
+ if err := log.load(); err != nil {
+ return nil, err
+ }
+ log.startAsyncFlushTask()
+
+ log.logger.Info().Msgf("WAL initialized at %s", path)
+ return log, nil
+}
+
+// Write a logging entity.
+// It will return immediately when the data is written in the buffer,
+// The callback function will be called when the entity is flushed on the persistent storage.
+func (log *Log) Write(seriesID common.SeriesIDV2, timestamp time.Time, data []byte, callback func(common.SeriesIDV2, time.Time, []byte, error)) {
+ log.writeChannel <- logRequest{
+ seriesID: seriesID,
+ timestamp: timestamp,
+ data: data,
+ callback: callback,
+ }
+}
+
+// Read specified segment by SegmentID.
+func (log *Log) Read(segmentID SegmentID) (Segment, error) {
+ segment := log.segmentIndexMap[segmentID]
+ return segment, nil
+}
+
+// ReadAllSegments reads all segments sorted by their creation time in ascending order.
+func (log *Log) ReadAllSegments() ([]Segment, error) {
+ segments := make([]Segment, 0)
+ for _, segment := range log.segmentIndexMap {
+ segments = append(segments, segment)
+ }
+ return segments, nil
+}
+
+// Rotate closes the open segment and opens a new one, returning the closed segment details.
+func (log *Log) Rotate() (Segment, error) {
+ if err := log.workSegment.file.Close(); err != nil {
+ return nil, errors.Wrap(err, "Close WAL segment error")
+ }
+ oldSegment := log.workSegment
+ // Create new segment.
+ segmentID := log.workSegment.segmentID + 1
+ segment := &segment{
+ segmentID: segmentID,
+ path: filepath.Join(log.path, segmentName(uint64(segmentID))),
+ }
+ var err error
+ segment.file, err = os.OpenFile(segment.path, os.O_CREATE|os.O_RDWR|os.O_TRUNC, os.ModePerm)
+ if err != nil {
+ return nil, errors.Wrap(err, "Open WAL segment error")
+ }
+
+ // Update segment information.
+ log.segmentIndexMap[segmentID] = segment
+ log.workSegment = segment
+ return oldSegment, nil
+}
+
+// Delete the specified segment.
+func (log *Log) Delete(segmentID SegmentID) error {
+ // Segment which will be deleted must be closed.
+ err := os.Remove(log.segmentIndexMap[segmentID].path)
+ if err != nil {
+ return errors.Wrap(err, "Delete WAL segment error")
+ }
+ delete(log.segmentIndexMap, segmentID)
+ return nil
+}
+
+// Close all segments and stop the WAL.
+func (log *Log) Close() error {
+ log.logger.Info().Msg("Closing WAL...")
+ close(log.writeChannel)
+ close(log.flushChannel)
+ err := log.workSegment.file.Close()
+ if err != nil {
+ return errors.Wrap(err, "Close WAL error")
+ }
+ log.logger.Info().Msg("Closed WAL")
+ return nil
+}
+
+func (log *Log) startAsyncFlushTask() {
+ go func() {
+ log.logger.Info().Msg("Start batch task...")
+ bufferSize := 0
+ for {
+ timer := time.NewTimer(log.options.BufferBatchInterval)
+ select {
+ case request, ok := <-log.writeChannel:
+ if !ok {
+ log.logger.Info().Msg("Exit selector when write-channel closed!")
+ return
+ }
+ log.buffer.write(request)
+ if log.logger.Debug().Enabled() {
+ log.logger.Debug().Msg("Write request to buffer. elements: " + strconv.Itoa(log.buffer.count))
+ }
+
+ bufferSize += len(request.seriesID.Marshal()) + timestampLength + len(request.data)
+ if bufferSize > log.options.BufferSize {
+ // Clone the buffer to avoid blocking writes.
+ buf := log.buffer
+ // Clear buffer to receive Log request.
+ log.newBuffer()
+ bufferSize = 0
+ // Send buffer to flushBuffer channel.
+ log.flushChannel <- buf
+ if log.logger.Debug().Enabled() {
+ log.logger.Debug().Msg("Send buffer to flush-channel. elements: " + strconv.Itoa(buf.count))
+ }
+ }
+ case <-timer.C:
+ if bufferSize == 0 {
+ continue
+ }
+ // Clone the buffer to avoid blocking writes.
+ buf := log.buffer
+ // Clear buffer to receive Log request.
+ log.newBuffer()
+ bufferSize = 0
+ // Send buffer to flushBuffer channel.
+ log.flushChannel <- buf
+ if log.logger.Debug().Enabled() {
+ log.logger.Debug().Msg("Send buffer to flush-channel. elements: " + strconv.Itoa(buf.count))
+ }
+ }
+ }
+ }()
+
+ go func() {
+ log.logger.Info().Msg("Start flush task...")
+
+ for batch := range log.flushChannel {
+ log.flushBuffer(batch)
+ if log.logger.Debug().Enabled() {
+ log.logger.Debug().Msg("Flushed buffer to WAL file. elements: " + strconv.Itoa(batch.count))
+ }
+ }
+
+ log.logger.Info().Msg("Exit flush task when flush-channel closed!")
+ }()
+
+ log.logger.Info().Msg("Started WAL async flush task.")
+}
+
+func (log *Log) flushBuffer(buffer buffer) {
+ log.bytesBuffer.Reset()
+ // Placeholder: the batch length is preset to 0 and rewritten after all entries are written.
+ batchLength := 0
+ log.writeToBytesBuffer(log.bytesBuffer, binary.LittleEndian, int64(batchLength))
+
+ for seriesID, timestamps := range buffer.timestampMap {
+ // Generate seriesID binary
+ seriesIDBytes := seriesID.Marshal()
+ seriesIDBytesLen := len(seriesIDBytes)
+
+ // Generate timestamps compression binary
+ log.timestampsBuffer.Reset()
+ timestampWriter := encoding.NewWriter()
+ timestampEncoder := encoding.NewXOREncoder(timestampWriter)
+ timestampWriter.Reset(log.timestampsBuffer)
+ for _, timestamp := range timestamps {
+ timestampEncoder.Write(timeTouUnixNano(timestamp))
+ }
+ timestampWriter.Flush()
+ timestampsBytes := log.timestampsBuffer.Bytes()
+ timestampsBytesLen := len(timestampsBytes)
+
+ // Generate values compression binary
+ valuesBytes := snappy.Encode(nil, buffer.valueMap[seriesID])
+
+ // Write entry data
+ entryLength := seriesIDLength + seriesIDBytesLen + seriesCountLength + timestampsBinaryLength + timestampsBytesLen + len(valuesBytes)
+ log.writeToBytesBuffer(log.bytesBuffer, binary.LittleEndian, int64(entryLength))
+ log.writeToBytesBuffer(log.bytesBuffer, binary.LittleEndian, int16(seriesIDBytesLen))
+ log.writeToBytesBuffer(log.bytesBuffer, binary.LittleEndian, seriesIDBytes)
+ log.writeToBytesBuffer(log.bytesBuffer, binary.LittleEndian, int32(len(timestamps)))
+ log.writeToBytesBuffer(log.bytesBuffer, binary.LittleEndian, int16(timestampsBytesLen))
+ log.writeToBytesBuffer(log.bytesBuffer, binary.LittleEndian, timestampsBytes)
+ log.writeToBytesBuffer(log.bytesBuffer, binary.LittleEndian, valuesBytes)
+ }
+ // Rewrite batch length
+ batchBytes := log.bytesBuffer.Bytes()
+ batchLength = len(batchBytes) - batchWriteLength
+ rewriteInt64InBuf(batchBytes, int64(batchLength), 0, binary.LittleEndian)
+ log.bytesBuffer.Reset()
+
+ // Flush
+ _, err := log.workSegment.file.Write(batchBytes)
+ if err != nil {
+ log.logger.Error().Err(err).Msg("Write WAL segment file error, file: " + log.workSegment.path)
+ buffer.notifyRequests(err)
+ return
+ }
+ err = log.workSegment.file.Sync()
+ if err != nil {
+ log.logger.Error().Err(err).Msg("Sync WAL segment file to disk error, file: " + log.workSegment.path)
+ buffer.notifyRequests(err)
+ return
+ }
+ buffer.notifyRequests(nil)
+ if log.logger.Debug().Enabled() {
+ log.logger.Debug().Msg("Flushed buffer to WAL. file: " + log.workSegment.path +
+ ", elements: " + strconv.Itoa(buffer.count) +
+ ", bytes: " + strconv.Itoa(len(batchBytes)))
+ }
+}
+
+func (log *Log) newBuffer() {
+ log.buffer.timestampMap = make(map[common.SeriesIDV2][]time.Time)
+ log.buffer.valueMap = make(map[common.SeriesIDV2][]byte)
+ log.buffer.callbackMap = make(map[common.SeriesIDV2][]func(common.SeriesIDV2, time.Time, []byte, error))
+ log.buffer.count = 0
+}
+
+func (log *Log) load() error {
+ files, err := os.ReadDir(log.path)
+ if err != nil {
+ return errors.Wrap(err, "Can not read dir: "+log.path)
+ }
+ // Load all WAL segments.
+ var workSegmentID SegmentID
+ log.segmentIndexMap = make(map[SegmentID]*segment)
+ for _, file := range files {
Review Comment:
During the booting process, it is necessary to remove all files except for
the `workSegment`.
To create a new `workSegment`, the user should rotate the WAL and delete the
previous one. In case the deletion fails due to a process crash, the `load`
function will take care of cleaning up all non-work segments.
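A minimal sketch of that cleanup, assuming the work segment is the one with the largest segment ID; `cleanupStaleSegments` and `parseSegmentID` (the inverse of `segmentName`) are hypothetical names, and the code relies on the package's existing imports (`os`, `filepath`, `errors`):

```go
// Sketch only: drop every on-disk segment file except the current work segment.
func (log *Log) cleanupStaleSegments(files []os.DirEntry, workSegmentID SegmentID) error {
	for _, file := range files {
		id, err := parseSegmentID(file.Name()) // hypothetical helper, inverse of segmentName
		if err != nil {
			// Not a WAL segment file; leave it alone.
			continue
		}
		if SegmentID(id) == workSegmentID {
			// Keep the current work segment.
			continue
		}
		// Anything else is a leftover from a crashed rotate/delete cycle.
		if err := os.Remove(filepath.Join(log.path, file.Name())); err != nil {
			return errors.Wrap(err, "Clean up stale WAL segment error")
		}
	}
	return nil
}
```

Since `Rotate` switches `workSegment` before the caller deletes the old file, any segment on disk that is not the current work segment can safely be treated as leftover state.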
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]