This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 0959cabe5 ignore backup if no segments in the storage (#992)
0959cabe5 is described below
commit 0959cabe58cdf80cc12843b8e43ed360fa44a0d6
Author: mrproliu <[email protected]>
AuthorDate: Mon Mar 9 08:41:37 2026 +0800
ignore backup if no segments in the storage (#992)
* ignore backup if no segments in the storage
---------
Co-authored-by: Copilot <[email protected]>
---
CHANGES.md | 1 +
banyand/internal/storage/rotation_test.go | 4 +-
banyand/internal/storage/segment_test.go | 2 +-
banyand/internal/storage/storage.go | 4 +-
banyand/internal/storage/tsdb.go | 20 ++++---
banyand/internal/storage/tsdb_test.go | 41 +++++++++++++-
banyand/measure/snapshot.go | 38 +++++++++----
banyand/measure/snapshot_test.go | 2 +-
banyand/measure/svc_data.go | 29 +++++++---
banyand/stream/snapshot.go | 42 +++++++++-----
banyand/stream/snapshot_test.go | 52 ++++++++++++++++-
banyand/trace/snapshot.go | 15 +++--
banyand/trace/snapshot_test.go | 94 ++++++++++++++++++++++++++++++-
banyand/trace/svc_standalone.go | 25 +++++---
14 files changed, 307 insertions(+), 62 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 2b5010d24..0275b4129 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -39,6 +39,7 @@ Release Notes.
- Fix panic in sidx merge and flush operations when part counts don't match
expectations.
- Fix trace queries with range conditions on the same tag (e.g., duration)
combined with ORDER BY by deduplicating tag names when merging logical
expression branches.
- Fix sidx tag filter range check returning inverted skip decision and use
correct int64 encoding for block min/max.
+- Skip taking a snapshot when there is no data.
### Document
diff --git a/banyand/internal/storage/rotation_test.go
b/banyand/internal/storage/rotation_test.go
index d80618fbf..5a605e67c 100644
--- a/banyand/internal/storage/rotation_test.go
+++ b/banyand/internal/storage/rotation_test.go
@@ -198,8 +198,8 @@ func (m *MockTSTable) Close() error {
func (m *MockTSTable) Collect(_ Metrics) {}
-func (m *MockTSTable) TakeFileSnapshot(_ string) error {
- return nil
+func (m *MockTSTable) TakeFileSnapshot(_ string) (bool, error) {
+ return true, nil
}
var MockTSTableCreator = func(_ fs.FileSystem, _ string, _ common.Position,
diff --git a/banyand/internal/storage/segment_test.go
b/banyand/internal/storage/segment_test.go
index cb331568c..7bcc1063b 100644
--- a/banyand/internal/storage/segment_test.go
+++ b/banyand/internal/storage/segment_test.go
@@ -46,7 +46,7 @@ func (m mockTSTable) Close() error {
func (m mockTSTable) Collect(Metrics) {}
-func (m mockTSTable) TakeFileSnapshot(string) error { return nil }
+func (m mockTSTable) TakeFileSnapshot(string) (bool, error) { return true, nil
}
// mockTSTableOpener implements the necessary functions to open a TSTable.
type mockTSTableOpener struct{}
diff --git a/banyand/internal/storage/storage.go
b/banyand/internal/storage/storage.go
index 41f7280ac..7ff809a95 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -112,7 +112,7 @@ type TSDB[T TSTable, O any] interface {
SelectSegments(timeRange timestamp.TimeRange) ([]Segment[T, O], error)
Tick(ts int64)
UpdateOptions(opts *commonv1.ResourceOpts)
- TakeFileSnapshot(dst string) error
+ TakeFileSnapshot(dst string) (bool, error)
GetExpiredSegmentsTimeRange() *timestamp.TimeRange
DeleteExpiredSegments(segmentSuffixes []string) int64
// PeekOldestSegmentEndTime returns the end time of the oldest segment.
@@ -138,7 +138,7 @@ type Segment[T TSTable, O any] interface {
type TSTable interface {
io.Closer
Collect(Metrics)
- TakeFileSnapshot(dst string) error
+ TakeFileSnapshot(dst string) (bool, error)
}
// TSTableCreator creates a TSTable.
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index a0ad00e28..244774a0d 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -248,14 +248,14 @@ func (d *database[T, O]) UpdateOptions(resourceOpts
*commonv1.ResourceOpts) {
d.segmentController.updateOptions(resourceOpts)
}
-func (d *database[T, O]) TakeFileSnapshot(dst string) error {
+func (d *database[T, O]) TakeFileSnapshot(dst string) (bool, error) {
if d.closed.Load() {
- return errors.New("database is closed")
+ return false, errors.New("database is closed")
}
segments, err := d.segmentController.segments(true)
if err != nil {
- return errors.Wrap(err, "failed to get segments")
+ return false, errors.Wrap(err, "failed to get segments")
}
defer func() {
for _, seg := range segments {
@@ -263,6 +263,10 @@ func (d *database[T, O]) TakeFileSnapshot(dst string)
error {
}
}()
+ if len(segments) == 0 {
+ return false, nil
+ }
+
log.Info().Int("segment_count", len(segments)).Str("db_location",
d.location).
Msgf("taking file snapshot for %s", dst)
for _, seg := range segments {
@@ -273,13 +277,13 @@ func (d *database[T, O]) TakeFileSnapshot(dst string)
error {
metadataSrc := filepath.Join(seg.location, metadataFilename)
metadataDest := filepath.Join(segPath, metadataFilename)
if err := d.lfs.CreateHardLink(metadataSrc, metadataDest, nil);
err != nil {
- return errors.Wrapf(err, "failed to snapshot metadata
for segment %s", segDir)
+ return false, errors.Wrapf(err, "failed to snapshot
metadata for segment %s", segDir)
}
indexPath := filepath.Join(segPath, seriesIndexDirName)
d.lfs.MkdirIfNotExist(indexPath, DirPerm)
if err := seg.index.store.TakeFileSnapshot(indexPath); err !=
nil {
- return errors.Wrapf(err, "failed to snapshot index for
segment %s", segDir)
+ return false, errors.Wrapf(err, "failed to snapshot
index for segment %s", segDir)
}
sLst := seg.sLst.Load()
@@ -290,13 +294,13 @@ func (d *database[T, O]) TakeFileSnapshot(dst string)
error {
shardDir := filepath.Base(shard.location)
shardPath := filepath.Join(segPath, shardDir)
d.lfs.MkdirIfNotExist(shardPath, DirPerm)
- if err := shard.table.TakeFileSnapshot(shardPath); err
!= nil {
- return errors.Wrapf(err, "failed to snapshot
shard %s in segment %s", shardDir, segDir)
+ if _, err := shard.table.TakeFileSnapshot(shardPath);
err != nil {
+ return false, errors.Wrapf(err, "failed to
snapshot shard %s in segment %s", shardDir, segDir)
}
}
}
- return nil
+ return true, nil
}
func (d *database[T, O]) GetExpiredSegmentsTimeRange() *timestamp.TimeRange {
diff --git a/banyand/internal/storage/tsdb_test.go
b/banyand/internal/storage/tsdb_test.go
index 57b7e4c43..d3b84e889 100644
--- a/banyand/internal/storage/tsdb_test.go
+++ b/banyand/internal/storage/tsdb_test.go
@@ -259,8 +259,9 @@ func TestTakeFileSnapshot(t *testing.T) {
segLocation := seg.(*segment[*MockTSTable, any]).location // to
verify snapshot files/dirs later
seg.DecRef()
- err = tsdb.TakeFileSnapshot(snapshotDir)
- require.NoError(t, err, "taking file snapshot should not
produce an error")
+ created, snapshotErr := tsdb.TakeFileSnapshot(snapshotDir)
+ require.NoError(t, snapshotErr, "taking file snapshot should
not produce an error")
+ require.True(t, created, "snapshot should have been created")
segDir := filepath.Join(snapshotDir, filepath.Base(segLocation))
require.DirExists(t, segDir, "snapshot of the segment directory
should exist")
@@ -271,6 +272,42 @@ func TestTakeFileSnapshot(t *testing.T) {
require.NoError(t, tsdb.Close())
})
+
+ t.Run("Take snapshot without segments", func(t *testing.T) {
+ dir, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ snapshotDir := filepath.Join(dir, "snapshot")
+
+ opts := TSDBOpts[*MockTSTable, any]{
+ Location: dir,
+ SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+ TTL: IntervalRule{Unit: DAY, Num: 3},
+ ShardNum: 1,
+ TSTableCreator: MockTSTableCreator,
+ }
+
+ ctx := context.Background()
+ mc := timestamp.NewMockClock()
+
+ ts, err := time.ParseInLocation("2006-01-02 15:04:05",
"2024-05-01 00:00:00", time.Local)
+ require.NoError(t, err)
+ mc.Set(ts)
+ ctx = timestamp.SetClock(ctx, mc)
+
+ serviceCache := NewServiceCache()
+ tsdb, err := OpenTSDB(ctx, opts, serviceCache, group)
+ require.NoError(t, err)
+ require.NotNil(t, tsdb)
+
+ created, snapshotErr := tsdb.TakeFileSnapshot(snapshotDir)
+ require.NoError(t, snapshotErr, "taking file snapshot should
not produce an error")
+ require.False(t, created, "snapshot should not have been
created when no segments exist")
+
+ require.NoDirExists(t, snapshotDir, "snapshot directory should
not exist when no segments")
+
+ require.NoError(t, tsdb.Close())
+ })
}
func TestTSDBCollect(t *testing.T) {
diff --git a/banyand/measure/snapshot.go b/banyand/measure/snapshot.go
index 71ccb2447..80c1df9fd 100644
--- a/banyand/measure/snapshot.go
+++ b/banyand/measure/snapshot.go
@@ -153,30 +153,35 @@ func parseSnapshot(name string) (uint64, error) {
return parseEpoch(name[:16])
}
-func (tst *tsTable) TakeFileSnapshot(dst string) error {
+func (tst *tsTable) TakeFileSnapshot(dst string) (bool, error) {
snapshot := tst.currentSnapshot()
if snapshot == nil {
- return fmt.Errorf("no current snapshot available")
+ return false, fmt.Errorf("no current snapshot available")
}
defer snapshot.decRef()
+ hasDiskParts := false
for _, pw := range snapshot.parts {
if pw.mp != nil {
continue
}
+ hasDiskParts = true
part := pw.p
srcPath := part.path
destPartPath := filepath.Join(dst, filepath.Base(srcPath))
if err := tst.fileSystem.CreateHardLink(srcPath, destPartPath,
nil); err != nil {
- return fmt.Errorf("failed to create snapshot for part
%d: %w", part.partMetadata.ID, err)
+ return false, fmt.Errorf("failed to create snapshot for
part %d: %w", part.partMetadata.ID, err)
}
}
+ if !hasDiskParts {
+ return false, nil
+ }
tst.createMetadata(dst, snapshot)
parent := filepath.Dir(dst)
tst.fileSystem.SyncPath(parent)
- return nil
+ return true, nil
}
func (tst *tsTable) createMetadata(dst string, snapshot *snapshot) {
@@ -202,20 +207,21 @@ func (tst *tsTable) createMetadata(dst string, snapshot
*snapshot) {
}
}
-func (s *standalone) takeGroupSnapshot(dstDir string, groupName string) error {
+func (s *standalone) takeGroupSnapshot(dstDir string, groupName string) (bool,
error) {
group, ok := s.schemaRepo.LoadGroup(groupName)
if !ok {
- return errors.Errorf("group %s not found", groupName)
+ return false, errors.Errorf("group %s not found", groupName)
}
db := group.SupplyTSDB()
if db == nil {
- return errors.Errorf("group %s has no tsdb",
group.GetSchema().Metadata.Name)
+ return false, errors.Errorf("group %s has no tsdb",
group.GetSchema().Metadata.Name)
}
tsdb := db.(storage.TSDB[*tsTable, option])
- if err := tsdb.TakeFileSnapshot(dstDir); err != nil {
- return errors.WithMessagef(err, "snapshot %s fail to take file
snapshot for group %s", dstDir, group.GetSchema().Metadata.Name)
+ created, err := tsdb.TakeFileSnapshot(dstDir)
+ if err != nil {
+ return false, errors.WithMessagef(err, "snapshot %s fail to
take file snapshot for group %s", dstDir, group.GetSchema().Metadata.Name)
}
- return nil
+ return created, nil
}
// collectSegDirs walks a directory tree and collects all seg-* directory
paths.
@@ -362,6 +368,7 @@ func (s *snapshotListener) Rev(ctx context.Context, message
bus.Message) bus.Mes
storage.DeleteStaleSnapshots(s.s.snapshotDir, s.s.maxFileSnapshotNum,
s.s.minFileSnapshotAge, s.s.lfs)
sn := s.snapshotName()
var err error
+ var snapshotCreated int
for _, g := range gg {
select {
case <-ctx.Done():
@@ -370,16 +377,25 @@ func (s *snapshotListener) Rev(ctx context.Context,
message bus.Message) bus.Mes
}
groupName := g.GetSchema().Metadata.Name
snapshotPath := filepath.Join(s.s.snapshotDir, sn, groupName)
- if errGroup := s.s.takeGroupSnapshot(snapshotPath, groupName);
errGroup != nil {
+ created, errGroup := s.s.takeGroupSnapshot(snapshotPath,
groupName)
+ if errGroup != nil {
s.s.l.Error().Err(errGroup).Str("group",
groupName).Msg("fail to take group snapshot")
err = multierr.Append(err, errGroup)
continue
}
+ if !created {
+ s.s.l.Info().Str("group", groupName).Msg("skip empty
group snapshot")
+ continue
+ }
+ snapshotCreated++
// Compare snapshot with data directory to verify consistency
dataPath := filepath.Join(s.s.dataPath, groupName)
s.compareSnapshotWithData(snapshotPath, dataPath, groupName)
}
+ if snapshotCreated == 0 && err == nil {
+ return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), nil)
+ }
snp := &databasev1.Snapshot{
Name: sn,
Catalog: commonv1.Catalog_CATALOG_MEASURE,
diff --git a/banyand/measure/snapshot_test.go b/banyand/measure/snapshot_test.go
index cfc5e387c..c5c717628 100644
--- a/banyand/measure/snapshot_test.go
+++ b/banyand/measure/snapshot_test.go
@@ -478,7 +478,7 @@ func TestSnapshotFunctionality(t *testing.T) {
snapshotPath := filepath.Join(tmpPath, "snapshot")
fileSystem.MkdirIfNotExist(snapshotPath, 0o755)
- if err := tst.TakeFileSnapshot(snapshotPath); err != nil {
+ if _, err := tst.TakeFileSnapshot(snapshotPath); err != nil {
t.Fatalf("TakeFileSnapshot failed: %v", err)
}
diff --git a/banyand/measure/svc_data.go b/banyand/measure/svc_data.go
index a447240e5..61a69e777 100644
--- a/banyand/measure/svc_data.go
+++ b/banyand/measure/svc_data.go
@@ -374,20 +374,21 @@ func (s *dataSVC) createDataNativeObservabilityGroup(ctx
context.Context) error
return nil
}
-func (s *dataSVC) takeGroupSnapshot(dstDir string, groupName string) error {
+func (s *dataSVC) takeGroupSnapshot(dstDir string, groupName string) (bool,
error) {
group, ok := s.schemaRepo.LoadGroup(groupName)
if !ok {
- return errors.Errorf("group %s not found", groupName)
+ return false, errors.Errorf("group %s not found", groupName)
}
db := group.SupplyTSDB()
if db == nil {
- return errors.Errorf("group %s has no tsdb",
group.GetSchema().Metadata.Name)
+ return false, errors.Errorf("group %s has no tsdb",
group.GetSchema().Metadata.Name)
}
tsdb := db.(storage.TSDB[*tsTable, option])
- if err := tsdb.TakeFileSnapshot(dstDir); err != nil {
- return errors.WithMessagef(err, "snapshot %s fail to take file
snapshot for group %s", dstDir, group.GetSchema().Metadata.Name)
+ created, err := tsdb.TakeFileSnapshot(dstDir)
+ if err != nil {
+ return false, errors.WithMessagef(err, "snapshot %s fail to
take file snapshot for group %s", dstDir, group.GetSchema().Metadata.Name)
}
- return nil
+ return created, nil
}
// NewDataSVC returns a new data service.
@@ -493,17 +494,29 @@ func (d *dataSnapshotListener) Rev(ctx context.Context,
message bus.Message) bus
storage.DeleteStaleSnapshots(d.s.snapshotDir, d.s.maxFileSnapshotNum,
d.s.minFileSnapshotAge, d.s.lfs)
sn := d.snapshotName()
var err error
+ var snapshotCreated int
for _, g := range gg {
select {
case <-ctx.Done():
return
bus.NewMessage(bus.MessageID(time.Now().UnixNano()), nil)
default:
}
- if errGroup :=
d.s.takeGroupSnapshot(filepath.Join(d.s.snapshotDir, sn,
g.GetSchema().Metadata.Name), g.GetSchema().Metadata.Name); err != nil {
- d.s.l.Error().Err(errGroup).Str("group",
g.GetSchema().Metadata.Name).Msg("fail to take group snapshot")
+ groupName := g.GetSchema().Metadata.Name
+ snapshotPath := filepath.Join(d.s.snapshotDir, sn, groupName)
+ created, errGroup := d.s.takeGroupSnapshot(snapshotPath,
groupName)
+ if errGroup != nil {
+ d.s.l.Error().Err(errGroup).Str("group",
groupName).Msg("fail to take group snapshot")
err = multierr.Append(err, errGroup)
continue
}
+ if !created {
+ d.s.l.Info().Str("group", groupName).Msg("skip empty
group snapshot")
+ continue
+ }
+ snapshotCreated++
+ }
+ if snapshotCreated == 0 && err == nil {
+ return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), nil)
}
snp := &databasev1.Snapshot{
Name: sn,
diff --git a/banyand/stream/snapshot.go b/banyand/stream/snapshot.go
index 6e45a786b..67f79f335 100644
--- a/banyand/stream/snapshot.go
+++ b/banyand/stream/snapshot.go
@@ -195,22 +195,23 @@ func parseSnapshot(name string) (uint64, error) {
return parseEpoch(name[:16])
}
-func (tst *tsTable) TakeFileSnapshot(dst string) error {
+func (tst *tsTable) TakeFileSnapshot(dst string) (bool, error) {
if tst.index == nil {
- return fmt.Errorf("cannot take file snapshot: index is not
initialized for this tsTable")
+ return false, fmt.Errorf("cannot take file snapshot: index is
not initialized for this tsTable")
}
indexDir := filepath.Join(dst, filepath.Base(tst.index.location))
tst.fileSystem.MkdirPanicIfExist(indexDir, storage.DirPerm)
if err := tst.index.store.TakeFileSnapshot(indexDir); err != nil {
- return fmt.Errorf("failed to take file snapshot for index: %w",
err)
+ return false, fmt.Errorf("failed to take file snapshot for
index: %w", err)
}
snapshot := tst.currentSnapshot()
if snapshot == nil {
- return fmt.Errorf("no current snapshot available")
+ return false, fmt.Errorf("no current snapshot available")
}
defer snapshot.decRef()
+ hasDiskParts := false
for _, pw := range snapshot.parts {
if pw.mp != nil {
continue
@@ -221,13 +222,17 @@ func (tst *tsTable) TakeFileSnapshot(dst string) error {
destPartPath := filepath.Join(dst, filepath.Base(srcPath))
if err := tst.fileSystem.CreateHardLink(srcPath, destPartPath,
nil); err != nil {
- return fmt.Errorf("failed to create snapshot for part
%d: %w", part.partMetadata.ID, err)
+ return false, fmt.Errorf("failed to create snapshot for
part %d: %w", part.partMetadata.ID, err)
}
+ hasDiskParts = true
+ }
+ if !hasDiskParts {
+ return true, nil
}
tst.createMetadata(dst, snapshot)
parent := filepath.Dir(dst)
tst.fileSystem.SyncPath(parent)
- return nil
+ return true, nil
}
func (tst *tsTable) createMetadata(dst string, snapshot *snapshot) {
@@ -253,20 +258,21 @@ func (tst *tsTable) createMetadata(dst string, snapshot
*snapshot) {
}
}
-func (s *standalone) takeGroupSnapshot(dstDir string, groupName string) error {
+func (s *standalone) takeGroupSnapshot(dstDir string, groupName string) (bool,
error) {
group, ok := s.schemaRepo.LoadGroup(groupName)
if !ok {
- return errors.Errorf("group %s not found", groupName)
+ return false, errors.Errorf("group %s not found", groupName)
}
db := group.SupplyTSDB()
if db == nil {
- return errors.Errorf("group %s has no tsdb",
group.GetSchema().Metadata.Name)
+ return false, errors.Errorf("group %s has no tsdb",
group.GetSchema().Metadata.Name)
}
tsdb := db.(storage.TSDB[*tsTable, option])
- if err := tsdb.TakeFileSnapshot(dstDir); err != nil {
- return errors.WithMessagef(err, "snapshot %s fail to take file
snapshot for group %s", dstDir, group.GetSchema().Metadata.Name)
+ created, err := tsdb.TakeFileSnapshot(dstDir)
+ if err != nil {
+ return false, errors.WithMessagef(err, "snapshot %s fail to
take file snapshot for group %s", dstDir, group.GetSchema().Metadata.Name)
}
- return nil
+ return created, nil
}
// collectSegDirs walks a directory tree and collects all seg-* directory
paths.
@@ -413,6 +419,7 @@ func (s *snapshotListener) Rev(ctx context.Context, message
bus.Message) bus.Mes
storage.DeleteStaleSnapshots(s.s.snapshotDir, s.s.maxFileSnapshotNum,
s.s.minFileSnapshotAge, s.s.lfs)
sn := s.snapshotName()
var err error
+ var snapshotCreated int
for _, g := range gg {
select {
case <-ctx.Done():
@@ -421,16 +428,25 @@ func (s *snapshotListener) Rev(ctx context.Context,
message bus.Message) bus.Mes
}
groupName := g.GetSchema().Metadata.Name
snapshotPath := filepath.Join(s.s.snapshotDir, sn, groupName)
- if errGroup := s.s.takeGroupSnapshot(snapshotPath, groupName);
errGroup != nil {
+ created, errGroup := s.s.takeGroupSnapshot(snapshotPath,
groupName)
+ if errGroup != nil {
s.s.l.Error().Err(errGroup).Str("group",
groupName).Msg("fail to take group snapshot")
err = multierr.Append(err, errGroup)
continue
}
+ if !created {
+ s.s.l.Info().Str("group", groupName).Msg("skip empty
group snapshot")
+ continue
+ }
+ snapshotCreated++
// Compare snapshot with data directory to verify consistency
dataPath := filepath.Join(s.s.dataPath, groupName)
s.compareSnapshotWithData(snapshotPath, dataPath, groupName)
}
+ if snapshotCreated == 0 && err == nil {
+ return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), nil)
+ }
snp := &databasev1.Snapshot{
Name: sn,
Catalog: commonv1.Catalog_CATALOG_STREAM,
diff --git a/banyand/stream/snapshot_test.go b/banyand/stream/snapshot_test.go
index 6128a1830..bfbad9c79 100644
--- a/banyand/stream/snapshot_test.go
+++ b/banyand/stream/snapshot_test.go
@@ -477,9 +477,11 @@ func TestSnapshotFunctionality(t *testing.T) {
snapshotPath := filepath.Join(tmpPath, "snapshot")
fileSystem.MkdirIfNotExist(snapshotPath, 0o755)
- if err := tst.TakeFileSnapshot(snapshotPath); err != nil {
+ created, err := tst.TakeFileSnapshot(snapshotPath)
+ if err != nil {
t.Fatalf("TakeFileSnapshot failed: %v", err)
}
+ assert.True(t, created, "TakeFileSnapshot should return true when disk
parts exist")
entries := fileSystem.ReadDir(snapshotPath)
@@ -559,3 +561,51 @@ func TestGetDisjointParts(t *testing.T) {
require.Equal(t, groupsAsc[1], groupsDesc[0], "first group in
descending order should match second group in ascending order")
require.Equal(t, groupsAsc[0], groupsDesc[1], "second group in
descending order should match first group in ascending order")
}
+
+func TestTakeFileSnapshotNoDiskParts(t *testing.T) {
+ fileSystem := fs.NewLocalFileSystem()
+
+ tmpPath, deferFn := test.Space(require.New(t))
+ defer deferFn()
+
+ tabDir := filepath.Join(tmpPath, "tab")
+ fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+
+ tst, err := newTSTable(
+ fileSystem,
+ tabDir,
+ common.Position{},
+ logger.GetLogger("test"),
+ timestamp.TimeRange{},
+ option{
+ flushTimeout: 0,
+ mergePolicy: newDefaultMergePolicy(),
+ protector: protector.Nop{},
+ },
+ nil,
+ )
+ require.NoError(t, err)
+ defer tst.Close()
+
+ // Set an empty snapshot (no parts) to simulate having a snapshot but
no disk parts.
+ emptySnp := &snapshot{ref: 1}
+ tst.Lock()
+ tst.snapshot = emptySnp
+ tst.Unlock()
+
+ snapshotPath := filepath.Join(tmpPath, "snapshot")
+ fileSystem.MkdirIfNotExist(snapshotPath, 0o755)
+
+ created, err := tst.TakeFileSnapshot(snapshotPath)
+ require.NoError(t, err)
+ assert.True(t, created, "TakeFileSnapshot should return true when index
exists even without disk parts")
+
+ entries := fileSystem.ReadDir(snapshotPath)
+ hasIndex := false
+ for _, entry := range entries {
+ if entry.IsDir() && entry.Name() == elementIndexFilename {
+ hasIndex = true
+ }
+ }
+ assert.True(t, hasIndex, "expected index directory in snapshot")
+}
diff --git a/banyand/trace/snapshot.go b/banyand/trace/snapshot.go
index 431587b07..9b8d3a485 100644
--- a/banyand/trace/snapshot.go
+++ b/banyand/trace/snapshot.go
@@ -217,20 +217,21 @@ func parseSnapshot(name string) (uint64, error) {
return parseEpoch(name[:16])
}
-func (tst *tsTable) TakeFileSnapshot(dst string) error {
+func (tst *tsTable) TakeFileSnapshot(dst string) (bool, error) {
for k, v := range tst.sidxMap {
indexDir := filepath.Join(dst, sidxDirName, k)
tst.fileSystem.MkdirPanicIfExist(indexDir, storage.DirPerm)
if err := v.TakeFileSnapshot(indexDir); err != nil {
- return fmt.Errorf("failed to take file snapshot for
index, %s: %w", k, err)
+ return false, fmt.Errorf("failed to take file snapshot
for index, %s: %w", k, err)
}
}
snapshot := tst.currentSnapshot()
if snapshot == nil {
- return fmt.Errorf("no current snapshot available")
+ return false, fmt.Errorf("no current snapshot available")
}
defer snapshot.decRef()
+ hasDiskParts := false
for _, pw := range snapshot.parts {
if pw.mp != nil {
continue
@@ -241,13 +242,17 @@ func (tst *tsTable) TakeFileSnapshot(dst string) error {
destPartPath := filepath.Join(dst, filepath.Base(srcPath))
if err := tst.fileSystem.CreateHardLink(srcPath, destPartPath,
nil); err != nil {
- return fmt.Errorf("failed to create snapshot for part
%d: %w", part.partMetadata.ID, err)
+ return false, fmt.Errorf("failed to create snapshot for
part %d: %w", part.partMetadata.ID, err)
}
+ hasDiskParts = true
+ }
+ if !hasDiskParts {
+ return len(tst.sidxMap) > 0, nil
}
tst.createMetadata(dst, snapshot)
parent := filepath.Dir(dst)
tst.fileSystem.SyncPath(parent)
- return nil
+ return true, nil
}
func (tst *tsTable) createMetadata(dst string, snapshot *snapshot) {
diff --git a/banyand/trace/snapshot_test.go b/banyand/trace/snapshot_test.go
index 81f8c916c..9f3db9fcc 100644
--- a/banyand/trace/snapshot_test.go
+++ b/banyand/trace/snapshot_test.go
@@ -572,9 +572,11 @@ func TestSnapshotFunctionality(t *testing.T) {
snapshotPath := filepath.Join(tmpPath, "snapshot")
fileSystem.MkdirIfNotExist(snapshotPath, 0o755)
- if err := tst.TakeFileSnapshot(snapshotPath); err != nil {
+ created, err := tst.TakeFileSnapshot(snapshotPath)
+ if err != nil {
t.Fatalf("TakeFileSnapshot failed: %v", err)
}
+ assert.True(t, created, "TakeFileSnapshot should return true when disk
parts exist")
entries := fileSystem.ReadDir(snapshotPath)
@@ -646,3 +648,93 @@ func TestGetDisjointParts(t *testing.T) {
require.Equal(t, groupsAsc[1], groupsDesc[0], "first group in
descending order should match second group in ascending order")
require.Equal(t, groupsAsc[0], groupsDesc[1], "second group in
descending order should match first group in ascending order")
}
+
+func TestTakeFileSnapshotNoDiskPartsWithSidx(t *testing.T) {
+ fileSystem := fs.NewLocalFileSystem()
+
+ tmpPath, deferFn := test.Space(require.New(t))
+ defer deferFn()
+
+ tabDir := filepath.Join(tmpPath, "tab")
+ fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+
+ tst, err := newTSTable(
+ fileSystem,
+ tabDir,
+ common.Position{},
+ logger.GetLogger("test"),
+ timestamp.TimeRange{},
+ option{
+ flushTimeout: 0,
+ mergePolicy: newDefaultMergePolicy(),
+ protector: protector.Nop{},
+ },
+ nil,
+ )
+ require.NoError(t, err)
+ defer tst.Close()
+
+ _, err = tst.getOrCreateSidx("test_sidx")
+ require.NoError(t, err)
+
+ // Set an empty snapshot (no parts) to simulate having a snapshot but
no disk parts.
+ emptySnp := &snapshot{ref: 1}
+ tst.Lock()
+ tst.snapshot = emptySnp
+ tst.Unlock()
+
+ snapshotPath := filepath.Join(tmpPath, "snapshot")
+ fileSystem.MkdirIfNotExist(snapshotPath, 0o755)
+
+ created, err := tst.TakeFileSnapshot(snapshotPath)
+ require.NoError(t, err)
+ assert.True(t, created, "TakeFileSnapshot should return true when
sidxMap is non-empty even without disk parts")
+
+ entries := fileSystem.ReadDir(filepath.Join(snapshotPath, sidxDirName))
+ hasSidx := false
+ for _, entry := range entries {
+ if entry.IsDir() && entry.Name() == "test_sidx" {
+ hasSidx = true
+ }
+ }
+ assert.True(t, hasSidx, "expected sidx directory in snapshot")
+}
+
+func TestTakeFileSnapshotNoDiskPartsWithoutSidx(t *testing.T) {
+ fileSystem := fs.NewLocalFileSystem()
+
+ tmpPath, deferFn := test.Space(require.New(t))
+ defer deferFn()
+
+ tabDir := filepath.Join(tmpPath, "tab")
+ fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+
+ tst, err := newTSTable(
+ fileSystem,
+ tabDir,
+ common.Position{},
+ logger.GetLogger("test"),
+ timestamp.TimeRange{},
+ option{
+ flushTimeout: 0,
+ mergePolicy: newDefaultMergePolicy(),
+ protector: protector.Nop{},
+ },
+ nil,
+ )
+ require.NoError(t, err)
+ defer tst.Close()
+
+ // Set an empty snapshot (no parts) to simulate having a snapshot but
no disk parts.
+ emptySnp := &snapshot{ref: 1}
+ tst.Lock()
+ tst.snapshot = emptySnp
+ tst.Unlock()
+
+ snapshotPath := filepath.Join(tmpPath, "snapshot")
+ fileSystem.MkdirIfNotExist(snapshotPath, 0o755)
+
+ created, err := tst.TakeFileSnapshot(snapshotPath)
+ require.NoError(t, err)
+ assert.False(t, created, "TakeFileSnapshot should return false when
sidxMap is empty and no disk parts")
+}
diff --git a/banyand/trace/svc_standalone.go b/banyand/trace/svc_standalone.go
index fceaf43d4..7183832f8 100644
--- a/banyand/trace/svc_standalone.go
+++ b/banyand/trace/svc_standalone.go
@@ -300,20 +300,21 @@ func (s *standalone) GetServiceName() string {
return s.Name()
}
-func (s *standalone) takeGroupSnapshot(dstDir string, groupName string) error {
+func (s *standalone) takeGroupSnapshot(dstDir string, groupName string) (bool,
error) {
group, ok := s.schemaRepo.LoadGroup(groupName)
if !ok {
- return errors.Errorf("group %s not found", groupName)
+ return false, errors.Errorf("group %s not found", groupName)
}
db := group.SupplyTSDB()
if db == nil {
- return errors.Errorf("group %s has no tsdb",
group.GetSchema().Metadata.Name)
+ return false, errors.Errorf("group %s has no tsdb",
group.GetSchema().Metadata.Name)
}
tsdb := db.(storage.TSDB[*tsTable, option])
- if err := tsdb.TakeFileSnapshot(dstDir); err != nil {
- return errors.WithMessagef(err, "snapshot %s fail to take file
snapshot for group %s", dstDir, group.GetSchema().Metadata.Name)
+ created, err := tsdb.TakeFileSnapshot(dstDir)
+ if err != nil {
+ return false, errors.WithMessagef(err, "snapshot %s fail to
take file snapshot for group %s", dstDir, group.GetSchema().Metadata.Name)
}
- return nil
+ return created, nil
}
// collectSegDirs walks a directory tree and collects all seg-* directory
paths.
@@ -459,6 +460,7 @@ func (d *standaloneSnapshotListener) Rev(ctx
context.Context, message bus.Messag
storage.DeleteStaleSnapshots(d.s.snapshotDir, d.s.maxFileSnapshotNum,
d.s.minFileSnapshotAge, d.s.lfs)
sn := d.snapshotName()
var err error
+ var snapshotCreated int
for _, g := range gg {
select {
case <-ctx.Done():
@@ -467,16 +469,25 @@ func (d *standaloneSnapshotListener) Rev(ctx
context.Context, message bus.Messag
}
groupName := g.GetSchema().Metadata.Name
snapshotPath := filepath.Join(d.s.snapshotDir, sn, groupName)
- if errGroup := d.s.takeGroupSnapshot(snapshotPath, groupName);
errGroup != nil {
+ created, errGroup := d.s.takeGroupSnapshot(snapshotPath,
groupName)
+ if errGroup != nil {
d.s.l.Error().Err(errGroup).Str("group",
groupName).Msg("fail to take group snapshot")
err = multierr.Append(err, errGroup)
continue
}
+ if !created {
+ d.s.l.Info().Str("group", groupName).Msg("skip empty
group snapshot")
+ continue
+ }
+ snapshotCreated++
// Compare snapshot with data directory to verify consistency
dataPath := filepath.Join(d.s.dataPath, groupName)
d.compareSnapshotWithData(snapshotPath, dataPath, groupName)
}
+ if snapshotCreated == 0 && err == nil {
+ return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), nil)
+ }
snp := &databasev1.Snapshot{
Name: sn,
Catalog: commonv1.Catalog_CATALOG_TRACE,