This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch tsdb-block in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit e4e5781231795386503ec8ffbd1d523ea5643905 Author: Gao Hongtao <[email protected]> AuthorDate: Thu Jan 27 04:08:00 2022 +0000 Add the block manager Signed-off-by: Gao Hongtao <[email protected]> --- banyand/measure/query_test.go | 3 +- banyand/measure/write.go | 3 +- banyand/stream/stream_query_test.go | 23 +-- banyand/stream/stream_write.go | 3 +- banyand/tsdb/block.go | 45 +++-- banyand/tsdb/bucket/bucket.go | 52 ++++++ banyand/tsdb/bucket/strategy_test.go | 3 + banyand/tsdb/indexdb.go | 3 +- banyand/tsdb/segment.go | 280 ++++++++++++++++++++++-------- banyand/tsdb/series.go | 67 +------ banyand/tsdb/series_test.go | 12 +- banyand/tsdb/seriesdb.go | 13 +- banyand/tsdb/shard.go | 121 +++++++------ banyand/tsdb/shard_test.go | 41 +++-- banyand/tsdb/tsdb.go | 70 +++++--- banyand/tsdb/tsdb_test.go | 2 +- pkg/query/logical/common.go | 3 +- pkg/query/logical/plan_indexscan_local.go | 5 +- pkg/timestamp/range.go | 80 +++++++++ 19 files changed, 551 insertions(+), 278 deletions(-) diff --git a/banyand/measure/query_test.go b/banyand/measure/query_test.go index 60b8bf2..c26b7f8 100644 --- a/banyand/measure/query_test.go +++ b/banyand/measure/query_test.go @@ -26,6 +26,7 @@ import ( modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/tsdb" + "github.com/apache/skywalking-banyandb/pkg/timestamp" ) func Test_ParseTag_And_ParseField(t *testing.T) { @@ -37,7 +38,7 @@ func Test_ParseTag_And_ParseField(t *testing.T) { r.NoError(err) series, err := shard.Series().Get(tsdb.Entity{tsdb.Entry("1")}) r.NoError(err) - seriesSpan, err := series.Span(tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour)) + seriesSpan, err := series.Span(timestamp.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour)) defer func(seriesSpan tsdb.SeriesSpan) { _ = seriesSpan.Close() }(seriesSpan) diff --git a/banyand/measure/write.go b/banyand/measure/write.go index b0f9854..d26cd74 100644 --- a/banyand/measure/write.go +++ b/banyand/measure/write.go @@ -33,6 +33,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/timestamp" ) var ( @@ -78,7 +79,7 @@ func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *mea return err } t := value.GetTimestamp().AsTime() - wp, err := series.Span(tsdb.NewInclusiveTimeRangeDuration(t, 0)) + wp, err := series.Span(timestamp.NewInclusiveTimeRangeDuration(t, 0)) if err != nil { if wp != nil { _ = wp.Close() diff --git a/banyand/stream/stream_query_test.go b/banyand/stream/stream_query_test.go index 2533ef4..f63346d 100644 --- a/banyand/stream/stream_query_test.go +++ b/banyand/stream/stream_query_test.go @@ -42,6 +42,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index" + "github.com/apache/skywalking-banyandb/pkg/timestamp" ) type shardStruct struct { @@ -106,7 +107,7 @@ func Test_Stream_Series(t *testing.T) { name: "all", args: queryOpts{ entity: tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, - timeRange: tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour), + timeRange: timestamp.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour), }, want: shardsForTest{ { @@ -136,7 +137,7 @@ func Test_Stream_Series(t *testing.T) { name: "time range", args: queryOpts{ entity: tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, - timeRange: tsdb.NewInclusiveTimeRangeDuration(baseTime.Add(1500*time.Millisecond), 1*time.Hour), + timeRange: timestamp.NewInclusiveTimeRangeDuration(baseTime.Add(1500*time.Millisecond), 1*time.Hour), }, want: shardsForTest{ { @@ -163,7 +164,7 @@ func Test_Stream_Series(t *testing.T) { name: "find series by service_id and instance_id", args: queryOpts{ entity: tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), tsdb.AnyEntry}, - timeRange: tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour), + timeRange: timestamp.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour), }, want: shardsForTest{ { @@ -182,7 +183,7 @@ func Test_Stream_Series(t *testing.T) { name: "find a series", args: queryOpts{ entity: tsdb.Entity{tsdb.Entry("webapp_id"), tsdb.Entry("10.0.0.1_id"), convert.Int64ToBytes(0)}, - timeRange: tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour), + timeRange: timestamp.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour), }, want: shardsForTest{ { @@ -196,7 +197,7 @@ func Test_Stream_Series(t *testing.T) { name: "filter", args: queryOpts{ entity: tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, - timeRange: tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour), + timeRange: timestamp.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour), buildFn: func(builder tsdb.SeekerBuilder) { builder.Filter(&databasev1.IndexRule{ Metadata: &commonv1.Metadata{ @@ -242,7 +243,7 @@ func Test_Stream_Series(t *testing.T) { name: "order by duration", args: queryOpts{ entity: tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, - timeRange: tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour), + timeRange: timestamp.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour), buildFn: func(builder tsdb.SeekerBuilder) { builder.OrderByIndex(&databasev1.IndexRule{ Metadata: &commonv1.Metadata{ @@ -283,7 +284,7 @@ func Test_Stream_Series(t *testing.T) { name: "filter by duration", args: queryOpts{ entity: tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, - timeRange: tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour), + timeRange: timestamp.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour), buildFn: func(builder tsdb.SeekerBuilder) { rule := &databasev1.IndexRule{ Metadata: &commonv1.Metadata{ @@ -330,7 +331,7 @@ func Test_Stream_Series(t *testing.T) { name: "filter and sort by duration", args: queryOpts{ entity: tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, - timeRange: tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour), + timeRange: timestamp.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour), buildFn: func(builder tsdb.SeekerBuilder) { rule := &databasev1.IndexRule{ Metadata: &commonv1.Metadata{ @@ -378,7 +379,7 @@ func Test_Stream_Series(t *testing.T) { name: "filter by several conditions", args: queryOpts{ entity: tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, - timeRange: tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour), + timeRange: timestamp.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour), buildFn: func(builder tsdb.SeekerBuilder) { rule := &databasev1.IndexRule{ Metadata: &commonv1.Metadata{ @@ -441,7 +442,7 @@ func Test_Stream_Series(t *testing.T) { name: "filter by several conditions, sort by duration", args: queryOpts{ entity: tsdb.Entity{tsdb.AnyEntry, tsdb.AnyEntry, tsdb.AnyEntry}, - timeRange: tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour), + timeRange: timestamp.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour), buildFn: func(builder tsdb.SeekerBuilder) { rule := &databasev1.IndexRule{ Metadata: &commonv1.Metadata{ @@ -628,7 +629,7 @@ func Test_Stream_Global_Index(t *testing.T) { type queryOpts struct { entity tsdb.Entity - timeRange tsdb.TimeRange + timeRange timestamp.TimeRange buildFn func(builder tsdb.SeekerBuilder) } diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go index 7fef502..21e9561 100644 --- a/banyand/stream/stream_write.go +++ b/banyand/stream/stream_write.go @@ -30,6 +30,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/bus" "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/timestamp" ) var ( @@ -71,7 +72,7 @@ func (s *stream) write(shardID common.ShardID, seriesHashKey []byte, value *stre return err } t := value.GetTimestamp().AsTime() - wp, err := series.Span(tsdb.NewInclusiveTimeRangeDuration(t, 0)) + wp, err := series.Span(timestamp.NewInclusiveTimeRangeDuration(t, 0)) if err != nil { if wp != nil { _ = wp.Close() diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go index c5a2b98..a33fccd 100644 --- a/banyand/tsdb/block.go +++ b/banyand/tsdb/block.go @@ -20,6 +20,7 @@ package tsdb import ( "context" "io" + "strconv" "time" "github.com/dgraph-io/ristretto/z" @@ -27,43 +28,55 @@ import ( "github.com/apache/skywalking-banyandb/api/common" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/banyand/kv" + "github.com/apache/skywalking-banyandb/banyand/tsdb/bucket" "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/index/inverted" "github.com/apache/skywalking-banyandb/pkg/index/lsm" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/timestamp" ) type block struct { - path string - l *logger.Logger - ref *z.Closer + path string + l *logger.Logger + suffix string + ref *z.Closer store kv.TimeSeriesStore primaryIndex index.Store invertedIndex index.Store lsmIndex index.Store closableLst []io.Closer - endTime time.Time - startTime time.Time - segID uint16 - blockID uint16 + timestamp.TimeRange + bucket.Reporter + segID uint16 + blockID uint16 } type blockOpts struct { - segID uint16 - blockID uint16 - path string + segID uint16 + blockSize IntervalRule + startTime time.Time + suffix string + path string } func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) { + suffixInteger, err := strconv.Atoi(opts.suffix) + if err != nil { + return nil, err + } + id := uint16(opts.blockSize.Unit)<<12 | ((uint16(suffixInteger) << 4) >> 4) + timeRange := timestamp.NewTimeRange(opts.startTime, opts.blockSize.NextTime(opts.startTime), true, false) b = &block{ segID: opts.segID, - blockID: opts.blockID, + blockID: id, path: opts.path, ref: z.NewCloser(1), - startTime: time.Now(), l: logger.Fetch(ctx, "block"), + TimeRange: timeRange, + Reporter: bucket.NewTimeBasedReporter(timeRange), } encodingMethodObject := ctx.Value(encodingMethodKey) if encodingMethodObject == nil { @@ -169,7 +182,7 @@ func (d *bDelegate) primaryIndexReader() index.Searcher { } func (d *bDelegate) startTime() time.Time { - return d.delegate.startTime + return d.delegate.Start } func (d *bDelegate) identity() (segID uint16, blockID uint16) { @@ -199,11 +212,7 @@ func (d *bDelegate) writeInvertedIndex(field index.Field, id common.ItemID) erro } func (d *bDelegate) contains(ts time.Time) bool { - greaterAndEqualStart := d.delegate.startTime.Equal(ts) || d.delegate.startTime.Before(ts) - if d.delegate.endTime.IsZero() { - return greaterAndEqualStart - } - return greaterAndEqualStart && d.delegate.endTime.After(ts) + return d.delegate.Contains(uint64(ts.UnixNano())) } func (d *bDelegate) Close() error { diff --git a/banyand/tsdb/bucket/bucket.go b/banyand/tsdb/bucket/bucket.go index 82a76ec..406093c 100644 --- a/banyand/tsdb/bucket/bucket.go +++ b/banyand/tsdb/bucket/bucket.go @@ -17,6 +17,12 @@ package bucket +import ( + "time" + + "github.com/apache/skywalking-banyandb/pkg/timestamp" +) + type Controller interface { Current() Reporter Next() (Reporter, error) @@ -31,4 +37,50 @@ type Channel chan Status type Reporter interface { Report() Channel + Stop() +} + +type timeBasedReporter struct { + timestamp.TimeRange + reporterStopCh chan struct{} +} + +func NewTimeBasedReporter(timeRange timestamp.TimeRange) Reporter { + return &timeBasedReporter{ + TimeRange: timeRange, + reporterStopCh: make(chan struct{}), + } +} + +func (tr *timeBasedReporter) Report() Channel { + ch := make(Channel) + interval := tr.Duration() >> 4 + if interval < 100*time.Millisecond { + interval = 100 * time.Millisecond + } + go func() { + defer close(ch) + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + status := Status{ + Capacity: int(tr.End.UnixNano() - tr.Start.UnixNano()), + Volume: int(time.Now().UnixNano() - tr.Start.UnixNano()), + } + ch <- status + if status.Volume >= status.Capacity { + return + } + case <-tr.reporterStopCh: + return + } + } + }() + return ch +} + +func (tr *timeBasedReporter) Stop() { + close(tr.reporterStopCh) } diff --git a/banyand/tsdb/bucket/strategy_test.go b/banyand/tsdb/bucket/strategy_test.go index 3a302ce..d225d23 100644 --- a/banyand/tsdb/bucket/strategy_test.go +++ b/banyand/tsdb/bucket/strategy_test.go @@ -139,3 +139,6 @@ func (r *reporter) Report() bucket.Channel { }() return ch } + +func (r *reporter) Stop() { +} diff --git a/banyand/tsdb/indexdb.go b/banyand/tsdb/indexdb.go index 741ba85..0c77c51 100644 --- a/banyand/tsdb/indexdb.go +++ b/banyand/tsdb/indexdb.go @@ -26,6 +26,7 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/banyand/kv" "github.com/apache/skywalking-banyandb/pkg/index" + "github.com/apache/skywalking-banyandb/pkg/timestamp" ) type IndexDatabase interface { @@ -99,7 +100,7 @@ type indexWriterBuilder struct { func (i *indexWriterBuilder) Time(ts time.Time) IndexWriterBuilder { i.ts = ts - segs := i.segCtrl.span(NewTimeRangeDuration(ts, 0, true, false)) + segs := i.segCtrl.span(timestamp.NewTimeRangeDuration(ts, 0, true, false)) if len(segs) != 1 { return i } diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go index e82767a..580536c 100644 --- a/banyand/tsdb/segment.go +++ b/banyand/tsdb/segment.go @@ -19,6 +19,7 @@ package tsdb import ( "context" + "errors" "strconv" "sync" "time" @@ -26,68 +27,46 @@ import ( "github.com/apache/skywalking-banyandb/banyand/kv" "github.com/apache/skywalking-banyandb/banyand/tsdb/bucket" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/timestamp" ) +var ErrEndOfSegment = errors.New("reached the end of the segment") + type segment struct { id uint16 path string suffix string - lst []*block globalIndex kv.Store - sync.Mutex - l *logger.Logger - reporterStopCh chan struct{} - TimeRange -} - -func (s *segment) Report() bucket.Channel { - ch := make(bucket.Channel) - interval := s.Duration() >> 4 - if interval < 100*time.Millisecond { - interval = 100 * time.Millisecond - } - go func() { - defer close(ch) - ticker := time.NewTicker(interval) - defer ticker.Stop() - for { - select { - case <-ticker.C: - status := bucket.Status{ - Capacity: int(s.End.UnixNano() - s.Start.UnixNano()), - Volume: int(time.Now().UnixNano() - s.Start.UnixNano()), - } - ch <- status - if status.Volume >= status.Capacity { - return - } - case <-s.reporterStopCh: - return - } - } - }() - return ch + l *logger.Logger + timestamp.TimeRange + bucket.Reporter + blockController *blockController + blockManageStrategy *bucket.Strategy } -func openSegment(ctx context.Context, suffix, path string, intervalRule IntervalRule) (s *segment, err error) { - startTime, err := intervalRule.Unit.Parse(suffix) - if err != nil { - return nil, err - } +func openSegment(ctx context.Context, startTime time.Time, path, suffix string, segmentSize, blockSize IntervalRule) (s *segment, err error) { suffixInteger, err := strconv.Atoi(suffix) if err != nil { return nil, err } - id := uint16(intervalRule.Unit)<<12 | ((uint16(suffixInteger) << 4) >> 4) + // TODO: fix id overflow + id := uint16(segmentSize.Unit)<<12 | ((uint16(suffixInteger) << 4) >> 4) + timeRange := timestamp.NewTimeRange(startTime, segmentSize.NextTime(startTime), true, false) s = &segment{ - id: id, - path: path, - suffix: suffix, - l: logger.Fetch(ctx, "segment"), - reporterStopCh: make(chan struct{}), - TimeRange: NewTimeRange(startTime, intervalRule.NextTime(startTime), true, false), + id: id, + path: path, + suffix: suffix, + l: logger.Fetch(ctx, "segment"), + blockController: newBlockController(id, path, timeRange, blockSize), + TimeRange: timeRange, + Reporter: bucket.NewTimeBasedReporter(timeRange), } + err = s.blockController.open(context.WithValue(ctx, logger.ContextKey, s.l)) + if err != nil { + return nil, err + } + indexPath, err := mkdir(globalIndexTemplate, path) if err != nil { return nil, err @@ -95,46 +74,193 @@ func openSegment(ctx context.Context, suffix, path string, intervalRule Interval if s.globalIndex, err = kv.OpenStore(0, indexPath, kv.StoreWithLogger(s.l)); err != nil { return nil, err } - loadBlock := func(path string) error { - var b *block - if b, err = newBlock(context.WithValue(ctx, logger.ContextKey, s.l), blockOpts{ - segID: s.id, - path: path, - }); err != nil { - return err + s.blockManageStrategy, err = bucket.NewStrategy(s.blockController, bucket.WithLogger(s.l)) + if err != nil { + return nil, err + } + s.blockManageStrategy.Run() + return s, nil +} + +func (s *segment) close() { + s.blockManageStrategy.Close() + s.blockController.close() + s.globalIndex.Close() + s.Stop() +} + +type blockController struct { + sync.RWMutex + segID uint16 + location string + segTimeRange timestamp.TimeRange + blockSize IntervalRule + lst []*block +} + +func newBlockController(segID uint16, location string, segTimeRange timestamp.TimeRange, blockSize IntervalRule) *blockController { + return &blockController{ + segID: segID, + location: location, + blockSize: blockSize, + segTimeRange: segTimeRange, + } +} + +func (bc *blockController) Current() bucket.Reporter { + bc.RLock() + defer bc.RUnlock() + now := time.Now() + for _, s := range bc.lst { + if s.suffix == bc.Format(now) { + return s + } + } + // return the latest segment before now + if len(bc.lst) > 0 { + return bc.lst[len(bc.lst)-1] + } + return nil +} + +func (bc *blockController) Next() (bucket.Reporter, error) { + b := bc.Current().(*block) + reporter, err := bc.create(context.TODO(), + bc.blockSize.NextTime(b.Start)) + if errors.Is(err, ErrEndOfSegment) { + return nil, bucket.ErrNoMoreBucket + } + return reporter, err +} + +func (bc *blockController) Format(tm time.Time) string { + switch bc.blockSize.Unit { + case HOUR: + return tm.Format(blockHourFormat) + case DAY: + return tm.Format(blockDayFormat) + case MILLISECOND: + return tm.Format(millisecondFormat) + } + panic("invalid interval unit") +} + +func (bc *blockController) Parse(value string) (time.Time, error) { + switch bc.blockSize.Unit { + case HOUR: + return time.Parse(blockHourFormat, value) + case DAY: + return time.Parse(blockDayFormat, value) + case MILLISECOND: + return time.Parse(millisecondFormat, value) + } + panic("invalid interval unit") +} + +func (bc *blockController) span(timeRange timestamp.TimeRange) (bb []*block) { + bc.RLock() + defer bc.RUnlock() + last := len(bc.lst) - 1 + for i := range bc.lst { + b := bc.lst[last-i] + if b.Overlapping(timeRange) { + bb = append(bb, b) } - { - s.Lock() - defer s.Unlock() - s.lst = append(s.lst, b) + } + return bb +} + +func (bc *blockController) get(blockID uint16) *block { + bc.RLock() + defer bc.RUnlock() + last := len(bc.lst) - 1 + for i := range bc.lst { + b := bc.lst[last-i] + if b.blockID == blockID { + return b } - return nil } - err = walkDir(path, blockPathPrefix, func(_, absolutePath string) error { - return loadBlock(absolutePath) - }) + return nil +} + +func (bc *blockController) startTime(suffix string) (time.Time, error) { + t, err := bc.Parse(suffix) if err != nil { - return nil, err + return time.Time{}, err } - if len(s.lst) < 1 { - blockPath, err := mkdir(blockTemplate, path, time.Now().Format(blockFormat)) - if err != nil { - return nil, err - } - err = loadBlock(blockPath) + startTime := bc.segTimeRange.Start + switch bc.blockSize.Unit { + case HOUR: + return time.Date(startTime.Year(), startTime.Month(), + startTime.Day(), t.Hour(), 0, 0, 0, time.UTC), nil + case DAY: + return time.Date(startTime.Year(), startTime.Month(), + t.Day(), t.Hour(), 0, 0, 0, time.UTC), nil + case MILLISECOND: + return time.Parse(millisecondFormat, suffix) + } + panic("invalid interval unit") +} + +func (bc *blockController) open(ctx context.Context) error { + err := WalkDir( + bc.location, + segPathPrefix, + func(suffix, absolutePath string) error { + _, err := bc.load(ctx, suffix, absolutePath) + return err + }) + if err != nil { + return err + } + if bc.Current() == nil { + _, err = bc.create(ctx, time.Now()) if err != nil { - return nil, err + return err } } - return s, nil + return nil } -func (s *segment) close() { - s.Lock() - defer s.Unlock() - for _, b := range s.lst { - b.close() +func (bc *blockController) create(ctx context.Context, startTime time.Time) (*block, error) { + if startTime.Before(bc.segTimeRange.Start) { + startTime = bc.segTimeRange.Start + } + if !startTime.Before(bc.segTimeRange.End) { + return nil, ErrEndOfSegment + } + suffix := bc.Format(startTime) + segPath, err := mkdir(blockTemplate, bc.location, suffix) + if err != nil { + return nil, err + } + return bc.load(ctx, suffix, segPath) +} + +func (bc *blockController) load(ctx context.Context, suffix, path string) (b *block, err error) { + starTime, err := bc.startTime(suffix) + if err != nil { + return nil, err + } + if b, err = newBlock(ctx, blockOpts{ + segID: bc.segID, + path: path, + startTime: starTime, + suffix: suffix, + blockSize: bc.blockSize, + }); err != nil { + return nil, err + } + { + bc.Lock() + defer bc.Unlock() + bc.lst = append(bc.lst, b) + } + return b, nil +} + +func (bc *blockController) close() { + for _, s := range bc.lst { + s.close() } - s.globalIndex.Close() - close(s.reporterStopCh) } diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go index 5cc9805..bed9555 100644 --- a/banyand/tsdb/series.go +++ b/banyand/tsdb/series.go @@ -29,6 +29,7 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/timestamp" ) var ( @@ -72,67 +73,9 @@ func (i *GlobalItemID) UnMarshal(data []byte) error { return nil } -type TimeRange struct { - Start time.Time - End time.Time - IncludeStart bool - IncludeEnd bool -} - -func (t TimeRange) Contains(unixNano uint64) bool { - tp := time.Unix(0, int64(unixNano)) - if t.Start.Equal(tp) { - return t.IncludeStart - } - if t.End.Equal(tp) { - return t.IncludeEnd - } - return !tp.Before(t.Start) && !tp.After(t.End) -} - -func (t TimeRange) Overlapping(other TimeRange) bool { - if t.Start.Equal(other.End) { - return t.IncludeStart && other.IncludeEnd - } - if other.Start.Equal(t.End) { - return t.IncludeEnd && other.IncludeStart - } - return !t.Start.After(other.End) && !other.Start.After(t.End) -} - -func (t TimeRange) Duration() time.Duration { - return t.End.Sub(t.Start) -} - -func NewInclusiveTimeRange(start, end time.Time) TimeRange { - return TimeRange{ - Start: start, - End: end, - IncludeStart: true, - IncludeEnd: true, - } -} - -func NewInclusiveTimeRangeDuration(start time.Time, duration time.Duration) TimeRange { - return NewTimeRangeDuration(start, duration, true, true) -} - -func NewTimeRange(start, end time.Time, includeStart, includeEnd bool) TimeRange { - return TimeRange{ - Start: start, - End: end, - IncludeStart: includeStart, - IncludeEnd: includeEnd, - } -} - -func NewTimeRangeDuration(start time.Time, duration time.Duration, includeStart, includeEnd bool) TimeRange { - return NewTimeRange(start, start.Add(duration), includeStart, includeEnd) -} - type Series interface { ID() common.SeriesID - Span(timeRange TimeRange) (SeriesSpan, error) + Span(timeRange timestamp.TimeRange) (SeriesSpan, error) Get(id GlobalItemID) (Item, io.Closer, error) } @@ -167,7 +110,7 @@ func (s *series) ID() common.SeriesID { return s.id } -func (s *series) Span(timeRange TimeRange) (SeriesSpan, error) { +func (s *series) Span(timeRange timestamp.TimeRange) (SeriesSpan, error) { blocks := s.blockDB.span(timeRange) if len(blocks) < 1 { return nil, ErrEmptySeriesSpan @@ -199,7 +142,7 @@ type seriesSpan struct { blocks []blockDelegate seriesID common.SeriesID shardID common.ShardID - timeRange TimeRange + timeRange timestamp.TimeRange l *logger.Logger } @@ -218,7 +161,7 @@ func (s *seriesSpan) SeekerBuilder() SeekerBuilder { return newSeekerBuilder(s) } -func newSeriesSpan(ctx context.Context, timeRange TimeRange, blocks []blockDelegate, id common.SeriesID, shardID common.ShardID) *seriesSpan { +func newSeriesSpan(ctx context.Context, timeRange timestamp.TimeRange, blocks []blockDelegate, id common.SeriesID, shardID common.ShardID) *seriesSpan { s := &seriesSpan{ blocks: blocks, seriesID: id, diff --git a/banyand/tsdb/series_test.go b/banyand/tsdb/series_test.go index 25a7df6..6d2b940 100644 --- a/banyand/tsdb/series_test.go +++ b/banyand/tsdb/series_test.go @@ -24,7 +24,7 @@ import ( . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" - "github.com/apache/skywalking-banyandb/banyand/tsdb" + "github.com/apache/skywalking-banyandb/pkg/timestamp" ) var _ = Describe("Series", func() { @@ -34,7 +34,7 @@ var _ = Describe("Series", func() { startTime, _ := time.Parse("20060202", start) endTime, _ := time.Parse("20060202", end) tsTime, _ := time.Parse("20060202", ts) - Expect(tsdb.NewTimeRange(startTime, endTime, includeStart, includeEnd).Contains(uint64(tsTime.UnixNano()))).To(Equal(expected)) + Expect(timestamp.NewTimeRange(startTime, endTime, includeStart, includeEnd).Contains(uint64(tsTime.UnixNano()))).To(Equal(expected)) } DescribeTable("It's a exclusive range", func(start, end, ts string, expected bool) { @@ -90,8 +90,8 @@ var _ = Describe("Series", func() { for _, r2l := range includes { for _, r2u := range includes { By(fmt.Sprintf("r1 lower:%v upper:%v. r1 lower:%v upper:%v", r1l, r1u, r2l, r2u), func() { - r1 := tsdb.NewTimeRange(startTime1, endTime1, r1l, r1u) - r2 := tsdb.NewTimeRange(startTime2, endTime2, r2l, r2u) + r1 := timestamp.NewTimeRange(startTime1, endTime1, r1l, r1u) + r2 := timestamp.NewTimeRange(startTime2, endTime2, r2l, r2u) Expect(r1.Overlapping(r2)).To(Equal(expected)) Expect(r2.Overlapping(r1)).To(Equal(expected)) }) @@ -114,8 +114,8 @@ var _ = Describe("Series", func() { endTime1, _ := time.Parse("20060102", "20210107") startTime2, _ := time.Parse("20060102", "20210107") endTime2, _ := time.Parse("20060102", "20210109") - r1 := tsdb.NewTimeRange(startTime1, endTime1, false, include1) - r2 := tsdb.NewTimeRange(startTime2, endTime2, include2, false) + r1 := timestamp.NewTimeRange(startTime1, endTime1, false, include1) + r2 := timestamp.NewTimeRange(startTime2, endTime2, include2, false) Expect(r1.Overlapping(r2)).To(Equal(expected)) } diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go index de269e6..d082a08 100644 --- a/banyand/tsdb/seriesdb.go +++ b/banyand/tsdb/seriesdb.go @@ -30,6 +30,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/kv" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/timestamp" ) var maxIntBytes = convert.Uint64ToBytes(math.MaxUint64) @@ -95,7 +96,7 @@ type SeriesDatabase interface { type blockDatabase interface { shardID() common.ShardID - span(timeRange TimeRange) []blockDelegate + span(timeRange timestamp.TimeRange) []blockDelegate block(id GlobalItemID) blockDelegate } @@ -138,7 +139,11 @@ func (s *seriesDB) block(id GlobalItemID) blockDelegate { if seg == nil { return nil } - return seg.lst[id.blockID].delegate() + block := seg.blockController.get(id.blockID) + if block == nil { + return nil + } + return block.delegate() } func (s *seriesDB) shardID() common.ShardID { @@ -195,11 +200,11 @@ func (s *seriesDB) List(path Path) (SeriesList, error) { return result, err } -func (s *seriesDB) span(timeRange TimeRange) []blockDelegate { +func (s *seriesDB) span(timeRange timestamp.TimeRange) []blockDelegate { //TODO: return correct blocks result := make([]blockDelegate, 0) for _, s := range s.segCtrl.span(timeRange) { - for _, b := range s.lst { + for _, b := range s.blockController.span(timeRange) { result = append(result, b.delegate()) } } diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go index 7eaf50f..0f70e53 100644 --- a/banyand/tsdb/shard.go +++ b/banyand/tsdb/shard.go @@ -28,6 +28,7 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/banyand/tsdb/bucket" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/timestamp" ) var _ Shard = (*shard)(nil) @@ -54,7 +55,7 @@ func (s *shard) Index() IndexDatabase { return s.indexDatabase } -func OpenShard(ctx context.Context, id common.ShardID, root string, intervalRule IntervalRule) (Shard, error) { +func OpenShard(ctx context.Context, id common.ShardID, root string, segmentSize, blockSize IntervalRule) (Shard, error) { path, err := mkdir(shardTemplate, root, int(id)) if err != nil { return nil, errors.Wrapf(err, "make the directory of the shard %d ", int(id)) @@ -63,7 +64,7 @@ func OpenShard(ctx context.Context, id common.ShardID, root string, intervalRule l.Info().Int("shard_id", int(id)).Str("path", path).Msg("creating a shard") s := &shard{ id: id, - segmentController: newSegmentController(path, intervalRule), + segmentController: newSegmentController(path, segmentSize, blockSize), l: l, } shardCtx := context.WithValue(ctx, logger.ContextKey, s.l) @@ -102,20 +103,17 @@ func (s *shard) Close() error { type IntervalUnit int const ( - DAY IntervalUnit = iota - MONTH - YEAR - MILLISECOND // only for testing + MILLISECOND IntervalUnit = iota // only for testing + HOUR + DAY ) func (iu IntervalUnit) String() string { switch iu { + case HOUR: + return "hour" case DAY: return "day" - case MONTH: - return "month" - case YEAR: - return "year" case MILLISECOND: return "millis" @@ -123,34 +121,6 @@ func (iu IntervalUnit) String() string { panic("invalid interval unit") } -func (iu IntervalUnit) Format(tm time.Time) string { - switch iu { - case DAY: - return tm.Format(segDayFormat) - case MONTH: - return tm.Format(segMonthFormat) - case YEAR: - return tm.Format(segYearFormat) - case MILLISECOND: - return tm.Format(segMillisecondFormat) - } - panic("invalid interval unit") -} - -func (iu IntervalUnit) Parse(value string) (time.Time, error) { - switch iu { - case DAY: - return time.Parse(segDayFormat, value) - case MONTH: - return time.Parse(segMonthFormat, value) - case YEAR: - return time.Parse(segYearFormat, value) - case MILLISECOND: - return time.Parse(segMillisecondFormat, value) - } - panic("invalid interval unit") -} - type IntervalRule struct { Unit IntervalUnit Num int @@ -158,29 +128,41 @@ type IntervalRule struct { func (ir IntervalRule) NextTime(current time.Time) time.Time { switch ir.Unit { + case HOUR: + return current.Add(time.Hour * time.Duration(ir.Num)) case DAY: return current.AddDate(0, 0, ir.Num) - case MONTH: - return current.AddDate(0, ir.Num, 0) - case YEAR: - return current.AddDate(ir.Num, 0, 0) case MILLISECOND: return current.Add(time.Millisecond * time.Duration(ir.Num)) } panic("invalid interval unit") } +func (ir IntervalRule) EstimatedDuration() time.Duration { + switch ir.Unit { + case HOUR: + return time.Hour * time.Duration(ir.Num) + case DAY: + return 24 * time.Hour * time.Duration(ir.Num) + case MILLISECOND: + return time.Microsecond * time.Duration(ir.Num) + } + panic("invalid interval unit") +} + type segmentController struct { sync.RWMutex - location string - intervalRule IntervalRule - lst []*segment + location string + segmentSize IntervalRule + blockSize IntervalRule + lst []*segment } -func newSegmentController(location string, intervalRule IntervalRule) *segmentController { +func newSegmentController(location string, segmentSize, blockSize IntervalRule) *segmentController { return &segmentController{ - location: location, - intervalRule: intervalRule, + location: location, + segmentSize: segmentSize, + blockSize: blockSize, } } @@ -197,7 +179,7 @@ func (sc *segmentController) get(segID uint16) *segment { return nil } -func (sc *segmentController) span(timeRange TimeRange) (ss []*segment) { +func (sc *segmentController) span(timeRange timestamp.TimeRange) (ss []*segment) { sc.RLock() defer sc.RUnlock() last := len(sc.lst) - 1 @@ -223,7 +205,7 @@ func (sc *segmentController) Current() bucket.Reporter { defer sc.RUnlock() now := time.Now() for _, s := range sc.lst { - if s.suffix == sc.intervalRule.Unit.Format(now) { + if s.suffix == sc.Format(now) { return s } } @@ -235,12 +217,37 @@ func (sc *segmentController) Current() bucket.Reporter { } func (sc *segmentController) Next() (bucket.Reporter, error) { - return sc.create(context.TODO(), sc.intervalRule.Unit.Format( - sc.intervalRule.NextTime(time.Now()))) + seg := sc.Current().(*segment) + return sc.create(context.TODO(), sc.Format( + sc.segmentSize.NextTime(seg.Start))) +} + +func (sc *segmentController) Format(tm time.Time) string { + switch sc.segmentSize.Unit { + case HOUR: + return tm.Format(segHourFormat) + case DAY: + return tm.Format(segDayFormat) + case MILLISECOND: + return tm.Format(millisecondFormat) + } + panic("invalid interval unit") +} + +func (sc *segmentController) Parse(value string) (time.Time, error) { + switch sc.segmentSize.Unit { + case HOUR: + return time.Parse(segHourFormat, value) + case DAY: + return time.Parse(segDayFormat, value) + case MILLISECOND: + return time.Parse(millisecondFormat, value) + } + panic("invalid interval unit") } func (sc *segmentController) open(ctx context.Context) error { - err := walkDir( + err := WalkDir( sc.location, segPathPrefix, func(suffix, absolutePath string) error { @@ -251,7 +258,7 @@ func (sc *segmentController) open(ctx context.Context) error { return err } if sc.Current() == nil { - _, err = sc.create(ctx, sc.intervalRule.Unit.Format(time.Now())) + _, err = sc.create(ctx, sc.Format(time.Now())) if err != nil { return err } @@ -268,7 +275,11 @@ func (sc *segmentController) create(ctx context.Context, suffix string) (*segmen } func (sc *segmentController) load(ctx context.Context, suffix, path string) (seg *segment, err error) { - seg, err = openSegment(ctx, suffix, path, sc.intervalRule) + startTime, err := sc.Parse(suffix) + if err != nil { + return nil, err + } + seg, err = openSegment(ctx, startTime, path, suffix, sc.segmentSize, sc.blockSize) if err != nil { return nil, err } diff --git a/banyand/tsdb/shard_test.go b/banyand/tsdb/shard_test.go index 5609f0a..96e78e3 100644 --- a/banyand/tsdb/shard_test.go +++ b/banyand/tsdb/shard_test.go @@ -19,8 +19,6 @@ package tsdb_test import ( "context" - "io/ioutil" - "strings" "time" . "github.com/onsi/ginkgo/v2" @@ -48,22 +46,41 @@ var _ = Describe("Shard", func() { }) It("generates several segments", func() { var err error - shard, err = tsdb.OpenShard(context.TODO(), common.ShardID(0), tmp, tsdb.IntervalRule{ - Unit: tsdb.MILLISECOND, - Num: 1000, - }) + shard, err = tsdb.OpenShard(context.TODO(), common.ShardID(0), tmp, + tsdb.IntervalRule{ + Unit: tsdb.MILLISECOND, + Num: 3000, + }, + tsdb.IntervalRule{ + Unit: tsdb.MILLISECOND, + Num: 1000, + }, + ) Expect(err).NotTo(HaveOccurred()) + segDirectories := make([]string, 3) Eventually(func() int { - files, err := ioutil.ReadDir(tmp + "/shard-0") - Expect(err).NotTo(HaveOccurred()) num := 0 - for _, fi := range files { - if fi.IsDir() && strings.HasPrefix(fi.Name(), "seg-") { - num++ + err := tsdb.WalkDir(tmp+"/shard-0", "seg-", func(suffix, absolutePath string) error { + if num < 3 { + segDirectories[num] = absolutePath } - } + num++ + return nil + }) + Expect(err).NotTo(HaveOccurred()) return num }).WithTimeout(10 * time.Second).Should(BeNumerically(">=", 3)) + for _, d := range segDirectories { + Eventually(func() int { + num := 0 + err := tsdb.WalkDir(d, "block-", func(suffix, absolutePath string) error { + num++ + return nil + }) + Expect(err).NotTo(HaveOccurred()) + return num + }).WithTimeout(10 * time.Second).Should(BeNumerically(">=", 3)) + } }) }) diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go index a3aff1c..7659b53 100644 --- a/banyand/tsdb/tsdb.go +++ b/banyand/tsdb/tsdb.go @@ -49,18 +49,18 @@ const ( blockTemplate = rootPrefix + blockPathPrefix + "-%s" globalIndexTemplate = rootPrefix + "index" - segDayFormat = "20060102" - segMonthFormat = "200601" - segYearFormat = "2006" - segMillisecondFormat = "20060102150405000" - blockFormat = "1504" + segHourFormat = "2006010215" + segDayFormat = "20060102" + millisecondFormat = "20060102150405000" + blockHourFormat = "15" + blockDayFormat = "0102" dirPerm = 0700 ) var ( - ErrInvalidShardID = errors.New("invalid shard id") - ErrEncodingMethodAbsent = errors.New("encoding method is absent") + ErrInvalidShardID = errors.New("invalid shard id") + ErrOpenDatabase = errors.New("fails to open the database") indexRulesKey = contextIndexRulesKey{} encodingMethodKey = contextEncodingMethodKey{} @@ -89,6 +89,8 @@ type DatabaseOpts struct { ShardNum uint32 IndexRules []*databasev1.IndexRule EncodingMethod EncodingMethod + SegmentSize IntervalRule + BlockSize IntervalRule } type EncodingMethod struct { @@ -97,9 +99,11 @@ type EncodingMethod struct { } type database struct { - logger *logger.Logger - location string - shardNum uint32 + logger *logger.Logger + location string + shardNum uint32 + segmentSize IntervalRule + blockSize IntervalRule sLst []Shard sync.Mutex @@ -129,15 +133,34 @@ func (d *database) Close() error { func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) { if opts.EncodingMethod.EncoderPool == nil || opts.EncodingMethod.DecoderPool == nil { - return nil, errors.Wrap(ErrEncodingMethodAbsent, "failed to open database") + return nil, errors.Wrap(ErrOpenDatabase, "encoding method is absent") } if _, err := mkdir(opts.Location); err != nil { return nil, err } + segmentSize := opts.SegmentSize + if segmentSize.Unit == MILLISECOND { + segmentSize = IntervalRule{ + Unit: DAY, + Num: 1, + } + } + blockSize := opts.BlockSize + if blockSize.Unit == MILLISECOND { + blockSize = IntervalRule{ + Unit: HOUR, + Num: 2, + } + } + if blockSize.EstimatedDuration() > segmentSize.EstimatedDuration() { + return nil, errors.Wrapf(ErrOpenDatabase, "the block size is bigger than the segment size") + } db := &database{ - location: opts.Location, - shardNum: opts.ShardNum, - logger: logger.Fetch(ctx, "tsdb"), + location: opts.Location, + shardNum: opts.ShardNum, + logger: logger.Fetch(ctx, "tsdb"), + segmentSize: segmentSize, + blockSize: blockSize, } db.logger.Info().Str("path", opts.Location).Msg("initialized") var entries []fs.FileInfo @@ -164,10 +187,8 @@ func createDatabase(ctx context.Context, db *database, startID int) (Database, e var err error for i := startID; i < int(db.shardNum); i++ { db.logger.Info().Int("shard_id", i).Msg("creating a shard") - so, errNewShard := OpenShard(ctx, common.ShardID(i), db.location, IntervalRule{ - Unit: DAY, - Num: 1, - }) + so, errNewShard := OpenShard(ctx, common.ShardID(i), + db.location, db.segmentSize, db.blockSize) if errNewShard != nil { err = multierr.Append(err, errNewShard) continue @@ -182,7 +203,7 @@ func loadDatabase(ctx context.Context, db *database) (Database, error) { //TODO: open the manifest file db.Lock() defer db.Unlock() - err := walkDir(db.location, shardPathPrefix, func(suffix, _ string) error { + err := WalkDir(db.location, shardPathPrefix, func(suffix, _ string) error { shardID, err := strconv.Atoi(suffix) if err != nil { return err @@ -195,10 +216,9 @@ func loadDatabase(ctx context.Context, db *database) (Database, error) { context.WithValue(ctx, logger.ContextKey, db.logger), common.ShardID(shardID), db.location, - IntervalRule{ - Unit: DAY, - Num: 1, - }) + db.segmentSize, + db.blockSize, + ) if errOpenShard != nil { return errOpenShard } @@ -220,9 +240,9 @@ func loadDatabase(ctx context.Context, db *database) (Database, error) { return db, nil } -type walkFn func(suffix, absolutePath string) error +type WalkFn func(suffix, absolutePath string) error -func walkDir(root, prefix string, walkFn walkFn) error { +func WalkDir(root, prefix string, walkFn WalkFn) error { files, err := ioutil.ReadDir(root) if err != nil { return errors.Wrapf(err, "failed to walk the database path: %s", root) diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go index 75c1cb8..0dca4c2 100644 --- a/banyand/tsdb/tsdb_test.go +++ b/banyand/tsdb/tsdb_test.go @@ -62,7 +62,7 @@ func verifyDatabaseStructure(tester *assert.Assertions, tempDir string) { now := time.Now() segPath := fmt.Sprintf(segTemplate, shardPath, now.Format(segDayFormat)) validateDirectory(tester, segPath) - validateDirectory(tester, fmt.Sprintf(blockTemplate, segPath, now.Format(blockFormat))) + validateDirectory(tester, fmt.Sprintf(blockTemplate, segPath, now.Format(blockHourFormat))) } func openDatabase(t *require.Assertions, path string) (db Database) { diff --git a/pkg/query/logical/common.go b/pkg/query/logical/common.go index 95f3e3e..9a6b1fd 100644 --- a/pkg/query/logical/common.go +++ b/pkg/query/logical/common.go @@ -23,6 +23,7 @@ import ( modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/pkg/query/executor" + "github.com/apache/skywalking-banyandb/pkg/timestamp" ) type ( @@ -70,7 +71,7 @@ func projectItem(ec executor.ExecutionContext, item tsdb.Item, projectionFieldRe // with the help of Entity. The result is a list of element set, where the order of inner list is kept // as what the users specify in the seekerBuilder. // This method is used by the underlying tableScan and localIndexScan plans. -func executeForShard(series tsdb.SeriesList, timeRange tsdb.TimeRange, +func executeForShard(series tsdb.SeriesList, timeRange timestamp.TimeRange, builders ...seekerBuilder) ([]tsdb.Iterator, error) { var itersInShard []tsdb.Iterator for _, seriesFound := range series { diff --git a/pkg/query/logical/plan_indexscan_local.go b/pkg/query/logical/plan_indexscan_local.go index a18e62e..4233b27 100644 --- a/pkg/query/logical/plan_indexscan_local.go +++ b/pkg/query/logical/plan_indexscan_local.go @@ -33,6 +33,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/pkg/index" "github.com/apache/skywalking-banyandb/pkg/query/executor" + "github.com/apache/skywalking-banyandb/pkg/timestamp" ) var _ UnresolvedPlan = (*unresolvedIndexScan)(nil) @@ -108,7 +109,7 @@ func (uis *unresolvedIndexScan) Analyze(s Schema) (Plan, error) { return &localIndexScan{ orderBy: orderBySubPlan, - timeRange: tsdb.NewInclusiveTimeRange(uis.startTime, uis.endTime), + timeRange: timestamp.NewInclusiveTimeRange(uis.startTime, uis.endTime), schema: s, projectionFieldRefs: projFieldsRefs, metadata: uis.metadata, @@ -121,7 +122,7 @@ var _ Plan = (*localIndexScan)(nil) type localIndexScan struct { *orderBy - timeRange tsdb.TimeRange + timeRange timestamp.TimeRange schema Schema metadata *commonv1.Metadata conditionMap map[*databasev1.IndexRule][]Expr diff --git a/pkg/timestamp/range.go b/pkg/timestamp/range.go new file mode 100644 index 0000000..aae487b --- /dev/null +++ b/pkg/timestamp/range.go @@ -0,0 +1,80 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +package timestamp + +import ( + "time" +) + +type TimeRange struct { + Start time.Time + End time.Time + IncludeStart bool + IncludeEnd bool +} + +func (t TimeRange) Contains(unixNano uint64) bool { + tp := time.Unix(0, int64(unixNano)) + if t.Start.Equal(tp) { + return t.IncludeStart + } + if t.End.Equal(tp) { + return t.IncludeEnd + } + return !tp.Before(t.Start) && !tp.After(t.End) +} + +func (t TimeRange) Overlapping(other TimeRange) bool { + if t.Start.Equal(other.End) { + return t.IncludeStart && other.IncludeEnd + } + if other.Start.Equal(t.End) { + return t.IncludeEnd && other.IncludeStart + } + return !t.Start.After(other.End) && !other.Start.After(t.End) +} + +func (t TimeRange) Duration() time.Duration { + return t.End.Sub(t.Start) +} + +func NewInclusiveTimeRange(start, end time.Time) TimeRange { + return TimeRange{ + Start: start, + End: end, + IncludeStart: true, + IncludeEnd: true, + } +} + +func NewInclusiveTimeRangeDuration(start time.Time, duration time.Duration) TimeRange { + return NewTimeRangeDuration(start, duration, true, true) +} + +func NewTimeRange(start, end time.Time, includeStart, includeEnd bool) TimeRange { + return TimeRange{ + Start: start, + End: end, + IncludeStart: includeStart, + IncludeEnd: includeEnd, + } +} + +func NewTimeRangeDuration(start time.Time, duration time.Duration, includeStart, includeEnd bool) TimeRange { + return NewTimeRange(start, start.Add(duration), includeStart, includeEnd) +}
