This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 3366d2a76 Fix flaky measure snapshot tests gating on part dir instead
of flush persist (#1152)
3366d2a76 is described below
commit 3366d2a764fb404dae3ff44ab702a01cb8f77f73
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Jun 4 09:36:18 2026 +0800
Fix flaky measure snapshot tests gating on part dir instead of flush
persist (#1152)
---
CHANGES.md | 1 +
banyand/measure/snapshot_test.go | 71 ++++++++++++++++------------------------
2 files changed, 29 insertions(+), 43 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index e7de1c140..2170765a8 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -103,6 +103,7 @@ Release Notes.
- Fix lifecycle migration placing data in the wrong target segment when the
source segment interval is not a multiple of the target stage's interval, by
row-level replaying parts that straddle a target-segment boundary instead of
chunk-copying them into a single segment.
- Fix trace query identity-tag projection: when `trace_id`/`span_id` are
explicitly projected, reconstruct them from span identity at response build
time instead of requesting them as stored tags, and preserve tag order with
null-filled per-span value alignment in the distributed trace result iterator.
- Fix measure, stream, and trace queries returning data from segments already
expired by the TTL. Retention removes a segment only on its next scheduled run,
so a fully expired segment can linger on disk and keep serving TTL-expired
data; queries now skip segments whose whole time range is past the retention
deadline, matching retention's own removal condition.
+- Fix flaky measure snapshot tests that gated on the part directory appearing
in `tab/` as the flush-completion signal. That directory is created by the
first line of `memPart.mustFlush`, before the mem→file introduction reaches the
in-memory snapshot and before the `.snp` manifest is persisted, so under
`-race`/CI load `TakeFileSnapshot` could observe only mem parts and `Close`
could drop the in-flight flush; gate on the persisted `.snp` manifest instead.
### Chores
diff --git a/banyand/measure/snapshot_test.go b/banyand/measure/snapshot_test.go
index 67aa4f896..272b2a79e 100644
--- a/banyand/measure/snapshot_test.go
+++ b/banyand/measure/snapshot_test.go
@@ -436,6 +436,24 @@ func TestSnapshotRemove(t *testing.T) {
}
}
+// waitForPersistedSnapshot blocks until a flush has been introduced and its
+// ".snp" manifest persisted. Waiting for the part directory alone is racy: the
+// directory is created by the first line of memPart.mustFlush, before the
+// mem->file introduction reaches the in-memory snapshot and before the
manifest
+// is written, so the directory can exist while the snapshot still holds only
+// mem parts and no ".snp" has been persisted.
+func waitForPersistedSnapshot(t *testing.T, fileSystem fs.FileSystem, tabDir
string) {
+ t.Helper()
+ require.Eventually(t, func() bool {
+ for _, e := range fileSystem.ReadDir(tabDir) {
+ if !e.IsDir() && filepath.Ext(e.Name()) ==
snapshotSuffix {
+ return true
+ }
+ }
+ return false
+ }, flags.EventuallyTimeout, time.Millisecond, "wait for snapshot
manifest to be persisted")
+}
+
func TestSnapshotFunctionality(t *testing.T) {
fileSystem := fs.NewLocalFileSystem()
@@ -465,25 +483,19 @@ func TestSnapshotFunctionality(t *testing.T) {
tst.mustAddDataPoints(dpsTS1)
tst.mustAddDataPoints(dpsTS2)
- time.Sleep(100 * time.Millisecond) // allow time for flushing
-
- require.Eventually(t, func() bool {
- dd := fileSystem.ReadDir(tabDir)
- partNum := 0
- for _, d := range dd {
- if d.IsDir() {
- partNum++
- }
- }
- return partNum >= 1
- }, flags.EventuallyTimeout, time.Millisecond, "wait for file parts to
be created")
+ // Wait until at least one flush has been introduced and persisted, not
merely
+ // until a part directory appears (which happens before the mem->file
+ // introduction reaches the snapshot).
+ waitForPersistedSnapshot(t, fileSystem, tabDir)
snapshotPath := filepath.Join(tmpPath, "snapshot")
fileSystem.MkdirIfNotExist(snapshotPath, 0o755)
- if _, err := tst.TakeFileSnapshot(snapshotPath); err != nil {
+ success, err := tst.TakeFileSnapshot(snapshotPath)
+ if err != nil {
t.Fatalf("TakeFileSnapshot failed: %v", err)
}
+ require.True(t, success, "TakeFileSnapshot should report success when
disk parts exist")
entries := fileSystem.ReadDir(snapshotPath)
@@ -590,16 +602,7 @@ func TestTolerantLoaderFallbackToOlderSnapshot(t
*testing.T) {
timestamp.TimeRange{}, option{flushTimeout: 0, mergePolicy:
newDefaultMergePolicyForTesting(), protector: protector.Nop{}}, nil)
require.NoError(t, err)
tst.mustAddDataPoints(dpsTS1)
- time.Sleep(100 * time.Millisecond)
- require.Eventually(t, func() bool {
- dd := fileSystem.ReadDir(tabDir)
- for _, d := range dd {
- if d.IsDir() && d.Name() != storage.FailedPartsDirName {
- return true
- }
- }
- return false
- }, flags.EventuallyTimeout, time.Millisecond, "wait for part")
+ waitForPersistedSnapshot(t, fileSystem, tabDir)
tst.Close()
snapshots := make([]uint64, 0)
for _, e := range fileSystem.ReadDir(tabDir) {
@@ -635,16 +638,7 @@ func
TestMeasureInitTSTableDeletesMultipleFailedSnapshotsOnFallback(t *testing.T
timestamp.TimeRange{}, testSnapshotOption(), nil)
require.NoError(t, err)
tst.mustAddDataPoints(dpsTS1)
- time.Sleep(100 * time.Millisecond)
- require.Eventually(t, func() bool {
- dd := fileSystem.ReadDir(tabDir)
- for _, d := range dd {
- if d.IsDir() && d.Name() != storage.FailedPartsDirName {
- return true
- }
- }
- return false
- }, flags.EventuallyTimeout, time.Millisecond, "wait for part")
+ waitForPersistedSnapshot(t, fileSystem, tabDir)
tst.Close()
snapshots := make([]uint64, 0)
for _, e := range fileSystem.ReadDir(tabDir) {
@@ -786,16 +780,7 @@ func TestInitTSTableLoadsNewestWhenMultipleValid(t
*testing.T) {
timestamp.TimeRange{}, testSnapshotOption(), nil)
require.NoError(t, err)
tst.mustAddDataPoints(dpsTS1)
- time.Sleep(100 * time.Millisecond)
- require.Eventually(t, func() bool {
- dd := fileSystem.ReadDir(tabDir)
- for _, d := range dd {
- if d.IsDir() && d.Name() != storage.FailedPartsDirName {
- return true
- }
- }
- return false
- }, flags.EventuallyTimeout, time.Millisecond, "wait for part")
+ waitForPersistedSnapshot(t, fileSystem, tabDir)
tst.Close()
snapshots := make([]uint64, 0)
for _, e := range fileSystem.ReadDir(tabDir) {