This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch fix/flaky-measure-snapshot-tests
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 87a42b43724cf07a51f0e21cc8fd502a4ed0333b
Author: Hongtao Gao <[email protected]>
AuthorDate: Wed Jun 3 13:16:05 2026 +0000

    Fix flaky measure snapshot tests gating on part dir instead of flush persist
    
    The snapshot tests waited for a part directory to appear in tabDir as the
    flush-completion gate, but that directory is created by the first line of
    memPart.mustFlush, before the mem->file introduction reaches the in-memory
    snapshot and before the .snp manifest is persisted. Under -race/CI load the
    gate fired early, so TakeFileSnapshot saw only mem parts (got 0 entries) and
    Close could drop the in-flight flush (no .snp persisted).
    
    Add waitForPersistedSnapshot, which gates on the .snp manifest existing, and
    apply it to the four affected tests; also assert success from
    TakeFileSnapshot.
---
 CHANGES.md                       |  1 +
 banyand/measure/snapshot_test.go | 71 ++++++++++++++++------------------------
 2 files changed, 29 insertions(+), 43 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index e7de1c140..2170765a8 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -103,6 +103,7 @@ Release Notes.
 - Fix lifecycle migration placing data in the wrong target segment when the source segment interval is not a multiple of the target stage's interval, by row-level replaying parts that straddle a target-segment boundary instead of chunk-copying them into a single segment.
 - Fix trace query identity-tag projection: when `trace_id`/`span_id` are explicitly projected, reconstruct them from span identity at response build time instead of requesting them as stored tags, and preserve tag order with null-filled per-span value alignment in the distributed trace result iterator.
 - Fix measure, stream, and trace queries returning data from segments already expired by the TTL. Retention removes a segment only on its next scheduled run, so a fully expired segment can linger on disk and keep serving TTL-expired data; queries now skip segments whose whole time range is past the retention deadline, matching retention's own removal condition.
+- Fix flaky measure snapshot tests that gated on the part directory appearing in `tab/` as the flush-completion signal. That directory is created by the first line of `memPart.mustFlush`, before the mem→file introduction reaches the in-memory snapshot and before the `.snp` manifest is persisted, so under `-race`/CI load `TakeFileSnapshot` could observe only mem parts and `Close` could drop the in-flight flush; gate on the persisted `.snp` manifest instead.
 
 ### Chores
 
diff --git a/banyand/measure/snapshot_test.go b/banyand/measure/snapshot_test.go
index 67aa4f896..272b2a79e 100644
--- a/banyand/measure/snapshot_test.go
+++ b/banyand/measure/snapshot_test.go
@@ -436,6 +436,24 @@ func TestSnapshotRemove(t *testing.T) {
        }
 }
 
+// waitForPersistedSnapshot blocks until a ".snp" manifest exists in tabDir,
+// which signals that a flush was introduced and its snapshot persisted. The
+// part directory is not a safe gate: memPart.mustFlush creates it before the
+// mem->file introduction reaches the in-memory snapshot and before the manifest
+// is written. Any ".snp" file satisfies the wait, so call this only while
+// tabDir holds no manifest left over from an earlier flush.
+func waitForPersistedSnapshot(t *testing.T, fileSystem fs.FileSystem, tabDir string) {
+       t.Helper()
+       require.Eventually(t, func() bool {
+               for _, e := range fileSystem.ReadDir(tabDir) {
+                       if !e.IsDir() && filepath.Ext(e.Name()) == snapshotSuffix {
+                               return true
+                       }
+               }
+               return false
+       }, flags.EventuallyTimeout, time.Millisecond, "wait for snapshot manifest to be persisted")
+}
+
 func TestSnapshotFunctionality(t *testing.T) {
        fileSystem := fs.NewLocalFileSystem()
 
@@ -465,25 +483,19 @@ func TestSnapshotFunctionality(t *testing.T) {
 
        tst.mustAddDataPoints(dpsTS1)
        tst.mustAddDataPoints(dpsTS2)
-       time.Sleep(100 * time.Millisecond) // allow time for flushing
-
-       require.Eventually(t, func() bool {
-               dd := fileSystem.ReadDir(tabDir)
-               partNum := 0
-               for _, d := range dd {
-                       if d.IsDir() {
-                               partNum++
-                       }
-               }
-               return partNum >= 1
-       }, flags.EventuallyTimeout, time.Millisecond, "wait for file parts to 
be created")
+       // Wait until at least one flush has been introduced and persisted, not 
merely
+       // until a part directory appears (which happens before the mem->file
+       // introduction reaches the snapshot).
+       waitForPersistedSnapshot(t, fileSystem, tabDir)
 
        snapshotPath := filepath.Join(tmpPath, "snapshot")
        fileSystem.MkdirIfNotExist(snapshotPath, 0o755)
 
-       if _, err := tst.TakeFileSnapshot(snapshotPath); err != nil {
+       success, err := tst.TakeFileSnapshot(snapshotPath)
+       if err != nil {
                t.Fatalf("TakeFileSnapshot failed: %v", err)
        }
+       require.True(t, success, "TakeFileSnapshot should report success when 
disk parts exist")
 
        entries := fileSystem.ReadDir(snapshotPath)
 
@@ -590,16 +602,7 @@ func TestTolerantLoaderFallbackToOlderSnapshot(t 
*testing.T) {
                timestamp.TimeRange{}, option{flushTimeout: 0, mergePolicy: 
newDefaultMergePolicyForTesting(), protector: protector.Nop{}}, nil)
        require.NoError(t, err)
        tst.mustAddDataPoints(dpsTS1)
-       time.Sleep(100 * time.Millisecond)
-       require.Eventually(t, func() bool {
-               dd := fileSystem.ReadDir(tabDir)
-               for _, d := range dd {
-                       if d.IsDir() && d.Name() != storage.FailedPartsDirName {
-                               return true
-                       }
-               }
-               return false
-       }, flags.EventuallyTimeout, time.Millisecond, "wait for part")
+       waitForPersistedSnapshot(t, fileSystem, tabDir)
        tst.Close()
        snapshots := make([]uint64, 0)
        for _, e := range fileSystem.ReadDir(tabDir) {
@@ -635,16 +638,7 @@ func 
TestMeasureInitTSTableDeletesMultipleFailedSnapshotsOnFallback(t *testing.T
                timestamp.TimeRange{}, testSnapshotOption(), nil)
        require.NoError(t, err)
        tst.mustAddDataPoints(dpsTS1)
-       time.Sleep(100 * time.Millisecond)
-       require.Eventually(t, func() bool {
-               dd := fileSystem.ReadDir(tabDir)
-               for _, d := range dd {
-                       if d.IsDir() && d.Name() != storage.FailedPartsDirName {
-                               return true
-                       }
-               }
-               return false
-       }, flags.EventuallyTimeout, time.Millisecond, "wait for part")
+       waitForPersistedSnapshot(t, fileSystem, tabDir)
        tst.Close()
        snapshots := make([]uint64, 0)
        for _, e := range fileSystem.ReadDir(tabDir) {
@@ -786,16 +780,7 @@ func TestInitTSTableLoadsNewestWhenMultipleValid(t 
*testing.T) {
                timestamp.TimeRange{}, testSnapshotOption(), nil)
        require.NoError(t, err)
        tst.mustAddDataPoints(dpsTS1)
-       time.Sleep(100 * time.Millisecond)
-       require.Eventually(t, func() bool {
-               dd := fileSystem.ReadDir(tabDir)
-               for _, d := range dd {
-                       if d.IsDir() && d.Name() != storage.FailedPartsDirName {
-                               return true
-                       }
-               }
-               return false
-       }, flags.EventuallyTimeout, time.Millisecond, "wait for part")
+       waitForPersistedSnapshot(t, fileSystem, tabDir)
        tst.Close()
        snapshots := make([]uint64, 0)
        for _, e := range fileSystem.ReadDir(tabDir) {

Reply via email to