This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch lint
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit c3adea8356fafa4e4c01a443980b619d08373e25
Author: Gao Hongtao <[email protected]>
AuthorDate: Fri Dec 2 07:10:42 2022 +0000

    Fix lint issues in banyand
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 banyand/cmd/server/main.go         |  1 +
 banyand/discovery/discovery.go     |  4 ++++
 banyand/doc.go                     |  1 +
 banyand/internal/cmd/root.go       |  1 +
 banyand/internal/cmd/standalone.go |  2 +-
 banyand/kv/badger.go               | 21 +++++++---------
 banyand/kv/kv.go                   | 40 ++++++++++++++++++-------------
 banyand/liaison/grpc/discovery.go  |  6 ++---
 banyand/liaison/grpc/measure.go    |  9 +++----
 banyand/liaison/grpc/registry.go   |  2 +-
 banyand/liaison/grpc/server.go     | 34 +++++++++++++-------------
 banyand/liaison/grpc/stream.go     |  7 ++----
 banyand/liaison/http/health.go     |  4 ++--
 banyand/liaison/http/server.go     | 14 +++++------
 banyand/liaison/liaison.go         | 10 +++-----
 banyand/measure/field_flag_test.go |  1 +
 banyand/measure/measure.go         |  7 +++---
 banyand/measure/measure_query.go   |  6 +++--
 banyand/measure/measure_topn.go    | 10 ++++----
 banyand/measure/measure_write.go   | 10 ++++----
 banyand/measure/metadata.go        |  2 +-
 banyand/measure/service.go         |  6 +++--
 banyand/metadata/metadata.go       |  5 ++++
 banyand/observability/metric.go    |  2 ++
 banyand/observability/pprof.go     |  1 +
 banyand/observability/type.go      |  4 ++++
 banyand/query/processor.go         |  3 ++-
 banyand/query/query.go             | 10 ++++----
 banyand/queue/local.go             |  2 +-
 banyand/queue/queue.go             |  3 +++
 banyand/stream/metadata.go         |  2 +-
 banyand/stream/service.go          |  9 +++----
 banyand/stream/stream.go           |  4 +++-
 banyand/stream/stream_query.go     |  6 +++--
 banyand/stream/stream_write.go     |  6 ++---
 banyand/tsdb/block.go              | 12 +++++-----
 banyand/tsdb/block_ctrl.go         | 10 ++++----
 banyand/tsdb/bucket/bucket.go      | 23 +++++++++++-------
 banyand/tsdb/bucket/queue.go       | 18 +++++++++-----
 banyand/tsdb/bucket/strategy.go    | 33 +++++++++++++++----------
 banyand/tsdb/index/writer.go       | 16 ++++++-------
 banyand/tsdb/indexdb.go            | 14 ++++++-----
 banyand/tsdb/metric.go             |  1 +
 banyand/tsdb/retention.go          |  2 +-
 banyand/tsdb/scope.go              | 21 ++++++++--------
 banyand/tsdb/segment.go            |  2 +-
 banyand/tsdb/segment_ctrl.go       |  8 +++----
 banyand/tsdb/series.go             | 22 ++++++++++-------
 banyand/tsdb/series_seek.go        |  4 ++++
 banyand/tsdb/series_seek_filter.go |  6 ++---
 banyand/tsdb/series_seek_sort.go   |  4 ++--
 banyand/tsdb/series_write.go       | 21 ++++++++--------
 banyand/tsdb/seriesdb.go           | 48 +++++++++++++++++++++++++++++--------
 banyand/tsdb/seriesdb_test.go      |  2 +-
 banyand/tsdb/shard.go              | 22 +++++++----------
 banyand/tsdb/tsdb.go               | 49 +++++++++++++++++++++++++++-----------
 pkg/schema/metadata.go             |  4 ++--
 pkg/test/setup/setup.go            |  2 +-
 58 files changed, 353 insertions(+), 246 deletions(-)

diff --git a/banyand/cmd/server/main.go b/banyand/cmd/server/main.go
index 6a2568c..8906f84 100644
--- a/banyand/cmd/server/main.go
+++ b/banyand/cmd/server/main.go
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+// Package main implements the executable banyandb server named banyand.
 package main
 
 import (
diff --git a/banyand/discovery/discovery.go b/banyand/discovery/discovery.go
index 34c4cf8..daacd59 100644
--- a/banyand/discovery/discovery.go
+++ b/banyand/discovery/discovery.go
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+// Package discovery implements the service discovery.
 package discovery
 
 import (
@@ -24,6 +25,8 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
+// ServiceRepo provides service subscribing and publishing.
+//
 //go:generate mockgen -destination=./discovery_mock.go -package=discovery 
github.com/apache/skywalking-banyandb/banyand/discovery ServiceRepo
 type ServiceRepo interface {
        NodeID() string
@@ -52,6 +55,7 @@ func (r *repo) Publish(topic bus.Topic, message 
...bus.Message) (bus.Future, err
        return r.local.Publish(topic, message...)
 }
 
+// NewServiceRepo returns a new ServiceRepo.
 func NewServiceRepo(_ context.Context) (ServiceRepo, error) {
        return &repo{
                local: bus.NewBus(),
diff --git a/banyand/doc.go b/banyand/doc.go
index 94f961d..ab85caf 100644
--- a/banyand/doc.go
+++ b/banyand/doc.go
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+// Package banyand implements an executable database server.
 package banyand
 
 /*
diff --git a/banyand/internal/cmd/root.go b/banyand/internal/cmd/root.go
index 01cc159..c5dd966 100644
--- a/banyand/internal/cmd/root.go
+++ b/banyand/internal/cmd/root.go
@@ -33,6 +33,7 @@ const logo = `
 ╚═════╝ ╚═╝  ╚═╝╚═╝  ╚═══╝   ╚═╝   ╚═╝  ╚═╝╚═╝  ╚═══╝╚═════╝ ╚═════╝ 
 `
 
+// NewRoot returns a root command.
 func NewRoot() *cobra.Command {
        cmd := &cobra.Command{
                DisableAutoGenTag: true,
diff --git a/banyand/internal/cmd/standalone.go 
b/banyand/internal/cmd/standalone.go
index 020fb51..2f3b2ad 100644
--- a/banyand/internal/cmd/standalone.go
+++ b/banyand/internal/cmd/standalone.go
@@ -65,7 +65,7 @@ func newStandaloneCmd() *cobra.Command {
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate measure service")
        }
-       q, err := query.NewExecutor(ctx, streamSvc, measureSvc, metaSvc, repo, 
pipeline)
+       q, err := query.NewService(ctx, streamSvc, measureSvc, metaSvc, repo, 
pipeline)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate query processor")
        }
diff --git a/banyand/kv/badger.go b/banyand/kv/badger.go
index 584a664..6fb9c0b 100644
--- a/banyand/kv/badger.go
+++ b/banyand/kv/badger.go
@@ -34,12 +34,13 @@ import (
 )
 
 var (
-       _              Store           = (*badgerDB)(nil)
-       _              IndexStore      = (*badgerDB)(nil)
-       _              y.Iterator      = (*mergedIter)(nil)
-       _              TimeSeriesStore = (*badgerTSS)(nil)
-       bitMergeEntry  byte            = 1 << 3
-       ErrKeyNotFound                 = badger.ErrKeyNotFound
+       _             Store           = (*badgerDB)(nil)
+       _             IndexStore      = (*badgerDB)(nil)
+       _             y.Iterator      = (*mergedIter)(nil)
+       _             TimeSeriesStore = (*badgerTSS)(nil)
+       bitMergeEntry byte            = 1 << 3
+       // ErrKeyNotFound denotes the expected key cannot be found in the kv 
service.
+       ErrKeyNotFound = badger.ErrKeyNotFound
 )
 
 type badgerTSS struct {
@@ -147,12 +148,6 @@ func (b *badgerDB) Stats() observability.Statistics {
        return badgerStats(b.db)
 }
 
-func (b *badgerDB) Handover(iterator Iterator) error {
-       return b.db.HandoverIterator(&mergedIter{
-               delegated: iterator,
-       })
-}
-
 func (b *badgerDB) Scan(prefix, seekKey []byte, opt ScanOpts, f ScanFunc) 
error {
        opts := badger.DefaultIteratorOptions
        opts.PrefetchSize = opt.PrefetchSize
@@ -173,7 +168,7 @@ func (b *badgerDB) Scan(prefix, seekKey []byte, opt 
ScanOpts, f ScanFunc) error
                err := f(b.shardID, k, func() ([]byte, error) {
                        return y.Copy(it.Value().Value), nil
                })
-               if errors.Is(err, ErrStopScan) {
+               if errors.Is(err, errStopScan) {
                        break
                }
                if err != nil {
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index f180d55..fb3960a 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+// Package kv implements a key-value engine.
 package kv
 
 import (
@@ -31,21 +32,25 @@ import (
 )
 
 var (
-       ErrStopScan     = errors.New("stop scanning")
+       errStopScan = errors.New("stop scanning")
+
+       // DefaultScanOpts is a helper that provides canonical options for 
scanning.
        DefaultScanOpts = ScanOpts{
                PrefetchSize:   100,
                PrefetchValues: true,
        }
 )
 
-type Writer interface {
+type writer interface {
        // Put a value
        Put(key, val []byte) error
        PutWithVersion(key, val []byte, version uint64) error
 }
 
+// ScanFunc is the closure executed on scanning out a pair of key-value.
 type ScanFunc func(shardID int, key []byte, getVal func() ([]byte, error)) 
error
 
+// ScanOpts wraps options for scanning the kv storage.
 type ScanOpts struct {
        Prefix         []byte
        PrefetchSize   int
@@ -53,6 +58,7 @@ type ScanOpts struct {
        Reverse        bool
 }
 
+// Reader allows retrieving data from kv.
 type Reader interface {
        Iterable
        // Get a value by its key
@@ -65,10 +71,11 @@ type Reader interface {
 type Store interface {
        observability.Observable
        io.Closer
-       Writer
+       writer
        Reader
 }
 
+// TimeSeriesWriter allows writing to a time-series storage.
 type TimeSeriesWriter interface {
        // Put a value with a timestamp/version
        Put(key, val []byte, ts uint64) error
@@ -77,6 +84,7 @@ type TimeSeriesWriter interface {
        PutAsync(key, val []byte, ts uint64, f func(error)) error
 }
 
+// TimeSeriesReader allows retrieving data from a time-series storage.
 type TimeSeriesReader interface {
        // Get a value by its key and timestamp/version
        Get(key []byte, ts uint64) ([]byte, error)
@@ -91,6 +99,7 @@ type TimeSeriesStore interface {
        TimeSeriesReader
 }
 
+// TimeSeriesOptions sets an option for creating a TimeSeriesStore.
 type TimeSeriesOptions func(TimeSeriesStore)
 
 // TSSWithLogger sets a external logger into underlying TimeSeriesStore.
@@ -104,6 +113,7 @@ func TSSWithLogger(l *logger.Logger) TimeSeriesOptions {
        }
 }
 
+// TSSWithEncoding sets encoding and decoding pools for flushing and 
compacting.
 func TSSWithEncoding(encoderPool encoding.SeriesEncoderPool, decoderPool 
encoding.SeriesDecoderPool) TimeSeriesOptions {
        return func(store TimeSeriesStore) {
                if btss, ok := store.(*badgerTSS); ok {
@@ -117,25 +127,20 @@ func TSSWithEncoding(encoderPool 
encoding.SeriesEncoderPool, decoderPool encodin
        }
 }
 
-func TSSWithFlushCallback(callback func()) TimeSeriesOptions {
+// TSSWithMemTableSize sets the size of memory table in bytes.
+func TSSWithMemTableSize(sizeInBytes int64) TimeSeriesOptions {
        return func(store TimeSeriesStore) {
-               if btss, ok := store.(*badgerTSS); ok {
-                       btss.dbOpts.FlushCallBack = callback
-               }
-       }
-}
-
-func TSSWithMemTableSize(size int64) TimeSeriesOptions {
-       return func(store TimeSeriesStore) {
-               if size < 1 {
+               if sizeInBytes < 1 {
                        return
                }
                if btss, ok := store.(*badgerTSS); ok {
-                       btss.dbOpts.MemTableSize = size
+                       btss.dbOpts.MemTableSize = sizeInBytes
                }
        }
 }
 
+// Iterator allows iterating the kv tables.
+// TODO: use generic to provide a unique iterator
 type Iterator interface {
        Next()
        Rewind()
@@ -147,17 +152,16 @@ type Iterator interface {
        Close() error
 }
 
+// Iterable allows creating an Iterator.
 type Iterable interface {
        NewIterator(opt ScanOpts) Iterator
 }
 
-type HandoverCallback func()
-
+// IndexStore allows writing and reading index format data.
 type IndexStore interface {
        observability.Observable
        Iterable
        Reader
-       Handover(iterator Iterator) error
        Close() error
 }
 
@@ -184,6 +188,7 @@ func OpenTimeSeriesStore(shardID int, path string, options 
...TimeSeriesOptions)
        return btss, nil
 }
 
+// StoreOptions sets options for creating Store.
 type StoreOptions func(Store)
 
 // StoreWithLogger sets a external logger into underlying Store.
@@ -236,6 +241,7 @@ func OpenStore(shardID int, path string, options 
...StoreOptions) (Store, error)
        return bdb, nil
 }
 
+// IndexOptions sets options for creating the index store.
 type IndexOptions func(store IndexStore)
 
 // IndexWithLogger sets a external logger into underlying IndexStore.
diff --git a/banyand/liaison/grpc/discovery.go 
b/banyand/liaison/grpc/discovery.go
index cbff00e..bb9f66e 100644
--- a/banyand/liaison/grpc/discovery.go
+++ b/banyand/liaison/grpc/discovery.go
@@ -33,7 +33,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/partition"
 )
 
-var ErrNotExist = errors.New("the object doesn't exist")
+var errNotExist = errors.New("the object doesn't exist")
 
 type discoveryService struct {
        shardRepo  *shardRepo
@@ -61,11 +61,11 @@ func (ds *discoveryService) navigate(metadata 
*commonv1.Metadata, tagFamilies []
                Name: metadata.Group,
        }))
        if !existed {
-               return nil, nil, common.ShardID(0), errors.Wrapf(ErrNotExist, 
"finding the shard num by: %v", metadata)
+               return nil, nil, common.ShardID(0), errors.Wrapf(errNotExist, 
"finding the shard num by: %v", metadata)
        }
        locator, existed := ds.entityRepo.getLocator(getID(metadata))
        if !existed {
-               return nil, nil, common.ShardID(0), errors.Wrapf(ErrNotExist, 
"finding the locator by: %v", metadata)
+               return nil, nil, common.ShardID(0), errors.Wrapf(errNotExist, 
"finding the locator by: %v", metadata)
        }
        return locator.Locate(metadata.Name, tagFamilies, shardNum)
 }
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index be38a38..a0c8d0b 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -43,10 +43,7 @@ type measureService struct {
 
 func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) 
error {
        reply := func() error {
-               if err := measure.Send(&measurev1.WriteResponse{}); err != nil {
-                       return err
-               }
-               return nil
+               return measure.Send(&measurev1.WriteResponse{})
        }
        sampled := ms.log.Sample(&zerolog.BasicSampler{N: 10})
        for {
@@ -118,7 +115,7 @@ func (ms *measureService) Query(_ context.Context, req 
*measurev1.QueryRequest)
        case []*measurev1.DataPoint:
                return &measurev1.QueryResponse{DataPoints: d}, nil
        case common.Error:
-               return nil, errors.WithMessage(ErrQueryMsg, d.Msg())
+               return nil, errors.WithMessage(errQueryMsg, d.Msg())
        }
        return nil, nil
 }
@@ -142,7 +139,7 @@ func (ms *measureService) TopN(_ context.Context, 
topNRequest *measurev1.TopNReq
        case []*measurev1.TopNList:
                return &measurev1.TopNResponse{Lists: d}, nil
        case common.Error:
-               return nil, errors.WithMessage(ErrQueryMsg, d.Msg())
+               return nil, errors.WithMessage(errQueryMsg, d.Msg())
        }
        return nil, nil
 }
diff --git a/banyand/liaison/grpc/registry.go b/banyand/liaison/grpc/registry.go
index 767cb6b..87927e7 100644
--- a/banyand/liaison/grpc/registry.go
+++ b/banyand/liaison/grpc/registry.go
@@ -400,7 +400,7 @@ func (rs *groupRegistryServer) Get(ctx context.Context, req 
*databasev1.GroupReg
        }, nil
 }
 
-func (rs *groupRegistryServer) List(ctx context.Context, req 
*databasev1.GroupRegistryServiceListRequest) (
+func (rs *groupRegistryServer) List(ctx context.Context, _ 
*databasev1.GroupRegistryServiceListRequest) (
        *databasev1.GroupRegistryServiceListResponse, error,
 ) {
        groups, err := rs.schemaRegistry.GroupRegistry().ListGroup(ctx)
diff --git a/banyand/liaison/grpc/server.go b/banyand/liaison/grpc/server.go
index 4aa66ba..5a58d80 100644
--- a/banyand/liaison/grpc/server.go
+++ b/banyand/liaison/grpc/server.go
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+// Package grpc implements the gRPC services defined by APIs.
 package grpc
 
 import (
@@ -45,13 +46,13 @@ import (
 const defaultRecvSize = 1024 * 1024 * 10
 
 var (
-       ErrServerCert = errors.New("invalid server cert file")
-       ErrServerKey  = errors.New("invalid server key file")
-       ErrNoAddr     = errors.New("no address")
-       ErrQueryMsg   = errors.New("invalid query message")
+       errServerCert = errors.New("invalid server cert file")
+       errServerKey  = errors.New("invalid server key file")
+       errNoAddr     = errors.New("no address")
+       errQueryMsg   = errors.New("invalid query message")
 )
 
-type Server struct {
+type server struct {
        pipeline queue.Queue
        creds    credentials.TransportCredentials
        repo     discovery.ServiceRepo
@@ -74,8 +75,9 @@ type Server struct {
        tls            bool
 }
 
-func NewServer(_ context.Context, pipeline queue.Queue, repo 
discovery.ServiceRepo, schemaRegistry metadata.Service) *Server {
-       return &Server{
+// NewServer returns a new gRPC server.
+func NewServer(_ context.Context, pipeline queue.Queue, repo 
discovery.ServiceRepo, schemaRegistry metadata.Service) run.Unit {
+       return &server{
                pipeline: pipeline,
                repo:     repo,
                streamSVC: &streamService{
@@ -108,7 +110,7 @@ func NewServer(_ context.Context, pipeline queue.Queue, 
repo discovery.ServiceRe
        }
 }
 
-func (s *Server) PreRun() error {
+func (s *server) PreRun() error {
        s.log = logger.GetLogger("liaison-grpc")
        components := []struct {
                discoverySVC *discoveryService
@@ -140,11 +142,11 @@ func (s *Server) PreRun() error {
        return nil
 }
 
-func (s *Server) Name() string {
+func (s *server) Name() string {
        return "grpc"
 }
 
-func (s *Server) FlagSet() *run.FlagSet {
+func (s *server) FlagSet() *run.FlagSet {
        fs := run.NewFlagSet("grpc")
        fs.IntVarP(&s.maxRecvMsgSize, "max-recv-msg-size", "", defaultRecvSize, 
"the size of max receiving message")
        fs.BoolVarP(&s.tls, "tls", "", false, "connection uses TLS if true, 
else plain TCP")
@@ -154,18 +156,18 @@ func (s *Server) FlagSet() *run.FlagSet {
        return fs
 }
 
-func (s *Server) Validate() error {
+func (s *server) Validate() error {
        if s.addr == "" {
-               return ErrNoAddr
+               return errNoAddr
        }
        if !s.tls {
                return nil
        }
        if s.certFile == "" {
-               return ErrServerCert
+               return errServerCert
        }
        if s.keyFile == "" {
-               return ErrServerKey
+               return errServerKey
        }
        creds, errTLS := credentials.NewServerTLSFromFile(s.certFile, s.keyFile)
        if errTLS != nil {
@@ -175,7 +177,7 @@ func (s *Server) Validate() error {
        return nil
 }
 
-func (s *Server) Serve() run.StopNotify {
+func (s *server) Serve() run.StopNotify {
        var opts []grpclib.ServerOption
        if s.tls {
                opts = []grpclib.ServerOption{grpclib.Creds(s.creds)}
@@ -215,7 +217,7 @@ func (s *Server) Serve() run.StopNotify {
        return s.stopCh
 }
 
-func (s *Server) GracefulStop() {
+func (s *server) GracefulStop() {
        s.log.Info().Msg("stopping")
        stopped := make(chan struct{})
        go func() {
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index 191df78..f70e188 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -43,10 +43,7 @@ type streamService struct {
 
 func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error 
{
        reply := func() error {
-               if err := stream.Send(&streamv1.WriteResponse{}); err != nil {
-                       return err
-               }
-               return nil
+               return stream.Send(&streamv1.WriteResponse{})
        }
        sampled := s.log.Sample(&zerolog.BasicSampler{N: 10})
        for {
@@ -127,7 +124,7 @@ func (s *streamService) Query(_ context.Context, req 
*streamv1.QueryRequest) (*s
        case []*streamv1.Element:
                return &streamv1.QueryResponse{Elements: d}, nil
        case common.Error:
-               return nil, errors.WithMessage(ErrQueryMsg, d.Msg())
+               return nil, errors.WithMessage(errQueryMsg, d.Msg())
        }
        return nil, nil
 }
diff --git a/banyand/liaison/http/health.go b/banyand/liaison/http/health.go
index 7681e21..51f6272 100644
--- a/banyand/liaison/http/health.go
+++ b/banyand/liaison/http/health.go
@@ -56,7 +56,7 @@ type healthCheckClient struct {
        conn *grpc.ClientConn
 }
 
-func (g *healthCheckClient) Check(ctx context.Context, r 
*grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption) 
(*grpc_health_v1.HealthCheckResponse, error) {
+func (g *healthCheckClient) Check(ctx context.Context, _ 
*grpc_health_v1.HealthCheckRequest, _ ...grpc.CallOption) 
(*grpc_health_v1.HealthCheckResponse, error) {
        var resp *grpc_health_v1.HealthCheckResponse
        if err := grpchelper.Request(ctx, 10*time.Second, func(rpcCtx 
context.Context) (err error) {
                resp, err = grpc_health_v1.NewHealthClient(g.conn).Check(rpcCtx,
@@ -70,6 +70,6 @@ func (g *healthCheckClient) Check(ctx context.Context, r 
*grpc_health_v1.HealthC
        return resp, nil
 }
 
-func (g *healthCheckClient) Watch(ctx context.Context, r 
*grpc_health_v1.HealthCheckRequest, opts ...grpc.CallOption) 
(grpc_health_v1.Health_WatchClient, error) {
+func (g *healthCheckClient) Watch(_ context.Context, _ 
*grpc_health_v1.HealthCheckRequest, _ ...grpc.CallOption) 
(grpc_health_v1.Health_WatchClient, error) {
        return nil, status.Error(codes.Unimplemented, "unimplemented")
 }
diff --git a/banyand/liaison/http/server.go b/banyand/liaison/http/server.go
index 4f51e25..6008bc9 100644
--- a/banyand/liaison/http/server.go
+++ b/banyand/liaison/http/server.go
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+// Package http implements the gRPC gateway.
 package http
 
 import (
@@ -40,14 +41,13 @@ import (
        "github.com/apache/skywalking-banyandb/ui"
 )
 
-type ServiceRepo interface {
-       run.Config
-       run.Service
-}
-
-var _ ServiceRepo = (*service)(nil)
+var (
+       _ run.Config  = (*service)(nil)
+       _ run.Service = (*service)(nil)
+)
 
-func NewService() ServiceRepo {
+// NewService returns an HTTP service.
+func NewService() run.Unit {
        return &service{
                stopCh: make(chan struct{}),
        }
diff --git a/banyand/liaison/liaison.go b/banyand/liaison/liaison.go
index e5a19dc..bb59f22 100644
--- a/banyand/liaison/liaison.go
+++ b/banyand/liaison/liaison.go
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+// Package liaison implements a transmission layer between a data layer and a 
client.
 package liaison
 
 import (
@@ -27,12 +28,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
-type Endpoint interface {
-       run.Config
-       run.PreRunner
-       run.Service
-}
-
-func NewEndpoint(ctx context.Context, pipeline queue.Queue, repo 
discovery.ServiceRepo, schemaRegistry metadata.Service) (Endpoint, error) {
+// NewEndpoint returns a new endpoint which is the entry point for the database 
server.
+func NewEndpoint(ctx context.Context, pipeline queue.Queue, repo 
discovery.ServiceRepo, schemaRegistry metadata.Service) (run.Unit, error) {
        return grpc.NewServer(ctx, pipeline, repo, schemaRegistry), nil
 }
diff --git a/banyand/measure/field_flag_test.go 
b/banyand/measure/field_flag_test.go
index ae0845f..c47e457 100644
--- a/banyand/measure/field_flag_test.go
+++ b/banyand/measure/field_flag_test.go
@@ -14,6 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+
 package measure
 
 import (
diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go
index 0cc528f..4a2e3be 100644
--- a/banyand/measure/measure.go
+++ b/banyand/measure/measure.go
@@ -15,14 +15,15 @@
 // specific language governing permissions and limitations
 // under the License.
 
+// Package measure implements a time-series-based storage which consists 
of a sequence of data points.
+// Each data point contains tags and fields. They arrive in a fixed interval. 
A data point could be updated
+// by one with the identical entity(series_id) and timestamp.
 package measure
 
 import (
        "context"
        "time"
 
-       "go.uber.org/multierr"
-
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/tsdb"
@@ -74,7 +75,7 @@ func (s *measure) EntityLocator() partition.EntityLocator {
 }
 
 func (s *measure) Close() error {
-       return multierr.Combine(s.processorManager.Close(), 
s.indexWriter.Close())
+       return s.processorManager.Close()
 }
 
 func (s *measure) parseSpec() (err error) {
diff --git a/banyand/measure/measure_query.go b/banyand/measure/measure_query.go
index 1220e73..17ea882 100644
--- a/banyand/measure/measure_query.go
+++ b/banyand/measure/measure_query.go
@@ -35,13 +35,15 @@ import (
        resourceSchema "github.com/apache/skywalking-banyandb/pkg/schema"
 )
 
-var ErrTagFamilyNotExist = errors.New("tag family doesn't exist")
+var errTagFamilyNotExist = errors.New("tag family doesn't exist")
 
+// Query allows retrieving measure data points.
 type Query interface {
        LoadGroup(name string) (resourceSchema.Group, bool)
        Measure(measure *commonv1.Metadata) (Measure, error)
 }
 
+// Measure allows inspecting measure data points' details.
 type Measure interface {
        io.Closer
        Shards(entity tsdb.Entity) ([]tsdb.Shard, error)
@@ -135,7 +137,7 @@ func (s *measure) ParseTagFamily(family string, item 
tsdb.Item) (*modelv1.TagFam
                }
        }
        if tagSpec == nil {
-               return nil, ErrTagFamilyNotExist
+               return nil, errTagFamilyNotExist
        }
        for i, tag := range tagFamily.GetTags() {
                tags[i] = &modelv1.Tag{
diff --git a/banyand/measure/measure_topn.go b/banyand/measure/measure_topn.go
index e5ad23d..4d51099 100644
--- a/banyand/measure/measure_topn.go
+++ b/banyand/measure/measure_topn.go
@@ -47,7 +47,8 @@ import (
 
 const (
        timeBucketFormat = "200601021504"
-       TopNTagFamily    = "__topN__"
+       // TopNTagFamily is the identity of a tag family which contains the 
topN calculated result.
+       TopNTagFamily = "__topN__"
 )
 
 var (
@@ -57,6 +58,7 @@ var (
 
        errUnsupportedConditionValueType = errors.New("unsupported value type 
in the condition")
 
+       // TopNValueFieldSpec denotes the field specification of the topN 
calculated result.
        TopNValueFieldSpec = &databasev1.FieldSpec{
                Name:              "value",
                FieldType:         databasev1.FieldType_FIELD_TYPE_INT,
@@ -109,7 +111,7 @@ func (t *topNStreamingProcessor) run(ctx context.Context) {
 
 // Teardown is called by the Flow as a lifecycle hook.
 // So we should not block on err channel within this method.
-func (t *topNStreamingProcessor) Teardown(ctx context.Context) error {
+func (t *topNStreamingProcessor) Teardown(_ context.Context) error {
        t.Wait()
        return nil
 }
@@ -280,13 +282,13 @@ func (t *topNStreamingProcessor) start() 
*topNStreamingProcessor {
                        streaming.WithSortKeyExtractor(func(record 
flow.StreamRecord) int64 {
                                return record.Data().(flow.Data)[1].(int64)
                        }),
-                       OrderBy(t.topNSchema.GetFieldValueSort()),
+                       orderBy(t.topNSchema.GetFieldValueSort()),
                ).To(t).Open()
        go t.handleError()
        return t
 }
 
-func OrderBy(sort modelv1.Sort) streaming.TopNOption {
+func orderBy(sort modelv1.Sort) streaming.TopNOption {
        if sort == modelv1.Sort_SORT_ASC {
                return streaming.OrderBy(streaming.ASC)
        }
diff --git a/banyand/measure/measure_write.go b/banyand/measure/measure_write.go
index ff26fc2..a29c535 100644
--- a/banyand/measure/measure_write.go
+++ b/banyand/measure/measure_write.go
@@ -37,7 +37,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
-var ErrMalformedElement = errors.New("element is malformed")
+var errMalformedElement = errors.New("element is malformed")
 
 func (s *measure) write(md *commonv1.Metadata, shardID common.ShardID, entity 
[]byte, entityValues tsdb.EntityValues, value *measurev1.DataPointValue) error {
        t := value.GetTimestamp().AsTime().Local()
@@ -47,10 +47,10 @@ func (s *measure) write(md *commonv1.Metadata, shardID 
common.ShardID, entity []
        sm := s.schema
        fLen := len(value.GetTagFamilies())
        if fLen < 1 {
-               return errors.Wrap(ErrMalformedElement, "no tag family")
+               return errors.Wrap(errMalformedElement, "no tag family")
        }
        if fLen > len(sm.TagFamilies) {
-               return errors.Wrap(ErrMalformedElement, "tag family number is 
more than expected")
+               return errors.Wrap(errMalformedElement, "tag family number is 
more than expected")
        }
        shard, err := s.databaseSupplier.SupplyTSDB().Shard(shardID)
        if err != nil {
@@ -80,7 +80,7 @@ func (s *measure) write(md *commonv1.Metadata, shardID 
common.ShardID, entity []
                        builder.Family(familyIdentity(spec.GetName(), 
pbv1.TagFlag), bb)
                }
                if len(value.GetFields()) > len(sm.GetFields()) {
-                       return nil, errors.Wrap(ErrMalformedElement, "fields 
number is more than expected")
+                       return nil, errors.Wrap(errMalformedElement, "fields 
number is more than expected")
                }
                for fi, fieldValue := range value.GetFields() {
                        fieldSpec := sm.GetFields()[fi]
@@ -90,7 +90,7 @@ func (s *measure) write(md *commonv1.Metadata, shardID 
common.ShardID, entity []
                                continue
                        }
                        if fType != fieldSpec.GetFieldType() {
-                               return nil, errors.Wrapf(ErrMalformedElement, 
"field %s type is unexpected", fieldSpec.GetName())
+                               return nil, errors.Wrapf(errMalformedElement, 
"field %s type is unexpected", fieldSpec.GetName())
                        }
                        data := encodeFieldValue(fieldValue)
                        if data == nil {
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 88aa56c..06b1f97 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -209,7 +209,7 @@ func (s *supplier) OpenResource(shardNum uint32, db 
tsdb.Supplier, spec resource
        }, s.l)
 }
 
-func (s *supplier) ResourceSchema(repo metadata.Repo, md *commonv1.Metadata) 
(resourceSchema.ResourceSchema, error) {
+func (s *supplier) ResourceSchema(md *commonv1.Metadata) 
(resourceSchema.ResourceSchema, error) {
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        return s.metadata.MeasureRegistry().GetMeasure(ctx, md)
diff --git a/banyand/measure/service.go b/banyand/measure/service.go
index 6b15ede..60e615d 100644
--- a/banyand/measure/service.go
+++ b/banyand/measure/service.go
@@ -38,10 +38,12 @@ import (
 )
 
 var (
-       ErrEmptyRootPath   = errors.New("root path is empty")
+       errEmptyRootPath = errors.New("root path is empty")
+       // ErrMeasureNotExist denotes a measure doesn't exist in the metadata 
repo.
        ErrMeasureNotExist = errors.New("measure doesn't exist")
 )
 
+// Service allows inspecting the measure data points.
 type Service interface {
        run.PreRunner
        run.Config
@@ -85,7 +87,7 @@ func (s *service) FlagSet() *run.FlagSet {
 
 func (s *service) Validate() error {
        if s.root == "" {
-               return ErrEmptyRootPath
+               return errEmptyRootPath
        }
        return nil
 }
diff --git a/banyand/metadata/metadata.go b/banyand/metadata/metadata.go
index acaf677..44f5953 100644
--- a/banyand/metadata/metadata.go
+++ b/banyand/metadata/metadata.go
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+// Package metadata implements a Raft-based distributed metadata storage 
system.
+// Powered by etcd.
 package metadata
 
 import (
@@ -39,6 +41,7 @@ type IndexFilter interface {
        Subjects(ctx context.Context, indexRule *databasev1.IndexRule, catalog 
commonv1.Catalog) ([]schema.Spec, error)
 }
 
+// Repo is the facade to interact with the metadata repository.
 type Repo interface {
        IndexFilter
        StreamRegistry() schema.Stream
@@ -50,6 +53,7 @@ type Repo interface {
        PropertyRegistry() schema.Property
 }
 
+// Service is the metadata repository.
 type Service interface {
        Repo
        run.PreRunner
@@ -101,6 +105,7 @@ func (s *service) GracefulStop() {
        <-s.schemaRegistry.StopNotify()
 }
 
+// NewService returns a new metadata repository Service.
 func NewService(_ context.Context) (Service, error) {
        return &service{}, nil
 }
diff --git a/banyand/observability/metric.go b/banyand/observability/metric.go
index b4598c6..9430237 100644
--- a/banyand/observability/metric.go
+++ b/banyand/observability/metric.go
@@ -14,6 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+
 package observability
 
 import (
@@ -31,6 +32,7 @@ var (
        _ run.Config  = (*metricService)(nil)
 )
 
+// NewMetricService returns a metric service.
 func NewMetricService() run.Service {
        return &metricService{
                closer: run.NewCloser(1),
diff --git a/banyand/observability/pprof.go b/banyand/observability/pprof.go
index 1340f9b..0e65136 100644
--- a/banyand/observability/pprof.go
+++ b/banyand/observability/pprof.go
@@ -31,6 +31,7 @@ var (
        _ run.Config  = (*metricService)(nil)
 )
 
+// NewProfService returns a pprof service.
 func NewProfService() run.Service {
        return &pprofService{
                closer: run.NewCloser(1),
diff --git a/banyand/observability/type.go b/banyand/observability/type.go
index f6829d1..d6e4b09 100644
--- a/banyand/observability/type.go
+++ b/banyand/observability/type.go
@@ -14,17 +14,21 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+
+// Package observability provides metrics, profiling, etc.
 package observability
 
 import "errors"
 
 var errNoAddr = errors.New("no address")
 
+// Statistics represents a sample of a module.
 type Statistics struct {
        MemBytes    int64
        MaxMemBytes int64
 }
 
+// Observable allows sampling.
 type Observable interface {
        Stats() Statistics
 }
diff --git a/banyand/query/processor.go b/banyand/query/processor.go
index 43100f2..6171b1a 100644
--- a/banyand/query/processor.go
+++ b/banyand/query/processor.go
@@ -37,6 +37,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/query/executor"
        logical_measure 
"github.com/apache/skywalking-banyandb/pkg/query/logical/measure"
        logical_stream 
"github.com/apache/skywalking-banyandb/pkg/query/logical/stream"
+       "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
 const (
@@ -44,7 +45,7 @@ const (
 )
 
 var (
-       _ Executor            = (*queryService)(nil)
+       _ run.PreRunner       = (*queryService)(nil)
        _ bus.MessageListener = (*streamQueryProcessor)(nil)
        _ bus.MessageListener = (*measureQueryProcessor)(nil)
        _ bus.MessageListener = (*topNQueryProcessor)(nil)
diff --git a/banyand/query/query.go b/banyand/query/query.go
index 267a922..3e4eab1 100644
--- a/banyand/query/query.go
+++ b/banyand/query/query.go
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+// Package query implements the query module for liaison and other modules to 
retrieve data.
 package query
 
 import (
@@ -28,13 +29,10 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
-type Executor interface {
-       run.PreRunner
-}
-
-func NewExecutor(_ context.Context, streamService stream.Service, 
measureService measure.Service,
+// NewService returns a new query service.
+func NewService(_ context.Context, streamService stream.Service, 
measureService measure.Service,
        metaService metadata.Service, serviceRepo discovery.ServiceRepo, 
pipeline queue.Queue,
-) (Executor, error) {
+) (run.Unit, error) {
        svc := &queryService{
                metaService: metaService,
                serviceRepo: serviceRepo,
diff --git a/banyand/queue/local.go b/banyand/queue/local.go
index 1061cbf..ec4a95e 100644
--- a/banyand/queue/local.go
+++ b/banyand/queue/local.go
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-// Package queue implements the data transmission queue
+// Package queue implements the data transmission queue.
 package queue
 
 import (
diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go
index 7941a9b..0946ecb 100644
--- a/banyand/queue/queue.go
+++ b/banyand/queue/queue.go
@@ -25,6 +25,8 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/run"
 )
 
+// Queue builds a data transmission tunnel between subscribers and publishers.
+//
 //go:generate mockgen -destination=./queue_mock.go -package=queue 
github.com/apache/skywalking-banyandb/pkg/bus MessageListener
 type Queue interface {
        run.Unit
@@ -33,6 +35,7 @@ type Queue interface {
        run.Service
 }
 
+// NewQueue returns a new Queue which relies on the discovery service.
 func NewQueue(_ context.Context, repo discovery.ServiceRepo) (Queue, error) {
        return &local{
                repo:   repo,
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 5b200d9..286a276 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -195,7 +195,7 @@ func (s *supplier) OpenResource(shardNum uint32, db 
tsdb.Supplier, spec resource
        }, s.l), nil
 }
 
-func (s *supplier) ResourceSchema(repo metadata.Repo, md *commonv1.Metadata) 
(resourceSchema.ResourceSchema, error) {
+func (s *supplier) ResourceSchema(md *commonv1.Metadata) 
(resourceSchema.ResourceSchema, error) {
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        return s.metadata.StreamRegistry().GetStream(ctx, md)
diff --git a/banyand/stream/service.go b/banyand/stream/service.go
index 77c4989..ac95748 100644
--- a/banyand/stream/service.go
+++ b/banyand/stream/service.go
@@ -36,10 +36,11 @@ import (
 )
 
 var (
-       ErrEmptyRootPath  = errors.New("root path is empty")
-       ErrStreamNotExist = errors.New("stream doesn't exist")
+       errEmptyRootPath  = errors.New("root path is empty")
+       errStreamNotExist = errors.New("stream doesn't exist")
 )
 
+// Service allows inspecting the stream elements.
 type Service interface {
        run.PreRunner
        run.Config
@@ -64,7 +65,7 @@ type service struct {
 func (s *service) Stream(metadata *commonv1.Metadata) (Stream, error) {
        sm, ok := s.schemaRepo.loadStream(metadata)
        if !ok {
-               return nil, errors.WithStack(ErrStreamNotExist)
+               return nil, errors.WithStack(errStreamNotExist)
        }
        return sm, nil
 }
@@ -80,7 +81,7 @@ func (s *service) FlagSet() *run.FlagSet {
 
 func (s *service) Validate() error {
        if s.root == "" {
-               return ErrEmptyRootPath
+               return errEmptyRootPath
        }
        return nil
 }
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index 41f45a4..c8866cf 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -15,6 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+// Package stream implements a time-series-based storage which consists of 
a sequence of elements.
+// Each element drops in an arbitrary interval. They are immutable, can not be 
updated or overwritten.
 package stream
 
 import (
@@ -65,7 +67,7 @@ func (s *stream) EntityLocator() partition.EntityLocator {
 }
 
 func (s *stream) Close() error {
-       return s.indexWriter.Close()
+       return nil
 }
 
 func (s *stream) parseSpec() {
diff --git a/banyand/stream/stream_query.go b/banyand/stream/stream_query.go
index 1f89be8..5b3ef7c 100644
--- a/banyand/stream/stream_query.go
+++ b/banyand/stream/stream_query.go
@@ -31,12 +31,14 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/partition"
 )
 
-var ErrTagFamilyNotExist = errors.New("tag family doesn't exist")
+var errTagFamilyNotExist = errors.New("tag family doesn't exist")
 
+// Query allows retrieving elements in a series of streams.
 type Query interface {
        Stream(stream *commonv1.Metadata) (Stream, error)
 }
 
+// Stream allows inspecting elements' details.
 type Stream interface {
        io.Closer
        Shards(entity tsdb.Entity) ([]tsdb.Shard, error)
@@ -101,7 +103,7 @@ func (s *stream) ParseTagFamily(family string, item 
tsdb.Item) (*modelv1.TagFami
                }
        }
        if tagSpec == nil {
-               return nil, ErrTagFamilyNotExist
+               return nil, errTagFamilyNotExist
        }
        for i, tag := range tagFamily.GetTags() {
                tags[i] = &modelv1.Tag{
diff --git a/banyand/stream/stream_write.go b/banyand/stream/stream_write.go
index 3a77da3..01bb656 100644
--- a/banyand/stream/stream_write.go
+++ b/banyand/stream/stream_write.go
@@ -36,7 +36,7 @@ import (
 )
 
 var (
-       ErrMalformedElement = errors.New("element is malformed")
+       errMalformedElement = errors.New("element is malformed")
        writtenBytes        *prometheus.CounterVec
 )
 
@@ -59,10 +59,10 @@ func (s *stream) write(shardID common.ShardID, entity 
[]byte, entityValues tsdb.
        sm := s.schema
        fLen := len(value.GetTagFamilies())
        if fLen < 1 {
-               return errors.Wrap(ErrMalformedElement, "no tag family")
+               return errors.Wrap(errMalformedElement, "no tag family")
        }
        if fLen > len(sm.TagFamilies) {
-               return errors.Wrap(ErrMalformedElement, "tag family number is 
more than expected")
+               return errors.Wrap(errMalformedElement, "tag family number is 
more than expected")
        }
        shard, err := s.db.SupplyTSDB().Shard(shardID)
        if err != nil {
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index d5c48cf..cdfd8f2 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -53,7 +53,7 @@ const (
        defaultEnqueueTimeout = 500 * time.Millisecond
 )
 
-var ErrBlockClosingInterrupted = errors.New("interrupt to close the block")
+var errBlockClosingInterrupted = errors.New("interrupt to close the block")
 
 type block struct {
        encodingMethod EncodingMethod
@@ -189,9 +189,9 @@ func (b *block) open() (err error) {
        return nil
 }
 
-func (b *block) delegate(ctx context.Context) (BlockDelegate, error) {
+func (b *block) delegate(ctx context.Context) (blockDelegate, error) {
        if b.deleted.Load() {
-               return nil, errors.WithMessagef(ErrBlockAbsent, "block %s is 
deleted", b)
+               return nil, errors.WithMessagef(errBlockAbsent, "block %s is 
deleted", b)
        }
        blockID := BlockID{
                BlockID: b.blockID,
@@ -276,7 +276,7 @@ func (b *block) close(ctx context.Context) (err error) {
        select {
        case <-ctx.Done():
                stopWaiting.Store(true)
-               return errors.Wrapf(ErrBlockClosingInterrupted, "block:%s", b)
+               return errors.Wrapf(errBlockClosingInterrupted, "block:%s", b)
        case <-ch:
        }
        b.closed.Store(true)
@@ -313,7 +313,7 @@ func (b *block) stats() (names []string, stats 
[]observability.Statistics) {
        return names, stats
 }
 
-type BlockDelegate interface {
+type blockDelegate interface {
        io.Closer
        contains(ts time.Time) bool
        write(key []byte, val []byte, ts time.Time) error
@@ -330,7 +330,7 @@ type BlockDelegate interface {
        String() string
 }
 
-var _ BlockDelegate = (*bDelegate)(nil)
+var _ blockDelegate = (*bDelegate)(nil)
 
 type bDelegate struct {
        delegate *block
diff --git a/banyand/tsdb/block_ctrl.go b/banyand/tsdb/block_ctrl.go
index 475b218..b1071d3 100644
--- a/banyand/tsdb/block_ctrl.go
+++ b/banyand/tsdb/block_ctrl.go
@@ -93,7 +93,7 @@ func (bc *blockController) Next() (bucket.Reporter, error) {
        }
        b := c.(*block)
 
-       return bc.newHeadBlock(bc.blockSize.NextTime(b.Start))
+       return bc.newHeadBlock(bc.blockSize.nextTime(b.Start))
 }
 
 func (bc *blockController) newHeadBlock(now time.Time) (*block, error) {
@@ -157,14 +157,14 @@ func (bc *blockController) Parse(value string) 
(time.Time, error) {
        panic("invalid interval unit")
 }
 
-func (bc *blockController) span(ctx context.Context, timeRange 
timestamp.TimeRange) ([]BlockDelegate, error) {
+func (bc *blockController) span(ctx context.Context, timeRange 
timestamp.TimeRange) ([]blockDelegate, error) {
        bb := bc.search(func(b *block) bool {
                return b.Overlapping(timeRange)
        })
        if bb == nil {
                return nil, nil
        }
-       dd := make([]BlockDelegate, len(bb))
+       dd := make([]blockDelegate, len(bb))
        for i, b := range bb {
                d, err := b.delegate(ctx)
                if err != nil {
@@ -175,7 +175,7 @@ func (bc *blockController) span(ctx context.Context, 
timeRange timestamp.TimeRan
        return dd, nil
 }
 
-func (bc *blockController) get(ctx context.Context, blockID SectionID) 
(BlockDelegate, error) {
+func (bc *blockController) get(ctx context.Context, blockID SectionID) 
(blockDelegate, error) {
        b := bc.getBlock(blockID)
        if b != nil {
                return b.delegate(ctx)
@@ -251,7 +251,7 @@ func (bc *blockController) create(start time.Time) (*block, 
error) {
                        next = s
                }
        }
-       stdEnd := bc.blockSize.NextTime(start)
+       stdEnd := bc.blockSize.nextTime(start)
        var end time.Time
        if next != nil && next.Start.Before(stdEnd) {
                end = next.Start
diff --git a/banyand/tsdb/bucket/bucket.go b/banyand/tsdb/bucket/bucket.go
index 6bdb01f..e05a400 100644
--- a/banyand/tsdb/bucket/bucket.go
+++ b/banyand/tsdb/bucket/bucket.go
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+// Package bucket implements a rolling bucket system.
 package bucket
 
 import (
@@ -29,21 +30,25 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
-var ErrReporterClosed = errors.New("reporter is closed")
+var errReporterClosed = errors.New("reporter is closed")
 
+// Controller defines the provider of a Reporter.
 type Controller interface {
        Current() (Reporter, error)
        Next() (Reporter, error)
        OnMove(prev, next Reporter)
 }
 
+// Status is a sample of the Reporter's status.
 type Status struct {
        Capacity int
        Volume   int
 }
 
+// Channel reports the status of a Reporter.
 type Channel chan Status
 
+// Reporter allows reporting status to its supervisor.
 type Reporter interface {
        // TODO: refactor Report to return a status. It's too complicated to 
return a channel
        Report() (Channel, error)
@@ -51,15 +56,16 @@ type Reporter interface {
 }
 
 var (
-       _             Reporter = (*dummyReporter)(nil)
-       _             Reporter = (*timeBasedReporter)(nil)
-       DummyReporter          = &dummyReporter{}
+       _ Reporter = (*dummyReporter)(nil)
+       _ Reporter = (*timeBasedReporter)(nil)
+       // DummyReporter is a special Reporter to avoid nil errors.
+       DummyReporter = &dummyReporter{}
 )
 
 type dummyReporter struct{}
 
 func (*dummyReporter) Report() (Channel, error) {
-       return nil, ErrReporterClosed
+       return nil, errReporterClosed
 }
 
 func (*dummyReporter) Stop() {
@@ -77,6 +83,7 @@ type timeBasedReporter struct {
        name string
 }
 
+// NewTimeBasedReporter returns a Reporter which sends report based on time.
 func NewTimeBasedReporter(name string, timeRange timestamp.TimeRange, clock 
timestamp.Clock, scheduler *timestamp.Scheduler) Reporter {
        if timeRange.End.Before(clock.Now()) {
                return DummyReporter
@@ -93,11 +100,11 @@ func NewTimeBasedReporter(name string, timeRange 
timestamp.TimeRange, clock time
 
 func (tr *timeBasedReporter) Report() (Channel, error) {
        if tr.scheduler.Closed() {
-               return nil, ErrReporterClosed
+               return nil, errReporterClosed
        }
        now := tr.clock.Now()
        if now.After(tr.End) {
-               return nil, ErrReporterClosed
+               return nil, errReporterClosed
        }
        ch := make(Channel, 1)
        interval := tr.Duration() >> 4
@@ -137,7 +144,7 @@ func (tr *timeBasedReporter) Report() (Channel, error) {
                }); err != nil {
                close(ch)
                if errors.Is(err, timestamp.ErrSchedulerClosed) {
-                       return nil, ErrReporterClosed
+                       return nil, errReporterClosed
                }
                return nil, err
        }
diff --git a/banyand/tsdb/bucket/queue.go b/banyand/tsdb/bucket/queue.go
index 18e900a..6cb1bfa 100644
--- a/banyand/tsdb/bucket/queue.go
+++ b/banyand/tsdb/bucket/queue.go
@@ -14,6 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+
 package bucket
 
 import (
@@ -31,10 +32,13 @@ import (
 )
 
 type (
-       EvictFn       func(ctx context.Context, id interface{}) error
+       // EvictFn is a closure executed on evicting an item.
+       EvictFn func(ctx context.Context, id interface{}) error
+       // OnAddRecentFn is a notifier on adding an item into the recent queue.
        OnAddRecentFn func() error
 )
 
+// Queue is an LRU queue.
 type Queue interface {
        Touch(id fmt.Stringer) bool
        Push(ctx context.Context, id fmt.Stringer, fn OnAddRecentFn) error
@@ -45,13 +49,14 @@ type Queue interface {
 }
 
 const (
-       QueueName          = "block-queue-cleanup"
-       DefaultRecentRatio = 0.25
+       // QueueName is the identity of the queue.
+       QueueName = "block-queue-cleanup"
 
+       defaultRecentRatio    = 0.25
        defaultEvictBatchSize = 10
 )
 
-var ErrInvalidSize = errors.New("invalid size")
+var errInvalidSize = errors.New("invalid size")
 
 type lruQueue struct {
        recent      simplelru.LRUCache
@@ -65,12 +70,13 @@ type lruQueue struct {
        lock        sync.RWMutex
 }
 
+// NewQueue returns a Queue for block eviction.
 func NewQueue(l *logger.Logger, size int, maxSize int, scheduler 
*timestamp.Scheduler, evictFn EvictFn) (Queue, error) {
        if size <= 0 {
-               return nil, ErrInvalidSize
+               return nil, errInvalidSize
        }
 
-       recentSize := int(float64(size) * DefaultRecentRatio)
+       recentSize := int(float64(size) * defaultRecentRatio)
        evictSize := maxSize - size
 
        recent, err := simplelru.NewLRU(size, nil)
diff --git a/banyand/tsdb/bucket/strategy.go b/banyand/tsdb/bucket/strategy.go
index dda86d8..31b601d 100644
--- a/banyand/tsdb/bucket/strategy.go
+++ b/banyand/tsdb/bucket/strategy.go
@@ -30,41 +30,48 @@ import (
 )
 
 var (
+       // ErrInvalidParameter denotes input parameters are invalid.
        ErrInvalidParameter = errors.New("parameters are invalid")
-       ErrNoMoreBucket     = errors.New("no more buckets")
+       // ErrNoMoreBucket denotes the bucket volume reaches the limitation.
+       ErrNoMoreBucket = errors.New("no more buckets")
 )
 
-type Ratio float64
+type ratio float64
 
+// Strategy controls Reporters with Controller's help.
 type Strategy struct {
        optionsErr   error
        ctrl         Controller
        current      atomic.Value
        logger       *logger.Logger
        closer       *run.Closer
-       ratio        Ratio
+       ratio        ratio
        currentRatio uint64
 }
 
+// StrategyOptions sets how to create a Strategy.
 type StrategyOptions func(*Strategy)
 
-func WithNextThreshold(ratio Ratio) StrategyOptions {
+// WithNextThreshold sets a ratio to create the next Reporter.
+func WithNextThreshold(r ratio) StrategyOptions {
        return func(s *Strategy) {
-               if ratio > 1.0 {
+               if r > 1.0 {
                        s.optionsErr = multierr.Append(s.optionsErr,
-                               errors.Wrapf(ErrInvalidParameter, "ratio %v is 
more than 1.0", ratio))
+                               errors.Wrapf(ErrInvalidParameter, "ratio %v is 
more than 1.0", r))
                        return
                }
-               s.ratio = ratio
+               s.ratio = r
        }
 }
 
+// WithLogger sets a logger.Logger.
 func WithLogger(logger *logger.Logger) StrategyOptions {
        return func(s *Strategy) {
                s.logger = logger
        }
 }
 
+// NewStrategy returns a Strategy.
 func NewStrategy(ctrl Controller, options ...StrategyOptions) (*Strategy, 
error) {
        if ctrl == nil {
                return nil, errors.Wrap(ErrInvalidParameter, "controller is 
absent")
@@ -98,12 +105,13 @@ func (s *Strategy) resetCurrent() error {
        return nil
 }
 
+// Run the Strategy in the background.
 func (s *Strategy) Run() {
        go func(s *Strategy) {
                defer s.closer.Done()
                for {
                        c, err := s.current.Load().(Reporter).Report()
-                       if errors.Is(err, ErrReporterClosed) {
+                       if errors.Is(err, errReporterClosed) {
                                return
                        }
                        if err != nil {
@@ -138,9 +146,9 @@ func (s *Strategy) observe(c Channel) bool {
                        if !more {
                                return moreBucket
                        }
-                       ratio := Ratio(status.Volume) / Ratio(status.Capacity)
-                       atomic.StoreUint64(&s.currentRatio, 
math.Float64bits(float64(ratio)))
-                       if ratio >= s.ratio && next == nil && moreBucket {
+                       r := ratio(status.Volume) / ratio(status.Capacity)
+                       atomic.StoreUint64(&s.currentRatio, 
math.Float64bits(float64(r)))
+                       if r >= s.ratio && next == nil && moreBucket {
                                n, err := s.ctrl.Next()
                                if errors.Is(err, ErrNoMoreBucket) {
                                        moreBucket = false
@@ -151,7 +159,7 @@ func (s *Strategy) observe(c Channel) bool {
                                        next = n
                                }
                        }
-                       if ratio >= 1.0 {
+                       if r >= 1.0 {
                                s.ctrl.OnMove(s.current.Load().(Reporter), next)
                                if next != nil {
                                        s.current.Store(next)
@@ -164,6 +172,7 @@ func (s *Strategy) observe(c Channel) bool {
        }
 }
 
+// Close the Strategy running in the background.
 func (s *Strategy) Close() {
        s.closer.CloseThenWait()
 }
diff --git a/banyand/tsdb/index/writer.go b/banyand/tsdb/index/writer.go
index 5e98d04..7385466 100644
--- a/banyand/tsdb/index/writer.go
+++ b/banyand/tsdb/index/writer.go
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+// Package index implements transferring data to indices.
 package index
 
 import (
@@ -35,8 +36,7 @@ import (
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
 )
 
-type CallbackFn func()
-
+// Message wraps value and other info to generate relative indices.
 type Message struct {
        Value       Value
        LocalWriter tsdb.Writer
@@ -44,11 +44,13 @@ type Message struct {
        Scope       tsdb.Entry
 }
 
+// Value represents the input data for generating indices.
 type Value struct {
        Timestamp   time.Time
        TagFamilies []*modelv1.TagFamilyForWrite
 }
 
+// WriterOptions wrap all options to create an index writer.
 type WriterOptions struct {
        DB                tsdb.Supplier
        Families          []*databasev1.TagFamilySpec
@@ -64,6 +66,7 @@ const (
        tree
 )
 
+// Writer generates indices based on index rules.
 type Writer struct {
        db                tsdb.Supplier
        l                 *logger.Logger
@@ -72,6 +75,7 @@ type Writer struct {
        enableGlobalIndex bool
 }
 
+// NewWriter returns a new Writer with WriterOptions.
 func NewWriter(ctx context.Context, options WriterOptions) *Writer {
        w := new(Writer)
        parentLogger := ctx.Value(logger.ContextKey)
@@ -127,10 +131,6 @@ func (s *Writer) Write(m Message) {
        }
 }
 
-func (s *Writer) Close() error {
-       return nil
-}
-
 // TODO: should listen to pipeline in a distributed cluster.
 func (s *Writer) writeGlobalIndex(scope tsdb.Entry, ref tsdb.GlobalItemID, 
value Value) error {
        collect := func(ruleIndexes []*partition.IndexRuleLocator, fn 
func(indexWriter tsdb.IndexWriter, fields []index.Field) error) error {
@@ -225,12 +225,12 @@ func (s *Writer) writeLocalIndex(writer tsdb.Writer, 
value Value) (err error) {
        )
 }
 
-var ErrUnsupportedIndexType = errors.New("unsupported index type")
+var errUnsupportedIndexType = errors.New("unsupported index type")
 
 func getIndexValue(ruleIndex *partition.IndexRuleLocator, value Value) (val 
[][]byte, err error) {
        val = make([][]byte, 0)
        if len(ruleIndex.TagIndices) != 1 {
-               return nil, errors.WithMessagef(ErrUnsupportedIndexType,
+               return nil, errors.WithMessagef(errUnsupportedIndexType,
                        "the index rule %s(%v) didn't support composited tags",
                        ruleIndex.Rule.Metadata.Name, ruleIndex.Rule.Tags)
        }
diff --git a/banyand/tsdb/indexdb.go b/banyand/tsdb/indexdb.go
index 797b111..cb82f88 100644
--- a/banyand/tsdb/indexdb.go
+++ b/banyand/tsdb/indexdb.go
@@ -30,16 +30,19 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/index"
 )
 
+// IndexDatabase allows stocking index data.
 type IndexDatabase interface {
        WriterBuilder() IndexWriterBuilder
        Seek(field index.Field) ([]GlobalItemID, error)
 }
 
+// IndexWriter allows ingesting index data.
 type IndexWriter interface {
        WriteLSMIndex(field []index.Field) error
        WriteInvertedIndex(field []index.Field) error
 }
 
+// IndexWriterBuilder is a helper to build IndexWriter.
 type IndexWriterBuilder interface {
        Scope(scope Entry) IndexWriterBuilder
        Time(ts time.Time) IndexWriterBuilder
@@ -47,8 +50,6 @@ type IndexWriterBuilder interface {
        Build() (IndexWriter, error)
 }
 
-type IndexSeekBuilder interface{}
-
 var _ IndexDatabase = (*indexDB)(nil)
 
 type indexDB struct {
@@ -65,7 +66,7 @@ func (i *indexDB) Seek(field index.Field) ([]GlobalItemID, 
error) {
        for _, s := range i.segCtrl.segments() {
                err = s.globalIndex.GetAll(f, func(rawBytes []byte) error {
                        id := &GlobalItemID{}
-                       errUnMarshal := id.UnMarshal(rawBytes)
+                       errUnMarshal := id.unMarshal(rawBytes)
                        if errUnMarshal != nil {
                                return errUnMarshal
                        }
@@ -120,7 +121,7 @@ func (i *indexWriterBuilder) Build() (IndexWriter, error) {
                return nil, err
        }
        if i.globalItemID == nil {
-               return nil, errors.WithStack(ErrNoVal)
+               return nil, errors.WithStack(errNoVal)
        }
        return &indexWriter{
                scope:  i.scope,
@@ -155,7 +156,7 @@ func (i *indexWriter) WriteLSMIndex(fields []index.Field) 
(err error) {
                        err = multierr.Append(err, errInternal)
                        continue
                }
-               err = multierr.Append(err, 
i.seg.globalIndex.PutWithVersion(key, i.itemID.Marshal(), 
uint64(i.ts.UnixNano())))
+               err = multierr.Append(err, 
i.seg.globalIndex.PutWithVersion(key, i.itemID.marshal(), 
uint64(i.ts.UnixNano())))
        }
        return err
 }
@@ -170,11 +171,12 @@ func (i *indexWriter) WriteInvertedIndex(fields 
[]index.Field) (err error) {
                        err = multierr.Append(err, errInternal)
                        continue
                }
-               err = multierr.Append(err, 
i.seg.globalIndex.PutWithVersion(key, i.itemID.Marshal(), 
uint64(i.ts.UnixNano())))
+               err = multierr.Append(err, 
i.seg.globalIndex.PutWithVersion(key, i.itemID.marshal(), 
uint64(i.ts.UnixNano())))
        }
        return err
 }
 
+// GlobalSeriesID encodes Entry to common.SeriesID.
 func GlobalSeriesID(scope Entry) common.SeriesID {
        return common.SeriesID(convert.Hash(scope))
 }
diff --git a/banyand/tsdb/metric.go b/banyand/tsdb/metric.go
index cd99c68..40340b5 100644
--- a/banyand/tsdb/metric.go
+++ b/banyand/tsdb/metric.go
@@ -14,6 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+
 package tsdb
 
 import (
diff --git a/banyand/tsdb/retention.go b/banyand/tsdb/retention.go
index 6eb2e0f..43ff3f3 100644
--- a/banyand/tsdb/retention.go
+++ b/banyand/tsdb/retention.go
@@ -47,7 +47,7 @@ func newRetentionTask(segment *segmentController, ttl 
IntervalRule) *retentionTa
                segment:  segment,
                option:   cron.Minute | cron.Hour,
                expr:     expr,
-               duration: ttl.EstimatedDuration(),
+               duration: ttl.estimatedDuration(),
        }
 }
 
diff --git a/banyand/tsdb/scope.go b/banyand/tsdb/scope.go
index ab9711c..952fd3c 100644
--- a/banyand/tsdb/scope.go
+++ b/banyand/tsdb/scope.go
@@ -24,45 +24,46 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/observability"
 )
 
-var _ Shard = (*ScopedShard)(nil)
+var _ Shard = (*scopedShard)(nil)
 
-type ScopedShard struct {
+type scopedShard struct {
        delegated Shard
        scope     Entry
 }
 
+// NewScopedShard returns a shard in a scope.
 func NewScopedShard(scope Entry, delegated Shard) Shard {
-       return &ScopedShard{
+       return &scopedShard{
                scope:     scope,
                delegated: delegated,
        }
 }
 
-func (sd *ScopedShard) Close() error {
+func (sd *scopedShard) Close() error {
        // the delegate can't close the underlying shard
        return nil
 }
 
-func (sd *ScopedShard) ID() common.ShardID {
+func (sd *scopedShard) ID() common.ShardID {
        return sd.delegated.ID()
 }
 
-func (sd *ScopedShard) Series() SeriesDatabase {
+func (sd *scopedShard) Series() SeriesDatabase {
        return &scopedSeriesDatabase{
                scope:     sd.scope,
                delegated: sd.delegated.Series(),
        }
 }
 
-func (sd *ScopedShard) Index() IndexDatabase {
+func (sd *scopedShard) Index() IndexDatabase {
        return sd.delegated.Index()
 }
 
-func (sd *ScopedShard) TriggerSchedule(task string) bool {
+func (sd *scopedShard) TriggerSchedule(task string) bool {
        return sd.delegated.TriggerSchedule(task)
 }
 
-func (sd *ScopedShard) State() ShardState {
+func (sd *scopedShard) State() ShardState {
        return sd.delegated.State()
 }
 
@@ -90,5 +91,5 @@ func (sdd *scopedSeriesDatabase) GetByID(id common.SeriesID) 
(Series, error) {
 }
 
 func (sdd *scopedSeriesDatabase) List(ctx context.Context, path Path) 
(SeriesList, error) {
-       return sdd.delegated.List(ctx, path.Prepend(sdd.scope))
+       return sdd.delegated.List(ctx, path.prepend(sdd.scope))
 }
diff --git a/banyand/tsdb/segment.go b/banyand/tsdb/segment.go
index 4a21a71..c6582f2 100644
--- a/banyand/tsdb/segment.go
+++ b/banyand/tsdb/segment.go
@@ -32,7 +32,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
-var ErrEndOfSegment = errors.New("reached the end of the segment")
+var errEndOfSegment = errors.New("reached the end of the segment")
 
 type segment struct {
        globalIndex kv.Store
diff --git a/banyand/tsdb/segment_ctrl.go b/banyand/tsdb/segment_ctrl.go
index de02ef1..98653a2 100644
--- a/banyand/tsdb/segment_ctrl.go
+++ b/banyand/tsdb/segment_ctrl.go
@@ -134,8 +134,8 @@ func (sc *segmentController) Next() (bucket.Reporter, 
error) {
                return nil, err
        }
        seg := c.(*segment)
-       reporter, err := sc.create(sc.segmentSize.NextTime(seg.Start))
-       if errors.Is(err, ErrEndOfSegment) {
+       reporter, err := sc.create(sc.segmentSize.nextTime(seg.Start))
+       if errors.Is(err, errEndOfSegment) {
                return nil, bucket.ErrNoMoreBucket
        }
        return reporter, err
@@ -187,7 +187,7 @@ func (sc *segmentController) open() error {
        defer sc.Unlock()
        return loadSections(sc.location, sc, sc.segmentSize, func(start, end 
time.Time) error {
                _, err := sc.load(start, end, sc.location)
-               if errors.Is(err, ErrEndOfSegment) {
+               if errors.Is(err, errEndOfSegment) {
                        return nil
                }
                return err
@@ -207,7 +207,7 @@ func (sc *segmentController) create(start time.Time) 
(*segment, error) {
                        next = s
                }
        }
-       stdEnd := sc.segmentSize.NextTime(start)
+       stdEnd := sc.segmentSize.nextTime(start)
        var end time.Time
        if next != nil && next.Start.Before(stdEnd) {
                end = next.Start
diff --git a/banyand/tsdb/series.go b/banyand/tsdb/series.go
index 7ea6e66..a872c82 100644
--- a/banyand/tsdb/series.go
+++ b/banyand/tsdb/series.go
@@ -33,11 +33,14 @@ import (
 )
 
 var (
+       // ErrEmptySeriesSpan indicates there are no data blocks in the 
input time range.
        ErrEmptySeriesSpan = errors.New("there is no data in such time range")
-       ErrItemIDMalformed = errors.New("serialized item id is malformed")
-       ErrBlockAbsent     = errors.New("block is absent")
+       errItemIDMalformed = errors.New("serialized item id is malformed")
+       errBlockAbsent     = errors.New("block is absent")
 )
 
+// GlobalItemID is the top level identity of an item.
+// The item could be retrieved by a GlobalItemID in a tsdb.
 type GlobalItemID struct {
        ShardID  common.ShardID
        segID    SectionID
@@ -46,7 +49,7 @@ type GlobalItemID struct {
        ID       common.ItemID
 }
 
-func (i *GlobalItemID) Marshal() []byte {
+func (i *GlobalItemID) marshal() []byte {
        return bytes.Join([][]byte{
                convert.Uint32ToBytes(uint32(i.ShardID)),
                sectionIDToBytes(i.segID),
@@ -56,9 +59,9 @@ func (i *GlobalItemID) Marshal() []byte {
        }, nil)
 }
 
-func (i *GlobalItemID) UnMarshal(data []byte) error {
+func (i *GlobalItemID) unMarshal(data []byte) error {
        if len(data) != 4+4+4+8+8 {
-               return ErrItemIDMalformed
+               return errItemIDMalformed
        }
        var offset int
        i.ShardID = common.ShardID(convert.BytesToUint32(data[offset : 
offset+4]))
@@ -71,6 +74,8 @@ func (i *GlobalItemID) UnMarshal(data []byte) error {
        return nil
 }
 
+// Series denotes a series of data points grouped by a common.SeriesID.
+// common.SeriesID is encoded from an entity defined by Stream or Measure.
 type Series interface {
        ID() common.SeriesID
        Span(ctx context.Context, timeRange timestamp.TimeRange) (SeriesSpan, 
error)
@@ -79,6 +84,7 @@ type Series interface {
        String() string
 }
 
+// SeriesSpan is a span in a time series. It contains data blocks in such time 
range.
 type SeriesSpan interface {
        io.Closer
        WriterBuilder() WriterBuilder
@@ -101,7 +107,7 @@ func (s *series) Get(ctx context.Context, id GlobalItemID) 
(Item, io.Closer, err
                return nil, nil, err
        }
        if b == nil {
-               return nil, nil, errors.WithMessagef(ErrBlockAbsent, "id: %v", 
id)
+               return nil, nil, errors.WithMessagef(errBlockAbsent, "id: %v", 
id)
        }
        return &item{
                data:        b.dataReader(),
@@ -191,7 +197,7 @@ type seriesSpan struct {
        l         *logger.Logger
        timeRange timestamp.TimeRange
        series    string
-       blocks    []BlockDelegate
+       blocks    []blockDelegate
        seriesID  common.SeriesID
        shardID   common.ShardID
 }
@@ -211,7 +217,7 @@ func (s *seriesSpan) SeekerBuilder() SeekerBuilder {
        return newSeekerBuilder(s)
 }
 
-func newSeriesSpan(ctx context.Context, timeRange timestamp.TimeRange, blocks 
[]BlockDelegate, id common.SeriesID, series string, shardID common.ShardID) 
*seriesSpan {
+func newSeriesSpan(ctx context.Context, timeRange timestamp.TimeRange, blocks 
[]blockDelegate, id common.SeriesID, series string, shardID common.ShardID) 
*seriesSpan {
        s := &seriesSpan{
                blocks:    blocks,
                seriesID:  id,
diff --git a/banyand/tsdb/series_seek.go b/banyand/tsdb/series_seek.go
index db66d98..28147f7 100644
--- a/banyand/tsdb/series_seek.go
+++ b/banyand/tsdb/series_seek.go
@@ -34,12 +34,14 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
+// Iterator allows iterating a series in a time span.
 type Iterator interface {
        Next() bool
        Val() Item
        Close() error
 }
 
+// Item allows retrieving raw data from an item.
 type Item interface {
        Family(family []byte) ([]byte, error)
        PrintContext(l *logger.Logger, family []byte, n int)
@@ -49,6 +51,7 @@ type Item interface {
        Time() uint64
 }
 
+// SeekerBuilder is a helper to build a Seeker.
 type SeekerBuilder interface {
        Filter(predicator index.Filter) SeekerBuilder
        OrderByIndex(indexRule *databasev1.IndexRule, order modelv1.Sort) 
SeekerBuilder
@@ -56,6 +59,7 @@ type SeekerBuilder interface {
        Build() (Seeker, error)
 }
 
+// Seeker allows searching data in a Database.
 type Seeker interface {
        Seek() ([]Iterator, error)
 }
diff --git a/banyand/tsdb/series_seek_filter.go 
b/banyand/tsdb/series_seek_filter.go
index 7e8cd4a..64d7ed1 100644
--- a/banyand/tsdb/series_seek_filter.go
+++ b/banyand/tsdb/series_seek_filter.go
@@ -24,14 +24,14 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/index"
 )
 
-var ErrUnsupportedIndexRule = errors.New("the index rule is not supported")
+var errUnsupportedIndexRule = errors.New("the index rule is not supported")
 
 func (s *seekerBuilder) Filter(predicator index.Filter) SeekerBuilder {
        s.predicator = predicator
        return s
 }
 
-func (s *seekerBuilder) buildIndexFilter(block BlockDelegate) (filterFn, 
error) {
+func (s *seekerBuilder) buildIndexFilter(block blockDelegate) (filterFn, 
error) {
        if s.predicator == nil {
                return nil, nil
        }
@@ -42,7 +42,7 @@ func (s *seekerBuilder) buildIndexFilter(block BlockDelegate) 
(filterFn, error)
                case databasev1.IndexRule_TYPE_TREE:
                        return block.lsmIndexReader(), nil
                default:
-                       return nil, ErrUnsupportedIndexRule
+                       return nil, errUnsupportedIndexRule
                }
        }, s.seriesSpan.seriesID)
        if err != nil {
diff --git a/banyand/tsdb/series_seek_sort.go b/banyand/tsdb/series_seek_sort.go
index d56cacf..40d05b2 100644
--- a/banyand/tsdb/series_seek_sort.go
+++ b/banyand/tsdb/series_seek_sort.go
@@ -36,7 +36,7 @@ import (
 )
 
 var (
-       ErrUnspecifiedIndexType = errors.New("Unspecified index type")
+       errUnspecifiedIndexType = errors.New("Unspecified index type")
        emptyFilters            = make([]filterFn, 0)
 )
 
@@ -89,7 +89,7 @@ func (s *seekerBuilder) buildSeriesByIndex() (series 
[]Iterator, err error) {
                case databasev1.IndexRule_TYPE_INVERTED:
                        inner, err = b.invertedIndexReader().Iterator(fieldKey, 
s.rangeOptsForSorting, s.order)
                case databasev1.IndexRule_TYPE_UNSPECIFIED:
-                       return nil, 
errors.WithMessagef(ErrUnspecifiedIndexType, "index rule:%v", 
s.indexRuleForSorting)
+                       return nil, 
errors.WithMessagef(errUnspecifiedIndexType, "index rule:%v", 
s.indexRuleForSorting)
                }
                if err != nil {
                        return nil, err
diff --git a/banyand/tsdb/series_write.go b/banyand/tsdb/series_write.go
index b6da92c..f903566 100644
--- a/banyand/tsdb/series_write.go
+++ b/banyand/tsdb/series_write.go
@@ -29,6 +29,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/index"
 )
 
+// WriterBuilder is a helper to build a Writer.
 type WriterBuilder interface {
        Family(name []byte, val []byte) WriterBuilder
        Time(ts time.Time) WriterBuilder
@@ -36,6 +37,7 @@ type WriterBuilder interface {
        Build() (Writer, error)
 }
 
+// Writer allows ingesting data into a tsdb.
 type Writer interface {
        IndexWriter
        Write() (GlobalItemID, error)
@@ -47,7 +49,7 @@ var _ WriterBuilder = (*writerBuilder)(nil)
 
 type writerBuilder struct {
        series *seriesSpan
-       block  BlockDelegate
+       block  blockDelegate
        values []struct {
                family []byte
                val    []byte
@@ -84,26 +86,25 @@ func (w *writerBuilder) Val(val []byte) WriterBuilder {
 }
 
 var (
-       ErrNoTime = errors.New("no time specified")
-       ErrNoVal  = errors.New("no value specified")
+       errNoTime           = errors.New("no time specified")
+       errNoVal            = errors.New("no value specified")
+       errDuplicatedFamily = errors.New("duplicated family")
 )
 
-var ErrDuplicatedFamily = errors.New("duplicated family")
-
 func (w *writerBuilder) Build() (Writer, error) {
        if w.block == nil {
-               return nil, errors.WithMessagef(ErrNoTime, "ts:%v", w.ts)
+               return nil, errors.WithMessagef(errNoTime, "ts:%v", w.ts)
        }
        if len(w.values) < 1 {
-               return nil, errors.WithStack(ErrNoVal)
+               return nil, errors.WithStack(errNoVal)
        }
        for i, value := range w.values {
                for j := i + 1; j < len(w.values); j++ {
                        if value.family == nil && w.values[j].family == nil {
-                               return nil, errors.Wrap(ErrDuplicatedFamily, 
"default family")
+                               return nil, errors.Wrap(errDuplicatedFamily, 
"default family")
                        }
                        if bytes.Equal(value.family, w.values[j].family) {
-                               return nil, errors.Wrapf(ErrDuplicatedFamily, 
"family:%s", value.family)
+                               return nil, errors.Wrapf(errDuplicatedFamily, 
"family:%s", value.family)
                        }
                }
        }
@@ -133,7 +134,7 @@ var _ Writer = (*writer)(nil)
 
 type writer struct {
        ts      time.Time
-       block   BlockDelegate
+       block   blockDelegate
        itemID  *GlobalItemID
        columns []struct {
                family []byte
diff --git a/banyand/tsdb/seriesdb.go b/banyand/tsdb/seriesdb.go
index aac8e55..88acc8d 100644
--- a/banyand/tsdb/seriesdb.go
+++ b/banyand/tsdb/seriesdb.go
@@ -49,12 +49,17 @@ var (
        zeroIntBytes    = convert.Uint64ToBytes(0)
 )
 
+// AnyEntry is the `*` for a regular expression. It could match "any" Entry in 
an Entity.
 var AnyEntry = Entry(nil)
 
+// Entry is an element in an Entity.
 type Entry []byte
 
+// Entity denotes an identity of a Series.
+// It is defined by a Stream or Measure schema.
 type Entity []Entry
 
+// Marshal encodes an Entity to bytes.
 func (e Entity) Marshal() []byte {
        data := make([][]byte, len(e))
        for i, entry := range e {
@@ -63,18 +68,21 @@ func (e Entity) Marshal() []byte {
        return bytes.Join(data, nil)
 }
 
+// Prepend inserts an Entry before the first Entry as the prefix.
 func (e Entity) Prepend(entry Entry) Entity {
        d := e
        d = append(Entity{entry}, d...)
        return d
 }
 
+// Copy returns a new Entity holding the same entries; the entry bytes are not duplicated.
 func (e Entity) Copy() Entity {
        a := make(Entity, len(e))
        copy(a, e)
        return a
 }
 
+// NewEntity returns an Entity with a fixed length.
 func NewEntity(length int) Entity {
        e := make(Entity, length)
        for i := 0; i < length; i++ {
@@ -83,18 +91,23 @@ func NewEntity(length int) Entity {
        return e
 }
 
+// EntityValue represents the value of a tag which is a part of an entity.
 type EntityValue *modelv1.TagValue
 
+// EntityValueToEntry transforms EntityValue to Entry.
 func EntityValueToEntry(ev EntityValue) (Entry, error) {
        return pbv1.MarshalTagValue(ev)
 }
 
+// EntityValues is the encoded Entity.
 type EntityValues []EntityValue
 
+// Prepend inserts an EntityValue before the first EntityValue as the prefix.
 func (evs EntityValues) Prepend(scope EntityValue) EntityValues {
        return append(EntityValues{scope}, evs...)
 }
 
+// Encode EntityValues to tag values.
 func (evs EntityValues) Encode() (result []*modelv1.TagValue) {
        for _, v := range evs {
                result = append(result, v)
@@ -102,6 +115,7 @@ func (evs EntityValues) Encode() (result 
[]*modelv1.TagValue) {
        return
 }
 
+// ToEntity transforms EntityValues to Entity.
 func (evs EntityValues) ToEntity() (result Entity, err error) {
        for _, v := range evs {
                entry, errMarshal := EntityValueToEntry(v)
@@ -113,6 +127,7 @@ func (evs EntityValues) ToEntity() (result Entity, err 
error) {
        return
 }
 
+// String returns the string representation of EntityValues.
 func (evs EntityValues) String() string {
        var strBuilder strings.Builder
        vv := evs.Encode()
@@ -125,6 +140,7 @@ func (evs EntityValues) String() string {
        return strBuilder.String()
 }
 
+// DecodeEntityValues decodes tag values to EntityValues.
 func DecodeEntityValues(tvv []*modelv1.TagValue) (result EntityValues) {
        for _, tv := range tvv {
                result = append(result, tv)
@@ -132,14 +148,17 @@ func DecodeEntityValues(tvv []*modelv1.TagValue) (result 
EntityValues) {
        return
 }
 
+// StrValue returns an EntityValue which wraps a string value.
 func StrValue(v string) EntityValue {
        return &modelv1.TagValue{Value: &modelv1.TagValue_Str{Str: 
&modelv1.Str{Value: v}}}
 }
 
+// Int64Value returns an EntityValue which wraps an int64 value.
 func Int64Value(v int64) EntityValue {
        return &modelv1.TagValue{Value: &modelv1.TagValue_Int{Int: 
&modelv1.Int{Value: v}}}
 }
 
+// MarshalEntityValues encodes EntityValues to bytes.
 func MarshalEntityValues(evs EntityValues) ([]byte, error) {
        data := &modelv1.TagFamilyForWrite{}
        for _, v := range evs {
@@ -148,6 +167,7 @@ func MarshalEntityValues(evs EntityValues) ([]byte, error) {
        return proto.Marshal(data)
 }
 
+// UnmarshalEntityValues decodes EntityValues from bytes.
 func UnmarshalEntityValues(evs []byte) (result EntityValues, err error) {
        data := &modelv1.TagFamilyForWrite{}
        result = make(EntityValues, len(data.Tags))
@@ -160,6 +180,8 @@ func UnmarshalEntityValues(evs []byte) (result 
EntityValues, err error) {
        return
 }
 
+// Path denotes an expression to match a Series.
+// Besides exact matching, it supports fuzzy matching by setting an entry to AnyEntry.
 type Path struct {
        prefix   []byte
        seekKey  []byte
@@ -169,7 +191,8 @@ type Path struct {
        offset   int
 }
 
-func NewPath(entries []Entry) Path {
+// NewPath returns a Path with a matching expression.
+func NewPath(matchingExpression []Entry) Path {
        p := Path{
                seekKey:  make([]byte, 0),
                mask:     make([]byte, 0),
@@ -177,7 +200,7 @@ func NewPath(entries []Entry) Path {
        }
 
        var encounterAny bool
-       for _, e := range entries {
+       for _, e := range matchingExpression {
                if e == nil {
                        encounterAny = true
                        p.mask = append(p.mask, zeroIntBytes...)
@@ -207,7 +230,7 @@ func (p *Path) extractPrefix() {
        }
 }
 
-func (p Path) Prepend(entry Entry) Path {
+func (p Path) prepend(entry Entry) Path {
        e := Hash(entry)
        p.template = prepend(p.template, e)
        p.offset += len(e)
@@ -223,6 +246,7 @@ func prepend(src []byte, entry []byte) []byte {
        return dst
 }
 
+// SeriesDatabase allows retrieving series.
 type SeriesDatabase interface {
        observability.Observable
        io.Closer
@@ -233,9 +257,9 @@ type SeriesDatabase interface {
 
 type blockDatabase interface {
        shardID() common.ShardID
-       span(ctx context.Context, timeRange timestamp.TimeRange) 
([]BlockDelegate, error)
-       create(ctx context.Context, ts time.Time) (BlockDelegate, error)
-       block(ctx context.Context, id GlobalItemID) (BlockDelegate, error)
+       span(ctx context.Context, timeRange timestamp.TimeRange) 
([]blockDelegate, error)
+       create(ctx context.Context, ts time.Time) (blockDelegate, error)
+       block(ctx context.Context, id GlobalItemID) (blockDelegate, error)
 }
 
 var (
@@ -272,7 +296,7 @@ func (s *seriesDB) GetByID(id common.SeriesID) (Series, 
error) {
        return newSeries(s.context(), id, series, s), nil
 }
 
-func (s *seriesDB) block(ctx context.Context, id GlobalItemID) (BlockDelegate, 
error) {
+func (s *seriesDB) block(ctx context.Context, id GlobalItemID) (blockDelegate, 
error) {
        seg := s.segCtrl.get(id.segID)
        if seg == nil {
                return nil, nil
@@ -410,8 +434,8 @@ func (s *seriesDB) List(ctx context.Context, path Path) 
(SeriesList, error) {
        return result, err
 }
 
-func (s *seriesDB) span(ctx context.Context, timeRange timestamp.TimeRange) 
([]BlockDelegate, error) {
-       result := make([]BlockDelegate, 0)
+func (s *seriesDB) span(ctx context.Context, timeRange timestamp.TimeRange) 
([]blockDelegate, error) {
+       result := make([]blockDelegate, 0)
        for _, s := range s.segCtrl.span(timeRange) {
                dd, err := s.blockController.span(ctx, timeRange)
                if err != nil {
@@ -425,7 +449,7 @@ func (s *seriesDB) span(ctx context.Context, timeRange 
timestamp.TimeRange) ([]B
        return result, nil
 }
 
-func (s *seriesDB) create(ctx context.Context, ts time.Time) (BlockDelegate, 
error) {
+func (s *seriesDB) create(ctx context.Context, ts time.Time) (blockDelegate, 
error) {
        s.Lock()
        defer s.Unlock()
        timeRange := timestamp.NewInclusiveTimeRange(ts, ts)
@@ -508,10 +532,12 @@ func HashEntity(entity Entity) []byte {
        return result
 }
 
+// SeriesID transforms Entity to common.SeriesID.
 func SeriesID(entity Entity) common.SeriesID {
        return common.SeriesID(convert.Hash(HashEntity(entity)))
 }
 
+// Hash encodes an Entry to 8 bytes.
 func Hash(entry []byte) []byte {
        return convert.Uint64ToBytes(convert.Hash(entry))
 }
@@ -520,6 +546,7 @@ func bytesToSeriesID(data []byte) common.SeriesID {
        return common.SeriesID(convert.BytesToUint64(data))
 }
 
+// SeriesList is a collection of Series.
 type SeriesList []Series
 
 func (a SeriesList) Len() int {
@@ -534,6 +561,7 @@ func (a SeriesList) Swap(i, j int) {
        a[i], a[j] = a[j], a[i]
 }
 
+// Merge other SeriesList with this one to create a new SeriesList.
 func (a SeriesList) Merge(other SeriesList) SeriesList {
        if len(other) == 0 {
                return a
diff --git a/banyand/tsdb/seriesdb_test.go b/banyand/tsdb/seriesdb_test.go
index 4d232f5..129f320 100644
--- a/banyand/tsdb/seriesdb_test.go
+++ b/banyand/tsdb/seriesdb_test.go
@@ -342,7 +342,7 @@ func TestNewPath(t *testing.T) {
                t.Run(tt.name, func(t *testing.T) {
                        got := NewPath(tt.entity)
                        if tt.scope != nil {
-                               got = got.Prepend(tt.scope)
+                               got = got.prepend(tt.scope)
                        }
                        tester.Equal(tt.want, got)
                })
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index f89b2d8..07b89a7 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -54,6 +54,7 @@ type shard struct {
        id                    common.ShardID
 }
 
+// OpenShard returns an existing Shard or creates a new one if it does not exist.
 func OpenShard(ctx context.Context, id common.ShardID,
        root string, segmentSize, blockSize, ttl IntervalRule, openedBlockSize, 
maxOpenedBlockSize int,
 ) (Shard, error) {
@@ -178,8 +179,10 @@ func (s *shard) Close() (err error) {
        return err
 }
 
+// IntervalUnit denotes the unit of a time point.
 type IntervalUnit int
 
+// Available IntervalUnits. HOUR and DAY are adequate for the APM scenario.
 const (
        HOUR IntervalUnit = iota
        DAY
@@ -195,12 +198,13 @@ func (iu IntervalUnit) String() string {
        panic("invalid interval unit")
 }
 
+// IntervalRule defines the length of an interval between two points in time.
 type IntervalRule struct {
        Unit IntervalUnit
        Num  int
 }
 
-func (ir IntervalRule) NextTime(current time.Time) time.Time {
+func (ir IntervalRule) nextTime(current time.Time) time.Time {
        switch ir.Unit {
        case HOUR:
                return current.Add(time.Hour * time.Duration(ir.Num))
@@ -210,17 +214,7 @@ func (ir IntervalRule) NextTime(current time.Time) 
time.Time {
        panic("invalid interval unit")
 }
 
-func (ir IntervalRule) PreviousTime(current time.Time) time.Time {
-       switch ir.Unit {
-       case HOUR:
-               return current.Add(-time.Hour * time.Duration(ir.Num))
-       case DAY:
-               return current.AddDate(0, 0, -ir.Num)
-       }
-       panic("invalid interval unit")
-}
-
-func (ir IntervalRule) EstimatedDuration() time.Duration {
+func (ir IntervalRule) estimatedDuration() time.Duration {
        switch ir.Unit {
        case HOUR:
                return time.Hour * time.Duration(ir.Num)
@@ -236,7 +230,7 @@ type parser interface {
 
 func loadSections(root string, parser parser, intervalRule IntervalRule, 
loadFn func(start, end time.Time) error) error {
        var startTimeLst []time.Time
-       if err := WalkDir(
+       if err := walkDir(
                root,
                segPathPrefix,
                func(suffix string) error {
@@ -255,7 +249,7 @@ func loadSections(root string, parser parser, intervalRule 
IntervalRule, loadFn
                if i < len(startTimeLst)-1 {
                        end = startTimeLst[i+1]
                } else {
-                       end = intervalRule.NextTime(start)
+                       end = intervalRule.nextTime(start)
                }
                if err := loadFn(start, end); err != nil {
                        return err
diff --git a/banyand/tsdb/tsdb.go b/banyand/tsdb/tsdb.go
index 70546db..8f6ad3a 100644
--- a/banyand/tsdb/tsdb.go
+++ b/banyand/tsdb/tsdb.go
@@ -15,6 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
+// Package tsdb implements a time-series-based storage engine.
+// It provides:
+//   - Partition data based on a time axis.
+//   - Sharding data based on a series id which represents a unique entity of 
stream/measure
+//   - Retrieving data based on index.Filter.
+//   - Cleaning expired data, or the data retention.
 package tsdb
 
 import (
@@ -55,23 +61,27 @@ const (
 )
 
 var (
-       ErrInvalidShardID = errors.New("invalid shard id")
-       ErrOpenDatabase   = errors.New("fails to open the database")
+       errInvalidShardID = errors.New("invalid shard id")
+       errOpenDatabase   = errors.New("fails to open the database")
 
        optionsKey = contextOptionsKey{}
 )
 
 type contextOptionsKey struct{}
 
+// Supplier allows getting a tsdb's runtime.
 type Supplier interface {
        SupplyTSDB() Database
 }
+
+// Database allows listing and getting shard details.
 type Database interface {
        io.Closer
        Shards() []Shard
        Shard(id common.ShardID) (Shard, error)
 }
 
+// Shard allows accessing data of tsdb.
 type Shard interface {
        io.Closer
        ID() common.ShardID
@@ -84,6 +94,7 @@ type Shard interface {
 
 var _ Database = (*database)(nil)
 
+// DatabaseOpts wraps options to create a tsdb.
 type DatabaseOpts struct {
        EncodingMethod     EncodingMethod
        Location           string
@@ -97,14 +108,18 @@ type DatabaseOpts struct {
        EnableGlobalIndex  bool
 }
 
+// EncodingMethod wraps encoder/decoder pools to flush/compact data on disk.
 type EncodingMethod struct {
        EncoderPool encoding.SeriesEncoderPool
        DecoderPool encoding.SeriesDecoderPool
 }
 
 type (
+       // SectionID is the kind of a block/segment.
        SectionID uint32
-       BlockID   struct {
+
+       // BlockID is the identity of a block in a shard.
+       BlockID struct {
                SegID   SectionID
                BlockID SectionID
        }
@@ -114,6 +129,7 @@ func (b BlockID) String() string {
        return fmt.Sprintf("BlockID-%d-%d", parseSuffix(b.SegID), 
parseSuffix(b.BlockID))
 }
 
+// GenerateInternalID returns an identity of a section (segment or block) based 
on an IntervalUnit and a suffix.
 func GenerateInternalID(unit IntervalUnit, suffix int) SectionID {
        return SectionID(unit)<<31 | ((SectionID(suffix) << 1) >> 1)
 }
@@ -131,11 +147,14 @@ func readSectionID(data []byte, offset int) (SectionID, 
int) {
        return SectionID(convert.BytesToUint32(data[offset:end])), end
 }
 
+// BlockState is a sample of a block's runtime state.
 type BlockState struct {
        TimeRange timestamp.TimeRange
        ID        BlockID
        Closed    bool
 }
+
+// ShardState is a sample of a shard's runtime state.
 type ShardState struct {
        Blocks           []BlockState
        OpenBlocks       []BlockID
@@ -159,7 +178,7 @@ func (d *database) Shards() []Shard {
 
 func (d *database) Shard(id common.ShardID) (Shard, error) {
        if uint(id) >= uint(len(d.sLst)) {
-               return nil, ErrInvalidShardID
+               return nil, errInvalidShardID
        }
        return d.sLst[id], nil
 }
@@ -175,24 +194,26 @@ func (d *database) Close() error {
        return err
 }
 
+// OpenDatabase returns a new tsdb runtime. This constructor will create a new 
database if it's absent,
+// or load an existing one.
 func OpenDatabase(ctx context.Context, opts DatabaseOpts) (Database, error) {
        if opts.EncodingMethod.EncoderPool == nil || 
opts.EncodingMethod.DecoderPool == nil {
-               return nil, errors.Wrap(ErrOpenDatabase, "encoding method is 
absent")
+               return nil, errors.Wrap(errOpenDatabase, "encoding method is 
absent")
        }
        if _, err := mkdir(opts.Location); err != nil {
                return nil, err
        }
        if opts.SegmentInterval.Num == 0 {
-               return nil, errors.Wrap(ErrOpenDatabase, "segment interval is 
absent")
+               return nil, errors.Wrap(errOpenDatabase, "segment interval is 
absent")
        }
        if opts.BlockInterval.Num == 0 {
-               return nil, errors.Wrap(ErrOpenDatabase, "block interval is 
absent")
+               return nil, errors.Wrap(errOpenDatabase, "block interval is 
absent")
        }
-       if opts.BlockInterval.EstimatedDuration() > 
opts.SegmentInterval.EstimatedDuration() {
-               return nil, errors.Wrapf(ErrOpenDatabase, "the block size is 
bigger than the segment size")
+       if opts.BlockInterval.estimatedDuration() > 
opts.SegmentInterval.estimatedDuration() {
+               return nil, errors.Wrapf(errOpenDatabase, "the block size is 
bigger than the segment size")
        }
        if opts.TTL.Num == 0 {
-               return nil, errors.Wrap(ErrOpenDatabase, "ttl is absent")
+               return nil, errors.Wrap(errOpenDatabase, "ttl is absent")
        }
        db := &database{
                location:    opts.Location,
@@ -242,7 +263,7 @@ func loadDatabase(ctx context.Context, db *database) 
(Database, error) {
        // TODO: open the manifest file
        db.Lock()
        defer db.Unlock()
-       err := WalkDir(db.location, shardPathPrefix, func(suffix string) error {
+       err := walkDir(db.location, shardPathPrefix, func(suffix string) error {
                shardID, err := strconv.Atoi(suffix)
                if err != nil {
                        return err
@@ -281,9 +302,9 @@ func loadDatabase(ctx context.Context, db *database) 
(Database, error) {
        return db, nil
 }
 
-type WalkFn func(suffix string) error
+type walkFn func(suffix string) error
 
-func WalkDir(root, prefix string, walkFn WalkFn) error {
+func walkDir(root, prefix string, wf walkFn) error {
        files, err := os.ReadDir(root)
        if err != nil {
                return errors.Wrapf(err, "failed to walk the database path: 
%s", root)
@@ -293,7 +314,7 @@ func WalkDir(root, prefix string, walkFn WalkFn) error {
                        continue
                }
                segs := strings.Split(f.Name(), "-")
-               errWalk := walkFn(segs[len(segs)-1])
+               errWalk := wf(segs[len(segs)-1])
                if errWalk != nil {
                        return errors.WithMessagef(errWalk, "failed to load: 
%s", f.Name())
                }
diff --git a/pkg/schema/metadata.go b/pkg/schema/metadata.go
index 1b2f979..daa2859 100644
--- a/pkg/schema/metadata.go
+++ b/pkg/schema/metadata.go
@@ -88,7 +88,7 @@ type Resource interface {
 
 type ResourceSupplier interface {
        OpenResource(shardNum uint32, db tsdb.Supplier, spec ResourceSpec) 
(Resource, error)
-       ResourceSchema(repo metadata.Repo, metdata *commonv1.Metadata) 
(ResourceSchema, error)
+       ResourceSchema(metdata *commonv1.Metadata) (ResourceSchema, error)
        OpenDB(groupSchema *commonv1.Group) (tsdb.Database, error)
 }
 
@@ -287,7 +287,7 @@ func (sr *schemaRepo) storeResource(metadata 
*commonv1.Metadata) (Resource, erro
                        return nil, errors.WithMessagef(err, "create unknown 
group:%s", metadata.Group)
                }
        }
-       stm, err := sr.resourceSupplier.ResourceSchema(sr.metadata, metadata)
+       stm, err := sr.resourceSupplier.ResourceSchema(metadata)
        if err != nil {
                return nil, errors.WithMessage(err, "fails to get the resource")
        }
diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go
index 4235604..e8b3063 100644
--- a/pkg/test/setup/setup.go
+++ b/pkg/test/setup/setup.go
@@ -82,7 +82,7 @@ func modules(flags []string) func() {
        measureSvc, err := measure.NewService(context.TODO(), metaSvc, repo, 
pipeline)
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
        // Init `Query` module
-       q, err := query.NewExecutor(context.TODO(), streamSvc, measureSvc, 
metaSvc, repo, pipeline)
+       q, err := query.NewService(context.TODO(), streamSvc, measureSvc, 
metaSvc, repo, pipeline)
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
        tcp := grpc.NewServer(context.TODO(), pipeline, repo, metaSvc)
        httpServer := http.NewService()

Reply via email to