This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new ff2ba01 Add the block manager (#73)
ff2ba01 is described below
commit ff2ba01ee2653d6e7de23ce20be2568401c52923
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Jan 27 17:18:59 2022 +0800
Add the block manager (#73)
Signed-off-by: Gao Hongtao <[email protected]>
---
banyand/measure/query_test.go | 3 +-
banyand/measure/write.go | 3 +-
banyand/stream/stream_query_test.go | 23 +--
banyand/stream/stream_write.go | 3 +-
banyand/tsdb/block.go | 45 +++--
banyand/tsdb/bucket/bucket.go | 52 ++++++
banyand/tsdb/bucket/strategy_test.go | 3 +
banyand/tsdb/indexdb.go | 3 +-
banyand/tsdb/segment.go | 280 ++++++++++++++++++++++--------
banyand/tsdb/series.go | 67 +------
banyand/tsdb/series_test.go | 12 +-
banyand/tsdb/seriesdb.go | 13 +-
banyand/tsdb/shard.go | 121 +++++++------
banyand/tsdb/shard_test.go | 41 +++--
banyand/tsdb/tsdb.go | 70 +++++---
banyand/tsdb/tsdb_test.go | 2 +-
pkg/query/logical/common.go | 3 +-
pkg/query/logical/plan_indexscan_local.go | 5 +-
pkg/timestamp/range.go | 80 +++++++++
19 files changed, 551 insertions(+), 278 deletions(-)
diff --git a/banyand/measure/query_test.go b/banyand/measure/query_test.go
index 60b8bf2..c26b7f8 100644
--- a/banyand/measure/query_test.go
+++ b/banyand/measure/query_test.go
@@ -26,6 +26,7 @@ import (
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
)
func Test_ParseTag_And_ParseField(t *testing.T) {
@@ -37,7 +38,7 @@ func Test_ParseTag_And_ParseField(t *testing.T) {
r.NoError(err)
series, err := shard.Series().Get(tsdb.Entity{tsdb.Entry("1")})
r.NoError(err)
- seriesSpan, err :=
series.Span(tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour))
+ seriesSpan, err :=
series.Span(timestamp.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour))
defer func(seriesSpan tsdb.SeriesSpan) {
_ = seriesSpan.Close()
}(seriesSpan)
diff --git a/banyand/measure/write.go b/banyand/measure/write.go
index b0f9854..d26cd74 100644
--- a/banyand/measure/write.go
+++ b/banyand/measure/write.go
@@ -33,6 +33,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
)
var (
@@ -78,7 +79,7 @@ func (s *measure) write(shardID common.ShardID, seriesHashKey
[]byte, value *mea
return err
}
t := value.GetTimestamp().AsTime()
- wp, err := series.Span(tsdb.NewInclusiveTimeRangeDuration(t, 0))
+ wp, err := series.Span(timestamp.NewInclusiveTimeRangeDuration(t, 0))
if err != nil {
if wp != nil {
_ = wp.Close()
diff --git a/banyand/stream/stream_query_test.go
b/banyand/stream/stream_query_test.go
index 2533ef4..f63346d 100644
--- a/banyand/stream/stream_query_test.go
+++ b/banyand/stream/stream_query_test.go
@@ -42,6 +42,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/index"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
)
type shardStruct struct {
@@ -106,7 +107,7 @@ func Test_Stream_Series(t *testing.T) {
name: "all",
args: queryOpts{
entity: tsdb.Entity{tsdb.AnyEntry,
tsdb.AnyEntry, tsdb.AnyEntry},
- timeRange:
tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
+ timeRange:
timestamp.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
},
want: shardsForTest{
{
@@ -136,7 +137,7 @@ func Test_Stream_Series(t *testing.T) {
name: "time range",
args: queryOpts{
entity: tsdb.Entity{tsdb.AnyEntry,
tsdb.AnyEntry, tsdb.AnyEntry},
- timeRange:
tsdb.NewInclusiveTimeRangeDuration(baseTime.Add(1500*time.Millisecond),
1*time.Hour),
+ timeRange:
timestamp.NewInclusiveTimeRangeDuration(baseTime.Add(1500*time.Millisecond),
1*time.Hour),
},
want: shardsForTest{
{
@@ -163,7 +164,7 @@ func Test_Stream_Series(t *testing.T) {
name: "find series by service_id and instance_id",
args: queryOpts{
entity: tsdb.Entity{tsdb.Entry("webapp_id"),
tsdb.Entry("10.0.0.1_id"), tsdb.AnyEntry},
- timeRange:
tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
+ timeRange:
timestamp.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
},
want: shardsForTest{
{
@@ -182,7 +183,7 @@ func Test_Stream_Series(t *testing.T) {
name: "find a series",
args: queryOpts{
entity: tsdb.Entity{tsdb.Entry("webapp_id"),
tsdb.Entry("10.0.0.1_id"), convert.Int64ToBytes(0)},
- timeRange:
tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
+ timeRange:
timestamp.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
},
want: shardsForTest{
{
@@ -196,7 +197,7 @@ func Test_Stream_Series(t *testing.T) {
name: "filter",
args: queryOpts{
entity: tsdb.Entity{tsdb.AnyEntry,
tsdb.AnyEntry, tsdb.AnyEntry},
- timeRange:
tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
+ timeRange:
timestamp.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
buildFn: func(builder tsdb.SeekerBuilder) {
builder.Filter(&databasev1.IndexRule{
Metadata: &commonv1.Metadata{
@@ -242,7 +243,7 @@ func Test_Stream_Series(t *testing.T) {
name: "order by duration",
args: queryOpts{
entity: tsdb.Entity{tsdb.AnyEntry,
tsdb.AnyEntry, tsdb.AnyEntry},
- timeRange:
tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
+ timeRange:
timestamp.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
buildFn: func(builder tsdb.SeekerBuilder) {
builder.OrderByIndex(&databasev1.IndexRule{
Metadata: &commonv1.Metadata{
@@ -283,7 +284,7 @@ func Test_Stream_Series(t *testing.T) {
name: "filter by duration",
args: queryOpts{
entity: tsdb.Entity{tsdb.AnyEntry,
tsdb.AnyEntry, tsdb.AnyEntry},
- timeRange:
tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
+ timeRange:
timestamp.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
buildFn: func(builder tsdb.SeekerBuilder) {
rule := &databasev1.IndexRule{
Metadata: &commonv1.Metadata{
@@ -330,7 +331,7 @@ func Test_Stream_Series(t *testing.T) {
name: "filter and sort by duration",
args: queryOpts{
entity: tsdb.Entity{tsdb.AnyEntry,
tsdb.AnyEntry, tsdb.AnyEntry},
- timeRange:
tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
+ timeRange:
timestamp.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
buildFn: func(builder tsdb.SeekerBuilder) {
rule := &databasev1.IndexRule{
Metadata: &commonv1.Metadata{
@@ -378,7 +379,7 @@ func Test_Stream_Series(t *testing.T) {
name: "filter by several conditions",
args: queryOpts{
entity: tsdb.Entity{tsdb.AnyEntry,
tsdb.AnyEntry, tsdb.AnyEntry},
- timeRange:
tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
+ timeRange:
timestamp.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
buildFn: func(builder tsdb.SeekerBuilder) {
rule := &databasev1.IndexRule{
Metadata: &commonv1.Metadata{
@@ -441,7 +442,7 @@ func Test_Stream_Series(t *testing.T) {
name: "filter by several conditions, sort by duration",
args: queryOpts{
entity: tsdb.Entity{tsdb.AnyEntry,
tsdb.AnyEntry, tsdb.AnyEntry},
- timeRange:
tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
+ timeRange:
timestamp.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
buildFn: func(builder tsdb.SeekerBuilder) {
rule := &databasev1.IndexRule{
Metadata: &commonv1.Metadata{
@@ -628,7 +629,7 @@ func Test_Stream_Global_Index(t *testing.T) {
type queryOpts struct {
entity tsdb.Entity
- timeRange tsdb.TimeRange
+ timeRange timestamp.TimeRange
buildFn func(builder tsdb.SeekerBuilder)
}
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index 7fef502..21e9561 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -30,6 +30,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
)
var (
@@ -71,7 +72,7 @@ func (s *stream) write(shardID common.ShardID, seriesHashKey
[]byte, value *stre
return err
}
t := value.GetTimestamp().AsTime()
- wp, err := series.Span(tsdb.NewInclusiveTimeRangeDuration(t, 0))
+ wp, err := series.Span(timestamp.NewInclusiveTimeRangeDuration(t, 0))
if err != nil {
if wp != nil {
_ = wp.Close()
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index c5a2b98..a33fccd 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -20,6 +20,7 @@ package tsdb
import (
"context"
"io"
+ "strconv"
"time"
"github.com/dgraph-io/ristretto/z"
@@ -27,43 +28,55 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/kv"
+ "github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/index/lsm"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
)
type block struct {
- path string
- l *logger.Logger
- ref *z.Closer
+ path string
+ l *logger.Logger
+ suffix string
+ ref *z.Closer
store kv.TimeSeriesStore
primaryIndex index.Store
invertedIndex index.Store
lsmIndex index.Store
closableLst []io.Closer
- endTime time.Time
- startTime time.Time
- segID uint16
- blockID uint16
+ timestamp.TimeRange
+ bucket.Reporter
+ segID uint16
+ blockID uint16
}
type blockOpts struct {
- segID uint16
- blockID uint16
- path string
+ segID uint16
+ blockSize IntervalRule
+ startTime time.Time
+ suffix string
+ path string
}
func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
+ suffixInteger, err := strconv.Atoi(opts.suffix)
+ if err != nil {
+ return nil, err
+ }
+ id := uint16(opts.blockSize.Unit)<<12 | ((uint16(suffixInteger) << 4)
>> 4)
+ timeRange := timestamp.NewTimeRange(opts.startTime,
opts.blockSize.NextTime(opts.startTime), true, false)
b = &block{
segID: opts.segID,
- blockID: opts.blockID,
+ blockID: id,
path: opts.path,
ref: z.NewCloser(1),
- startTime: time.Now(),
l: logger.Fetch(ctx, "block"),
+ TimeRange: timeRange,
+ Reporter: bucket.NewTimeBasedReporter(timeRange),
}
encodingMethodObject := ctx.Value(encodingMethodKey)
if encodingMethodObject == nil {
@@ -169,7 +182,7 @@ func (d *bDelegate) primaryIndexReader() index.Searcher {
}
func (d *bDelegate) startTime() time.Time {
- return d.delegate.startTime
+ return d.delegate.Start
}
func (d *bDelegate) identity() (segID uint16, blockID uint16) {
@@ -199,11 +212,7 @@ func (d *bDelegate) writeInvertedIndex(field index.Field,
id common.ItemID) erro
}
func (d *bDelegate) contains(ts time.Time) bool {
- greaterAndEqualStart := d.delegate.startTime.Equal(ts) ||
d.delegate.startTime.Before(ts)
- if d.delegate.endTime.IsZero() {
- return greaterAndEqualStart
- }
- return greaterAndEqualStart && d.delegate.endTime.After(ts)
+ return d.delegate.Contains(uint64(ts.UnixNano()))
}
func (d *bDelegate) Close() error {
diff --git a/banyand/tsdb/bucket/bucket.go b/banyand/tsdb/bucket/bucket.go
index 82a76ec..406093c 100644
--- a/banyand/tsdb/bucket/bucket.go
+++ b/banyand/tsdb/bucket/bucket.go
@@ -17,6 +17,12 @@
package bucket
+import (
+ "time"
+
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
type Controller interface {
Current() Reporter
Next() (Reporter, error)
@@ -31,4 +37,50 @@ type Channel chan Status
type Reporter interface {
Report() Channel
+ Stop()
+}
+
+type timeBasedReporter struct {
+ timestamp.TimeRange
+ reporterStopCh chan struct{}
+}
+
+func NewTimeBasedReporter(timeRange timestamp.TimeRange) Reporter {
+ return &timeBasedReporter{
+ TimeRange: timeRange,
+ reporterStopCh: make(chan struct{}),
+ }
+}
+
+func (tr *timeBasedReporter) Report() Channel {
+ ch := make(Channel)
+ interval := tr.Duration() >> 4
+ if interval < 100*time.Millisecond {
+ interval = 100 * time.Millisecond
+ }
+ go func() {
+ defer close(ch)
+ ticker := time.NewTicker(interval)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ticker.C:
+ status := Status{
+ Capacity: int(tr.End.UnixNano() -
tr.Start.UnixNano()),
+ Volume: int(time.Now().UnixNano() -
tr.Start.UnixNano()),
+ }
+ ch <- status
+ if status.Volume >= status.Capacity {
+ return
+ }
+ case <-tr.reporterStopCh:
+ return
+ }
+ }
+ }()
+ return ch
+}
+
+func (tr *timeBasedReporter) Stop() {
+ close(tr.reporterStopCh)
}
diff --git a/banyand/tsdb/bucket/strategy_test.go
b/banyand/tsdb/bucket/strategy_test.go
index 3a302ce..d225d23 100644
--- a/banyand/tsdb/bucket/strategy_test.go
+++ b/banyand/tsdb/bucket/strategy_test.go
@@ -139,3 +139,6 @@ func (r *reporter) Report() bucket.Channel {
}()
return ch
}
+
+func (r *reporter) Stop() {
+}
diff --git a/banyand/tsdb/indexdb.go b/banyand/tsdb/indexdb.go
index 741ba85..0c77c51 100644
--- a/banyand/tsdb/indexdb.go
+++ b/banyand/tsdb/indexdb.go
@@ -26,6 +26,7 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/kv"
"github.com/apache/skywalking-banyandb/pkg/index"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
)
type IndexDatabase interface {
@@ -99,7 +100,7 @@ type indexWriterBuilder struct {
func (i *indexWriterBuilder) Time(ts time.Time) IndexWriterBuilder {
i.ts = ts
- segs := i.segCtrl.span(NewTimeRangeDuration(ts, 0, true, false))
+ segs := i.segCtrl.span(timestamp.NewTimeRangeDuration(ts, 0, true,
false))
if len(segs) != 1 {
return i
}
diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index e82767a..580536c 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -19,6 +19,7 @@ package tsdb
import (
"context"
+ "errors"
"strconv"
"sync"
"time"
@@ -26,68 +27,46 @@ import (
"github.com/apache/skywalking-banyandb/banyand/kv"
"github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
)
+var ErrEndOfSegment = errors.New("reached the end of the segment")
+
type segment struct {
id uint16
path string
suffix string
- lst []*block
globalIndex kv.Store
- sync.Mutex
- l *logger.Logger
- reporterStopCh chan struct{}
- TimeRange
-}
-
-func (s *segment) Report() bucket.Channel {
- ch := make(bucket.Channel)
- interval := s.Duration() >> 4
- if interval < 100*time.Millisecond {
- interval = 100 * time.Millisecond
- }
- go func() {
- defer close(ch)
- ticker := time.NewTicker(interval)
- defer ticker.Stop()
- for {
- select {
- case <-ticker.C:
- status := bucket.Status{
- Capacity: int(s.End.UnixNano() -
s.Start.UnixNano()),
- Volume: int(time.Now().UnixNano() -
s.Start.UnixNano()),
- }
- ch <- status
- if status.Volume >= status.Capacity {
- return
- }
- case <-s.reporterStopCh:
- return
- }
- }
- }()
- return ch
+ l *logger.Logger
+ timestamp.TimeRange
+ bucket.Reporter
+ blockController *blockController
+ blockManageStrategy *bucket.Strategy
}
-func openSegment(ctx context.Context, suffix, path string, intervalRule
IntervalRule) (s *segment, err error) {
- startTime, err := intervalRule.Unit.Parse(suffix)
- if err != nil {
- return nil, err
- }
+func openSegment(ctx context.Context, startTime time.Time, path, suffix
string, segmentSize, blockSize IntervalRule) (s *segment, err error) {
suffixInteger, err := strconv.Atoi(suffix)
if err != nil {
return nil, err
}
- id := uint16(intervalRule.Unit)<<12 | ((uint16(suffixInteger) << 4) >>
4)
+ // TODO: fix id overflow
+ id := uint16(segmentSize.Unit)<<12 | ((uint16(suffixInteger) << 4) >> 4)
+ timeRange := timestamp.NewTimeRange(startTime,
segmentSize.NextTime(startTime), true, false)
s = &segment{
- id: id,
- path: path,
- suffix: suffix,
- l: logger.Fetch(ctx, "segment"),
- reporterStopCh: make(chan struct{}),
- TimeRange: NewTimeRange(startTime,
intervalRule.NextTime(startTime), true, false),
+ id: id,
+ path: path,
+ suffix: suffix,
+ l: logger.Fetch(ctx, "segment"),
+ blockController: newBlockController(id, path, timeRange,
blockSize),
+ TimeRange: timeRange,
+ Reporter: bucket.NewTimeBasedReporter(timeRange),
}
+ err = s.blockController.open(context.WithValue(ctx, logger.ContextKey,
s.l))
+ if err != nil {
+ return nil, err
+ }
+
indexPath, err := mkdir(globalIndexTemplate, path)
if err != nil {
return nil, err
@@ -95,46 +74,193 @@ func openSegment(ctx context.Context, suffix, path string,
intervalRule Interval
if s.globalIndex, err = kv.OpenStore(0, indexPath,
kv.StoreWithLogger(s.l)); err != nil {
return nil, err
}
- loadBlock := func(path string) error {
- var b *block
- if b, err = newBlock(context.WithValue(ctx, logger.ContextKey,
s.l), blockOpts{
- segID: s.id,
- path: path,
- }); err != nil {
- return err
+ s.blockManageStrategy, err = bucket.NewStrategy(s.blockController,
bucket.WithLogger(s.l))
+ if err != nil {
+ return nil, err
+ }
+ s.blockManageStrategy.Run()
+ return s, nil
+}
+
+func (s *segment) close() {
+ s.blockManageStrategy.Close()
+ s.blockController.close()
+ s.globalIndex.Close()
+ s.Stop()
+}
+
+type blockController struct {
+ sync.RWMutex
+ segID uint16
+ location string
+ segTimeRange timestamp.TimeRange
+ blockSize IntervalRule
+ lst []*block
+}
+
+func newBlockController(segID uint16, location string, segTimeRange
timestamp.TimeRange, blockSize IntervalRule) *blockController {
+ return &blockController{
+ segID: segID,
+ location: location,
+ blockSize: blockSize,
+ segTimeRange: segTimeRange,
+ }
+}
+
+func (bc *blockController) Current() bucket.Reporter {
+ bc.RLock()
+ defer bc.RUnlock()
+ now := time.Now()
+ for _, s := range bc.lst {
+ if s.suffix == bc.Format(now) {
+ return s
+ }
+ }
+ // return the latest segment before now
+ if len(bc.lst) > 0 {
+ return bc.lst[len(bc.lst)-1]
+ }
+ return nil
+}
+
+func (bc *blockController) Next() (bucket.Reporter, error) {
+ b := bc.Current().(*block)
+ reporter, err := bc.create(context.TODO(),
+ bc.blockSize.NextTime(b.Start))
+ if errors.Is(err, ErrEndOfSegment) {
+ return nil, bucket.ErrNoMoreBucket
+ }
+ return reporter, err
+}
+
+func (bc *blockController) Format(tm time.Time) string {
+ switch bc.blockSize.Unit {
+ case HOUR:
+ return tm.Format(blockHourFormat)
+ case DAY:
+ return tm.Format(blockDayFormat)
+ case MILLISECOND:
+ return tm.Format(millisecondFormat)
+ }
+ panic("invalid interval unit")
+}
+
+func (bc *blockController) Parse(value string) (time.Time, error) {
+ switch bc.blockSize.Unit {
+ case HOUR:
+ return time.Parse(blockHourFormat, value)
+ case DAY:
+ return time.Parse(blockDayFormat, value)
+ case MILLISECOND:
+ return time.Parse(millisecondFormat, value)
+ }
+ panic("invalid interval unit")
+}
+
+func (bc *blockController) span(timeRange timestamp.TimeRange) (bb []*block) {
+ bc.RLock()
+ defer bc.RUnlock()
+ last := len(bc.lst) - 1
+ for i := range bc.lst {
+ b := bc.lst[last-i]
+ if b.Overlapping(timeRange) {
+ bb = append(bb, b)
}
- {
- s.Lock()
- defer s.Unlock()
- s.lst = append(s.lst, b)
+ }
+ return bb
+}
+
+func (bc *blockController) get(blockID uint16) *block {
+ bc.RLock()
+ defer bc.RUnlock()
+ last := len(bc.lst) - 1
+ for i := range bc.lst {
+ b := bc.lst[last-i]
+ if b.blockID == blockID {
+ return b
}
- return nil
}
- err = walkDir(path, blockPathPrefix, func(_, absolutePath string) error
{
- return loadBlock(absolutePath)
- })
+ return nil
+}
+
+func (bc *blockController) startTime(suffix string) (time.Time, error) {
+ t, err := bc.Parse(suffix)
if err != nil {
- return nil, err
+ return time.Time{}, err
}
- if len(s.lst) < 1 {
- blockPath, err := mkdir(blockTemplate, path,
time.Now().Format(blockFormat))
- if err != nil {
- return nil, err
- }
- err = loadBlock(blockPath)
+ startTime := bc.segTimeRange.Start
+ switch bc.blockSize.Unit {
+ case HOUR:
+ return time.Date(startTime.Year(), startTime.Month(),
+ startTime.Day(), t.Hour(), 0, 0, 0, time.UTC), nil
+ case DAY:
+ return time.Date(startTime.Year(), startTime.Month(),
+ t.Day(), t.Hour(), 0, 0, 0, time.UTC), nil
+ case MILLISECOND:
+ return time.Parse(millisecondFormat, suffix)
+ }
+ panic("invalid interval unit")
+}
+
+func (bc *blockController) open(ctx context.Context) error {
+ err := WalkDir(
+ bc.location,
+ segPathPrefix,
+ func(suffix, absolutePath string) error {
+ _, err := bc.load(ctx, suffix, absolutePath)
+ return err
+ })
+ if err != nil {
+ return err
+ }
+ if bc.Current() == nil {
+ _, err = bc.create(ctx, time.Now())
if err != nil {
- return nil, err
+ return err
}
}
- return s, nil
+ return nil
}
-func (s *segment) close() {
- s.Lock()
- defer s.Unlock()
- for _, b := range s.lst {
- b.close()
+func (bc *blockController) create(ctx context.Context, startTime time.Time)
(*block, error) {
+ if startTime.Before(bc.segTimeRange.Start) {
+ startTime = bc.segTimeRange.Start
+ }
+ if !startTime.Before(bc.segTimeRange.End) {
+ return nil, ErrEndOfSegment
+ }
+ suffix := bc.Format(startTime)
+ segPath, err := mkdir(blockTemplate, bc.location, suffix)
+ if err != nil {
+ return nil, err
+ }
+ return bc.load(ctx, suffix, segPath)
+}
+
+func (bc *blockController) load(ctx context.Context, suffix, path string) (b
*block, err error) {
+ starTime, err := bc.startTime(suffix)
+ if err != nil {
+ return nil, err
+ }
+ if b, err = newBlock(ctx, blockOpts{
+ segID: bc.segID,
+ path: path,
+ startTime: starTime,
+ suffix: suffix,
+ blockSize: bc.blockSize,
+ }); err != nil {
+ return nil, err
+ }
+ {
+ bc.Lock()
+ defer bc.Unlock()
+ bc.lst = append(bc.lst, b)
+ }
+ return b, nil
+}
+
+func (bc *blockController) close() {
+ for _, s := range bc.lst {
+ s.close()
}
- s.globalIndex.Close()
- close(s.reporterStopCh)
}
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index 5cc9805..bed9555 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -29,6 +29,7 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
)
var (
@@ -72,67 +73,9 @@ func (i *GlobalItemID) UnMarshal(data []byte) error {
return nil
}
-type TimeRange struct {
- Start time.Time
- End time.Time
- IncludeStart bool
- IncludeEnd bool
-}
-
-func (t TimeRange) Contains(unixNano uint64) bool {
- tp := time.Unix(0, int64(unixNano))
- if t.Start.Equal(tp) {
- return t.IncludeStart
- }
- if t.End.Equal(tp) {
- return t.IncludeEnd
- }
- return !tp.Before(t.Start) && !tp.After(t.End)
-}
-
-func (t TimeRange) Overlapping(other TimeRange) bool {
- if t.Start.Equal(other.End) {
- return t.IncludeStart && other.IncludeEnd
- }
- if other.Start.Equal(t.End) {
- return t.IncludeEnd && other.IncludeStart
- }
- return !t.Start.After(other.End) && !other.Start.After(t.End)
-}
-
-func (t TimeRange) Duration() time.Duration {
- return t.End.Sub(t.Start)
-}
-
-func NewInclusiveTimeRange(start, end time.Time) TimeRange {
- return TimeRange{
- Start: start,
- End: end,
- IncludeStart: true,
- IncludeEnd: true,
- }
-}
-
-func NewInclusiveTimeRangeDuration(start time.Time, duration time.Duration)
TimeRange {
- return NewTimeRangeDuration(start, duration, true, true)
-}
-
-func NewTimeRange(start, end time.Time, includeStart, includeEnd bool)
TimeRange {
- return TimeRange{
- Start: start,
- End: end,
- IncludeStart: includeStart,
- IncludeEnd: includeEnd,
- }
-}
-
-func NewTimeRangeDuration(start time.Time, duration time.Duration,
includeStart, includeEnd bool) TimeRange {
- return NewTimeRange(start, start.Add(duration), includeStart,
includeEnd)
-}
-
type Series interface {
ID() common.SeriesID
- Span(timeRange TimeRange) (SeriesSpan, error)
+ Span(timeRange timestamp.TimeRange) (SeriesSpan, error)
Get(id GlobalItemID) (Item, io.Closer, error)
}
@@ -167,7 +110,7 @@ func (s *series) ID() common.SeriesID {
return s.id
}
-func (s *series) Span(timeRange TimeRange) (SeriesSpan, error) {
+func (s *series) Span(timeRange timestamp.TimeRange) (SeriesSpan, error) {
blocks := s.blockDB.span(timeRange)
if len(blocks) < 1 {
return nil, ErrEmptySeriesSpan
@@ -199,7 +142,7 @@ type seriesSpan struct {
blocks []blockDelegate
seriesID common.SeriesID
shardID common.ShardID
- timeRange TimeRange
+ timeRange timestamp.TimeRange
l *logger.Logger
}
@@ -218,7 +161,7 @@ func (s *seriesSpan) SeekerBuilder() SeekerBuilder {
return newSeekerBuilder(s)
}
-func newSeriesSpan(ctx context.Context, timeRange TimeRange, blocks
[]blockDelegate, id common.SeriesID, shardID common.ShardID) *seriesSpan {
+func newSeriesSpan(ctx context.Context, timeRange timestamp.TimeRange, blocks
[]blockDelegate, id common.SeriesID, shardID common.ShardID) *seriesSpan {
s := &seriesSpan{
blocks: blocks,
seriesID: id,
diff --git a/banyand/tsdb/series_test.go b/banyand/tsdb/series_test.go
index 25a7df6..6d2b940 100644
--- a/banyand/tsdb/series_test.go
+++ b/banyand/tsdb/series_test.go
@@ -24,7 +24,7 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
- "github.com/apache/skywalking-banyandb/banyand/tsdb"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
)
var _ = Describe("Series", func() {
@@ -34,7 +34,7 @@ var _ = Describe("Series", func() {
startTime, _ := time.Parse("20060202", start)
endTime, _ := time.Parse("20060202", end)
tsTime, _ := time.Parse("20060202", ts)
- Expect(tsdb.NewTimeRange(startTime, endTime,
includeStart,
includeEnd).Contains(uint64(tsTime.UnixNano()))).To(Equal(expected))
+ Expect(timestamp.NewTimeRange(startTime,
endTime, includeStart,
includeEnd).Contains(uint64(tsTime.UnixNano()))).To(Equal(expected))
}
DescribeTable("It's a exclusive range",
func(start, end, ts string, expected bool) {
@@ -90,8 +90,8 @@ var _ = Describe("Series", func() {
for _, r2l := range includes {
for _, r2u := range
includes {
By(fmt.Sprintf("r1 lower:%v upper:%v. r1 lower:%v upper:%v", r1l, r1u, r2l,
r2u), func() {
- r1 :=
tsdb.NewTimeRange(startTime1, endTime1, r1l, r1u)
- r2 :=
tsdb.NewTimeRange(startTime2, endTime2, r2l, r2u)
+ r1 :=
timestamp.NewTimeRange(startTime1, endTime1, r1l, r1u)
+ r2 :=
timestamp.NewTimeRange(startTime2, endTime2, r2l, r2u)
Expect(r1.Overlapping(r2)).To(Equal(expected))
Expect(r2.Overlapping(r1)).To(Equal(expected))
})
@@ -114,8 +114,8 @@ var _ = Describe("Series", func() {
endTime1, _ := time.Parse("20060102",
"20210107")
startTime2, _ := time.Parse("20060102",
"20210107")
endTime2, _ := time.Parse("20060102",
"20210109")
- r1 := tsdb.NewTimeRange(startTime1, endTime1,
false, include1)
- r2 := tsdb.NewTimeRange(startTime2, endTime2,
include2, false)
+ r1 := timestamp.NewTimeRange(startTime1,
endTime1, false, include1)
+ r2 := timestamp.NewTimeRange(startTime2,
endTime2, include2, false)
Expect(r1.Overlapping(r2)).To(Equal(expected))
}
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index de269e6..d082a08 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -30,6 +30,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/kv"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
)
var maxIntBytes = convert.Uint64ToBytes(math.MaxUint64)
@@ -95,7 +96,7 @@ type SeriesDatabase interface {
type blockDatabase interface {
shardID() common.ShardID
- span(timeRange TimeRange) []blockDelegate
+ span(timeRange timestamp.TimeRange) []blockDelegate
block(id GlobalItemID) blockDelegate
}
@@ -138,7 +139,11 @@ func (s *seriesDB) block(id GlobalItemID) blockDelegate {
if seg == nil {
return nil
}
- return seg.lst[id.blockID].delegate()
+ block := seg.blockController.get(id.blockID)
+ if block == nil {
+ return nil
+ }
+ return block.delegate()
}
func (s *seriesDB) shardID() common.ShardID {
@@ -195,11 +200,11 @@ func (s *seriesDB) List(path Path) (SeriesList, error) {
return result, err
}
-func (s *seriesDB) span(timeRange TimeRange) []blockDelegate {
+func (s *seriesDB) span(timeRange timestamp.TimeRange) []blockDelegate {
//TODO: return correct blocks
result := make([]blockDelegate, 0)
for _, s := range s.segCtrl.span(timeRange) {
- for _, b := range s.lst {
+ for _, b := range s.blockController.span(timeRange) {
result = append(result, b.delegate())
}
}
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index 7eaf50f..0f70e53 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -28,6 +28,7 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
)
var _ Shard = (*shard)(nil)
@@ -54,7 +55,7 @@ func (s *shard) Index() IndexDatabase {
return s.indexDatabase
}
-func OpenShard(ctx context.Context, id common.ShardID, root string,
intervalRule IntervalRule) (Shard, error) {
+func OpenShard(ctx context.Context, id common.ShardID, root string,
segmentSize, blockSize IntervalRule) (Shard, error) {
path, err := mkdir(shardTemplate, root, int(id))
if err != nil {
return nil, errors.Wrapf(err, "make the directory of the shard
%d ", int(id))
@@ -63,7 +64,7 @@ func OpenShard(ctx context.Context, id common.ShardID, root
string, intervalRule
l.Info().Int("shard_id", int(id)).Str("path", path).Msg("creating a
shard")
s := &shard{
id: id,
- segmentController: newSegmentController(path, intervalRule),
+ segmentController: newSegmentController(path, segmentSize,
blockSize),
l: l,
}
shardCtx := context.WithValue(ctx, logger.ContextKey, s.l)
@@ -102,20 +103,17 @@ func (s *shard) Close() error {
type IntervalUnit int
const (
- DAY IntervalUnit = iota
- MONTH
- YEAR
- MILLISECOND // only for testing
+ MILLISECOND IntervalUnit = iota // only for testing
+ HOUR
+ DAY
)
func (iu IntervalUnit) String() string {
switch iu {
+ case HOUR:
+ return "hour"
case DAY:
return "day"
- case MONTH:
- return "month"
- case YEAR:
- return "year"
case MILLISECOND:
return "millis"
@@ -123,34 +121,6 @@ func (iu IntervalUnit) String() string {
panic("invalid interval unit")
}
-func (iu IntervalUnit) Format(tm time.Time) string {
- switch iu {
- case DAY:
- return tm.Format(segDayFormat)
- case MONTH:
- return tm.Format(segMonthFormat)
- case YEAR:
- return tm.Format(segYearFormat)
- case MILLISECOND:
- return tm.Format(segMillisecondFormat)
- }
- panic("invalid interval unit")
-}
-
-func (iu IntervalUnit) Parse(value string) (time.Time, error) {
- switch iu {
- case DAY:
- return time.Parse(segDayFormat, value)
- case MONTH:
- return time.Parse(segMonthFormat, value)
- case YEAR:
- return time.Parse(segYearFormat, value)
- case MILLISECOND:
- return time.Parse(segMillisecondFormat, value)
- }
- panic("invalid interval unit")
-}
-
type IntervalRule struct {
Unit IntervalUnit
Num int
@@ -158,29 +128,41 @@ type IntervalRule struct {
func (ir IntervalRule) NextTime(current time.Time) time.Time {
switch ir.Unit {
+ case HOUR:
+ return current.Add(time.Hour * time.Duration(ir.Num))
case DAY:
return current.AddDate(0, 0, ir.Num)
- case MONTH:
- return current.AddDate(0, ir.Num, 0)
- case YEAR:
- return current.AddDate(ir.Num, 0, 0)
case MILLISECOND:
return current.Add(time.Millisecond * time.Duration(ir.Num))
}
panic("invalid interval unit")
}
+func (ir IntervalRule) EstimatedDuration() time.Duration {
+ switch ir.Unit {
+ case HOUR:
+ return time.Hour * time.Duration(ir.Num)
+ case DAY:
+ return 24 * time.Hour * time.Duration(ir.Num)
+ case MILLISECOND:
+ return time.Microsecond * time.Duration(ir.Num)
+ }
+ panic("invalid interval unit")
+}
+
type segmentController struct {
sync.RWMutex
- location string
- intervalRule IntervalRule
- lst []*segment
+ location string
+ segmentSize IntervalRule
+ blockSize IntervalRule
+ lst []*segment
}
-func newSegmentController(location string, intervalRule IntervalRule)
*segmentController {
+func newSegmentController(location string, segmentSize, blockSize
IntervalRule) *segmentController {
return &segmentController{
- location: location,
- intervalRule: intervalRule,
+ location: location,
+ segmentSize: segmentSize,
+ blockSize: blockSize,
}
}
@@ -197,7 +179,7 @@ func (sc *segmentController) get(segID uint16) *segment {
return nil
}
-func (sc *segmentController) span(timeRange TimeRange) (ss []*segment) {
+func (sc *segmentController) span(timeRange timestamp.TimeRange) (ss
[]*segment) {
sc.RLock()
defer sc.RUnlock()
last := len(sc.lst) - 1
@@ -223,7 +205,7 @@ func (sc *segmentController) Current() bucket.Reporter {
defer sc.RUnlock()
now := time.Now()
for _, s := range sc.lst {
- if s.suffix == sc.intervalRule.Unit.Format(now) {
+ if s.suffix == sc.Format(now) {
return s
}
}
@@ -235,12 +217,37 @@ func (sc *segmentController) Current() bucket.Reporter {
}
+// Next creates the segment that follows the current one: its start is one
+// segmentSize step after the current segment's Start.
func (sc *segmentController) Next() (bucket.Reporter, error) {
- return sc.create(context.TODO(), sc.intervalRule.Unit.Format(
- sc.intervalRule.NextTime(time.Now())))
+ start := time.Now()
+ // Current may return nil (open() checks for exactly that), and a bare
+ // type assertion on a nil interface panics; step from "now" instead.
+ if cur, ok := sc.Current().(*segment); ok && cur != nil {
+ start = cur.Start
+ }
+ next, err := sc.create(context.TODO(), sc.Format(sc.segmentSize.NextTime(start)))
+ if err != nil {
+ // Return an untyped nil so callers never receive a non-nil
+ // bucket.Reporter that wraps a nil *segment.
+ return nil, err
+ }
+ return next, nil
+}
+
+// Format renders tm as a segment directory suffix, using the layout that
+// matches the configured segment unit. It panics on an unknown unit.
+func (sc *segmentController) Format(tm time.Time) string {
+ return tm.Format(sc.suffixLayout())
+}
+
+// Parse is the inverse of Format: it turns a segment suffix back into the
+// segment's start time. Suffixes are produced from time.Now() (local time),
+// so they are read in time.Local; plain time.Parse would label the local
+// wall clock as UTC and shift the segment start by the zone offset.
+// It panics on an unknown unit.
+func (sc *segmentController) Parse(value string) (time.Time, error) {
+ return time.ParseInLocation(sc.suffixLayout(), value, time.Local)
+}
+
+// suffixLayout maps the segment unit to its time layout; it is shared by
+// Format and Parse so the two cannot drift apart. It panics on an unknown
+// unit.
+func (sc *segmentController) suffixLayout() string {
+ switch sc.segmentSize.Unit {
+ case HOUR:
+ return segHourFormat
+ case DAY:
+ return segDayFormat
+ case MILLISECOND:
+ return millisecondFormat
+ }
+ panic("invalid interval unit")
}
func (sc *segmentController) open(ctx context.Context) error {
- err := walkDir(
+ err := WalkDir(
sc.location,
segPathPrefix,
func(suffix, absolutePath string) error {
@@ -251,7 +258,7 @@ func (sc *segmentController) open(ctx context.Context)
error {
return err
}
if sc.Current() == nil {
- _, err = sc.create(ctx, sc.intervalRule.Unit.Format(time.Now()))
+ _, err = sc.create(ctx, sc.Format(time.Now()))
if err != nil {
return err
}
@@ -268,7 +275,11 @@ func (sc *segmentController) create(ctx context.Context,
suffix string) (*segmen
}
func (sc *segmentController) load(ctx context.Context, suffix, path string)
(seg *segment, err error) {
- seg, err = openSegment(ctx, suffix, path, sc.intervalRule)
+ startTime, err := sc.Parse(suffix)
+ if err != nil {
+ return nil, err
+ }
+ seg, err = openSegment(ctx, startTime, path, suffix, sc.segmentSize,
sc.blockSize)
if err != nil {
return nil, err
}
diff --git a/banyand/tsdb/shard_test.go b/banyand/tsdb/shard_test.go
index 5609f0a..96e78e3 100644
--- a/banyand/tsdb/shard_test.go
+++ b/banyand/tsdb/shard_test.go
@@ -19,8 +19,6 @@ package tsdb_test
import (
"context"
- "io/ioutil"
- "strings"
"time"
. "github.com/onsi/ginkgo/v2"
@@ -48,22 +46,41 @@ var _ = Describe("Shard", func() {
})
It("generates several segments", func() {
var err error
- shard, err = tsdb.OpenShard(context.TODO(),
common.ShardID(0), tmp, tsdb.IntervalRule{
- Unit: tsdb.MILLISECOND,
- Num: 1000,
- })
+ shard, err = tsdb.OpenShard(context.TODO(),
common.ShardID(0), tmp,
+ tsdb.IntervalRule{
+ Unit: tsdb.MILLISECOND,
+ Num: 3000,
+ },
+ tsdb.IntervalRule{
+ Unit: tsdb.MILLISECOND,
+ Num: 1000,
+ },
+ )
Expect(err).NotTo(HaveOccurred())
+ segDirectories := make([]string, 3)
Eventually(func() int {
- files, err := ioutil.ReadDir(tmp + "/shard-0")
- Expect(err).NotTo(HaveOccurred())
num := 0
- for _, fi := range files {
- if fi.IsDir() &&
strings.HasPrefix(fi.Name(), "seg-") {
- num++
+ err := tsdb.WalkDir(tmp+"/shard-0", "seg-",
func(suffix, absolutePath string) error {
+ if num < 3 {
+ segDirectories[num] =
absolutePath
}
- }
+ num++
+ return nil
+ })
+ Expect(err).NotTo(HaveOccurred())
return num
}).WithTimeout(10 *
time.Second).Should(BeNumerically(">=", 3))
+ for _, d := range segDirectories {
+ Eventually(func() int {
+ num := 0
+ err := tsdb.WalkDir(d, "block-",
func(suffix, absolutePath string) error {
+ num++
+ return nil
+ })
+ Expect(err).NotTo(HaveOccurred())
+ return num
+ }).WithTimeout(10 *
time.Second).Should(BeNumerically(">=", 3))
+ }
})
})
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index a3aff1c..7659b53 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -49,18 +49,18 @@ const (
blockTemplate = rootPrefix + blockPathPrefix + "-%s"
globalIndexTemplate = rootPrefix + "index"
- segDayFormat = "20060102"
- segMonthFormat = "200601"
- segYearFormat = "2006"
- segMillisecondFormat = "20060102150405000"
- blockFormat = "1504"
+ segHourFormat = "2006010215"
+ segDayFormat = "20060102"
+ millisecondFormat = "20060102150405000"
+ blockHourFormat = "15"
+ blockDayFormat = "0102"
dirPerm = 0700
)
var (
- ErrInvalidShardID = errors.New("invalid shard id")
- ErrEncodingMethodAbsent = errors.New("encoding method is absent")
+ ErrInvalidShardID = errors.New("invalid shard id")
+ ErrOpenDatabase = errors.New("fails to open the database")
indexRulesKey = contextIndexRulesKey{}
encodingMethodKey = contextEncodingMethodKey{}
@@ -89,6 +89,8 @@ type DatabaseOpts struct {
ShardNum uint32
IndexRules []*databasev1.IndexRule
EncodingMethod EncodingMethod
+ SegmentSize IntervalRule
+ BlockSize IntervalRule
}
type EncodingMethod struct {
@@ -97,9 +99,11 @@ type EncodingMethod struct {
}
type database struct {
- logger *logger.Logger
- location string
- shardNum uint32
+ logger *logger.Logger
+ location string
+ shardNum uint32
+ segmentSize IntervalRule
+ blockSize IntervalRule
sLst []Shard
sync.Mutex
@@ -129,15 +133,34 @@ func (d *database) Close() error {
func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
if opts.EncodingMethod.EncoderPool == nil ||
opts.EncodingMethod.DecoderPool == nil {
- return nil, errors.Wrap(ErrEncodingMethodAbsent, "failed to
open database")
+ return nil, errors.Wrap(ErrOpenDatabase, "encoding method is
absent")
}
if _, err := mkdir(opts.Location); err != nil {
return nil, err
}
+ segmentSize := opts.SegmentSize
+ if segmentSize.Unit == MILLISECOND {
+ segmentSize = IntervalRule{
+ Unit: DAY,
+ Num: 1,
+ }
+ }
+ blockSize := opts.BlockSize
+ if blockSize.Unit == MILLISECOND {
+ blockSize = IntervalRule{
+ Unit: HOUR,
+ Num: 2,
+ }
+ }
+ if blockSize.EstimatedDuration() > segmentSize.EstimatedDuration() {
+ return nil, errors.Wrapf(ErrOpenDatabase, "the block size is
bigger than the segment size")
+ }
db := &database{
- location: opts.Location,
- shardNum: opts.ShardNum,
- logger: logger.Fetch(ctx, "tsdb"),
+ location: opts.Location,
+ shardNum: opts.ShardNum,
+ logger: logger.Fetch(ctx, "tsdb"),
+ segmentSize: segmentSize,
+ blockSize: blockSize,
}
db.logger.Info().Str("path", opts.Location).Msg("initialized")
var entries []fs.FileInfo
@@ -164,10 +187,8 @@ func createDatabase(ctx context.Context, db *database,
startID int) (Database, e
var err error
for i := startID; i < int(db.shardNum); i++ {
db.logger.Info().Int("shard_id", i).Msg("creating a shard")
- so, errNewShard := OpenShard(ctx, common.ShardID(i),
db.location, IntervalRule{
- Unit: DAY,
- Num: 1,
- })
+ so, errNewShard := OpenShard(ctx, common.ShardID(i),
+ db.location, db.segmentSize, db.blockSize)
if errNewShard != nil {
err = multierr.Append(err, errNewShard)
continue
@@ -182,7 +203,7 @@ func loadDatabase(ctx context.Context, db *database)
(Database, error) {
//TODO: open the manifest file
db.Lock()
defer db.Unlock()
- err := walkDir(db.location, shardPathPrefix, func(suffix, _ string)
error {
+ err := WalkDir(db.location, shardPathPrefix, func(suffix, _ string)
error {
shardID, err := strconv.Atoi(suffix)
if err != nil {
return err
@@ -195,10 +216,9 @@ func loadDatabase(ctx context.Context, db *database)
(Database, error) {
context.WithValue(ctx, logger.ContextKey, db.logger),
common.ShardID(shardID),
db.location,
- IntervalRule{
- Unit: DAY,
- Num: 1,
- })
+ db.segmentSize,
+ db.blockSize,
+ )
if errOpenShard != nil {
return errOpenShard
}
@@ -220,9 +240,9 @@ func loadDatabase(ctx context.Context, db *database)
(Database, error) {
return db, nil
}
-type walkFn func(suffix, absolutePath string) error
+// WalkFn is the callback WalkDir invokes for each entry of root whose name
+// matches prefix. suffix is the remainder of the entry name (loadDatabase
+// converts it to a shard ID with strconv.Atoi) and absolutePath is the
+// entry's full path. NOTE(review): confirm in WalkDir how the prefix
+// separator is stripped and whether a returned error aborts the walk.
+type WalkFn func(suffix, absolutePath string) error
-func walkDir(root, prefix string, walkFn walkFn) error {
+func WalkDir(root, prefix string, walkFn WalkFn) error {
files, err := ioutil.ReadDir(root)
if err != nil {
return errors.Wrapf(err, "failed to walk the database path:
%s", root)
diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go
index 75c1cb8..0dca4c2 100644
--- a/banyand/tsdb/tsdb_test.go
+++ b/banyand/tsdb/tsdb_test.go
@@ -62,7 +62,7 @@ func verifyDatabaseStructure(tester *assert.Assertions,
tempDir string) {
now := time.Now()
segPath := fmt.Sprintf(segTemplate, shardPath, now.Format(segDayFormat))
validateDirectory(tester, segPath)
- validateDirectory(tester, fmt.Sprintf(blockTemplate, segPath,
now.Format(blockFormat)))
+ validateDirectory(tester, fmt.Sprintf(blockTemplate, segPath,
now.Format(blockHourFormat)))
}
func openDatabase(t *require.Assertions, path string) (db Database) {
diff --git a/pkg/query/logical/common.go b/pkg/query/logical/common.go
index 95f3e3e..9a6b1fd 100644
--- a/pkg/query/logical/common.go
+++ b/pkg/query/logical/common.go
@@ -23,6 +23,7 @@ import (
modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/query/executor"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
)
type (
@@ -70,7 +71,7 @@ func projectItem(ec executor.ExecutionContext, item
tsdb.Item, projectionFieldRe
// with the help of Entity. The result is a list of element set, where the
order of inner list is kept
// as what the users specify in the seekerBuilder.
// This method is used by the underlying tableScan and localIndexScan plans.
-func executeForShard(series tsdb.SeriesList, timeRange tsdb.TimeRange,
+func executeForShard(series tsdb.SeriesList, timeRange timestamp.TimeRange,
builders ...seekerBuilder) ([]tsdb.Iterator, error) {
var itersInShard []tsdb.Iterator
for _, seriesFound := range series {
diff --git a/pkg/query/logical/plan_indexscan_local.go
b/pkg/query/logical/plan_indexscan_local.go
index a18e62e..4233b27 100644
--- a/pkg/query/logical/plan_indexscan_local.go
+++ b/pkg/query/logical/plan_indexscan_local.go
@@ -33,6 +33,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/tsdb"
"github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/query/executor"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
)
var _ UnresolvedPlan = (*unresolvedIndexScan)(nil)
@@ -108,7 +109,7 @@ func (uis *unresolvedIndexScan) Analyze(s Schema) (Plan,
error) {
return &localIndexScan{
orderBy: orderBySubPlan,
- timeRange: tsdb.NewInclusiveTimeRange(uis.startTime,
uis.endTime),
+ timeRange:
timestamp.NewInclusiveTimeRange(uis.startTime, uis.endTime),
schema: s,
projectionFieldRefs: projFieldsRefs,
metadata: uis.metadata,
@@ -121,7 +122,7 @@ var _ Plan = (*localIndexScan)(nil)
type localIndexScan struct {
*orderBy
- timeRange tsdb.TimeRange
+ timeRange timestamp.TimeRange
schema Schema
metadata *commonv1.Metadata
conditionMap map[*databasev1.IndexRule][]Expr
diff --git a/pkg/timestamp/range.go b/pkg/timestamp/range.go
new file mode 100644
index 0000000..aae487b
--- /dev/null
+++ b/pkg/timestamp/range.go
@@ -0,0 +1,80 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package timestamp
+
+import (
+ "time"
+)
+
+// TimeRange is an interval from Start to End. IncludeStart and IncludeEnd
+// say whether each endpoint itself belongs to the range.
+type TimeRange struct {
+ Start time.Time
+ End time.Time
+ IncludeStart bool
+ IncludeEnd bool
+}
+
+// Contains reports whether the instant unixNano (nanoseconds since the Unix
+// epoch) lies within t. An instant equal to Start or End is inside only if
+// the matching Include flag is set; Start is checked first, so when Start
+// and End coincide IncludeStart decides. unixNano is converted with
+// int64(), so values above math.MaxInt64 wrap negative.
+func (t TimeRange) Contains(unixNano uint64) bool {
+ tp := time.Unix(0, int64(unixNano))
+ if t.Start.Equal(tp) {
+ return t.IncludeStart
+ }
+ if t.End.Equal(tp) {
+ return t.IncludeEnd
+ }
+ return !tp.Before(t.Start) && !tp.After(t.End)
+}
+
+// Overlapping reports whether t and other share at least one instant. When
+// one range ends exactly where the other starts, they overlap only if both
+// touching endpoints are included.
+func (t TimeRange) Overlapping(other TimeRange) bool {
+ if t.Start.Equal(other.End) {
+ return t.IncludeStart && other.IncludeEnd
+ }
+ if other.Start.Equal(t.End) {
+ return t.IncludeEnd && other.IncludeStart
+ }
+ return !t.Start.After(other.End) && !other.Start.After(t.End)
+}
+
+// Duration returns End minus Start; the Include flags do not affect it.
+func (t TimeRange) Duration() time.Duration {
+ return t.End.Sub(t.Start)
+}
+
+// NewInclusiveTimeRange returns the closed range [start, end].
+func NewInclusiveTimeRange(start, end time.Time) TimeRange {
+ return TimeRange{
+ Start: start,
+ End: end,
+ IncludeStart: true,
+ IncludeEnd: true,
+ }
+}
+
+// NewInclusiveTimeRangeDuration returns the closed range
+// [start, start+duration].
+func NewInclusiveTimeRangeDuration(start time.Time, duration time.Duration)
TimeRange {
+ return NewTimeRangeDuration(start, duration, true, true)
+}
+
+// NewTimeRange builds a TimeRange from explicit endpoints and inclusivity
+// flags.
+func NewTimeRange(start, end time.Time, includeStart, includeEnd bool)
TimeRange {
+ return TimeRange{
+ Start: start,
+ End: end,
+ IncludeStart: includeStart,
+ IncludeEnd: includeEnd,
+ }
+}
+
+// NewTimeRangeDuration is NewTimeRange with End computed as
+// start.Add(duration).
+func NewTimeRangeDuration(start time.Time, duration time.Duration,
includeStart, includeEnd bool) TimeRange {
+ return NewTimeRange(start, start.Add(duration), includeStart,
includeEnd)
+}