This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new a5d0f6d Add a queue to track and close blocks (#79)
a5d0f6d is described below
commit a5d0f6db8f45ce843f0bd9b6f0bde6dc8da67557
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Feb 17 10:24:37 2022 +0800
Add a queue to track and close blocks (#79)
---
banyand/tsdb/block.go | 22 +++++-
banyand/tsdb/bucket/bucket.go | 2 +
banyand/tsdb/bucket/queue.go | 145 +++++++++++++++++++++++++++++++++++
banyand/tsdb/bucket/queue_test.go | 56 ++++++++++++++
banyand/tsdb/bucket/strategy.go | 1 +
banyand/tsdb/bucket/strategy_test.go | 7 ++
banyand/tsdb/segment.go | 73 ++++++++++++++++--
banyand/tsdb/shard.go | 44 +++++++++--
banyand/tsdb/shard_test.go | 43 +++++++++++
banyand/tsdb/tsdb.go | 3 +-
go.mod | 3 +-
go.sum | 2 +
pkg/timestamp/range.go | 18 +++++
13 files changed, 402 insertions(+), 17 deletions(-)
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 884a0d2..98c0b73 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -24,6 +24,7 @@ import (
"time"
"github.com/dgraph-io/ristretto/z"
+ "go.uber.org/atomic"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/kv"
@@ -37,10 +38,11 @@ import (
)
type block struct {
- path string
- l *logger.Logger
- suffix string
- ref *z.Closer
+ path string
+ l *logger.Logger
+ suffix string
+ ref *z.Closer
+ closing *atomic.Bool
store kv.TimeSeriesStore
primaryIndex index.Store
@@ -76,6 +78,7 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block,
err error) {
l: logger.Fetch(ctx, "block"),
TimeRange: timeRange,
Reporter: bucket.NewTimeBasedReporter(timeRange),
+ closing: atomic.NewBool(false),
}
encodingMethodObject := ctx.Value(encodingMethodKey)
if encodingMethodObject == nil {
@@ -113,10 +116,17 @@ func newBlock(ctx context.Context, opts blockOpts) (b
*block, err error) {
return nil, err
}
b.closableLst = append(b.closableLst, b.invertedIndex, b.lsmIndex)
+ go func() {
+ <-b.ref.HasBeenClosed()
+ b.closing.Store(true)
+ }()
return b, err
}
func (b *block) delegate() blockDelegate {
+ if b.closing.Load() {
+ return nil
+ }
b.incRef()
return &bDelegate{
delegate: b,
@@ -139,6 +149,10 @@ func (b *block) close() {
}
}
+func (b block) String() string {
+ return b.Reporter.String()
+}
+
type blockDelegate interface {
io.Closer
contains(ts time.Time) bool
diff --git a/banyand/tsdb/bucket/bucket.go b/banyand/tsdb/bucket/bucket.go
index 406093c..dc1b77f 100644
--- a/banyand/tsdb/bucket/bucket.go
+++ b/banyand/tsdb/bucket/bucket.go
@@ -26,6 +26,7 @@ import (
type Controller interface {
Current() Reporter
Next() (Reporter, error)
+ OnMove(prev, next Reporter)
}
type Status struct {
@@ -38,6 +39,7 @@ type Channel chan Status
type Reporter interface {
Report() Channel
Stop()
+ String() string
}
type timeBasedReporter struct {
diff --git a/banyand/tsdb/bucket/queue.go b/banyand/tsdb/bucket/queue.go
new file mode 100644
index 0000000..e26109d
--- /dev/null
+++ b/banyand/tsdb/bucket/queue.go
@@ -0,0 +1,145 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package bucket
+
+import (
+ "errors"
+ "sync"
+
+ "github.com/hashicorp/golang-lru/simplelru"
+)
+
+type EvictFn func(id interface{})
+
+type Queue interface {
+ Push(id interface{})
+ Len() int
+}
+
+const (
+ DefaultRecentRatio = 0.25
+ DefaultGhostEntries = 0.50
+)
+
+var ErrInvalidSize = errors.New("invalid size")
+
+type lruQueue struct {
+ size int
+ recentSize int
+ evictSize int
+ evictFn EvictFn
+
+ recent simplelru.LRUCache
+ frequent simplelru.LRUCache
+ recentEvict simplelru.LRUCache
+ lock sync.RWMutex
+}
+
+func NewQueue(size int, evictFn EvictFn) (Queue, error) {
+ if size <= 0 {
+ return nil, ErrInvalidSize
+ }
+
+ recentSize := int(float64(size) * DefaultRecentRatio)
+ evictSize := int(float64(size) * DefaultGhostEntries)
+
+ recent, err := simplelru.NewLRU(size, nil)
+ if err != nil {
+ return nil, err
+ }
+ frequent, err := simplelru.NewLRU(size, nil)
+ if err != nil {
+ return nil, err
+ }
+ recentEvict, err := simplelru.NewLRU(evictSize, nil)
+ if err != nil {
+ return nil, err
+ }
+ c := &lruQueue{
+ size: size,
+ recentSize: recentSize,
+ recent: recent,
+ frequent: frequent,
+ recentEvict: recentEvict,
+ evictSize: evictSize,
+ evictFn: evictFn,
+ }
+ return c, nil
+}
+
+func (q *lruQueue) Push(id interface{}) {
+ q.lock.Lock()
+ defer q.lock.Unlock()
+
+ if q.frequent.Contains(id) {
+ q.frequent.Add(id, nil)
+ return
+ }
+
+ if q.recent.Contains(id) {
+ q.recent.Remove(id)
+ q.frequent.Add(id, nil)
+ return
+ }
+
+ if q.recentEvict.Contains(id) {
+ q.ensureSpace(true)
+ q.recentEvict.Remove(id)
+ q.frequent.Add(id, nil)
+ return
+ }
+
+ q.ensureSpace(false)
+ q.recent.Add(id, nil)
+}
+
+func (q *lruQueue) Len() int {
+ q.lock.RLock()
+ defer q.lock.RUnlock()
+ return q.recent.Len() + q.frequent.Len()
+}
+
+func (q *lruQueue) ensureSpace(recentEvict bool) {
+ recentLen := q.recent.Len()
+ freqLen := q.frequent.Len()
+ if recentLen+freqLen < q.size {
+ return
+ }
+ if recentLen > 0 && (recentLen > q.recentSize || (recentLen ==
q.recentSize && !recentEvict)) {
+ k, _, _ := q.recent.RemoveOldest()
+ q.addLst(q.recentEvict, q.evictSize, k)
+ return
+ }
+ q.removeOldest(q.frequent)
+}
+
+func (q *lruQueue) addLst(lst simplelru.LRUCache, size int, id interface{}) {
+ if lst.Len() < size {
+ lst.Add(id, nil)
+ return
+ }
+ q.removeOldest(lst)
+ lst.Add(id, nil)
+}
+
+func (q *lruQueue) removeOldest(lst simplelru.LRUCache) {
+ oldestID, _, ok := lst.RemoveOldest()
+ if ok && q.evictFn != nil {
+ q.evictFn(oldestID)
+ }
+}
diff --git a/banyand/tsdb/bucket/queue_test.go
b/banyand/tsdb/bucket/queue_test.go
new file mode 100644
index 0000000..fbda289
--- /dev/null
+++ b/banyand/tsdb/bucket/queue_test.go
@@ -0,0 +1,56 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package bucket_test
+
+import (
+ . "github.com/onsi/ginkgo/v2"
+ . "github.com/onsi/gomega"
+
+ "github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
+)
+
+type queueEntryID struct {
+ first uint16
+ second uint16
+}
+
+func entryID(id uint16) queueEntryID {
+ return queueEntryID{
+ first: id,
+ second: id + 1,
+ }
+}
+
+var _ = Describe("Queue", func() {
+ It("pushes data", func() {
+ evictLst := make([]queueEntryID, 0)
+ l, err := bucket.NewQueue(128, func(id interface{}) {
+ evictLst = append(evictLst, id.(queueEntryID))
+ })
+ Expect(err).ShouldNot(HaveOccurred())
+
+ for i := 0; i < 256; i++ {
+ l.Push(entryID(uint16(i)))
+ }
+ Expect(l.Len()).To(Equal(128))
+ Expect(len(evictLst)).To(Equal(64))
+ for i := 0; i < 64; i++ {
+ Expect(evictLst[i]).To(Equal(entryID(uint16(i))))
+ }
+ })
+})
diff --git a/banyand/tsdb/bucket/strategy.go b/banyand/tsdb/bucket/strategy.go
index 339aa4d..581ebd8 100644
--- a/banyand/tsdb/bucket/strategy.go
+++ b/banyand/tsdb/bucket/strategy.go
@@ -111,6 +111,7 @@ func (s *Strategy) Run() {
}
}
if ratio >= 1.0 {
+ s.ctrl.OnMove(s.current, s.next)
s.current = s.next
s.next = nil
goto bucket
diff --git a/banyand/tsdb/bucket/strategy_test.go
b/banyand/tsdb/bucket/strategy_test.go
index d225d23..80d9f8c 100644
--- a/banyand/tsdb/bucket/strategy_test.go
+++ b/banyand/tsdb/bucket/strategy_test.go
@@ -109,6 +109,9 @@ func (c *controller) Current() bucket.Reporter {
return c.reporter
}
+func (c *controller) OnMove(prev bucket.Reporter, next bucket.Reporter) {
+}
+
func (c *controller) newReporter() {
c.reporter = &reporter{step: c.step, capacity: c.capacity}
}
@@ -142,3 +145,7 @@ func (r *reporter) Report() bucket.Channel {
func (r *reporter) Stop() {
}
+
+func (r *reporter) String() string {
+ return "default"
+}
diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index 60897d2..2063318 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -37,15 +37,16 @@ type segment struct {
path string
suffix string
- globalIndex kv.Store
- l *logger.Logger
+ blockCloseCh chan uint16
+ globalIndex kv.Store
+ l *logger.Logger
timestamp.TimeRange
bucket.Reporter
blockController *blockController
blockManageStrategy *bucket.Strategy
}
-func openSegment(ctx context.Context, startTime time.Time, path, suffix
string, segmentSize, blockSize IntervalRule) (s *segment, err error) {
+func openSegment(ctx context.Context, startTime time.Time, path, suffix
string, segmentSize, blockSize IntervalRule, blockQueue bucket.Queue) (s
*segment, err error) {
suffixInteger, err := strconv.Atoi(suffix)
if err != nil {
return nil, err
@@ -53,14 +54,16 @@ func openSegment(ctx context.Context, startTime time.Time,
path, suffix string,
// TODO: fix id overflow
id := uint16(segmentSize.Unit)<<12 | ((uint16(suffixInteger) << 4) >> 4)
timeRange := timestamp.NewTimeRange(startTime,
segmentSize.NextTime(startTime), true, false)
+ l := logger.Fetch(ctx, "segment")
s = &segment{
id: id,
path: path,
suffix: suffix,
- l: logger.Fetch(ctx, "segment"),
- blockController: newBlockController(id, path, timeRange,
blockSize),
+ l: l,
+ blockController: newBlockController(id, path, timeRange,
blockSize, l, blockQueue),
TimeRange: timeRange,
Reporter: bucket.NewTimeBasedReporter(timeRange),
+ blockCloseCh: make(chan uint16, 24),
}
err = s.blockController.open(context.WithValue(ctx, logger.ContextKey,
s.l))
if err != nil {
@@ -79,6 +82,7 @@ func openSegment(ctx context.Context, startTime time.Time,
path, suffix string,
return nil, err
}
s.blockManageStrategy.Run()
+ s.blockController.listenToBlockClosing(s.blockCloseCh)
return s, nil
}
@@ -89,6 +93,14 @@ func (s *segment) close() {
s.Stop()
}
+func (s *segment) notifyCloseBlock(id uint16) {
+ s.blockCloseCh <- id
+}
+
+func (s segment) String() string {
+ return s.Reporter.String()
+}
+
type blockController struct {
sync.RWMutex
segID uint16
@@ -96,14 +108,22 @@ type blockController struct {
segTimeRange timestamp.TimeRange
blockSize IntervalRule
lst []*block
+ blockQueue bucket.Queue
+ closing chan struct{}
+
+ l *logger.Logger
}
-func newBlockController(segID uint16, location string, segTimeRange
timestamp.TimeRange, blockSize IntervalRule) *blockController {
+func newBlockController(segID uint16, location string, segTimeRange
timestamp.TimeRange,
+ blockSize IntervalRule, l *logger.Logger, blockQueue bucket.Queue)
*blockController {
return &blockController{
segID: segID,
location: location,
blockSize: blockSize,
segTimeRange: segTimeRange,
+ closing: make(chan struct{}),
+ blockQueue: blockQueue,
+ l: l,
}
}
@@ -133,6 +153,46 @@ func (bc *blockController) Next() (bucket.Reporter, error)
{
return reporter, err
}
+func (bc *blockController) OnMove(prev bucket.Reporter, next bucket.Reporter) {
+ bc.blockQueue.Push(blockIDAndSegID{
+ segID: bc.segID,
+ blockID: prev.(*block).blockID,
+ })
+}
+
+func (bc *blockController) listenToBlockClosing(closeCh <-chan uint16) {
+ go func() {
+ for {
+ select {
+ case id := <-closeCh:
+ b := bc.removeBlock(id)
+ if b != nil {
+ b.close()
+ continue
+ }
+ bc.l.Warn().Uint16("blockID",
id).Uint16("segID", id).
+ Msg("block is absent on removing it
from the segment")
+ case <-bc.closing:
+ return
+ }
+ }
+ }()
+}
+
+func (bc *blockController) removeBlock(blockID uint16) *block {
+ bc.Lock()
+ defer bc.Unlock()
+ for i := range bc.lst {
+ b := bc.lst[i]
+ if b.blockID == blockID {
+ //remove the block from the internal lst
+ bc.lst = append(bc.lst[:i], bc.lst[i+1:]...)
+ return b
+ }
+ }
+ return nil
+}
+
func (bc *blockController) Format(tm time.Time) string {
switch bc.blockSize.Unit {
case HOUR:
@@ -260,6 +320,7 @@ func (bc *blockController) load(ctx context.Context,
suffix, path string) (b *bl
}
func (bc *blockController) close() {
+ bc.closing <- struct{}{}
for _, s := range bc.lst {
s.close()
}
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index c0e65c7..135dd58 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -31,6 +31,8 @@ import (
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
+const defaultBlockQueueSize = 1 << 4
+
var _ Shard = (*shard)(nil)
type shard struct {
@@ -55,16 +57,24 @@ func (s *shard) Index() IndexDatabase {
return s.indexDatabase
}
-func OpenShard(ctx context.Context, id common.ShardID, root string,
segmentSize, blockSize IntervalRule) (Shard, error) {
+func OpenShard(ctx context.Context, id common.ShardID,
+ root string, segmentSize, blockSize IntervalRule, openedBlockSize int)
(Shard, error) {
path, err := mkdir(shardTemplate, root, int(id))
if err != nil {
return nil, errors.Wrapf(err, "make the directory of the shard
%d ", int(id))
}
l := logger.Fetch(ctx, "shard"+strconv.Itoa(int(id)))
l.Info().Int("shard_id", int(id)).Str("path", path).Msg("creating a
shard")
+ if openedBlockSize < 1 {
+ openedBlockSize = defaultBlockQueueSize
+ }
+ sc, err := newSegmentController(path, segmentSize, blockSize,
openedBlockSize, l)
+ if err != nil {
+ return nil, errors.Wrapf(err, "create the segment controller of
the shard %d", int(id))
+ }
s := &shard{
id: id,
- segmentController: newSegmentController(path, segmentSize,
blockSize),
+ segmentController: sc,
l: l,
}
shardCtx := context.WithValue(ctx, logger.ContextKey, s.l)
@@ -156,14 +166,34 @@ type segmentController struct {
segmentSize IntervalRule
blockSize IntervalRule
lst []*segment
+ blockQueue bucket.Queue
+
+ l *logger.Logger
}
-func newSegmentController(location string, segmentSize, blockSize
IntervalRule) *segmentController {
- return &segmentController{
+type blockIDAndSegID struct {
+ segID uint16
+ blockID uint16
+}
+
+func newSegmentController(location string, segmentSize, blockSize
IntervalRule, openedBlockSize int, l *logger.Logger) (*segmentController,
error) {
+ sc := &segmentController{
location: location,
segmentSize: segmentSize,
blockSize: blockSize,
+ l: l,
}
+ var err error
+ sc.blockQueue, err = bucket.NewQueue(openedBlockSize, func(id
interface{}) {
+ bsID := id.(blockIDAndSegID)
+ seg := sc.get(bsID.segID)
+ if seg == nil {
+ l.Warn().Uint16("segID", bsID.segID).Msg("segment is
absent")
+ return
+ }
+ seg.notifyCloseBlock(bsID.blockID)
+ })
+ return sc, err
}
func (sc *segmentController) get(segID uint16) *segment {
@@ -222,6 +252,10 @@ func (sc *segmentController) Next() (bucket.Reporter,
error) {
sc.segmentSize.NextTime(seg.Start)))
}
+func (sc *segmentController) OnMove(prev bucket.Reporter, next
bucket.Reporter) {
+ sc.l.Info().Stringer("prev", prev).Stringer("next", next).Msg("move to
the next segment")
+}
+
func (sc *segmentController) Format(tm time.Time) string {
switch sc.segmentSize.Unit {
case HOUR:
@@ -282,7 +316,7 @@ func (sc *segmentController) load(ctx context.Context,
suffix, path string) (seg
if err != nil {
return nil, err
}
- seg, err = openSegment(ctx, startTime, path, suffix, sc.segmentSize,
sc.blockSize)
+ seg, err = openSegment(ctx, startTime, path, suffix, sc.segmentSize,
sc.blockSize, sc.blockQueue)
if err != nil {
return nil, err
}
diff --git a/banyand/tsdb/shard_test.go b/banyand/tsdb/shard_test.go
index 96e78e3..a7b45ed 100644
--- a/banyand/tsdb/shard_test.go
+++ b/banyand/tsdb/shard_test.go
@@ -19,6 +19,9 @@ package tsdb_test
import (
"context"
+ "errors"
+ "os"
+ "path"
"time"
. "github.com/onsi/ginkgo/v2"
@@ -55,6 +58,7 @@ var _ = Describe("Shard", func() {
Unit: tsdb.MILLISECOND,
Num: 1000,
},
+ 1<<4,
)
Expect(err).NotTo(HaveOccurred())
segDirectories := make([]string, 3)
@@ -82,6 +86,45 @@ var _ = Describe("Shard", func() {
}).WithTimeout(10 *
time.Second).Should(BeNumerically(">=", 3))
}
})
+ It("closes blocks", func() {
+ var err error
+ shard, err = tsdb.OpenShard(context.TODO(),
common.ShardID(0), tmp,
+ tsdb.IntervalRule{
+ Unit: tsdb.DAY,
+ Num: 1,
+ },
+ tsdb.IntervalRule{
+ Unit: tsdb.MILLISECOND,
+ Num: 1000,
+ },
+ 2,
+ )
+ Expect(err).NotTo(HaveOccurred())
+ var segDirectory string
+ Eventually(func() int {
+ num := 0
+ errInternal := tsdb.WalkDir(tmp+"/shard-0",
"seg-", func(suffix, absolutePath string) error {
+ if num < 1 {
+ segDirectory = absolutePath
+ }
+ num++
+ return nil
+ })
+ Expect(errInternal).NotTo(HaveOccurred())
+ return num
+ }).WithTimeout(10 *
time.Second).Should(BeNumerically(">=", 1))
+ Eventually(func() int {
+ num := 0
+ errInternal := tsdb.WalkDir(segDirectory,
"block-", func(suffix, absolutePath string) error {
+ if _, err :=
os.Stat(path.Join(absolutePath, "store", "LOCK")); errors.Is(err,
os.ErrNotExist) {
+ num++
+ }
+ return nil
+ })
+ Expect(errInternal).NotTo(HaveOccurred())
+ return num
+ }).WithTimeout(30 *
time.Second).Should(BeNumerically(">=", 1))
+ })
})
})
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 22bf791..b863a98 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -186,7 +186,7 @@ func createDatabase(ctx context.Context, db *database,
startID int) (Database, e
for i := startID; i < int(db.shardNum); i++ {
db.logger.Info().Int("shard_id", i).Msg("creating a shard")
so, errNewShard := OpenShard(ctx, common.ShardID(i),
- db.location, db.segmentSize, db.blockSize)
+ db.location, db.segmentSize, db.blockSize,
defaultBlockQueueSize)
if errNewShard != nil {
err = multierr.Append(err, errNewShard)
continue
@@ -216,6 +216,7 @@ func loadDatabase(ctx context.Context, db *database)
(Database, error) {
db.location,
db.segmentSize,
db.blockSize,
+ defaultBlockQueueSize,
)
if errOpenShard != nil {
return errOpenShard
diff --git a/go.mod b/go.mod
index 22513ff..a445dc6 100644
--- a/go.mod
+++ b/go.mod
@@ -51,6 +51,7 @@ require (
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
+ github.com/hashicorp/golang-lru v0.5.4
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jonboulle/clockwork v0.2.2 // indirect
@@ -93,7 +94,7 @@ require (
go.opentelemetry.io/otel/sdk/metric v0.20.0 // indirect
go.opentelemetry.io/otel/trace v0.20.0 // indirect
go.opentelemetry.io/proto/otlp v0.7.0 // indirect
- go.uber.org/atomic v1.9.0 // indirect
+ go.uber.org/atomic v1.9.0
go.uber.org/zap v1.17.0 // indirect
golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 // indirect
golang.org/x/text v0.3.6 // indirect
diff --git a/go.sum b/go.sum
index d0e069f..ee831c2 100644
--- a/go.sum
+++ b/go.sum
@@ -258,6 +258,8 @@ github.com/hashicorp/go-uuid v1.0.1/go.mod
h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b
github.com/hashicorp/go.net v0.0.1/go.mod
h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
github.com/hashicorp/golang-lru v0.5.0/go.mod
h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.1/go.mod
h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
+github.com/hashicorp/golang-lru v0.5.4
h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
+github.com/hashicorp/golang-lru v0.5.4/go.mod
h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod
h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hashicorp/logutils v1.0.0/go.mod
h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
diff --git a/pkg/timestamp/range.go b/pkg/timestamp/range.go
index aae487b..e0db185 100644
--- a/pkg/timestamp/range.go
+++ b/pkg/timestamp/range.go
@@ -53,6 +53,24 @@ func (t TimeRange) Duration() time.Duration {
return t.End.Sub(t.Start)
}
+func (t TimeRange) String() string {
+ var buf []byte
+ if t.IncludeStart {
+ buf = []byte("[")
+ } else {
+ buf = []byte("(")
+ }
+ buf = append(buf, t.Start.String()...)
+ buf = append(buf, ", "...)
+ buf = append(buf, t.End.String()...)
+ if t.IncludeEnd {
+ buf = append(buf, "]"...)
+ } else {
+ buf = append(buf, ")"...)
+ }
+ return string(buf)
+}
+
func NewInclusiveTimeRange(start, end time.Time) TimeRange {
return TimeRange{
Start: start,