This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 06d592ead Fix measure, stream, and trace queries returning TTL-expired
data (#1149)
06d592ead is described below
commit 06d592ead8418df4efd9eb819363196b8e1f882b
Author: Gao Hongtao <[email protected]>
AuthorDate: Wed Jun 3 06:44:52 2026 +0800
Fix measure, stream, and trace queries returning TTL-expired data (#1149)
---
CHANGES.md | 6 +-
banyand/internal/storage/segment.go | 11 +++
banyand/internal/storage/tsdb.go | 20 +++++-
banyand/internal/storage/tsdb_test.go | 61 +++++++++++++++++
bydbctl/internal/cmd/measure_test.go | 7 +-
bydbctl/internal/cmd/stream_test.go | 7 +-
bydbctl/internal/cmd/topn_test.go | 20 +++++-
bydbctl/internal/cmd/trace_test.go | 7 +-
test/cases/init.go | 15 +++++
.../testdata/service_traffic_data_expired.json | 78 ++++++++++++++++++++++
test/cases/measure/measure.go | 7 ++
test/cases/stream/stream.go | 5 ++
test/cases/trace/trace.go | 5 ++
13 files changed, 233 insertions(+), 16 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index fd3307325..ccb1a0ed5 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,8 +1,5 @@
# Changes by Version
-* Add opt-in vectorized measure query tracing over raw-frame distributed
queries, including a trace envelope and fixed trace-label vocabulary.
-* Fix trace query identity-tag projection: when `trace_id`/`span_id` are
explicitly projected, reconstruct them from span identity at response build
time instead of requesting them as stored tags, and preserve tag order with
null-filled per-span value alignment in the distributed trace result iterator.
-
Release Notes.
## 0.11.0
@@ -60,6 +57,7 @@ Release Notes.
- Introduce measure migration tool.
- Support displaying a measure's indexed tags in the dump tool, resolved per
part so peak memory is bounded by the part rather than a segment-wide series
map.
- Snapshot/backup and data inspection no longer reopen idle-closed segments,
avoiding cold-segment nil-index panics and index lock-file churn.
+- Add opt-in vectorized measure query tracing over raw-frame distributed
queries, including a trace envelope and fixed trace-label vocabulary.
### Bug Fixes
@@ -101,6 +99,8 @@ Release Notes.
- Release bluge index writers on segment rotation so `analysisWorker` pools
sized from `GOMAXPROCS` don't accumulate across rotations. Two layered defects
kept the existing idle-segment reclaim path from running: `segmentIdleTimeout`
defaulted to `0` (which disabled the 10-minute reclaim ticker), and `incRef`
refreshed `lastAccessed` on every rotation tick so `closeIdleSegments` never
observed an idle segment. Defaults to `time.Hour`, moves the `lastAccessed`
bump to real read/write call [...]
- Fix incorrect counts and missing trace fields in the lifecycle migration
report.
- Fix lifecycle migration placing data in the wrong target segment when the
source segment interval is not a multiple of the target stage's interval, by
row-level replaying parts that straddle a target-segment boundary instead of
chunk-copying them into a single segment.
+- Fix trace query identity-tag projection: when `trace_id`/`span_id` are
explicitly projected, reconstruct them from span identity at response build
time instead of requesting them as stored tags, and preserve tag order with
null-filled per-span value alignment in the distributed trace result iterator.
+- Fix measure, stream, and trace queries returning data from segments already
expired by the TTL. Retention removes a segment only on its next scheduled run,
so a fully expired segment can linger on disk and keep serving TTL-expired
data; queries now skip segments whose whole time range is past the retention
deadline, matching retention's own removal condition.
### Chores
diff --git a/banyand/internal/storage/segment.go
b/banyand/internal/storage/segment.go
index 3ca282f34..7872cef22 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -882,6 +882,17 @@ func (sc *segmentController[T, O]) remove(deadline
time.Time) (hasSegment bool,
return hasSegment, err
}
+// getRetentionDeadline returns the earliest timestamp that is still within the
+// retention window. Data points with a timestamp before this deadline are
+// expired by the TTL policy. Retention removes a segment only once its whole
+// time range falls before the deadline (see (*segmentController).remove), so a
+// fully expired segment can linger on disk until the next retention run.
+// Queries should exclude such fully expired segments to avoid serving
TTL-expired
+// data; partially expired segments remain visible until their end passes the
deadline.
+func (sc *segmentController[T, O]) getRetentionDeadline() time.Time {
+ return time.Now().Local().Add(-sc.getOptions().TTL.estimatedDuration())
+}
+
func (sc *segmentController[T, O]) getExpiredSegmentsTimeRange()
*timestamp.TimeRange {
deadline := time.Now().Local().Add(-sc.opts.TTL.estimatedDuration())
timeRange := &timestamp.TimeRange{
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index afe341ea1..9dff036f1 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -278,7 +278,25 @@ func (d *database[T, O]) SelectSegments(timeRange
timestamp.TimeRange, reopenClo
if d.closed.Load() {
return nil, nil
}
- return d.segmentController.selectSegments(timeRange, reopenClosed)
+ segments, err := d.segmentController.selectSegments(timeRange,
reopenClosed)
+ if err != nil || !reopenClosed || d.disableRetention {
+ return segments, err
+ }
+ // Exclude segments whose whole time range has already passed the
retention
+ // deadline. Retention removes such a segment only on its next cron run
(see
+ // (*segmentController).remove), so between runs a fully expired
segment is
+ // still on disk and would otherwise serve TTL-expired data to queries.
Data
+ // in partially expired segments that retention still keeps stays
visible.
+ deadline := d.segmentController.getRetentionDeadline()
+ kept := segments[:0]
+ for _, s := range segments {
+ if s.GetTimeRange().Before(deadline) {
+ s.DecRef()
+ continue
+ }
+ kept = append(kept, s)
+ }
+ return kept, nil
}
func (d *database[T, O]) SegmentInterval() IntervalRule {
diff --git a/banyand/internal/storage/tsdb_test.go
b/banyand/internal/storage/tsdb_test.go
index 76908953a..e679a6a74 100644
--- a/banyand/internal/storage/tsdb_test.go
+++ b/banyand/internal/storage/tsdb_test.go
@@ -224,6 +224,67 @@ func TestOpenTSDB(t *testing.T) {
})
}
+func TestSelectSegmentsRetention(t *testing.T) {
+ logger.Init(logger.Logging{
+ Env: "dev",
+ Level: flags.LogLevel,
+ })
+
+ realNow := time.Now()
+ // A wide query window spanning both the expired and the fresh segment.
+ wide := timestamp.NewInclusiveTimeRange(realNow.Add(-10*24*time.Hour),
realNow.Add(24*time.Hour))
+ // TTL is 3 days, so a segment whose end is older than this is fully
expired.
+ deadline := realNow.Add(-3 * 24 * time.Hour)
+
+ newDB := func(t *testing.T, disableRetention bool) TSDB[*MockTSTable,
any] {
+ dir, defFn := test.Space(require.New(t))
+ t.Cleanup(defFn)
+ opts := TSDBOpts[*MockTSTable, any]{
+ Location: dir,
+ SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+ TTL: IntervalRule{Unit: DAY, Num: 3},
+ ShardNum: 1,
+ TSTableCreator: MockTSTableCreator,
+ DisableRetention: disableRetention,
+ }
+ tsdb, err := OpenTSDB(context.Background(), opts,
NewServiceCache(), group)
+ require.NoError(t, err)
+ t.Cleanup(func() { _ = tsdb.Close() })
+ // One fully expired segment (end well past the deadline) and
one fresh.
+ for _, ts := range []time.Time{realNow.Add(-6 * 24 *
time.Hour), realNow} {
+ seg, segErr := tsdb.CreateSegmentIfNotExist(ts)
+ require.NoError(t, segErr)
+ seg.DecRef()
+ }
+ return tsdb
+ }
+
+ t.Run("query path excludes fully expired segments", func(t *testing.T) {
+ tsdb := newDB(t, false)
+ segs, err := tsdb.SelectSegments(wide, true)
+ require.NoError(t, err)
+ defer func() {
+ for _, s := range segs {
+ s.DecRef()
+ }
+ }()
+ require.Len(t, segs, 1)
+ require.False(t, segs[0].GetTimeRange().Before(deadline), "the
kept segment must not be fully expired")
+ })
+
+ t.Run("retention disabled keeps all segments", func(t *testing.T) {
+ tsdb := newDB(t, true)
+ segs, err := tsdb.SelectSegments(wide, true)
+ require.NoError(t, err)
+ defer func() {
+ for _, s := range segs {
+ s.DecRef()
+ }
+ }()
+ require.Len(t, segs, 2)
+ })
+}
+
func TestTakeFileSnapshot(t *testing.T) {
logger.Init(logger.Logging{
Env: "dev",
diff --git a/bydbctl/internal/cmd/measure_test.go
b/bydbctl/internal/cmd/measure_test.go
index 48638d6b4..288a3f890 100644
--- a/bydbctl/internal/cmd/measure_test.go
+++ b/bydbctl/internal/cmd/measure_test.go
@@ -220,9 +220,10 @@ var _ = Describe("Measure Data Query", func() {
var startStr, endStr string
var interval time.Duration
BeforeEach(func() {
- var err error
- now, err = time.ParseInLocation("2006-01-02T15:04:05",
"2021-09-01T23:30:00", time.Local)
- Expect(err).NotTo(HaveOccurred())
+ // Seed at a recent time so the data stays within the group
TTL; data
+ // older than the retention deadline is now filtered out at
query time
+ // even when freshly written.
+ now = timestamp.NowMilli()
startStr = now.Add(-20 * time.Minute).Format(time.RFC3339)
interval = 1 * time.Millisecond
endStr = now.Add(5 * time.Minute).Format(time.RFC3339)
diff --git a/bydbctl/internal/cmd/stream_test.go
b/bydbctl/internal/cmd/stream_test.go
index ff3ac9976..5bb082945 100644
--- a/bydbctl/internal/cmd/stream_test.go
+++ b/bydbctl/internal/cmd/stream_test.go
@@ -220,9 +220,10 @@ var _ = Describe("Stream Data Query", func() {
var nowStr, endStr string
var interval time.Duration
BeforeEach(func() {
- var err error
- now, err = time.ParseInLocation("2006-01-02T15:04:05",
"2021-09-01T23:30:00", time.Local)
- Expect(err).NotTo(HaveOccurred())
+ // Seed at a recent time so the data stays within the group
TTL; data
+ // older than the retention deadline is now filtered out at
query time
+ // even when freshly written.
+ now = timestamp.NowMilli()
nowStr = now.Format(time.RFC3339)
interval = 500 * time.Millisecond
endStr = now.Add(1 * time.Hour).Format(time.RFC3339)
diff --git a/bydbctl/internal/cmd/topn_test.go
b/bydbctl/internal/cmd/topn_test.go
index 0ef47328c..0413541d1 100644
--- a/bydbctl/internal/cmd/topn_test.go
+++ b/bydbctl/internal/cmd/topn_test.go
@@ -261,9 +261,12 @@ var _ = Describe("Topn Data Query", func() {
var startStr, endStr string
var interval time.Duration
BeforeEach(func() {
- var err error
- now, err = time.ParseInLocation("2006-01-02T15:04:05",
"2021-09-01T23:30:00", time.Local)
- Expect(err).NotTo(HaveOccurred())
+ // Seed an hour in the past: recent enough to stay within the
group TTL
+ // (so the retention filter keeps it), but old enough that the
TopN
+ // streaming window has already closed and emitted by query
time. Seeding
+ // exactly at now leaves the single 1ms-interval bucket's
window open (no
+ // later events advance the watermark), so TopN would return
nothing.
+ now = timestamp.NowMilli().Add(-time.Hour)
startStr = now.Add(-20 * time.Minute).Format(time.RFC3339)
interval = 1 * time.Millisecond
endStr = now.Add(5 * time.Minute).Format(time.RFC3339)
@@ -280,6 +283,11 @@ var _ = Describe("Topn Data Query", func() {
)
Expect(err).NotTo(HaveOccurred())
cases_measure_data.Write(conn, "service_instance_cpm_minute",
"sw_metric", "service_instance_cpm_minute_data.json", now, interval)
+ // Advance the TopN streaming watermark past the recent-past
bucket by
+ // writing a current-time datapoint outside the query window,
so the
+ // tumbling window closes and TopN emits. Without a later event
the single
+ // bucket's window stays open and TopN returns nothing.
+ cases_measure_data.Write(conn, "service_instance_cpm_minute",
"sw_metric", "service_instance_cpm_minute_data.json", timestamp.NowMilli(),
interval)
rootCmd.SetArgs([]string{"measure", "query", "-a", addr, "-f",
"-"})
issue := func() string {
rootCmd.SetIn(strings.NewReader(fmt.Sprintf(`
@@ -339,6 +347,9 @@ fieldValueSort: 1`, startStr, endStr)))
resp := new(measurev1.TopNResponse)
helpers.UnmarshalYAML([]byte(out), resp)
GinkgoWriter.Println(resp)
+ if len(resp.Lists) == 0 {
+ return 0
+ }
return len(resp.Lists[0].Items)
}, flags.EventuallyTimeout).Should(Equal(3))
})
@@ -379,6 +390,9 @@ fieldValueSort: 1`))
resp := new(measurev1.TopNResponse)
helpers.UnmarshalYAML([]byte(out), resp)
GinkgoWriter.Println(resp)
+ if len(resp.Lists) == 0 {
+ return 0
+ }
return len(resp.Lists[0].Items)
}, flags.EventuallyTimeout).Should(Equal(3))
},
diff --git a/bydbctl/internal/cmd/trace_test.go
b/bydbctl/internal/cmd/trace_test.go
index ae8cf1a0c..6f503faa7 100644
--- a/bydbctl/internal/cmd/trace_test.go
+++ b/bydbctl/internal/cmd/trace_test.go
@@ -234,9 +234,10 @@ var _ = Describe("Trace Data Query", func() {
var nowStr, endStr string
var interval time.Duration
BeforeEach(func() {
- var err error
- now, err = time.ParseInLocation("2006-01-02T15:04:05",
"2021-09-01T23:30:00", time.Local)
- Expect(err).NotTo(HaveOccurred())
+ // Seed at a recent time so the data stays within the group
TTL; data
+ // older than the retention deadline is now filtered out at
query time
+ // even when freshly written.
+ now = timestamp.NowMilli()
nowStr = now.Format(time.RFC3339)
interval = 500 * time.Millisecond
endStr = now.Add(1 * time.Hour).Format(time.RFC3339)
diff --git a/test/cases/init.go b/test/cases/init.go
index a62b50078..54d07c76c 100644
--- a/test/cases/init.go
+++ b/test/cases/init.go
@@ -44,6 +44,11 @@ func Initialize(addr string, now time.Time) {
interval := 500 * time.Millisecond
// stream
casesstreamdata.Write(conn, "sw", now, interval)
+ // Seed stream data in a fully expired segment, well past the "default"
+ // group's 3-day TTL. It must never surface in query results: it backs
the
+ // "excludes data expired beyond TTL" case, which fails without the
retention
+ // filter that drops fully expired segments.
+ casesstreamdata.WriteToGroup(conn, "sw", "default", "sw",
now.AddDate(0, 0, -6), interval)
casesstreamdata.Write(conn, "duplicated", now, 0)
casesstreamdata.WriteDeduplicationTest(conn, "deduplication_test", now,
time.Millisecond)
casesstreamdata.WriteToGroup(conn, "sw", "updated", "sw_updated",
now.Add(time.Minute), interval)
@@ -83,6 +88,11 @@ func Initialize(addr string, now time.Time) {
interval = time.Minute
casesmeasuredata.Write(conn, "service_traffic", "index_mode",
"service_traffic_data_old.json", now.AddDate(0, 0, -2), interval)
casesmeasuredata.Write(conn, "service_traffic", "index_mode",
"service_traffic_data.json", now, interval)
+ // Seed data in a fully expired segment, well past the index_mode
group's
+ // 7-day TTL (distinct entity ids 901/902). It must never surface in
query
+ // results: it backs the "index mode excludes data expired beyond TTL"
case,
+ // which fails without the retention filter that drops fully expired
segments.
+ casesmeasuredata.Write(conn, "service_traffic", "index_mode",
"service_traffic_data_expired.json", now.AddDate(0, 0, -10), interval)
casesmeasuredata.Write(conn, "service_traffic", "replicated_group",
"service_traffic_data.json", now, interval)
casesmeasuredata.Write(conn, "service_instance_traffic", "sw_metric",
"service_instance_traffic_data.json", now, interval)
casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric",
"service_cpm_minute_data.json", now, interval)
@@ -137,6 +147,11 @@ func Initialize(addr string, now time.Time) {
// trace
interval = 500 * time.Millisecond
casestrace.WriteToGroup(conn, "sw", "test-trace-group", "sw", now,
interval)
+ // Seed trace data in a fully expired segment, well past the
+ // "test-trace-group" group's 3-day TTL. It must never surface in query
+ // results: it backs the "excludes data expired beyond TTL" case, which
fails
+ // without the retention filter that drops fully expired segments.
+ casestrace.WriteToGroup(conn, "sw", "test-trace-group", "sw",
now.AddDate(0, 0, -6), interval)
casestrace.WriteToGroup(conn, "zipkin", "zipkinTrace", "zipkin", now,
interval)
casestrace.WriteToGroup(conn, "sw", "test-trace-updated", "sw_updated",
now.Add(time.Minute), interval)
time.Sleep(2 * time.Second)
diff --git a/test/cases/measure/data/testdata/service_traffic_data_expired.json
b/test/cases/measure/data/testdata/service_traffic_data_expired.json
new file mode 100644
index 000000000..b553ef38f
--- /dev/null
+++ b/test/cases/measure/data/testdata/service_traffic_data_expired.json
@@ -0,0 +1,78 @@
+[
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "901"
+ }
+ },
+ {
+ "str": {
+ "value": "service_901_expired"
+ }
+ },
+ {
+ "str": {
+ "value": "service_name_901_expired"
+ }
+ },
+ {
+ "str": {
+ "value": "service_short_name_901_expired"
+ }
+ },
+ {
+ "str": {
+ "value": "group_expired"
+ }
+ },
+ {
+ "int": {
+ "value": 9
+ }
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "tag_families": [
+ {
+ "tags": [
+ {
+ "str": {
+ "value": "902"
+ }
+ },
+ {
+ "str": {
+ "value": "service_902_expired"
+ }
+ },
+ {
+ "str": {
+ "value": "service_name_902_expired"
+ }
+ },
+ {
+ "str": {
+ "value": "service_short_name_902_expired"
+ }
+ },
+ {
+ "str": {
+ "value": "group_expired"
+ }
+ },
+ {
+ "int": {
+ "value": 9
+ }
+ }
+ ]
+ }
+ ]
+ }
+]
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index a240092e4..20ec2da31 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -89,6 +89,13 @@ var measureEntries = []any{
helpers.Args{Input: "index_mode_all", Want:
"index_mode_all_xl", Duration: 96 * time.Hour, Offset: -72 * time.Hour,
DisOrder: true}),
g.Entry("all in all segments of index mode",
helpers.Args{Input: "index_mode_all", Want:
"index_mode_all_segs", Duration: 96 * time.Hour, Offset: -72 * time.Hour,
DisOrder: true}),
+ // Window reaches ~11 days back, past the index_mode group's 7-day TTL.
The
+ // expired ids 901/902 seeded at now-10d (see test/cases/init.go) sit
in a
+ // fully expired segment and must be dropped by the retention filter,
leaving
+ // exactly the in-retention rows of index_mode_all_xl. Without the
filter the
+ // expired rows leak in and this case fails.
+ g.Entry("index mode excludes data expired beyond TTL",
+ helpers.Args{Input: "index_mode_all", Want:
"index_mode_all_xl", Duration: 288 * time.Hour, Offset: -264 * time.Hour,
DisOrder: true}),
g.Entry("order by desc of index mode", helpers.Args{Input:
"index_mode_order_desc", Duration: 25 * time.Minute, Offset: -20 *
time.Minute}),
g.Entry("range of index mode", helpers.Args{Input: "index_mode_range",
Duration: 25 * time.Minute, Offset: -20 * time.Minute, DisOrder: true}),
g.Entry("none of index mode", helpers.Args{Input: "index_mode_none",
WantEmpty: true, Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
diff --git a/test/cases/stream/stream.go b/test/cases/stream/stream.go
index f9c73e93b..79ea80330 100644
--- a/test/cases/stream/stream.go
+++ b/test/cases/stream/stream.go
@@ -41,6 +41,11 @@ var (
var streamEntries = []any{
g.Entry("all elements", helpers.Args{Input: "all", Duration: 1 *
time.Hour}),
+ // Window sits ~6 days back, well past the "default" group's 3-day TTL.
The
+ // data seeded there (see test/cases/init.go) is in a fully expired
segment
+ // and must be dropped by the retention filter, yielding an empty
result.
+ // Without the filter the expired elements leak in and this case fails.
+ g.Entry("excludes data expired beyond TTL", helpers.Args{Input: "all",
Offset: -156 * time.Hour, Duration: 24 * time.Hour, WantEmpty: true}),
g.Entry("projection with http.method", helpers.Args{Input:
"all_with_http_method", Duration: 1 * time.Hour}),
g.Entry("limit", helpers.Args{Input: "limit", Duration: 1 * time.Hour}),
g.Entry("max limit", helpers.Args{Input: "all_max_limit", Want: "all",
Duration: 1 * time.Hour}),
diff --git a/test/cases/trace/trace.go b/test/cases/trace/trace.go
index 474b4f677..863cf5f75 100644
--- a/test/cases/trace/trace.go
+++ b/test/cases/trace/trace.go
@@ -39,6 +39,11 @@ var (
var traceEntries = []any{
g.Entry("query by trace id", helpers.Args{Input: "eq_trace_id",
Duration: 1 * time.Hour}),
+ // Window sits ~6 days back, well past the "test-trace-group" group's
3-day
+ // TTL. The data seeded there (see test/cases/init.go) is in a fully
expired
+ // segment and must be dropped by the retention filter, yielding an
empty
+ // result. Without the filter the expired traces leak in and this case
fails.
+ g.Entry("excludes data expired beyond TTL", helpers.Args{Input: "all",
Offset: -156 * time.Hour, Duration: 24 * time.Hour, WantEmpty: true}),
g.Entry("query by trace ids", helpers.Args{Input: "in_trace_ids",
Duration: 1 * time.Hour}),
g.Entry("query by empty span ids", helpers.Args{Input:
"in_empty_span_ids", Duration: 1 * time.Hour, WantEmpty: true}),
g.Entry("order by timestamp", helpers.Args{Input:
"order_timestamp_desc", Duration: 1 * time.Hour}),