This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push: new 77b8caa6 Implement GC for SeriesDatabase of TSDB (#428) 77b8caa6 is described below commit 77b8caa659e765b3ca4e60f7f6f07038a3ec3a8b Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Thu Apr 4 12:12:18 2024 +0800 Implement GC for SeriesDatabase of TSDB (#428) --- banyand/internal/storage/index.go | 215 +++++++++++++++++++++++++++++++-- banyand/internal/storage/index_test.go | 86 ++++++++++++- banyand/internal/storage/retention.go | 19 ++- banyand/internal/storage/segment.go | 20 +-- banyand/internal/storage/shard.go | 18 +-- banyand/internal/storage/storage.go | 10 ++ banyand/internal/storage/tsdb.go | 40 +++--- 7 files changed, 351 insertions(+), 57 deletions(-) diff --git a/banyand/internal/storage/index.go b/banyand/internal/storage/index.go index e85b2178..7bc91fea 100644 --- a/banyand/internal/storage/index.go +++ b/banyand/internal/storage/index.go @@ -19,7 +19,14 @@ package storage import ( "context" + "fmt" "path" + "path/filepath" + "sort" + "strconv" + "strings" + "sync" + "time" "github.com/pkg/errors" "go.uber.org/multierr" @@ -31,28 +38,33 @@ import ( "github.com/apache/skywalking-banyandb/pkg/index/posting" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/timestamp" ) func (d *database[T, O]) IndexDB() IndexDB { - return d.index + return d.indexController.hot } func (d *database[T, O]) Lookup(ctx context.Context, series *pbv1.Series) (pbv1.SeriesList, error) { - return d.index.searchPrimary(ctx, series) + return d.indexController.searchPrimary(ctx, series) } type seriesIndex struct { - store index.SeriesStore - l *logger.Logger + startTime time.Time + store index.SeriesStore + l *logger.Logger + path string } -func newSeriesIndex(ctx context.Context, root string, flushTimeoutSeconds int64) (*seriesIndex, error) { +func newSeriesIndex(ctx context.Context, path string, startTime time.Time, flushTimeoutSeconds int64) (*seriesIndex, error) { si := &seriesIndex{ - l: logger.Fetch(ctx, "series_index"), + path: path, + startTime: startTime, + l: logger.Fetch(ctx, "series_index"), } var err error if si.store, err = inverted.NewStore(inverted.StoreOpts{ - Path: path.Join(root, "idx"), + Path: path, Logger: si.l, BatchWaitSec: flushTimeoutSeconds, }); err != nil { @@ -224,3 +236,192 @@ func appendSeriesList(dest, src pbv1.SeriesList, filter posting.List) pbv1.Serie func (s *seriesIndex) Close() error { return s.store.Close() } + +type seriesIndexController[T TSTable, O any] struct { + clock timestamp.Clock + hot *seriesIndex + standby *seriesIndex + l *logger.Logger + location string + opts TSDBOpts[T, O] + standbyLiveTime time.Duration + sync.RWMutex +} + +func newSeriesIndexController[T TSTable, O any]( + ctx context.Context, + opts TSDBOpts[T, O], +) (*seriesIndexController[T, O], error) { + l := logger.Fetch(ctx, "seriesIndexController") + clock, ctx := timestamp.GetClock(ctx) + var standbyLiveTime time.Duration + switch opts.TTL.Unit { + case HOUR: + standbyLiveTime = time.Hour + case DAY: + standbyLiveTime = 24 * time.Hour + default: + } + sic := &seriesIndexController[T, O]{ + opts: opts, + clock: clock, + standbyLiveTime: standbyLiveTime, + location: filepath.Clean(opts.Location), + l: l, + } + idxName, err := sic.loadIdx() + if err != nil { + return nil, err + } + switch len(idxName) { + case 0: + if sic.hot, err = sic.newIdx(ctx); err != nil { + return nil, err + } + case 1: + if sic.hot, err = sic.openIdx(ctx, idxName[0]); err != nil { + return nil, err + } + case 2: + if sic.hot, err = sic.openIdx(ctx, idxName[0]); err != nil { + return nil, err + } + if sic.standby, err = sic.openIdx(ctx, idxName[1]); err != nil { + return nil, err + } + default: + return nil, errors.New("unexpected series index count") + } + return sic, nil +} + +func (sic *seriesIndexController[T, O]) loadIdx() ([]string, error) { + idxName := make([]string, 0) + if err := walkDir( + sic.location, + "idx", + func(suffix string) error { + idxName = append(idxName, "idx-"+suffix) + return nil + }); err != nil { + return nil, err + } + sort.StringSlice(idxName).Sort() + if len(idxName) > 2 { + redundantIdx := idxName[:len(idxName)-2] + for i := range redundantIdx { + lfs.MustRMAll(filepath.Join(sic.location, redundantIdx[i])) + } + idxName = idxName[len(idxName)-2:] + } + return idxName, nil +} + +func (sic *seriesIndexController[T, O]) newIdx(ctx context.Context) (*seriesIndex, error) { + return sic.openIdx(ctx, fmt.Sprintf("idx-%016x", time.Now().UnixNano())) +} + +func (sic *seriesIndexController[T, O]) openIdx(ctx context.Context, name string) (*seriesIndex, error) { + p := path.Join(sic.location, name) + if ts, ok := strings.CutPrefix(name, "idx-"); ok { + t, err := strconv.ParseInt(ts, 16, 64) + if err != nil { + return nil, err + } + + return newSeriesIndex(ctx, p, sic.opts.TTL.Unit.standard(time.Unix(0, t)), sic.opts.SeriesIndexFlushTimeoutSeconds) + } + return nil, errors.New("unexpected series index name") +} + +func (sic *seriesIndexController[T, O]) run(deadline time.Time) (err error) { + var standby *seriesIndex + ctx := context.WithValue(context.Background(), logger.ContextKey, sic.l) + _, err = sic.loadIdx() + if err != nil { + sic.l.Warn().Err(err).Msg("fail to clear redundant series index") + } + if sic.hot.startTime.Before(deadline) { + sic.l.Info().Time("deadline", deadline).Msg("start to swap series index") + sic.Lock() + if sic.standby == nil { + sic.standby, err = sic.newIdx(ctx) + if err != nil { + sic.Unlock() + return err + } + } + standby = sic.hot + sic.hot = sic.standby + sic.standby = nil + sic.Unlock() + err = standby.Close() + if err != nil { + sic.l.Warn().Err(err).Msg("fail to close standby series index") + } + lfs.MustRMAll(standby.path) + sic.l.Info().Str("path", standby.path).Msg("dropped series index") + lfs.SyncPath(sic.location) + } + + liveTime := sic.hot.startTime.Sub(deadline) + if liveTime > 0 && liveTime < sic.standbyLiveTime { + sic.l.Info().Time("deadline", deadline).Msg("start to create standby series index") + standby, err = sic.newIdx(ctx) + if err != nil { + return err + } + sic.Lock() + sic.standby = standby + sic.Unlock() + } + return nil +} + +func (sic *seriesIndexController[T, O]) Write(docs index.Documents) error { + sic.RLock() + defer sic.RUnlock() + if sic.standby != nil { + return sic.standby.Write(docs) + } + return sic.hot.Write(docs) +} + +func (sic *seriesIndexController[T, O]) searchPrimary(ctx context.Context, series *pbv1.Series) (pbv1.SeriesList, error) { + sic.RLock() + defer sic.RUnlock() + + sl, err := sic.hot.searchPrimary(ctx, series) + if err != nil { + return nil, err + } + if len(sl) > 0 || sic.standby == nil { + return sl, nil + } + return sic.standby.searchPrimary(ctx, series) +} + +func (sic *seriesIndexController[T, O]) Search(ctx context.Context, series *pbv1.Series, + filter index.Filter, order *pbv1.OrderBy, preloadSize int, +) (pbv1.SeriesList, error) { + sic.RLock() + defer sic.RUnlock() + + sl, err := sic.hot.Search(ctx, series, filter, order, preloadSize) + if err != nil { + return nil, err + } + if len(sl) > 0 || sic.standby == nil { + return sl, nil + } + return sic.standby.Search(ctx, series, filter, order, preloadSize) +} + +func (sic *seriesIndexController[T, O]) Close() error { + sic.Lock() + defer sic.Unlock() + if sic.standby != nil { + return multierr.Combine(sic.hot.Close(), sic.standby.Close()) + } + return sic.hot.Close() +} diff --git a/banyand/internal/storage/index_test.go b/banyand/internal/storage/index_test.go index 4f26c67f..a7adc370 100644 --- a/banyand/internal/storage/index_test.go +++ b/banyand/internal/storage/index_test.go @@ -20,7 +20,10 @@ package storage import ( "context" "fmt" + "os" + "path" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -38,7 +41,7 @@ var testSeriesPool pbv1.SeriesPool func TestSeriesIndex_Primary(t *testing.T) { ctx := context.Background() path, fn := setUp(require.New(t)) - si, err := newSeriesIndex(ctx, path, 0) + si, err := newSeriesIndex(ctx, path, time.Now(), 0) require.NoError(t, err) defer func() { require.NoError(t, si.Close()) @@ -69,7 +72,7 @@ func TestSeriesIndex_Primary(t *testing.T) { require.NoError(t, si.Write(docs)) // Restart the index require.NoError(t, si.Close()) - si, err = newSeriesIndex(ctx, path, 0) + si, err = newSeriesIndex(ctx, path, time.Now(), 0) require.NoError(t, err) tests := []struct { name string @@ -140,3 +143,82 @@ func setUp(t *require.Assertions) (tempDir string, deferFunc func()) { tempDir, deferFunc = test.Space(t) return tempDir, deferFunc } + +func TestSeriesIndexController(t *testing.T) { + ttl := IntervalRule{ + Unit: DAY, + Num: 3, + } + t.Run("Test setup", func(t *testing.T) { + ctx := context.Background() + tmpDir, dfFn, err := test.NewSpace() + require.NoError(t, err) + defer dfFn() + + opts := TSDBOpts[TSTable, any]{ + Location: tmpDir, + TTL: ttl, + } + + sic, err := newSeriesIndexController(ctx, opts) + assert.NoError(t, err) + assert.NotNil(t, sic) + idxNames := make([]string, 0) + walkDir(tmpDir, "idx-", func(suffix string) error { + idxNames = append(idxNames, suffix) + return nil + }) + assert.Equal(t, 1, len(idxNames)) + require.NoError(t, sic.Close()) + sic, err = newSeriesIndexController(ctx, opts) + assert.NoError(t, err) + assert.NotNil(t, sic) + idxNames = idxNames[:0] + walkDir(tmpDir, "idx-", func(suffix string) error { + idxNames = append(idxNames, suffix) + return nil + }) + assert.Equal(t, 1, len(idxNames)) + require.NoError(t, sic.Close()) + + require.NoError(t, os.MkdirAll(path.Join(tmpDir, fmt.Sprintf("idx-%016x", time.Now().UnixNano()-20000)), 0o755)) + require.NoError(t, os.MkdirAll(path.Join(tmpDir, fmt.Sprintf("idx-%016x", time.Now().UnixNano()-10000)), 0o755)) + sic, err = newSeriesIndexController(ctx, opts) + assert.NoError(t, err) + assert.NotNil(t, sic) + idxNames = idxNames[:0] + walkDir(tmpDir, "idx-", func(suffix string) error { + idxNames = append(idxNames, suffix) + return nil + }) + assert.Equal(t, 2, len(idxNames)) + require.NoError(t, sic.Close()) + }) + + t.Run("Test retention", func(t *testing.T) { + ctx := context.Background() + tmpDir, dfFn, err := test.NewSpace() + require.NoError(t, err) + defer dfFn() + + opts := TSDBOpts[TSTable, any]{ + Location: tmpDir, + TTL: ttl, + } + sic, err := newSeriesIndexController(ctx, opts) + require.NoError(t, err) + defer sic.Close() + require.NoError(t, sic.run(time.Now().Add(-time.Hour*23))) + assert.NotNil(t, sic.standby) + idxNames := make([]string, 0) + walkDir(tmpDir, "idx-", func(suffix string) error { + idxNames = append(idxNames, suffix) + return nil + }) + assert.Equal(t, 2, len(idxNames)) + nextTime := sic.standby.startTime + require.NoError(t, sic.run(time.Now().Add(time.Hour))) + assert.Nil(t, sic.standby) + assert.Equal(t, nextTime, sic.hot.startTime) + }) +} diff --git a/banyand/internal/storage/retention.go b/banyand/internal/storage/retention.go index ee5f66ea..445b7f56 100644 --- a/banyand/internal/storage/retention.go +++ b/banyand/internal/storage/retention.go @@ -26,13 +26,13 @@ import ( ) type retentionTask[T TSTable, O any] struct { - segment *segmentController[T, O] + database *database[T, O] expr string option cron.ParseOption duration time.Duration } -func newRetentionTask[T TSTable, O any](segment *segmentController[T, O], ttl IntervalRule) *retentionTask[T, O] { +func newRetentionTask[T TSTable, O any](database *database[T, O], ttl IntervalRule) *retentionTask[T, O] { var expr string switch ttl.Unit { case HOUR: @@ -43,7 +43,7 @@ func newRetentionTask[T TSTable, O any](segment *segmentController[T, O], ttl In expr = "5 0" } return &retentionTask[T, O]{ - segment: segment, + database: database, option: cron.Minute | cron.Hour, expr: expr, duration: ttl.estimatedDuration(), @@ -51,7 +51,18 @@ func newRetentionTask[T TSTable, O any](segment *segmentController[T, O], ttl In } func (rc *retentionTask[T, O]) run(now time.Time, l *logger.Logger) bool { - if err := rc.segment.remove(now.Add(-rc.duration)); err != nil { + var shardList []*shard[T, O] + rc.database.RLock() + shardList = append(shardList, rc.database.sLst...) + rc.database.RUnlock() + deadline := now.Add(-rc.duration) + + for _, shard := range shardList { + if err := shard.segmentController.remove(deadline); err != nil { + l.Error().Err(err) + } + } + if err := rc.database.indexController.run(deadline); err != nil { l.Error().Err(err) } return true diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go index 8622a3dd..22890ddc 100644 --- a/banyand/internal/storage/segment.go +++ b/banyand/internal/storage/segment.go @@ -51,7 +51,7 @@ type segment[T TSTable] struct { } func openSegment[T TSTable](ctx context.Context, startTime, endTime time.Time, path, suffix string, - segmentSize IntervalRule, scheduler *timestamp.Scheduler, tsTable T, + segmentSize IntervalRule, scheduler *timestamp.Scheduler, tsTable T, p common.Position, ) (s *segment[T], err error) { suffixInteger, err := strconv.Atoi(suffix) if err != nil { @@ -71,7 +71,7 @@ func openSegment[T TSTable](ctx context.Context, startTime, endTime time.Time, p l := logger.Fetch(ctx, s.String()) s.l = l clock, _ := timestamp.GetClock(ctx) - s.Reporter = bucket.NewTimeBasedReporter(s.String(), timeRange, clock, scheduler) + s.Reporter = bucket.NewTimeBasedReporter(fmt.Sprintf("%s-%s", p.Shard, s.String()), timeRange, clock, scheduler) return s, nil } @@ -181,7 +181,7 @@ func (sc *segmentController[T, O]) segments() (ss []*segment[T]) { } func (sc *segmentController[T, O]) Current() (bucket.Reporter, error) { - now := sc.Standard(sc.clock.Now()) + now := sc.segmentSize.Unit.standard(sc.clock.Now()) ns := uint64(now.UnixNano()) if b := func() bucket.Reporter { sc.RLock() @@ -222,16 +222,6 @@ func (sc *segmentController[T, O]) OnMove(prev bucket.Reporter, next bucket.Repo event.Msg("move to the next segment") } -func (sc *segmentController[T, O]) Standard(t time.Time) time.Time { - switch sc.segmentSize.Unit { - case HOUR: - return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, t.Location()) - case DAY: - return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()) - } - panic("invalid interval unit") -} - func (sc *segmentController[T, O]) Format(tm time.Time) string { switch sc.segmentSize.Unit { case HOUR: @@ -282,7 +272,7 @@ func (sc *segmentController[T, O]) open() error { func (sc *segmentController[T, O]) create(start time.Time) (*segment[T], error) { sc.Lock() defer sc.Unlock() - start = sc.Standard(start) + start = sc.segmentSize.Unit.standard(start) var next *segment[T] for _, s := range sc.lst { if s.Contains(uint64(start.UnixNano())) { @@ -332,7 +322,7 @@ func (sc *segmentController[T, O]) load(start, end time.Time, root string) (seg if tsTable, err = sc.tsTableCreator(lfs, segPath, p, sc.l, timestamp.NewSectionTimeRange(start, end), sc.option); err != nil { return nil, err } - seg, err = openSegment[T](context.WithValue(context.Background(), logger.ContextKey, sc.l), start, end, segPath, suffix, sc.segmentSize, sc.scheduler, tsTable) + seg, err = openSegment[T](context.WithValue(context.Background(), logger.ContextKey, sc.l), start, end, segPath, suffix, sc.segmentSize, sc.scheduler, tsTable, p) if err != nil { return nil, err } diff --git a/banyand/internal/storage/shard.go b/banyand/internal/storage/shard.go index 3ea2f050..38145f38 100644 --- a/banyand/internal/storage/shard.go +++ b/banyand/internal/storage/shard.go @@ -27,14 +27,12 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/banyand/tsdb/bucket" "github.com/apache/skywalking-banyandb/pkg/logger" - "github.com/apache/skywalking-banyandb/pkg/timestamp" ) type shard[T TSTable, O any] struct { l *logger.Logger segmentController *segmentController[T, O] segmentManageStrategy *bucket.Strategy - scheduler *timestamp.Scheduler position common.Position closeOnce sync.Once id common.ShardID @@ -50,16 +48,13 @@ func (d *database[T, O]) openShard(ctx context.Context, id common.ShardID) (*sha p.Shard = strconv.Itoa(int(id)) return p }) - clock, _ := timestamp.GetClock(shardCtx) - scheduler := timestamp.NewScheduler(l, clock) s := &shard[T, O]{ - id: id, - l: l, - scheduler: scheduler, - position: common.GetPosition(shardCtx), + id: id, + l: l, + position: common.GetPosition(shardCtx), segmentController: newSegmentController[T](shardCtx, location, - d.opts.SegmentInterval, l, scheduler, d.opts.TSTableCreator, d.opts.Option), + d.opts.SegmentInterval, l, d.scheduler, d.opts.TSTableCreator, d.opts.Option), } var err error if err = s.segmentController.open(); err != nil { @@ -69,16 +64,11 @@ func (d *database[T, O]) openShard(ctx context.Context, id common.ShardID) (*sha return nil, err } s.segmentManageStrategy.Run() - retentionTask := newRetentionTask(s.segmentController, d.opts.TTL) - if err := scheduler.Register("retention", retentionTask.option, retentionTask.expr, retentionTask.run); err != nil { - return nil, err - } return s, nil } func (s *shard[T, O]) close() { s.closeOnce.Do(func() { - s.scheduler.Close() s.segmentManageStrategy.Close() s.segmentController.close() }) diff --git a/banyand/internal/storage/storage.go b/banyand/internal/storage/storage.go index bce8c8b7..79431224 100644 --- a/banyand/internal/storage/storage.go +++ b/banyand/internal/storage/storage.go @@ -114,6 +114,16 @@ func (iu IntervalUnit) String() string { panic("invalid interval unit") } +func (iu IntervalUnit) standard(t time.Time) time.Time { + switch iu { + case HOUR: + return time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), 0, 0, 0, t.Location()) + case DAY: + return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location()) + } + panic("invalid interval unit") +} + // IntervalRule defines a length of two points in time. type IntervalRule struct { Unit IntervalUnit diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go index c0fa1737..7479b5de 100644 --- a/banyand/internal/storage/tsdb.go +++ b/banyand/internal/storage/tsdb.go @@ -68,13 +68,14 @@ func generateSegID(unit IntervalUnit, suffix int) segmentID { } type database[T TSTable, O any] struct { - logger *logger.Logger - lock fs.File - index *seriesIndex - p common.Position - location string - sLst []*shard[T, O] - opts TSDBOpts[T, O] + logger *logger.Logger + lock fs.File + indexController *seriesIndexController[T, O] + scheduler *timestamp.Scheduler + p common.Position + location string + sLst []*shard[T, O] + opts TSDBOpts[T, O] sync.RWMutex sLen uint32 } @@ -82,6 +83,7 @@ type database[T TSTable, O any] struct { func (d *database[T, O]) Close() error { d.Lock() defer d.Unlock() + d.scheduler.Close() for _, s := range d.sLst { s.close() } @@ -89,7 +91,7 @@ func (d *database[T, O]) Close() error { if err := lfs.DeleteFile(d.lock.Path()); err != nil { logger.Panicf("cannot delete lock file %s: %s", d.lock.Path(), err) } - return d.index.Close() + return d.indexController.Close() } // OpenTSDB returns a new tsdb runtime. This constructor will create a new database if it's absent, @@ -104,16 +106,20 @@ func OpenTSDB[T TSTable, O any](ctx context.Context, opts TSDBOpts[T, O]) (TSDB[ p := common.GetPosition(ctx) location := filepath.Clean(opts.Location) lfs.MkdirIfNotExist(location, dirPerm) - si, err := newSeriesIndex(ctx, location, opts.SeriesIndexFlushTimeoutSeconds) + sir, err := newSeriesIndexController(ctx, opts) if err != nil { - return nil, errors.Wrap(errOpenDatabase, errors.WithMessage(err, "create series index failed").Error()) + return nil, errors.Wrap(errOpenDatabase, errors.WithMessage(err, "create series index controller failed").Error()) } + l := logger.Fetch(ctx, p.Database) + clock, _ := timestamp.GetClock(ctx) + scheduler := timestamp.NewScheduler(l, clock) db := &database[T, O]{ - location: location, - logger: logger.Fetch(ctx, p.Database), - index: si, - opts: opts, - p: p, + location: location, + scheduler: scheduler, + logger: l, + indexController: sir, + opts: opts, + p: p, } db.logger.Info().Str("path", opts.Location).Msg("initialized") lockPath := filepath.Join(opts.Location, lockFilename) @@ -125,6 +131,10 @@ func OpenTSDB[T TSTable, O any](ctx context.Context, opts TSDBOpts[T, O]) (TSDB[ if err = db.loadDatabase(); err != nil { return nil, errors.Wrap(errOpenDatabase, errors.WithMessage(err, "load database failed").Error()) } + retentionTask := newRetentionTask(db, opts.TTL) + if err = db.scheduler.Register("retention", retentionTask.option, retentionTask.expr, retentionTask.run); err != nil { + return nil, err + } return db, nil }