This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch tsdb-block
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit e4e5781231795386503ec8ffbd1d523ea5643905
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Jan 27 04:08:00 2022 +0000

    Add the block manager
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 banyand/measure/query_test.go             |   3 +-
 banyand/measure/write.go                  |   3 +-
 banyand/stream/stream_query_test.go       |  23 +--
 banyand/stream/stream_write.go            |   3 +-
 banyand/tsdb/block.go                     |  45 +++--
 banyand/tsdb/bucket/bucket.go             |  52 ++++++
 banyand/tsdb/bucket/strategy_test.go      |   3 +
 banyand/tsdb/indexdb.go                   |   3 +-
 banyand/tsdb/segment.go                   | 280 ++++++++++++++++++++++--------
 banyand/tsdb/series.go                    |  67 +------
 banyand/tsdb/series_test.go               |  12 +-
 banyand/tsdb/seriesdb.go                  |  13 +-
 banyand/tsdb/shard.go                     | 121 +++++++------
 banyand/tsdb/shard_test.go                |  41 +++--
 banyand/tsdb/tsdb.go                      |  70 +++++---
 banyand/tsdb/tsdb_test.go                 |   2 +-
 pkg/query/logical/common.go               |   3 +-
 pkg/query/logical/plan_indexscan_local.go |   5 +-
 pkg/timestamp/range.go                    |  80 +++++++++
 19 files changed, 551 insertions(+), 278 deletions(-)

diff --git a/banyand/measure/query_test.go b/banyand/measure/query_test.go
index 60b8bf2..c26b7f8 100644
--- a/banyand/measure/query_test.go
+++ b/banyand/measure/query_test.go
@@ -26,6 +26,7 @@ import (
 
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/tsdb"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 func Test_ParseTag_And_ParseField(t *testing.T) {
@@ -37,7 +38,7 @@ func Test_ParseTag_And_ParseField(t *testing.T) {
        r.NoError(err)
        series, err := shard.Series().Get(tsdb.Entity{tsdb.Entry("1")})
        r.NoError(err)
-       seriesSpan, err := 
series.Span(tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour))
+       seriesSpan, err := 
series.Span(timestamp.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour))
        defer func(seriesSpan tsdb.SeriesSpan) {
                _ = seriesSpan.Close()
        }(seriesSpan)
diff --git a/banyand/measure/write.go b/banyand/measure/write.go
index b0f9854..d26cd74 100644
--- a/banyand/measure/write.go
+++ b/banyand/measure/write.go
@@ -33,6 +33,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 var (
@@ -78,7 +79,7 @@ func (s *measure) write(shardID common.ShardID, seriesHashKey 
[]byte, value *mea
                return err
        }
        t := value.GetTimestamp().AsTime()
-       wp, err := series.Span(tsdb.NewInclusiveTimeRangeDuration(t, 0))
+       wp, err := series.Span(timestamp.NewInclusiveTimeRangeDuration(t, 0))
        if err != nil {
                if wp != nil {
                        _ = wp.Close()
diff --git a/banyand/stream/stream_query_test.go 
b/banyand/stream/stream_query_test.go
index 2533ef4..f63346d 100644
--- a/banyand/stream/stream_query_test.go
+++ b/banyand/stream/stream_query_test.go
@@ -42,6 +42,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/tsdb"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/index"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 type shardStruct struct {
@@ -106,7 +107,7 @@ func Test_Stream_Series(t *testing.T) {
                        name: "all",
                        args: queryOpts{
                                entity:    tsdb.Entity{tsdb.AnyEntry, 
tsdb.AnyEntry, tsdb.AnyEntry},
-                               timeRange: 
tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
+                               timeRange: 
timestamp.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
                        },
                        want: shardsForTest{
                                {
@@ -136,7 +137,7 @@ func Test_Stream_Series(t *testing.T) {
                        name: "time range",
                        args: queryOpts{
                                entity:    tsdb.Entity{tsdb.AnyEntry, 
tsdb.AnyEntry, tsdb.AnyEntry},
-                               timeRange: 
tsdb.NewInclusiveTimeRangeDuration(baseTime.Add(1500*time.Millisecond), 
1*time.Hour),
+                               timeRange: 
timestamp.NewInclusiveTimeRangeDuration(baseTime.Add(1500*time.Millisecond), 
1*time.Hour),
                        },
                        want: shardsForTest{
                                {
@@ -163,7 +164,7 @@ func Test_Stream_Series(t *testing.T) {
                        name: "find series by service_id and instance_id",
                        args: queryOpts{
                                entity:    tsdb.Entity{tsdb.Entry("webapp_id"), 
tsdb.Entry("10.0.0.1_id"), tsdb.AnyEntry},
-                               timeRange: 
tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
+                               timeRange: 
timestamp.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
                        },
                        want: shardsForTest{
                                {
@@ -182,7 +183,7 @@ func Test_Stream_Series(t *testing.T) {
                        name: "find a series",
                        args: queryOpts{
                                entity:    tsdb.Entity{tsdb.Entry("webapp_id"), 
tsdb.Entry("10.0.0.1_id"), convert.Int64ToBytes(0)},
-                               timeRange: 
tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
+                               timeRange: 
timestamp.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
                        },
                        want: shardsForTest{
                                {
@@ -196,7 +197,7 @@ func Test_Stream_Series(t *testing.T) {
                        name: "filter",
                        args: queryOpts{
                                entity:    tsdb.Entity{tsdb.AnyEntry, 
tsdb.AnyEntry, tsdb.AnyEntry},
-                               timeRange: 
tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
+                               timeRange: 
timestamp.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
                                buildFn: func(builder tsdb.SeekerBuilder) {
                                        builder.Filter(&databasev1.IndexRule{
                                                Metadata: &commonv1.Metadata{
@@ -242,7 +243,7 @@ func Test_Stream_Series(t *testing.T) {
                        name: "order by duration",
                        args: queryOpts{
                                entity:    tsdb.Entity{tsdb.AnyEntry, 
tsdb.AnyEntry, tsdb.AnyEntry},
-                               timeRange: 
tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
+                               timeRange: 
timestamp.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
                                buildFn: func(builder tsdb.SeekerBuilder) {
                                        
builder.OrderByIndex(&databasev1.IndexRule{
                                                Metadata: &commonv1.Metadata{
@@ -283,7 +284,7 @@ func Test_Stream_Series(t *testing.T) {
                        name: "filter by duration",
                        args: queryOpts{
                                entity:    tsdb.Entity{tsdb.AnyEntry, 
tsdb.AnyEntry, tsdb.AnyEntry},
-                               timeRange: 
tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
+                               timeRange: 
timestamp.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
                                buildFn: func(builder tsdb.SeekerBuilder) {
                                        rule := &databasev1.IndexRule{
                                                Metadata: &commonv1.Metadata{
@@ -330,7 +331,7 @@ func Test_Stream_Series(t *testing.T) {
                        name: "filter and sort by duration",
                        args: queryOpts{
                                entity:    tsdb.Entity{tsdb.AnyEntry, 
tsdb.AnyEntry, tsdb.AnyEntry},
-                               timeRange: 
tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
+                               timeRange: 
timestamp.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
                                buildFn: func(builder tsdb.SeekerBuilder) {
                                        rule := &databasev1.IndexRule{
                                                Metadata: &commonv1.Metadata{
@@ -378,7 +379,7 @@ func Test_Stream_Series(t *testing.T) {
                        name: "filter by several conditions",
                        args: queryOpts{
                                entity:    tsdb.Entity{tsdb.AnyEntry, 
tsdb.AnyEntry, tsdb.AnyEntry},
-                               timeRange: 
tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
+                               timeRange: 
timestamp.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
                                buildFn: func(builder tsdb.SeekerBuilder) {
                                        rule := &databasev1.IndexRule{
                                                Metadata: &commonv1.Metadata{
@@ -441,7 +442,7 @@ func Test_Stream_Series(t *testing.T) {
                        name: "filter by several conditions, sort by duration",
                        args: queryOpts{
                                entity:    tsdb.Entity{tsdb.AnyEntry, 
tsdb.AnyEntry, tsdb.AnyEntry},
-                               timeRange: 
tsdb.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
+                               timeRange: 
timestamp.NewInclusiveTimeRangeDuration(baseTime, 1*time.Hour),
                                buildFn: func(builder tsdb.SeekerBuilder) {
                                        rule := &databasev1.IndexRule{
                                                Metadata: &commonv1.Metadata{
@@ -628,7 +629,7 @@ func Test_Stream_Global_Index(t *testing.T) {
 
 type queryOpts struct {
        entity    tsdb.Entity
-       timeRange tsdb.TimeRange
+       timeRange timestamp.TimeRange
        buildFn   func(builder tsdb.SeekerBuilder)
 }
 
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index 7fef502..21e9561 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -30,6 +30,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 var (
@@ -71,7 +72,7 @@ func (s *stream) write(shardID common.ShardID, seriesHashKey 
[]byte, value *stre
                return err
        }
        t := value.GetTimestamp().AsTime()
-       wp, err := series.Span(tsdb.NewInclusiveTimeRangeDuration(t, 0))
+       wp, err := series.Span(timestamp.NewInclusiveTimeRangeDuration(t, 0))
        if err != nil {
                if wp != nil {
                        _ = wp.Close()
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index c5a2b98..a33fccd 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -20,6 +20,7 @@ package tsdb
 import (
        "context"
        "io"
+       "strconv"
        "time"
 
        "github.com/dgraph-io/ristretto/z"
@@ -27,43 +28,55 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/kv"
+       "github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/index"
        "github.com/apache/skywalking-banyandb/pkg/index/inverted"
        "github.com/apache/skywalking-banyandb/pkg/index/lsm"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 type block struct {
-       path string
-       l    *logger.Logger
-       ref  *z.Closer
+       path   string
+       l      *logger.Logger
+       suffix string
+       ref    *z.Closer
 
        store         kv.TimeSeriesStore
        primaryIndex  index.Store
        invertedIndex index.Store
        lsmIndex      index.Store
        closableLst   []io.Closer
-       endTime       time.Time
-       startTime     time.Time
-       segID         uint16
-       blockID       uint16
+       timestamp.TimeRange
+       bucket.Reporter
+       segID   uint16
+       blockID uint16
 }
 
 type blockOpts struct {
-       segID   uint16
-       blockID uint16
-       path    string
+       segID     uint16
+       blockSize IntervalRule
+       startTime time.Time
+       suffix    string
+       path      string
 }
 
 func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
+       suffixInteger, err := strconv.Atoi(opts.suffix)
+       if err != nil {
+               return nil, err
+       }
+       id := uint16(opts.blockSize.Unit)<<12 | ((uint16(suffixInteger) << 4) 
>> 4)
+       timeRange := timestamp.NewTimeRange(opts.startTime, 
opts.blockSize.NextTime(opts.startTime), true, false)
        b = &block{
                segID:     opts.segID,
-               blockID:   opts.blockID,
+               blockID:   id,
                path:      opts.path,
                ref:       z.NewCloser(1),
-               startTime: time.Now(),
                l:         logger.Fetch(ctx, "block"),
+               TimeRange: timeRange,
+               Reporter:  bucket.NewTimeBasedReporter(timeRange),
        }
        encodingMethodObject := ctx.Value(encodingMethodKey)
        if encodingMethodObject == nil {
@@ -169,7 +182,7 @@ func (d *bDelegate) primaryIndexReader() index.Searcher {
 }
 
 func (d *bDelegate) startTime() time.Time {
-       return d.delegate.startTime
+       return d.delegate.Start
 }
 
 func (d *bDelegate) identity() (segID uint16, blockID uint16) {
@@ -199,11 +212,7 @@ func (d *bDelegate) writeInvertedIndex(field index.Field, 
id common.ItemID) erro
 }
 
 func (d *bDelegate) contains(ts time.Time) bool {
-       greaterAndEqualStart := d.delegate.startTime.Equal(ts) || 
d.delegate.startTime.Before(ts)
-       if d.delegate.endTime.IsZero() {
-               return greaterAndEqualStart
-       }
-       return greaterAndEqualStart && d.delegate.endTime.After(ts)
+       return d.delegate.Contains(uint64(ts.UnixNano()))
 }
 
 func (d *bDelegate) Close() error {
diff --git a/banyand/tsdb/bucket/bucket.go b/banyand/tsdb/bucket/bucket.go
index 82a76ec..406093c 100644
--- a/banyand/tsdb/bucket/bucket.go
+++ b/banyand/tsdb/bucket/bucket.go
@@ -17,6 +17,12 @@
 
 package bucket
 
+import (
+       "time"
+
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
 type Controller interface {
        Current() Reporter
        Next() (Reporter, error)
@@ -31,4 +37,50 @@ type Channel chan Status
 
 type Reporter interface {
        Report() Channel
+       Stop()
+}
+
+type timeBasedReporter struct {
+       timestamp.TimeRange
+       reporterStopCh chan struct{}
+}
+
+func NewTimeBasedReporter(timeRange timestamp.TimeRange) Reporter {
+       return &timeBasedReporter{
+               TimeRange:      timeRange,
+               reporterStopCh: make(chan struct{}),
+       }
+}
+
+func (tr *timeBasedReporter) Report() Channel {
+       ch := make(Channel)
+       interval := tr.Duration() >> 4
+       if interval < 100*time.Millisecond {
+               interval = 100 * time.Millisecond
+       }
+       go func() {
+               defer close(ch)
+               ticker := time.NewTicker(interval)
+               defer ticker.Stop()
+               for {
+                       select {
+                       case <-ticker.C:
+                               status := Status{
+                                       Capacity: int(tr.End.UnixNano() - 
tr.Start.UnixNano()),
+                                       Volume:   int(time.Now().UnixNano() - 
tr.Start.UnixNano()),
+                               }
+                               ch <- status
+                               if status.Volume >= status.Capacity {
+                                       return
+                               }
+                       case <-tr.reporterStopCh:
+                               return
+                       }
+               }
+       }()
+       return ch
+}
+
+func (tr *timeBasedReporter) Stop() {
+       close(tr.reporterStopCh)
 }
diff --git a/banyand/tsdb/bucket/strategy_test.go 
b/banyand/tsdb/bucket/strategy_test.go
index 3a302ce..d225d23 100644
--- a/banyand/tsdb/bucket/strategy_test.go
+++ b/banyand/tsdb/bucket/strategy_test.go
@@ -139,3 +139,6 @@ func (r *reporter) Report() bucket.Channel {
        }()
        return ch
 }
+
+func (r *reporter) Stop() {
+}
diff --git a/banyand/tsdb/indexdb.go b/banyand/tsdb/indexdb.go
index 741ba85..0c77c51 100644
--- a/banyand/tsdb/indexdb.go
+++ b/banyand/tsdb/indexdb.go
@@ -26,6 +26,7 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/banyand/kv"
        "github.com/apache/skywalking-banyandb/pkg/index"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 type IndexDatabase interface {
@@ -99,7 +100,7 @@ type indexWriterBuilder struct {
 
 func (i *indexWriterBuilder) Time(ts time.Time) IndexWriterBuilder {
        i.ts = ts
-       segs := i.segCtrl.span(NewTimeRangeDuration(ts, 0, true, false))
+       segs := i.segCtrl.span(timestamp.NewTimeRangeDuration(ts, 0, true, 
false))
        if len(segs) != 1 {
                return i
        }
diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index e82767a..580536c 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -19,6 +19,7 @@ package tsdb
 
 import (
        "context"
+       "errors"
        "strconv"
        "sync"
        "time"
@@ -26,68 +27,46 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/kv"
        "github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
+var ErrEndOfSegment = errors.New("reached the end of the segment")
+
 type segment struct {
        id     uint16
        path   string
        suffix string
 
-       lst         []*block
        globalIndex kv.Store
-       sync.Mutex
-       l              *logger.Logger
-       reporterStopCh chan struct{}
-       TimeRange
-}
-
-func (s *segment) Report() bucket.Channel {
-       ch := make(bucket.Channel)
-       interval := s.Duration() >> 4
-       if interval < 100*time.Millisecond {
-               interval = 100 * time.Millisecond
-       }
-       go func() {
-               defer close(ch)
-               ticker := time.NewTicker(interval)
-               defer ticker.Stop()
-               for {
-                       select {
-                       case <-ticker.C:
-                               status := bucket.Status{
-                                       Capacity: int(s.End.UnixNano() - 
s.Start.UnixNano()),
-                                       Volume:   int(time.Now().UnixNano() - 
s.Start.UnixNano()),
-                               }
-                               ch <- status
-                               if status.Volume >= status.Capacity {
-                                       return
-                               }
-                       case <-s.reporterStopCh:
-                               return
-                       }
-               }
-       }()
-       return ch
+       l           *logger.Logger
+       timestamp.TimeRange
+       bucket.Reporter
+       blockController     *blockController
+       blockManageStrategy *bucket.Strategy
 }
 
-func openSegment(ctx context.Context, suffix, path string, intervalRule 
IntervalRule) (s *segment, err error) {
-       startTime, err := intervalRule.Unit.Parse(suffix)
-       if err != nil {
-               return nil, err
-       }
+func openSegment(ctx context.Context, startTime time.Time, path, suffix 
string, segmentSize, blockSize IntervalRule) (s *segment, err error) {
        suffixInteger, err := strconv.Atoi(suffix)
        if err != nil {
                return nil, err
        }
-       id := uint16(intervalRule.Unit)<<12 | ((uint16(suffixInteger) << 4) >> 
4)
+       // TODO: fix id overflow
+       id := uint16(segmentSize.Unit)<<12 | ((uint16(suffixInteger) << 4) >> 4)
+       timeRange := timestamp.NewTimeRange(startTime, 
segmentSize.NextTime(startTime), true, false)
        s = &segment{
-               id:             id,
-               path:           path,
-               suffix:         suffix,
-               l:              logger.Fetch(ctx, "segment"),
-               reporterStopCh: make(chan struct{}),
-               TimeRange:      NewTimeRange(startTime, 
intervalRule.NextTime(startTime), true, false),
+               id:              id,
+               path:            path,
+               suffix:          suffix,
+               l:               logger.Fetch(ctx, "segment"),
+               blockController: newBlockController(id, path, timeRange, 
blockSize),
+               TimeRange:       timeRange,
+               Reporter:        bucket.NewTimeBasedReporter(timeRange),
        }
+       err = s.blockController.open(context.WithValue(ctx, logger.ContextKey, 
s.l))
+       if err != nil {
+               return nil, err
+       }
+
        indexPath, err := mkdir(globalIndexTemplate, path)
        if err != nil {
                return nil, err
@@ -95,46 +74,193 @@ func openSegment(ctx context.Context, suffix, path string, 
intervalRule Interval
        if s.globalIndex, err = kv.OpenStore(0, indexPath, 
kv.StoreWithLogger(s.l)); err != nil {
                return nil, err
        }
-       loadBlock := func(path string) error {
-               var b *block
-               if b, err = newBlock(context.WithValue(ctx, logger.ContextKey, 
s.l), blockOpts{
-                       segID: s.id,
-                       path:  path,
-               }); err != nil {
-                       return err
+       s.blockManageStrategy, err = bucket.NewStrategy(s.blockController, 
bucket.WithLogger(s.l))
+       if err != nil {
+               return nil, err
+       }
+       s.blockManageStrategy.Run()
+       return s, nil
+}
+
+func (s *segment) close() {
+       s.blockManageStrategy.Close()
+       s.blockController.close()
+       s.globalIndex.Close()
+       s.Stop()
+}
+
+type blockController struct {
+       sync.RWMutex
+       segID        uint16
+       location     string
+       segTimeRange timestamp.TimeRange
+       blockSize    IntervalRule
+       lst          []*block
+}
+
+func newBlockController(segID uint16, location string, segTimeRange 
timestamp.TimeRange, blockSize IntervalRule) *blockController {
+       return &blockController{
+               segID:        segID,
+               location:     location,
+               blockSize:    blockSize,
+               segTimeRange: segTimeRange,
+       }
+}
+
+func (bc *blockController) Current() bucket.Reporter {
+       bc.RLock()
+       defer bc.RUnlock()
+       now := time.Now()
+       for _, s := range bc.lst {
+               if s.suffix == bc.Format(now) {
+                       return s
+               }
+       }
+       // return the latest block before now
+       if len(bc.lst) > 0 {
+               return bc.lst[len(bc.lst)-1]
+       }
+       return nil
+}
+
+func (bc *blockController) Next() (bucket.Reporter, error) {
+       b := bc.Current().(*block)
+       reporter, err := bc.create(context.TODO(),
+               bc.blockSize.NextTime(b.Start))
+       if errors.Is(err, ErrEndOfSegment) {
+               return nil, bucket.ErrNoMoreBucket
+       }
+       return reporter, err
+}
+
+func (bc *blockController) Format(tm time.Time) string {
+       switch bc.blockSize.Unit {
+       case HOUR:
+               return tm.Format(blockHourFormat)
+       case DAY:
+               return tm.Format(blockDayFormat)
+       case MILLISECOND:
+               return tm.Format(millisecondFormat)
+       }
+       panic("invalid interval unit")
+}
+
+func (bc *blockController) Parse(value string) (time.Time, error) {
+       switch bc.blockSize.Unit {
+       case HOUR:
+               return time.Parse(blockHourFormat, value)
+       case DAY:
+               return time.Parse(blockDayFormat, value)
+       case MILLISECOND:
+               return time.Parse(millisecondFormat, value)
+       }
+       panic("invalid interval unit")
+}
+
+func (bc *blockController) span(timeRange timestamp.TimeRange) (bb []*block) {
+       bc.RLock()
+       defer bc.RUnlock()
+       last := len(bc.lst) - 1
+       for i := range bc.lst {
+               b := bc.lst[last-i]
+               if b.Overlapping(timeRange) {
+                       bb = append(bb, b)
                }
-               {
-                       s.Lock()
-                       defer s.Unlock()
-                       s.lst = append(s.lst, b)
+       }
+       return bb
+}
+
+func (bc *blockController) get(blockID uint16) *block {
+       bc.RLock()
+       defer bc.RUnlock()
+       last := len(bc.lst) - 1
+       for i := range bc.lst {
+               b := bc.lst[last-i]
+               if b.blockID == blockID {
+                       return b
                }
-               return nil
        }
-       err = walkDir(path, blockPathPrefix, func(_, absolutePath string) error 
{
-               return loadBlock(absolutePath)
-       })
+       return nil
+}
+
+func (bc *blockController) startTime(suffix string) (time.Time, error) {
+       t, err := bc.Parse(suffix)
        if err != nil {
-               return nil, err
+               return time.Time{}, err
        }
-       if len(s.lst) < 1 {
-               blockPath, err := mkdir(blockTemplate, path, 
time.Now().Format(blockFormat))
-               if err != nil {
-                       return nil, err
-               }
-               err = loadBlock(blockPath)
+       startTime := bc.segTimeRange.Start
+       switch bc.blockSize.Unit {
+       case HOUR:
+               return time.Date(startTime.Year(), startTime.Month(),
+                       startTime.Day(), t.Hour(), 0, 0, 0, time.UTC), nil
+       case DAY:
+               return time.Date(startTime.Year(), startTime.Month(),
+                       t.Day(), t.Hour(), 0, 0, 0, time.UTC), nil
+       case MILLISECOND:
+               return time.Parse(millisecondFormat, suffix)
+       }
+       panic("invalid interval unit")
+}
+
+func (bc *blockController) open(ctx context.Context) error {
+       err := WalkDir(
+               bc.location,
+               segPathPrefix,
+               func(suffix, absolutePath string) error {
+                       _, err := bc.load(ctx, suffix, absolutePath)
+                       return err
+               })
+       if err != nil {
+               return err
+       }
+       if bc.Current() == nil {
+               _, err = bc.create(ctx, time.Now())
                if err != nil {
-                       return nil, err
+                       return err
                }
        }
-       return s, nil
+       return nil
 }
 
-func (s *segment) close() {
-       s.Lock()
-       defer s.Unlock()
-       for _, b := range s.lst {
-               b.close()
+func (bc *blockController) create(ctx context.Context, startTime time.Time) 
(*block, error) {
+       if startTime.Before(bc.segTimeRange.Start) {
+               startTime = bc.segTimeRange.Start
+       }
+       if !startTime.Before(bc.segTimeRange.End) {
+               return nil, ErrEndOfSegment
+       }
+       suffix := bc.Format(startTime)
+       segPath, err := mkdir(blockTemplate, bc.location, suffix)
+       if err != nil {
+               return nil, err
+       }
+       return bc.load(ctx, suffix, segPath)
+}
+
+func (bc *blockController) load(ctx context.Context, suffix, path string) (b 
*block, err error) {
+       starTime, err := bc.startTime(suffix)
+       if err != nil {
+               return nil, err
+       }
+       if b, err = newBlock(ctx, blockOpts{
+               segID:     bc.segID,
+               path:      path,
+               startTime: starTime,
+               suffix:    suffix,
+               blockSize: bc.blockSize,
+       }); err != nil {
+               return nil, err
+       }
+       {
+               bc.Lock()
+               defer bc.Unlock()
+               bc.lst = append(bc.lst, b)
+       }
+       return b, nil
+}
+
+func (bc *blockController) close() {
+       for _, s := range bc.lst {
+               s.close()
        }
-       s.globalIndex.Close()
-       close(s.reporterStopCh)
 }
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index 5cc9805..bed9555 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -29,6 +29,7 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 var (
@@ -72,67 +73,9 @@ func (i *GlobalItemID) UnMarshal(data []byte) error {
        return nil
 }
 
-type TimeRange struct {
-       Start        time.Time
-       End          time.Time
-       IncludeStart bool
-       IncludeEnd   bool
-}
-
-func (t TimeRange) Contains(unixNano uint64) bool {
-       tp := time.Unix(0, int64(unixNano))
-       if t.Start.Equal(tp) {
-               return t.IncludeStart
-       }
-       if t.End.Equal(tp) {
-               return t.IncludeEnd
-       }
-       return !tp.Before(t.Start) && !tp.After(t.End)
-}
-
-func (t TimeRange) Overlapping(other TimeRange) bool {
-       if t.Start.Equal(other.End) {
-               return t.IncludeStart && other.IncludeEnd
-       }
-       if other.Start.Equal(t.End) {
-               return t.IncludeEnd && other.IncludeStart
-       }
-       return !t.Start.After(other.End) && !other.Start.After(t.End)
-}
-
-func (t TimeRange) Duration() time.Duration {
-       return t.End.Sub(t.Start)
-}
-
-func NewInclusiveTimeRange(start, end time.Time) TimeRange {
-       return TimeRange{
-               Start:        start,
-               End:          end,
-               IncludeStart: true,
-               IncludeEnd:   true,
-       }
-}
-
-func NewInclusiveTimeRangeDuration(start time.Time, duration time.Duration) 
TimeRange {
-       return NewTimeRangeDuration(start, duration, true, true)
-}
-
-func NewTimeRange(start, end time.Time, includeStart, includeEnd bool) 
TimeRange {
-       return TimeRange{
-               Start:        start,
-               End:          end,
-               IncludeStart: includeStart,
-               IncludeEnd:   includeEnd,
-       }
-}
-
-func NewTimeRangeDuration(start time.Time, duration time.Duration, 
includeStart, includeEnd bool) TimeRange {
-       return NewTimeRange(start, start.Add(duration), includeStart, 
includeEnd)
-}
-
 type Series interface {
        ID() common.SeriesID
-       Span(timeRange TimeRange) (SeriesSpan, error)
+       Span(timeRange timestamp.TimeRange) (SeriesSpan, error)
        Get(id GlobalItemID) (Item, io.Closer, error)
 }
 
@@ -167,7 +110,7 @@ func (s *series) ID() common.SeriesID {
        return s.id
 }
 
-func (s *series) Span(timeRange TimeRange) (SeriesSpan, error) {
+func (s *series) Span(timeRange timestamp.TimeRange) (SeriesSpan, error) {
        blocks := s.blockDB.span(timeRange)
        if len(blocks) < 1 {
                return nil, ErrEmptySeriesSpan
@@ -199,7 +142,7 @@ type seriesSpan struct {
        blocks    []blockDelegate
        seriesID  common.SeriesID
        shardID   common.ShardID
-       timeRange TimeRange
+       timeRange timestamp.TimeRange
        l         *logger.Logger
 }
 
@@ -218,7 +161,7 @@ func (s *seriesSpan) SeekerBuilder() SeekerBuilder {
        return newSeekerBuilder(s)
 }
 
-func newSeriesSpan(ctx context.Context, timeRange TimeRange, blocks 
[]blockDelegate, id common.SeriesID, shardID common.ShardID) *seriesSpan {
+func newSeriesSpan(ctx context.Context, timeRange timestamp.TimeRange, blocks 
[]blockDelegate, id common.SeriesID, shardID common.ShardID) *seriesSpan {
        s := &seriesSpan{
                blocks:    blocks,
                seriesID:  id,
diff --git a/banyand/tsdb/series_test.go b/banyand/tsdb/series_test.go
index 25a7df6..6d2b940 100644
--- a/banyand/tsdb/series_test.go
+++ b/banyand/tsdb/series_test.go
@@ -24,7 +24,7 @@ import (
        . "github.com/onsi/ginkgo/v2"
        . "github.com/onsi/gomega"
 
-       "github.com/apache/skywalking-banyandb/banyand/tsdb"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 var _ = Describe("Series", func() {
@@ -34,7 +34,7 @@ var _ = Describe("Series", func() {
                                startTime, _ := time.Parse("20060202", start)
                                endTime, _ := time.Parse("20060202", end)
                                tsTime, _ := time.Parse("20060202", ts)
-                               Expect(tsdb.NewTimeRange(startTime, endTime, 
includeStart, 
includeEnd).Contains(uint64(tsTime.UnixNano()))).To(Equal(expected))
+                               Expect(timestamp.NewTimeRange(startTime, 
endTime, includeStart, 
includeEnd).Contains(uint64(tsTime.UnixNano()))).To(Equal(expected))
                        }
                        DescribeTable("It's a exclusive range",
                                func(start, end, ts string, expected bool) {
@@ -90,8 +90,8 @@ var _ = Describe("Series", func() {
                                                for _, r2l := range includes {
                                                        for _, r2u := range 
includes {
                                                                
By(fmt.Sprintf("r1 lower:%v upper:%v. r1 lower:%v upper:%v", r1l, r1u, r2l, 
r2u), func() {
-                                                                       r1 := 
tsdb.NewTimeRange(startTime1, endTime1, r1l, r1u)
-                                                                       r2 := 
tsdb.NewTimeRange(startTime2, endTime2, r2l, r2u)
+                                                                       r1 := 
timestamp.NewTimeRange(startTime1, endTime1, r1l, r1u)
+                                                                       r2 := 
timestamp.NewTimeRange(startTime2, endTime2, r2l, r2u)
                                                                        
Expect(r1.Overlapping(r2)).To(Equal(expected))
                                                                        
Expect(r2.Overlapping(r1)).To(Equal(expected))
                                                                })
@@ -114,8 +114,8 @@ var _ = Describe("Series", func() {
                                endTime1, _ := time.Parse("20060102", 
"20210107")
                                startTime2, _ := time.Parse("20060102", 
"20210107")
                                endTime2, _ := time.Parse("20060102", 
"20210109")
-                               r1 := tsdb.NewTimeRange(startTime1, endTime1, 
false, include1)
-                               r2 := tsdb.NewTimeRange(startTime2, endTime2, 
include2, false)
+                               r1 := timestamp.NewTimeRange(startTime1, 
endTime1, false, include1)
+                               r2 := timestamp.NewTimeRange(startTime2, 
endTime2, include2, false)
                                Expect(r1.Overlapping(r2)).To(Equal(expected))
                        }
 
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index de269e6..d082a08 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -30,6 +30,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/kv"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 var maxIntBytes = convert.Uint64ToBytes(math.MaxUint64)
@@ -95,7 +96,7 @@ type SeriesDatabase interface {
 
 type blockDatabase interface {
        shardID() common.ShardID
-       span(timeRange TimeRange) []blockDelegate
+       span(timeRange timestamp.TimeRange) []blockDelegate
        block(id GlobalItemID) blockDelegate
 }
 
@@ -138,7 +139,11 @@ func (s *seriesDB) block(id GlobalItemID) blockDelegate {
        if seg == nil {
                return nil
        }
-       return seg.lst[id.blockID].delegate()
+       block := seg.blockController.get(id.blockID)
+       if block == nil {
+               return nil
+       }
+       return block.delegate()
 }
 
 func (s *seriesDB) shardID() common.ShardID {
@@ -195,11 +200,11 @@ func (s *seriesDB) List(path Path) (SeriesList, error) {
        return result, err
 }
 
-func (s *seriesDB) span(timeRange TimeRange) []blockDelegate {
+func (s *seriesDB) span(timeRange timestamp.TimeRange) []blockDelegate {
        //TODO: return correct blocks
        result := make([]blockDelegate, 0)
        for _, s := range s.segCtrl.span(timeRange) {
-               for _, b := range s.lst {
+               for _, b := range s.blockController.span(timeRange) {
                        result = append(result, b.delegate())
                }
        }
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index 7eaf50f..0f70e53 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -28,6 +28,7 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 var _ Shard = (*shard)(nil)
@@ -54,7 +55,7 @@ func (s *shard) Index() IndexDatabase {
        return s.indexDatabase
 }
 
-func OpenShard(ctx context.Context, id common.ShardID, root string, 
intervalRule IntervalRule) (Shard, error) {
+func OpenShard(ctx context.Context, id common.ShardID, root string, 
segmentSize, blockSize IntervalRule) (Shard, error) {
        path, err := mkdir(shardTemplate, root, int(id))
        if err != nil {
                return nil, errors.Wrapf(err, "make the directory of the shard 
%d ", int(id))
@@ -63,7 +64,7 @@ func OpenShard(ctx context.Context, id common.ShardID, root 
string, intervalRule
        l.Info().Int("shard_id", int(id)).Str("path", path).Msg("creating a 
shard")
        s := &shard{
                id:                id,
-               segmentController: newSegmentController(path, intervalRule),
+               segmentController: newSegmentController(path, segmentSize, 
blockSize),
                l:                 l,
        }
        shardCtx := context.WithValue(ctx, logger.ContextKey, s.l)
@@ -102,20 +103,17 @@ func (s *shard) Close() error {
 type IntervalUnit int
 
 const (
-       DAY IntervalUnit = iota
-       MONTH
-       YEAR
-       MILLISECOND // only for testing
+       MILLISECOND IntervalUnit = iota // only for testing
+       HOUR
+       DAY
 )
 
 func (iu IntervalUnit) String() string {
        switch iu {
+       case HOUR:
+               return "hour"
        case DAY:
                return "day"
-       case MONTH:
-               return "month"
-       case YEAR:
-               return "year"
        case MILLISECOND:
                return "millis"
 
@@ -123,34 +121,6 @@ func (iu IntervalUnit) String() string {
        panic("invalid interval unit")
 }
 
-func (iu IntervalUnit) Format(tm time.Time) string {
-       switch iu {
-       case DAY:
-               return tm.Format(segDayFormat)
-       case MONTH:
-               return tm.Format(segMonthFormat)
-       case YEAR:
-               return tm.Format(segYearFormat)
-       case MILLISECOND:
-               return tm.Format(segMillisecondFormat)
-       }
-       panic("invalid interval unit")
-}
-
-func (iu IntervalUnit) Parse(value string) (time.Time, error) {
-       switch iu {
-       case DAY:
-               return time.Parse(segDayFormat, value)
-       case MONTH:
-               return time.Parse(segMonthFormat, value)
-       case YEAR:
-               return time.Parse(segYearFormat, value)
-       case MILLISECOND:
-               return time.Parse(segMillisecondFormat, value)
-       }
-       panic("invalid interval unit")
-}
-
 type IntervalRule struct {
        Unit IntervalUnit
        Num  int
@@ -158,29 +128,41 @@ type IntervalRule struct {
 
 func (ir IntervalRule) NextTime(current time.Time) time.Time {
        switch ir.Unit {
+       case HOUR:
+               return current.Add(time.Hour * time.Duration(ir.Num))
        case DAY:
                return current.AddDate(0, 0, ir.Num)
-       case MONTH:
-               return current.AddDate(0, ir.Num, 0)
-       case YEAR:
-               return current.AddDate(ir.Num, 0, 0)
        case MILLISECOND:
                return current.Add(time.Millisecond * time.Duration(ir.Num))
        }
        panic("invalid interval unit")
 }
 
+// EstimatedDuration returns the approximate length of one interval described
+// by the rule: Num hours for HOUR, Num*24 hours for DAY and Num milliseconds
+// for MILLISECOND. The DAY value is a fixed 24h multiple, whereas NextTime
+// moves by calendar days, hence "estimated".
+// It panics when Unit is not one of the known interval units.
+func (ir IntervalRule) EstimatedDuration() time.Duration {
+       switch ir.Unit {
+       case HOUR:
+               return time.Hour * time.Duration(ir.Num)
+       case DAY:
+               return 24 * time.Hour * time.Duration(ir.Num)
+       case MILLISECOND:
+               // Must use the same base unit as NextTime (time.Millisecond);
+               // time.Microsecond made the estimate 1000 times too small.
+               return time.Millisecond * time.Duration(ir.Num)
+       }
+       panic("invalid interval unit")
+}
+
 type segmentController struct {
        sync.RWMutex
-       location     string
-       intervalRule IntervalRule
-       lst          []*segment
+       location    string
+       segmentSize IntervalRule
+       blockSize   IntervalRule
+       lst         []*segment
 }
 
-func newSegmentController(location string, intervalRule IntervalRule) 
*segmentController {
+func newSegmentController(location string, segmentSize, blockSize 
IntervalRule) *segmentController {
        return &segmentController{
-               location:     location,
-               intervalRule: intervalRule,
+               location:    location,
+               segmentSize: segmentSize,
+               blockSize:   blockSize,
        }
 }
 
@@ -197,7 +179,7 @@ func (sc *segmentController) get(segID uint16) *segment {
        return nil
 }
 
-func (sc *segmentController) span(timeRange TimeRange) (ss []*segment) {
+func (sc *segmentController) span(timeRange timestamp.TimeRange) (ss 
[]*segment) {
        sc.RLock()
        defer sc.RUnlock()
        last := len(sc.lst) - 1
@@ -223,7 +205,7 @@ func (sc *segmentController) Current() bucket.Reporter {
        defer sc.RUnlock()
        now := time.Now()
        for _, s := range sc.lst {
-               if s.suffix == sc.intervalRule.Unit.Format(now) {
+               if s.suffix == sc.Format(now) {
                        return s
                }
        }
@@ -235,12 +217,37 @@ func (sc *segmentController) Current() bucket.Reporter {
 }
 
 func (sc *segmentController) Next() (bucket.Reporter, error) {
-       return sc.create(context.TODO(), sc.intervalRule.Unit.Format(
-               sc.intervalRule.NextTime(time.Now())))
+       seg := sc.Current().(*segment)
+       return sc.create(context.TODO(), sc.Format(
+               sc.segmentSize.NextTime(seg.Start)))
+}
+
+// Format renders tm with the time layout that matches the unit of the
+// controller's segment size. It panics when that unit is not HOUR, DAY or
+// MILLISECOND.
+func (sc *segmentController) Format(tm time.Time) string {
+       var layout string
+       switch sc.segmentSize.Unit {
+       case HOUR:
+               layout = segHourFormat
+       case DAY:
+               layout = segDayFormat
+       case MILLISECOND:
+               layout = millisecondFormat
+       default:
+               panic("invalid interval unit")
+       }
+       return tm.Format(layout)
+}
+
+// Parse converts value back into a time using the layout that matches the
+// unit of the controller's segment size, i.e. the inverse of Format. It
+// returns the error reported by time.Parse and panics when the unit is not
+// HOUR, DAY or MILLISECOND.
+func (sc *segmentController) Parse(value string) (time.Time, error) {
+       var layout string
+       switch sc.segmentSize.Unit {
+       case HOUR:
+               layout = segHourFormat
+       case DAY:
+               layout = segDayFormat
+       case MILLISECOND:
+               layout = millisecondFormat
+       default:
+               panic("invalid interval unit")
+       }
+       return time.Parse(layout, value)
 }
 
 func (sc *segmentController) open(ctx context.Context) error {
-       err := walkDir(
+       err := WalkDir(
                sc.location,
                segPathPrefix,
                func(suffix, absolutePath string) error {
@@ -251,7 +258,7 @@ func (sc *segmentController) open(ctx context.Context) 
error {
                return err
        }
        if sc.Current() == nil {
-               _, err = sc.create(ctx, sc.intervalRule.Unit.Format(time.Now()))
+               _, err = sc.create(ctx, sc.Format(time.Now()))
                if err != nil {
                        return err
                }
@@ -268,7 +275,11 @@ func (sc *segmentController) create(ctx context.Context, 
suffix string) (*segmen
 }
 
 func (sc *segmentController) load(ctx context.Context, suffix, path string) 
(seg *segment, err error) {
-       seg, err = openSegment(ctx, suffix, path, sc.intervalRule)
+       startTime, err := sc.Parse(suffix)
+       if err != nil {
+               return nil, err
+       }
+       seg, err = openSegment(ctx, startTime, path, suffix, sc.segmentSize, 
sc.blockSize)
        if err != nil {
                return nil, err
        }
diff --git a/banyand/tsdb/shard_test.go b/banyand/tsdb/shard_test.go
index 5609f0a..96e78e3 100644
--- a/banyand/tsdb/shard_test.go
+++ b/banyand/tsdb/shard_test.go
@@ -19,8 +19,6 @@ package tsdb_test
 
 import (
        "context"
-       "io/ioutil"
-       "strings"
        "time"
 
        . "github.com/onsi/ginkgo/v2"
@@ -48,22 +46,41 @@ var _ = Describe("Shard", func() {
                })
                It("generates several segments", func() {
                        var err error
-                       shard, err = tsdb.OpenShard(context.TODO(), 
common.ShardID(0), tmp, tsdb.IntervalRule{
-                               Unit: tsdb.MILLISECOND,
-                               Num:  1000,
-                       })
+                       shard, err = tsdb.OpenShard(context.TODO(), 
common.ShardID(0), tmp,
+                               tsdb.IntervalRule{
+                                       Unit: tsdb.MILLISECOND,
+                                       Num:  3000,
+                               },
+                               tsdb.IntervalRule{
+                                       Unit: tsdb.MILLISECOND,
+                                       Num:  1000,
+                               },
+                       )
                        Expect(err).NotTo(HaveOccurred())
+                       segDirectories := make([]string, 3)
                        Eventually(func() int {
-                               files, err := ioutil.ReadDir(tmp + "/shard-0")
-                               Expect(err).NotTo(HaveOccurred())
                                num := 0
-                               for _, fi := range files {
-                                       if fi.IsDir() && 
strings.HasPrefix(fi.Name(), "seg-") {
-                                               num++
+                               err := tsdb.WalkDir(tmp+"/shard-0", "seg-", 
func(suffix, absolutePath string) error {
+                                       if num < 3 {
+                                               segDirectories[num] = 
absolutePath
                                        }
-                               }
+                                       num++
+                                       return nil
+                               })
+                               Expect(err).NotTo(HaveOccurred())
                                return num
                        }).WithTimeout(10 * 
time.Second).Should(BeNumerically(">=", 3))
+                       for _, d := range segDirectories {
+                               Eventually(func() int {
+                                       num := 0
+                                       err := tsdb.WalkDir(d, "block-", 
func(suffix, absolutePath string) error {
+                                               num++
+                                               return nil
+                                       })
+                                       Expect(err).NotTo(HaveOccurred())
+                                       return num
+                               }).WithTimeout(10 * 
time.Second).Should(BeNumerically(">=", 3))
+                       }
                })
 
        })
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index a3aff1c..7659b53 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -49,18 +49,18 @@ const (
        blockTemplate       = rootPrefix + blockPathPrefix + "-%s"
        globalIndexTemplate = rootPrefix + "index"
 
-       segDayFormat         = "20060102"
-       segMonthFormat       = "200601"
-       segYearFormat        = "2006"
-       segMillisecondFormat = "20060102150405000"
-       blockFormat          = "1504"
+       segHourFormat     = "2006010215"
+       segDayFormat      = "20060102"
+       millisecondFormat = "20060102150405000"
+       blockHourFormat   = "15"
+       blockDayFormat    = "0102"
 
        dirPerm = 0700
 )
 
 var (
-       ErrInvalidShardID       = errors.New("invalid shard id")
-       ErrEncodingMethodAbsent = errors.New("encoding method is absent")
+       ErrInvalidShardID = errors.New("invalid shard id")
+       ErrOpenDatabase   = errors.New("fails to open the database")
 
        indexRulesKey     = contextIndexRulesKey{}
        encodingMethodKey = contextEncodingMethodKey{}
@@ -89,6 +89,8 @@ type DatabaseOpts struct {
        ShardNum       uint32
        IndexRules     []*databasev1.IndexRule
        EncodingMethod EncodingMethod
+       SegmentSize    IntervalRule
+       BlockSize      IntervalRule
 }
 
 type EncodingMethod struct {
@@ -97,9 +99,11 @@ type EncodingMethod struct {
 }
 
 type database struct {
-       logger   *logger.Logger
-       location string
-       shardNum uint32
+       logger      *logger.Logger
+       location    string
+       shardNum    uint32
+       segmentSize IntervalRule
+       blockSize   IntervalRule
 
        sLst []Shard
        sync.Mutex
@@ -129,15 +133,34 @@ func (d *database) Close() error {
 
 func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
        if opts.EncodingMethod.EncoderPool == nil || 
opts.EncodingMethod.DecoderPool == nil {
-               return nil, errors.Wrap(ErrEncodingMethodAbsent, "failed to 
open database")
+               return nil, errors.Wrap(ErrOpenDatabase, "encoding method is 
absent")
        }
        if _, err := mkdir(opts.Location); err != nil {
                return nil, err
        }
+       segmentSize := opts.SegmentSize
+       if segmentSize.Unit == MILLISECOND {
+               segmentSize = IntervalRule{
+                       Unit: DAY,
+                       Num:  1,
+               }
+       }
+       blockSize := opts.BlockSize
+       if blockSize.Unit == MILLISECOND {
+               blockSize = IntervalRule{
+                       Unit: HOUR,
+                       Num:  2,
+               }
+       }
+       if blockSize.EstimatedDuration() > segmentSize.EstimatedDuration() {
+               return nil, errors.Wrapf(ErrOpenDatabase, "the block size is 
bigger than the segment size")
+       }
        db := &database{
-               location: opts.Location,
-               shardNum: opts.ShardNum,
-               logger:   logger.Fetch(ctx, "tsdb"),
+               location:    opts.Location,
+               shardNum:    opts.ShardNum,
+               logger:      logger.Fetch(ctx, "tsdb"),
+               segmentSize: segmentSize,
+               blockSize:   blockSize,
        }
        db.logger.Info().Str("path", opts.Location).Msg("initialized")
        var entries []fs.FileInfo
@@ -164,10 +187,8 @@ func createDatabase(ctx context.Context, db *database, 
startID int) (Database, e
        var err error
        for i := startID; i < int(db.shardNum); i++ {
                db.logger.Info().Int("shard_id", i).Msg("creating a shard")
-               so, errNewShard := OpenShard(ctx, common.ShardID(i), 
db.location, IntervalRule{
-                       Unit: DAY,
-                       Num:  1,
-               })
+               so, errNewShard := OpenShard(ctx, common.ShardID(i),
+                       db.location, db.segmentSize, db.blockSize)
                if errNewShard != nil {
                        err = multierr.Append(err, errNewShard)
                        continue
@@ -182,7 +203,7 @@ func loadDatabase(ctx context.Context, db *database) 
(Database, error) {
        //TODO: open the manifest file
        db.Lock()
        defer db.Unlock()
-       err := walkDir(db.location, shardPathPrefix, func(suffix, _ string) 
error {
+       err := WalkDir(db.location, shardPathPrefix, func(suffix, _ string) 
error {
                shardID, err := strconv.Atoi(suffix)
                if err != nil {
                        return err
@@ -195,10 +216,9 @@ func loadDatabase(ctx context.Context, db *database) 
(Database, error) {
                        context.WithValue(ctx, logger.ContextKey, db.logger),
                        common.ShardID(shardID),
                        db.location,
-                       IntervalRule{
-                               Unit: DAY,
-                               Num:  1,
-                       })
+                       db.segmentSize,
+                       db.blockSize,
+               )
                if errOpenShard != nil {
                        return errOpenShard
                }
@@ -220,9 +240,9 @@ func loadDatabase(ctx context.Context, db *database) 
(Database, error) {
        return db, nil
 }
 
-type walkFn func(suffix, absolutePath string) error
+type WalkFn func(suffix, absolutePath string) error
 
-func walkDir(root, prefix string, walkFn walkFn) error {
+func WalkDir(root, prefix string, walkFn WalkFn) error {
        files, err := ioutil.ReadDir(root)
        if err != nil {
                return errors.Wrapf(err, "failed to walk the database path: 
%s", root)
diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go
index 75c1cb8..0dca4c2 100644
--- a/banyand/tsdb/tsdb_test.go
+++ b/banyand/tsdb/tsdb_test.go
@@ -62,7 +62,7 @@ func verifyDatabaseStructure(tester *assert.Assertions, 
tempDir string) {
        now := time.Now()
        segPath := fmt.Sprintf(segTemplate, shardPath, now.Format(segDayFormat))
        validateDirectory(tester, segPath)
-       validateDirectory(tester, fmt.Sprintf(blockTemplate, segPath, 
now.Format(blockFormat)))
+       validateDirectory(tester, fmt.Sprintf(blockTemplate, segPath, 
now.Format(blockHourFormat)))
 }
 
 func openDatabase(t *require.Assertions, path string) (db Database) {
diff --git a/pkg/query/logical/common.go b/pkg/query/logical/common.go
index 95f3e3e..9a6b1fd 100644
--- a/pkg/query/logical/common.go
+++ b/pkg/query/logical/common.go
@@ -23,6 +23,7 @@ import (
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        "github.com/apache/skywalking-banyandb/banyand/tsdb"
        "github.com/apache/skywalking-banyandb/pkg/query/executor"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 type (
@@ -70,7 +71,7 @@ func projectItem(ec executor.ExecutionContext, item 
tsdb.Item, projectionFieldRe
 // with the help of Entity. The result is a list of element set, where the 
order of inner list is kept
 // as what the users specify in the seekerBuilder.
 // This method is used by the underlying tableScan and localIndexScan plans.
-func executeForShard(series tsdb.SeriesList, timeRange tsdb.TimeRange,
+func executeForShard(series tsdb.SeriesList, timeRange timestamp.TimeRange,
        builders ...seekerBuilder) ([]tsdb.Iterator, error) {
        var itersInShard []tsdb.Iterator
        for _, seriesFound := range series {
diff --git a/pkg/query/logical/plan_indexscan_local.go 
b/pkg/query/logical/plan_indexscan_local.go
index a18e62e..4233b27 100644
--- a/pkg/query/logical/plan_indexscan_local.go
+++ b/pkg/query/logical/plan_indexscan_local.go
@@ -33,6 +33,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/tsdb"
        "github.com/apache/skywalking-banyandb/pkg/index"
        "github.com/apache/skywalking-banyandb/pkg/query/executor"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 var _ UnresolvedPlan = (*unresolvedIndexScan)(nil)
@@ -108,7 +109,7 @@ func (uis *unresolvedIndexScan) Analyze(s Schema) (Plan, 
error) {
 
        return &localIndexScan{
                orderBy:             orderBySubPlan,
-               timeRange:           tsdb.NewInclusiveTimeRange(uis.startTime, 
uis.endTime),
+               timeRange:           
timestamp.NewInclusiveTimeRange(uis.startTime, uis.endTime),
                schema:              s,
                projectionFieldRefs: projFieldsRefs,
                metadata:            uis.metadata,
@@ -121,7 +122,7 @@ var _ Plan = (*localIndexScan)(nil)
 
 type localIndexScan struct {
        *orderBy
-       timeRange           tsdb.TimeRange
+       timeRange           timestamp.TimeRange
        schema              Schema
        metadata            *commonv1.Metadata
        conditionMap        map[*databasev1.IndexRule][]Expr
diff --git a/pkg/timestamp/range.go b/pkg/timestamp/range.go
new file mode 100644
index 0000000..aae487b
--- /dev/null
+++ b/pkg/timestamp/range.go
@@ -0,0 +1,80 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package timestamp
+
+import (
+       "time"
+)
+
+// TimeRange is a time interval between Start and End. IncludeStart and
+// IncludeEnd state whether the respective bound itself belongs to the range.
+type TimeRange struct {
+       Start        time.Time
+       End          time.Time
+       IncludeStart bool
+       IncludeEnd   bool
+}
+
+// Contains reports whether the instant unixNano, taken as nanoseconds since
+// the Unix epoch, lies inside the range. An instant equal to Start or End is
+// contained only if the matching Include flag is set.
+func (t TimeRange) Contains(unixNano uint64) bool {
+       tp := time.Unix(0, int64(unixNano))
+       // Boundary hits are decided by the inclusion flags alone. Start is
+       // tested first, so for Start == End the answer is IncludeStart.
+       if t.Start.Equal(tp) {
+               return t.IncludeStart
+       }
+       if t.End.Equal(tp) {
+               return t.IncludeEnd
+       }
+       return !tp.Before(t.Start) && !tp.After(t.End)
+}
+
+// Overlapping reports whether t and other intersect. When one range starts
+// exactly where the other ends, they overlap only if both touching bounds are
+// inclusive.
+func (t TimeRange) Overlapping(other TimeRange) bool {
+       // t begins exactly at the end of other.
+       if t.Start.Equal(other.End) {
+               return t.IncludeStart && other.IncludeEnd
+       }
+       // other begins exactly at the end of t.
+       if other.Start.Equal(t.End) {
+               return t.IncludeEnd && other.IncludeStart
+       }
+       return !t.Start.After(other.End) && !other.Start.After(t.End)
+}
+
+// Duration returns the time elapsed from Start to End. The inclusion flags
+// are not taken into account.
+func (t TimeRange) Duration() time.Duration {
+       return t.End.Sub(t.Start)
+}
+
+// NewInclusiveTimeRange returns the TimeRange from start to end with both
+// bounds included.
+func NewInclusiveTimeRange(start, end time.Time) TimeRange {
+       return TimeRange{
+               Start:        start,
+               End:          end,
+               IncludeStart: true,
+               IncludeEnd:   true,
+       }
+}
+
+// NewInclusiveTimeRangeDuration returns the TimeRange that begins at start,
+// spans duration, and includes both bounds.
+func NewInclusiveTimeRangeDuration(start time.Time, duration time.Duration) 
TimeRange {
+       return NewTimeRangeDuration(start, duration, true, true)
+}
+
+// NewTimeRange returns the TimeRange from start to end; includeStart and
+// includeEnd select whether each bound belongs to the range.
+func NewTimeRange(start, end time.Time, includeStart, includeEnd bool) 
TimeRange {
+       return TimeRange{
+               Start:        start,
+               End:          end,
+               IncludeStart: includeStart,
+               IncludeEnd:   includeEnd,
+       }
+}
+
+// NewTimeRangeDuration returns the TimeRange that begins at start and ends at
+// start.Add(duration); includeStart and includeEnd select whether each bound
+// belongs to the range.
+func NewTimeRangeDuration(start time.Time, duration time.Duration, 
includeStart, includeEnd bool) TimeRange {
+       return NewTimeRange(start, start.Add(duration), includeStart, 
includeEnd)
+}

Reply via email to