This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch measure-flusher
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/measure-flusher by this push:
new 776afbfe Fix race issues
776afbfe is described below
commit 776afbfeae9569d112e2d55de1bd06827b87ecd7
Author: Gao Hongtao <[email protected]>
AuthorDate: Mon Dec 25 03:05:05 2023 +0000
Fix race issues
Signed-off-by: Gao Hongtao <[email protected]>
---
banyand/internal/storage/segment.go | 7 +++--
banyand/measure/flusher.go | 4 +--
banyand/measure/part.go | 2 +-
banyand/measure/snapshot.go | 10 ++++---
banyand/measure/snapshot_test.go | 14 ++++-----
banyand/measure/tstable.go | 33 +++++++++++-----------
pkg/fs/error.go | 1 +
pkg/fs/file_system.go | 1 +
pkg/fs/local_file_system_nix.go | 2 +-
pkg/watcher/watcher.go | 2 +-
.../query_ondisk/query_ondisk_suite_test.go | 1 -
11 files changed, 40 insertions(+), 37 deletions(-)
diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go
index 85decd8b..d4b0d7ff 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -292,16 +292,17 @@ func (sc *segmentController[T]) sortLst() {
}
func (sc *segmentController[T]) load(start, end time.Time, root string) (seg *segment[T], err error) {
+ suffix := sc.Format(start)
+ segPath := path.Join(root, fmt.Sprintf(segTemplate, suffix))
var tsTable T
- if tsTable, err = sc.tsTableCreator(lfs, sc.location, sc.position, sc.l, timestamp.NewSectionTimeRange(start, end)); err != nil {
+ if tsTable, err = sc.tsTableCreator(lfs, segPath, sc.position, sc.l, timestamp.NewSectionTimeRange(start, end)); err != nil {
return nil, err
}
- suffix := sc.Format(start)
ctx := context.WithValue(context.Background(), logger.ContextKey, sc.l)
seg, err = openSegment[T](common.SetPosition(ctx, func(p common.Position) common.Position {
p.Segment = suffix
return p
- }), start, end, path.Join(root, fmt.Sprintf(segTemplate, suffix)), suffix, sc.segmentSize, sc.scheduler, tsTable)
+ }), start, end, segPath, suffix, sc.segmentSize, sc.scheduler, tsTable)
if err != nil {
return nil, err
}
diff --git a/banyand/measure/flusher.go b/banyand/measure/flusher.go
index 09346d1d..4fe26408 100644
--- a/banyand/measure/flusher.go
+++ b/banyand/measure/flusher.go
@@ -34,12 +34,12 @@ func (tst *tsTable) flusherLoop(flushCh chan *flusherIntroduction, introducerWat
return
case <-epochWatcher.Watch():
var curSnapshot *snapshot
- tst.Lock()
+ tst.RLock()
if tst.snapshot != nil && tst.snapshot.epoch > epoch {
curSnapshot = tst.snapshot
curSnapshot.incRef()
}
- tst.Unlock()
+ tst.RUnlock()
if curSnapshot != nil {
epoch = tst.flush(curSnapshot, flushCh)
if epoch == 0 {
diff --git a/banyand/measure/part.go b/banyand/measure/part.go
index 6b18b597..81fcce0e 100644
--- a/banyand/measure/part.go
+++ b/banyand/measure/part.go
@@ -221,11 +221,11 @@ func releaseMemPart(mp *memPart) {
var memPartPool sync.Pool
type partWrapper struct {
+ fileSystem fs.FileSystem
mp *memPart
p *part
ref int32
mustBeDeleted uint32
- fileSystem fs.FileSystem
}
func newPartWrapper(mp *memPart, p *part, fileSystem fs.FileSystem) *partWrapper {
diff --git a/banyand/measure/snapshot.go b/banyand/measure/snapshot.go
index 63339392..ddc995dc 100644
--- a/banyand/measure/snapshot.go
+++ b/banyand/measure/snapshot.go
@@ -70,13 +70,15 @@ func (s *snapshot) decRef() {
s.parts = s.parts[:0]
}
-func (s snapshot) copyAllTo(nextEpoch uint64) snapshot {
- s.epoch = nextEpoch
- s.ref = 1
+func (s *snapshot) copyAllTo(nextEpoch uint64) snapshot {
+ var result snapshot
+ result.epoch = nextEpoch
+ result.ref = 1
for i := range s.parts {
s.parts[i].incRef()
+ result.parts = append(result.parts, s.parts[i])
}
- return s
+ return result
}
func (s *snapshot) merge(nextEpoch uint64, nextParts map[uint64]*partWrapper) snapshot {
diff --git a/banyand/measure/snapshot_test.go b/banyand/measure/snapshot_test.go
index 1b471745..f571d4c9 100644
--- a/banyand/measure/snapshot_test.go
+++ b/banyand/measure/snapshot_test.go
@@ -25,11 +25,11 @@ import (
func TestSnapshotGetParts(t *testing.T) {
tests := []struct {
- name string
snapshot *snapshot
+ name string
dst []*part
- opts queryOptions
expected []*part
+ opts queryOptions
count int
}{
{
@@ -121,9 +121,9 @@ func TestSnapshotCopyAllTo(t *testing.T) {
tests := []struct {
name string
snapshot snapshot
+ expected snapshot
nextEpoch uint64
closePrev bool
- expected snapshot
}{
{
name: "Test with empty snapshot",
@@ -134,7 +134,7 @@ func TestSnapshotCopyAllTo(t *testing.T) {
expected: snapshot{
epoch: 1,
ref: 1,
- parts: []*partWrapper{},
+ parts: nil,
},
},
{
@@ -189,12 +189,12 @@ func TestSnapshotCopyAllTo(t *testing.T) {
func TestSnapshotMerge(t *testing.T) {
tests := []struct {
- name string
snapshot *snapshot
- closePrev bool
- nextEpoch uint64
nextParts map[uint64]*partWrapper
+ name string
expected snapshot
+ nextEpoch uint64
+ closePrev bool
}{
{
name: "Test with empty snapshot and empty next parts",
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index 3e5b7e2a..f3c445dc 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -44,14 +44,14 @@ const (
)
func newTSTable(fileSystem fs.FileSystem, rootPath string, _ common.Position, l *logger.Logger, _ timestamp.TimeRange) (*tsTable, error) {
- var tsTable tsTable
- tsTable.fileSystem = fileSystem
- tsTable.root = rootPath
- tsTable.l = l
- tsTable.gc.parent = &tsTable
+ var tst tsTable
+ tst.fileSystem = fileSystem
+ tst.root = rootPath
+ tst.l = l
+ tst.gc.parent = &tst
ee := fileSystem.ReadDir(rootPath)
if len(ee) == 0 {
- t := &tsTable
+ t := &tst
t.startLoop(uint64(time.Now().UnixNano()))
return t, nil
}
@@ -79,7 +79,7 @@ func newTSTable(fileSystem fs.FileSystem, rootPath string, _ common.Position, l
continue
}
loadedSnapshots = append(loadedSnapshots, snapshot)
- tsTable.gc.registerSnapshot(snapshot)
+ tst.gc.registerSnapshot(snapshot)
}
for i := range needToDelete {
if err := fileSystem.DeleteFile(filepath.Join(rootPath, needToDelete[i])); err != nil {
@@ -87,7 +87,7 @@ func newTSTable(fileSystem fs.FileSystem, rootPath string, _ common.Position, l
}
}
if len(loadedParts) == 0 || len(loadedSnapshots) == 0 {
- t := &tsTable
+ t := &tst
t.startLoop(uint64(time.Now().UnixNano()))
return t, nil
}
@@ -95,21 +95,21 @@ func newTSTable(fileSystem fs.FileSystem, rootPath string, _ common.Position, l
return loadedSnapshots[i] > loadedSnapshots[j]
})
epoch := loadedSnapshots[0]
- t := &tsTable
+ t := &tst
t.loadSnapshot(epoch, loadedParts)
t.startLoop(epoch)
return t, nil
}
type tsTable struct {
- l *logger.Logger
- fileSystem fs.FileSystem
- gc garbageCleaner
- root string
- snapshot *snapshot
- sync.RWMutex
+ fileSystem fs.FileSystem
+ l *logger.Logger
+ snapshot *snapshot
introductions chan *introduction
loopCloser *run.Closer
+ root string
+ gc garbageCleaner
+ sync.RWMutex
}
func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) {
@@ -141,12 +141,11 @@ func (tst *tsTable) loadSnapshot(epoch uint64, loadedParts []uint64) {
}
func (tst *tsTable) startLoop(cur uint64) {
- next := cur + 1
tst.loopCloser = run.NewCloser(3)
tst.introductions = make(chan *introduction)
flushCh := make(chan *flusherIntroduction)
introducerWatcher := make(watcher.Channel, 1)
- go tst.introducerLoop(flushCh, introducerWatcher, next)
+ go tst.introducerLoop(flushCh, introducerWatcher, cur+1)
go tst.flusherLoop(flushCh, introducerWatcher, cur)
}
diff --git a/pkg/fs/error.go b/pkg/fs/error.go
index 0b0d3040..8247d9e3 100644
--- a/pkg/fs/error.go
+++ b/pkg/fs/error.go
@@ -20,6 +20,7 @@ package fs
import "fmt"
+// FileSystemError code.
const (
isExistError = iota
IsNotExistError
diff --git a/pkg/fs/file_system.go b/pkg/fs/file_system.go
index 031c568d..da483846 100644
--- a/pkg/fs/file_system.go
+++ b/pkg/fs/file_system.go
@@ -119,6 +119,7 @@ type DirEntry interface {
IsDir() bool
}
+// MustFlush flushes all data to one file and panics if it cannot write all data.
func MustFlush(fs FileSystem, buffer []byte, name string, permission Mode) {
n, err := fs.Write(buffer, name, permission)
if err != nil {
diff --git a/pkg/fs/local_file_system_nix.go b/pkg/fs/local_file_system_nix.go
index 05f71fef..75db2649 100644
--- a/pkg/fs/local_file_system_nix.go
+++ b/pkg/fs/local_file_system_nix.go
@@ -32,7 +32,7 @@ func (*localFileSystem) CreateLockFile(name string, permission Mode) (File, erro
file, err := os.OpenFile(name, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(permission))
switch {
case err == nil:
- if err := unix.Flock(int(file.Fd()), unix.LOCK_EX|unix.LOCK_NB); err != nil {
+ if err = unix.Flock(int(file.Fd()), unix.LOCK_EX|unix.LOCK_NB); err != nil {
return nil, &FileSystemError{
Code: lockError,
Message: fmt.Sprintf("Cannot lock file, file name: %s, error message: %s", name, err),
diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go
index 6ad9fb5d..eaedaa38 100644
--- a/pkg/watcher/watcher.go
+++ b/pkg/watcher/watcher.go
@@ -21,8 +21,8 @@ package watcher
// Epoch is a epoch watcher.
// It will be notified when the epoch is reached.
type Epoch struct {
- epoch uint64
ch chan struct{}
+ epoch uint64
}
// Watch returns a channel that will be notified when the epoch is reached.
diff --git a/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go b/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go
index d6846ee3..ac89d108 100644
--- a/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go
+++ b/test/integration/standalone/query_ondisk/query_ondisk_suite_test.go
@@ -42,7 +42,6 @@ import (
)
func TestIntegrationQueryOnDisk(t *testing.T) {
- t.Skip("skip on-disk integration test until measure parts flushing is supported")
RegisterFailHandler(Fail)
RunSpecs(t, "Integration Query OnDisk Suite", Label(integration_standalone.Labels...))
}