This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new d043b3af Fix issues found by smoke and stress tests (#342)
d043b3af is described below

commit d043b3af6e228c583c043c6dcccf5025db022378
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Mon Oct 16 13:38:33 2023 +0800

    Fix issues found by smoke and stress tests (#342)
---
 .gitattributes                                     |   1 +
 .gitignore                                         |   3 +
 .licenserc.yaml                                    |   1 +
 banyand/k8s.yml                                    |   2 +-
 banyand/kv/kv.go                                   |  18 +--
 banyand/liaison/grpc/measure.go                    |  22 +--
 banyand/liaison/grpc/stream.go                     |  22 +--
 banyand/measure/service.go                         |   1 +
 banyand/measure/tstable.go                         |   9 +-
 banyand/metadata/client.go                         |  18 ++-
 banyand/metadata/schema/etcd.go                    |  33 +++--
 banyand/metadata/schema/node.go                    |   4 +-
 banyand/metadata/schema/register_test.go           |   6 +-
 banyand/metadata/schema/schema.go                  |   2 +-
 banyand/metadata/server.go                         |   4 +-
 banyand/stream/tstable.go                          |   7 +-
 banyand/tsdb/block.go                              |   2 +-
 banyand/tsdb/shard.go                              |   1 -
 banyand/tsdb/tsdb_test.go                          |   2 +-
 banyand/tsdb/tstable.go                            |   4 +-
 go.mod                                             |   2 +-
 pkg/cmdsetup/data.go                               |   2 +-
 pkg/cmdsetup/liaison.go                            |   2 +-
 pkg/index/inverted/inverted.go                     |  18 ++-
 pkg/query/logical/measure/measure_plan_groupby.go  |   6 +
 pkg/test/setup/setup.go                            |  31 ++++-
 pkg/wal/wal.go                                     |  14 +-
 .../query_ondisk/query_ondisk_suite_test.go        | 105 ++++++++++++++
 test/stress/cases/istio/cpu.prof                   |   0
 test/stress/cases/istio/heap.prof                  |   0
 test/stress/cases/istio/istio_suite_test.go        | 152 +++++++++++++++++++--
 test/stress/cases/istio/repo.go                    |  23 ++--
 test/stress/cases/istio/report.md                  | 141 +++++++++++++------
 test/stress/cases/istio/testdata/access.tar.bz2    | Bin 0 -> 6026312 bytes
 test/stress/cases/istio/testdata/access.tar.gz     | Bin 15477358 -> 0 bytes
 35 files changed, 512 insertions(+), 146 deletions(-)

diff --git a/.gitattributes b/.gitattributes
index 28118cbe..9643b431 100644
--- a/.gitattributes
+++ b/.gitattributes
@@ -24,5 +24,6 @@
 *.jpeg binary
 *.ico binary
 *.gz binary
+*.bz2 binary
 
 go.sum merge=union
diff --git a/.gitignore b/.gitignore
index eaad4b52..6b001b0e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -56,3 +56,6 @@ target
 
 # okteto
 .stignore
+
+# profile result
+*.prof
diff --git a/.licenserc.yaml b/.licenserc.yaml
index 00e91e4a..6f36ce3e 100644
--- a/.licenserc.yaml
+++ b/.licenserc.yaml
@@ -84,6 +84,7 @@ header: # `header` section is configurations for source codes license header.
     - '**/ginkgo.report'
     - 'ui'
     - '.github/PULL_REQUEST_TEMPLATE'
+    - "**/*.prof"
 
  comment: on-failure # on what condition license-eye will comment on the pull request, `on-failure`, `always`, `never`.
 
diff --git a/banyand/k8s.yml b/banyand/k8s.yml
index 2b8f32fc..a416b7a6 100644
--- a/banyand/k8s.yml
+++ b/banyand/k8s.yml
@@ -82,7 +82,7 @@ spec:
         args:
         - "standalone"
         - "--measure-idx-batch-wait-sec=30"
-        - "--logging.level=warn"
+        - "--logging-level=info"
         imagePullPolicy: Always
         livenessProbe:
           failureThreshold: 5
diff --git a/banyand/kv/kv.go b/banyand/kv/kv.go
index 28675730..e6180683 100644
--- a/banyand/kv/kv.go
+++ b/banyand/kv/kv.go
@@ -148,15 +148,6 @@ func TSSWithMemTableSize(sizeInBytes int64) TimeSeriesOptions {
        }
 }
 
-// TSSWithTimeRange sets the time range of the time series.
-func TSSWithTimeRange(timeRange timestamp.TimeRange) TimeSeriesOptions {
-       return func(store TimeSeriesStore) {
-               if btss, ok := store.(*badgerTSS); ok {
-                       btss.timeRange = timeRange
-               }
-       }
-}
-
 // Iterator allows iterating the kv tables.
 // TODO: use generic to provide a unique iterator.
 type Iterator interface {
@@ -185,7 +176,7 @@ type IndexStore interface {
 
 // OpenTimeSeriesStore creates a new TimeSeriesStore.
 // nolint: contextcheck
-func OpenTimeSeriesStore(path string, options ...TimeSeriesOptions) (TimeSeriesStore, error) {
+func OpenTimeSeriesStore(path string, timeRange timestamp.TimeRange, options ...TimeSeriesOptions) (TimeSeriesStore, error) {
        btss := new(badgerTSS)
        btss.dbOpts = badger.DefaultOptions(path)
        for _, opt := range options {
@@ -198,7 +189,8 @@ func OpenTimeSeriesStore(path string, options ...TimeSeriesOptions) (TimeSeriesS
                WithInTable().
                WithMaxLevels(2).
                WithBaseTableSize(10 << 20).
-               WithBaseLevelSize(math.MaxInt64)
+               WithBaseLevelSize(math.MaxInt64).
+               WithBlockCacheSize(10 << 20)
        if btss.dbOpts.MemTableSize < int64(defaultKVMemorySize) {
                btss.dbOpts.MemTableSize = int64(defaultKVMemorySize)
        }
@@ -212,6 +204,7 @@ func OpenTimeSeriesStore(path string, options ...TimeSeriesOptions) (TimeSeriesS
                return nil, fmt.Errorf("failed to open time series store: %w", err)
        }
        btss.TSet = *badger.NewTSet(btss.db)
+       btss.timeRange = timeRange
        return btss, nil
 }
 
@@ -264,7 +257,8 @@ func OpenStore(path string, opts ...StoreOptions) (Store, error) {
                WithBaseTableSize(5 << 20).
                WithBaseLevelSize(25 << 20).
                WithCompression(options.ZSTD).
-               WithZSTDCompressionLevel(1)
+               WithZSTDCompressionLevel(1).
+               WithBlockCacheSize(10 << 20)
 
        var err error
        bdb.db, err = badger.Open(bdb.dbOpts)
diff --git a/banyand/liaison/grpc/measure.go b/banyand/liaison/grpc/measure.go
index 2a32d29d..ebb5c58f 100644
--- a/banyand/liaison/grpc/measure.go
+++ b/banyand/liaison/grpc/measure.go
@@ -88,16 +88,18 @@ func (ms *measureService) Write(measure measurev1.MeasureService_WriteServer) er
                        reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_INVALID_TIMESTAMP, writeRequest.GetMessageId(), measure, ms.sampled)
                        continue
                }
-               measureCache, existed := ms.entityRepo.getLocator(getID(writeRequest.GetMetadata()))
-               if !existed {
-                       ms.sampled.Error().Err(err).Stringer("written", writeRequest).Msg("failed to measure schema not found")
-                       reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_NOT_FOUND, writeRequest.GetMessageId(), measure, ms.sampled)
-                       continue
-               }
-               if writeRequest.Metadata.ModRevision != measureCache.ModRevision {
-                       ms.sampled.Error().Stringer("written", writeRequest).Msg("the measure schema is expired")
-                       reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_EXPIRED_SCHEMA, writeRequest.GetMessageId(), measure, ms.sampled)
-                       continue
+               if writeRequest.Metadata.ModRevision > 0 {
+                       measureCache, existed := ms.entityRepo.getLocator(getID(writeRequest.GetMetadata()))
+                       if !existed {
+                               ms.sampled.Error().Err(err).Stringer("written", writeRequest).Msg("failed to measure schema not found")
+                               reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_NOT_FOUND, writeRequest.GetMessageId(), measure, ms.sampled)
+                               continue
+                       }
+                       if writeRequest.Metadata.ModRevision != measureCache.ModRevision {
+                               ms.sampled.Error().Stringer("written", writeRequest).Msg("the measure schema is expired")
+                               reply(writeRequest.GetMetadata(), modelv1.Status_STATUS_EXPIRED_SCHEMA, writeRequest.GetMessageId(), measure, ms.sampled)
+                               continue
+                       }
                }
                entity, tagValues, shardID, err := ms.navigate(writeRequest.GetMetadata(), writeRequest.GetDataPoint().GetTagFamilies())
                if err != nil {
diff --git a/banyand/liaison/grpc/stream.go b/banyand/liaison/grpc/stream.go
index aa3d49a4..d2aca976 100644
--- a/banyand/liaison/grpc/stream.go
+++ b/banyand/liaison/grpc/stream.go
@@ -88,16 +88,18 @@ func (s *streamService) Write(stream streamv1.StreamService_WriteServer) error {
                        reply(nil, modelv1.Status_STATUS_INVALID_TIMESTAMP, writeEntity.GetMessageId(), stream, s.sampled)
                        continue
                }
-               streamCache, existed := s.entityRepo.getLocator(getID(writeEntity.GetMetadata()))
-               if !existed {
-                       s.sampled.Error().Err(err).Stringer("written", writeEntity).Msg("failed to stream schema not found")
-                       reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_NOT_FOUND, writeEntity.GetMessageId(), stream, s.sampled)
-                       continue
-               }
-               if writeEntity.Metadata.ModRevision != streamCache.ModRevision {
-                       s.sampled.Error().Stringer("written", writeEntity).Msg("the stream schema is expired")
-                       reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_EXPIRED_SCHEMA, writeEntity.GetMessageId(), stream, s.sampled)
-                       continue
+               if writeEntity.Metadata.ModRevision > 0 {
+                       streamCache, existed := s.entityRepo.getLocator(getID(writeEntity.GetMetadata()))
+                       if !existed {
+                               s.sampled.Error().Err(err).Stringer("written", writeEntity).Msg("failed to stream schema not found")
+                               reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_NOT_FOUND, writeEntity.GetMessageId(), stream, s.sampled)
+                               continue
+                       }
+                       if writeEntity.Metadata.ModRevision != streamCache.ModRevision {
+                               s.sampled.Error().Stringer("written", writeEntity).Msg("the stream schema is expired")
+                               reply(writeEntity.GetMetadata(), modelv1.Status_STATUS_EXPIRED_SCHEMA, writeEntity.GetMessageId(), stream, s.sampled)
+                               continue
+                       }
                }
                entity, tagValues, shardID, err := s.navigate(writeEntity.GetMetadata(), writeEntity.GetElement().GetTagFamilies())
                if err != nil {
diff --git a/banyand/measure/service.go b/banyand/measure/service.go
index 43940d8f..98a9e101 100644
--- a/banyand/measure/service.go
+++ b/banyand/measure/service.go
@@ -87,6 +87,7 @@ func (s *service) FlagSet() *run.FlagSet {
        flagS.Var(&s.BlockBufferSize, "measure-buffer-size", "block buffer size")
        flagS.Var(&s.dbOpts.SeriesMemSize, "measure-seriesmeta-mem-size", "series metadata memory size")
        flagS.Int64Var(&s.dbOpts.BlockInvertedIndex.BatchWaitSec, "measure-idx-batch-wait-sec", 1, "index batch wait in second")
+       flagS.BoolVar(&s.dbOpts.EnableWAL, "measure-enable-wal", true, "enable write ahead log")
        return flagS
 }
 
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index 28ca9561..1e0723ad 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -37,6 +37,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 const (
@@ -87,7 +88,7 @@ func (t *tsTable) openBuffer() (err error) {
 func (t *tsTable) Close() (err error) {
        t.lock.Lock()
        defer t.lock.Unlock()
-       for _, b := range []io.Closer{t.encoderBuffer, t.buffer, t.sst, t.encoderSST} {
+       for _, b := range []io.Closer{t.sst, t.encoderSST} {
                if b != nil {
                        err = multierr.Append(err, b.Close())
                }
@@ -179,9 +180,12 @@ type tsTableFactory struct {
        compressionMethod databasev1.CompressionMethod
 }
 
-func (ttf *tsTableFactory) NewTSTable(bufferSupplier *tsdb.BufferSupplier, root string, position common.Position, l *logger.Logger) (tsdb.TSTable, error) {
+func (ttf *tsTableFactory) NewTSTable(bufferSupplier *tsdb.BufferSupplier, root string,
+       position common.Position, l *logger.Logger, timeRange timestamp.TimeRange,
+) (tsdb.TSTable, error) {
        encoderSST, err := kv.OpenTimeSeriesStore(
                path.Join(root, encoded),
+               timeRange,
                kv.TSSWithMemTableSize(ttf.bufferSize),
                kv.TSSWithLogger(l.Named(encoded)),
                kv.TSSWithEncoding(ttf.encoderPool, ttf.decoderPool, ttf.encodingChunkSize),
@@ -191,6 +195,7 @@ func (ttf *tsTableFactory) NewTSTable(bufferSupplier *tsdb.BufferSupplier, root
        }
        sst, err := kv.OpenTimeSeriesStore(
                path.Join(root, plain),
+               timeRange,
                kv.TSSWithMemTableSize(ttf.bufferSize),
                kv.TSSWithLogger(l.Named(plain)),
                kv.TSSWithZSTDCompression(int(ttf.plainChunkSize)),
diff --git a/banyand/metadata/client.go b/banyand/metadata/client.go
index 35ecbcec..411b3c03 100644
--- a/banyand/metadata/client.go
+++ b/banyand/metadata/client.go
@@ -40,15 +40,19 @@ const (
 )
 
 // NewClient returns a new metadata client.
-func NewClient(_ context.Context) (Service, error) {
-       return &clientService{closer: run.NewCloser(1)}, nil
+func NewClient(forceRegisterNode bool) (Service, error) {
+       return &clientService{
+               closer:            run.NewCloser(1),
+               forceRegisterNode: forceRegisterNode,
+       }, nil
 }
 
 type clientService struct {
-       namespace      string
-       schemaRegistry schema.Registry
-       closer         *run.Closer
-       endpoints      []string
+       schemaRegistry    schema.Registry
+       closer            *run.Closer
+       namespace         string
+       endpoints         []string
+       forceRegisterNode bool
 }
 
 func (s *clientService) SchemaRegistry() schema.Registry {
@@ -100,7 +104,7 @@ func (s *clientService) PreRun(ctx context.Context) error {
        }
        for {
                ctxRegister, cancel := context.WithTimeout(ctx, time.Second*10)
-               err = s.schemaRegistry.RegisterNode(ctxRegister, nodeInfo)
+               err = s.schemaRegistry.RegisterNode(ctxRegister, nodeInfo, s.forceRegisterNode)
                cancel()
                if errors.Is(err, context.DeadlineExceeded) {
                        l.Warn().Strs("etcd-endpoints", 
s.endpoints).Msg("register node timeout, retrying...")
diff --git a/banyand/metadata/schema/etcd.go b/banyand/metadata/schema/etcd.go
index 8b90490e..3abb630a 100644
--- a/banyand/metadata/schema/etcd.go
+++ b/banyand/metadata/schema/etcd.go
@@ -93,7 +93,7 @@ func (e *etcdSchemaRegistry) RegisterHandler(name string, kind Kind, handler Eve
        for i := 0; i < KindSize; i++ {
                ki := Kind(1 << i)
                if kind&ki > 0 {
-                       e.l.Info().Str("name", name).Stringer("kind", kind).Msg("registering watcher")
+                       e.l.Info().Str("name", name).Stringer("kind", ki).Msg("registering watcher")
                        w := e.newWatcher(name, ki, handler)
                        e.watchers = append(e.watchers, w)
                }
@@ -315,7 +315,7 @@ func (e *etcdSchemaRegistry) delete(ctx context.Context, metadata Metadata) (boo
        return false, nil
 }
 
-func (e *etcdSchemaRegistry) register(ctx context.Context, metadata Metadata) error {
+func (e *etcdSchemaRegistry) register(ctx context.Context, metadata Metadata, forced bool) error {
        if !e.closer.AddRunning() {
                return ErrClosed
        }
@@ -334,18 +334,25 @@ func (e *etcdSchemaRegistry) register(ctx context.Context, metadata Metadata) er
        if err != nil {
                return err
        }
-       var ops []clientv3.Cmp
-       ops = append(ops, clientv3.Compare(clientv3.CreateRevision(key), "=", 0))
-       txn := e.client.Txn(ctx).If(ops...)
-       txn = txn.Then(clientv3.OpPut(key, string(val), clientv3.WithLease(lease.ID)))
-       txn = txn.Else(clientv3.OpGet(key))
-       response, err := txn.Commit()
-       if err != nil {
-               return err
-       }
-       if !response.Succeeded {
-               return errGRPCAlreadyExists
+       if forced {
+               if _, err = e.client.Put(ctx, key, string(val), clientv3.WithLease(lease.ID)); err != nil {
+                       return err
+               }
+       } else {
+               var ops []clientv3.Cmp
+               ops = append(ops, clientv3.Compare(clientv3.CreateRevision(key), "=", 0))
+               txn := e.client.Txn(ctx).If(ops...)
+               txn = txn.Then(clientv3.OpPut(key, string(val), clientv3.WithLease(lease.ID)))
+               txn = txn.Else(clientv3.OpGet(key))
+               response, errCommit := txn.Commit()
+               if errCommit != nil {
+                       return errCommit
+               }
+               if !response.Succeeded {
+                       return errGRPCAlreadyExists
+               }
        }
+
        // Keep the lease alive
        // nolint:contextcheck
        keepAliveChan, err := e.client.KeepAlive(context.Background(), lease.ID)
diff --git a/banyand/metadata/schema/node.go b/banyand/metadata/schema/node.go
index 575b8c89..d0c4a0bd 100644
--- a/banyand/metadata/schema/node.go
+++ b/banyand/metadata/schema/node.go
@@ -47,14 +47,14 @@ func (e *etcdSchemaRegistry) ListNode(ctx context.Context, role databasev1.Role)
        return entities, nil
 }
 
-func (e *etcdSchemaRegistry) RegisterNode(ctx context.Context, node *databasev1.Node) error {
+func (e *etcdSchemaRegistry) RegisterNode(ctx context.Context, node *databasev1.Node, forced bool) error {
        return e.register(ctx, Metadata{
                TypeMeta: TypeMeta{
                        Kind: KindNode,
                        Name: node.Metadata.Name,
                },
                Spec: node,
-       })
+       }, forced)
 }
 
 func formatNodeKey(name string) string {
diff --git a/banyand/metadata/schema/register_test.go b/banyand/metadata/schema/register_test.go
index 9f05a999..45e096d9 100644
--- a/banyand/metadata/schema/register_test.go
+++ b/banyand/metadata/schema/register_test.go
@@ -72,7 +72,7 @@ var _ = ginkgo.Describe("etcd_register", func() {
        })
 
        ginkgo.It("should revoke the leaser", func() {
-               gomega.Expect(r.register(context.Background(), md)).ShouldNot(gomega.HaveOccurred())
+               gomega.Expect(r.register(context.Background(), md, true)).ShouldNot(gomega.HaveOccurred())
                k, err := md.key()
                gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
                gomega.Expect(r.get(context.Background(), k, &databasev1.Node{})).ShouldNot(gomega.HaveOccurred())
@@ -86,7 +86,7 @@ var _ = ginkgo.Describe("etcd_register", func() {
        })
 
        ginkgo.It("should register only once", func() {
-               gomega.Expect(r.register(context.Background(), md)).ShouldNot(gomega.HaveOccurred())
-               gomega.Expect(r.register(context.Background(), md)).Should(gomega.MatchError(errGRPCAlreadyExists))
+               gomega.Expect(r.register(context.Background(), md, false)).ShouldNot(gomega.HaveOccurred())
+               gomega.Expect(r.register(context.Background(), md, false)).Should(gomega.MatchError(errGRPCAlreadyExists))
        })
 })
diff --git a/banyand/metadata/schema/schema.go b/banyand/metadata/schema/schema.go
index dd4570ca..3a35c13f 100644
--- a/banyand/metadata/schema/schema.go
+++ b/banyand/metadata/schema/schema.go
@@ -195,5 +195,5 @@ type Property interface {
 // Node allows CRUD node schemas in a group.
 type Node interface {
        ListNode(ctx context.Context, role databasev1.Role) ([]*databasev1.Node, error)
-       RegisterNode(ctx context.Context, node *databasev1.Node) error
+       RegisterNode(ctx context.Context, node *databasev1.Node, forced bool) error
 }
diff --git a/banyand/metadata/server.go b/banyand/metadata/server.go
index a4c59359..c96398f8 100644
--- a/banyand/metadata/server.go
+++ b/banyand/metadata/server.go
@@ -89,10 +89,10 @@ func (s *server) GracefulStop() {
 }
 
 // NewService returns a new metadata repository Service.
-func NewService(ctx context.Context) (Service, error) {
+func NewService(_ context.Context) (Service, error) {
        s := &server{}
        var err error
-       s.Service, err = NewClient(ctx)
+       s.Service, err = NewClient(true)
        if err != nil {
                return nil, err
        }
diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go
index 1cdc4fa7..4f262b0f 100644
--- a/banyand/stream/tstable.go
+++ b/banyand/stream/tstable.go
@@ -33,6 +33,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/kv"
        "github.com/apache/skywalking-banyandb/banyand/tsdb"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 const (
@@ -117,8 +118,10 @@ type tsTableFactory struct {
        chunkSize         int
 }
 
-func (ttf *tsTableFactory) NewTSTable(bufferSupplier *tsdb.BufferSupplier, root string, position common.Position, l *logger.Logger) (tsdb.TSTable, error) {
-       sst, err := kv.OpenTimeSeriesStore(path.Join(root, id), kv.TSSWithMemTableSize(ttf.bufferSize), kv.TSSWithLogger(l.Named(id)),
+func (ttf *tsTableFactory) NewTSTable(bufferSupplier *tsdb.BufferSupplier, root string, position common.Position,
+       l *logger.Logger, timeRange timestamp.TimeRange,
+) (tsdb.TSTable, error) {
+       sst, err := kv.OpenTimeSeriesStore(path.Join(root, id), timeRange, kv.TSSWithMemTableSize(ttf.bufferSize), kv.TSSWithLogger(l.Named(id)),
                kv.TSSWithZSTDCompression(ttf.chunkSize))
        if err != nil {
                return nil, fmt.Errorf("failed to create time series table: 
%w", err)
diff --git a/banyand/tsdb/block.go b/banyand/tsdb/block.go
index 32e4d521..a8596508 100644
--- a/banyand/tsdb/block.go
+++ b/banyand/tsdb/block.go
@@ -202,7 +202,7 @@ func (b *block) openSafely() (err error) {
 
 func (b *block) open() (err error) {
        if b.tsTable, err = b.openOpts.tsTableFactory.NewTSTable(b.openOpts.bufferSupplier,
-               b.path, b.position, b.l); err != nil {
+               b.path, b.position, b.l, b.TimeRange); err != nil {
                return err
        }
        b.closableLst = append(b.closableLst, b.tsTable)
diff --git a/banyand/tsdb/shard.go b/banyand/tsdb/shard.go
index b7841837..5ab48ca0 100644
--- a/banyand/tsdb/shard.go
+++ b/banyand/tsdb/shard.go
@@ -244,7 +244,6 @@ func (s *shard) TriggerSchedule(task string) bool {
 
 func (s *shard) Close() (err error) {
        s.closeOnce.Do(func() {
-               _ = s.bufferSupplier.Close()
                s.scheduler.Close()
                s.segmentManageStrategy.Close()
                ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
diff --git a/banyand/tsdb/tsdb_test.go b/banyand/tsdb/tsdb_test.go
index ebe299b7..8c2dbd71 100644
--- a/banyand/tsdb/tsdb_test.go
+++ b/banyand/tsdb/tsdb_test.go
@@ -141,7 +141,7 @@ func NewByPassTSTableFactory() TSTableFactory {
        return bypassTSTableFactory{}
 }
 
-func (bypassTSTableFactory) NewTSTable(_ *BufferSupplier, _ string, _ common.Position, _ *logger.Logger) (TSTable, error) {
+func (bypassTSTableFactory) NewTSTable(_ *BufferSupplier, _ string, _ common.Position, _ *logger.Logger, _ timestamp.TimeRange) (TSTable, error) {
        return newBypassTSTable()
 }
 
diff --git a/banyand/tsdb/tstable.go b/banyand/tsdb/tstable.go
index 9aab9bb0..d23a5b8c 100644
--- a/banyand/tsdb/tstable.go
+++ b/banyand/tsdb/tstable.go
@@ -25,6 +25,7 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 // TSTable is time series table.
@@ -43,5 +44,6 @@ type TSTable interface {
 // TSTableFactory is the factory of TSTable.
 type TSTableFactory interface {
        // NewTSTable creates a new TSTable.
-       NewTSTable(bufferSupplier *BufferSupplier, root string, position common.Position, l *logger.Logger) (TSTable, error)
+       NewTSTable(bufferSupplier *BufferSupplier, root string, position common.Position,
+               l *logger.Logger, timeRange timestamp.TimeRange) (TSTable, error)
 }
diff --git a/go.mod b/go.mod
index 991bfa80..10bd2daa 100644
--- a/go.mod
+++ b/go.mod
@@ -66,7 +66,7 @@ require (
        github.com/davecgh/go-spew v1.1.1 // indirect
        github.com/dgraph-io/ristretto v0.1.1 // indirect
        github.com/dgryski/go-metro v0.0.0-20180109044635-280f6062b5bc // indirect
-       github.com/dustin/go-humanize v1.0.1 // indirect
+       github.com/dustin/go-humanize v1.0.1
        github.com/fsnotify/fsnotify v1.6.0 // indirect
        github.com/go-logr/logr v1.2.4 // indirect
        github.com/go-logr/stdr v1.2.2 // indirect
diff --git a/pkg/cmdsetup/data.go b/pkg/cmdsetup/data.go
index a6ec86a6..7932f711 100644
--- a/pkg/cmdsetup/data.go
+++ b/pkg/cmdsetup/data.go
@@ -38,7 +38,7 @@ import (
 func newDataCmd(runners ...run.Unit) *cobra.Command {
        l := logger.GetLogger("bootstrap")
        ctx := context.Background()
-       metaSvc, err := metadata.NewClient(ctx)
+       metaSvc, err := metadata.NewClient(false)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate metadata service")
        }
diff --git a/pkg/cmdsetup/liaison.go b/pkg/cmdsetup/liaison.go
index b0c03985..c2c8eec5 100644
--- a/pkg/cmdsetup/liaison.go
+++ b/pkg/cmdsetup/liaison.go
@@ -40,7 +40,7 @@ import (
 func newLiaisonCmd(runners ...run.Unit) *cobra.Command {
        l := logger.GetLogger("bootstrap")
        ctx := context.Background()
-       metaSvc, err := metadata.NewClient(ctx)
+       metaSvc, err := metadata.NewClient(false)
        if err != nil {
                l.Fatal().Err(err).Msg("failed to initiate metadata service")
        }
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 5601d94a..2437a627 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -25,6 +25,7 @@ import (
        "io"
        "log"
        "math"
+       "sync/atomic"
        "time"
 
        "github.com/blugelabs/bluge"
@@ -90,13 +91,13 @@ type store struct {
        ch            chan any
        closer        *run.Closer
        l             *logger.Logger
+       errClosing    atomic.Pointer[error]
        batchInterval time.Duration
 }
 
 // NewStore create a new inverted index repository.
 func NewStore(opts StoreOpts) (index.Store, error) {
-       indexConfig := blugeIndex.DefaultConfig(opts.Path).WithUnsafeBatches().
-               WithPersisterNapTimeMSec(60 * 1000)
+       indexConfig := blugeIndex.DefaultConfig(opts.Path)
        indexConfig.MergePlanOptions.MaxSegmentsPerTier = 1
        indexConfig.MergePlanOptions.MaxSegmentSize = 500000
        indexConfig.MergePlanOptions.SegmentsPerMergeTask = 20
@@ -124,7 +125,10 @@ func NewStore(opts StoreOpts) (index.Store, error) {
 
 func (s *store) Close() error {
        s.closer.CloseThenWait()
-       return s.writer.Close()
+       if s.errClosing.Load() != nil {
+               return *s.errClosing.Load()
+       }
+       return nil
 }
 
 func (s *store) Write(fields []index.Field, docID uint64) error {
@@ -291,7 +295,12 @@ func (s *store) SizeOnDisk() int64 {
 
 func (s *store) run() {
        go func() {
-               defer s.closer.Done()
+               defer func() {
+                       if err := s.writer.Close(); err != nil {
+                               s.errClosing.Store(&err)
+                       }
+                       s.closer.Done()
+               }()
                size := 0
                batch := bluge.NewBatch()
                flush := func() {
@@ -304,6 +313,7 @@ func (s *store) run() {
                        batch.Reset()
                        size = 0
                }
+               defer flush()
                var docIDBuffer bytes.Buffer
                for {
                        timer := time.NewTimer(s.batchInterval)
diff --git a/pkg/query/logical/measure/measure_plan_groupby.go b/pkg/query/logical/measure/measure_plan_groupby.go
index 9f62565d..067577c8 100644
--- a/pkg/query/logical/measure/measure_plan_groupby.go
+++ b/pkg/query/logical/measure/measure_plan_groupby.go
@@ -154,6 +154,12 @@ func formatGroupByKey(point *measurev1.DataPoint, groupByTagsRefs [][]*logical.T
        hash := xxhash.New()
        for _, tagFamilyRef := range groupByTagsRefs {
                for _, tagRef := range tagFamilyRef {
+                       if tagRef.Spec.TagFamilyIdx >= len(point.GetTagFamilies()) {
+                               return 0, errors.New("tag family index out of range")
+                       }
+                       if tagRef.Spec.TagIdx >= len(point.GetTagFamilies()[tagRef.Spec.TagFamilyIdx].GetTags()) {
+                               return 0, errors.New("tag index out of range")
+                       }
                        tag := point.GetTagFamilies()[tagRef.Spec.TagFamilyIdx].GetTags()[tagRef.Spec.TagIdx]
                        switch v := tag.GetValue().GetValue().(type) {
                        case *modelv1.TagValue_Str:
diff --git a/pkg/test/setup/setup.go b/pkg/test/setup/setup.go
index b3159f90..8fdcbfec 100644
--- a/pkg/test/setup/setup.go
+++ b/pkg/test/setup/setup.go
@@ -71,6 +71,32 @@ func StandaloneWithSchemaLoaders(schemaLoaders []SchemaLoader, certFile, keyFile
        var ports []int
        ports, err = test.AllocateFreePorts(4)
        gomega.Expect(err).NotTo(gomega.HaveOccurred())
+       addr, httpAddr, closeFn := standaloneServer(path, ports, schemaLoaders, certFile, keyFile, flags...)
+       return addr, httpAddr, func() {
+               closeFn()
+               deferFn()
+       }
+}
+
+// ClosableStandalone wires standalone modules to build a testing ready runtime.
+func ClosableStandalone(path string, ports []int, flags ...string) (string, string, func()) {
+       return standaloneServer(path, ports, []SchemaLoader{
+               &preloadService{name: "stream"},
+               &preloadService{name: "measure"},
+       }, "", "", flags...)
+}
+
+// ClosableStandaloneWithSchemaLoaders wires standalone modules to build a testing ready runtime.
+func ClosableStandaloneWithSchemaLoaders(path string, ports []int, schemaLoaders []SchemaLoader, flags ...string) (string, string, func()) {
+       return standaloneServer(path, ports, schemaLoaders, "", "", flags...)
+}
+
+// EmptyClosableStandalone wires standalone modules to build a testing ready runtime.
+func EmptyClosableStandalone(path string, ports []int, flags ...string) (string, string, func()) {
+       return standaloneServer(path, ports, nil, "", "", flags...)
+}
+
+func standaloneServer(path string, ports []int, schemaLoaders []SchemaLoader, certFile, keyFile string, flags ...string) (string, string, func()) {
        addr := fmt.Sprintf("%s:%d", host, ports[0])
        httpAddr := fmt.Sprintf("%s:%d", host, ports[1])
        endpoint := fmt.Sprintf("http://%s:%d", host, ports[2])
@@ -127,10 +153,7 @@ func StandaloneWithSchemaLoaders(schemaLoaders []SchemaLoader, certFile, keyFile
                err = preloadGroup.Run(context.Background())
                gomega.Expect(err).NotTo(gomega.HaveOccurred())
        }
-       return addr, httpAddr, func() {
-               closeFn()
-               deferFn()
-       }
+       return addr, httpAddr, closeFn
 }
 
 // SchemaLoader is a service that can preload schema.
diff --git a/pkg/wal/wal.go b/pkg/wal/wal.go
index db8f416d..0d9fac6c 100644
--- a/pkg/wal/wal.go
+++ b/pkg/wal/wal.go
@@ -30,6 +30,7 @@ import (
        "strconv"
        "strings"
        "sync"
+       "syscall"
        "time"
 
        "github.com/golang/snappy"
@@ -332,13 +333,16 @@ func (log *log) Delete(segmentID SegmentID) error {
        if segmentID == log.workSegment.segmentID {
                return errors.New("Can not delete the segment which is working")
        }
-
+       defer delete(log.segmentMap, segmentID)
        err := os.Remove(log.segmentMap[segmentID].path)
-       if err != nil {
-               return errors.Wrap(err, "Delete WAL segment error")
+       if err == nil {
+               return nil
        }
-       delete(log.segmentMap, segmentID)
-       return nil
+       var pathErr *os.PathError
+       if errors.As(err, &pathErr) && errors.Is(pathErr.Err, syscall.ENOENT) {
+               return nil
+       }
+       return errors.Wrap(err, "Delete WAL segment error")
 }
 
 // Close all of segments and stop WAL work.
diff --git a/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go b/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go
new file mode 100644
index 00000000..ac89d108
--- /dev/null
+++ b/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go
@@ -0,0 +1,105 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package integration_query_ondisk_test
+
+import (
+       "testing"
+       "time"
+
+       . "github.com/onsi/ginkgo/v2"
+       . "github.com/onsi/gomega"
+       "github.com/onsi/gomega/gleak"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/test/helpers"
+       "github.com/apache/skywalking-banyandb/pkg/test/setup"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+       test_cases "github.com/apache/skywalking-banyandb/test/cases"
+       casesmeasure "github.com/apache/skywalking-banyandb/test/cases/measure"
+       casesstream "github.com/apache/skywalking-banyandb/test/cases/stream"
+       casestopn "github.com/apache/skywalking-banyandb/test/cases/topn"
+       integration_standalone "github.com/apache/skywalking-banyandb/test/integration/standalone"
+)
+
+func TestIntegrationQueryOnDisk(t *testing.T) {
+       RegisterFailHandler(Fail)
+       RunSpecs(t, "Integration Query OnDisk Suite", 
Label(integration_standalone.Labels...))
+}
+
+var (
+       connection *grpc.ClientConn
+       now        time.Time
+       deferFunc  func()
+       goods      []gleak.Goroutine
+)
+
+var _ = SynchronizedBeforeSuite(func() []byte {
+       goods = gleak.Goroutines()
+       Expect(logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: flags.LogLevel,
+       })).To(Succeed())
+       path, diskCleanupFn, err := test.NewSpace()
+       Expect(err).NotTo(HaveOccurred())
+       var ports []int
+       ports, err = test.AllocateFreePorts(4)
+       Expect(err).NotTo(HaveOccurred())
+       addr, _, closeFunc := setup.ClosableStandalone(path, ports)
+       ns := timestamp.NowMilli().UnixNano()
+       now = time.Unix(0, ns-ns%int64(time.Minute))
+       test_cases.Initialize(addr, now)
+       closeFunc()
+       time.Sleep(time.Second)
+       addr, _, closeFunc = setup.EmptyClosableStandalone(path, ports)
+       deferFunc = func() {
+               closeFunc()
+               diskCleanupFn()
+       }
+       return []byte(addr)
+}, func(address []byte) {
+       var err error
+       connection, err = grpchelper.Conn(string(address), 10*time.Second,
+               grpc.WithTransportCredentials(insecure.NewCredentials()))
+       casesstream.SharedContext = helpers.SharedContext{
+               Connection: connection,
+               BaseTime:   now,
+       }
+       casesmeasure.SharedContext = helpers.SharedContext{
+               Connection: connection,
+               BaseTime:   now,
+       }
+       casestopn.SharedContext = helpers.SharedContext{
+               Connection: connection,
+               BaseTime:   now,
+       }
+       Expect(err).NotTo(HaveOccurred())
+})
+
+var _ = SynchronizedAfterSuite(func() {
+       if connection != nil {
+               Expect(connection.Close()).To(Succeed())
+       }
+}, func() {
+       deferFunc()
+       Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+})
diff --git a/test/stress/cases/istio/cpu.prof b/test/stress/cases/istio/cpu.prof
new file mode 100644
index 00000000..e69de29b
diff --git a/test/stress/cases/istio/heap.prof b/test/stress/cases/istio/heap.prof
new file mode 100644
index 00000000..e69de29b
diff --git a/test/stress/cases/istio/istio_suite_test.go b/test/stress/cases/istio/istio_suite_test.go
index bab730bc..b6ac0957 100644
--- a/test/stress/cases/istio/istio_suite_test.go
+++ b/test/stress/cases/istio/istio_suite_test.go
@@ -24,20 +24,25 @@ import (
        "fmt"
        "io"
        "os"
+       "path/filepath"
+       "runtime/pprof"
        "testing"
        "time"
 
+       "github.com/dustin/go-humanize"
        . "github.com/onsi/ginkgo/v2"
        . "github.com/onsi/gomega"
        "github.com/pkg/errors"
        "google.golang.org/grpc"
        "google.golang.org/grpc/credentials/insecure"
+       "google.golang.org/grpc/stats"
        "google.golang.org/protobuf/encoding/protojson"
        "google.golang.org/protobuf/types/known/timestamppb"
 
        measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        "github.com/apache/skywalking-banyandb/pkg/grpchelper"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
        "github.com/apache/skywalking-banyandb/pkg/test/helpers"
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
@@ -49,6 +54,41 @@ func TestIstio(t *testing.T) {
        RunSpecs(t, "Istio Suite", Label("integration", "slow"))
 }
 
+var (
+       cpuProfileFile  *os.File
+       heapProfileFile *os.File
+)
+
+var _ = BeforeSuite(func() {
+       // Create CPU profile file
+       var err error
+       cpuProfileFile, err = os.Create("cpu.prof")
+       Expect(err).NotTo(HaveOccurred())
+
+       // Start CPU profiling
+       err = pprof.StartCPUProfile(cpuProfileFile)
+       Expect(err).NotTo(HaveOccurred())
+
+       // Create heap profile file
+       heapProfileFile, err = os.Create("heap.prof")
+       Expect(err).NotTo(HaveOccurred())
+})
+
+var _ = AfterSuite(func() {
+       // Stop CPU profiling
+       pprof.StopCPUProfile()
+
+       // Write heap profile
+       err := pprof.WriteHeapProfile(heapProfileFile)
+       Expect(err).NotTo(HaveOccurred())
+
+       // Close profile files
+       err = cpuProfileFile.Close()
+       Expect(err).NotTo(HaveOccurred())
+       err = heapProfileFile.Close()
+       Expect(err).NotTo(HaveOccurred())
+})
+
 var _ = Describe("Istio", func() {
        BeforeEach(func() {
                Expect(logger.Init(logger.Logging{
@@ -57,20 +97,41 @@ var _ = Describe("Istio", func() {
                })).To(Succeed())
        })
        It("should pass", func() {
-               addr, _, deferFunc := setup.StandaloneWithSchemaLoaders([]setup.SchemaLoader{&preloadService{name: "oap"}}, "", "")
-               DeferCleanup(deferFunc)
+               path, deferFn, err := test.NewSpace()
+               Expect(err).NotTo(HaveOccurred())
+               DeferCleanup(func() {
+                       printDiskUsage(path+"/measure", 5, 0)
+                       deferFn()
+               })
+               var ports []int
+               ports, err = test.AllocateFreePorts(4)
+               Expect(err).NotTo(HaveOccurred())
+               addr, _, closerServerFunc := setup.ClosableStandaloneWithSchemaLoaders(
+                       path, ports,
+                       []setup.SchemaLoader{&preloadService{name: "oap"}},
+                       "--logging-level", "info")
+               DeferCleanup(closerServerFunc)
                Eventually(helpers.HealthCheck(addr, 10*time.Second, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())),
                        flags.EventuallyTimeout).Should(Succeed())
-               conn, err := grpchelper.Conn(addr, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials()))
+               bc := &clientCounter{}
+               conn, err := grpchelper.Conn(addr, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithStatsHandler(bc))
                Expect(err).NotTo(HaveOccurred())
                DeferCleanup(func() {
                        conn.Close()
                })
-               Expect(ReadAndWriteFromFile(extractData(), conn)).To(Succeed())
+               startTime := time.Now()
+               writtenCount, err := ReadAndWriteFromFile(extractData(), conn)
+               Expect(err).To(Succeed())
+               endTime := time.Now()
+
+               fmt.Printf("written %d items in %s\n", writtenCount, 
endTime.Sub(startTime).String())
+               fmt.Printf("throughput: %f items/s\n", 
float64(writtenCount)/endTime.Sub(startTime).Seconds())
+               fmt.Printf("throughput(kb/s) %f\n", 
float64(bc.bytesSent)/endTime.Sub(startTime).Seconds()/1024)
+               fmt.Printf("latency: %s\n", 
bc.totalLatency/time.Duration(writtenCount))
        })
 })
 
-func ReadAndWriteFromFile(filePath string, conn *grpc.ClientConn) error {
+func ReadAndWriteFromFile(filePath string, conn *grpc.ClientConn) (int, error) {
        // Open the file for reading
 
        l := logger.GetLogger("load_test")
@@ -97,16 +158,23 @@ func ReadAndWriteFromFile(filePath string, conn *grpc.ClientConn) error {
        ctx := context.Background()
        client, err := c.Write(ctx)
        if err != nil {
-               return fmt.Errorf("failed to create write client: %w", err)
+               return 0, fmt.Errorf("failed to create write client: %w", err)
        }
+       writeCount := 0
        flush := func(createClient bool) error {
                if errClose := client.CloseSend(); errClose != nil {
                        return fmt.Errorf("failed to close send: %w", errClose)
                }
                bulkSize = 2000
-               _, err = client.Recv()
-               if err != nil && errors.Is(err, io.EOF) {
-                       return fmt.Errorf("failed to receive client: %w", err)
+               writeCount += 2000
+               for i := 0; i < 2000; i++ {
+                       _, err = client.Recv()
+                       if err != nil && !errors.Is(err, io.EOF) {
+                               return fmt.Errorf("failed to receive client: 
%w", err)
+                       }
+                       if errors.Is(err, io.EOF) {
+                               break
+                       }
                }
                if !createClient {
                        return nil
@@ -140,14 +208,14 @@ func ReadAndWriteFromFile(filePath string, conn *grpc.ClientConn) error {
                        }
                        jsonMsg, errRead := reader.ReadString('\n')
                        if errRead != nil && errRead.Error() != "EOF" {
-                               return fmt.Errorf("failed to read line from 
file: %w", errRead)
+                               return fmt.Errorf("line %d failed to read line 
from file: %w", 2000-bulkSize, errRead)
                        }
                        if errRead != nil && errRead.Error() == "EOF" {
                                break
                        }
                        var req measurev1.WriteRequest
                        if errUnmarshal := protojson.Unmarshal([]byte(jsonMsg), &req); errUnmarshal != nil {
-                               return fmt.Errorf("failed to unmarshal JSON message: %w", errUnmarshal)
+                               return fmt.Errorf("line %d failed to unmarshal JSON message: %w", 2000-bulkSize, errUnmarshal)
                        }
 
                        req.MessageId = uint64(time.Now().UnixNano())
@@ -175,8 +243,68 @@ func ReadAndWriteFromFile(filePath string, conn *grpc.ClientConn) error {
        }
        for i := 0; i < 40; i++ {
                if err = loop(i); err != nil {
+                       return writeCount, err
+               }
+       }
+       return writeCount, flush(false)
+}
+
+func printDiskUsage(dir string, maxDepth, curDepth int) {
+       // Calculate the total size of all files and directories within the directory
+       var totalSize int64
+       err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
+               if err != nil {
                        return err
                }
+               if info.Mode().IsRegular() {
+                       totalSize += info.Size()
+               }
+               return nil
+       })
+       if err != nil {
+               fmt.Fprintf(os.Stderr, "Error: %v\n", err)
+               return
+       }
+
+       // Print the disk usage of the current directory
+       fmt.Printf("%s: %s\n", dir, humanize.Bytes(uint64(totalSize)))
+
+       // Recursively print the disk usage of subdirectories
+       if curDepth < maxDepth {
+               files, err := os.ReadDir(dir)
+               if err != nil {
+                       fmt.Fprintf(os.Stderr, "Error: %v\n", err)
+                       return
+               }
+               for _, file := range files {
+                       if file.IsDir() {
+                               subdir := filepath.Join(dir, file.Name())
+                               printDiskUsage(subdir, maxDepth, curDepth+1)
+                       }
+               }
        }
-       return flush(false)
+}
+
+type clientCounter struct {
+       bytesSent    int
+       totalLatency time.Duration
+}
+
+func (*clientCounter) HandleConn(context.Context, stats.ConnStats) {}
+
+func (c *clientCounter) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context {
+       return ctx
+}
+
+func (c *clientCounter) HandleRPC(_ context.Context, s stats.RPCStats) {
+       switch s := s.(type) {
+       case *stats.OutPayload:
+               c.bytesSent += s.WireLength
+       case *stats.End:
+               c.totalLatency += s.EndTime.Sub(s.BeginTime)
+       }
+}
+
+func (c *clientCounter) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context {
+       return ctx
 }
diff --git a/test/stress/cases/istio/repo.go b/test/stress/cases/istio/repo.go
index 89ac54aa..f0de5fea 100644
--- a/test/stress/cases/istio/repo.go
+++ b/test/stress/cases/istio/repo.go
@@ -20,7 +20,7 @@ package istio
 import (
        "archive/tar"
        "bytes"
-       "compress/gzip"
+       "compress/bzip2"
        "context"
        "embed"
        "encoding/json"
@@ -51,13 +51,23 @@ func extractData() string {
 
        // Create a subdirectory called "tmp" in the temporary directory
        tmpSubDir := filepath.Join(tmpDir, "testdata")
+       target := filepath.Join(tmpSubDir, "access.log")
+       if _, err := os.Stat(target); err == nil {
+               absPath, err := filepath.Abs(target)
+               if err != nil {
+                       fmt.Printf("Error getting absolute path: %v\n", err)
+                       os.Exit(1)
+               }
+               return absPath
+       }
        err := os.MkdirAll(tmpSubDir, 0o755)
        if err != nil {
                fmt.Fprintf(os.Stderr, "Error creating tmp directory: %v\n", 
err)
                os.Exit(1)
        }
        var data []byte
-       if data, err = store.ReadFile("testdata/access.tar.gz"); err != nil {
+       if data, err = store.ReadFile("testdata/access.tar.bz2"); err != nil {
+               fmt.Printf("Error reading file: %v\n", err)
                os.Exit(1)
        }
        filePath, err := extractTarGz(data, tmpSubDir)
@@ -69,13 +79,8 @@ func extractData() string {
 }
 
 func extractTarGz(src []byte, dest string) (string, error) {
-       gzReader, err := gzip.NewReader(io.Reader(bytes.NewReader(src)))
-       if err != nil {
-               return "", err
-       }
-       defer gzReader.Close()
-
-       tarReader := tar.NewReader(gzReader)
+       bzReader := bzip2.NewReader(io.Reader(bytes.NewReader(src)))
+       tarReader := tar.NewReader(bzReader)
 
        for {
                header, err := tarReader.Next()
diff --git a/test/stress/cases/istio/report.md b/test/stress/cases/istio/report.md
index 83d985d6..ee76e41e 100644
--- a/test/stress/cases/istio/report.md
+++ b/test/stress/cases/istio/report.md
@@ -1,52 +1,113 @@
 # Testing Report
 
+## Scenario
+
+### SkyWalking Entities
+
+Services: 256
+Instances: 2048, 8 per service
+
+
+### Demo cluster
+
+Traffic RPS: 4352
+VUS: 8192
+
 ## Result
 
+written 16186000 items in 38m43.221090505s
+throughput: 6967.051077 items/s
+throughput(kb/s) 1782.985321
+latency: 2ns
+
+## CPU Profile
+
+CPU Usage: 324%
+
 ```bash
-Ran 1 of 1 Specs in 2384.312 seconds
-SUCCESS! -- 1 Passed | 0 Failed | 0 Pending | 0 Skipped
-PASS
+Showing top 10 nodes out of 300
+      flat  flat%   sum%        cum   cum%
+   348.58s  4.62%  4.62%    383.07s  5.08%  runtime.findObject
+   272.92s  3.62%  8.24%    272.92s  3.62%  runtime.memmove
+   240.53s  3.19% 11.43%    240.53s  3.19%  runtime/internal/syscall.Syscall6
+   239.05s  3.17% 14.60%    239.05s  3.17%  runtime.memclrNoHeapPointers
+   210.82s  2.80% 17.40%    340.64s  4.52%  github.com/klauspost/compress/zstd.(*doubleFastEncoder).Encode
+   189.80s  2.52% 19.92%   1111.75s 14.74%  runtime.mallocgc
+   182.17s  2.42% 22.33%    687.47s  9.12%  runtime.scanobject
+   134.93s  1.79% 24.12%    202.49s  2.69%  github.com/dgraph-io/badger/v3/table.(*MergeIterator).Value
+   116.62s  1.55% 25.67%    116.62s  1.55%  runtime.nextFreeFast (inline)
+   110.73s  1.47% 27.14%    110.73s  1.47%  github.com/klauspost/compress/zstd.sequenceDecs_decodeSync_bmi2
+```
+
+From the top 10 list, we can see that the CPU is mainly used by `compaction`.
+
+## Heap Profile
 
-Ginkgo ran 1 suite in 39m47.81357862s
+Heap Size: 1.2GB
+
+```bash
+Showing top 10 nodes out of 104
+      flat  flat%   sum%        cum   cum%
+  690.27MB 53.22% 53.22%   690.27MB 53.22%  github.com/dgraph-io/ristretto/z.Calloc (inline)
+  172.07MB 13.27% 66.48%   172.07MB 13.27%  runtime.malg
+     128MB  9.87% 76.35%      128MB  9.87%  github.com/klauspost/compress/zstd.(*fastBase).ensureHist (inline)
+   78.98MB  6.09% 82.44%    78.98MB  6.09%  github.com/dgraph-io/badger/v3/skl.newArena
+   57.51MB  4.43% 86.87%   141.71MB 10.92%  github.com/dgraph-io/badger/v3/table.(*Builder).addHelper.func1
+   36.02MB  2.78% 89.65%   177.73MB 13.70%  github.com/dgraph-io/badger/v3/table.(*Builder).addHelper
+   28.97MB  2.23% 91.88%    28.97MB  2.23%  runtime/pprof.(*profMap).lookup
+   26.50MB  2.04% 93.93%   757.59MB 58.41%  github.com/dgraph-io/badger/v3/table.(*Builder).addInternal
+    8.21MB  0.63% 94.56%     8.21MB  0.63%  github.com/klauspost/compress/zstd.encoderOptions.encoder
+       4MB  0.31% 94.87%    48.50MB  3.74%  github.com/dgraph-io/badger/v3/table.(*Table).block
 ```
 
+From the top 10 list, we can see that the memory is mainly used by the write `buffer(skl)` and `compaction(table)`.
+In particular, compaction includes several table-related operations, such as `table.(*Builder).addHelper`,
+which consume most of the memory.
+
+
 ## Disk Usage
 
 ```bash
-4.0K    ./measure-minute/shard-0/seg-20230626/index
-24M     ./measure-minute/shard-0/seg-20230626/block-2023062607/lsm
-17M     ./measure-minute/shard-0/seg-20230626/block-2023062607/encoded
-26M     ./measure-minute/shard-0/seg-20230626/block-2023062607/tst
-66M     ./measure-minute/shard-0/seg-20230626/block-2023062607
-66M     ./measure-minute/shard-0/seg-20230626
-4.4M    ./measure-minute/shard-0/series/md
-2.2M    ./measure-minute/shard-0/series/inverted
-20K     ./measure-minute/shard-0/series/lsm
-6.6M    ./measure-minute/shard-0/series
-72M     ./measure-minute/shard-0
-4.0K    ./measure-minute/shard-1/seg-20230626/index
-24M     ./measure-minute/shard-1/seg-20230626/block-2023062607/lsm
-19M     ./measure-minute/shard-1/seg-20230626/block-2023062607/encoded
-26M     ./measure-minute/shard-1/seg-20230626/block-2023062607/tst
-68M     ./measure-minute/shard-1/seg-20230626/block-2023062607
-68M     ./measure-minute/shard-1/seg-20230626
-4.4M    ./measure-minute/shard-1/series/md
-2.2M    ./measure-minute/shard-1/series/inverted
-20K     ./measure-minute/shard-1/series/lsm
-6.5M    ./measure-minute/shard-1/series
-74M     ./measure-minute/shard-1
-146M    ./measure-minute
-4.0K    ./measure-default/shard-0/seg-20230626/index
-79M     ./measure-default/shard-0/seg-20230626/block-2023062607/lsm
-65M     ./measure-default/shard-0/seg-20230626/block-2023062607/encoded
-85M     ./measure-default/shard-0/seg-20230626/block-2023062607/tst
-228M    ./measure-default/shard-0/seg-20230626/block-2023062607
-228M    ./measure-default/shard-0/seg-20230626
-16M     ./measure-default/shard-0/series/md
-2.4M    ./measure-default/shard-0/series/inverted
-20K     ./measure-default/shard-0/series/lsm
-18M     ./measure-default/shard-0/series
-245M    ./measure-default/shard-0
-245M    ./measure-default
-391M    .
+measure: 446 MB
+measure/measure-default: 272 MB
+measure/measure-default/shard-0: 272 MB
+measure/measure-default/shard-0/buffer-0: 1.4 MB
+measure/measure-default/shard-0/buffer-1: 2.8 MB
+measure/measure-default/shard-0/seg-20231015: 247 MB
+measure/measure-default/shard-0/seg-20231015/block-2023101516: 247 MB
+measure/measure-default/shard-0/seg-20231015/block-2023101516/encoded: 74 MB
+measure/measure-default/shard-0/seg-20231015/block-2023101516/lsm: 83 MB
+measure/measure-default/shard-0/seg-20231015/block-2023101516/tst: 90 MB
+measure/measure-default/shard-0/seg-20231015/index: 0 B
+measure/measure-default/shard-0/series: 21 MB
+measure/measure-default/shard-0/series/inverted: 2.9 MB
+measure/measure-default/shard-0/series/lsm: 1.0 MB
+measure/measure-default/shard-0/series/md: 17 MB
+measure/measure-minute: 173 MB
+measure/measure-minute/shard-0: 89 MB
+measure/measure-minute/shard-0/buffer-0: 2.0 MB
+measure/measure-minute/shard-0/buffer-1: 1.6 MB
+measure/measure-minute/shard-0/seg-20231015: 76 MB
+measure/measure-minute/shard-0/seg-20231015/block-2023101516: 76 MB
+measure/measure-minute/shard-0/seg-20231015/block-2023101516/encoded: 23 MB
+measure/measure-minute/shard-0/seg-20231015/block-2023101516/lsm: 26 MB
+measure/measure-minute/shard-0/seg-20231015/block-2023101516/tst: 28 MB
+measure/measure-minute/shard-0/seg-20231015/index: 0 B
+measure/measure-minute/shard-0/series: 8.7 MB
+measure/measure-minute/shard-0/series/inverted: 2.1 MB
+measure/measure-minute/shard-0/series/lsm: 1.0 MB
+measure/measure-minute/shard-0/series/md: 5.6 MB
+measure/measure-minute/shard-1: 84 MB
+measure/measure-minute/shard-1/buffer-0: 1.5 MB
+measure/measure-minute/shard-1/buffer-1: 698 kB
+measure/measure-minute/shard-1/seg-20231015: 73 MB
+measure/measure-minute/shard-1/seg-20231015/block-2023101516: 73 MB
+measure/measure-minute/shard-1/seg-20231015/block-2023101516/encoded: 19 MB
+measure/measure-minute/shard-1/seg-20231015/block-2023101516/lsm: 26 MB
+measure/measure-minute/shard-1/seg-20231015/block-2023101516/tst: 28 MB
+measure/measure-minute/shard-1/seg-20231015/index: 0 B
+measure/measure-minute/shard-1/series: 9.3 MB
+measure/measure-minute/shard-1/series/inverted: 2.7 MB
+measure/measure-minute/shard-1/series/lsm: 1.0 MB
+measure/measure-minute/shard-1/series/md: 5.6 MB
 ```
diff --git a/test/stress/cases/istio/testdata/access.tar.bz2 b/test/stress/cases/istio/testdata/access.tar.bz2
new file mode 100644
index 00000000..1384b47c
Binary files /dev/null and b/test/stress/cases/istio/testdata/access.tar.bz2 differ
diff --git a/test/stress/cases/istio/testdata/access.tar.gz b/test/stress/cases/istio/testdata/access.tar.gz
deleted file mode 100644
index 5608dca8..00000000
Binary files a/test/stress/cases/istio/testdata/access.tar.gz and /dev/null differ
