This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch measure-flusher
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 5e578bc5f0424c37141afdacad627b2763ca0a5d
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Dec 25 06:50:03 2023 +0800

    Fix measure UT
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 banyand/measure/flusher.go      |  14 +++-
 banyand/measure/gc.go           |  27 +++++--
 banyand/measure/introducer.go   |   6 +-
 banyand/measure/part.go         |  10 ++-
 banyand/measure/query_test.go   | 154 +++++++++++++++++++++++++---------------
 banyand/measure/tstable.go      |  11 +--
 banyand/measure/tstable_test.go | 119 +++++++++++++++++++++----------
 pkg/fs/error.go                 |   2 +-
 pkg/fs/local_file_system.go     |  14 ++--
 pkg/watcher/watcher.go          |   2 +-
 10 files changed, 240 insertions(+), 119 deletions(-)

diff --git a/banyand/measure/flusher.go b/banyand/measure/flusher.go
index 2609f09e..09346d1d 100644
--- a/banyand/measure/flusher.go
+++ b/banyand/measure/flusher.go
@@ -24,6 +24,9 @@ import (
 func (tst *tsTable) flusherLoop(flushCh chan *flusherIntroduction, 
introducerWatcher watcher.Channel, epoch uint64) {
        defer tst.loopCloser.Done()
        epochWatcher := introducerWatcher.Add(0, tst.loopCloser.CloseNotify())
+       if epochWatcher == nil {
+               return
+       }
 
        for {
                select {
@@ -43,11 +46,15 @@ func (tst *tsTable) flusherLoop(flushCh chan 
*flusherIntroduction, introducerWat
                                        return
                                }
                                tst.persistSnapshot(curSnapshot)
+                               curSnapshot.decRef()
                                if tst.currentEpoch() != epoch {
                                        continue
                                }
                        }
                        epochWatcher = introducerWatcher.Add(epoch, 
tst.loopCloser.CloseNotify())
+                       if epochWatcher == nil {
+                               return
+                       }
                        tst.gc.clean()
                }
        }
@@ -55,6 +62,7 @@ func (tst *tsTable) flusherLoop(flushCh chan 
*flusherIntroduction, introducerWat
 
 func (tst *tsTable) flush(snapshot *snapshot, flushCh chan 
*flusherIntroduction) uint64 {
        ind := generateFlusherIntroduction()
+       defer releaseFlusherIntroduction(ind)
        for _, pw := range snapshot.parts {
                if pw.mp == nil || pw.mp.partMetadata.TotalCount < 1 {
                        continue
@@ -62,8 +70,12 @@ func (tst *tsTable) flush(snapshot *snapshot, flushCh chan 
*flusherIntroduction)
                partPath := partPath(tst.root, pw.ID())
                pw.mp.mustFlush(tst.fileSystem, partPath)
                newPW := newPartWrapper(nil, mustOpenFilePart(partPath, 
tst.fileSystem), tst.fileSystem)
+               newPW.p.partMetadata.ID = pw.ID()
                ind.flushed[newPW.ID()] = newPW
        }
+       if len(ind.flushed) < 1 {
+               return snapshot.epoch
+       }
        ind.applied = make(chan struct{})
        select {
        case flushCh <- ind:
@@ -81,7 +93,7 @@ func (tst *tsTable) flush(snapshot *snapshot, flushCh chan 
*flusherIntroduction)
 func (tst *tsTable) persistSnapshot(snapshot *snapshot) {
        var partNames []string
        for i := range snapshot.parts {
-               partNames = append(partNames, 
snapshotName(snapshot.parts[i].ID()))
+               partNames = append(partNames, partName(snapshot.parts[i].ID()))
        }
        tst.mustWriteSnapshot(snapshot.epoch, partNames)
        tst.gc.registerSnapshot(snapshot.epoch)
diff --git a/banyand/measure/gc.go b/banyand/measure/gc.go
index dbdde01d..5bab2bb5 100644
--- a/banyand/measure/gc.go
+++ b/banyand/measure/gc.go
@@ -20,6 +20,8 @@ package measure
 import (
        "path"
        "sort"
+
+       "github.com/apache/skywalking-banyandb/pkg/fs"
 )
 
 type garbageCleaner struct {
@@ -49,17 +51,34 @@ func (g *garbageCleaner) cleanSnapshots() {
        if len(g.snapshots) < 2 {
                return
        }
-       sort.Slice(g.snapshots, func(i, j int) bool {
+       if !sort.SliceIsSorted(g.snapshots, func(i, j int) bool {
                return g.snapshots[i] < g.snapshots[j]
-       })
+       }) {
+               sort.Slice(g.snapshots, func(i, j int) bool {
+                       return g.snapshots[i] < g.snapshots[j]
+               })
+       }
        // keep the latest snapshot
+       var remainingSnapshots []uint64
        for i := 0; i < len(g.snapshots)-1; i++ {
                filePath := path.Join(g.parent.root, 
snapshotName(g.snapshots[i]))
                if err := g.parent.fileSystem.DeleteFile(filePath); err != nil {
-                       g.parent.l.Warn().Err(err).Str("path", 
filePath).Msg("failed to delete snapshot. Please check manually")
+                       if fse, ok := err.(*fs.FileSystemError); ok {
+                               if fse.Code != fs.IsNotExistError {
+                                       g.parent.l.Warn().Err(err).Str("path", 
filePath).Msg("failed to delete snapshot, will retry in next round. Please 
check manually")
+                                       remainingSnapshots = 
append(remainingSnapshots, g.snapshots[i])
+                               }
+                       }
                }
        }
-       g.snapshots = g.snapshots[len(g.snapshots)-1:]
+       if remainingSnapshots == nil {
+               g.snapshots = g.snapshots[len(g.snapshots)-1:]
+               return
+       }
+       remained := g.snapshots[len(g.snapshots)-1]
+       g.snapshots = g.snapshots[:0]
+       g.snapshots = append(g.snapshots, remainingSnapshots...)
+       g.snapshots = append(g.snapshots, remained)
 }
 
 func (g garbageCleaner) cleanParts() {
diff --git a/banyand/measure/introducer.go b/banyand/measure/introducer.go
index 3c124d2a..17a4edef 100644
--- a/banyand/measure/introducer.go
+++ b/banyand/measure/introducer.go
@@ -105,7 +105,6 @@ func (tst *tsTable) introduceMemPart(nextIntroduction 
*introduction, epoch uint6
        } else {
                cur = new(snapshot)
        }
-       defer releaseIntroduction(nextIntroduction)
 
        next := nextIntroduction.memPart
        nextSnp := cur.copyAllTo(epoch)
@@ -122,10 +121,7 @@ func (tst *tsTable) introduceFlushed(nextIntroduction 
*flusherIntroduction, epoc
        if cur == nil {
                tst.l.Panic().Msg("current snapshot is nil")
        }
-       defer func() {
-               cur.decRef()
-               releaseFlusherIntroduction(nextIntroduction)
-       }()
+       defer cur.decRef()
        nextSnp := cur.merge(epoch, nextIntroduction.flushed)
        tst.replaceSnapshot(&nextSnp)
        if nextIntroduction.applied != nil {
diff --git a/banyand/measure/part.go b/banyand/measure/part.go
index 11e635b8..f2ab04f2 100644
--- a/banyand/measure/part.go
+++ b/banyand/measure/part.go
@@ -66,6 +66,10 @@ func (p *part) close() {
        }
 }
 
+func (p *part) String() string {
+       return fmt.Sprintf("part %d", p.partMetadata.ID)
+}
+
 func openMemPart(mp *memPart) *part {
        var p part
        p.partMetadata = mp.partMetadata
@@ -302,5 +306,9 @@ func removeExt(nameWithExt, ext string) string {
 }
 
 func partPath(root string, epoch uint64) string {
-       return filepath.Join(root, fmt.Sprintf("%016x", epoch))
+       return filepath.Join(root, partName(epoch))
+}
+
+func partName(epoch uint64) string {
+       return fmt.Sprintf("%016x", epoch)
 }
diff --git a/banyand/measure/query_test.go b/banyand/measure/query_test.go
index 44ad9c52..4226dfc7 100644
--- a/banyand/measure/query_test.go
+++ b/banyand/measure/query_test.go
@@ -24,12 +24,19 @@ import (
        "time"
 
        "github.com/google/go-cmp/cmp"
+       "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
        "google.golang.org/protobuf/testing/protocmp"
 
        "github.com/apache/skywalking-banyandb/api/common"
        modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+       "github.com/apache/skywalking-banyandb/pkg/watcher"
 )
 
 func TestQueryResult(t *testing.T) {
@@ -364,70 +371,101 @@ func TestQueryResult(t *testing.T) {
 
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
-                       // Initialize a tstIter object.
-                       tst := &tsTable{}
-                       defer tst.Close()
-                       for _, dps := range tt.dpsList {
-                               tst.mustAddDataPoints(dps)
-                               time.Sleep(100 * time.Millisecond)
-                       }
-                       queryOpts := queryOptions{
-                               minTimestamp: tt.minTimestamp,
-                               maxTimestamp: tt.maxTimestamp,
-                       }
-                       s := tst.currentSnapshot()
-                       require.NotNil(t, s)
-                       defer s.decRef()
-                       pp, n := s.getParts(nil, queryOpts)
-                       require.Equal(t, len(tt.dpsList), n)
-                       sids := make([]common.SeriesID, len(tt.sids))
-                       copy(sids, tt.sids)
-                       sort.Slice(sids, func(i, j int) bool {
-                               return sids[i] < tt.sids[j]
-                       })
-                       ti := &tstIter{}
-                       ti.init(pp, sids, tt.minTimestamp, tt.maxTimestamp)
+                       verify := func(tst *tsTable) uint64 {
+                               defer tst.Close()
+                               queryOpts := queryOptions{
+                                       minTimestamp: tt.minTimestamp,
+                                       maxTimestamp: tt.maxTimestamp,
+                               }
+                               s := tst.currentSnapshot()
+                               require.NotNil(t, s)
+                               defer s.decRef()
+                               pp, n := s.getParts(nil, queryOpts)
+                               require.Equal(t, len(tt.dpsList), n, "parts: 
%+v", pp)
+                               sids := make([]common.SeriesID, len(tt.sids))
+                               copy(sids, tt.sids)
+                               sort.Slice(sids, func(i, j int) bool {
+                                       return sids[i] < tt.sids[j]
+                               })
+                               ti := &tstIter{}
+                               ti.init(pp, sids, tt.minTimestamp, 
tt.maxTimestamp)
 
-                       var result queryResult
-                       for ti.nextBlock() {
-                               bc := generateBlockCursor()
-                               p := ti.piHeap[0]
-                               opts := queryOpts
-                               opts.TagProjection = 
tagProjections[int(p.curBlock.seriesID)]
-                               opts.FieldProjection = 
fieldProjections[int(p.curBlock.seriesID)]
-                               bc.init(p.p, p.curBlock, opts)
-                               result.data = append(result.data, bc)
-                       }
-                       defer result.Release()
-                       if tt.orderBySeries {
-                               result.sidToIndex = 
make(map[common.SeriesID]int)
-                               for i, si := range tt.sids {
-                                       result.sidToIndex[si] = i
+                               var result queryResult
+                               for ti.nextBlock() {
+                                       bc := generateBlockCursor()
+                                       p := ti.piHeap[0]
+                                       opts := queryOpts
+                                       opts.TagProjection = 
tagProjections[int(p.curBlock.seriesID)]
+                                       opts.FieldProjection = 
fieldProjections[int(p.curBlock.seriesID)]
+                                       bc.init(p.p, p.curBlock, opts)
+                                       result.data = append(result.data, bc)
                                }
-                       } else {
-                               result.orderByTS = true
-                               result.ascTS = tt.ascTS
-                       }
-                       var got []pbv1.Result
-                       for {
-                               r := result.Pull()
-                               if r == nil {
-                                       break
+                               defer result.Release()
+                               if tt.orderBySeries {
+                                       result.sidToIndex = 
make(map[common.SeriesID]int)
+                                       for i, si := range tt.sids {
+                                               result.sidToIndex[si] = i
+                                       }
+                               } else {
+                                       result.orderByTS = true
+                                       result.ascTS = tt.ascTS
+                               }
+                               var got []pbv1.Result
+                               for {
+                                       r := result.Pull()
+                                       if r == nil {
+                                               break
+                                       }
+                                       sort.Slice(r.TagFamilies, func(i, j 
int) bool {
+                                               return r.TagFamilies[i].Name < 
r.TagFamilies[j].Name
+                                       })
+                                       got = append(got, *r)
                                }
-                               sort.Slice(r.TagFamilies, func(i, j int) bool {
-                                       return r.TagFamilies[i].Name < 
r.TagFamilies[j].Name
-                               })
-                               got = append(got, *r)
-                       }
 
-                       if !errors.Is(ti.Error(), tt.wantErr) {
-                               t.Errorf("Unexpected error: got %v, want %v", 
ti.err, tt.wantErr)
-                       }
+                               if !errors.Is(ti.Error(), tt.wantErr) {
+                                       t.Errorf("Unexpected error: got %v, 
want %v", ti.err, tt.wantErr)
+                               }
 
-                       if diff := cmp.Diff(got, tt.want,
-                               protocmp.IgnoreUnknown(), 
protocmp.Transform()); diff != "" {
-                               t.Errorf("Unexpected []pbv1.Result (-got 
+want):\n%s", diff)
+                               if diff := cmp.Diff(got, tt.want,
+                                       protocmp.IgnoreUnknown(), 
protocmp.Transform()); diff != "" {
+                                       t.Errorf("Unexpected []pbv1.Result 
(-got +want):\n%s", diff)
+                               }
+                               return s.epoch
                        }
+
+                       t.Run("memory snapshot", func(t *testing.T) {
+                               tst := &tsTable{
+                                       loopCloser:    run.NewCloser(2),
+                                       introductions: make(chan *introduction),
+                               }
+                               flushCh := make(chan *flusherIntroduction)
+                               introducerWatcher := make(watcher.Channel, 1)
+                               go tst.introducerLoop(flushCh, 
introducerWatcher, 1)
+                               for _, dps := range tt.dpsList {
+                                       tst.mustAddDataPoints(dps)
+                                       time.Sleep(100 * time.Millisecond)
+                               }
+                               _ = verify(tst)
+                       })
+
+                       t.Run("file snapshot", func(t *testing.T) {
+                               // Initialize a tstIter object.
+                               tmpPath, defFn := test.Space(require.New(t))
+                               fileSystem := fs.NewLocalFileSystem()
+                               defer defFn()
+                               tst, err := newTSTable(fileSystem, tmpPath, 
common.Position{}, logger.GetLogger("test"), timestamp.TimeRange{})
+                               require.NoError(t, err)
+                               for _, dps := range tt.dpsList {
+                                       tst.mustAddDataPoints(dps)
+                                       time.Sleep(100 * time.Millisecond)
+                               }
+                               epoch := verify(tst)
+
+                               // reopen the table
+                               tst, err = newTSTable(fileSystem, tmpPath, 
common.Position{}, logger.GetLogger("test"), timestamp.TimeRange{})
+                               require.NoError(t, err)
+                               assert.Equal(t, epoch, verify(tst))
+                       })
                })
        }
 }
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index a51d51e8..3e5b7e2a 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -114,7 +114,9 @@ type tsTable struct {
 
 func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) {
        parts := tst.mustReadSnapshot(epoch)
-       var snp snapshot
+       snp := snapshot{
+               epoch: epoch,
+       }
        for _, partName := range loadedParts {
                var find bool
                for j := range parts {
@@ -127,6 +129,7 @@ func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts 
[]uint64) {
                        tst.gc.submitParts(partName)
                }
                p := mustOpenFilePart(partPath(tst.root, partName), 
tst.fileSystem)
+               p.partMetadata.ID = partName
                snp.parts = append(snp.parts, newPartWrapper(nil, p, 
tst.fileSystem))
        }
        tst.gc.clean()
@@ -196,13 +199,12 @@ func (tst *tsTable) mustReadSnapshot(snapshot uint64) 
[]uint64 {
 }
 
 func (tst *tsTable) Close() error {
-       tst.Lock()
-       defer tst.Unlock()
        if tst.loopCloser != nil {
-
                tst.loopCloser.Done()
                tst.loopCloser.CloseThenWait()
        }
+       tst.RLock()
+       defer tst.RUnlock()
        if tst.snapshot != nil {
                tst.snapshot.decRef()
        }
@@ -219,6 +221,7 @@ func (tst *tsTable) mustAddDataPoints(dps *dataPoints) {
        p := openMemPart(mp)
 
        ind := generateIntroduction()
+       defer releaseIntroduction(ind)
        ind.applied = make(chan struct{})
        ind.memPart = newPartWrapper(mp, p, tst.fileSystem)
 
diff --git a/banyand/measure/tstable_test.go b/banyand/measure/tstable_test.go
index e4599596..ca231e4f 100644
--- a/banyand/measure/tstable_test.go
+++ b/banyand/measure/tstable_test.go
@@ -29,7 +29,13 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/pkg/convert"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/run"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+       "github.com/apache/skywalking-banyandb/pkg/watcher"
 )
 
 func Test_tsTable_mustAddDataPoints(t *testing.T) {
@@ -88,7 +94,13 @@ func Test_tsTable_mustAddDataPoints(t *testing.T) {
        }
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
-                       tst := &tsTable{}
+                       tst := &tsTable{
+                               loopCloser:    run.NewCloser(2),
+                               introductions: make(chan *introduction),
+                       }
+                       flushCh := make(chan *flusherIntroduction)
+                       introducerWatcher := make(watcher.Channel, 1)
+                       go tst.introducerLoop(flushCh, introducerWatcher, 1)
                        defer tst.Close()
                        for _, dps := range tt.dpsList {
                                tst.mustAddDataPoints(dps)
@@ -102,10 +114,11 @@ func Test_tsTable_mustAddDataPoints(t *testing.T) {
                        assert.Equal(t, tt.want, len(s.parts))
                        var lastVersion uint64
                        for _, pw := range s.parts {
+                               require.Greater(t, pw.ID(), uint64(0))
                                if lastVersion == 0 {
-                                       lastVersion = pw.p.partMetadata.ID
+                                       lastVersion = pw.ID()
                                } else {
-                                       require.Less(t, lastVersion, 
pw.p.partMetadata.ID)
+                                       require.Less(t, lastVersion, pw.ID())
                                }
                        }
                })
@@ -174,44 +187,76 @@ func Test_tstIter(t *testing.T) {
        }
        for _, tt := range tests {
                t.Run(tt.name, func(t *testing.T) {
-                       tst := &tsTable{}
-                       defer tst.Close()
-                       for _, dps := range tt.dpsList {
-                               tst.mustAddDataPoints(dps)
-                               time.Sleep(100 * time.Millisecond)
-                       }
-                       s := tst.currentSnapshot()
-                       if s == nil {
-                               s = new(snapshot)
-                       }
-                       defer s.decRef()
-                       pp, n := s.getParts(nil, queryOptions{
-                               minTimestamp: tt.minTimestamp,
-                               maxTimestamp: tt.maxTimestamp,
-                       })
-                       require.Equal(t, len(s.parts), n)
-                       ti := &tstIter{}
-                       ti.init(pp, tt.sids, tt.minTimestamp, tt.maxTimestamp)
-                       var got []blockMetadata
-                       for ti.nextBlock() {
-                               if ti.piHeap[0].curBlock.seriesID == 0 {
-                                       t.Errorf("Expected curBlock to be 
initialized, but it was nil")
+                       verify := func(tst *tsTable) uint64 {
+                               defer tst.Close()
+                               s := tst.currentSnapshot()
+                               if s == nil {
+                                       s = new(snapshot)
+                               }
+                               defer s.decRef()
+                               pp, n := s.getParts(nil, queryOptions{
+                                       minTimestamp: tt.minTimestamp,
+                                       maxTimestamp: tt.maxTimestamp,
+                               })
+                               require.Equal(t, len(s.parts), n)
+                               ti := &tstIter{}
+                               ti.init(pp, tt.sids, tt.minTimestamp, 
tt.maxTimestamp)
+                               var got []blockMetadata
+                               for ti.nextBlock() {
+                                       if ti.piHeap[0].curBlock.seriesID == 0 {
+                                               t.Errorf("Expected curBlock to 
be initialized, but it was nil")
+                                       }
+                                       got = append(got, ti.piHeap[0].curBlock)
                                }
-                               got = append(got, ti.piHeap[0].curBlock)
-                       }
 
-                       if !errors.Is(ti.Error(), tt.wantErr) {
-                               t.Errorf("Unexpected error: got %v, want %v", 
ti.err, tt.wantErr)
-                       }
+                               if !errors.Is(ti.Error(), tt.wantErr) {
+                                       t.Errorf("Unexpected error: got %v, 
want %v", ti.err, tt.wantErr)
+                               }
 
-                       if diff := cmp.Diff(got, tt.want,
-                               cmpopts.IgnoreFields(blockMetadata{}, 
"timestamps"),
-                               cmpopts.IgnoreFields(blockMetadata{}, "field"),
-                               cmpopts.IgnoreFields(blockMetadata{}, 
"tagFamilies"),
-                               cmp.AllowUnexported(blockMetadata{}),
-                       ); diff != "" {
-                               t.Errorf("Unexpected blockMetadata (-got 
+want):\n%s", diff)
+                               if diff := cmp.Diff(got, tt.want,
+                                       cmpopts.IgnoreFields(blockMetadata{}, 
"timestamps"),
+                                       cmpopts.IgnoreFields(blockMetadata{}, 
"field"),
+                                       cmpopts.IgnoreFields(blockMetadata{}, 
"tagFamilies"),
+                                       cmp.AllowUnexported(blockMetadata{}),
+                               ); diff != "" {
+                                       t.Errorf("Unexpected blockMetadata 
(-got +want):\n%s", diff)
+                               }
+                               return s.epoch
                        }
+
+                       t.Run("memory snapshot", func(t *testing.T) {
+                               tst := &tsTable{
+                                       loopCloser:    run.NewCloser(2),
+                                       introductions: make(chan *introduction),
+                               }
+                               flushCh := make(chan *flusherIntroduction)
+                               introducerWatcher := make(watcher.Channel, 1)
+                               go tst.introducerLoop(flushCh, 
introducerWatcher, 1)
+                               for _, dps := range tt.dpsList {
+                                       tst.mustAddDataPoints(dps)
+                                       time.Sleep(100 * time.Millisecond)
+                               }
+                               verify(tst)
+                       })
+
+                       t.Run("file snapshot", func(t *testing.T) {
+                               tmpPath, defFn := test.Space(require.New(t))
+                               fileSystem := fs.NewLocalFileSystem()
+                               defer defFn()
+
+                               tst, err := newTSTable(fileSystem, tmpPath, 
common.Position{}, logger.GetLogger("test"), timestamp.TimeRange{})
+                               require.NoError(t, err)
+                               for _, dps := range tt.dpsList {
+                                       tst.mustAddDataPoints(dps)
+                                       time.Sleep(100 * time.Millisecond)
+                               }
+                               epoch := verify(tst)
+
+                               // reopen the table
+                               tst, err = newTSTable(fileSystem, tmpPath, 
common.Position{}, logger.GetLogger("test"), timestamp.TimeRange{})
+                               require.NoError(t, err)
+                               assert.Equal(t, epoch, verify(tst))
+                       })
                })
        }
 }
diff --git a/pkg/fs/error.go b/pkg/fs/error.go
index 6db456a5..0b0d3040 100644
--- a/pkg/fs/error.go
+++ b/pkg/fs/error.go
@@ -22,7 +22,7 @@ import "fmt"
 
 const (
        isExistError = iota
-       isNotExistError
+       IsNotExistError
        permissionError
        openError
        deleteError
diff --git a/pkg/fs/local_file_system.go b/pkg/fs/local_file_system.go
index 563a2aa2..006d7245 100644
--- a/pkg/fs/local_file_system.go
+++ b/pkg/fs/local_file_system.go
@@ -60,7 +60,7 @@ func readErrorHandle(operation string, err error, name 
string, size int) (int, e
                return size, err
        case os.IsNotExist(err):
                return size, &FileSystemError{
-                       Code:    isNotExistError,
+                       Code:    IsNotExistError,
                        Message: fmt.Sprintf("%s failed, file is not exist, 
file name: %s, error message: %s", operation, name, err),
                }
        case os.IsPermission(err):
@@ -169,7 +169,7 @@ func (fs *localFileSystem) OpenFile(name string) (File, 
error) {
                }, nil
        case os.IsNotExist(err):
                return nil, &FileSystemError{
-                       Code:    isNotExistError,
+                       Code:    IsNotExistError,
                        Message: fmt.Sprintf("File is not exist, file name: 
%s,error message: %s", name, err),
                }
        case os.IsPermission(err):
@@ -228,7 +228,7 @@ func (fs *localFileSystem) Read(name string) ([]byte, 
error) {
                return data, nil
        case os.IsNotExist(err):
                return data, &FileSystemError{
-                       Code:    isNotExistError,
+                       Code:    IsNotExistError,
                        Message: fmt.Sprintf("File is not exist, file name: %s, 
error message: %s", name, err),
                }
        case os.IsPermission(err):
@@ -252,7 +252,7 @@ func (fs *localFileSystem) DeleteFile(name string) error {
                return nil
        case os.IsNotExist(err):
                return &FileSystemError{
-                       Code:    isNotExistError,
+                       Code:    IsNotExistError,
                        Message: fmt.Sprintf("File is not exist, file name: %s, 
error message: %s", name, err),
                }
        case os.IsPermission(err):
@@ -289,7 +289,7 @@ func (file *LocalFile) Write(buffer []byte) (int, error) {
                return size, nil
        case os.IsNotExist(err):
                return size, &FileSystemError{
-                       Code:    isNotExistError,
+                       Code:    IsNotExistError,
                        Message: fmt.Sprintf("File is not exist, file name: %s, 
error message: %s", file.file.Name(), err),
                }
        case os.IsPermission(err):
@@ -317,7 +317,7 @@ func (file *LocalFile) Writev(iov *[][]byte) (int, error) {
                        size += wsize
                case os.IsNotExist(err):
                        return size, &FileSystemError{
-                               Code:    isNotExistError,
+                               Code:    IsNotExistError,
                                Message: fmt.Sprintf("File is not exist, file 
name: %s, error message: %s", file.file.Name(), err),
                        }
                case os.IsPermission(err):
@@ -374,7 +374,7 @@ func (file *LocalFile) Size() (int64, error) {
        if err != nil {
                if os.IsNotExist(err) {
                        return -1, &FileSystemError{
-                               Code:    isNotExistError,
+                               Code:    IsNotExistError,
                                Message: fmt.Sprintf("File is not exist, file 
name: %s, error message: %s", file.file.Name(), err),
                        }
                } else if os.IsPermission(err) {
diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go
index 6d5d63d4..6ad9fb5d 100644
--- a/pkg/watcher/watcher.go
+++ b/pkg/watcher/watcher.go
@@ -42,7 +42,7 @@ func (e *Epochs) Add(epoch *Epoch) {
 func (e *Epochs) Notify(epoch uint64) {
        var remained Epochs
        for _, ep := range *e {
-               if ep.epoch <= epoch {
+               if ep.epoch < epoch {
                        close(ep.ch)
                        continue
                }

Reply via email to