This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch optimize-memory
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 42b52155cf05bc784bbe12e0c0390376b2e3cf6d
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Aug 8 15:23:15 2022 +0000

    Parameterize memory size
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 api/proto/banyandb/measure/v1/topn.pb.go  |   2 +-
 api/proto/banyandb/measure/v1/write.pb.go |   2 +-
 api/proto/banyandb/model/v1/common.pb.go  |   2 -
 banyand/kv/kv.go                          |  29 +++++
 banyand/measure/metadata.go               |  26 +++--
 banyand/measure/service.go                |   9 +-
 banyand/stream/metadata.go                |  26 +++--
 banyand/stream/service.go                 |  13 ++-
 banyand/stream/stream.go                  |   9 +-
 banyand/tsdb/block.go                     | 183 +++++++++++++++++++-----------
 banyand/tsdb/bucket/strategy.go           |  70 +++++-------
 banyand/tsdb/index/writer.go              |  38 ++++---
 banyand/tsdb/metric.go                    |  12 +-
 banyand/tsdb/segment.go                   |  78 +++++++------
 banyand/tsdb/series.go                    |  10 +-
 banyand/tsdb/seriesdb.go                  |  43 ++++---
 banyand/tsdb/shard.go                     |   4 +-
 banyand/tsdb/shard_test.go                |  26 ++---
 banyand/tsdb/tsdb.go                      |  20 ++--
 banyand/tsdb/tsdb_suite_test.go           |   2 +-
 dist/LICENSE                              |   4 +-
 go.mod                                    |  30 +++--
 go.sum                                    |   9 +-
 pkg/index/index.go                        |  22 ++--
 pkg/index/lsm/lsm.go                      |  11 +-
 pkg/run/channel.go                        |  50 --------
 pkg/test/helpers/fail_interceptor.go      |   5 +-
 27 files changed, 401 insertions(+), 334 deletions(-)

diff --git a/api/proto/banyandb/measure/v1/topn.pb.go 
b/api/proto/banyandb/measure/v1/topn.pb.go
index 2f3cc61..92eb78e 100644
--- a/api/proto/banyandb/measure/v1/topn.pb.go
+++ b/api/proto/banyandb/measure/v1/topn.pb.go
@@ -41,7 +41,7 @@ const (
        _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
 )
 
-// TopNList contains a series of topN items
+//TopNList contains a series of topN items
 type TopNList struct {
        state         protoimpl.MessageState
        sizeCache     protoimpl.SizeCache
diff --git a/api/proto/banyandb/measure/v1/write.pb.go 
b/api/proto/banyandb/measure/v1/write.pb.go
index ac30c4c..e693cbb 100644
--- a/api/proto/banyandb/measure/v1/write.pb.go
+++ b/api/proto/banyandb/measure/v1/write.pb.go
@@ -41,7 +41,7 @@ const (
        _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
 )
 
-// DataPointValue is the data point for writing. It only contains values.
+//DataPointValue is the data point for writing. It only contains values.
 type DataPointValue struct {
        state         protoimpl.MessageState
        sizeCache     protoimpl.SizeCache
diff --git a/api/proto/banyandb/model/v1/common.pb.go 
b/api/proto/banyandb/model/v1/common.pb.go
index cd5e94a..b88a476 100644
--- a/api/proto/banyandb/model/v1/common.pb.go
+++ b/api/proto/banyandb/model/v1/common.pb.go
@@ -337,7 +337,6 @@ type TagValue struct {
        unknownFields protoimpl.UnknownFields
 
        // Types that are assignable to Value:
-       //
        //      *TagValue_Null
        //      *TagValue_Str
        //      *TagValue_StrArray
@@ -535,7 +534,6 @@ type FieldValue struct {
        unknownFields protoimpl.UnknownFields
 
        // Types that are assignable to Value:
-       //
        //      *FieldValue_Null
        //      *FieldValue_Str
        //      *FieldValue_Int
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index 9ae630f..f40d05d 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -126,6 +126,17 @@ func TSSWithFlushCallback(callback func()) 
TimeSeriesOptions {
        }
 }
 
+func TSSWithMemTableSize(size int64) TimeSeriesOptions {
+       return func(store TimeSeriesStore) {
+               if size < 1 {
+                       return
+               }
+               if btss, ok := store.(*badgerTSS); ok {
+                       btss.dbOpts.MemTableSize = size
+               }
+       }
+}
+
 type Iterator interface {
        Next()
        Rewind()
@@ -160,6 +171,9 @@ func OpenTimeSeriesStore(shardID int, path string, options 
...TimeSeriesOptions)
        }
        // Put all values into LSM
        btss.dbOpts = btss.dbOpts.WithVLogPercentile(1.0)
+       if btss.dbOpts.MemTableSize < 8<<20 {
+               btss.dbOpts = btss.dbOpts.WithValueThreshold(1 << 10)
+       }
        var err error
        btss.db, err = badger.Open(btss.dbOpts)
        if err != nil {
@@ -187,6 +201,18 @@ func StoreWithNamedLogger(name string, l *logger.Logger) 
StoreOptions {
        }
 }
 
+// StoreWithMemTableSize sets MemTable size
+func StoreWithMemTableSize(size int64) StoreOptions {
+       return func(store Store) {
+               if size < 1 {
+                       return
+               }
+               if bdb, ok := store.(*badgerDB); ok {
+                       bdb.dbOpts = bdb.dbOpts.WithMemTableSize(size)
+               }
+       }
+}
+
 // OpenStore creates a new Store
 func OpenStore(shardID int, path string, options ...StoreOptions) (Store, 
error) {
        bdb := new(badgerDB)
@@ -196,6 +222,9 @@ func OpenStore(shardID int, path string, options 
...StoreOptions) (Store, error)
                opt(bdb)
        }
        bdb.dbOpts = bdb.dbOpts.WithNumVersionsToKeep(math.MaxUint32)
+       if bdb.dbOpts.MemTableSize > 0 && bdb.dbOpts.MemTableSize < 8<<20 {
+               bdb.dbOpts = bdb.dbOpts.WithValueThreshold(1 << 10)
+       }
 
        var err error
        bdb.db, err = badger.Open(bdb.dbOpts)
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 2fad3d5..ef235a3 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -39,7 +39,9 @@ type schemaRepo struct {
        metadata metadata.Repo
 }
 
-func newSchemaRepo(path string, metadata metadata.Repo, repo 
discovery.ServiceRepo, l *logger.Logger) schemaRepo {
+func newSchemaRepo(path string, metadata metadata.Repo, repo 
discovery.ServiceRepo,
+       dbOpts tsdb.DatabaseOpts, l *logger.Logger,
+) schemaRepo {
        return schemaRepo{
                l:        l,
                metadata: metadata,
@@ -47,7 +49,7 @@ func newSchemaRepo(path string, metadata metadata.Repo, repo 
discovery.ServiceRe
                        metadata,
                        repo,
                        l,
-                       newSupplier(path, metadata, l),
+                       newSupplier(path, metadata, dbOpts, l),
                        event.MeasureTopicShardEvent,
                        event.MeasureTopicEntityEvent,
                ),
@@ -168,13 +170,19 @@ var _ resourceSchema.ResourceSupplier = (*supplier)(nil)
 
 type supplier struct {
        path     string
+       dbOpts   tsdb.DatabaseOpts
        metadata metadata.Repo
        l        *logger.Logger
 }
 
-func newSupplier(path string, metadata metadata.Repo, l *logger.Logger) 
*supplier {
+func newSupplier(path string, metadata metadata.Repo, dbOpts 
tsdb.DatabaseOpts, l *logger.Logger) *supplier {
+       dbOpts.EncodingMethod = tsdb.EncodingMethod{
+               EncoderPool: newEncoderPool(plainChunkSize, intChunkSize, l),
+               DecoderPool: newDecoderPool(plainChunkSize, intChunkSize, l),
+       }
        return &supplier{
                path:     path,
+               dbOpts:   dbOpts,
                metadata: metadata,
                l:        l,
        }
@@ -195,17 +203,13 @@ func (s *supplier) ResourceSchema(repo metadata.Repo, md 
*commonv1.Metadata) (re
 }
 
 func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) {
+       opts := s.dbOpts
+       opts.ShardNum = groupSchema.ResourceOpts.ShardNum
+       opts.Location = path.Join(s.path, groupSchema.Metadata.Name)
        return tsdb.OpenDatabase(
                context.WithValue(context.Background(), common.PositionKey, 
common.Position{
                        Module:   "measure",
                        Database: groupSchema.Metadata.Name,
                }),
-               tsdb.DatabaseOpts{
-                       Location: path.Join(s.path, groupSchema.Metadata.Name),
-                       ShardNum: groupSchema.ResourceOpts.ShardNum,
-                       EncodingMethod: tsdb.EncodingMethod{
-                               EncoderPool: newEncoderPool(plainChunkSize, 
intChunkSize, s.l),
-                               DecoderPool: newDecoderPool(plainChunkSize, 
intChunkSize, s.l),
-                       },
-               })
+               opts)
 }
diff --git a/banyand/measure/service.go b/banyand/measure/service.go
index 18239a2..26219d2 100644
--- a/banyand/measure/service.go
+++ b/banyand/measure/service.go
@@ -30,6 +30,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/banyand/tsdb"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
        resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
@@ -50,11 +51,13 @@ type Service interface {
 var _ Service = (*service)(nil)
 
 type service struct {
+       root   string
+       dbOpts tsdb.DatabaseOpts
+
        schemaRepo    schemaRepo
        writeListener *writeCallback
        l             *logger.Logger
        metadata      metadata.Repo
-       root          string
        pipeline      queue.Queue
        repo          discovery.ServiceRepo
        // stop channel for the service
@@ -76,6 +79,8 @@ func (s *service) LoadGroup(name string) 
(resourceSchema.Group, bool) {
 func (s *service) FlagSet() *run.FlagSet {
        flagS := run.NewFlagSet("storage")
        flagS.StringVar(&s.root, "measure-root-path", "/tmp", "the root path of 
database")
+       flagS.Int64Var(&s.dbOpts.BlockMemSize, "measure-block-mem-size", 
16<<20, "block memory size")
+       flagS.Int64Var(&s.dbOpts.SeriesMemSize, "measure-seriesmeta-mem-size", 
1<<20, "series metadata memory size")
        return flagS
 }
 
@@ -98,7 +103,7 @@ func (s *service) PreRun() error {
        if err != nil {
                return err
        }
-       s.schemaRepo = newSchemaRepo(path.Join(s.root, s.Name()), s.metadata, 
s.repo, s.l)
+       s.schemaRepo = newSchemaRepo(path.Join(s.root, s.Name()), s.metadata, 
s.repo, s.dbOpts, s.l)
        for _, g := range groups {
                if g.Catalog != commonv1.Catalog_CATALOG_MEASURE {
                        continue
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 0895b16..172ed7a 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -41,7 +41,9 @@ type schemaRepo struct {
        metadata metadata.Repo
 }
 
-func newSchemaRepo(path string, metadata metadata.Repo, repo 
discovery.ServiceRepo, l *logger.Logger) schemaRepo {
+func newSchemaRepo(path string, metadata metadata.Repo, repo 
discovery.ServiceRepo,
+       dbOpts tsdb.DatabaseOpts, l *logger.Logger,
+) schemaRepo {
        return schemaRepo{
                l:        l,
                metadata: metadata,
@@ -49,7 +51,7 @@ func newSchemaRepo(path string, metadata metadata.Repo, repo 
discovery.ServiceRe
                        metadata,
                        repo,
                        l,
-                       newSupplier(path, metadata, l),
+                       newSupplier(path, metadata, dbOpts, l),
                        event.StreamTopicShardEvent,
                        event.StreamTopicEntityEvent,
                ),
@@ -170,13 +172,19 @@ var _ resourceSchema.ResourceSupplier = (*supplier)(nil)
 
 type supplier struct {
        path     string
+       dbOpts   tsdb.DatabaseOpts
        metadata metadata.Repo
        l        *logger.Logger
 }
 
-func newSupplier(path string, metadata metadata.Repo, l *logger.Logger) 
*supplier {
+func newSupplier(path string, metadata metadata.Repo, dbOpts 
tsdb.DatabaseOpts, l *logger.Logger) *supplier {
+       dbOpts.EncodingMethod = tsdb.EncodingMethod{
+               EncoderPool: encoding.NewPlainEncoderPool(chunkSize),
+               DecoderPool: encoding.NewPlainDecoderPool(chunkSize),
+       }
        return &supplier{
                path:     path,
+               dbOpts:   dbOpts,
                metadata: metadata,
                l:        l,
        }
@@ -197,17 +205,13 @@ func (s *supplier) ResourceSchema(repo metadata.Repo, md 
*commonv1.Metadata) (re
 }
 
 func (s *supplier) OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error) {
+       opts := s.dbOpts
+       opts.ShardNum = groupSchema.ResourceOpts.ShardNum
+       opts.Location = path.Join(s.path, groupSchema.Metadata.Name)
        return tsdb.OpenDatabase(
                context.WithValue(context.Background(), common.PositionKey, 
common.Position{
                        Module:   "stream",
                        Database: groupSchema.Metadata.Name,
                }),
-               tsdb.DatabaseOpts{
-                       Location: path.Join(s.path, groupSchema.Metadata.Name),
-                       ShardNum: groupSchema.ResourceOpts.ShardNum,
-                       EncodingMethod: tsdb.EncodingMethod{
-                               EncoderPool: 
encoding.NewPlainEncoderPool(chunkSize),
-                               DecoderPool: 
encoding.NewPlainDecoderPool(chunkSize),
-                       },
-               })
+               opts)
 }
diff --git a/banyand/stream/service.go b/banyand/stream/service.go
index 1109ad4..2a7f555 100644
--- a/banyand/stream/service.go
+++ b/banyand/stream/service.go
@@ -30,6 +30,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/banyand/tsdb"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
@@ -49,11 +50,13 @@ type Service interface {
 var _ Service = (*service)(nil)
 
 type service struct {
+       root   string
+       dbOpts tsdb.DatabaseOpts
+
        schemaRepo    schemaRepo
        writeListener *writeCallback
        l             *logger.Logger
        metadata      metadata.Repo
-       root          string
        pipeline      queue.Queue
        repo          discovery.ServiceRepo
        // stop channel for the service
@@ -71,6 +74,9 @@ func (s *service) Stream(metadata *commonv1.Metadata) 
(Stream, error) {
 func (s *service) FlagSet() *run.FlagSet {
        flagS := run.NewFlagSet("storage")
        flagS.StringVar(&s.root, "stream-root-path", "/tmp", "the root path of 
database")
+       flagS.Int64Var(&s.dbOpts.BlockMemSize, "stream-block-mem-size", 8<<20, 
"block memory size")
+       flagS.Int64Var(&s.dbOpts.SeriesMemSize, "stream-seriesmeta-mem-size", 
1<<20, "series metadata memory size")
+       flagS.Int64Var(&s.dbOpts.GlobalIndexMemSize, 
"stream-global-index-mem-size", 2<<20, "global index memory size")
        return flagS
 }
 
@@ -93,7 +99,7 @@ func (s *service) PreRun() error {
        if err != nil {
                return err
        }
-       s.schemaRepo = newSchemaRepo(path.Join(s.root, s.Name()), s.metadata, 
s.repo, s.l)
+       s.schemaRepo = newSchemaRepo(path.Join(s.root, s.Name()), s.metadata, 
s.repo, s.dbOpts, s.l)
        for _, g := range groups {
                if g.Catalog != commonv1.Catalog_CATALOG_STREAM {
                        continue
@@ -149,5 +155,8 @@ func NewService(_ context.Context, metadata metadata.Repo, 
repo discovery.Servic
                metadata: metadata,
                repo:     repo,
                pipeline: pipeline,
+               dbOpts: tsdb.DatabaseOpts{
+                       EnableGlobalIndex: true,
+               },
        }, nil
 }
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index 83854ac..1af3871 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -93,10 +93,11 @@ func openStream(shardNum uint32, db tsdb.Supplier, spec 
streamSpec, l *logger.Lo
 
        sm.db = db
        sm.indexWriter = index.NewWriter(ctx, index.WriterOptions{
-               DB:         db,
-               ShardNum:   shardNum,
-               Families:   spec.schema.TagFamilies,
-               IndexRules: spec.indexRules,
+               DB:                db,
+               ShardNum:          shardNum,
+               Families:          spec.schema.TagFamilies,
+               IndexRules:        spec.indexRules,
+               EnableGlobalIndex: true,
        })
        return sm, nil
 }
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 3419a34..21290f6 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -21,13 +21,12 @@ import (
        "context"
        "io"
        "path"
+       "runtime"
        "strconv"
        "sync"
+       "sync/atomic"
        "time"
 
-       "github.com/dgraph-io/ristretto/z"
-       "go.uber.org/atomic"
-
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/banyand/kv"
        "github.com/apache/skywalking-banyandb/banyand/observability"
@@ -37,7 +36,6 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/index/inverted"
        "github.com/apache/skywalking-banyandb/pkg/index/lsm"
        "github.com/apache/skywalking-banyandb/pkg/logger"
-       "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
@@ -48,13 +46,16 @@ const (
 )
 
 type block struct {
-       path     string
-       l        *logger.Logger
-       suffix   string
-       ref      *z.Closer
-       lock     sync.RWMutex
-       closed   *atomic.Bool
-       position common.Position
+       path       string
+       l          *logger.Logger
+       queue      bucket.Queue
+       suffix     string
+       ref        *atomic.Int32
+       closed     *atomic.Bool
+       lock       sync.RWMutex
+       position   common.Position
+       memSize    int64
+       lsmMemSize int64
 
        store         kv.TimeSeriesStore
        invertedIndex index.Store
@@ -65,8 +66,7 @@ type block struct {
        segID          uint16
        blockID        uint16
        encodingMethod EncodingMethod
-       flushCh        *run.Chan[struct{}]
-       flushChQueue   chan *run.Chan[struct{}]
+       flushCh        chan struct{}
 }
 
 type blockOpts struct {
@@ -75,6 +75,7 @@ type blockOpts struct {
        startTime time.Time
        suffix    string
        path      string
+       queue     bucket.Queue
 }
 
 func newBlock(ctx context.Context, opts blockOpts) (b *block, err error) {
@@ -84,60 +85,66 @@ func newBlock(ctx context.Context, opts blockOpts) (b 
*block, err error) {
        }
        id := GenerateInternalID(opts.blockSize.Unit, suffixInteger)
        timeRange := timestamp.NewTimeRange(opts.startTime, 
opts.blockSize.NextTime(opts.startTime), true, false)
-       encodingMethodObject := ctx.Value(encodingMethodKey)
-       if encodingMethodObject == nil {
-               encodingMethodObject = EncodingMethod{
-                       EncoderPool: encoding.NewPlainEncoderPool(0),
-                       DecoderPool: encoding.NewPlainDecoderPool(0),
-               }
-       }
        clock, _ := timestamp.GetClock(ctx)
        b = &block{
-               segID:          opts.segID,
-               blockID:        id,
-               path:           opts.path,
-               l:              logger.Fetch(ctx, "block"),
-               TimeRange:      timeRange,
-               Reporter:       bucket.NewTimeBasedReporter(timeRange, clock),
-               closed:         atomic.NewBool(true),
-               encodingMethod: encodingMethodObject.(EncodingMethod),
-               flushChQueue:   make(chan *run.Chan[struct{}]),
+               segID:     opts.segID,
+               blockID:   id,
+               path:      opts.path,
+               l:         logger.Fetch(ctx, "block"),
+               TimeRange: timeRange,
+               Reporter:  bucket.NewTimeBasedReporter(timeRange, clock),
+               flushCh:   make(chan struct{}),
+               ref:       &atomic.Int32{},
+               closed:    &atomic.Bool{},
+               queue:     opts.queue,
        }
+       b.options(ctx)
        position := ctx.Value(common.PositionKey)
        if position != nil {
                b.position = position.(common.Position)
        }
        go func() {
-               for {
-                       ch := <-b.flushChQueue
-                       for {
-                               _, more := ch.Read()
-                               if !more {
-                                       break
-                               }
-                               b.flush()
-                       }
+               for range b.flushCh {
+                       b.flush()
                }
        }()
-       return b, err
+       return b, b.open()
 }
 
-func (b *block) open() (err error) {
-       b.lock.Lock()
-       defer b.lock.Unlock()
-       if !b.closed.Load() {
-               return nil
+func (b *block) options(ctx context.Context) {
+       var options DatabaseOpts
+       o := ctx.Value(optionsKey)
+       if o != nil {
+               options = o.(DatabaseOpts)
+       }
+       if options.EncodingMethod.EncoderPool == nil {
+               options.EncodingMethod.EncoderPool = 
encoding.NewPlainEncoderPool(0)
+       }
+       if options.EncodingMethod.EncoderPool == nil {
+               options.EncodingMethod.DecoderPool = 
encoding.NewPlainDecoderPool(0)
+       }
+       b.encodingMethod = options.EncodingMethod
+       if options.BlockMemSize < 1 {
+               b.memSize = 8 << 20 // 8MB
+       } else {
+               b.memSize = options.BlockMemSize
        }
-       b.ref = z.NewCloser(1)
-       b.flushCh = run.NewChan(make(chan struct{}))
-       b.flushChQueue <- b.flushCh
+       b.lsmMemSize = b.memSize / 8
+       defaultLSMMemSize := int64(1 << 20)
+       if b.lsmMemSize < defaultLSMMemSize {
+               b.lsmMemSize = defaultLSMMemSize
+       }
+}
+
+func (b *block) open() (err error) {
        if b.store, err = kv.OpenTimeSeriesStore(
                0,
                path.Join(b.path, componentMain),
                kv.TSSWithEncoding(b.encodingMethod.EncoderPool, 
b.encodingMethod.DecoderPool),
                kv.TSSWithLogger(b.l.Named(componentMain)),
+               kv.TSSWithMemTableSize(b.memSize),
                kv.TSSWithFlushCallback(func() {
-                       b.flushCh.Write(struct{}{})
+                       b.flushCh <- struct{}{}
                }),
        ); err != nil {
                return err
@@ -150,33 +157,76 @@ func (b *block) open() (err error) {
                return err
        }
        if b.lsmIndex, err = lsm.NewStore(lsm.StoreOpts{
-               Path:   path.Join(b.path, componentSecondLSMIdx),
-               Logger: b.l.Named(componentSecondLSMIdx),
+               Path:         path.Join(b.path, componentSecondLSMIdx),
+               Logger:       b.l.Named(componentSecondLSMIdx),
+               MemTableSize: b.lsmMemSize,
        }); err != nil {
                return err
        }
        b.closableLst = append(b.closableLst, b.invertedIndex, b.lsmIndex)
+       b.ref.Store(0)
        b.closed.Store(false)
-
        return nil
 }
 
-func (b *block) delegate() blockDelegate {
-       if b.isClosed() {
-               return nil
+func (b *block) delegate() (blockDelegate, error) {
+       if b.incRef() {
+               return &bDelegate{
+                       delegate: b,
+               }, nil
+       }
+       b.lock.Lock()
+       defer b.lock.Unlock()
+       b.queue.Push(BlockID{
+               BlockID: b.blockID,
+               SegID:   b.segID,
+       })
+       // TODO: remove the block which fails to open from the queue
+       err := b.open()
+       if err != nil {
+               b.l.Error().Err(err).Stringer("block", b).Msg("fail to open 
block")
+               return nil, err
        }
        b.incRef()
        return &bDelegate{
                delegate: b,
+       }, nil
+}
+
+func (b *block) incRef() bool {
+loop:
+       if b.Closed() {
+               return false
+       }
+       r := b.ref.Load()
+       if b.ref.CompareAndSwap(r, r+1) {
+               return true
        }
+       runtime.Gosched()
+       goto loop
 }
 
-func (b *block) dscRef() {
-       b.ref.Done()
+func (b *block) Done() {
+loop:
+       r := b.ref.Load()
+       if r < 1 {
+               return
+       }
+       if b.ref.CompareAndSwap(r, r-1) {
+               return
+       }
+       runtime.Gosched()
+       goto loop
 }
 
-func (b *block) incRef() {
-       b.ref.AddRunning(1)
+func (b *block) waitDone() {
+loop:
+       if b.ref.Load() < 1 {
+               b.ref.Store(0)
+               return
+       }
+       runtime.Gosched()
+       goto loop
 }
 
 func (b *block) flush() {
@@ -193,19 +243,14 @@ func (b *block) flush() {
 func (b *block) close() {
        b.lock.Lock()
        defer b.lock.Unlock()
-       if b.isClosed() {
-               return
-       }
-       b.dscRef()
-       b.ref.SignalAndWait()
+       b.closed.Store(true)
+       b.waitDone()
        for _, closer := range b.closableLst {
                _ = closer.Close()
        }
-       b.closed.Store(true)
-       b.flushCh.Close()
 }
 
-func (b *block) isClosed() bool {
+func (b *block) Closed() bool {
        return b.closed.Load()
 }
 
@@ -215,6 +260,10 @@ func (b *block) String() string {
 
 func (b *block) stats() (names []string, stats []observability.Statistics) {
        names = append(names, componentMain, componentSecondInvertedIdx, 
componentSecondLSMIdx)
+       if b.Closed() {
+               stats = make([]observability.Statistics, 3)
+               return
+       }
        stats = append(stats, b.store.Stats(), b.invertedIndex.Stats(), 
b.lsmIndex.Stats())
        return names, stats
 }
@@ -290,6 +339,6 @@ func (d *bDelegate) String() string {
 }
 
 func (d *bDelegate) Close() error {
-       d.delegate.dscRef()
+       d.delegate.Done()
        return nil
 }
diff --git a/banyand/tsdb/bucket/strategy.go b/banyand/tsdb/bucket/strategy.go
index 3cd6531..35c1a5e 100644
--- a/banyand/tsdb/bucket/strategy.go
+++ b/banyand/tsdb/bucket/strategy.go
@@ -18,8 +18,6 @@
 package bucket
 
 import (
-       "sync"
-
        "github.com/pkg/errors"
        "go.uber.org/multierr"
 
@@ -39,9 +37,7 @@ type Strategy struct {
        ctrl       Controller
        current    Reporter
        next       Reporter
-       mux        sync.Mutex
        logger     *logger.Logger
-       stopCh     chan struct{}
 }
 
 type StrategyOptions func(*Strategy)
@@ -68,9 +64,8 @@ func NewStrategy(ctrl Controller, options ...StrategyOptions) 
(*Strategy, error)
                return nil, errors.Wrap(ErrInvalidParameter, "controller is 
absent")
        }
        strategy := &Strategy{
-               ctrl:   ctrl,
-               ratio:  0.8,
-               stopCh: make(chan struct{}),
+               ctrl:  ctrl,
+               ratio: 0.8,
        }
        for _, opt := range options {
                opt(strategy)
@@ -93,44 +88,41 @@ func (s *Strategy) Run() {
        }
        reset()
        go func(s *Strategy) {
-               var err error
-       bucket:
-               c := s.current.Report()
                for {
-                       select {
-                       case status, closed := <-c:
-                               if !closed {
-                                       reset()
-                                       goto bucket
-                               }
-                               ratio := Ratio(status.Volume) / 
Ratio(status.Capacity)
-                               if ratio >= s.ratio && s.next == nil {
-                                       s.next, err = s.ctrl.Next()
-                                       if errors.Is(err, ErrNoMoreBucket) {
-                                               return
-                                       }
-                                       if err != nil {
-                                               s.logger.Err(err).Msg("failed 
to create the next bucket")
-                                       }
-                               }
-                               if ratio >= 1.0 {
-                                       s.mux.Lock()
-                                       s.ctrl.OnMove(s.current, s.next)
-                                       s.current = s.next
-                                       s.next = nil
-                                       s.mux.Unlock()
-                                       goto bucket
-                               }
-                       case <-s.stopCh:
+                       if s.current == nil {
                                return
                        }
+                       c := s.current.Report()
+                       s.observe(c)
                }
        }(s)
 }
 
+func (s *Strategy) observe(c Channel) {
+       var err error
+       moreBucket := true
+       for status := range c {
+               ratio := Ratio(status.Volume) / Ratio(status.Capacity)
+               if ratio >= s.ratio && s.next == nil && moreBucket {
+                       s.next, err = s.ctrl.Next()
+                       if errors.Is(err, ErrNoMoreBucket) {
+                               moreBucket = false
+                       } else if err != nil {
+                               s.logger.Err(err).Msg("failed to create the 
next bucket")
+                       }
+               }
+               if ratio >= 1.0 {
+                       s.move()
+                       return
+               }
+       }
+}
+
+func (s *Strategy) move() {
+       s.ctrl.OnMove(s.current, s.next)
+       s.current = s.next
+       s.next = nil
+}
+
 func (s *Strategy) Close() {
-       close(s.stopCh)
-       s.mux.Lock()
-       defer s.mux.Unlock()
-       s.ctrl.OnMove(s.current, nil)
 }
diff --git a/banyand/tsdb/index/writer.go b/banyand/tsdb/index/writer.go
index 3f94c25..8b0dcc9 100644
--- a/banyand/tsdb/index/writer.go
+++ b/banyand/tsdb/index/writer.go
@@ -33,7 +33,6 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/partition"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
-       "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
 type CallbackFn func()
@@ -52,18 +51,20 @@ type Value struct {
 }
 
 type WriterOptions struct {
-       ShardNum   uint32
-       Families   []*databasev1.TagFamilySpec
-       IndexRules []*databasev1.IndexRule
-       DB         tsdb.Supplier
+       ShardNum          uint32
+       Families          []*databasev1.TagFamilySpec
+       IndexRules        []*databasev1.IndexRule
+       DB                tsdb.Supplier
+       EnableGlobalIndex bool
 }
 
 type Writer struct {
-       l              *logger.Logger
-       db             tsdb.Supplier
-       shardNum       uint32
-       ch             *run.Chan[Message]
-       indexRuleIndex []*partition.IndexRuleLocator
+       l                 *logger.Logger
+       db                tsdb.Supplier
+       shardNum          uint32
+       enableGlobalIndex bool
+       ch                chan Message
+       indexRuleIndex    []*partition.IndexRuleLocator
 }
 
 func NewWriter(ctx context.Context, options WriterOptions) *Writer {
@@ -76,29 +77,26 @@ func NewWriter(ctx context.Context, options WriterOptions) 
*Writer {
        }
        w.shardNum = options.ShardNum
        w.db = options.DB
+       w.enableGlobalIndex = options.EnableGlobalIndex
        w.indexRuleIndex = partition.ParseIndexRuleLocators(options.Families, 
options.IndexRules)
-       w.ch = run.NewChan[Message](make(chan Message))
+       w.ch = make(chan Message)
        w.bootIndexGenerator()
        return w
 }
 
 func (s *Writer) Write(value Message) {
        go func(m Message) {
-               s.ch.Write(m)
+               s.ch <- m
        }(value)
 }
 
 func (s *Writer) Close() error {
-       return s.ch.Close()
+       return nil
 }
 
 func (s *Writer) bootIndexGenerator() {
        go func() {
-               for {
-                       m, more := s.ch.Read()
-                       if !more {
-                               return
-                       }
+               for m := range s.ch {
                        var err error
                        for _, ruleIndex := range s.indexRuleIndex {
                                rule := ruleIndex.Rule
@@ -106,6 +104,10 @@ func (s *Writer) bootIndexGenerator() {
                                case databasev1.IndexRule_LOCATION_SERIES:
                                        err = multierr.Append(err, 
writeLocalIndex(m.LocalWriter, ruleIndex, m.Value))
                                case databasev1.IndexRule_LOCATION_GLOBAL:
+                                       if !s.enableGlobalIndex {
+                                               
s.l.Warn().Stringer("index-rule", ruleIndex.Rule).Msg("global index is 
disabled")
+                                               continue
+                                       }
                                        err = multierr.Append(err, 
s.writeGlobalIndex(m.Scope, ruleIndex, m.LocalWriter.ItemID(), m.Value))
                                }
                        }
diff --git a/banyand/tsdb/metric.go b/banyand/tsdb/metric.go
index a45f57d..5cdea5b 100644
--- a/banyand/tsdb/metric.go
+++ b/banyand/tsdb/metric.go
@@ -66,11 +66,11 @@ func (s *shard) runStat() {
 }
 
 func (s *shard) stat() {
-       defer func() {
-               if r := recover(); r != nil {
-                       s.l.Warn().Interface("r", r).Msg("recovered")
-               }
-       }()
+       // defer func() {
+       //      if r := recover(); r != nil {
+       //              s.l.Warn().Interface("r", r).Msg("recovered")
+       //      }
+       // }()
        seriesStat := s.seriesDatabase.Stats()
        
s.curry(mtBytes).WithLabelValues("series").Set(float64(seriesStat.MemBytes))
        
s.curry(maxMtBytes).WithLabelValues("series").Set(float64(seriesStat.MaxMemBytes))
@@ -81,7 +81,7 @@ func (s *shard) stat() {
                segStats.MaxMemBytes += segStat.MaxMemBytes
                segStats.MemBytes += segStat.MemBytes
                for _, b := range seg.blockController.blocks() {
-                       if b.closed.Load() {
+                       if b.Closed() {
                                continue
                        }
                        names, bss := b.stats()
diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index a758947..800f128 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -77,9 +77,25 @@ func openSegment(ctx context.Context, startTime time.Time, 
path, suffix string,
        if err != nil {
                return nil, err
        }
-       if s.globalIndex, err = kv.OpenStore(0, indexPath, 
kv.StoreWithLogger(s.l)); err != nil {
-               return nil, err
+       o := ctx.Value(optionsKey)
+       if o != nil {
+               options := o.(DatabaseOpts)
+               if options.EnableGlobalIndex {
+                       memSize := options.GlobalIndexMemSize
+                       if memSize == 0 {
+                               memSize = 1 << 20
+                       }
+                       if s.globalIndex, err = kv.OpenStore(
+                               0,
+                               indexPath,
+                               kv.StoreWithLogger(s.l),
+                               kv.StoreWithMemTableSize(memSize),
+                       ); err != nil {
+                               return nil, err
+                       }
+               }
        }
+
        s.blockManageStrategy, err = bucket.NewStrategy(s.blockController, 
bucket.WithLogger(s.l))
        if err != nil {
                return nil, err
@@ -90,7 +106,9 @@ func openSegment(ctx context.Context, startTime time.Time, 
path, suffix string,
 
 func (s *segment) close() {
        s.blockController.close()
-       s.globalIndex.Close()
+       if s.globalIndex != nil {
+               s.globalIndex.Close()
+       }
        s.Stop()
 }
 
@@ -103,6 +121,9 @@ func (s segment) String() string {
 }
 
 func (s *segment) Stats() observability.Statistics {
+       if s.globalIndex == nil {
+               return observability.Statistics{}
+       }
        return s.globalIndex.Stats()
 }
 
@@ -159,7 +180,6 @@ func (bc *blockController) Next() (bucket.Reporter, error) {
        if errors.Is(err, ErrEndOfSegment) {
                return nil, bucket.ErrNoMoreBucket
        }
-       err = reporter.open()
        if err != nil {
                return nil, err
        }
@@ -201,18 +221,30 @@ func (bc *blockController) Parse(value string) 
(time.Time, error) {
        panic("invalid interval unit")
 }
 
-func (bc *blockController) span(timeRange timestamp.TimeRange) (bb []*block) {
-       return bc.ensureBlockOpen(bc.search(func(b *block) bool {
+func (bc *blockController) span(timeRange timestamp.TimeRange) 
([]blockDelegate, error) {
+       bb := bc.search(func(b *block) bool {
                return b.Overlapping(timeRange)
-       }))
+       })
+       if bb == nil {
+               return nil, nil
+       }
+       dd := make([]blockDelegate, len(bb))
+       for i, b := range bb {
+               d, err := b.delegate()
+               if err != nil {
+                       return nil, err
+               }
+               dd[i] = d
+       }
+       return dd, nil
 }
 
-func (bc *blockController) get(blockID uint16) *block {
+func (bc *blockController) get(blockID uint16) (blockDelegate, error) {
        b := bc.getBlock(blockID)
        if b != nil {
-               return bc.ensureBlockOpen([]*block{b})[0]
+               return b.delegate()
        }
-       return nil
+       return nil, nil
 }
 
 func (bc *blockController) getBlock(blockID uint16) *block {
@@ -250,26 +282,6 @@ func (bc *blockController) search(matcher func(*block) 
bool) (bb []*block) {
        return bb
 }
 
-func (bc *blockController) ensureBlockOpen(blocks []*block) (openedBlocks 
[]*block) {
-       if blocks == nil {
-               return nil
-       }
-       for _, b := range blocks {
-               if b.isClosed() {
-                       if err := b.open(); err != nil {
-                               bc.l.Error().Err(err).Stringer("block", 
b).Msg("fail to open block")
-                               continue
-                       }
-               }
-               openedBlocks = append(openedBlocks, b)
-               bc.blockQueue.Push(BlockID{
-                       BlockID: b.blockID,
-                       SegID:   b.segID,
-               })
-       }
-       return openedBlocks
-}
-
 func (bc *blockController) closeBlock(blockID uint16) {
        bc.RLock()
        defer bc.RUnlock()
@@ -309,13 +321,10 @@ func (bc *blockController) open() error {
                return err
        }
        if bc.Current() == nil {
-               b, err := bc.create(bc.clock.Now())
+               _, err := bc.create(bc.clock.Now())
                if err != nil {
                        return err
                }
-               if err = b.open(); err != nil {
-                       return err
-               }
        }
        return nil
 }
@@ -351,6 +360,7 @@ func (bc *blockController) load(suffix, path string) (b 
*block, err error) {
                        startTime: starTime,
                        suffix:    suffix,
                        blockSize: bc.blockSize,
+                       queue:     bc.blockQueue,
                }); err != nil {
                return nil, err
        }
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index bed9555..1696616 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -95,7 +95,10 @@ type series struct {
 }
 
 func (s *series) Get(id GlobalItemID) (Item, io.Closer, error) {
-       b := s.blockDB.block(id)
+       b, err := s.blockDB.block(id)
+       if err != nil {
+               return nil, nil, err
+       }
        if b == nil {
                return nil, nil, errors.WithMessagef(ErrBlockAbsent, "id: %v", 
id)
        }
@@ -111,7 +114,10 @@ func (s *series) ID() common.SeriesID {
 }
 
 func (s *series) Span(timeRange timestamp.TimeRange) (SeriesSpan, error) {
-       blocks := s.blockDB.span(timeRange)
+       blocks, err := s.blockDB.span(timeRange)
+       if err != nil {
+               return nil, err
+       }
        if len(blocks) < 1 {
                return nil, ErrEmptySeriesSpan
        }
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index e2a6ef8..a7eb0f9 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -132,8 +132,8 @@ type SeriesDatabase interface {
 
 type blockDatabase interface {
        shardID() common.ShardID
-       span(timeRange timestamp.TimeRange) []blockDelegate
-       block(id GlobalItemID) blockDelegate
+       span(timeRange timestamp.TimeRange) ([]blockDelegate, error)
+       block(id GlobalItemID) (blockDelegate, error)
 }
 
 var (
@@ -172,16 +172,12 @@ func (s *seriesDB) GetByID(id common.SeriesID) (Series, 
error) {
        return newSeries(s.context(), id, s), nil
 }
 
-func (s *seriesDB) block(id GlobalItemID) blockDelegate {
+func (s *seriesDB) block(id GlobalItemID) (blockDelegate, error) {
        seg := s.segCtrl.get(id.segID)
        if seg == nil {
-               return nil
-       }
-       block := seg.blockController.get(id.blockID)
-       if block == nil {
-               return nil
+               return nil, nil
        }
-       return block.delegate()
+       return seg.blockController.get(id.blockID)
 }
 
 func (s *seriesDB) shardID() common.ShardID {
@@ -238,18 +234,20 @@ func (s *seriesDB) List(path Path) (SeriesList, error) {
        return result, err
 }
 
-func (s *seriesDB) span(timeRange timestamp.TimeRange) []blockDelegate {
+func (s *seriesDB) span(timeRange timestamp.TimeRange) ([]blockDelegate, 
error) {
        // TODO: return correct blocks
        result := make([]blockDelegate, 0)
        for _, s := range s.segCtrl.span(timeRange) {
-               for _, b := range s.blockController.span(timeRange) {
-                       bd := b.delegate()
-                       if bd != nil {
-                               result = append(result, bd)
-                       }
+               dd, err := s.blockController.span(timeRange)
+               if err != nil {
+                       return nil, err
                }
+               if dd == nil {
+                       continue
+               }
+               result = append(result, dd...)
        }
-       return result
+       return result, nil
 }
 
 func (s *seriesDB) context() context.Context {
@@ -270,8 +268,19 @@ func newSeriesDataBase(ctx context.Context, shardID 
common.ShardID, path string,
                segCtrl: segCtrl,
                l:       logger.Fetch(ctx, "series_database"),
        }
+       memSize := int64(1 << 20)
+       o := ctx.Value(optionsKey)
+       if o != nil {
+               options := o.(DatabaseOpts)
+               if options.SeriesMemSize > 1 {
+                       memSize = options.SeriesMemSize
+               }
+       }
        var err error
-       sdb.seriesMetadata, err = kv.OpenStore(0, path+"/md", 
kv.StoreWithNamedLogger("metadata", sdb.l))
+       sdb.seriesMetadata, err = kv.OpenStore(0, path+"/md",
+               kv.StoreWithNamedLogger("metadata", sdb.l),
+               kv.StoreWithMemTableSize(memSize),
+       )
        if err != nil {
                return nil, err
        }
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index a318a80..6310952 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -126,7 +126,7 @@ func (s *shard) State() (shardState ShardState) {
                                        BlockID: b.blockID,
                                },
                                TimeRange: b.TimeRange,
-                               Closed:    b.isClosed(),
+                               Closed:    b.Closed(),
                        })
                }
        }
@@ -135,6 +135,7 @@ func (s *shard) State() (shardState ShardState) {
        for i, v := range s.segmentController.blockQueue.All() {
                shardState.OpenBlocks[i] = v.(BlockID)
        }
+       s.l.Info().Interface("", shardState).Msg("state")
        return shardState
 }
 
@@ -295,7 +296,6 @@ func (sc *segmentController) OnMove(prev bucket.Reporter, 
next bucket.Reporter)
        event := sc.l.Info()
        if prev != nil {
                event.Stringer("prev", prev)
-               prev.(*segment).blockManageStrategy.Close()
        }
        if next != nil {
                event.Stringer("next", next)
diff --git a/banyand/tsdb/shard_test.go b/banyand/tsdb/shard_test.go
index 7ef3839..16da8ac 100644
--- a/banyand/tsdb/shard_test.go
+++ b/banyand/tsdb/shard_test.go
@@ -67,7 +67,7 @@ var _ = Describe("Shard", func() {
                        t1 := clock.Now()
                        Eventually(func() []tsdb.BlockState {
                                return shard.State().Blocks
-                       }, 
defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
+                       }, 
defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
                                {
                                        ID: tsdb.BlockID{
                                                SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -81,7 +81,7 @@ var _ = Describe("Shard", func() {
                        t2 := clock.Now().Add(2 * time.Hour)
                        Eventually(func() []tsdb.BlockState {
                                return shard.State().Blocks
-                       }, 
defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
+                       }, 
defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
                                {
                                        ID: tsdb.BlockID{
                                                SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -99,12 +99,12 @@ var _ = Describe("Shard", func() {
                        }))
                        Eventually(func() []tsdb.BlockID {
                                return shard.State().OpenBlocks
-                       }, 
defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{}))
+                       }, 
defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{}))
                        By("01/01 13:00 moves to the 2nd block")
                        clock.Add(3 * time.Hour)
                        Eventually(func() []tsdb.BlockID {
                                return shard.State().OpenBlocks
-                       }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{
+                       }, 
defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{
                                {
                                        SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700101),
                                        BlockID: 
tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
@@ -115,7 +115,7 @@ var _ = Describe("Shard", func() {
                        t3 := clock.Now().Add(2 * time.Hour)
                        Eventually(func() []tsdb.BlockState {
                                return shard.State().Blocks
-                       }, 
defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
+                       }, 
defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
                                {
                                        ID: tsdb.BlockID{
                                                SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -142,7 +142,7 @@ var _ = Describe("Shard", func() {
                        clock.Add(3 * time.Hour)
                        Eventually(func() []tsdb.BlockID {
                                return shard.State().OpenBlocks
-                       }).Should(Equal([]tsdb.BlockID{
+                       }, 
defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{
                                {
                                        SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700101),
                                        BlockID: 
tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
@@ -157,7 +157,7 @@ var _ = Describe("Shard", func() {
                        t4 := clock.Now().Add(2 * time.Hour)
                        Eventually(func() []tsdb.BlockState {
                                return shard.State().Blocks
-                       }, 
defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
+                       }, 
defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
                                {
                                        ID: tsdb.BlockID{
                                                SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -191,7 +191,7 @@ var _ = Describe("Shard", func() {
                        clock.Add(3 * time.Hour)
                        Eventually(func() []tsdb.BlockID {
                                return shard.State().OpenBlocks
-                       }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{
+                       }, 
defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{
                                {
                                        SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700101),
                                        BlockID: 
tsdb.GenerateInternalID(tsdb.HOUR, 12),
@@ -210,7 +210,7 @@ var _ = Describe("Shard", func() {
                        t5 := clock.Now().Add(2 * time.Hour)
                        Eventually(func() []tsdb.BlockState {
                                return shard.State().Blocks
-                       }, 
defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
+                       }, 
defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
                                {
                                        ID: tsdb.BlockID{
                                                SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -251,7 +251,7 @@ var _ = Describe("Shard", func() {
                        clock.Add(3 * time.Hour)
                        Eventually(func() []tsdb.BlockID {
                                return shard.State().OpenBlocks
-                       }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{
+                       }, 
defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{
                                {
                                        SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700102),
                                        BlockID: 
tsdb.GenerateInternalID(tsdb.HOUR, 0o0),
@@ -267,7 +267,7 @@ var _ = Describe("Shard", func() {
                        }))
                        Eventually(func() []tsdb.BlockState {
                                return shard.State().Blocks
-                       }, 
defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
+                       }, 
defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
                                {
                                        ID: tsdb.BlockID{
                                                SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700101),
@@ -318,7 +318,7 @@ var _ = Describe("Shard", func() {
                        Expect(err).NotTo(HaveOccurred())
                        Eventually(func() []tsdb.BlockID {
                                return shard.State().OpenBlocks
-                       }, defaultEventallyTimeout).Should(Equal([]tsdb.BlockID{
+                       }, 
defaultEventuallyTimeout).Should(Equal([]tsdb.BlockID{
                                {
                                        SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700102),
                                        BlockID: 
tsdb.GenerateInternalID(tsdb.HOUR, 12),
@@ -334,7 +334,7 @@ var _ = Describe("Shard", func() {
                        }))
                        Eventually(func() []tsdb.BlockState {
                                return shard.State().Blocks
-                       }, 
defaultEventallyTimeout).Should(Equal([]tsdb.BlockState{
+                       }, 
defaultEventuallyTimeout).Should(Equal([]tsdb.BlockState{
                                {
                                        ID: tsdb.BlockID{
                                                SegID:   
tsdb.GenerateInternalID(tsdb.DAY, 19700101),
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 20d8e6e..feeb6b4 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -59,10 +59,10 @@ var (
        ErrInvalidShardID = errors.New("invalid shard id")
        ErrOpenDatabase   = errors.New("fails to open the database")
 
-       encodingMethodKey = contextEncodingMethodKey{}
+       optionsKey = contextOptionsKey{}
 )
 
-type contextEncodingMethodKey struct{}
+type contextOptionsKey struct{}
 
 type Supplier interface {
        SupplyTSDB() Database
@@ -84,11 +84,15 @@ type Shard interface {
 var _ Database = (*database)(nil)
 
 type DatabaseOpts struct {
-       Location       string
-       ShardNum       uint32
-       EncodingMethod EncodingMethod
-       SegmentSize    IntervalRule
-       BlockSize      IntervalRule
+       Location           string
+       ShardNum           uint32
+       EncodingMethod     EncodingMethod
+       SegmentSize        IntervalRule
+       BlockSize          IntervalRule
+       BlockMemSize       int64
+       SeriesMemSize      int64
+       EnableGlobalIndex  bool
+       GlobalIndexMemSize int64
 }
 
 type EncodingMethod struct {
@@ -186,7 +190,7 @@ func OpenDatabase(ctx context.Context, opts DatabaseOpts) 
(Database, error) {
                return nil, errors.Wrap(err, "failed to read directory contents 
failed")
        }
        thisContext := context.WithValue(ctx, logger.ContextKey, db.logger)
-       thisContext = context.WithValue(thisContext, encodingMethodKey, 
opts.EncodingMethod)
+       thisContext = context.WithValue(thisContext, optionsKey, opts)
        if len(entries) > 0 {
                return loadDatabase(thisContext, db)
        }
diff --git a/banyand/tsdb/tsdb_suite_test.go b/banyand/tsdb/tsdb_suite_test.go
index 97c11f2..cd11765 100644
--- a/banyand/tsdb/tsdb_suite_test.go
+++ b/banyand/tsdb/tsdb_suite_test.go
@@ -26,7 +26,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
-var defaultEventallyTimeout = 30 * time.Second
+var defaultEventuallyTimeout = 30 * time.Second
 
 func TestTsdb(t *testing.T) {
        RegisterFailHandler(Fail)
diff --git a/dist/LICENSE b/dist/LICENSE
index a8afea8..71429fb 100644
--- a/dist/LICENSE
+++ b/dist/LICENSE
@@ -256,7 +256,7 @@ BSD-3-Clause licenses
     github.com/grpc-ecosystem/grpc-gateway/v2 v2.10.3 BSD-3-Clause
     github.com/pmezard/go-difflib v1.0.0 BSD-3-Clause
     github.com/spf13/pflag v1.0.5 BSD-3-Clause
-    golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e BSD-3-Clause
+    golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 BSD-3-Clause
     golang.org/x/exp v0.0.0-20220602145555-4a0574d9293f BSD-3-Clause
     golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 BSD-3-Clause
     golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 BSD-3-Clause
@@ -305,7 +305,7 @@ MIT licenses
     github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 MIT
     github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2 MIT
     go.etcd.io/bbolt v1.3.6 MIT
-    go.uber.org/atomic v1.9.0 MIT
+    go.uber.org/atomic v1.7.0 MIT
     go.uber.org/multierr v1.8.0 MIT
     go.uber.org/zap v1.17.0 MIT
     gopkg.in/natefinch/lumberjack.v2 v2.0.0 MIT
diff --git a/go.mod b/go.mod
index 3b5b9d7..e49e8eb 100644
--- a/go.mod
+++ b/go.mod
@@ -8,15 +8,20 @@ require (
        github.com/cespare/xxhash v1.1.0
        github.com/dgraph-io/badger/v3 v3.2011.1
        github.com/dgraph-io/ristretto v0.1.0
+       github.com/envoyproxy/protoc-gen-validate v0.1.0
+       github.com/go-chi/chi/v5 v5.0.7
        github.com/golang/mock v1.6.0
-       github.com/golang/protobuf v1.5.2 // indirect
        github.com/google/go-cmp v0.5.8
        github.com/google/uuid v1.3.0
+       github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
+       github.com/grpc-ecosystem/grpc-gateway/v2 v2.10.3
+       github.com/hashicorp/golang-lru v0.5.4
        github.com/klauspost/compress v1.15.6
        github.com/oklog/run v1.1.0
        github.com/onsi/ginkgo/v2 v2.1.4
        github.com/onsi/gomega v1.19.0
        github.com/pkg/errors v0.9.1
+       github.com/prometheus/client_golang v1.12.2
        github.com/rs/zerolog v1.26.1
        github.com/spf13/cobra v1.4.0
        github.com/spf13/pflag v1.0.5
@@ -25,24 +30,13 @@ require (
        go.etcd.io/etcd/client/v3 v3.5.4
        go.etcd.io/etcd/server/v3 v3.5.4
        go.uber.org/multierr v1.8.0
-       golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
+       golang.org/x/exp v0.0.0-20220602145555-4a0574d9293f
+       golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3
        google.golang.org/genproto v0.0.0-20220615141314-f1464d18c36b
        google.golang.org/grpc v1.47.0
        google.golang.org/protobuf v1.28.0
 )
 
-require (
-       github.com/envoyproxy/protoc-gen-validate v0.1.0
-       github.com/grpc-ecosystem/grpc-gateway/v2 v2.10.3
-       github.com/hashicorp/golang-lru v0.5.4
-       github.com/prometheus/client_golang v1.12.2
-       go.uber.org/atomic v1.9.0
-       golang.org/x/exp v0.0.0-20220602145555-4a0574d9293f
-       golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3
-)
-
-require golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df // indirect
-
 require (
        github.com/beorn7/perks v1.0.1 // indirect
        github.com/bits-and-blooms/bitset v1.2.0 // indirect
@@ -53,15 +47,14 @@ require (
        github.com/dustin/go-humanize v1.0.0 // indirect
        github.com/form3tech-oss/jwt-go v3.2.3+incompatible // indirect
        github.com/fsnotify/fsnotify v1.5.4 // indirect
-       github.com/go-chi/chi/v5 v5.0.7
        github.com/gogo/protobuf v1.3.2 // indirect
        github.com/golang/glog v1.0.0 // indirect
        github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // 
indirect
+       github.com/golang/protobuf v1.5.2 // indirect
        github.com/golang/snappy v0.0.3 // indirect
        github.com/google/btree v1.0.1 // indirect
        github.com/google/flatbuffers v1.12.1 // indirect
        github.com/gorilla/websocket v1.4.2 // indirect
-       github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
        github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
        github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
        github.com/hashicorp/hcl v1.0.0 // indirect
@@ -106,11 +99,14 @@ require (
        go.opentelemetry.io/otel/sdk/metric v0.20.0 // indirect
        go.opentelemetry.io/otel/trace v0.20.0 // indirect
        go.opentelemetry.io/proto/otlp v0.7.0 // indirect
+       go.uber.org/atomic v1.7.0 // indirect
        go.uber.org/zap v1.17.0 // indirect
-       golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect
+       golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect
        golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 // indirect
+       golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
        golang.org/x/text v0.3.7 // indirect
        golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
+       golang.org/x/xerrors v0.0.0-20220517211312-f3a8303e98df // indirect
        gopkg.in/ini.v1 v1.66.4 // indirect
        gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
        gopkg.in/yaml.v2 v2.4.0 // indirect
diff --git a/go.sum b/go.sum
index d37cd1d..f6d35ae 100644
--- a/go.sum
+++ b/go.sum
@@ -512,9 +512,8 @@ go.opentelemetry.io/otel/trace v0.20.0/go.mod 
h1:6GjCW8zgDjwGHGa6GkyeB8+/5vjT16g
 go.opentelemetry.io/proto/otlp v0.7.0 
h1:rwOQPCuKAKmwGKq2aVNnYIibI6wnV7EvzgfTCzcdGg8=
 go.opentelemetry.io/proto/otlp v0.7.0/go.mod 
h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
 go.uber.org/atomic v1.4.0/go.mod 
h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
+go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
 go.uber.org/atomic v1.7.0/go.mod 
h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
-go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
-go.uber.org/atomic v1.9.0/go.mod 
h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
 go.uber.org/goleak v1.1.10 h1:z+mqJhf6ss6BSfSM671tgKyZBFPTTJM+HLxnhPC3wu0=
 go.uber.org/goleak v1.1.10/go.mod 
h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
 go.uber.org/multierr v1.1.0/go.mod 
h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
@@ -536,8 +535,8 @@ golang.org/x/crypto 
v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm
 golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod 
h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
 golang.org/x/crypto v0.0.0-20211215165025-cf75a172585e/go.mod 
h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
 golang.org/x/crypto v0.0.0-20220131195533-30dcbda58838/go.mod 
h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
-golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e 
h1:T8NU3HyQ8ClP4SEE+KbFlg6n0NhuTsN4MyznaarGsZM=
-golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e/go.mod 
h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
+golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 
h1:kUhD7nTDoI3fVd9G4ORWrbV5NY0liEs/Jg2pv5f+bBA=
+golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4/go.mod 
h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
 golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod 
h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod 
h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
 golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod 
h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
@@ -577,8 +576,6 @@ golang.org/x/mod v0.4.0/go.mod 
h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
 golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
 golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
 golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 
h1:kQgndtyPBW/JIYERgdxfwMYh3AVStj88WQTlNDi2a+o=
-golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 
h1:kQgndtyPBW/JIYERgdxfwMYh3AVStj88WQTlNDi2a+o=
-golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod 
h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY=
 golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod 
h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY=
 golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod 
h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod 
h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
diff --git a/pkg/index/index.go b/pkg/index/index.go
index 192a94f..b3eb099 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -46,15 +46,8 @@ func (f FieldKey) Marshal() []byte {
 }
 
 func (f *FieldKey) Unmarshal(raw []byte) error {
-       switch len(raw) {
-       case 12:
-               f.SeriesID = common.SeriesID(convert.BytesToUint64(raw[0:8]))
-               f.IndexRuleID = convert.BytesToUint32(raw[8:])
-       case 4:
-               f.IndexRuleID = convert.BytesToUint32(raw)
-       default:
-               return errors.Wrap(ErrMalformed, "unmarshal field key")
-       }
+       f.SeriesID = common.SeriesID(convert.BytesToUint64(raw[0:8]))
+       f.IndexRuleID = convert.BytesToUint32(raw[8:])
        return nil
 }
 
@@ -72,14 +65,17 @@ func (f Field) Marshal() ([]byte, error) {
 }
 
 func (f *Field) Unmarshal(raw []byte) error {
+       if len(raw) < 13 {
+               return errors.WithMessagef(ErrMalformed, "malformed field: 
expected more than 12, got %d", len(raw))
+       }
        fk := &f.Key
-       err := fk.Unmarshal(raw[:len(raw)-8])
+       err := fk.Unmarshal(raw[:12])
        if err != nil {
                return errors.Wrap(err, "unmarshal a field")
        }
-       termID := raw[len(raw)-8:]
-       f.Term = make([]byte, len(termID))
-       copy(f.Term, termID)
+       term := raw[12:]
+       f.Term = make([]byte, len(term))
+       copy(f.Term, term)
        return nil
 }
 
diff --git a/pkg/index/lsm/lsm.go b/pkg/index/lsm/lsm.go
index 53de61b..e903098 100644
--- a/pkg/index/lsm/lsm.go
+++ b/pkg/index/lsm/lsm.go
@@ -55,14 +55,19 @@ func (s *store) Write(field index.Field, itemID 
common.ItemID) error {
 }
 
 type StoreOpts struct {
-       Path   string
-       Logger *logger.Logger
+       Path         string
+       Logger       *logger.Logger
+       MemTableSize int64
 }
 
 func NewStore(opts StoreOpts) (index.Store, error) {
        var err error
        var lsm kv.Store
-       if lsm, err = kv.OpenStore(0, opts.Path+"/lsm", 
kv.StoreWithLogger(opts.Logger)); err != nil {
+       if lsm, err = kv.OpenStore(
+               0,
+               opts.Path+"/lsm",
+               kv.StoreWithLogger(opts.Logger),
+               kv.StoreWithMemTableSize(opts.MemTableSize)); err != nil {
                return nil, err
        }
        return &store{
diff --git a/pkg/run/channel.go b/pkg/run/channel.go
deleted file mode 100644
index 82a9299..0000000
--- a/pkg/run/channel.go
+++ /dev/null
@@ -1,50 +0,0 @@
-// Licensed to Apache Software Foundation (ASF) under one or more contributor
-// license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright
-// ownership. Apache Software Foundation (ASF) licenses this file to you under
-// the Apache License, Version 2.0 (the "License"); you may
-// not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package run
-
-import (
-       "sync"
-)
-
-type Chan[T any] struct {
-       ch     chan T
-       closer sync.WaitGroup
-}
-
-func NewChan[T any](ch chan T) *Chan[T] {
-       return &Chan[T]{
-               ch: ch,
-       }
-}
-
-func (c *Chan[T]) Write(item T) {
-       c.closer.Add(1)
-       defer c.closer.Done()
-       c.ch <- item
-}
-
-func (c *Chan[T]) Read() (T, bool) {
-       item, more := <-c.ch
-       return item, more
-}
-
-func (c *Chan[T]) Close() error {
-       c.closer.Wait()
-       close(c.ch)
-       return nil
-}
diff --git a/pkg/test/helpers/fail_interceptor.go 
b/pkg/test/helpers/fail_interceptor.go
index a442d27..b971088 100644
--- a/pkg/test/helpers/fail_interceptor.go
+++ b/pkg/test/helpers/fail_interceptor.go
@@ -17,8 +17,9 @@
 package helpers
 
 import (
+       "sync/atomic"
+
        "github.com/onsi/gomega/types"
-       "go.uber.org/atomic"
 )
 
 type FailInterceptor struct {
@@ -29,7 +30,7 @@ type FailInterceptor struct {
 func NewFailInterceptor(fail types.GomegaFailHandler) *FailInterceptor {
        return &FailInterceptor{
                ginkgoFail: fail,
-               didFail:    atomic.NewBool(false),
+               didFail:    &atomic.Bool{},
        }
 }
 

Reply via email to