This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 77b8caa6 Implement GC for SeriesDatabase of TSDB (#428)
77b8caa6 is described below

commit 77b8caa659e765b3ca4e60f7f6f07038a3ec3a8b
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Thu Apr 4 12:12:18 2024 +0800

    Implement GC for SeriesDatabase of TSDB (#428)
---
 banyand/internal/storage/index.go      | 215 +++++++++++++++++++++++++++++++--
 banyand/internal/storage/index_test.go |  86 ++++++++++++-
 banyand/internal/storage/retention.go  |  19 ++-
 banyand/internal/storage/segment.go    |  20 +--
 banyand/internal/storage/shard.go      |  18 +--
 banyand/internal/storage/storage.go    |  10 ++
 banyand/internal/storage/tsdb.go       |  40 +++---
 7 files changed, 351 insertions(+), 57 deletions(-)
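
In short: the per-TSDB series index becomes a seriesIndexController that keeps a "hot" index plus, shortly before the hot index falls behind the retention deadline, a fresh "standby" one. The database-level retention task then swaps the standby in and deletes the old index directory, so series entries whose data has expired are garbage-collected by dropping a whole directory instead of deleting documents one by one. The sketch below shows only that swap; the index, controller, and gc names are simplified stand-ins for this email, not the repository's API.

package main

import (
	"fmt"
	"sync"
	"time"
)

// index stands in for a seriesIndex: it only remembers when it was created and
// where its directory would live on disk.
type index struct {
	startTime time.Time
	path      string
}

func newIndex(now time.Time) *index {
	// The real code encodes the start time in the directory name ("idx-%016x").
	return &index{startTime: now, path: fmt.Sprintf("idx-%016x", now.UnixNano())}
}

// controller mirrors the hot/standby pair in simplified form.
type controller struct {
	sync.Mutex
	hot     *index
	standby *index
}

// gc swaps the standby in once the hot index predates the deadline and returns
// the index whose directory the caller would then close and remove.
func (c *controller) gc(deadline, now time.Time) *index {
	c.Lock()
	defer c.Unlock()
	if !c.hot.startTime.Before(deadline) {
		return nil
	}
	if c.standby == nil {
		c.standby = newIndex(now)
	}
	dropped := c.hot
	c.hot, c.standby = c.standby, nil
	return dropped
}

func main() {
	now := time.Now()
	c := &controller{hot: newIndex(now.Add(-72 * time.Hour))}
	if old := c.gc(now.Add(-24*time.Hour), now); old != nil {
		fmt.Println("would remove", old.path) // the real code calls lfs.MustRMAll here
	}
}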

diff --git a/banyand/internal/storage/index.go b/banyand/internal/storage/index.go
index e85b2178..7bc91fea 100644
--- a/banyand/internal/storage/index.go
+++ b/banyand/internal/storage/index.go
@@ -19,7 +19,14 @@ package storage
 
 import (
        "context"
+       "fmt"
        "path"
+       "path/filepath"
+       "sort"
+       "strconv"
+       "strings"
+       "sync"
+       "time"
 
        "github.com/pkg/errors"
        "go.uber.org/multierr"
@@ -31,28 +38,33 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/index/posting"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 func (d *database[T, O]) IndexDB() IndexDB {
-       return d.index
+       return d.indexController.hot
 }
 
 func (d *database[T, O]) Lookup(ctx context.Context, series *pbv1.Series) (pbv1.SeriesList, error) {
-       return d.index.searchPrimary(ctx, series)
+       return d.indexController.searchPrimary(ctx, series)
 }
 
 type seriesIndex struct {
-       store index.SeriesStore
-       l     *logger.Logger
+       startTime time.Time
+       store     index.SeriesStore
+       l         *logger.Logger
+       path      string
 }
 
-func newSeriesIndex(ctx context.Context, root string, flushTimeoutSeconds int64) (*seriesIndex, error) {
+func newSeriesIndex(ctx context.Context, path string, startTime time.Time, flushTimeoutSeconds int64) (*seriesIndex, error) {
        si := &seriesIndex{
-               l: logger.Fetch(ctx, "series_index"),
+               path:      path,
+               startTime: startTime,
+               l:         logger.Fetch(ctx, "series_index"),
        }
        var err error
        if si.store, err = inverted.NewStore(inverted.StoreOpts{
-               Path:         path.Join(root, "idx"),
+               Path:         path,
                Logger:       si.l,
                BatchWaitSec: flushTimeoutSeconds,
        }); err != nil {
@@ -224,3 +236,192 @@ func appendSeriesList(dest, src pbv1.SeriesList, filter posting.List) pbv1.Serie
 func (s *seriesIndex) Close() error {
        return s.store.Close()
 }
+
+type seriesIndexController[T TSTable, O any] struct {
+       clock           timestamp.Clock
+       hot             *seriesIndex
+       standby         *seriesIndex
+       l               *logger.Logger
+       location        string
+       opts            TSDBOpts[T, O]
+       standbyLiveTime time.Duration
+       sync.RWMutex
+}
+
+func newSeriesIndexController[T TSTable, O any](
+       ctx context.Context,
+       opts TSDBOpts[T, O],
+) (*seriesIndexController[T, O], error) {
+       l := logger.Fetch(ctx, "seriesIndexController")
+       clock, ctx := timestamp.GetClock(ctx)
+       var standbyLiveTime time.Duration
+       switch opts.TTL.Unit {
+       case HOUR:
+               standbyLiveTime = time.Hour
+       case DAY:
+               standbyLiveTime = 24 * time.Hour
+       default:
+       }
+       sic := &seriesIndexController[T, O]{
+               opts:            opts,
+               clock:           clock,
+               standbyLiveTime: standbyLiveTime,
+               location:        filepath.Clean(opts.Location),
+               l:               l,
+       }
+       idxName, err := sic.loadIdx()
+       if err != nil {
+               return nil, err
+       }
+       switch len(idxName) {
+       case 0:
+               if sic.hot, err = sic.newIdx(ctx); err != nil {
+                       return nil, err
+               }
+       case 1:
+               if sic.hot, err = sic.openIdx(ctx, idxName[0]); err != nil {
+                       return nil, err
+               }
+       case 2:
+               if sic.hot, err = sic.openIdx(ctx, idxName[0]); err != nil {
+                       return nil, err
+               }
+               if sic.standby, err = sic.openIdx(ctx, idxName[1]); err != nil {
+                       return nil, err
+               }
+       default:
+               return nil, errors.New("unexpected series index count")
+       }
+       return sic, nil
+}
+
+func (sic *seriesIndexController[T, O]) loadIdx() ([]string, error) {
+       idxName := make([]string, 0)
+       if err := walkDir(
+               sic.location,
+               "idx",
+               func(suffix string) error {
+                       idxName = append(idxName, "idx-"+suffix)
+                       return nil
+               }); err != nil {
+               return nil, err
+       }
+       sort.StringSlice(idxName).Sort()
+       if len(idxName) > 2 {
+               redundantIdx := idxName[:len(idxName)-2]
+               for i := range redundantIdx {
+                       lfs.MustRMAll(filepath.Join(sic.location, redundantIdx[i]))
+               }
+               idxName = idxName[len(idxName)-2:]
+       }
+       return idxName, nil
+}
+
+func (sic *seriesIndexController[T, O]) newIdx(ctx context.Context) (*seriesIndex, error) {
+       return sic.openIdx(ctx, fmt.Sprintf("idx-%016x", time.Now().UnixNano()))
+}
+
+func (sic *seriesIndexController[T, O]) openIdx(ctx context.Context, name string) (*seriesIndex, error) {
+       p := path.Join(sic.location, name)
+       if ts, ok := strings.CutPrefix(name, "idx-"); ok {
+               t, err := strconv.ParseInt(ts, 16, 64)
+               if err != nil {
+                       return nil, err
+               }
+
+               return newSeriesIndex(ctx, p, sic.opts.TTL.Unit.standard(time.Unix(0, t)), sic.opts.SeriesIndexFlushTimeoutSeconds)
+       }
+       return nil, errors.New("unexpected series index name")
+}
+
+func (sic *seriesIndexController[T, O]) run(deadline time.Time) (err error) {
+       var standby *seriesIndex
+       ctx := context.WithValue(context.Background(), logger.ContextKey, sic.l)
+       _, err = sic.loadIdx()
+       if err != nil {
+               sic.l.Warn().Err(err).Msg("fail to clear redundant series index")
+       }
+       if sic.hot.startTime.Before(deadline) {
+               sic.l.Info().Time("deadline", deadline).Msg("start to swap series index")
+               sic.Lock()
+               if sic.standby == nil {
+                       sic.standby, err = sic.newIdx(ctx)
+                       if err != nil {
+                               sic.Unlock()
+                               return err
+                       }
+               }
+               standby = sic.hot
+               sic.hot = sic.standby
+               sic.standby = nil
+               sic.Unlock()
+               err = standby.Close()
+               if err != nil {
+                       sic.l.Warn().Err(err).Msg("fail to close standby series index")
+               }
+               lfs.MustRMAll(standby.path)
+               sic.l.Info().Str("path", standby.path).Msg("dropped series index")
+               lfs.SyncPath(sic.location)
+       }
+
+       liveTime := sic.hot.startTime.Sub(deadline)
+       if liveTime > 0 && liveTime < sic.standbyLiveTime {
+               sic.l.Info().Time("deadline", deadline).Msg("start to create standby series index")
+               standby, err = sic.newIdx(ctx)
+               if err != nil {
+                       return err
+               }
+               sic.Lock()
+               sic.standby = standby
+               sic.Unlock()
+       }
+       return nil
+}
+
+func (sic *seriesIndexController[T, O]) Write(docs index.Documents) error {
+       sic.RLock()
+       defer sic.RUnlock()
+       if sic.standby != nil {
+               return sic.standby.Write(docs)
+       }
+       return sic.hot.Write(docs)
+}
+
+func (sic *seriesIndexController[T, O]) searchPrimary(ctx context.Context, series *pbv1.Series) (pbv1.SeriesList, error) {
+       sic.RLock()
+       defer sic.RUnlock()
+
+       sl, err := sic.hot.searchPrimary(ctx, series)
+       if err != nil {
+               return nil, err
+       }
+       if len(sl) > 0 || sic.standby == nil {
+               return sl, nil
+       }
+       return sic.standby.searchPrimary(ctx, series)
+}
+
+func (sic *seriesIndexController[T, O]) Search(ctx context.Context, series *pbv1.Series,
+       filter index.Filter, order *pbv1.OrderBy, preloadSize int,
+) (pbv1.SeriesList, error) {
+       sic.RLock()
+       defer sic.RUnlock()
+
+       sl, err := sic.hot.Search(ctx, series, filter, order, preloadSize)
+       if err != nil {
+               return nil, err
+       }
+       if len(sl) > 0 || sic.standby == nil {
+               return sl, nil
+       }
+       return sic.standby.Search(ctx, series, filter, order, preloadSize)
+}
+
+func (sic *seriesIndexController[T, O]) Close() error {
+       sic.Lock()
+       defer sic.Unlock()
+       if sic.standby != nil {
+               return multierr.Combine(sic.hot.Close(), sic.standby.Close())
+       }
+       return sic.hot.Close()
+}
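
Two behaviours of the controller above are easy to miss in the diff: Write goes to the standby index whenever one exists, so series seen close to the swap land in the index that survives it, while searches hit the hot index first and only fall back to the standby; and each index directory name encodes its creation time, which is how openIdx recovers a start time after a restart (the real code additionally truncates it with TTL.Unit.standard). A small standalone sketch of that naming scheme, using the same format strings as the diff but nothing else from the repository:

package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

func main() {
	start := time.Now()
	// newIdx: the directory name is "idx-" plus the start time as 16 hex digits
	// of UnixNano; the fixed width keeps lexical order equal to chronological order.
	name := fmt.Sprintf("idx-%016x", start.UnixNano())

	// openIdx: strip the prefix and parse the hex field to recover the encoded time.
	if ts, ok := strings.CutPrefix(name, "idx-"); ok {
		ns, err := strconv.ParseInt(ts, 16, 64)
		if err != nil {
			panic(err)
		}
		fmt.Println(name, "->", time.Unix(0, ns).Equal(start)) // prints "... -> true"
	}
}

Because the hex field is zero-padded, the plain lexical sort in loadIdx orders directories by creation time, and anything older than the newest two is removed.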
diff --git a/banyand/internal/storage/index_test.go b/banyand/internal/storage/index_test.go
index 4f26c67f..a7adc370 100644
--- a/banyand/internal/storage/index_test.go
+++ b/banyand/internal/storage/index_test.go
@@ -20,7 +20,10 @@ package storage
 import (
        "context"
        "fmt"
+       "os"
+       "path"
        "testing"
+       "time"
 
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
@@ -38,7 +41,7 @@ var testSeriesPool pbv1.SeriesPool
 func TestSeriesIndex_Primary(t *testing.T) {
        ctx := context.Background()
        path, fn := setUp(require.New(t))
-       si, err := newSeriesIndex(ctx, path, 0)
+       si, err := newSeriesIndex(ctx, path, time.Now(), 0)
        require.NoError(t, err)
        defer func() {
                require.NoError(t, si.Close())
@@ -69,7 +72,7 @@ func TestSeriesIndex_Primary(t *testing.T) {
        require.NoError(t, si.Write(docs))
        // Restart the index
        require.NoError(t, si.Close())
-       si, err = newSeriesIndex(ctx, path, 0)
+       si, err = newSeriesIndex(ctx, path, time.Now(), 0)
        require.NoError(t, err)
        tests := []struct {
                name         string
@@ -140,3 +143,82 @@ func setUp(t *require.Assertions) (tempDir string, deferFunc func()) {
        tempDir, deferFunc = test.Space(t)
        return tempDir, deferFunc
 }
+
+func TestSeriesIndexController(t *testing.T) {
+       ttl := IntervalRule{
+               Unit: DAY,
+               Num:  3,
+       }
+       t.Run("Test setup", func(t *testing.T) {
+               ctx := context.Background()
+               tmpDir, dfFn, err := test.NewSpace()
+               require.NoError(t, err)
+               defer dfFn()
+
+               opts := TSDBOpts[TSTable, any]{
+                       Location: tmpDir,
+                       TTL:      ttl,
+               }
+
+               sic, err := newSeriesIndexController(ctx, opts)
+               assert.NoError(t, err)
+               assert.NotNil(t, sic)
+               idxNames := make([]string, 0)
+               walkDir(tmpDir, "idx-", func(suffix string) error {
+                       idxNames = append(idxNames, suffix)
+                       return nil
+               })
+               assert.Equal(t, 1, len(idxNames))
+               require.NoError(t, sic.Close())
+               sic, err = newSeriesIndexController(ctx, opts)
+               assert.NoError(t, err)
+               assert.NotNil(t, sic)
+               idxNames = idxNames[:0]
+               walkDir(tmpDir, "idx-", func(suffix string) error {
+                       idxNames = append(idxNames, suffix)
+                       return nil
+               })
+               assert.Equal(t, 1, len(idxNames))
+               require.NoError(t, sic.Close())
+
+               require.NoError(t, os.MkdirAll(path.Join(tmpDir, fmt.Sprintf("idx-%016x", time.Now().UnixNano()-20000)), 0o755))
+               require.NoError(t, os.MkdirAll(path.Join(tmpDir, fmt.Sprintf("idx-%016x", time.Now().UnixNano()-10000)), 0o755))
+               sic, err = newSeriesIndexController(ctx, opts)
+               assert.NoError(t, err)
+               assert.NotNil(t, sic)
+               idxNames = idxNames[:0]
+               walkDir(tmpDir, "idx-", func(suffix string) error {
+                       idxNames = append(idxNames, suffix)
+                       return nil
+               })
+               assert.Equal(t, 2, len(idxNames))
+               require.NoError(t, sic.Close())
+       })
+
+       t.Run("Test retention", func(t *testing.T) {
+               ctx := context.Background()
+               tmpDir, dfFn, err := test.NewSpace()
+               require.NoError(t, err)
+               defer dfFn()
+
+               opts := TSDBOpts[TSTable, any]{
+                       Location: tmpDir,
+                       TTL:      ttl,
+               }
+               sic, err := newSeriesIndexController(ctx, opts)
+               require.NoError(t, err)
+               defer sic.Close()
+               require.NoError(t, sic.run(time.Now().Add(-time.Hour*23)))
+               assert.NotNil(t, sic.standby)
+               idxNames := make([]string, 0)
+               walkDir(tmpDir, "idx-", func(suffix string) error {
+                       idxNames = append(idxNames, suffix)
+                       return nil
+               })
+               assert.Equal(t, 2, len(idxNames))
+               nextTime := sic.standby.startTime
+               require.NoError(t, sic.run(time.Now().Add(time.Hour)))
+               assert.Nil(t, sic.standby)
+               assert.Equal(t, nextTime, sic.hot.startTime)
+       })
+}
diff --git a/banyand/internal/storage/retention.go b/banyand/internal/storage/retention.go
index ee5f66ea..445b7f56 100644
--- a/banyand/internal/storage/retention.go
+++ b/banyand/internal/storage/retention.go
@@ -26,13 +26,13 @@ import (
 )
 
 type retentionTask[T TSTable, O any] struct {
-       segment  *segmentController[T, O]
+       database *database[T, O]
        expr     string
        option   cron.ParseOption
        duration time.Duration
 }
 
-func newRetentionTask[T TSTable, O any](segment *segmentController[T, O], ttl IntervalRule) *retentionTask[T, O] {
+func newRetentionTask[T TSTable, O any](database *database[T, O], ttl IntervalRule) *retentionTask[T, O] {
        var expr string
        switch ttl.Unit {
        case HOUR:
@@ -43,7 +43,7 @@ func newRetentionTask[T TSTable, O any](segment *segmentController[T, O], ttl In
                expr = "5 0"
        }
        return &retentionTask[T, O]{
-               segment:  segment,
+               database: database,
                option:   cron.Minute | cron.Hour,
                expr:     expr,
                duration: ttl.estimatedDuration(),
@@ -51,7 +51,18 @@ func newRetentionTask[T TSTable, O any](segment *segmentController[T, O], ttl In
 }
 
 func (rc *retentionTask[T, O]) run(now time.Time, l *logger.Logger) bool {
-       if err := rc.segment.remove(now.Add(-rc.duration)); err != nil {
+       var shardList []*shard[T, O]
+       rc.database.RLock()
+       shardList = append(shardList, rc.database.sLst...)
+       rc.database.RUnlock()
+       deadline := now.Add(-rc.duration)
+
+       for _, shard := range shardList {
+               if err := shard.segmentController.remove(deadline); err != nil {
+                       l.Error().Err(err)
+               }
+       }
+       if err := rc.database.indexController.run(deadline); err != nil {
                l.Error().Err(err)
        }
        return true
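
retention.go is where the GC is driven from: the task now holds the whole database rather than one shard's segmentController, computes the deadline as now minus the TTL on every tick, removes expired segments from every shard, and finally calls indexController.run(deadline). The sketch below only illustrates the locking pattern of that loop, with stand-in shard and db types rather than the real ones: the shard list is copied under the read lock so the slow cleanup runs without holding it.

package main

import (
	"fmt"
	"sync"
	"time"
)

// shard stands in for the real shard; remove mimics segmentController.remove.
type shard struct{ id int }

func (s *shard) remove(deadline time.Time) error {
	fmt.Printf("shard %d: drop segments older than %s\n", s.id, deadline.Format(time.RFC3339))
	return nil
}

// db stands in for database[T, O] with its shard list guarded by an RWMutex.
type db struct {
	sync.RWMutex
	shards []*shard
}

// retain follows retentionTask.run: snapshot the shard list under the read
// lock, then do the slow cleanup without holding it.
func (d *db) retain(now time.Time, ttl time.Duration) {
	d.RLock()
	list := append([]*shard(nil), d.shards...)
	d.RUnlock()

	deadline := now.Add(-ttl)
	for _, s := range list {
		if err := s.remove(deadline); err != nil {
			fmt.Println("remove failed:", err)
		}
	}
	// The real task then calls indexController.run(deadline) to GC the series index.
}

func main() {
	d := &db{shards: []*shard{{id: 1}, {id: 2}}}
	d.retain(time.Now(), 72*time.Hour)
}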
diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go
index 8622a3dd..22890ddc 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -51,7 +51,7 @@ type segment[T TSTable] struct {
 }
 
 func openSegment[T TSTable](ctx context.Context, startTime, endTime time.Time, path, suffix string,
-       segmentSize IntervalRule, scheduler *timestamp.Scheduler, tsTable T,
+       segmentSize IntervalRule, scheduler *timestamp.Scheduler, tsTable T, p common.Position,
 ) (s *segment[T], err error) {
        suffixInteger, err := strconv.Atoi(suffix)
        if err != nil {
@@ -71,7 +71,7 @@ func openSegment[T TSTable](ctx context.Context, startTime, endTime time.Time, p
        l := logger.Fetch(ctx, s.String())
        s.l = l
        clock, _ := timestamp.GetClock(ctx)
-       s.Reporter = bucket.NewTimeBasedReporter(s.String(), timeRange, clock, scheduler)
+       s.Reporter = bucket.NewTimeBasedReporter(fmt.Sprintf("%s-%s", p.Shard, s.String()), timeRange, clock, scheduler)
        return s, nil
 }
 
@@ -181,7 +181,7 @@ func (sc *segmentController[T, O]) segments() (ss []*segment[T]) {
 }
 
 func (sc *segmentController[T, O]) Current() (bucket.Reporter, error) {
-       now := sc.Standard(sc.clock.Now())
+       now := sc.segmentSize.Unit.standard(sc.clock.Now())
        ns := uint64(now.UnixNano())
        if b := func() bucket.Reporter {
                sc.RLock()
@@ -222,16 +222,6 @@ func (sc *segmentController[T, O]) OnMove(prev bucket.Reporter, next bucket.Repo
        event.Msg("move to the next segment")
 }
 
-func (sc *segmentController[T, O]) Standard(t time.Time) time.Time {
-       switch sc.segmentSize.Unit {
-       case HOUR:
-               return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, t.Location())
-       case DAY:
-               return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location())
-       }
-       panic("invalid interval unit")
-}
-
 func (sc *segmentController[T, O]) Format(tm time.Time) string {
        switch sc.segmentSize.Unit {
        case HOUR:
@@ -282,7 +272,7 @@ func (sc *segmentController[T, O]) open() error {
 func (sc *segmentController[T, O]) create(start time.Time) (*segment[T], error) {
        sc.Lock()
        defer sc.Unlock()
-       start = sc.Standard(start)
+       start = sc.segmentSize.Unit.standard(start)
        var next *segment[T]
        for _, s := range sc.lst {
                if s.Contains(uint64(start.UnixNano())) {
@@ -332,7 +322,7 @@ func (sc *segmentController[T, O]) load(start, end time.Time, root string) (seg
        if tsTable, err = sc.tsTableCreator(lfs, segPath, p, sc.l, timestamp.NewSectionTimeRange(start, end), sc.option); err != nil {
                return nil, err
        }
-       seg, err = openSegment[T](context.WithValue(context.Background(), logger.ContextKey, sc.l), start, end, segPath, suffix, sc.segmentSize, sc.scheduler, tsTable)
+       seg, err = openSegment[T](context.WithValue(context.Background(), logger.ContextKey, sc.l), start, end, segPath, suffix, sc.segmentSize, sc.scheduler, tsTable, p)
        if err != nil {
                return nil, err
        }
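
A side effect of sharing one scheduler across shards (see shard.go and tsdb.go below) is that openSegment now receives the shard position and prefixes the reporter name with p.Shard, presumably so segments with the same suffix in different shards do not collide when they register on the shared scheduler. A toy illustration of that naming concern, with a made-up registry type and segment name rather than the real scheduler:

package main

import (
	"errors"
	"fmt"
)

// registry is a toy stand-in for a scheduler that rejects duplicate task names.
type registry map[string]struct{}

func (r registry) register(name string) error {
	if _, ok := r[name]; ok {
		return errors.New("duplicate task name: " + name)
	}
	r[name] = struct{}{}
	return nil
}

func main() {
	r := registry{}
	segment := "segment-2024040400" // illustrative: the same segment suffix exists in every shard
	for _, shardID := range []string{"0", "1"} {
		name := fmt.Sprintf("%s-%s", shardID, segment) // shard-qualified, as in openSegment
		fmt.Println(name, r.register(name))            // both register cleanly
	}
	fmt.Println(r.register("0-" + segment)) // re-registering the same name fails
}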
diff --git a/banyand/internal/storage/shard.go b/banyand/internal/storage/shard.go
index 3ea2f050..38145f38 100644
--- a/banyand/internal/storage/shard.go
+++ b/banyand/internal/storage/shard.go
@@ -27,14 +27,12 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
        "github.com/apache/skywalking-banyandb/pkg/logger"
-       "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 type shard[T TSTable, O any] struct {
        l                     *logger.Logger
        segmentController     *segmentController[T, O]
        segmentManageStrategy *bucket.Strategy
-       scheduler             *timestamp.Scheduler
        position              common.Position
        closeOnce             sync.Once
        id                    common.ShardID
@@ -50,16 +48,13 @@ func (d *database[T, O]) openShard(ctx context.Context, id common.ShardID) (*sha
                p.Shard = strconv.Itoa(int(id))
                return p
        })
-       clock, _ := timestamp.GetClock(shardCtx)
 
-       scheduler := timestamp.NewScheduler(l, clock)
        s := &shard[T, O]{
-               id:        id,
-               l:         l,
-               scheduler: scheduler,
-               position:  common.GetPosition(shardCtx),
+               id:       id,
+               l:        l,
+               position: common.GetPosition(shardCtx),
                segmentController: newSegmentController[T](shardCtx, location,
-                       d.opts.SegmentInterval, l, scheduler, d.opts.TSTableCreator, d.opts.Option),
+                       d.opts.SegmentInterval, l, d.scheduler, d.opts.TSTableCreator, d.opts.Option),
        }
        var err error
        if err = s.segmentController.open(); err != nil {
@@ -69,16 +64,11 @@ func (d *database[T, O]) openShard(ctx context.Context, id common.ShardID) (*sha
                return nil, err
        }
        s.segmentManageStrategy.Run()
-       retentionTask := newRetentionTask(s.segmentController, d.opts.TTL)
-       if err := scheduler.Register("retention", retentionTask.option, retentionTask.expr, retentionTask.run); err != nil {
-               return nil, err
-       }
        return s, nil
 }
 
 func (s *shard[T, O]) close() {
        s.closeOnce.Do(func() {
-               s.scheduler.Close()
                s.segmentManageStrategy.Close()
                s.segmentController.close()
        })
diff --git a/banyand/internal/storage/storage.go b/banyand/internal/storage/storage.go
index bce8c8b7..79431224 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -114,6 +114,16 @@ func (iu IntervalUnit) String() string {
        panic("invalid interval unit")
 }
 
+func (iu IntervalUnit) standard(t time.Time) time.Time {
+       switch iu {
+       case HOUR:
+               return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, t.Location())
+       case DAY:
+               return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location())
+       }
+       panic("invalid interval unit")
+}
+
 // IntervalRule defines a length of two points in time.
 type IntervalRule struct {
        Unit IntervalUnit
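
The new IntervalUnit.standard method replaces segmentController.Standard so the segment controller and the index controller truncate timestamps the same way: down to the start of the hour or day in the time's own location. A quick standalone illustration using the same time.Date calls as the diff:

package main

import (
	"fmt"
	"time"
)

// standardHour and standardDay reproduce the two branches of IntervalUnit.standard.
func standardHour(t time.Time) time.Time {
	return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, t.Location())
}

func standardDay(t time.Time) time.Time {
	return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location())
}

func main() {
	t := time.Date(2024, 4, 4, 12, 12, 18, 0, time.FixedZone("CST", 8*60*60))
	fmt.Println(standardHour(t)) // 2024-04-04 12:00:00 +0800 CST
	fmt.Println(standardDay(t))  // 2024-04-04 00:00:00 +0800 CST
}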
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index c0fa1737..7479b5de 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -68,13 +68,14 @@ func generateSegID(unit IntervalUnit, suffix int) segmentID {
 }
 
 type database[T TSTable, O any] struct {
-       logger   *logger.Logger
-       lock     fs.File
-       index    *seriesIndex
-       p        common.Position
-       location string
-       sLst     []*shard[T, O]
-       opts     TSDBOpts[T, O]
+       logger          *logger.Logger
+       lock            fs.File
+       indexController *seriesIndexController[T, O]
+       scheduler       *timestamp.Scheduler
+       p               common.Position
+       location        string
+       sLst            []*shard[T, O]
+       opts            TSDBOpts[T, O]
        sync.RWMutex
        sLen uint32
 }
@@ -82,6 +83,7 @@ type database[T TSTable, O any] struct {
 func (d *database[T, O]) Close() error {
        d.Lock()
        defer d.Unlock()
+       d.scheduler.Close()
        for _, s := range d.sLst {
                s.close()
        }
@@ -89,7 +91,7 @@ func (d *database[T, O]) Close() error {
        if err := lfs.DeleteFile(d.lock.Path()); err != nil {
                logger.Panicf("cannot delete lock file %s: %s", d.lock.Path(), err)
        }
-       return d.index.Close()
+       return d.indexController.Close()
 }
 
 // OpenTSDB returns a new tsdb runtime. This constructor will create a new database if it's absent,
@@ -104,16 +106,20 @@ func OpenTSDB[T TSTable, O any](ctx context.Context, opts TSDBOpts[T, O]) (TSDB[
        p := common.GetPosition(ctx)
        location := filepath.Clean(opts.Location)
        lfs.MkdirIfNotExist(location, dirPerm)
-       si, err := newSeriesIndex(ctx, location, opts.SeriesIndexFlushTimeoutSeconds)
+       sir, err := newSeriesIndexController(ctx, opts)
        if err != nil {
-               return nil, errors.Wrap(errOpenDatabase, errors.WithMessage(err, "create series index failed").Error())
+               return nil, errors.Wrap(errOpenDatabase, errors.WithMessage(err, "create series index controller failed").Error())
        }
+       l := logger.Fetch(ctx, p.Database)
+       clock, _ := timestamp.GetClock(ctx)
+       scheduler := timestamp.NewScheduler(l, clock)
        db := &database[T, O]{
-               location: location,
-               logger:   logger.Fetch(ctx, p.Database),
-               index:    si,
-               opts:     opts,
-               p:        p,
+               location:        location,
+               scheduler:       scheduler,
+               logger:          l,
+               indexController: sir,
+               opts:            opts,
+               p:               p,
        }
        db.logger.Info().Str("path", opts.Location).Msg("initialized")
        lockPath := filepath.Join(opts.Location, lockFilename)
@@ -125,6 +131,10 @@ func OpenTSDB[T TSTable, O any](ctx context.Context, opts TSDBOpts[T, O]) (TSDB[
        if err = db.loadDatabase(); err != nil {
                return nil, errors.Wrap(errOpenDatabase, errors.WithMessage(err, "load database failed").Error())
        }
+       retentionTask := newRetentionTask(db, opts.TTL)
+       if err = db.scheduler.Register("retention", retentionTask.option, retentionTask.expr, retentionTask.run); err != nil {
+               return nil, err
+       }
        return db, nil
 }
 
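
Finally, tsdb.go moves scheduling up one level: OpenTSDB creates a single scheduler per database, registers one "retention" job on it, and Close stops that scheduler before closing the shards and the index controller, so no retention tick can run against a half-closed database. A stripped-down sketch of that lifecycle, with stand-in types only:

package main

import (
	"fmt"
	"sync"
)

// scheduler and shard are stand-ins; only the ownership and the close order matter.
type scheduler struct{}

func (s *scheduler) Register(name string) { fmt.Println("registered job:", name) }
func (s *scheduler) Close()               { fmt.Println("scheduler stopped") }

type shard struct{ id int }

func (s *shard) close() { fmt.Println("shard", s.id, "closed") }

type database struct {
	sync.Mutex
	scheduler *scheduler
	shards    []*shard
}

// openTSDB mirrors OpenTSDB: one scheduler per database, one retention job.
func openTSDB() *database {
	db := &database{scheduler: &scheduler{}}
	db.scheduler.Register("retention")
	return db
}

// Close mirrors database.Close: stop the scheduler before touching shards.
func (d *database) Close() {
	d.Lock()
	defer d.Unlock()
	d.scheduler.Close()
	for _, s := range d.shards {
		s.close()
	}
}

func main() {
	db := openTSDB()
	db.shards = append(db.shards, &shard{id: 1})
	db.Close()
}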
