This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch measure-flusher in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit b61b2540d48469ee8dd3fa6eddd88ba2f5835d02 Author: Gao Hongtao <[email protected]> AuthorDate: Mon Dec 25 07:19:13 2023 +0800 Fix linter Signed-off-by: Gao Hongtao <[email protected]> --- banyand/internal/storage/shard.go | 2 +- banyand/internal/storage/tsdb.go | 2 +- banyand/measure/gc.go | 10 +++++----- banyand/measure/part.go | 13 ++++++------- banyand/measure/query.go | 5 +++++ pkg/fs/file_system.go | 10 ++++++++++ 6 files changed, 28 insertions(+), 14 deletions(-) diff --git a/banyand/internal/storage/shard.go b/banyand/internal/storage/shard.go index cdec81ae..19150f9c 100644 --- a/banyand/internal/storage/shard.go +++ b/banyand/internal/storage/shard.go @@ -75,7 +75,7 @@ func (d *database[T]) openShard(ctx context.Context, id common.ShardID) (*shard[ return s, nil } -func (s *shard[T]) closer() { +func (s *shard[T]) close() { s.closeOnce.Do(func() { s.scheduler.Close() s.segmentManageStrategy.Close() diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go index a425609e..fd5b648a 100644 --- a/banyand/internal/storage/tsdb.go +++ b/banyand/internal/storage/tsdb.go @@ -73,7 +73,7 @@ func (d *database[T]) Close() error { d.Lock() defer d.Unlock() for _, s := range d.sLst { - s.closer() + s.close() } return d.index.Close() } diff --git a/banyand/measure/gc.go b/banyand/measure/gc.go index 5bab2bb5..8fb6f608 100644 --- a/banyand/measure/gc.go +++ b/banyand/measure/gc.go @@ -18,6 +18,7 @@ package measure import ( + "errors" "path" "sort" @@ -63,11 +64,10 @@ func (g *garbageCleaner) cleanSnapshots() { for i := 0; i < len(g.snapshots)-1; i++ { filePath := path.Join(g.parent.root, snapshotName(g.snapshots[i])) if err := g.parent.fileSystem.DeleteFile(filePath); err != nil { - if fse, ok := err.(*fs.FileSystemError); ok { - if fse.Code != fs.IsNotExistError { - g.parent.l.Warn().Err(err).Str("path", filePath).Msg("failed to delete snapshot, will retry in next round. Please check manually") - remainingSnapshots = append(remainingSnapshots, g.snapshots[i]) - } + var notExistErr *fs.FileSystemError + if errors.As(err, ¬ExistErr) && notExistErr.Code != fs.IsNotExistError { + g.parent.l.Warn().Err(err).Str("path", filePath).Msg("failed to delete snapshot, will retry in next round. Please check manually") + remainingSnapshots = append(remainingSnapshots, g.snapshots[i]) } } } diff --git a/banyand/measure/part.go b/banyand/measure/part.go index f2ab04f2..6b18b597 100644 --- a/banyand/measure/part.go +++ b/banyand/measure/part.go @@ -174,15 +174,15 @@ func (mp *memPart) mustInitFromDataPoints(dps *dataPoints) { func (mp *memPart) mustFlush(fileSystem fs.FileSystem, path string) { fileSystem.MkdirPanicIfExist(path, dirPermission) - fileSystem.Write(mp.meta.Buf, filepath.Join(path, metaFilename), filePermission) - fileSystem.Write(mp.primary.Buf, filepath.Join(path, primaryFilename), filePermission) - fileSystem.Write(mp.timestamps.Buf, filepath.Join(path, timestampsFilename), filePermission) - fileSystem.Write(mp.fieldValues.Buf, filepath.Join(path, fieldValuesFilename), filePermission) + fs.MustFlush(fileSystem, mp.meta.Buf, filepath.Join(path, metaFilename), filePermission) + fs.MustFlush(fileSystem, mp.primary.Buf, filepath.Join(path, primaryFilename), filePermission) + fs.MustFlush(fileSystem, mp.timestamps.Buf, filepath.Join(path, timestampsFilename), filePermission) + fs.MustFlush(fileSystem, mp.fieldValues.Buf, filepath.Join(path, fieldValuesFilename), filePermission) for name, tf := range mp.tagFamilies { - fileSystem.Write(tf.Buf, filepath.Join(path, name+tagFamiliesFilenameExt), filePermission) + fs.MustFlush(fileSystem, tf.Buf, filepath.Join(path, name+tagFamiliesFilenameExt), filePermission) } for name, tfh := range mp.tagFamilyMetadata { - fileSystem.Write(tfh.Buf, filepath.Join(path, name+tagFamiliesMetadataFilenameExt), filePermission) + fs.MustFlush(fileSystem, tfh.Buf, filepath.Join(path, name+tagFamiliesMetadataFilenameExt), filePermission) } mp.partMetadata.mustWriteMetadata(fileSystem, path) @@ -288,7 +288,6 @@ func mustOpenFilePart(partPath string, fileSystem fs.FileSystem) *part { } p.tagFamilies[removeExt(e.Name(), tagFamiliesFilenameExt)] = mustOpenReader(path.Join(partPath, e.Name()), fileSystem) } - } return &p } diff --git a/banyand/measure/query.go b/banyand/measure/query.go index 4451a253..d0b17d7f 100644 --- a/banyand/measure/query.go +++ b/banyand/measure/query.go @@ -73,6 +73,11 @@ func (s *measure) Query(ctx context.Context, mqo pbv1.MeasureQueryOptions) (pbv1 } tsdb := s.databaseSupplier.SupplyTSDB().(storage.TSDB[*tsTable]) tabWrappers := tsdb.SelectTSTables(*mqo.TimeRange) + defer func() { + for i := range tabWrappers { + tabWrappers[i].DecRef() + } + }() sl, err := tsdb.IndexDB().Search(ctx, &pbv1.Series{Subject: mqo.Name, EntityValues: mqo.Entity}, mqo.Filter, mqo.Order) if err != nil { return nil, err diff --git a/pkg/fs/file_system.go b/pkg/fs/file_system.go index b5f8aa10..031c568d 100644 --- a/pkg/fs/file_system.go +++ b/pkg/fs/file_system.go @@ -119,6 +119,16 @@ type DirEntry interface { IsDir() bool } +func MustFlush(fs FileSystem, buffer []byte, name string, permission Mode) { + n, err := fs.Write(buffer, name, permission) + if err != nil { + logger.GetLogger().Panic().Err(err).Str("path", name).Msg("cannot write data") + } + if n != len(buffer) { + logger.GetLogger().Panic().Int("written", n).Int("expected", len(buffer)).Str("path", name).Msg("BUG: writer wrote wrong number of bytes") + } +} + // MustWriteData writes data to w and panics if it cannot write all data. func MustWriteData(w Writer, data []byte) { if len(data) == 0 {
