This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch measure-flusher in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 5e578bc5f0424c37141afdacad627b2763ca0a5d Author: Gao Hongtao <[email protected]> AuthorDate: Mon Dec 25 06:50:03 2023 +0800 Fix measure UT Signed-off-by: Gao Hongtao <[email protected]> --- banyand/measure/flusher.go | 14 +++- banyand/measure/gc.go | 27 +++++-- banyand/measure/introducer.go | 6 +- banyand/measure/part.go | 10 ++- banyand/measure/query_test.go | 154 +++++++++++++++++++++++++--------------- banyand/measure/tstable.go | 11 +-- banyand/measure/tstable_test.go | 119 +++++++++++++++++++++---------- pkg/fs/error.go | 2 +- pkg/fs/local_file_system.go | 14 ++-- pkg/watcher/watcher.go | 2 +- 10 files changed, 240 insertions(+), 119 deletions(-) diff --git a/banyand/measure/flusher.go b/banyand/measure/flusher.go index 2609f09e..09346d1d 100644 --- a/banyand/measure/flusher.go +++ b/banyand/measure/flusher.go @@ -24,6 +24,9 @@ import ( func (tst *tsTable) flusherLoop(flushCh chan *flusherIntroduction, introducerWatcher watcher.Channel, epoch uint64) { defer tst.loopCloser.Done() epochWatcher := introducerWatcher.Add(0, tst.loopCloser.CloseNotify()) + if epochWatcher == nil { + return + } for { select { @@ -43,11 +46,15 @@ func (tst *tsTable) flusherLoop(flushCh chan *flusherIntroduction, introducerWat return } tst.persistSnapshot(curSnapshot) + curSnapshot.decRef() if tst.currentEpoch() != epoch { continue } } epochWatcher = introducerWatcher.Add(epoch, tst.loopCloser.CloseNotify()) + if epochWatcher == nil { + return + } tst.gc.clean() } } @@ -55,6 +62,7 @@ func (tst *tsTable) flusherLoop(flushCh chan *flusherIntroduction, introducerWat func (tst *tsTable) flush(snapshot *snapshot, flushCh chan *flusherIntroduction) uint64 { ind := generateFlusherIntroduction() + defer releaseFlusherIntroduction(ind) for _, pw := range snapshot.parts { if pw.mp == nil || pw.mp.partMetadata.TotalCount < 1 { continue @@ -62,8 +70,12 @@ func (tst *tsTable) flush(snapshot *snapshot, flushCh chan *flusherIntroduction) partPath := partPath(tst.root, pw.ID()) pw.mp.mustFlush(tst.fileSystem, partPath) newPW := newPartWrapper(nil, mustOpenFilePart(partPath, tst.fileSystem), tst.fileSystem) + newPW.p.partMetadata.ID = pw.ID() ind.flushed[newPW.ID()] = newPW } + if len(ind.flushed) < 1 { + return snapshot.epoch + } ind.applied = make(chan struct{}) select { case flushCh <- ind: @@ -81,7 +93,7 @@ func (tst *tsTable) flush(snapshot *snapshot, flushCh chan *flusherIntroduction) func (tst *tsTable) persistSnapshot(snapshot *snapshot) { var partNames []string for i := range snapshot.parts { - partNames = append(partNames, snapshotName(snapshot.parts[i].ID())) + partNames = append(partNames, partName(snapshot.parts[i].ID())) } tst.mustWriteSnapshot(snapshot.epoch, partNames) tst.gc.registerSnapshot(snapshot.epoch) diff --git a/banyand/measure/gc.go b/banyand/measure/gc.go index dbdde01d..5bab2bb5 100644 --- a/banyand/measure/gc.go +++ b/banyand/measure/gc.go @@ -20,6 +20,8 @@ package measure import ( "path" "sort" + + "github.com/apache/skywalking-banyandb/pkg/fs" ) type garbageCleaner struct { @@ -49,17 +51,34 @@ func (g *garbageCleaner) cleanSnapshots() { if len(g.snapshots) < 2 { return } - sort.Slice(g.snapshots, func(i, j int) bool { + if !sort.SliceIsSorted(g.snapshots, func(i, j int) bool { return g.snapshots[i] < g.snapshots[j] - }) + }) { + sort.Slice(g.snapshots, func(i, j int) bool { + return g.snapshots[i] < g.snapshots[j] + }) + } // keep the latest snapshot + var remainingSnapshots []uint64 for i := 0; i < len(g.snapshots)-1; i++ { filePath := path.Join(g.parent.root, snapshotName(g.snapshots[i])) if err := g.parent.fileSystem.DeleteFile(filePath); err != nil { - g.parent.l.Warn().Err(err).Str("path", filePath).Msg("failed to delete snapshot. Please check manually") + if fse, ok := err.(*fs.FileSystemError); ok { + if fse.Code != fs.IsNotExistError { + g.parent.l.Warn().Err(err).Str("path", filePath).Msg("failed to delete snapshot, will retry in next round. Please check manually") + remainingSnapshots = append(remainingSnapshots, g.snapshots[i]) + } + } } } - g.snapshots = g.snapshots[len(g.snapshots)-1:] + if remainingSnapshots == nil { + g.snapshots = g.snapshots[len(g.snapshots)-1:] + return + } + remained := g.snapshots[len(g.snapshots)-1] + g.snapshots = g.snapshots[:0] + g.snapshots = append(g.snapshots, remainingSnapshots...) + g.snapshots = append(g.snapshots, remained) } func (g garbageCleaner) cleanParts() { diff --git a/banyand/measure/introducer.go b/banyand/measure/introducer.go index 3c124d2a..17a4edef 100644 --- a/banyand/measure/introducer.go +++ b/banyand/measure/introducer.go @@ -105,7 +105,6 @@ func (tst *tsTable) introduceMemPart(nextIntroduction *introduction, epoch uint6 } else { cur = new(snapshot) } - defer releaseIntroduction(nextIntroduction) next := nextIntroduction.memPart nextSnp := cur.copyAllTo(epoch) @@ -122,10 +121,7 @@ func (tst *tsTable) introduceFlushed(nextIntroduction *flusherIntroduction, epoc if cur == nil { tst.l.Panic().Msg("current snapshot is nil") } - defer func() { - cur.decRef() - releaseFlusherIntroduction(nextIntroduction) - }() + defer cur.decRef() nextSnp := cur.merge(epoch, nextIntroduction.flushed) tst.replaceSnapshot(&nextSnp) if nextIntroduction.applied != nil { diff --git a/banyand/measure/part.go b/banyand/measure/part.go index 11e635b8..f2ab04f2 100644 --- a/banyand/measure/part.go +++ b/banyand/measure/part.go @@ -66,6 +66,10 @@ func (p *part) close() { } } +func (p *part) String() string { + return fmt.Sprintf("part %d", p.partMetadata.ID) +} + func openMemPart(mp *memPart) *part { var p part p.partMetadata = mp.partMetadata @@ -302,5 +306,9 @@ func removeExt(nameWithExt, ext string) string { } func partPath(root string, epoch uint64) string { - return filepath.Join(root, fmt.Sprintf("%016x", epoch)) + return filepath.Join(root, partName(epoch)) +} + +func partName(epoch uint64) string { + return fmt.Sprintf("%016x", epoch) } diff --git a/banyand/measure/query_test.go b/banyand/measure/query_test.go index 44ad9c52..4226dfc7 100644 --- a/banyand/measure/query_test.go +++ b/banyand/measure/query_test.go @@ -24,12 +24,19 @@ import ( "time" "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/protobuf/testing/protocmp" "github.com/apache/skywalking-banyandb/api/common" modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" + "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/run" + "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/timestamp" + "github.com/apache/skywalking-banyandb/pkg/watcher" ) func TestQueryResult(t *testing.T) { @@ -364,70 +371,101 @@ func TestQueryResult(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - // Initialize a tstIter object. - tst := &tsTable{} - defer tst.Close() - for _, dps := range tt.dpsList { - tst.mustAddDataPoints(dps) - time.Sleep(100 * time.Millisecond) - } - queryOpts := queryOptions{ - minTimestamp: tt.minTimestamp, - maxTimestamp: tt.maxTimestamp, - } - s := tst.currentSnapshot() - require.NotNil(t, s) - defer s.decRef() - pp, n := s.getParts(nil, queryOpts) - require.Equal(t, len(tt.dpsList), n) - sids := make([]common.SeriesID, len(tt.sids)) - copy(sids, tt.sids) - sort.Slice(sids, func(i, j int) bool { - return sids[i] < tt.sids[j] - }) - ti := &tstIter{} - ti.init(pp, sids, tt.minTimestamp, tt.maxTimestamp) + verify := func(tst *tsTable) uint64 { + defer tst.Close() + queryOpts := queryOptions{ + minTimestamp: tt.minTimestamp, + maxTimestamp: tt.maxTimestamp, + } + s := tst.currentSnapshot() + require.NotNil(t, s) + defer s.decRef() + pp, n := s.getParts(nil, queryOpts) + require.Equal(t, len(tt.dpsList), n, "parts: %+v", pp) + sids := make([]common.SeriesID, len(tt.sids)) + copy(sids, tt.sids) + sort.Slice(sids, func(i, j int) bool { + return sids[i] < tt.sids[j] + }) + ti := &tstIter{} + ti.init(pp, sids, tt.minTimestamp, tt.maxTimestamp) - var result queryResult - for ti.nextBlock() { - bc := generateBlockCursor() - p := ti.piHeap[0] - opts := queryOpts - opts.TagProjection = tagProjections[int(p.curBlock.seriesID)] - opts.FieldProjection = fieldProjections[int(p.curBlock.seriesID)] - bc.init(p.p, p.curBlock, opts) - result.data = append(result.data, bc) - } - defer result.Release() - if tt.orderBySeries { - result.sidToIndex = make(map[common.SeriesID]int) - for i, si := range tt.sids { - result.sidToIndex[si] = i + var result queryResult + for ti.nextBlock() { + bc := generateBlockCursor() + p := ti.piHeap[0] + opts := queryOpts + opts.TagProjection = tagProjections[int(p.curBlock.seriesID)] + opts.FieldProjection = fieldProjections[int(p.curBlock.seriesID)] + bc.init(p.p, p.curBlock, opts) + result.data = append(result.data, bc) } - } else { - result.orderByTS = true - result.ascTS = tt.ascTS - } - var got []pbv1.Result - for { - r := result.Pull() - if r == nil { - break + defer result.Release() + if tt.orderBySeries { + result.sidToIndex = make(map[common.SeriesID]int) + for i, si := range tt.sids { + result.sidToIndex[si] = i + } + } else { + result.orderByTS = true + result.ascTS = tt.ascTS + } + var got []pbv1.Result + for { + r := result.Pull() + if r == nil { + break + } + sort.Slice(r.TagFamilies, func(i, j int) bool { + return r.TagFamilies[i].Name < r.TagFamilies[j].Name + }) + got = append(got, *r) } - sort.Slice(r.TagFamilies, func(i, j int) bool { - return r.TagFamilies[i].Name < r.TagFamilies[j].Name - }) - got = append(got, *r) - } - if !errors.Is(ti.Error(), tt.wantErr) { - t.Errorf("Unexpected error: got %v, want %v", ti.err, tt.wantErr) - } + if !errors.Is(ti.Error(), tt.wantErr) { + t.Errorf("Unexpected error: got %v, want %v", ti.err, tt.wantErr) + } - if diff := cmp.Diff(got, tt.want, - protocmp.IgnoreUnknown(), protocmp.Transform()); diff != "" { - t.Errorf("Unexpected []pbv1.Result (-got +want):\n%s", diff) + if diff := cmp.Diff(got, tt.want, + protocmp.IgnoreUnknown(), protocmp.Transform()); diff != "" { + t.Errorf("Unexpected []pbv1.Result (-got +want):\n%s", diff) + } + return s.epoch } + + t.Run("memory snapshot", func(t *testing.T) { + tst := &tsTable{ + loopCloser: run.NewCloser(2), + introductions: make(chan *introduction), + } + flushCh := make(chan *flusherIntroduction) + introducerWatcher := make(watcher.Channel, 1) + go tst.introducerLoop(flushCh, introducerWatcher, 1) + for _, dps := range tt.dpsList { + tst.mustAddDataPoints(dps) + time.Sleep(100 * time.Millisecond) + } + _ = verify(tst) + }) + + t.Run("file snapshot", func(t *testing.T) { + // Initialize a tstIter object. + tmpPath, defFn := test.Space(require.New(t)) + fileSystem := fs.NewLocalFileSystem() + defer defFn() + tst, err := newTSTable(fileSystem, tmpPath, common.Position{}, logger.GetLogger("test"), timestamp.TimeRange{}) + require.NoError(t, err) + for _, dps := range tt.dpsList { + tst.mustAddDataPoints(dps) + time.Sleep(100 * time.Millisecond) + } + epoch := verify(tst) + + // reopen the table + tst, err = newTSTable(fileSystem, tmpPath, common.Position{}, logger.GetLogger("test"), timestamp.TimeRange{}) + require.NoError(t, err) + assert.Equal(t, epoch, verify(tst)) + }) }) } } diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go index a51d51e8..3e5b7e2a 100644 --- a/banyand/measure/tstable.go +++ b/banyand/measure/tstable.go @@ -114,7 +114,9 @@ type tsTable struct { func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) { parts := tst.mustReadSnapshot(epoch) - var snp snapshot + snp := snapshot{ + epoch: epoch, + } for _, partName := range loadedParts { var find bool for j := range parts { @@ -127,6 +129,7 @@ func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) { tst.gc.submitParts(partName) } p := mustOpenFilePart(partPath(tst.root, partName), tst.fileSystem) + p.partMetadata.ID = partName snp.parts = append(snp.parts, newPartWrapper(nil, p, tst.fileSystem)) } tst.gc.clean() @@ -196,13 +199,12 @@ func (tst *tsTable) mustReadSnapshot(snapshot uint64) []uint64 { } func (tst *tsTable) Close() error { - tst.Lock() - defer tst.Unlock() if tst.loopCloser != nil { - tst.loopCloser.Done() tst.loopCloser.CloseThenWait() } + tst.RLock() + defer tst.RUnlock() if tst.snapshot != nil { tst.snapshot.decRef() } @@ -219,6 +221,7 @@ func (tst *tsTable) mustAddDataPoints(dps *dataPoints) { p := openMemPart(mp) ind := generateIntroduction() + defer releaseIntroduction(ind) ind.applied = make(chan struct{}) ind.memPart = newPartWrapper(mp, p, tst.fileSystem) diff --git a/banyand/measure/tstable_test.go b/banyand/measure/tstable_test.go index e4599596..ca231e4f 100644 --- a/banyand/measure/tstable_test.go +++ b/banyand/measure/tstable_test.go @@ -29,7 +29,13 @@ import ( "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/pkg/convert" + "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/run" + "github.com/apache/skywalking-banyandb/pkg/test" + "github.com/apache/skywalking-banyandb/pkg/timestamp" + "github.com/apache/skywalking-banyandb/pkg/watcher" ) func Test_tsTable_mustAddDataPoints(t *testing.T) { @@ -88,7 +94,13 @@ func Test_tsTable_mustAddDataPoints(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - tst := &tsTable{} + tst := &tsTable{ + loopCloser: run.NewCloser(2), + introductions: make(chan *introduction), + } + flushCh := make(chan *flusherIntroduction) + introducerWatcher := make(watcher.Channel, 1) + go tst.introducerLoop(flushCh, introducerWatcher, 1) defer tst.Close() for _, dps := range tt.dpsList { tst.mustAddDataPoints(dps) @@ -102,10 +114,11 @@ func Test_tsTable_mustAddDataPoints(t *testing.T) { assert.Equal(t, tt.want, len(s.parts)) var lastVersion uint64 for _, pw := range s.parts { + require.Greater(t, pw.ID(), uint64(0)) if lastVersion == 0 { - lastVersion = pw.p.partMetadata.ID + lastVersion = pw.ID() } else { - require.Less(t, lastVersion, pw.p.partMetadata.ID) + require.Less(t, lastVersion, pw.ID()) } } }) @@ -174,44 +187,76 @@ func Test_tstIter(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - tst := &tsTable{} - defer tst.Close() - for _, dps := range tt.dpsList { - tst.mustAddDataPoints(dps) - time.Sleep(100 * time.Millisecond) - } - s := tst.currentSnapshot() - if s == nil { - s = new(snapshot) - } - defer s.decRef() - pp, n := s.getParts(nil, queryOptions{ - minTimestamp: tt.minTimestamp, - maxTimestamp: tt.maxTimestamp, - }) - require.Equal(t, len(s.parts), n) - ti := &tstIter{} - ti.init(pp, tt.sids, tt.minTimestamp, tt.maxTimestamp) - var got []blockMetadata - for ti.nextBlock() { - if ti.piHeap[0].curBlock.seriesID == 0 { - t.Errorf("Expected curBlock to be initialized, but it was nil") + verify := func(tst *tsTable) uint64 { + defer tst.Close() + s := tst.currentSnapshot() + if s == nil { + s = new(snapshot) + } + defer s.decRef() + pp, n := s.getParts(nil, queryOptions{ + minTimestamp: tt.minTimestamp, + maxTimestamp: tt.maxTimestamp, + }) + require.Equal(t, len(s.parts), n) + ti := &tstIter{} + ti.init(pp, tt.sids, tt.minTimestamp, tt.maxTimestamp) + var got []blockMetadata + for ti.nextBlock() { + if ti.piHeap[0].curBlock.seriesID == 0 { + t.Errorf("Expected curBlock to be initialized, but it was nil") + } + got = append(got, ti.piHeap[0].curBlock) } - got = append(got, ti.piHeap[0].curBlock) - } - if !errors.Is(ti.Error(), tt.wantErr) { - t.Errorf("Unexpected error: got %v, want %v", ti.err, tt.wantErr) - } + if !errors.Is(ti.Error(), tt.wantErr) { + t.Errorf("Unexpected error: got %v, want %v", ti.err, tt.wantErr) + } - if diff := cmp.Diff(got, tt.want, - cmpopts.IgnoreFields(blockMetadata{}, "timestamps"), - cmpopts.IgnoreFields(blockMetadata{}, "field"), - cmpopts.IgnoreFields(blockMetadata{}, "tagFamilies"), - cmp.AllowUnexported(blockMetadata{}), - ); diff != "" { - t.Errorf("Unexpected blockMetadata (-got +want):\n%s", diff) + if diff := cmp.Diff(got, tt.want, + cmpopts.IgnoreFields(blockMetadata{}, "timestamps"), + cmpopts.IgnoreFields(blockMetadata{}, "field"), + cmpopts.IgnoreFields(blockMetadata{}, "tagFamilies"), + cmp.AllowUnexported(blockMetadata{}), + ); diff != "" { + t.Errorf("Unexpected blockMetadata (-got +want):\n%s", diff) + } + return s.epoch } + + t.Run("memory snapshot", func(t *testing.T) { + tst := &tsTable{ + loopCloser: run.NewCloser(2), + introductions: make(chan *introduction), + } + flushCh := make(chan *flusherIntroduction) + introducerWatcher := make(watcher.Channel, 1) + go tst.introducerLoop(flushCh, introducerWatcher, 1) + for _, dps := range tt.dpsList { + tst.mustAddDataPoints(dps) + time.Sleep(100 * time.Millisecond) + } + verify(tst) + }) + + t.Run("file snapshot", func(t *testing.T) { + tmpPath, defFn := test.Space(require.New(t)) + fileSystem := fs.NewLocalFileSystem() + defer defFn() + + tst, err := newTSTable(fileSystem, tmpPath, common.Position{}, logger.GetLogger("test"), timestamp.TimeRange{}) + require.NoError(t, err) + for _, dps := range tt.dpsList { + tst.mustAddDataPoints(dps) + time.Sleep(100 * time.Millisecond) + } + epoch := verify(tst) + + // reopen the table + tst, err = newTSTable(fileSystem, tmpPath, common.Position{}, logger.GetLogger("test"), timestamp.TimeRange{}) + require.NoError(t, err) + assert.Equal(t, epoch, verify(tst)) + }) }) } } diff --git a/pkg/fs/error.go b/pkg/fs/error.go index 6db456a5..0b0d3040 100644 --- a/pkg/fs/error.go +++ b/pkg/fs/error.go @@ -22,7 +22,7 @@ import "fmt" const ( isExistError = iota - isNotExistError + IsNotExistError permissionError openError deleteError diff --git a/pkg/fs/local_file_system.go b/pkg/fs/local_file_system.go index 563a2aa2..006d7245 100644 --- a/pkg/fs/local_file_system.go +++ b/pkg/fs/local_file_system.go @@ -60,7 +60,7 @@ func readErrorHandle(operation string, err error, name string, size int) (int, e return size, err case os.IsNotExist(err): return size, &FileSystemError{ - Code: isNotExistError, + Code: IsNotExistError, Message: fmt.Sprintf("%s failed, file is not exist, file name: %s, error message: %s", operation, name, err), } case os.IsPermission(err): @@ -169,7 +169,7 @@ func (fs *localFileSystem) OpenFile(name string) (File, error) { }, nil case os.IsNotExist(err): return nil, &FileSystemError{ - Code: isNotExistError, + Code: IsNotExistError, Message: fmt.Sprintf("File is not exist, file name: %s,error message: %s", name, err), } case os.IsPermission(err): @@ -228,7 +228,7 @@ func (fs *localFileSystem) Read(name string) ([]byte, error) { return data, nil case os.IsNotExist(err): return data, &FileSystemError{ - Code: isNotExistError, + Code: IsNotExistError, Message: fmt.Sprintf("File is not exist, file name: %s, error message: %s", name, err), } case os.IsPermission(err): @@ -252,7 +252,7 @@ func (fs *localFileSystem) DeleteFile(name string) error { return nil case os.IsNotExist(err): return &FileSystemError{ - Code: isNotExistError, + Code: IsNotExistError, Message: fmt.Sprintf("File is not exist, file name: %s, error message: %s", name, err), } case os.IsPermission(err): @@ -289,7 +289,7 @@ func (file *LocalFile) Write(buffer []byte) (int, error) { return size, nil case os.IsNotExist(err): return size, &FileSystemError{ - Code: isNotExistError, + Code: IsNotExistError, Message: fmt.Sprintf("File is not exist, file name: %s, error message: %s", file.file.Name(), err), } case os.IsPermission(err): @@ -317,7 +317,7 @@ func (file *LocalFile) Writev(iov *[][]byte) (int, error) { size += wsize case os.IsNotExist(err): return size, &FileSystemError{ - Code: isNotExistError, + Code: IsNotExistError, Message: fmt.Sprintf("File is not exist, file name: %s, error message: %s", file.file.Name(), err), } case os.IsPermission(err): @@ -374,7 +374,7 @@ func (file *LocalFile) Size() (int64, error) { if err != nil { if os.IsNotExist(err) { return -1, &FileSystemError{ - Code: isNotExistError, + Code: IsNotExistError, Message: fmt.Sprintf("File is not exist, file name: %s, error message: %s", file.file.Name(), err), } } else if os.IsPermission(err) { diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go index 6d5d63d4..6ad9fb5d 100644 --- a/pkg/watcher/watcher.go +++ b/pkg/watcher/watcher.go @@ -42,7 +42,7 @@ func (e *Epochs) Add(epoch *Epoch) { func (e *Epochs) Notify(epoch uint64) { var remained Epochs for _, ep := range *e { - if ep.epoch <= epoch { + if ep.epoch < epoch { close(ep.ch) continue }
