This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 28b5fd5  Add load test and inject testing flags (#204)
28b5fd5 is described below

commit 28b5fd5f23dee6bc4ea1ed729f7da1956ad257d0
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Fri Nov 4 16:40:00 2022 +0800

    Add load test and inject testing flags (#204)
    
    * Add a slow load test case to CI
    * Update max opened blocks strategy. A shard could open 64 blocks
      at most. A cleanup goroutine could collect 10 inactive oldest blocks
       every minute until there are 4 blocks left in the cache.
    * Inject logging level and the timeout of eventually through "ldflags"
    
    Signed-off-by: Gao Hongtao <hanahm...@gmail.com>
---
 .github/workflows/load.yml                         |  57 +++++++
 Makefile                                           |   7 +-
 banyand/liaison/grpc/grpc_suite_test.go            |   3 +-
 banyand/measure/measure_suite_test.go              |   3 +-
 banyand/measure/measure_topn.go                    |   4 +-
 banyand/measure/measure_write.go                   |   6 +-
 banyand/metadata/metadata_test.go                  |   3 +-
 banyand/metadata/schema/schema_suite_test.go       |   3 +-
 banyand/query/processor_topn.go                    |   4 +-
 banyand/stream/stream_suite_test.go                |   3 +-
 banyand/stream/stream_write.go                     |   7 +-
 banyand/tsdb/block.go                              |  81 ++++++----
 banyand/tsdb/bucket/bucket_suite_test.go           |   3 +-
 banyand/tsdb/bucket/queue.go                       | 163 +++++++++++++++++----
 banyand/tsdb/bucket/queue_test.go                  |  83 ++++++++++-
 banyand/tsdb/bucket/strategy.go                    |   4 +-
 banyand/tsdb/indexdb.go                            |  14 +-
 banyand/tsdb/retention.go                          |  24 +--
 banyand/tsdb/segment.go                            |  94 ++++++++----
 banyand/tsdb/series.go                             |  20 +--
 banyand/tsdb/seriesdb.go                           |  22 +--
 banyand/tsdb/seriesdb_test.go                      |   5 +-
 banyand/tsdb/shard.go                              |  53 ++++---
 banyand/tsdb/shard_test.go                         |  53 +++----
 banyand/tsdb/tsdb.go                               |  11 +-
 banyand/tsdb/tsdb_suite_test.go                    |   6 +-
 banyand/tsdb/tsdb_test.go                          |   3 +-
 bydbctl/internal/cmd/cmd_suite_test.go             |   3 +-
 docs/crud/measure/query.md                         |   2 +
 docs/crud/stream/query.md                          |   2 +
 pkg/index/inverted/inverted_test.go                |   3 +-
 pkg/index/lsm/lsm_test.go                          |   3 +-
 pkg/query/logical/common.go                        |  22 +--
 .../measure/measure_plan_indexscan_local.go        |  10 +-
 .../logical/stream/stream_plan_indexscan_global.go |   5 +-
 .../logical/stream/stream_plan_indexscan_local.go  |  10 +-
 .../test/flags/flags.go                            |  32 ++--
 test/integration/cold_query/query_suite_test.go    |   3 +-
 .../load_suite_test.go}                            |  50 +++----
 test/integration/other/other_suite_test.go         |   3 +-
 test/integration/query/query_suite_test.go         |   3 +-
 41 files changed, 630 insertions(+), 260 deletions(-)

diff --git a/.github/workflows/load.yml b/.github/workflows/load.yml
new file mode 100644
index 0000000..d3682d4
--- /dev/null
+++ b/.github/workflows/load.yml
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+name: Continuous Integration
+
+on:
+  schedule:
+    - cron: '0 20 * * *'
+
+jobs:
+  test:
+    name: Load several days data
+    runs-on: ubuntu-20.04
+    strategy:
+      matrix:
+        tz: ["UTC", "Asia/Shanghai", "America/Los_Angeles"]
+    steps:
+      - name: Set timezone
+        run: sudo timedatectl set-timezone ${{ matrix.tz }}
+      - uses: actions/setup-node@v3
+        with:
+          node-version: 16.15
+      - name: Install Go
+        uses: actions/setup-go@v2
+        with:
+          go-version: 1.19
+      - name: Check out code into the Go module directory
+        uses: actions/checkout@v2
+      - uses: actions/cache@v3
+        id: cache-go
+        with:
+          path: |
+            ~/.cache/go-build
+            ~/go/pkg/mod
+          key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }}
+          restore-keys: |
+            ${{ runner.os }}-go-
+      - name: Update dependencies 
+        if: steps.cache-go.outputs.cache-hit != 'true'
+        run: GOPROXY=https://proxy.golang.org go mod download
+      - name: Generate mocks
+        run: make generate
+      - name: Test
+        run: TEST_EXTRA_OPTS="--label-filter slow" make -C test test
diff --git a/Makefile b/Makefile
index 37eee1c..099a0b8 100644
--- a/Makefile
+++ b/Makefile
@@ -65,7 +65,12 @@ test-coverage: default ## Run the unit tests in all projects 
with coverage analy
 include scripts/build/ginkgo.mk
 
 test-ci: $(GINKGO) ## Run the unit tests in CI
-       $(GINKGO) --race --cover --covermode atomic --coverprofile=coverage.out 
./... 
+       $(GINKGO) --race \
+         -ldflags \
+         "-X 
github.com/apache/skywalking-banyandb/pkg/test/flags.eventuallyTimeout=30s -X 
github.com/apache/skywalking-banyandb/pkg/test/flags.LogLevel=warn" \
+         --cover --covermode atomic --coverprofile=coverage.out \
+         --label-filter !slow \
+         ./... 
 
 ##@ Code quality targets
 
diff --git a/banyand/liaison/grpc/grpc_suite_test.go 
b/banyand/liaison/grpc/grpc_suite_test.go
index ef85011..352574a 100644
--- a/banyand/liaison/grpc/grpc_suite_test.go
+++ b/banyand/liaison/grpc/grpc_suite_test.go
@@ -24,6 +24,7 @@ import (
        . "github.com/onsi/gomega"
 
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
 )
 
 func TestGrpc(t *testing.T) {
@@ -34,6 +35,6 @@ func TestGrpc(t *testing.T) {
 var _ = BeforeSuite(func() {
        Expect(logger.Init(logger.Logging{
                Env:   "dev",
-               Level: "warn",
+               Level: flags.LogLevel,
        })).Should(Succeed())
 })
diff --git a/banyand/measure/measure_suite_test.go 
b/banyand/measure/measure_suite_test.go
index 3f295d8..9f6cc38 100644
--- a/banyand/measure/measure_suite_test.go
+++ b/banyand/measure/measure_suite_test.go
@@ -33,6 +33,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
        testmeasure "github.com/apache/skywalking-banyandb/pkg/test/measure"
 )
 
@@ -45,7 +46,7 @@ func TestMeasure(t *testing.T) {
 var _ = ginkgo.BeforeSuite(func() {
        gomega.Expect(logger.Init(logger.Logging{
                Env:   "dev",
-               Level: "warn",
+               Level: flags.LogLevel,
        })).To(gomega.Succeed())
 })
 
diff --git a/banyand/measure/measure_topn.go b/banyand/measure/measure_topn.go
index 29295c3..6a969f1 100644
--- a/banyand/measure/measure_topn.go
+++ b/banyand/measure/measure_topn.go
@@ -166,7 +166,9 @@ func (t *topNStreamingProcessor) writeData(eventTime 
time.Time, timeBucket strin
        if err != nil {
                return err
        }
-       span, err := series.Create(eventTime)
+       ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+       defer cancel()
+       span, err := series.Create(ctx, eventTime)
        if err != nil {
                if span != nil {
                        _ = span.Close()
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
index c8a9aef..7cd8224 100644
--- a/banyand/measure/measure_write.go
+++ b/banyand/measure/measure_write.go
@@ -19,6 +19,8 @@ package measure
 
 import (
        "bytes"
+       "context"
+       "time"
 
        "github.com/pkg/errors"
 
@@ -75,7 +77,9 @@ func (s *measure) write(shardID common.ShardID, seriesHashKey 
[]byte, value *mea
        if err != nil {
                return err
        }
-       wp, err := series.Create(t)
+       ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+       defer cancel()
+       wp, err := series.Create(ctx, t)
        if err != nil {
                if wp != nil {
                        _ = wp.Close()
diff --git a/banyand/metadata/metadata_test.go 
b/banyand/metadata/metadata_test.go
index fc3c7e5..682c762 100644
--- a/banyand/metadata/metadata_test.go
+++ b/banyand/metadata/metadata_test.go
@@ -27,6 +27,7 @@ import (
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
        test "github.com/apache/skywalking-banyandb/pkg/test/stream"
 )
 
@@ -37,7 +38,7 @@ func Test_service_RulesBySubject(t *testing.T) {
        is := assert.New(t)
        is.NoError(logger.Init(logger.Logging{
                Env:   "dev",
-               Level: "warn",
+               Level: flags.LogLevel,
        }))
        ctx := context.TODO()
        s, _ := NewService(ctx)
diff --git a/banyand/metadata/schema/schema_suite_test.go 
b/banyand/metadata/schema/schema_suite_test.go
index aea8ea1..51afd65 100644
--- a/banyand/metadata/schema/schema_suite_test.go
+++ b/banyand/metadata/schema/schema_suite_test.go
@@ -24,6 +24,7 @@ import (
        . "github.com/onsi/gomega"
 
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
 )
 
 func TestSchema(t *testing.T) {
@@ -35,6 +36,6 @@ func TestSchema(t *testing.T) {
 var _ = ginkgo.BeforeSuite(func() {
        Expect(logger.Init(logger.Logging{
                Env:   "dev",
-               Level: "warn",
+               Level: flags.LogLevel,
        })).To(Succeed())
 })
diff --git a/banyand/query/processor_topn.go b/banyand/query/processor_topn.go
index 91f1199..021c7eb 100644
--- a/banyand/query/processor_topn.go
+++ b/banyand/query/processor_topn.go
@@ -196,7 +196,9 @@ func familyIdentity(name string, flag []byte) []byte {
 }
 
 func (t *topNQueryProcessor) scanSeries(series tsdb.Series, request 
*measurev1.TopNRequest) ([]tsdb.Iterator, error) {
-       seriesSpan, err := series.Span(timestamp.NewInclusiveTimeRange(
+       ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+       defer cancel()
+       seriesSpan, err := series.Span(ctx, timestamp.NewInclusiveTimeRange(
                request.GetTimeRange().GetBegin().AsTime(),
                request.GetTimeRange().GetEnd().AsTime()),
        )
diff --git a/banyand/stream/stream_suite_test.go 
b/banyand/stream/stream_suite_test.go
index 4f827eb..d3177ad 100644
--- a/banyand/stream/stream_suite_test.go
+++ b/banyand/stream/stream_suite_test.go
@@ -32,6 +32,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
        teststream "github.com/apache/skywalking-banyandb/pkg/test/stream"
 )
 
@@ -44,7 +45,7 @@ func TestStream(t *testing.T) {
 var _ = BeforeSuite(func() {
        Expect(logger.Init(logger.Logging{
                Env:   "dev",
-               Level: "warn",
+               Level: flags.LogLevel,
        })).To(Succeed())
 })
 
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index 5dac816..af03580 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -18,6 +18,9 @@
 package stream
 
 import (
+       "context"
+       "time"
+
        "github.com/pkg/errors"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/client_golang/prometheus/promauto"
@@ -87,7 +90,9 @@ func (s *stream) write(shardID common.ShardID, seriesHashKey 
[]byte, value *stre
                return err
        }
        t := timestamp.MToN(tp)
-       wp, err := series.Create(t)
+       ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+       defer cancel()
+       wp, err := series.Create(ctx, t)
        if err != nil {
                if wp != nil {
                        _ = wp.Close()
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 3bc87ea..8d050f5 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -30,6 +30,7 @@ import (
        "time"
 
        "github.com/pkg/errors"
+       "go.uber.org/multierr"
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/banyand/kv"
@@ -49,8 +50,11 @@ const (
        componentSecondLSMIdx      = "lsm"
 
        defaultMainMemorySize = 8 << 20
+       defaultEnqueueTimeout = 500 * time.Millisecond
 )
 
+var ErrBlockClosingInterrupted = errors.New("interrupt to close the block")
+
 type block struct {
        path       string
        l          *logger.Logger
@@ -106,13 +110,14 @@ func newBlock(ctx context.Context, opts blockOpts) (b 
*block, err error) {
                deleted:   &atomic.Bool{},
                queue:     opts.queue,
        }
+       b.closed.Store(true)
        b.options(ctx)
        position := ctx.Value(common.PositionKey)
        if position != nil {
                b.position = position.(common.Position)
        }
 
-       return b, b.open()
+       return b, nil
 }
 
 func (b *block) options(ctx context.Context) {
@@ -187,24 +192,28 @@ func (b *block) open() (err error) {
        return nil
 }
 
-func (b *block) delegate() (BlockDelegate, error) {
+func (b *block) delegate(ctx context.Context) (BlockDelegate, error) {
        if b.deleted.Load() {
-               return nil, errors.WithMessagef(ErrBlockAbsent, "block %d is 
deleted", b.blockID)
+               return nil, errors.WithMessagef(ErrBlockAbsent, "block %s is 
deleted", b)
+       }
+       blockID := BlockID{
+               BlockID: b.blockID,
+               SegID:   b.segID,
        }
        if b.incRef() {
+               b.queue.Touch(blockID)
                return &bDelegate{
                        delegate: b,
                }, nil
        }
        b.lock.Lock()
        defer b.lock.Unlock()
-       b.queue.Push(BlockID{
-               BlockID: b.blockID,
-               SegID:   b.segID,
-       })
-       // TODO: remove the block which fails to open from the queue
-       err := b.open()
-       if err != nil {
+       if err := b.queue.Push(ctx, blockID, func() error {
+               if !b.Closed() {
+                       return nil
+               }
+               return b.open()
+       }); err != nil {
                b.l.Error().Err(err).Stringer("block", b).Msg("fail to open 
block")
                return nil, err
        }
@@ -240,14 +249,23 @@ loop:
        goto loop
 }
 
-func (b *block) waitDone() {
-loop:
-       if b.ref.Load() < 1 {
-               b.ref.Store(0)
-               return
-       }
-       runtime.Gosched()
-       goto loop
+func (b *block) waitDone(stopped *atomic.Bool) <-chan struct{} {
+       ch := make(chan struct{})
+       go func() {
+       loop:
+               if b.ref.Load() < 1 {
+                       b.ref.Store(0)
+                       close(ch)
+                       return
+               }
+               if stopped.Load() {
+                       close(ch)
+                       return
+               }
+               runtime.Gosched()
+               goto loop
+       }()
+       return ch
 }
 
 func (b *block) flush() {
@@ -261,28 +279,37 @@ func (b *block) flush() {
        }
 }
 
-func (b *block) close() {
+func (b *block) close(ctx context.Context) (err error) {
        b.lock.Lock()
        defer b.lock.Unlock()
        if b.closed.Load() {
-               return
+               return nil
        }
        b.closed.Store(true)
-       b.waitDone()
+       stopWaiting := &atomic.Bool{}
+       ch := b.waitDone(stopWaiting)
+       select {
+       case <-ctx.Done():
+               b.closed.Store(false)
+               stopWaiting.Store(true)
+               return errors.Wrapf(ErrBlockClosingInterrupted, "block:%s", b)
+       case <-ch:
+       }
        for _, closer := range b.closableLst {
-               _ = closer.Close()
+               err = multierr.Append(err, closer.Close())
        }
        close(b.stopCh)
+       return err
 }
 
-func (b *block) stopThenClose() {
+func (b *block) stopThenClose(ctx context.Context) error {
        if b.Reporter != nil {
                b.Stop()
        }
-       b.close()
+       return b.close(ctx)
 }
 
-func (b *block) delete() error {
+func (b *block) delete(ctx context.Context) error {
        if b.deleted.Load() {
                return nil
        }
@@ -290,7 +317,7 @@ func (b *block) delete() error {
        if b.Reporter != nil {
                b.Stop()
        }
-       b.close()
+       b.close(ctx)
        return os.RemoveAll(b.path)
 }
 
@@ -299,7 +326,7 @@ func (b *block) Closed() bool {
 }
 
 func (b *block) String() string {
-       return fmt.Sprintf("BlockID-%d-%d", b.segID, b.blockID)
+       return fmt.Sprintf("BlockID-%d-%d", parseSuffix(b.segID), 
parseSuffix(b.blockID))
 }
 
 func (b *block) stats() (names []string, stats []observability.Statistics) {
diff --git a/banyand/tsdb/bucket/bucket_suite_test.go 
b/banyand/tsdb/bucket/bucket_suite_test.go
index 59ca1fe..289cd7d 100644
--- a/banyand/tsdb/bucket/bucket_suite_test.go
+++ b/banyand/tsdb/bucket/bucket_suite_test.go
@@ -23,6 +23,7 @@ import (
        . "github.com/onsi/gomega"
 
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
 )
 
 func TestBucket(t *testing.T) {
@@ -30,7 +31,7 @@ func TestBucket(t *testing.T) {
        BeforeSuite(func() {
                Expect(logger.Init(logger.Logging{
                        Env:   "dev",
-                       Level: "warn",
+                       Level: flags.LogLevel,
                })).Should(Succeed())
        })
        RunSpecs(t, "Bucket Suite")
diff --git a/banyand/tsdb/bucket/queue.go b/banyand/tsdb/bucket/queue.go
index 2b3dced..0bcf49a 100644
--- a/banyand/tsdb/bucket/queue.go
+++ b/banyand/tsdb/bucket/queue.go
@@ -17,29 +17,45 @@
 package bucket
 
 import (
+       "context"
        "errors"
+       "fmt"
+       "io"
        "sync"
+       "time"
 
        "github.com/hashicorp/golang-lru/simplelru"
+       "github.com/robfig/cron/v3"
+
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
-type EvictFn func(id interface{})
+type (
+       EvictFn       func(ctx context.Context, id interface{}) error
+       OnAddRecentFn func() error
+)
 
 type Queue interface {
-       Push(id interface{})
-       Remove(id interface{})
+       io.Closer
+       Touch(id fmt.Stringer) bool
+       Push(ctx context.Context, id fmt.Stringer, fn OnAddRecentFn) error
+       Remove(id fmt.Stringer)
        Len() int
+       Volume() int
        All() []interface{}
 }
 
 const (
-       DefaultRecentRatio  = 0.25
-       DefaultGhostEntries = 0.50
+       DefaultRecentRatio = 0.25
+
+       defaultEvictBatchSize = 10
 )
 
 var ErrInvalidSize = errors.New("invalid size")
 
 type lruQueue struct {
+       l          *logger.Logger
        size       int
        recentSize int
        evictSize  int
@@ -49,15 +65,17 @@ type lruQueue struct {
        frequent    simplelru.LRUCache
        recentEvict simplelru.LRUCache
        lock        sync.RWMutex
+
+       stopCh chan struct{}
 }
 
-func NewQueue(size int, evictFn EvictFn) (Queue, error) {
+func NewQueue(logger *logger.Logger, size int, maxSize int, clock 
timestamp.Clock, evictFn EvictFn) (Queue, error) {
        if size <= 0 {
                return nil, ErrInvalidSize
        }
 
        recentSize := int(float64(size) * DefaultRecentRatio)
-       evictSize := int(float64(size) * DefaultGhostEntries)
+       evictSize := maxSize - size
 
        recent, err := simplelru.NewLRU(size, nil)
        if err != nil {
@@ -79,37 +97,108 @@ func NewQueue(size int, evictFn EvictFn) (Queue, error) {
                recentEvict: recentEvict,
                evictSize:   evictSize,
                evictFn:     evictFn,
+               l:           logger,
+               stopCh:      make(chan struct{}),
        }
+       parser := cron.NewParser(cron.Second)
+       // every 60 seconds to clean up recentEvict
+       scheduler, err := parser.Parse("59")
+       if err != nil {
+               return nil, err
+       }
+       go func() {
+               now := clock.Now()
+               for {
+                       next := scheduler.Next(now)
+                       timer := clock.Timer(next.Sub(now))
+                       select {
+                       case now = <-timer.C:
+                               c.l.Info().Time("now", now).Msg("wakes")
+                               var evictLen int
+                               c.lock.RLock()
+                               evictLen = c.recentEvict.Len()
+                               c.lock.RUnlock()
+                               if evictLen < 1 {
+                                       continue
+                               }
+                               for i := 0; i < defaultEvictBatchSize; i++ {
+                                       c.lock.Lock()
+                                       if evictLen < 1 {
+                                               continue
+                                       }
+                                       ctx, cancel := 
context.WithTimeout(context.Background(), 30*time.Second)
+                                       if err := c.removeOldest(ctx, 
c.recentEvict); err != nil {
+                                               
c.l.Error().Err(err).Msg("failed to remove oldest blocks")
+                                       }
+                                       cancel()
+                                       c.lock.Unlock()
+                               }
+                       case <-c.stopCh:
+                               c.l.Info().Msg("stop")
+                               timer.Stop()
+                               return
+                       }
+               }
+       }()
        return c, nil
 }
 
-func (q *lruQueue) Push(id interface{}) {
+func (q *lruQueue) Touch(id fmt.Stringer) bool {
        q.lock.Lock()
        defer q.lock.Unlock()
 
        if q.frequent.Contains(id) {
+               q.l.Debug().Stringer("id", id).Msg("get from frequent")
+               return true
+       }
+
+       if q.recent.Contains(id) {
+               q.l.Debug().Stringer("id", id).Msg("promote from recent to 
frequent")
+               q.recent.Remove(id)
                q.frequent.Add(id, nil)
-               return
+               return true
+       }
+       return false
+}
+
+func (q *lruQueue) Push(ctx context.Context, id fmt.Stringer, fn 
OnAddRecentFn) error {
+       q.lock.Lock()
+       defer q.lock.Unlock()
+
+       if q.frequent.Contains(id) {
+               q.l.Debug().Stringer("id", id).Msg("push to frequent")
+               q.frequent.Add(id, nil)
+               return nil
        }
 
        if q.recent.Contains(id) {
+               q.l.Debug().Stringer("id", id).Msg("promote from recent to 
frequent")
                q.recent.Remove(id)
                q.frequent.Add(id, nil)
-               return
+               return nil
        }
 
        if q.recentEvict.Contains(id) {
-               q.ensureSpace(true)
+               q.l.Debug().Stringer("id", id).Msg("restore from recentEvict")
+               if err := q.ensureSpace(ctx, true); err != nil {
+                       return err
+               }
                q.recentEvict.Remove(id)
                q.frequent.Add(id, nil)
-               return
+               return nil
        }
 
-       q.ensureSpace(false)
+       if err := q.ensureSpace(ctx, false); err != nil {
+               return err
+       }
        q.recent.Add(id, nil)
+       if fn == nil {
+               return nil
+       }
+       return fn()
 }
 
-func (q *lruQueue) Remove(id interface{}) {
+func (q *lruQueue) Remove(id fmt.Stringer) {
        q.lock.Lock()
        defer q.lock.Unlock()
 
@@ -134,6 +223,10 @@ func (q *lruQueue) Len() int {
        return q.recent.Len() + q.frequent.Len()
 }
 
+func (q *lruQueue) Volume() int {
+       return q.size + q.recentSize + q.evictSize
+}
+
 func (q *lruQueue) All() []interface{} {
        q.lock.RLock()
        defer q.lock.RUnlock()
@@ -144,32 +237,50 @@ func (q *lruQueue) All() []interface{} {
        return all
 }
 
-func (q *lruQueue) ensureSpace(recentEvict bool) {
+func (q *lruQueue) ensureSpace(ctx context.Context, recentEvict bool) error {
        recentLen := q.recent.Len()
        freqLen := q.frequent.Len()
        if recentLen+freqLen < q.size {
-               return
+               return nil
        }
        if recentLen > 0 && (recentLen > q.recentSize || (recentLen == 
q.recentSize && !recentEvict)) {
-               k, _, _ := q.recent.RemoveOldest()
-               q.addLst(q.recentEvict, q.evictSize, k)
-               return
+               k, _, ok := q.recent.GetOldest()
+               if !ok {
+                       return errors.New("failed to get oldest from recent 
queue")
+               }
+               if err := q.addLst(ctx, q.recentEvict, q.evictSize, k); err != 
nil {
+                       return err
+               }
+               q.recent.Remove(k)
+               return nil
        }
-       q.removeOldest(q.frequent)
+       return q.removeOldest(ctx, q.frequent)
 }
 
-func (q *lruQueue) addLst(lst simplelru.LRUCache, size int, id interface{}) {
+func (q *lruQueue) addLst(ctx context.Context, lst simplelru.LRUCache, size 
int, id interface{}) error {
        if lst.Len() < size {
                lst.Add(id, nil)
-               return
+               return nil
+       }
+       if err := q.removeOldest(ctx, lst); err != nil {
+               return err
        }
-       q.removeOldest(lst)
        lst.Add(id, nil)
+       return nil
 }
 
-func (q *lruQueue) removeOldest(lst simplelru.LRUCache) {
-       oldestID, _, ok := lst.RemoveOldest()
+func (q *lruQueue) removeOldest(ctx context.Context, lst simplelru.LRUCache) 
error {
+       oldestID, _, ok := lst.GetOldest()
        if ok && q.evictFn != nil {
-               q.evictFn(oldestID)
+               if err := q.evictFn(ctx, oldestID); err != nil {
+                       return err
+               }
+               _ = lst.Remove(oldestID)
        }
+       return nil
+}
+
+func (q *lruQueue) Close() error {
+       close(q.stopCh)
+       return nil
 }
diff --git a/banyand/tsdb/bucket/queue_test.go 
b/banyand/tsdb/bucket/queue_test.go
index 0eb6fe0..f256358 100644
--- a/banyand/tsdb/bucket/queue_test.go
+++ b/banyand/tsdb/bucket/queue_test.go
@@ -17,11 +17,17 @@
 package bucket_test
 
 import (
+       "context"
+       "strconv"
+       "time"
+
        . "github.com/onsi/ginkgo/v2"
        . "github.com/onsi/gomega"
        "github.com/onsi/gomega/gleak"
 
        "github.com/apache/skywalking-banyandb/banyand/tsdb/bucket"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 type queueEntryID struct {
@@ -29,6 +35,10 @@ type queueEntryID struct {
        second uint16
 }
 
+func (q queueEntryID) String() string {
+       return strconv.Itoa(int(q.first))
+}
+
 func entryID(id uint16) queueEntryID {
        return queueEntryID{
                first:  id,
@@ -37,26 +47,87 @@ func entryID(id uint16) queueEntryID {
 }
 
 var _ = Describe("Queue", func() {
+       var evictLst []queueEntryID
+       var l bucket.Queue
+       var clock timestamp.MockClock
        BeforeEach(func() {
                goods := gleak.Goroutines()
                DeferCleanup(func() {
                        
Eventually(gleak.Goroutines).ShouldNot(gleak.HaveLeaked(goods))
                })
-       })
-       It("pushes data", func() {
-               evictLst := make([]queueEntryID, 0)
-               l, err := bucket.NewQueue(128, func(id interface{}) {
+               evictLst = make([]queueEntryID, 0)
+               clock = timestamp.NewMockClock()
+               clock.Set(time.Date(1970, 0o1, 0o1, 0, 0, 0, 0, time.Local))
+               var err error
+               l, err = bucket.NewQueue(logger.GetLogger("test"), 128, 192, 
clock, func(_ context.Context, id interface{}) error {
                        evictLst = append(evictLst, id.(queueEntryID))
+                       return nil
                })
                Expect(err).ShouldNot(HaveOccurred())
-
+               DeferCleanup(func() {
+                       evictLst = evictLst[:0]
+                       Expect(l.Close()).To(Succeed())
+               })
+       })
+       It("pushes to recent", func() {
+               enRecentSize := 0
                for i := 0; i < 256; i++ {
-                       l.Push(entryID(uint16(i)))
+                       Expect(l.Push(context.Background(), entryID(uint16(i)), 
func() error {
+                               enRecentSize++
+                               return nil
+                       })).To(Succeed())
                }
+               Expect(enRecentSize).To(Equal(256))
                Expect(l.Len()).To(Equal(128))
                Expect(len(evictLst)).To(Equal(64))
                for i := 0; i < 64; i++ {
                        Expect(evictLst[i]).To(Equal(entryID(uint16(i))))
                }
        })
+
+       It("promotes to frequent", func() {
+               enRecentSize := 0
+               for i := 0; i < 128; i++ {
+                       Expect(l.Push(context.Background(), entryID(uint16(i)), 
func() error {
+                               enRecentSize++
+                               return nil
+                       })).To(Succeed())
+               }
+               Expect(enRecentSize).To(Equal(128))
+               Expect(l.Len()).To(Equal(128))
+               Expect(len(evictLst)).To(Equal(0))
+               for i := 0; i < 64; i++ {
+                       Expect(l.Touch(entryID(uint16(i)))).To(BeTrue())
+               }
+               enRecentSize = 0
+               for i := 128; i < 256; i++ {
+                       Expect(l.Push(context.Background(), entryID(uint16(i)), 
func() error {
+                               enRecentSize++
+                               return nil
+                       })).To(Succeed())
+               }
+               Expect(enRecentSize).To(Equal(128))
+               Expect(l.Len()).To(Equal(128))
+               Expect(len(evictLst)).To(Equal(64))
+               for i := 0; i < 64; i++ {
+                       Expect(evictLst[i]).To(Equal(entryID(uint16(i + 64))))
+               }
+       })
+
+       It("cleans up evict queue", func() {
+               enRecentSize := 0
+
+               for i := 0; i < 192; i++ {
+                       Expect(l.Push(context.Background(), entryID(uint16(i)), 
func() error {
+                               enRecentSize++
+                               return nil
+                       })).To(Succeed())
+               }
+               Expect(enRecentSize).To(Equal(192))
+               Expect(l.Len()).To(Equal(128))
+               Expect(len(evictLst)).To(Equal(0))
+               clock.Add(time.Minute)
+               GinkgoWriter.Printf("evicted size:%d \n", len(evictLst))
+               Expect(len(evictLst)).To(BeNumerically(">", 1))
+       })
 })
diff --git a/banyand/tsdb/bucket/strategy.go b/banyand/tsdb/bucket/strategy.go
index 4a8d14e..6bf51f2 100644
--- a/banyand/tsdb/bucket/strategy.go
+++ b/banyand/tsdb/bucket/strategy.go
@@ -117,7 +117,7 @@ func (s *Strategy) observe(c Channel) bool {
                select {
                case status, more := <-c:
                        if !more {
-                               return true
+                               return moreBucket
                        }
                        ratio := Ratio(status.Volume) / Ratio(status.Capacity)
                        atomic.StoreUint64(&s.currentRatio, 
math.Float64bits(float64(ratio)))
@@ -136,7 +136,7 @@ func (s *Strategy) observe(c Channel) bool {
                                if next != nil {
                                        s.current.Store(next)
                                }
-                               return true
+                               return moreBucket
                        }
                case <-s.stopCh:
                        return false
diff --git a/banyand/tsdb/indexdb.go b/banyand/tsdb/indexdb.go
index e7ce863..c4281d0 100644
--- a/banyand/tsdb/indexdb.go
+++ b/banyand/tsdb/indexdb.go
@@ -28,7 +28,6 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/kv"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/index"
-       "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 type IndexDatabase interface {
@@ -97,7 +96,6 @@ type indexWriterBuilder struct {
        scope        Entry
        segCtrl      *segmentController
        ts           time.Time
-       seg          *segment
        globalItemID *GlobalItemID
 }
 
@@ -108,11 +106,6 @@ func (i *indexWriterBuilder) Scope(scope Entry) 
IndexWriterBuilder {
 
 func (i *indexWriterBuilder) Time(ts time.Time) IndexWriterBuilder {
        i.ts = ts
-       segs := i.segCtrl.span(timestamp.NewTimeRangeDuration(ts, 0, true, 
false))
-       if len(segs) != 1 {
-               return i
-       }
-       i.seg = segs[0]
        return i
 }
 
@@ -122,15 +115,16 @@ func (i *indexWriterBuilder) GlobalItemID(itemID 
GlobalItemID) IndexWriterBuilde
 }
 
 func (i *indexWriterBuilder) Build() (IndexWriter, error) {
-       if i.seg == nil {
-               return nil, errors.WithStack(ErrNoTime)
+       seg, err := i.segCtrl.create(i.segCtrl.Format(i.ts), false)
+       if err != nil {
+               return nil, err
        }
        if i.globalItemID == nil {
                return nil, errors.WithStack(ErrNoVal)
        }
        return &indexWriter{
                scope:  i.scope,
-               seg:    i.seg,
+               seg:    seg,
                ts:     i.ts,
                itemID: i.globalItemID,
        }, nil
diff --git a/banyand/tsdb/retention.go b/banyand/tsdb/retention.go
index ab61a8d..755bf41 100644
--- a/banyand/tsdb/retention.go
+++ b/banyand/tsdb/retention.go
@@ -18,6 +18,7 @@
 package tsdb
 
 import (
+       "context"
        "sync"
        "time"
 
@@ -76,19 +77,18 @@ func (rc *retentionController) run() {
        for {
                next := rc.scheduler.Next(now)
                timer := rc.segment.clock.Timer(next.Sub(now))
-               for {
-                       select {
-                       case now = <-timer.C:
-                               rc.l.Info().Time("now", now).Msg("wake")
-                               if err := 
rc.segment.remove(now.Add(-rc.duration)); err != nil {
-                                       rc.l.Error().Err(err)
-                               }
-                       case <-rc.stopCh:
-                               timer.Stop()
-                               rc.l.Info().Msg("stop")
-                               return
+               select {
+               case now = <-timer.C:
+                       rc.l.Info().Time("now", now).Msg("wake")
+                       ctx, cancel := 
context.WithTimeout(context.Background(), 30*time.Minute)
+                       if err := rc.segment.remove(ctx, 
now.Add(-rc.duration)); err != nil {
+                               rc.l.Error().Err(err)
                        }
-                       break
+                       cancel()
+               case <-rc.stopCh:
+                       timer.Stop()
+                       rc.l.Info().Msg("stop")
+                       return
                }
        }
 }
diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index 3393f67..6d08e2f 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -111,10 +111,14 @@ func openSegment(ctx context.Context, startTime 
time.Time, path, suffix string,
        return s, nil
 }
 
-func (s *segment) close() {
-       s.blockController.close()
+func (s *segment) close(ctx context.Context) error {
+       if err := s.blockController.close(ctx); err != nil {
+               return err
+       }
        if s.globalIndex != nil {
-               s.globalIndex.Close()
+               if err := s.globalIndex.Close(); err != nil {
+                       return err
+               }
        }
        if s.blockManageStrategy != nil {
                s.blockManageStrategy.Close()
@@ -122,19 +126,22 @@ func (s *segment) close() {
        if s.Reporter != nil {
                s.Stop()
        }
+       return nil
 }
 
-func (s *segment) closeBlock(id uint16) {
-       s.blockController.closeBlock(id)
+func (s *segment) closeBlock(ctx context.Context, id uint16) error {
+       return s.blockController.closeBlock(ctx, id)
 }
 
-func (s *segment) delete() error {
-       s.close()
+func (s *segment) delete(ctx context.Context) error {
+       if err := s.close(ctx); err != nil {
+               return err
+       }
        return os.RemoveAll(s.path)
 }
 
 func (s segment) String() string {
-       return fmt.Sprintf("SegID-%d", s.id)
+       return fmt.Sprintf("SegID-%d", parseSuffix(s.id))
 }
 
 func (s *segment) Stats() observability.Statistics {
@@ -177,7 +184,7 @@ func newBlockController(segCtx context.Context, segID 
uint16, location string, s
 func (bc *blockController) Current() (bucket.Reporter, error) {
        now := bc.clock.Now()
        ns := uint64(now.UnixNano())
-       if b := func() bucket.Reporter {
+       if b := func() *block {
                bc.RLock()
                defer bc.RUnlock()
                for _, s := range bc.lst {
@@ -187,9 +194,14 @@ func (bc *blockController) Current() (bucket.Reporter, 
error) {
                }
                return nil
        }(); b != nil {
+               if b.Closed() {
+                       if err := b.open(); err != nil {
+                               return nil, err
+                       }
+               }
                return b, nil
        }
-       return bc.create(now)
+       return bc.newHeadBlock(now)
 }
 
 func (bc *blockController) Next() (bucket.Reporter, error) {
@@ -198,17 +210,34 @@ func (bc *blockController) Next() (bucket.Reporter, 
error) {
                return nil, err
        }
        b := c.(*block)
-       return bc.create(bc.blockSize.NextTime(b.Start))
+
+       return bc.newHeadBlock(bc.blockSize.NextTime(b.Start))
+}
+
+func (bc *blockController) newHeadBlock(now time.Time) (*block, error) {
+       b, err := bc.create(now)
+       if err != nil {
+               return nil, err
+       }
+       return b, nil
 }
 
 func (bc *blockController) OnMove(prev bucket.Reporter, next bucket.Reporter) {
        event := bc.l.Info()
        if prev != nil {
                event.Stringer("prev", prev)
-               bc.blockQueue.Push(BlockID{
+               b := prev.(*block)
+               ctx, cancel := context.WithTimeout(context.Background(), 
defaultEnqueueTimeout)
+               defer cancel()
+               if err := bc.blockQueue.Push(ctx, BlockID{
                        SegID:   bc.segID,
-                       BlockID: prev.(*block).blockID,
-               })
+                       BlockID: b.blockID,
+               }, nil); err != nil {
+                       bc.l.Debug().Err(err).Msg("failed to push a expired 
head block to the queue")
+                       ctxClosing, cancelClosing := 
context.WithTimeout(context.Background(), defaultEnqueueTimeout)
+                       defer cancelClosing()
+                       b.close(ctxClosing)
+               }
        }
        if next != nil {
                event.Stringer("next", next)
@@ -236,7 +265,7 @@ func (bc *blockController) Parse(value string) (time.Time, 
error) {
        panic("invalid interval unit")
 }
 
-func (bc *blockController) span(timeRange timestamp.TimeRange) 
([]BlockDelegate, error) {
+func (bc *blockController) span(ctx context.Context, timeRange 
timestamp.TimeRange) ([]BlockDelegate, error) {
        bb := bc.search(func(b *block) bool {
                return b.Overlapping(timeRange)
        })
@@ -245,7 +274,7 @@ func (bc *blockController) span(timeRange 
timestamp.TimeRange) ([]BlockDelegate,
        }
        dd := make([]BlockDelegate, len(bb))
        for i, b := range bb {
-               d, err := b.delegate()
+               d, err := b.delegate(ctx)
                if err != nil {
                        return nil, err
                }
@@ -254,10 +283,10 @@ func (bc *blockController) span(timeRange 
timestamp.TimeRange) ([]BlockDelegate,
        return dd, nil
 }
 
-func (bc *blockController) get(blockID uint16) (BlockDelegate, error) {
+func (bc *blockController) get(ctx context.Context, blockID uint16) 
(BlockDelegate, error) {
        b := bc.getBlock(blockID)
        if b != nil {
-               return b.delegate()
+               return b.delegate(ctx)
        }
        return nil, nil
 }
@@ -297,14 +326,14 @@ func (bc *blockController) search(matcher func(*block) 
bool) (bb []*block) {
        return bb
 }
 
-func (bc *blockController) closeBlock(blockID uint16) {
+func (bc *blockController) closeBlock(ctx context.Context, blockID uint16) 
error {
        bc.RLock()
-       defer bc.RUnlock()
        b := bc.getBlock(blockID)
+       bc.RUnlock()
        if b == nil {
-               return
+               return nil
        }
-       b.close()
+       return b.close(ctx)
 }
 
 func (bc *blockController) startTime(suffix string) (time.Time, error) {
@@ -327,7 +356,7 @@ func (bc *blockController) startTime(suffix string) 
(time.Time, error) {
 func (bc *blockController) open() error {
        return WalkDir(
                bc.location,
-               segPathPrefix,
+               blockPathPrefix,
                func(suffix, absolutePath string) error {
                        bc.Lock()
                        defer bc.Unlock()
@@ -355,7 +384,15 @@ func (bc *blockController) create(startTime time.Time) 
(*block, error) {
        if err != nil {
                return nil, err
        }
-       return bc.load(suffix, segPath)
+       b, err := bc.load(suffix, segPath)
+       if err != nil {
+               return nil, err
+       }
+       err = b.open()
+       if err != nil {
+               return nil, err
+       }
+       return b, nil
 }
 
 func (bc *blockController) load(suffix, path string) (b *block, err error) {
@@ -393,17 +430,18 @@ func (bc *blockController) sortLst() {
        })
 }
 
-func (bc *blockController) close() {
+func (bc *blockController) close(ctx context.Context) (err error) {
        for _, s := range bc.lst {
-               s.stopThenClose()
+               err = multierr.Append(err, s.stopThenClose(ctx))
        }
+       return err
 }
 
-func (bc *blockController) remove(deadline time.Time) (err error) {
+func (bc *blockController) remove(ctx context.Context, deadline time.Time) 
(err error) {
        for _, b := range bc.blocks() {
                if b.End.Before(deadline) {
                        bc.Lock()
-                       if errDel := b.delete(); errDel != nil {
+                       if errDel := b.delete(ctx); errDel != nil {
                                err = multierr.Append(err, errDel)
                        } else {
                                b.queue.Remove(BlockID{
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index c0956de..09d62aa 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -75,9 +75,9 @@ func (i *GlobalItemID) UnMarshal(data []byte) error {
 
 type Series interface {
        ID() common.SeriesID
-       Span(timeRange timestamp.TimeRange) (SeriesSpan, error)
-       Create(t time.Time) (SeriesSpan, error)
-       Get(id GlobalItemID) (Item, io.Closer, error)
+       Span(ctx context.Context, timeRange timestamp.TimeRange) (SeriesSpan, 
error)
+       Create(ctx context.Context, t time.Time) (SeriesSpan, error)
+       Get(ctx context.Context, id GlobalItemID) (Item, io.Closer, error)
 }
 
 type SeriesSpan interface {
@@ -95,8 +95,8 @@ type series struct {
        l       *logger.Logger
 }
 
-func (s *series) Get(id GlobalItemID) (Item, io.Closer, error) {
-       b, err := s.blockDB.block(id)
+func (s *series) Get(ctx context.Context, id GlobalItemID) (Item, io.Closer, 
error) {
+       b, err := s.blockDB.block(ctx, id)
        if err != nil {
                return nil, nil, err
        }
@@ -114,8 +114,8 @@ func (s *series) ID() common.SeriesID {
        return s.id
 }
 
-func (s *series) Span(timeRange timestamp.TimeRange) (SeriesSpan, error) {
-       blocks, err := s.blockDB.span(timeRange)
+func (s *series) Span(ctx context.Context, timeRange timestamp.TimeRange) 
(SeriesSpan, error) {
+       blocks, err := s.blockDB.span(ctx, timeRange)
        if err != nil {
                return nil, err
        }
@@ -128,9 +128,9 @@ func (s *series) Span(timeRange timestamp.TimeRange) 
(SeriesSpan, error) {
        return newSeriesSpan(context.WithValue(context.Background(), 
logger.ContextKey, s.l), timeRange, blocks, s.id, s.shardID), nil
 }
 
-func (s *series) Create(t time.Time) (SeriesSpan, error) {
+func (s *series) Create(ctx context.Context, t time.Time) (SeriesSpan, error) {
        tr := timestamp.NewInclusiveTimeRange(t, t)
-       blocks, err := s.blockDB.span(tr)
+       blocks, err := s.blockDB.span(ctx, tr)
        if err != nil {
                return nil, err
        }
@@ -140,7 +140,7 @@ func (s *series) Create(t time.Time) (SeriesSpan, error) {
                        Msg("load a series span")
                return newSeriesSpan(context.WithValue(context.Background(), 
logger.ContextKey, s.l), tr, blocks, s.id, s.shardID), nil
        }
-       b, err := s.blockDB.create(t)
+       b, err := s.blockDB.create(ctx, t)
        if err != nil {
                return nil, err
        }
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index 7825338..b878b38 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -148,9 +148,9 @@ type SeriesDatabase interface {
 
 type blockDatabase interface {
        shardID() common.ShardID
-       span(timeRange timestamp.TimeRange) ([]BlockDelegate, error)
-       create(ts time.Time) (BlockDelegate, error)
-       block(id GlobalItemID) (BlockDelegate, error)
+       span(ctx context.Context, timeRange timestamp.TimeRange) 
([]BlockDelegate, error)
+       create(ctx context.Context, ts time.Time) (BlockDelegate, error)
+       block(ctx context.Context, id GlobalItemID) (BlockDelegate, error)
 }
 
 var (
@@ -189,12 +189,12 @@ func (s *seriesDB) GetByID(id common.SeriesID) (Series, 
error) {
        return newSeries(s.context(), id, s), nil
 }
 
-func (s *seriesDB) block(id GlobalItemID) (BlockDelegate, error) {
+func (s *seriesDB) block(ctx context.Context, id GlobalItemID) (BlockDelegate, 
error) {
        seg := s.segCtrl.get(id.segID)
        if seg == nil {
                return nil, nil
        }
-       return seg.blockController.get(id.blockID)
+       return seg.blockController.get(ctx, id.blockID)
 }
 
 func (s *seriesDB) shardID() common.ShardID {
@@ -251,11 +251,11 @@ func (s *seriesDB) List(path Path) (SeriesList, error) {
        return result, err
 }
 
-func (s *seriesDB) span(timeRange timestamp.TimeRange) ([]BlockDelegate, 
error) {
+func (s *seriesDB) span(ctx context.Context, timeRange timestamp.TimeRange) 
([]BlockDelegate, error) {
        // TODO: return correct blocks
        result := make([]BlockDelegate, 0)
        for _, s := range s.segCtrl.span(timeRange) {
-               dd, err := s.blockController.span(timeRange)
+               dd, err := s.blockController.span(ctx, timeRange)
                if err != nil {
                        return nil, err
                }
@@ -267,14 +267,14 @@ func (s *seriesDB) span(timeRange timestamp.TimeRange) 
([]BlockDelegate, error)
        return result, nil
 }
 
-func (s *seriesDB) create(ts time.Time) (BlockDelegate, error) {
+func (s *seriesDB) create(ctx context.Context, ts time.Time) (BlockDelegate, 
error) {
        s.Lock()
        defer s.Unlock()
        timeRange := timestamp.NewInclusiveTimeRange(ts, ts)
        ss := s.segCtrl.span(timeRange)
        if len(ss) > 0 {
                s := ss[0]
-               dd, err := s.blockController.span(timeRange)
+               dd, err := s.blockController.span(ctx, timeRange)
                if err != nil {
                        return nil, err
                }
@@ -285,7 +285,7 @@ func (s *seriesDB) create(ts time.Time) (BlockDelegate, 
error) {
                if err != nil {
                        return nil, err
                }
-               return block.delegate()
+               return block.delegate(ctx)
        }
        seg, err := s.segCtrl.create(s.segCtrl.Format(timeRange.Start), false)
        if err != nil {
@@ -295,7 +295,7 @@ func (s *seriesDB) create(ts time.Time) (BlockDelegate, 
error) {
        if err != nil {
                return nil, err
        }
-       return block.delegate()
+       return block.delegate(ctx)
 }
 
 func (s *seriesDB) context() context.Context {
diff --git a/banyand/tsdb/seriesdb_test.go b/banyand/tsdb/seriesdb_test.go
index f5b1875..3c4623c 100644
--- a/banyand/tsdb/seriesdb_test.go
+++ b/banyand/tsdb/seriesdb_test.go
@@ -30,6 +30,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
 )
 
 func TestEntity(t *testing.T) {
@@ -355,7 +356,7 @@ func Test_SeriesDatabase_Get(t *testing.T) {
        tester := assert.New(t)
        tester.NoError(logger.Init(logger.Logging{
                Env:   "dev",
-               Level: "warn",
+               Level: flags.LogLevel,
        }))
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
@@ -376,7 +377,7 @@ func Test_SeriesDatabase_List(t *testing.T) {
        tester := assert.New(t)
        tester.NoError(logger.Init(logger.Logging{
                Env:   "dev",
-               Level: "warn",
+               Level: flags.LogLevel,
        }))
        dir, deferFunc := test.Space(require.New(t))
        defer deferFunc()
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index 277ee03..b77a232 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -34,8 +34,9 @@ import (
 )
 
 const (
-       defaultBlockQueueSize = 1 << 4
-       defaultKVMemorySize   = 1 << 20
+       defaultBlockQueueSize    = 4
+       defaultMaxBlockQueueSize = 64
+       defaultKVMemorySize      = 1 << 20
 )
 
 var _ Shard = (*shard)(nil)
@@ -54,7 +55,7 @@ type shard struct {
 }
 
 func OpenShard(ctx context.Context, id common.ShardID,
-       root string, segmentSize, blockSize, ttl IntervalRule, openedBlockSize 
int,
+       root string, segmentSize, blockSize, ttl IntervalRule, openedBlockSize, 
maxOpenedBlockSize int,
 ) (Shard, error) {
        path, err := mkdir(shardTemplate, root, int(id))
        if err != nil {
@@ -70,7 +71,7 @@ func OpenShard(ctx context.Context, id common.ShardID,
                p.Shard = strconv.Itoa(int(id))
                return p
        })
-       sc, err := newSegmentController(shardCtx, path, segmentSize, blockSize, 
openedBlockSize, l)
+       sc, err := newSegmentController(shardCtx, path, segmentSize, blockSize, 
openedBlockSize, maxOpenedBlockSize, l)
        if err != nil {
                return nil, errors.Wrapf(err, "create the segment controller of 
the shard %d", int(id))
        }
@@ -164,8 +165,9 @@ func (s *shard) State() (shardState ShardState) {
 func (s *shard) Close() error {
        s.retentionController.stop()
        s.segmentManageStrategy.Close()
-       s.segmentController.close()
-       err := s.seriesDatabase.Close()
+       ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+       defer cancel()
+       err := multierr.Combine(s.segmentController.close(ctx), 
s.seriesDatabase.Close())
        close(s.stopCh)
        return err
 }
@@ -236,7 +238,7 @@ type segmentController struct {
 }
 
 func newSegmentController(shardCtx context.Context, location string, 
segmentSize, blockSize IntervalRule,
-       openedBlockSize int, l *logger.Logger,
+       openedBlockSize, maxOpenedBlockSize int, l *logger.Logger,
 ) (*segmentController, error) {
        clock, _ := timestamp.GetClock(shardCtx)
        sc := &segmentController{
@@ -248,16 +250,21 @@ func newSegmentController(shardCtx context.Context, 
location string, segmentSize
                clock:       clock,
        }
        var err error
-       sc.blockQueue, err = bucket.NewQueue(openedBlockSize, func(id 
interface{}) {
-               bsID := id.(BlockID)
-               seg := sc.get(bsID.SegID)
-               if seg == nil {
-                       l.Warn().Uint16("segID", bsID.SegID).Msg("segment is 
absent")
-                       return
-               }
-               l.Info().Uint16("blockID", bsID.BlockID).Uint16("segID", 
bsID.SegID).Msg("closing the block")
-               seg.closeBlock(bsID.BlockID)
-       })
+       sc.blockQueue, err = bucket.NewQueue(
+               l.Named("block-queue"),
+               openedBlockSize,
+               maxOpenedBlockSize,
+               clock,
+               func(ctx context.Context, id interface{}) error {
+                       bsID := id.(BlockID)
+                       seg := sc.get(bsID.SegID)
+                       if seg == nil {
+                               l.Warn().Int("segID", 
parseSuffix(bsID.SegID)).Msg("segment is absent")
+                               return nil
+                       }
+                       l.Info().Uint16("blockID", bsID.BlockID).Int("segID", 
parseSuffix(bsID.SegID)).Msg("closing the block")
+                       return seg.closeBlock(ctx, bsID.BlockID)
+               })
        return sc, err
 }
 
@@ -409,14 +416,14 @@ func (sc *segmentController) load(suffix, path string, 
createBlockIfEmpty bool)
        return seg, nil
 }
 
-func (sc *segmentController) remove(deadline time.Time) (err error) {
+func (sc *segmentController) remove(ctx context.Context, deadline time.Time) 
(err error) {
        sc.l.Info().Time("deadline", deadline).Msg("start to remove before 
deadline")
        for _, s := range sc.segments() {
                if s.End.Before(deadline) || 
s.Contains(uint64(deadline.UnixNano())) {
-                       err = multierr.Append(err, 
s.blockController.remove(deadline))
+                       err = multierr.Append(err, 
s.blockController.remove(ctx, deadline))
                        if s.End.Before(deadline) {
                                sc.Lock()
-                               if errDel := s.delete(); errDel != nil {
+                               if errDel := s.delete(ctx); errDel != nil {
                                        err = multierr.Append(err, errDel)
                                } else {
                                        sc.removeSeg(s.id)
@@ -437,8 +444,10 @@ func (sc *segmentController) removeSeg(segID uint16) {
        }
 }
 
-func (sc *segmentController) close() {
+func (sc *segmentController) close(ctx context.Context) (err error) {
        for _, s := range sc.lst {
-               s.close()
+               err = multierr.Append(err, s.close(ctx))
        }
+       err = multierr.Append(err, sc.blockQueue.Close())
+       return err
 }
diff --git a/banyand/tsdb/shard_test.go b/banyand/tsdb/shard_test.go
index 1e1197e..6b66ea4 100644
--- a/banyand/tsdb/shard_test.go
+++ b/banyand/tsdb/shard_test.go
@@ -27,6 +27,7 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/banyand/tsdb"
        "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
@@ -70,13 +71,14 @@ var _ = Describe("Shard", func() {
                                        Num:  7,
                                },
                                2,
+                               3,
                        )
                        Expect(err).NotTo(HaveOccurred())
                        By("01/01 00:00 1st block is opened")
                        t1 := clock.Now()
                        Eventually(func() []tsdb.BlockState {
                                return shard.State().Blocks
-                       }, 
defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+                       }, 
flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{
                                {
                                        ID: tsdb.BlockID{
                                                SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -93,7 +95,7 @@ var _ = Describe("Shard", func() {
                                        GinkgoWriter.Println("01/01 10:00 has 
been triggered")
                                }
                                return shard.State().Blocks
-                       }, 
defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+                       }, 
flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{
                                {
                                        ID: tsdb.BlockID{
                                                SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -111,7 +113,7 @@ var _ = Describe("Shard", func() {
                        }))
                        Eventually(func() []tsdb.BlockID {
                                return shard.State().OpenBlocks
-                       }, 
defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{}))
+                       }, 
flags.EventuallyTimeout).Should(Equal([]tsdb.BlockID{}))
                        By("01/01 13:00 moves to the 2nd block")
                        clock.Add(3 * time.Hour)
                        Eventually(func() []tsdb.BlockID {
@@ -119,7 +121,7 @@ var _ = Describe("Shard", func() {
                                        GinkgoWriter.Println("01/01 13:00 has 
been triggered")
                                }
                                return shard.State().OpenBlocks
-                       }, 
defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{
+                       }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockID{
                                {
                                        SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700101),
                                        BlockID: 
tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
@@ -133,7 +135,7 @@ var _ = Describe("Shard", func() {
                                        GinkgoWriter.Println("01/01 22:00 has 
been triggered")
                                }
                                return shard.State().Blocks
-                       }, 
defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+                       }, 
flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{
                                {
                                        ID: tsdb.BlockID{
                                                SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -163,7 +165,7 @@ var _ = Describe("Shard", func() {
                                        GinkgoWriter.Println("01/02 01:00 has 
been triggered")
                                }
                                return shard.State().OpenBlocks
-                       }, 
defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{
+                       }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockID{
                                {
                                        SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700101),
                                        BlockID: 
tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
@@ -181,7 +183,7 @@ var _ = Describe("Shard", func() {
                                        GinkgoWriter.Println("01/02 10:00 has 
been triggered")
                                }
                                return shard.State().Blocks
-                       }, 
defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+                       }, 
flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{
                                {
                                        ID: tsdb.BlockID{
                                                SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -218,7 +220,7 @@ var _ = Describe("Shard", func() {
                                        GinkgoWriter.Println("01/02 13:00 has 
been triggered")
                                }
                                return shard.State().OpenBlocks
-                       }, 
defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{
+                       }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockID{
                                {
                                        SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700101),
                                        BlockID: 
tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
@@ -240,7 +242,7 @@ var _ = Describe("Shard", func() {
                                        GinkgoWriter.Println("01/02 22:00 has 
been triggered")
                                }
                                return shard.State().Blocks
-                       }, 
defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+                       }, 
flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{
                                {
                                        ID: tsdb.BlockID{
                                                SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -284,11 +286,7 @@ var _ = Describe("Shard", func() {
                                        GinkgoWriter.Println("01/03 01:00 has 
been triggered")
                                }
                                return shard.State().OpenBlocks
-                       }, 
defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{
-                               {
-                                       SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700101),
-                                       BlockID: 
tsdb.GenerateInternalID(tsdb.HOUR, 12),
-                               },
+                       }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockID{
                                {
                                        SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700102),
                                        BlockID: 
tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
@@ -300,7 +298,7 @@ var _ = Describe("Shard", func() {
                        }))
                        Eventually(func() []tsdb.BlockState {
                                return shard.State().Blocks
-                       }, 
defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+                       }, 
flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{
                                {
                                        ID: tsdb.BlockID{
                                                SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -315,6 +313,7 @@ var _ = Describe("Shard", func() {
                                                BlockID: 
tsdb.GenerateInternalID(tsdb.HOUR, 12),
                                        },
                                        TimeRange: 
timestamp.NewTimeRangeDuration(t2, 12*time.Hour, true, false),
+                                       Closed:    true,
                                },
                                {
                                        ID: tsdb.BlockID{
@@ -342,7 +341,7 @@ var _ = Describe("Shard", func() {
                        series, err := 
shard.Series().GetByID(common.SeriesID(11))
                        Expect(err).NotTo(HaveOccurred())
                        t1Range := timestamp.NewInclusiveTimeRangeDuration(t1, 
1*time.Hour)
-                       span, err := series.Span(t1Range)
+                       span, err := series.Span(context.Background(), t1Range)
                        Expect(err).NotTo(HaveOccurred())
                        defer span.Close()
                        writer, err := 
span.WriterBuilder().Family([]byte("test"), 
[]byte("test")).Time(t1Range.End).Build()
@@ -351,14 +350,14 @@ var _ = Describe("Shard", func() {
                        Expect(err).NotTo(HaveOccurred())
                        Eventually(func() []tsdb.BlockID {
                                return shard.State().OpenBlocks
-                       }, 
defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{
+                       }, flags.EventuallyTimeout).Should(Equal([]tsdb.BlockID{
                                {
                                        SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700101),
                                        BlockID: 
tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
                                },
                                {
-                                       SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700101),
-                                       BlockID: 
tsdb.GenerateInternalID(tsdb.HOUR, 12),
+                                       SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700102),
+                                       BlockID: 
tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
                                },
                                {
                                        SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700102),
@@ -367,7 +366,7 @@ var _ = Describe("Shard", func() {
                        }))
                        Eventually(func() []tsdb.BlockState {
                                return shard.State().Blocks
-                       }, 
defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+                       }, 
flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{
                                {
                                        ID: tsdb.BlockID{
                                                SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -381,6 +380,7 @@ var _ = Describe("Shard", func() {
                                                BlockID: 
tsdb.GenerateInternalID(tsdb.HOUR, 12),
                                        },
                                        TimeRange: 
timestamp.NewTimeRangeDuration(t2, 12*time.Hour, true, false),
+                                       Closed:    true,
                                },
                                {
                                        ID: tsdb.BlockID{
@@ -388,7 +388,6 @@ var _ = Describe("Shard", func() {
                                                BlockID: 
tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
                                        },
                                        TimeRange: 
timestamp.NewTimeRangeDuration(t3, 12*time.Hour, true, false),
-                                       Closed:    true,
                                },
                                {
                                        ID: tsdb.BlockID{
@@ -422,6 +421,7 @@ var _ = Describe("Shard", func() {
                                        Num:  1,
                                },
                                10,
+                               15,
                        )
                        Expect(err).NotTo(HaveOccurred())
                        By("open 4 blocks")
@@ -451,7 +451,7 @@ var _ = Describe("Shard", func() {
                                        GinkgoWriter.Println("01/02 13:00 has 
been triggered")
                                }
                                return shard.State().Blocks
-                       }, 
defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+                       }, 
flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{
                                {
                                        ID: tsdb.BlockID{
                                                SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -497,7 +497,7 @@ var _ = Describe("Shard", func() {
                                        GinkgoWriter.Println("01/03 01:00 has 
been triggered")
                                }
                                return shard.State().Blocks
-                       }, 
defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+                       }, 
flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{
                                {
                                        ID: tsdb.BlockID{
                                                SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700102),
@@ -558,13 +558,14 @@ var _ = Describe("Shard", func() {
                                        Num:  7,
                                },
                                2,
+                               3,
                        )
                        Expect(err).NotTo(HaveOccurred())
                        By("01/01 00:01 1st block is opened")
                        t1 := clock.Now()
                        Eventually(func() []tsdb.BlockState {
                                return shard.State().Blocks
-                       }, 
defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+                       }, 
flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{
                                {
                                        ID: tsdb.BlockID{
                                                SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -581,7 +582,7 @@ var _ = Describe("Shard", func() {
                                        GinkgoWriter.Println("01/01 10:00 has 
been triggered")
                                }
                                return shard.State().Blocks
-                       }, 
defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
+                       }, 
flags.EventuallyTimeout).Should(Equal([]tsdb.BlockState{
                                {
                                        ID: tsdb.BlockID{
                                                SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -600,7 +601,7 @@ var _ = Describe("Shard", func() {
                        }))
                        Eventually(func() []tsdb.BlockID {
                                return shard.State().OpenBlocks
-                       }, 
defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{}))
+                       }, 
flags.EventuallyTimeout).Should(Equal([]tsdb.BlockID{}))
                })
        })
 })
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 5710837..42bb4f5 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -107,10 +107,18 @@ type BlockID struct {
        BlockID uint16
 }
 
+func (b BlockID) String() string {
+       return fmt.Sprintf("BlockID-%d-%d", parseSuffix(b.SegID), 
parseSuffix(b.BlockID))
+}
+
 func GenerateInternalID(unit IntervalUnit, suffix int) uint16 {
        return uint16(unit)<<12 | ((uint16(suffix) << 4) >> 4)
 }
 
+func parseSuffix(id uint16) int {
+       return int((id << 12) >> 12)
+}
+
 type BlockState struct {
        ID        BlockID
        TimeRange timestamp.TimeRange
@@ -211,7 +219,7 @@ func createDatabase(ctx context.Context, db *database, 
startID int) (Database, e
        for i := startID; i < int(db.shardNum); i++ {
                db.logger.Info().Int("shard_id", i).Msg("creating a shard")
                so, errNewShard := OpenShard(ctx, common.ShardID(i),
-                       db.location, db.segmentSize, db.blockSize, db.ttl, 
defaultBlockQueueSize)
+                       db.location, db.segmentSize, db.blockSize, db.ttl, 
defaultBlockQueueSize, defaultMaxBlockQueueSize)
                if errNewShard != nil {
                        err = multierr.Append(err, errNewShard)
                        continue
@@ -243,6 +251,7 @@ func loadDatabase(ctx context.Context, db *database) 
(Database, error) {
                        db.blockSize,
                        db.ttl,
                        defaultBlockQueueSize,
+                       defaultMaxBlockQueueSize,
                )
                if errOpenShard != nil {
                        return errOpenShard
diff --git a/banyand/tsdb/tsdb_suite_test.go b/banyand/tsdb/tsdb_suite_test.go
index cd11765..52e1157 100644
--- a/banyand/tsdb/tsdb_suite_test.go
+++ b/banyand/tsdb/tsdb_suite_test.go
@@ -18,16 +18,14 @@ package tsdb_test
 
 import (
        "testing"
-       "time"
 
        . "github.com/onsi/ginkgo/v2"
        . "github.com/onsi/gomega"
 
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
 )
 
-var defaultEventuallyTimeout = 30 * time.Second
-
 func TestTsdb(t *testing.T) {
        RegisterFailHandler(Fail)
        RunSpecs(t, "Tsdb Suite")
@@ -36,6 +34,6 @@ func TestTsdb(t *testing.T) {
 var _ = BeforeSuite(func() {
        Expect(logger.Init(logger.Logging{
                Env:   "dev",
-               Level: "warn",
+               Level: flags.LogLevel,
        })).Should(Succeed())
 })
diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go
index 684269f..5a482a8 100644
--- a/banyand/tsdb/tsdb_test.go
+++ b/banyand/tsdb/tsdb_test.go
@@ -30,6 +30,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
@@ -100,7 +101,7 @@ func verifyDatabaseStructure(tester *assert.Assertions, 
tempDir string, now time
 func openDatabase(ctx context.Context, t *require.Assertions, path string) (db 
Database) {
        t.NoError(logger.Init(logger.Logging{
                Env:   "dev",
-               Level: "warn",
+               Level: flags.LogLevel,
        }))
        db, err := OpenDatabase(
                context.WithValue(ctx, logger.ContextKey, 
logger.GetLogger("test")),
diff --git a/bydbctl/internal/cmd/cmd_suite_test.go 
b/bydbctl/internal/cmd/cmd_suite_test.go
index 6dc56ae..dc563e3 100644
--- a/bydbctl/internal/cmd/cmd_suite_test.go
+++ b/bydbctl/internal/cmd/cmd_suite_test.go
@@ -24,6 +24,7 @@ import (
        . "github.com/onsi/gomega"
 
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
 )
 
 func TestCmd(t *testing.T) {
@@ -34,6 +35,6 @@ func TestCmd(t *testing.T) {
 var _ = BeforeSuite(func() {
        Expect(logger.Init(logger.Logging{
                Env:   "dev",
-               Level: "warn",
+               Level: flags.LogLevel,
        })).To(Succeed())
 })
diff --git a/docs/crud/measure/query.md b/docs/crud/measure/query.md
index 5dd8c1d..40639a7 100644
--- a/docs/crud/measure/query.md
+++ b/docs/crud/measure/query.md
@@ -43,6 +43,7 @@ fieldProjection:
 timeRange:
   begin: 2022-10-15T22:32:48Z
   end: 2022-10-15T23:32:48Z
+EOF
 ```
 
 The below command could query data in the last 30 minutes using relative time 
duration :
@@ -58,6 +59,7 @@ tagProjection:
     tags: ["id", "entity_id"]
 fieldProjection:
   names: ["total", "value"]
+EOF
 ```
 
 ## API Reference
diff --git a/docs/crud/stream/query.md b/docs/crud/stream/query.md
index cc7dea3..620f572 100644
--- a/docs/crud/stream/query.md
+++ b/docs/crud/stream/query.md
@@ -43,6 +43,7 @@ projection:
 timeRange:
   begin: 2022-10-15T22:32:48+08:00
   end: 2022-10-15T23:32:48+08:00
+EOF
 ```
 
 The below command could query data in the last 30 minutes using relative time 
duration :
@@ -58,6 +59,7 @@ projection:
     tags: ["trace_id"]
   - name: "data"
     tags: ["data_binary"]
+EOF
 ```
 
 ## API Reference
diff --git a/pkg/index/inverted/inverted_test.go 
b/pkg/index/inverted/inverted_test.go
index 4d889cd..fda5215 100644
--- a/pkg/index/inverted/inverted_test.go
+++ b/pkg/index/inverted/inverted_test.go
@@ -32,6 +32,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/index/testcases"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
 )
 
 var serviceName = index.FieldKey{
@@ -173,7 +174,7 @@ func TestStore_Iterator(t *testing.T) {
 func setUp(t *require.Assertions) (tempDir string, deferFunc func()) {
        t.NoError(logger.Init(logger.Logging{
                Env:   "dev",
-               Level: "warn",
+               Level: flags.LogLevel,
        }))
        tempDir, deferFunc = test.Space(t)
        return tempDir, deferFunc
diff --git a/pkg/index/lsm/lsm_test.go b/pkg/index/lsm/lsm_test.go
index c0de013..4af5eb5 100644
--- a/pkg/index/lsm/lsm_test.go
+++ b/pkg/index/lsm/lsm_test.go
@@ -26,6 +26,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/index/testcases"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
 )
 
 func TestStore_MatchTerm(t *testing.T) {
@@ -63,7 +64,7 @@ func TestStore_Iterator(t *testing.T) {
 func setUp(t *require.Assertions) (tempDir string, deferFunc func()) {
        t.NoError(logger.Init(logger.Logging{
                Env:   "dev",
-               Level: "warn",
+               Level: flags.LogLevel,
        }))
        tempDir, deferFunc = test.Space(t)
        return tempDir, deferFunc
diff --git a/pkg/query/logical/common.go b/pkg/query/logical/common.go
index 3537ce7..716be97 100644
--- a/pkg/query/logical/common.go
+++ b/pkg/query/logical/common.go
@@ -19,6 +19,9 @@ package logical
 
 import (
        "bytes"
+       "context"
+       "io"
+       "time"
 
        "github.com/pkg/errors"
 
@@ -101,16 +104,15 @@ func ProjectItem(ec executor.ExecutionContext, item 
tsdb.Item, projectionFieldRe
 // This method is used by the underlying tableScan and localIndexScan plans.
 func ExecuteForShard(series tsdb.SeriesList, timeRange timestamp.TimeRange,
        builders ...SeekerBuilder,
-) ([]tsdb.Iterator, error) {
+) ([]tsdb.Iterator, []io.Closer, error) {
        var itersInShard []tsdb.Iterator
+       var closers []io.Closer
        for _, seriesFound := range series {
                itersInSeries, err := func() ([]tsdb.Iterator, error) {
-                       sp, errInner := seriesFound.Span(timeRange)
-                       defer func(sp tsdb.SeriesSpan) {
-                               if sp != nil {
-                                       _ = sp.Close()
-                               }
-                       }(sp)
+                       ctx, cancel := 
context.WithTimeout(context.Background(), 5*time.Second)
+                       defer cancel()
+                       sp, errInner := seriesFound.Span(ctx, timeRange)
+                       closers = append(closers, sp)
                        if errInner != nil {
                                return nil, errInner
                        }
@@ -129,16 +131,16 @@ func ExecuteForShard(series tsdb.SeriesList, timeRange 
timestamp.TimeRange,
                        return iters, nil
                }()
                if err != nil {
-                       return nil, err
+                       return nil, nil, err
                }
                if len(itersInSeries) > 0 {
                        itersInShard = append(itersInShard, itersInSeries...)
                }
        }
-       return itersInShard, nil
+       return itersInShard, closers, nil
 }
 
-var DefaultLimit uint32 = 100
+var DefaultLimit uint32 = 20
 
 type Tag struct {
        familyName, name string
diff --git a/pkg/query/logical/measure/measure_plan_indexscan_local.go 
b/pkg/query/logical/measure/measure_plan_indexscan_local.go
index c9147d4..69df5b3 100644
--- a/pkg/query/logical/measure/measure_plan_indexscan_local.go
+++ b/pkg/query/logical/measure/measure_plan_indexscan_local.go
@@ -19,6 +19,7 @@ package measure
 
 import (
        "fmt"
+       "io"
        "time"
 
        "go.uber.org/multierr"
@@ -132,7 +133,14 @@ func (i *localIndexScan) Execute(ec 
executor.MeasureExecutionContext) (executor.
                        b.Filter(i.filter)
                })
        }
-       iters, innerErr := logical.ExecuteForShard(seriesList, i.timeRange, 
builders...)
+       iters, closers, innerErr := logical.ExecuteForShard(seriesList, 
i.timeRange, builders...)
+       if len(closers) > 0 {
+               defer func(closers []io.Closer) {
+                       for _, c := range closers {
+                               _ = c.Close()
+                       }
+               }(closers)
+       }
        if innerErr != nil {
                return nil, innerErr
        }
diff --git a/pkg/query/logical/stream/stream_plan_indexscan_global.go 
b/pkg/query/logical/stream/stream_plan_indexscan_global.go
index 75549e1..549a83d 100644
--- a/pkg/query/logical/stream/stream_plan_indexscan_global.go
+++ b/pkg/query/logical/stream/stream_plan_indexscan_global.go
@@ -18,6 +18,7 @@
 package stream
 
 import (
+       "context"
        "fmt"
        "io"
        "time"
@@ -96,7 +97,9 @@ func (t *globalIndexScan) executeForShard(ec 
executor.StreamExecutionContext, sh
                        return elementsInShard, errors.WithStack(err)
                }
                err = func() error {
-                       item, closer, errInner := series.Get(itemID)
+                       ctx, cancel := 
context.WithTimeout(context.Background(), 5*time.Second)
+                       defer cancel()
+                       item, closer, errInner := series.Get(ctx, itemID)
                        defer func(closer io.Closer) {
                                if closer != nil {
                                        _ = closer.Close()
diff --git a/pkg/query/logical/stream/stream_plan_indexscan_local.go 
b/pkg/query/logical/stream/stream_plan_indexscan_local.go
index 6cc4403..b33e4ff 100644
--- a/pkg/query/logical/stream/stream_plan_indexscan_local.go
+++ b/pkg/query/logical/stream/stream_plan_indexscan_local.go
@@ -19,6 +19,7 @@ package stream
 
 import (
        "fmt"
+       "io"
        "time"
 
        "google.golang.org/protobuf/types/known/timestamppb"
@@ -77,7 +78,14 @@ func (i *localIndexScan) Execute(ec 
executor.StreamExecutionContext) ([]*streamv
                        b.Filter(i.filter)
                })
        }
-       iters, innerErr := logical.ExecuteForShard(seriesList, i.timeRange, 
builders...)
+       iters, closers, innerErr := logical.ExecuteForShard(seriesList, 
i.timeRange, builders...)
+       if len(closers) > 0 {
+               defer func(closers []io.Closer) {
+                       for _, c := range closers {
+                               _ = c.Close()
+                       }
+               }(closers)
+       }
        if innerErr != nil {
                return nil, innerErr
        }
diff --git a/banyand/tsdb/bucket/bucket_suite_test.go b/pkg/test/flags/flags.go
similarity index 70%
copy from banyand/tsdb/bucket/bucket_suite_test.go
copy to pkg/test/flags/flags.go
index 59ca1fe..b2f83f5 100644
--- a/banyand/tsdb/bucket/bucket_suite_test.go
+++ b/pkg/test/flags/flags.go
@@ -14,24 +14,26 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-package bucket_test
+package flags
 
 import (
-       "testing"
-
-       . "github.com/onsi/ginkgo/v2"
-       . "github.com/onsi/gomega"
+       "time"
+)
 
-       "github.com/apache/skywalking-banyandb/pkg/logger"
+var (
+       eventuallyTimeout string
+       EventuallyTimeout time.Duration
+       LogLevel          = "debug"
 )
 
-func TestBucket(t *testing.T) {
-       RegisterFailHandler(Fail)
-       BeforeSuite(func() {
-               Expect(logger.Init(logger.Logging{
-                       Env:   "dev",
-                       Level: "warn",
-               })).Should(Succeed())
-       })
-       RunSpecs(t, "Bucket Suite")
+func init() {
+       if eventuallyTimeout == "" {
+               EventuallyTimeout = time.Second * 3
+               return
+       }
+       d, err := time.ParseDuration(eventuallyTimeout)
+       if err != nil {
+               panic(err)
+       }
+       EventuallyTimeout = d
 }
diff --git a/test/integration/cold_query/query_suite_test.go 
b/test/integration/cold_query/query_suite_test.go
index 9b3a190..849a059 100644
--- a/test/integration/cold_query/query_suite_test.go
+++ b/test/integration/cold_query/query_suite_test.go
@@ -27,6 +27,7 @@ import (
        "google.golang.org/grpc/credentials/insecure"
 
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -51,7 +52,7 @@ var (
 var _ = SynchronizedBeforeSuite(func() []byte {
        Expect(logger.Init(logger.Logging{
                Env:   "dev",
-               Level: "warn",
+               Level: flags.LogLevel,
        })).To(Succeed())
        var addr string
        addr, _, deferFunc = setup.SetUp()
diff --git a/test/integration/query/query_suite_test.go 
b/test/integration/load/load_suite_test.go
similarity index 61%
copy from test/integration/query/query_suite_test.go
copy to test/integration/load/load_suite_test.go
index 0bb67be..b7430f2 100644
--- a/test/integration/query/query_suite_test.go
+++ b/test/integration/load/load_suite_test.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package integration_query_test
+package integration_load_test
 
 import (
        "testing"
@@ -29,17 +29,13 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
-       "github.com/apache/skywalking-banyandb/pkg/timestamp"
-       cases_measure "github.com/apache/skywalking-banyandb/test/cases/measure"
-       cases_measure_data 
"github.com/apache/skywalking-banyandb/test/cases/measure/data"
        cases_stream "github.com/apache/skywalking-banyandb/test/cases/stream"
        cases_stream_data 
"github.com/apache/skywalking-banyandb/test/cases/stream/data"
-       cases_topn "github.com/apache/skywalking-banyandb/test/cases/topn"
 )
 
-func TestIntegrationQuery(t *testing.T) {
+func TestIntegrationLoad(t *testing.T) {
        RegisterFailHandler(Fail)
-       RunSpecs(t, "Integration Query Suite")
+       RunSpecs(t, "Integration Load Suite", Label("integration", "slow"))
 }
 
 var (
@@ -60,18 +56,26 @@ var _ = SynchronizedBeforeSuite(func() []byte {
                grpclib.WithTransportCredentials(insecure.NewCredentials()),
        )
        Expect(err).NotTo(HaveOccurred())
-       ns := timestamp.NowMilli().UnixNano()
-       now = time.Unix(0, ns-ns%int64(time.Minute))
-       interval := 500 * time.Millisecond
-       // stream
-       cases_stream_data.Write(conn, "data.json", now, interval)
-       // measure
-       interval = time.Minute
-       cases_measure_data.Write(conn, "service_traffic", "sw_metric", 
"service_traffic_data.json", now, interval)
-       cases_measure_data.Write(conn, "service_instance_traffic", "sw_metric", 
"service_instance_traffic_data.json", now, interval)
-       cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", 
"service_cpm_minute_data.json", now, interval)
-       cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", 
"service_cpm_minute_data1.json", now.Add(10*time.Second), interval)
-       cases_measure_data.Write(conn, "service_cpm_minute", "sw_metric", 
"service_cpm_minute_data2.json", now.Add(10*time.Minute), interval)
+       days := 7
+       hours := 24
+       minutes := 60
+       interval := 10 * time.Second
+       c := time.Now()
+       for i := 0; i < days; i++ {
+               date := c.Add(-time.Hour * time.Duration((days-i)*24))
+               for h := 0; h < hours; h++ {
+                       hour := date.Add(time.Hour * time.Duration(h))
+                       start := time.Now()
+                       for j := 0; j < minutes; j++ {
+                               n := hour.Add(time.Minute * time.Duration(j))
+                               ns := n.UnixNano()
+                               now = time.Unix(0, ns-ns%int64(time.Minute))
+                               // stream
+                               cases_stream_data.Write(conn, "data.json", now, 
interval)
+                       }
+                       GinkgoWriter.Printf("written stream in %s took %s \n", 
hour, time.Since(start))
+               }
+       }
        Expect(conn.Close()).To(Succeed())
        return []byte(addr)
 }, func(address []byte) {
@@ -85,14 +89,6 @@ var _ = SynchronizedBeforeSuite(func() []byte {
                Connection: connection,
                BaseTime:   now,
        }
-       cases_measure.SharedContext = helpers.SharedContext{
-               Connection: connection,
-               BaseTime:   now,
-       }
-       cases_topn.SharedContext = helpers.SharedContext{
-               Connection: connection,
-               BaseTime:   now,
-       }
        Expect(err).NotTo(HaveOccurred())
 })
 
diff --git a/test/integration/other/other_suite_test.go 
b/test/integration/other/other_suite_test.go
index 4eaee84..eaf2474 100644
--- a/test/integration/other/other_suite_test.go
+++ b/test/integration/other/other_suite_test.go
@@ -24,6 +24,7 @@ import (
        gm "github.com/onsi/gomega"
 
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
 )
 
 func TestIntegrationOther(t *testing.T) {
@@ -34,6 +35,6 @@ func TestIntegrationOther(t *testing.T) {
 var _ = g.BeforeSuite(func() {
        gm.Expect(logger.Init(logger.Logging{
                Env:   "dev",
-               Level: "warn",
+               Level: flags.LogLevel,
        })).To(gm.Succeed())
 })
diff --git a/test/integration/query/query_suite_test.go 
b/test/integration/query/query_suite_test.go
index 0bb67be..69be763 100644
--- a/test/integration/query/query_suite_test.go
+++ b/test/integration/query/query_suite_test.go
@@ -27,6 +27,7 @@ import (
        "google.golang.org/grpc/credentials/insecure"
 
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
@@ -51,7 +52,7 @@ var (
 var _ = SynchronizedBeforeSuite(func() []byte {
        Expect(logger.Init(logger.Logging{
                Env:   "dev",
-               Level: "warn",
+               Level: flags.LogLevel,
        })).To(Succeed())
        var addr string
        addr, _, deferFunc = setup.SetUp()

Reply via email to