This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 6f4ef25b8 Make dump UT more stable (#1188)
6f4ef25b8 is described below
commit 6f4ef25b8185c7c0ab4c255d8223ff186cb7dd9a
Author: mrproliu <[email protected]>
AuthorDate: Thu Jun 25 10:55:26 2026 +0800
Make dump UT more stable (#1188)
---
banyand/backup/lifecycle/roundtrip_test.go | 13 ++++++--
banyand/internal/dump/measure/suite_test.go | 52 +++++++++++++++++++----------
2 files changed, 46 insertions(+), 19 deletions(-)
diff --git a/banyand/backup/lifecycle/roundtrip_test.go b/banyand/backup/lifecycle/roundtrip_test.go
index a488345d5..46fd1d0fd 100644
--- a/banyand/backup/lifecycle/roundtrip_test.go
+++ b/banyand/backup/lifecycle/roundtrip_test.go
@@ -60,6 +60,7 @@ import (
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/convert"
localfs "github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/index/inverted"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/node"
pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
@@ -996,8 +997,9 @@ func roundtripTagValue(iwr *measurev1.InternalWriteRequest, tagName string) stri
return ""
}
-// roundtripAllSidxDirsHaveSnapshot returns true when every
<groupRoot>/seg-*/sidx
-// dir carries at least one .snp file (bluge's snapshot marker).
+// roundtripAllSidxDirsHaveSnapshot returns true when every non-empty
+// <groupRoot>/seg-*/sidx dir carries at least one .snp file (bluge's snapshot
+// marker), and at least one such segment exists.
func roundtripAllSidxDirsHaveSnapshot(groupRoot string) bool {
segs, err := os.ReadDir(groupRoot)
if err != nil {
@@ -1009,6 +1011,13 @@ func roundtripAllSidxDirsHaveSnapshot(groupRoot string) bool {
continue
}
sidxDir := filepath.Join(groupRoot, seg.Name(), "sidx")
+ // Skip empty segments. Near a day boundary an empty adjacent
segment can
+ // exist (e.g. the live current-day segment while the data
lands in the
+ // previous day around midnight); its series index holds no
documents and
+ // therefore never gets a committed .snp, so requiring one
would hang.
+ if docs, _ := inverted.ReadOnlyDocCount(sidxDir); docs == 0 {
+ continue
+ }
entries, readErr := os.ReadDir(sidxDir)
if readErr != nil {
return false
diff --git a/banyand/internal/dump/measure/suite_test.go b/banyand/internal/dump/measure/suite_test.go
index b0b28271b..9ec73e6d3 100644
--- a/banyand/internal/dump/measure/suite_test.go
+++ b/banyand/internal/dump/measure/suite_test.go
@@ -304,6 +304,14 @@ func TestMeasureIndexedTagResolvedFromIndex(t *testing.T) {
const total = 2
baseTS := time.Now().Truncate(time.Minute)
+ // Keep all `total` points (one minute apart) inside a single daily
segment.
+ // Near local midnight the default span would cross the day boundary
and split
+ // the series across two segments, which breaks the single-segment
resolver
+ // below; pull the window back so it ends before midnight.
+ midnight := time.Date(baseTS.Year(), baseTS.Month(), baseTS.Day(), 0,
0, 0, 0, baseTS.Location()).Add(24 * time.Hour)
+ if span := time.Duration(total) * time.Minute;
baseTS.Add(span).After(midnight) {
+ baseTS = midnight.Add(-span - time.Minute)
+ }
bp := pipeline.NewBatchPublisher(5 * time.Second)
for i := 0; i < total; i++ {
iStr := strconv.Itoa(i)
@@ -351,25 +359,16 @@ func TestMeasureIndexedTagResolvedFromIndex(t *testing.T) {
}
strRuleID, intRuleID, arrRuleID := ruleID("idxr_str_rule"),
ruleID("idxr_int_rule"), ruleID("idxr_arr_rule")
- segmentPath := findSidxSegmentPath(t, rootPath)
-
- // The series index persists asynchronously (unsafe batches +
persister) and is
- // flushed on stop; after a hard stop there is a brief window before
all series
- // are readable on disk. The fallback scan below sources EntityValues
purely
- // from this index, so wait until every written series is visible before
- // asserting, otherwise the scan can race the flush and recover nothing.
- sidxPath := filepath.Join(segmentPath, "sidx")
- require.Eventually(t, func() bool {
- count, _ := inverted.ReadOnlyDocCount(sidxPath)
- return count >= int64(total)
- }, 60*time.Second, 100*time.Millisecond, "series index not fully
persisted after stop")
-
// Stop the live service so it releases bluge's exclusive lock on the
series
// index; the dump (like the offline CLI) reads the index from a
quiesced
- // database. Deferred cleanup is suppressed via the stopped flag.
+ // database. The write path above is synchronous (safe-batch insert
blocks
+ // until the series index is persisted), so the index is already
durable on
+ // disk before this stop. Deferred cleanup is suppressed via the
stopped flag.
moduleDefer()
stopped = true
+ segmentPath := findSidxSegmentPath(t, rootPath)
+
ruleToTag := map[uint32]dump.IndexedTagSpec{
strRuleID: {Family: "default", Name: "idxStr", Type:
pbv1.ValueTypeStr},
intRuleID: {Family: "default", Name: "idxInt", Type:
pbv1.ValueTypeInt64},
@@ -598,15 +597,34 @@ func strArrTagValue(vals ...string) *modelv1.TagValue {
return &modelv1.TagValue{Value: &modelv1.TagValue_StrArray{StrArray:
&modelv1.StrArray{Value: vals}}}
}
+// findSidxSegmentPath returns the segment directory whose series index (sidx)
+// actually holds the written series. When the data lands near a
segment-interval
+// boundary an empty adjacent (e.g. next-day) segment dir can also exist, so
+// picking the lexically-last sidx is wrong -- it can select the empty segment
and
+// make later reads see zero docs. Choose the sidx with the most documents.
func findSidxSegmentPath(t *testing.T, root string) string {
t.Helper()
- var seg string
- _ = filepath.WalkDir(root, func(path string, d fs.DirEntry, err error)
error {
+ var seg, candidates string
+ best := int64(-1)
+ count := 0
+ _ = filepath.WalkDir(root, func(p string, d fs.DirEntry, err error)
error {
if err == nil && d.IsDir() && d.Name() == "sidx" {
- seg = filepath.Dir(path)
+ c, e := inverted.ReadOnlyDocCount(p)
+ candidates += fmt.Sprintf("{path=%s count=%d err=%v} ",
p, c, e)
+ count++
+ if c > best {
+ best = c
+ seg = filepath.Dir(p)
+ }
}
return nil
})
require.NotEmpty(t, seg, "sidx directory not found under %s", root)
+ // Record the selection: near a segment-interval boundary more than one
sidx
+ // dir can exist (an empty adjacent segment), so logging the candidates
and the
+ // chosen one makes any future mis-selection self-evident.
+ logger.GetLogger("dump-measure-test").Info().
+ Int("candidates", count).Str("chosen", seg).Int64("chosenDocs",
best).
+ Msgf("findSidxSegmentPath selected series-index segment among:
%s", candidates)
return seg
}