This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 5452c9a Introduce two key feats to tsdb (#177)
5452c9a is described below
commit 5452c9a498e56baad497e13b318daadbe4a4a83f
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Sep 29 21:00:03 2022 +0800
Introduce two key feats to tsdb (#177)
* Introduce two key feats to tsdb
* Add the retention controller to support the group TTL.
* Allow writing cold data whose timestamp is not in the current block.
Signed-off-by: Gao Hongtao <[email protected]>
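
For context, a minimal sketch (not part of this change) of the schedule the new retention controller builds with github.com/robfig/cron/v3: an HOUR-based TTL wakes at minute 5 of every hour ("5 *"), a DAY-based TTL wakes daily at 00:05 ("5 0"). The surrounding main function and the reference instant are illustrative only; only the parser options and the two expressions come from banyand/tsdb/retention.go below.

    package main

    import (
        "fmt"
        "time"

        "github.com/robfig/cron/v3"
    )

    func main() {
        // The controller parses a two-field (minute hour) cron spec.
        parser := cron.NewParser(cron.Minute | cron.Hour)
        for _, expr := range []string{"5 *", "5 0"} { // HOUR-unit TTL, DAY-unit TTL
            sched, err := parser.Parse(expr)
            if err != nil {
                fmt.Println("parse:", err)
                return
            }
            // On each wake-up the controller removes segments/blocks whose end
            // time is older than now minus the group's TTL.
            now := time.Now()
            fmt.Printf("%q next fires at %v\n", expr, sched.Next(now))
        }
    }

The second feature is carried by the new Series.Create(t) / blockDatabase.create(timeRange) path in banyand/tsdb/series.go and seriesdb.go below: a write whose timestamp falls outside the currently open block reuses the block covering that time, or creates the missing segment/block on demand, instead of failing.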
---
banyand/measure/measure_write.go | 6 +-
banyand/measure/metadata.go | 3 +
banyand/stream/metadata.go | 3 +
banyand/stream/stream_write.go | 4 +-
banyand/tsdb/block.go | 22 ++++
banyand/tsdb/bucket/queue.go | 20 +++
banyand/tsdb/retention.go | 103 +++++++++++++++
banyand/tsdb/segment.go | 65 +++++++++-
banyand/tsdb/series.go | 24 ++++
banyand/tsdb/seriesdb.go | 31 +++++
banyand/tsdb/shard.go | 88 ++++++++++---
banyand/tsdb/shard_test.go | 138 +++++++++++++++++++++
banyand/tsdb/tsdb.go | 21 ++--
banyand/tsdb/tsdb_test.go | 1 +
dist/LICENSE | 1 +
.../licenses/license-github.com-robfig-cron-v3.txt | 21 ++++
go.mod | 1 +
go.sum | 2 +
test/integration/cold_query/query_suite_test.go | 92 ++++++++++++++
19 files changed, 608 insertions(+), 38 deletions(-)
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
index 58d0bab..bb14e0b 100644
--- a/banyand/measure/measure_write.go
+++ b/banyand/measure/measure_write.go
@@ -61,7 +61,7 @@ func (s *measure) Write(value *measurev1.DataPointValue) error {
}
func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *measurev1.DataPointValue, cb index.CallbackFn) error {
- t := value.GetTimestamp().AsTime()
+ t := value.GetTimestamp().AsTime().Local()
if err := timestamp.Check(t); err != nil {
return errors.WithMessage(err, "writing stream")
}
@@ -81,7 +81,7 @@ func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *mea
if err != nil {
return err
}
- wp, err := series.Span(timestamp.NewInclusiveTimeRangeDuration(t, 0))
+ wp, err := series.Create(t)
if err != nil {
if wp != nil {
_ = wp.Close()
@@ -189,7 +189,7 @@ func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
}
err := stm.write(common.ShardID(writeEvent.GetShardId()), writeEvent.GetSeriesHash(), writeEvent.GetRequest().GetDataPoint(), nil)
if err != nil {
- w.l.Debug().Err(err).Msg("fail to write entity")
+ w.l.Error().Err(err).Msg("fail to write entity")
}
return
}
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index fa4a545..7cd0d46 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -215,6 +215,9 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) {
if opts.SegmentInterval, err = pb_v1.ToIntervalRule(groupSchema.ResourceOpts.SegmentInterval); err != nil {
return nil, err
}
+ if opts.TTL, err = pb_v1.ToIntervalRule(groupSchema.ResourceOpts.Ttl); err != nil {
+ return nil, err
+ }
return tsdb.OpenDatabase(
context.WithValue(context.Background(), common.PositionKey, common.Position{
Module: "measure",
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 67f1f82..870a887 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -218,6 +218,9 @@ func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) {
if opts.SegmentInterval, err = pb_v1.ToIntervalRule(groupSchema.ResourceOpts.SegmentInterval); err != nil {
return nil, err
}
+ if opts.TTL, err = pb_v1.ToIntervalRule(groupSchema.ResourceOpts.Ttl); err != nil {
+ return nil, err
+ }
return tsdb.OpenDatabase(
context.WithValue(context.Background(), common.PositionKey, common.Position{
Module: "stream",
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index 1d172f5..0d685be 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -88,7 +88,7 @@ func (s *stream) write(shardID common.ShardID, seriesHashKey []byte, value *stre
return err
}
t := timestamp.MToN(tp)
- wp, err := series.Span(timestamp.NewInclusiveTimeRangeDuration(t, 0))
+ wp, err := series.Create(t)
if err != nil {
if wp != nil {
_ = wp.Close()
@@ -183,7 +183,7 @@ func (w *writeCallback) Rev(message bus.Message) (resp bus.Message) {
}
err := stm.write(common.ShardID(writeEvent.GetShardId()), writeEvent.GetSeriesHash(), writeEvent.GetRequest().GetElement(), nil)
if err != nil {
- w.l.Debug().Err(err).Msg("fail to write entity")
+ w.l.Error().Err(err).Msg("fail to write entity")
}
return
}
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 1dc1df0..5e8c699 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -21,6 +21,7 @@ import (
"context"
"fmt"
"io"
+ "os"
"path"
"runtime"
"strconv"
@@ -38,6 +39,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/index/lsm"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
+ "github.com/pkg/errors"
)
const (
@@ -55,6 +57,7 @@ type block struct {
suffix string
ref *atomic.Int32
closed *atomic.Bool
+ deleted *atomic.Bool
lock sync.RWMutex
position common.Position
memSize int64
@@ -100,6 +103,7 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
flushCh: make(chan struct{}, 1),
ref: &atomic.Int32{},
closed: &atomic.Bool{},
+ deleted: &atomic.Bool{},
queue: opts.queue,
}
b.options(ctx)
@@ -136,6 +140,9 @@ func (b *block) options(ctx context.Context) {
}
func (b *block) open() (err error) {
+ if b.deleted.Load() {
+ return nil
+ }
if b.store, err = kv.OpenTimeSeriesStore(
0,
path.Join(b.path, componentMain),
@@ -181,6 +188,9 @@ func (b *block) open() (err error) {
}
func (b *block) delegate() (blockDelegate, error) {
+ if b.deleted.Load() {
+ return nil, errors.WithMessagef(ErrBlockAbsent, "block %d is deleted", b.blockID)
+ }
if b.incRef() {
return &bDelegate{
delegate: b,
@@ -272,6 +282,18 @@ func (b *block) stopThenClose() {
b.close()
}
+func (b *block) delete() error {
+ if b.deleted.Load() {
+ return nil
+ }
+ b.deleted.Store(true)
+ if b.Reporter != nil {
+ b.Stop()
+ }
+ b.close()
+ return os.RemoveAll(b.path)
+}
+
func (b *block) Closed() bool {
return b.closed.Load()
}
diff --git a/banyand/tsdb/bucket/queue.go b/banyand/tsdb/bucket/queue.go
index af13d6b..2b3dced 100644
--- a/banyand/tsdb/bucket/queue.go
+++ b/banyand/tsdb/bucket/queue.go
@@ -27,6 +27,7 @@ type EvictFn func(id interface{})
type Queue interface {
Push(id interface{})
+ Remove(id interface{})
Len() int
All() []interface{}
}
@@ -108,6 +109,25 @@ func (q *lruQueue) Push(id interface{}) {
q.recent.Add(id, nil)
}
+func (q *lruQueue) Remove(id interface{}) {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ if q.frequent.Contains(id) {
+ q.frequent.Remove(id)
+ return
+ }
+
+ if q.recent.Contains(id) {
+ q.recent.Remove(id)
+ return
+ }
+
+ if q.recentEvict.Contains(id) {
+ q.recentEvict.Remove(id)
+ }
+}
+
func (q *lruQueue) Len() int {
q.lock.RLock()
defer q.lock.RUnlock()
diff --git a/banyand/tsdb/retention.go b/banyand/tsdb/retention.go
new file mode 100644
index 0000000..957c268
--- /dev/null
+++ b/banyand/tsdb/retention.go
@@ -0,0 +1,103 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package tsdb
+
+import (
+ "sync"
+ "time"
+
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/robfig/cron/v3"
+)
+
+type retentionController struct {
+ segment *segmentController
+ scheduler cron.Schedule
+ stopped bool
+ stopMux sync.Mutex
+ stopCh chan struct{}
+ duration time.Duration
+ l *logger.Logger
+}
+
+func newRetentionController(segment *segmentController, ttl IntervalRule) (*retentionController, error) {
+ var expr string
+ switch ttl.Unit {
+ case HOUR:
+ // Every hour on the 5th minute
+ expr = "5 *"
+ case DAY:
+ // Every day on 00:05
+ expr = "5 0"
+
+ }
+ parser := cron.NewParser(cron.Minute | cron.Hour)
+ scheduler, err := parser.Parse(expr)
+ if err != nil {
+ return nil, err
+ }
+ return &retentionController{
+ segment: segment,
+ scheduler: scheduler,
+ stopCh: make(chan struct{}),
+ l: segment.l.Named("retention-controller"),
+ duration: ttl.EstimatedDuration(),
+ }, nil
+}
+
+func (rc *retentionController) start() {
+ rc.stopMux.Lock()
+ defer rc.stopMux.Unlock()
+ if rc.stopped {
+ return
+ }
+ go rc.run()
+}
+
+func (rc *retentionController) run() {
+ rc.l.Info().Msg("start")
+ now := rc.segment.clock.Now()
+ for {
+ next := rc.scheduler.Next(now)
+ timer := rc.segment.clock.Timer(next.Sub(now))
+ for {
+ select {
+ case now = <-timer.C:
+ rc.l.Info().Time("now", now).Msg("wake")
+ if err := rc.segment.remove(now.Add(-rc.duration)); err != nil {
+ rc.l.Error().Err(err).Msg("failed to remove expired data")
+ }
+ case <-rc.stopCh:
+ timer.Stop()
+ rc.l.Info().Msg("stop")
+ return
+ }
+ break
+ }
+ }
+}
+
+func (rc *retentionController) stop() {
+ rc.stopMux.Lock()
+ defer rc.stopMux.Unlock()
+ if rc.stopped {
+ return
+ }
+ rc.stopped = true
+ close(rc.stopCh)
+}
diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index be2cb7a..bbdad22 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -21,6 +21,8 @@ import (
"context"
"errors"
"fmt"
+ "os"
+ "sort"
"strconv"
"sync"
"time"
@@ -31,6 +33,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
+ "go.uber.org/multierr"
)
var ErrEndOfSegment = errors.New("reached the end of the segment")
@@ -69,7 +72,8 @@ func openSegment(ctx context.Context, startTime time.Time, path, suffix string,
TimeRange: timeRange,
Reporter: bucket.NewTimeBasedReporter(timeRange, clock),
}
- err = s.blockController.open()
+ isHead := s.End.After(clock.Now())
+ err = s.blockController.open(isHead)
if err != nil {
return nil, err
}
@@ -96,7 +100,9 @@ func openSegment(ctx context.Context, startTime time.Time, path, suffix string,
}
}
}
-
+ if !isHead {
+ return
+ }
s.blockManageStrategy, err = bucket.NewStrategy(s.blockController, bucket.WithLogger(s.l))
if err != nil {
return nil, err
@@ -110,7 +116,9 @@ func (s *segment) close() {
if s.globalIndex != nil {
s.globalIndex.Close()
}
- s.blockManageStrategy.Close()
+ if s.blockManageStrategy != nil {
+ s.blockManageStrategy.Close()
+ }
if s.Reporter != nil {
s.Stop()
}
@@ -120,6 +128,11 @@ func (s *segment) closeBlock(id uint16) {
s.blockController.closeBlock(id)
}
+func (s *segment) delete() error {
+ s.close()
+ return os.RemoveAll(s.path)
+}
+
func (s segment) String() string {
return fmt.Sprintf("SegID-%d", s.id)
}
@@ -313,17 +326,22 @@ func (bc *blockController) startTime(suffix string) (time.Time, error) {
panic("invalid interval unit")
}
-func (bc *blockController) open() error {
+func (bc *blockController) open(createIfEmpty bool) error {
err := WalkDir(
bc.location,
segPathPrefix,
func(suffix, absolutePath string) error {
+ bc.Lock()
+ defer bc.Unlock()
_, err := bc.load(suffix, absolutePath)
return err
})
if err != nil {
return err
}
+ if !createIfEmpty {
+ return nil
+ }
if bc.Current() == nil {
_, err := bc.create(bc.clock.Now())
if err != nil {
@@ -340,6 +358,8 @@ func (bc *blockController) create(startTime time.Time) (*block, error) {
if !startTime.Before(bc.segTimeRange.End) {
return nil, ErrEndOfSegment
}
+ bc.Lock()
+ defer bc.Unlock()
suffix := bc.Format(startTime)
segPath, err := mkdir(blockTemplate, bc.location, suffix)
if err != nil {
@@ -368,14 +388,47 @@ func (bc *blockController) load(suffix, path string) (b *block, err error) {
}); err != nil {
return nil, err
}
- bc.Lock()
- defer bc.Unlock()
bc.lst = append(bc.lst, b)
+ bc.sortLst()
return b, nil
}
+func (bc *blockController) sortLst() {
+ sort.Slice(bc.lst, func(i, j int) bool {
+ return bc.lst[i].blockID < bc.lst[j].blockID
+ })
+}
+
func (bc *blockController) close() {
for _, s := range bc.lst {
s.stopThenClose()
}
}
+
+func (bc *blockController) remove(deadline time.Time) (err error) {
+ for _, b := range bc.blocks() {
+ if b.End.Before(deadline) {
+ bc.Lock()
+ if errDel := b.delete(); errDel != nil {
+ err = multierr.Append(err, errDel)
+ } else {
+ b.queue.Remove(BlockID{
+ BlockID: b.blockID,
+ SegID: b.segID,
+ })
+ bc.removeBlock(b.blockID)
+ }
+ bc.Unlock()
+ }
+ }
+ return err
+}
+
+func (bc *blockController) removeBlock(blockID uint16) {
+ for i, b := range bc.lst {
+ if b.blockID == blockID {
+ bc.lst = append(bc.lst[:i], bc.lst[i+1:]...)
+ break
+ }
+ }
+}
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index 1696616..8aa98d0 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -76,6 +76,7 @@ func (i *GlobalItemID) UnMarshal(data []byte) error {
type Series interface {
ID() common.SeriesID
Span(timeRange timestamp.TimeRange) (SeriesSpan, error)
+ Create(t time.Time) (SeriesSpan, error)
Get(id GlobalItemID) (Item, io.Closer, error)
}
@@ -127,6 +128,29 @@ func (s *series) Span(timeRange timestamp.TimeRange) (SeriesSpan, error) {
return newSeriesSpan(context.WithValue(context.Background(), logger.ContextKey, s.l), timeRange, blocks, s.id, s.shardID), nil
}
+func (s *series) Create(t time.Time) (SeriesSpan, error) {
+ tr := timestamp.NewInclusiveTimeRange(t, t)
+ blocks, err := s.blockDB.span(tr)
+ if err != nil {
+ return nil, err
+ }
+ if len(blocks) > 0 {
+ s.l.Debug().
+ Time("time", t).
+ Msg("load a series span")
+ return newSeriesSpan(context.WithValue(context.Background(), logger.ContextKey, s.l), tr, blocks, s.id, s.shardID), nil
+ }
+ b, err := s.blockDB.create(tr)
+ if err != nil {
+ return nil, err
+ }
+ blocks = append(blocks, b)
+ s.l.Debug().
+ Time("time", t).
+ Msg("create a series span")
+ return newSeriesSpan(context.WithValue(context.Background(), logger.ContextKey, s.l), tr, blocks, s.id, s.shardID), nil
+}
+
func newSeries(ctx context.Context, id common.SeriesID, blockDB blockDatabase) *series {
s := &series{
id: id,
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index 720968a..bd61e44 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -148,6 +148,7 @@ type SeriesDatabase interface {
type blockDatabase interface {
shardID() common.ShardID
span(timeRange timestamp.TimeRange) ([]blockDelegate, error)
+ create(timeRange timestamp.TimeRange) (blockDelegate, error)
block(id GlobalItemID) (blockDelegate, error)
}
@@ -265,6 +266,36 @@ func (s *seriesDB) span(timeRange timestamp.TimeRange) ([]blockDelegate, error)
return result, nil
}
+func (s *seriesDB) create(timeRange timestamp.TimeRange) (blockDelegate, error) {
+ s.Lock()
+ defer s.Unlock()
+ ss := s.segCtrl.span(timeRange)
+ if len(ss) > 0 {
+ s := ss[0]
+ dd, err := s.blockController.span(timeRange)
+ if err != nil {
+ return nil, err
+ }
+ if len(dd) > 0 {
+ return dd[0], nil
+ }
+ block, err := s.blockController.create(timeRange.Start)
+ if err != nil {
+ return nil, err
+ }
+ return block.delegate()
+ }
+ seg, err := s.segCtrl.create(s.segCtrl.Format(timeRange.Start), false)
+ if err != nil {
+ return nil, err
+ }
+ block, err := seg.blockController.create(timeRange.Start)
+ if err != nil {
+ return nil, err
+ }
+ return block.delegate()
+}
+
func (s *seriesDB) context() context.Context {
return context.WithValue(context.Background(), logger.ContextKey, s.l)
}
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index 831dde3..4f3c168 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -25,6 +25,7 @@ import (
"time"
"github.com/pkg/errors"
+ "go.uber.org/multierr"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
@@ -48,11 +49,12 @@ type shard struct {
indexDatabase IndexDatabase
segmentController *segmentController
segmentManageStrategy *bucket.Strategy
+ retentionController *retentionController
stopCh chan struct{}
}
func OpenShard(ctx context.Context, id common.ShardID,
- root string, segmentSize, blockSize IntervalRule, openedBlockSize int,
+ root string, segmentSize, blockSize, ttl IntervalRule, openedBlockSize int,
) (Shard, error) {
path, err := mkdir(shardTemplate, root, int(id))
if err != nil {
@@ -106,6 +108,10 @@ func OpenShard(ctx context.Context, id common.ShardID,
s.position = position.(common.Position)
}
s.runStat()
+ if s.retentionController, err = newRetentionController(s.segmentController, ttl); err != nil {
+ return nil, err
+ }
+ s.retentionController.start()
return s, nil
}
@@ -124,7 +130,9 @@ func (s *shard) Index() IndexDatabase {
func (s *shard) State() (shardState ShardState) {
shardState.StrategyManagers = append(shardState.StrategyManagers, s.segmentManageStrategy.String())
for _, seg := range s.segmentController.segments() {
- shardState.StrategyManagers = append(shardState.StrategyManagers, seg.blockManageStrategy.String())
+ if seg.blockManageStrategy != nil {
+ shardState.StrategyManagers = append(shardState.StrategyManagers, seg.blockManageStrategy.String())
+ }
for _, b := range seg.blockController.blocks() {
shardState.Blocks = append(shardState.Blocks, BlockState{
ID: BlockID{
@@ -149,11 +157,12 @@ func (s *shard) State() (shardState ShardState) {
}
return x.SegID < y.SegID
})
- s.l.Info().Interface("", shardState).Msg("state")
+ s.l.Debug().Interface("", shardState).Msg("state")
return shardState
}
func (s *shard) Close() error {
+ s.retentionController.stop()
s.segmentManageStrategy.Close()
s.segmentController.close()
err := s.seriesDatabase.Close()
@@ -193,6 +202,16 @@ func (ir IntervalRule) NextTime(current time.Time) time.Time {
panic("invalid interval unit")
}
+func (ir IntervalRule) PreviousTime(current time.Time) time.Time {
+ switch ir.Unit {
+ case HOUR:
+ return current.Add(-time.Hour * time.Duration(ir.Num))
+ case DAY:
+ return current.AddDate(0, 0, -ir.Num)
+ }
+ panic("invalid interval unit")
+}
+
func (ir IntervalRule) EstimatedDuration() time.Duration {
switch ir.Unit {
case HOUR:
@@ -243,7 +262,7 @@ func newSegmentController(shardCtx context.Context, location string, segmentSize
}
func (sc *segmentController) get(segID uint16) *segment {
- lst := sc.snapshotLst()
+ lst := sc.segments()
last := len(lst) - 1
for i := range lst {
s := lst[last-i]
@@ -255,7 +274,7 @@ func (sc *segmentController) get(segID uint16) *segment {
}
func (sc *segmentController) span(timeRange timestamp.TimeRange) (ss []*segment) {
- lst := sc.snapshotLst()
+ lst := sc.segments()
last := len(lst) - 1
for i := range lst {
s := lst[last-i]
@@ -266,12 +285,6 @@ func (sc *segmentController) span(timeRange timestamp.TimeRange) (ss []*segment)
return ss
}
-func (sc *segmentController) snapshotLst() []*segment {
- sc.RLock()
- defer sc.RUnlock()
- return sc.lst
-}
-
func (sc *segmentController) segments() (ss []*segment) {
sc.RLock()
defer sc.RUnlock()
@@ -299,7 +312,7 @@ func (sc *segmentController) Current() bucket.Reporter {
func (sc *segmentController) Next() (bucket.Reporter, error) {
seg := sc.Current().(*segment)
reporter, err := sc.create(sc.Format(
- sc.segmentSize.NextTime(seg.Start)))
+ sc.segmentSize.NextTime(seg.Start)), true)
if errors.Is(err, ErrEndOfSegment) {
return nil, bucket.ErrNoMoreBucket
}
@@ -342,7 +355,9 @@ func (sc *segmentController) open() error {
sc.location,
segPathPrefix,
func(suffix, absolutePath string) error {
- _, err := sc.load(suffix, absolutePath)
+ sc.Lock()
+ defer sc.Unlock()
+ _, err := sc.load(suffix, absolutePath, false)
if errors.Is(err, ErrEndOfSegment) {
return nil
}
@@ -352,7 +367,7 @@ func (sc *segmentController) open() error {
return err
}
if sc.Current() == nil {
- _, err = sc.create(sc.Format(sc.clock.Now()))
+ _, err = sc.create(sc.Format(sc.clock.Now()), true)
if err != nil {
return err
}
@@ -360,15 +375,23 @@ func (sc *segmentController) open() error {
return nil
}
-func (sc *segmentController) create(suffix string) (*segment, error) {
+func (sc *segmentController) create(suffix string, createBlockIfEmpty bool)
(*segment, error) {
+ sc.Lock()
+ defer sc.Unlock()
segPath, err := mkdir(segTemplate, sc.location, suffix)
if err != nil {
return nil, err
}
- return sc.load(suffix, segPath)
+ return sc.load(suffix, segPath, createBlockIfEmpty)
}
-func (sc *segmentController) load(suffix, path string) (seg *segment, err error) {
+func (sc *segmentController) sortLst() {
+ sort.Slice(sc.lst, func(i, j int) bool {
+ return sc.lst[i].id < sc.lst[j].id
+ })
+}
+
+func (sc *segmentController) load(suffix, path string, createBlockIfEmpty bool) (seg *segment, err error) {
startTime, err := sc.Parse(suffix)
if err != nil {
return nil, err
@@ -380,12 +403,39 @@ func (sc *segmentController) load(suffix, path string) (seg *segment, err error)
if err != nil {
return nil, err
}
- sc.Lock()
- defer sc.Unlock()
sc.lst = append(sc.lst, seg)
+ sc.sortLst()
return seg, nil
}
+func (sc *segmentController) remove(deadline time.Time) (err error) {
+ sc.l.Info().Time("deadline", deadline).Msg("start to remove before deadline")
+ for _, s := range sc.segments() {
+ if s.End.Before(deadline) || s.Contains(uint64(deadline.UnixNano())) {
+ err = multierr.Append(err, s.blockController.remove(deadline))
+ if s.End.Before(deadline) {
+ sc.Lock()
+ if errDel := s.delete(); errDel != nil {
+ err = multierr.Append(err, errDel)
+ } else {
+ sc.removeSeg(s.id)
+ }
+ sc.Unlock()
+ }
+ }
+ }
+ return err
+}
+
+func (sc *segmentController) removeSeg(segID uint16) {
+ for i, b := range sc.lst {
+ if b.id == segID {
+ sc.lst = append(sc.lst[:i], sc.lst[i+1:]...)
+ break
+ }
+ }
+}
+
func (sc *segmentController) close() {
for _, s := range sc.lst {
s.close()
diff --git a/banyand/tsdb/shard_test.go b/banyand/tsdb/shard_test.go
index 2c928c6..350085a 100644
--- a/banyand/tsdb/shard_test.go
+++ b/banyand/tsdb/shard_test.go
@@ -65,6 +65,10 @@ var _ = Describe("Shard", func() {
Unit: tsdb.HOUR,
Num: 12,
},
+ tsdb.IntervalRule{
+ Unit: tsdb.DAY,
+ Num: 7,
+ },
2,
)
Expect(err).NotTo(HaveOccurred())
@@ -402,5 +406,139 @@ var _ = Describe("Shard", func() {
},
}))
})
+ It("retention", func() {
+ var err error
+ shard, err = tsdb.OpenShard(timestamp.SetClock(context.Background(), clock), common.ShardID(0), tmp,
+ tsdb.IntervalRule{
+ Unit: tsdb.DAY,
+ Num: 1,
+ },
+ tsdb.IntervalRule{
+ Unit: tsdb.HOUR,
+ Num: 12,
+ },
+ tsdb.IntervalRule{
+ Unit: tsdb.DAY,
+ Num: 1,
+ },
+ 10,
+ )
+ Expect(err).NotTo(HaveOccurred())
+ By("open 4 blocks")
+ t1 := clock.Now()
+ By("01/01 00:00 1st block is opened")
+ clock.Add(2 * time.Minute)
+ By("01/01 00:05 retention")
+ clock.Add(3 * time.Minute)
+ By("01/01 10:00 2nd block is opened")
+ clock.Add(9*time.Hour + 55*time.Minute)
+ t2 := clock.Now().Add(2 * time.Hour)
+ By("01/01 13:00 moves to the 2nd block")
+ clock.Add(3 * time.Hour)
+ By("01/01 22:00 3rd block is opened")
+ clock.Add(9 * time.Hour)
+ t3 := clock.Now().Add(2 * time.Hour)
+ By("01/02 00:02 moves to 3rd block")
+ clock.Add(2*time.Hour + 2*time.Minute)
+ By("01/02 00:05 retention")
+ clock.Add(3 * time.Minute)
+ By("01/02 10:00 4th block is opened")
+ clock.Add(9*time.Hour + 55*time.Minute)
+ t4 := clock.Now().Add(2 * time.Hour)
+
+ Eventually(func() []tsdb.BlockState {
+ if clock.TriggerTimer() {
+ GinkgoWriter.Println("01/02 13:00 has been triggered")
+ }
+ return shard.State().Blocks
+ }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t1, 12*time.Hour, true, false),
+ },
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t2, 12*time.Hour, true, false),
+ },
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t3, 12*time.Hour, true, false),
+ },
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t4, 12*time.Hour, true, false),
+ },
+ }))
+ By("01/02 13:00 moves to 4th block")
+ clock.Add(3 * time.Hour)
+ By("01/02 22:00 5th block is opened")
+ clock.Add(9 * time.Hour)
+ t5 := clock.Now().Add(2 * time.Hour)
+ By("01/03 00:02 move to 5th block")
+ clock.Add(2*time.Hour + 2*time.Minute)
+ By("01/03 00:05 retention: remove segment and blocks on 01/01")
+ clock.Add(3 * time.Minute)
+ clock.Add(9*time.Hour + 55*time.Minute)
+ t6 := clock.Now().Add(2 * time.Hour)
+ Eventually(func() []tsdb.BlockState {
+ if clock.TriggerTimer() {
+ GinkgoWriter.Println("01/03 01:00 has been triggered")
+ }
+ return shard.State().Blocks
+ }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t3, 12*time.Hour, true, false),
+ },
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t4, 12*time.Hour, true, false),
+ },
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700103),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t5, 12*time.Hour, true, false),
+ },
+ {
+ ID: tsdb.BlockID{
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700103),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+ },
+ TimeRange: timestamp.NewTimeRangeDuration(t6, 12*time.Hour, true, false),
+ },
+ }))
+ Eventually(func() []tsdb.BlockID {
+ return shard.State().OpenBlocks
+ }).Should(Equal([]tsdb.BlockID{
+ {
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
+ },
+ {
+ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+ BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12),
+ },
+ }))
+ })
})
})
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 3828e86..dc6b62a 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -89,6 +89,7 @@ type DatabaseOpts struct {
EncodingMethod EncodingMethod
SegmentInterval IntervalRule
BlockInterval IntervalRule
+ TTL IntervalRule
BlockMemSize int64
SeriesMemSize int64
EnableGlobalIndex bool
@@ -126,6 +127,7 @@ type database struct {
shardNum uint32
segmentSize IntervalRule
blockSize IntervalRule
+ ttl IntervalRule
sLst []Shard
sync.Mutex
@@ -160,23 +162,25 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
if _, err := mkdir(opts.Location); err != nil {
return nil, err
}
- segmentSize := opts.SegmentInterval
- if segmentSize.Num == 0 {
+ if opts.SegmentInterval.Num == 0 {
return nil, errors.Wrap(ErrOpenDatabase, "segment interval is absent")
}
- blockSize := opts.BlockInterval
- if blockSize.Num == 0 {
+ if opts.BlockInterval.Num == 0 {
return nil, errors.Wrap(ErrOpenDatabase, "block interval is absent")
}
- if blockSize.EstimatedDuration() > segmentSize.EstimatedDuration() {
+ if opts.BlockInterval.EstimatedDuration() > opts.SegmentInterval.EstimatedDuration() {
return nil, errors.Wrapf(ErrOpenDatabase, "the block size is bigger than the segment size")
}
+ if opts.TTL.Num == 0 {
+ return nil, errors.Wrap(ErrOpenDatabase, "ttl is absent")
+ }
db := &database{
location: opts.Location,
shardNum: opts.ShardNum,
logger: logger.Fetch(ctx, "tsdb"),
- segmentSize: segmentSize,
- blockSize: blockSize,
+ segmentSize: opts.SegmentInterval,
+ blockSize: opts.BlockInterval,
+ ttl: opts.TTL,
}
db.logger.Info().Str("path", opts.Location).Msg("initialized")
var entries []os.DirEntry
@@ -203,7 +207,7 @@ func createDatabase(ctx context.Context, db *database, startID int) (Database, e
for i := startID; i < int(db.shardNum); i++ {
db.logger.Info().Int("shard_id", i).Msg("creating a shard")
so, errNewShard := OpenShard(ctx, common.ShardID(i),
- db.location, db.segmentSize, db.blockSize, defaultBlockQueueSize)
+ db.location, db.segmentSize, db.blockSize, db.ttl, defaultBlockQueueSize)
if errNewShard != nil {
err = multierr.Append(err, errNewShard)
continue
@@ -233,6 +237,7 @@ func loadDatabase(ctx context.Context, db *database) (Database, error) {
db.location,
db.segmentSize,
db.blockSize,
+ db.ttl,
defaultBlockQueueSize,
)
if errOpenShard != nil {
diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go
index b3f9ede..fe6c9a6 100644
--- a/banyand/tsdb/tsdb_test.go
+++ b/banyand/tsdb/tsdb_test.go
@@ -81,6 +81,7 @@ func openDatabase(t *require.Assertions, path string) (db Database) {
},
BlockInterval: IntervalRule{Num: 2},
SegmentInterval: IntervalRule{Num: 1, Unit: DAY},
+ TTL: IntervalRule{Num: 7, Unit: DAY},
})
t.NoError(err)
t.NotNil(db)
diff --git a/dist/LICENSE b/dist/LICENSE
index 0143495..326ad8b 100644
--- a/dist/LICENSE
+++ b/dist/LICENSE
@@ -307,6 +307,7 @@ MIT licenses
github.com/onsi/ginkgo/v2 v2.1.4 MIT
github.com/onsi/gomega v1.20.0 MIT
github.com/pelletier/go-toml/v2 v2.0.1 MIT
+ github.com/robfig/cron/v3 v3.0.1 MIT
github.com/rs/zerolog v1.26.1 MIT
github.com/sirupsen/logrus v1.7.0 MIT
github.com/spf13/cast v1.5.0 MIT
diff --git a/dist/licenses/license-github.com-robfig-cron-v3.txt b/dist/licenses/license-github.com-robfig-cron-v3.txt
new file mode 100644
index 0000000..3a0f627
--- /dev/null
+++ b/dist/licenses/license-github.com-robfig-cron-v3.txt
@@ -0,0 +1,21 @@
+Copyright (C) 2012 Rob Figueiredo
+All Rights Reserved.
+
+MIT LICENSE
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software is furnished to do so,
+subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/go.mod b/go.mod
index 367d4a1..2abaffb 100644
--- a/go.mod
+++ b/go.mod
@@ -89,6 +89,7 @@ require (
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.32.1 // indirect
github.com/prometheus/procfs v0.7.3 // indirect
+ github.com/robfig/cron/v3 v3.0.1
github.com/sirupsen/logrus v1.7.0 // indirect
github.com/soheilhy/cmux v0.1.5 // indirect
github.com/spf13/afero v1.8.2 // indirect
diff --git a/go.sum b/go.sum
index d4f717b..044c43c 100644
--- a/go.sum
+++ b/go.sum
@@ -431,6 +431,8 @@ github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1
github.com/prometheus/procfs v0.7.3 h1:4jVXhlkAyzOScmCkXBTOLRLTz8EeU+eyjrwB/EPq0VU=
github.com/prometheus/procfs v0.7.3/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
+github.com/robfig/cron/v3 v3.0.1 h1:WdRxkvbJztn8LMz/QEvLN5sBU+xKpSqwwUO1Pjr4qDs=
+github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzGIFLtro=
github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
diff --git a/test/integration/cold_query/query_suite_test.go b/test/integration/cold_query/query_suite_test.go
new file mode 100644
index 0000000..6f9ed17
--- /dev/null
+++ b/test/integration/cold_query/query_suite_test.go
@@ -0,0 +1,92 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package integration_cold_query_test
+
+import (
+ "testing"
+ "time"
+
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+ "github.com/apache/skywalking-banyandb/pkg/test/setup"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+ cases_measure "github.com/apache/skywalking-banyandb/test/cases/measure"
+ cases_measure_data "github.com/apache/skywalking-banyandb/test/cases/measure/data"
+ cases_stream "github.com/apache/skywalking-banyandb/test/cases/stream"
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+ grpclib "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+)
+
+func TestIntegrationColdQuery(t *testing.T) {
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "Integration Query Cold Data Suite")
+}
+
+var (
+ connection *grpclib.ClientConn
+ now time.Time
+ deferFunc func()
+)
+
+var _ = SynchronizedBeforeSuite(func() []byte {
+ Expect(logger.Init(logger.Logging{
+ Env: "dev",
+ Level: "warn",
+ })).To(Succeed())
+ var addr string
+ addr, deferFunc = setup.SetUp()
+ conn, err := grpclib.Dial(
+ addr,
+ grpclib.WithTransportCredentials(insecure.NewCredentials()),
+ )
+ Expect(err).NotTo(HaveOccurred())
+ now = timestamp.NowMilli().Add(-time.Hour * 24)
+ interval := 500 * time.Millisecond
+ cases_stream.Write(conn, "data.json", now, interval)
+ cases_measure_data.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval)
+ cases_measure_data.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval)
+ cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval)
+ Expect(conn.Close()).To(Succeed())
+ return []byte(addr)
+}, func(address []byte) {
+ var err error
+ connection, err = grpclib.Dial(
+ string(address),
+ grpclib.WithTransportCredentials(insecure.NewCredentials()),
+ grpclib.WithBlock(),
+ )
+ cases_stream.SharedContext = helpers.SharedContext{
+ Connection: connection,
+ BaseTime: now,
+ }
+ cases_measure.SharedContext = helpers.SharedContext{
+ Connection: connection,
+ BaseTime: now,
+ }
+ Expect(err).NotTo(HaveOccurred())
+})
+
+var _ = SynchronizedAfterSuite(func() {
+ if connection != nil {
+ Expect(connection.Close()).To(Succeed())
+ }
+}, func() {
+ deferFunc()
+})