This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch measure-flusher
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 117f6171dbe5638c6b1726f4d894ae5253d71d65
Author: Gao Hongtao <[email protected]>
AuthorDate: Sat Dec 23 08:31:10 2023 +0800

    Add introducer and flusher
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 banyand/internal/storage/segment.go |   2 +-
 banyand/internal/storage/storage.go |   2 +-
 banyand/measure/block_writer.go     |   2 -
 banyand/measure/flusher.go          |  88 +++++++++++
 banyand/measure/gc.go               |  67 +++++++++
 banyand/measure/gc_test.go          | 105 +++++++++++++
 banyand/measure/introducer.go       | 152 +++++++++++++++++++
 banyand/measure/part.go             | 120 ++++++++++++++-
 banyand/measure/part_metadata.go    |  62 ++++++--
 banyand/measure/part_test.go        |  29 +++-
 banyand/measure/primary_metadata.go |   3 +-
 banyand/measure/query.go            |  33 +++--
 banyand/measure/query_test.go       |  12 +-
 banyand/measure/snapshot.go         | 109 ++++++++++++++
 banyand/measure/snapshot_test.go    | 287 ++++++++++++++++++++++++++++++++++++
 banyand/measure/tstable.go          | 201 ++++++++++++++++++++++---
 banyand/measure/tstable_test.go     |  28 ++--
 go.mod                              |   2 +-
 pkg/fs/error.go                     |  21 +--
 pkg/fs/file_system.go               |   8 +
 pkg/fs/local_file_system.go         |  61 +++++++-
 pkg/fs/local_file_system_nix.go     |  74 ++++++++++
 pkg/watcher/watcher.go              |  70 +++++++++
 23 files changed, 1452 insertions(+), 86 deletions(-)

diff --git a/banyand/internal/storage/segment.go 
b/banyand/internal/storage/segment.go
index a74da252..85decd8b 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -293,7 +293,7 @@ func (sc *segmentController[T]) sortLst() {
 
 func (sc *segmentController[T]) load(start, end time.Time, root string) (seg 
*segment[T], err error) {
        var tsTable T
-       if tsTable, err = sc.tsTableCreator(sc.location, sc.position, sc.l, 
timestamp.NewSectionTimeRange(start, end)); err != nil {
+       if tsTable, err = sc.tsTableCreator(lfs, sc.location, sc.position, 
sc.l, timestamp.NewSectionTimeRange(start, end)); err != nil {
                return nil, err
        }
        suffix := sc.Format(start)
diff --git a/banyand/internal/storage/storage.go 
b/banyand/internal/storage/storage.go
index cfda39bf..911c5410 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -92,7 +92,7 @@ type TSTableWrapper[T TSTable] interface {
 }
 
 // TSTableCreator creates a TSTable.
-type TSTableCreator[T TSTable] func(root string, position common.Position,
+type TSTableCreator[T TSTable] func(fileSystem fs.FileSystem, root string, 
position common.Position,
        l *logger.Logger, timeRange timestamp.TimeRange) (T, error)
 
 // IntervalUnit denotes the unit of a time point.
diff --git a/banyand/measure/block_writer.go b/banyand/measure/block_writer.go
index 9fcc6e9a..fc840084 100644
--- a/banyand/measure/block_writer.go
+++ b/banyand/measure/block_writer.go
@@ -19,7 +19,6 @@ package measure
 
 import (
        "sync"
-       "time"
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
@@ -242,7 +241,6 @@ func (bw *blockWriter) Flush(ph *partMetadata) {
        ph.BlocksCount = bw.totalBlocksCount
        ph.MinTimestamp = bw.totalMinTimestamp
        ph.MaxTimestamp = bw.totalMaxTimestamp
-       ph.Version = time.Now().UnixNano() // TODO: use a global version
 
        bw.mustFlushPrimaryBlock(bw.primaryBlockData)
 
diff --git a/banyand/measure/flusher.go b/banyand/measure/flusher.go
new file mode 100644
index 00000000..2609f09e
--- /dev/null
+++ b/banyand/measure/flusher.go
@@ -0,0 +1,88 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "github.com/apache/skywalking-banyandb/pkg/watcher"
+)
+
+// flusherLoop waits for epoch notifications from the introducer and, whenever
+// the table's snapshot is newer than the last flushed epoch, flushes it,
+// persists it and runs the garbage cleaner. It returns when loopCloser is
+// closed, which flush reports by returning 0.
+func (tst *tsTable) flusherLoop(flushCh chan *flusherIntroduction, introducerWatcher watcher.Channel, epoch uint64) {
+       defer tst.loopCloser.Done()
+       epochWatcher := introducerWatcher.Add(0, tst.loopCloser.CloseNotify())
+
+       for {
+               select {
+               case <-tst.loopCloser.CloseNotify():
+                       return
+               case <-epochWatcher.Watch():
+                       var curSnapshot *snapshot
+                       tst.Lock()
+                       if tst.snapshot != nil && tst.snapshot.epoch > epoch {
+                               curSnapshot = tst.snapshot
+                               curSnapshot.incRef()
+                       }
+                       tst.Unlock()
+                       if curSnapshot != nil {
+                               epoch = tst.flush(curSnapshot, flushCh)
+                               if epoch == 0 {
+                                       // The loop is closing: drop the reference taken above.
+                                       curSnapshot.decRef()
+                                       return
+                               }
+                               tst.persistSnapshot(curSnapshot)
+                               // persistSnapshot is the last reader of curSnapshot, so the
+                               // reference taken under the lock can be released here.
+                               curSnapshot.decRef()
+                               if tst.currentEpoch() != epoch {
+                                       continue
+                               }
+                       }
+                       epochWatcher = introducerWatcher.Add(epoch, tst.loopCloser.CloseNotify())
+                       tst.gc.clean()
+               }
+       }
+}
+
+func (tst *tsTable) flush(snapshot *snapshot, flushCh chan 
*flusherIntroduction) uint64 {
+       ind := generateFlusherIntroduction()
+       for _, pw := range snapshot.parts {
+               if pw.mp == nil || pw.mp.partMetadata.TotalCount < 1 {
+                       continue
+               }
+               partPath := partPath(tst.root, pw.ID())
+               pw.mp.mustFlush(tst.fileSystem, partPath)
+               newPW := newPartWrapper(nil, mustOpenFilePart(partPath, 
tst.fileSystem), tst.fileSystem)
+               ind.flushed[newPW.ID()] = newPW
+       }
+       ind.applied = make(chan struct{})
+       select {
+       case flushCh <- ind:
+       case <-tst.loopCloser.CloseNotify():
+               return 0
+       }
+       select {
+       case <-ind.applied:
+               return snapshot.epoch
+       case <-tst.loopCloser.CloseNotify():
+               return 0
+       }
+}
+
+func (tst *tsTable) persistSnapshot(snapshot *snapshot) {
+       var partNames []string
+       for i := range snapshot.parts {
+               partNames = append(partNames, 
snapshotName(snapshot.parts[i].ID()))
+       }
+       tst.mustWriteSnapshot(snapshot.epoch, partNames)
+       tst.gc.registerSnapshot(snapshot.epoch)
+}
diff --git a/banyand/measure/gc.go b/banyand/measure/gc.go
new file mode 100644
index 00000000..dbdde01d
--- /dev/null
+++ b/banyand/measure/gc.go
@@ -0,0 +1,67 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "path"
+       "sort"
+)
+
+type garbageCleaner struct {
+       parent    *tsTable
+       snapshots []uint64
+       parts     []uint64
+}
+
+func (g *garbageCleaner) registerSnapshot(snapshot uint64) {
+       g.snapshots = append(g.snapshots, snapshot)
+}
+
+func (g *garbageCleaner) submitParts(parts ...uint64) {
+       g.parts = append(g.parts, parts...)
+}
+
+// clean deletes obsolete snapshot files when more than one snapshot is
+// registered and cleans parts when any have been submitted.
+// The receiver is a pointer so that the trimming of g.snapshots performed by
+// cleanSnapshots is kept on the caller's garbageCleaner rather than a copy.
+func (g *garbageCleaner) clean() {
+       if len(g.snapshots) > 1 {
+               g.cleanSnapshots()
+       }
+       if len(g.parts) > 0 {
+               g.cleanParts()
+       }
+}
+
+func (g *garbageCleaner) cleanSnapshots() {
+       if len(g.snapshots) < 2 {
+               return
+       }
+       sort.Slice(g.snapshots, func(i, j int) bool {
+               return g.snapshots[i] < g.snapshots[j]
+       })
+       // keep the latest snapshot
+       for i := 0; i < len(g.snapshots)-1; i++ {
+               filePath := path.Join(g.parent.root, 
snapshotName(g.snapshots[i]))
+               if err := g.parent.fileSystem.DeleteFile(filePath); err != nil {
+                       g.parent.l.Warn().Err(err).Str("path", 
filePath).Msg("failed to delete snapshot. Please check manually")
+               }
+       }
+       g.snapshots = g.snapshots[len(g.snapshots)-1:]
+}
+
+// cleanParts is not implemented yet and panics when called.
+// It takes a pointer receiver like the other garbageCleaner methods.
+func (g *garbageCleaner) cleanParts() {
+       panic("implement me")
+}
diff --git a/banyand/measure/gc_test.go b/banyand/measure/gc_test.go
new file mode 100644
index 00000000..2d6d8901
--- /dev/null
+++ b/banyand/measure/gc_test.go
@@ -0,0 +1,105 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "path/filepath"
+       "sort"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+func TestGarbageCleanerCleanSnapshot(t *testing.T) {
+       tests := []struct {
+               name            string
+               snapshotsOnDisk []uint64
+               snapshotsInGC   []uint64
+               wantOnDisk      []uint64
+               wantInGC        []uint64
+       }{
+               {
+                       name: "Test with no snapshots on disk and no snapshots 
in GC",
+               },
+               {
+                       name:            "Test with some snapshots on disk and 
no snapshots in GC",
+                       snapshotsInGC:   nil,
+                       snapshotsOnDisk: []uint64{1, 2, 3},
+                       wantInGC:        nil,
+                       wantOnDisk:      []uint64{1, 2, 3},
+               },
+               {
+                       name:            "Test with no snapshots on disk and 
some snapshots in GC",
+                       snapshotsOnDisk: nil,
+                       snapshotsInGC:   []uint64{1, 2, 3},
+                       wantOnDisk:      nil,
+                       // gc won't fix the mismatch between in-memory and on-disk snapshots
+                       wantInGC: []uint64{3},
+               },
+               {
+                       name:            "Test with some snapshots on disk and 
some snapshots in GC",
+                       snapshotsOnDisk: []uint64{1, 2, 3, 4, 5},
+                       snapshotsInGC:   []uint64{1, 3, 5},
+                       wantOnDisk:      []uint64{2, 4, 5},
+                       wantInGC:        []uint64{5},
+               },
+               {
+                       name:            "Test with unsorted",
+                       snapshotsOnDisk: []uint64{1, 3, 2, 5, 4},
+                       snapshotsInGC:   []uint64{5, 1, 3},
+                       wantOnDisk:      []uint64{2, 4, 5},
+                       wantInGC:        []uint64{5},
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       root, defFn := test.Space(require.New(t))
+                       defer defFn()
+                       parent := &tsTable{
+                               root:       root,
+                               l:          logger.GetLogger("test"),
+                               fileSystem: fs.NewLocalFileSystem(),
+                       }
+                       for i := range tt.snapshotsOnDisk {
+                               filePath := filepath.Join(parent.root, 
snapshotName(tt.snapshotsOnDisk[i]))
+                               file, err := 
parent.fileSystem.CreateFile(filePath, filePermission)
+                               require.NoError(t, err)
+                               require.NoError(t, file.Close())
+                       }
+                       g := &garbageCleaner{
+                               parent:    parent,
+                               snapshots: tt.snapshotsInGC,
+                       }
+                       g.cleanSnapshots()
+                       var got []uint64
+                       for _, de := range 
parent.fileSystem.ReadDir(parent.root) {
+                               s, err := parseSnapshot(de.Name())
+                               require.NoError(t, err, "failed to parse 
snapshot name:%s", de.Name())
+                               got = append(got, s)
+                       }
+                       sort.Slice(got, func(i, j int) bool { return got[i] < 
got[j] })
+                       assert.Equal(t, tt.wantOnDisk, got)
+                       assert.Equal(t, tt.wantInGC, g.snapshots)
+               })
+       }
+}
diff --git a/banyand/measure/introducer.go b/banyand/measure/introducer.go
new file mode 100644
index 00000000..3c124d2a
--- /dev/null
+++ b/banyand/measure/introducer.go
@@ -0,0 +1,152 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "sync"
+
+       "github.com/apache/skywalking-banyandb/pkg/watcher"
+)
+
+type introduction struct {
+       memPart *partWrapper
+       applied chan struct{}
+}
+
+func (i *introduction) reset() {
+       i.memPart = nil
+       i.applied = nil
+}
+
+var introductionPool = sync.Pool{}
+
+func generateIntroduction() *introduction {
+       v := introductionPool.Get()
+       if v == nil {
+               return &introduction{}
+       }
+       return v.(*introduction)
+}
+
+func releaseIntroduction(i *introduction) {
+       i.reset()
+       introductionPool.Put(i)
+}
+
+type flusherIntroduction struct {
+       flushed map[uint64]*partWrapper
+       applied chan struct{}
+}
+
+func (i *flusherIntroduction) reset() {
+       for k := range i.flushed {
+               delete(i.flushed, k)
+       }
+       i.applied = nil
+}
+
+var flusherIntroductionPool = sync.Pool{}
+
+func generateFlusherIntroduction() *flusherIntroduction {
+       v := flusherIntroductionPool.Get()
+       if v == nil {
+               return &flusherIntroduction{
+                       flushed: make(map[uint64]*partWrapper),
+               }
+       }
+       return v.(*flusherIntroduction)
+}
+
+func releaseFlusherIntroduction(i *flusherIntroduction) {
+       i.reset()
+       flusherIntroductionPool.Put(i)
+}
+
+func (tst *tsTable) introducerLoop(flushCh chan *flusherIntroduction, 
watcherCh watcher.Channel, epoch uint64) {
+       var introducerWatchers watcher.Epochs
+       defer tst.loopCloser.Done()
+       for {
+               select {
+               case <-tst.loopCloser.CloseNotify():
+                       return
+               case next := <-tst.introductions:
+                       tst.introduceMemPart(next, epoch)
+                       epoch++
+               case next := <-flushCh:
+                       tst.introduceFlushed(next, epoch)
+                       epoch++
+               case epochWatcher := <-watcherCh:
+                       introducerWatchers.Add(epochWatcher)
+               }
+               curEpoch := tst.currentEpoch()
+               introducerWatchers.Notify(curEpoch)
+       }
+}
+
+// introduceMemPart builds the next snapshot from the current one via
+// copyAllTo, appends the introduced memory part with epoch as its ID, and
+// installs it as the table's snapshot. The introduction is returned to its
+// pool and, if set, its applied channel is closed to signal completion.
+func (tst *tsTable) introduceMemPart(nextIntroduction *introduction, epoch uint64) {
+       cur := tst.currentSnapshot()
+       if cur != nil {
+               defer cur.decRef()
+       } else {
+               cur = new(snapshot)
+       }
+       defer releaseIntroduction(nextIntroduction)
+
+       next := nextIntroduction.memPart
+       nextSnp := cur.copyAllTo(epoch)
+       next.p.partMetadata.ID = epoch
+       if next.mp != nil {
+               // part holds a copy of the memPart's metadata. mustFlush persists
+               // mp.partMetadata, so the ID must be set there too or the flushed
+               // part is written, and later re-opened, with ID 0.
+               next.mp.partMetadata.ID = epoch
+       }
+       nextSnp.parts = append(nextSnp.parts, next)
+       tst.replaceSnapshot(&nextSnp)
+       if nextIntroduction.applied != nil {
+               close(nextIntroduction.applied)
+       }
+}
+
+func (tst *tsTable) introduceFlushed(nextIntroduction *flusherIntroduction, 
epoch uint64) {
+       cur := tst.currentSnapshot()
+       if cur == nil {
+               tst.l.Panic().Msg("current snapshot is nil")
+       }
+       defer func() {
+               cur.decRef()
+               releaseFlusherIntroduction(nextIntroduction)
+       }()
+       nextSnp := cur.merge(epoch, nextIntroduction.flushed)
+       tst.replaceSnapshot(&nextSnp)
+       if nextIntroduction.applied != nil {
+               close(nextIntroduction.applied)
+       }
+}
+
+func (tst *tsTable) replaceSnapshot(next *snapshot) {
+       tst.Lock()
+       defer tst.Unlock()
+       if tst.snapshot != nil {
+               tst.snapshot.decRef()
+       }
+       tst.snapshot = next
+}
+
+func (tst *tsTable) currentEpoch() uint64 {
+       s := tst.currentSnapshot()
+       if s == nil {
+               return 0
+       }
+       defer s.decRef()
+       return s.epoch
+}
diff --git a/banyand/measure/part.go b/banyand/measure/part.go
index 6760f10e..11e635b8 100644
--- a/banyand/measure/part.go
+++ b/banyand/measure/part.go
@@ -18,6 +18,9 @@
 package measure
 
 import (
+       "fmt"
+       "path"
+       "path/filepath"
        "sort"
        "sync"
        "sync/atomic"
@@ -26,9 +29,21 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/bytes"
        "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const (
+       metadataFilename               = "metadata.json"
+       primaryFilename                = "primary.bin"
+       metaFilename                   = "meta.bin"
+       timestampsFilename             = "timestamps.bin"
+       fieldValuesFilename            = "fields.bin"
+       tagFamiliesMetadataFilenameExt = ".tfm"
+       tagFamiliesFilenameExt         = ".tf"
 )
 
 type part struct {
+       path                 string
        meta                 fs.Reader
        primary              fs.Reader
        timestamps           fs.Reader
@@ -39,6 +54,18 @@ type part struct {
        partMetadata         partMetadata
 }
 
+func (p *part) close() {
+       fs.MustClose(p.primary)
+       fs.MustClose(p.timestamps)
+       fs.MustClose(p.fieldValues)
+       for _, tf := range p.tagFamilies {
+               fs.MustClose(tf)
+       }
+       for _, tfh := range p.tagFamilyMetadata {
+               fs.MustClose(tfh)
+       }
+}
+
 func openMemPart(mp *memPart) *part {
        var p part
        p.partMetadata = mp.partMetadata
@@ -140,6 +167,25 @@ func (mp *memPart) mustInitFromDataPoints(dps *dataPoints) 
{
        releaseBlockWriter(bsw)
 }
 
+// mustFlush creates the directory at path, writes every buffer of the
+// in-memory part and its metadata into it, and finally calls SyncPath on the
+// directory. It panics when a write fails or writes fewer bytes than expected.
+func (mp *memPart) mustFlush(fileSystem fs.FileSystem, path string) {
+       fileSystem.MkdirPanicIfExist(path, dirPermission)
+
+       // mustWrite stores buf under name inside path and panics on a failed or
+       // short write instead of silently leaving a truncated file behind.
+       mustWrite := func(buf []byte, name string) {
+               filePath := filepath.Join(path, name)
+               n, err := fileSystem.Write(buf, filePath, filePermission)
+               if err != nil {
+                       logger.Panicf("cannot write %q: %s", filePath, err)
+               }
+               if n != len(buf) {
+                       logger.Panicf("unexpected number of bytes written to %s; got %d; want %d", filePath, n, len(buf))
+               }
+       }
+
+       mustWrite(mp.meta.Buf, metaFilename)
+       mustWrite(mp.primary.Buf, primaryFilename)
+       mustWrite(mp.timestamps.Buf, timestampsFilename)
+       mustWrite(mp.fieldValues.Buf, fieldValuesFilename)
+       for name, tf := range mp.tagFamilies {
+               mustWrite(tf.Buf, name+tagFamiliesFilenameExt)
+       }
+       for name, tfh := range mp.tagFamilyMetadata {
+               mustWrite(tfh.Buf, name+tagFamiliesMetadataFilenameExt)
+       }
+
+       mp.partMetadata.mustWriteMetadata(fileSystem, path)
+
+       fileSystem.SyncPath(path)
+}
+
 func uncompressedDataPointSizeBytes(index int, dps *dataPoints) uint64 {
        n := uint64(len(time.RFC3339Nano))
        n += uint64(len(dps.fields[index].name))
@@ -171,13 +217,15 @@ func releaseMemPart(mp *memPart) {
 var memPartPool sync.Pool
 
 type partWrapper struct {
-       mp  *memPart
-       p   *part
-       ref int32
+       mp            *memPart
+       p             *part
+       ref           int32
+       mustBeDeleted uint32
+       fileSystem    fs.FileSystem
 }
 
-func newMemPartWrapper(mp *memPart, p *part) *partWrapper {
-       return &partWrapper{mp: mp, p: p, ref: 1}
+func newPartWrapper(mp *memPart, p *part, fileSystem fs.FileSystem) 
*partWrapper {
+       return &partWrapper{mp: mp, p: p, fileSystem: fileSystem, ref: 1}
 }
 
 func (pw *partWrapper) incRef() {
@@ -192,5 +240,67 @@ func (pw *partWrapper) decRef() {
        if pw.mp != nil {
                releaseMemPart(pw.mp)
                pw.mp = nil
+               pw.p = nil
+               return
+       }
+       pw.p.close()
+       if atomic.LoadUint32(&pw.mustBeDeleted) == 0 {
+               return
+       }
+       pw.fileSystem.MustRMAll(pw.p.path)
+}
+
+func (pw *partWrapper) ID() uint64 {
+       return pw.p.partMetadata.ID
+}
+
+func mustOpenFilePart(partPath string, fileSystem fs.FileSystem) *part {
+       var p part
+       p.path = partPath
+       p.partMetadata.mustReadMetadata(fileSystem, partPath)
+
+       metaPath := path.Join(partPath, metaFilename)
+       pr := mustOpenReader(metaPath, fileSystem)
+       p.primaryBlockMetadata = 
mustReadPrimaryBlockMetadata(p.primaryBlockMetadata[:0], newReader(pr))
+       fs.MustClose(pr)
+
+       p.primary = mustOpenReader(path.Join(partPath, primaryFilename), 
fileSystem)
+       p.timestamps = mustOpenReader(path.Join(partPath, timestampsFilename), 
fileSystem)
+       p.fieldValues = mustOpenReader(path.Join(partPath, 
fieldValuesFilename), fileSystem)
+       ee := fileSystem.ReadDir(partPath)
+       for _, e := range ee {
+               if e.IsDir() {
+                       continue
+               }
+               if filepath.Ext(e.Name()) == tagFamiliesMetadataFilenameExt {
+                       if p.tagFamilyMetadata == nil {
+                               p.tagFamilyMetadata = make(map[string]fs.Reader)
+                       }
+                       p.tagFamilyMetadata[removeExt(e.Name(), 
tagFamiliesMetadataFilenameExt)] = mustOpenReader(path.Join(partPath, 
e.Name()), fileSystem)
+               }
+               if filepath.Ext(e.Name()) == tagFamiliesFilenameExt {
+                       if p.tagFamilies == nil {
+                               p.tagFamilies = make(map[string]fs.Reader)
+                       }
+                       p.tagFamilies[removeExt(e.Name(), 
tagFamiliesFilenameExt)] = mustOpenReader(path.Join(partPath, e.Name()), 
fileSystem)
+               }
+
+       }
+       return &p
+}
+
+func mustOpenReader(name string, fileSystem fs.FileSystem) fs.Reader {
+       f, err := fileSystem.OpenFile(name)
+       if err != nil {
+               logger.Panicf("cannot open %q: %s", name, err)
        }
+       return f
+}
+
+func removeExt(nameWithExt, ext string) string {
+       return nameWithExt[:len(nameWithExt)-len(ext)]
+}
+
+func partPath(root string, epoch uint64) string {
+       return filepath.Join(root, fmt.Sprintf("%016x", epoch))
 }
diff --git a/banyand/measure/part_metadata.go b/banyand/measure/part_metadata.go
index 6c6f6e17..0323e660 100644
--- a/banyand/measure/part_metadata.go
+++ b/banyand/measure/part_metadata.go
@@ -17,6 +17,14 @@
 
 package measure
 
+import (
+       "encoding/json"
+       "path/filepath"
+
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
 type partMetadata struct {
        CompressedSizeBytes   uint64
        UncompressedSizeBytes uint64
@@ -24,15 +32,51 @@ type partMetadata struct {
        BlocksCount           uint64
        MinTimestamp          int64
        MaxTimestamp          int64
-       Version               int64
+       ID                    uint64
+}
+
+func (pm *partMetadata) reset() {
+       pm.CompressedSizeBytes = 0
+       pm.UncompressedSizeBytes = 0
+       pm.TotalCount = 0
+       pm.BlocksCount = 0
+       pm.MinTimestamp = 0
+       pm.MaxTimestamp = 0
+       pm.ID = 0
+}
+
+// mustReadMetadata resets pm and loads it from the JSON metadata file inside
+// partPath. It panics when the file cannot be read or parsed, or when the
+// stored MinTimestamp is greater than MaxTimestamp.
+func (pm *partMetadata) mustReadMetadata(fileSystem fs.FileSystem, partPath string) {
+       pm.reset()
+
+       metadataPath := filepath.Join(partPath, metadataFilename)
+       metadata, err := fileSystem.Read(metadataPath)
+       if err != nil {
+               // Name the file so the failure can be traced to a concrete part.
+               logger.Panicf("cannot read %q: %s", metadataPath, err)
+               return
+       }
+       if err := json.Unmarshal(metadata, pm); err != nil {
+               logger.Panicf("cannot parse %q: %s", metadataPath, err)
+               return
+       }
+
+       if pm.MinTimestamp > pm.MaxTimestamp {
+               logger.Panicf("MinTimestamp cannot exceed MaxTimestamp; got %d vs %d", pm.MinTimestamp, pm.MaxTimestamp)
+       }
 }
 
-func (ph *partMetadata) reset() {
-       ph.CompressedSizeBytes = 0
-       ph.UncompressedSizeBytes = 0
-       ph.TotalCount = 0
-       ph.BlocksCount = 0
-       ph.MinTimestamp = 0
-       ph.MaxTimestamp = 0
-       ph.Version = 0
+func (pm *partMetadata) mustWriteMetadata(fileSystem fs.FileSystem, partPath 
string) {
+       metadata, err := json.Marshal(pm)
+       if err != nil {
+               logger.Panicf("cannot marshal metadata: %s", err)
+               return
+       }
+       metadataPath := filepath.Join(partPath, metadataFilename)
+       n, err := fileSystem.Write(metadata, metadataPath, filePermission)
+       if err != nil {
+               logger.Panicf("cannot write metadata: %s", err)
+               return
+       }
+       if n != len(metadata) {
+               logger.Panicf("unexpected number of bytes written to %s; got 
%d; want %d", metadataPath, n, len(metadata))
+       }
 }
diff --git a/banyand/measure/part_test.go b/banyand/measure/part_test.go
index 29b5fdfe..5473fd76 100644
--- a/banyand/measure/part_test.go
+++ b/banyand/measure/part_test.go
@@ -21,13 +21,16 @@ import (
        "testing"
 
        "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/test"
 )
 
-func Test_memPart_mustInitFromDataPoints(t *testing.T) {
+func TestMustInitFromDataPoints(t *testing.T) {
        tests := []struct {
                dps  *dataPoints
                name string
@@ -92,6 +95,30 @@ func Test_memPart_mustInitFromDataPoints(t *testing.T) {
                        assert.Equal(t, tt.want.MinTimestamp, 
mp.partMetadata.MinTimestamp)
                        assert.Equal(t, tt.want.MaxTimestamp, 
mp.partMetadata.MaxTimestamp)
                        assert.Equal(t, tt.want.TotalCount, 
mp.partMetadata.TotalCount)
+                       assert.Equal(t, len(mp.tagFamilies), 
len(mp.tagFamilyMetadata))
+                       tmpPath, defFn := test.Space(require.New(t))
+                       defer defFn()
+                       epoch := uint64(1)
+                       path := partPath(tmpPath, epoch)
+
+                       fileSystem := fs.NewLocalFileSystem()
+                       mp.mustFlush(fileSystem, path)
+                       p := mustOpenFilePart(path, fileSystem)
+                       defer p.close()
+                       assert.Equal(t, tt.want.BlocksCount, 
p.partMetadata.BlocksCount)
+                       assert.Equal(t, tt.want.MinTimestamp, 
p.partMetadata.MinTimestamp)
+                       assert.Equal(t, tt.want.MaxTimestamp, 
p.partMetadata.MaxTimestamp)
+                       assert.Equal(t, tt.want.TotalCount, 
p.partMetadata.TotalCount)
+                       if len(mp.tagFamilies) > 0 {
+                               for k := range mp.tagFamilies {
+                                       _, ok := mp.tagFamilyMetadata[k]
+                                       require.True(t, ok, 
"mp.tagFamilyMetadata %s not found", k)
+                                       _, ok = p.tagFamilies[k]
+                                       require.True(t, ok, "p.tagFamilies %s 
not found", k)
+                                       _, ok = p.tagFamilyMetadata[k]
+                                       require.True(t, ok, 
"p.tagFamilyMetadata %s not found", k)
+                               }
+                       }
                })
        }
 }
diff --git a/banyand/measure/primary_metadata.go 
b/banyand/measure/primary_metadata.go
index 19374283..7b09503e 100644
--- a/banyand/measure/primary_metadata.go
+++ b/banyand/measure/primary_metadata.go
@@ -100,7 +100,6 @@ func mustReadPrimaryBlockMetadata(dst 
[]primaryBlockMetadata, r *reader) []prima
        return dst
 }
 
-// unmarshalPrimaryBlockMetadata appends unmarshaled from src indexBlockHeader 
entries to dst and returns the result.
 func unmarshalPrimaryBlockMetadata(dst []primaryBlockMetadata, src []byte) 
([]primaryBlockMetadata, error) {
        dstOrig := dst
        for len(src) > 0 {
@@ -112,7 +111,7 @@ func unmarshalPrimaryBlockMetadata(dst 
[]primaryBlockMetadata, src []byte) ([]pr
                ih := &dst[len(dst)-1]
                tail, err := ih.unmarshal(src)
                if err != nil {
-                       return dstOrig, fmt.Errorf("cannot unmarshal 
indexBlockHeader %d: %w", len(dst)-len(dstOrig), err)
+                       return dstOrig, fmt.Errorf("cannot unmarshal 
primaryBlockHeader %d: %w", len(dst)-len(dstOrig), err)
                }
                src = tail
        }
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index d4ebadab..4451a253 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -86,15 +86,24 @@ func (s *measure) Query(ctx context.Context, mqo 
pbv1.MeasureQueryOptions) (pbv1
        for i := range sl {
                sids = append(sids, sl[i].ID)
        }
-       var pws []*partWrapper
        var parts []*part
        qo := queryOptions{
                MeasureQueryOptions: mqo,
                minTimestamp:        mqo.TimeRange.Start.UnixNano(),
                maxTimestamp:        mqo.TimeRange.End.UnixNano(),
        }
-       for _, tw := range tabWrappers {
-               pws, parts = tw.Table().getParts(pws, parts, qo)
+       var n int
+       for i := range tabWrappers {
+               s := tabWrappers[i].Table().currentSnapshot()
+               if s == nil {
+                       continue
+               }
+               parts, n = s.getParts(parts, qo)
+               if n < 1 {
+                       s.decRef()
+                       continue
+               }
+               result.snapshots = append(result.snapshots, s)
        }
        // TODO: cache tstIter
        var tstIter tstIter
@@ -303,7 +312,7 @@ func binaryDataFieldValue(value []byte) *modelv1.FieldValue 
{
 type queryResult struct {
        sidToIndex map[common.SeriesID]int
        data       []*blockCursor
-       pws        []*partWrapper
+       snapshots  []*snapshot
        loaded     bool
        orderByTS  bool
        ascTS      bool
@@ -348,10 +357,10 @@ func (qr *queryResult) Release() {
                qr.data[i] = nil
        }
        qr.data = qr.data[:0]
-       for i := range qr.pws {
-               qr.pws[i].decRef()
+       for i := range qr.snapshots {
+               qr.snapshots[i].decRef()
        }
-       qr.pws = qr.pws[:0]
+       qr.snapshots = qr.snapshots[:0]
 }
 
 func (qr queryResult) Len() int {
@@ -361,8 +370,8 @@ func (qr queryResult) Len() int {
 func (qr queryResult) Less(i, j int) bool {
        leftTS := qr.data[i].timestamps[qr.data[i].idx]
        rightTS := qr.data[j].timestamps[qr.data[j].idx]
-       leftVersion := qr.data[i].p.partMetadata.Version
-       rightVersion := qr.data[j].p.partMetadata.Version
+       leftVersion := qr.data[i].p.partMetadata.ID
+       rightVersion := qr.data[j].p.partMetadata.ID
        if qr.orderByTS {
                if leftTS == rightTS {
                        if qr.data[i].bm.seriesID == qr.data[j].bm.seriesID {
@@ -416,7 +425,7 @@ func (qr *queryResult) merge() *pbv1.Result {
                step = -1
        }
        result := &pbv1.Result{}
-       var lastPartVersion int64
+       var lastPartVersion uint64
        var lastSid common.SeriesID
 
        for qr.Len() > 0 {
@@ -428,12 +437,12 @@ func (qr *queryResult) merge() *pbv1.Result {
 
                if len(result.Timestamps) > 0 &&
                        topBC.timestamps[topBC.idx] == 
result.Timestamps[len(result.Timestamps)-1] {
-                       if topBC.p.partMetadata.Version > lastPartVersion {
+                       if topBC.p.partMetadata.ID > lastPartVersion {
                                logger.Panicf("following parts version should 
be less or equal to the previous one")
                        }
                } else {
                        topBC.copyTo(result)
-                       lastPartVersion = topBC.p.partMetadata.Version
+                       lastPartVersion = topBC.p.partMetadata.ID
                }
 
                topBC.idx += step
diff --git a/banyand/measure/query_test.go b/banyand/measure/query_test.go
index d69f77ad..44ad9c52 100644
--- a/banyand/measure/query_test.go
+++ b/banyand/measure/query_test.go
@@ -24,6 +24,7 @@ import (
        "time"
 
        "github.com/google/go-cmp/cmp"
+       "github.com/stretchr/testify/require"
        "google.golang.org/protobuf/testing/protocmp"
 
        "github.com/apache/skywalking-banyandb/api/common"
@@ -374,12 +375,11 @@ func TestQueryResult(t *testing.T) {
                                minTimestamp: tt.minTimestamp,
                                maxTimestamp: tt.maxTimestamp,
                        }
-                       pws, pp := tst.getParts(nil, nil, queryOpts)
-                       defer func() {
-                               for _, pw := range pws {
-                                       pw.decRef()
-                               }
-                       }()
+                       s := tst.currentSnapshot()
+                       require.NotNil(t, s)
+                       defer s.decRef()
+                       pp, n := s.getParts(nil, queryOpts)
+                       require.Equal(t, len(tt.dpsList), n)
                        sids := make([]common.SeriesID, len(tt.sids))
                        copy(sids, tt.sids)
                        sort.Slice(sids, func(i, j int) bool {
diff --git a/banyand/measure/snapshot.go b/banyand/measure/snapshot.go
new file mode 100644
index 00000000..63339392
--- /dev/null
+++ b/banyand/measure/snapshot.go
@@ -0,0 +1,109 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "errors"
+       "fmt"
+       "path/filepath"
+       "sync/atomic"
+)
+
+func (tst *tsTable) currentSnapshot() *snapshot {
+       tst.RLock()
+       defer tst.RUnlock()
+       if tst.snapshot == nil {
+               return nil
+       }
+       s := tst.snapshot
+       s.incRef()
+       return s
+}
+
+type snapshot struct {
+       parts []*partWrapper
+       epoch uint64
+
+       ref int32
+}
+
+func (s *snapshot) getParts(dst []*part, opts queryOptions) ([]*part, int) {
+       var count int
+       for _, p := range s.parts {
+               pm := p.p.partMetadata
+               if opts.maxTimestamp < pm.MinTimestamp || opts.minTimestamp > 
pm.MaxTimestamp {
+                       continue
+               }
+               dst = append(dst, p.p)
+               count++
+       }
+       return dst, count
+}
+
+func (s *snapshot) incRef() {
+       atomic.AddInt32(&s.ref, 1)
+}
+
+func (s *snapshot) decRef() {
+       n := atomic.AddInt32(&s.ref, -1)
+       if n > 0 {
+               return
+       }
+       for i := range s.parts {
+               s.parts[i].decRef()
+       }
+       s.parts = s.parts[:0]
+}
+
+func (s snapshot) copyAllTo(nextEpoch uint64) snapshot {
+       s.epoch = nextEpoch
+       s.ref = 1
+       for i := range s.parts {
+               s.parts[i].incRef()
+       }
+       return s
+}
+
+func (s *snapshot) merge(nextEpoch uint64, nextParts map[uint64]*partWrapper) 
snapshot {
+       var result snapshot
+       result.epoch = nextEpoch
+       result.ref = 1
+       for i := 0; i < len(s.parts); i++ {
+               if n, ok := nextParts[s.parts[i].ID()]; ok {
+                       result.parts = append(result.parts, n)
+                       continue
+               }
+               s.parts[i].incRef()
+               result.parts = append(result.parts, s.parts[i])
+       }
+       return result
+}
+
+func snapshotName(snapshot uint64) string {
+       return fmt.Sprintf("%016x%s", snapshot, snapshotSuffix)
+}
+
+func parseSnapshot(name string) (uint64, error) {
+       if filepath.Ext(name) != snapshotSuffix {
+               return 0, errors.New("invalid snapshot file ext")
+       }
+       if len(name) < 16 {
+               return 0, errors.New("invalid snapshot file name")
+       }
+       return parseEpoch(name[:16])
+}
diff --git a/banyand/measure/snapshot_test.go b/banyand/measure/snapshot_test.go
new file mode 100644
index 00000000..1b471745
--- /dev/null
+++ b/banyand/measure/snapshot_test.go
@@ -0,0 +1,287 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+)
+
+func TestSnapshotGetParts(t *testing.T) {
+       tests := []struct {
+               name     string
+               snapshot *snapshot
+               dst      []*part
+               opts     queryOptions
+               expected []*part
+               count    int
+       }{
+               {
+                       name: "Test with empty snapshot",
+                       snapshot: &snapshot{
+                               parts: []*partWrapper{},
+                       },
+                       dst: []*part{},
+                       opts: queryOptions{
+                               minTimestamp: 0,
+                               maxTimestamp: 10,
+                       },
+                       expected: []*part{},
+                       count:    0,
+               },
+               {
+                       name: "Test with non-empty snapshot and no matching 
parts",
+                       snapshot: &snapshot{
+                               parts: []*partWrapper{
+                                       {
+                                               p: &part{partMetadata: 
partMetadata{
+                                                       MinTimestamp: 0,
+                                                       MaxTimestamp: 5,
+                                               }},
+                                       },
+                                       {
+                                               p: &part{partMetadata: 
partMetadata{
+                                                       MinTimestamp: 6,
+                                                       MaxTimestamp: 10,
+                                               }},
+                                       },
+                               },
+                       },
+                       dst: []*part{},
+                       opts: queryOptions{
+                               minTimestamp: 11,
+                               maxTimestamp: 15,
+                       },
+                       expected: []*part{},
+                       count:    0,
+               },
+               {
+                       name: "Test with non-empty snapshot and some matching 
parts",
+                       snapshot: &snapshot{
+                               parts: []*partWrapper{
+                                       {
+                                               p: &part{partMetadata: 
partMetadata{
+                                                       MinTimestamp: 0,
+                                                       MaxTimestamp: 5,
+                                               }},
+                                       },
+                                       {
+                                               p: &part{partMetadata: 
partMetadata{
+                                                       MinTimestamp: 6,
+                                                       MaxTimestamp: 10,
+                                               }},
+                                       },
+                               },
+                       },
+                       dst: []*part{},
+                       opts: queryOptions{
+                               minTimestamp: 5,
+                               maxTimestamp: 10,
+                       },
+                       expected: []*part{
+                               {partMetadata: partMetadata{
+                                       MinTimestamp: 0,
+                                       MaxTimestamp: 5,
+                               }},
+                               {partMetadata: partMetadata{
+                                       MinTimestamp: 6,
+                                       MaxTimestamp: 10,
+                               }},
+                       },
+                       count: 2,
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       result, count := tt.snapshot.getParts(tt.dst, tt.opts)
+                       assert.Equal(t, tt.expected, result)
+                       assert.Equal(t, tt.count, count)
+               })
+       }
+}
+
+func TestSnapshotCopyAllTo(t *testing.T) {
+       tests := []struct {
+               name      string
+               snapshot  snapshot
+               nextEpoch uint64
+               closePrev bool
+               expected  snapshot
+       }{
+               {
+                       name: "Test with empty snapshot",
+                       snapshot: snapshot{
+                               parts: []*partWrapper{},
+                       },
+                       nextEpoch: 1,
+                       expected: snapshot{
+                               epoch: 1,
+                               ref:   1,
+                               parts: []*partWrapper{},
+                       },
+               },
+               {
+                       name: "Test with non-empty snapshot",
+                       snapshot: snapshot{
+                               parts: []*partWrapper{
+                                       {ref: 1},
+                                       {ref: 2},
+                               },
+                       },
+                       nextEpoch: 1,
+                       expected: snapshot{
+                               epoch: 1,
+                               ref:   1,
+                               parts: []*partWrapper{
+                                       {ref: 2},
+                                       {ref: 3},
+                               },
+                       },
+               },
+               {
+                       name: "Test with closed previous snapshot",
+                       snapshot: snapshot{
+                               parts: []*partWrapper{
+                                       {ref: 1},
+                                       {ref: 2},
+                               },
+                       },
+                       nextEpoch: 1,
+                       closePrev: true,
+                       expected: snapshot{
+                               epoch: 1,
+                               ref:   1,
+                               parts: []*partWrapper{
+                                       {ref: 1},
+                                       {ref: 2},
+                               },
+                       },
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       result := tt.snapshot.copyAllTo(tt.nextEpoch)
+                       if tt.closePrev {
+                               tt.snapshot.decRef()
+                       }
+                       assert.Equal(t, tt.expected, result)
+               })
+       }
+}
+
+func TestSnapshotMerge(t *testing.T) {
+       tests := []struct {
+               name      string
+               snapshot  *snapshot
+               closePrev bool
+               nextEpoch uint64
+               nextParts map[uint64]*partWrapper
+               expected  snapshot
+       }{
+               {
+                       name: "Test with empty snapshot and empty next parts",
+                       snapshot: &snapshot{
+                               parts: []*partWrapper{},
+                       },
+                       nextEpoch: 1,
+                       nextParts: map[uint64]*partWrapper{},
+                       expected: snapshot{
+                               epoch: 1,
+                               ref:   1,
+                               parts: nil,
+                       },
+               },
+               {
+                       name: "Test with non-empty snapshot and empty next 
parts",
+                       snapshot: &snapshot{
+                               parts: []*partWrapper{
+                                       {p: &part{partMetadata: 
partMetadata{ID: 1}}, ref: 1},
+                                       {p: &part{partMetadata: 
partMetadata{ID: 2}}, ref: 2},
+                               },
+                       },
+                       nextEpoch: 1,
+                       nextParts: map[uint64]*partWrapper{},
+                       expected: snapshot{
+                               epoch: 1,
+                               ref:   1,
+                               parts: []*partWrapper{
+                                       {p: &part{partMetadata: 
partMetadata{ID: 1}}, ref: 2},
+                                       {p: &part{partMetadata: 
partMetadata{ID: 2}}, ref: 3},
+                               },
+                       },
+               },
+               {
+                       name: "Test with non-empty snapshot and non-empty next 
parts",
+                       snapshot: &snapshot{
+                               parts: []*partWrapper{
+                                       {p: &part{partMetadata: 
partMetadata{ID: 1}}, ref: 1},
+                                       {p: &part{partMetadata: 
partMetadata{ID: 2}}, ref: 2},
+                               },
+                       },
+                       nextEpoch: 1,
+                       nextParts: map[uint64]*partWrapper{
+                               2: {p: &part{partMetadata: partMetadata{ID: 
2}}, ref: 1},
+                               3: {p: &part{partMetadata: partMetadata{ID: 
3}}, ref: 1},
+                       },
+                       expected: snapshot{
+                               epoch: 1,
+                               ref:   1,
+                               parts: []*partWrapper{
+                                       {p: &part{partMetadata: 
partMetadata{ID: 1}}, ref: 2},
+                                       {p: &part{partMetadata: 
partMetadata{ID: 2}}, ref: 1},
+                               },
+                       },
+               },
+               {
+                       name: "Test with closed previous snapshot",
+                       snapshot: &snapshot{
+                               parts: []*partWrapper{
+                                       {p: &part{partMetadata: 
partMetadata{ID: 1}}, ref: 1},
+                                       {p: &part{partMetadata: 
partMetadata{ID: 2}}, ref: 2},
+                               },
+                       },
+                       closePrev: true,
+                       nextEpoch: 1,
+                       nextParts: map[uint64]*partWrapper{
+                               2: {p: &part{partMetadata: partMetadata{ID: 
2}}, ref: 1},
+                               3: {p: &part{partMetadata: partMetadata{ID: 
3}}, ref: 1},
+                       },
+                       expected: snapshot{
+                               epoch: 1,
+                               ref:   1,
+                               parts: []*partWrapper{
+                                       {p: &part{partMetadata: 
partMetadata{ID: 1}}, ref: 1},
+                                       {p: &part{partMetadata: 
partMetadata{ID: 2}}, ref: 1},
+                               },
+                       },
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       result := tt.snapshot.merge(tt.nextEpoch, tt.nextParts)
+                       if tt.closePrev {
+                               tt.snapshot.decRef()
+                       }
+                       assert.Equal(t, tt.expected, result)
+               })
+       }
+}
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index 6ce4adbe..a51d51e8 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -19,30 +19,192 @@ package measure
 
 import (
        "container/heap"
+       "encoding/json"
        "errors"
        "fmt"
        "io"
+       "path/filepath"
+       "sort"
+       "strconv"
        "sync"
+       "time"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
+       "github.com/apache/skywalking-banyandb/pkg/watcher"
 )
 
-func newTSTable(_ string, _ common.Position, _ *logger.Logger, _ 
timestamp.TimeRange) (*tsTable, error) {
-       return &tsTable{}, nil
+const (
+       snapshotSuffix = ".snp"
+       filePermission = 0o600
+       dirPermission  = 0o700
+)
+
+func newTSTable(fileSystem fs.FileSystem, rootPath string, _ common.Position, 
l *logger.Logger, _ timestamp.TimeRange) (*tsTable, error) {
+       var tsTable tsTable
+       tsTable.fileSystem = fileSystem
+       tsTable.root = rootPath
+       tsTable.l = l
+       tsTable.gc.parent = &tsTable
+       ee := fileSystem.ReadDir(rootPath)
+       if len(ee) == 0 {
+               t := &tsTable
+               t.startLoop(uint64(time.Now().UnixNano()))
+               return t, nil
+       }
+       var loadedParts []uint64
+       var loadedSnapshots []uint64
+       var needToDelete []string
+       for i := range ee {
+               if ee[i].IsDir() {
+                       p, err := parseEpoch(ee[i].Name())
+                       if err != nil {
+                               l.Info().Err(err).Msg("cannot parse part file 
name. skip and delete it")
+                               needToDelete = append(needToDelete, 
ee[i].Name())
+                               continue
+                       }
+                       loadedParts = append(loadedParts, p)
+                       continue
+               }
+               if filepath.Ext(ee[i].Name()) != snapshotSuffix {
+                       continue
+               }
+               snapshot, err := parseSnapshot(ee[i].Name())
+               if err != nil {
+                       l.Info().Err(err).Msg("cannot parse snapshot file name. 
skip and delete it")
+                       needToDelete = append(needToDelete, ee[i].Name())
+                       continue
+               }
+               loadedSnapshots = append(loadedSnapshots, snapshot)
+               tsTable.gc.registerSnapshot(snapshot)
+       }
+       for i := range needToDelete {
+               if err := fileSystem.DeleteFile(filepath.Join(rootPath, 
needToDelete[i])); err != nil {
+                       l.Warn().Err(err).Str("path", filepath.Join(rootPath, 
needToDelete[i])).Msg("failed to delete part. Please check manually")
+               }
+       }
+       if len(loadedParts) == 0 || len(loadedSnapshots) == 0 {
+               t := &tsTable
+               t.startLoop(uint64(time.Now().UnixNano()))
+               return t, nil
+       }
+       sort.Slice(loadedSnapshots, func(i, j int) bool {
+               return loadedSnapshots[i] > loadedSnapshots[j]
+       })
+       epoch := loadedSnapshots[0]
+       t := &tsTable
+       t.loadSnapshot(epoch, loadedParts)
+       t.startLoop(epoch)
+       return t, nil
 }
 
 type tsTable struct {
-       memParts []*partWrapper
+       l          *logger.Logger
+       fileSystem fs.FileSystem
+       gc         garbageCleaner
+       root       string
+       snapshot   *snapshot
        sync.RWMutex
+       introductions chan *introduction
+       loopCloser    *run.Closer
+}
+
+func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) {
+       parts := tst.mustReadSnapshot(epoch)
+       var snp snapshot
+       for _, partName := range loadedParts {
+               var find bool
+               for j := range parts {
+                       if partName == parts[j] {
+                               find = true
+                               break
+                       }
+               }
+               if !find {
+                       tst.gc.submitParts(partName)
+               }
+               p := mustOpenFilePart(partPath(tst.root, partName), 
tst.fileSystem)
+               snp.parts = append(snp.parts, newPartWrapper(nil, p, 
tst.fileSystem))
+       }
+       tst.gc.clean()
+       if len(snp.parts) < 1 {
+               return
+       }
+       snp.incRef()
+       tst.snapshot = &snp
+}
+
+func (tst *tsTable) startLoop(cur uint64) {
+       next := cur + 1
+       tst.loopCloser = run.NewCloser(3)
+       tst.introductions = make(chan *introduction)
+       flushCh := make(chan *flusherIntroduction)
+       introducerWatcher := make(watcher.Channel, 1)
+       go tst.introducerLoop(flushCh, introducerWatcher, next)
+       go tst.flusherLoop(flushCh, introducerWatcher, cur)
+}
+
+func parseEpoch(epochStr string) (uint64, error) {
+       p, err := strconv.ParseUint(epochStr, 16, 64)
+       if err != nil {
+               return 0, fmt.Errorf("cannot parse path %s: %w", epochStr, err)
+       }
+       return p, nil
+}
+
+func (tst *tsTable) mustWriteSnapshot(snapshot uint64, partNames []string) {
+       data, err := json.Marshal(partNames)
+       if err != nil {
+               logger.Panicf("cannot marshal partNames to JSON: %s", err)
+       }
+       snapshotPath := filepath.Join(tst.root, snapshotName(snapshot))
+       lf, err := tst.fileSystem.CreateLockFile(snapshotPath, filePermission)
+       if err != nil {
+               logger.Panicf("cannot create lock file %s: %s", snapshotPath, 
err)
+       }
+       n, err := lf.Write(data)
+       if err != nil {
+               logger.Panicf("cannot write snapshot %s: %s", snapshotPath, err)
+       }
+       if n != len(data) {
+               logger.Panicf("unexpected number of bytes written to %s; got 
%d; want %d", snapshotPath, n, len(data))
+       }
+}
+
+func (tst *tsTable) mustReadSnapshot(snapshot uint64) []uint64 {
+       snapshotPath := filepath.Join(tst.root, snapshotName(snapshot))
+       data, err := tst.fileSystem.Read(snapshotPath)
+       if err != nil {
+               logger.Panicf("cannot read %s: %s", snapshotPath, err)
+       }
+       var partNames []string
+       if err := json.Unmarshal(data, &partNames); err != nil {
+               logger.Panicf("cannot parse %s: %s", snapshotPath, err)
+       }
+       var result []uint64
+       for i := range partNames {
+               e, err := parseEpoch(partNames[i])
+               if err != nil {
+                       logger.Panicf("cannot parse %s: %s", partNames[i], err)
+               }
+               result = append(result, e)
+       }
+       return result
 }
 
 func (tst *tsTable) Close() error {
        tst.Lock()
        defer tst.Unlock()
-       for _, p := range tst.memParts {
-               p.decRef()
+       if tst.loopCloser != nil {
+
+               tst.loopCloser.Done()
+               tst.loopCloser.CloseThenWait()
+       }
+       if tst.snapshot != nil {
+               tst.snapshot.decRef()
        }
        return nil
 }
@@ -56,26 +218,19 @@ func (tst *tsTable) mustAddDataPoints(dps *dataPoints) {
        mp.mustInitFromDataPoints(dps)
        p := openMemPart(mp)
 
-       pw := newMemPartWrapper(mp, p)
-
-       tst.Lock()
-       defer tst.Unlock()
-       tst.memParts = append(tst.memParts, pw)
-}
+       ind := generateIntroduction()
+       ind.applied = make(chan struct{})
+       ind.memPart = newPartWrapper(mp, p, tst.fileSystem)
 
-func (tst *tsTable) getParts(dst []*partWrapper, dstPart []*part, opts 
queryOptions) ([]*partWrapper, []*part) {
-       tst.RLock()
-       defer tst.RUnlock()
-       for _, p := range tst.memParts {
-               pm := p.mp.partMetadata
-               if opts.maxTimestamp < pm.MinTimestamp || opts.minTimestamp > 
pm.MaxTimestamp {
-                       continue
-               }
-               p.incRef()
-               dst = append(dst, p)
-               dstPart = append(dstPart, p.p)
+       select {
+       case tst.introductions <- ind:
+       case <-tst.loopCloser.CloseNotify():
+               return
+       }
+       select {
+       case <-ind.applied:
+       case <-tst.loopCloser.CloseNotify():
        }
-       return dst, dstPart
 }
 
 type tstIter struct {
diff --git a/banyand/measure/tstable_test.go b/banyand/measure/tstable_test.go
index fdfb9879..e4599596 100644
--- a/banyand/measure/tstable_test.go
+++ b/banyand/measure/tstable_test.go
@@ -94,13 +94,18 @@ func Test_tsTable_mustAddDataPoints(t *testing.T) {
                                tst.mustAddDataPoints(dps)
                                time.Sleep(100 * time.Millisecond)
                        }
-                       assert.Equal(t, tt.want, len(tst.memParts))
-                       var lastVersion int64
-                       for _, pw := range tst.memParts {
+                       s := tst.currentSnapshot()
+                       if s == nil {
+                               s = new(snapshot)
+                       }
+                       defer s.decRef()
+                       assert.Equal(t, tt.want, len(s.parts))
+                       var lastVersion uint64
+                       for _, pw := range s.parts {
                                if lastVersion == 0 {
-                                       lastVersion = pw.p.partMetadata.Version
+                                       lastVersion = pw.p.partMetadata.ID
                                } else {
-                                       require.Less(t, lastVersion, 
pw.p.partMetadata.Version)
+                                       require.Less(t, lastVersion, 
pw.p.partMetadata.ID)
                                }
                        }
                })
@@ -175,15 +180,16 @@ func Test_tstIter(t *testing.T) {
                                tst.mustAddDataPoints(dps)
                                time.Sleep(100 * time.Millisecond)
                        }
-                       pws, pp := tst.getParts(nil, nil, queryOptions{
+                       s := tst.currentSnapshot()
+                       if s == nil {
+                               s = new(snapshot)
+                       }
+                       defer s.decRef()
+                       pp, n := s.getParts(nil, queryOptions{
                                minTimestamp: tt.minTimestamp,
                                maxTimestamp: tt.maxTimestamp,
                        })
-                       defer func() {
-                               for _, pw := range pws {
-                                       pw.decRef()
-                               }
-                       }()
+                       require.Equal(t, len(s.parts), n)
                        ti := &tstIter{}
                        ti.init(pp, tt.sids, tt.minTimestamp, tt.maxTimestamp)
                        var got []blockMetadata
diff --git a/go.mod b/go.mod
index 3d57a1a9..4a9f0d19 100644
--- a/go.mod
+++ b/go.mod
@@ -139,7 +139,7 @@ require (
        go.uber.org/zap v1.26.0
        golang.org/x/crypto v0.17.0 // indirect
        golang.org/x/net v0.19.0 // indirect
-       golang.org/x/sys v0.15.0 // indirect
+       golang.org/x/sys v0.15.0
        golang.org/x/text v0.14.0 // indirect
        golang.org/x/time v0.5.0 // indirect
        golang.org/x/tools v0.16.1 // indirect
diff --git a/pkg/fs/error.go b/pkg/fs/error.go
index a54c513a..6db456a5 100644
--- a/pkg/fs/error.go
+++ b/pkg/fs/error.go
@@ -21,16 +21,17 @@ package fs
 import "fmt"
 
 const (
-       isExistError    = 0
-       isNotExistError = 1
-       permissionError = 2
-       openError       = 3
-       deleteError     = 4
-       writeError      = 5
-       readError       = 6
-       flushError      = 7
-       closeError      = 8
-       otherError      = 9
+       isExistError = iota
+       isNotExistError
+       permissionError
+       openError
+       deleteError
+       writeError
+       readError
+       flushError
+       closeError
+       lockError
+       otherError
 )
 
 // FileSystemError implements the Error interface.
diff --git a/pkg/fs/file_system.go b/pkg/fs/file_system.go
index 098287e2..b5f8aa10 100644
--- a/pkg/fs/file_system.go
+++ b/pkg/fs/file_system.go
@@ -94,12 +94,20 @@ type FileSystem interface {
        ReadDir(dirname string) []DirEntry
        // Create and open the file by specified name and mode.
        CreateFile(name string, permission Mode) (File, error)
+       // Create and open the lock file by specified name and mode.
+       CreateLockFile(name string, permission Mode) (File, error)
+       // Open the file by specified name.
+       OpenFile(name string) (File, error)
        // Flush mode, which flushes all data to one file.
        Write(buffer []byte, name string, permission Mode) (int, error)
+       // Read the entire content of the file by specified name.
+       Read(name string) ([]byte, error)
        // Delete the file.
        DeleteFile(name string) error
        // Delete the directory.
        MustRMAll(path string)
+       // Sync the file or directory at the specified path.
+       SyncPath(path string)
 }
 
 // DirEntry is the interface that wraps the basic information about a file or 
directory.
diff --git a/pkg/fs/local_file_system.go b/pkg/fs/local_file_system.go
index 4de839ff..563a2aa2 100644
--- a/pkg/fs/local_file_system.go
+++ b/pkg/fs/local_file_system.go
@@ -25,6 +25,7 @@ import (
        "io"
        "os"
        "path/filepath"
+       "time"
 
        "github.com/apache/skywalking-banyandb/pkg/logger"
 )
@@ -159,6 +160,31 @@ func (fs *localFileSystem) CreateFile(name string, 
permission Mode) (File, error
        }
 }
 
+func (fs *localFileSystem) OpenFile(name string) (File, error) {
+       file, err := os.Open(name)
+       switch {
+       case err == nil:
+               return &LocalFile{
+                       file: file,
+               }, nil
+       case os.IsNotExist(err):
+               return nil, &FileSystemError{
+                       Code:    isNotExistError,
+                       Message: fmt.Sprintf("File is not exist, file name: 
%s,error message: %s", name, err),
+               }
+       case os.IsPermission(err):
+               return nil, &FileSystemError{
+                       Code:    permissionError,
+                       Message: fmt.Sprintf("There is not enough permission, 
file name: %s, error message: %s", name, err),
+               }
+       default:
+               return nil, &FileSystemError{
+                       Code:    otherError,
+                       Message: fmt.Sprintf("Create file return error, file 
name: %s,error message: %s", name, err),
+               }
+       }
+}
+
 // Write flushes all data to one file.
 func (fs *localFileSystem) Write(buffer []byte, name string, permission Mode) 
(int, error) {
        file, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 
os.FileMode(permission))
@@ -194,6 +220,30 @@ func (fs *localFileSystem) Write(buffer []byte, name 
string, permission Mode) (i
        return size, nil
 }
 
+// Read reads the entire file into memory and returns its content.
+func (fs *localFileSystem) Read(name string) ([]byte, error) {
+       data, err := os.ReadFile(name)
+       switch {
+       case err == nil:
+               return data, nil
+       case os.IsNotExist(err):
+               return data, &FileSystemError{
+                       Code:    isNotExistError,
+                       Message: fmt.Sprintf("File is not exist, file name: %s, 
error message: %s", name, err),
+               }
+       case os.IsPermission(err):
+               return data, &FileSystemError{
+                       Code:    permissionError,
+                       Message: fmt.Sprintf("There is not enough permission, 
file name: %s, error message: %s", name, err),
+               }
+       default:
+               return data, &FileSystemError{
+                       Code:    otherError,
+                       Message: fmt.Sprintf("Read file error, file name: %s, 
error message: %s", name, err),
+               }
+       }
+}
+
 // DeleteFile is used to delete the file.
 func (fs *localFileSystem) DeleteFile(name string) error {
        err := os.Remove(name)
@@ -219,9 +269,16 @@ func (fs *localFileSystem) DeleteFile(name string) error {
 }
 
 func (fs *localFileSystem) MustRMAll(path string) {
-       if err := os.RemoveAll(path); err != nil {
-               logger.Panicf("failed to remove all files under %s", path)
+       if err := os.RemoveAll(path); err == nil {
+               return
+       }
+       for i := 0; i < 5; i++ {
+               time.Sleep(time.Second)
+               if err := os.RemoveAll(path); err == nil {
+                       return
+               }
        }
+       fs.logger.Panic().Str("path", path).Msg("failed to remove all files 
under path")
 }
 
 // Write adds new data to the end of a file.
diff --git a/pkg/fs/local_file_system_nix.go b/pkg/fs/local_file_system_nix.go
new file mode 100644
index 00000000..05f71fef
--- /dev/null
+++ b/pkg/fs/local_file_system_nix.go
@@ -0,0 +1,74 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//go:build darwin || dragonfly || freebsd || linux || netbsd || openbsd
+// +build darwin dragonfly freebsd linux netbsd openbsd
+
+package fs
+
+import (
+       "fmt"
+       "os"
+
+       "golang.org/x/sys/unix"
+)
+
+// CreateLockFile creates or truncates the file and takes an exclusive flock.
+func (*localFileSystem) CreateLockFile(name string, permission Mode) (File, 
error) {
+       file, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 
os.FileMode(permission))
+       switch {
+       case err == nil:
+               if err := unix.Flock(int(file.Fd()), 
unix.LOCK_EX|unix.LOCK_NB); err != nil {
+                       return nil, &FileSystemError{
+                               Code:    lockError,
+                               Message: fmt.Sprintf("Cannot lock file, file 
name: %s, error message: %s", name, err),
+                       }
+               }
+               return &LocalFile{
+                       file: file,
+               }, nil
+       case os.IsExist(err):
+               return nil, &FileSystemError{
+                       Code:    isExistError,
+                       Message: fmt.Sprintf("File is exist, file name: 
%s,error message: %s", name, err),
+               }
+       case os.IsPermission(err):
+               return nil, &FileSystemError{
+                       Code:    permissionError,
+                       Message: fmt.Sprintf("There is not enough permission, 
file name: %s, permission: %d,error message: %s", name, permission, err),
+               }
+       default:
+               return nil, &FileSystemError{
+                       Code:    otherError,
+                       Message: fmt.Sprintf("Create file return error, file 
name: %s,error message: %s", name, err),
+               }
+       }
+}
+
+func (fs *localFileSystem) SyncPath(name string) {
+       file, err := os.Open(name)
+       if err != nil {
+               fs.logger.Panic().Str("name", name).Err(err).Msg("failed to 
open file")
+       }
+       if err := file.Sync(); err != nil {
+               _ = file.Close()
+               fs.logger.Panic().Str("name", name).Err(err).Msg("failed to 
sync file")
+       }
+       if err := file.Close(); err != nil {
+               fs.logger.Panic().Str("name", name).Err(err).Msg("failed to 
close file")
+       }
+}
diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go
new file mode 100644
index 00000000..6d5d63d4
--- /dev/null
+++ b/pkg/watcher/watcher.go
@@ -0,0 +1,70 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package watcher provides a watcher to watch the epoch.
+package watcher
+
+// Epoch is an epoch watcher.
+// It will be notified when the epoch is reached.
+type Epoch struct {
+       epoch uint64
+       ch    chan struct{}
+}
+
+// Watch returns a channel that is closed once the epoch is reached.
+func (e *Epoch) Watch() <-chan struct{} {
+       return e.ch
+}
+
+// Epochs is a list of epoch watchers.
+type Epochs []*Epoch
+
+// Add adds an epoch watcher.
+func (e *Epochs) Add(epoch *Epoch) {
+       *e = append(*e, epoch)
+}
+
+// Notify closes and removes the watchers whose epoch is <= the given epoch.
+func (e *Epochs) Notify(epoch uint64) {
+       var remained Epochs
+       for _, ep := range *e {
+               if ep.epoch <= epoch {
+                       close(ep.ch)
+                       continue
+               }
+               remained.Add(ep)
+       }
+       *e = remained
+}
+
+// Channel is a channel of epoch watchers.
+type Channel chan *Epoch
+
+// Add creates an epoch watcher and sends it to the channel.
+// It returns nil if closeCh fires before the watcher is delivered.
+func (w Channel) Add(epoch uint64, closeCh <-chan struct{}) *Epoch {
+       ep := &Epoch{
+               epoch: epoch,
+               ch:    make(chan struct{}, 1),
+       }
+       select {
+       case w <- ep:
+       case <-closeCh:
+               return nil
+       }
+       return ep
+}

Reply via email to