This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch fix/flaky-measure-snapshot-tests in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 87a42b43724cf07a51f0e21cc8fd502a4ed0333b Author: Hongtao Gao <[email protected]> AuthorDate: Wed Jun 3 13:16:05 2026 +0000 Fix flaky measure snapshot tests gating on the part dir instead of the persisted flush The snapshot tests waited for a part directory to appear in tabDir as the flush-completion gate, but that directory is created by the first line of memPart.mustFlush, before the mem->file introduction reaches the in-memory snapshot and before the .snp manifest is persisted. Under -race/CI load the gate fired early, so TakeFileSnapshot saw only mem parts (got 0 entries) and Close could drop the in-flight flush (no .snp persisted). Add waitForPersistedSnapshot, which waits until the .snp manifest exists, and apply it to the four affected tests; also assert success from TakeFileSnapshot. --- CHANGES.md | 1 + banyand/measure/snapshot_test.go | 71 ++++++++++++++++------------------------ 2 files changed, 29 insertions(+), 43 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index e7de1c140..2170765a8 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -103,6 +103,7 @@ Release Notes. - Fix lifecycle migration placing data in the wrong target segment when the source segment interval is not a multiple of the target stage's interval, by row-level replaying parts that straddle a target-segment boundary instead of chunk-copying them into a single segment. - Fix trace query identity-tag projection: when `trace_id`/`span_id` are explicitly projected, reconstruct them from span identity at response build time instead of requesting them as stored tags, and preserve tag order with null-filled per-span value alignment in the distributed trace result iterator. - Fix measure, stream, and trace queries returning data from segments already expired by the TTL.
Retention removes a segment only on its next scheduled run, so a fully expired segment can linger on disk and keep serving TTL-expired data; queries now skip segments whose whole time range is past the retention deadline, matching retention's own removal condition. +- Fix flaky measure snapshot tests that gated on the part directory appearing in `tab/` as the flush-completion signal. That directory is created by the first line of `memPart.mustFlush`, before the mem→file introduction reaches the in-memory snapshot and before the `.snp` manifest is persisted, so under `-race`/CI load `TakeFileSnapshot` could observe only mem parts and `Close` could drop the in-flight flush; gate on the persisted `.snp` manifest instead. ### Chores diff --git a/banyand/measure/snapshot_test.go b/banyand/measure/snapshot_test.go index 67aa4f896..272b2a79e 100644 --- a/banyand/measure/snapshot_test.go +++ b/banyand/measure/snapshot_test.go @@ -436,6 +436,24 @@ func TestSnapshotRemove(t *testing.T) { } } +// waitForPersistedSnapshot blocks until a flush has been introduced and its +// ".snp" manifest persisted. Waiting for the part directory alone is racy: the +// directory is created by the first line of memPart.mustFlush, before the +// mem->file introduction reaches the in-memory snapshot and before the manifest +// is written, so the directory can exist while the snapshot still holds only +// mem parts and no ".snp" has been persisted. 
+func waitForPersistedSnapshot(t *testing.T, fileSystem fs.FileSystem, tabDir string) { + t.Helper() + require.Eventually(t, func() bool { + for _, e := range fileSystem.ReadDir(tabDir) { + if !e.IsDir() && filepath.Ext(e.Name()) == snapshotSuffix { + return true + } + } + return false + }, flags.EventuallyTimeout, time.Millisecond, "wait for snapshot manifest to be persisted") +} + func TestSnapshotFunctionality(t *testing.T) { fileSystem := fs.NewLocalFileSystem() @@ -465,25 +483,19 @@ func TestSnapshotFunctionality(t *testing.T) { tst.mustAddDataPoints(dpsTS1) tst.mustAddDataPoints(dpsTS2) - time.Sleep(100 * time.Millisecond) // allow time for flushing - - require.Eventually(t, func() bool { - dd := fileSystem.ReadDir(tabDir) - partNum := 0 - for _, d := range dd { - if d.IsDir() { - partNum++ - } - } - return partNum >= 1 - }, flags.EventuallyTimeout, time.Millisecond, "wait for file parts to be created") + // Wait until at least one flush has been introduced and persisted, not merely + // until a part directory appears (which happens before the mem->file + // introduction reaches the snapshot). 
+ waitForPersistedSnapshot(t, fileSystem, tabDir) snapshotPath := filepath.Join(tmpPath, "snapshot") fileSystem.MkdirIfNotExist(snapshotPath, 0o755) - if _, err := tst.TakeFileSnapshot(snapshotPath); err != nil { + success, err := tst.TakeFileSnapshot(snapshotPath) + if err != nil { t.Fatalf("TakeFileSnapshot failed: %v", err) } + require.True(t, success, "TakeFileSnapshot should report success when disk parts exist") entries := fileSystem.ReadDir(snapshotPath) @@ -590,16 +602,7 @@ func TestTolerantLoaderFallbackToOlderSnapshot(t *testing.T) { timestamp.TimeRange{}, option{flushTimeout: 0, mergePolicy: newDefaultMergePolicyForTesting(), protector: protector.Nop{}}, nil) require.NoError(t, err) tst.mustAddDataPoints(dpsTS1) - time.Sleep(100 * time.Millisecond) - require.Eventually(t, func() bool { - dd := fileSystem.ReadDir(tabDir) - for _, d := range dd { - if d.IsDir() && d.Name() != storage.FailedPartsDirName { - return true - } - } - return false - }, flags.EventuallyTimeout, time.Millisecond, "wait for part") + waitForPersistedSnapshot(t, fileSystem, tabDir) tst.Close() snapshots := make([]uint64, 0) for _, e := range fileSystem.ReadDir(tabDir) { @@ -635,16 +638,7 @@ func TestMeasureInitTSTableDeletesMultipleFailedSnapshotsOnFallback(t *testing.T timestamp.TimeRange{}, testSnapshotOption(), nil) require.NoError(t, err) tst.mustAddDataPoints(dpsTS1) - time.Sleep(100 * time.Millisecond) - require.Eventually(t, func() bool { - dd := fileSystem.ReadDir(tabDir) - for _, d := range dd { - if d.IsDir() && d.Name() != storage.FailedPartsDirName { - return true - } - } - return false - }, flags.EventuallyTimeout, time.Millisecond, "wait for part") + waitForPersistedSnapshot(t, fileSystem, tabDir) tst.Close() snapshots := make([]uint64, 0) for _, e := range fileSystem.ReadDir(tabDir) { @@ -786,16 +780,7 @@ func TestInitTSTableLoadsNewestWhenMultipleValid(t *testing.T) { timestamp.TimeRange{}, testSnapshotOption(), nil) require.NoError(t, err) 
tst.mustAddDataPoints(dpsTS1) - time.Sleep(100 * time.Millisecond) - require.Eventually(t, func() bool { - dd := fileSystem.ReadDir(tabDir) - for _, d := range dd { - if d.IsDir() && d.Name() != storage.FailedPartsDirName { - return true - } - } - return false - }, flags.EventuallyTimeout, time.Millisecond, "wait for part") + waitForPersistedSnapshot(t, fileSystem, tabDir) tst.Close() snapshots := make([]uint64, 0) for _, e := range fileSystem.ReadDir(tabDir) {
