This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push: new 28b5fd5 Add load test and inject testing flags (#204) 28b5fd5 is described below commit 28b5fd5f23dee6bc4ea1ed729f7da1956ad257d0 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Fri Nov 4 16:40:00 2022 +0800 Add load test and inject testing flags (#204) * Add a slow load test case to CI * Update max opened blocks strategy. A shard could open 64 blocks at most. A cleanup goroutine could collect 10 inactive oldest blocks every minute until there are 4 blocks left in the cache. * Inject logging level and the timeout of eventually through "ldflags" Signed-off-by: Gao Hongtao <hanahm...@gmail.com> --- .github/workflows/load.yml | 57 +++++++ Makefile | 7 +- banyand/liaison/grpc/grpc_suite_test.go | 3 +- banyand/measure/measure_suite_test.go | 3 +- banyand/measure/measure_topn.go | 4 +- banyand/measure/measure_write.go | 6 +- banyand/metadata/metadata_test.go | 3 +- banyand/metadata/schema/schema_suite_test.go | 3 +- banyand/query/processor_topn.go | 4 +- banyand/stream/stream_suite_test.go | 3 +- banyand/stream/stream_write.go | 7 +- banyand/tsdb/block.go | 81 ++++++---- banyand/tsdb/bucket/bucket_suite_test.go | 3 +- banyand/tsdb/bucket/queue.go | 163 +++++++++++++++++---- banyand/tsdb/bucket/queue_test.go | 83 ++++++++++- banyand/tsdb/bucket/strategy.go | 4 +- banyand/tsdb/indexdb.go | 14 +- banyand/tsdb/retention.go | 24 +-- banyand/tsdb/segment.go | 94 ++++++++---- banyand/tsdb/series.go | 20 +-- banyand/tsdb/seriesdb.go | 22 +-- banyand/tsdb/seriesdb_test.go | 5 +- banyand/tsdb/shard.go | 53 ++++--- banyand/tsdb/shard_test.go | 53 +++---- banyand/tsdb/tsdb.go | 11 +- banyand/tsdb/tsdb_suite_test.go | 6 +- banyand/tsdb/tsdb_test.go | 3 +- bydbctl/internal/cmd/cmd_suite_test.go | 3 +- docs/crud/measure/query.md | 2 + docs/crud/stream/query.md | 2 + pkg/index/inverted/inverted_test.go | 3 +- pkg/index/lsm/lsm_test.go | 3 +- pkg/query/logical/common.go | 22 +-- .../measure/measure_plan_indexscan_local.go | 10 +- .../logical/stream/stream_plan_indexscan_global.go | 5 +- .../logical/stream/stream_plan_indexscan_local.go | 10 +- .../test/flags/flags.go | 32 ++-- test/integration/cold_query/query_suite_test.go | 3 +- .../load_suite_test.go} | 50 +++---- test/integration/other/other_suite_test.go | 3 +- test/integration/query/query_suite_test.go | 3 +- 41 files changed, 630 insertions(+), 260 deletions(-) diff --git a/.github/workflows/load.yml b/.github/workflows/load.yml new file mode 100644 index 0000000..d3682d4 --- /dev/null +++ b/.github/workflows/load.yml @@ -0,0 +1,57 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: Continuous Integration + +on: + schedule: + - cron: '0 20 * * *' + +jobs: + test: + name: Load several days data + runs-on: ubuntu-20.04 + strategy: + matrix: + tz: ["UTC", "Asia/Shanghai", "America/Los_Angeles"] + steps: + - name: Set timezone + run: sudo timedatectl set-timezone ${{ matrix.tz }} + - uses: actions/setup-node@v3 + with: + node-version: 16.15 + - name: Install Go + uses: actions/setup-go@v2 + with: + go-version: 1.19 + - name: Check out code into the Go module directory + uses: actions/checkout@v2 + - uses: actions/cache@v3 + id: cache-go + with: + path: | + ~/.cache/go-build + ~/go/pkg/mod + key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} + restore-keys: | + ${{ runner.os }}-go- + - name: Update dependencies + if: steps.cache-go.outputs.cache-hit != 'true' + run: GOPROXY=https://proxy.golang.org go mod download + - name: Generate mocks + run: make generate + - name: Test + run: TEST_EXTRA_OPTS="--label-filter slow" make -C test test diff --git a/Makefile b/Makefile index 37eee1c..099a0b8 100644 --- a/Makefile +++ b/Makefile @@ -65,7 +65,12 @@ test-coverage: default ## Run the unit tests in all projects with coverage analy include scripts/build/ginkgo.mk test-ci: $(GINKGO) ## Run the unit tests in CI - $(GINKGO) --race --cover --covermode atomic --coverprofile=coverage.out ./... + $(GINKGO) --race \ + -ldflags \ + "-X github.com/apache/skywalking-banyandb/pkg/test/flags.eventuallyTimeout=30s -X github.com/apache/skywalking-banyandb/pkg/test/flags.LogLevel=warn" \ + --cover --covermode atomic --coverprofile=coverage.out \ + --label-filter !slow \ + ./... ##@ Code quality targets diff --git a/banyand/liaison/grpc/grpc_suite_test.go b/banyand/liaison/grpc/grpc_suite_test.go index ef85011..352574a 100644 --- a/banyand/liaison/grpc/grpc_suite_test.go +++ b/banyand/liaison/grpc/grpc_suite_test.go @@ -24,6 +24,7 @@ import ( . "github.com/onsi/gomega" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/test/flags" ) func TestGrpc(t *testing.T) { @@ -34,6 +35,6 @@ func TestGrpc(t *testing.T) { var _ = BeforeSuite(func() { Expect(logger.Init(logger.Logging{ Env: "dev", - Level: "warn", + Level: flags.LogLevel, })).Should(Succeed()) }) diff --git a/banyand/measure/measure_suite_test.go b/banyand/measure/measure_suite_test.go index 3f295d8..9f6cc38 100644 --- a/banyand/measure/measure_suite_test.go +++ b/banyand/measure/measure_suite_test.go @@ -33,6 +33,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/test/flags" testmeasure "github.com/apache/skywalking-banyandb/pkg/test/measure" ) @@ -45,7 +46,7 @@ func TestMeasure(t *testing.T) { var _ = ginkgo.BeforeSuite(func() { gomega.Expect(logger.Init(logger.Logging{ Env: "dev", - Level: "warn", + Level: flags.LogLevel, })).To(gomega.Succeed()) }) diff --git a/banyand/measure/measure_topn.go b/banyand/measure/measure_topn.go index 29295c3..6a969f1 100644 --- a/banyand/measure/measure_topn.go +++ b/banyand/measure/measure_topn.go @@ -166,7 +166,9 @@ func (t *topNStreamingProcessor) writeData(eventTime time.Time, timeBucket strin if err != nil { return err } - span, err := series.Create(eventTime) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + span, err := series.Create(ctx, eventTime) if err != nil { if span != nil { _ = span.Close() diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go index c8a9aef..7cd8224 100644 --- a/banyand/measure/measure_write.go +++ b/banyand/measure/measure_write.go @@ -19,6 +19,8 @@ package measure import ( "bytes" + "context" + "time" "github.com/pkg/errors" @@ -75,7 +77,9 @@ func (s *measure) write(shardID common.ShardID, seriesHashKey []byte, value *mea if err != nil { return err } - wp, err := series.Create(t) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + wp, err := series.Create(ctx, t) if err != nil { if wp != nil { _ = wp.Close() diff --git a/banyand/metadata/metadata_test.go b/banyand/metadata/metadata_test.go index fc3c7e5..682c762 100644 --- a/banyand/metadata/metadata_test.go +++ b/banyand/metadata/metadata_test.go @@ -27,6 +27,7 @@ import ( commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/test/flags" test "github.com/apache/skywalking-banyandb/pkg/test/stream" ) @@ -37,7 +38,7 @@ func Test_service_RulesBySubject(t *testing.T) { is := assert.New(t) is.NoError(logger.Init(logger.Logging{ Env: "dev", - Level: "warn", + Level: flags.LogLevel, })) ctx := context.TODO() s, _ := NewService(ctx) diff --git a/banyand/metadata/schema/schema_suite_test.go b/banyand/metadata/schema/schema_suite_test.go index aea8ea1..51afd65 100644 --- a/banyand/metadata/schema/schema_suite_test.go +++ b/banyand/metadata/schema/schema_suite_test.go @@ -24,6 +24,7 @@ import ( . "github.com/onsi/gomega" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/test/flags" ) func TestSchema(t *testing.T) { @@ -35,6 +36,6 @@ func TestSchema(t *testing.T) { var _ = ginkgo.BeforeSuite(func() { Expect(logger.Init(logger.Logging{ Env: "dev", - Level: "warn", + Level: flags.LogLevel, })).To(Succeed()) }) diff --git a/banyand/query/processor_topn.go b/banyand/query/processor_topn.go index 91f1199..021c7eb 100644 --- a/banyand/query/processor_topn.go +++ b/banyand/query/processor_topn.go @@ -196,7 +196,9 @@ func familyIdentity(name string, flag []byte) []byte { } func (t *topNQueryProcessor) scanSeries(series tsdb.Series, request *measurev1.TopNRequest) ([]tsdb.Iterator, error) { - seriesSpan, err := series.Span(timestamp.NewInclusiveTimeRange( + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + seriesSpan, err := series.Span(ctx, timestamp.NewInclusiveTimeRange( request.GetTimeRange().GetBegin().AsTime(), request.GetTimeRange().GetEnd().AsTime()), ) diff --git a/banyand/stream/stream_suite_test.go b/banyand/stream/stream_suite_test.go index 4f827eb..d3177ad 100644 --- a/banyand/stream/stream_suite_test.go +++ b/banyand/stream/stream_suite_test.go @@ -32,6 +32,7 @@ import ( "github.com/apache/skywalking-banyandb/banyand/queue" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/test/flags" teststream "github.com/apache/skywalking-banyandb/pkg/test/stream" ) @@ -44,7 +45,7 @@ func TestStream(t *testing.T) { var _ = BeforeSuite(func() { Expect(logger.Init(logger.Logging{ Env: "dev", - Level: "warn", + Level: flags.LogLevel, })).To(Succeed()) }) diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go index 5dac816..af03580 100644 --- a/banyand/stream/stream_write.go +++ b/banyand/stream/stream_write.go @@ -18,6 +18,9 @@ package stream import ( + "context" + "time" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -87,7 +90,9 @@ func (s *stream) write(shardID common.ShardID, seriesHashKey []byte, value *stre return err } t := timestamp.MToN(tp) - wp, err := series.Create(t) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + wp, err := series.Create(ctx, t) if err != nil { if wp != nil { _ = wp.Close() diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go index 3bc87ea..8d050f5 100644 --- a/banyand/tsdb/block.go +++ b/banyand/tsdb/block.go @@ -30,6 +30,7 @@ import ( "time" "github.com/pkg/errors" + "go.uber.org/multierr" "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/banyand/kv" @@ -49,8 +50,11 @@ const ( componentSecondLSMIdx = "lsm" defaultMainMemorySize = 8 << 20 + defaultEnqueueTimeout = 500 * time.Millisecond ) +var ErrBlockClosingInterrupted = errors.New("interrupt to close the block") + type block struct { path string l *logger.Logger @@ -106,13 +110,14 @@ func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) { deleted: &atomic.Bool{}, queue: opts.queue, } + b.closed.Store(true) b.options(ctx) position := ctx.Value(common.PositionKey) if position != nil { b.position = position.(common.Position) } - return b, b.open() + return b, nil } func (b *block) options(ctx context.Context) { @@ -187,24 +192,28 @@ func (b *block) open() (err error) { return nil } -func (b *block) delegate() (BlockDelegate, error) { +func (b *block) delegate(ctx context.Context) (BlockDelegate, error) { if b.deleted.Load() { - return nil, errors.WithMessagef(ErrBlockAbsent, "block %d is deleted", b.blockID) + return nil, errors.WithMessagef(ErrBlockAbsent, "block %s is deleted", b) + } + blockID := BlockID{ + BlockID: b.blockID, + SegID: b.segID, } if b.incRef() { + b.queue.Touch(blockID) return &bDelegate{ delegate: b, }, nil } b.lock.Lock() defer b.lock.Unlock() - b.queue.Push(BlockID{ - BlockID: b.blockID, - SegID: b.segID, - }) - // TODO: remove the block which fails to open from the queue - err := b.open() - if err != nil { + if err := b.queue.Push(ctx, blockID, func() error { + if !b.Closed() { + return nil + } + return b.open() + }); err != nil { b.l.Error().Err(err).Stringer("block", b).Msg("fail to open block") return nil, err } @@ -240,14 +249,23 @@ loop: goto loop } -func (b *block) waitDone() { -loop: - if b.ref.Load() < 1 { - b.ref.Store(0) - return - } - runtime.Gosched() - goto loop +func (b *block) waitDone(stopped *atomic.Bool) <-chan struct{} { + ch := make(chan struct{}) + go func() { + loop: + if b.ref.Load() < 1 { + b.ref.Store(0) + close(ch) + return + } + if stopped.Load() { + close(ch) + return + } + runtime.Gosched() + goto loop + }() + return ch } func (b *block) flush() { @@ -261,28 +279,37 @@ func (b *block) flush() { } } -func (b *block) close() { +func (b *block) close(ctx context.Context) (err error) { b.lock.Lock() defer b.lock.Unlock() if b.closed.Load() { - return + return nil } b.closed.Store(true) - b.waitDone() + stopWaiting := &atomic.Bool{} + ch := b.waitDone(stopWaiting) + select { + case <-ctx.Done(): + b.closed.Store(false) + stopWaiting.Store(true) + return errors.Wrapf(ErrBlockClosingInterrupted, "block:%s", b) + case <-ch: + } for _, closer := range b.closableLst { - _ = closer.Close() + err = multierr.Append(err, closer.Close()) } close(b.stopCh) + return err } -func (b *block) stopThenClose() { +func (b *block) stopThenClose(ctx context.Context) error { if b.Reporter != nil { b.Stop() } - b.close() + return b.close(ctx) } -func (b *block) delete() error { +func (b *block) delete(ctx context.Context) error { if b.deleted.Load() { return nil } @@ -290,7 +317,7 @@ func (b *block) delete() error { if b.Reporter != nil { b.Stop() } - b.close() + b.close(ctx) return os.RemoveAll(b.path) } @@ -299,7 +326,7 @@ func (b *block) Closed() bool { } func (b *block) String() string { - return fmt.Sprintf("BlockID-%d-%d", b.segID, b.blockID) + return fmt.Sprintf("BlockID-%d-%d", parseSuffix(b.segID), parseSuffix(b.blockID)) } func (b *block) stats() (names []string, stats []observability.Statistics) { diff --git a/banyand/tsdb/bucket/bucket_suite_test.go b/banyand/tsdb/bucket/bucket_suite_test.go index 59ca1fe..289cd7d 100644 --- a/banyand/tsdb/bucket/bucket_suite_test.go +++ b/banyand/tsdb/bucket/bucket_suite_test.go @@ -23,6 +23,7 @@ import ( . "github.com/onsi/gomega" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/test/flags" ) func TestBucket(t *testing.T) { @@ -30,7 +31,7 @@ func TestBucket(t *testing.T) { BeforeSuite(func() { Expect(logger.Init(logger.Logging{ Env: "dev", - Level: "warn", + Level: flags.LogLevel, })).Should(Succeed()) }) RunSpecs(t, "Bucket Suite") diff --git a/banyand/tsdb/bucket/queue.go b/banyand/tsdb/bucket/queue.go index 2b3dced..0bcf49a 100644 --- a/banyand/tsdb/bucket/queue.go +++ b/banyand/tsdb/bucket/queue.go @@ -17,29 +17,45 @@ package bucket import ( + "context" "errors" + "fmt" + "io" "sync" + "time" "github.com/hashicorp/golang-lru/simplelru" + "github.com/robfig/cron/v3" + + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/timestamp" ) -type EvictFn func(id interface{}) +type ( + EvictFn func(ctx context.Context, id interface{}) error + OnAddRecentFn func() error +) type Queue interface { - Push(id interface{}) - Remove(id interface{}) + io.Closer + Touch(id fmt.Stringer) bool + Push(ctx context.Context, id fmt.Stringer, fn OnAddRecentFn) error + Remove(id fmt.Stringer) Len() int + Volume() int All() []interface{} } const ( - DefaultRecentRatio = 0.25 - DefaultGhostEntries = 0.50 + DefaultRecentRatio = 0.25 + + defaultEvictBatchSize = 10 ) var ErrInvalidSize = errors.New("invalid size") type lruQueue struct { + l *logger.Logger size int recentSize int evictSize int @@ -49,15 +65,17 @@ type lruQueue struct { frequent simplelru.LRUCache recentEvict simplelru.LRUCache lock sync.RWMutex + + stopCh chan struct{} } -func NewQueue(size int, evictFn EvictFn) (Queue, error) { +func NewQueue(logger *logger.Logger, size int, maxSize int, clock timestamp.Clock, evictFn EvictFn) (Queue, error) { if size <= 0 { return nil, ErrInvalidSize } recentSize := int(float64(size) * DefaultRecentRatio) - evictSize := int(float64(size) * DefaultGhostEntries) + evictSize := maxSize - size recent, err := simplelru.NewLRU(size, nil) if err != nil { @@ -79,37 +97,108 @@ func NewQueue(size int, evictFn EvictFn) (Queue, error) { recentEvict: recentEvict, evictSize: evictSize, evictFn: evictFn, + l: logger, + stopCh: make(chan struct{}), } + parser := cron.NewParser(cron.Second) + // every 60 seconds to clean up recentEvict + scheduler, err := parser.Parse("59") + if err != nil { + return nil, err + } + go func() { + now := clock.Now() + for { + next := scheduler.Next(now) + timer := clock.Timer(next.Sub(now)) + select { + case now = <-timer.C: + c.l.Info().Time("now", now).Msg("wakes") + var evictLen int + c.lock.RLock() + evictLen = c.recentEvict.Len() + c.lock.RUnlock() + if evictLen < 1 { + continue + } + for i := 0; i < defaultEvictBatchSize; i++ { + c.lock.Lock() + if evictLen < 1 { + continue + } + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + if err := c.removeOldest(ctx, c.recentEvict); err != nil { + c.l.Error().Err(err).Msg("failed to remove oldest blocks") + } + cancel() + c.lock.Unlock() + } + case <-c.stopCh: + c.l.Info().Msg("stop") + timer.Stop() + return + } + } + }() return c, nil } -func (q *lruQueue) Push(id interface{}) { +func (q *lruQueue) Touch(id fmt.Stringer) bool { q.lock.Lock() defer q.lock.Unlock() if q.frequent.Contains(id) { + q.l.Debug().Stringer("id", id).Msg("get from frequent") + return true + } + + if q.recent.Contains(id) { + q.l.Debug().Stringer("id", id).Msg("promote from recent to frequent") + q.recent.Remove(id) q.frequent.Add(id, nil) - return + return true + } + return false +} + +func (q *lruQueue) Push(ctx context.Context, id fmt.Stringer, fn OnAddRecentFn) error { + q.lock.Lock() + defer q.lock.Unlock() + + if q.frequent.Contains(id) { + q.l.Debug().Stringer("id", id).Msg("push to frequent") + q.frequent.Add(id, nil) + return nil } if q.recent.Contains(id) { + q.l.Debug().Stringer("id", id).Msg("promote from recent to frequent") q.recent.Remove(id) q.frequent.Add(id, nil) - return + return nil } if q.recentEvict.Contains(id) { - q.ensureSpace(true) + q.l.Debug().Stringer("id", id).Msg("restore from recentEvict") + if err := q.ensureSpace(ctx, true); err != nil { + return err + } q.recentEvict.Remove(id) q.frequent.Add(id, nil) - return + return nil } - q.ensureSpace(false) + if err := q.ensureSpace(ctx, false); err != nil { + return err + } q.recent.Add(id, nil) + if fn == nil { + return nil + } + return fn() } -func (q *lruQueue) Remove(id interface{}) { +func (q *lruQueue) Remove(id fmt.Stringer) { q.lock.Lock() defer q.lock.Unlock() @@ -134,6 +223,10 @@ func (q *lruQueue) Len() int { return q.recent.Len() + q.frequent.Len() } +func (q *lruQueue) Volume() int { + return q.size + q.recentSize + q.evictSize +} + func (q *lruQueue) All() []interface{} { q.lock.RLock() defer q.lock.RUnlock() @@ -144,32 +237,50 @@ func (q *lruQueue) All() []interface{} { return all } -func (q *lruQueue) ensureSpace(recentEvict bool) { +func (q *lruQueue) ensureSpace(ctx context.Context, recentEvict bool) error { recentLen := q.recent.Len() freqLen := q.frequent.Len() if recentLen+freqLen < q.size { - return + return nil } if recentLen > 0 && (recentLen > q.recentSize || (recentLen == q.recentSize && !recentEvict)) { - k, _, _ := q.recent.RemoveOldest() - q.addLst(q.recentEvict, q.evictSize, k) - return + k, _, ok := q.recent.GetOldest() + if !ok { + return errors.New("failed to get oldest from recent queue") + } + if err := q.addLst(ctx, q.recentEvict, q.evictSize, k); err != nil { + return err + } + q.recent.Remove(k) + return nil } - q.removeOldest(q.frequent) + return q.removeOldest(ctx, q.frequent) } -func (q *lruQueue) addLst(lst simplelru.LRUCache, size int, id interface{}) { +func (q *lruQueue) addLst(ctx context.Context, lst simplelru.LRUCache, size int, id interface{}) error { if lst.Len() < size { lst.Add(id, nil) - return + return nil + } + if err := q.removeOldest(ctx, lst); err != nil { + return err } - q.removeOldest(lst) lst.Add(id, nil) + return nil } -func (q *lruQueue) removeOldest(lst simplelru.LRUCache) { - oldestID, _, ok := lst.RemoveOldest() +func (q *lruQueue) removeOldest(ctx context.Context, lst simplelru.LRUCache) error { + oldestID, _, ok := lst.GetOldest() if ok && q.evictFn != nil { - q.evictFn(oldestID) + if err := q.evictFn(ctx, oldestID); err != nil { + return err + } + _ = lst.Remove(oldestID) } + return nil +} + +func (q *lruQueue) Close() error { + close(q.stopCh) + return nil } diff --git a/banyand/tsdb/bucket/queue_test.go b/banyand/tsdb/bucket/queue_test.go index 0eb6fe0..f256358 100644 --- a/banyand/tsdb/bucket/queue_test.go +++ b/banyand/tsdb/bucket/queue_test.go @@ -17,11 +17,17 @@ package bucket_test import ( + "context" + "strconv" + "time" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/onsi/gomega/gleak" "github.com/apache/skywalking-banyandb/banyand/tsdb/bucket" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/timestamp" ) type queueEntryID struct { @@ -29,6 +35,10 @@ type queueEntryID struct { second uint16 } +func (q queueEntryID) String() string { + return strconv.Itoa(int(q.first)) +} + func entryID(id uint16) queueEntryID { return queueEntryID{ first: id, @@ -37,26 +47,87 @@ func entryID(id uint16) queueEntryID { } var _ = Describe("Queue", func() { + var evictLst []queueEntryID + var l bucket.Queue + var clock timestamp.MockClock BeforeEach(func() { goods := gleak.Goroutines() DeferCleanup(func() { Eventually(gleak.Goroutines).ShouldNot(gleak.HaveLeaked(goods)) }) - }) - It("pushes data", func() { - evictLst := make([]queueEntryID, 0) - l, err := bucket.NewQueue(128, func(id interface{}) { + evictLst = make([]queueEntryID, 0) + clock = timestamp.NewMockClock() + clock.Set(time.Date(1970, 0o1, 0o1, 0, 0, 0, 0, time.Local)) + var err error + l, err = bucket.NewQueue(logger.GetLogger("test"), 128, 192, clock, func(_ context.Context, id interface{}) error { evictLst = append(evictLst, id.(queueEntryID)) + return nil }) Expect(err).ShouldNot(HaveOccurred()) - + DeferCleanup(func() { + evictLst = evictLst[:0] + Expect(l.Close()).To(Succeed()) + }) + }) + It("pushes to recent", func() { + enRecentSize := 0 for i := 0; i < 256; i++ { - l.Push(entryID(uint16(i))) + Expect(l.Push(context.Background(), entryID(uint16(i)), func() error { + enRecentSize++ + return nil + })).To(Succeed()) } + Expect(enRecentSize).To(Equal(256)) Expect(l.Len()).To(Equal(128)) Expect(len(evictLst)).To(Equal(64)) for i := 0; i < 64; i++ { Expect(evictLst[i]).To(Equal(entryID(uint16(i)))) } }) + + It("promotes to frequent", func() { + enRecentSize := 0 + for i := 0; i < 128; i++ { + Expect(l.Push(context.Background(), entryID(uint16(i)), func() error { + enRecentSize++ + return nil + })).To(Succeed()) + } + Expect(enRecentSize).To(Equal(128)) + Expect(l.Len()).To(Equal(128)) + Expect(len(evictLst)).To(Equal(0)) + for i := 0; i < 64; i++ { + Expect(l.Touch(entryID(uint16(i)))).To(BeTrue()) + } + enRecentSize = 0 + for i := 128; i < 256; i++ { + Expect(l.Push(context.Background(), entryID(uint16(i)), func() error { + enRecentSize++ + return nil + })).To(Succeed()) + } + Expect(enRecentSize).To(Equal(128)) + Expect(l.Len()).To(Equal(128)) + Expect(len(evictLst)).To(Equal(64)) + for i := 0; i < 64; i++ { + Expect(evictLst[i]).To(Equal(entryID(uint16(i + 64)))) + } + }) + + It("cleans up evict queue", func() { + enRecentSize := 0 + + for i := 0; i < 192; i++ { + Expect(l.Push(context.Background(), entryID(uint16(i)), func() error { + enRecentSize++ + return nil + })).To(Succeed()) + } + Expect(enRecentSize).To(Equal(192)) + Expect(l.Len()).To(Equal(128)) + Expect(len(evictLst)).To(Equal(0)) + clock.Add(time.Minute) + GinkgoWriter.Printf("evicted size:%d \n", len(evictLst)) + Expect(len(evictLst)).To(BeNumerically(">", 1)) + }) }) diff --git a/banyand/tsdb/bucket/strategy.go b/banyand/tsdb/bucket/strategy.go index 4a8d14e..6bf51f2 100644 --- a/banyand/tsdb/bucket/strategy.go +++ b/banyand/tsdb/bucket/strategy.go @@ -117,7 +117,7 @@ func (s *Strategy) observe(c Channel) bool { select { case status, more := <-c: if !more { - return true + return moreBucket } ratio := Ratio(status.Volume) / Ratio(status.Capacity) atomic.StoreUint64(&s.currentRatio, math.Float64bits(float64(ratio))) @@ -136,7 +136,7 @@ func (s *Strategy) observe(c Channel) bool { if next != nil { s.current.Store(next) } - return true + return moreBucket } case <-s.stopCh: return false diff --git a/banyand/tsdb/indexdb.go b/banyand/tsdb/indexdb.go index e7ce863..c4281d0 100644 --- a/banyand/tsdb/indexdb.go +++ b/banyand/tsdb/indexdb.go @@ -28,7 +28,6 @@ import ( "github.com/apache/skywalking-banyandb/banyand/kv" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/index" - "github.com/apache/skywalking-banyandb/pkg/timestamp" ) type IndexDatabase interface { @@ -97,7 +96,6 @@ type indexWriterBuilder struct { scope Entry segCtrl *segmentController ts time.Time - seg *segment globalItemID *GlobalItemID } @@ -108,11 +106,6 @@ func (i *indexWriterBuilder) Scope(scope Entry) IndexWriterBuilder { func (i *indexWriterBuilder) Time(ts time.Time) IndexWriterBuilder { i.ts = ts - segs := i.segCtrl.span(timestamp.NewTimeRangeDuration(ts, 0, true, false)) - if len(segs) != 1 { - return i - } - i.seg = segs[0] return i } @@ -122,15 +115,16 @@ func (i *indexWriterBuilder) GlobalItemID(itemID GlobalItemID) IndexWriterBuilde } func (i *indexWriterBuilder) Build() (IndexWriter, error) { - if i.seg == nil { - return nil, errors.WithStack(ErrNoTime) + seg, err := i.segCtrl.create(i.segCtrl.Format(i.ts), false) + if err != nil { + return nil, err } if i.globalItemID == nil { return nil, errors.WithStack(ErrNoVal) } return &indexWriter{ scope: i.scope, - seg: i.seg, + seg: seg, ts: i.ts, itemID: i.globalItemID, }, nil diff --git a/banyand/tsdb/retention.go b/banyand/tsdb/retention.go index ab61a8d..755bf41 100644 --- a/banyand/tsdb/retention.go +++ b/banyand/tsdb/retention.go @@ -18,6 +18,7 @@ package tsdb import ( + "context" "sync" "time" @@ -76,19 +77,18 @@ func (rc *retentionController) run() { for { next := rc.scheduler.Next(now) timer := rc.segment.clock.Timer(next.Sub(now)) - for { - select { - case now = <-timer.C: - rc.l.Info().Time("now", now).Msg("wake") - if err := rc.segment.remove(now.Add(-rc.duration)); err != nil { - rc.l.Error().Err(err) - } - case <-rc.stopCh: - timer.Stop() - rc.l.Info().Msg("stop") - return + select { + case now = <-timer.C: + rc.l.Info().Time("now", now).Msg("wake") + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute) + if err := rc.segment.remove(ctx, now.Add(-rc.duration)); err != nil { + rc.l.Error().Err(err) } - break + cancel() + case <-rc.stopCh: + timer.Stop() + rc.l.Info().Msg("stop") + return } } } diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go index 3393f67..6d08e2f 100644 --- a/banyand/tsdb/segment.go +++ b/banyand/tsdb/segment.go @@ -111,10 +111,14 @@ func openSegment(ctx context.Context, startTime time.Time, path, suffix string, return s, nil } -func (s *segment) close() { - s.blockController.close() +func (s *segment) close(ctx context.Context) error { + if err := s.blockController.close(ctx); err != nil { + return err + } if s.globalIndex != nil { - s.globalIndex.Close() + if err := s.globalIndex.Close(); err != nil { + return err + } } if s.blockManageStrategy != nil { s.blockManageStrategy.Close() @@ -122,19 +126,22 @@ func (s *segment) close() { if s.Reporter != nil { s.Stop() } + return nil } -func (s *segment) closeBlock(id uint16) { - s.blockController.closeBlock(id) +func (s *segment) closeBlock(ctx context.Context, id uint16) error { + return s.blockController.closeBlock(ctx, id) } -func (s *segment) delete() error { - s.close() +func (s *segment) delete(ctx context.Context) error { + if err := s.close(ctx); err != nil { + return err + } return os.RemoveAll(s.path) } func (s segment) String() string { - return fmt.Sprintf("SegID-%d", s.id) + return fmt.Sprintf("SegID-%d", parseSuffix(s.id)) } func (s *segment) Stats() observability.Statistics { @@ -177,7 +184,7 @@ func newBlockController(segCtx context.Context, segID uint16, location string, s func (bc *blockController) Current() (bucket.Reporter, error) { now := bc.clock.Now() ns := uint64(now.UnixNano()) - if b := func() bucket.Reporter { + if b := func() *block { bc.RLock() defer bc.RUnlock() for _, s := range bc.lst { @@ -187,9 +194,14 @@ func (bc *blockController) Current() (bucket.Reporter, error) { } return nil }(); b != nil { + if b.Closed() { + if err := b.open(); err != nil { + return nil, err + } + } return b, nil } - return bc.create(now) + return bc.newHeadBlock(now) } func (bc *blockController) Next() (bucket.Reporter, error) { @@ -198,17 +210,34 @@ func (bc *blockController) Next() (bucket.Reporter, error) { return nil, err } b := c.(*block) - return bc.create(bc.blockSize.NextTime(b.Start)) + + return bc.newHeadBlock(bc.blockSize.NextTime(b.Start)) +} + +func (bc *blockController) newHeadBlock(now time.Time) (*block, error) { + b, err := bc.create(now) + if err != nil { + return nil, err + } + return b, nil } func (bc *blockController) OnMove(prev bucket.Reporter, next bucket.Reporter) { event := bc.l.Info() if prev != nil { event.Stringer("prev", prev) - bc.blockQueue.Push(BlockID{ + b := prev.(*block) + ctx, cancel := context.WithTimeout(context.Background(), defaultEnqueueTimeout) + defer cancel() + if err := bc.blockQueue.Push(ctx, BlockID{ SegID: bc.segID, - BlockID: prev.(*block).blockID, - }) + BlockID: b.blockID, + }, nil); err != nil { + bc.l.Debug().Err(err).Msg("failed to push a expired head block to the queue") + ctxClosing, cancelClosing := context.WithTimeout(context.Background(), defaultEnqueueTimeout) + defer cancelClosing() + b.close(ctxClosing) + } } if next != nil { event.Stringer("next", next) @@ -236,7 +265,7 @@ func (bc *blockController) Parse(value string) (time.Time, error) { panic("invalid interval unit") } -func (bc *blockController) span(timeRange timestamp.TimeRange) ([]BlockDelegate, error) { +func (bc *blockController) span(ctx context.Context, timeRange timestamp.TimeRange) ([]BlockDelegate, error) { bb := bc.search(func(b *block) bool { return b.Overlapping(timeRange) }) @@ -245,7 +274,7 @@ func (bc *blockController) span(timeRange timestamp.TimeRange) ([]BlockDelegate, } dd := make([]BlockDelegate, len(bb)) for i, b := range bb { - d, err := b.delegate() + d, err := b.delegate(ctx) if err != nil { return nil, err } @@ -254,10 +283,10 @@ func (bc *blockController) span(timeRange timestamp.TimeRange) ([]BlockDelegate, return dd, nil } -func (bc *blockController) get(blockID uint16) (BlockDelegate, error) { +func (bc *blockController) get(ctx context.Context, blockID uint16) (BlockDelegate, error) { b := bc.getBlock(blockID) if b != nil { - return b.delegate() + return b.delegate(ctx) } return nil, nil } @@ -297,14 +326,14 @@ func (bc *blockController) search(matcher func(*block) bool) (bb []*block) { return bb } -func (bc *blockController) closeBlock(blockID uint16) { +func (bc *blockController) closeBlock(ctx context.Context, blockID uint16) error { bc.RLock() - defer bc.RUnlock() b := bc.getBlock(blockID) + bc.RUnlock() if b == nil { - return + return nil } - b.close() + return b.close(ctx) } func (bc *blockController) startTime(suffix string) (time.Time, error) { @@ -327,7 +356,7 @@ func (bc *blockController) startTime(suffix string) (time.Time, error) { func (bc *blockController) open() error { return WalkDir( bc.location, - segPathPrefix, + blockPathPrefix, func(suffix, absolutePath string) error { bc.Lock() defer bc.Unlock() @@ -355,7 +384,15 @@ func (bc *blockController) create(startTime time.Time) (*block, error) { if err != nil { return nil, err } - return bc.load(suffix, segPath) + b, err := bc.load(suffix, segPath) + if err != nil { + return nil, err + } + err = b.open() + if err != nil { + return nil, err + } + return b, nil } func (bc *blockController) load(suffix, path string) (b *block, err error) { @@ -393,17 +430,18 @@ func (bc *blockController) sortLst() { }) } -func (bc *blockController) close() { +func (bc *blockController) close(ctx context.Context) (err error) { for _, s := range bc.lst { - s.stopThenClose() + err = multierr.Append(err, s.stopThenClose(ctx)) } + return err } -func (bc *blockController) remove(deadline time.Time) (err error) { +func (bc *blockController) remove(ctx context.Context, deadline time.Time) (err error) { for _, b := range bc.blocks() { if b.End.Before(deadline) { bc.Lock() - if errDel := b.delete(); errDel != nil { + if errDel := b.delete(ctx); errDel != nil { err = multierr.Append(err, errDel) } else { b.queue.Remove(BlockID{ diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go index c0956de..09d62aa 100644 --- a/banyand/tsdb/series.go +++ b/banyand/tsdb/series.go @@ -75,9 +75,9 @@ func (i *GlobalItemID) UnMarshal(data []byte) error { type Series interface { ID() common.SeriesID - Span(timeRange timestamp.TimeRange) (SeriesSpan, error) - Create(t time.Time) (SeriesSpan, error) - Get(id GlobalItemID) (Item, io.Closer, error) + Span(ctx context.Context, timeRange timestamp.TimeRange) (SeriesSpan, error) + Create(ctx context.Context, t time.Time) (SeriesSpan, error) + Get(ctx context.Context, id GlobalItemID) (Item, io.Closer, error) } type SeriesSpan interface { @@ -95,8 +95,8 @@ type series struct { l *logger.Logger } -func (s *series) Get(id GlobalItemID) (Item, io.Closer, error) { - b, err := s.blockDB.block(id) +func (s *series) Get(ctx context.Context, id GlobalItemID) (Item, io.Closer, error) { + b, err := s.blockDB.block(ctx, id) if err != nil { return nil, nil, err } @@ -114,8 +114,8 @@ func (s *series) ID() common.SeriesID { return s.id } -func (s *series) Span(timeRange timestamp.TimeRange) (SeriesSpan, error) { - blocks, err := s.blockDB.span(timeRange) +func (s *series) Span(ctx context.Context, timeRange timestamp.TimeRange) (SeriesSpan, error) { + blocks, err := s.blockDB.span(ctx, timeRange) if err != nil { return nil, err } @@ -128,9 +128,9 @@ func (s *series) Span(timeRange timestamp.TimeRange) (SeriesSpan, error) { return newSeriesSpan(context.WithValue(context.Background(), logger.ContextKey, s.l), timeRange, blocks, s.id, s.shardID), nil } -func (s *series) Create(t time.Time) (SeriesSpan, error) { +func (s *series) Create(ctx context.Context, t time.Time) (SeriesSpan, error) { tr := timestamp.NewInclusiveTimeRange(t, t) - blocks, err := s.blockDB.span(tr) + blocks, err := s.blockDB.span(ctx, tr) if err != nil { return nil, err } @@ -140,7 +140,7 @@ func (s *series) Create(t time.Time) (SeriesSpan, error) { Msg("load a series span") return newSeriesSpan(context.WithValue(context.Background(), logger.ContextKey, s.l), tr, blocks, s.id, s.shardID), nil } - b, err := s.blockDB.create(t) + b, err := s.blockDB.create(ctx, t) if err != nil { return nil, err } diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go index 7825338..b878b38 100644 --- a/banyand/tsdb/seriesdb.go +++ b/banyand/tsdb/seriesdb.go @@ -148,9 +148,9 @@ type SeriesDatabase interface { type blockDatabase interface { shardID() common.ShardID - span(timeRange timestamp.TimeRange) ([]BlockDelegate, error) - create(ts time.Time) (BlockDelegate, error) - block(id GlobalItemID) (BlockDelegate, error) + span(ctx context.Context, timeRange timestamp.TimeRange) ([]BlockDelegate, error) + create(ctx context.Context, ts time.Time) (BlockDelegate, error) + block(ctx context.Context, id GlobalItemID) (BlockDelegate, error) } var ( @@ -189,12 +189,12 @@ func (s *seriesDB) GetByID(id common.SeriesID) (Series, error) { return newSeries(s.context(), id, s), nil } -func (s *seriesDB) block(id GlobalItemID) (BlockDelegate, error) { +func (s *seriesDB) block(ctx context.Context, id GlobalItemID) (BlockDelegate, error) { seg := s.segCtrl.get(id.segID) if seg == nil { return nil, nil } - return seg.blockController.get(id.blockID) + return seg.blockController.get(ctx, id.blockID) } func (s *seriesDB) shardID() common.ShardID { @@ -251,11 +251,11 @@ func (s *seriesDB) List(path Path) (SeriesList, error) { return result, err } -func (s *seriesDB) span(timeRange timestamp.TimeRange) ([]BlockDelegate, error) { +func (s *seriesDB) span(ctx context.Context, timeRange timestamp.TimeRange) ([]BlockDelegate, error) { // TODO: return correct blocks result := make([]BlockDelegate, 0) for _, s := range s.segCtrl.span(timeRange) { - dd, err := s.blockController.span(timeRange) + dd, err := s.blockController.span(ctx, timeRange) if err != nil { return nil, err } @@ -267,14 +267,14 @@ func (s *seriesDB) span(timeRange timestamp.TimeRange) ([]BlockDelegate, error) return result, nil } -func (s *seriesDB) create(ts time.Time) (BlockDelegate, error) { +func (s *seriesDB) create(ctx context.Context, ts time.Time) (BlockDelegate, error) { s.Lock() defer s.Unlock() timeRange := timestamp.NewInclusiveTimeRange(ts, ts) ss := s.segCtrl.span(timeRange) if len(ss) > 0 { s := ss[0] - dd, err := s.blockController.span(timeRange) + dd, err := s.blockController.span(ctx, timeRange) if err != nil { return nil, err } @@ -285,7 +285,7 @@ func (s *seriesDB) create(ts time.Time) (BlockDelegate, error) { if err != nil { return nil, err } - return block.delegate() + return block.delegate(ctx) } seg, err := s.segCtrl.create(s.segCtrl.Format(timeRange.Start), false) if err != nil { @@ -295,7 +295,7 @@ func (s *seriesDB) create(ts time.Time) (BlockDelegate, error) { if err != nil { return nil, err } - return block.delegate() + return block.delegate(ctx) } func (s *seriesDB) context() context.Context { diff --git a/banyand/tsdb/seriesdb_test.go b/banyand/tsdb/seriesdb_test.go index f5b1875..3c4623c 100644 --- a/banyand/tsdb/seriesdb_test.go +++ b/banyand/tsdb/seriesdb_test.go @@ -30,6 +30,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/test/flags" ) func TestEntity(t *testing.T) { @@ -355,7 +356,7 @@ func Test_SeriesDatabase_Get(t *testing.T) { tester := assert.New(t) tester.NoError(logger.Init(logger.Logging{ Env: "dev", - Level: "warn", + Level: flags.LogLevel, })) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -376,7 +377,7 @@ func Test_SeriesDatabase_List(t *testing.T) { tester := assert.New(t) tester.NoError(logger.Init(logger.Logging{ Env: "dev", - Level: "warn", + Level: flags.LogLevel, })) dir, deferFunc := test.Space(require.New(t)) defer deferFunc() diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go index 277ee03..b77a232 100644 --- a/banyand/tsdb/shard.go +++ b/banyand/tsdb/shard.go @@ -34,8 +34,9 @@ import ( ) const ( - defaultBlockQueueSize = 1 << 4 - defaultKVMemorySize = 1 << 20 + defaultBlockQueueSize = 4 + defaultMaxBlockQueueSize = 64 + defaultKVMemorySize = 1 << 20 ) var _ Shard = (*shard)(nil) @@ -54,7 +55,7 @@ type shard struct { } func OpenShard(ctx context.Context, id common.ShardID, - root string, segmentSize, blockSize, ttl IntervalRule, openedBlockSize int, + root string, segmentSize, blockSize, ttl IntervalRule, openedBlockSize, maxOpenedBlockSize int, ) (Shard, error) { path, err := mkdir(shardTemplate, root, int(id)) if err != nil { @@ -70,7 +71,7 @@ func OpenShard(ctx context.Context, id common.ShardID, p.Shard = strconv.Itoa(int(id)) return p }) - sc, err := newSegmentController(shardCtx, path, segmentSize, blockSize, openedBlockSize, l) + sc, err := newSegmentController(shardCtx, path, segmentSize, blockSize, openedBlockSize, maxOpenedBlockSize, l) if err != nil { return nil, errors.Wrapf(err, "create the segment controller of the shard %d", int(id)) } @@ -164,8 +165,9 @@ func (s *shard) State() (shardState ShardState) { func (s *shard) Close() error { s.retentionController.stop() s.segmentManageStrategy.Close() - s.segmentController.close() - err := s.seriesDatabase.Close() + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + err := multierr.Combine(s.segmentController.close(ctx), s.seriesDatabase.Close()) close(s.stopCh) return err } @@ -236,7 +238,7 @@ type segmentController struct { } func newSegmentController(shardCtx context.Context, location string, segmentSize, blockSize IntervalRule, - openedBlockSize int, l *logger.Logger, + openedBlockSize, maxOpenedBlockSize int, l *logger.Logger, ) (*segmentController, error) { clock, _ := timestamp.GetClock(shardCtx) sc := &segmentController{ @@ -248,16 +250,21 @@ func newSegmentController(shardCtx context.Context, location string, segmentSize clock: clock, } var err error - sc.blockQueue, err = bucket.NewQueue(openedBlockSize, func(id interface{}) { - bsID := id.(BlockID) - seg := sc.get(bsID.SegID) - if seg == nil { - l.Warn().Uint16("segID", bsID.SegID).Msg("segment is absent") - return - } - l.Info().Uint16("blockID", bsID.BlockID).Uint16("segID", bsID.SegID).Msg("closing the block") - seg.closeBlock(bsID.BlockID) - }) + sc.blockQueue, err = bucket.NewQueue( + l.Named("block-queue"), + openedBlockSize, + maxOpenedBlockSize, + clock, + func(ctx context.Context, id interface{}) error { + bsID := id.(BlockID) + seg := sc.get(bsID.SegID) + if seg == nil { + l.Warn().Int("segID", parseSuffix(bsID.SegID)).Msg("segment is absent") + return nil + } + l.Info().Uint16("blockID", bsID.BlockID).Int("segID", parseSuffix(bsID.SegID)).Msg("closing the block") + return seg.closeBlock(ctx, bsID.BlockID) + }) return sc, err } @@ -409,14 +416,14 @@ func (sc *segmentController) load(suffix, path string, createBlockIfEmpty bool) return seg, nil } -func (sc *segmentController) remove(deadline time.Time) (err error) { +func (sc *segmentController) remove(ctx context.Context, deadline time.Time) (err error) { sc.l.Info().Time("deadline", deadline).Msg("start to remove before deadline") for _, s := range sc.segments() { if s.End.Before(deadline) || s.Contains(uint64(deadline.UnixNano())) { - err = multierr.Append(err, s.blockController.remove(deadline)) + err = multierr.Append(err, s.blockController.remove(ctx, deadline)) if s.End.Before(deadline) { sc.Lock() - if errDel := s.delete(); errDel != nil { + if errDel := s.delete(ctx); errDel != nil { err = multierr.Append(err, errDel) } else { sc.removeSeg(s.id) @@ -437,8 +444,10 @@ func (sc *segmentController) removeSeg(segID uint16) { } } -func (sc *segmentController) close() { +func (sc *segmentController) close(ctx context.Context) (err error) { for _, s := range sc.lst { - s.close() + err = multierr.Append(err, s.close(ctx)) } + err = multierr.Append(err, sc.blockQueue.Close()) + return err } diff --git a/banyand/tsdb/shard_test.go b/banyand/tsdb/shard_test.go index 1e1197e..6b66ea4 100644 --- a/banyand/tsdb/shard_test.go +++ b/banyand/tsdb/shard_test.go @@ -27,6 +27,7 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/banyand/tsdb" "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/test/flags" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) @@ -70,13 +71,14 @@ var _ = Describe("Shard", func() { Num: 7, }, 2, + 3, ) Expect(err).NotTo(HaveOccurred()) By("01/01 00:00 1st block is opened") t1 := clock.Now() Eventually(func() []tsdb.BlockState { return shard.State().Blocks - }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{ + }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{ { ID: tsdb.BlockID{ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101), @@ -93,7 +95,7 @@ var _ = Describe("Shard", func() { GinkgoWriter.Println("01/01 10:00 has been triggered") } return shard.State().Blocks - }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{ + }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{ { ID: tsdb.BlockID{ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101), @@ -111,7 +113,7 @@ var _ = Describe("Shard", func() { })) Eventually(func() []tsdb.BlockID { return shard.State().OpenBlocks - }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{})) + }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockID{})) By("01/01 13:00 moves to the 2nd block") clock.Add(3 * time.Hour) Eventually(func() []tsdb.BlockID { @@ -119,7 +121,7 @@ var _ = Describe("Shard", func() { GinkgoWriter.Println("01/01 13:00 has been triggered") } return shard.State().OpenBlocks - }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{ + }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockID{ { SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101), BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0), @@ -133,7 +135,7 @@ var _ = Describe("Shard", func() { GinkgoWriter.Println("01/01 22:00 has been triggered") } return shard.State().Blocks - }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{ + }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{ { ID: tsdb.BlockID{ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101), @@ -163,7 +165,7 @@ var _ = Describe("Shard", func() { GinkgoWriter.Println("01/02 01:00 has been triggered") } return shard.State().OpenBlocks - }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{ + }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockID{ { SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101), BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0), @@ -181,7 +183,7 @@ var _ = Describe("Shard", func() { GinkgoWriter.Println("01/02 10:00 has been triggered") } return shard.State().Blocks - }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{ + }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{ { ID: tsdb.BlockID{ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101), @@ -218,7 +220,7 @@ var _ = Describe("Shard", func() { GinkgoWriter.Println("01/02 13:00 has been triggered") } return shard.State().OpenBlocks - }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{ + }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockID{ { SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101), BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0), @@ -240,7 +242,7 @@ var _ = Describe("Shard", func() { GinkgoWriter.Println("01/02 22:00 has been triggered") } return shard.State().Blocks - }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{ + }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{ { ID: tsdb.BlockID{ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101), @@ -284,11 +286,7 @@ var _ = Describe("Shard", func() { GinkgoWriter.Println("01/03 01:00 has been triggered") } return shard.State().OpenBlocks - }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{ - { - SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101), - BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12), - }, + }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockID{ { SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102), BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0), @@ -300,7 +298,7 @@ var _ = Describe("Shard", func() { })) Eventually(func() []tsdb.BlockState { return shard.State().Blocks - }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{ + }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{ { ID: tsdb.BlockID{ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101), @@ -315,6 +313,7 @@ var _ = Describe("Shard", func() { BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12), }, TimeRange: timestamp.NewTimeRangeDuration(t2, 12*time.Hour, true, false), + Closed: true, }, { ID: tsdb.BlockID{ @@ -342,7 +341,7 @@ var _ = Describe("Shard", func() { series, err := shard.Series().GetByID(common.SeriesID(11)) Expect(err).NotTo(HaveOccurred()) t1Range := timestamp.NewInclusiveTimeRangeDuration(t1, 1*time.Hour) - span, err := series.Span(t1Range) + span, err := series.Span(context.Background(), t1Range) Expect(err).NotTo(HaveOccurred()) defer span.Close() writer, err := span.WriterBuilder().Family([]byte("test"), []byte("test")).Time(t1Range.End).Build() @@ -351,14 +350,14 @@ var _ = Describe("Shard", func() { Expect(err).NotTo(HaveOccurred()) Eventually(func() []tsdb.BlockID { return shard.State().OpenBlocks - }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{ + }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockID{ { SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101), BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0), }, { - SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101), - BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12), + SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102), + BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0), }, { SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102), @@ -367,7 +366,7 @@ var _ = Describe("Shard", func() { })) Eventually(func() []tsdb.BlockState { return shard.State().Blocks - }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{ + }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{ { ID: tsdb.BlockID{ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101), @@ -381,6 +380,7 @@ var _ = Describe("Shard", func() { BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 12), }, TimeRange: timestamp.NewTimeRangeDuration(t2, 12*time.Hour, true, false), + Closed: true, }, { ID: tsdb.BlockID{ @@ -388,7 +388,6 @@ var _ = Describe("Shard", func() { BlockID: tsdb.GenerateInternalID(tsdb.HOUR, 0o0), }, TimeRange: timestamp.NewTimeRangeDuration(t3, 12*time.Hour, true, false), - Closed: true, }, { ID: tsdb.BlockID{ @@ -422,6 +421,7 @@ var _ = Describe("Shard", func() { Num: 1, }, 10, + 15, ) Expect(err).NotTo(HaveOccurred()) By("open 4 blocks") @@ -451,7 +451,7 @@ var _ = Describe("Shard", func() { GinkgoWriter.Println("01/02 13:00 has been triggered") } return shard.State().Blocks - }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{ + }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{ { ID: tsdb.BlockID{ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101), @@ -497,7 +497,7 @@ var _ = Describe("Shard", func() { GinkgoWriter.Println("01/03 01:00 has been triggered") } return shard.State().Blocks - }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{ + }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{ { ID: tsdb.BlockID{ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700102), @@ -558,13 +558,14 @@ var _ = Describe("Shard", func() { Num: 7, }, 2, + 3, ) Expect(err).NotTo(HaveOccurred()) By("01/01 00:01 1st block is opened") t1 := clock.Now() Eventually(func() []tsdb.BlockState { return shard.State().Blocks - }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{ + }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{ { ID: tsdb.BlockID{ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101), @@ -581,7 +582,7 @@ var _ = Describe("Shard", func() { GinkgoWriter.Println("01/01 10:00 has been triggered") } return shard.State().Blocks - }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{ + }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{ { ID: tsdb.BlockID{ SegID: tsdb.GenerateInternalID(tsdb.DAY, 19700101), @@ -600,7 +601,7 @@ var _ = Describe("Shard", func() { })) Eventually(func() []tsdb.BlockID { return shard.State().OpenBlocks - }, defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{})) + }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockID{})) }) }) }) diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go index 5710837..42bb4f5 100644 --- a/banyand/tsdb/tsdb.go +++ b/banyand/tsdb/tsdb.go @@ -107,10 +107,18 @@ type BlockID struct { BlockID uint16 } +func (b BlockID) String() string { + return fmt.Sprintf("BlockID-%d-%d", parseSuffix(b.SegID), parseSuffix(b.BlockID)) +} + func GenerateInternalID(unit IntervalUnit, suffix int) uint16 { return uint16(unit)<<12 | ((uint16(suffix) << 4) >> 4) } +func parseSuffix(id uint16) int { + return int((id << 12) >> 12) +} + type BlockState struct { ID BlockID TimeRange timestamp.TimeRange @@ -211,7 +219,7 @@ func createDatabase(ctx context.Context, db *database, startID int) (Database, e for i := startID; i < int(db.shardNum); i++ { db.logger.Info().Int("shard_id", i).Msg("creating a shard") so, errNewShard := OpenShard(ctx, common.ShardID(i), - db.location, db.segmentSize, db.blockSize, db.ttl, defaultBlockQueueSize) + db.location, db.segmentSize, db.blockSize, db.ttl, defaultBlockQueueSize, defaultMaxBlockQueueSize) if errNewShard != nil { err = multierr.Append(err, errNewShard) continue @@ -243,6 +251,7 @@ func loadDatabase(ctx context.Context, db *database) (Database, error) { db.blockSize, db.ttl, defaultBlockQueueSize, + defaultMaxBlockQueueSize, ) if errOpenShard != nil { return errOpenShard diff --git a/banyand/tsdb/tsdb_suite_test.go b/banyand/tsdb/tsdb_suite_test.go index cd11765..52e1157 100644 --- a/banyand/tsdb/tsdb_suite_test.go +++ b/banyand/tsdb/tsdb_suite_test.go @@ -18,16 +18,14 @@ package tsdb_test import ( "testing" - "time" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/test/flags" ) -var defaultEventuallyTimeout = 30 * time.Second - func TestTsdb(t *testing.T) { RegisterFailHandler(Fail) RunSpecs(t, "Tsdb Suite") @@ -36,6 +34,6 @@ func TestTsdb(t *testing.T) { var _ = BeforeSuite(func() { Expect(logger.Init(logger.Logging{ Env: "dev", - Level: "warn", + Level: flags.LogLevel, })).Should(Succeed()) }) diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go index 684269f..5a482a8 100644 --- a/banyand/tsdb/tsdb_test.go +++ b/banyand/tsdb/tsdb_test.go @@ -30,6 +30,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/test/flags" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) @@ -100,7 +101,7 @@ func verifyDatabaseStructure(tester *assert.Assertions, tempDir string, now time func openDatabase(ctx context.Context, t *require.Assertions, path string) (db Database) { t.NoError(logger.Init(logger.Logging{ Env: "dev", - Level: "warn", + Level: flags.LogLevel, })) db, err := OpenDatabase( context.WithValue(ctx, logger.ContextKey, logger.GetLogger("test")), diff --git a/bydbctl/internal/cmd/cmd_suite_test.go b/bydbctl/internal/cmd/cmd_suite_test.go index 6dc56ae..dc563e3 100644 --- a/bydbctl/internal/cmd/cmd_suite_test.go +++ b/bydbctl/internal/cmd/cmd_suite_test.go @@ -24,6 +24,7 @@ import ( . "github.com/onsi/gomega" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/test/flags" ) func TestCmd(t *testing.T) { @@ -34,6 +35,6 @@ func TestCmd(t *testing.T) { var _ = BeforeSuite(func() { Expect(logger.Init(logger.Logging{ Env: "dev", - Level: "warn", + Level: flags.LogLevel, })).To(Succeed()) }) diff --git a/docs/crud/measure/query.md b/docs/crud/measure/query.md index 5dd8c1d..40639a7 100644 --- a/docs/crud/measure/query.md +++ b/docs/crud/measure/query.md @@ -43,6 +43,7 @@ fieldProjection: timeRange: begin: 2022-10-15T22:32:48Z end: 2022-10-15T23:32:48Z +EOF ``` The below command could query data in the last 30 minutes using relative time duration : @@ -58,6 +59,7 @@ tagProjection: tags: ["id", "entity_id"] fieldProjection: names: ["total", "value"] +EOF ``` ## API Reference diff --git a/docs/crud/stream/query.md b/docs/crud/stream/query.md index cc7dea3..620f572 100644 --- a/docs/crud/stream/query.md +++ b/docs/crud/stream/query.md @@ -43,6 +43,7 @@ projection: timeRange: begin: 2022-10-15T22:32:48+08:00 end: 2022-10-15T23:32:48+08:00 +EOF ``` The below command could query data in the last 30 minutes using relative time duration : @@ -58,6 +59,7 @@ projection: tags: ["trace_id"] - name: "data" tags: ["data_binary"] +EOF ``` ## API Reference diff --git a/pkg/index/inverted/inverted_test.go b/pkg/index/inverted/inverted_test.go index 4d889cd..fda5215 100644 --- a/pkg/index/inverted/inverted_test.go +++ b/pkg/index/inverted/inverted_test.go @@ -32,6 +32,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/index/testcases" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/test/flags" ) var serviceName = index.FieldKey{ @@ -173,7 +174,7 @@ func TestStore_Iterator(t *testing.T) { func setUp(t *require.Assertions) (tempDir string, deferFunc func()) { t.NoError(logger.Init(logger.Logging{ Env: "dev", - Level: "warn", + Level: flags.LogLevel, })) tempDir, deferFunc = test.Space(t) return tempDir, deferFunc diff --git a/pkg/index/lsm/lsm_test.go b/pkg/index/lsm/lsm_test.go index c0de013..4af5eb5 100644 --- a/pkg/index/lsm/lsm_test.go +++ b/pkg/index/lsm/lsm_test.go @@ -26,6 +26,7 @@ import ( "github.com/apache/skywalking-banyandb/pkg/index/testcases" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/test/flags" ) func TestStore_MatchTerm(t *testing.T) { @@ -63,7 +64,7 @@ func TestStore_Iterator(t *testing.T) { func setUp(t *require.Assertions) (tempDir string, deferFunc func()) { t.NoError(logger.Init(logger.Logging{ Env: "dev", - Level: "warn", + Level: flags.LogLevel, })) tempDir, deferFunc = test.Space(t) return tempDir, deferFunc diff --git a/pkg/query/logical/common.go b/pkg/query/logical/common.go index 3537ce7..716be97 100644 --- a/pkg/query/logical/common.go +++ b/pkg/query/logical/common.go @@ -19,6 +19,9 @@ package logical import ( "bytes" + "context" + "io" + "time" "github.com/pkg/errors" @@ -101,16 +104,15 @@ func ProjectItem(ec executor.ExecutionContext, item tsdb.Item, projectionFieldRe // This method is used by the underlying tableScan and localIndexScan plans. func ExecuteForShard(series tsdb.SeriesList, timeRange timestamp.TimeRange, builders ...SeekerBuilder, -) ([]tsdb.Iterator, error) { +) ([]tsdb.Iterator, []io.Closer, error) { var itersInShard []tsdb.Iterator + var closers []io.Closer for _, seriesFound := range series { itersInSeries, err := func() ([]tsdb.Iterator, error) { - sp, errInner := seriesFound.Span(timeRange) - defer func(sp tsdb.SeriesSpan) { - if sp != nil { - _ = sp.Close() - } - }(sp) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + sp, errInner := seriesFound.Span(ctx, timeRange) + closers = append(closers, sp) if errInner != nil { return nil, errInner } @@ -129,16 +131,16 @@ func ExecuteForShard(series tsdb.SeriesList, timeRange timestamp.TimeRange, return iters, nil }() if err != nil { - return nil, err + return nil, nil, err } if len(itersInSeries) > 0 { itersInShard = append(itersInShard, itersInSeries...) } } - return itersInShard, nil + return itersInShard, closers, nil } -var DefaultLimit uint32 = 100 +var DefaultLimit uint32 = 20 type Tag struct { familyName, name string diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go b/pkg/query/logical/measure/measure_plan_indexscan_local.go index c9147d4..69df5b3 100644 --- a/pkg/query/logical/measure/measure_plan_indexscan_local.go +++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go @@ -19,6 +19,7 @@ package measure import ( "fmt" + "io" "time" "go.uber.org/multierr" @@ -132,7 +133,14 @@ func (i *localIndexScan) Execute(ec executor.MeasureExecutionContext) (executor. b.Filter(i.filter) }) } - iters, innerErr := logical.ExecuteForShard(seriesList, i.timeRange, builders...) + iters, closers, innerErr := logical.ExecuteForShard(seriesList, i.timeRange, builders...) + if len(closers) > 0 { + defer func(closers []io.Closer) { + for _, c := range closers { + _ = c.Close() + } + }(closers) + } if innerErr != nil { return nil, innerErr } diff --git a/pkg/query/logical/stream/stream_plan_indexscan_global.go b/pkg/query/logical/stream/stream_plan_indexscan_global.go index 75549e1..549a83d 100644 --- a/pkg/query/logical/stream/stream_plan_indexscan_global.go +++ b/pkg/query/logical/stream/stream_plan_indexscan_global.go @@ -18,6 +18,7 @@ package stream import ( + "context" "fmt" "io" "time" @@ -96,7 +97,9 @@ func (t *globalIndexScan) executeForShard(ec executor.StreamExecutionContext, sh return elementsInShard, errors.WithStack(err) } err = func() error { - item, closer, errInner := series.Get(itemID) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + item, closer, errInner := series.Get(ctx, itemID) defer func(closer io.Closer) { if closer != nil { _ = closer.Close() diff --git a/pkg/query/logical/stream/stream_plan_indexscan_local.go b/pkg/query/logical/stream/stream_plan_indexscan_local.go index 6cc4403..b33e4ff 100644 --- a/pkg/query/logical/stream/stream_plan_indexscan_local.go +++ b/pkg/query/logical/stream/stream_plan_indexscan_local.go @@ -19,6 +19,7 @@ package stream import ( "fmt" + "io" "time" "google.golang.org/protobuf/types/known/timestamppb" @@ -77,7 +78,14 @@ func (i *localIndexScan) Execute(ec executor.StreamExecutionContext) ([]*streamv b.Filter(i.filter) }) } - iters, innerErr := logical.ExecuteForShard(seriesList, i.timeRange, builders...) + iters, closers, innerErr := logical.ExecuteForShard(seriesList, i.timeRange, builders...) + if len(closers) > 0 { + defer func(closers []io.Closer) { + for _, c := range closers { + _ = c.Close() + } + }(closers) + } if innerErr != nil { return nil, innerErr } diff --git a/banyand/tsdb/bucket/bucket_suite_test.go b/pkg/test/flags/flags.go similarity index 70% copy from banyand/tsdb/bucket/bucket_suite_test.go copy to pkg/test/flags/flags.go index 59ca1fe..b2f83f5 100644 --- a/banyand/tsdb/bucket/bucket_suite_test.go +++ b/pkg/test/flags/flags.go @@ -14,24 +14,26 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -package bucket_test +package flags import ( - "testing" - - . "github.com/onsi/ginkgo/v2" - . "github.com/onsi/gomega" + "time" +) - "github.com/apache/skywalking-banyandb/pkg/logger" +var ( + eventuallyTimeout string + EventuallyTimeout time.Duration + LogLevel = "debug" ) -func TestBucket(t *testing.T) { - RegisterFailHandler(Fail) - BeforeSuite(func() { - Expect(logger.Init(logger.Logging{ - Env: "dev", - Level: "warn", - })).Should(Succeed()) - }) - RunSpecs(t, "Bucket Suite") +func init() { + if eventuallyTimeout == "" { + EventuallyTimeout = time.Second * 3 + return + } + d, err := time.ParseDuration(eventuallyTimeout) + if err != nil { + panic(err) + } + EventuallyTimeout = d } diff --git a/test/integration/cold_query/query_suite_test.go b/test/integration/cold_query/query_suite_test.go index 9b3a190..849a059 100644 --- a/test/integration/cold_query/query_suite_test.go +++ b/test/integration/cold_query/query_suite_test.go @@ -27,6 +27,7 @@ import ( "google.golang.org/grpc/credentials/insecure" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/test/flags" "github.com/apache/skywalking-banyandb/pkg/test/helpers" "github.com/apache/skywalking-banyandb/pkg/test/setup" "github.com/apache/skywalking-banyandb/pkg/timestamp" @@ -51,7 +52,7 @@ var ( var _ = SynchronizedBeforeSuite(func() []byte { Expect(logger.Init(logger.Logging{ Env: "dev", - Level: "warn", + Level: flags.LogLevel, })).To(Succeed()) var addr string addr, _, deferFunc = setup.SetUp() diff --git a/test/integration/query/query_suite_test.go b/test/integration/load/load_suite_test.go similarity index 61% copy from test/integration/query/query_suite_test.go copy to test/integration/load/load_suite_test.go index 0bb67be..b7430f2 100644 --- a/test/integration/query/query_suite_test.go +++ b/test/integration/load/load_suite_test.go @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -package integration_query_test +package integration_load_test import ( "testing" @@ -29,17 +29,13 @@ import ( "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/test/helpers" "github.com/apache/skywalking-banyandb/pkg/test/setup" - "github.com/apache/skywalking-banyandb/pkg/timestamp" - cases_measure "github.com/apache/skywalking-banyandb/test/cases/measure" - cases_measure_data "github.com/apache/skywalking-banyandb/test/cases/measure/data" cases_stream "github.com/apache/skywalking-banyandb/test/cases/stream" cases_stream_data "github.com/apache/skywalking-banyandb/test/cases/stream/data" - cases_topn "github.com/apache/skywalking-banyandb/test/cases/topn" ) -func TestIntegrationQuery(t *testing.T) { +func TestIntegrationLoad(t *testing.T) { RegisterFailHandler(Fail) - RunSpecs(t, "Integration Query Suite") + RunSpecs(t, "Integration Load Suite", Label("integration", "slow")) } var ( @@ -60,18 +56,26 @@ var _ = SynchronizedBeforeSuite(func() []byte { grpclib.WithTransportCredentials(insecure.NewCredentials()), ) Expect(err).NotTo(HaveOccurred()) - ns := timestamp.NowMilli().UnixNano() - now = time.Unix(0, ns-ns%int64(time.Minute)) - interval := 500 * time.Millisecond - // stream - cases_stream_data.Write(conn, "data.json", now, interval) - // measure - interval = time.Minute - cases_measure_data.Write(conn, "service_traffic", "sw_metric", "service_traffic_data.json", now, interval) - cases_measure_data.Write(conn, "service_instance_traffic", "sw_metric", "service_instance_traffic_data.json", now, interval) - cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", now, interval) - cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data1.json", now.Add(10*time.Second), interval) - cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data2.json", now.Add(10*time.Minute), interval) + days := 7 + hours := 24 + minutes := 60 + interval := 10 * time.Second + c := time.Now() + for i := 0; i < days; i++ { + date := c.Add(-time.Hour * time.Duration((days-i)*24)) + for h := 0; h < hours; h++ { + hour := date.Add(time.Hour * time.Duration(h)) + start := time.Now() + for j := 0; j < minutes; j++ { + n := hour.Add(time.Minute * time.Duration(j)) + ns := n.UnixNano() + now = time.Unix(0, ns-ns%int64(time.Minute)) + // stream + cases_stream_data.Write(conn, "data.json", now, interval) + } + GinkgoWriter.Printf("written stream in %s took %s \n", hour, time.Since(start)) + } + } Expect(conn.Close()).To(Succeed()) return []byte(addr) }, func(address []byte) { @@ -85,14 +89,6 @@ var _ = SynchronizedBeforeSuite(func() []byte { Connection: connection, BaseTime: now, } - cases_measure.SharedContext = helpers.SharedContext{ - Connection: connection, - BaseTime: now, - } - cases_topn.SharedContext = helpers.SharedContext{ - Connection: connection, - BaseTime: now, - } Expect(err).NotTo(HaveOccurred()) }) diff --git a/test/integration/other/other_suite_test.go b/test/integration/other/other_suite_test.go index 4eaee84..eaf2474 100644 --- a/test/integration/other/other_suite_test.go +++ b/test/integration/other/other_suite_test.go @@ -24,6 +24,7 @@ import ( gm "github.com/onsi/gomega" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/test/flags" ) func TestIntegrationOther(t *testing.T) { @@ -34,6 +35,6 @@ func TestIntegrationOther(t *testing.T) { var _ = g.BeforeSuite(func() { gm.Expect(logger.Init(logger.Logging{ Env: "dev", - Level: "warn", + Level: flags.LogLevel, })).To(gm.Succeed()) }) diff --git a/test/integration/query/query_suite_test.go b/test/integration/query/query_suite_test.go index 0bb67be..69be763 100644 --- a/test/integration/query/query_suite_test.go +++ b/test/integration/query/query_suite_test.go @@ -27,6 +27,7 @@ import ( "google.golang.org/grpc/credentials/insecure" "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/test/flags" "github.com/apache/skywalking-banyandb/pkg/test/helpers" "github.com/apache/skywalking-banyandb/pkg/test/setup" "github.com/apache/skywalking-banyandb/pkg/timestamp" @@ -51,7 +52,7 @@ var ( var _ = SynchronizedBeforeSuite(func() []byte { Expect(logger.Init(logger.Logging{ Env: "dev", - Level: "warn", + Level: flags.LogLevel, })).To(Succeed()) var addr string addr, _, deferFunc = setup.SetUp()