This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch v0.10.x in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit ce600a7029dd1c18f52e6eac430f54a69615b81f Author: Gao Hongtao <[email protected]> AuthorDate: Tue Apr 28 15:42:20 2026 +0800 fix(measure,stream,trace): wait until all mem parts flushed in file_snapshot tests (#1098) --- CHANGES.md | 6 ++++ banyand/measure/query_test.go | 64 ++++++++++++++++++------------------ banyand/stream/block_scanner_test.go | 42 ++++++++++++++--------- banyand/stream/query_by_idx_test.go | 22 ++++--------- banyand/trace/query_test.go | 42 ++++++++++++++--------- 5 files changed, 96 insertions(+), 80 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 1a0884ddd..eac054fab 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -22,6 +22,12 @@ Release Notes. - Add validation for MATCH and IN conditions in inverted index query builder, and handle nil OR branch when all entities are specific. - Fix wrong backup path of schema property. - Fix lifecycle migration failure when the target stage has `close: true`. +- Fix stale sync request blocking watch session channel, causing repeated "channel full, skipping session" errors when a watch stream is in backoff. +- Fix nil pointer panic in disk monitor when group schema is not yet initialized during early startup, and ensure monitor loop survives recovered panics. +- Fix `FileSystemError` not satisfying `errors.Is(err, io/fs.ErrNotExist)`, which prevented the segment controller from cleaning up half-born segment directories and left groups in a permanent zombie state after a crash or partial sync. +- Fix lifecycle migration panic when a stream shard's snapshot has no element index (`idx/`) directory. +- Avoid FODC lifecycle inspection failing on busy data nodes by raising the per-broadcast `CollectDataInfo` / `CollectLiaisonInfo` deadline from 5s to 30s and parallelizing per-group inspection in the cluster-internal `InspectAll`. +- Fix flaky `file_snapshot` subtest in measure/stream/trace by waiting until every introduced mem part has been flushed to disk, instead of only checking the latest snapshot creator. ### Chores diff --git a/banyand/measure/query_test.go b/banyand/measure/query_test.go index 66ab60779..c72a19719 100644 --- a/banyand/measure/query_test.go +++ b/banyand/measure/query_test.go @@ -1335,23 +1335,13 @@ func TestQueryResult(t *testing.T) { tst.mustAddDataPoints(dps) time.Sleep(100 * time.Millisecond) } - // wait until the introducer is done + // wait until every introduced mem part has been flushed to disk; + // inspecting only snapshot.creator races with concurrent introductions + // (a flush of part N can land while part N+1 is still a mem part). if len(tt.dpsList) > 0 { - for { - snp := tst.currentSnapshot() - if snp == nil { - time.Sleep(100 * time.Millisecond) - continue - } - if snp.creator == snapshotCreatorMemPart { - snp.decRef() - time.Sleep(100 * time.Millisecond) - continue - } - snp.decRef() - tst.Close() - break - } + require.Eventually(t, allPartsFlushed(tst), 30*time.Second, 100*time.Millisecond, + "mem parts not flushed to disk in time") + tst.Close() } // reopen the table @@ -1368,6 +1358,26 @@ func TestQueryResult(t *testing.T) { } } +// allPartsFlushed returns a predicate that is true once tst's current +// snapshot exists and every part is file-backed (no mp != nil). +// It is used by file_snapshot tests to wait until every introduced mem +// part has been flushed before closing/reopening the table. +func allPartsFlushed(tst *tsTable) func() bool { + return func() bool { + snp := tst.currentSnapshot() + if snp == nil { + return false + } + defer snp.decRef() + for _, pw := range snp.parts { + if pw.mp != nil { + return false + } + } + return true + } +} + func TestQueryResult_QuotaExceeded(t *testing.T) { tests := []struct { wantErr error @@ -1438,23 +1448,13 @@ func TestQueryResult_QuotaExceeded(t *testing.T) { tst.mustAddDataPoints(dps) time.Sleep(100 * time.Millisecond) } - // wait until the introducer is done + // wait until every introduced mem part has been flushed to disk; + // inspecting only snapshot.creator races with concurrent introductions + // (a flush of part N can land while part N+1 is still a mem part). if len(tt.dpsList) > 0 { - for { - snp := tst.currentSnapshot() - if snp == nil { - time.Sleep(100 * time.Millisecond) - continue - } - if snp.creator == snapshotCreatorMemPart { - snp.decRef() - time.Sleep(100 * time.Millisecond) - continue - } - snp.decRef() - tst.Close() - break - } + require.Eventually(t, allPartsFlushed(tst), 30*time.Second, 100*time.Millisecond, + "mem parts not flushed to disk in time") + tst.Close() } // reopen the table diff --git a/banyand/stream/block_scanner_test.go b/banyand/stream/block_scanner_test.go index b7248b7c6..ac592b9d7 100644 --- a/banyand/stream/block_scanner_test.go +++ b/banyand/stream/block_scanner_test.go @@ -87,23 +87,13 @@ func TestBlockScanner_QuotaExceeded(t *testing.T) { tst.mustAddElements(es) time.Sleep(100 * time.Millisecond) } - // wait until the introducer is done + // wait until every introduced mem part has been flushed to disk; + // inspecting only snapshot.creator races with concurrent introductions + // (a flush of part N can land while part N+1 is still a mem part). if len(tt.esList) > 0 { - for { - snp := tst.currentSnapshot() - if snp == nil { - time.Sleep(100 * time.Millisecond) - continue - } - if snp.creator == snapshotCreatorMemPart { - snp.decRef() - time.Sleep(100 * time.Millisecond) - continue - } - snp.decRef() - tst.Close() - break - } + require.Eventually(t, allPartsFlushed(tst), 30*time.Second, 100*time.Millisecond, + "mem parts not flushed to disk in time") + tst.Close() } // reopen the table @@ -204,3 +194,23 @@ func TestBlockScanner_QuotaExceeded(t *testing.T) { }) } } + +// allPartsFlushed returns a predicate that is true once tst's current +// snapshot exists and every part is file-backed (no mp != nil). +// It is used by file_snapshot tests to wait until every introduced mem +// part has been flushed before closing/reopening the table. +func allPartsFlushed(tst *tsTable) func() bool { + return func() bool { + snp := tst.currentSnapshot() + if snp == nil { + return false + } + defer snp.decRef() + for _, pw := range snp.parts { + if pw.mp != nil { + return false + } + } + return true + } +} diff --git a/banyand/stream/query_by_idx_test.go b/banyand/stream/query_by_idx_test.go index 526546593..4a695bd8e 100644 --- a/banyand/stream/query_by_idx_test.go +++ b/banyand/stream/query_by_idx_test.go @@ -85,23 +85,13 @@ func TestQueryResult_QuotaExceeded(t *testing.T) { tst.mustAddElements(es) time.Sleep(100 * time.Millisecond) } - // wait until the introducer is done + // wait until every introduced mem part has been flushed to disk; + // inspecting only snapshot.creator races with concurrent introductions + // (a flush of part N can land while part N+1 is still a mem part). if len(tt.esList) > 0 { - for { - snp := tst.currentSnapshot() - if snp == nil { - time.Sleep(100 * time.Millisecond) - continue - } - if snp.creator == snapshotCreatorMemPart { - snp.decRef() - time.Sleep(100 * time.Millisecond) - continue - } - snp.decRef() - tst.Close() - break - } + require.Eventually(t, allPartsFlushed(tst), 30*time.Second, 100*time.Millisecond, + "mem parts not flushed to disk in time") + tst.Close() } // reopen the table diff --git a/banyand/trace/query_test.go b/banyand/trace/query_test.go index f318d508d..54abde693 100644 --- a/banyand/trace/query_test.go +++ b/banyand/trace/query_test.go @@ -202,23 +202,13 @@ func TestQueryResult(t *testing.T) { tst.mustAddTraces(traces, nil) time.Sleep(100 * time.Millisecond) } - // wait until the introducer is done + // wait until every introduced mem part has been flushed to disk; + // inspecting only snapshot.creator races with concurrent introductions + // (a flush of part N can land while part N+1 is still a mem part). if len(tt.tracesList) > 0 { - for { - snp := tst.currentSnapshot() - if snp == nil { - time.Sleep(100 * time.Millisecond) - continue - } - if snp.creator == snapshotCreatorMemPart { - snp.decRef() - time.Sleep(100 * time.Millisecond) - continue - } - snp.decRef() - tst.Close() - break - } + require.Eventually(t, allPartsFlushed(tst), 30*time.Second, 100*time.Millisecond, + "mem parts not flushed to disk in time") + tst.Close() } // reopen the table @@ -457,3 +447,23 @@ func TestQueryResultMultipleBatches(t *testing.T) { }) } } + +// allPartsFlushed returns a predicate that is true once tst's current +// snapshot exists and every part is file-backed (no mp != nil). +// It is used by file_snapshot tests to wait until every introduced mem +// part has been flushed before closing/reopening the table. +func allPartsFlushed(tst *tsTable) func() bool { + return func() bool { + snp := tst.currentSnapshot() + if snp == nil { + return false + } + defer snp.decRef() + for _, pw := range snp.parts { + if pw.mp != nil { + return false + } + } + return true + } +}
