This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push: new d3b450c9 Move part deletion to the close phase (#405) d3b450c9 is described below commit d3b450c935beba39df5dc06596de61e64dee7ead Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Fri Mar 8 08:00:09 2024 +0800 Move part deletion to the close phase (#405) --- banyand/measure/gc.go | 34 ++-------------------------------- banyand/measure/introducer.go | 2 +- banyand/measure/part.go | 14 ++++++++++---- banyand/measure/query_test.go | 11 ++++++----- banyand/measure/snapshot.go | 2 ++ banyand/measure/tstable.go | 9 ++++----- banyand/stream/gc.go | 34 ++-------------------------------- banyand/stream/introducer.go | 2 +- banyand/stream/merger_policy.go | 4 ++++ banyand/stream/part.go | 14 ++++++++++---- banyand/stream/part_metadata.go | 15 +++++++++++++++ banyand/stream/query_test.go | 5 +++-- banyand/stream/snapshot.go | 2 ++ banyand/stream/tstable.go | 25 +++++++++++++++++++++---- pkg/index/inverted/inverted.go | 2 +- pkg/index/inverted/inverted_series.go | 9 +++++++++ pkg/index/inverted/inverted_test.go | 2 +- 17 files changed, 94 insertions(+), 92 deletions(-) diff --git a/banyand/measure/gc.go b/banyand/measure/gc.go index 039d5295..a34e3731 100644 --- a/banyand/measure/gc.go +++ b/banyand/measure/gc.go @@ -23,48 +23,31 @@ import ( type garbageCleaner struct { parent *tsTable - liveParts map[uint64]map[uint64]struct{} - knownPartFiles map[uint64]struct{} deletableEpochs []uint64 liveEpoch uint64 } func (g *garbageCleaner) init(parent *tsTable) { g.parent = parent - g.liveParts = make(map[uint64]map[uint64]struct{}) - g.knownPartFiles = make(map[uint64]struct{}) } func (g *garbageCleaner) registerSnapshot(snapshot *snapshot) { - parts := make(map[uint64]struct{}) - for _, part := range snapshot.parts { - parts[part.ID()] = struct{}{} - g.knownPartFiles[part.ID()] = struct{}{} - } - g.liveParts[snapshot.epoch] = parts - if g.liveEpoch > 0 { g.deletableEpochs = append(g.deletableEpochs, g.liveEpoch) } g.liveEpoch = snapshot.epoch } -func (g *garbageCleaner) submitParts(partID uint64) { - g.knownPartFiles[partID] = struct{}{} +func (g *garbageCleaner) removePart(partID uint64) { + g.parent.fileSystem.MustRMAll(partPath(g.parent.root, partID)) } func (g *garbageCleaner) clean() { - g.cleanSnapshots() - g.cleanParts() -} - -func (g *garbageCleaner) cleanSnapshots() { var remainingEpochs []uint64 for _, deletableEpoch := range g.deletableEpochs { path := filepath.Join(g.parent.root, snapshotName(deletableEpoch)) err := g.parent.fileSystem.DeleteFile(path) if err == nil { - delete(g.liveParts, deletableEpoch) continue } g.parent.l.Warn().Err(err).Msgf("cannot delete snapshot file: %s", path) @@ -72,16 +55,3 @@ func (g *garbageCleaner) cleanSnapshots() { } g.deletableEpochs = remainingEpochs } - -func (g garbageCleaner) cleanParts() { -OUTER: - for partID := range g.knownPartFiles { - for _, partInSnapshot := range g.liveParts { - if _, ok := partInSnapshot[partID]; ok { - continue OUTER - } - } - g.parent.fileSystem.MustRMAll(partPath(g.parent.root, partID)) - delete(g.knownPartFiles, partID) - } -} diff --git a/banyand/measure/introducer.go b/banyand/measure/introducer.go index e371d246..8ba0118a 100644 --- a/banyand/measure/introducer.go +++ b/banyand/measure/introducer.go @@ -120,7 +120,7 @@ func (tst *tsTable) introducerLoop(flushCh chan *flusherIntroduction, mergeCh ch epoch++ case next := <-flushCh: tst.introduceFlushed(next, epoch) - tst.gc.cleanSnapshots() + tst.gc.clean() epoch++ case next := <-mergeCh: tst.introduceMerged(next, epoch) diff --git a/banyand/measure/part.go b/banyand/measure/part.go index 4e1acb0c..c6019539 100644 --- a/banyand/measure/part.go +++ b/banyand/measure/part.go @@ -43,12 +43,13 @@ const ( ) type part struct { - path string primary fs.Reader timestamps fs.Reader fieldValues fs.Reader + fileSystem fs.FileSystem tagFamilyMetadata map[string]fs.Reader tagFamilies map[string]fs.Reader + path string primaryBlockMetadata []primaryBlockMetadata partMetadata partMetadata } @@ -219,9 +220,10 @@ func releaseMemPart(mp *memPart) { var memPartPool sync.Pool type partWrapper struct { - mp *memPart - p *part - ref int32 + mp *memPart + p *part + ref int32 + removable atomic.Bool } func newPartWrapper(mp *memPart, p *part) *partWrapper { @@ -244,6 +246,9 @@ func (pw *partWrapper) decRef() { return } pw.p.close() + if pw.removable.Load() && pw.p.fileSystem != nil { + pw.p.fileSystem.MustRMAll(pw.p.path) + } } func (pw *partWrapper) ID() uint64 { @@ -254,6 +259,7 @@ func mustOpenFilePart(id uint64, root string, fileSystem fs.FileSystem) *part { var p part partPath := partPath(root, id) p.path = partPath + p.fileSystem = fileSystem p.partMetadata.mustReadMetadata(fileSystem, partPath) p.partMetadata.ID = id diff --git a/banyand/measure/query_test.go b/banyand/measure/query_test.go index 7247057e..e88ed586 100644 --- a/banyand/measure/query_test.go +++ b/banyand/measure/query_test.go @@ -634,7 +634,7 @@ func TestQueryResult(t *testing.T) { fileSystem := fs.NewLocalFileSystem() defer defFn() tst, err := newTSTable(fileSystem, tmpPath, common.Position{}, - logger.GetLogger("test"), timestamp.TimeRange{}, option{flushTimeout: defaultFlushTimeout, mergePolicy: newDefaultMergePolicyForTesting()}) + logger.GetLogger("test"), timestamp.TimeRange{}, option{flushTimeout: 0, mergePolicy: newDefaultMergePolicyForTesting()}) require.NoError(t, err) for _, dps := range tt.dpsList { tst.mustAddDataPoints(dps) @@ -648,13 +648,14 @@ func TestQueryResult(t *testing.T) { time.Sleep(100 * time.Millisecond) continue } - if len(snp.parts) == len(tt.dpsList) { + if snp.creator == snapshotCreatorMemPart { snp.decRef() - tst.Close() - break + time.Sleep(100 * time.Millisecond) + continue } snp.decRef() - time.Sleep(100 * time.Millisecond) + tst.Close() + break } } diff --git a/banyand/measure/snapshot.go b/banyand/measure/snapshot.go index 93a1fdeb..041fbfff 100644 --- a/banyand/measure/snapshot.go +++ b/banyand/measure/snapshot.go @@ -114,7 +114,9 @@ func (s *snapshot) remove(nextEpoch uint64, merged map[uint64]struct{}) snapshot if _, ok := merged[s.parts[i].ID()]; !ok { s.parts[i].incRef() result.parts = append(result.parts, s.parts[i]) + continue } + s.parts[i].removable.Store(true) } return result } diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go index 329c469b..96bcfaef 100644 --- a/banyand/measure/tstable.go +++ b/banyand/measure/tstable.go @@ -95,9 +95,7 @@ func newTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position, } for i := range needToDelete { l.Info().Str("path", filepath.Join(rootPath, needToDelete[i])).Msg("delete invalid directory or file") - if err := fileSystem.DeleteFile(filepath.Join(rootPath, needToDelete[i])); err != nil { - l.Warn().Err(err).Str("path", filepath.Join(rootPath, needToDelete[i])).Msg("failed to delete part. Please check manually") - } + fileSystem.MustRMAll(filepath.Join(rootPath, needToDelete[i])) } if len(loadedParts) == 0 || len(loadedSnapshots) == 0 { t := &tst @@ -143,12 +141,13 @@ func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) { } } if !find { - tst.gc.submitParts(id) + tst.gc.removePart(id) + continue } err := validatePartMetadata(tst.fileSystem, partPath(tst.root, id)) if err != nil { tst.l.Info().Err(err).Uint64("id", id).Msg("cannot validate part metadata. skip and delete it") - tst.gc.submitParts(id) + tst.gc.removePart(id) needToPersist = true continue } diff --git a/banyand/stream/gc.go b/banyand/stream/gc.go index 75476fc2..220495ae 100644 --- a/banyand/stream/gc.go +++ b/banyand/stream/gc.go @@ -23,48 +23,31 @@ import ( type garbageCleaner struct { parent *tsTable - liveParts map[uint64]map[uint64]struct{} - knownPartFiles map[uint64]struct{} deletableEpochs []uint64 liveEpoch uint64 } func (g *garbageCleaner) init(parent *tsTable) { g.parent = parent - g.liveParts = make(map[uint64]map[uint64]struct{}) - g.knownPartFiles = make(map[uint64]struct{}) } func (g *garbageCleaner) registerSnapshot(snapshot *snapshot) { - parts := make(map[uint64]struct{}) - for _, part := range snapshot.parts { - parts[part.ID()] = struct{}{} - g.knownPartFiles[part.ID()] = struct{}{} - } - g.liveParts[snapshot.epoch] = parts - if g.liveEpoch > 0 { g.deletableEpochs = append(g.deletableEpochs, g.liveEpoch) } g.liveEpoch = snapshot.epoch } -func (g *garbageCleaner) submitParts(partID uint64) { - g.knownPartFiles[partID] = struct{}{} +func (g *garbageCleaner) removePart(partID uint64) { + g.parent.fileSystem.MustRMAll(partPath(g.parent.root, partID)) } func (g *garbageCleaner) clean() { - g.cleanSnapshots() - g.cleanParts() -} - -func (g *garbageCleaner) cleanSnapshots() { var remainingEpochs []uint64 for _, deletableEpoch := range g.deletableEpochs { path := filepath.Join(g.parent.root, snapshotName(deletableEpoch)) err := g.parent.fileSystem.DeleteFile(path) if err == nil { - delete(g.liveParts, deletableEpoch) continue } g.parent.l.Warn().Err(err).Msgf("cannot delete snapshot file: %s", path) @@ -72,16 +55,3 @@ func (g *garbageCleaner) cleanSnapshots() { } g.deletableEpochs = remainingEpochs } - -func (g garbageCleaner) cleanParts() { -OUTER: - for partID := range g.knownPartFiles { - for _, partInSnapshot := range g.liveParts { - if _, ok := partInSnapshot[partID]; ok { - continue OUTER - } - } - g.parent.fileSystem.MustRMAll(partPath(g.parent.root, partID)) - delete(g.knownPartFiles, partID) - } -} diff --git a/banyand/stream/introducer.go b/banyand/stream/introducer.go index 6365beb3..cd215263 100644 --- a/banyand/stream/introducer.go +++ b/banyand/stream/introducer.go @@ -120,7 +120,7 @@ func (tst *tsTable) introducerLoop(flushCh chan *flusherIntroduction, mergeCh ch epoch++ case next := <-flushCh: tst.introduceFlushed(next, epoch) - tst.gc.cleanSnapshots() + tst.gc.clean() epoch++ case next := <-mergeCh: tst.introduceMerged(next, epoch) diff --git a/banyand/stream/merger_policy.go b/banyand/stream/merger_policy.go index 4a284a4b..254873de 100644 --- a/banyand/stream/merger_policy.go +++ b/banyand/stream/merger_policy.go @@ -39,6 +39,10 @@ func newDefaultMergePolicyForTesting() *mergePolicy { return newMergePolicy(4, 1.7, math.MaxUint64) } +func newDisabledMergePolicyForTesting() *mergePolicy { + return newMergePolicy(0, 0, 0) +} + // NewMergePolicy creates a MergePolicy with given parameters. func newMergePolicy(maxParts int, minMergeMul float64, maxFanOutSize uint64) *mergePolicy { return &mergePolicy{ diff --git a/banyand/stream/part.go b/banyand/stream/part.go index bdca61a1..95a12297 100644 --- a/banyand/stream/part.go +++ b/banyand/stream/part.go @@ -109,12 +109,13 @@ type element struct { } type part struct { - path string primary fs.Reader timestamps fs.Reader elementIDs fs.Reader + fileSystem fs.FileSystem tagFamilyMetadata map[string]fs.Reader tagFamilies map[string]fs.Reader + path string primaryBlockMetadata []primaryBlockMetadata partMetadata partMetadata } @@ -382,9 +383,10 @@ func releaseMemPart(mp *memPart) { var memPartPool sync.Pool type partWrapper struct { - mp *memPart - p *part - ref int32 + mp *memPart + p *part + ref int32 + removable atomic.Bool } func newPartWrapper(mp *memPart, p *part) *partWrapper { @@ -407,6 +409,9 @@ func (pw *partWrapper) decRef() { return } pw.p.close() + if pw.removable.Load() && pw.p.fileSystem != nil { + pw.p.fileSystem.MustRMAll(pw.p.path) + } } func (pw *partWrapper) ID() uint64 { @@ -417,6 +422,7 @@ func mustOpenFilePart(id uint64, root string, fileSystem fs.FileSystem) *part { var p part partPath := partPath(root, id) p.path = partPath + p.fileSystem = fileSystem p.partMetadata.mustReadMetadata(fileSystem, partPath) p.partMetadata.ID = id diff --git a/banyand/stream/part_metadata.go b/banyand/stream/part_metadata.go index def81a19..075d2d00 100644 --- a/banyand/stream/part_metadata.go +++ b/banyand/stream/part_metadata.go @@ -21,6 +21,8 @@ import ( "encoding/json" "path/filepath" + "github.com/pkg/errors" + "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" ) @@ -45,6 +47,19 @@ func (pm *partMetadata) reset() { pm.ID = 0 } +func validatePartMetadata(fileSystem fs.FileSystem, partPath string) error { + metadataPath := filepath.Join(partPath, metadataFilename) + metadata, err := fileSystem.Read(metadataPath) + if err != nil { + return errors.WithMessage(err, "cannot read metadata.json") + } + var pm partMetadata + if err := json.Unmarshal(metadata, &pm); err != nil { + return errors.WithMessage(err, "cannot parse metadata.json") + } + return nil +} + func (pm *partMetadata) mustReadMetadata(fileSystem fs.FileSystem, partPath string) { pm.reset() diff --git a/banyand/stream/query_test.go b/banyand/stream/query_test.go index 562bb9a9..4d4d2bc5 100644 --- a/banyand/stream/query_test.go +++ b/banyand/stream/query_test.go @@ -417,7 +417,8 @@ func TestQueryResult(t *testing.T) { fileSystem := fs.NewLocalFileSystem() defer defFn() tst, err := newTSTable(fileSystem, tmpPath, common.Position{}, - logger.GetLogger("test"), timestamp.TimeRange{}, option{flushTimeout: defaultFlushTimeout, mergePolicy: newDefaultMergePolicyForTesting()}) + // Since Stream deduplicate data in merging process, we need to disable the merging in the test. + logger.GetLogger("test"), timestamp.TimeRange{}, option{flushTimeout: 0, mergePolicy: newDisabledMergePolicyForTesting()}) require.NoError(t, err) for _, es := range tt.esList { tst.mustAddElements(es) @@ -431,7 +432,7 @@ func TestQueryResult(t *testing.T) { time.Sleep(100 * time.Millisecond) continue } - if len(snp.parts) == len(tt.esList) { + if snp.creator != snapshotCreatorMemPart && len(snp.parts) == len(tt.esList) { snp.decRef() tst.Close() break diff --git a/banyand/stream/snapshot.go b/banyand/stream/snapshot.go index 266e089e..9aa274fb 100644 --- a/banyand/stream/snapshot.go +++ b/banyand/stream/snapshot.go @@ -114,7 +114,9 @@ func (s *snapshot) remove(nextEpoch uint64, merged map[uint64]struct{}) snapshot if _, ok := merged[s.parts[i].ID()]; !ok { s.parts[i].incRef() result.parts = append(result.parts, s.parts[i]) + continue } + s.parts[i].removable.Store(true) } return result } diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go index 3dc584eb..35610207 100644 --- a/banyand/stream/tstable.go +++ b/banyand/stream/tstable.go @@ -66,6 +66,7 @@ func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) { snp := snapshot{ epoch: epoch, } + needToPersist := false for _, id := range loadedParts { var find bool for j := range parts { @@ -75,7 +76,15 @@ func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) { } } if !find { - tst.gc.submitParts(id) + tst.gc.removePart(id) + continue + } + err := validatePartMetadata(tst.fileSystem, partPath(tst.root, id)) + if err != nil { + tst.l.Info().Err(err).Uint64("id", id).Msg("cannot validate part metadata. skip and delete it") + tst.gc.removePart(id) + needToPersist = true + continue } p := mustOpenFilePart(id, tst.root, tst.fileSystem) p.partMetadata.ID = id @@ -91,6 +100,9 @@ func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) { } snp.incRef() tst.snapshot = &snp + if needToPersist { + tst.persistSnapshot(&snp) + } } func (tst *tsTable) startLoop(cur uint64) { @@ -189,6 +201,12 @@ func newTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position, needToDelete = append(needToDelete, ee[i].Name()) continue } + err = validatePartMetadata(fileSystem, filepath.Join(rootPath, ee[i].Name())) + if err != nil { + l.Info().Err(err).Msg("cannot validate part metadata. skip and delete it") + needToDelete = append(needToDelete, ee[i].Name()) + continue + } loadedParts = append(loadedParts, p) continue } @@ -204,9 +222,8 @@ func newTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position, loadedSnapshots = append(loadedSnapshots, snapshot) } for i := range needToDelete { - if err := fileSystem.DeleteFile(filepath.Join(rootPath, needToDelete[i])); err != nil { - l.Warn().Err(err).Str("path", filepath.Join(rootPath, needToDelete[i])).Msg("failed to delete part. Please check manually") - } + l.Info().Str("path", filepath.Join(rootPath, needToDelete[i])).Msg("delete invalid directory or file") + fileSystem.MustRMAll(filepath.Join(rootPath, needToDelete[i])) } if len(loadedParts) == 0 || len(loadedSnapshots) == 0 { t := &tst diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go index de11591f..9f4c6cc5 100644 --- a/pkg/index/inverted/inverted.go +++ b/pkg/index/inverted/inverted.go @@ -208,7 +208,7 @@ func (s *store) Iterator(fieldKey index.FieldKey, termRange index.RangeOpts, ord } documentMatchIterator, err := reader.Search(context.Background(), bluge.NewTopNSearch(math.MaxInt64, query).SortBy([]string{sortedKey})) if err != nil { - return nil, err + return nil, multierr.Combine(err, reader.Close()) } result := newBlugeMatchIterator(documentMatchIterator, fk, shouldDecodeTerm, reader) return &result, nil diff --git a/pkg/index/inverted/inverted_series.go b/pkg/index/inverted/inverted_series.go index b6296896..f2080589 100644 --- a/pkg/index/inverted/inverted_series.go +++ b/pkg/index/inverted/inverted_series.go @@ -36,6 +36,9 @@ func (s *store) Search(term []byte) (common.SeriesID, error) { if err != nil { return 0, err } + defer func() { + _ = reader.Close() + }() query := bluge.NewTermQuery(convert.BytesToString(term)).SetField(entityField) dmi, err := reader.Search(context.Background(), bluge.NewAllMatches(query)) if err != nil { @@ -67,6 +70,9 @@ func (s *store) SearchPrefix(prefix []byte) ([]index.Series, error) { if err != nil { return nil, err } + defer func() { + _ = reader.Close() + }() query := bluge.NewPrefixQuery(convert.BytesToString(prefix)).SetField(entityField) dmi, err := reader.Search(context.Background(), bluge.NewAllMatches(query)) if err != nil { @@ -81,6 +87,9 @@ func (s *store) SearchWildcard(wildcard []byte) ([]index.Series, error) { if err != nil { return nil, err } + defer func() { + _ = reader.Close() + }() query := bluge.NewWildcardQuery(convert.BytesToString(wildcard)).SetField(entityField) dmi, err := reader.Search(context.Background(), bluge.NewAllMatches(query)) if err != nil { diff --git a/pkg/index/inverted/inverted_test.go b/pkg/index/inverted/inverted_test.go index bfa2f276..0877761a 100644 --- a/pkg/index/inverted/inverted_test.go +++ b/pkg/index/inverted/inverted_test.go @@ -123,7 +123,7 @@ func TestStore_Match(t *testing.T) { }, } for _, tt := range tests { - name := strings.Join(tt.matches, " and ") + name := strings.Join(tt.matches, "-") t.Run(name, func(t *testing.T) { list, err := s.Match(serviceName, tt.matches) if tt.wantErr {