This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new ae3de0997 fix take snapshot error when no data in the segment (#1046)
ae3de0997 is described below
commit ae3de0997032e59c3c7bafb2cd6c8c9874e90fb4
Author: mrproliu <[email protected]>
AuthorDate: Wed Apr 8 13:03:46 2026 +0800
fix take snapshot error when no data in the segment (#1046)
* fix take snapshot error when no data in the segment
---------
Co-authored-by: Gao Hongtao <[email protected]>
---
CHANGES.md | 1 +
banyand/internal/storage/rotation_test.go | 21 +++++++++++++
banyand/internal/storage/storage.go | 4 ++-
banyand/internal/storage/tsdb.go | 31 ++++++++++++-------
banyand/internal/storage/tsdb_test.go | 50 +++++++++++++++++++++++++++++++
banyand/measure/snapshot.go | 13 +++++---
banyand/measure/snapshot_test.go | 40 +++++++++++++++++++++++++
banyand/stream/snapshot.go | 24 ++++++++-------
banyand/stream/snapshot_test.go | 41 +++++++++++++++++++++++++
banyand/trace/snapshot.go | 26 +++++++++-------
banyand/trace/snapshot_test.go | 46 ++++++++++++++++++++++++++++
11 files changed, 262 insertions(+), 35 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 728519a7e..e6c355c75 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -19,6 +19,7 @@ Release Notes.
- MCP: Add validation for properties and harden the mcp server.
- Fix property schema client connection not stable after data node restarted.
- Fix flaky on-disk integration tests caused by Ginkgo v2 random container
shuffling closing gRPC connections prematurely.
+- Fix snapshot error when there is no data in a segment.
- ui: fix query editor refresh/reset behavior and BydbQL keyword highlighting.
## 0.10.0
diff --git a/banyand/internal/storage/rotation_test.go
b/banyand/internal/storage/rotation_test.go
index 5a605e67c..e9e003916 100644
--- a/banyand/internal/storage/rotation_test.go
+++ b/banyand/internal/storage/rotation_test.go
@@ -208,6 +208,27 @@ var MockTSTableCreator = func(_ fs.FileSystem, _ string, _
common.Position,
return &MockTSTable{}, nil
}
+// SnapshotMockTSTable is a TSTable stub for snapshot tests. Its
+// TakeFileSnapshot reports ErrNoCurrentSnapshot when the table's time range
+// starts at or before the Unix epoch (simulating a shard with no data) and
+// succeeds for every other table.
+type SnapshotMockTSTable struct {
+	timeRange timestamp.TimeRange
+}
+
+// Close is a no-op.
+func (m *SnapshotMockTSTable) Close() error { return nil }
+
+// Collect is a no-op.
+func (m *SnapshotMockTSTable) Collect(_ Metrics) {}
+
+// TakeFileSnapshot returns (false, ErrNoCurrentSnapshot) for epoch tables and
+// (true, nil) otherwise; it never touches the file system.
+func (m *SnapshotMockTSTable) TakeFileSnapshot(_ string) (bool, error) {
+	// Use !After rather than Equal. Segment starts are presumably truncated to
+	// the local-time day boundary (NOTE(review): confirm). Outside UTC, the
+	// segment holding time.Unix(0, 0) then starts before the epoch rather than
+	// exactly on it, and an Equal check would silently skip the empty-shard path.
+	if !m.timeRange.Start.After(time.Unix(0, 0)) {
+		return false, ErrNoCurrentSnapshot
+	}
+	return true, nil
+}
+
+// SnapshotMockTSTableCreator builds a SnapshotMockTSTable that remembers the
+// time range it was created for; all other arguments are ignored.
+var SnapshotMockTSTableCreator = func(_ fs.FileSystem, _ string, _ common.Position,
+	_ *logger.Logger, timeRange timestamp.TimeRange, _, _ any,
+) (*SnapshotMockTSTable, error) {
+	return &SnapshotMockTSTable{timeRange: timeRange}, nil
+}
+
type MockMetrics struct{}
func (m *MockMetrics) DeleteAll() {}
diff --git a/banyand/internal/storage/storage.go
b/banyand/internal/storage/storage.go
index 3b30bef84..799f42b82 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -64,7 +64,9 @@ const (
var (
// ErrUnknownShard indicates that the shard is not found.
ErrUnknownShard = errors.New("unknown shard")
- errOpenDatabase = errors.New("fails to open the database")
+ // ErrNoCurrentSnapshot is returned when a shard has no current
+ // snapshot available. The measure, stream and trace tables return it from
+ // TakeFileSnapshot when they hold no current snapshot (as for a shard with no
+ // data); the database-level TakeFileSnapshot skips such shards instead of failing.
+ ErrNoCurrentSnapshot = errors.New("no current snapshot available")
+ errOpenDatabase = errors.New("fails to open the database")
lfs = fs.NewLocalFileSystemWithLogger(logger.GetLogger("storage"))
)
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index 27df4885c..ca6b74ba9 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -268,14 +268,14 @@ func (d *database[T, O]) UpdateOptions(resourceOpts
*commonv1.ResourceOpts) {
d.segmentController.updateOptions(resourceOpts)
}
-func (d *database[T, O]) TakeFileSnapshot(dst string) (bool, error) {
+func (d *database[T, O]) TakeFileSnapshot(dst string) (success bool, err
error) {
if d.closed.Load() {
return false, errors.New("database is closed")
}
- segments, err := d.segmentController.segments(true)
- if err != nil {
- return false, errors.Wrap(err, "failed to get segments")
+ segments, segErr := d.segmentController.segments(true)
+ if segErr != nil {
+ return false, errors.Wrap(segErr, "failed to get segments")
}
defer func() {
for _, seg := range segments {
@@ -287,6 +287,12 @@ func (d *database[T, O]) TakeFileSnapshot(dst string)
(bool, error) {
return false, nil
}
+ defer func() {
+ if err != nil {
+ d.lfs.MustRMAll(dst)
+ }
+ }()
+
log.Info().Int("segment_count", len(segments)).Str("db_location",
d.location).
Msgf("taking file snapshot for %s", dst)
for _, seg := range segments {
@@ -296,14 +302,14 @@ func (d *database[T, O]) TakeFileSnapshot(dst string)
(bool, error) {
metadataSrc := filepath.Join(seg.location, metadataFilename)
metadataDest := filepath.Join(segPath, metadataFilename)
- if err := d.lfs.CreateHardLink(metadataSrc, metadataDest, nil);
err != nil {
- return false, errors.Wrapf(err, "failed to snapshot
metadata for segment %s", segDir)
+ if linkErr := d.lfs.CreateHardLink(metadataSrc, metadataDest,
nil); linkErr != nil {
+ return false, errors.Wrapf(linkErr, "failed to snapshot
metadata for segment %s", segDir)
}
indexPath := filepath.Join(segPath, seriesIndexDirName)
d.lfs.MkdirIfNotExist(indexPath, DirPerm)
- if err := seg.index.store.TakeFileSnapshot(indexPath); err !=
nil {
- return false, errors.Wrapf(err, "failed to snapshot
index for segment %s", segDir)
+ if indexErr := seg.index.store.TakeFileSnapshot(indexPath);
indexErr != nil {
+ return false, errors.Wrapf(indexErr, "failed to
snapshot index for segment %s", segDir)
}
sLst := seg.sLst.Load()
@@ -314,8 +320,13 @@ func (d *database[T, O]) TakeFileSnapshot(dst string)
(bool, error) {
shardDir := filepath.Base(shard.location)
shardPath := filepath.Join(segPath, shardDir)
d.lfs.MkdirIfNotExist(shardPath, DirPerm)
- if _, err := shard.table.TakeFileSnapshot(shardPath);
err != nil {
- return false, errors.Wrapf(err, "failed to
snapshot shard %s in segment %s", shardDir, segDir)
+ if _, shardErr :=
shard.table.TakeFileSnapshot(shardPath); shardErr != nil {
+ if errors.Is(shardErr, ErrNoCurrentSnapshot) {
+ log.Debug().Str("shard",
shardDir).Str("segment", segDir).
+ Msg("skipping empty shard
snapshot")
+ continue
+ }
+ return false, errors.Wrapf(shardErr, "failed to
snapshot shard %s in segment %s", shardDir, segDir)
}
}
}
diff --git a/banyand/internal/storage/tsdb_test.go
b/banyand/internal/storage/tsdb_test.go
index d3b84e889..08b285d7a 100644
--- a/banyand/internal/storage/tsdb_test.go
+++ b/banyand/internal/storage/tsdb_test.go
@@ -308,6 +308,56 @@ func TestTakeFileSnapshot(t *testing.T) {
require.NoError(t, tsdb.Close())
})
+
+ t.Run("Take snapshot skips shard with no current snapshot", func(t
*testing.T) {
+ dir, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ snapshotDir := filepath.Join(dir, "snapshot")
+
+ opts := TSDBOpts[*SnapshotMockTSTable, any]{
+ Location: dir,
+ SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+ TTL: IntervalRule{Unit: DAY, Num: 7},
+ ShardNum: 1,
+ TSTableCreator: SnapshotMockTSTableCreator,
+ }
+
+ ctx := context.Background()
+ mc := timestamp.NewMockClock()
+ ts, err := time.ParseInLocation("2006-01-02 15:04:05",
"2024-05-01 00:00:00", time.Local)
+ require.NoError(t, err)
+ mc.Set(ts)
+ ctx = timestamp.SetClock(ctx, mc)
+
+ serviceCache := NewServiceCache()
+ tsdb, err := OpenTSDB(ctx, opts, serviceCache, group)
+ require.NoError(t, err)
+ defer tsdb.Close()
+
+ normalSeg, err := tsdb.CreateSegmentIfNotExist(ts)
+ require.NoError(t, err)
+ normalSegLocation := normalSeg.(*segment[*SnapshotMockTSTable,
any]).location
+ normalSeg.DecRef()
+
+ epochSeg, err := tsdb.CreateSegmentIfNotExist(time.Unix(0, 0))
+ require.NoError(t, err)
+ epochSegLocation := epochSeg.(*segment[*SnapshotMockTSTable,
any]).location
+ epochSeg.DecRef()
+
+ created, snapshotErr := tsdb.TakeFileSnapshot(snapshotDir)
+ require.NoError(t, snapshotErr,
+ "snapshot should not fail due to empty shard in epoch
segment")
+ require.True(t, created)
+
+ normalSegDir := filepath.Join(snapshotDir,
filepath.Base(normalSegLocation))
+ require.DirExists(t, normalSegDir,
+ "normal segment should be present in snapshot")
+
+ epochSegDir := filepath.Join(snapshotDir,
filepath.Base(epochSegLocation))
+ require.DirExists(t, epochSegDir,
+ "epoch segment directory should still be created")
+ })
}
func TestTSDBCollect(t *testing.T) {
diff --git a/banyand/measure/snapshot.go b/banyand/measure/snapshot.go
index 80c1df9fd..fa8323a94 100644
--- a/banyand/measure/snapshot.go
+++ b/banyand/measure/snapshot.go
@@ -153,12 +153,17 @@ func parseSnapshot(name string) (uint64, error) {
return parseEpoch(name[:16])
}
-func (tst *tsTable) TakeFileSnapshot(dst string) (bool, error) {
+func (tst *tsTable) TakeFileSnapshot(dst string) (success bool, err error) {
snapshot := tst.currentSnapshot()
if snapshot == nil {
- return false, fmt.Errorf("no current snapshot available")
+ return false, storage.ErrNoCurrentSnapshot
}
defer snapshot.decRef()
+ defer func() {
+ if err != nil {
+ tst.fileSystem.MustRMAll(dst)
+ }
+ }()
hasDiskParts := false
for _, pw := range snapshot.parts {
@@ -171,8 +176,8 @@ func (tst *tsTable) TakeFileSnapshot(dst string) (bool,
error) {
srcPath := part.path
destPartPath := filepath.Join(dst, filepath.Base(srcPath))
- if err := tst.fileSystem.CreateHardLink(srcPath, destPartPath,
nil); err != nil {
- return false, fmt.Errorf("failed to create snapshot for
part %d: %w", part.partMetadata.ID, err)
+ if linkErr := tst.fileSystem.CreateHardLink(srcPath,
destPartPath, nil); linkErr != nil {
+ return false, fmt.Errorf("failed to create snapshot for
part %d: %w", part.partMetadata.ID, linkErr)
}
}
if !hasDiskParts {
diff --git a/banyand/measure/snapshot_test.go b/banyand/measure/snapshot_test.go
index dc8c51d6b..67aa4f896 100644
--- a/banyand/measure/snapshot_test.go
+++ b/banyand/measure/snapshot_test.go
@@ -812,3 +812,43 @@ func TestInitTSTableLoadsNewestWhenMultipleValid(t
*testing.T) {
sort.Slice(snapshots, func(i, j int) bool { return snapshots[i] >
snapshots[j] })
require.Equal(t, snapshots[0], epoch2, "must load newest valid
snapshot")
}
+
+func TestTakeFileSnapshotEmptySegment(t *testing.T) {
+ fileSystem := fs.NewLocalFileSystem()
+
+ tmpPath, deferFn := test.Space(require.New(t))
+ defer deferFn()
+
+ tabDir := filepath.Join(tmpPath, "tab")
+ fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+
+ tst, err := newTSTable(
+ fileSystem,
+ tabDir,
+ common.Position{},
+ logger.GetLogger("test"),
+ timestamp.TimeRange{},
+ option{
+ flushTimeout: 0,
+ mergePolicy: newDefaultMergePolicy(),
+ protector: protector.Nop{},
+ },
+ nil,
+ )
+ require.NoError(t, err)
+ defer tst.Close()
+
+ tst.Lock()
+ tst.snapshot = nil
+ tst.Unlock()
+
+ snapshotPath := filepath.Join(tmpPath, "snapshot")
+ fileSystem.MkdirIfNotExist(snapshotPath, 0o755)
+
+ created, err := tst.TakeFileSnapshot(snapshotPath)
+ require.ErrorIs(t, err, storage.ErrNoCurrentSnapshot)
+ assert.False(t, created)
+
+ entries := fileSystem.ReadDir(snapshotPath)
+ assert.Empty(t, entries, "no files or dirs should remain when snapshot
is nil")
+}
diff --git a/banyand/stream/snapshot.go b/banyand/stream/snapshot.go
index 67f79f335..e3fd0d9ff 100644
--- a/banyand/stream/snapshot.go
+++ b/banyand/stream/snapshot.go
@@ -195,21 +195,25 @@ func parseSnapshot(name string) (uint64, error) {
return parseEpoch(name[:16])
}
-func (tst *tsTable) TakeFileSnapshot(dst string) (bool, error) {
+func (tst *tsTable) TakeFileSnapshot(dst string) (success bool, err error) {
if tst.index == nil {
return false, fmt.Errorf("cannot take file snapshot: index is
not initialized for this tsTable")
}
- indexDir := filepath.Join(dst, filepath.Base(tst.index.location))
- tst.fileSystem.MkdirPanicIfExist(indexDir, storage.DirPerm)
- if err := tst.index.store.TakeFileSnapshot(indexDir); err != nil {
- return false, fmt.Errorf("failed to take file snapshot for
index: %w", err)
- }
-
snapshot := tst.currentSnapshot()
if snapshot == nil {
- return false, fmt.Errorf("no current snapshot available")
+ return false, storage.ErrNoCurrentSnapshot
}
defer snapshot.decRef()
+ defer func() {
+ if err != nil {
+ tst.fileSystem.MustRMAll(dst)
+ }
+ }()
+ indexDir := filepath.Join(dst, filepath.Base(tst.index.location))
+ tst.fileSystem.MkdirPanicIfExist(indexDir, storage.DirPerm)
+ if indexErr := tst.index.store.TakeFileSnapshot(indexDir); indexErr !=
nil {
+ return false, fmt.Errorf("failed to take file snapshot for
index: %w", indexErr)
+ }
hasDiskParts := false
for _, pw := range snapshot.parts {
@@ -221,8 +225,8 @@ func (tst *tsTable) TakeFileSnapshot(dst string) (bool,
error) {
srcPath := part.path
destPartPath := filepath.Join(dst, filepath.Base(srcPath))
- if err := tst.fileSystem.CreateHardLink(srcPath, destPartPath,
nil); err != nil {
- return false, fmt.Errorf("failed to create snapshot for
part %d: %w", part.partMetadata.ID, err)
+ if linkErr := tst.fileSystem.CreateHardLink(srcPath,
destPartPath, nil); linkErr != nil {
+ return false, fmt.Errorf("failed to create snapshot for
part %d: %w", part.partMetadata.ID, linkErr)
}
hasDiskParts = true
}
diff --git a/banyand/stream/snapshot_test.go b/banyand/stream/snapshot_test.go
index bfbad9c79..d98560b19 100644
--- a/banyand/stream/snapshot_test.go
+++ b/banyand/stream/snapshot_test.go
@@ -27,6 +27,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/banyand/protector"
"github.com/apache/skywalking-banyandb/pkg/bytes"
"github.com/apache/skywalking-banyandb/pkg/fs"
@@ -609,3 +610,43 @@ func TestTakeFileSnapshotNoDiskParts(t *testing.T) {
}
assert.True(t, hasIndex, "expected index directory in snapshot")
}
+
+func TestTakeFileSnapshotEmptySegment(t *testing.T) {
+ fileSystem := fs.NewLocalFileSystem()
+
+ tmpPath, deferFn := test.Space(require.New(t))
+ defer deferFn()
+
+ tabDir := filepath.Join(tmpPath, "tab")
+ fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+
+ tst, err := newTSTable(
+ fileSystem,
+ tabDir,
+ common.Position{},
+ logger.GetLogger("test"),
+ timestamp.TimeRange{},
+ option{
+ flushTimeout: 0,
+ mergePolicy: newDefaultMergePolicy(),
+ protector: protector.Nop{},
+ },
+ nil,
+ )
+ require.NoError(t, err)
+ defer tst.Close()
+
+ tst.Lock()
+ tst.snapshot = nil
+ tst.Unlock()
+
+ snapshotPath := filepath.Join(tmpPath, "snapshot")
+ fileSystem.MkdirIfNotExist(snapshotPath, 0o755)
+
+ created, err := tst.TakeFileSnapshot(snapshotPath)
+ require.ErrorIs(t, err, storage.ErrNoCurrentSnapshot)
+ assert.False(t, created)
+
+ entries := fileSystem.ReadDir(snapshotPath)
+ assert.Empty(t, entries, "no files or dirs should remain when snapshot
is nil")
+}
diff --git a/banyand/trace/snapshot.go b/banyand/trace/snapshot.go
index 9b8d3a485..6c120d7d3 100644
--- a/banyand/trace/snapshot.go
+++ b/banyand/trace/snapshot.go
@@ -217,19 +217,25 @@ func parseSnapshot(name string) (uint64, error) {
return parseEpoch(name[:16])
}
-func (tst *tsTable) TakeFileSnapshot(dst string) (bool, error) {
+func (tst *tsTable) TakeFileSnapshot(dst string) (success bool, err error) {
+ snapshot := tst.currentSnapshot()
+ if snapshot == nil {
+ return false, storage.ErrNoCurrentSnapshot
+ }
+ defer snapshot.decRef()
+ defer func() {
+ if err != nil {
+ tst.fileSystem.MustRMAll(dst)
+ }
+ }()
+
for k, v := range tst.sidxMap {
indexDir := filepath.Join(dst, sidxDirName, k)
tst.fileSystem.MkdirPanicIfExist(indexDir, storage.DirPerm)
- if err := v.TakeFileSnapshot(indexDir); err != nil {
- return false, fmt.Errorf("failed to take file snapshot
for index, %s: %w", k, err)
+ if sidxErr := v.TakeFileSnapshot(indexDir); sidxErr != nil {
+ return false, fmt.Errorf("failed to take file snapshot
for index, %s: %w", k, sidxErr)
}
}
- snapshot := tst.currentSnapshot()
- if snapshot == nil {
- return false, fmt.Errorf("no current snapshot available")
- }
- defer snapshot.decRef()
hasDiskParts := false
for _, pw := range snapshot.parts {
@@ -241,8 +247,8 @@ func (tst *tsTable) TakeFileSnapshot(dst string) (bool,
error) {
srcPath := part.path
destPartPath := filepath.Join(dst, filepath.Base(srcPath))
- if err := tst.fileSystem.CreateHardLink(srcPath, destPartPath,
nil); err != nil {
- return false, fmt.Errorf("failed to create snapshot for
part %d: %w", part.partMetadata.ID, err)
+ if linkErr := tst.fileSystem.CreateHardLink(srcPath,
destPartPath, nil); linkErr != nil {
+ return false, fmt.Errorf("failed to create snapshot for
part %d: %w", part.partMetadata.ID, linkErr)
}
hasDiskParts = true
}
diff --git a/banyand/trace/snapshot_test.go b/banyand/trace/snapshot_test.go
index 9f3db9fcc..f8909c9dd 100644
--- a/banyand/trace/snapshot_test.go
+++ b/banyand/trace/snapshot_test.go
@@ -27,6 +27,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/banyand/protector"
"github.com/apache/skywalking-banyandb/pkg/bytes"
"github.com/apache/skywalking-banyandb/pkg/convert"
@@ -738,3 +739,48 @@ func TestTakeFileSnapshotNoDiskPartsWithoutSidx(t
*testing.T) {
require.NoError(t, err)
assert.False(t, created, "TakeFileSnapshot should return false when
sidxMap is empty and no disk parts")
}
+
+func TestTakeFileSnapshotEmptySegment(t *testing.T) {
+ fileSystem := fs.NewLocalFileSystem()
+
+ tmpPath, deferFn := test.Space(require.New(t))
+ defer deferFn()
+
+ tabDir := filepath.Join(tmpPath, "tab")
+ fileSystem.MkdirPanicIfExist(tabDir, 0o755)
+
+ tst, err := newTSTable(
+ fileSystem,
+ tabDir,
+ common.Position{},
+ logger.GetLogger("test"),
+ timestamp.TimeRange{},
+ option{
+ flushTimeout: 0,
+ mergePolicy: newDefaultMergePolicy(),
+ protector: protector.Nop{},
+ },
+ nil,
+ )
+ require.NoError(t, err)
+ defer tst.Close()
+
+ // Populate sidxMap so the test guards against the prior bug where the
+ // sidxMap loop created sidx directories before the nil-snapshot check.
+ _, err = tst.getOrCreateSidx("test_sidx")
+ require.NoError(t, err)
+
+ tst.Lock()
+ tst.snapshot = nil
+ tst.Unlock()
+
+ snapshotPath := filepath.Join(tmpPath, "snapshot")
+ fileSystem.MkdirIfNotExist(snapshotPath, 0o755)
+
+ created, err := tst.TakeFileSnapshot(snapshotPath)
+ require.ErrorIs(t, err, storage.ErrNoCurrentSnapshot)
+ assert.False(t, created)
+
+ entries := fileSystem.ReadDir(snapshotPath)
+ assert.Empty(t, entries, "no files or dirs should remain when snapshot
is nil")
+}