This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch tsdb-closer
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 97a18702f81b0aa3f6ac8a43da409dc39fa1a816
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Feb 17 01:25:35 2022 +0000

    Add a queue to track and close blocks
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 banyand/tsdb/block.go                |  22 +++++-
 banyand/tsdb/bucket/bucket.go        |   2 +
 banyand/tsdb/bucket/queue.go         | 145 +++++++++++++++++++++++++++++++++++
 banyand/tsdb/bucket/queue_test.go    |  56 ++++++++++++++
 banyand/tsdb/bucket/strategy.go      |   1 +
 banyand/tsdb/bucket/strategy_test.go |   7 ++
 banyand/tsdb/segment.go              |  73 ++++++++++++++++--
 banyand/tsdb/shard.go                |  44 +++++++++--
 banyand/tsdb/shard_test.go           |  43 +++++++++++
 banyand/tsdb/tsdb.go                 |   3 +-
 go.mod                               |   3 +-
 go.sum                               |   2 +
 pkg/timestamp/range.go               |  18 +++++
 13 files changed, 402 insertions(+), 17 deletions(-)

diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 884a0d2..98c0b73 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -24,6 +24,7 @@ import (
        "time"
 
        "github.com/dgraph-io/ristretto/z"
+       "go.uber.org/atomic"
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/banyand/kv"
@@ -37,10 +38,11 @@ import (
 )
 
 type block struct {
-       path   string
-       l      *logger.Logger
-       suffix string
-       ref    *z.Closer
+       path    string
+       l       *logger.Logger
+       suffix  string
+       ref     *z.Closer
+       closing *atomic.Bool
 
        store         kv.TimeSeriesStore
        primaryIndex  index.Store
@@ -76,6 +78,7 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, 
err error) {
                l:         logger.Fetch(ctx, "block"),
                TimeRange: timeRange,
                Reporter:  bucket.NewTimeBasedReporter(timeRange),
+               closing:   atomic.NewBool(false),
        }
        encodingMethodObject := ctx.Value(encodingMethodKey)
        if encodingMethodObject == nil {
@@ -113,10 +116,17 @@ func newBlock(ctx context.Context, opts blockOpts) (b 
*block, err error) {
                return nil, err
        }
        b.closableLst = append(b.closableLst, b.invertedIndex, b.lsmIndex)
+       go func() {
+               <-b.ref.HasBeenClosed()
+               b.closing.Store(true)
+       }()
        return b, err
 }
 
 func (b *block) delegate() blockDelegate {
+       if b.closing.Load() {
+               return nil
+       }
        b.incRef()
        return &bDelegate{
                delegate: b,
@@ -139,6 +149,10 @@ func (b *block) close() {
        }
 }
 
+// String returns the string form of the block's Reporter.
+// NOTE(review): this uses a value receiver while delegate() uses a pointer
+// receiver, so every call copies the block struct; confirm that is intended.
+func (b block) String() string {
+       return b.Reporter.String()
+}
+
 type blockDelegate interface {
        io.Closer
        contains(ts time.Time) bool
diff --git a/banyand/tsdb/bucket/bucket.go b/banyand/tsdb/bucket/bucket.go
index 406093c..dc1b77f 100644
--- a/banyand/tsdb/bucket/bucket.go
+++ b/banyand/tsdb/bucket/bucket.go
@@ -26,6 +26,7 @@ import (
+// Controller is the set of hooks a Strategy calls (held as s.ctrl in
+// Strategy.Run).
 type Controller interface {
        Current() Reporter
        Next() (Reporter, error)
+       // OnMove is called by Strategy.Run with the current and the next
+       // reporter right before the strategy makes next its current reporter.
+       OnMove(prev, next Reporter)
 }
 
 type Status struct {
@@ -38,6 +39,7 @@ type Channel chan Status
+// Reporter is the value type a Controller returns from Current and Next.
 type Reporter interface {
+       // Report returns a Channel, i.e. a chan of Status values.
        Report() Channel
        Stop()
+       // String returns the reporter's textual form.
+       String() string
 }
 
 type timeBasedReporter struct {
diff --git a/banyand/tsdb/bucket/queue.go b/banyand/tsdb/bucket/queue.go
new file mode 100644
index 0000000..e26109d
--- /dev/null
+++ b/banyand/tsdb/bucket/queue.go
@@ -0,0 +1,145 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package bucket
+
+import (
+       "errors"
+       "sync"
+
+       "github.com/hashicorp/golang-lru/simplelru"
+)
+
+// EvictFn is the callback a Queue invokes with the id of an entry it drops.
+type EvictFn func(id interface{})
+
+// Queue tracks ids that are pushed into it.
+type Queue interface {
+       // Push records an access to id.
+       Push(id interface{})
+       // Len returns the number of ids the queue currently counts.
+       Len() int
+}
+
+const (
+       // DefaultRecentRatio is the fraction of the queue size NewQueue uses as
+       // the threshold of the recent list.
+       DefaultRecentRatio  = 0.25
+       // DefaultGhostEntries is the fraction of the queue size NewQueue uses as
+       // the capacity of the recently-evicted list.
+       DefaultGhostEntries = 0.50
+)
+
+// ErrInvalidSize is returned by NewQueue when the requested size is not
+// positive.
+var ErrInvalidSize = errors.New("invalid size")
+
+// lruQueue is the Queue implementation returned by NewQueue. It keeps ids in
+// three LRU lists: recent, frequent and recentEvict.
+type lruQueue struct {
+       // size bounds recent.Len() + frequent.Len() (checked in ensureSpace).
+       size       int
+       // recentSize is the threshold ensureSpace compares recent.Len() against.
+       recentSize int
+       // evictSize is the capacity addLst enforces on recentEvict.
+       evictSize  int
+       // evictFn, when non-nil, receives every id dropped by removeOldest.
+       evictFn    EvictFn
+
+       // recent holds ids that have been pushed once since entering the queue.
+       recent      simplelru.LRUCache
+       // frequent holds ids pushed again while tracked in any of the lists.
+       frequent    simplelru.LRUCache
+       // recentEvict holds ids that ensureSpace moved out of recent.
+       recentEvict simplelru.LRUCache
+       // lock guards the three lists; Push takes it exclusively, Len shared.
+       lock        sync.RWMutex
+}
+
+// NewQueue creates a Queue whose recent and frequent lists together hold at
+// most size ids. evictFn may be nil; when set it is called with every id the
+// queue drops.
+//
+// It returns ErrInvalidSize when size is not positive, or the error reported
+// by simplelru.NewLRU when one of the internal lists cannot be created.
+func NewQueue(size int, evictFn EvictFn) (Queue, error) {
+       if size <= 0 {
+               return nil, ErrInvalidSize
+       }
+
+       recentSize := int(float64(size) * DefaultRecentRatio)
+       evictSize := int(float64(size) * DefaultGhostEntries)
+       // simplelru.NewLRU rejects a non-positive size. A size of 1 truncates
+       // evictSize to 0, so keep at least one slot; otherwise a valid size
+       // would fail with an unrelated error.
+       if evictSize < 1 {
+               evictSize = 1
+       }
+
+       recent, err := simplelru.NewLRU(size, nil)
+       if err != nil {
+               return nil, err
+       }
+       frequent, err := simplelru.NewLRU(size, nil)
+       if err != nil {
+               return nil, err
+       }
+       recentEvict, err := simplelru.NewLRU(evictSize, nil)
+       if err != nil {
+               return nil, err
+       }
+       c := &lruQueue{
+               size:        size,
+               recentSize:  recentSize,
+               recent:      recent,
+               frequent:    frequent,
+               recentEvict: recentEvict,
+               evictSize:   evictSize,
+               evictFn:     evictFn,
+       }
+       return c, nil
+}
+
+// Push records an access to id:
+//   - an id already in frequent is re-added to frequent;
+//   - an id in recent is moved to frequent;
+//   - an id in recentEvict is moved to frequent after room is made;
+//   - an unknown id is added to recent after room is made.
+func (q *lruQueue) Push(id interface{}) {
+       q.lock.Lock()
+       defer q.lock.Unlock()
+
+       switch {
+       case q.frequent.Contains(id):
+               q.frequent.Add(id, nil)
+       case q.recent.Contains(id):
+               q.recent.Remove(id)
+               q.frequent.Add(id, nil)
+       case q.recentEvict.Contains(id):
+               // Take id out of recentEvict before making room. ensureSpace may
+               // push another id into a full recentEvict, and if id were still
+               // its oldest entry, evictFn(id) would fire for an id that is
+               // about to be tracked in frequent.
+               q.recentEvict.Remove(id)
+               q.ensureSpace(true)
+               q.frequent.Add(id, nil)
+       default:
+               q.ensureSpace(false)
+               q.recent.Add(id, nil)
+       }
+}
+
+// Len returns how many ids sit in the recent and frequent lists; ids held in
+// recentEvict are not counted.
+func (q *lruQueue) Len() int {
+       q.lock.RLock()
+       defer q.lock.RUnlock()
+       recentLen, freqLen := q.recent.Len(), q.frequent.Len()
+       return recentLen + freqLen
+}
+
+// ensureSpace frees one slot when recent and frequent together already hold
+// q.size ids; it does nothing otherwise. The caller passes recentEvict=true
+// when the id being pushed was found in the recentEvict list.
+func (q *lruQueue) ensureSpace(recentEvict bool) {
+       recentLen := q.recent.Len()
+       freqLen := q.frequent.Len()
+       if recentLen+freqLen < q.size {
+               return
+       }
+       // Move the oldest recent id into recentEvict when recent is above its
+       // threshold, or exactly at it and the pushed id is not a recentEvict hit.
+       // evictFn is only reached here if addLst has to drop an id from a full
+       // recentEvict.
+       if recentLen > 0 && (recentLen > q.recentSize || (recentLen == 
q.recentSize && !recentEvict)) {
+               k, _, _ := q.recent.RemoveOldest()
+               q.addLst(q.recentEvict, q.evictSize, k)
+               return
+       }
+       // Otherwise drop the oldest frequent id and hand it to evictFn.
+       q.removeOldest(q.frequent)
+}
+
+// addLst appends id to lst, first dropping lst's oldest entry (through
+// removeOldest) when lst already holds size or more entries.
+func (q *lruQueue) addLst(lst simplelru.LRUCache, size int, id interface{}) {
+       if lst.Len() >= size {
+               q.removeOldest(lst)
+       }
+       lst.Add(id, nil)
+}
+
+// removeOldest drops the oldest entry of lst and, if an entry was actually
+// removed and an evictFn is configured, passes its id to evictFn.
+func (q *lruQueue) removeOldest(lst simplelru.LRUCache) {
+       dropped, _, removed := lst.RemoveOldest()
+       if !removed || q.evictFn == nil {
+               return
+       }
+       q.evictFn(dropped)
+}
diff --git a/banyand/tsdb/bucket/queue_test.go 
b/banyand/tsdb/bucket/queue_test.go
new file mode 100644
index 0000000..fbda289
--- /dev/null
+++ b/banyand/tsdb/bucket/queue_test.go
@@ -0,0 +1,56 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+package bucket_test
+
+import (
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+
+       "github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
+)
+
+// queueEntryID is the two-field id type the queue test pushes.
+type queueEntryID struct {
+       first  uint16
+       second uint16
+}
+
+// entryID builds the queueEntryID whose first field is id and whose second
+// field is id + 1.
+func entryID(id uint16) queueEntryID {
+       var e queueEntryID
+       e.first = id
+       e.second = id + 1
+       return e
+}
+
+// Pushes 256 distinct ids into a queue of size 128 and checks what remains and
+// what is reported to the evict callback.
+var _ = Describe("Queue", func() {
+       It("pushes data", func() {
+               evictLst := make([]queueEntryID, 0)
+               l, err := bucket.NewQueue(128, func(id interface{}) {
+                       evictLst = append(evictLst, id.(queueEntryID))
+               })
+               Expect(err).ShouldNot(HaveOccurred())
+
+               for i := 0; i < 256; i++ {
+                       l.Push(entryID(uint16(i)))
+               }
+               // Only recent + frequent are counted, and they are capped at 128.
+               Expect(l.Len()).To(Equal(128))
+               // 128 ids left the recent list; the recently-evicted list keeps
+               // 128 * DefaultGhostEntries = 64 of them, so the 64 oldest ids
+               // reach the callback, in push order.
+               Expect(len(evictLst)).To(Equal(64))
+               for i := 0; i < 64; i++ {
+                       Expect(evictLst[i]).To(Equal(entryID(uint16(i))))
+               }
+       })
+})
diff --git a/banyand/tsdb/bucket/strategy.go b/banyand/tsdb/bucket/strategy.go
index 339aa4d..581ebd8 100644
--- a/banyand/tsdb/bucket/strategy.go
+++ b/banyand/tsdb/bucket/strategy.go
@@ -111,6 +111,7 @@ func (s *Strategy) Run() {
                                        }
                                }
                                if ratio >= 1.0 {
+                                       s.ctrl.OnMove(s.current, s.next)
                                        s.current = s.next
                                        s.next = nil
                                        goto bucket
diff --git a/banyand/tsdb/bucket/strategy_test.go 
b/banyand/tsdb/bucket/strategy_test.go
index d225d23..80d9f8c 100644
--- a/banyand/tsdb/bucket/strategy_test.go
+++ b/banyand/tsdb/bucket/strategy_test.go
@@ -109,6 +109,9 @@ func (c *controller) Current() bucket.Reporter {
        return c.reporter
 }
 
+// OnMove is a no-op in this test controller.
+func (c *controller) OnMove(prev bucket.Reporter, next bucket.Reporter) {
+}
+
 func (c *controller) newReporter() {
        c.reporter = &reporter{step: c.step, capacity: c.capacity}
 }
@@ -142,3 +145,7 @@ func (r *reporter) Report() bucket.Channel {
 
 func (r *reporter) Stop() {
 }
+
+// String returns the fixed name "default" for the test reporter.
+func (r *reporter) String() string {
+       return "default"
+}
diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index 60897d2..2063318 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -37,15 +37,16 @@ type segment struct {
        path   string
        suffix string
 
-       globalIndex kv.Store
-       l           *logger.Logger
+       blockCloseCh chan uint16
+       globalIndex  kv.Store
+       l            *logger.Logger
        timestamp.TimeRange
        bucket.Reporter
        blockController     *blockController
        blockManageStrategy *bucket.Strategy
 }
 
-func openSegment(ctx context.Context, startTime time.Time, path, suffix 
string, segmentSize, blockSize IntervalRule) (s *segment, err error) {
+func openSegment(ctx context.Context, startTime time.Time, path, suffix 
string, segmentSize, blockSize IntervalRule, blockQueue bucket.Queue) (s 
*segment, err error) {
        suffixInteger, err := strconv.Atoi(suffix)
        if err != nil {
                return nil, err
@@ -53,14 +54,16 @@ func openSegment(ctx context.Context, startTime time.Time, 
path, suffix string,
        // TODO: fix id overflow
        id := uint16(segmentSize.Unit)<<12 | ((uint16(suffixInteger) << 4) >> 4)
        timeRange := timestamp.NewTimeRange(startTime, 
segmentSize.NextTime(startTime), true, false)
+       l := logger.Fetch(ctx, "segment")
        s = &segment{
                id:              id,
                path:            path,
                suffix:          suffix,
-               l:               logger.Fetch(ctx, "segment"),
-               blockController: newBlockController(id, path, timeRange, 
blockSize),
+               l:               l,
+               blockController: newBlockController(id, path, timeRange, 
blockSize, l, blockQueue),
                TimeRange:       timeRange,
                Reporter:        bucket.NewTimeBasedReporter(timeRange),
+               blockCloseCh:    make(chan uint16, 24),
        }
        err = s.blockController.open(context.WithValue(ctx, logger.ContextKey, 
s.l))
        if err != nil {
@@ -79,6 +82,7 @@ func openSegment(ctx context.Context, startTime time.Time, 
path, suffix string,
                return nil, err
        }
        s.blockManageStrategy.Run()
+       s.blockController.listenToBlockClosing(s.blockCloseCh)
        return s, nil
 }
 
@@ -89,6 +93,14 @@ func (s *segment) close() {
        s.Stop()
 }
 
+// notifyCloseBlock sends id to blockCloseCh, the channel openSegment passes to
+// blockController.listenToBlockClosing.
+// NOTE(review): this is a plain send, so it blocks once the channel's buffer
+// (created with capacity 24 in openSegment) is full and nothing is receiving.
+func (s *segment) notifyCloseBlock(id uint16) {
+       s.blockCloseCh <- id
+}
+
+// String returns the string form of the segment's Reporter.
+// NOTE(review): this uses a value receiver while the other segment methods
+// shown here use a pointer receiver; confirm the copy is intended.
+func (s segment) String() string {
+       return s.Reporter.String()
+}
+
 type blockController struct {
        sync.RWMutex
        segID        uint16
@@ -96,14 +108,22 @@ type blockController struct {
        segTimeRange timestamp.TimeRange
        blockSize    IntervalRule
        lst          []*block
+       blockQueue   bucket.Queue
+       closing      chan struct{}
+
+       l *logger.Logger
 }
 
-func newBlockController(segID uint16, location string, segTimeRange 
timestamp.TimeRange, blockSize IntervalRule) *blockController {
+func newBlockController(segID uint16, location string, segTimeRange 
timestamp.TimeRange,
+       blockSize IntervalRule, l *logger.Logger, blockQueue bucket.Queue) 
*blockController {
        return &blockController{
                segID:        segID,
                location:     location,
                blockSize:    blockSize,
                segTimeRange: segTimeRange,
+               closing:      make(chan struct{}),
+               blockQueue:   blockQueue,
+               l:            l,
        }
 }
 
@@ -133,6 +153,46 @@ func (bc *blockController) Next() (bucket.Reporter, error) 
{
        return reporter, err
 }
 
+// OnMove pushes the block the strategy is leaving (prev) into the block queue,
+// keyed by this controller's segment id and the block's id. next is not used.
+func (bc *blockController) OnMove(prev bucket.Reporter, next bucket.Reporter) {
+       // Use the comma-ok form: a nil or non-block Reporter would otherwise
+       // panic inside the strategy's goroutine.
+       b, ok := prev.(*block)
+       if !ok || b == nil {
+               bc.l.Warn().Uint16("segID", bc.segID).
+                       Msg("previous reporter is not a block, skip tracking it")
+               return
+       }
+       bc.blockQueue.Push(blockIDAndSegID{
+               segID:   bc.segID,
+               blockID: b.blockID,
+       })
+}
+
+// listenToBlockClosing starts a goroutine that receives block ids from
+// closeCh, removes the matching block from the controller and closes it. The
+// goroutine returns when a value arrives on bc.closing.
+func (bc *blockController) listenToBlockClosing(closeCh <-chan uint16) {
+       go func() {
+               for {
+                       select {
+                       case id := <-closeCh:
+                               b := bc.removeBlock(id)
+                               if b == nil {
+                                       // Log the controller's segment id under
+                                       // "segID"; id is the block id.
+                                       bc.l.Warn().Uint16("blockID", id).Uint16("segID", bc.segID).
+                                               Msg("block is absent on removing it from the segment")
+                                       continue
+                               }
+                               b.close()
+                       case <-bc.closing:
+                               return
+                       }
+               }
+       }()
+}
+
+// removeBlock detaches the first block whose blockID equals the argument from
+// bc.lst and returns it. It returns nil when no block matches. The controller
+// lock is held for the whole call.
+func (bc *blockController) removeBlock(blockID uint16) *block {
+       bc.Lock()
+       defer bc.Unlock()
+       for idx, candidate := range bc.lst {
+               if candidate.blockID != blockID {
+                       continue
+               }
+               // cut the matching block out of the internal lst
+               bc.lst = append(bc.lst[:idx], bc.lst[idx+1:]...)
+               return candidate
+       }
+       return nil
+}
+
 func (bc *blockController) Format(tm time.Time) string {
        switch bc.blockSize.Unit {
        case HOUR:
@@ -260,6 +320,7 @@ func (bc *blockController) load(ctx context.Context, 
suffix, path string) (b *bl
 }
 
 func (bc *blockController) close() {
+       bc.closing <- struct{}{}
        for _, s := range bc.lst {
                s.close()
        }
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index c0e65c7..135dd58 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -31,6 +31,8 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
+const defaultBlockQueueSize = 1 << 4
+
 var _ Shard = (*shard)(nil)
 
 type shard struct {
@@ -55,16 +57,24 @@ func (s *shard) Index() IndexDatabase {
        return s.indexDatabase
 }
 
-func OpenShard(ctx context.Context, id common.ShardID, root string, 
segmentSize, blockSize IntervalRule) (Shard, error) {
+func OpenShard(ctx context.Context, id common.ShardID,
+       root string, segmentSize, blockSize IntervalRule, openedBlockSize int) 
(Shard, error) {
        path, err := mkdir(shardTemplate, root, int(id))
        if err != nil {
                return nil, errors.Wrapf(err, "make the directory of the shard 
%d ", int(id))
        }
        l := logger.Fetch(ctx, "shard"+strconv.Itoa(int(id)))
        l.Info().Int("shard_id", int(id)).Str("path", path).Msg("creating a 
shard")
+       if openedBlockSize < 1 {
+               openedBlockSize = defaultBlockQueueSize
+       }
+       sc, err := newSegmentController(path, segmentSize, blockSize, 
openedBlockSize, l)
+       if err != nil {
+               return nil, errors.Wrapf(err, "create the segment controller of 
the shard %d", int(id))
+       }
        s := &shard{
                id:                id,
-               segmentController: newSegmentController(path, segmentSize, 
blockSize),
+               segmentController: sc,
                l:                 l,
        }
        shardCtx := context.WithValue(ctx, logger.ContextKey, s.l)
@@ -156,14 +166,34 @@ type segmentController struct {
        segmentSize IntervalRule
        blockSize   IntervalRule
        lst         []*segment
+       blockQueue  bucket.Queue
+
+       l *logger.Logger
 }
 
-func newSegmentController(location string, segmentSize, blockSize 
IntervalRule) *segmentController {
-       return &segmentController{
+// blockIDAndSegID is the id pushed into the block queue: a block id together
+// with the id of the segment that owns the block.
+type blockIDAndSegID struct {
+       segID   uint16
+       blockID uint16
+}
+
+// newSegmentController builds a segmentController together with its block
+// queue of openedBlockSize entries. When the queue evicts an id, the owning
+// segment is looked up and asked to close that block; a missing segment is
+// logged and skipped.
+//
+// It returns a nil controller and the error from bucket.NewQueue when the
+// queue cannot be created.
+func newSegmentController(location string, segmentSize, blockSize IntervalRule, openedBlockSize int, l *logger.Logger) (*segmentController, error) {
+       sc := &segmentController{
+               location:    location,
+               segmentSize: segmentSize,
+               blockSize:   blockSize,
+               l:           l,
+       }
+       blockQueue, err := bucket.NewQueue(openedBlockSize, func(id interface{}) {
+               bsID := id.(blockIDAndSegID)
+               seg := sc.get(bsID.segID)
+               if seg == nil {
+                       l.Warn().Uint16("segID", bsID.segID).Msg("segment is absent")
+                       return
+               }
+               seg.notifyCloseBlock(bsID.blockID)
+       })
+       if err != nil {
+               // Do not hand back a half-built controller next to an error.
+               return nil, err
+       }
+       sc.blockQueue = blockQueue
+       return sc, nil
+}
 
 func (sc *segmentController) get(segID uint16) *segment {
@@ -222,6 +252,10 @@ func (sc *segmentController) Next() (bucket.Reporter, 
error) {
                sc.segmentSize.NextTime(seg.Start)))
 }
 
+// OnMove logs the previous and the next reporter at info level.
+func (sc *segmentController) OnMove(prev bucket.Reporter, next 
bucket.Reporter) {
+       sc.l.Info().Stringer("prev", prev).Stringer("next", next).Msg("move to 
the next segment")
+}
+
 func (sc *segmentController) Format(tm time.Time) string {
        switch sc.segmentSize.Unit {
        case HOUR:
@@ -282,7 +316,7 @@ func (sc *segmentController) load(ctx context.Context, 
suffix, path string) (seg
        if err != nil {
                return nil, err
        }
-       seg, err = openSegment(ctx, startTime, path, suffix, sc.segmentSize, 
sc.blockSize)
+       seg, err = openSegment(ctx, startTime, path, suffix, sc.segmentSize, 
sc.blockSize, sc.blockQueue)
        if err != nil {
                return nil, err
        }
diff --git a/banyand/tsdb/shard_test.go b/banyand/tsdb/shard_test.go
index 96e78e3..4a3f236 100644
--- a/banyand/tsdb/shard_test.go
+++ b/banyand/tsdb/shard_test.go
@@ -19,6 +19,9 @@ package tsdb_test
 
 import (
        "context"
+       "errors"
+       "os"
+       "path"
        "time"
 
        . "github.com/onsi/ginkgo/v2"
@@ -55,6 +58,7 @@ var _ = Describe("Shard", func() {
                                        Unit: tsdb.MILLISECOND,
                                        Num:  1000,
                                },
+                               1<<4,
                        )
                        Expect(err).NotTo(HaveOccurred())
                        segDirectories := make([]string, 3)
@@ -82,6 +86,45 @@ var _ = Describe("Shard", func() {
                                }).WithTimeout(10 * 
time.Second).Should(BeNumerically(">=", 3))
                        }
                })
+               It("closes blocks", func() {
+                       var err error
+                       shard, err = tsdb.OpenShard(context.TODO(), 
common.ShardID(0), tmp,
+                               tsdb.IntervalRule{
+                                       Unit: tsdb.DAY,
+                                       Num:  1,
+                               },
+                               tsdb.IntervalRule{
+                                       Unit: tsdb.MILLISECOND,
+                                       Num:  1000,
+                               },
+                               2,
+                       )
+                       Expect(err).NotTo(HaveOccurred())
+                       var segDirectory string
+                       Eventually(func() int {
+                               num := 0
+                               errInternal := tsdb.WalkDir(tmp+"/shard-0", 
"seg-", func(suffix, absolutePath string) error {
+                                       if num < 1 {
+                                               segDirectory = absolutePath
+                                       }
+                                       num++
+                                       return nil
+                               })
+                               Expect(errInternal).NotTo(HaveOccurred())
+                               return num
+                       }).WithTimeout(10 * 
time.Second).Should(BeNumerically(">=", 1))
+                       Eventually(func() int {
+                               num := 0
+                               errInternal := tsdb.WalkDir(segDirectory, 
"block-", func(suffix, absolutePath string) error {
+                                       if _, err := 
os.Stat(path.Join(absolutePath, "store", "LOCK")); errors.Is(err, 
os.ErrNotExist) {
+                                               num++
+                                       }
+                                       return nil
+                               })
+                               Expect(errInternal).NotTo(HaveOccurred())
+                               return num
+                       }).WithTimeout(10 * 
time.Second).Should(BeNumerically(">=", 2))
+               })
 
        })
 })
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 22bf791..b863a98 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -186,7 +186,7 @@ func createDatabase(ctx context.Context, db *database, 
startID int) (Database, e
        for i := startID; i < int(db.shardNum); i++ {
                db.logger.Info().Int("shard_id", i).Msg("creating a shard")
                so, errNewShard := OpenShard(ctx, common.ShardID(i),
-                       db.location, db.segmentSize, db.blockSize)
+                       db.location, db.segmentSize, db.blockSize, 
defaultBlockQueueSize)
                if errNewShard != nil {
                        err = multierr.Append(err, errNewShard)
                        continue
@@ -216,6 +216,7 @@ func loadDatabase(ctx context.Context, db *database) 
(Database, error) {
                        db.location,
                        db.segmentSize,
                        db.blockSize,
+                       defaultBlockQueueSize,
                )
                if errOpenShard != nil {
                        return errOpenShard
diff --git a/go.mod b/go.mod
index 22513ff..a445dc6 100644
--- a/go.mod
+++ b/go.mod
@@ -51,6 +51,7 @@ require (
        github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
        github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
        github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
+       github.com/hashicorp/golang-lru v0.5.4
        github.com/hashicorp/hcl v1.0.0 // indirect
        github.com/inconshreveable/mousetrap v1.0.0 // indirect
        github.com/jonboulle/clockwork v0.2.2 // indirect
@@ -93,7 +94,7 @@ require (
        go.opentelemetry.io/otel/sdk/metric v0.20.0 // indirect
        go.opentelemetry.io/otel/trace v0.20.0 // indirect
        go.opentelemetry.io/proto/otlp v0.7.0 // indirect
-       go.uber.org/atomic v1.9.0 // indirect
+       go.uber.org/atomic v1.9.0
        go.uber.org/zap v1.17.0 // indirect
        golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 // indirect
        golang.org/x/text v0.3.6 // indirect
diff --git a/go.sum b/go.sum
index d0e069f..ee831c2 100644
--- a/go.sum
+++ b/go.sum
@@ -258,6 +258,8 @@ github.com/hashicorp/go-uuid v1.0.1/go.mod 
h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b
 github.com/hashicorp/go.net v0.0.1/go.mod 
h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
 github.com/hashicorp/golang-lru v0.5.0/go.mod 
h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
 github.com/hashicorp/golang-lru v0.5.1/go.mod 
h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
+github.com/hashicorp/golang-lru v0.5.4 
h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
+github.com/hashicorp/golang-lru v0.5.4/go.mod 
h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
 github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
 github.com/hashicorp/hcl v1.0.0/go.mod 
h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
 github.com/hashicorp/logutils v1.0.0/go.mod 
h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
diff --git a/pkg/timestamp/range.go b/pkg/timestamp/range.go
index aae487b..e0db185 100644
--- a/pkg/timestamp/range.go
+++ b/pkg/timestamp/range.go
@@ -53,6 +53,24 @@ func (t TimeRange) Duration() time.Duration {
        return t.End.Sub(t.Start)
 }
 
+// String renders the range in interval notation, for example
+// "[start, end)": a square bracket marks an included bound and a round one an
+// excluded bound. Start and End are formatted with time.Time's String.
+func (t TimeRange) String() string {
+       opening, closing := "(", ")"
+       if t.IncludeStart {
+               opening = "["
+       }
+       if t.IncludeEnd {
+               closing = "]"
+       }
+       return opening + t.Start.String() + ", " + t.End.String() + closing
+}
+
 func NewInclusiveTimeRange(start, end time.Time) TimeRange {
        return TimeRange{
                Start:        start,

Reply via email to