This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch measure-flusher
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit b61b2540d48469ee8dd3fa6eddd88ba2f5835d02
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Dec 25 07:19:13 2023 +0800

    Fix linter
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 banyand/internal/storage/shard.go |  2 +-
 banyand/internal/storage/tsdb.go  |  2 +-
 banyand/measure/gc.go             | 10 +++++-----
 banyand/measure/part.go           | 13 ++++++-------
 banyand/measure/query.go          |  5 +++++
 pkg/fs/file_system.go             | 10 ++++++++++
 6 files changed, 28 insertions(+), 14 deletions(-)

diff --git a/banyand/internal/storage/shard.go 
b/banyand/internal/storage/shard.go
index cdec81ae..19150f9c 100644
--- a/banyand/internal/storage/shard.go
+++ b/banyand/internal/storage/shard.go
@@ -75,7 +75,7 @@ func (d *database[T]) openShard(ctx context.Context, id 
common.ShardID) (*shard[
        return s, nil
 }
 
-func (s *shard[T]) closer() {
+func (s *shard[T]) close() {
        s.closeOnce.Do(func() {
                s.scheduler.Close()
                s.segmentManageStrategy.Close()
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index a425609e..fd5b648a 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -73,7 +73,7 @@ func (d *database[T]) Close() error {
        d.Lock()
        defer d.Unlock()
        for _, s := range d.sLst {
-               s.closer()
+               s.close()
        }
        return d.index.Close()
 }
diff --git a/banyand/measure/gc.go b/banyand/measure/gc.go
index 5bab2bb5..8fb6f608 100644
--- a/banyand/measure/gc.go
+++ b/banyand/measure/gc.go
@@ -18,6 +18,7 @@
 package measure
 
 import (
+       "errors"
        "path"
        "sort"
 
@@ -63,11 +64,10 @@ func (g *garbageCleaner) cleanSnapshots() {
        for i := 0; i < len(g.snapshots)-1; i++ {
                filePath := path.Join(g.parent.root, 
snapshotName(g.snapshots[i]))
                if err := g.parent.fileSystem.DeleteFile(filePath); err != nil {
-                       if fse, ok := err.(*fs.FileSystemError); ok {
-                               if fse.Code != fs.IsNotExistError {
-                                       g.parent.l.Warn().Err(err).Str("path", 
filePath).Msg("failed to delete snapshot, will retry in next round. Please 
check manually")
-                                       remainingSnapshots = 
append(remainingSnapshots, g.snapshots[i])
-                               }
+                       var notExistErr *fs.FileSystemError
+                       if errors.As(err, &notExistErr) && notExistErr.Code != 
fs.IsNotExistError {
+                               g.parent.l.Warn().Err(err).Str("path", 
filePath).Msg("failed to delete snapshot, will retry in next round. Please 
check manually")
+                               remainingSnapshots = append(remainingSnapshots, 
g.snapshots[i])
                        }
                }
        }
diff --git a/banyand/measure/part.go b/banyand/measure/part.go
index f2ab04f2..6b18b597 100644
--- a/banyand/measure/part.go
+++ b/banyand/measure/part.go
@@ -174,15 +174,15 @@ func (mp *memPart) mustInitFromDataPoints(dps 
*dataPoints) {
 func (mp *memPart) mustFlush(fileSystem fs.FileSystem, path string) {
        fileSystem.MkdirPanicIfExist(path, dirPermission)
 
-       fileSystem.Write(mp.meta.Buf, filepath.Join(path, metaFilename), 
filePermission)
-       fileSystem.Write(mp.primary.Buf, filepath.Join(path, primaryFilename), 
filePermission)
-       fileSystem.Write(mp.timestamps.Buf, filepath.Join(path, 
timestampsFilename), filePermission)
-       fileSystem.Write(mp.fieldValues.Buf, filepath.Join(path, 
fieldValuesFilename), filePermission)
+       fs.MustFlush(fileSystem, mp.meta.Buf, filepath.Join(path, 
metaFilename), filePermission)
+       fs.MustFlush(fileSystem, mp.primary.Buf, filepath.Join(path, 
primaryFilename), filePermission)
+       fs.MustFlush(fileSystem, mp.timestamps.Buf, filepath.Join(path, 
timestampsFilename), filePermission)
+       fs.MustFlush(fileSystem, mp.fieldValues.Buf, filepath.Join(path, 
fieldValuesFilename), filePermission)
        for name, tf := range mp.tagFamilies {
-               fileSystem.Write(tf.Buf, filepath.Join(path, 
name+tagFamiliesFilenameExt), filePermission)
+               fs.MustFlush(fileSystem, tf.Buf, filepath.Join(path, 
name+tagFamiliesFilenameExt), filePermission)
        }
        for name, tfh := range mp.tagFamilyMetadata {
-               fileSystem.Write(tfh.Buf, filepath.Join(path, 
name+tagFamiliesMetadataFilenameExt), filePermission)
+               fs.MustFlush(fileSystem, tfh.Buf, filepath.Join(path, 
name+tagFamiliesMetadataFilenameExt), filePermission)
        }
 
        mp.partMetadata.mustWriteMetadata(fileSystem, path)
@@ -288,7 +288,6 @@ func mustOpenFilePart(partPath string, fileSystem 
fs.FileSystem) *part {
                        }
                        p.tagFamilies[removeExt(e.Name(), 
tagFamiliesFilenameExt)] = mustOpenReader(path.Join(partPath, e.Name()), 
fileSystem)
                }
-
        }
        return &p
 }
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index 4451a253..d0b17d7f 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -73,6 +73,11 @@ func (s *measure) Query(ctx context.Context, mqo 
pbv1.MeasureQueryOptions) (pbv1
        }
        tsdb := s.databaseSupplier.SupplyTSDB().(storage.TSDB[*tsTable])
        tabWrappers := tsdb.SelectTSTables(*mqo.TimeRange)
+       defer func() {
+               for i := range tabWrappers {
+                       tabWrappers[i].DecRef()
+               }
+       }()
        sl, err := tsdb.IndexDB().Search(ctx, &pbv1.Series{Subject: mqo.Name, 
EntityValues: mqo.Entity}, mqo.Filter, mqo.Order)
        if err != nil {
                return nil, err
diff --git a/pkg/fs/file_system.go b/pkg/fs/file_system.go
index b5f8aa10..031c568d 100644
--- a/pkg/fs/file_system.go
+++ b/pkg/fs/file_system.go
@@ -119,6 +119,16 @@ type DirEntry interface {
        IsDir() bool
 }
 
+func MustFlush(fs FileSystem, buffer []byte, name string, permission Mode) {
+       n, err := fs.Write(buffer, name, permission)
+       if err != nil {
+               logger.GetLogger().Panic().Err(err).Str("path", 
name).Msg("cannot write data")
+       }
+       if n != len(buffer) {
+               logger.GetLogger().Panic().Int("written", n).Int("expected", 
len(buffer)).Str("path", name).Msg("BUG: writer wrote wrong number of bytes")
+       }
+}
+
 // MustWriteData writes data to w and panics if it cannot write all data.
 func MustWriteData(w Writer, data []byte) {
        if len(data) == 0 {

Reply via email to