This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch measure-flusher
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/measure-flusher by this push:
     new 776afbfe Fix race issues
776afbfe is described below

commit 776afbfeae9569d112e2d55de1bd06827b87ecd7
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Dec 25 03:05:05 2023 +0000

    Fix race issues
    
    Signed-off-by: Gao Hongtao <[email protected]>
---
 banyand/internal/storage/segment.go                |  7 +++--
 banyand/measure/flusher.go                         |  4 +--
 banyand/measure/part.go                            |  2 +-
 banyand/measure/snapshot.go                        | 10 ++++---
 banyand/measure/snapshot_test.go                   | 14 ++++-----
 banyand/measure/tstable.go                         | 33 +++++++++++-----------
 pkg/fs/error.go                                    |  1 +
 pkg/fs/file_system.go                              |  1 +
 pkg/fs/local_file_system_nix.go                    |  2 +-
 pkg/watcher/watcher.go                             |  2 +-
 .../query_ondisk/query_ondisk_suite_test.go        |  1 -
 11 files changed, 40 insertions(+), 37 deletions(-)

diff --git a/banyand/internal/storage/segment.go 
b/banyand/internal/storage/segment.go
index 85decd8b..d4b0d7ff 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -292,16 +292,17 @@ func (sc *segmentController[T]) sortLst() {
 }
 
 func (sc *segmentController[T]) load(start, end time.Time, root string) (seg 
*segment[T], err error) {
+       suffix := sc.Format(start)
+       segPath := path.Join(root, fmt.Sprintf(segTemplate, suffix))
        var tsTable T
-       if tsTable, err = sc.tsTableCreator(lfs, sc.location, sc.position, 
sc.l, timestamp.NewSectionTimeRange(start, end)); err != nil {
+       if tsTable, err = sc.tsTableCreator(lfs, segPath, sc.position, sc.l, 
timestamp.NewSectionTimeRange(start, end)); err != nil {
                return nil, err
        }
-       suffix := sc.Format(start)
        ctx := context.WithValue(context.Background(), logger.ContextKey, sc.l)
        seg, err = openSegment[T](common.SetPosition(ctx, func(p 
common.Position) common.Position {
                p.Segment = suffix
                return p
-       }), start, end, path.Join(root, fmt.Sprintf(segTemplate, suffix)), 
suffix, sc.segmentSize, sc.scheduler, tsTable)
+       }), start, end, segPath, suffix, sc.segmentSize, sc.scheduler, tsTable)
        if err != nil {
                return nil, err
        }
diff --git a/banyand/measure/flusher.go b/banyand/measure/flusher.go
index 09346d1d..4fe26408 100644
--- a/banyand/measure/flusher.go
+++ b/banyand/measure/flusher.go
@@ -34,12 +34,12 @@ func (tst *tsTable) flusherLoop(flushCh chan 
*flusherIntroduction, introducerWat
                        return
                case <-epochWatcher.Watch():
                        var curSnapshot *snapshot
-                       tst.Lock()
+                       tst.RLock()
                        if tst.snapshot != nil && tst.snapshot.epoch > epoch {
                                curSnapshot = tst.snapshot
                                curSnapshot.incRef()
                        }
-                       tst.Unlock()
+                       tst.RUnlock()
                        if curSnapshot != nil {
                                epoch = tst.flush(curSnapshot, flushCh)
                                if epoch == 0 {
diff --git a/banyand/measure/part.go b/banyand/measure/part.go
index 6b18b597..81fcce0e 100644
--- a/banyand/measure/part.go
+++ b/banyand/measure/part.go
@@ -221,11 +221,11 @@ func releaseMemPart(mp *memPart) {
 var memPartPool sync.Pool
 
 type partWrapper struct {
+       fileSystem    fs.FileSystem
        mp            *memPart
        p             *part
        ref           int32
        mustBeDeleted uint32
-       fileSystem    fs.FileSystem
 }
 
 func newPartWrapper(mp *memPart, p *part, fileSystem fs.FileSystem) 
*partWrapper {
diff --git a/banyand/measure/snapshot.go b/banyand/measure/snapshot.go
index 63339392..ddc995dc 100644
--- a/banyand/measure/snapshot.go
+++ b/banyand/measure/snapshot.go
@@ -70,13 +70,15 @@ func (s *snapshot) decRef() {
        s.parts = s.parts[:0]
 }
 
-func (s snapshot) copyAllTo(nextEpoch uint64) snapshot {
-       s.epoch = nextEpoch
-       s.ref = 1
+func (s *snapshot) copyAllTo(nextEpoch uint64) snapshot {
+       var result snapshot
+       result.epoch = nextEpoch
+       result.ref = 1
        for i := range s.parts {
                s.parts[i].incRef()
+               result.parts = append(result.parts, s.parts[i])
        }
-       return s
+       return result
 }
 
 func (s *snapshot) merge(nextEpoch uint64, nextParts map[uint64]*partWrapper) 
snapshot {
diff --git a/banyand/measure/snapshot_test.go b/banyand/measure/snapshot_test.go
index 1b471745..f571d4c9 100644
--- a/banyand/measure/snapshot_test.go
+++ b/banyand/measure/snapshot_test.go
@@ -25,11 +25,11 @@ import (
 
 func TestSnapshotGetParts(t *testing.T) {
        tests := []struct {
-               name     string
                snapshot *snapshot
+               name     string
                dst      []*part
-               opts     queryOptions
                expected []*part
+               opts     queryOptions
                count    int
        }{
                {
@@ -121,9 +121,9 @@ func TestSnapshotCopyAllTo(t *testing.T) {
        tests := []struct {
                name      string
                snapshot  snapshot
+               expected  snapshot
                nextEpoch uint64
                closePrev bool
-               expected  snapshot
        }{
                {
                        name: "Test with empty snapshot",
@@ -134,7 +134,7 @@ func TestSnapshotCopyAllTo(t *testing.T) {
                        expected: snapshot{
                                epoch: 1,
                                ref:   1,
-                               parts: []*partWrapper{},
+                               parts: nil,
                        },
                },
                {
@@ -189,12 +189,12 @@ func TestSnapshotCopyAllTo(t *testing.T) {
 
 func TestSnapshotMerge(t *testing.T) {
        tests := []struct {
-               name      string
                snapshot  *snapshot
-               closePrev bool
-               nextEpoch uint64
                nextParts map[uint64]*partWrapper
+               name      string
                expected  snapshot
+               nextEpoch uint64
+               closePrev bool
        }{
                {
                        name: "Test with empty snapshot and empty next parts",
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index 3e5b7e2a..f3c445dc 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -44,14 +44,14 @@ const (
 )
 
 func newTSTable(fileSystem fs.FileSystem, rootPath string, _ common.Position, 
l *logger.Logger, _ timestamp.TimeRange) (*tsTable, error) {
-       var tsTable tsTable
-       tsTable.fileSystem = fileSystem
-       tsTable.root = rootPath
-       tsTable.l = l
-       tsTable.gc.parent = &tsTable
+       var tst tsTable
+       tst.fileSystem = fileSystem
+       tst.root = rootPath
+       tst.l = l
+       tst.gc.parent = &tst
        ee := fileSystem.ReadDir(rootPath)
        if len(ee) == 0 {
-               t := &tsTable
+               t := &tst
                t.startLoop(uint64(time.Now().UnixNano()))
                return t, nil
        }
@@ -79,7 +79,7 @@ func newTSTable(fileSystem fs.FileSystem, rootPath string, _ 
common.Position, l
                        continue
                }
                loadedSnapshots = append(loadedSnapshots, snapshot)
-               tsTable.gc.registerSnapshot(snapshot)
+               tst.gc.registerSnapshot(snapshot)
        }
        for i := range needToDelete {
                if err := fileSystem.DeleteFile(filepath.Join(rootPath, 
needToDelete[i])); err != nil {
@@ -87,7 +87,7 @@ func newTSTable(fileSystem fs.FileSystem, rootPath string, _ 
common.Position, l
                }
        }
        if len(loadedParts) == 0 || len(loadedSnapshots) == 0 {
-               t := &tsTable
+               t := &tst
                t.startLoop(uint64(time.Now().UnixNano()))
                return t, nil
        }
@@ -95,21 +95,21 @@ func newTSTable(fileSystem fs.FileSystem, rootPath string, 
_ common.Position, l
                return loadedSnapshots[i] > loadedSnapshots[j]
        })
        epoch := loadedSnapshots[0]
-       t := &tsTable
+       t := &tst
        t.loadSnapshot(epoch, loadedParts)
        t.startLoop(epoch)
        return t, nil
 }
 
 type tsTable struct {
-       l          *logger.Logger
-       fileSystem fs.FileSystem
-       gc         garbageCleaner
-       root       string
-       snapshot   *snapshot
-       sync.RWMutex
+       fileSystem    fs.FileSystem
+       l             *logger.Logger
+       snapshot      *snapshot
        introductions chan *introduction
        loopCloser    *run.Closer
+       root          string
+       gc            garbageCleaner
+       sync.RWMutex
 }
 
 func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) {
@@ -141,12 +141,11 @@ func (tst *tsTable) loadSnapshot(epoch uint64, 
loadedParts []uint64) {
 }
 
 func (tst *tsTable) startLoop(cur uint64) {
-       next := cur + 1
        tst.loopCloser = run.NewCloser(3)
        tst.introductions = make(chan *introduction)
        flushCh := make(chan *flusherIntroduction)
        introducerWatcher := make(watcher.Channel, 1)
-       go tst.introducerLoop(flushCh, introducerWatcher, next)
+       go tst.introducerLoop(flushCh, introducerWatcher, cur+1)
        go tst.flusherLoop(flushCh, introducerWatcher, cur)
 }
 
diff --git a/pkg/fs/error.go b/pkg/fs/error.go
index 0b0d3040..8247d9e3 100644
--- a/pkg/fs/error.go
+++ b/pkg/fs/error.go
@@ -20,6 +20,7 @@ package fs
 
 import "fmt"
 
+// FileSystemError code.
 const (
        isExistError = iota
        IsNotExistError
diff --git a/pkg/fs/file_system.go b/pkg/fs/file_system.go
index 031c568d..da483846 100644
--- a/pkg/fs/file_system.go
+++ b/pkg/fs/file_system.go
@@ -119,6 +119,7 @@ type DirEntry interface {
        IsDir() bool
 }
 
+// MustFlush flushes all data to one file and panics if it cannot write all 
data.
 func MustFlush(fs FileSystem, buffer []byte, name string, permission Mode) {
        n, err := fs.Write(buffer, name, permission)
        if err != nil {
diff --git a/pkg/fs/local_file_system_nix.go b/pkg/fs/local_file_system_nix.go
index 05f71fef..75db2649 100644
--- a/pkg/fs/local_file_system_nix.go
+++ b/pkg/fs/local_file_system_nix.go
@@ -32,7 +32,7 @@ func (*localFileSystem) CreateLockFile(name string, 
permission Mode) (File, erro
        file, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 
os.FileMode(permission))
        switch {
        case err == nil:
-               if err := unix.Flock(int(file.Fd()), 
unix.LOCK_EX|unix.LOCK_NB); err != nil {
+               if err = unix.Flock(int(file.Fd()), unix.LOCK_EX|unix.LOCK_NB); 
err != nil {
                        return nil, &FileSystemError{
                                Code:    lockError,
                                Message: fmt.Sprintf("Cannot lock file, file 
name: %s, error message: %s", name, err),
diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go
index 6ad9fb5d..eaedaa38 100644
--- a/pkg/watcher/watcher.go
+++ b/pkg/watcher/watcher.go
@@ -21,8 +21,8 @@ package watcher
 // Epoch is a epoch watcher.
 // It will be notified when the epoch is reached.
 type Epoch struct {
-       epoch uint64
        ch    chan struct{}
+       epoch uint64
 }
 
 // Watch returns a channel that will be notified when the epoch is reached.
diff --git 
a/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go 
b/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go
index d6846ee3..ac89d108 100644
--- a/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go
+++ b/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go
@@ -42,7 +42,6 @@ import (
 )
 
 func TestIntegrationQueryOnDisk(t *testing.T) {
-       t.Skip("skip on-disk integration test until measure parts flushing is 
supported")
        RegisterFailHandler(Fail)
        RunSpecs(t, "Integration Query OnDisk Suite", 
Label(integration_standalone.Labels...))
 }

Reply via email to