This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch tsdb-closer in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 97a18702f81b0aa3f6ac8a43da409dc39fa1a816 Author: Gao Hongtao <[email protected]> AuthorDate: Thu Feb 17 01:25:35 2022 +0000 Add a queue to track and close blocks Signed-off-by: Gao Hongtao <[email protected]> --- banyand/tsdb/block.go | 22 +++++- banyand/tsdb/bucket/bucket.go | 2 + banyand/tsdb/bucket/queue.go | 145 +++++++++++++++++++++++++++++++++++ banyand/tsdb/bucket/queue_test.go | 56 ++++++++++++++ banyand/tsdb/bucket/strategy.go | 1 + banyand/tsdb/bucket/strategy_test.go | 7 ++ banyand/tsdb/segment.go | 73 ++++++++++++++++-- banyand/tsdb/shard.go | 44 +++++++++-- banyand/tsdb/shard_test.go | 43 +++++++++++ banyand/tsdb/tsdb.go | 3 +- go.mod | 3 +- go.sum | 2 + pkg/timestamp/range.go | 18 +++++ 13 files changed, 402 insertions(+), 17 deletions(-) diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go index 884a0d2..98c0b73 100644 --- a/banyand/tsdb/block.go +++ b/banyand/tsdb/block.go @@ -24,6 +24,7 @@ import ( "time" "github.com/dgraph-io/ristretto/z" + "go.uber.org/atomic" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/banyand/kv" @@ -37,10 +38,11 @@ import ( ) type block struct { - path string - l *logger.Logger - suffix string - ref *z.Closer + path string + l *logger.Logger + suffix string + ref *z.Closer + closing *atomic.Bool store kv.TimeSeriesStore primaryIndex index.Store @@ -76,6 +78,7 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) { l: logger.Fetch(ctx, "block"), TimeRange: timeRange, Reporter: bucket.NewTimeBasedReporter(timeRange), + closing: atomic.NewBool(false), } encodingMethodObject := ctx.Value(encodingMethodKey) if encodingMethodObject == nil { @@ -113,10 +116,17 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) { return nil, err } b.closableLst = append(b.closableLst, b.invertedIndex, b.lsmIndex) + go func() { + <-b.ref.HasBeenClosed() + b.closing.Store(true) + }() return b, err } func (b *block) delegate() blockDelegate { + if b.closing.Load() { + return nil + } b.incRef() return &bDelegate{ delegate: b, @@ -139,6 +149,10 @@ func (b *block) close() { } } +func (b block) String() string { + return b.Reporter.String() +} + type blockDelegate interface { io.Closer contains(ts time.Time) bool diff --git a/banyand/tsdb/bucket/bucket.go b/banyand/tsdb/bucket/bucket.go index 406093c..dc1b77f 100644 --- a/banyand/tsdb/bucket/bucket.go +++ b/banyand/tsdb/bucket/bucket.go @@ -26,6 +26,7 @@ import ( type Controller interface { Current() Reporter Next() (Reporter, error) + OnMove(prev, next Reporter) } type Status struct { @@ -38,6 +39,7 @@ type Channel chan Status type Reporter interface { Report() Channel Stop() + String() string } type timeBasedReporter struct { diff --git a/banyand/tsdb/bucket/queue.go b/banyand/tsdb/bucket/queue.go new file mode 100644 index 0000000..e26109d --- /dev/null +++ b/banyand/tsdb/bucket/queue.go @@ -0,0 +1,145 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +package bucket + +import ( + "errors" + "sync" + + "github.com/hashicorp/golang-lru/simplelru" +) + +type EvictFn func(id interface{}) + +type Queue interface { + Push(id interface{}) + Len() int +} + +const ( + DefaultRecentRatio = 0.25 + DefaultGhostEntries = 0.50 +) + +var ErrInvalidSize = errors.New("invalid size") + +type lruQueue struct { + size int + recentSize int + evictSize int + evictFn EvictFn + + recent simplelru.LRUCache + frequent simplelru.LRUCache + recentEvict simplelru.LRUCache + lock sync.RWMutex +} + +func NewQueue(size int, evictFn EvictFn) (Queue, error) { + if size <= 0 { + return nil, ErrInvalidSize + } + + recentSize := int(float64(size) * DefaultRecentRatio) + evictSize := int(float64(size) * DefaultGhostEntries) + + recent, err := simplelru.NewLRU(size, nil) + if err != nil { + return nil, err + } + frequent, err := simplelru.NewLRU(size, nil) + if err != nil { + return nil, err + } + recentEvict, err := simplelru.NewLRU(evictSize, nil) + if err != nil { + return nil, err + } + c := &lruQueue{ + size: size, + recentSize: recentSize, + recent: recent, + frequent: frequent, + recentEvict: recentEvict, + evictSize: evictSize, + evictFn: evictFn, + } + return c, nil +} + +func (q *lruQueue) Push(id interface{}) { + q.lock.Lock() + defer q.lock.Unlock() + + if q.frequent.Contains(id) { + q.frequent.Add(id, nil) + return + } + + if q.recent.Contains(id) { + q.recent.Remove(id) + q.frequent.Add(id, nil) + return + } + + if q.recentEvict.Contains(id) { + q.ensureSpace(true) + q.recentEvict.Remove(id) + q.frequent.Add(id, nil) + return + } + + q.ensureSpace(false) + q.recent.Add(id, nil) +} + +func (q *lruQueue) Len() int { + q.lock.RLock() + defer q.lock.RUnlock() + return q.recent.Len() + q.frequent.Len() +} + +func (q *lruQueue) ensureSpace(recentEvict bool) { + recentLen := q.recent.Len() + freqLen := q.frequent.Len() + if recentLen+freqLen < q.size { + return + } + if recentLen > 0 && (recentLen > q.recentSize || (recentLen == q.recentSize && !recentEvict)) { + k, _, _ := q.recent.RemoveOldest() + q.addLst(q.recentEvict, q.evictSize, k) + return + } + q.removeOldest(q.frequent) +} + +func (q *lruQueue) addLst(lst simplelru.LRUCache, size int, id interface{}) { + if lst.Len() < size { + lst.Add(id, nil) + return + } + q.removeOldest(lst) + lst.Add(id, nil) +} + +func (q *lruQueue) removeOldest(lst simplelru.LRUCache) { + oldestID, _, ok := lst.RemoveOldest() + if ok && q.evictFn != nil { + q.evictFn(oldestID) + } +} diff --git a/banyand/tsdb/bucket/queue_test.go b/banyand/tsdb/bucket/queue_test.go new file mode 100644 index 0000000..fbda289 --- /dev/null +++ b/banyand/tsdb/bucket/queue_test.go @@ -0,0 +1,56 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// +package bucket_test + +import ( + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/apache/skywalking-banyandb/banyand/tsdb/bucket" +) + +type queueEntryID struct { + first uint16 + second uint16 +} + +func entryID(id uint16) queueEntryID { + return queueEntryID{ + first: id, + second: id + 1, + } +} + +var _ = Describe("Queue", func() { + It("pushes data", func() { + evictLst := make([]queueEntryID, 0) + l, err := bucket.NewQueue(128, func(id interface{}) { + evictLst = append(evictLst, id.(queueEntryID)) + }) + Expect(err).ShouldNot(HaveOccurred()) + + for i := 0; i < 256; i++ { + l.Push(entryID(uint16(i))) + } + Expect(l.Len()).To(Equal(128)) + Expect(len(evictLst)).To(Equal(64)) + for i := 0; i < 64; i++ { + Expect(evictLst[i]).To(Equal(entryID(uint16(i)))) + } + }) +}) diff --git a/banyand/tsdb/bucket/strategy.go b/banyand/tsdb/bucket/strategy.go index 339aa4d..581ebd8 100644 --- a/banyand/tsdb/bucket/strategy.go +++ b/banyand/tsdb/bucket/strategy.go @@ -111,6 +111,7 @@ func (s *Strategy) Run() { } } if ratio >= 1.0 { + s.ctrl.OnMove(s.current, s.next) s.current = s.next s.next = nil goto bucket diff --git a/banyand/tsdb/bucket/strategy_test.go b/banyand/tsdb/bucket/strategy_test.go index d225d23..80d9f8c 100644 --- a/banyand/tsdb/bucket/strategy_test.go +++ b/banyand/tsdb/bucket/strategy_test.go @@ -109,6 +109,9 @@ func (c *controller) Current() bucket.Reporter { return c.reporter } +func (c *controller) OnMove(prev bucket.Reporter, next bucket.Reporter) { +} + func (c *controller) newReporter() { c.reporter = &reporter{step: c.step, capacity: c.capacity} } @@ -142,3 +145,7 @@ func (r *reporter) Report() bucket.Channel { func (r *reporter) Stop() { } + +func (r *reporter) String() string { + return "default" +} diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go index 60897d2..2063318 100644 --- a/banyand/tsdb/segment.go +++ b/banyand/tsdb/segment.go @@ -37,15 +37,16 @@ type segment struct { path string suffix string - globalIndex kv.Store - l *logger.Logger + blockCloseCh chan uint16 + globalIndex kv.Store + l *logger.Logger timestamp.TimeRange bucket.Reporter blockController *blockController blockManageStrategy *bucket.Strategy } -func openSegment(ctx context.Context, startTime time.Time, path, suffix string, segmentSize, blockSize IntervalRule) (s *segment, err error) { +func openSegment(ctx context.Context, startTime time.Time, path, suffix string, segmentSize, blockSize IntervalRule, blockQueue bucket.Queue) (s *segment, err error) { suffixInteger, err := strconv.Atoi(suffix) if err != nil { return nil, err @@ -53,14 +54,16 @@ func openSegment(ctx context.Context, startTime time.Time, path, suffix string, // TODO: fix id overflow id := uint16(segmentSize.Unit)<<12 | ((uint16(suffixInteger) << 4) >> 4) timeRange := timestamp.NewTimeRange(startTime, segmentSize.NextTime(startTime), true, false) + l := logger.Fetch(ctx, "segment") s = &segment{ id: id, path: path, suffix: suffix, - l: logger.Fetch(ctx, "segment"), - blockController: newBlockController(id, path, timeRange, blockSize), + l: l, + blockController: newBlockController(id, path, timeRange, blockSize, l, blockQueue), TimeRange: timeRange, Reporter: bucket.NewTimeBasedReporter(timeRange), + blockCloseCh: make(chan uint16, 24), } err = s.blockController.open(context.WithValue(ctx, logger.ContextKey, s.l)) if err != nil { @@ -79,6 +82,7 @@ func openSegment(ctx context.Context, startTime time.Time, path, suffix string, return nil, err } s.blockManageStrategy.Run() + s.blockController.listenToBlockClosing(s.blockCloseCh) return s, nil } @@ -89,6 +93,14 @@ func (s *segment) close() { s.Stop() } +func (s *segment) notifyCloseBlock(id uint16) { + s.blockCloseCh <- id +} + +func (s segment) String() string { + return s.Reporter.String() +} + type blockController struct { sync.RWMutex segID uint16 @@ -96,14 +108,22 @@ type blockController struct { segTimeRange timestamp.TimeRange blockSize IntervalRule lst []*block + blockQueue bucket.Queue + closing chan struct{} + + l *logger.Logger } -func newBlockController(segID uint16, location string, segTimeRange timestamp.TimeRange, blockSize IntervalRule) *blockController { +func newBlockController(segID uint16, location string, segTimeRange timestamp.TimeRange, + blockSize IntervalRule, l *logger.Logger, blockQueue bucket.Queue) *blockController { return &blockController{ segID: segID, location: location, blockSize: blockSize, segTimeRange: segTimeRange, + closing: make(chan struct{}), + blockQueue: blockQueue, + l: l, } } @@ -133,6 +153,46 @@ func (bc *blockController) Next() (bucket.Reporter, error) { return reporter, err } +func (bc *blockController) OnMove(prev bucket.Reporter, next bucket.Reporter) { + bc.blockQueue.Push(blockIDAndSegID{ + segID: bc.segID, + blockID: prev.(*block).blockID, + }) +} + +func (bc *blockController) listenToBlockClosing(closeCh <-chan uint16) { + go func() { + for { + select { + case id := <-closeCh: + b := bc.removeBlock(id) + if b != nil { + b.close() + continue + } + bc.l.Warn().Uint16("blockID", id).Uint16("segID", id). + Msg("block is absent on removing it from the segment") + case <-bc.closing: + return + } + } + }() +} + +func (bc *blockController) removeBlock(blockID uint16) *block { + bc.Lock() + defer bc.Unlock() + for i := range bc.lst { + b := bc.lst[i] + if b.blockID == blockID { + //remove the block from the internal lst + bc.lst = append(bc.lst[:i], bc.lst[i+1:]...) + return b + } + } + return nil +} + func (bc *blockController) Format(tm time.Time) string { switch bc.blockSize.Unit { case HOUR: @@ -260,6 +320,7 @@ func (bc *blockController) load(ctx context.Context, suffix, path string) (b *bl } func (bc *blockController) close() { + bc.closing <- struct{}{} for _, s := range bc.lst { s.close() } diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go index c0e65c7..135dd58 100644 --- a/banyand/tsdb/shard.go +++ b/banyand/tsdb/shard.go @@ -31,6 +31,8 @@ import ( "github.com/apache/skywalking-banyandb/pkg/timestamp" ) +const defaultBlockQueueSize = 1 << 4 + var _ Shard = (*shard)(nil) type shard struct { @@ -55,16 +57,24 @@ func (s *shard) Index() IndexDatabase { return s.indexDatabase } -func OpenShard(ctx context.Context, id common.ShardID, root string, segmentSize, blockSize IntervalRule) (Shard, error) { +func OpenShard(ctx context.Context, id common.ShardID, + root string, segmentSize, blockSize IntervalRule, openedBlockSize int) (Shard, error) { path, err := mkdir(shardTemplate, root, int(id)) if err != nil { return nil, errors.Wrapf(err, "make the directory of the shard %d ", int(id)) } l := logger.Fetch(ctx, "shard"+strconv.Itoa(int(id))) l.Info().Int("shard_id", int(id)).Str("path", path).Msg("creating a shard") + if openedBlockSize < 1 { + openedBlockSize = defaultBlockQueueSize + } + sc, err := newSegmentController(path, segmentSize, blockSize, openedBlockSize, l) + if err != nil { + return nil, errors.Wrapf(err, "create the segment controller of the shard %d", int(id)) + } s := &shard{ id: id, - segmentController: newSegmentController(path, segmentSize, blockSize), + segmentController: sc, l: l, } shardCtx := context.WithValue(ctx, logger.ContextKey, s.l) @@ -156,14 +166,34 @@ type segmentController struct { segmentSize IntervalRule blockSize IntervalRule lst []*segment + blockQueue bucket.Queue + + l *logger.Logger } -func newSegmentController(location string, segmentSize, blockSize IntervalRule) *segmentController { - return &segmentController{ +type blockIDAndSegID struct { + segID uint16 + blockID uint16 +} + +func newSegmentController(location string, segmentSize, blockSize IntervalRule, openedBlockSize int, l *logger.Logger) (*segmentController, error) { + sc := &segmentController{ location: location, segmentSize: segmentSize, blockSize: blockSize, + l: l, } + var err error + sc.blockQueue, err = bucket.NewQueue(openedBlockSize, func(id interface{}) { + bsID := id.(blockIDAndSegID) + seg := sc.get(bsID.segID) + if seg == nil { + l.Warn().Uint16("segID", bsID.segID).Msg("segment is absent") + return + } + seg.notifyCloseBlock(bsID.blockID) + }) + return sc, err } func (sc *segmentController) get(segID uint16) *segment { @@ -222,6 +252,10 @@ func (sc *segmentController) Next() (bucket.Reporter, error) { sc.segmentSize.NextTime(seg.Start))) } +func (sc *segmentController) OnMove(prev bucket.Reporter, next bucket.Reporter) { + sc.l.Info().Stringer("prev", prev).Stringer("next", next).Msg("move to the next segment") +} + func (sc *segmentController) Format(tm time.Time) string { switch sc.segmentSize.Unit { case HOUR: @@ -282,7 +316,7 @@ func (sc *segmentController) load(ctx context.Context, suffix, path string) (seg if err != nil { return nil, err } - seg, err = openSegment(ctx, startTime, path, suffix, sc.segmentSize, sc.blockSize) + seg, err = openSegment(ctx, startTime, path, suffix, sc.segmentSize, sc.blockSize, sc.blockQueue) if err != nil { return nil, err } diff --git a/banyand/tsdb/shard_test.go b/banyand/tsdb/shard_test.go index 96e78e3..4a3f236 100644 --- a/banyand/tsdb/shard_test.go +++ b/banyand/tsdb/shard_test.go @@ -19,6 +19,9 @@ package tsdb_test import ( "context" + "errors" + "os" + "path" "time" . "github.com/onsi/ginkgo/v2" @@ -55,6 +58,7 @@ var _ = Describe("Shard", func() { Unit: tsdb.MILLISECOND, Num: 1000, }, + 1<<4, ) Expect(err).NotTo(HaveOccurred()) segDirectories := make([]string, 3) @@ -82,6 +86,45 @@ var _ = Describe("Shard", func() { }).WithTimeout(10 * time.Second).Should(BeNumerically(">=", 3)) } }) + It("closes blocks", func() { + var err error + shard, err = tsdb.OpenShard(context.TODO(), common.ShardID(0), tmp, + tsdb.IntervalRule{ + Unit: tsdb.DAY, + Num: 1, + }, + tsdb.IntervalRule{ + Unit: tsdb.MILLISECOND, + Num: 1000, + }, + 2, + ) + Expect(err).NotTo(HaveOccurred()) + var segDirectory string + Eventually(func() int { + num := 0 + errInternal := tsdb.WalkDir(tmp+"/shard-0", "seg-", func(suffix, absolutePath string) error { + if num < 1 { + segDirectory = absolutePath + } + num++ + return nil + }) + Expect(errInternal).NotTo(HaveOccurred()) + return num + }).WithTimeout(10 * time.Second).Should(BeNumerically(">=", 1)) + Eventually(func() int { + num := 0 + errInternal := tsdb.WalkDir(segDirectory, "block-", func(suffix, absolutePath string) error { + if _, err := os.Stat(path.Join(absolutePath, "store", "LOCK")); errors.Is(err, os.ErrNotExist) { + num++ + } + return nil + }) + Expect(errInternal).NotTo(HaveOccurred()) + return num + }).WithTimeout(10 * time.Second).Should(BeNumerically(">=", 2)) + }) }) }) diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go index 22bf791..b863a98 100644 --- a/banyand/tsdb/tsdb.go +++ b/banyand/tsdb/tsdb.go @@ -186,7 +186,7 @@ func createDatabase(ctx context.Context, db *database, startID int) (Database, e for i := startID; i < int(db.shardNum); i++ { db.logger.Info().Int("shard_id", i).Msg("creating a shard") so, errNewShard := OpenShard(ctx, common.ShardID(i), - db.location, db.segmentSize, db.blockSize) + db.location, db.segmentSize, db.blockSize, defaultBlockQueueSize) if errNewShard != nil { err = multierr.Append(err, errNewShard) continue @@ -216,6 +216,7 @@ func loadDatabase(ctx context.Context, db *database) (Database, error) { db.location, db.segmentSize, db.blockSize, + defaultBlockQueueSize, ) if errOpenShard != nil { return errOpenShard diff --git a/go.mod b/go.mod index 22513ff..a445dc6 100644 --- a/go.mod +++ b/go.mod @@ -51,6 +51,7 @@ require ( github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect + github.com/hashicorp/golang-lru v0.5.4 github.com/hashicorp/hcl v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.0.0 // indirect github.com/jonboulle/clockwork v0.2.2 // indirect @@ -93,7 +94,7 @@ require ( go.opentelemetry.io/otel/sdk/metric v0.20.0 // indirect go.opentelemetry.io/otel/trace v0.20.0 // indirect go.opentelemetry.io/proto/otlp v0.7.0 // indirect - go.uber.org/atomic v1.9.0 // indirect + go.uber.org/atomic v1.9.0 go.uber.org/zap v1.17.0 // indirect golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0 // indirect golang.org/x/text v0.3.6 // indirect diff --git a/go.sum b/go.sum index d0e069f..ee831c2 100644 --- a/go.sum +++ b/go.sum @@ -258,6 +258,8 @@ github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/b github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= +github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= +github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= diff --git a/pkg/timestamp/range.go b/pkg/timestamp/range.go index aae487b..e0db185 100644 --- a/pkg/timestamp/range.go +++ b/pkg/timestamp/range.go @@ -53,6 +53,24 @@ func (t TimeRange) Duration() time.Duration { return t.End.Sub(t.Start) } +func (t TimeRange) String() string { + var buf []byte + if t.IncludeStart { + buf = []byte("[") + } else { + buf = []byte("(") + } + buf = append(buf, t.Start.String()...) + buf = append(buf, ", "...) + buf = append(buf, t.End.String()...) + if t.IncludeEnd { + buf = append(buf, "]"...) + } else { + buf = append(buf, ")"...) + } + return string(buf) +} + func NewInclusiveTimeRange(start, end time.Time) TimeRange { return TimeRange{ Start: start,
