This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch v0.10.x
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit ce600a7029dd1c18f52e6eac430f54a69615b81f
Author: Gao Hongtao <[email protected]>
AuthorDate: Tue Apr 28 15:42:20 2026 +0800

    fix(measure,stream,trace): wait until all mem parts flushed in file_snapshot tests (#1098)
---
 CHANGES.md                           |  6 ++++
 banyand/measure/query_test.go        | 64 ++++++++++++++++++------------------
 banyand/stream/block_scanner_test.go | 42 ++++++++++++++---------
 banyand/stream/query_by_idx_test.go  | 22 ++++---------
 banyand/trace/query_test.go          | 42 ++++++++++++++---------
 5 files changed, 96 insertions(+), 80 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 1a0884ddd..eac054fab 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -22,6 +22,12 @@ Release Notes.
 - Add validation for MATCH and IN conditions in inverted index query builder, and handle nil OR branch when all entities are specific.
 - Fix wrong backup path of schema property.
 - Fix lifecycle migration failure when the target stage has `close: true`.
+- Fix stale sync request blocking watch session channel, causing repeated "channel full, skipping session" errors when a watch stream is in backoff.
+- Fix nil pointer panic in disk monitor when group schema is not yet initialized during early startup, and ensure monitor loop survives recovered panics.
+- Fix `FileSystemError` not satisfying `errors.Is(err, io/fs.ErrNotExist)`, which prevented the segment controller from cleaning up half-born segment directories and left groups in a permanent zombie state after a crash or partial sync.
+- Fix lifecycle migration panic when a stream shard's snapshot has no element index (`idx/`) directory.
+- Avoid FODC lifecycle inspection failing on busy data nodes by raising the per-broadcast `CollectDataInfo` / `CollectLiaisonInfo` deadline from 5s to 30s and parallelizing per-group inspection in the cluster-internal `InspectAll`.
+- Fix flaky `file_snapshot` subtest in measure/stream/trace by waiting until every introduced mem part has been flushed to disk, instead of only checking the latest snapshot creator.
 
 ### Chores
 
diff --git a/banyand/measure/query_test.go b/banyand/measure/query_test.go
index 66ab60779..c72a19719 100644
--- a/banyand/measure/query_test.go
+++ b/banyand/measure/query_test.go
@@ -1335,23 +1335,13 @@ func TestQueryResult(t *testing.T) {
                                        tst.mustAddDataPoints(dps)
                                        time.Sleep(100 * time.Millisecond)
                                }
-                               // wait until the introducer is done
+                               // wait until every introduced mem part has been flushed to disk;
+                               // inspecting only snapshot.creator races with concurrent introductions
+                               // (a flush of part N can land while part N+1 is still a mem part).
                                if len(tt.dpsList) > 0 {
-                                       for {
-                                               snp := tst.currentSnapshot()
-                                               if snp == nil {
-                                                       time.Sleep(100 * time.Millisecond)
-                                                       continue
-                                               }
-                                               if snp.creator == snapshotCreatorMemPart {
-                                                       snp.decRef()
-                                                       time.Sleep(100 * time.Millisecond)
-                                                       continue
-                                               }
-                                               snp.decRef()
-                                               tst.Close()
-                                               break
-                                       }
+                                       require.Eventually(t, allPartsFlushed(tst), 30*time.Second, 100*time.Millisecond,
+                                               "mem parts not flushed to disk in time")
+                                       tst.Close()
                                }
 
                                // reopen the table
@@ -1368,6 +1358,26 @@ func TestQueryResult(t *testing.T) {
        }
 }
 
+// allPartsFlushed returns a predicate that is true once tst's current
+// snapshot exists and every part is file-backed (no part has a non-nil mp).
+// It is used by file_snapshot tests to wait until every introduced mem
+// part has been flushed before closing/reopening the table.
+func allPartsFlushed(tst *tsTable) func() bool {
+       return func() bool {
+               snp := tst.currentSnapshot()
+               if snp == nil {
+                       return false
+               }
+               defer snp.decRef()
+               for _, pw := range snp.parts {
+                       if pw.mp != nil {
+                               return false
+                       }
+               }
+               return true
+       }
+}
+
 func TestQueryResult_QuotaExceeded(t *testing.T) {
        tests := []struct {
                wantErr             error
@@ -1438,23 +1448,13 @@ func TestQueryResult_QuotaExceeded(t *testing.T) {
                                tst.mustAddDataPoints(dps)
                                time.Sleep(100 * time.Millisecond)
                        }
-                       // wait until the introducer is done
+                       // wait until every introduced mem part has been flushed to disk;
+                       // inspecting only snapshot.creator races with concurrent introductions
+                       // (a flush of part N can land while part N+1 is still a mem part).
                        if len(tt.dpsList) > 0 {
-                               for {
-                                       snp := tst.currentSnapshot()
-                                       if snp == nil {
-                                               time.Sleep(100 * time.Millisecond)
-                                               continue
-                                       }
-                                       if snp.creator == snapshotCreatorMemPart {
-                                               snp.decRef()
-                                               time.Sleep(100 * time.Millisecond)
-                                               continue
-                                       }
-                                       snp.decRef()
-                                       tst.Close()
-                                       break
-                               }
+                               require.Eventually(t, allPartsFlushed(tst), 30*time.Second, 100*time.Millisecond,
+                                       "mem parts not flushed to disk in time")
+                               tst.Close()
                        }
 
                        // reopen the table
diff --git a/banyand/stream/block_scanner_test.go b/banyand/stream/block_scanner_test.go
index b7248b7c6..ac592b9d7 100644
--- a/banyand/stream/block_scanner_test.go
+++ b/banyand/stream/block_scanner_test.go
@@ -87,23 +87,13 @@ func TestBlockScanner_QuotaExceeded(t *testing.T) {
                                tst.mustAddElements(es)
                                time.Sleep(100 * time.Millisecond)
                        }
-                       // wait until the introducer is done
+                       // wait until every introduced mem part has been flushed to disk;
+                       // inspecting only snapshot.creator races with concurrent introductions
+                       // (a flush of part N can land while part N+1 is still a mem part).
                        if len(tt.esList) > 0 {
-                               for {
-                                       snp := tst.currentSnapshot()
-                                       if snp == nil {
-                                               time.Sleep(100 * time.Millisecond)
-                                               continue
-                                       }
-                                       if snp.creator == snapshotCreatorMemPart {
-                                               snp.decRef()
-                                               time.Sleep(100 * time.Millisecond)
-                                               continue
-                                       }
-                                       snp.decRef()
-                                       tst.Close()
-                                       break
-                               }
+                               require.Eventually(t, allPartsFlushed(tst), 30*time.Second, 100*time.Millisecond,
+                                       "mem parts not flushed to disk in time")
+                               tst.Close()
                        }
 
                        // reopen the table
@@ -204,3 +194,23 @@ func TestBlockScanner_QuotaExceeded(t *testing.T) {
                })
        }
 }
+
+// allPartsFlushed returns a predicate that is true once tst's current
+// snapshot exists and every part is file-backed (no part has a non-nil mp).
+// It is used by file_snapshot tests to wait until every introduced mem
+// part has been flushed before closing/reopening the table.
+func allPartsFlushed(tst *tsTable) func() bool {
+       return func() bool {
+               snp := tst.currentSnapshot()
+               if snp == nil {
+                       return false
+               }
+               defer snp.decRef()
+               for _, pw := range snp.parts {
+                       if pw.mp != nil {
+                               return false
+                       }
+               }
+               return true
+       }
+}
diff --git a/banyand/stream/query_by_idx_test.go b/banyand/stream/query_by_idx_test.go
index 526546593..4a695bd8e 100644
--- a/banyand/stream/query_by_idx_test.go
+++ b/banyand/stream/query_by_idx_test.go
@@ -85,23 +85,13 @@ func TestQueryResult_QuotaExceeded(t *testing.T) {
                                tst.mustAddElements(es)
                                time.Sleep(100 * time.Millisecond)
                        }
-                       // wait until the introducer is done
+                       // wait until every introduced mem part has been flushed to disk;
+                       // inspecting only snapshot.creator races with concurrent introductions
+                       // (a flush of part N can land while part N+1 is still a mem part).
                        if len(tt.esList) > 0 {
-                               for {
-                                       snp := tst.currentSnapshot()
-                                       if snp == nil {
-                                               time.Sleep(100 * time.Millisecond)
-                                               continue
-                                       }
-                                       if snp.creator == snapshotCreatorMemPart {
-                                               snp.decRef()
-                                               time.Sleep(100 * time.Millisecond)
-                                               continue
-                                       }
-                                       snp.decRef()
-                                       tst.Close()
-                                       break
-                               }
+                               require.Eventually(t, allPartsFlushed(tst), 30*time.Second, 100*time.Millisecond,
+                                       "mem parts not flushed to disk in time")
+                               tst.Close()
                        }
 
                        // reopen the table
diff --git a/banyand/trace/query_test.go b/banyand/trace/query_test.go
index f318d508d..54abde693 100644
--- a/banyand/trace/query_test.go
+++ b/banyand/trace/query_test.go
@@ -202,23 +202,13 @@ func TestQueryResult(t *testing.T) {
                                        tst.mustAddTraces(traces, nil)
                                        time.Sleep(100 * time.Millisecond)
                                }
-                               // wait until the introducer is done
+                               // wait until every introduced mem part has been flushed to disk;
+                               // inspecting only snapshot.creator races with concurrent introductions
+                               // (a flush of part N can land while part N+1 is still a mem part).
                                if len(tt.tracesList) > 0 {
-                                       for {
-                                               snp := tst.currentSnapshot()
-                                               if snp == nil {
-                                                       time.Sleep(100 * time.Millisecond)
-                                                       continue
-                                               }
-                                               if snp.creator == snapshotCreatorMemPart {
-                                                       snp.decRef()
-                                                       time.Sleep(100 * time.Millisecond)
-                                                       continue
-                                               }
-                                               snp.decRef()
-                                               tst.Close()
-                                               break
-                                       }
+                                       require.Eventually(t, allPartsFlushed(tst), 30*time.Second, 100*time.Millisecond,
+                                               "mem parts not flushed to disk in time")
+                                       tst.Close()
                                }
 
                                // reopen the table
@@ -457,3 +447,23 @@ func TestQueryResultMultipleBatches(t *testing.T) {
                })
        }
 }
+
+// allPartsFlushed returns a predicate that is true once tst's current
+// snapshot exists and every part is file-backed (no part has a non-nil mp).
+// It is used by file_snapshot tests to wait until every introduced mem
+// part has been flushed before closing/reopening the table.
+func allPartsFlushed(tst *tsTable) func() bool {
+       return func() bool {
+               snp := tst.currentSnapshot()
+               if snp == nil {
+                       return false
+               }
+               defer snp.decRef()
+               for _, pw := range snp.parts {
+                       if pw.mp != nil {
+                               return false
+                       }
+               }
+               return true
+       }
+}

Reply via email to