This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch tsdb-retention in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 1c47f7a6961e012a0f52ebcceba0bf76c03c8a93 Author: Gao Hongtao <[email protected]> AuthorDate: Wed Sep 28 04:54:25 2022 +0000 Introduce two key feats to tsdb * Add the retention controller to support the group ttl. * Allow to write cold data whose timestamp is not in the current block. Signed-off-by: Gao Hongtao <[email protected]> --- banyand/measure/measure_write.go | 4 +- banyand/measure/metadata.go | 3 + banyand/stream/metadata.go | 3 + banyand/stream/stream_write.go | 4 +- banyand/tsdb/block.go | 22 ++++ banyand/tsdb/bucket/queue.go | 20 ++++ banyand/tsdb/retention.go | 87 +++++++++++++++ banyand/tsdb/segment.go | 65 +++++++++-- banyand/tsdb/series.go | 24 +++++ banyand/tsdb/seriesdb.go | 31 ++++++ banyand/tsdb/shard.go | 88 +++++++++++---- banyand/tsdb/shard_test.go | 138 ++++++++++++++++++++++++ banyand/tsdb/tsdb.go | 21 ++-- banyand/tsdb/tsdb_test.go | 1 + go.mod | 2 + go.sum | 4 + test/integration/cold_query/query_suite_test.go | 92 ++++++++++++++++ 17 files changed, 572 insertions(+), 37 deletions(-) diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go index 58d0bab..f2992d4 100644 --- a/banyand/measure/measure_write.go +++ b/banyand/measure/measure_write.go @@ -81,7 +81,7 @@ func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *mea if err != nil { return err } - wp, err := series.Span(timestamp.NewInclusiveTimeRangeDuration(t, 0)) + wp, err := series.Create(t) if err != nil { if wp != nil { _ = wp.Close() @@ -189,7 +189,7 @@ func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) { } err := stm.write(common.ShardID(writeEvent.GetShardId()), writeEvent.GetSeriesHash(), writeEvent.GetRequest().GetDataPoint(), nil) if err != nil { - w.l.Debug().Err(err).Msg("fail to write entity") + w.l.Error().Err(err).Msg("fail to write entity") } return } diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go index fa4a545..7cd0d46 100644 --- a/banyand/measure/metadata.go +++ b/banyand/measure/metadata.go @@ -215,6 +215,9 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) { if opts.SegmentInterval, err = pb_v1.ToIntervalRule(groupSchema.ResourceOpts.SegmentInterval); err != nil { return nil, err } + if opts.TTL, err = pb_v1.ToIntervalRule(groupSchema.ResourceOpts.Ttl); err != nil { + return nil, err + } return tsdb.OpenDatabase( context.WithValue(context.Background(), common.PositionKey, common.Position{ Module: "measure", diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go index 67f1f82..870a887 100644 --- a/banyand/stream/metadata.go +++ b/banyand/stream/metadata.go @@ -218,6 +218,9 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) { if opts.SegmentInterval, err = pb_v1.ToIntervalRule(groupSchema.ResourceOpts.SegmentInterval); err != nil { return nil, err } + if opts.TTL, err = pb_v1.ToIntervalRule(groupSchema.ResourceOpts.Ttl); err != nil { + return nil, err + } return tsdb.OpenDatabase( context.WithValue(context.Background(), common.PositionKey, common.Position{ Module: "stream", diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go index 1d172f5..0d685be 100644 --- a/banyand/stream/stream_write.go +++ b/banyand/stream/stream_write.go @@ -88,7 +88,7 @@ func (s *stream) write(shardID common.ShardID, seriesHashKey []byte, value *stre return err } t := timestamp.MToN(tp) - wp, err := series.Span(timestamp.NewInclusiveTimeRangeDuration(t, 0)) + wp, err := series.Create(t) if err != nil { if wp != nil { _ = wp.Close() @@ -183,7 +183,7 @@ func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) { } err := stm.write(common.ShardID(writeEvent.GetShardId()), writeEvent.GetSeriesHash(), writeEvent.GetRequest().GetElement(), nil) if err != nil { - w.l.Debug().Err(err).Msg("fail to write entity") + w.l.Error().Err(err).Msg("fail to write entity") } return } diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go index 1dc1df0..5e8c699 100644 --- a/banyand/tsdb/block.go +++ b/banyand/tsdb/block.go @@ -21,6 +21,7 @@ import ( "context" "fmt" "io" + "os" "path" "runtime" "strconv" @@ -38,6 +39,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/index/lsm" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/timestamp" + "github.com/pkg/errors" ) const ( @@ -55,6 +57,7 @@ type block struct { suffix string ref *atomic.Int32 closed *atomic.Bool + deleted *atomic.Bool lock sync.RWMutex position common.Position memSize int64 @@ -100,6 +103,7 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) { flushCh: make(chan struct{}, 1), ref: &atomic.Int32{}, closed: &atomic.Bool{}, + deleted: &atomic.Bool{}, queue: opts.queue, } b.options(ctx) @@ -136,6 +140,9 @@ func (b *block) options(ctx context.Context) { } func (b *block) open() (err error) { + if b.deleted.Load() { + return nil + } if b.store, err = kv.OpenTimeSeriesStore( 0, path.Join(b.path, componentMain), @@ -181,6 +188,9 @@ func (b *block) open() (err error) { } func (b *block) delegate() (blockDelegate, error) { + if b.deleted.Load() { + return nil, errors.WithMessagef(ErrBlockAbsent, "block %d is deleted", b.blockID) + } if b.incRef() { return &bDelegate{ delegate: b, @@ -272,6 +282,18 @@ func (b *block) stopThenClose() { b.close() } +func (b *block) delete() error { + if b.deleted.Load() { + return nil + } + b.deleted.Store(true) + if b.Reporter != nil { + b.Stop() + } + b.close() + return os.RemoveAll(b.path) +} + func (b *block) Closed() bool { return b.closed.Load() } diff --git a/banyand/tsdb/bucket/queue.go b/banyand/tsdb/bucket/queue.go index af13d6b..2b3dced 100644 --- a/banyand/tsdb/bucket/queue.go +++ b/banyand/tsdb/bucket/queue.go @@ -27,6 +27,7 @@ type EvictFn func(id interface{}) type Queue interface { Push(id interface{}) + Remove(id interface{}) Len() int All() []interface{} } @@ -108,6 +109,25 @@ func (q *lruQueue) Push(id interface{}) { q.recent.Add(id, nil) } +func (q *lruQueue) Remove(id interface{}) { + q.lock.Lock() + defer q.lock.Unlock() + + if q.frequent.Contains(id) { + q.frequent.Remove(id) + return + } + + if q.recent.Contains(id) { + q.recent.Remove(id) + return + } + + if q.recentEvict.Contains(id) { + q.recentEvict.Remove(id) + } +} + func (q *lruQueue) Len() int { q.lock.RLock() defer q.lock.RUnlock() diff --git a/banyand/tsdb/retention.go b/banyand/tsdb/retention.go new file mode 100644 index 0000000..c876edf --- /dev/null +++ b/banyand/tsdb/retention.go @@ -0,0 +1,87 @@ +package tsdb + +import ( + "sync" + "time" + + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/robfig/cron/v3" +) + +type retentionController struct { + segment *segmentController + scheduler cron.Schedule + stopped bool + stopMux sync.Mutex + stopCh chan struct{} + duration time.Duration + l *logger.Logger +} + +func newRetentionController(segment *segmentController, ttl IntervalRule) (*retentionController, error) { + var expr string + switch ttl.Unit { + case HOUR: + // Every hour on the 5th minute + expr = "5 *" + case DAY: + // Every day on 00:05 + expr = "5 0" + + } + parser := cron.NewParser(cron.Minute | cron.Hour) + scheduler, err := parser.Parse(expr) + if err != nil { + return nil, err + } + return &retentionController{ + segment: segment, + scheduler: scheduler, + stopCh: make(chan struct{}), + l: segment.l.Named("retention-controller"), + duration: ttl.EstimatedDuration(), + }, nil +} + +func (rc *retentionController) start() { + rc.stopMux.Lock() + if rc.stopped { + return + } + rc.stopMux.Unlock() + go rc.run() +} + +func (rc *retentionController) run() { + rc.l.Info().Msg("start") + now := rc.segment.clock.Now() + for { + next := rc.scheduler.Next(now) + timer := rc.segment.clock.Timer(next.Sub(now)) + for { + select { + case now = <-timer.C: + rc.l.Info().Time("now", now).Msg("wake") + if err := rc.segment.remove(now.Add(-rc.duration)); err != nil { + rc.l.Error().Err(err) + } + case <-rc.stopCh: + timer.Stop() + rc.l.Info().Msg("stop") + return + } + break + } + } + +} + +func (rc *retentionController) stop() { + rc.stopMux.Lock() + defer rc.stopMux.Unlock() + if rc.stopped { + return + } + rc.stopped = true + close(rc.stopCh) +} diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go index be2cb7a..bbdad22 100644 --- a/banyand/tsdb/segment.go +++ b/banyand/tsdb/segment.go @@ -21,6 +21,8 @@ import ( "context" "errors" "fmt" + "os" + "sort" "strconv" "sync" "time" @@ -31,6 +33,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/tsdb/bucket" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/timestamp" + "go.uber.org/multierr" ) var ErrEndOfSegment = errors.New("reached the end of the segment") @@ -69,7 +72,8 @@ func openSegment(ctx context.Context, startTime time.Time, path, suffix string, TimeRange: timeRange, Reporter: bucket.NewTimeBasedReporter(timeRange, clock), } - err = s.blockController.open() + isHead := s.End.After(clock.Now()) + err = s.blockController.open(isHead) if err != nil { return nil, err } @@ -96,7 +100,9 @@ func openSegment(ctx context.Context, startTime time.Time, path, suffix string, } } } - + if !isHead { + return + } s.blockManageStrategy, err = bucket.NewStrategy(s.blockController, bucket.WithLogger(s.l)) if err != nil { return nil, err @@ -110,7 +116,9 @@ func (s *segment) close() { if s.globalIndex != nil { s.globalIndex.Close() } - s.blockManageStrategy.Close() + if s.blockManageStrategy != nil { + s.blockManageStrategy.Close() + } if s.Reporter != nil { s.Stop() } @@ -120,6 +128,11 @@ func (s *segment) closeBlock(id uint16) { s.blockController.closeBlock(id) } +func (s *segment) delete() error { + s.close() + return os.RemoveAll(s.path) +} + func (s segment) String() string { return fmt.Sprintf("SegID-%d", s.id) } @@ -313,17 +326,22 @@ func (bc *blockController) startTime(suffix string) (time.Time, error) { panic("invalid interval unit") } -func (bc *blockController) open() error { +func (bc *blockController) open(createIfEmpty bool) error { err := WalkDir( bc.location, segPathPrefix, func(suffix, absolutePath string) error { + bc.Lock() + defer bc.Unlock() _, err := bc.load(suffix, absolutePath) return err }) if err != nil { return err } + if !createIfEmpty { + return nil + } if bc.Current() == nil { _, err := bc.create(bc.clock.Now()) if err != nil { @@ -340,6 +358,8 @@ func (bc *blockController) create(startTime time.Time) (*block, error) { if !startTime.Before(bc.segTimeRange.End) { return nil, ErrEndOfSegment } + bc.Lock() + defer bc.Unlock() suffix := bc.Format(startTime) segPath, err := mkdir(blockTemplate, bc.location, suffix) if err != nil { @@ -368,14 +388,47 @@ func (bc *blockController) load(suffix, path string) (b *block, err error) { }); err != nil { return nil, err } - bc.Lock() - defer bc.Unlock() bc.lst = append(bc.lst, b) + bc.sortLst() return b, nil } +func (bc *blockController) sortLst() { + sort.Slice(bc.lst, func(i, j int) bool { + return bc.lst[i].blockID < bc.lst[j].blockID + }) +} + func (bc *blockController) close() { for _, s := range bc.lst { s.stopThenClose() } } + +func (bc *blockController) remove(deadline time.Time) (err error) { + for _, b := range bc.blocks() { + if b.End.Before(deadline) { + bc.Lock() + if errDel := b.delete(); errDel != nil { + err = multierr.Append(err, errDel) + } else { + b.queue.Remove(BlockID{ + BlockID: b.blockID, + SegID: b.segID, + }) + bc.removeBlock(b.blockID) + } + bc.Unlock() + } + } + return err +} + +func (bc *blockController) removeBlock(blockID uint16) { + for i, b := range bc.lst { + if b.blockID == blockID { + bc.lst = append(bc.lst[:i], bc.lst[i+1:]...) + break + } + } +} diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go index 1696616..8aa98d0 100644 --- a/banyand/tsdb/series.go +++ b/banyand/tsdb/series.go @@ -76,6 +76,7 @@ func (i *GlobalItemID) UnMarshal(data []byte) error { type Series interface { ID() common.SeriesID Span(timeRange timestamp.TimeRange) (SeriesSpan, error) + Create(t time.Time) (SeriesSpan, error) Get(id GlobalItemID) (Item, io.Closer, error) } @@ -127,6 +128,29 @@ func (s *series) Span(timeRange timestamp.TimeRange) (SeriesSpan, error) { return newSeriesSpan(context.WithValue(context.Background(), logger.ContextKey, s.l), timeRange, blocks, s.id, s.shardID), nil } +func (s *series) Create(t time.Time) (SeriesSpan, error) { + tr := timestamp.NewInclusiveTimeRange(t, t) + blocks, err := s.blockDB.span(tr) + if err != nil { + return nil, err + } + if len(blocks) > 0 { + s.l.Debug(). + Time("time", t). + Msg("load a series span") + return newSeriesSpan(context.WithValue(context.Background(), logger.ContextKey, s.l), tr, blocks, s.id, s.shardID), nil + } + b, err := s.blockDB.create(tr) + if err != nil { + return nil, err + } + blocks = append(blocks, b) + s.l.Debug(). + Time("time", t). + Msg("create a series span") + return newSeriesSpan(context.WithValue(context.Background(), logger.ContextKey, s.l), tr, blocks, s.id, s.shardID), nil +} + func newSeries(ctx context.Context, id common.SeriesID, blockDB blockDatabase) *series { s := &series{ id: id, diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go index 720968a..bd61e44 100644 --- a/banyand/tsdb/seriesdb.go +++ b/banyand/tsdb/seriesdb.go @@ -148,6 +148,7 @@ type SeriesDatabase interface { type blockDatabase interface { shardID() common.ShardID span(timeRange timestamp.TimeRange) ([]blockDelegate, error) + create(timeRange timestamp.TimeRange) (blockDelegate, error) block(id GlobalItemID) (blockDelegate, error) } @@ -265,6 +266,36 @@ func (s *seriesDB) span(timeRange timestamp.TimeRange) ([]blockDelegate, error) return result, nil } +func (s *seriesDB) create(timeRange timestamp.TimeRange) (blockDelegate, error) { + s.Lock() + defer s.Unlock() + ss := s.segCtrl.span(timeRange) + if len(ss) > 0 { + s := ss[0] + dd, err := s.blockController.span(timeRange) + if err != nil { + return nil, err + } + if len(dd) > 0 { + return dd[0], nil + } + block, err := s.blockController.create(timeRange.Start) + if err != nil { + return nil, err + } + return block.delegate() + } + seg, err := s.segCtrl.create(s.segCtrl.Format(timeRange.Start), false) + if err != nil { + return nil, err + } + block, err := seg.blockController.create(timeRange.Start) + if err != nil { + return nil, err + } + return block.delegate() +} + func (s *seriesDB) context() context.Context { return context.WithValue(context.Background(), logger.ContextKey, s.l) } diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go index 831dde3..4f3c168 100644 --- a/banyand/tsdb/shard.go +++ b/banyand/tsdb/shard.go @@ -25,6 +25,7 @@ import ( "time" "github.com/pkg/errors" + "go.uber.org/multierr" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/banyand/tsdb/bucket" @@ -48,11 +49,12 @@ type shard struct { indexDatabase IndexDatabase segmentController *segmentController segmentManageStrategy *bucket.Strategy + retentionController *retentionController stopCh chan struct{} } func OpenShard(ctx context.Context, id common.ShardID, - root string, segmentSize, blockSize IntervalRule, openedBlockSize int, + root string, segmentSize, blockSize, ttl IntervalRule, openedBlockSize int, ) (Shard, error) { path, err := mkdir(shardTemplate, root, int(id)) if err != nil { @@ -106,6 +108,10 @@ func OpenShard(ctx context.Context, id common.ShardID, s.position = position.(common.Position) } s.runStat() + if s.retentionController, err = newRetentionController(s.segmentController, ttl); err != nil { + return nil, err + } + s.retentionController.start() return s, nil } @@ -124,7 +130,9 @@ func (s *shard) Index() IndexDatabase { func (s *shard) State() (shardState ShardState) { shardState.StrategyManagers = append(shardState.StrategyManagers, s.segmentManageStrategy.String()) for _, seg := range s.segmentController.segments() { - shardState.StrategyManagers = append(shardState.StrategyManagers, seg.blockManageStrategy.String()) + if seg.blockManageStrategy != nil { + shardState.StrategyManagers = append(shardState.StrategyManagers, seg.blockManageStrategy.String()) + } for _, b := range seg.blockController.blocks() { shardState.Blocks = append(shardState.Blocks, BlockState{ ID: BlockID{ @@ -149,11 +157,12 @@ func (s *shard) State() (shardState ShardState) { } return x.SegID < y.SegID }) - s.l.Info().Interface("", shardState).Msg("state") + s.l.Debug().Interface("", shardState).Msg("state") return shardState } func (s *shard) Close() error { + s.retentionController.stop() s.segmentManageStrategy.Close() s.segmentController.close() err := s.seriesDatabase.Close() @@ -193,6 +202,16 @@ func (ir IntervalRule) NextTime(current time.Time) time.Time { panic("invalid interval unit") } +func (ir IntervalRule) PreviousTime(current time.Time) time.Time { + switch ir.Unit { + case HOUR: + return current.Add(-time.Hour * time.Duration(ir.Num)) + case DAY: + return current.AddDate(0, 0, -ir.Num) + } + panic("invalid interval unit") +} + func (ir IntervalRule) EstimatedDuration() time.Duration { switch ir.Unit { case HOUR: @@ -243,7 +262,7 @@ func newSegmentController(shardCtx context.Context, location string, segmentSize } func (sc *segmentController) get(segID uint16) *segment { - lst := sc.snapshotLst() + lst := sc.segments() last := len(lst) - 1 for i := range lst { s := lst[last-i] @@ -255,7 +274,7 @@ func (sc *segmentController) get(segID uint16) *segment { } func (sc *segmentController) span(timeRange timestamp.TimeRange) (ss []*segment) { - lst := sc.snapshotLst() + lst := sc.segments() last := len(lst) - 1 for i := range lst { s := lst[last-i] @@ -266,12 +285,6 @@ func (sc *segmentController) span(timeRange timestamp.TimeRange) (ss []*segment) return ss } -func (sc *segmentController) snapshotLst() []*segment { - sc.RLock() - defer sc.RUnlock() - return sc.lst -} - func (sc *segmentController) segments() (ss []*segment) { sc.RLock() defer sc.RUnlock() @@ -299,7 +312,7 @@ func (sc *segmentController) Current() bucket.Reporter { func (sc *segmentController) Next() (bucket.Reporter, error) { seg := sc.Current().(*segment) reporter, err := sc.create(sc.Format( - sc.segmentSize.NextTime(seg.Start))) + sc.segmentSize.NextTime(seg.Start)), true) if errors.Is(err, ErrEndOfSegment) { return nil, bucket.ErrNoMoreBucket } @@ -342,7 +355,9 @@ func (sc *segmentController) open() error { sc.location, segPathPrefix, func(suffix, absolutePath string) error { - _, err := sc.load(suffix, absolutePath) + sc.Lock() + defer sc.Unlock() + _, err := sc.load(suffix, absolutePath, false) if errors.Is(err, ErrEndOfSegment) { return nil } @@ -352,7 +367,7 @@ func (sc *segmentController) open() error { return err } if sc.Current() == nil { - _, err = sc.create(sc.Format(sc.clock.Now())) + _, err = sc.create(sc.Format(sc.clock.Now()), true) if err != nil { return err } @@ -360,15 +375,23 @@ func (sc *segmentController) open() error { return nil } -func (sc *segmentController) create(suffix string) (*segment, error) { +func (sc *segmentController) create(suffix string, createBlockIfEmpty bool) (*segment, error) { + sc.Lock() + defer sc.Unlock() segPath, err := mkdir(segTemplate, sc.location, suffix) if err != nil { return nil, err } - return sc.load(suffix, segPath) + return sc.load(suffix, segPath, createBlockIfEmpty) } -func (sc *segmentController) load(suffix, path string) (seg *segment, err error) { +func (sc *segmentController) sortLst() { + sort.Slice(sc.lst, func(i, j int) bool { + return sc.lst[i].id < sc.lst[j].id + }) +} + +func (sc *segmentController) load(suffix, path string, createBlockIfEmpty bool) (seg *segment, err error) { startTime, err := sc.Parse(suffix) if err != nil { return nil, err @@ -380,12 +403,39 @@ func (sc *segmentController) load(suffix, path string) (seg *segment, err error) if err != nil { return nil, err } - sc.Lock() - defer sc.Unlock() sc.lst = append(sc.lst, seg) + sc.sortLst() return seg, nil } +func (sc *segmentController) remove(deadline time.Time) (err error) { + sc.l.Info().Time("deadline", deadline).Msg("start to remove before deadline") + for _, s := range sc.segments() { + if s.End.Before(deadline) || s.Contains(uint64(deadline.UnixNano())) { + err = multierr.Append(err, s.blockController.remove(deadline)) + if s.End.Before(deadline) { + sc.Lock() + if errDel := s.delete(); errDel != nil { + err = multierr.Append(err, errDel) + } else { + sc.removeSeg(s.id) + } + sc.Unlock() + } + } + } + return err +} + +func (sc *segmentController) removeSeg(segID uint16) { + for i, b := range sc.lst { + if b.id == segID { + sc.lst = append(sc.lst[:i], sc.lst[i+1:]...) + break + } + } +} + func (sc *segmentController) close() { for _, s := range sc.lst { s.close() diff --git a/banyand/tsdb/shard_test.go b/banyand/tsdb/shard_test.go index 2c928c6..350085a 100644 --- a/banyand/tsdb/shard_test.go +++ b/banyand/tsdb/shard_test.go @@ -65,6 +65,10 @@ var _ = Describe("Shard", func() { Unit: tsdb.HOUR, Num: 12, }, + tsdb.IntervalRule{ + Unit: tsdb.DAY, + Num: 7, + }, 2, ) Expect(err).NotTo(HaveOccurred()) @@ -402,5 +406,139 @@ var _ = Describe("Shard", func() { }, })) }) + It("retention", func() { + var err error + shard, err = tsdb.OpenShard(timestamp.SetClock(context.Background(), clock), common.ShardID(0), tmp, + tsdb.IntervalRule{ + Unit: tsdb.DAY, + Num: 1, + }, + tsdb.IntervalRule{ + Unit: tsdb.HOUR, + Num: 12, + }, + tsdb.IntervalRule{ + Unit: tsdb.DAY, + Num: 1, + }, + 10, + ) + Expect(err).NotTo(HaveOccurred()) + By("open 4 blocks") + t1 := clock.Now() + By("01/01 00:00 1st block is opened") + clock.Add(2 * time.Minute) + By("01/01 00:05 retention") + clock.Add(3 * time.Minute) + By("01/01 10:00 2nd block is opened") + clock.Add(9*time.Hour + 55*time.Minute) + t2 := clock.Now().Add(2 * time.Hour) + By("01/01 13:00 moves to the 2nd block") + clock.Add(3 * time.Hour) + By("01/01 22:00 3rd block is opened") + clock.Add(9 * time.Hour) + t3 := clock.Now().Add(2 * time.Hour) + By("01/02 00:02 moves to 3rd block") + clock.Add(2*time.Hour + 2*time.Minute) + By("01/02 00:05 retention") + clock.Add(3 * time.Minute) + By("01/02 10:00 4th block is opened") + clock.Add(9*time.Hour + 55*time.Minute) + t4 := clock.Now().Add(2 * time.Hour) + + Eventually(func() []tsdb.BlockState { + if clock.TriggerTimer() { + GinkgoWriter.Println("01/02 13:00 has been triggered") + } + return shard.State().Blocks + }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{ + { + ID: tsdb.BlockID{ + SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101), + BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0), + }, + TimeRange: timestamp.NewTimeRangeDuration(t1, 12*time.Hour, true, false), + }, + { + ID: tsdb.BlockID{ + SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101), + BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12), + }, + TimeRange: timestamp.NewTimeRangeDuration(t2, 12*time.Hour, true, false), + }, + { + ID: tsdb.BlockID{ + SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102), + BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0), + }, + TimeRange: timestamp.NewTimeRangeDuration(t3, 12*time.Hour, true, false), + }, + { + ID: tsdb.BlockID{ + SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102), + BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12), + }, + TimeRange: timestamp.NewTimeRangeDuration(t4, 12*time.Hour, true, false), + }, + })) + By("01/02 13:00 moves to 4th block") + clock.Add(3 * time.Hour) + By("01/02 22:00 5th block is opened") + clock.Add(9 * time.Hour) + t5 := clock.Now().Add(2 * time.Hour) + By("01/03 00:02 move to 5th block") + clock.Add(2*time.Hour + 2*time.Minute) + By("01/03 00:05 retention: remove segment and blocks on 01/01") + clock.Add(3 * time.Minute) + clock.Add(9*time.Hour + 55*time.Minute) + t6 := clock.Now().Add(2 * time.Hour) + Eventually(func() []tsdb.BlockState { + if clock.TriggerTimer() { + GinkgoWriter.Println("01/03 01:00 has been triggered") + } + return shard.State().Blocks + }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{ + { + ID: tsdb.BlockID{ + SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102), + BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0), + }, + TimeRange: timestamp.NewTimeRangeDuration(t3, 12*time.Hour, true, false), + }, + { + ID: tsdb.BlockID{ + SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102), + BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12), + }, + TimeRange: timestamp.NewTimeRangeDuration(t4, 12*time.Hour, true, false), + }, + { + ID: tsdb.BlockID{ + SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700103), + BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0), + }, + TimeRange: timestamp.NewTimeRangeDuration(t5, 12*time.Hour, true, false), + }, + { + ID: tsdb.BlockID{ + SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700103), + BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12), + }, + TimeRange: timestamp.NewTimeRangeDuration(t6, 12*time.Hour, true, false), + }, + })) + Eventually(func() []tsdb.BlockID { + return shard.State().OpenBlocks + }).Should(Equal([]tsdb.BlockID{ + { + SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102), + BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0), + }, + { + SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102), + BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12), + }, + })) + }) }) }) diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go index 3828e86..dc6b62a 100644 --- a/banyand/tsdb/tsdb.go +++ b/banyand/tsdb/tsdb.go @@ -89,6 +89,7 @@ type DatabaseOpts struct { EncodingMethod EncodingMethod SegmentInterval IntervalRule BlockInterval IntervalRule + TTL IntervalRule BlockMemSize int64 SeriesMemSize int64 EnableGlobalIndex bool @@ -126,6 +127,7 @@ type database struct { shardNum uint32 segmentSize IntervalRule blockSize IntervalRule + ttl IntervalRule sLst []Shard sync.Mutex @@ -160,23 +162,25 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) { if _, err := mkdir(opts.Location); err != nil { return nil, err } - segmentSize := opts.SegmentInterval - if segmentSize.Num == 0 { + if opts.SegmentInterval.Num == 0 { return nil, errors.Wrap(ErrOpenDatabase, "segment interval is absent") } - blockSize := opts.BlockInterval - if blockSize.Num == 0 { + if opts.BlockInterval.Num == 0 { return nil, errors.Wrap(ErrOpenDatabase, "block interval is absent") } - if blockSize.EstimatedDuration() > segmentSize.EstimatedDuration() { + if opts.BlockInterval.EstimatedDuration() > opts.SegmentInterval.EstimatedDuration() { return nil, errors.Wrapf(ErrOpenDatabase, "the block size is bigger than the segment size") } + if opts.TTL.Num == 0 { + return nil, errors.Wrap(ErrOpenDatabase, "ttl is absent") + } db := &database{ location: opts.Location, shardNum: opts.ShardNum, logger: logger.Fetch(ctx, "tsdb"), - segmentSize: segmentSize, - blockSize: blockSize, + segmentSize: opts.SegmentInterval, + blockSize: opts.BlockInterval, + ttl: opts.TTL, } db.logger.Info().Str("path", opts.Location).Msg("initialized") var entries []os.DirEntry @@ -203,7 +207,7 @@ func createDatabase(ctx context.Context, db *database, startID int) (Database, e for i := startID; i < int(db.shardNum); i++ { db.logger.Info().Int("shard_id", i).Msg("creating a shard") so, errNewShard := OpenShard(ctx, common.ShardID(i), - db.location, db.segmentSize, db.blockSize, defaultBlockQueueSize) + db.location, db.segmentSize, db.blockSize, db.ttl, defaultBlockQueueSize) if errNewShard != nil { err = multierr.Append(err, errNewShard) continue @@ -233,6 +237,7 @@ func loadDatabase(ctx context.Context, db *database) (Database, error) { db.location, db.segmentSize, db.blockSize, + db.ttl, defaultBlockQueueSize, ) if errOpenShard != nil { diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go index b3f9ede..fe6c9a6 100644 --- a/banyand/tsdb/tsdb_test.go +++ b/banyand/tsdb/tsdb_test.go @@ -81,6 +81,7 @@ func openDatabase(t *require.Assertions, path string) (db Database) { }, BlockInterval: IntervalRule{Num: 2}, SegmentInterval: IntervalRule{Num: 1, Unit: DAY}, + TTL: IntervalRule{Num: 7, Unit: DAY}, }) t.NoError(err) t.NotNil(db) diff --git a/go.mod b/go.mod index 367d4a1..138cd8d 100644 --- a/go.mod +++ b/go.mod @@ -89,6 +89,8 @@ require ( github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/common v0.32.1 // indirect github.com/prometheus/procfs v0.7.3 // indirect + github.com/robfig/cron v1.2.0 + github.com/robfig/cron/v3 v3.0.1 // indirect github.com/sirupsen/logrus v1.7.0 // indirect github.com/soheilhy/cmux v0.1.5 // indirect github.com/spf13/afero v1.8.2 // indirect diff --git a/go.sum b/go.sum index d4f717b..743fc08 100644 --- a/go.sum +++ b/go.sum @@ -431,6 +431,10 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1 github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU= github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= +github.com/robfig/cron v1.2.0 h1:ZjScXvvxeQ63Dbyxy76Fj3AT3Ut0aKsyd2/tl3DTMuQ= +github.com/robfig/cron v1.2.0/go.mod h1:JGuDeoQd7Z6yL4zQhZ3OPEVHB7fL6Ka6skscFHfmt2k= +github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs= +github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= diff --git a/test/integration/cold_query/query_suite_test.go b/test/integration/cold_query/query_suite_test.go new file mode 100644 index 0000000..6f9ed17 --- /dev/null +++ b/test/integration/cold_query/query_suite_test.go @@ -0,0 +1,92 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package integration_cold_query_test + +import ( + "testing" + "time" + + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/test/helpers" + "github.com/apache/skywalking-banyandb/pkg/test/setup" + "github.com/apache/skywalking-banyandb/pkg/timestamp" + cases_measure "github.com/apache/skywalking-banyandb/test/cases/measure" + cases_measure_data "github.com/apache/skywalking-banyandb/test/cases/measure/data" + cases_stream "github.com/apache/skywalking-banyandb/test/cases/stream" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + grpclib "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +func TestIntegrationColdQuery(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Integration Query Cold Data Suite") +} + +var ( + connection *grpclib.ClientConn + now time.Time + deferFunc func() +) + +var _ = SynchronizedBeforeSuite(func() []byte { + Expect(logger.Init(logger.Logging{ + Env: "dev", + Level: "warn", + })).To(Succeed()) + var addr string + addr, deferFunc = setup.SetUp() + conn, err := grpclib.Dial( + addr, + grpclib.WithTransportCredentials(insecure.NewCredentials()), + ) + Expect(err).NotTo(HaveOccurred()) + now = timestamp.NowMilli().Add(-time.Hour * 24) + interval := 500 * time.Millisecond + cases_stream.Write(conn, "data.json", now, interval) + cases_measure_data.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval) + cases_measure_data.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval) + cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval) + Expect(conn.Close()).To(Succeed()) + return []byte(addr) +}, func(address []byte) { + var err error + connection, err = grpclib.Dial( + string(address), + grpclib.WithTransportCredentials(insecure.NewCredentials()), + grpclib.WithBlock(), + ) + cases_stream.SharedContext = helpers.SharedContext{ + Connection: connection, + BaseTime: now, + } + cases_measure.SharedContext = helpers.SharedContext{ + Connection: connection, + BaseTime: now, + } + Expect(err).NotTo(HaveOccurred()) +}) + +var _ = SynchronizedAfterSuite(func() { + if connection != nil { + Expect(connection.Close()).To(Succeed()) + } +}, func() { + deferFunc() +})
