This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new f70f59b Enhance the inverted index, metadata and logger (#227)
f70f59b is described below
commit f70f59b7201a83d2bf2a8716627ded4a11780b66
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Dec 12 11:53:25 2022 +0800
Enhance the inverted index, metadata and logger (#227)
---
banyand/Dockerfile | 16 +---
banyand/k8s.yml | 1 +
banyand/liaison/grpc/measure.go | 45 +++++-----
banyand/liaison/grpc/server.go | 2 +
banyand/liaison/grpc/stream.go | 45 +++++-----
banyand/measure/service.go | 1 +
banyand/metadata/metadata.go | 3 +-
banyand/metadata/schema/error.go | 2 +
banyand/metadata/schema/etcd.go | 78 ++++++++++++------
banyand/metadata/schema/etcd_test.go | 8 +-
banyand/metadata/schema/group.go | 4 +-
banyand/stream/service.go | 1 +
banyand/tsdb/block.go | 7 +-
banyand/tsdb/bucket/queue.go | 2 +-
banyand/tsdb/bucket/queue_test.go | 2 +-
banyand/tsdb/indexdb.go | 26 ++----
banyand/tsdb/tsdb.go | 6 ++
go.mod | 2 +-
pkg/index/index.go | 38 +++++++--
pkg/index/inverted/inverted.go | 155 +++++++++++++++++++++++------------
pkg/index/lsm/iterator.go | 10 +--
pkg/index/lsm/lsm.go | 7 +-
pkg/index/lsm/search.go | 8 +-
pkg/logger/logger.go | 59 ++++++++++++-
pkg/logger/setting.go | 26 +++---
pkg/schema/metadata.go | 65 ++++++++-------
test/docker/base-compose.yml | 2 +-
27 files changed, 380 insertions(+), 241 deletions(-)
diff --git a/banyand/Dockerfile b/banyand/Dockerfile
index dc40e27..a3a0ae7 100644
--- a/banyand/Dockerfile
+++ b/banyand/Dockerfile
@@ -34,8 +34,6 @@ ENV GO111MODULE "on"
WORKDIR /src
COPY go.* ./
RUN go mod download
-RUN GOBIN=/bin go install github.com/grpc-ecosystem/[email protected] \
- && chmod 755 /bin/grpc-health-probe
FROM base AS builder
@@ -44,8 +42,7 @@ RUN --mount=target=. \
BUILD_DIR=/out make -C banyand all
FROM alpine:edge AS certs
-RUN apk add --no-cache ca-certificates
-RUN update-ca-certificates
+RUN apk add --no-cache ca-certificates && update-ca-certificates
FROM busybox:stable-glibc
@@ -57,14 +54,3 @@ EXPOSE 17913
EXPOSE 6060
ENTRYPOINT ["/banyand-server"]
-
-FROM busybox:stable-glibc AS test
-
-COPY --from=builder /out/banyand-server /banyand-server
-COPY --from=base /bin/grpc-health-probe /grpc-health-probe
-
-EXPOSE 17912
-EXPOSE 17913
-EXPOSE 6060
-
-ENTRYPOINT ["/banyand-server"]
\ No newline at end of file
diff --git a/banyand/k8s.yml b/banyand/k8s.yml
index 0d78bb6..88853e4 100644
--- a/banyand/k8s.yml
+++ b/banyand/k8s.yml
@@ -81,6 +81,7 @@ spec:
image: apache/skywalking-banyandb:v0.0.0-dev
args:
- "standalone"
+ - "--measure-idx-batch-wait-sec=30"
- "--logging.level=warn"
- "--logging.modules=measure.measure-default.service_cpm_minute"
- "--logging.levels=debug"
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index a0c8d0b..9d72fa6 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -23,7 +23,6 @@ import (
"time"
"github.com/pkg/errors"
- "github.com/rs/zerolog"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -39,34 +38,44 @@ import (
type measureService struct {
measurev1.UnimplementedMeasureServiceServer
*discoveryService
+ sampled *logger.Logger
+}
+
+func (ms *measureService) setLogger(log *logger.Logger) {
+ ms.sampled = log.Sampled(10)
}
func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) error {
-	reply := func() error {
-		return measure.Send(&measurev1.WriteResponse{})
+	reply := func(measure measurev1.MeasureService_WriteServer, logger *logger.Logger) {
+		if errResp := measure.Send(&measurev1.WriteResponse{}); errResp != nil {
+			logger.Err(errResp).Msg("failed to send response")
+		}
	}
- sampled := ms.log.Sample(&zerolog.BasicSampler{N: 10})
+ ctx := measure.Context()
for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
writeRequest, err := measure.Recv()
if errors.Is(err, io.EOF) {
return nil
}
if err != nil {
- return err
+			ms.sampled.Error().Err(err).Stringer("written", writeRequest).Msg("failed to receive message")
+ reply(measure, ms.sampled)
+ continue
}
		if errTime := timestamp.CheckPb(writeRequest.DataPoint.Timestamp); errTime != nil {
-			sampled.Error().Err(errTime).Stringer("written", writeRequest).Msg("the data point time is invalid")
-			if errResp := reply(); errResp != nil {
-				return errResp
-			}
+			ms.sampled.Error().Err(errTime).Stringer("written", writeRequest).Msg("the data point time is invalid")
+			reply(measure, ms.sampled)
continue
}
		entity, tagValues, shardID, err := ms.navigate(writeRequest.GetMetadata(), writeRequest.GetDataPoint().GetTagFamilies())
if err != nil {
-			sampled.Error().Err(err).RawJSON("written", logger.Proto(writeRequest)).Msg("failed to navigate to the write target")
-			if errResp := reply(); errResp != nil {
-				return errResp
-			}
+			ms.sampled.Error().Err(err).RawJSON("written", logger.Proto(writeRequest)).Msg("failed to navigate to the write target")
+			reply(measure, ms.sampled)
continue
}
iwr := &measurev1.InternalWriteRequest{
@@ -80,15 +89,9 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er
		message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), iwr)
		_, errWritePub := ms.pipeline.Publish(data.TopicMeasureWrite, message)
if errWritePub != nil {
-			sampled.Error().Err(errWritePub).RawJSON("written", logger.Proto(writeRequest)).Msg("failed to send a message")
- if errResp := reply(); errResp != nil {
- return errResp
- }
- continue
- }
- if errSend := reply(); errSend != nil {
- return errSend
+			ms.sampled.Error().Err(errWritePub).RawJSON("written", logger.Proto(writeRequest)).Msg("failed to send a message")
}
+ reply(measure, ms.sampled)
}
}
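The write loop above now replies to every request and routes failures through a sampled logger instead of tearing down the whole stream on the first error. A minimal, self-contained sketch of that pattern, using a hypothetical list of request errors and a sendReply stub in place of the real gRPC stream:

    package main

    import (
        "errors"
        "os"

        "github.com/rs/zerolog"
    )

    // sendReply stands in for measure.Send(&measurev1.WriteResponse{}).
    func sendReply() {}

    func main() {
        log := zerolog.New(os.Stderr)
        // Keep only every 10th error event, mirroring Logger.Sampled(10).
        sampled := log.Sample(&zerolog.BasicSampler{N: 10})
        requests := []error{nil, errors.New("invalid timestamp"), nil}
        for _, err := range requests {
            if err != nil {
                sampled.Error().Err(err).Msg("failed to process write request")
            }
            // A reply is sent for every request, even a failed one.
            sendReply()
        }
    }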
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index 5a58d80..4758585 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -112,6 +112,8 @@ func NewServer(_ context.Context, pipeline queue.Queue, repo discovery.ServiceRe
func (s *server) PreRun() error {
s.log = logger.GetLogger("liaison-grpc")
+ s.streamSVC.setLogger(s.log)
+ s.measureSVC.setLogger(s.log)
components := []struct {
discoverySVC *discoveryService
shardEvent bus.Topic
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index e798da4..9a4a101 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -23,7 +23,6 @@ import (
"time"
"github.com/pkg/errors"
- "github.com/rs/zerolog"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -39,34 +38,44 @@ import (
type streamService struct {
streamv1.UnimplementedStreamServiceServer
*discoveryService
+ sampled *logger.Logger
+}
+
+func (s *streamService) setLogger(log *logger.Logger) {
+ s.sampled = log.Sampled(10)
}
func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
-	reply := func() error {
-		return stream.Send(&streamv1.WriteResponse{})
+	reply := func(stream streamv1.StreamService_WriteServer, logger *logger.Logger) {
+		if errResp := stream.Send(&streamv1.WriteResponse{}); errResp != nil {
+			logger.Err(errResp).Msg("failed to send response")
+		}
}
- sampled := s.log.Sample(&zerolog.BasicSampler{N: 10})
+ ctx := stream.Context()
for {
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
writeEntity, err := stream.Recv()
if errors.Is(err, io.EOF) {
return nil
}
if err != nil {
- return err
+			s.sampled.Error().Stringer("written", writeEntity).Err(err).Msg("failed to receive message")
+ reply(stream, s.sampled)
+ continue
}
		if errTime := timestamp.CheckPb(writeEntity.GetElement().Timestamp); errTime != nil {
-			sampled.Error().Stringer("written", writeEntity).Err(errTime).Msg("the element time is invalid")
-			if errResp := reply(); errResp != nil {
-				return errResp
-			}
+			s.sampled.Error().Stringer("written", writeEntity).Err(errTime).Msg("the element time is invalid")
+			reply(stream, s.sampled)
continue
}
		entity, tagValues, shardID, err := s.navigate(writeEntity.GetMetadata(), writeEntity.GetElement().GetTagFamilies())
if err != nil {
-			sampled.Error().Err(err).RawJSON("written", logger.Proto(writeEntity)).Msg("failed to navigate to the write target")
-			if errResp := reply(); errResp != nil {
-				return errResp
-			}
+			s.sampled.Error().Err(err).RawJSON("written", logger.Proto(writeEntity)).Msg("failed to navigate to the write target")
+			reply(stream, s.sampled)
continue
}
iwr := &streamv1.InternalWriteRequest{
@@ -80,15 +89,9 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
		message := bus.NewMessage(bus.MessageID(time.Now().UnixNano()), iwr)
		_, errWritePub := s.pipeline.Publish(data.TopicStreamWrite, message)
if errWritePub != nil {
-			sampled.Error().Err(errWritePub).RawJSON("written", logger.Proto(writeEntity)).Msg("failed to send a message")
- if errResp := reply(); errResp != nil {
- return errResp
- }
- continue
- }
- if errSend := reply(); errSend != nil {
- return errSend
+			s.sampled.Error().Err(errWritePub).RawJSON("written", logger.Proto(writeEntity)).Msg("failed to send a message")
}
+ reply(stream, s.sampled)
}
}
diff --git a/banyand/measure/service.go b/banyand/measure/service.go
index 60e615d..a606c9f 100644
--- a/banyand/measure/service.go
+++ b/banyand/measure/service.go
@@ -82,6 +82,7 @@ func (s *service) FlagSet() *run.FlagSet {
	flagS.StringVar(&s.root, "measure-root-path", "/tmp", "the root path of database")
	flagS.Int64Var(&s.dbOpts.BlockMemSize, "measure-block-mem-size", 16<<20, "block memory size")
	flagS.Int64Var(&s.dbOpts.SeriesMemSize, "measure-seriesmeta-mem-size", 1<<20, "series metadata memory size")
+	flagS.Int64Var(&s.dbOpts.BlockInvertedIndex.BatchWaitSec, "measure-idx-batch-wait-sec", 1, "index batch wait in second")
return flagS
}
diff --git a/banyand/metadata/metadata.go b/banyand/metadata/metadata.go
index 44f5953..e4f3dd4 100644
--- a/banyand/metadata/metadata.go
+++ b/banyand/metadata/metadata.go
@@ -29,7 +29,6 @@ import (
	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/metadata/schema"
- "github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
)
@@ -88,7 +87,7 @@ func (s *service) PreRun() error {
var err error
s.schemaRegistry, err = schema.NewEtcdSchemaRegistry(
schema.ConfigureListener(s.listenClientURL, s.listenPeerURL),
-		schema.RootDir(s.rootDir), schema.LoggerLevel(logger.GetLogger().GetLevel().String()))
+ schema.RootDir(s.rootDir))
if err != nil {
return err
}
diff --git a/banyand/metadata/schema/error.go b/banyand/metadata/schema/error.go
index ee4efa6..00e32ea 100644
--- a/banyand/metadata/schema/error.go
+++ b/banyand/metadata/schema/error.go
@@ -34,6 +34,8 @@ var (
errGRPCAlreadyExists = statusGRPCAlreadyExists.Err()
	statusDataLoss = status.New(codes.DataLoss, "banyandb: resource corrupts.")
errGRPCDataLoss = statusDataLoss.Err()
+
+ errClosed = errors.New("metadata registry is closed")
)
// BadRequest creates a gRPC error with error details with type BadRequest,
diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go
index d7c4cf3..91b0b24 100644
--- a/banyand/metadata/schema/etcd.go
+++ b/banyand/metadata/schema/etcd.go
@@ -23,15 +23,20 @@ import (
"os"
"path/filepath"
"sync"
+ "time"
"github.com/pkg/errors"
clientv3 "go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/server/v3/embed"
+ "go.uber.org/zap"
+ "google.golang.org/grpc"
"google.golang.org/protobuf/proto"
	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
	propertyv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/property/v1"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/run"
)
var (
@@ -62,13 +67,6 @@ func RootDir(rootDir string) RegistryOption {
}
}
-// LoggerLevel sets the logger level.
-func LoggerLevel(loggerLevel string) RegistryOption {
- return func(config *etcdSchemaRegistryConfig) {
- config.loggerLevel = loggerLevel
- }
-}
-
// ConfigureListener sets client and peer urls of listeners.
func ConfigureListener(lc, lp string) RegistryOption {
return func(config *etcdSchemaRegistryConfig) {
@@ -89,7 +87,7 @@ func (eh *eventHandler) interestOf(kind Kind) bool {
type etcdSchemaRegistry struct {
server *embed.Etcd
client *clientv3.Client
- kv clientv3.KV
+ closer *run.Closer
handlers []*eventHandler
mux sync.RWMutex
}
@@ -101,8 +99,6 @@ type etcdSchemaRegistryConfig struct {
listenerClientURL string
// listenerPeerURL is the listener for peer
listenerPeerURL string
- // loggerLevel defines log level
- loggerLevel string
}
func (e *etcdSchemaRegistry) RegisterHandler(kind Kind, handler EventHandler) {
@@ -149,6 +145,7 @@ func (e *etcdSchemaRegistry) StoppingNotify() <-chan struct{} {
}
func (e *etcdSchemaRegistry) Close() error {
+ e.closer.CloseThenWait()
_ = e.client.Close()
e.server.Close()
return nil
@@ -164,8 +161,15 @@ func NewEtcdSchemaRegistry(options ...RegistryOption) (Registry, error) {
for _, opt := range options {
opt(registryConfig)
}
+ zapCfg := logger.GetLogger("etcd").ToZapConfig()
+
+ var l *zap.Logger
+ var err error
+ if l, err = zapCfg.Build(); err != nil {
+ return nil, err
+ }
// TODO: allow use cluster setting
- embedConfig := newStandaloneEtcdConfig(registryConfig)
+ embedConfig := newStandaloneEtcdConfig(registryConfig, l)
e, err := embed.StartEtcd(embedConfig)
if err != nil {
return nil, err
@@ -173,21 +177,33 @@ func NewEtcdSchemaRegistry(options ...RegistryOption) (Registry, error) {
	if e != nil {
		<-e.Server.ReadyNotify() // wait for e.Server to join the cluster
}
- client, err := clientv3.NewFromURL(e.Config().ACUrls[0].String())
+
+ config := clientv3.Config{
+ Endpoints: []string{e.Config().ACUrls[0].String()},
+ DialTimeout: 5 * time.Second,
+ DialKeepAliveTime: 30 * time.Second,
+ DialKeepAliveTimeout: 10 * time.Second,
+ DialOptions: []grpc.DialOption{grpc.WithBlock()},
+ Logger: l,
+ }
+ client, err := clientv3.New(config)
if err != nil {
return nil, err
}
- kvClient := clientv3.NewKV(client)
reg := &etcdSchemaRegistry{
server: e,
- kv: kvClient,
client: client,
+ closer: run.NewCloser(0),
}
return reg, nil
}
func (e *etcdSchemaRegistry) get(ctx context.Context, key string, message proto.Message) error {
- resp, err := e.kv.Get(ctx, key)
+ if !e.closer.AddRunning() {
+ return errClosed
+ }
+ defer e.closer.Done()
+ resp, err := e.client.Get(ctx, key)
if err != nil {
return err
}
@@ -212,11 +228,15 @@ func (e *etcdSchemaRegistry) get(ctx context.Context, key string, message proto.
// and overwrite the existing value if so.
// Otherwise, it will return ErrGRPCResourceNotFound.
func (e *etcdSchemaRegistry) update(ctx context.Context, metadata Metadata) error {
+ if !e.closer.AddRunning() {
+ return errClosed
+ }
+ defer e.closer.Done()
key, err := metadata.key()
if err != nil {
return err
}
- getResp, err := e.kv.Get(ctx, key)
+ getResp, err := e.client.Get(ctx, key)
if err != nil {
return err
}
@@ -239,7 +259,7 @@ func (e *etcdSchemaRegistry) update(ctx context.Context, metadata Metadata) erro
}
modRevision := getResp.Kvs[0].ModRevision
- txnResp, txnErr := e.kv.Txn(ctx).
+ txnResp, txnErr := e.client.Txn(ctx).
		If(clientv3.Compare(clientv3.ModRevision(key), "=", modRevision)).
Then(clientv3.OpPut(key, string(val))).
Commit()
@@ -260,11 +280,15 @@ func (e *etcdSchemaRegistry) update(ctx context.Context, metadata Metadata) erro
// and put the value if it does not exist.
// Otherwise, it will return ErrGRPCAlreadyExists.
func (e *etcdSchemaRegistry) create(ctx context.Context, metadata Metadata) error {
+ if !e.closer.AddRunning() {
+ return errClosed
+ }
+ defer e.closer.Done()
key, err := metadata.key()
if err != nil {
return err
}
- getResp, err := e.kv.Get(ctx, key)
+ getResp, err := e.client.Get(ctx, key)
if err != nil {
return err
}
@@ -279,7 +303,7 @@ func (e *etcdSchemaRegistry) create(ctx context.Context, metadata Metadata) erro
if replace {
return errGRPCAlreadyExists
}
- _, err = e.kv.Put(ctx, key, string(val))
+ _, err = e.client.Put(ctx, key, string(val))
if err != nil {
return err
}
@@ -289,7 +313,11 @@ func (e *etcdSchemaRegistry) create(ctx context.Context, metadata Metadata) erro
}
func (e *etcdSchemaRegistry) listWithPrefix(ctx context.Context, prefix string, factory func() proto.Message) ([]proto.Message, error) {
-	resp, err := e.kv.Get(ctx, prefix, clientv3.WithFromKey(), clientv3.WithRange(incrementLastByte(prefix)))
+	if !e.closer.AddRunning() {
+		return nil, errClosed
+	}
+	defer e.closer.Done()
+	resp, err := e.client.Get(ctx, prefix, clientv3.WithFromKey(), clientv3.WithRange(incrementLastByte(prefix)))
if err != nil {
return nil, err
}
@@ -314,11 +342,15 @@ func listPrefixesForEntity(group, entityPrefix string) string {
}
func (e *etcdSchemaRegistry) delete(ctx context.Context, metadata Metadata) (bool, error) {
+ if !e.closer.AddRunning() {
+ return false, errClosed
+ }
+ defer e.closer.Done()
key, err := metadata.key()
if err != nil {
return false, err
}
- resp, err := e.kv.Delete(ctx, key, clientv3.WithPrevKV())
+ resp, err := e.client.Delete(ctx, key, clientv3.WithPrevKV())
if err != nil {
return false, err
}
@@ -365,9 +397,9 @@ func incrementLastByte(key string) string {
return string(bb)
}
-func newStandaloneEtcdConfig(config *etcdSchemaRegistryConfig) *embed.Config {
+func newStandaloneEtcdConfig(config *etcdSchemaRegistryConfig, logger *zap.Logger) *embed.Config {
cfg := embed.NewConfig()
- cfg.LogLevel = config.loggerLevel
+ cfg.ZapLoggerBuilder = embed.NewZapLoggerBuilder(logger)
cfg.Dir = filepath.Join(config.rootDir, "metadata")
cURL, _ := url.Parse(config.listenerClientURL)
pURL, _ := url.Parse(config.listenerPeerURL)
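Every registry operation above is now wrapped in an AddRunning/Done guard so Close can refuse new calls and drain in-flight ones. A simplified stand-in for the project's run.Closer, shown only to illustrate the semantics, not its actual implementation:

    package main

    import (
        "errors"
        "fmt"
        "sync"
    )

    var errClosed = errors.New("metadata registry is closed")

    type closer struct {
        mu     sync.Mutex
        wg     sync.WaitGroup
        closed bool
    }

    // AddRunning registers an in-flight call; it fails once closed.
    func (c *closer) AddRunning() bool {
        c.mu.Lock()
        defer c.mu.Unlock()
        if c.closed {
            return false
        }
        c.wg.Add(1)
        return true
    }

    func (c *closer) Done() { c.wg.Done() }

    // CloseThenWait blocks new calls, then waits for running ones.
    func (c *closer) CloseThenWait() {
        c.mu.Lock()
        c.closed = true
        c.mu.Unlock()
        c.wg.Wait()
    }

    func get(c *closer) error {
        if !c.AddRunning() {
            return errClosed
        }
        defer c.Done()
        // ... the etcd Get would run here ...
        return nil
    }

    func main() {
        c := &closer{}
        fmt.Println(get(c)) // <nil>
        c.CloseThenWait()
        fmt.Println(get(c)) // metadata registry is closed
    }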
diff --git a/banyand/metadata/schema/etcd_test.go b/banyand/metadata/schema/etcd_test.go
index 5532d24..22bb5a0 100644
--- a/banyand/metadata/schema/etcd_test.go
+++ b/banyand/metadata/schema/etcd_test.go
@@ -136,7 +136,7 @@ func useRandomPort() RegistryOption {
func Test_Etcd_Entity_Get(t *testing.T) {
tester := assert.New(t)
-	registry, err := NewEtcdSchemaRegistry(useRandomPort(), useRandomTempDir(), LoggerLevel("warn"))
+	registry, err := NewEtcdSchemaRegistry(useRandomPort(), useRandomTempDir())
tester.NoError(err)
tester.NotNil(registry)
defer registry.Close()
@@ -228,7 +228,7 @@ func Test_Etcd_Entity_Get(t *testing.T) {
func Test_Etcd_Entity_List(t *testing.T) {
tester := assert.New(t)
-	registry, err := NewEtcdSchemaRegistry(useRandomPort(), useRandomTempDir(), LoggerLevel("warn"))
+	registry, err := NewEtcdSchemaRegistry(useRandomPort(), useRandomTempDir())
tester.NoError(err)
tester.NotNil(registry)
defer registry.Close()
@@ -310,7 +310,7 @@ func Test_Etcd_Entity_List(t *testing.T) {
func Test_Etcd_Delete(t *testing.T) {
tester := assert.New(t)
-	registry, err := NewEtcdSchemaRegistry(useRandomPort(), useRandomTempDir(), LoggerLevel("warn"))
+	registry, err := NewEtcdSchemaRegistry(useRandomPort(), useRandomTempDir())
tester.NoError(err)
tester.NotNil(registry)
defer registry.Close()
@@ -379,7 +379,7 @@ func Test_Etcd_Delete(t *testing.T) {
func Test_Notify(t *testing.T) {
req := require.New(t)
-	registry, err := NewEtcdSchemaRegistry(useRandomPort(), useRandomTempDir(), LoggerLevel("warn"))
+	registry, err := NewEtcdSchemaRegistry(useRandomPort(), useRandomTempDir())
req.NoError(err)
req.NotNil(registry)
defer registry.Close()
diff --git a/banyand/metadata/schema/group.go b/banyand/metadata/schema/group.go
index 3ecb0c7..e6ed49f 100644
--- a/banyand/metadata/schema/group.go
+++ b/banyand/metadata/schema/group.go
@@ -43,7 +43,7 @@ func (e *etcdSchemaRegistry) GetGroup(ctx context.Context, group string) (*commo
}
func (e *etcdSchemaRegistry) ListGroup(ctx context.Context) ([]*commonv1.Group, error) {
-	messages, err := e.kv.Get(ctx, groupsKeyPrefix, clientv3.WithFromKey(), clientv3.WithRange(incrementLastByte(groupsKeyPrefix)))
+	messages, err := e.client.Get(ctx, groupsKeyPrefix, clientv3.WithFromKey(), clientv3.WithRange(incrementLastByte(groupsKeyPrefix)))
if err != nil {
return nil, err
}
@@ -69,7 +69,7 @@ func (e *etcdSchemaRegistry) DeleteGroup(ctx context.Context, group string) (boo
		return false, errors.Wrap(err, group)
	}
	keyPrefix := groupsKeyPrefix + g.GetMetadata().GetName() + "/"
-	resp, err := e.kv.Delete(ctx, keyPrefix, clientv3.WithRange(incrementLastByte(keyPrefix)))
+	resp, err := e.client.Delete(ctx, keyPrefix, clientv3.WithRange(incrementLastByte(keyPrefix)))
if err != nil {
return false, err
}
diff --git a/banyand/stream/service.go b/banyand/stream/service.go
index ac95748..c4f2c7b 100644
--- a/banyand/stream/service.go
+++ b/banyand/stream/service.go
@@ -76,6 +76,7 @@ func (s *service) FlagSet() *run.FlagSet {
	flagS.Int64Var(&s.dbOpts.BlockMemSize, "stream-block-mem-size", 8<<20, "block memory size")
	flagS.Int64Var(&s.dbOpts.SeriesMemSize, "stream-seriesmeta-mem-size", 1<<20, "series metadata memory size")
	flagS.Int64Var(&s.dbOpts.GlobalIndexMemSize, "stream-global-index-mem-size", 2<<20, "global index memory size")
+	flagS.Int64Var(&s.dbOpts.BlockInvertedIndex.BatchWaitSec, "stream-idx-batch-wait-sec", 1, "index batch wait in second")
return flagS
}
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index cdfd8f2..66251bf 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -78,6 +78,7 @@ type block struct {
lock sync.RWMutex
segID SectionID
blockID SectionID
+ indexOpts InvertedIndexOpts
}
type blockOpts struct {
@@ -145,6 +146,7 @@ func (b *block) options(ctx context.Context) {
if b.lsmMemSize < defaultKVMemorySize {
b.lsmMemSize = defaultKVMemorySize
}
+ b.indexOpts = options.BlockInvertedIndex
}
func (b *block) openSafely() (err error) {
@@ -171,8 +173,9 @@ func (b *block) open() (err error) {
}
b.closableLst = append(b.closableLst, b.store)
if b.invertedIndex, err = inverted.NewStore(inverted.StoreOpts{
- Path: path.Join(b.path, componentSecondInvertedIdx),
- Logger: b.l.Named(componentSecondInvertedIdx),
+ Path: path.Join(b.path, componentSecondInvertedIdx),
+ Logger: b.l.Named(componentSecondInvertedIdx),
+ BatchWaitSec: b.indexOpts.BatchWaitSec,
}); err != nil {
return err
}
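The new BatchWaitSec value travels from the CLI flags registered in measure/service.go and stream/service.go through DatabaseOpts into the StoreOpts here. A compact sketch of that plumbing, including the at-least-one-second clamp that NewStore applies; the types are simplified stand-ins for the real options structs:

    package main

    import (
        "flag"
        "fmt"
        "time"
    )

    type invertedIndexOpts struct{ BatchWaitSec int64 }

    func main() {
        var opts invertedIndexOpts
        // Mirrors flagS.Int64Var(&s.dbOpts.BlockInvertedIndex.BatchWaitSec, ...).
        flag.Int64Var(&opts.BatchWaitSec, "measure-idx-batch-wait-sec", 1, "index batch wait in second")
        flag.Parse()
        sec := opts.BatchWaitSec
        if sec < 1 {
            sec = 1 // NewStore clamps the interval to at least one second
        }
        fmt.Println("index flush interval:", time.Duration(sec)*time.Second)
    }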
diff --git a/banyand/tsdb/bucket/queue.go b/banyand/tsdb/bucket/queue.go
index 6cb1bfa..dc86953 100644
--- a/banyand/tsdb/bucket/queue.go
+++ b/banyand/tsdb/bucket/queue.go
@@ -101,7 +101,7 @@ func NewQueue(l *logger.Logger, size int, maxSize int, scheduler *timestamp.Sche
		evictFn: evictFn,
		l:       l,
	}
-	if err := scheduler.Register(QueueName, cron.Descriptor, "@every 1m", c.cleanEvict); err != nil {
+	if err := scheduler.Register(QueueName, cron.Descriptor, "@every 5m", c.cleanEvict); err != nil {
return nil, err
}
return c, nil
diff --git a/banyand/tsdb/bucket/queue_test.go b/banyand/tsdb/bucket/queue_test.go
index ac85449..eb78072 100644
--- a/banyand/tsdb/bucket/queue_test.go
+++ b/banyand/tsdb/bucket/queue_test.go
@@ -132,7 +132,7 @@ var _ = Describe("Queue", func() {
Expect(enRecentSize).To(Equal(192))
Expect(l.Len()).To(Equal(128))
Expect(len(evictLst)).To(Equal(0))
- clock.Add(time.Minute)
+ clock.Add(6 * time.Minute)
if !scheduler.Trigger(bucket.QueueName) {
Fail("trigger fails")
}
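The test now advances the mock clock by six minutes because the eviction job runs on the new "@every 5m" schedule; anything under one full interval would leave the job not yet due when Trigger fires. A toy illustration of that timing check:

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        interval := 5 * time.Minute // the new "@every 5m" schedule
        elapsed := 6 * time.Minute  // clock.Add(6 * time.Minute) in the test
        fmt.Println("eviction due:", elapsed >= interval)
    }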
diff --git a/banyand/tsdb/indexdb.go b/banyand/tsdb/indexdb.go
index cb82f88..d0fff8f 100644
--- a/banyand/tsdb/indexdb.go
+++ b/banyand/tsdb/indexdb.go
@@ -59,12 +59,9 @@ type indexDB struct {
func (i *indexDB) Seek(field index.Field) ([]GlobalItemID, error) {
result := make([]GlobalItemID, 0)
- f, err := field.Marshal()
- if err != nil {
- return nil, err
- }
+ f := field.Marshal()
for _, s := range i.segCtrl.segments() {
- err = s.globalIndex.GetAll(f, func(rawBytes []byte) error {
+ err := s.globalIndex.GetAll(f, func(rawBytes []byte) error {
id := &GlobalItemID{}
errUnMarshal := id.unMarshal(rawBytes)
if errUnMarshal != nil {
@@ -76,8 +73,11 @@ func (i *indexDB) Seek(field index.Field) ([]GlobalItemID, error) {
if errors.Is(err, kv.ErrKeyNotFound) {
return result, nil
}
+ if err != nil {
+ return nil, err
+ }
}
- return result, err
+ return result, nil
}
func (i *indexDB) WriterBuilder() IndexWriterBuilder {
@@ -151,12 +151,7 @@ func (i *indexWriter) WriteLSMIndex(fields []index.Field) (err error) {
if i.scope != nil {
field.Key.SeriesID = GlobalSeriesID(i.scope)
}
- key, errInternal := field.Marshal()
- if errInternal != nil {
- err = multierr.Append(err, errInternal)
- continue
- }
-		err = multierr.Append(err, i.seg.globalIndex.PutWithVersion(key, i.itemID.marshal(), uint64(i.ts.UnixNano())))
+		err = multierr.Append(err, i.seg.globalIndex.PutWithVersion(field.Marshal(), i.itemID.marshal(), uint64(i.ts.UnixNano())))
}
return err
}
@@ -166,12 +161,7 @@ func (i *indexWriter) WriteInvertedIndex(fields []index.Field) (err error) {
if i.scope != nil {
field.Key.SeriesID = GlobalSeriesID(i.scope)
}
- key, errInternal := field.Marshal()
- if errInternal != nil {
- err = multierr.Append(err, errInternal)
- continue
- }
-		err = multierr.Append(err, i.seg.globalIndex.PutWithVersion(key, i.itemID.marshal(), uint64(i.ts.UnixNano())))
+		err = multierr.Append(err, i.seg.globalIndex.PutWithVersion(field.Marshal(), i.itemID.marshal(), uint64(i.ts.UnixNano())))
}
return err
}
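The writers keep the multierr accumulation but drop the per-field Marshal error branch, since Field.Marshal is now infallible. A compact sketch of the resulting loop shape, with stand-in types in place of the real field and store:

    package main

    import (
        "errors"
        "fmt"

        "go.uber.org/multierr"
    )

    type field struct{ name string }

    // Marshal is infallible now: no error to check per field.
    func (f field) Marshal() []byte { return []byte(f.name) }

    func put(key []byte) error {
        if len(key) == 0 {
            return errors.New("empty key")
        }
        return nil
    }

    func main() {
        var err error
        for _, f := range []field{{"a"}, {""}, {"c"}} {
            // Only the store error remains to accumulate.
            err = multierr.Append(err, put(f.Marshal()))
        }
        fmt.Println(err) // empty key
    }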
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 8f6ad3a..eb517d1 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -102,12 +102,18 @@ type DatabaseOpts struct {
BlockInterval IntervalRule
TTL IntervalRule
BlockMemSize int64
+ BlockInvertedIndex InvertedIndexOpts
SeriesMemSize int64
GlobalIndexMemSize int64
ShardNum uint32
EnableGlobalIndex bool
}
+// InvertedIndexOpts wraps options to create the block inverted index.
+type InvertedIndexOpts struct {
+ BatchWaitSec int64
+}
+
// EncodingMethod wraps encoder/decoder pools to flush/compact data on disk.
type EncodingMethod struct {
EncoderPool encoding.SeriesEncoderPool
diff --git a/go.mod b/go.mod
index d52b8c1..a74b376 100644
--- a/go.mod
+++ b/go.mod
@@ -118,7 +118,7 @@ require (
go.opentelemetry.io/otel/trace v1.11.1 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
- go.uber.org/zap v1.23.0 // indirect
+ go.uber.org/zap v1.23.0
golang.org/x/crypto v0.3.0 // indirect
golang.org/x/net v0.2.0 // indirect
golang.org/x/sys v0.2.0 // indirect
diff --git a/pkg/index/index.go b/pkg/index/index.go
index 36849e6..1862231 100644
--- a/pkg/index/index.go
+++ b/pkg/index/index.go
@@ -49,10 +49,12 @@ func (f FieldKey) MarshalIndexRule() string {
// Marshal encodes f to bytes.
func (f FieldKey) Marshal() []byte {
- return bytes.Join([][]byte{
- f.SeriesID.Marshal(),
- convert.Uint32ToBytes(f.IndexRuleID),
- }, nil)
+ s := f.SeriesID.Marshal()
+ i := []byte(f.MarshalIndexRule())
+ b := make([]byte, len(s)+len(i))
+ bp := copy(b, s)
+ copy(b[bp:], i)
+ return b
}
// MarshalToStr encodes f to string.
@@ -79,8 +81,20 @@ type Field struct {
}
// Marshal encodes f to bytes.
-func (f Field) Marshal() ([]byte, error) {
- return bytes.Join([][]byte{f.Key.Marshal(), f.Term}, nil), nil
+func (f Field) Marshal() []byte {
+ s := f.Key.SeriesID.Marshal()
+ i := []byte(f.Key.MarshalIndexRule())
+ b := make([]byte, len(s)+len(i)+len(f.Term))
+ bp := copy(b, s)
+ bp += copy(b[bp:], i)
+ copy(b[bp:], f.Term)
+ return b
+}
+
+// FieldStr returns a string representation of Field, which is composed of key and term.
+func FieldStr(key FieldKey, term []byte) string {
+ f := Field{Key: key, Term: term}
+ return string(f.Marshal())
}
// Unmarshal decodes bytes to f.
@@ -93,12 +107,18 @@ func (f *Field) Unmarshal(raw []byte) error {
if err != nil {
return errors.Wrap(err, "unmarshal a field")
}
- term := raw[12:]
- f.Term = make([]byte, len(term))
- copy(f.Term, term)
+ f.Term = UnmarshalTerm(raw)
return nil
}
+// UnmarshalTerm decodes the term from an encoded field.
+func UnmarshalTerm(raw []byte) []byte {
+ term := raw[12:]
+ result := make([]byte, len(term))
+ copy(result, term)
+ return result
+}
+
// RangeOpts contains options to performance a continuous scan.
type RangeOpts struct {
Upper []byte
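Field.Marshal is now infallible and, judging from the 12-byte offset in UnmarshalTerm, lays out an 8-byte series ID, a 4-byte index-rule ID, then the raw term. A sketch of that layout under those assumptions; the real code goes through SeriesID.Marshal and MarshalIndexRule rather than encoding/binary directly:

    package main

    import (
        "encoding/binary"
        "fmt"
    )

    // marshalField mimics the assumed layout: series ID, rule ID, term.
    func marshalField(seriesID uint64, indexRuleID uint32, term []byte) []byte {
        b := make([]byte, 12+len(term))
        binary.BigEndian.PutUint64(b[0:8], seriesID)
        binary.BigEndian.PutUint32(b[8:12], indexRuleID)
        copy(b[12:], term)
        return b
    }

    // unmarshalTerm slices off the fixed 12-byte key prefix, like UnmarshalTerm.
    func unmarshalTerm(raw []byte) []byte {
        term := make([]byte, len(raw)-12)
        copy(term, raw[12:])
        return term
    }

    func main() {
        raw := marshalField(7, 42, []byte("svc_cpm"))
        fmt.Printf("%d bytes, term=%q\n", len(raw), unmarshalTerm(raw))
    }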
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index d1efa79..3020f3c 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -28,6 +28,7 @@ import (
"github.com/blugelabs/bluge"
"github.com/blugelabs/bluge/analysis"
"github.com/blugelabs/bluge/analysis/analyzer"
+ blugeIndex "github.com/blugelabs/bluge/index"
"github.com/blugelabs/bluge/search"
"github.com/dgraph-io/badger/v3/y"
"go.uber.org/multierr"
@@ -48,6 +49,12 @@ const (
docID = "_id"
batchSize = 1024
seriesIDField = "series_id"
+ idField = "id"
+)
+
+var (
+ defaultUpper = convert.Uint64ToBytes(math.MaxUint64)
+ defaultLower = convert.Uint64ToBytes(0)
)
var analyzers map[databasev1.IndexRule_Analyzer]*analysis.Analyzer
@@ -64,8 +71,9 @@ var _ index.Store = (*store)(nil)
// StoreOpts wraps options to create a inverted index repository.
type StoreOpts struct {
- Logger *logger.Logger
- Path string
+ Logger *logger.Logger
+ Path string
+ BatchWaitSec int64
}
type doc struct {
@@ -78,26 +86,37 @@ type flushEvent struct {
}
type store struct {
- writer *bluge.Writer
- ch chan any
- closer *run.Closer
- l *logger.Logger
+ writer *bluge.Writer
+ ch chan any
+ closer *run.Closer
+ l *logger.Logger
+ batchInterval time.Duration
}
// NewStore create a new inverted index repository.
func NewStore(opts StoreOpts) (index.Store, error) {
- config := bluge.DefaultConfig(opts.Path)
+ indexConfig := blugeIndex.DefaultConfig(opts.Path).WithUnsafeBatches().
+ WithPersisterNapTimeMSec(60 * 1000)
+ indexConfig.MergePlanOptions.MaxSegmentsPerTier = 1
+ indexConfig.MergePlanOptions.MaxSegmentSize = 500000
+ indexConfig.MergePlanOptions.SegmentsPerMergeTask = 20
+ config := bluge.DefaultConfigWithIndexConfig(indexConfig)
	config.DefaultSearchAnalyzer = analyzers[databasev1.IndexRule_ANALYZER_KEYWORD]
config.Logger = log.New(opts.Logger, opts.Logger.Module(), 0)
w, err := bluge.OpenWriter(config)
if err != nil {
return nil, err
}
+ sec := opts.BatchWaitSec
+ if sec < 1 {
+ sec = 1
+ }
s := &store{
- writer: w,
- l: opts.Logger,
- ch: make(chan any, batchSize),
- closer: run.NewCloser(1),
+ writer: w,
+ batchInterval: time.Duration(sec * int64(time.Second)),
+ l: opts.Logger,
+ ch: make(chan any, batchSize),
+ closer: run.NewCloser(1),
}
s.run()
return s, nil
@@ -136,21 +155,40 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, ord
bytes.Compare(termRange.Lower, termRange.Upper) > 0 {
return index.DummyFieldIterator, nil
}
+ if termRange.Upper == nil {
+ termRange.Upper = defaultUpper
+ }
+ if termRange.Lower == nil {
+ termRange.Lower = defaultLower
+ }
reader, err := s.writer.Reader()
if err != nil {
return nil, err
}
fk := fieldKey.MarshalIndexRule()
- query := bluge.NewBooleanQuery()
- query.
-		AddMust(bluge.NewTermQuery(string(fieldKey.SeriesID.Marshal())).SetField(seriesIDField)).
- AddMust(bluge.NewTermRangeInclusiveQuery(
- string(termRange.Lower),
- string(termRange.Upper),
+ var query bluge.Query
+ shouldDecodeTerm := true
+ if fieldKey.Analyzer == databasev1.IndexRule_ANALYZER_UNSPECIFIED {
+ query = bluge.NewTermRangeInclusiveQuery(
+ index.FieldStr(fieldKey, termRange.Lower),
+ index.FieldStr(fieldKey, termRange.Upper),
termRange.IncludesLower,
termRange.IncludesUpper,
).
- SetField(fk))
+ SetField(fk)
+ } else {
+ shouldDecodeTerm = false
+ query = bluge.NewBooleanQuery().
+ AddMust(bluge.NewTermRangeInclusiveQuery(
+ string(termRange.Lower),
+ string(termRange.Upper),
+ termRange.IncludesLower,
+ termRange.IncludesUpper,
+ ).
+ SetField(fk)).
+			AddMust(bluge.NewTermQuery(string(fieldKey.SeriesID.Marshal())).SetField(seriesIDField))
+ }
+
sortedKey := fk
if order == modelv1.Sort_SORT_DESC {
sortedKey = "-" + sortedKey
@@ -159,7 +197,7 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, ord
if err != nil {
return nil, err
}
- result := newBlugeMatchIterator(documentMatchIterator, fk)
+	result := newBlugeMatchIterator(documentMatchIterator, fk, shouldDecodeTerm)
return &result, nil
}
@@ -173,15 +211,21 @@ func (s *store) MatchTerms(field index.Field) (list posting.List, err error) {
return nil, err
}
fk := field.Key.MarshalIndexRule()
- query := bluge.NewBooleanQuery()
- query.
-		AddMust(bluge.NewTermQuery(string(field.Key.SeriesID.Marshal())).SetField(seriesIDField)).
- AddMust(bluge.NewTermQuery(string(field.Term)).SetField(fk))
+ var query bluge.Query
+ shouldDecodeTerm := true
+ if field.Key.Analyzer == databasev1.IndexRule_ANALYZER_UNSPECIFIED {
+ query = bluge.NewTermQuery(string(field.Marshal())).SetField(fk)
+ } else {
+ shouldDecodeTerm = false
+ query = bluge.NewBooleanQuery().
+			AddMust(bluge.NewTermQuery(string(field.Term)).SetField(fk)).
+			AddMust(bluge.NewTermQuery(string(field.Key.SeriesID.Marshal())).SetField(seriesIDField))
+ }
	documentMatchIterator, err := reader.Search(context.Background(), bluge.NewAllMatches(query))
if err != nil {
return nil, err
}
- iter := newBlugeMatchIterator(documentMatchIterator, fk)
+	iter := newBlugeMatchIterator(documentMatchIterator, fk, shouldDecodeTerm)
list = roaring.NewPostingList()
for iter.Next() {
err = multierr.Append(err, list.Union(iter.Val().Value))
@@ -191,28 +235,26 @@ func (s *store) MatchTerms(field index.Field) (list posting.List, err error) {
}
func (s *store) Match(fieldKey index.FieldKey, matches []string) (posting.List, error) {
- if len(matches) == 0 {
+	if len(matches) == 0 || fieldKey.Analyzer == databasev1.IndexRule_ANALYZER_UNSPECIFIED {
return roaring.DummyPostingList, nil
}
reader, err := s.writer.Reader()
if err != nil {
return nil, err
}
+ analyzer := analyzers[fieldKey.Analyzer]
fk := fieldKey.MarshalIndexRule()
query := bluge.NewBooleanQuery()
query.AddMust(bluge.NewTermQuery(string(fieldKey.SeriesID.Marshal())).SetField(seriesIDField))
for _, m := range matches {
- q := bluge.NewMatchQuery(m).SetField(fk)
-		if fieldKey.Analyzer != databasev1.IndexRule_ANALYZER_UNSPECIFIED {
- q.SetAnalyzer(analyzers[fieldKey.Analyzer])
- }
- query.AddMust(q)
+ query.AddMust(bluge.NewMatchQuery(m).SetField(fk).
+ SetAnalyzer(analyzer))
}
	documentMatchIterator, err := reader.Search(context.Background(), bluge.NewAllMatches(query))
if err != nil {
return nil, err
}
- iter := newBlugeMatchIterator(documentMatchIterator, fk)
+ iter := newBlugeMatchIterator(documentMatchIterator, fk, false)
list := roaring.NewPostingList()
for iter.Next() {
err = multierr.Append(err, list.Union(iter.Val().Value))
@@ -251,8 +293,7 @@ func (s *store) run() {
}
var docIDBuffer bytes.Buffer
for {
- timer := time.NewTimer(time.Second)
- docIDBuffer.Reset()
+ timer := time.NewTimer(s.batchInterval)
select {
case <-s.closer.CloseNotify():
return
@@ -267,26 +308,29 @@ func (s *store) run() {
case doc:
// TODO: generate a segment directly.
fk := d.fields[0].Key
+ docIDBuffer.Reset()
docIDBuffer.Write(fk.SeriesID.Marshal())
docIDBuffer.Write(convert.Uint64ToBytes(uint64(d.itemID)))
			doc := bluge.NewDocument(docIDBuffer.String())
-
+ toAddSeriesIDField := false
for _, f := range d.fields {
-				doc.AddField(bluge.NewKeywordFieldBytes(seriesIDField, f.Key.SeriesID.Marshal()))
-				field := bluge.NewKeywordFieldBytes(f.Key.MarshalIndexRule(), f.Term).StoreValue().Sortable()
-				if f.Key.Analyzer != databasev1.IndexRule_ANALYZER_UNSPECIFIED {
-					field.WithAnalyzer(analyzers[f.Key.Analyzer])
+				if f.Key.Analyzer == databasev1.IndexRule_ANALYZER_UNSPECIFIED {
+					doc.AddField(bluge.NewKeywordFieldBytes(f.Key.MarshalIndexRule(), f.Marshal()).StoreValue().Sortable())
+				} else {
+					toAddSeriesIDField = true
+					doc.AddField(bluge.NewKeywordFieldBytes(f.Key.MarshalIndexRule(), f.Term).StoreValue().Sortable().
+						WithAnalyzer(analyzers[f.Key.Analyzer]))
}
- doc.AddField(field)
+ }
+			if toAddSeriesIDField {
+				doc.AddField(bluge.NewKeywordFieldBytes(seriesIDField, fk.SeriesID.Marshal()))
}
size++
+ batch.Update(doc.ID(), doc)
if size >= batchSize {
flush()
- } else {
- batch.Update(doc.ID(), doc)
}
}
-
case <-timer.C:
flush()
}
@@ -308,18 +352,20 @@ func (s *store) flush() {
}
type blugeMatchIterator struct {
- delegated search.DocumentMatchIterator
- err error
- current *index.PostingValue
- agg *index.PostingValue
- fieldKey string
- closed bool
+ delegated search.DocumentMatchIterator
+ err error
+ current *index.PostingValue
+ agg *index.PostingValue
+ fieldKey string
+ shouldDecodeTerm bool
+ closed bool
}
-func newBlugeMatchIterator(delegated search.DocumentMatchIterator, fieldKey string) blugeMatchIterator {
+func newBlugeMatchIterator(delegated search.DocumentMatchIterator, fieldKey string, shouldDecodeTerm bool) blugeMatchIterator {
return blugeMatchIterator{
- delegated: delegated,
- fieldKey: fieldKey,
+ delegated: delegated,
+ fieldKey: fieldKey,
+ shouldDecodeTerm: shouldDecodeTerm,
}
}
@@ -363,7 +409,12 @@ func (bmi *blugeMatchIterator) nextTerm() bool {
i++
}
if field == bmi.fieldKey {
- term = y.Copy(value)
+ v := y.Copy(value)
+ if bmi.shouldDecodeTerm {
+ term = index.UnmarshalTerm(v)
+ } else {
+ term = v
+ }
i++
}
return i < 2
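The diff splits indexing and querying by analyzer: unanalyzed fields store the fully marshalled field (series ID + rule ID + term), so a single exact term query already scopes to a series, while analyzed fields keep the raw term for the analyzer and add a separate series_id must-clause. A self-contained sketch of that split with bluge itself omitted; the byte widths follow the 12-byte prefix assumed above:

    package main

    import "fmt"

    type field struct {
        seriesID []byte
        ruleID   []byte
        term     []byte
        analyzed bool
    }

    // indexedTerm returns the byte string that would be handed to the index.
    func indexedTerm(f field) []byte {
        if f.analyzed {
            return f.term // the analyzer needs the raw text
        }
        // unanalyzed: bake series and rule into the stored term
        out := append([]byte{}, f.seriesID...)
        out = append(out, f.ruleID...)
        return append(out, f.term...)
    }

    func main() {
        f := field{seriesID: []byte("12345678"), ruleID: []byte("0042"), term: []byte("endpoint-a")}
        fmt.Printf("unanalyzed indexed term: %q (single term query)\n", indexedTerm(f))
        f.analyzed = true
        fmt.Printf("analyzed indexed term:   %q (+ series_id must-clause)\n", indexedTerm(f))
    }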
diff --git a/pkg/index/lsm/iterator.go b/pkg/index/lsm/iterator.go
index 694e7ab..dc82435 100644
--- a/pkg/index/lsm/iterator.go
+++ b/pkg/index/lsm/iterator.go
@@ -89,7 +89,7 @@ func (f *fieldIteratorTemplate) Close() error {
func newFieldIteratorTemplate(l *logger.Logger, fieldKey index.FieldKey,
termRange index.RangeOpts, order modelv1.Sort, iterable kv.Iterable,
fn compositePostingValueFn,
-) (*fieldIteratorTemplate, error) {
+) *fieldIteratorTemplate {
if termRange.Upper == nil {
termRange.Upper = defaultUpper
}
@@ -117,17 +117,13 @@ func newFieldIteratorTemplate(l *logger.Logger, fieldKey index.FieldKey, termRan
Key: fieldKey,
Term: term,
}
- seekKey, err := field.Marshal()
- if err != nil {
- return nil, err
- }
return &fieldIteratorTemplate{
delegated: newDelegateIterator(iter, fieldKey, l),
termRange: termRange,
fn: fn,
reverse: reverse,
- seekKey: seekKey,
- }, nil
+ seekKey: field.Marshal(),
+ }
}
func parseKey(fieldKey index.FieldKey, key []byte) (index.Field, error) {
diff --git a/pkg/index/lsm/lsm.go b/pkg/index/lsm/lsm.go
index 835a199..55721f4 100644
--- a/pkg/index/lsm/lsm.go
+++ b/pkg/index/lsm/lsm.go
@@ -46,13 +46,8 @@ func (s *store) Close() error {
func (s *store) Write(fields []index.Field, itemID common.ItemID) (err error) {
for _, field := range fields {
- f, errInternal := field.Marshal()
- if errInternal != nil {
- err = multierr.Append(err, errInternal)
- continue
- }
itemIDInt := uint64(itemID)
-		err = multierr.Append(err, s.lsm.PutWithVersion(f, convert.Uint64ToBytes(itemIDInt), itemIDInt))
+		err = multierr.Append(err, s.lsm.PutWithVersion(field.Marshal(), convert.Uint64ToBytes(itemIDInt), itemIDInt))
}
return err
}
diff --git a/pkg/index/lsm/search.go b/pkg/index/lsm/search.go
index 4af241a..d494cad 100644
--- a/pkg/index/lsm/search.go
+++ b/pkg/index/lsm/search.go
@@ -39,12 +39,8 @@ func (s *store) MatchField(fieldKey index.FieldKey) (list posting.List, err erro
}
func (s *store) MatchTerms(field index.Field) (list posting.List, err error) {
- f, err := field.Marshal()
- if err != nil {
- return nil, err
- }
list = roaring.NewPostingList()
- err = s.lsm.GetAll(f, func(itemID []byte) error {
+ err = s.lsm.GetAll(field.Marshal(), func(itemID []byte) error {
list.Insert(common.ItemID(convert.BytesToUint64(itemID)))
return nil
})
@@ -94,7 +90,7 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, ord
pv.Value.Insert(common.ItemID(itemID))
}
return pv, nil
- })
+ }), nil
}
func (s *store) Match(_ index.FieldKey, _ []string) (posting.List, error) {
diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go
index 57122b5..64dbff3 100644
--- a/pkg/logger/logger.go
+++ b/pkg/logger/logger.go
@@ -21,9 +21,13 @@ package logger
import (
"context"
+ "fmt"
"strings"
+ "time"
"github.com/rs/zerolog"
+ "go.uber.org/zap"
+ "go.uber.org/zap/zapcore"
)
// ContextKey is the key to store Logger in the context.
@@ -42,8 +46,9 @@ type Logging struct {
// Logger is wrapper for rs/zerolog logger with module, it is singleton.
type Logger struct {
*zerolog.Logger
- modules map[string]zerolog.Level
- module string
+ modules map[string]zerolog.Level
+ module string
+ development bool
}
// Module returns logger's module name.
@@ -73,7 +78,55 @@ func (l *Logger) Named(name ...string) *Logger {
}
}
	subLogger := root.l.With().Str("module", moduleBuilder.String()).Logger().Level(level)
-	return &Logger{module: module, modules: l.modules, Logger: &subLogger}
+	return &Logger{module: module, modules: l.modules, development: l.development, Logger: &subLogger}
+}
+
+// Sampled returns a Logger with a sampler that passes every Nth event.
+func (l *Logger) Sampled(n uint32) *Logger {
+ sampled := l.Logger.Sample(&zerolog.BasicSampler{N: n})
+ l.Logger = &sampled
+ return l
+}
+
+// ToZapConfig outputs the zap config derived from l.
+func (l *Logger) ToZapConfig() zap.Config {
+ level, err := zap.ParseAtomicLevel(l.GetLevel().String())
+ if err != nil {
+ panic(err)
+ }
+ if !l.development {
+ config := zap.NewProductionConfig()
+ config.Level = level
+ return config
+ }
+ encoderConfig := zapcore.EncoderConfig{
+ // Keys can be anything except the empty string.
+ TimeKey: "T",
+ LevelKey: "L",
+ NameKey: "N",
+ CallerKey: "C",
+ FunctionKey: zapcore.OmitKey,
+ MessageKey: "M",
+ StacktraceKey: "S",
+ LineEnding: zapcore.DefaultLineEnding,
+		EncodeLevel: func(l zapcore.Level, pae zapcore.PrimitiveArrayEncoder) {
+			pae.AppendString(strings.ToUpper(fmt.Sprintf("| %-6s|", l.String())))
+		},
+		EncodeTime: func(t time.Time, pae zapcore.PrimitiveArrayEncoder) {
+			pae.AppendString(t.Format(time.RFC3339))
+		},
+ EncodeDuration: zapcore.StringDurationEncoder,
+ EncodeCaller: zapcore.FullCallerEncoder,
+ ConsoleSeparator: " ",
+ }
+ return zap.Config{
+ Level: level,
+ Development: true,
+ Encoding: "console",
+ EncoderConfig: encoderConfig,
+ OutputPaths: []string{"stderr"},
+ ErrorOutputPaths: []string{"stderr"},
+ }
}
// Loggable indicates the implement supports logging.
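ToZapConfig lets the embedded etcd server log at the same level as the zerolog root. A brief sketch of the non-development path using only the public zap API, mirroring the zapCfg.Build() call in etcd.go:

    package main

    import (
        "fmt"

        "go.uber.org/zap"
    )

    func main() {
        // Parse the zerolog level string into a zap atomic level.
        level, err := zap.ParseAtomicLevel("warn")
        if err != nil {
            panic(err)
        }
        cfg := zap.NewProductionConfig()
        cfg.Level = level
        l, err := cfg.Build()
        if err != nil {
            panic(err)
        }
        defer l.Sync()
        l.Warn("etcd now logs through zap", zap.String("module", "etcd"))
        fmt.Println("zap level:", cfg.Level.Level())
    }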
diff --git a/pkg/logger/setting.go b/pkg/logger/setting.go
index 1018bfe..46a75ee 100644
--- a/pkg/logger/setting.go
+++ b/pkg/logger/setting.go
@@ -112,25 +112,21 @@ func getLogger(cfg Logging) (*Logger, error) {
return nil, err
}
var w io.Writer
- switch cfg.Env {
- case "dev":
+ development := strings.EqualFold(cfg.Env, "DEV")
+
+ if development {
		cw := zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: time.RFC3339}
cw.FormatLevel = func(i interface{}) string {
return strings.ToUpper(fmt.Sprintf("| %-6s|", i))
}
- cw.FormatMessage = func(i interface{}) string {
- return fmt.Sprintf("***%s****", i)
- }
- cw.FormatFieldName = func(i interface{}) string {
- return fmt.Sprintf("%s:", i)
- }
- cw.FormatFieldValue = func(i interface{}) string {
- return strings.ToUpper(fmt.Sprintf("%s", i))
- }
w = io.Writer(cw)
- default:
- w = os.Stdout
+ } else {
+ w = os.Stderr
+ }
+ ctx := zerolog.New(w).Level(lvl).With().Timestamp()
+ if development {
+ ctx = ctx.Stack().Caller()
}
- l := zerolog.New(w).Level(lvl).With().Timestamp().Logger()
- return &Logger{module: rootName, Logger: &l, modules: modules}, nil
+ l := ctx.Logger()
+	return &Logger{module: rootName, Logger: &l, modules: modules, development: development}, nil
}
diff --git a/pkg/schema/metadata.go b/pkg/schema/metadata.go
index 4388e45..e217103 100644
--- a/pkg/schema/metadata.go
+++ b/pkg/schema/metadata.go
@@ -21,7 +21,6 @@ package schema
import (
"context"
"io"
- "runtime"
"sync"
"sync/atomic"
"time"
@@ -39,6 +38,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/partition"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/run"
)
// EventType defines actions of events.
@@ -115,6 +115,8 @@ type Repository interface {
Close()
}
+const defaultWorkerNum = 10
+
var _ Repository = (*schemaRepo)(nil)
type schemaRepo struct {
@@ -123,10 +125,11 @@ type schemaRepo struct {
resourceSupplier ResourceSupplier
l *logger.Logger
data map[string]*group
- workerStopCh chan struct{}
+ closer *run.Closer
eventCh chan MetadataEvent
shardTopic bus.Topic
entityTopic bus.Topic
+ workerNum int
sync.RWMutex
}
@@ -147,8 +150,9 @@ func NewRepository(
shardTopic: shardTopic,
entityTopic: entityTopic,
data: make(map[string]*group),
- eventCh: make(chan MetadataEvent),
- workerStopCh: make(chan struct{}),
+ eventCh: make(chan MetadataEvent, defaultWorkerNum),
+ workerNum: defaultWorkerNum,
+ closer: run.NewCloser(defaultWorkerNum),
}
}
@@ -157,9 +161,10 @@ func (sr *schemaRepo) SendMetadataEvent(event MetadataEvent) {
}
func (sr *schemaRepo) Watcher() {
- for i := 0; i < 10; i++ {
+ for i := 0; i < sr.workerNum; i++ {
go func() {
defer func() {
+ sr.closer.Done()
if err := recover(); err != nil {
				sr.l.Warn().Interface("err", err).Msg("watching the events")
}
@@ -173,32 +178,32 @@ func (sr *schemaRepo) Watcher() {
if e := sr.l.Debug(); e.Enabled() {
					e.Interface("event", evt).Msg("received an event")
}
-				for i := 0; i < 10; i++ {
-					var err error
-					switch evt.Typ {
-					case EventAddOrUpdate:
-						switch evt.Kind {
-						case EventKindGroup:
-							_, err = sr.StoreGroup(evt.Metadata)
-						case EventKindResource:
-							_, err = sr.storeResource(evt.Metadata)
-						}
-					case EventDelete:
-						switch evt.Kind {
-						case EventKindGroup:
-							err = sr.deleteGroup(evt.Metadata)
-						case EventKindResource:
-							err = sr.deleteResource(evt.Metadata)
-						}
+				var err error
+				switch evt.Typ {
+				case EventAddOrUpdate:
+					switch evt.Kind {
+					case EventKindGroup:
+						_, err = sr.StoreGroup(evt.Metadata)
+					case EventKindResource:
+						_, err = sr.storeResource(evt.Metadata)
+					}
+				case EventDelete:
+					switch evt.Kind {
+					case EventKindGroup:
+						err = sr.deleteGroup(evt.Metadata)
+					case EventKindResource:
+						err = sr.deleteResource(evt.Metadata)
					}
-					if err == nil {
-						break
+				}
+				if err != nil {
+					sr.l.Err(err).Interface("event", evt).Msg("fail to handle the metadata event. retry...")
+					select {
+					case sr.eventCh <- evt:
+					case <-sr.closer.CloseNotify():
+						return
					}
-					runtime.Gosched()
-					time.Sleep(time.Second)
-					sr.l.Err(err).Interface("event", evt).Int("round", i).Msg("fail to handle the metadata event. retry...")
				}
-			case <-sr.workerStopCh:
+ case <-sr.closer.CloseNotify():
return
}
}
@@ -370,9 +375,7 @@ func (sr *schemaRepo) Close() {
			sr.l.Warn().Interface("err", err).Msg("closing resource")
}
}()
- if sr.workerStopCh != nil {
- close(sr.workerStopCh)
- }
+ sr.closer.CloseThenWait()
sr.RLock()
defer sr.RUnlock()
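The watcher's retry strategy changes from a bounded in-place loop (ten rounds with sleeps) to re-queueing the failed event on the shared channel so any worker can pick it up later, with shutdown signalled through the closer. A minimal sketch of that shape, using a plain done channel in place of run.Closer.CloseNotify:

    package main

    import (
        "errors"
        "fmt"
    )

    func main() {
        events := make(chan string, 10)
        done := make(chan struct{})
        finished := make(chan struct{})
        attempts := 0
        handle := func(evt string) error {
            attempts++
            if attempts < 3 {
                return errors.New("transient failure")
            }
            fmt.Println("handled", evt, "after", attempts, "attempts")
            close(finished)
            return nil
        }
        events <- "group/update"
        go func() {
            for {
                select {
                case evt := <-events:
                    if err := handle(evt); err != nil {
                        // Re-queue unless we are shutting down.
                        select {
                        case events <- evt:
                        case <-done:
                            return
                        }
                    }
                case <-done:
                    return
                }
            }
        }()
        <-finished
        close(done)
    }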
diff --git a/test/docker/base-compose.yml b/test/docker/base-compose.yml
index 908b269..adcd5c0 100644
--- a/test/docker/base-compose.yml
+++ b/test/docker/base-compose.yml
@@ -21,7 +21,7 @@ services:
- 6060
command: standalone
healthcheck:
- test: ["CMD", "/grpc-health-probe", "-addr=localhost:17912"]
+      test: wget --spider http://localhost:17913 || wget --spider --no-check-certificate https://localhost:17913 || exit 1
interval: 5s
timeout: 10s
retries: 120