This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 5452c9a  Introduce two key feats to tsdb (#177)
5452c9a is described below

commit 5452c9a498e56baad497e13b318daadbe4a4a83f
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Sep 29 21:00:03 2022 +0800

    Introduce two key feats to tsdb (#177)
    
    * Introduce two key feats to tsdb
    
    * Add the retention controller to support the group TTL.
    * Allow writing cold data whose timestamp is not in the
      current block.
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 banyand/measure/measure_write.go                   |   6 +-
 banyand/measure/metadata.go                        |   3 +
 banyand/stream/metadata.go                         |   3 +
 banyand/stream/stream_write.go                     |   4 +-
 banyand/tsdb/block.go                              |  22 ++++
 banyand/tsdb/bucket/queue.go                       |  20 +++
 banyand/tsdb/retention.go                          | 103 +++++++++++++++
 banyand/tsdb/segment.go                            |  65 +++++++++-
 banyand/tsdb/series.go                             |  24 ++++
 banyand/tsdb/seriesdb.go                           |  31 +++++
 banyand/tsdb/shard.go                              |  88 ++++++++++---
 banyand/tsdb/shard_test.go                         | 138 +++++++++++++++++++++
 banyand/tsdb/tsdb.go                               |  21 ++--
 banyand/tsdb/tsdb_test.go                          |   1 +
 dist/LICENSE                                       |   1 +
 .../licenses/license-github.com-robfig-cron-v3.txt |  21 ++++
 go.mod                                             |   1 +
 go.sum                                             |   2 +
 test/integration/cold_query/query_suite_test.go    |  92 ++++++++++++++
 19 files changed, 608 insertions(+), 38 deletions(-)

diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
index 58d0bab..bb14e0b 100644
--- a/banyand/measure/measure_write.go
+++ b/banyand/measure/measure_write.go
@@ -61,7 +61,7 @@ func (s *measure) Write(value *measurev1.DataPointValue) 
error {
 }
 
 func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value 
*measurev1.DataPointValue, cb index.CallbackFn) error {
-       t := value.GetTimestamp().AsTime()
+       t := value.GetTimestamp().AsTime().Local()
        if err := timestamp.Check(t); err != nil {
                return errors.WithMessage(err, "writing stream")
        }
@@ -81,7 +81,7 @@ func (s *measure) write(shardID common.ShardID, seriesHashKey 
[]byte, value *mea
        if err != nil {
                return err
        }
-       wp, err := series.Span(timestamp.NewInclusiveTimeRangeDuration(t, 0))
+       wp, err := series.Create(t)
        if err != nil {
                if wp != nil {
                        _ = wp.Close()
@@ -189,7 +189,7 @@ func (w *writeCallback) Rev(message bus.Message) (resp 
bus.Message) {
        }
        err := stm.write(common.ShardID(writeEvent.GetShardId()), 
writeEvent.GetSeriesHash(), writeEvent.GetRequest().GetDataPoint(), nil)
        if err != nil {
-               w.l.Debug().Err(err).Msg("fail to write entity")
+               w.l.Error().Err(err).Msg("fail to write entity")
        }
        return
 }
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index fa4a545..7cd0d46 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -215,6 +215,9 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) 
(tsdb.Database, error) {
        if opts.SegmentInterval, err = 
pb_v1.ToIntervalRule(groupSchema.ResourceOpts.SegmentInterval); err != nil {
                return nil, err
        }
+       if opts.TTL, err = pb_v1.ToIntervalRule(groupSchema.ResourceOpts.Ttl); 
err != nil {
+               return nil, err
+       }
        return tsdb.OpenDatabase(
                context.WithValue(context.Background(), common.PositionKey, 
common.Position{
                        Module:   "measure",
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 67f1f82..870a887 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -218,6 +218,9 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) 
(tsdb.Database, error) {
        if opts.SegmentInterval, err = 
pb_v1.ToIntervalRule(groupSchema.ResourceOpts.SegmentInterval); err != nil {
                return nil, err
        }
+       if opts.TTL, err = pb_v1.ToIntervalRule(groupSchema.ResourceOpts.Ttl); 
err != nil {
+               return nil, err
+       }
        return tsdb.OpenDatabase(
                context.WithValue(context.Background(), common.PositionKey, 
common.Position{
                        Module:   "stream",
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index 1d172f5..0d685be 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -88,7 +88,7 @@ func (s *stream) write(shardID common.ShardID, seriesHashKey 
[]byte, value *stre
                return err
        }
        t := timestamp.MToN(tp)
-       wp, err := series.Span(timestamp.NewInclusiveTimeRangeDuration(t, 0))
+       wp, err := series.Create(t)
        if err != nil {
                if wp != nil {
                        _ = wp.Close()
@@ -183,7 +183,7 @@ func (w *writeCallback) Rev(message bus.Message) (resp 
bus.Message) {
        }
        err := stm.write(common.ShardID(writeEvent.GetShardId()), 
writeEvent.GetSeriesHash(), writeEvent.GetRequest().GetElement(), nil)
        if err != nil {
-               w.l.Debug().Err(err).Msg("fail to write entity")
+               w.l.Error().Err(err).Msg("fail to write entity")
        }
        return
 }
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 1dc1df0..5e8c699 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -21,6 +21,7 @@ import (
        "context"
        "fmt"
        "io"
+       "os"
        "path"
        "runtime"
        "strconv"
@@ -38,6 +39,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/index/lsm"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
+       "github.com/pkg/errors"
 )
 
 const (
@@ -55,6 +57,7 @@ type block struct {
        suffix     string
        ref        *atomic.Int32
        closed     *atomic.Bool
+       deleted    *atomic.Bool
        lock       sync.RWMutex
        position   common.Position
        memSize    int64
@@ -100,6 +103,7 @@ func newBlock(ctx context.Context, opts blockOpts) (b 
*block, err error) {
                flushCh:   make(chan struct{}, 1),
                ref:       &atomic.Int32{},
                closed:    &atomic.Bool{},
+               deleted:   &atomic.Bool{},
                queue:     opts.queue,
        }
        b.options(ctx)
@@ -136,6 +140,9 @@ func (b *block) options(ctx context.Context) {
 }
 
 func (b *block) open() (err error) {
+       if b.deleted.Load() {
+               return nil
+       }
        if b.store, err = kv.OpenTimeSeriesStore(
                0,
                path.Join(b.path, componentMain),
@@ -181,6 +188,9 @@ func (b *block) open() (err error) {
 }
 
 func (b *block) delegate() (blockDelegate, error) {
+       if b.deleted.Load() {
+               return nil, errors.WithMessagef(ErrBlockAbsent, "block %d is 
deleted", b.blockID)
+       }
        if b.incRef() {
                return &bDelegate{
                        delegate: b,
@@ -272,6 +282,18 @@ func (b *block) stopThenClose() {
        b.close()
 }
 
+// delete marks the block as deleted, stops its reporter if one is set, closes
+// the block and removes the block directory from disk.
+//
+// The deleted flag is flipped with a single atomic Swap, so only the caller
+// that flips it runs the stop/close sequence. The directory removal runs on
+// every call: a removal that failed earlier is attempted again instead of
+// being reported as a success while the directory is still on disk.
+func (b *block) delete() error {
+       if !b.deleted.Swap(true) {
+               if b.Reporter != nil {
+                       b.Stop()
+               }
+               b.close()
+       }
+       return os.RemoveAll(b.path)
+}
+
 func (b *block) Closed() bool {
        return b.closed.Load()
 }
diff --git a/banyand/tsdb/bucket/queue.go b/banyand/tsdb/bucket/queue.go
index af13d6b..2b3dced 100644
--- a/banyand/tsdb/bucket/queue.go
+++ b/banyand/tsdb/bucket/queue.go
@@ -27,6 +27,7 @@ type EvictFn func(id interface{})
 
 type Queue interface {
        Push(id interface{})
+       Remove(id interface{})
        Len() int
        All() []interface{}
 }
@@ -108,6 +109,25 @@ func (q *lruQueue) Push(id interface{}) {
        q.recent.Add(id, nil)
 }
 
+// Remove drops id from the queue while holding the queue's write lock. The
+// frequent, recent and recentEvict lists are checked in that order and id is
+// removed from the first one that contains it; an id found in none of them is
+// ignored.
+func (q *lruQueue) Remove(id interface{}) {
+       q.lock.Lock()
+       defer q.lock.Unlock()
+
+       switch {
+       case q.frequent.Contains(id):
+               q.frequent.Remove(id)
+       case q.recent.Contains(id):
+               q.recent.Remove(id)
+       case q.recentEvict.Contains(id):
+               q.recentEvict.Remove(id)
+       }
+}
+
 func (q *lruQueue) Len() int {
        q.lock.RLock()
        defer q.lock.RUnlock()
diff --git a/banyand/tsdb/retention.go b/banyand/tsdb/retention.go
new file mode 100644
index 0000000..957c268
--- /dev/null
+++ b/banyand/tsdb/retention.go
@@ -0,0 +1,103 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tsdb
+
+import (
+       "sync"
+       "time"
+
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/robfig/cron/v3"
+)
+
+// retentionController periodically removes expired data through a
+// segmentController.
+type retentionController struct {
+       // segment is the controller that is asked to remove expired data.
+       segment *segmentController
+
+       // scheduler yields the next wake-up time of the retention loop.
+       scheduler cron.Schedule
+
+       // duration is subtracted from the wake-up time to get the removal deadline.
+       duration time.Duration
+
+       // l is the logger used by the controller.
+       l *logger.Logger
+
+       // stopMux guards stopped.
+       stopMux sync.Mutex
+
+       // stopped is set by stop; start does nothing once it is true.
+       stopped bool
+
+       // stopCh is closed by stop to end the retention loop.
+       stopCh chan struct{}
+}
+
+// newRetentionController builds a retentionController for segment. The unit of
+// ttl selects the cron spec of the clean-up job and ttl.EstimatedDuration()
+// becomes the controller's duration. An error from parsing the spec is
+// returned to the caller.
+func newRetentionController(segment *segmentController, ttl IntervalRule) (*retentionController, error) {
+       // spec holds the "minute hour" fields; it stays empty for any other unit
+       // and is handed to the parser as is.
+       var spec string
+       if ttl.Unit == HOUR {
+               // Minute 5 of every hour.
+               spec = "5 *"
+       } else if ttl.Unit == DAY {
+               // 00:05 of every day.
+               spec = "5 0"
+       }
+       schedule, err := cron.NewParser(cron.Minute | cron.Hour).Parse(spec)
+       if err != nil {
+               return nil, err
+       }
+       rc := &retentionController{
+               segment:   segment,
+               scheduler: schedule,
+               stopCh:    make(chan struct{}),
+               l:         segment.l.Named("retention-controller"),
+               duration:  ttl.EstimatedDuration(),
+       }
+       return rc, nil
+}
+
+// start launches the retention loop in a background goroutine unless the
+// controller has already been stopped.
+func (rc *retentionController) start() {
+       rc.stopMux.Lock()
+       // Deferred so that the early return below releases the mutex as well;
+       // returning with it held would block every later stop() call forever.
+       defer rc.stopMux.Unlock()
+       if rc.stopped {
+               return
+       }
+       go rc.run()
+}
+
+// run is the retention loop. Starting from the clock's current time it asks
+// the scheduler for the next trigger time and waits on a timer for it. On
+// wake-up it asks the segment controller to remove everything before
+// "now - duration"; the time received from the timer becomes the base for the
+// next trigger. The loop exits once stopCh is closed.
+func (rc *retentionController) run() {
+       rc.l.Info().Msg("start")
+       now := rc.segment.clock.Now()
+       for {
+               next := rc.scheduler.Next(now)
+               timer := rc.segment.clock.Timer(next.Sub(now))
+               select {
+               case now = <-timer.C:
+                       rc.l.Info().Time("now", now).Msg("wake")
+                       if err := rc.segment.remove(now.Add(-rc.duration)); err != nil {
+                               // Finish the event with Msg like every other log call in
+                               // this package; without it the error is never written.
+                               rc.l.Error().Err(err).Msg("fail to remove expired data")
+                       }
+               case <-rc.stopCh:
+                       timer.Stop()
+                       rc.l.Info().Msg("stop")
+                       return
+               }
+       }
+}
+
+// stop marks the controller as stopped and closes stopCh, which ends the
+// retention loop. Only the first call has an effect, so stopCh is closed at
+// most once.
+func (rc *retentionController) stop() {
+       rc.stopMux.Lock()
+       defer rc.stopMux.Unlock()
+       if !rc.stopped {
+               rc.stopped = true
+               close(rc.stopCh)
+       }
+}
diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index be2cb7a..bbdad22 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -21,6 +21,8 @@ import (
        "context"
        "errors"
        "fmt"
+       "os"
+       "sort"
        "strconv"
        "sync"
        "time"
@@ -31,6 +33,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
+       "go.uber.org/multierr"
 )
 
 var ErrEndOfSegment = errors.New("reached the end of the segment")
@@ -69,7 +72,8 @@ func openSegment(ctx context.Context, startTime time.Time, 
path, suffix string,
                TimeRange:       timeRange,
                Reporter:        bucket.NewTimeBasedReporter(timeRange, clock),
        }
-       err = s.blockController.open()
+       isHead := s.End.After(clock.Now())
+       err = s.blockController.open(isHead)
        if err != nil {
                return nil, err
        }
@@ -96,7 +100,9 @@ func openSegment(ctx context.Context, startTime time.Time, 
path, suffix string,
                        }
                }
        }
-
+       if !isHead {
+               return
+       }
        s.blockManageStrategy, err = bucket.NewStrategy(s.blockController, 
bucket.WithLogger(s.l))
        if err != nil {
                return nil, err
@@ -110,7 +116,9 @@ func (s *segment) close() {
        if s.globalIndex != nil {
                s.globalIndex.Close()
        }
-       s.blockManageStrategy.Close()
+       if s.blockManageStrategy != nil {
+               s.blockManageStrategy.Close()
+       }
        if s.Reporter != nil {
                s.Stop()
        }
@@ -120,6 +128,11 @@ func (s *segment) closeBlock(id uint16) {
        s.blockController.closeBlock(id)
 }
 
+// delete closes the segment and then removes the segment directory, returning
+// the error of the removal, if any.
+func (s *segment) delete() error {
+       s.close()
+       if err := os.RemoveAll(s.path); err != nil {
+               return err
+       }
+       return nil
+}
+
 func (s segment) String() string {
        return fmt.Sprintf("SegID-%d", s.id)
 }
@@ -313,17 +326,22 @@ func (bc *blockController) startTime(suffix string) 
(time.Time, error) {
        panic("invalid interval unit")
 }
 
-func (bc *blockController) open() error {
+func (bc *blockController) open(createIfEmpty bool) error {
        err := WalkDir(
                bc.location,
                segPathPrefix,
                func(suffix, absolutePath string) error {
+                       bc.Lock()
+                       defer bc.Unlock()
                        _, err := bc.load(suffix, absolutePath)
                        return err
                })
        if err != nil {
                return err
        }
+       if !createIfEmpty {
+               return nil
+       }
        if bc.Current() == nil {
                _, err := bc.create(bc.clock.Now())
                if err != nil {
@@ -340,6 +358,8 @@ func (bc *blockController) create(startTime time.Time) 
(*block, error) {
        if !startTime.Before(bc.segTimeRange.End) {
                return nil, ErrEndOfSegment
        }
+       bc.Lock()
+       defer bc.Unlock()
        suffix := bc.Format(startTime)
        segPath, err := mkdir(blockTemplate, bc.location, suffix)
        if err != nil {
@@ -368,14 +388,47 @@ func (bc *blockController) load(suffix, path string) (b 
*block, err error) {
                }); err != nil {
                return nil, err
        }
-       bc.Lock()
-       defer bc.Unlock()
        bc.lst = append(bc.lst, b)
+       bc.sortLst()
        return b, nil
 }
 
+// sortLst orders the controller's blocks in place by ascending blockID. It
+// takes no lock itself.
+func (bc *blockController) sortLst() {
+       blocks := bc.lst
+       byBlockID := func(a, b int) bool {
+               return blocks[a].blockID < blocks[b].blockID
+       }
+       sort.Slice(blocks, byBlockID)
+}
+
 func (bc *blockController) close() {
        for _, s := range bc.lst {
                s.stopThenClose()
        }
 }
+
+// remove deletes every block that ends before deadline. A block that is
+// deleted successfully is also removed from its queue and from the
+// controller's list; deletion failures are accumulated and returned together.
+// The controller's lock is taken per block, around the deletion only.
+func (bc *blockController) remove(deadline time.Time) (err error) {
+       for _, b := range bc.blocks() {
+               if !b.End.Before(deadline) {
+                       continue
+               }
+               bc.Lock()
+               errDel := b.delete()
+               if errDel == nil {
+                       b.queue.Remove(BlockID{
+                               BlockID: b.blockID,
+                               SegID:   b.segID,
+                       })
+                       bc.removeBlock(b.blockID)
+               } else {
+                       err = multierr.Append(err, errDel)
+               }
+               bc.Unlock()
+       }
+       return err
+}
+
+// removeBlock drops the first block whose ID equals blockID from the
+// controller's list. It takes no lock itself; nothing happens when no block
+// matches.
+func (bc *blockController) removeBlock(blockID uint16) {
+       for idx := range bc.lst {
+               if bc.lst[idx].blockID != blockID {
+                       continue
+               }
+               bc.lst = append(bc.lst[:idx], bc.lst[idx+1:]...)
+               return
+       }
+}
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index 1696616..8aa98d0 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -76,6 +76,7 @@ func (i *GlobalItemID) UnMarshal(data []byte) error {
 type Series interface {
        ID() common.SeriesID
        Span(timeRange timestamp.TimeRange) (SeriesSpan, error)
+       Create(t time.Time) (SeriesSpan, error)
        Get(id GlobalItemID) (Item, io.Closer, error)
 }
 
@@ -127,6 +128,29 @@ func (s *series) Span(timeRange timestamp.TimeRange) 
(SeriesSpan, error) {
        return newSeriesSpan(context.WithValue(context.Background(), 
logger.ContextKey, s.l), timeRange, blocks, s.id, s.shardID), nil
 }
 
+// Create returns a SeriesSpan for the single instant t. Blocks that already
+// cover t are reused; when there is none, a block is created through the
+// block database first. Errors from the lookup or the creation are returned
+// unchanged.
+func (s *series) Create(t time.Time) (SeriesSpan, error) {
+       tr := timestamp.NewInclusiveTimeRange(t, t)
+       blocks, err := s.blockDB.span(tr)
+       if err != nil {
+               return nil, err
+       }
+       // The debug message tells whether the span was loaded or newly created.
+       msg := "load a series span"
+       if len(blocks) == 0 {
+               b, errCreate := s.blockDB.create(tr)
+               if errCreate != nil {
+                       return nil, errCreate
+               }
+               blocks = append(blocks, b)
+               msg = "create a series span"
+       }
+       s.l.Debug().
+               Time("time", t).
+               Msg(msg)
+       ctx := context.WithValue(context.Background(), logger.ContextKey, s.l)
+       return newSeriesSpan(ctx, tr, blocks, s.id, s.shardID), nil
+}
+
 func newSeries(ctx context.Context, id common.SeriesID, blockDB blockDatabase) 
*series {
        s := &series{
                id:      id,
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index 720968a..bd61e44 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -148,6 +148,7 @@ type SeriesDatabase interface {
 type blockDatabase interface {
        shardID() common.ShardID
        span(timeRange timestamp.TimeRange) ([]blockDelegate, error)
+       create(timeRange timestamp.TimeRange) (blockDelegate, error)
        block(id GlobalItemID) (blockDelegate, error)
 }
 
@@ -265,6 +266,36 @@ func (s *seriesDB) span(timeRange timestamp.TimeRange) 
([]blockDelegate, error)
        return result, nil
 }
 
+// create returns a delegate of a block for timeRange while holding the
+// series database lock. The first segment spanning timeRange is used and, if
+// one of its blocks already spans timeRange, that block's delegate is
+// returned. Otherwise a block starting at timeRange.Start is created in that
+// segment. When no segment spans timeRange, a segment is created for
+// timeRange.Start first and the block is created inside it.
+func (s *seriesDB) create(timeRange timestamp.TimeRange) (blockDelegate, error) {
+       s.Lock()
+       defer s.Unlock()
+       var seg *segment
+       if ss := s.segCtrl.span(timeRange); len(ss) > 0 {
+               seg = ss[0]
+               dd, err := seg.blockController.span(timeRange)
+               if err != nil {
+                       return nil, err
+               }
+               if len(dd) > 0 {
+                       return dd[0], nil
+               }
+       } else {
+               var err error
+               seg, err = s.segCtrl.create(s.segCtrl.Format(timeRange.Start), false)
+               if err != nil {
+                       return nil, err
+               }
+       }
+       block, err := seg.blockController.create(timeRange.Start)
+       if err != nil {
+               return nil, err
+       }
+       return block.delegate()
+}
+
 func (s *seriesDB) context() context.Context {
        return context.WithValue(context.Background(), logger.ContextKey, s.l)
 }
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index 831dde3..4f3c168 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -25,6 +25,7 @@ import (
        "time"
 
        "github.com/pkg/errors"
+       "go.uber.org/multierr"
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
@@ -48,11 +49,12 @@ type shard struct {
        indexDatabase         IndexDatabase
        segmentController     *segmentController
        segmentManageStrategy *bucket.Strategy
+       retentionController   *retentionController
        stopCh                chan struct{}
 }
 
 func OpenShard(ctx context.Context, id common.ShardID,
-       root string, segmentSize, blockSize IntervalRule, openedBlockSize int,
+       root string, segmentSize, blockSize, ttl IntervalRule, openedBlockSize 
int,
 ) (Shard, error) {
        path, err := mkdir(shardTemplate, root, int(id))
        if err != nil {
@@ -106,6 +108,10 @@ func OpenShard(ctx context.Context, id common.ShardID,
                s.position = position.(common.Position)
        }
        s.runStat()
+       if s.retentionController, err = 
newRetentionController(s.segmentController, ttl); err != nil {
+               return nil, err
+       }
+       s.retentionController.start()
        return s, nil
 }
 
@@ -124,7 +130,9 @@ func (s *shard) Index() IndexDatabase {
 func (s *shard) State() (shardState ShardState) {
        shardState.StrategyManagers = append(shardState.StrategyManagers, 
s.segmentManageStrategy.String())
        for _, seg := range s.segmentController.segments() {
-               shardState.StrategyManagers = 
append(shardState.StrategyManagers, seg.blockManageStrategy.String())
+               if seg.blockManageStrategy != nil {
+                       shardState.StrategyManagers = 
append(shardState.StrategyManagers, seg.blockManageStrategy.String())
+               }
                for _, b := range seg.blockController.blocks() {
                        shardState.Blocks = append(shardState.Blocks, 
BlockState{
                                ID: BlockID{
@@ -149,11 +157,12 @@ func (s *shard) State() (shardState ShardState) {
                }
                return x.SegID < y.SegID
        })
-       s.l.Info().Interface("", shardState).Msg("state")
+       s.l.Debug().Interface("", shardState).Msg("state")
        return shardState
 }
 
 func (s *shard) Close() error {
+       s.retentionController.stop()
        s.segmentManageStrategy.Close()
        s.segmentController.close()
        err := s.seriesDatabase.Close()
@@ -193,6 +202,16 @@ func (ir IntervalRule) NextTime(current time.Time) 
time.Time {
        panic("invalid interval unit")
 }
 
+// PreviousTime returns the time that lies ir.Num units before current: hours
+// are subtracted as a duration, days through AddDate. It panics for any other
+// unit.
+func (ir IntervalRule) PreviousTime(current time.Time) time.Time {
+       if ir.Unit == HOUR {
+               span := time.Duration(ir.Num) * time.Hour
+               return current.Add(-span)
+       }
+       if ir.Unit == DAY {
+               return current.AddDate(0, 0, -ir.Num)
+       }
+       panic("invalid interval unit")
+}
+
 func (ir IntervalRule) EstimatedDuration() time.Duration {
        switch ir.Unit {
        case HOUR:
@@ -243,7 +262,7 @@ func newSegmentController(shardCtx context.Context, 
location string, segmentSize
 }
 
 func (sc *segmentController) get(segID uint16) *segment {
-       lst := sc.snapshotLst()
+       lst := sc.segments()
        last := len(lst) - 1
        for i := range lst {
                s := lst[last-i]
@@ -255,7 +274,7 @@ func (sc *segmentController) get(segID uint16) *segment {
 }
 
 func (sc *segmentController) span(timeRange timestamp.TimeRange) (ss 
[]*segment) {
-       lst := sc.snapshotLst()
+       lst := sc.segments()
        last := len(lst) - 1
        for i := range lst {
                s := lst[last-i]
@@ -266,12 +285,6 @@ func (sc *segmentController) span(timeRange 
timestamp.TimeRange) (ss []*segment)
        return ss
 }
 
-func (sc *segmentController) snapshotLst() []*segment {
-       sc.RLock()
-       defer sc.RUnlock()
-       return sc.lst
-}
-
 func (sc *segmentController) segments() (ss []*segment) {
        sc.RLock()
        defer sc.RUnlock()
@@ -299,7 +312,7 @@ func (sc *segmentController) Current() bucket.Reporter {
 func (sc *segmentController) Next() (bucket.Reporter, error) {
        seg := sc.Current().(*segment)
        reporter, err := sc.create(sc.Format(
-               sc.segmentSize.NextTime(seg.Start)))
+               sc.segmentSize.NextTime(seg.Start)), true)
        if errors.Is(err, ErrEndOfSegment) {
                return nil, bucket.ErrNoMoreBucket
        }
@@ -342,7 +355,9 @@ func (sc *segmentController) open() error {
                sc.location,
                segPathPrefix,
                func(suffix, absolutePath string) error {
-                       _, err := sc.load(suffix, absolutePath)
+                       sc.Lock()
+                       defer sc.Unlock()
+                       _, err := sc.load(suffix, absolutePath, false)
                        if errors.Is(err, ErrEndOfSegment) {
                                return nil
                        }
@@ -352,7 +367,7 @@ func (sc *segmentController) open() error {
                return err
        }
        if sc.Current() == nil {
-               _, err = sc.create(sc.Format(sc.clock.Now()))
+               _, err = sc.create(sc.Format(sc.clock.Now()), true)
                if err != nil {
                        return err
                }
@@ -360,15 +375,23 @@ func (sc *segmentController) open() error {
        return nil
 }
 
-func (sc *segmentController) create(suffix string) (*segment, error) {
+func (sc *segmentController) create(suffix string, createBlockIfEmpty bool) 
(*segment, error) {
+       sc.Lock()
+       defer sc.Unlock()
        segPath, err := mkdir(segTemplate, sc.location, suffix)
        if err != nil {
                return nil, err
        }
-       return sc.load(suffix, segPath)
+       return sc.load(suffix, segPath, createBlockIfEmpty)
 }
 
-func (sc *segmentController) load(suffix, path string) (seg *segment, err 
error) {
+// sortLst orders the controller's segments in place by ascending id. It takes
+// no lock itself.
+func (sc *segmentController) sortLst() {
+       segs := sc.lst
+       byID := func(a, b int) bool {
+               return segs[a].id < segs[b].id
+       }
+       sort.Slice(segs, byID)
+}
+
+func (sc *segmentController) load(suffix, path string, createBlockIfEmpty 
bool) (seg *segment, err error) {
        startTime, err := sc.Parse(suffix)
        if err != nil {
                return nil, err
@@ -380,12 +403,39 @@ func (sc *segmentController) load(suffix, path string) 
(seg *segment, err error)
        if err != nil {
                return nil, err
        }
-       sc.Lock()
-       defer sc.Unlock()
        sc.lst = append(sc.lst, seg)
+       sc.sortLst()
        return seg, nil
 }
 
+// remove cleans up data older than deadline. For every segment that either
+// ends before deadline or contains it, the segment's block controller is
+// asked to remove its blocks before deadline. A segment that ends before
+// deadline is then deleted itself and, on success, dropped from the
+// controller's list. All errors are accumulated and returned together.
+func (sc *segmentController) remove(deadline time.Time) (err error) {
+       sc.l.Info().Time("deadline", deadline).Msg("start to remove before deadline")
+       for _, s := range sc.segments() {
+               expired := s.End.Before(deadline)
+               if !expired && !s.Contains(uint64(deadline.UnixNano())) {
+                       continue
+               }
+               err = multierr.Append(err, s.blockController.remove(deadline))
+               if !expired {
+                       // The segment only contains the deadline, so it is kept.
+                       continue
+               }
+               sc.Lock()
+               if errDel := s.delete(); errDel == nil {
+                       sc.removeSeg(s.id)
+               } else {
+                       err = multierr.Append(err, errDel)
+               }
+               sc.Unlock()
+       }
+       return err
+}
+
+// removeSeg drops the first segment whose id equals segID from the
+// controller's list. It takes no lock itself; nothing happens when no segment
+// matches.
+func (sc *segmentController) removeSeg(segID uint16) {
+       for idx := range sc.lst {
+               if sc.lst[idx].id != segID {
+                       continue
+               }
+               sc.lst = append(sc.lst[:idx], sc.lst[idx+1:]...)
+               return
+       }
+}
+
 func (sc *segmentController) close() {
        for _, s := range sc.lst {
                s.close()
diff --git a/banyand/tsdb/shard_test.go b/banyand/tsdb/shard_test.go
index 2c928c6..350085a 100644
--- a/banyand/tsdb/shard_test.go
+++ b/banyand/tsdb/shard_test.go
@@ -65,6 +65,10 @@ var _ = Describe("Shard", func() {
                                        Unit: tsdb.HOUR,
                                        Num:  12,
                                },
+                               tsdb.IntervalRule{
+                                       Unit: tsdb.DAY,
+                                       Num:  7,
+                               },
                                2,
                        )
                        Expect(err).NotTo(HaveOccurred())
@@ -402,5 +406,139 @@ var _ = Describe("Shard", func() {
                                },
                        }))
                })
+               It("retention", func() {
+                       var err error
+                       shard, err = 
tsdb.OpenShard(timestamp.SetClock(context.Background(), clock), 
common.ShardID(0), tmp,
+                               tsdb.IntervalRule{
+                                       Unit: tsdb.DAY,
+                                       Num:  1,
+                               },
+                               tsdb.IntervalRule{
+                                       Unit: tsdb.HOUR,
+                                       Num:  12,
+                               },
+                               tsdb.IntervalRule{
+                                       Unit: tsdb.DAY,
+                                       Num:  1,
+                               },
+                               10,
+                       )
+                       Expect(err).NotTo(HaveOccurred())
+                       By("open 4 blocks")
+                       t1 := clock.Now()
+                       By("01/01 00:00 1st block is opened")
+                       clock.Add(2 * time.Minute)
+                       By("01/01 00:05 retention")
+                       clock.Add(3 * time.Minute)
+                       By("01/01 10:00 2nd block is opened")
+                       clock.Add(9*time.Hour + 55*time.Minute)
+                       t2 := clock.Now().Add(2 * time.Hour)
+                       By("01/01 13:00 moves to the 2nd block")
+                       clock.Add(3 * time.Hour)
+                       By("01/01 22:00 3rd block is opened")
+                       clock.Add(9 * time.Hour)
+                       t3 := clock.Now().Add(2 * time.Hour)
+                       By("01/02 00:02 moves to 3rd block")
+                       clock.Add(2*time.Hour + 2*time.Minute)
+                       By("01/02 00:05 retention")
+                       clock.Add(3 * time.Minute)
+                       By("01/02 10:00 4th block is opened")
+                       clock.Add(9*time.Hour + 55*time.Minute)
+                       t4 := clock.Now().Add(2 * time.Hour)
+
+                       Eventually(func() []tsdb.BlockState {
+                               if clock.TriggerTimer() {
+                                       GinkgoWriter.Println("01/02 13:00 has 
been triggered")
+                               }
+                               return shard.State().Blocks
+                       }, 
defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+                               {
+                                       ID: tsdb.BlockID{
+                                               SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+                                               BlockID: 
tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
+                                       },
+                                       TimeRange: 
timestamp.NewTimeRangeDuration(t1, 12*time.Hour, true, false),
+                               },
+                               {
+                                       ID: tsdb.BlockID{
+                                               SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+                                               BlockID: 
tsdb.GenerateInternalID(tsdb.HOUR, 12),
+                                       },
+                                       TimeRange: 
timestamp.NewTimeRangeDuration(t2, 12*time.Hour, true, false),
+                               },
+                               {
+                                       ID: tsdb.BlockID{
+                                               SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+                                               BlockID: 
tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
+                                       },
+                                       TimeRange: 
timestamp.NewTimeRangeDuration(t3, 12*time.Hour, true, false),
+                               },
+                               {
+                                       ID: tsdb.BlockID{
+                                               SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+                                               BlockID: 
tsdb.GenerateInternalID(tsdb.HOUR, 12),
+                                       },
+                                       TimeRange: 
timestamp.NewTimeRangeDuration(t4, 12*time.Hour, true, false),
+                               },
+                       }))
+                       By("01/02 13:00 moves to 4th block")
+                       clock.Add(3 * time.Hour)
+                       By("01/02 22:00 5th block is opened")
+                       clock.Add(9 * time.Hour)
+                       t5 := clock.Now().Add(2 * time.Hour)
+                       By("01/03 00:02 move to 5th block")
+                       clock.Add(2*time.Hour + 2*time.Minute)
+                       By("01/03 00:05 retention: remove segment and blocks on 
01/01")
+                       clock.Add(3 * time.Minute)
+                       clock.Add(9*time.Hour + 55*time.Minute)
+                       t6 := clock.Now().Add(2 * time.Hour)
+                       Eventually(func() []tsdb.BlockState {
+                               if clock.TriggerTimer() {
+                                       GinkgoWriter.Println("01/03 01:00 has 
been triggered")
+                               }
+                               return shard.State().Blocks
+                       }, 
defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+                               {
+                                       ID: tsdb.BlockID{
+                                               SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+                                               BlockID: 
tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
+                                       },
+                                       TimeRange: 
timestamp.NewTimeRangeDuration(t3, 12*time.Hour, true, false),
+                               },
+                               {
+                                       ID: tsdb.BlockID{
+                                               SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+                                               BlockID: 
tsdb.GenerateInternalID(tsdb.HOUR, 12),
+                                       },
+                                       TimeRange: 
timestamp.NewTimeRangeDuration(t4, 12*time.Hour, true, false),
+                               },
+                               {
+                                       ID: tsdb.BlockID{
+                                               SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700103),
+                                               BlockID: 
tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
+                                       },
+                                       TimeRange: 
timestamp.NewTimeRangeDuration(t5, 12*time.Hour, true, false),
+                               },
+                               {
+                                       ID: tsdb.BlockID{
+                                               SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700103),
+                                               BlockID: 
tsdb.GenerateInternalID(tsdb.HOUR, 12),
+                                       },
+                                       TimeRange: 
timestamp.NewTimeRangeDuration(t6, 12*time.Hour, true, false),
+                               },
+                       }))
+                       Eventually(func() []tsdb.BlockID {
+                               return shard.State().OpenBlocks
+                       }).Should(Equal([]tsdb.BlockID{
+                               {
+                                       SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+                                       BlockID: 
tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
+                               },
+                               {
+                                       SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+                                       BlockID: 
tsdb.GenerateInternalID(tsdb.HOUR, 12),
+                               },
+                       }))
+               })
        })
 })
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 3828e86..dc6b62a 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -89,6 +89,7 @@ type DatabaseOpts struct {
        EncodingMethod     EncodingMethod
        SegmentInterval    IntervalRule
        BlockInterval      IntervalRule
+       TTL                IntervalRule
        BlockMemSize       int64
        SeriesMemSize      int64
        EnableGlobalIndex  bool
@@ -126,6 +127,7 @@ type database struct {
        shardNum    uint32
        segmentSize IntervalRule
        blockSize   IntervalRule
+       ttl         IntervalRule
 
        sLst []Shard
        sync.Mutex
@@ -160,23 +162,25 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts) 
(Database, error) {
        if _, err := mkdir(opts.Location); err != nil {
                return nil, err
        }
-       segmentSize := opts.SegmentInterval
-       if segmentSize.Num == 0 {
+       if opts.SegmentInterval.Num == 0 {
                return nil, errors.Wrap(ErrOpenDatabase, "segment interval is 
absent")
        }
-       blockSize := opts.BlockInterval
-       if blockSize.Num == 0 {
+       if opts.BlockInterval.Num == 0 {
                return nil, errors.Wrap(ErrOpenDatabase, "block interval is 
absent")
        }
-       if blockSize.EstimatedDuration() > segmentSize.EstimatedDuration() {
+       if opts.BlockInterval.EstimatedDuration() > 
opts.SegmentInterval.EstimatedDuration() {
                return nil, errors.Wrapf(ErrOpenDatabase, "the block size is 
bigger than the segment size")
        }
+       if opts.TTL.Num == 0 {
+               return nil, errors.Wrap(ErrOpenDatabase, "ttl is absent")
+       }
        db := &database{
                location:    opts.Location,
                shardNum:    opts.ShardNum,
                logger:      logger.Fetch(ctx, "tsdb"),
-               segmentSize: segmentSize,
-               blockSize:   blockSize,
+               segmentSize: opts.SegmentInterval,
+               blockSize:   opts.BlockInterval,
+               ttl:         opts.TTL,
        }
        db.logger.Info().Str("path", opts.Location).Msg("initialized")
        var entries []os.DirEntry
@@ -203,7 +207,7 @@ func createDatabase(ctx context.Context, db *database, 
startID int) (Database, e
        for i := startID; i < int(db.shardNum); i++ {
                db.logger.Info().Int("shard_id", i).Msg("creating a shard")
                so, errNewShard := OpenShard(ctx, common.ShardID(i),
-                       db.location, db.segmentSize, db.blockSize, 
defaultBlockQueueSize)
+                       db.location, db.segmentSize, db.blockSize, db.ttl, 
defaultBlockQueueSize)
                if errNewShard != nil {
                        err = multierr.Append(err, errNewShard)
                        continue
@@ -233,6 +237,7 @@ func loadDatabase(ctx context.Context, db *database) 
(Database, error) {
                        db.location,
                        db.segmentSize,
                        db.blockSize,
+                       db.ttl,
                        defaultBlockQueueSize,
                )
                if errOpenShard != nil {
diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go
index b3f9ede..fe6c9a6 100644
--- a/banyand/tsdb/tsdb_test.go
+++ b/banyand/tsdb/tsdb_test.go
@@ -81,6 +81,7 @@ func openDatabase(t *require.Assertions, path string) (db 
Database) {
                        },
                        BlockInterval:   IntervalRule{Num: 2},
                        SegmentInterval: IntervalRule{Num: 1, Unit: DAY},
+                       TTL:             IntervalRule{Num: 7, Unit: DAY},
                })
        t.NoError(err)
        t.NotNil(db)
diff --git a/dist/LICENSE b/dist/LICENSE
index 0143495..326ad8b 100644
--- a/dist/LICENSE
+++ b/dist/LICENSE
@@ -307,6 +307,7 @@ MIT licenses
     github.com/onsi/ginkgo/v2 v2.1.4 MIT
     github.com/onsi/gomega v1.20.0 MIT
     github.com/pelletier/go-toml/v2 v2.0.1 MIT
+    github.com/robfig/cron/v3 v3.0.1 MIT
     github.com/rs/zerolog v1.26.1 MIT
     github.com/sirupsen/logrus v1.7.0 MIT
     github.com/spf13/cast v1.5.0 MIT
diff --git a/dist/licenses/license-github.com-robfig-cron-v3.txt 
b/dist/licenses/license-github.com-robfig-cron-v3.txt
new file mode 100644
index 0000000..3a0f627
--- /dev/null
+++ b/dist/licenses/license-github.com-robfig-cron-v3.txt
@@ -0,0 +1,21 @@
+Copyright (C) 2012 Rob Figueiredo
+All Rights Reserved.
+
+MIT LICENSE
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies 
of
+the Software, and to permit persons to whom the Software is furnished to do so,
+subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 
FITNESS
+FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/go.mod b/go.mod
index 367d4a1..2abaffb 100644
--- a/go.mod
+++ b/go.mod
@@ -89,6 +89,7 @@ require (
        github.com/prometheus/client_model v0.2.0 // indirect
        github.com/prometheus/common v0.32.1 // indirect
        github.com/prometheus/procfs v0.7.3 // indirect
+       github.com/robfig/cron/v3 v3.0.1
        github.com/sirupsen/logrus v1.7.0 // indirect
        github.com/soheilhy/cmux v0.1.5 // indirect
        github.com/spf13/afero v1.8.2 // indirect
diff --git a/go.sum b/go.sum
index d4f717b..044c43c 100644
--- a/go.sum
+++ b/go.sum
@@ -431,6 +431,8 @@ github.com/prometheus/procfs v0.6.0/go.mod 
h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1
 github.com/prometheus/procfs v0.7.3 
h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
 github.com/prometheus/procfs v0.7.3/go.mod 
h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
 github.com/prometheus/tsdb v0.7.1/go.mod 
h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
+github.com/robfig/cron/v3 v3.0.1 
h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
+github.com/robfig/cron/v3 v3.0.1/go.mod 
h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
 github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod 
h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
 github.com/rogpeppe/fastuuid v1.2.0/go.mod 
h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
 github.com/rogpeppe/go-internal v1.3.0/go.mod 
h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
diff --git a/test/integration/cold_query/query_suite_test.go 
b/test/integration/cold_query/query_suite_test.go
new file mode 100644
index 0000000..6f9ed17
--- /dev/null
+++ b/test/integration/cold_query/query_suite_test.go
@@ -0,0 +1,92 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package integration_cold_query_test
+
+import (
+       "testing"
+       "time"
+
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+       "github.com/apache/skywalking-banyandb/pkg/test/setup"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+       cases_measure "github.com/apache/skywalking-banyandb/test/cases/measure"
+       cases_measure_data 
"github.com/apache/skywalking-banyandb/test/cases/measure/data"
+       cases_stream "github.com/apache/skywalking-banyandb/test/cases/stream"
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+       grpclib "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+)
+
+func TestIntegrationColdQuery(t *testing.T) { // go test entry point that runs the specs registered in this package
+       RegisterFailHandler(Fail) // assertion failures are reported through Fail
+       RunSpecs(t, "Integration Query Cold Data Suite")
+}
+
+var (
+       connection *grpclib.ClientConn // client connection dialed in SynchronizedBeforeSuite and closed in SynchronizedAfterSuite
+       now        time.Time // base time handed to the writers and the shared contexts: timestamp.NowMilli() minus 24 hours
+       deferFunc  func() // cleanup callback returned by setup.SetUp, invoked in SynchronizedAfterSuite
+)
+
+var _ = SynchronizedBeforeSuite(func() []byte { // first function: initialise logging, obtain the address from setup.SetUp, write the fixture data, return the address as bytes
+       Expect(logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: "warn",
+       })).To(Succeed())
+       var addr string
+       addr, deferFunc = setup.SetUp() // deferFunc is kept for SynchronizedAfterSuite
+       conn, err := grpclib.Dial(
+               addr,
+               grpclib.WithTransportCredentials(insecure.NewCredentials()),
+       )
+       Expect(err).NotTo(HaveOccurred())
+       now = timestamp.NowMilli().Add(-time.Hour * 24) // base time lies 24 hours in the past so the written data is cold
+       interval := 500 * time.Millisecond
+       cases_stream.Write(conn, "data.json", now, interval)
+       cases_measure_data.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval)
+       cases_measure_data.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval)
+       cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval)
+       Expect(conn.Close()).To(Succeed()) // this connection is only used for seeding
+       return []byte(addr)
+}, func(address []byte) { // second function: dial the address produced above and publish the connection to the shared case contexts
+       var err error
+       connection, err = grpclib.Dial(
+               string(address),
+               grpclib.WithTransportCredentials(insecure.NewCredentials()),
+               grpclib.WithBlock(),
+       )
+       Expect(err).NotTo(HaveOccurred()) // assert the dial result before the connection is handed to the shared contexts
+       cases_stream.SharedContext = helpers.SharedContext{
+               Connection: connection,
+               BaseTime:   now, // NOTE(review): now is assigned only in the first function; confirm it is set in every process that runs this one
+       }
+       cases_measure.SharedContext = helpers.SharedContext{
+               Connection: connection,
+               BaseTime:   now,
+       }
+})
+
+var _ = SynchronizedAfterSuite(func() { // first function: close the shared client connection when one was opened
+       if connection != nil {
+               Expect(connection.Close()).To(Succeed())
+       }
+}, func() { // second function: run the cleanup callback stored from setup.SetUp
+       deferFunc()
+})


Reply via email to