This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new f70f59b  Enhance the inverted index, metadata and logger (#227)
f70f59b is described below

commit f70f59b7201a83d2bf2a8716627ded4a11780b66
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Dec 12 11:53:25 2022 +0800

    Enhance the inverted index, metadata and logger (#227)
---
 banyand/Dockerfile                   |  16 +---
 banyand/k8s.yml                      |   1 +
 banyand/liaison/grpc/measure.go      |  45 +++++-----
 banyand/liaison/grpc/server.go       |   2 +
 banyand/liaison/grpc/stream.go       |  45 +++++-----
 banyand/measure/service.go           |   1 +
 banyand/metadata/metadata.go         |   3 +-
 banyand/metadata/schema/error.go     |   2 +
 banyand/metadata/schema/etcd.go      |  78 ++++++++++++------
 banyand/metadata/schema/etcd_test.go |   8 +-
 banyand/metadata/schema/group.go     |   4 +-
 banyand/stream/service.go            |   1 +
 banyand/tsdb/block.go                |   7 +-
 banyand/tsdb/bucket/queue.go         |   2 +-
 banyand/tsdb/bucket/queue_test.go    |   2 +-
 banyand/tsdb/indexdb.go              |  26 ++----
 banyand/tsdb/tsdb.go                 |   6 ++
 go.mod                               |   2 +-
 pkg/index/index.go                   |  38 +++++++--
 pkg/index/inverted/inverted.go       | 155 +++++++++++++++++++++++------------
 pkg/index/lsm/iterator.go            |  10 +--
 pkg/index/lsm/lsm.go                 |   7 +-
 pkg/index/lsm/search.go              |   8 +-
 pkg/logger/logger.go                 |  59 ++++++++++++-
 pkg/logger/setting.go                |  26 +++---
 pkg/schema/metadata.go               |  65 ++++++++-------
 test/docker/base-compose.yml         |   2 +-
 27 files changed, 380 insertions(+), 241 deletions(-)

diff --git a/banyand/Dockerfile b/banyand/Dockerfile
index dc40e27..a3a0ae7 100644
--- a/banyand/Dockerfile
+++ b/banyand/Dockerfile
@@ -34,8 +34,6 @@ ENV GO111MODULE "on"
 WORKDIR /src
 COPY go.* ./
 RUN go mod download
-RUN GOBIN=/bin go install github.com/grpc-ecosystem/[email protected] \
-    && chmod 755 /bin/grpc-health-probe
 
 FROM base AS builder
 
@@ -44,8 +42,7 @@ RUN --mount=target=. \
             BUILD_DIR=/out make -C banyand all
 
 FROM alpine:edge AS certs
-RUN apk add --no-cache ca-certificates
-RUN update-ca-certificates
+RUN apk add --no-cache ca-certificates && update-ca-certificates
 
 FROM busybox:stable-glibc
 
@@ -57,14 +54,3 @@ EXPOSE 17913
 EXPOSE 6060
 
 ENTRYPOINT ["/banyand-server"]
-
-FROM busybox:stable-glibc AS test
-
-COPY --from=builder /out/banyand-server /banyand-server
-COPY --from=base /bin/grpc-health-probe /grpc-health-probe
-
-EXPOSE 17912
-EXPOSE 17913
-EXPOSE 6060
-
-ENTRYPOINT ["/banyand-server"]
\ No newline at end of file
diff --git a/banyand/k8s.yml b/banyand/k8s.yml
index 0d78bb6..88853e4 100644
--- a/banyand/k8s.yml
+++ b/banyand/k8s.yml
@@ -81,6 +81,7 @@ spec:
         image: apache/skywalking-banyandb:v0.0.0-dev
         args:
         - "standalone"
+        - "--measure-idx-batch-wait-sec=30"
         - "--logging.level=warn"
         - "--logging.modules=measure.measure-default.service_cpm_minute"
         - "--logging.levels=debug"
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index a0c8d0b..9d72fa6 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -23,7 +23,6 @@ import (
        "time"
 
        "github.com/pkg/errors"
-       "github.com/rs/zerolog"
        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/status"
 
@@ -39,34 +38,44 @@ import (
 type measureService struct {
        measurev1.UnimplementedMeasureServiceServer
        *discoveryService
+       sampled *logger.Logger
+}
+
+func (ms *measureService) setLogger(log *logger.Logger) {
+       ms.sampled = log.Sampled(10)
 }
 
 func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) 
error {
-       reply := func() error {
-               return measure.Send(&measurev1.WriteResponse{})
+       reply := func(measure measurev1.MeasureService_WriteServer, logger 
*logger.Logger) {
+               if errResp := measure.Send(&measurev1.WriteResponse{}); errResp 
!= nil {
+                       logger.Err(errResp).Msg("failed to send response")
+               }
        }
-       sampled := ms.log.Sample(&zerolog.BasicSampler{N: 10})
+       ctx := measure.Context()
        for {
+               select {
+               case <-ctx.Done():
+                       return ctx.Err()
+               default:
+               }
                writeRequest, err := measure.Recv()
                if errors.Is(err, io.EOF) {
                        return nil
                }
                if err != nil {
-                       return err
+                       ms.sampled.Error().Err(err).Stringer("written", 
writeRequest).Msg("failed to receive message")
+                       reply(measure, ms.sampled)
+                       continue
                }
                if errTime := 
timestamp.CheckPb(writeRequest.DataPoint.Timestamp); errTime != nil {
-                       sampled.Error().Err(errTime).Stringer("written", 
writeRequest).Msg("the data point time is invalid")
-                       if errResp := reply(); errResp != nil {
-                               return errResp
-                       }
+                       ms.sampled.Error().Err(errTime).Stringer("written", 
writeRequest).Msg("the data point time is invalid")
+                       reply(measure, ms.sampled)
                        continue
                }
                entity, tagValues, shardID, err := 
ms.navigate(writeRequest.GetMetadata(), 
writeRequest.GetDataPoint().GetTagFamilies())
                if err != nil {
-                       sampled.Error().Err(err).RawJSON("written", 
logger.Proto(writeRequest)).Msg("failed to navigate to the write target")
-                       if errResp := reply(); errResp != nil {
-                               return errResp
-                       }
+                       ms.sampled.Error().Err(err).RawJSON("written", 
logger.Proto(writeRequest)).Msg("failed to navigate to the write target")
+                       reply(measure, ms.sampled)
                        continue
                }
                iwr := &measurev1.InternalWriteRequest{
@@ -80,15 +89,9 @@ func (ms *measureService) Write(measure 
measurev1.MeasureService_WriteServer) er
                message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), 
iwr)
                _, errWritePub := ms.pipeline.Publish(data.TopicMeasureWrite, 
message)
                if errWritePub != nil {
-                       sampled.Error().Err(errWritePub).RawJSON("written", 
logger.Proto(writeRequest)).Msg("failed to send a message")
-                       if errResp := reply(); errResp != nil {
-                               return errResp
-                       }
-                       continue
-               }
-               if errSend := reply(); errSend != nil {
-                       return errSend
+                       ms.sampled.Error().Err(errWritePub).RawJSON("written", 
logger.Proto(writeRequest)).Msg("failed to send a message")
                }
+               reply(measure, ms.sampled)
        }
 }
 
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index 5a58d80..4758585 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -112,6 +112,8 @@ func NewServer(_ context.Context, pipeline queue.Queue, 
repo discovery.ServiceRe
 
 func (s *server) PreRun() error {
        s.log = logger.GetLogger("liaison-grpc")
+       s.streamSVC.setLogger(s.log)
+       s.measureSVC.setLogger(s.log)
        components := []struct {
                discoverySVC *discoveryService
                shardEvent   bus.Topic
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index e798da4..9a4a101 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -23,7 +23,6 @@ import (
        "time"
 
        "github.com/pkg/errors"
-       "github.com/rs/zerolog"
        "google.golang.org/grpc/codes"
        "google.golang.org/grpc/status"
 
@@ -39,34 +38,44 @@ import (
 type streamService struct {
        streamv1.UnimplementedStreamServiceServer
        *discoveryService
+       sampled *logger.Logger
+}
+
+func (s *streamService) setLogger(log *logger.Logger) {
+       s.sampled = log.Sampled(10)
 }
 
 func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error 
{
-       reply := func() error {
-               return stream.Send(&streamv1.WriteResponse{})
+       reply := func(stream streamv1.StreamService_WriteServer, logger 
*logger.Logger) {
+               if errResp := stream.Send(&streamv1.WriteResponse{}); errResp 
!= nil {
+                       logger.Err(errResp).Msg("failed to send response")
+               }
        }
-       sampled := s.log.Sample(&zerolog.BasicSampler{N: 10})
+       ctx := stream.Context()
        for {
+               select {
+               case <-ctx.Done():
+                       return ctx.Err()
+               default:
+               }
                writeEntity, err := stream.Recv()
                if errors.Is(err, io.EOF) {
                        return nil
                }
                if err != nil {
-                       return err
+                       s.sampled.Error().Stringer("written", 
writeEntity).Err(err).Msg("failed to receive message")
+                       reply(stream, s.sampled)
+                       continue
                }
                if errTime := 
timestamp.CheckPb(writeEntity.GetElement().Timestamp); errTime != nil {
-                       sampled.Error().Stringer("written", 
writeEntity).Err(errTime).Msg("the element time is invalid")
-                       if errResp := reply(); errResp != nil {
-                               return errResp
-                       }
+                       s.sampled.Error().Stringer("written", 
writeEntity).Err(errTime).Msg("the element time is invalid")
+                       reply(stream, s.sampled)
                        continue
                }
                entity, tagValues, shardID, err := 
s.navigate(writeEntity.GetMetadata(), writeEntity.GetElement().GetTagFamilies())
                if err != nil {
-                       sampled.Error().Err(err).RawJSON("written", 
logger.Proto(writeEntity)).Msg("failed to navigate to the write target")
-                       if errResp := reply(); errResp != nil {
-                               return errResp
-                       }
+                       s.sampled.Error().Err(err).RawJSON("written", 
logger.Proto(writeEntity)).Msg("failed to navigate to the write target")
+                       reply(stream, s.sampled)
                        continue
                }
                iwr := &streamv1.InternalWriteRequest{
@@ -80,15 +89,9 @@ func (s *streamService) Write(stream 
streamv1.StreamService_WriteServer) error {
                message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), 
iwr)
                _, errWritePub := s.pipeline.Publish(data.TopicStreamWrite, 
message)
                if errWritePub != nil {
-                       sampled.Error().Err(errWritePub).RawJSON("written", 
logger.Proto(writeEntity)).Msg("failed to send a message")
-                       if errResp := reply(); errResp != nil {
-                               return errResp
-                       }
-                       continue
-               }
-               if errSend := reply(); errSend != nil {
-                       return errSend
+                       s.sampled.Error().Err(errWritePub).RawJSON("written", 
logger.Proto(writeEntity)).Msg("failed to send a message")
                }
+               reply(stream, s.sampled)
        }
 }
 
diff --git a/banyand/measure/service.go b/banyand/measure/service.go
index 60e615d..a606c9f 100644
--- a/banyand/measure/service.go
+++ b/banyand/measure/service.go
@@ -82,6 +82,7 @@ func (s *service) FlagSet() *run.FlagSet {
        flagS.StringVar(&s.root, "measure-root-path", "/tmp", "the root path of 
database")
        flagS.Int64Var(&s.dbOpts.BlockMemSize, "measure-block-mem-size", 
16<<20, "block memory size")
        flagS.Int64Var(&s.dbOpts.SeriesMemSize, "measure-seriesmeta-mem-size", 
1<<20, "series metadata memory size")
+       flagS.Int64Var(&s.dbOpts.BlockInvertedIndex.BatchWaitSec, 
"measure-idx-batch-wait-sec", 1, "index batch wait in second")
        return flagS
 }
 
diff --git a/banyand/metadata/metadata.go b/banyand/metadata/metadata.go
index 44f5953..e4f3dd4 100644
--- a/banyand/metadata/metadata.go
+++ b/banyand/metadata/metadata.go
@@ -29,7 +29,6 @@ import (
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
-       "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
@@ -88,7 +87,7 @@ func (s *service) PreRun() error {
        var err error
        s.schemaRegistry, err = schema.NewEtcdSchemaRegistry(
                schema.ConfigureListener(s.listenClientURL, s.listenPeerURL),
-               schema.RootDir(s.rootDir), 
schema.LoggerLevel(logger.GetLogger().GetLevel().String()))
+               schema.RootDir(s.rootDir))
        if err != nil {
                return err
        }
diff --git a/banyand/metadata/schema/error.go b/banyand/metadata/schema/error.go
index ee4efa6..00e32ea 100644
--- a/banyand/metadata/schema/error.go
+++ b/banyand/metadata/schema/error.go
@@ -34,6 +34,8 @@ var (
        errGRPCAlreadyExists       = statusGRPCAlreadyExists.Err()
        statusDataLoss             = status.New(codes.DataLoss, "banyandb: 
resource corrupts.")
        errGRPCDataLoss            = statusDataLoss.Err()
+
+       errClosed = errors.New("metadata registry is closed")
 )
 
 // BadRequest creates a gRPC error with error details with type BadRequest,
diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go
index d7c4cf3..91b0b24 100644
--- a/banyand/metadata/schema/etcd.go
+++ b/banyand/metadata/schema/etcd.go
@@ -23,15 +23,20 @@ import (
        "os"
        "path/filepath"
        "sync"
+       "time"
 
        "github.com/pkg/errors"
        clientv3 "go.etcd.io/etcd/client/v3"
        "go.etcd.io/etcd/server/v3/embed"
+       "go.uber.org/zap"
+       "google.golang.org/grpc"
        "google.golang.org/protobuf/proto"
 
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        propertyv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
 var (
@@ -62,13 +67,6 @@ func RootDir(rootDir string) RegistryOption {
        }
 }
 
-// LoggerLevel sets the logger level.
-func LoggerLevel(loggerLevel string) RegistryOption {
-       return func(config *etcdSchemaRegistryConfig) {
-               config.loggerLevel = loggerLevel
-       }
-}
-
 // ConfigureListener sets client and peer urls of listeners.
 func ConfigureListener(lc, lp string) RegistryOption {
        return func(config *etcdSchemaRegistryConfig) {
@@ -89,7 +87,7 @@ func (eh *eventHandler) interestOf(kind Kind) bool {
 type etcdSchemaRegistry struct {
        server   *embed.Etcd
        client   *clientv3.Client
-       kv       clientv3.KV
+       closer   *run.Closer
        handlers []*eventHandler
        mux      sync.RWMutex
 }
@@ -101,8 +99,6 @@ type etcdSchemaRegistryConfig struct {
        listenerClientURL string
        // listenerPeerURL is the listener for peer
        listenerPeerURL string
-       // loggerLevel defines log level
-       loggerLevel string
 }
 
 func (e *etcdSchemaRegistry) RegisterHandler(kind Kind, handler EventHandler) {
@@ -149,6 +145,7 @@ func (e *etcdSchemaRegistry) StoppingNotify() <-chan 
struct{} {
 }
 
 func (e *etcdSchemaRegistry) Close() error {
+       e.closer.CloseThenWait()
        _ = e.client.Close()
        e.server.Close()
        return nil
@@ -164,8 +161,15 @@ func NewEtcdSchemaRegistry(options ...RegistryOption) 
(Registry, error) {
        for _, opt := range options {
                opt(registryConfig)
        }
+       zapCfg := logger.GetLogger("etcd").ToZapConfig()
+
+       var l *zap.Logger
+       var err error
+       if l, err = zapCfg.Build(); err != nil {
+               return nil, err
+       }
        // TODO: allow use cluster setting
-       embedConfig := newStandaloneEtcdConfig(registryConfig)
+       embedConfig := newStandaloneEtcdConfig(registryConfig, l)
        e, err := embed.StartEtcd(embedConfig)
        if err != nil {
                return nil, err
@@ -173,21 +177,33 @@ func NewEtcdSchemaRegistry(options ...RegistryOption) 
(Registry, error) {
        if e != nil {
                <-e.Server.ReadyNotify() // wait for e.Server to join the 
cluster
        }
-       client, err := clientv3.NewFromURL(e.Config().ACUrls[0].String())
+
+       config := clientv3.Config{
+               Endpoints:            []string{e.Config().ACUrls[0].String()},
+               DialTimeout:          5 * time.Second,
+               DialKeepAliveTime:    30 * time.Second,
+               DialKeepAliveTimeout: 10 * time.Second,
+               DialOptions:          []grpc.DialOption{grpc.WithBlock()},
+               Logger:               l,
+       }
+       client, err := clientv3.New(config)
        if err != nil {
                return nil, err
        }
-       kvClient := clientv3.NewKV(client)
        reg := &etcdSchemaRegistry{
                server: e,
-               kv:     kvClient,
                client: client,
+               closer: run.NewCloser(0),
        }
        return reg, nil
 }
 
 func (e *etcdSchemaRegistry) get(ctx context.Context, key string, message 
proto.Message) error {
-       resp, err := e.kv.Get(ctx, key)
+       if !e.closer.AddRunning() {
+               return errClosed
+       }
+       defer e.closer.Done()
+       resp, err := e.client.Get(ctx, key)
        if err != nil {
                return err
        }
@@ -212,11 +228,15 @@ func (e *etcdSchemaRegistry) get(ctx context.Context, key 
string, message proto.
 // and overwrite the existing value if so.
 // Otherwise, it will return ErrGRPCResourceNotFound.
 func (e *etcdSchemaRegistry) update(ctx context.Context, metadata Metadata) 
error {
+       if !e.closer.AddRunning() {
+               return errClosed
+       }
+       defer e.closer.Done()
        key, err := metadata.key()
        if err != nil {
                return err
        }
-       getResp, err := e.kv.Get(ctx, key)
+       getResp, err := e.client.Get(ctx, key)
        if err != nil {
                return err
        }
@@ -239,7 +259,7 @@ func (e *etcdSchemaRegistry) update(ctx context.Context, 
metadata Metadata) erro
                }
 
                modRevision := getResp.Kvs[0].ModRevision
-               txnResp, txnErr := e.kv.Txn(ctx).
+               txnResp, txnErr := e.client.Txn(ctx).
                        If(clientv3.Compare(clientv3.ModRevision(key), "=", 
modRevision)).
                        Then(clientv3.OpPut(key, string(val))).
                        Commit()
@@ -260,11 +280,15 @@ func (e *etcdSchemaRegistry) update(ctx context.Context, 
metadata Metadata) erro
 // and put the value if it does not exist.
 // Otherwise, it will return ErrGRPCAlreadyExists.
 func (e *etcdSchemaRegistry) create(ctx context.Context, metadata Metadata) 
error {
+       if !e.closer.AddRunning() {
+               return errClosed
+       }
+       defer e.closer.Done()
        key, err := metadata.key()
        if err != nil {
                return err
        }
-       getResp, err := e.kv.Get(ctx, key)
+       getResp, err := e.client.Get(ctx, key)
        if err != nil {
                return err
        }
@@ -279,7 +303,7 @@ func (e *etcdSchemaRegistry) create(ctx context.Context, 
metadata Metadata) erro
        if replace {
                return errGRPCAlreadyExists
        }
-       _, err = e.kv.Put(ctx, key, string(val))
+       _, err = e.client.Put(ctx, key, string(val))
        if err != nil {
                return err
        }
@@ -289,7 +313,11 @@ func (e *etcdSchemaRegistry) create(ctx context.Context, 
metadata Metadata) erro
 }
 
 func (e *etcdSchemaRegistry) listWithPrefix(ctx context.Context, prefix 
string, factory func() proto.Message) ([]proto.Message, error) {
-       resp, err := e.kv.Get(ctx, prefix, clientv3.WithFromKey(), 
clientv3.WithRange(incrementLastByte(prefix)))
+       if !e.closer.AddRunning() {
+               return nil, errClosed
+       }
+       defer e.closer.Done()
+       resp, err := e.client.Get(ctx, prefix, clientv3.WithFromKey(), 
clientv3.WithRange(incrementLastByte(prefix)))
        if err != nil {
                return nil, err
        }
@@ -314,11 +342,15 @@ func listPrefixesForEntity(group, entityPrefix string) 
string {
 }
 
 func (e *etcdSchemaRegistry) delete(ctx context.Context, metadata Metadata) 
(bool, error) {
+       if !e.closer.AddRunning() {
+               return false, errClosed
+       }
+       defer e.closer.Done()
        key, err := metadata.key()
        if err != nil {
                return false, err
        }
-       resp, err := e.kv.Delete(ctx, key, clientv3.WithPrevKV())
+       resp, err := e.client.Delete(ctx, key, clientv3.WithPrevKV())
        if err != nil {
                return false, err
        }
@@ -365,9 +397,9 @@ func incrementLastByte(key string) string {
        return string(bb)
 }
 
-func newStandaloneEtcdConfig(config *etcdSchemaRegistryConfig) *embed.Config {
+func newStandaloneEtcdConfig(config *etcdSchemaRegistryConfig, logger 
*zap.Logger) *embed.Config {
        cfg := embed.NewConfig()
-       cfg.LogLevel = config.loggerLevel
+       cfg.ZapLoggerBuilder = embed.NewZapLoggerBuilder(logger)
        cfg.Dir = filepath.Join(config.rootDir, "metadata")
        cURL, _ := url.Parse(config.listenerClientURL)
        pURL, _ := url.Parse(config.listenerPeerURL)
diff --git a/banyand/metadata/schema/etcd_test.go 
b/banyand/metadata/schema/etcd_test.go
index 5532d24..22bb5a0 100644
--- a/banyand/metadata/schema/etcd_test.go
+++ b/banyand/metadata/schema/etcd_test.go
@@ -136,7 +136,7 @@ func useRandomPort() RegistryOption {
 
 func Test_Etcd_Entity_Get(t *testing.T) {
        tester := assert.New(t)
-       registry, err := NewEtcdSchemaRegistry(useRandomPort(), 
useRandomTempDir(), LoggerLevel("warn"))
+       registry, err := NewEtcdSchemaRegistry(useRandomPort(), 
useRandomTempDir())
        tester.NoError(err)
        tester.NotNil(registry)
        defer registry.Close()
@@ -228,7 +228,7 @@ func Test_Etcd_Entity_Get(t *testing.T) {
 
 func Test_Etcd_Entity_List(t *testing.T) {
        tester := assert.New(t)
-       registry, err := NewEtcdSchemaRegistry(useRandomPort(), 
useRandomTempDir(), LoggerLevel("warn"))
+       registry, err := NewEtcdSchemaRegistry(useRandomPort(), 
useRandomTempDir())
        tester.NoError(err)
        tester.NotNil(registry)
        defer registry.Close()
@@ -310,7 +310,7 @@ func Test_Etcd_Entity_List(t *testing.T) {
 
 func Test_Etcd_Delete(t *testing.T) {
        tester := assert.New(t)
-       registry, err := NewEtcdSchemaRegistry(useRandomPort(), 
useRandomTempDir(), LoggerLevel("warn"))
+       registry, err := NewEtcdSchemaRegistry(useRandomPort(), 
useRandomTempDir())
        tester.NoError(err)
        tester.NotNil(registry)
        defer registry.Close()
@@ -379,7 +379,7 @@ func Test_Etcd_Delete(t *testing.T) {
 
 func Test_Notify(t *testing.T) {
        req := require.New(t)
-       registry, err := NewEtcdSchemaRegistry(useRandomPort(), 
useRandomTempDir(), LoggerLevel("warn"))
+       registry, err := NewEtcdSchemaRegistry(useRandomPort(), 
useRandomTempDir())
        req.NoError(err)
        req.NotNil(registry)
        defer registry.Close()
diff --git a/banyand/metadata/schema/group.go b/banyand/metadata/schema/group.go
index 3ecb0c7..e6ed49f 100644
--- a/banyand/metadata/schema/group.go
+++ b/banyand/metadata/schema/group.go
@@ -43,7 +43,7 @@ func (e *etcdSchemaRegistry) GetGroup(ctx context.Context, 
group string) (*commo
 }
 
 func (e *etcdSchemaRegistry) ListGroup(ctx context.Context) 
([]*commonv1.Group, error) {
-       messages, err := e.kv.Get(ctx, groupsKeyPrefix, clientv3.WithFromKey(), 
clientv3.WithRange(incrementLastByte(groupsKeyPrefix)))
+       messages, err := e.client.Get(ctx, groupsKeyPrefix, 
clientv3.WithFromKey(), clientv3.WithRange(incrementLastByte(groupsKeyPrefix)))
        if err != nil {
                return nil, err
        }
@@ -69,7 +69,7 @@ func (e *etcdSchemaRegistry) DeleteGroup(ctx context.Context, 
group string) (boo
                return false, errors.Wrap(err, group)
        }
        keyPrefix := groupsKeyPrefix + g.GetMetadata().GetName() + "/"
-       resp, err := e.kv.Delete(ctx, keyPrefix, 
clientv3.WithRange(incrementLastByte(keyPrefix)))
+       resp, err := e.client.Delete(ctx, keyPrefix, 
clientv3.WithRange(incrementLastByte(keyPrefix)))
        if err != nil {
                return false, err
        }
diff --git a/banyand/stream/service.go b/banyand/stream/service.go
index ac95748..c4f2c7b 100644
--- a/banyand/stream/service.go
+++ b/banyand/stream/service.go
@@ -76,6 +76,7 @@ func (s *service) FlagSet() *run.FlagSet {
        flagS.Int64Var(&s.dbOpts.BlockMemSize, "stream-block-mem-size", 8<<20, 
"block memory size")
        flagS.Int64Var(&s.dbOpts.SeriesMemSize, "stream-seriesmeta-mem-size", 
1<<20, "series metadata memory size")
        flagS.Int64Var(&s.dbOpts.GlobalIndexMemSize, 
"stream-global-index-mem-size", 2<<20, "global index memory size")
+       flagS.Int64Var(&s.dbOpts.BlockInvertedIndex.BatchWaitSec, 
"stream-idx-batch-wait-sec", 1, "index batch wait in second")
        return flagS
 }
 
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index cdfd8f2..66251bf 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -78,6 +78,7 @@ type block struct {
        lock        sync.RWMutex
        segID       SectionID
        blockID     SectionID
+       indexOpts   InvertedIndexOpts
 }
 
 type blockOpts struct {
@@ -145,6 +146,7 @@ func (b *block) options(ctx context.Context) {
        if b.lsmMemSize < defaultKVMemorySize {
                b.lsmMemSize = defaultKVMemorySize
        }
+       b.indexOpts = options.BlockInvertedIndex
 }
 
 func (b *block) openSafely() (err error) {
@@ -171,8 +173,9 @@ func (b *block) open() (err error) {
        }
        b.closableLst = append(b.closableLst, b.store)
        if b.invertedIndex, err = inverted.NewStore(inverted.StoreOpts{
-               Path:   path.Join(b.path, componentSecondInvertedIdx),
-               Logger: b.l.Named(componentSecondInvertedIdx),
+               Path:         path.Join(b.path, componentSecondInvertedIdx),
+               Logger:       b.l.Named(componentSecondInvertedIdx),
+               BatchWaitSec: b.indexOpts.BatchWaitSec,
        }); err != nil {
                return err
        }
diff --git a/banyand/tsdb/bucket/queue.go b/banyand/tsdb/bucket/queue.go
index 6cb1bfa..dc86953 100644
--- a/banyand/tsdb/bucket/queue.go
+++ b/banyand/tsdb/bucket/queue.go
@@ -101,7 +101,7 @@ func NewQueue(l *logger.Logger, size int, maxSize int, 
scheduler *timestamp.Sche
                evictFn:     evictFn,
                l:           l,
        }
-       if err := scheduler.Register(QueueName, cron.Descriptor, "@every 1m", 
c.cleanEvict); err != nil {
+       if err := scheduler.Register(QueueName, cron.Descriptor, "@every 5m", 
c.cleanEvict); err != nil {
                return nil, err
        }
        return c, nil
diff --git a/banyand/tsdb/bucket/queue_test.go 
b/banyand/tsdb/bucket/queue_test.go
index ac85449..eb78072 100644
--- a/banyand/tsdb/bucket/queue_test.go
+++ b/banyand/tsdb/bucket/queue_test.go
@@ -132,7 +132,7 @@ var _ = Describe("Queue", func() {
                Expect(enRecentSize).To(Equal(192))
                Expect(l.Len()).To(Equal(128))
                Expect(len(evictLst)).To(Equal(0))
-               clock.Add(time.Minute)
+               clock.Add(6 * time.Minute)
                if !scheduler.Trigger(bucket.QueueName) {
                        Fail("trigger fails")
                }
diff --git a/banyand/tsdb/indexdb.go b/banyand/tsdb/indexdb.go
index cb82f88..d0fff8f 100644
--- a/banyand/tsdb/indexdb.go
+++ b/banyand/tsdb/indexdb.go
@@ -59,12 +59,9 @@ type indexDB struct {
 
 func (i *indexDB) Seek(field index.Field) ([]GlobalItemID, error) {
        result := make([]GlobalItemID, 0)
-       f, err := field.Marshal()
-       if err != nil {
-               return nil, err
-       }
+       f := field.Marshal()
        for _, s := range i.segCtrl.segments() {
-               err = s.globalIndex.GetAll(f, func(rawBytes []byte) error {
+               err := s.globalIndex.GetAll(f, func(rawBytes []byte) error {
                        id := &GlobalItemID{}
                        errUnMarshal := id.unMarshal(rawBytes)
                        if errUnMarshal != nil {
@@ -76,8 +73,11 @@ func (i *indexDB) Seek(field index.Field) ([]GlobalItemID, 
error) {
                if errors.Is(err, kv.ErrKeyNotFound) {
                        return result, nil
                }
+               if err != nil {
+                       return nil, err
+               }
        }
-       return result, err
+       return result, nil
 }
 
 func (i *indexDB) WriterBuilder() IndexWriterBuilder {
@@ -151,12 +151,7 @@ func (i *indexWriter) WriteLSMIndex(fields []index.Field) 
(err error) {
                if i.scope != nil {
                        field.Key.SeriesID = GlobalSeriesID(i.scope)
                }
-               key, errInternal := field.Marshal()
-               if errInternal != nil {
-                       err = multierr.Append(err, errInternal)
-                       continue
-               }
-               err = multierr.Append(err, 
i.seg.globalIndex.PutWithVersion(key, i.itemID.marshal(), 
uint64(i.ts.UnixNano())))
+               err = multierr.Append(err, 
i.seg.globalIndex.PutWithVersion(field.Marshal(), i.itemID.marshal(), 
uint64(i.ts.UnixNano())))
        }
        return err
 }
@@ -166,12 +161,7 @@ func (i *indexWriter) WriteInvertedIndex(fields 
[]index.Field) (err error) {
                if i.scope != nil {
                        field.Key.SeriesID = GlobalSeriesID(i.scope)
                }
-               key, errInternal := field.Marshal()
-               if errInternal != nil {
-                       err = multierr.Append(err, errInternal)
-                       continue
-               }
-               err = multierr.Append(err, 
i.seg.globalIndex.PutWithVersion(key, i.itemID.marshal(), 
uint64(i.ts.UnixNano())))
+               err = multierr.Append(err, 
i.seg.globalIndex.PutWithVersion(field.Marshal(), i.itemID.marshal(), 
uint64(i.ts.UnixNano())))
        }
        return err
 }
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 8f6ad3a..eb517d1 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -102,12 +102,18 @@ type DatabaseOpts struct {
        BlockInterval      IntervalRule
        TTL                IntervalRule
        BlockMemSize       int64
+       BlockInvertedIndex InvertedIndexOpts
        SeriesMemSize      int64
        GlobalIndexMemSize int64
        ShardNum           uint32
        EnableGlobalIndex  bool
 }
 
+// InvertedIndexOpts wraps options to create the block inverted index.
+type InvertedIndexOpts struct {
+       BatchWaitSec int64
+}
+
 // EncodingMethod wraps encoder/decoder pools to flush/compact data on disk.
 type EncodingMethod struct {
        EncoderPool encoding.SeriesEncoderPool
diff --git a/go.mod b/go.mod
index d52b8c1..a74b376 100644
--- a/go.mod
+++ b/go.mod
@@ -118,7 +118,7 @@ require (
        go.opentelemetry.io/otel/trace v1.11.1 // indirect
        go.opentelemetry.io/proto/otlp v0.19.0 // indirect
        go.uber.org/atomic v1.10.0 // indirect
-       go.uber.org/zap v1.23.0 // indirect
+       go.uber.org/zap v1.23.0
        golang.org/x/crypto v0.3.0 // indirect
        golang.org/x/net v0.2.0 // indirect
        golang.org/x/sys v0.2.0 // indirect
diff --git a/pkg/index/index.go b/pkg/index/index.go
index 36849e6..1862231 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -49,10 +49,12 @@ func (f FieldKey) MarshalIndexRule() string {
 
 // Marshal encodes f to bytes.
 func (f FieldKey) Marshal() []byte {
-       return bytes.Join([][]byte{
-               f.SeriesID.Marshal(),
-               convert.Uint32ToBytes(f.IndexRuleID),
-       }, nil)
+       s := f.SeriesID.Marshal()
+       i := []byte(f.MarshalIndexRule())
+       b := make([]byte, len(s)+len(i))
+       bp := copy(b, s)
+       copy(b[bp:], i)
+       return b
 }
 
 // MarshalToStr encodes f to string.
@@ -79,8 +81,20 @@ type Field struct {
 }
 
 // Marshal encodes f to bytes.
-func (f Field) Marshal() ([]byte, error) {
-       return bytes.Join([][]byte{f.Key.Marshal(), f.Term}, nil), nil
+func (f Field) Marshal() []byte {
+       s := f.Key.SeriesID.Marshal()
+       i := []byte(f.Key.MarshalIndexRule())
+       b := make([]byte, len(s)+len(i)+len(f.Term))
+       bp := copy(b, s)
+       bp += copy(b[bp:], i)
+       copy(b[bp:], f.Term)
+       return b
+}
+
+// FieldStr return a string represent of Field which is composed by key and 
term.
+func FieldStr(key FieldKey, term []byte) string {
+       f := Field{Key: key, Term: term}
+       return string(f.Marshal())
 }
 
 // Unmarshal decodes bytes to f.
@@ -93,12 +107,18 @@ func (f *Field) Unmarshal(raw []byte) error {
        if err != nil {
                return errors.Wrap(err, "unmarshal a field")
        }
-       term := raw[12:]
-       f.Term = make([]byte, len(term))
-       copy(f.Term, term)
+       f.Term = UnmarshalTerm(raw)
        return nil
 }
 
+// UnmarshalTerm decodes term from a encoded field.
+func UnmarshalTerm(raw []byte) []byte {
+       term := raw[12:]
+       result := make([]byte, len(term))
+       copy(result, term)
+       return result
+}
+
 // RangeOpts contains options to performance a continuous scan.
 type RangeOpts struct {
        Upper         []byte
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index d1efa79..3020f3c 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -28,6 +28,7 @@ import (
        "github.com/blugelabs/bluge"
        "github.com/blugelabs/bluge/analysis"
        "github.com/blugelabs/bluge/analysis/analyzer"
+       blugeIndex "github.com/blugelabs/bluge/index"
        "github.com/blugelabs/bluge/search"
        "github.com/dgraph-io/badger/v3/y"
        "go.uber.org/multierr"
@@ -48,6 +49,12 @@ const (
        docID         = "_id"
        batchSize     = 1024
        seriesIDField = "series_id"
+       idField       = "id"
+)
+
+var (
+       defaultUpper = convert.Uint64ToBytes(math.MaxUint64)
+       defaultLower = convert.Uint64ToBytes(0)
 )
 
 var analyzers map[databasev1.IndexRule_Analyzer]*analysis.Analyzer
@@ -64,8 +71,9 @@ var _ index.Store = (*store)(nil)
 
 // StoreOpts wraps options to create a inverted index repository.
 type StoreOpts struct {
-       Logger *logger.Logger
-       Path   string
+       Logger       *logger.Logger
+       Path         string
+       BatchWaitSec int64
 }
 
 type doc struct {
@@ -78,26 +86,37 @@ type flushEvent struct {
 }
 
 type store struct {
-       writer *bluge.Writer
-       ch     chan any
-       closer *run.Closer
-       l      *logger.Logger
+       writer        *bluge.Writer
+       ch            chan any
+       closer        *run.Closer
+       l             *logger.Logger
+       batchInterval time.Duration
 }
 
 // NewStore create a new inverted index repository.
 func NewStore(opts StoreOpts) (index.Store, error) {
-       config := bluge.DefaultConfig(opts.Path)
+       indexConfig := blugeIndex.DefaultConfig(opts.Path).WithUnsafeBatches().
+               WithPersisterNapTimeMSec(60 * 1000)
+       indexConfig.MergePlanOptions.MaxSegmentsPerTier = 1
+       indexConfig.MergePlanOptions.MaxSegmentSize = 500000
+       indexConfig.MergePlanOptions.SegmentsPerMergeTask = 20
+       config := bluge.DefaultConfigWithIndexConfig(indexConfig)
        config.DefaultSearchAnalyzer = 
analyzers[databasev1.IndexRule_ANALYZER_KEYWORD]
        config.Logger = log.New(opts.Logger, opts.Logger.Module(), 0)
        w, err := bluge.OpenWriter(config)
        if err != nil {
                return nil, err
        }
+       sec := opts.BatchWaitSec
+       if sec < 1 {
+               sec = 1
+       }
        s := &store{
-               writer: w,
-               l:      opts.Logger,
-               ch:     make(chan any, batchSize),
-               closer: run.NewCloser(1),
+               writer:        w,
+               batchInterval: time.Duration(sec * int64(time.Second)),
+               l:             opts.Logger,
+               ch:            make(chan any, batchSize),
+               closer:        run.NewCloser(1),
        }
        s.run()
        return s, nil
@@ -136,21 +155,40 @@ func (s *store) Iterator(fieldKey index.FieldKey, 
termRange index.RangeOpts, ord
                bytes.Compare(termRange.Lower, termRange.Upper) > 0 {
                return index.DummyFieldIterator, nil
        }
+       if termRange.Upper == nil {
+               termRange.Upper = defaultUpper
+       }
+       if termRange.Lower == nil {
+               termRange.Lower = defaultLower
+       }
        reader, err := s.writer.Reader()
        if err != nil {
                return nil, err
        }
        fk := fieldKey.MarshalIndexRule()
-       query := bluge.NewBooleanQuery()
-       query.
-               
AddMust(bluge.NewTermQuery(string(fieldKey.SeriesID.Marshal())).SetField(seriesIDField)).
-               AddMust(bluge.NewTermRangeInclusiveQuery(
-                       string(termRange.Lower),
-                       string(termRange.Upper),
+       var query bluge.Query
+       shouldDecodeTerm := true
+       if fieldKey.Analyzer == databasev1.IndexRule_ANALYZER_UNSPECIFIED {
+               query = bluge.NewTermRangeInclusiveQuery(
+                       index.FieldStr(fieldKey, termRange.Lower),
+                       index.FieldStr(fieldKey, termRange.Upper),
                        termRange.IncludesLower,
                        termRange.IncludesUpper,
                ).
-                       SetField(fk))
+                       SetField(fk)
+       } else {
+               shouldDecodeTerm = false
+               query = bluge.NewBooleanQuery().
+                       AddMust(bluge.NewTermRangeInclusiveQuery(
+                               string(termRange.Lower),
+                               string(termRange.Upper),
+                               termRange.IncludesLower,
+                               termRange.IncludesUpper,
+                       ).
+                               SetField(fk)).
+                       
AddMust(bluge.NewTermQuery(string(fieldKey.SeriesID.Marshal())).SetField(seriesIDField))
+       }
+
        sortedKey := fk
        if order == modelv1.Sort_SORT_DESC {
                sortedKey = "-" + sortedKey
@@ -159,7 +197,7 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange 
index.RangeOpts, ord
        if err != nil {
                return nil, err
        }
-       result := newBlugeMatchIterator(documentMatchIterator, fk)
+       result := newBlugeMatchIterator(documentMatchIterator, fk, 
shouldDecodeTerm)
        return &result, nil
 }
 
@@ -173,15 +211,21 @@ func (s *store) MatchTerms(field index.Field) (list 
posting.List, err error) {
                return nil, err
        }
        fk := field.Key.MarshalIndexRule()
-       query := bluge.NewBooleanQuery()
-       query.
-               
AddMust(bluge.NewTermQuery(string(field.Key.SeriesID.Marshal())).SetField(seriesIDField)).
-               AddMust(bluge.NewTermQuery(string(field.Term)).SetField(fk))
+       var query bluge.Query
+       shouldDecodeTerm := true
+       if field.Key.Analyzer == databasev1.IndexRule_ANALYZER_UNSPECIFIED {
+               query = bluge.NewTermQuery(string(field.Marshal())).SetField(fk)
+       } else {
+               shouldDecodeTerm = false
+               query = bluge.NewBooleanQuery().
+                       
AddMust(bluge.NewTermQuery(string(field.Term)).SetField(fk)).
+                       
AddMust(bluge.NewTermQuery(string(field.Key.SeriesID.Marshal())).SetField(seriesIDField))
+       }
        documentMatchIterator, err := reader.Search(context.Background(), 
bluge.NewAllMatches(query))
        if err != nil {
                return nil, err
        }
-       iter := newBlugeMatchIterator(documentMatchIterator, fk)
+       iter := newBlugeMatchIterator(documentMatchIterator, fk, 
shouldDecodeTerm)
        list = roaring.NewPostingList()
        for iter.Next() {
                err = multierr.Append(err, list.Union(iter.Val().Value))
@@ -191,28 +235,26 @@ func (s *store) MatchTerms(field index.Field) (list 
posting.List, err error) {
 }
 
 func (s *store) Match(fieldKey index.FieldKey, matches []string) 
(posting.List, error) {
-       if len(matches) == 0 {
+       if len(matches) == 0 || fieldKey.Analyzer == 
databasev1.IndexRule_ANALYZER_UNSPECIFIED {
                return roaring.DummyPostingList, nil
        }
        reader, err := s.writer.Reader()
        if err != nil {
                return nil, err
        }
+       analyzer := analyzers[fieldKey.Analyzer]
        fk := fieldKey.MarshalIndexRule()
        query := bluge.NewBooleanQuery()
        
query.AddMust(bluge.NewTermQuery(string(fieldKey.SeriesID.Marshal())).SetField(seriesIDField))
        for _, m := range matches {
-               q := bluge.NewMatchQuery(m).SetField(fk)
-               if fieldKey.Analyzer != 
databasev1.IndexRule_ANALYZER_UNSPECIFIED {
-                       q.SetAnalyzer(analyzers[fieldKey.Analyzer])
-               }
-               query.AddMust(q)
+               query.AddMust(bluge.NewMatchQuery(m).SetField(fk).
+                       SetAnalyzer(analyzer))
        }
        documentMatchIterator, err := reader.Search(context.Background(), 
bluge.NewAllMatches(query))
        if err != nil {
                return nil, err
        }
-       iter := newBlugeMatchIterator(documentMatchIterator, fk)
+       iter := newBlugeMatchIterator(documentMatchIterator, fk, false)
        list := roaring.NewPostingList()
        for iter.Next() {
                err = multierr.Append(err, list.Union(iter.Val().Value))
@@ -251,8 +293,7 @@ func (s *store) run() {
                }
                var docIDBuffer bytes.Buffer
                for {
-                       timer := time.NewTimer(time.Second)
-                       docIDBuffer.Reset()
+                       timer := time.NewTimer(s.batchInterval)
                        select {
                        case <-s.closer.CloseNotify():
                                return
@@ -267,26 +308,29 @@ func (s *store) run() {
                                case doc:
                                        // TODO: generate a segment directly.
                                        fk := d.fields[0].Key
+                                       docIDBuffer.Reset()
                                        docIDBuffer.Write(fk.SeriesID.Marshal())
                                        
docIDBuffer.Write(convert.Uint64ToBytes(uint64(d.itemID)))
                                        doc := 
bluge.NewDocument(docIDBuffer.String())
-
+                                       toAddSeriesIDField := false
                                        for _, f := range d.fields {
-                                               
doc.AddField(bluge.NewKeywordFieldBytes(seriesIDField, 
f.Key.SeriesID.Marshal()))
-                                               field := 
bluge.NewKeywordFieldBytes(f.Key.MarshalIndexRule(), 
f.Term).StoreValue().Sortable()
-                                               if f.Key.Analyzer != 
databasev1.IndexRule_ANALYZER_UNSPECIFIED {
-                                                       
field.WithAnalyzer(analyzers[f.Key.Analyzer])
+                                               if f.Key.Analyzer == 
databasev1.IndexRule_ANALYZER_UNSPECIFIED {
+                                                       
doc.AddField(bluge.NewKeywordFieldBytes(f.Key.MarshalIndexRule(), 
f.Marshal()).StoreValue().Sortable())
+                                               } else {
+                                                       toAddSeriesIDField = 
true
+                                                       
doc.AddField(bluge.NewKeywordFieldBytes(f.Key.MarshalIndexRule(), 
f.Term).StoreValue().Sortable().
+                                                               
WithAnalyzer(analyzers[f.Key.Analyzer]))
                                                }
-                                               doc.AddField(field)
+                                       }
+                                       if toAddSeriesIDField {
+                                               
doc.AddField(bluge.NewKeywordFieldBytes(seriesIDField, fk.SeriesID.Marshal()))
                                        }
                                        size++
+                                       batch.Update(doc.ID(), doc)
                                        if size >= batchSize {
                                                flush()
-                                       } else {
-                                               batch.Update(doc.ID(), doc)
                                        }
                                }
-
                        case <-timer.C:
                                flush()
                        }
@@ -308,18 +352,20 @@ func (s *store) flush() {
 }
 
 type blugeMatchIterator struct {
-       delegated search.DocumentMatchIterator
-       err       error
-       current   *index.PostingValue
-       agg       *index.PostingValue
-       fieldKey  string
-       closed    bool
+       delegated        search.DocumentMatchIterator
+       err              error
+       current          *index.PostingValue
+       agg              *index.PostingValue
+       fieldKey         string
+       shouldDecodeTerm bool
+       closed           bool
 }
 
-func newBlugeMatchIterator(delegated search.DocumentMatchIterator, fieldKey 
string) blugeMatchIterator {
+func newBlugeMatchIterator(delegated search.DocumentMatchIterator, fieldKey 
string, shouldDecodeTerm bool) blugeMatchIterator {
        return blugeMatchIterator{
-               delegated: delegated,
-               fieldKey:  fieldKey,
+               delegated:        delegated,
+               fieldKey:         fieldKey,
+               shouldDecodeTerm: shouldDecodeTerm,
        }
 }
 
@@ -363,7 +409,12 @@ func (bmi *blugeMatchIterator) nextTerm() bool {
                        i++
                }
                if field == bmi.fieldKey {
-                       term = y.Copy(value)
+                       v := y.Copy(value)
+                       if bmi.shouldDecodeTerm {
+                               term = index.UnmarshalTerm(v)
+                       } else {
+                               term = v
+                       }
                        i++
                }
                return i < 2
diff --git a/pkg/index/lsm/iterator.go b/pkg/index/lsm/iterator.go
index 694e7ab..dc82435 100644
--- a/pkg/index/lsm/iterator.go
+++ b/pkg/index/lsm/iterator.go
@@ -89,7 +89,7 @@ func (f *fieldIteratorTemplate) Close() error {
 
 func newFieldIteratorTemplate(l *logger.Logger, fieldKey index.FieldKey, 
termRange index.RangeOpts, order modelv1.Sort, iterable kv.Iterable,
        fn compositePostingValueFn,
-) (*fieldIteratorTemplate, error) {
+) *fieldIteratorTemplate {
        if termRange.Upper == nil {
                termRange.Upper = defaultUpper
        }
@@ -117,17 +117,13 @@ func newFieldIteratorTemplate(l *logger.Logger, fieldKey 
index.FieldKey, termRan
                Key:  fieldKey,
                Term: term,
        }
-       seekKey, err := field.Marshal()
-       if err != nil {
-               return nil, err
-       }
        return &fieldIteratorTemplate{
                delegated: newDelegateIterator(iter, fieldKey, l),
                termRange: termRange,
                fn:        fn,
                reverse:   reverse,
-               seekKey:   seekKey,
-       }, nil
+               seekKey:   field.Marshal(),
+       }
 }
 
 func parseKey(fieldKey index.FieldKey, key []byte) (index.Field, error) {
diff --git a/pkg/index/lsm/lsm.go b/pkg/index/lsm/lsm.go
index 835a199..55721f4 100644
--- a/pkg/index/lsm/lsm.go
+++ b/pkg/index/lsm/lsm.go
@@ -46,13 +46,8 @@ func (s *store) Close() error {
 
 func (s *store) Write(fields []index.Field, itemID common.ItemID) (err error) {
        for _, field := range fields {
-               f, errInternal := field.Marshal()
-               if errInternal != nil {
-                       err = multierr.Append(err, errInternal)
-                       continue
-               }
                itemIDInt := uint64(itemID)
-               err = multierr.Append(err, s.lsm.PutWithVersion(f, 
convert.Uint64ToBytes(itemIDInt), itemIDInt))
+               err = multierr.Append(err, 
s.lsm.PutWithVersion(field.Marshal(), convert.Uint64ToBytes(itemIDInt), 
itemIDInt))
        }
        return err
 }
diff --git a/pkg/index/lsm/search.go b/pkg/index/lsm/search.go
index 4af241a..d494cad 100644
--- a/pkg/index/lsm/search.go
+++ b/pkg/index/lsm/search.go
@@ -39,12 +39,8 @@ func (s *store) MatchField(fieldKey index.FieldKey) (list 
posting.List, err erro
 }
 
 func (s *store) MatchTerms(field index.Field) (list posting.List, err error) {
-       f, err := field.Marshal()
-       if err != nil {
-               return nil, err
-       }
        list = roaring.NewPostingList()
-       err = s.lsm.GetAll(f, func(itemID []byte) error {
+       err = s.lsm.GetAll(field.Marshal(), func(itemID []byte) error {
                list.Insert(common.ItemID(convert.BytesToUint64(itemID)))
                return nil
        })
@@ -94,7 +90,7 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange 
index.RangeOpts, ord
                                pv.Value.Insert(common.ItemID(itemID))
                        }
                        return pv, nil
-               })
+               }), nil
 }
 
 func (s *store) Match(_ index.FieldKey, _ []string) (posting.List, error) {
diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go
index 57122b5..64dbff3 100644
--- a/pkg/logger/logger.go
+++ b/pkg/logger/logger.go
@@ -21,9 +21,13 @@ package logger
 
 import (
        "context"
+       "fmt"
        "strings"
+       "time"
 
        "github.com/rs/zerolog"
+       "go.uber.org/zap"
+       "go.uber.org/zap/zapcore"
 )
 
 // ContextKey is the key to store Logger in the context.
@@ -42,8 +46,9 @@ type Logging struct {
 // Logger is wrapper for rs/zerolog logger with module, it is singleton.
 type Logger struct {
        *zerolog.Logger
-       modules map[string]zerolog.Level
-       module  string
+       modules     map[string]zerolog.Level
+       module      string
+       development bool
 }
 
 // Module returns logger's module name.
@@ -73,7 +78,55 @@ func (l *Logger) Named(name ...string) *Logger {
                }
        }
        subLogger := root.l.With().Str("module", 
moduleBuilder.String()).Logger().Level(level)
-       return &Logger{module: module, modules: l.modules, Logger: &subLogger}
+       return &Logger{module: module, modules: l.modules, development: 
l.development, Logger: &subLogger}
+}
+
+// Sampled return a Logger with a sampler that will send every Nth events.
+func (l *Logger) Sampled(n uint32) *Logger {
+       sampled := l.Logger.Sample(&zerolog.BasicSampler{N: n})
+       l.Logger = &sampled
+       return l
+}
+
+// ToZapConfig outputs the zap config is derived from l.
+func (l *Logger) ToZapConfig() zap.Config {
+       level, err := zap.ParseAtomicLevel(l.GetLevel().String())
+       if err != nil {
+               panic(err)
+       }
+       if !l.development {
+               config := zap.NewProductionConfig()
+               config.Level = level
+               return config
+       }
+       encoderConfig := zapcore.EncoderConfig{
+               // Keys can be anything except the empty string.
+               TimeKey:       "T",
+               LevelKey:      "L",
+               NameKey:       "N",
+               CallerKey:     "C",
+               FunctionKey:   zapcore.OmitKey,
+               MessageKey:    "M",
+               StacktraceKey: "S",
+               LineEnding:    zapcore.DefaultLineEnding,
+               EncodeLevel: func(l zapcore.Level, pae 
zapcore.PrimitiveArrayEncoder) {
+                       pae.AppendString(strings.ToUpper(fmt.Sprintf("| %-6s|", 
l.String())))
+               },
+               EncodeTime: func(t time.Time, pae 
zapcore.PrimitiveArrayEncoder) {
+                       pae.AppendString(t.Format(time.RFC3339))
+               },
+               EncodeDuration:   zapcore.StringDurationEncoder,
+               EncodeCaller:     zapcore.FullCallerEncoder,
+               ConsoleSeparator: " ",
+       }
+       return zap.Config{
+               Level:            level,
+               Development:      true,
+               Encoding:         "console",
+               EncoderConfig:    encoderConfig,
+               OutputPaths:      []string{"stderr"},
+               ErrorOutputPaths: []string{"stderr"},
+       }
 }
 
 // Loggable indicates the implement supports logging.
diff --git a/pkg/logger/setting.go b/pkg/logger/setting.go
index 1018bfe..46a75ee 100644
--- a/pkg/logger/setting.go
+++ b/pkg/logger/setting.go
@@ -112,25 +112,21 @@ func getLogger(cfg Logging) (*Logger, error) {
                return nil, err
        }
        var w io.Writer
-       switch cfg.Env {
-       case "dev":
+       development := strings.EqualFold(cfg.Env, "DEV")
+
+       if development {
                cw := zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: 
time.RFC3339}
                cw.FormatLevel = func(i interface{}) string {
                        return strings.ToUpper(fmt.Sprintf("| %-6s|", i))
                }
-               cw.FormatMessage = func(i interface{}) string {
-                       return fmt.Sprintf("***%s****", i)
-               }
-               cw.FormatFieldName = func(i interface{}) string {
-                       return fmt.Sprintf("%s:", i)
-               }
-               cw.FormatFieldValue = func(i interface{}) string {
-                       return strings.ToUpper(fmt.Sprintf("%s", i))
-               }
                w = io.Writer(cw)
-       default:
-               w = os.Stdout
+       } else {
+               w = os.Stderr
+       }
+       ctx := zerolog.New(w).Level(lvl).With().Timestamp()
+       if development {
+               ctx = ctx.Stack().Caller()
        }
-       l := zerolog.New(w).Level(lvl).With().Timestamp().Logger()
-       return &Logger{module: rootName, Logger: &l, modules: modules}, nil
+       l := ctx.Logger()
+       return &Logger{module: rootName, Logger: &l, modules: modules, 
development: development}, nil
 }
diff --git a/pkg/schema/metadata.go b/pkg/schema/metadata.go
index 4388e45..e217103 100644
--- a/pkg/schema/metadata.go
+++ b/pkg/schema/metadata.go
@@ -21,7 +21,6 @@ package schema
 import (
        "context"
        "io"
-       "runtime"
        "sync"
        "sync/atomic"
        "time"
@@ -39,6 +38,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/partition"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
 // EventType defines actions of events.
@@ -115,6 +115,8 @@ type Repository interface {
        Close()
 }
 
+const defaultWorkerNum = 10
+
 var _ Repository = (*schemaRepo)(nil)
 
 type schemaRepo struct {
@@ -123,10 +125,11 @@ type schemaRepo struct {
        resourceSupplier ResourceSupplier
        l                *logger.Logger
        data             map[string]*group
-       workerStopCh     chan struct{}
+       closer           *run.Closer
        eventCh          chan MetadataEvent
        shardTopic       bus.Topic
        entityTopic      bus.Topic
+       workerNum        int
        sync.RWMutex
 }
 
@@ -147,8 +150,9 @@ func NewRepository(
                shardTopic:       shardTopic,
                entityTopic:      entityTopic,
                data:             make(map[string]*group),
-               eventCh:          make(chan MetadataEvent),
-               workerStopCh:     make(chan struct{}),
+               eventCh:          make(chan MetadataEvent, defaultWorkerNum),
+               workerNum:        defaultWorkerNum,
+               closer:           run.NewCloser(defaultWorkerNum),
        }
 }
 
@@ -157,9 +161,10 @@ func (sr *schemaRepo) SendMetadataEvent(event 
MetadataEvent) {
 }
 
 func (sr *schemaRepo) Watcher() {
-       for i := 0; i < 10; i++ {
+       for i := 0; i < sr.workerNum; i++ {
                go func() {
                        defer func() {
+                               sr.closer.Done()
                                if err := recover(); err != nil {
                                        sr.l.Warn().Interface("err", 
err).Msg("watching the events")
                                }
@@ -173,32 +178,32 @@ func (sr *schemaRepo) Watcher() {
                                        if e := sr.l.Debug(); e.Enabled() {
                                                e.Interface("event", 
evt).Msg("received an event")
                                        }
-                                       for i := 0; i < 10; i++ {
-                                               var err error
-                                               switch evt.Typ {
-                                               case EventAddOrUpdate:
-                                                       switch evt.Kind {
-                                                       case EventKindGroup:
-                                                               _, err = 
sr.StoreGroup(evt.Metadata)
-                                                       case EventKindResource:
-                                                               _, err = 
sr.storeResource(evt.Metadata)
-                                                       }
-                                               case EventDelete:
-                                                       switch evt.Kind {
-                                                       case EventKindGroup:
-                                                               err = 
sr.deleteGroup(evt.Metadata)
-                                                       case EventKindResource:
-                                                               err = 
sr.deleteResource(evt.Metadata)
-                                                       }
+                                       var err error
+                                       switch evt.Typ {
+                                       case EventAddOrUpdate:
+                                               switch evt.Kind {
+                                               case EventKindGroup:
+                                                       _, err = 
sr.StoreGroup(evt.Metadata)
+                                               case EventKindResource:
+                                                       _, err = 
sr.storeResource(evt.Metadata)
+                                               }
+                                       case EventDelete:
+                                               switch evt.Kind {
+                                               case EventKindGroup:
+                                                       err = 
sr.deleteGroup(evt.Metadata)
+                                               case EventKindResource:
+                                                       err = 
sr.deleteResource(evt.Metadata)
                                                }
-                                               if err == nil {
-                                                       break
+                                       }
+                                       if err != nil {
+                                               
sr.l.Err(err).Interface("event", evt).Msg("fail to handle the metadata event. 
retry...")
+                                               select {
+                                               case sr.eventCh <- evt:
+                                               case <-sr.closer.CloseNotify():
+                                                       return
                                                }
-                                               runtime.Gosched()
-                                               time.Sleep(time.Second)
-                                               
sr.l.Err(err).Interface("event", evt).Int("round", i).Msg("fail to handle the 
metadata event. retry...")
                                        }
-                               case <-sr.workerStopCh:
+                               case <-sr.closer.CloseNotify():
                                        return
                                }
                        }
@@ -370,9 +375,7 @@ func (sr *schemaRepo) Close() {
                        sr.l.Warn().Interface("err", err).Msg("closing 
resource")
                }
        }()
-       if sr.workerStopCh != nil {
-               close(sr.workerStopCh)
-       }
+       sr.closer.CloseThenWait()
 
        sr.RLock()
        defer sr.RUnlock()
diff --git a/test/docker/base-compose.yml b/test/docker/base-compose.yml
index 908b269..adcd5c0 100644
--- a/test/docker/base-compose.yml
+++ b/test/docker/base-compose.yml
@@ -21,7 +21,7 @@ services:
       - 6060
     command: standalone
     healthcheck:
-      test: ["CMD", "/grpc-health-probe", "-addr=localhost:17912"]
+      test: wget --spider http://localhost:17913 || wget --spider 
--no-check-certificate https://localhost:17913 || exit 1
       interval: 5s
       timeout: 10s
       retries: 120

Reply via email to