This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new e04847f Add a Closer to manage the closing phase (#210)
e04847f is described below
commit e04847f66cb57a6e276cdaf1edf4b221bb86984a
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Nov 10 21:37:36 2022 +0800
Add a Closer to manage the closing phase (#210)
---
.github/workflows/ci.yml | 10 +----
.github/workflows/dependency-review.yml | 34 ++++++++++++++
.gitignore | 6 +++
banyand/tsdb/block.go | 51 ++++-----------------
banyand/tsdb/bucket/bucket.go | 32 ++++++++-----
banyand/tsdb/bucket/queue.go | 10 +++--
banyand/tsdb/bucket/queue_test.go | 15 +++++--
banyand/tsdb/bucket/strategy.go | 36 +++++++++++----
banyand/tsdb/bucket/strategy_test.go | 4 +-
banyand/tsdb/metric.go | 3 +-
banyand/tsdb/retention.go | 27 ++++-------
banyand/tsdb/segment.go | 40 +++++++++--------
banyand/tsdb/shard.go | 25 +++++++----
pkg/index/index.go | 2 -
pkg/index/inverted/inverted.go | 5 ---
pkg/index/lsm/lsm.go | 4 --
pkg/run/closer.go | 79 +++++++++++++++++++++++++++++++++
17 files changed, 245 insertions(+), 138 deletions(-)
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 1b75422..70d0a8a 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -161,17 +161,9 @@ jobs:
run: make test-ci
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v3
- dependency-review:
- runs-on: ubuntu-latest
- steps:
- - name: 'Checkout Repository'
- uses: actions/checkout@v3
- - name: 'Dependency Review'
- uses: actions/dependency-review-action@v2
result:
name: Continuous Integration
runs-on: ubuntu-20.04
- needs: [check, build, test, dependency-review]
+ needs: [check, build, test]
steps:
- run: echo 'success'
-
diff --git a/.github/workflows/dependency-review.yml
b/.github/workflows/dependency-review.yml
new file mode 100644
index 0000000..09f5be9
--- /dev/null
+++ b/.github/workflows/dependency-review.yml
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Dependency Review Action
+#
+# This Action will scan dependency manifest files that change as part of a
Pull Request, surfacing known-vulnerable versions of the packages declared or
updated in the PR. Once installed, if the workflow run is marked as required,
PRs introducing known-vulnerable packages will be blocked from merging.
+#
+name: 'Dependency Review'
+on: [pull_request]
+
+permissions:
+ contents: read
+
+jobs:
+ dependency-review:
+ runs-on: ubuntu-latest
+ steps:
+ - name: 'Checkout Repository'
+ uses: actions/checkout@v3
+ - name: 'Dependency Review'
+ uses: actions/dependency-review-action@v2
diff --git a/.gitignore b/.gitignore
index 32e3104..14ba169 100644
--- a/.gitignore
+++ b/.gitignore
@@ -30,6 +30,9 @@ target
# Test binary, build with `go test -c`
*.test
+# Ginkgo test report
+*.report
+
# Output of the go coverage tool, specifically when used with LiteIDE
*.out
@@ -47,3 +50,6 @@ target
# mock files
*mock.go
*mock_test.go
+
+# snky cache
+.dccache
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 8d050f5..9a813c0 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -72,13 +72,12 @@ type block struct {
invertedIndex index.Store
lsmIndex index.Store
closableLst []io.Closer
+ clock timestamp.Clock
timestamp.TimeRange
bucket.Reporter
segID uint16
blockID uint16
encodingMethod EncodingMethod
- flushCh chan struct{}
- stopCh chan struct{}
}
type blockOpts struct {
@@ -104,7 +103,7 @@ func newBlock(ctx context.Context, opts blockOpts) (b
*block, err error) {
l: logger.Fetch(ctx, "block"),
TimeRange: opts.timeRange,
Reporter: bucket.NewTimeBasedReporter(opts.timeRange, clock),
- flushCh: make(chan struct{}, 1),
+ clock: clock,
ref: &atomic.Int32{},
closed: &atomic.Bool{},
deleted: &atomic.Bool{},
@@ -154,9 +153,6 @@ func (b *block) open() (err error) {
kv.TSSWithEncoding(b.encodingMethod.EncoderPool,
b.encodingMethod.DecoderPool),
kv.TSSWithLogger(b.l.Named(componentMain)),
kv.TSSWithMemTableSize(b.memSize),
- kv.TSSWithFlushCallback(func() {
- b.flushCh <- struct{}{}
- }),
); err != nil {
return err
}
@@ -174,20 +170,9 @@ func (b *block) open() (err error) {
}); err != nil {
return err
}
+ b.Reporter = bucket.NewTimeBasedReporter(b.TimeRange, b.clock)
b.closableLst = append(b.closableLst, b.invertedIndex, b.lsmIndex)
b.ref.Store(0)
- stopCh := make(chan struct{})
- b.stopCh = stopCh
- go func() {
- for {
- select {
- case <-b.flushCh:
- b.flush()
- case <-stopCh:
- return
- }
- }
- }()
b.closed.Store(false)
return nil
}
@@ -268,55 +253,35 @@ func (b *block) waitDone(stopped *atomic.Bool) <-chan
struct{} {
return ch
}
-func (b *block) flush() {
- for i := 0; i < 10; i++ {
- err := b.invertedIndex.Flush()
- if err == nil {
- break
- }
- time.Sleep(time.Second)
- b.l.Warn().Err(err).Int("retried", i).Msg("failed to flush
inverted index")
- }
-}
-
func (b *block) close(ctx context.Context) (err error) {
b.lock.Lock()
defer b.lock.Unlock()
if b.closed.Load() {
return nil
}
- b.closed.Store(true)
stopWaiting := &atomic.Bool{}
ch := b.waitDone(stopWaiting)
select {
case <-ctx.Done():
- b.closed.Store(false)
stopWaiting.Store(true)
return errors.Wrapf(ErrBlockClosingInterrupted, "block:%s", b)
case <-ch:
}
+ b.closed.Store(true)
+ if b.Reporter != nil {
+ b.Stop()
+ }
for _, closer := range b.closableLst {
err = multierr.Append(err, closer.Close())
}
- close(b.stopCh)
return err
}
-func (b *block) stopThenClose(ctx context.Context) error {
- if b.Reporter != nil {
- b.Stop()
- }
- return b.close(ctx)
-}
-
func (b *block) delete(ctx context.Context) error {
if b.deleted.Load() {
return nil
}
b.deleted.Store(true)
- if b.Reporter != nil {
- b.Stop()
- }
b.close(ctx)
return os.RemoveAll(b.path)
}
@@ -326,7 +291,7 @@ func (b *block) Closed() bool {
}
func (b *block) String() string {
- return fmt.Sprintf("BlockID-%d-%d", parseSuffix(b.segID),
parseSuffix(b.blockID))
+ return fmt.Sprintf("BlockID-%d-%s", parseSuffix(b.segID), b.suffix)
}
func (b *block) stats() (names []string, stats []observability.Statistics) {
diff --git a/banyand/tsdb/bucket/bucket.go b/banyand/tsdb/bucket/bucket.go
index 5838c26..d0fe08a 100644
--- a/banyand/tsdb/bucket/bucket.go
+++ b/banyand/tsdb/bucket/bucket.go
@@ -18,11 +18,15 @@
package bucket
import (
+ "errors"
"time"
+ "github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
+var ErrReporterClosed = errors.New("reporter is closed")
+
type Controller interface {
Current() (Reporter, error)
Next() (Reporter, error)
@@ -37,7 +41,7 @@ type Status struct {
type Channel chan Status
type Reporter interface {
- Report() Channel
+ Report() (Channel, error)
Stop()
String() string
}
@@ -46,28 +50,34 @@ var _ Reporter = (*timeBasedReporter)(nil)
type timeBasedReporter struct {
timestamp.TimeRange
- reporterStopCh chan struct{}
- clock timestamp.Clock
+ clock timestamp.Clock
+ closer *run.Closer
}
func NewTimeBasedReporter(timeRange timestamp.TimeRange, clock
timestamp.Clock) Reporter {
if timeRange.End.Before(clock.Now()) {
return nil
}
- return &timeBasedReporter{
- TimeRange: timeRange,
- reporterStopCh: make(chan struct{}),
- clock: clock,
+ t := &timeBasedReporter{
+ TimeRange: timeRange,
+ clock: clock,
+ closer: run.NewCloser(0),
}
+ return t
}
-func (tr *timeBasedReporter) Report() Channel {
+func (tr *timeBasedReporter) Report() (Channel, error) {
+ if tr.closer.Closed() {
+ return nil, ErrReporterClosed
+ }
ch := make(Channel, 1)
interval := tr.Duration() >> 4
if interval < 100*time.Millisecond {
interval = 100 * time.Millisecond
}
go func() {
+ tr.closer.AddRunning()
+ defer tr.closer.Done()
defer close(ch)
ticker := tr.clock.Ticker(interval)
defer ticker.Stop()
@@ -82,14 +92,14 @@ func (tr *timeBasedReporter) Report() Channel {
if status.Volume >= status.Capacity {
return
}
- case <-tr.reporterStopCh:
+ case <-tr.closer.CloseNotify():
return
}
}
}()
- return ch
+ return ch, nil
}
func (tr *timeBasedReporter) Stop() {
- close(tr.reporterStopCh)
+ tr.closer.CloseThenWait()
}
diff --git a/banyand/tsdb/bucket/queue.go b/banyand/tsdb/bucket/queue.go
index 0bcf49a..04f782b 100644
--- a/banyand/tsdb/bucket/queue.go
+++ b/banyand/tsdb/bucket/queue.go
@@ -28,6 +28,7 @@ import (
"github.com/robfig/cron/v3"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
@@ -66,7 +67,7 @@ type lruQueue struct {
recentEvict simplelru.LRUCache
lock sync.RWMutex
- stopCh chan struct{}
+ closer *run.Closer
}
func NewQueue(logger *logger.Logger, size int, maxSize int, clock
timestamp.Clock, evictFn EvictFn) (Queue, error) {
@@ -98,7 +99,7 @@ func NewQueue(logger *logger.Logger, size int, maxSize int,
clock timestamp.Cloc
evictSize: evictSize,
evictFn: evictFn,
l: logger,
- stopCh: make(chan struct{}),
+ closer: run.NewCloser(1),
}
parser := cron.NewParser(cron.Second)
// every 60 seconds to clean up recentEvict
@@ -107,6 +108,7 @@ func NewQueue(logger *logger.Logger, size int, maxSize int,
clock timestamp.Cloc
return nil, err
}
go func() {
+ defer c.closer.Done()
now := clock.Now()
for {
next := scheduler.Next(now)
@@ -133,7 +135,7 @@ func NewQueue(logger *logger.Logger, size int, maxSize int,
clock timestamp.Cloc
cancel()
c.lock.Unlock()
}
- case <-c.stopCh:
+ case <-c.closer.CloseNotify():
c.l.Info().Msg("stop")
timer.Stop()
return
@@ -281,6 +283,6 @@ func (q *lruQueue) removeOldest(ctx context.Context, lst
simplelru.LRUCache) err
}
func (q *lruQueue) Close() error {
- close(q.stopCh)
+ q.closer.CloseThenWait()
return nil
}
diff --git a/banyand/tsdb/bucket/queue_test.go
b/banyand/tsdb/bucket/queue_test.go
index f256358..8e16306 100644
--- a/banyand/tsdb/bucket/queue_test.go
+++ b/banyand/tsdb/bucket/queue_test.go
@@ -19,6 +19,7 @@ package bucket_test
import (
"context"
"strconv"
+ "sync"
"time"
. "github.com/onsi/ginkgo/v2"
@@ -27,6 +28,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
@@ -47,6 +49,7 @@ func entryID(id uint16) queueEntryID {
}
var _ = Describe("Queue", func() {
+ var lock sync.Mutex
var evictLst []queueEntryID
var l bucket.Queue
var clock timestamp.MockClock
@@ -60,6 +63,8 @@ var _ = Describe("Queue", func() {
clock.Set(time.Date(1970, 0o1, 0o1, 0, 0, 0, 0, time.Local))
var err error
l, err = bucket.NewQueue(logger.GetLogger("test"), 128, 192,
clock, func(_ context.Context, id interface{}) error {
+ lock.Lock()
+ defer lock.Unlock()
evictLst = append(evictLst, id.(queueEntryID))
return nil
})
@@ -126,8 +131,12 @@ var _ = Describe("Queue", func() {
Expect(enRecentSize).To(Equal(192))
Expect(l.Len()).To(Equal(128))
Expect(len(evictLst)).To(Equal(0))
- clock.Add(time.Minute)
- GinkgoWriter.Printf("evicted size:%d \n", len(evictLst))
- Expect(len(evictLst)).To(BeNumerically(">", 1))
+ Eventually(func() int {
+ clock.Add(time.Minute)
+ clock.TriggerTimer()
+ lock.Lock()
+ defer lock.Unlock()
+ return len(evictLst)
+
}).WithTimeout(flags.EventuallyTimeout).Should(BeNumerically(">", 1))
})
})
diff --git a/banyand/tsdb/bucket/strategy.go b/banyand/tsdb/bucket/strategy.go
index 6bf51f2..bc09583 100644
--- a/banyand/tsdb/bucket/strategy.go
+++ b/banyand/tsdb/bucket/strategy.go
@@ -26,6 +26,7 @@ import (
"go.uber.org/multierr"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
)
var (
@@ -42,7 +43,8 @@ type Strategy struct {
current atomic.Value
currentRatio uint64
logger *logger.Logger
- stopCh chan struct{}
+
+ closer *run.Closer
}
type StrategyOptions func(*Strategy)
@@ -71,7 +73,7 @@ func NewStrategy(ctrl Controller, options ...StrategyOptions)
(*Strategy, error)
strategy := &Strategy{
ctrl: ctrl,
ratio: 0.8,
- stopCh: make(chan struct{}),
+ closer: run.NewCloser(1),
}
for _, opt := range options {
opt(strategy)
@@ -82,18 +84,36 @@ func NewStrategy(ctrl Controller, options
...StrategyOptions) (*Strategy, error)
if strategy.logger == nil {
strategy.logger = logger.GetLogger("bucket-strategy")
}
- c, err := ctrl.Current()
- if err != nil {
+ if err := strategy.resetCurrent(); err != nil {
return nil, err
}
- strategy.current.Store(c)
return strategy, nil
}
+func (s *Strategy) resetCurrent() error {
+ c, err := s.ctrl.Current()
+ if err != nil {
+ return err
+ }
+ s.current.Store(c)
+ return nil
+}
+
func (s *Strategy) Run() {
go func(s *Strategy) {
+ defer s.closer.Done()
for {
- c := s.current.Load().(Reporter).Report()
+ c, err := s.current.Load().(Reporter).Report()
+ if errors.Is(err, ErrReporterClosed) {
+ return
+ }
+ if err != nil {
+ s.logger.Error().Err(err).Msg("failed to get
reporter")
+ if err := s.resetCurrent(); err != nil {
+ panic(err)
+ }
+ continue
+ }
if !s.observe(c) {
return
}
@@ -138,12 +158,12 @@ func (s *Strategy) observe(c Channel) bool {
}
return moreBucket
}
- case <-s.stopCh:
+ case <-s.closer.CloseNotify():
return false
}
}
}
func (s *Strategy) Close() {
- close(s.stopCh)
+ s.closer.CloseThenWait()
}
diff --git a/banyand/tsdb/bucket/strategy_test.go
b/banyand/tsdb/bucket/strategy_test.go
index 00f6ef5..64adbda 100644
--- a/banyand/tsdb/bucket/strategy_test.go
+++ b/banyand/tsdb/bucket/strategy_test.go
@@ -133,7 +133,7 @@ type reporter struct {
step int
}
-func (r *reporter) Report() bucket.Channel {
+func (r *reporter) Report() (bucket.Channel, error) {
ch := make(bucket.Channel, r.capacity)
go func() {
var volume int
@@ -146,7 +146,7 @@ func (r *reporter) Report() bucket.Channel {
}
close(ch)
}()
- return ch
+ return ch, nil
}
func (r *reporter) Stop() {
diff --git a/banyand/tsdb/metric.go b/banyand/tsdb/metric.go
index a97cddf..696e019 100644
--- a/banyand/tsdb/metric.go
+++ b/banyand/tsdb/metric.go
@@ -52,13 +52,14 @@ func init() {
func (s *shard) runStat() {
go func() {
+ defer s.closer.Done()
ticker := time.NewTicker(statInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
s.stat()
- case <-s.stopCh:
+ case <-s.closer.CloseNotify():
return
}
}
diff --git a/banyand/tsdb/retention.go b/banyand/tsdb/retention.go
index 755bf41..78f7747 100644
--- a/banyand/tsdb/retention.go
+++ b/banyand/tsdb/retention.go
@@ -19,22 +19,21 @@ package tsdb
import (
"context"
- "sync"
"time"
"github.com/robfig/cron/v3"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
)
type retentionController struct {
segment *segmentController
scheduler cron.Schedule
- stopped bool
- stopMux sync.Mutex
- stopCh chan struct{}
duration time.Duration
- l *logger.Logger
+
+ closer *run.Closer
+ l *logger.Logger
}
func newRetentionController(segment *segmentController, ttl IntervalRule)
(*retentionController, error) {
@@ -56,22 +55,18 @@ func newRetentionController(segment *segmentController, ttl
IntervalRule) (*rete
return &retentionController{
segment: segment,
scheduler: scheduler,
- stopCh: make(chan struct{}),
l: segment.l.Named("retention-controller"),
duration: ttl.EstimatedDuration(),
+ closer: run.NewCloser(1),
}, nil
}
func (rc *retentionController) start() {
- rc.stopMux.Lock()
- if rc.stopped {
- return
- }
- rc.stopMux.Unlock()
go rc.run()
}
func (rc *retentionController) run() {
+ defer rc.closer.Done()
rc.l.Info().Msg("start")
now := rc.segment.clock.Now()
for {
@@ -85,7 +80,7 @@ func (rc *retentionController) run() {
rc.l.Error().Err(err)
}
cancel()
- case <-rc.stopCh:
+ case <-rc.closer.CloseNotify():
timer.Stop()
rc.l.Info().Msg("stop")
return
@@ -94,11 +89,5 @@ func (rc *retentionController) run() {
}
func (rc *retentionController) stop() {
- rc.stopMux.Lock()
- defer rc.stopMux.Unlock()
- if rc.stopped {
- return
- }
- rc.stopped = true
- close(rc.stopCh)
+ rc.closer.CloseThenWait()
}
diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index 6d08e2f..abfcd28 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -20,7 +20,6 @@ package tsdb
import (
"context"
"errors"
- "fmt"
"os"
"sort"
"strconv"
@@ -50,6 +49,7 @@ type segment struct {
bucket.Reporter
blockController *blockController
blockManageStrategy *bucket.Strategy
+ closeOnce sync.Once
}
func openSegment(ctx context.Context, startTime time.Time, path, suffix string,
@@ -111,21 +111,23 @@ func openSegment(ctx context.Context, startTime
time.Time, path, suffix string,
return s, nil
}
-func (s *segment) close(ctx context.Context) error {
- if err := s.blockController.close(ctx); err != nil {
- return err
- }
- if s.globalIndex != nil {
- if err := s.globalIndex.Close(); err != nil {
- return err
+func (s *segment) close(ctx context.Context) (err error) {
+ s.closeOnce.Do(func() {
+ if err = s.blockController.close(ctx); err != nil {
+ return
}
- }
- if s.blockManageStrategy != nil {
- s.blockManageStrategy.Close()
- }
- if s.Reporter != nil {
- s.Stop()
- }
+ if s.globalIndex != nil {
+ if err = s.globalIndex.Close(); err != nil {
+ return
+ }
+ }
+ if s.blockManageStrategy != nil {
+ s.blockManageStrategy.Close()
+ }
+ if s.Reporter != nil {
+ s.Stop()
+ }
+ })
return nil
}
@@ -140,8 +142,8 @@ func (s *segment) delete(ctx context.Context) error {
return os.RemoveAll(s.path)
}
-func (s segment) String() string {
- return fmt.Sprintf("SegID-%d", parseSuffix(s.id))
+func (s *segment) String() string {
+ return "SegID-" + s.suffix
}
func (s *segment) Stats() observability.Statistics {
@@ -431,8 +433,10 @@ func (bc *blockController) sortLst() {
}
func (bc *blockController) close(ctx context.Context) (err error) {
+ bc.Lock()
+ defer bc.Unlock()
for _, s := range bc.lst {
- err = multierr.Append(err, s.stopThenClose(ctx))
+ err = multierr.Append(err, s.close(ctx))
}
return err
}
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index b77a232..93c9824 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -30,6 +30,7 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
"github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
@@ -51,7 +52,9 @@ type shard struct {
segmentController *segmentController
segmentManageStrategy *bucket.Strategy
retentionController *retentionController
- stopCh chan struct{}
+
+ closeOnce sync.Once
+ closer *run.Closer
}
func OpenShard(ctx context.Context, id common.ShardID,
@@ -79,7 +82,7 @@ func OpenShard(ctx context.Context, id common.ShardID,
id: id,
segmentController: sc,
l: l,
- stopCh: make(chan struct{}),
+ closer: run.NewCloser(1),
}
err = s.segmentController.open()
if err != nil {
@@ -162,13 +165,15 @@ func (s *shard) State() (shardState ShardState) {
return shardState
}
-func (s *shard) Close() error {
- s.retentionController.stop()
- s.segmentManageStrategy.Close()
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- err := multierr.Combine(s.segmentController.close(ctx),
s.seriesDatabase.Close())
- close(s.stopCh)
+func (s *shard) Close() (err error) {
+ s.closeOnce.Do(func() {
+ s.closer.CloseThenWait()
+ s.retentionController.stop()
+ s.segmentManageStrategy.Close()
+ ctx, cancel := context.WithTimeout(context.Background(),
5*time.Second)
+ defer cancel()
+ err = multierr.Combine(s.segmentController.close(ctx),
s.seriesDatabase.Close())
+ })
return err
}
@@ -445,6 +450,8 @@ func (sc *segmentController) removeSeg(segID uint16) {
}
func (sc *segmentController) close(ctx context.Context) (err error) {
+ sc.Lock()
+ defer sc.Unlock()
for _, s := range sc.lst {
err = multierr.Append(err, s.close(ctx))
}
diff --git a/pkg/index/index.go b/pkg/index/index.go
index 6f490cd..6d06790 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -166,8 +166,6 @@ type Store interface {
io.Closer
Writer
Searcher
- // Flush flushed memory data to disk
- Flush() error
}
type GetSearcher func(location databasev1.IndexRule_Type) (Searcher, error)
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 5ce8878..4f680b8 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -204,11 +204,6 @@ func (s *store) Range(fieldKey index.FieldKey, opts
index.RangeOpts) (list posti
return
}
-// Flush flushed memory data to disk
-func (s *store) Flush() error {
- return nil
-}
-
type blugeMatchIterator struct {
delegated search.DocumentMatchIterator
fieldKey string
diff --git a/pkg/index/lsm/lsm.go b/pkg/index/lsm/lsm.go
index 8c366ad..0a26c26 100644
--- a/pkg/index/lsm/lsm.go
+++ b/pkg/index/lsm/lsm.go
@@ -35,10 +35,6 @@ type store struct {
l *logger.Logger
}
-func (*store) Flush() error {
- panic("do not call flush here. LSM index is using its own controller to
flush memory data")
-}
-
func (s *store) Stats() observability.Statistics {
return s.lsm.Stats()
}
diff --git a/pkg/run/closer.go b/pkg/run/closer.go
new file mode 100644
index 0000000..cc70e53
--- /dev/null
+++ b/pkg/run/closer.go
@@ -0,0 +1,79 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package run
+
+import (
+ "context"
+ "sync"
+ "sync/atomic"
+)
+
+// Closer can close a goroutine then wait for it to stop.
+type Closer struct {
+ waiting sync.WaitGroup
+ closed *atomic.Bool
+
+ ctx context.Context
+ cancel context.CancelFunc
+}
+
+// NewCloser instances a new Closer.
+func NewCloser(initial int) *Closer {
+ c := &Closer{}
+ c.ctx, c.cancel = context.WithCancel(context.Background())
+ c.closed = &atomic.Bool{}
+ c.waiting.Add(initial)
+ return c
+}
+
+// AddRunning adds a running task.
+func (c *Closer) AddRunning() {
+ c.waiting.Add(1)
+}
+
+// Close sends a signal to the CloseNotify.
+func (c *Closer) Close() {
+ c.closed.Store(true)
+ c.cancel()
+}
+
+// CloseNotify receives a signal from Close.
+func (c *Closer) CloseNotify() <-chan struct{} {
+ return c.ctx.Done()
+}
+
+// Done notifies that one task is done.
+func (c *Closer) Done() {
+ c.waiting.Done()
+}
+
+// Wait waits until all tasks are done.
+func (c *Closer) Wait() {
+ c.waiting.Wait()
+}
+
+// CloseThenWait calls Close(), then Wait().
+func (c *Closer) CloseThenWait() {
+ c.Close()
+ c.Wait()
+}
+
+// Closed returns whether the Closer is closed
+func (c *Closer) Closed() bool {
+ return c.closed.Load()
+}