This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new fd64f687 Flush measure memory data to disk. (#360)
fd64f687 is described below

commit fd64f687c5efbb40a60e4b05ebb7ec5ca286a98d
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Dec 25 19:38:23 2023 +0800

    Flush measure memory data to disk. (#360)
---
 CHANGES.md                                         |   1 +
 banyand/internal/storage/segment.go                |   7 +-
 banyand/internal/storage/shard.go                  |   2 +-
 banyand/internal/storage/storage.go                |   2 +-
 banyand/internal/storage/tsdb.go                   |   2 +-
 banyand/measure/block_writer.go                    |   2 -
 banyand/measure/flusher.go                         | 100 +++++++
 banyand/measure/gc.go                              |  86 ++++++
 banyand/measure/gc_test.go                         | 105 ++++++++
 banyand/measure/introducer.go                      | 148 +++++++++++
 banyand/measure/part.go                            | 127 ++++++++-
 banyand/measure/part_iter_test.go                  |  62 +++--
 banyand/measure/part_metadata.go                   |  62 ++++-
 banyand/measure/part_test.go                       |  29 ++-
 banyand/measure/primary_metadata.go                |   3 +-
 banyand/measure/query.go                           |  38 ++-
 banyand/measure/query_test.go                      | 154 ++++++-----
 banyand/measure/snapshot.go                        | 111 ++++++++
 banyand/measure/snapshot_test.go                   | 287 +++++++++++++++++++++
 banyand/measure/tstable.go                         | 207 +++++++++++++--
 banyand/measure/tstable_test.go                    | 127 ++++++---
 go.mod                                             |   2 +-
 pkg/fs/error.go                                    |  22 +-
 pkg/fs/file_system.go                              |  19 ++
 pkg/fs/local_file_system.go                        |  71 ++++-
 pkg/fs/local_file_system_nix.go                    |  74 ++++++
 pkg/fs/local_file_system_windows.go                |  61 +++++
 pkg/watcher/watcher.go                             |  70 +++++
 .../query_ondisk/query_ondisk_suite_test.go        |   1 -
 29 files changed, 1781 insertions(+), 201 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index ad38d375..b5e03537 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -14,6 +14,7 @@ Release Notes.
 - Remove primary index.
 - Measure column-based storage:
   - Data ingestion and retrieval.
+  - Flush memory data to disk.
 
 ### Bugs
 
diff --git a/banyand/internal/storage/segment.go 
b/banyand/internal/storage/segment.go
index a74da252..d4b0d7ff 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -292,16 +292,17 @@ func (sc *segmentController[T]) sortLst() {
 }
 
 func (sc *segmentController[T]) load(start, end time.Time, root string) (seg 
*segment[T], err error) {
+       suffix := sc.Format(start)
+       segPath := path.Join(root, fmt.Sprintf(segTemplate, suffix))
        var tsTable T
-       if tsTable, err = sc.tsTableCreator(sc.location, sc.position, sc.l, 
timestamp.NewSectionTimeRange(start, end)); err != nil {
+       if tsTable, err = sc.tsTableCreator(lfs, segPath, sc.position, sc.l, 
timestamp.NewSectionTimeRange(start, end)); err != nil {
                return nil, err
        }
-       suffix := sc.Format(start)
        ctx := context.WithValue(context.Background(), logger.ContextKey, sc.l)
        seg, err = openSegment[T](common.SetPosition(ctx, func(p 
common.Position) common.Position {
                p.Segment = suffix
                return p
-       }), start, end, path.Join(root, fmt.Sprintf(segTemplate, suffix)), 
suffix, sc.segmentSize, sc.scheduler, tsTable)
+       }), start, end, segPath, suffix, sc.segmentSize, sc.scheduler, tsTable)
        if err != nil {
                return nil, err
        }
diff --git a/banyand/internal/storage/shard.go 
b/banyand/internal/storage/shard.go
index cdec81ae..19150f9c 100644
--- a/banyand/internal/storage/shard.go
+++ b/banyand/internal/storage/shard.go
@@ -75,7 +75,7 @@ func (d *database[T]) openShard(ctx context.Context, id 
common.ShardID) (*shard[
        return s, nil
 }
 
-func (s *shard[T]) closer() {
+func (s *shard[T]) close() {
        s.closeOnce.Do(func() {
                s.scheduler.Close()
                s.segmentManageStrategy.Close()
diff --git a/banyand/internal/storage/storage.go 
b/banyand/internal/storage/storage.go
index cfda39bf..911c5410 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -92,7 +92,7 @@ type TSTableWrapper[T TSTable] interface {
 }
 
 // TSTableCreator creates a TSTable.
-type TSTableCreator[T TSTable] func(root string, position common.Position,
+type TSTableCreator[T TSTable] func(fileSystem fs.FileSystem, root string, 
position common.Position,
        l *logger.Logger, timeRange timestamp.TimeRange) (T, error)
 
 // IntervalUnit denotes the unit of a time point.
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index a425609e..fd5b648a 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -73,7 +73,7 @@ func (d *database[T]) Close() error {
        d.Lock()
        defer d.Unlock()
        for _, s := range d.sLst {
-               s.closer()
+               s.close()
        }
        return d.index.Close()
 }
diff --git a/banyand/measure/block_writer.go b/banyand/measure/block_writer.go
index 9fcc6e9a..fc840084 100644
--- a/banyand/measure/block_writer.go
+++ b/banyand/measure/block_writer.go
@@ -19,7 +19,6 @@ package measure
 
 import (
        "sync"
-       "time"
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
@@ -242,7 +241,6 @@ func (bw *blockWriter) Flush(ph *partMetadata) {
        ph.BlocksCount = bw.totalBlocksCount
        ph.MinTimestamp = bw.totalMinTimestamp
        ph.MaxTimestamp = bw.totalMaxTimestamp
-       ph.Version = time.Now().UnixNano() // TODO: use a global version
 
        bw.mustFlushPrimaryBlock(bw.primaryBlockData)
 
diff --git a/banyand/measure/flusher.go b/banyand/measure/flusher.go
new file mode 100644
index 00000000..4fe26408
--- /dev/null
+++ b/banyand/measure/flusher.go
@@ -0,0 +1,100 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "github.com/apache/skywalking-banyandb/pkg/watcher"
+)
+
+func (tst *tsTable) flusherLoop(flushCh chan *flusherIntroduction, 
introducerWatcher watcher.Channel, epoch uint64) {
+       defer tst.loopCloser.Done()
+       epochWatcher := introducerWatcher.Add(0, tst.loopCloser.CloseNotify())
+       if epochWatcher == nil {
+               return
+       }
+
+       for {
+               select {
+               case <-tst.loopCloser.CloseNotify():
+                       return
+               case <-epochWatcher.Watch():
+                       var curSnapshot *snapshot
+                       tst.RLock()
+                       if tst.snapshot != nil && tst.snapshot.epoch > epoch {
+                               curSnapshot = tst.snapshot
+                               curSnapshot.incRef()
+                       }
+                       tst.RUnlock()
+                       if curSnapshot != nil {
+                               epoch = tst.flush(curSnapshot, flushCh)
+                               if epoch == 0 {
+                                       return
+                               }
+                               tst.persistSnapshot(curSnapshot)
+                               curSnapshot.decRef()
+                               if tst.currentEpoch() != epoch {
+                                       continue
+                               }
+                       }
+                       epochWatcher = introducerWatcher.Add(epoch, 
tst.loopCloser.CloseNotify())
+                       if epochWatcher == nil {
+                               return
+                       }
+                       tst.gc.clean()
+               }
+       }
+}
+
+func (tst *tsTable) flush(snapshot *snapshot, flushCh chan 
*flusherIntroduction) uint64 {
+       ind := generateFlusherIntroduction()
+       defer releaseFlusherIntroduction(ind)
+       for _, pw := range snapshot.parts {
+               if pw.mp == nil || pw.mp.partMetadata.TotalCount < 1 {
+                       continue
+               }
+               partPath := partPath(tst.root, pw.ID())
+               pw.mp.mustFlush(tst.fileSystem, partPath)
+               newPW := newPartWrapper(nil, mustOpenFilePart(partPath, 
tst.fileSystem), tst.fileSystem)
+               newPW.p.partMetadata.ID = pw.ID()
+               ind.flushed[newPW.ID()] = newPW
+       }
+       if len(ind.flushed) < 1 {
+               return snapshot.epoch
+       }
+       ind.applied = make(chan struct{})
+       select {
+       case flushCh <- ind:
+       case <-tst.loopCloser.CloseNotify():
+               return 0
+       }
+       select {
+       case <-ind.applied:
+               return snapshot.epoch
+       case <-tst.loopCloser.CloseNotify():
+               return 0
+       }
+}
+
+func (tst *tsTable) persistSnapshot(snapshot *snapshot) {
+       var partNames []string
+       for i := range snapshot.parts {
+               partNames = append(partNames, partName(snapshot.parts[i].ID()))
+       }
+       tst.mustWriteSnapshot(snapshot.epoch, partNames)
+       tst.gc.registerSnapshot(snapshot.epoch)
+}
diff --git a/banyand/measure/gc.go b/banyand/measure/gc.go
new file mode 100644
index 00000000..8fb6f608
--- /dev/null
+++ b/banyand/measure/gc.go
@@ -0,0 +1,86 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "errors"
+       "path"
+       "sort"
+
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+)
+
+type garbageCleaner struct {
+       parent    *tsTable
+       snapshots []uint64
+       parts     []uint64
+}
+
+func (g *garbageCleaner) registerSnapshot(snapshot uint64) {
+       g.snapshots = append(g.snapshots, snapshot)
+}
+
+func (g *garbageCleaner) submitParts(parts ...uint64) {
+       g.parts = append(g.parts, parts...)
+}
+
+func (g garbageCleaner) clean() {
+       if len(g.snapshots) > 1 {
+               g.cleanSnapshots()
+       }
+       if len(g.parts) > 0 {
+               g.cleanParts()
+       }
+}
+
+func (g *garbageCleaner) cleanSnapshots() {
+       if len(g.snapshots) < 2 {
+               return
+       }
+       if !sort.SliceIsSorted(g.snapshots, func(i, j int) bool {
+               return g.snapshots[i] < g.snapshots[j]
+       }) {
+               sort.Slice(g.snapshots, func(i, j int) bool {
+                       return g.snapshots[i] < g.snapshots[j]
+               })
+       }
+       // keep the latest snapshot
+       var remainingSnapshots []uint64
+       for i := 0; i < len(g.snapshots)-1; i++ {
+               filePath := path.Join(g.parent.root, 
snapshotName(g.snapshots[i]))
+               if err := g.parent.fileSystem.DeleteFile(filePath); err != nil {
+                       var notExistErr *fs.FileSystemError
+                       if errors.As(err, &notExistErr) && notExistErr.Code != 
fs.IsNotExistError {
+                               g.parent.l.Warn().Err(err).Str("path", 
filePath).Msg("failed to delete snapshot, will retry in next round. Please 
check manually")
+                               remainingSnapshots = append(remainingSnapshots, 
g.snapshots[i])
+                       }
+               }
+       }
+       if remainingSnapshots == nil {
+               g.snapshots = g.snapshots[len(g.snapshots)-1:]
+               return
+       }
+       remained := g.snapshots[len(g.snapshots)-1]
+       g.snapshots = g.snapshots[:0]
+       g.snapshots = append(g.snapshots, remainingSnapshots...)
+       g.snapshots = append(g.snapshots, remained)
+}
+
+func (g garbageCleaner) cleanParts() {
+       panic("implement me")
+}
diff --git a/banyand/measure/gc_test.go b/banyand/measure/gc_test.go
new file mode 100644
index 00000000..2d6d8901
--- /dev/null
+++ b/banyand/measure/gc_test.go
@@ -0,0 +1,105 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "path/filepath"
+       "sort"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+func TestGarbageCleanerCleanSnapshot(t *testing.T) {
+       tests := []struct {
+               name            string
+               snapshotsOnDisk []uint64
+               snapshotsInGC   []uint64
+               wantOnDisk      []uint64
+               wantInGC        []uint64
+       }{
+               {
+                       name: "Test with no snapshots on disk and no snapshots 
in GC",
+               },
+               {
+                       name:            "Test with some snapshots on disk and 
no snapshots in GC",
+                       snapshotsInGC:   nil,
+                       snapshotsOnDisk: []uint64{1, 2, 3},
+                       wantInGC:        nil,
+                       wantOnDisk:      []uint64{1, 2, 3},
+               },
+               {
+                       name:            "Test with no snapshots on disk and 
some snapshots in GC",
+                       snapshotsOnDisk: nil,
+                       snapshotsInGC:   []uint64{1, 2, 3},
+                       wantOnDisk:      nil,
+                       // gc won't fix the mismatch between in-memory and disk
+                       wantInGC: []uint64{3},
+               },
+               {
+                       name:            "Test with some snapshots on disk and 
some snapshots in GC",
+                       snapshotsOnDisk: []uint64{1, 2, 3, 4, 5},
+                       snapshotsInGC:   []uint64{1, 3, 5},
+                       wantOnDisk:      []uint64{2, 4, 5},
+                       wantInGC:        []uint64{5},
+               },
+               {
+                       name:            "Test with unsorted",
+                       snapshotsOnDisk: []uint64{1, 3, 2, 5, 4},
+                       snapshotsInGC:   []uint64{5, 1, 3},
+                       wantOnDisk:      []uint64{2, 4, 5},
+                       wantInGC:        []uint64{5},
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       root, defFn := test.Space(require.New(t))
+                       defer defFn()
+                       parent := &tsTable{
+                               root:       root,
+                               l:          logger.GetLogger("test"),
+                               fileSystem: fs.NewLocalFileSystem(),
+                       }
+                       for i := range tt.snapshotsOnDisk {
+                               filePath := filepath.Join(parent.root, 
snapshotName(tt.snapshotsOnDisk[i]))
+                               file, err := 
parent.fileSystem.CreateFile(filePath, filePermission)
+                               require.NoError(t, err)
+                               require.NoError(t, file.Close())
+                       }
+                       g := &garbageCleaner{
+                               parent:    parent,
+                               snapshots: tt.snapshotsInGC,
+                       }
+                       g.cleanSnapshots()
+                       var got []uint64
+                       for _, de := range 
parent.fileSystem.ReadDir(parent.root) {
+                               s, err := parseSnapshot(de.Name())
+                               require.NoError(t, err, "failed to parse 
snapshot name:%s", de.Name())
+                               got = append(got, s)
+                       }
+                       sort.Slice(got, func(i, j int) bool { return got[i] < 
got[j] })
+                       assert.Equal(t, tt.wantOnDisk, got)
+                       assert.Equal(t, tt.wantInGC, g.snapshots)
+               })
+       }
+}
diff --git a/banyand/measure/introducer.go b/banyand/measure/introducer.go
new file mode 100644
index 00000000..17a4edef
--- /dev/null
+++ b/banyand/measure/introducer.go
@@ -0,0 +1,148 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "sync"
+
+       "github.com/apache/skywalking-banyandb/pkg/watcher"
+)
+
+type introduction struct {
+       memPart *partWrapper
+       applied chan struct{}
+}
+
+func (i *introduction) reset() {
+       i.memPart = nil
+       i.applied = nil
+}
+
+var introductionPool = sync.Pool{}
+
+func generateIntroduction() *introduction {
+       v := introductionPool.Get()
+       if v == nil {
+               return &introduction{}
+       }
+       return v.(*introduction)
+}
+
+func releaseIntroduction(i *introduction) {
+       i.reset()
+       introductionPool.Put(i)
+}
+
+type flusherIntroduction struct {
+       flushed map[uint64]*partWrapper
+       applied chan struct{}
+}
+
+func (i *flusherIntroduction) reset() {
+       for k := range i.flushed {
+               delete(i.flushed, k)
+       }
+       i.applied = nil
+}
+
+var flusherIntroductionPool = sync.Pool{}
+
+func generateFlusherIntroduction() *flusherIntroduction {
+       v := flusherIntroductionPool.Get()
+       if v == nil {
+               return &flusherIntroduction{
+                       flushed: make(map[uint64]*partWrapper),
+               }
+       }
+       return v.(*flusherIntroduction)
+}
+
+func releaseFlusherIntroduction(i *flusherIntroduction) {
+       i.reset()
+       flusherIntroductionPool.Put(i)
+}
+
+func (tst *tsTable) introducerLoop(flushCh chan *flusherIntroduction, 
watcherCh watcher.Channel, epoch uint64) {
+       var introducerWatchers watcher.Epochs
+       defer tst.loopCloser.Done()
+       for {
+               select {
+               case <-tst.loopCloser.CloseNotify():
+                       return
+               case next := <-tst.introductions:
+                       tst.introduceMemPart(next, epoch)
+                       epoch++
+               case next := <-flushCh:
+                       tst.introduceFlushed(next, epoch)
+                       epoch++
+               case epochWatcher := <-watcherCh:
+                       introducerWatchers.Add(epochWatcher)
+               }
+               curEpoch := tst.currentEpoch()
+               introducerWatchers.Notify(curEpoch)
+       }
+}
+
+func (tst *tsTable) introduceMemPart(nextIntroduction *introduction, epoch 
uint64) {
+       cur := tst.currentSnapshot()
+       if cur != nil {
+               defer cur.decRef()
+       } else {
+               cur = new(snapshot)
+       }
+
+       next := nextIntroduction.memPart
+       nextSnp := cur.copyAllTo(epoch)
+       next.p.partMetadata.ID = epoch
+       nextSnp.parts = append(nextSnp.parts, next)
+       tst.replaceSnapshot(&nextSnp)
+       if nextIntroduction.applied != nil {
+               close(nextIntroduction.applied)
+       }
+}
+
+func (tst *tsTable) introduceFlushed(nextIntroduction *flusherIntroduction, 
epoch uint64) {
+       cur := tst.currentSnapshot()
+       if cur == nil {
+               tst.l.Panic().Msg("current snapshot is nil")
+       }
+       defer cur.decRef()
+       nextSnp := cur.merge(epoch, nextIntroduction.flushed)
+       tst.replaceSnapshot(&nextSnp)
+       if nextIntroduction.applied != nil {
+               close(nextIntroduction.applied)
+       }
+}
+
+func (tst *tsTable) replaceSnapshot(next *snapshot) {
+       tst.Lock()
+       defer tst.Unlock()
+       if tst.snapshot != nil {
+               tst.snapshot.decRef()
+       }
+       tst.snapshot = next
+}
+
+func (tst *tsTable) currentEpoch() uint64 {
+       s := tst.currentSnapshot()
+       if s == nil {
+               return 0
+       }
+       defer s.decRef()
+       return s.epoch
+}
diff --git a/banyand/measure/part.go b/banyand/measure/part.go
index 6760f10e..81fcce0e 100644
--- a/banyand/measure/part.go
+++ b/banyand/measure/part.go
@@ -18,6 +18,9 @@
 package measure
 
 import (
+       "fmt"
+       "path"
+       "path/filepath"
        "sort"
        "sync"
        "sync/atomic"
@@ -26,9 +29,21 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/bytes"
        "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const (
+       metadataFilename               = "metadata.json"
+       primaryFilename                = "primary.bin"
+       metaFilename                   = "meta.bin"
+       timestampsFilename             = "timestamps.bin"
+       fieldValuesFilename            = "fields.bin"
+       tagFamiliesMetadataFilenameExt = ".tfm"
+       tagFamiliesFilenameExt         = ".tf"
 )
 
 type part struct {
+       path                 string
        meta                 fs.Reader
        primary              fs.Reader
        timestamps           fs.Reader
@@ -39,6 +54,22 @@ type part struct {
        partMetadata         partMetadata
 }
 
+func (p *part) close() {
+       fs.MustClose(p.primary)
+       fs.MustClose(p.timestamps)
+       fs.MustClose(p.fieldValues)
+       for _, tf := range p.tagFamilies {
+               fs.MustClose(tf)
+       }
+       for _, tfh := range p.tagFamilyMetadata {
+               fs.MustClose(tfh)
+       }
+}
+
+func (p *part) String() string {
+       return fmt.Sprintf("part %d", p.partMetadata.ID)
+}
+
 func openMemPart(mp *memPart) *part {
        var p part
        p.partMetadata = mp.partMetadata
@@ -140,6 +171,25 @@ func (mp *memPart) mustInitFromDataPoints(dps *dataPoints) 
{
        releaseBlockWriter(bsw)
 }
 
+func (mp *memPart) mustFlush(fileSystem fs.FileSystem, path string) {
+       fileSystem.MkdirPanicIfExist(path, dirPermission)
+
+       fs.MustFlush(fileSystem, mp.meta.Buf, filepath.Join(path, 
metaFilename), filePermission)
+       fs.MustFlush(fileSystem, mp.primary.Buf, filepath.Join(path, 
primaryFilename), filePermission)
+       fs.MustFlush(fileSystem, mp.timestamps.Buf, filepath.Join(path, 
timestampsFilename), filePermission)
+       fs.MustFlush(fileSystem, mp.fieldValues.Buf, filepath.Join(path, 
fieldValuesFilename), filePermission)
+       for name, tf := range mp.tagFamilies {
+               fs.MustFlush(fileSystem, tf.Buf, filepath.Join(path, 
name+tagFamiliesFilenameExt), filePermission)
+       }
+       for name, tfh := range mp.tagFamilyMetadata {
+               fs.MustFlush(fileSystem, tfh.Buf, filepath.Join(path, 
name+tagFamiliesMetadataFilenameExt), filePermission)
+       }
+
+       mp.partMetadata.mustWriteMetadata(fileSystem, path)
+
+       fileSystem.SyncPath(path)
+}
+
 func uncompressedDataPointSizeBytes(index int, dps *dataPoints) uint64 {
        n := uint64(len(time.RFC3339Nano))
        n += uint64(len(dps.fields[index].name))
@@ -171,13 +221,15 @@ func releaseMemPart(mp *memPart) {
 var memPartPool sync.Pool
 
 type partWrapper struct {
-       mp  *memPart
-       p   *part
-       ref int32
+       fileSystem    fs.FileSystem
+       mp            *memPart
+       p             *part
+       ref           int32
+       mustBeDeleted uint32
 }
 
-func newMemPartWrapper(mp *memPart, p *part) *partWrapper {
-       return &partWrapper{mp: mp, p: p, ref: 1}
+func newPartWrapper(mp *memPart, p *part, fileSystem fs.FileSystem) 
*partWrapper {
+       return &partWrapper{mp: mp, p: p, fileSystem: fileSystem, ref: 1}
 }
 
 func (pw *partWrapper) incRef() {
@@ -192,5 +244,70 @@ func (pw *partWrapper) decRef() {
        if pw.mp != nil {
                releaseMemPart(pw.mp)
                pw.mp = nil
+               pw.p = nil
+               return
+       }
+       pw.p.close()
+       if atomic.LoadUint32(&pw.mustBeDeleted) == 0 {
+               return
        }
+       pw.fileSystem.MustRMAll(pw.p.path)
+}
+
+func (pw *partWrapper) ID() uint64 {
+       return pw.p.partMetadata.ID
+}
+
+func mustOpenFilePart(partPath string, fileSystem fs.FileSystem) *part {
+       var p part
+       p.path = partPath
+       p.partMetadata.mustReadMetadata(fileSystem, partPath)
+
+       metaPath := path.Join(partPath, metaFilename)
+       pr := mustOpenReader(metaPath, fileSystem)
+       p.primaryBlockMetadata = 
mustReadPrimaryBlockMetadata(p.primaryBlockMetadata[:0], newReader(pr))
+       fs.MustClose(pr)
+
+       p.primary = mustOpenReader(path.Join(partPath, primaryFilename), 
fileSystem)
+       p.timestamps = mustOpenReader(path.Join(partPath, timestampsFilename), 
fileSystem)
+       p.fieldValues = mustOpenReader(path.Join(partPath, 
fieldValuesFilename), fileSystem)
+       ee := fileSystem.ReadDir(partPath)
+       for _, e := range ee {
+               if e.IsDir() {
+                       continue
+               }
+               if filepath.Ext(e.Name()) == tagFamiliesMetadataFilenameExt {
+                       if p.tagFamilyMetadata == nil {
+                               p.tagFamilyMetadata = make(map[string]fs.Reader)
+                       }
+                       p.tagFamilyMetadata[removeExt(e.Name(), 
tagFamiliesMetadataFilenameExt)] = mustOpenReader(path.Join(partPath, 
e.Name()), fileSystem)
+               }
+               if filepath.Ext(e.Name()) == tagFamiliesFilenameExt {
+                       if p.tagFamilies == nil {
+                               p.tagFamilies = make(map[string]fs.Reader)
+                       }
+                       p.tagFamilies[removeExt(e.Name(), 
tagFamiliesFilenameExt)] = mustOpenReader(path.Join(partPath, e.Name()), 
fileSystem)
+               }
+       }
+       return &p
+}
+
+func mustOpenReader(name string, fileSystem fs.FileSystem) fs.Reader {
+       f, err := fileSystem.OpenFile(name)
+       if err != nil {
+               logger.Panicf("cannot open %q: %s", name, err)
+       }
+       return f
+}
+
+func removeExt(nameWithExt, ext string) string {
+       return nameWithExt[:len(nameWithExt)-len(ext)]
+}
+
+func partPath(root string, epoch uint64) string {
+       return filepath.Join(root, partName(epoch))
+}
+
+func partName(epoch uint64) string {
+       return fmt.Sprintf("%016x", epoch)
 }
diff --git a/banyand/measure/part_iter_test.go 
b/banyand/measure/part_iter_test.go
index 37579062..4aa6d7a9 100644
--- a/banyand/measure/part_iter_test.go
+++ b/banyand/measure/part_iter_test.go
@@ -23,8 +23,11 @@ import (
 
        "github.com/google/go-cmp/cmp"
        "github.com/google/go-cmp/cmp/cmpopts"
+       "github.com/stretchr/testify/require"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/test"
 )
 
 // TODO: test more scenarios.
@@ -108,36 +111,47 @@ func Test_partIter_nextBlock(t *testing.T) {
 
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
-                       mp := generateMemPart()
-                       releaseMemPart(mp)
-                       mp.mustInitFromDataPoints(dps)
+                       verifyPart := func(p *part) {
+                               defer p.close()
+                               pi := partIter{}
+                               pi.init(p, tt.sids, tt.opt.minTimestamp, 
tt.opt.maxTimestamp)
 
-                       p := openMemPart(mp)
-
-                       pi := partIter{}
-                       pi.init(p, tt.sids, tt.opt.minTimestamp, 
tt.opt.maxTimestamp)
+                               var got []blockMetadata
+                               for pi.nextBlock() {
+                                       if pi.curBlock.seriesID == 0 {
+                                               t.Errorf("Expected currBlock to 
be initialized, but it was nil")
+                                       }
+                                       got = append(got, pi.curBlock)
+                               }
 
-                       var got []blockMetadata
-                       for pi.nextBlock() {
-                               if pi.curBlock.seriesID == 0 {
-                                       t.Errorf("Expected currBlock to be 
initialized, but it was nil")
+                               if !errors.Is(pi.error(), tt.wantErr) {
+                                       t.Errorf("Unexpected error: got %v, 
want %v", pi.err, tt.wantErr)
                                }
-                               got = append(got, pi.curBlock)
-                       }
 
-                       if !errors.Is(pi.error(), tt.wantErr) {
-                               t.Errorf("Unexpected error: got %v, want %v", 
pi.err, tt.wantErr)
+                               if diff := cmp.Diff(got, tt.want,
+                                       cmpopts.IgnoreFields(blockMetadata{}, 
"uncompressedSizeBytes"),
+                                       cmpopts.IgnoreFields(blockMetadata{}, 
"timestamps"),
+                                       cmpopts.IgnoreFields(blockMetadata{}, 
"field"),
+                                       cmpopts.IgnoreFields(blockMetadata{}, 
"tagFamilies"),
+                                       cmp.AllowUnexported(blockMetadata{}),
+                               ); diff != "" {
+                                       t.Errorf("Unexpected blockMetadata 
(-got +want):\n%s", diff)
+                               }
                        }
+                       mp := generateMemPart()
+                       releaseMemPart(mp)
+                       mp.mustInitFromDataPoints(dps)
 
-                       if diff := cmp.Diff(got, tt.want,
-                               cmpopts.IgnoreFields(blockMetadata{}, 
"uncompressedSizeBytes"),
-                               cmpopts.IgnoreFields(blockMetadata{}, 
"timestamps"),
-                               cmpopts.IgnoreFields(blockMetadata{}, "field"),
-                               cmpopts.IgnoreFields(blockMetadata{}, 
"tagFamilies"),
-                               cmp.AllowUnexported(blockMetadata{}),
-                       ); diff != "" {
-                               t.Errorf("Unexpected blockMetadata (-got 
+want):\n%s", diff)
-                       }
+                       p := openMemPart(mp)
+                       verifyPart(p)
+                       tmpDir, defFn := test.Space(require.New(t))
+                       defer defFn()
+                       epoch := uint64(1)
+                       partPath := partPath(tmpDir, epoch)
+                       fileSystem := fs.NewLocalFileSystem()
+                       mp.mustFlush(fileSystem, partPath)
+                       p = mustOpenFilePart(partPath, fileSystem)
+                       verifyPart(p)
                })
        }
 }
diff --git a/banyand/measure/part_metadata.go b/banyand/measure/part_metadata.go
index 6c6f6e17..0323e660 100644
--- a/banyand/measure/part_metadata.go
+++ b/banyand/measure/part_metadata.go
@@ -17,6 +17,14 @@
 
 package measure
 
+import (
+       "encoding/json"
+       "path/filepath"
+
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
 type partMetadata struct {
        CompressedSizeBytes   uint64
        UncompressedSizeBytes uint64
@@ -24,15 +32,51 @@ type partMetadata struct {
        BlocksCount           uint64
        MinTimestamp          int64
        MaxTimestamp          int64
-       Version               int64
+       ID                    uint64
+}
+
+// reset sets the size, count, timestamp and ID fields of pm back to zero.
+func (pm *partMetadata) reset() {
+       pm.CompressedSizeBytes = 0
+       pm.UncompressedSizeBytes = 0
+       pm.TotalCount = 0
+       pm.BlocksCount = 0
+       pm.MinTimestamp = 0
+       pm.MaxTimestamp = 0
+       pm.ID = 0
+}
+
+// mustReadMetadata resets pm and then fills it from the JSON metadata file
+// (metadataFilename) located in partPath.
+// Read and decode failures, as well as a decoded MinTimestamp greater than
+// MaxTimestamp, are reported through logger.Panicf.
+func (pm *partMetadata) mustReadMetadata(fileSystem fs.FileSystem, partPath string) {
+       pm.reset()
+
+       metadataPath := filepath.Join(partPath, metadataFilename)
+       metadata, err := fileSystem.Read(metadataPath)
+       if err != nil {
+               // Name the file that could not be read, matching the parse error below.
+               logger.Panicf("cannot read %q: %s", metadataPath, err)
+               return
+       }
+       if err := json.Unmarshal(metadata, pm); err != nil {
+               logger.Panicf("cannot parse %q: %s", metadataPath, err)
+               return
+       }
+
+       if pm.MinTimestamp > pm.MaxTimestamp {
+               logger.Panicf("MinTimestamp cannot exceed MaxTimestamp; got %d vs %d", pm.MinTimestamp, pm.MaxTimestamp)
+       }
 }
 
-func (ph *partMetadata) reset() {
-       ph.CompressedSizeBytes = 0
-       ph.UncompressedSizeBytes = 0
-       ph.TotalCount = 0
-       ph.BlocksCount = 0
-       ph.MinTimestamp = 0
-       ph.MaxTimestamp = 0
-       ph.Version = 0
+// mustWriteMetadata serializes pm as JSON and writes it to the metadataFilename
+// file inside partPath using filePermission.
+// Marshal and write failures, and a reported byte count that differs from the
+// encoded length, are reported through logger.Panicf.
+func (pm *partMetadata) mustWriteMetadata(fileSystem fs.FileSystem, partPath 
string) {
+       metadata, err := json.Marshal(pm)
+       if err != nil {
+               logger.Panicf("cannot marshal metadata: %s", err)
+               return
+       }
+       metadataPath := filepath.Join(partPath, metadataFilename)
+       n, err := fileSystem.Write(metadata, metadataPath, filePermission)
+       if err != nil {
+               logger.Panicf("cannot write metadata: %s", err)
+               return
+       }
+       // Treat a short write as a failure as well.
+       if n != len(metadata) {
+               logger.Panicf("unexpected number of bytes written to %s; got 
%d; want %d", metadataPath, n, len(metadata))
+       }
 }
diff --git a/banyand/measure/part_test.go b/banyand/measure/part_test.go
index 29b5fdfe..5473fd76 100644
--- a/banyand/measure/part_test.go
+++ b/banyand/measure/part_test.go
@@ -21,13 +21,16 @@ import (
        "testing"
 
        "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/test"
 )
 
-func Test_memPart_mustInitFromDataPoints(t *testing.T) {
+func TestMustInitFromDataPoints(t *testing.T) {
        tests := []struct {
                dps  *dataPoints
                name string
@@ -92,6 +95,30 @@ func Test_memPart_mustInitFromDataPoints(t *testing.T) {
                        assert.Equal(t, tt.want.MinTimestamp, 
mp.partMetadata.MinTimestamp)
                        assert.Equal(t, tt.want.MaxTimestamp, 
mp.partMetadata.MaxTimestamp)
                        assert.Equal(t, tt.want.TotalCount, 
mp.partMetadata.TotalCount)
+                       assert.Equal(t, len(mp.tagFamilies), 
len(mp.tagFamilyMetadata))
+                       tmpPath, defFn := test.Space(require.New(t))
+                       defer defFn()
+                       epoch := uint64(1)
+                       path := partPath(tmpPath, epoch)
+
+                       fileSystem := fs.NewLocalFileSystem()
+                       mp.mustFlush(fileSystem, path)
+                       p := mustOpenFilePart(path, fileSystem)
+                       defer p.close()
+                       assert.Equal(t, tt.want.BlocksCount, 
p.partMetadata.BlocksCount)
+                       assert.Equal(t, tt.want.MinTimestamp, 
p.partMetadata.MinTimestamp)
+                       assert.Equal(t, tt.want.MaxTimestamp, 
p.partMetadata.MaxTimestamp)
+                       assert.Equal(t, tt.want.TotalCount, 
p.partMetadata.TotalCount)
+                       if len(mp.tagFamilies) > 0 {
+                               for k := range mp.tagFamilies {
+                                       _, ok := mp.tagFamilyMetadata[k]
+                                       require.True(t, ok, 
"mp.tagFamilyMetadata %s not found", k)
+                                       _, ok = p.tagFamilies[k]
+                                       require.True(t, ok, "p.tagFamilies %s 
not found", k)
+                                       _, ok = p.tagFamilyMetadata[k]
+                                       require.True(t, ok, 
"p.tagFamilyMetadata %s not found", k)
+                               }
+                       }
                })
        }
 }
diff --git a/banyand/measure/primary_metadata.go 
b/banyand/measure/primary_metadata.go
index 19374283..7b09503e 100644
--- a/banyand/measure/primary_metadata.go
+++ b/banyand/measure/primary_metadata.go
@@ -100,7 +100,6 @@ func mustReadPrimaryBlockMetadata(dst 
[]primaryBlockMetadata, r *reader) []prima
        return dst
 }
 
-// unmarshalPrimaryBlockMetadata appends unmarshaled from src indexBlockHeader 
entries to dst and returns the result.
 func unmarshalPrimaryBlockMetadata(dst []primaryBlockMetadata, src []byte) 
([]primaryBlockMetadata, error) {
        dstOrig := dst
        for len(src) > 0 {
@@ -112,7 +111,7 @@ func unmarshalPrimaryBlockMetadata(dst 
[]primaryBlockMetadata, src []byte) ([]pr
                ih := &dst[len(dst)-1]
                tail, err := ih.unmarshal(src)
                if err != nil {
-                       return dstOrig, fmt.Errorf("cannot unmarshal 
indexBlockHeader %d: %w", len(dst)-len(dstOrig), err)
+                       return dstOrig, fmt.Errorf("cannot unmarshal 
primaryBlockHeader %d: %w", len(dst)-len(dstOrig), err)
                }
                src = tail
        }
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index d4ebadab..d0b17d7f 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -73,6 +73,11 @@ func (s *measure) Query(ctx context.Context, mqo 
pbv1.MeasureQueryOptions) (pbv1
        }
        tsdb := s.databaseSupplier.SupplyTSDB().(storage.TSDB[*tsTable])
        tabWrappers := tsdb.SelectTSTables(*mqo.TimeRange)
+       defer func() {
+               for i := range tabWrappers {
+                       tabWrappers[i].DecRef()
+               }
+       }()
        sl, err := tsdb.IndexDB().Search(ctx, &pbv1.Series{Subject: mqo.Name, 
EntityValues: mqo.Entity}, mqo.Filter, mqo.Order)
        if err != nil {
                return nil, err
@@ -86,15 +91,24 @@ func (s *measure) Query(ctx context.Context, mqo 
pbv1.MeasureQueryOptions) (pbv1
        for i := range sl {
                sids = append(sids, sl[i].ID)
        }
-       var pws []*partWrapper
        var parts []*part
        qo := queryOptions{
                MeasureQueryOptions: mqo,
                minTimestamp:        mqo.TimeRange.Start.UnixNano(),
                maxTimestamp:        mqo.TimeRange.End.UnixNano(),
        }
-       for _, tw := range tabWrappers {
-               pws, parts = tw.Table().getParts(pws, parts, qo)
+       var n int
+       for i := range tabWrappers {
+               s := tabWrappers[i].Table().currentSnapshot()
+               if s == nil {
+                       continue
+               }
+               parts, n = s.getParts(parts, qo)
+               if n < 1 {
+                       s.decRef()
+                       continue
+               }
+               result.snapshots = append(result.snapshots, s)
        }
        // TODO: cache tstIter
        var tstIter tstIter
@@ -303,7 +317,7 @@ func binaryDataFieldValue(value []byte) *modelv1.FieldValue 
{
 type queryResult struct {
        sidToIndex map[common.SeriesID]int
        data       []*blockCursor
-       pws        []*partWrapper
+       snapshots  []*snapshot
        loaded     bool
        orderByTS  bool
        ascTS      bool
@@ -348,10 +362,10 @@ func (qr *queryResult) Release() {
                qr.data[i] = nil
        }
        qr.data = qr.data[:0]
-       for i := range qr.pws {
-               qr.pws[i].decRef()
+       for i := range qr.snapshots {
+               qr.snapshots[i].decRef()
        }
-       qr.pws = qr.pws[:0]
+       qr.snapshots = qr.snapshots[:0]
 }
 
 func (qr queryResult) Len() int {
@@ -361,8 +375,8 @@ func (qr queryResult) Len() int {
 func (qr queryResult) Less(i, j int) bool {
        leftTS := qr.data[i].timestamps[qr.data[i].idx]
        rightTS := qr.data[j].timestamps[qr.data[j].idx]
-       leftVersion := qr.data[i].p.partMetadata.Version
-       rightVersion := qr.data[j].p.partMetadata.Version
+       leftVersion := qr.data[i].p.partMetadata.ID
+       rightVersion := qr.data[j].p.partMetadata.ID
        if qr.orderByTS {
                if leftTS == rightTS {
                        if qr.data[i].bm.seriesID == qr.data[j].bm.seriesID {
@@ -416,7 +430,7 @@ func (qr *queryResult) merge() *pbv1.Result {
                step = -1
        }
        result := &pbv1.Result{}
-       var lastPartVersion int64
+       var lastPartVersion uint64
        var lastSid common.SeriesID
 
        for qr.Len() > 0 {
@@ -428,12 +442,12 @@ func (qr *queryResult) merge() *pbv1.Result {
 
                if len(result.Timestamps) > 0 &&
                        topBC.timestamps[topBC.idx] == 
result.Timestamps[len(result.Timestamps)-1] {
-                       if topBC.p.partMetadata.Version > lastPartVersion {
+                       if topBC.p.partMetadata.ID > lastPartVersion {
                                logger.Panicf("following parts version should 
be less or equal to the previous one")
                        }
                } else {
                        topBC.copyTo(result)
-                       lastPartVersion = topBC.p.partMetadata.Version
+                       lastPartVersion = topBC.p.partMetadata.ID
                }
 
                topBC.idx += step
diff --git a/banyand/measure/query_test.go b/banyand/measure/query_test.go
index d69f77ad..4226dfc7 100644
--- a/banyand/measure/query_test.go
+++ b/banyand/measure/query_test.go
@@ -24,11 +24,19 @@ import (
        "time"
 
        "github.com/google/go-cmp/cmp"
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
        "google.golang.org/protobuf/testing/protocmp"
 
        "github.com/apache/skywalking-banyandb/api/common"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+       "github.com/apache/skywalking-banyandb/pkg/watcher"
 )
 
 func TestQueryResult(t *testing.T) {
@@ -363,71 +371,101 @@ func TestQueryResult(t *testing.T) {
 
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
-                       // Initialize a tstIter object.
-                       tst := &tsTable{}
-                       defer tst.Close()
-                       for _, dps := range tt.dpsList {
-                               tst.mustAddDataPoints(dps)
-                               time.Sleep(100 * time.Millisecond)
-                       }
-                       queryOpts := queryOptions{
-                               minTimestamp: tt.minTimestamp,
-                               maxTimestamp: tt.maxTimestamp,
-                       }
-                       pws, pp := tst.getParts(nil, nil, queryOpts)
-                       defer func() {
-                               for _, pw := range pws {
-                                       pw.decRef()
+                       verify := func(tst *tsTable) uint64 {
+                               defer tst.Close()
+                               queryOpts := queryOptions{
+                                       minTimestamp: tt.minTimestamp,
+                                       maxTimestamp: tt.maxTimestamp,
                                }
-                       }()
-                       sids := make([]common.SeriesID, len(tt.sids))
-                       copy(sids, tt.sids)
-                       sort.Slice(sids, func(i, j int) bool {
-                               return sids[i] < tt.sids[j]
-                       })
-                       ti := &tstIter{}
-                       ti.init(pp, sids, tt.minTimestamp, tt.maxTimestamp)
+                               s := tst.currentSnapshot()
+                               require.NotNil(t, s)
+                               defer s.decRef()
+                               pp, n := s.getParts(nil, queryOpts)
+                               require.Equal(t, len(tt.dpsList), n, "parts: 
%+v", pp)
+                               sids := make([]common.SeriesID, len(tt.sids))
+                               copy(sids, tt.sids)
+                               // Compare elements of the slice being sorted; tt.sids is the
+                               // untouched original and goes out of step once elements move.
+                               sort.Slice(sids, func(i, j int) bool {
+                                       return sids[i] < sids[j]
+                               })
+                               ti := &tstIter{}
+                               ti.init(pp, sids, tt.minTimestamp, 
tt.maxTimestamp)
 
-                       var result queryResult
-                       for ti.nextBlock() {
-                               bc := generateBlockCursor()
-                               p := ti.piHeap[0]
-                               opts := queryOpts
-                               opts.TagProjection = 
tagProjections[int(p.curBlock.seriesID)]
-                               opts.FieldProjection = 
fieldProjections[int(p.curBlock.seriesID)]
-                               bc.init(p.p, p.curBlock, opts)
-                               result.data = append(result.data, bc)
-                       }
-                       defer result.Release()
-                       if tt.orderBySeries {
-                               result.sidToIndex = 
make(map[common.SeriesID]int)
-                               for i, si := range tt.sids {
-                                       result.sidToIndex[si] = i
+                               var result queryResult
+                               for ti.nextBlock() {
+                                       bc := generateBlockCursor()
+                                       p := ti.piHeap[0]
+                                       opts := queryOpts
+                                       opts.TagProjection = 
tagProjections[int(p.curBlock.seriesID)]
+                                       opts.FieldProjection = 
fieldProjections[int(p.curBlock.seriesID)]
+                                       bc.init(p.p, p.curBlock, opts)
+                                       result.data = append(result.data, bc)
                                }
-                       } else {
-                               result.orderByTS = true
-                               result.ascTS = tt.ascTS
-                       }
-                       var got []pbv1.Result
-                       for {
-                               r := result.Pull()
-                               if r == nil {
-                                       break
+                               defer result.Release()
+                               if tt.orderBySeries {
+                                       result.sidToIndex = 
make(map[common.SeriesID]int)
+                                       for i, si := range tt.sids {
+                                               result.sidToIndex[si] = i
+                                       }
+                               } else {
+                                       result.orderByTS = true
+                                       result.ascTS = tt.ascTS
+                               }
+                               var got []pbv1.Result
+                               for {
+                                       r := result.Pull()
+                                       if r == nil {
+                                               break
+                                       }
+                                       sort.Slice(r.TagFamilies, func(i, j 
int) bool {
+                                               return r.TagFamilies[i].Name < 
r.TagFamilies[j].Name
+                                       })
+                                       got = append(got, *r)
                                }
-                               sort.Slice(r.TagFamilies, func(i, j int) bool {
-                                       return r.TagFamilies[i].Name < 
r.TagFamilies[j].Name
-                               })
-                               got = append(got, *r)
-                       }
 
-                       if !errors.Is(ti.Error(), tt.wantErr) {
-                               t.Errorf("Unexpected error: got %v, want %v", 
ti.err, tt.wantErr)
-                       }
+                               if !errors.Is(ti.Error(), tt.wantErr) {
+                                       t.Errorf("Unexpected error: got %v, 
want %v", ti.err, tt.wantErr)
+                               }
 
-                       if diff := cmp.Diff(got, tt.want,
-                               protocmp.IgnoreUnknown(), 
protocmp.Transform()); diff != "" {
-                               t.Errorf("Unexpected []pbv1.Result (-got 
+want):\n%s", diff)
+                               if diff := cmp.Diff(got, tt.want,
+                                       protocmp.IgnoreUnknown(), 
protocmp.Transform()); diff != "" {
+                                       t.Errorf("Unexpected []pbv1.Result 
(-got +want):\n%s", diff)
+                               }
+                               return s.epoch
                        }
+
+                       t.Run("memory snapshot", func(t *testing.T) {
+                               tst := &tsTable{
+                                       loopCloser:    run.NewCloser(2),
+                                       introductions: make(chan *introduction),
+                               }
+                               flushCh := make(chan *flusherIntroduction)
+                               introducerWatcher := make(watcher.Channel, 1)
+                               go tst.introducerLoop(flushCh, 
introducerWatcher, 1)
+                               for _, dps := range tt.dpsList {
+                                       tst.mustAddDataPoints(dps)
+                                       time.Sleep(100 * time.Millisecond)
+                               }
+                               _ = verify(tst)
+                       })
+
+                       t.Run("file snapshot", func(t *testing.T) {
+                               // Initialize a tstIter object.
+                               tmpPath, defFn := test.Space(require.New(t))
+                               fileSystem := fs.NewLocalFileSystem()
+                               defer defFn()
+                               tst, err := newTSTable(fileSystem, tmpPath, 
common.Position{}, logger.GetLogger("test"), timestamp.TimeRange{})
+                               require.NoError(t, err)
+                               for _, dps := range tt.dpsList {
+                                       tst.mustAddDataPoints(dps)
+                                       time.Sleep(100 * time.Millisecond)
+                               }
+                               epoch := verify(tst)
+
+                               // reopen the table
+                               tst, err = newTSTable(fileSystem, tmpPath, 
common.Position{}, logger.GetLogger("test"), timestamp.TimeRange{})
+                               require.NoError(t, err)
+                               assert.Equal(t, epoch, verify(tst))
+                       })
                })
        }
 }
diff --git a/banyand/measure/snapshot.go b/banyand/measure/snapshot.go
new file mode 100644
index 00000000..ddc995dc
--- /dev/null
+++ b/banyand/measure/snapshot.go
@@ -0,0 +1,111 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "errors"
+       "fmt"
+       "path/filepath"
+       "sync/atomic"
+)
+
+// currentSnapshot returns the table's current snapshot after taking a
+// reference on it, or nil when no snapshot has been set.
+// The read lock is held while the pointer is read and the reference is taken.
+// Callers in this change pair every non-nil result with a decRef.
+func (tst *tsTable) currentSnapshot() *snapshot {
+       tst.RLock()
+       defer tst.RUnlock()
+       if tst.snapshot == nil {
+               return nil
+       }
+       s := tst.snapshot
+       s.incRef()
+       return s
+}
+
+// snapshot is a reference-counted list of part wrappers tagged with an epoch.
+type snapshot struct {
+       // parts holds the wrappers referenced by this snapshot.
+       parts []*partWrapper
+       // epoch is the value passed as nextEpoch to copyAllTo or merge.
+       epoch uint64
+
+       // ref is the reference count, updated atomically by incRef and decRef.
+       ref int32
+}
+
+// getParts appends to dst every part whose [MinTimestamp, MaxTimestamp] range
+// overlaps [opts.minTimestamp, opts.maxTimestamp] (both ends inclusive) and
+// returns the extended slice together with the number of parts appended.
+// It does not take references on the parts it returns.
+func (s *snapshot) getParts(dst []*part, opts queryOptions) ([]*part, int) {
+       var count int
+       for _, p := range s.parts {
+               pm := p.p.partMetadata
+               // Skip parts that lie entirely before or after the queried range.
+               if opts.maxTimestamp < pm.MinTimestamp || opts.minTimestamp > 
pm.MaxTimestamp {
+                       continue
+               }
+               dst = append(dst, p.p)
+               count++
+       }
+       return dst, count
+}
+
+// incRef atomically increments the snapshot's reference count.
+func (s *snapshot) incRef() {
+       atomic.AddInt32(&s.ref, 1)
+}
+
+// decRef atomically decrements the snapshot's reference count. When the count
+// is no longer positive, it calls decRef on every part wrapper and empties
+// the parts list.
+func (s *snapshot) decRef() {
+       n := atomic.AddInt32(&s.ref, -1)
+       if n > 0 {
+               return
+       }
+       for i := range s.parts {
+               s.parts[i].decRef()
+       }
+       s.parts = s.parts[:0]
+}
+
+// copyAllTo returns a new snapshot for nextEpoch that shares all parts of s.
+// The result starts with a reference count of 1, and every shared part
+// wrapper gets one extra reference. The snapshot is returned by value.
+func (s *snapshot) copyAllTo(nextEpoch uint64) snapshot {
+       var result snapshot
+       result.epoch = nextEpoch
+       result.ref = 1
+       for i := range s.parts {
+               s.parts[i].incRef()
+               result.parts = append(result.parts, s.parts[i])
+       }
+       return result
+}
+
+// merge returns a new snapshot for nextEpoch built from the parts of s, in
+// the same order. A part whose ID is a key of nextParts is replaced by the
+// mapped wrapper (no incRef is called on the replacement here); any other
+// part is carried over with one extra reference. Entries of nextParts whose
+// key matches no part of s are not added. The result starts with a reference
+// count of 1.
+func (s *snapshot) merge(nextEpoch uint64, nextParts map[uint64]*partWrapper) 
snapshot {
+       var result snapshot
+       result.epoch = nextEpoch
+       result.ref = 1
+       for i := 0; i < len(s.parts); i++ {
+               if n, ok := nextParts[s.parts[i].ID()]; ok {
+                       result.parts = append(result.parts, n)
+                       continue
+               }
+               s.parts[i].incRef()
+               result.parts = append(result.parts, s.parts[i])
+       }
+       return result
+}
+
+// snapshotName formats snapshot as a 16-digit, zero-padded, lowercase
+// hexadecimal string followed by snapshotSuffix.
+func snapshotName(snapshot uint64) string {
+       return fmt.Sprintf("%016x%s", snapshot, snapshotSuffix)
+}
+
+// parseSnapshot extracts the epoch from a snapshot file name.
+// It returns an error when the extension of name is not snapshotSuffix or
+// when name is shorter than 16 bytes; otherwise the first 16 bytes are handed
+// to parseEpoch and its result is returned.
+func parseSnapshot(name string) (uint64, error) {
+       if filepath.Ext(name) != snapshotSuffix {
+               return 0, errors.New("invalid snapshot file ext")
+       }
+       if len(name) < 16 {
+               return 0, errors.New("invalid snapshot file name")
+       }
+       return parseEpoch(name[:16])
+}
diff --git a/banyand/measure/snapshot_test.go b/banyand/measure/snapshot_test.go
new file mode 100644
index 00000000..f571d4c9
--- /dev/null
+++ b/banyand/measure/snapshot_test.go
@@ -0,0 +1,287 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+// TestSnapshotGetParts checks that snapshot.getParts appends to dst the parts
+// whose [MinTimestamp, MaxTimestamp] range overlaps the queried
+// [minTimestamp, maxTimestamp] window and returns the number of parts added.
+func TestSnapshotGetParts(t *testing.T) {
+       tests := []struct {
+               snapshot *snapshot
+               name     string
+               dst      []*part
+               expected []*part
+               opts     queryOptions
+               count    int
+       }{
+               {
+                       name: "Test with empty snapshot",
+                       snapshot: &snapshot{
+                               parts: []*partWrapper{},
+                       },
+                       dst: []*part{},
+                       opts: queryOptions{
+                               minTimestamp: 0,
+                               maxTimestamp: 10,
+                       },
+                       expected: []*part{},
+                       count:    0,
+               },
+               {
+                       name: "Test with non-empty snapshot and no matching 
parts",
+                       snapshot: &snapshot{
+                               parts: []*partWrapper{
+                                       {
+                                               p: &part{partMetadata: 
partMetadata{
+                                                       MinTimestamp: 0,
+                                                       MaxTimestamp: 5,
+                                               }},
+                                       },
+                                       {
+                                               p: &part{partMetadata: 
partMetadata{
+                                                       MinTimestamp: 6,
+                                                       MaxTimestamp: 10,
+                                               }},
+                                       },
+                               },
+                       },
+                       dst: []*part{},
+                       opts: queryOptions{
+                               minTimestamp: 11,
+                               maxTimestamp: 15,
+                       },
+                       expected: []*part{},
+                       count:    0,
+               },
+               {
+                       name: "Test with non-empty snapshot and some matching 
parts",
+                       snapshot: &snapshot{
+                               parts: []*partWrapper{
+                                       {
+                                               p: &part{partMetadata: 
partMetadata{
+                                                       MinTimestamp: 0,
+                                                       MaxTimestamp: 5,
+                                               }},
+                                       },
+                                       {
+                                               p: &part{partMetadata: 
partMetadata{
+                                                       MinTimestamp: 6,
+                                                       MaxTimestamp: 10,
+                                               }},
+                                       },
+                               },
+                       },
+                       dst: []*part{},
+                       opts: queryOptions{
+                               minTimestamp: 5,
+                               maxTimestamp: 10,
+                       },
+                       expected: []*part{
+                               {partMetadata: partMetadata{
+                                       MinTimestamp: 0,
+                                       MaxTimestamp: 5,
+                               }},
+                               {partMetadata: partMetadata{
+                                       MinTimestamp: 6,
+                                       MaxTimestamp: 10,
+                               }},
+                       },
+                       count: 2,
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       result, count := tt.snapshot.getParts(tt.dst, tt.opts)
+                       assert.Equal(t, tt.expected, result)
+                       assert.Equal(t, tt.count, count)
+               })
+       }
+}
+
+// TestSnapshotCopyAllTo checks that copyAllTo yields a snapshot with the
+// requested epoch and a reference count of one whose parts each gained one
+// reference. With closePrev set, decRef is called on the source snapshot
+// afterwards and the shared parts are expected to be back at their initial
+// reference counts.
+func TestSnapshotCopyAllTo(t *testing.T) {
+       tests := []struct {
+               name      string
+               snapshot  snapshot
+               expected  snapshot
+               nextEpoch uint64
+               closePrev bool
+       }{
+               {
+                       name: "Test with empty snapshot",
+                       snapshot: snapshot{
+                               parts: []*partWrapper{},
+                       },
+                       nextEpoch: 1,
+                       expected: snapshot{
+                               epoch: 1,
+                               ref:   1,
+                               parts: nil,
+                       },
+               },
+               {
+                       name: "Test with non-empty snapshot",
+                       snapshot: snapshot{
+                               parts: []*partWrapper{
+                                       {ref: 1},
+                                       {ref: 2},
+                               },
+                       },
+                       nextEpoch: 1,
+                       expected: snapshot{
+                               epoch: 1,
+                               ref:   1,
+                               parts: []*partWrapper{
+                                       {ref: 2},
+                                       {ref: 3},
+                               },
+                       },
+               },
+               {
+                       name: "Test with closed previous snapshot",
+                       snapshot: snapshot{
+                               parts: []*partWrapper{
+                                       {ref: 1},
+                                       {ref: 2},
+                               },
+                       },
+                       nextEpoch: 1,
+                       closePrev: true,
+                       expected: snapshot{
+                               epoch: 1,
+                               ref:   1,
+                               parts: []*partWrapper{
+                                       {ref: 1},
+                                       {ref: 2},
+                               },
+                       },
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       result := tt.snapshot.copyAllTo(tt.nextEpoch)
+                       if tt.closePrev {
+                               tt.snapshot.decRef()
+                       }
+                       assert.Equal(t, tt.expected, result)
+               })
+       }
+}
+
+// TestSnapshotMerge checks that merge keeps the parts of the source snapshot
+// (each with one extra reference), swaps in the wrapper from nextParts for a
+// part with a matching ID, and leaves out nextParts entries that match no
+// existing part. With closePrev set, decRef is called on the source snapshot
+// afterwards and the carried-over parts are expected to lose that reference.
+func TestSnapshotMerge(t *testing.T) {
+       tests := []struct {
+               snapshot  *snapshot
+               nextParts map[uint64]*partWrapper
+               name      string
+               expected  snapshot
+               nextEpoch uint64
+               closePrev bool
+       }{
+               {
+                       name: "Test with empty snapshot and empty next parts",
+                       snapshot: &snapshot{
+                               parts: []*partWrapper{},
+                       },
+                       nextEpoch: 1,
+                       nextParts: map[uint64]*partWrapper{},
+                       expected: snapshot{
+                               epoch: 1,
+                               ref:   1,
+                               parts: nil,
+                       },
+               },
+               {
+                       name: "Test with non-empty snapshot and empty next 
parts",
+                       snapshot: &snapshot{
+                               parts: []*partWrapper{
+                                       {p: &part{partMetadata: 
partMetadata{ID: 1}}, ref: 1},
+                                       {p: &part{partMetadata: 
partMetadata{ID: 2}}, ref: 2},
+                               },
+                       },
+                       nextEpoch: 1,
+                       nextParts: map[uint64]*partWrapper{},
+                       expected: snapshot{
+                               epoch: 1,
+                               ref:   1,
+                               parts: []*partWrapper{
+                                       {p: &part{partMetadata: 
partMetadata{ID: 1}}, ref: 2},
+                                       {p: &part{partMetadata: 
partMetadata{ID: 2}}, ref: 3},
+                               },
+                       },
+               },
+               {
+                       name: "Test with non-empty snapshot and non-empty next 
parts",
+                       snapshot: &snapshot{
+                               parts: []*partWrapper{
+                                       {p: &part{partMetadata: 
partMetadata{ID: 1}}, ref: 1},
+                                       {p: &part{partMetadata: 
partMetadata{ID: 2}}, ref: 2},
+                               },
+                       },
+                       nextEpoch: 1,
+                       nextParts: map[uint64]*partWrapper{
+                               2: {p: &part{partMetadata: partMetadata{ID: 
2}}, ref: 1},
+                               3: {p: &part{partMetadata: partMetadata{ID: 
3}}, ref: 1},
+                       },
+                       expected: snapshot{
+                               epoch: 1,
+                               ref:   1,
+                               parts: []*partWrapper{
+                                       {p: &part{partMetadata: 
partMetadata{ID: 1}}, ref: 2},
+                                       {p: &part{partMetadata: 
partMetadata{ID: 2}}, ref: 1},
+                               },
+                       },
+               },
+               {
+                       name: "Test with closed previous snapshot",
+                       snapshot: &snapshot{
+                               parts: []*partWrapper{
+                                       {p: &part{partMetadata: 
partMetadata{ID: 1}}, ref: 1},
+                                       {p: &part{partMetadata: 
partMetadata{ID: 2}}, ref: 2},
+                               },
+                       },
+                       closePrev: true,
+                       nextEpoch: 1,
+                       nextParts: map[uint64]*partWrapper{
+                               2: {p: &part{partMetadata: partMetadata{ID: 
2}}, ref: 1},
+                               3: {p: &part{partMetadata: partMetadata{ID: 
3}}, ref: 1},
+                       },
+                       expected: snapshot{
+                               epoch: 1,
+                               ref:   1,
+                               parts: []*partWrapper{
+                                       {p: &part{partMetadata: 
partMetadata{ID: 1}}, ref: 1},
+                                       {p: &part{partMetadata: 
partMetadata{ID: 2}}, ref: 1},
+                               },
+                       },
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       result := tt.snapshot.merge(tt.nextEpoch, tt.nextParts)
+                       if tt.closePrev {
+                               tt.snapshot.decRef()
+                       }
+                       assert.Equal(t, tt.expected, result)
+               })
+       }
+}
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index 6ce4adbe..f3c445dc 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -19,30 +19,193 @@ package measure
 
 import (
        "container/heap"
+       "encoding/json"
        "errors"
        "fmt"
        "io"
+       "path/filepath"
+       "sort"
+       "strconv"
        "sync"
+       "time"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
+       "github.com/apache/skywalking-banyandb/pkg/watcher"
 )
 
-func newTSTable(_ string, _ common.Position, _ *logger.Logger, _ 
timestamp.TimeRange) (*tsTable, error) {
-       return &tsTable{}, nil
+const (
+       // snapshotSuffix is the file extension that marks a snapshot file.
+       snapshotSuffix = ".snp"
+       // filePermission is the mode passed when the snapshot file is created.
+       filePermission = 0o600
+       // dirPermission is presumably the mode for directories created by this
+       // package; it is not referenced in this part of the file (TODO confirm).
+       dirPermission  = 0o700
+)
+
+// newTSTable opens the table stored under rootPath and starts its background
+// loops.
+//
+// Sub-directories are treated as parts and files ending in snapshotSuffix as
+// snapshots; other files are ignored. Entries whose names cannot be parsed
+// are logged and deleted. When the directory is empty, or no part or no
+// snapshot is found, the loops start from the current Unix time in
+// nanoseconds. Otherwise the snapshot with the highest epoch is loaded and
+// the loops start from that epoch. The returned error is always nil.
+func newTSTable(fileSystem fs.FileSystem, rootPath string, _ common.Position, l *logger.Logger, _ timestamp.TimeRange) (*tsTable, error) {
+       tst := &tsTable{
+               fileSystem: fileSystem,
+               root:       rootPath,
+               l:          l,
+       }
+       tst.gc.parent = tst
+       entries := fileSystem.ReadDir(rootPath)
+       if len(entries) == 0 {
+               tst.startLoop(uint64(time.Now().UnixNano()))
+               return tst, nil
+       }
+       var (
+               partIDs   []uint64
+               epochs    []uint64
+               discarded []string
+       )
+       for _, entry := range entries {
+               switch {
+               case entry.IsDir():
+                       id, err := parseEpoch(entry.Name())
+                       if err != nil {
+                               l.Info().Err(err).Msg("cannot parse part file name. skip and delete it")
+                               discarded = append(discarded, entry.Name())
+                               continue
+                       }
+                       partIDs = append(partIDs, id)
+               case filepath.Ext(entry.Name()) == snapshotSuffix:
+                       epoch, err := parseSnapshot(entry.Name())
+                       if err != nil {
+                               l.Info().Err(err).Msg("cannot parse snapshot file name. skip and delete it")
+                               discarded = append(discarded, entry.Name())
+                               continue
+                       }
+                       epochs = append(epochs, epoch)
+                       tst.gc.registerSnapshot(epoch)
+               }
+       }
+       for _, name := range discarded {
+               if err := fileSystem.DeleteFile(filepath.Join(rootPath, name)); err != nil {
+                       l.Warn().Err(err).Str("path", filepath.Join(rootPath, name)).Msg("failed to delete part. Please check manually")
+               }
+       }
+       if len(partIDs) == 0 || len(epochs) == 0 {
+               tst.startLoop(uint64(time.Now().UnixNano()))
+               return tst, nil
+       }
+       // Sort descending so the newest epoch comes first.
+       sort.Slice(epochs, func(i, j int) bool {
+               return epochs[i] > epochs[j]
+       })
+       latest := epochs[0]
+       tst.loadSnapshot(latest, partIDs)
+       tst.startLoop(latest)
+       return tst, nil
 }
 
+// tsTable is a measure table rooted at a directory of a file system. It
+// holds the current snapshot, the channel through which introductions are
+// submitted, the closer shared with its background loops, and a garbage
+// cleaner.
 type tsTable struct {
-       memParts []*partWrapper
+       fileSystem    fs.FileSystem
+       l             *logger.Logger
+       snapshot      *snapshot
+       introductions chan *introduction
+       loopCloser    *run.Closer
+       root          string
+       gc            garbageCleaner
        sync.RWMutex
 }
 
+func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) {
+       parts := tst.mustReadSnapshot(epoch)
+       snp := snapshot{
+               epoch: epoch,
+       }
+       for _, partName := range loadedParts {
+               var find bool
+               for j := range parts {
+                       if partName == parts[j] {
+                               find = true
+                               break
+                       }
+               }
+               if !find {
+                       tst.gc.submitParts(partName)
+               }
+               p := mustOpenFilePart(partPath(tst.root, partName), 
tst.fileSystem)
+               p.partMetadata.ID = partName
+               snp.parts = append(snp.parts, newPartWrapper(nil, p, 
tst.fileSystem))
+       }
+       tst.gc.clean()
+       if len(snp.parts) < 1 {
+               return
+       }
+       snp.incRef()
+       tst.snapshot = &snp
+}
+
+// startLoop creates the loop closer and the introductions channel, then
+// launches the introducer and flusher loops as goroutines. Both loops share
+// the flush channel and the watcher channel created here; the introducer
+// loop is given cur+1 and the flusher loop cur.
+// NOTE(review): the closer is created with a count of 3 while two goroutines
+// are started here; presumably the third slot is the one released by
+// Close via loopCloser.Done() — confirm against run.Closer.
+func (tst *tsTable) startLoop(cur uint64) {
+       tst.loopCloser = run.NewCloser(3)
+       tst.introductions = make(chan *introduction)
+       flushCh := make(chan *flusherIntroduction)
+       introducerWatcher := make(watcher.Channel, 1)
+       go tst.introducerLoop(flushCh, introducerWatcher, cur+1)
+       go tst.flusherLoop(flushCh, introducerWatcher, cur)
+}
+
+// parseEpoch parses epochStr as a hexadecimal unsigned 64-bit integer. On
+// failure it returns zero and an error that wraps the strconv error.
+func parseEpoch(epochStr string) (uint64, error) {
+       epoch, err := strconv.ParseUint(epochStr, 16, 64)
+       if err == nil {
+               return epoch, nil
+       }
+       return 0, fmt.Errorf("cannot parse path %s: %w", epochStr, err)
+}
+
+func (tst *tsTable) mustWriteSnapshot(snapshot uint64, partNames []string) {
+       data, err := json.Marshal(partNames)
+       if err != nil {
+               logger.Panicf("cannot marshal partNames to JSON: %s", err)
+       }
+       snapshotPath := filepath.Join(tst.root, snapshotName(snapshot))
+       lf, err := tst.fileSystem.CreateLockFile(snapshotPath, filePermission)
+       if err != nil {
+               logger.Panicf("cannot create lock file %s: %s", snapshotPath, 
err)
+       }
+       n, err := lf.Write(data)
+       if err != nil {
+               logger.Panicf("cannot write snapshot %s: %s", snapshotPath, err)
+       }
+       if n != len(data) {
+               logger.Panicf("unexpected number of bytes written to %s; got 
%d; want %d", snapshotPath, n, len(data))
+       }
+}
+
+// mustReadSnapshot reads the snapshot file named after the given snapshot
+// number under the table root, decodes it as a JSON array of strings and
+// returns each entry parsed by parseEpoch, in file order. It panics when the
+// file cannot be read, when the JSON is invalid, or when an entry cannot be
+// parsed. The result is nil when the array is empty.
+func (tst *tsTable) mustReadSnapshot(snapshot uint64) []uint64 {
+       source := filepath.Join(tst.root, snapshotName(snapshot))
+       raw, err := tst.fileSystem.Read(source)
+       if err != nil {
+               logger.Panicf("cannot read %s: %s", source, err)
+       }
+       var names []string
+       if err = json.Unmarshal(raw, &names); err != nil {
+               logger.Panicf("cannot parse %s: %s", source, err)
+       }
+       var ids []uint64
+       for _, name := range names {
+               id, parseErr := parseEpoch(name)
+               if parseErr != nil {
+                       logger.Panicf("cannot parse %s: %s", name, parseErr)
+               }
+               ids = append(ids, id)
+       }
+       return ids
+}
+
+// Close shuts the table down. When a loop closer is set it calls Done on it
+// and then CloseThenWait; afterwards, under the read lock, it calls decRef
+// on the current snapshot if there is one. It always returns nil.
 func (tst *tsTable) Close() error {
-       tst.Lock()
-       defer tst.Unlock()
-       for _, p := range tst.memParts {
-               p.decRef()
+       if tst.loopCloser != nil {
+               tst.loopCloser.Done()
+               tst.loopCloser.CloseThenWait()
+       }
+       tst.RLock()
+       defer tst.RUnlock()
+       if tst.snapshot != nil {
+               tst.snapshot.decRef()
        }
        return nil
 }
@@ -56,26 +219,20 @@ func (tst *tsTable) mustAddDataPoints(dps *dataPoints) {
        mp.mustInitFromDataPoints(dps)
        p := openMemPart(mp)
 
-       pw := newMemPartWrapper(mp, p)
-
-       tst.Lock()
-       defer tst.Unlock()
-       tst.memParts = append(tst.memParts, pw)
-}
+       ind := generateIntroduction()
+       defer releaseIntroduction(ind)
+       ind.applied = make(chan struct{})
+       ind.memPart = newPartWrapper(mp, p, tst.fileSystem)
 
-func (tst *tsTable) getParts(dst []*partWrapper, dstPart []*part, opts 
queryOptions) ([]*partWrapper, []*part) {
-       tst.RLock()
-       defer tst.RUnlock()
-       for _, p := range tst.memParts {
-               pm := p.mp.partMetadata
-               if opts.maxTimestamp < pm.MinTimestamp || opts.minTimestamp > 
pm.MaxTimestamp {
-                       continue
-               }
-               p.incRef()
-               dst = append(dst, p)
-               dstPart = append(dstPart, p.p)
+       select {
+       case tst.introductions <- ind:
+       case <-tst.loopCloser.CloseNotify():
+               return
+       }
+       select {
+       case <-ind.applied:
+       case <-tst.loopCloser.CloseNotify():
        }
-       return dst, dstPart
 }
 
 type tstIter struct {
diff --git a/banyand/measure/tstable_test.go b/banyand/measure/tstable_test.go
index fdfb9879..ca231e4f 100644
--- a/banyand/measure/tstable_test.go
+++ b/banyand/measure/tstable_test.go
@@ -29,7 +29,13 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+       "github.com/apache/skywalking-banyandb/pkg/watcher"
 )
 
 func Test_tsTable_mustAddDataPoints(t *testing.T) {
@@ -88,19 +94,31 @@ func Test_tsTable_mustAddDataPoints(t *testing.T) {
        }
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
-                       tst := &tsTable{}
+                       tst := &tsTable{
+                               loopCloser:    run.NewCloser(2),
+                               introductions: make(chan *introduction),
+                       }
+                       flushCh := make(chan *flusherIntroduction)
+                       introducerWatcher := make(watcher.Channel, 1)
+                       go tst.introducerLoop(flushCh, introducerWatcher, 1)
                        defer tst.Close()
                        for _, dps := range tt.dpsList {
                                tst.mustAddDataPoints(dps)
                                time.Sleep(100 * time.Millisecond)
                        }
-                       assert.Equal(t, tt.want, len(tst.memParts))
-                       var lastVersion int64
-                       for _, pw := range tst.memParts {
+                       s := tst.currentSnapshot()
+                       if s == nil {
+                               s = new(snapshot)
+                       }
+                       defer s.decRef()
+                       assert.Equal(t, tt.want, len(s.parts))
+                       var lastVersion uint64
+                       for _, pw := range s.parts {
+                               require.Greater(t, pw.ID(), uint64(0))
                                if lastVersion == 0 {
-                                       lastVersion = pw.p.partMetadata.Version
+                                       lastVersion = pw.ID()
                                } else {
-                                       require.Less(t, lastVersion, 
pw.p.partMetadata.Version)
+                                       require.Less(t, lastVersion, pw.ID())
                                }
                        }
                })
@@ -169,43 +187,76 @@ func Test_tstIter(t *testing.T) {
        }
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
-                       tst := &tsTable{}
-                       defer tst.Close()
-                       for _, dps := range tt.dpsList {
-                               tst.mustAddDataPoints(dps)
-                               time.Sleep(100 * time.Millisecond)
-                       }
-                       pws, pp := tst.getParts(nil, nil, queryOptions{
-                               minTimestamp: tt.minTimestamp,
-                               maxTimestamp: tt.maxTimestamp,
-                       })
-                       defer func() {
-                               for _, pw := range pws {
-                                       pw.decRef()
+                       verify := func(tst *tsTable) uint64 {
+                               defer tst.Close()
+                               s := tst.currentSnapshot()
+                               if s == nil {
+                                       s = new(snapshot)
                                }
-                       }()
-                       ti := &tstIter{}
-                       ti.init(pp, tt.sids, tt.minTimestamp, tt.maxTimestamp)
-                       var got []blockMetadata
-                       for ti.nextBlock() {
-                               if ti.piHeap[0].curBlock.seriesID == 0 {
-                                       t.Errorf("Expected curBlock to be 
initialized, but it was nil")
+                               defer s.decRef()
+                               pp, n := s.getParts(nil, queryOptions{
+                                       minTimestamp: tt.minTimestamp,
+                                       maxTimestamp: tt.maxTimestamp,
+                               })
+                               require.Equal(t, len(s.parts), n)
+                               ti := &tstIter{}
+                               ti.init(pp, tt.sids, tt.minTimestamp, 
tt.maxTimestamp)
+                               var got []blockMetadata
+                               for ti.nextBlock() {
+                                       if ti.piHeap[0].curBlock.seriesID == 0 {
+                                               t.Errorf("Expected curBlock to 
be initialized, but it was nil")
+                                       }
+                                       got = append(got, ti.piHeap[0].curBlock)
                                }
-                               got = append(got, ti.piHeap[0].curBlock)
-                       }
 
-                       if !errors.Is(ti.Error(), tt.wantErr) {
-                               t.Errorf("Unexpected error: got %v, want %v", 
ti.err, tt.wantErr)
-                       }
+                               if !errors.Is(ti.Error(), tt.wantErr) {
+                                       t.Errorf("Unexpected error: got %v, 
want %v", ti.err, tt.wantErr)
+                               }
 
-                       if diff := cmp.Diff(got, tt.want,
-                               cmpopts.IgnoreFields(blockMetadata{}, 
"timestamps"),
-                               cmpopts.IgnoreFields(blockMetadata{}, "field"),
-                               cmpopts.IgnoreFields(blockMetadata{}, 
"tagFamilies"),
-                               cmp.AllowUnexported(blockMetadata{}),
-                       ); diff != "" {
-                               t.Errorf("Unexpected blockMetadata (-got 
+want):\n%s", diff)
+                               if diff := cmp.Diff(got, tt.want,
+                                       cmpopts.IgnoreFields(blockMetadata{}, 
"timestamps"),
+                                       cmpopts.IgnoreFields(blockMetadata{}, 
"field"),
+                                       cmpopts.IgnoreFields(blockMetadata{}, 
"tagFamilies"),
+                                       cmp.AllowUnexported(blockMetadata{}),
+                               ); diff != "" {
+                                       t.Errorf("Unexpected blockMetadata 
(-got +want):\n%s", diff)
+                               }
+                               return s.epoch
                        }
+
+                       t.Run("memory snapshot", func(t *testing.T) {
+                               tst := &tsTable{
+                                       loopCloser:    run.NewCloser(2),
+                                       introductions: make(chan *introduction),
+                               }
+                               flushCh := make(chan *flusherIntroduction)
+                               introducerWatcher := make(watcher.Channel, 1)
+                               go tst.introducerLoop(flushCh, 
introducerWatcher, 1)
+                               for _, dps := range tt.dpsList {
+                                       tst.mustAddDataPoints(dps)
+                                       time.Sleep(100 * time.Millisecond)
+                               }
+                               verify(tst)
+                       })
+
+                       t.Run("file snapshot", func(t *testing.T) {
+                               tmpPath, defFn := test.Space(require.New(t))
+                               fileSystem := fs.NewLocalFileSystem()
+                               defer defFn()
+
+                               tst, err := newTSTable(fileSystem, tmpPath, 
common.Position{}, logger.GetLogger("test"), timestamp.TimeRange{})
+                               require.NoError(t, err)
+                               for _, dps := range tt.dpsList {
+                                       tst.mustAddDataPoints(dps)
+                                       time.Sleep(100 * time.Millisecond)
+                               }
+                               epoch := verify(tst)
+
+                               // reopen the table
+                               tst, err = newTSTable(fileSystem, tmpPath, 
common.Position{}, logger.GetLogger("test"), timestamp.TimeRange{})
+                               require.NoError(t, err)
+                               assert.Equal(t, epoch, verify(tst))
+                       })
                })
        }
 }
diff --git a/go.mod b/go.mod
index 3d57a1a9..4a9f0d19 100644
--- a/go.mod
+++ b/go.mod
@@ -139,7 +139,7 @@ require (
        go.uber.org/zap v1.26.0
        golang.org/x/crypto v0.17.0 // indirect
        golang.org/x/net v0.19.0 // indirect
-       golang.org/x/sys v0.15.0 // indirect
+       golang.org/x/sys v0.15.0
        golang.org/x/text v0.14.0 // indirect
        golang.org/x/time v0.5.0 // indirect
        golang.org/x/tools v0.16.1 // indirect
diff --git a/pkg/fs/error.go b/pkg/fs/error.go
index a54c513a..8247d9e3 100644
--- a/pkg/fs/error.go
+++ b/pkg/fs/error.go
@@ -20,17 +20,19 @@ package fs
 
 import "fmt"
 
+// FileSystemError code.
 const (
-       isExistError    = 0
-       isNotExistError = 1
-       permissionError = 2
-       openError       = 3
-       deleteError     = 4
-       writeError      = 5
-       readError       = 6
-       flushError      = 7
-       closeError      = 8
-       otherError      = 9
+       isExistError = iota
+       IsNotExistError
+       permissionError
+       openError
+       deleteError
+       writeError
+       readError
+       flushError
+       closeError
+       lockError
+       otherError
 )
 
 // FileSystemError implements the Error interface.
diff --git a/pkg/fs/file_system.go b/pkg/fs/file_system.go
index 098287e2..da483846 100644
--- a/pkg/fs/file_system.go
+++ b/pkg/fs/file_system.go
@@ -94,12 +94,20 @@ type FileSystem interface {
        ReadDir(dirname string) []DirEntry
        // Create and open the file by specified name and mode.
        CreateFile(name string, permission Mode) (File, error)
+       // Create and open lock file by specified name and mode.
+       CreateLockFile(name string, permission Mode) (File, error)
+       // Open the existing file by specified name.
+       OpenFile(name string) (File, error)
        // Flush mode, which flushes all data to one file.
        Write(buffer []byte, name string, permission Mode) (int, error)
+       // Read the entire file using streaming read.
+       Read(name string) ([]byte, error)
        // Delete the file.
        DeleteFile(name string) error
        // Delete the directory.
        MustRMAll(path string)
+       // SyncPath flushes the file or directory at path to stable storage.
+       SyncPath(path string)
 }
 
 // DirEntry is the interface that wraps the basic information about a file or 
directory.
@@ -111,6 +119,17 @@ type DirEntry interface {
        IsDir() bool
 }
 
+// MustFlush flushes all data to one file and panics if it cannot write all 
data.
+func MustFlush(fs FileSystem, buffer []byte, name string, permission Mode) {
+       n, err := fs.Write(buffer, name, permission)
+       if err != nil {
+               logger.GetLogger().Panic().Err(err).Str("path", 
name).Msg("cannot write data")
+       }
+       if n != len(buffer) {
+               logger.GetLogger().Panic().Int("written", n).Int("expected", 
len(buffer)).Str("path", name).Msg("BUG: writer wrote wrong number of bytes")
+       }
+}
+
 // MustWriteData writes data to w and panics if it cannot write all data.
 func MustWriteData(w Writer, data []byte) {
        if len(data) == 0 {
diff --git a/pkg/fs/local_file_system.go b/pkg/fs/local_file_system.go
index 4de839ff..006d7245 100644
--- a/pkg/fs/local_file_system.go
+++ b/pkg/fs/local_file_system.go
@@ -25,6 +25,7 @@ import (
        "io"
        "os"
        "path/filepath"
+       "time"
 
        "github.com/apache/skywalking-banyandb/pkg/logger"
 )
@@ -59,7 +60,7 @@ func readErrorHandle(operation string, err error, name 
string, size int) (int, e
                return size, err
        case os.IsNotExist(err):
                return size, &FileSystemError{
-                       Code:    isNotExistError,
+                       Code:    IsNotExistError,
                        Message: fmt.Sprintf("%s failed, file is not exist, 
file name: %s, error message: %s", operation, name, err),
                }
        case os.IsPermission(err):
@@ -159,6 +160,31 @@ func (fs *localFileSystem) CreateFile(name string, 
permission Mode) (File, error
        }
 }
 
+// OpenFile opens the existing file with os.Open and wraps it in a LocalFile.
+// Any failure is returned as a *FileSystemError carrying a matching code.
+func (fs *localFileSystem) OpenFile(name string) (File, error) {
+       file, err := os.Open(name)
+       switch {
+       case err == nil:
+               return &LocalFile{file: file}, nil
+       case os.IsNotExist(err):
+               return nil, &FileSystemError{
+                       Code:    IsNotExistError,
+                       Message: fmt.Sprintf("File is not exist, file name: 
%s,error message: %s", name, err),
+               }
+       case os.IsPermission(err):
+               return nil, &FileSystemError{
+                       Code:    permissionError,
+                       Message: fmt.Sprintf("There is not enough permission, 
file name: %s, error message: %s", name, err),
+               }
+       default:
+               return nil, &FileSystemError{
+                       Code:    otherError,
+                       Message: fmt.Sprintf("Open file return error, file 
name: %s,error message: %s", name, err),
+               }
+       }
+}
+
 // Write flushes all data to one file.
 func (fs *localFileSystem) Write(buffer []byte, name string, permission Mode) 
(int, error) {
        file, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 
os.FileMode(permission))
@@ -194,6 +220,30 @@ func (fs *localFileSystem) Write(buffer []byte, name 
string, permission Mode) (i
        return size, nil
 }
 
+// Read reads the entire named file into memory via os.ReadFile.
+func (fs *localFileSystem) Read(name string) ([]byte, error) {
+       data, err := os.ReadFile(name)
+       switch {
+       case err == nil:
+               return data, nil
+       case os.IsNotExist(err):
+               return data, &FileSystemError{
+                       Code:    IsNotExistError,
+                       Message: fmt.Sprintf("File is not exist, file name: %s, 
error message: %s", name, err),
+               }
+       case os.IsPermission(err):
+               return data, &FileSystemError{
+                       Code:    permissionError,
+                       Message: fmt.Sprintf("There is not enough permission, 
file name: %s, error message: %s", name, err),
+               }
+       default:
+               return data, &FileSystemError{
+                       Code:    otherError,
+                       Message: fmt.Sprintf("Read file error, file name: %s, 
error message: %s", name, err),
+               }
+       }
+}
+
 // DeleteFile is used to delete the file.
 func (fs *localFileSystem) DeleteFile(name string) error {
        err := os.Remove(name)
@@ -202,7 +252,7 @@ func (fs *localFileSystem) DeleteFile(name string) error {
                return nil
        case os.IsNotExist(err):
                return &FileSystemError{
-                       Code:    isNotExistError,
+                       Code:    IsNotExistError,
                        Message: fmt.Sprintf("File is not exist, file name: %s, 
error message: %s", name, err),
                }
        case os.IsPermission(err):
@@ -219,9 +269,16 @@ func (fs *localFileSystem) DeleteFile(name string) error {
 }
 
 func (fs *localFileSystem) MustRMAll(path string) {
-       if err := os.RemoveAll(path); err != nil {
-               logger.Panicf("failed to remove all files under %s", path)
+       if err := os.RemoveAll(path); err == nil {
+               return
+       }
+       for i := 0; i < 5; i++ {
+               time.Sleep(time.Second)
+               if err := os.RemoveAll(path); err == nil {
+                       return
+               }
        }
+       fs.logger.Panic().Str("path", path).Msg("failed to remove all files 
under path")
 }
 
 // Write adds new data to the end of a file.
@@ -232,7 +289,7 @@ func (file *LocalFile) Write(buffer []byte) (int, error) {
                return size, nil
        case os.IsNotExist(err):
                return size, &FileSystemError{
-                       Code:    isNotExistError,
+                       Code:    IsNotExistError,
                        Message: fmt.Sprintf("File is not exist, file name: %s, 
error message: %s", file.file.Name(), err),
                }
        case os.IsPermission(err):
@@ -260,7 +317,7 @@ func (file *LocalFile) Writev(iov *[][]byte) (int, error) {
                        size += wsize
                case os.IsNotExist(err):
                        return size, &FileSystemError{
-                               Code:    isNotExistError,
+                               Code:    IsNotExistError,
                                Message: fmt.Sprintf("File is not exist, file 
name: %s, error message: %s", file.file.Name(), err),
                        }
                case os.IsPermission(err):
@@ -317,7 +374,7 @@ func (file *LocalFile) Size() (int64, error) {
        if err != nil {
                if os.IsNotExist(err) {
                        return -1, &FileSystemError{
-                               Code:    isNotExistError,
+                               Code:    IsNotExistError,
                                Message: fmt.Sprintf("File is not exist, file 
name: %s, error message: %s", file.file.Name(), err),
                        }
                } else if os.IsPermission(err) {
diff --git a/pkg/fs/local_file_system_nix.go b/pkg/fs/local_file_system_nix.go
new file mode 100644
index 00000000..75db2649
--- /dev/null
+++ b/pkg/fs/local_file_system_nix.go
@@ -0,0 +1,74 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd
+// +build darwin dragonfly freebsd linux netbsd openbsd
+
+package fs
+
+import (
+       "fmt"
+       "os"
+
+       "golang.org/x/sys/unix"
+)
+
+// CreateLockFile creates (or truncates) the named file and takes an exclusive,
+// non-blocking flock on it. A lockError is returned if the lock is refused.
+func (*localFileSystem) CreateLockFile(name string, permission Mode) (File, 
error) {
+       file, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 
os.FileMode(permission))
+       switch {
+       case err == nil:
+               if err = unix.Flock(int(file.Fd()), unix.LOCK_EX|unix.LOCK_NB); 
err != nil {
+                       _ = file.Close() // best effort: do not leak the descriptor when the lock is refused
+                       return nil, &FileSystemError{
+                               Code:    lockError,
+                               Message: fmt.Sprintf("Cannot lock file, file 
name: %s, error message: %s", name, err),
+                       }
+               }
+               return &LocalFile{file: file}, nil
+       case os.IsExist(err):
+               return nil, &FileSystemError{
+                       Code:    isExistError,
+                       Message: fmt.Sprintf("File is exist, file name: 
%s,error message: %s", name, err),
+               }
+       case os.IsPermission(err):
+               return nil, &FileSystemError{
+                       Code:    permissionError,
+                       Message: fmt.Sprintf("There is not enough permission, 
file name: %s, permission: %d,error message: %s", name, permission, err),
+               }
+       default:
+               return nil, &FileSystemError{
+                       Code:    otherError,
+                       Message: fmt.Sprintf("Create file return error, file 
name: %s,error message: %s", name, err),
+               }
+       }
+}
+
+func (fs *localFileSystem) SyncPath(name string) {
+       file, err := os.Open(name)
+       if err != nil {
+               fs.logger.Panic().Str("name", name).Err(err).Msg("failed to 
open file")
+       }
+       if err := file.Sync(); err != nil {
+               _ = file.Close()
+               fs.logger.Panic().Str("name", name).Err(err).Msg("failed to 
sync file")
+       }
+       if err := file.Close(); err != nil {
+               fs.logger.Panic().Str("name", name).Err(err).Msg("failed to 
close file")
+       }
+}
diff --git a/pkg/fs/local_file_system_windows.go 
b/pkg/fs/local_file_system_windows.go
new file mode 100644
index 00000000..ab6d5c91
--- /dev/null
+++ b/pkg/fs/local_file_system_windows.go
@@ -0,0 +1,61 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package fs
+
+import (
+       "fmt"
+       "os"
+
+       "golang.org/x/sys/windows"
+)
+
+// CreateLockFile creates (or truncates) the named file and takes an exclusive,
+// fail-immediately LockFileEx lock on it. A lockError is returned on refusal.
+func (*localFileSystem) CreateLockFile(name string, permission Mode) (File, 
error) {
+       file, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 
os.FileMode(permission))
+       switch {
+       case err == nil:
+               lockFlags := uint32(windows.LOCKFILE_FAIL_IMMEDIATELY)
+               lockFlags |= uint32(windows.LOCKFILE_EXCLUSIVE_LOCK)
+               if err = windows.LockFileEx(windows.Handle(file.Fd()), 
lockFlags, 0, 1, 0, &windows.Overlapped{}); err != nil {
+                       _ = file.Close() // best effort: do not leak the handle when the lock is refused
+                       return nil, &FileSystemError{
+                               Code:    lockError,
+                               Message: fmt.Sprintf("Cannot lock file, file 
name: %s, error message: %s", name, err),
+                       }
+               }
+               return &LocalFile{file: file}, nil
+       case os.IsExist(err):
+               return nil, &FileSystemError{
+                       Code:    isExistError,
+                       Message: fmt.Sprintf("File is exist, file name: 
%s,error message: %s", name, err),
+               }
+       case os.IsPermission(err):
+               return nil, &FileSystemError{
+                       Code:    permissionError,
+                       Message: fmt.Sprintf("There is not enough permission, 
file name: %s, permission: %d,error message: %s", name, permission, err),
+               }
+       default:
+               return nil, &FileSystemError{
+                       Code:    otherError,
+                       Message: fmt.Sprintf("Create file return error, file 
name: %s,error message: %s", name, err),
+               }
+       }
+}
+
+func (fs *localFileSystem) SyncPath(_ string) {}
diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go
new file mode 100644
index 00000000..eaedaa38
--- /dev/null
+++ b/pkg/watcher/watcher.go
@@ -0,0 +1,70 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package watcher provides a watcher to watch the epoch.
+package watcher
+
+// Epoch is an epoch watcher.
+// It will be notified when the epoch is reached.
+type Epoch struct {
+       ch    chan struct{}
+       epoch uint64
+}
+
+// Watch returns a channel that will be notified when the epoch is reached.
+func (e *Epoch) Watch() <-chan struct{} {
+       return e.ch
+}
+
+// Epochs is a list of epoch watchers.
+type Epochs []*Epoch
+
+// Add adds an epoch watcher.
+func (e *Epochs) Add(epoch *Epoch) {
+       *e = append(*e, epoch)
+}
+
+// Notify closes and removes every watcher whose epoch is below the given epoch.
+func (e *Epochs) Notify(epoch uint64) {
+       var remained Epochs
+       for _, ep := range *e {
+               if ep.epoch < epoch {
+                       close(ep.ch)
+                       continue
+               }
+               remained.Add(ep)
+       }
+       *e = remained
+}
+
+// Channel is a channel of epoch watchers.
+type Channel chan *Epoch
+
+// Add adds an epoch watcher.
+// It returns nil if closeCh fires before the watcher can be queued.
+func (w Channel) Add(epoch uint64, closeCh <-chan struct{}) *Epoch {
+       ep := &Epoch{
+               epoch: epoch,
+               ch:    make(chan struct{}, 1),
+       }
+       select {
+       case w <- ep:
+       case <-closeCh:
+               return nil
+       }
+       return ep
+}
diff --git 
a/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go 
b/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go
index d6846ee3..ac89d108 100644
--- a/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go
+++ b/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go
@@ -42,7 +42,6 @@ import (
 )
 
 func TestIntegrationQueryOnDisk(t *testing.T) {
-       t.Skip("skip on-disk integration test until measure parts flushing is 
supported")
        RegisterFailHandler(Fail)
        RunSpecs(t, "Integration Query OnDisk Suite", 
Label(integration_standalone.Labels...))
 }

Reply via email to