hailin0 commented on code in PR #261:
URL:
https://github.com/apache/skywalking-banyandb/pull/261#discussion_r1249551940
##########
pkg/wal/wal.go:
##########
@@ -42,19 +75,759 @@ type Segment interface {
type WAL interface {
// Write a logging entity.
// It will return immediately when the data is written in the buffer,
- // The returned function will be called when the entity is flushed on
the persistent storage.
- Write(seriesID common.SeriesID, timestamp time.Time, data []byte)
(func(), error)
+ // The callback function will be called when the entity is flushed on
the persistent storage.
+ Write(seriesID common.SeriesIDV2, timestamp time.Time, data []byte,
callback func(common.SeriesIDV2, time.Time, []byte, error))
// Read specified segment by SegmentID.
- Read(segmentID SegmentID) (*Segment, error)
+ Read(segmentID SegmentID) (Segment, error)
// ReadAllSegments reads all segments sorted by their creation time in
ascending order.
- ReadAllSegments() ([]*Segment, error)
+ ReadAllSegments() ([]Segment, error)
// Rotate closes the open segment and opens a new one, returning the
closed segment details.
- Rotate() (*Segment, error)
+ Rotate() (Segment, error)
// Delete the specified segment.
Delete(segmentID SegmentID) error
+ // Close all of segments and stop WAL work.
+ Close() error
+}
+
+// SegmentID identifies a segment in a WAL.
+type SegmentID uint64
+
+// Segment allows reading underlying segments that hold WAL entities.
+type Segment interface {
+ GetSegmentID() SegmentID
+ GetLogEntries() []LogEntry
+}
+
+// LogEntry is used to obtain the detailed values of a WAL entry.
+type LogEntry interface {
+ GetSeriesID() common.SeriesIDV2
+ GetTimestamps() []time.Time
+ GetValues() *list.List
+}
+
+// log implements the WAL interface.
+type log struct {
+ entryCloser *run.Closer
+ buffer buffer
+ logger *logger.Logger
+ bytesBuffer *bytes.Buffer
+ timestampsBuffer *bytes.Buffer
+ segmentMap map[SegmentID]*segment
+ workSegment *segment
+ writeChannel chan logRequest
+ flushChannel chan buffer
+ path string
+ options Options
+ writeWaitGroup sync.WaitGroup
+ flushWaitGroup sync.WaitGroup
+ workSegmentMutex sync.Mutex
+ segmentMapMutex sync.RWMutex
+ closerOnce sync.Once
+}
+
+type segment struct {
+ file *os.File
+ path string
+ logEntries []LogEntry
+ segmentID SegmentID
+}
+
+type logRequest struct {
+ seriesID common.SeriesIDV2
+ timestamp time.Time
+ callback func(common.SeriesIDV2, time.Time, []byte, error)
+ data []byte
+}
+
+type logEntry struct {
+ timestamps []time.Time
+ values *list.List
+ seriesID common.SeriesIDV2
+ entryLength int64
+ count int32
+}
+
+type buffer struct {
+ timestampMap map[common.SeriesIDV2][]time.Time
+ valueMap map[common.SeriesIDV2][]byte
+ callbackMap map[common.SeriesIDV2][]func(common.SeriesIDV2, time.Time,
[]byte, error)
+ count int
}
// New creates a WAL instance in the specified path.
-func New(_ string, _ Options) (WAL, error) {
- return nil, nil
+func New(path string, options *Options) (WAL, error) {
+ // Check configuration options.
+ walOptions := DefaultOptions
+ if options != nil {
+ fileSize := options.FileSize
+ if fileSize <= 0 {
+ fileSize = DefaultOptions.FileSize
+ }
+ bufferSize := options.BufferSize
+ if bufferSize <= 0 {
+ bufferSize = DefaultOptions.BufferSize
+ }
+ bufferBatchInterval := options.BufferBatchInterval
+ if bufferBatchInterval <= 0 {
+ bufferBatchInterval = DefaultOptions.BufferBatchInterval
+ }
+ walOptions = &Options{
+ FileSize: fileSize,
+ BufferSize: bufferSize,
+ BufferBatchInterval: bufferBatchInterval,
+ }
+ }
+
+ // Initial WAL path.
+ path, err := filepath.Abs(path)
+ if err != nil {
+ return nil, errors.Wrap(err, "Can not get absolute path: "+path)
+ }
+ if err := os.MkdirAll(path, os.ModePerm); err != nil {
+ return nil, err
+ }
+
+ log := &log{
+ path: path,
+ options: *walOptions,
+ logger: logger.GetLogger(moduleName),
+ writeChannel: make(chan logRequest),
+ flushChannel: make(chan buffer, walOptions.FlushQueueSize),
+ bytesBuffer: bytes.NewBuffer([]byte{}),
+ timestampsBuffer: bytes.NewBuffer([]byte{}),
+ entryCloser: run.NewCloser(1),
+ buffer: buffer{
+ timestampMap: make(map[common.SeriesIDV2][]time.Time),
+ valueMap: make(map[common.SeriesIDV2][]byte),
+ callbackMap:
make(map[common.SeriesIDV2][]func(common.SeriesIDV2, time.Time, []byte, error)),
+ count: 0,
+ },
+ }
+ if err := log.load(); err != nil {
+ return nil, err
+ }
+
+ log.writeWaitGroup.Add(1)
+ log.flushWaitGroup.Add(1)
+ log.start()
+
+ log.logger.Info().Str("path", path).Msg("WAL has be initialized")
+ return log, nil
+}
+
+// Write a logging entity.
+// It will return immediately when the data is written in the buffer,
+// The callback function will be called when the entity is flushed on the
persistent storage.
+func (log *log) Write(seriesID common.SeriesIDV2, timestamp time.Time, data
[]byte, callback func(common.SeriesIDV2, time.Time, []byte, error)) {
+ if !log.entryCloser.AddRunning() {
+ return
+ }
+ defer log.entryCloser.Done()
+
+ log.writeChannel <- logRequest{
+ seriesID: seriesID,
+ timestamp: timestamp,
+ data: data,
+ callback: callback,
+ }
+}
+
+// Read specified segment by SegmentID.
+func (log *log) Read(segmentID SegmentID) (Segment, error) {
+ if !log.entryCloser.AddRunning() {
+ return nil, errors.New("WAL is closed")
+ }
+ defer log.entryCloser.Done()
+
+ log.segmentMapMutex.RLock()
+ defer log.segmentMapMutex.RUnlock()
+
+ segment := log.segmentMap[segmentID]
+ return segment, nil
+}
+
+// ReadAllSegments reads all segments sorted by their creation time in
ascending order.
+func (log *log) ReadAllSegments() ([]Segment, error) {
+ if !log.entryCloser.AddRunning() {
+ return nil, errors.New("WAL is closed")
+ }
+ defer log.entryCloser.Done()
+
+ log.segmentMapMutex.RLock()
+ defer log.segmentMapMutex.RUnlock()
+
+ segments := make([]Segment, 0)
+ for _, segment := range log.segmentMap {
+ segments = append(segments, segment)
+ }
+ return segments, nil
+}
+
+// Rotate closes the open segment and opens a new one, returning the closed
segment details.
+func (log *log) Rotate() (Segment, error) {
+ if !log.entryCloser.AddRunning() {
+ return nil, errors.New("WAL is closed")
+ }
+ defer log.entryCloser.Done()
+
+ oldWorkSegment, err := log.swapWorkSegment()
+ if err != nil {
+ return nil, err
+ }
+
+ log.segmentMapMutex.Lock()
+ defer log.segmentMapMutex.Unlock()
+
+ // Update segment information.
+ workSegment := log.workSegment
+ log.segmentMap[workSegment.segmentID] = workSegment
+ return oldWorkSegment, nil
+}
+
+// Delete the specified segment.
+func (log *log) Delete(segmentID SegmentID) error {
+ if !log.entryCloser.AddRunning() {
+ return errors.New("WAL is closed")
+ }
+ defer log.entryCloser.Done()
+
+ log.segmentMapMutex.Lock()
+ defer log.segmentMapMutex.Unlock()
+
+ // Segment which will be deleted must be closed.
+ if segmentID == log.workSegment.segmentID {
+ return errors.New("Can not delete the segment which is working")
+ }
+
+ err := os.Remove(log.segmentMap[segmentID].path)
+ if err != nil {
+ return errors.Wrap(err, "Delete WAL segment error")
+ }
+ delete(log.segmentMap, segmentID)
+ return nil
+}
+
+// Close all of segments and stop WAL work.
+func (log *log) Close() error {
+ log.closerOnce.Do(func() {
+ log.logger.Info().Msg("Closing WAL...")
+
+ log.entryCloser.Done()
+ log.entryCloser.CloseThenWait()
+
+ close(log.writeChannel)
+ log.writeWaitGroup.Wait()
+
+ close(log.flushChannel)
+ log.flushWaitGroup.Wait()
+
+ if err := log.flushBuffer(log.buffer); err != nil {
+ log.logger.Err(err).Msg("Flushing buffer failed")
+ }
+ if err := log.workSegment.file.Close(); err != nil {
+ log.logger.Err(err).Msg("Close work segment file error")
+ }
+ log.logger.Info().Msg("Closed WAL")
+ })
+ return nil
+}
+
+func (log *log) start() {
+ go func() {
+ log.logger.Info().Msg("Start batch task...")
+
+ defer log.writeWaitGroup.Done()
+
+ bufferVolume := 0
+ for {
+ timer := time.NewTimer(log.options.BufferBatchInterval)
+ select {
+ case request, chOpen := <-log.writeChannel:
+ if !chOpen {
+ log.logger.Info().Msg("Stop batch task
when write-channel closed!")
+ return
+ }
+
+ log.buffer.write(request)
+ if log.logger.Debug().Enabled() {
+ log.logger.Debug().Msg("Write request
to buffer. elements: " + strconv.Itoa(log.buffer.count))
+ }
+
+ bufferVolume += len(request.seriesID.Marshal())
+ timestampVolumeLength + len(request.data)
+ if bufferVolume > log.options.BufferSize {
+ log.triggerFlushing()
+ bufferVolume = 0
+ }
+ continue
+ case <-timer.C:
+ if bufferVolume == 0 {
+ continue
+ }
+ log.triggerFlushing()
+ bufferVolume = 0
+ continue
Review Comment:
fixed
##########
pkg/wal/wal.go:
##########
@@ -42,19 +75,759 @@ type Segment interface {
type WAL interface {
// Write a logging entity.
// It will return immediately when the data is written in the buffer,
- // The returned function will be called when the entity is flushed on
the persistent storage.
- Write(seriesID common.SeriesID, timestamp time.Time, data []byte)
(func(), error)
+ // The callback function will be called when the entity is flushed on
the persistent storage.
+ Write(seriesID common.SeriesIDV2, timestamp time.Time, data []byte,
callback func(common.SeriesIDV2, time.Time, []byte, error))
// Read specified segment by SegmentID.
- Read(segmentID SegmentID) (*Segment, error)
+ Read(segmentID SegmentID) (Segment, error)
// ReadAllSegments reads all segments sorted by their creation time in
ascending order.
- ReadAllSegments() ([]*Segment, error)
+ ReadAllSegments() ([]Segment, error)
// Rotate closes the open segment and opens a new one, returning the
closed segment details.
- Rotate() (*Segment, error)
+ Rotate() (Segment, error)
// Delete the specified segment.
Delete(segmentID SegmentID) error
+ // Close all of segments and stop WAL work.
+ Close() error
+}
+
+// SegmentID identifies a segment in a WAL.
+type SegmentID uint64
+
+// Segment allows reading underlying segments that hold WAL entities.
+type Segment interface {
+ GetSegmentID() SegmentID
+ GetLogEntries() []LogEntry
+}
+
+// LogEntry is used to obtain the detailed values of a WAL entry.
+type LogEntry interface {
+ GetSeriesID() common.SeriesIDV2
+ GetTimestamps() []time.Time
+ GetValues() *list.List
+}
+
+// log implements the WAL interface.
+type log struct {
+ entryCloser *run.Closer
+ buffer buffer
+ logger *logger.Logger
+ bytesBuffer *bytes.Buffer
+ timestampsBuffer *bytes.Buffer
+ segmentMap map[SegmentID]*segment
+ workSegment *segment
+ writeChannel chan logRequest
+ flushChannel chan buffer
+ path string
+ options Options
+ writeWaitGroup sync.WaitGroup
+ flushWaitGroup sync.WaitGroup
+ workSegmentMutex sync.Mutex
+ segmentMapMutex sync.RWMutex
+ closerOnce sync.Once
+}
+
+type segment struct {
+ file *os.File
+ path string
+ logEntries []LogEntry
+ segmentID SegmentID
+}
+
+type logRequest struct {
+ seriesID common.SeriesIDV2
+ timestamp time.Time
+ callback func(common.SeriesIDV2, time.Time, []byte, error)
+ data []byte
+}
+
+type logEntry struct {
+ timestamps []time.Time
+ values *list.List
+ seriesID common.SeriesIDV2
+ entryLength int64
+ count int32
+}
+
+type buffer struct {
+ timestampMap map[common.SeriesIDV2][]time.Time
+ valueMap map[common.SeriesIDV2][]byte
+ callbackMap map[common.SeriesIDV2][]func(common.SeriesIDV2, time.Time,
[]byte, error)
+ count int
}
// New creates a WAL instance in the specified path.
-func New(_ string, _ Options) (WAL, error) {
- return nil, nil
+func New(path string, options *Options) (WAL, error) {
+ // Check configuration options.
+ walOptions := DefaultOptions
+ if options != nil {
+ fileSize := options.FileSize
+ if fileSize <= 0 {
+ fileSize = DefaultOptions.FileSize
+ }
+ bufferSize := options.BufferSize
+ if bufferSize <= 0 {
+ bufferSize = DefaultOptions.BufferSize
+ }
+ bufferBatchInterval := options.BufferBatchInterval
+ if bufferBatchInterval <= 0 {
+ bufferBatchInterval = DefaultOptions.BufferBatchInterval
+ }
+ walOptions = &Options{
+ FileSize: fileSize,
+ BufferSize: bufferSize,
+ BufferBatchInterval: bufferBatchInterval,
+ }
+ }
+
+ // Initial WAL path.
+ path, err := filepath.Abs(path)
+ if err != nil {
+ return nil, errors.Wrap(err, "Can not get absolute path: "+path)
+ }
+ if err := os.MkdirAll(path, os.ModePerm); err != nil {
+ return nil, err
+ }
+
+ log := &log{
+ path: path,
+ options: *walOptions,
+ logger: logger.GetLogger(moduleName),
+ writeChannel: make(chan logRequest),
+ flushChannel: make(chan buffer, walOptions.FlushQueueSize),
+ bytesBuffer: bytes.NewBuffer([]byte{}),
+ timestampsBuffer: bytes.NewBuffer([]byte{}),
+ entryCloser: run.NewCloser(1),
+ buffer: buffer{
+ timestampMap: make(map[common.SeriesIDV2][]time.Time),
+ valueMap: make(map[common.SeriesIDV2][]byte),
+ callbackMap:
make(map[common.SeriesIDV2][]func(common.SeriesIDV2, time.Time, []byte, error)),
+ count: 0,
+ },
+ }
+ if err := log.load(); err != nil {
+ return nil, err
+ }
+
+ log.writeWaitGroup.Add(1)
+ log.flushWaitGroup.Add(1)
+ log.start()
+
+ log.logger.Info().Str("path", path).Msg("WAL has be initialized")
+ return log, nil
+}
+
+// Write a logging entity.
+// It will return immediately when the data is written in the buffer,
+// The callback function will be called when the entity is flushed on the
persistent storage.
+func (log *log) Write(seriesID common.SeriesIDV2, timestamp time.Time, data
[]byte, callback func(common.SeriesIDV2, time.Time, []byte, error)) {
+ if !log.entryCloser.AddRunning() {
+ return
+ }
+ defer log.entryCloser.Done()
+
+ log.writeChannel <- logRequest{
+ seriesID: seriesID,
+ timestamp: timestamp,
+ data: data,
+ callback: callback,
+ }
+}
+
+// Read specified segment by SegmentID.
+func (log *log) Read(segmentID SegmentID) (Segment, error) {
+ if !log.entryCloser.AddRunning() {
+ return nil, errors.New("WAL is closed")
+ }
+ defer log.entryCloser.Done()
+
+ log.segmentMapMutex.RLock()
+ defer log.segmentMapMutex.RUnlock()
+
+ segment := log.segmentMap[segmentID]
+ return segment, nil
+}
+
+// ReadAllSegments reads all segments sorted by their creation time in
ascending order.
+func (log *log) ReadAllSegments() ([]Segment, error) {
+ if !log.entryCloser.AddRunning() {
+ return nil, errors.New("WAL is closed")
+ }
+ defer log.entryCloser.Done()
+
+ log.segmentMapMutex.RLock()
+ defer log.segmentMapMutex.RUnlock()
+
+ segments := make([]Segment, 0)
+ for _, segment := range log.segmentMap {
+ segments = append(segments, segment)
+ }
+ return segments, nil
+}
+
+// Rotate closes the open segment and opens a new one, returning the closed
segment details.
+func (log *log) Rotate() (Segment, error) {
+ if !log.entryCloser.AddRunning() {
+ return nil, errors.New("WAL is closed")
+ }
+ defer log.entryCloser.Done()
+
+ oldWorkSegment, err := log.swapWorkSegment()
+ if err != nil {
+ return nil, err
+ }
+
+ log.segmentMapMutex.Lock()
+ defer log.segmentMapMutex.Unlock()
+
+ // Update segment information.
+ workSegment := log.workSegment
+ log.segmentMap[workSegment.segmentID] = workSegment
+ return oldWorkSegment, nil
+}
+
+// Delete the specified segment.
+func (log *log) Delete(segmentID SegmentID) error {
+ if !log.entryCloser.AddRunning() {
+ return errors.New("WAL is closed")
+ }
+ defer log.entryCloser.Done()
+
+ log.segmentMapMutex.Lock()
+ defer log.segmentMapMutex.Unlock()
+
+ // Segment which will be deleted must be closed.
+ if segmentID == log.workSegment.segmentID {
+ return errors.New("Can not delete the segment which is working")
+ }
+
+ err := os.Remove(log.segmentMap[segmentID].path)
+ if err != nil {
+ return errors.Wrap(err, "Delete WAL segment error")
+ }
+ delete(log.segmentMap, segmentID)
+ return nil
+}
+
+// Close all of segments and stop WAL work.
+func (log *log) Close() error {
+ log.closerOnce.Do(func() {
+ log.logger.Info().Msg("Closing WAL...")
+
+ log.entryCloser.Done()
+ log.entryCloser.CloseThenWait()
+
+ close(log.writeChannel)
+ log.writeWaitGroup.Wait()
+
+ close(log.flushChannel)
+ log.flushWaitGroup.Wait()
+
+ if err := log.flushBuffer(log.buffer); err != nil {
+ log.logger.Err(err).Msg("Flushing buffer failed")
+ }
+ if err := log.workSegment.file.Close(); err != nil {
+ log.logger.Err(err).Msg("Close work segment file error")
+ }
+ log.logger.Info().Msg("Closed WAL")
+ })
+ return nil
+}
+
+func (log *log) start() {
+ go func() {
+ log.logger.Info().Msg("Start batch task...")
+
+ defer log.writeWaitGroup.Done()
+
+ bufferVolume := 0
+ for {
+ timer := time.NewTimer(log.options.BufferBatchInterval)
+ select {
+ case request, chOpen := <-log.writeChannel:
+ if !chOpen {
+ log.logger.Info().Msg("Stop batch task
when write-channel closed!")
+ return
+ }
+
+ log.buffer.write(request)
+ if log.logger.Debug().Enabled() {
+ log.logger.Debug().Msg("Write request
to buffer. elements: " + strconv.Itoa(log.buffer.count))
+ }
+
+ bufferVolume += len(request.seriesID.Marshal())
+ timestampVolumeLength + len(request.data)
+ if bufferVolume > log.options.BufferSize {
+ log.triggerFlushing()
+ bufferVolume = 0
+ }
+ continue
+ case <-timer.C:
+ if bufferVolume == 0 {
+ continue
+ }
+ log.triggerFlushing()
+ bufferVolume = 0
+ continue
+ }
+ }
+ }()
+
+ go func() {
+ log.logger.Info().Msg("Start flush task...")
+
+ defer log.flushWaitGroup.Done()
+
+ for batch := range log.flushChannel {
+ startTime := time.Now()
+
+ var err error
+ for i := 0; i < maxRetries; i++ {
+ if err = log.flushBuffer(batch); err != nil {
+ log.logger.Err(err).Msg("Flushing
buffer failed. Retrying...")
+ time.Sleep(100 * time.Millisecond)
Review Comment:
fixed
##########
pkg/wal/wal.go:
##########
@@ -42,19 +75,759 @@ type Segment interface {
type WAL interface {
// Write a logging entity.
// It will return immediately when the data is written in the buffer,
- // The returned function will be called when the entity is flushed on
the persistent storage.
- Write(seriesID common.SeriesID, timestamp time.Time, data []byte)
(func(), error)
+ // The callback function will be called when the entity is flushed on
the persistent storage.
+ Write(seriesID common.SeriesIDV2, timestamp time.Time, data []byte,
callback func(common.SeriesIDV2, time.Time, []byte, error))
// Read specified segment by SegmentID.
- Read(segmentID SegmentID) (*Segment, error)
+ Read(segmentID SegmentID) (Segment, error)
// ReadAllSegments reads all segments sorted by their creation time in
ascending order.
- ReadAllSegments() ([]*Segment, error)
+ ReadAllSegments() ([]Segment, error)
// Rotate closes the open segment and opens a new one, returning the
closed segment details.
- Rotate() (*Segment, error)
+ Rotate() (Segment, error)
// Delete the specified segment.
Delete(segmentID SegmentID) error
+ // Close all of segments and stop WAL work.
+ Close() error
+}
+
+// SegmentID identifies a segment in a WAL.
+type SegmentID uint64
+
+// Segment allows reading underlying segments that hold WAL entities.
+type Segment interface {
+ GetSegmentID() SegmentID
+ GetLogEntries() []LogEntry
+}
+
+// LogEntry is used to obtain the detailed values of a WAL entry.
+type LogEntry interface {
+ GetSeriesID() common.SeriesIDV2
+ GetTimestamps() []time.Time
+ GetValues() *list.List
+}
+
+// log implements the WAL interface.
+type log struct {
+ entryCloser *run.Closer
+ buffer buffer
+ logger *logger.Logger
+ bytesBuffer *bytes.Buffer
+ timestampsBuffer *bytes.Buffer
+ segmentMap map[SegmentID]*segment
+ workSegment *segment
+ writeChannel chan logRequest
+ flushChannel chan buffer
+ path string
+ options Options
+ writeWaitGroup sync.WaitGroup
+ flushWaitGroup sync.WaitGroup
+ workSegmentMutex sync.Mutex
+ segmentMapMutex sync.RWMutex
+ closerOnce sync.Once
+}
+
+type segment struct {
+ file *os.File
+ path string
+ logEntries []LogEntry
+ segmentID SegmentID
+}
+
+type logRequest struct {
+ seriesID common.SeriesIDV2
+ timestamp time.Time
+ callback func(common.SeriesIDV2, time.Time, []byte, error)
+ data []byte
+}
+
+type logEntry struct {
+ timestamps []time.Time
+ values *list.List
+ seriesID common.SeriesIDV2
+ entryLength int64
+ count int32
+}
+
+type buffer struct {
+ timestampMap map[common.SeriesIDV2][]time.Time
+ valueMap map[common.SeriesIDV2][]byte
+ callbackMap map[common.SeriesIDV2][]func(common.SeriesIDV2, time.Time,
[]byte, error)
+ count int
}
// New creates a WAL instance in the specified path.
-func New(_ string, _ Options) (WAL, error) {
- return nil, nil
+func New(path string, options *Options) (WAL, error) {
+ // Check configuration options.
+ walOptions := DefaultOptions
+ if options != nil {
+ fileSize := options.FileSize
+ if fileSize <= 0 {
+ fileSize = DefaultOptions.FileSize
+ }
+ bufferSize := options.BufferSize
+ if bufferSize <= 0 {
+ bufferSize = DefaultOptions.BufferSize
+ }
+ bufferBatchInterval := options.BufferBatchInterval
+ if bufferBatchInterval <= 0 {
+ bufferBatchInterval = DefaultOptions.BufferBatchInterval
+ }
+ walOptions = &Options{
+ FileSize: fileSize,
+ BufferSize: bufferSize,
+ BufferBatchInterval: bufferBatchInterval,
+ }
+ }
+
+ // Initial WAL path.
+ path, err := filepath.Abs(path)
+ if err != nil {
+ return nil, errors.Wrap(err, "Can not get absolute path: "+path)
+ }
+ if err := os.MkdirAll(path, os.ModePerm); err != nil {
+ return nil, err
+ }
+
+ log := &log{
+ path: path,
+ options: *walOptions,
+ logger: logger.GetLogger(moduleName),
+ writeChannel: make(chan logRequest),
+ flushChannel: make(chan buffer, walOptions.FlushQueueSize),
+ bytesBuffer: bytes.NewBuffer([]byte{}),
+ timestampsBuffer: bytes.NewBuffer([]byte{}),
+ entryCloser: run.NewCloser(1),
+ buffer: buffer{
+ timestampMap: make(map[common.SeriesIDV2][]time.Time),
+ valueMap: make(map[common.SeriesIDV2][]byte),
+ callbackMap:
make(map[common.SeriesIDV2][]func(common.SeriesIDV2, time.Time, []byte, error)),
+ count: 0,
+ },
+ }
+ if err := log.load(); err != nil {
+ return nil, err
+ }
+
+ log.writeWaitGroup.Add(1)
+ log.flushWaitGroup.Add(1)
+ log.start()
+
+ log.logger.Info().Str("path", path).Msg("WAL has be initialized")
+ return log, nil
+}
+
+// Write a logging entity.
+// It will return immediately when the data is written in the buffer,
+// The callback function will be called when the entity is flushed on the
persistent storage.
+func (log *log) Write(seriesID common.SeriesIDV2, timestamp time.Time, data
[]byte, callback func(common.SeriesIDV2, time.Time, []byte, error)) {
+ if !log.entryCloser.AddRunning() {
+ return
+ }
+ defer log.entryCloser.Done()
+
+ log.writeChannel <- logRequest{
+ seriesID: seriesID,
+ timestamp: timestamp,
+ data: data,
+ callback: callback,
+ }
+}
+
+// Read specified segment by SegmentID.
+func (log *log) Read(segmentID SegmentID) (Segment, error) {
+ if !log.entryCloser.AddRunning() {
+ return nil, errors.New("WAL is closed")
+ }
+ defer log.entryCloser.Done()
+
+ log.segmentMapMutex.RLock()
+ defer log.segmentMapMutex.RUnlock()
+
+ segment := log.segmentMap[segmentID]
+ return segment, nil
+}
+
+// ReadAllSegments reads all segments sorted by their creation time in
ascending order.
+func (log *log) ReadAllSegments() ([]Segment, error) {
+ if !log.entryCloser.AddRunning() {
+ return nil, errors.New("WAL is closed")
+ }
+ defer log.entryCloser.Done()
+
+ log.segmentMapMutex.RLock()
+ defer log.segmentMapMutex.RUnlock()
+
+ segments := make([]Segment, 0)
+ for _, segment := range log.segmentMap {
+ segments = append(segments, segment)
+ }
+ return segments, nil
+}
+
+// Rotate closes the open segment and opens a new one, returning the closed
segment details.
+func (log *log) Rotate() (Segment, error) {
+ if !log.entryCloser.AddRunning() {
+ return nil, errors.New("WAL is closed")
+ }
+ defer log.entryCloser.Done()
+
+ oldWorkSegment, err := log.swapWorkSegment()
+ if err != nil {
+ return nil, err
+ }
+
+ log.segmentMapMutex.Lock()
+ defer log.segmentMapMutex.Unlock()
+
+ // Update segment information.
+ workSegment := log.workSegment
+ log.segmentMap[workSegment.segmentID] = workSegment
+ return oldWorkSegment, nil
+}
+
+// Delete the specified segment.
+func (log *log) Delete(segmentID SegmentID) error {
+ if !log.entryCloser.AddRunning() {
+ return errors.New("WAL is closed")
+ }
+ defer log.entryCloser.Done()
+
+ log.segmentMapMutex.Lock()
+ defer log.segmentMapMutex.Unlock()
+
+ // Segment which will be deleted must be closed.
+ if segmentID == log.workSegment.segmentID {
+ return errors.New("Can not delete the segment which is working")
+ }
+
+ err := os.Remove(log.segmentMap[segmentID].path)
+ if err != nil {
+ return errors.Wrap(err, "Delete WAL segment error")
+ }
+ delete(log.segmentMap, segmentID)
+ return nil
+}
+
+// Close all of segments and stop WAL work.
+func (log *log) Close() error {
+ log.closerOnce.Do(func() {
+ log.logger.Info().Msg("Closing WAL...")
+
+ log.entryCloser.Done()
+ log.entryCloser.CloseThenWait()
+
+ close(log.writeChannel)
+ log.writeWaitGroup.Wait()
+
+ close(log.flushChannel)
+ log.flushWaitGroup.Wait()
+
+ if err := log.flushBuffer(log.buffer); err != nil {
+ log.logger.Err(err).Msg("Flushing buffer failed")
+ }
+ if err := log.workSegment.file.Close(); err != nil {
+ log.logger.Err(err).Msg("Close work segment file error")
+ }
+ log.logger.Info().Msg("Closed WAL")
+ })
+ return nil
+}
+
+func (log *log) start() {
+ go func() {
+ log.logger.Info().Msg("Start batch task...")
+
+ defer log.writeWaitGroup.Done()
+
+ bufferVolume := 0
+ for {
+ timer := time.NewTimer(log.options.BufferBatchInterval)
+ select {
+ case request, chOpen := <-log.writeChannel:
+ if !chOpen {
+ log.logger.Info().Msg("Stop batch task
when write-channel closed!")
Review Comment:
fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]