This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 06d592ead Fix measure, stream, and trace queries returning TTL-expired data (#1149)
06d592ead is described below

commit 06d592ead8418df4efd9eb819363196b8e1f882b
Author: Gao Hongtao <[email protected]>
AuthorDate: Wed Jun 3 06:44:52 2026 +0800

    Fix measure, stream, and trace queries returning TTL-expired data (#1149)
---
 CHANGES.md                                         |  6 +-
 banyand/internal/storage/segment.go                | 11 +++
 banyand/internal/storage/tsdb.go                   | 20 +++++-
 banyand/internal/storage/tsdb_test.go              | 61 +++++++++++++++++
 bydbctl/internal/cmd/measure_test.go               |  7 +-
 bydbctl/internal/cmd/stream_test.go                |  7 +-
 bydbctl/internal/cmd/topn_test.go                  | 20 +++++-
 bydbctl/internal/cmd/trace_test.go                 |  7 +-
 test/cases/init.go                                 | 15 +++++
 .../testdata/service_traffic_data_expired.json     | 78 ++++++++++++++++++++++
 test/cases/measure/measure.go                      |  7 ++
 test/cases/stream/stream.go                        |  5 ++
 test/cases/trace/trace.go                          |  5 ++
 13 files changed, 233 insertions(+), 16 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index fd3307325..ccb1a0ed5 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,8 +1,5 @@
 # Changes by Version
 
-* Add opt-in vectorized measure query tracing over raw-frame distributed 
queries, including a trace envelope and fixed trace-label vocabulary.
-* Fix trace query identity-tag projection: when `trace_id`/`span_id` are 
explicitly projected, reconstruct them from span identity at response build 
time instead of requesting them as stored tags, and preserve tag order with 
null-filled per-span value alignment in the distributed trace result iterator.
-
 Release Notes.
 
 ## 0.11.0
@@ -60,6 +57,7 @@ Release Notes.
   - Introduce measure migration tool.
 - Support displaying a measure's indexed tags in the dump tool, resolved per 
part so peak memory is bounded by the part rather than a segment-wide series 
map.
 - Snapshot/backup and data inspection no longer reopen idle-closed segments, 
avoiding cold-segment nil-index panics and index lock-file churn.
+- Add opt-in vectorized measure query tracing over raw-frame distributed 
queries, including a trace envelope and fixed trace-label vocabulary.
 
 ### Bug Fixes
 
@@ -101,6 +99,8 @@ Release Notes.
 - Release bluge index writers on segment rotation so `analysisWorker` pools 
sized from `GOMAXPROCS` don't accumulate across rotations. Two layered defects 
kept the existing idle-segment reclaim path from running: `segmentIdleTimeout` 
defaulted to `0` (which disabled the 10-minute reclaim ticker), and `incRef` 
refreshed `lastAccessed` on every rotation tick so `closeIdleSegments` never 
observed an idle segment. Defaults to `time.Hour`, moves the `lastAccessed` 
bump to real read/write call [...]
 - Fix incorrect counts and missing trace fields in the lifecycle migration 
report.
 - Fix lifecycle migration placing data in the wrong target segment when the 
source segment interval is not a multiple of the target stage's interval, by 
row-level replaying parts that straddle a target-segment boundary instead of 
chunk-copying them into a single segment.
+- Fix trace query identity-tag projection: when `trace_id`/`span_id` are explicitly projected, reconstruct them from span identity at response build time instead of requesting them as stored tags, and preserve tag order with null-filled per-span value alignment in the distributed trace result iterator.
+- Fix measure, stream, and trace queries returning data from segments already expired by the TTL. Retention removes a segment only on its next scheduled run, so a fully expired segment can linger on disk and keep serving TTL-expired data; queries now skip segments whose whole time range is past the retention deadline, matching retention's own removal condition.
 
 ### Chores
 
diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go
index 3ca282f34..7872cef22 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -882,6 +882,17 @@ func (sc *segmentController[T, O]) remove(deadline time.Time) (hasSegment bool,
        return hasSegment, err
 }
 
+// getRetentionDeadline returns the earliest timestamp that is still within the
+// retention window. Data points with a timestamp before this deadline are
+// expired by the TTL policy. Retention removes a segment only once its whole
+// time range falls before the deadline (see (*segmentController).remove), so a
+// fully expired segment can linger on disk until the next retention run.
+// Queries should exclude such fully expired segments to avoid serving TTL-expired
+// data; partially expired segments remain visible until their end passes the deadline.
+func (sc *segmentController[T, O]) getRetentionDeadline() time.Time {
+       return time.Now().Local().Add(-sc.getOptions().TTL.estimatedDuration())
+}
+
 func (sc *segmentController[T, O]) getExpiredSegmentsTimeRange() *timestamp.TimeRange {
        deadline := time.Now().Local().Add(-sc.opts.TTL.estimatedDuration())
        timeRange := &timestamp.TimeRange{
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index afe341ea1..9dff036f1 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -278,7 +278,25 @@ func (d *database[T, O]) SelectSegments(timeRange timestamp.TimeRange, reopenClo
        if d.closed.Load() {
                return nil, nil
        }
-       return d.segmentController.selectSegments(timeRange, reopenClosed)
+       segments, err := d.segmentController.selectSegments(timeRange, reopenClosed)
+       if err != nil || !reopenClosed || d.disableRetention {
+               return segments, err
+       }
+       // Exclude segments whose whole time range has already passed the retention
+       // deadline. Retention removes such a segment only on its next cron run (see
+       // (*segmentController).remove), so between runs a fully expired segment is
+       // still on disk and would otherwise serve TTL-expired data to queries. Data
+       // in partially expired segments that retention still keeps stays visible.
+       deadline := d.segmentController.getRetentionDeadline()
+       kept := segments[:0]
+       for _, s := range segments {
+               if s.GetTimeRange().Before(deadline) {
+                       s.DecRef()
+                       continue
+               }
+               kept = append(kept, s)
+       }
+       return kept, nil
 }
 
 func (d *database[T, O]) SegmentInterval() IntervalRule {
diff --git a/banyand/internal/storage/tsdb_test.go 
b/banyand/internal/storage/tsdb_test.go
index 76908953a..e679a6a74 100644
--- a/banyand/internal/storage/tsdb_test.go
+++ b/banyand/internal/storage/tsdb_test.go
@@ -224,6 +224,67 @@ func TestOpenTSDB(t *testing.T) {
        })
 }
 
+func TestSelectSegmentsRetention(t *testing.T) {
+       logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: flags.LogLevel,
+       })
+
+       realNow := time.Now()
+       // A wide query window spanning both the expired and the fresh segment.
+       wide := timestamp.NewInclusiveTimeRange(realNow.Add(-10*24*time.Hour), 
realNow.Add(24*time.Hour))
+       // TTL is 3 days, so a segment whose end is older than this is fully 
expired.
+       deadline := realNow.Add(-3 * 24 * time.Hour)
+
+       newDB := func(t *testing.T, disableRetention bool) TSDB[*MockTSTable, 
any] {
+               dir, defFn := test.Space(require.New(t))
+               t.Cleanup(defFn)
+               opts := TSDBOpts[*MockTSTable, any]{
+                       Location:         dir,
+                       SegmentInterval:  IntervalRule{Unit: DAY, Num: 1},
+                       TTL:              IntervalRule{Unit: DAY, Num: 3},
+                       ShardNum:         1,
+                       TSTableCreator:   MockTSTableCreator,
+                       DisableRetention: disableRetention,
+               }
+               tsdb, err := OpenTSDB(context.Background(), opts, 
NewServiceCache(), group)
+               require.NoError(t, err)
+               t.Cleanup(func() { _ = tsdb.Close() })
+               // One fully expired segment (end well past the deadline) and 
one fresh.
+               for _, ts := range []time.Time{realNow.Add(-6 * 24 * 
time.Hour), realNow} {
+                       seg, segErr := tsdb.CreateSegmentIfNotExist(ts)
+                       require.NoError(t, segErr)
+                       seg.DecRef()
+               }
+               return tsdb
+       }
+
+       t.Run("query path excludes fully expired segments", func(t *testing.T) {
+               tsdb := newDB(t, false)
+               segs, err := tsdb.SelectSegments(wide, true)
+               require.NoError(t, err)
+               defer func() {
+                       for _, s := range segs {
+                               s.DecRef()
+                       }
+               }()
+               require.Len(t, segs, 1)
+               require.False(t, segs[0].GetTimeRange().Before(deadline), "the 
kept segment must not be fully expired")
+       })
+
+       t.Run("retention disabled keeps all segments", func(t *testing.T) {
+               tsdb := newDB(t, true)
+               segs, err := tsdb.SelectSegments(wide, true)
+               require.NoError(t, err)
+               defer func() {
+                       for _, s := range segs {
+                               s.DecRef()
+                       }
+               }()
+               require.Len(t, segs, 2)
+       })
+}
+
 func TestTakeFileSnapshot(t *testing.T) {
        logger.Init(logger.Logging{
                Env:   "dev",
diff --git a/bydbctl/internal/cmd/measure_test.go 
b/bydbctl/internal/cmd/measure_test.go
index 48638d6b4..288a3f890 100644
--- a/bydbctl/internal/cmd/measure_test.go
+++ b/bydbctl/internal/cmd/measure_test.go
@@ -220,9 +220,10 @@ var _ = Describe("Measure Data Query", func() {
        var startStr, endStr string
        var interval time.Duration
        BeforeEach(func() {
-               var err error
-               now, err = time.ParseInLocation("2006-01-02T15:04:05", 
"2021-09-01T23:30:00", time.Local)
-               Expect(err).NotTo(HaveOccurred())
+               // Seed at a recent time so the data stays within the group 
TTL; data
+               // older than the retention deadline is now filtered out at 
query time
+               // even when freshly written.
+               now = timestamp.NowMilli()
                startStr = now.Add(-20 * time.Minute).Format(time.RFC3339)
                interval = 1 * time.Millisecond
                endStr = now.Add(5 * time.Minute).Format(time.RFC3339)
diff --git a/bydbctl/internal/cmd/stream_test.go 
b/bydbctl/internal/cmd/stream_test.go
index ff3ac9976..5bb082945 100644
--- a/bydbctl/internal/cmd/stream_test.go
+++ b/bydbctl/internal/cmd/stream_test.go
@@ -220,9 +220,10 @@ var _ = Describe("Stream Data Query", func() {
        var nowStr, endStr string
        var interval time.Duration
        BeforeEach(func() {
-               var err error
-               now, err = time.ParseInLocation("2006-01-02T15:04:05", 
"2021-09-01T23:30:00", time.Local)
-               Expect(err).NotTo(HaveOccurred())
+               // Seed at a recent time so the data stays within the group 
TTL; data
+               // older than the retention deadline is now filtered out at 
query time
+               // even when freshly written.
+               now = timestamp.NowMilli()
                nowStr = now.Format(time.RFC3339)
                interval = 500 * time.Millisecond
                endStr = now.Add(1 * time.Hour).Format(time.RFC3339)
diff --git a/bydbctl/internal/cmd/topn_test.go 
b/bydbctl/internal/cmd/topn_test.go
index 0ef47328c..0413541d1 100644
--- a/bydbctl/internal/cmd/topn_test.go
+++ b/bydbctl/internal/cmd/topn_test.go
@@ -261,9 +261,12 @@ var _ = Describe("Topn Data Query", func() {
        var startStr, endStr string
        var interval time.Duration
        BeforeEach(func() {
-               var err error
-               now, err = time.ParseInLocation("2006-01-02T15:04:05", 
"2021-09-01T23:30:00", time.Local)
-               Expect(err).NotTo(HaveOccurred())
+               // Seed an hour in the past: recent enough to stay within the 
group TTL
+               // (so the retention filter keeps it), but old enough that the 
TopN
+               // streaming window has already closed and emitted by query 
time. Seeding
+               // exactly at now leaves the single 1ms-interval bucket's 
window open (no
+               // later events advance the watermark), so TopN would return 
nothing.
+               now = timestamp.NowMilli().Add(-time.Hour)
                startStr = now.Add(-20 * time.Minute).Format(time.RFC3339)
                interval = 1 * time.Millisecond
                endStr = now.Add(5 * time.Minute).Format(time.RFC3339)
@@ -280,6 +283,11 @@ var _ = Describe("Topn Data Query", func() {
                )
                Expect(err).NotTo(HaveOccurred())
                cases_measure_data.Write(conn, "service_instance_cpm_minute", 
"sw_metric", "service_instance_cpm_minute_data.json", now, interval)
+               // Advance the TopN streaming watermark past the recent-past 
bucket by
+               // writing a current-time datapoint outside the query window, 
so the
+               // tumbling window closes and TopN emits. Without a later event 
the single
+               // bucket's window stays open and TopN returns nothing.
+               cases_measure_data.Write(conn, "service_instance_cpm_minute", 
"sw_metric", "service_instance_cpm_minute_data.json", timestamp.NowMilli(), 
interval)
                rootCmd.SetArgs([]string{"measure", "query", "-a", addr, "-f", 
"-"})
                issue := func() string {
                        rootCmd.SetIn(strings.NewReader(fmt.Sprintf(`
@@ -339,6 +347,9 @@ fieldValueSort: 1`, startStr, endStr)))
                        resp := new(measurev1.TopNResponse)
                        helpers.UnmarshalYAML([]byte(out), resp)
                        GinkgoWriter.Println(resp)
+                       if len(resp.Lists) == 0 {
+                               return 0
+                       }
                        return len(resp.Lists[0].Items)
                }, flags.EventuallyTimeout).Should(Equal(3))
        })
@@ -379,6 +390,9 @@ fieldValueSort: 1`))
                        resp := new(measurev1.TopNResponse)
                        helpers.UnmarshalYAML([]byte(out), resp)
                        GinkgoWriter.Println(resp)
+                       if len(resp.Lists) == 0 {
+                               return 0
+                       }
                        return len(resp.Lists[0].Items)
                }, flags.EventuallyTimeout).Should(Equal(3))
        },
diff --git a/bydbctl/internal/cmd/trace_test.go 
b/bydbctl/internal/cmd/trace_test.go
index ae8cf1a0c..6f503faa7 100644
--- a/bydbctl/internal/cmd/trace_test.go
+++ b/bydbctl/internal/cmd/trace_test.go
@@ -234,9 +234,10 @@ var _ = Describe("Trace Data Query", func() {
        var nowStr, endStr string
        var interval time.Duration
        BeforeEach(func() {
-               var err error
-               now, err = time.ParseInLocation("2006-01-02T15:04:05", 
"2021-09-01T23:30:00", time.Local)
-               Expect(err).NotTo(HaveOccurred())
+               // Seed at a recent time so the data stays within the group 
TTL; data
+               // older than the retention deadline is now filtered out at 
query time
+               // even when freshly written.
+               now = timestamp.NowMilli()
                nowStr = now.Format(time.RFC3339)
                interval = 500 * time.Millisecond
                endStr = now.Add(1 * time.Hour).Format(time.RFC3339)
diff --git a/test/cases/init.go b/test/cases/init.go
index a62b50078..54d07c76c 100644
--- a/test/cases/init.go
+++ b/test/cases/init.go
@@ -44,6 +44,11 @@ func Initialize(addr string, now time.Time) {
        interval := 500 * time.Millisecond
        // stream
        casesstreamdata.Write(conn, "sw", now, interval)
+       // Seed stream data in a fully expired segment, well past the "default"
+       // group's 3-day TTL. It must never surface in query results: it backs 
the
+       // "excludes data expired beyond TTL" case, which fails without the 
retention
+       // filter that drops fully expired segments.
+       casesstreamdata.WriteToGroup(conn, "sw", "default", "sw", 
now.AddDate(0, 0, -6), interval)
        casesstreamdata.Write(conn, "duplicated", now, 0)
        casesstreamdata.WriteDeduplicationTest(conn, "deduplication_test", now, 
time.Millisecond)
        casesstreamdata.WriteToGroup(conn, "sw", "updated", "sw_updated", 
now.Add(time.Minute), interval)
@@ -83,6 +88,11 @@ func Initialize(addr string, now time.Time) {
        interval = time.Minute
        casesmeasuredata.Write(conn, "service_traffic", "index_mode", 
"service_traffic_data_old.json", now.AddDate(0, 0, -2), interval)
        casesmeasuredata.Write(conn, "service_traffic", "index_mode", 
"service_traffic_data.json", now, interval)
+       // Seed data in a fully expired segment, well past the index_mode 
group's
+       // 7-day TTL (distinct entity ids 901/902). It must never surface in 
query
+       // results: it backs the "index mode excludes data expired beyond TTL" 
case,
+       // which fails without the retention filter that drops fully expired 
segments.
+       casesmeasuredata.Write(conn, "service_traffic", "index_mode", 
"service_traffic_data_expired.json", now.AddDate(0, 0, -10), interval)
        casesmeasuredata.Write(conn, "service_traffic", "replicated_group", 
"service_traffic_data.json", now, interval)
        casesmeasuredata.Write(conn, "service_instance_traffic", "sw_metric", 
"service_instance_traffic_data.json", now, interval)
        casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric", 
"service_cpm_minute_data.json", now, interval)
@@ -137,6 +147,11 @@ func Initialize(addr string, now time.Time) {
        // trace
        interval = 500 * time.Millisecond
        casestrace.WriteToGroup(conn, "sw", "test-trace-group", "sw", now, 
interval)
+       // Seed trace data in a fully expired segment, well past the
+       // "test-trace-group" group's 3-day TTL. It must never surface in query
+       // results: it backs the "excludes data expired beyond TTL" case, which 
fails
+       // without the retention filter that drops fully expired segments.
+       casestrace.WriteToGroup(conn, "sw", "test-trace-group", "sw", 
now.AddDate(0, 0, -6), interval)
        casestrace.WriteToGroup(conn, "zipkin", "zipkinTrace", "zipkin", now, 
interval)
        casestrace.WriteToGroup(conn, "sw", "test-trace-updated", "sw_updated", 
now.Add(time.Minute), interval)
        time.Sleep(2 * time.Second)
diff --git a/test/cases/measure/data/testdata/service_traffic_data_expired.json 
b/test/cases/measure/data/testdata/service_traffic_data_expired.json
new file mode 100644
index 000000000..b553ef38f
--- /dev/null
+++ b/test/cases/measure/data/testdata/service_traffic_data_expired.json
@@ -0,0 +1,78 @@
+[
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "901"
+            }
+          },
+          {
+            "str": {
+              "value": "service_901_expired"
+            }
+          },
+          {
+            "str": {
+              "value": "service_name_901_expired"
+            }
+          },
+          {
+            "str": {
+              "value": "service_short_name_901_expired"
+            }
+          },
+          {
+            "str": {
+              "value": "group_expired"
+            }
+          },
+          {
+            "int": {
+              "value": 9
+            }
+          }
+        ]
+      }
+    ]
+  },
+  {
+    "tag_families": [
+      {
+        "tags": [
+          {
+            "str": {
+              "value": "902"
+            }
+          },
+          {
+            "str": {
+              "value": "service_902_expired"
+            }
+          },
+          {
+            "str": {
+              "value": "service_name_902_expired"
+            }
+          },
+          {
+            "str": {
+              "value": "service_short_name_902_expired"
+            }
+          },
+          {
+            "str": {
+              "value": "group_expired"
+            }
+          },
+          {
+            "int": {
+              "value": 9
+            }
+          }
+        ]
+      }
+    ]
+  }
+]
diff --git a/test/cases/measure/measure.go b/test/cases/measure/measure.go
index a240092e4..20ec2da31 100644
--- a/test/cases/measure/measure.go
+++ b/test/cases/measure/measure.go
@@ -89,6 +89,13 @@ var measureEntries = []any{
                helpers.Args{Input: "index_mode_all", Want: 
"index_mode_all_xl", Duration: 96 * time.Hour, Offset: -72 * time.Hour, 
DisOrder: true}),
        g.Entry("all in all segments of index mode",
                helpers.Args{Input: "index_mode_all", Want: 
"index_mode_all_segs", Duration: 96 * time.Hour, Offset: -72 * time.Hour, 
DisOrder: true}),
+       // Window reaches ~11 days back, past the index_mode group's 7-day TTL. 
The
+       // expired ids 901/902 seeded at now-10d (see test/cases/init.go) sit 
in a
+       // fully expired segment and must be dropped by the retention filter, 
leaving
+       // exactly the in-retention rows of index_mode_all_xl. Without the 
filter the
+       // expired rows leak in and this case fails.
+       g.Entry("index mode excludes data expired beyond TTL",
+               helpers.Args{Input: "index_mode_all", Want: 
"index_mode_all_xl", Duration: 288 * time.Hour, Offset: -264 * time.Hour, 
DisOrder: true}),
        g.Entry("order by desc of index mode", helpers.Args{Input: 
"index_mode_order_desc", Duration: 25 * time.Minute, Offset: -20 * 
time.Minute}),
        g.Entry("range of index mode", helpers.Args{Input: "index_mode_range", 
Duration: 25 * time.Minute, Offset: -20 * time.Minute, DisOrder: true}),
        g.Entry("none of index mode", helpers.Args{Input: "index_mode_none", 
WantEmpty: true, Duration: 25 * time.Minute, Offset: -20 * time.Minute}),
diff --git a/test/cases/stream/stream.go b/test/cases/stream/stream.go
index f9c73e93b..79ea80330 100644
--- a/test/cases/stream/stream.go
+++ b/test/cases/stream/stream.go
@@ -41,6 +41,11 @@ var (
 
 var streamEntries = []any{
        g.Entry("all elements", helpers.Args{Input: "all", Duration: 1 * 
time.Hour}),
+       // Window sits ~6 days back, well past the "default" group's 3-day TTL. 
The
+       // data seeded there (see test/cases/init.go) is in a fully expired 
segment
+       // and must be dropped by the retention filter, yielding an empty 
result.
+       // Without the filter the expired elements leak in and this case fails.
+       g.Entry("excludes data expired beyond TTL", helpers.Args{Input: "all", 
Offset: -156 * time.Hour, Duration: 24 * time.Hour, WantEmpty: true}),
        g.Entry("projection with http.method", helpers.Args{Input: 
"all_with_http_method", Duration: 1 * time.Hour}),
        g.Entry("limit", helpers.Args{Input: "limit", Duration: 1 * time.Hour}),
        g.Entry("max limit", helpers.Args{Input: "all_max_limit", Want: "all", 
Duration: 1 * time.Hour}),
diff --git a/test/cases/trace/trace.go b/test/cases/trace/trace.go
index 474b4f677..863cf5f75 100644
--- a/test/cases/trace/trace.go
+++ b/test/cases/trace/trace.go
@@ -39,6 +39,11 @@ var (
 
 var traceEntries = []any{
        g.Entry("query by trace id", helpers.Args{Input: "eq_trace_id", 
Duration: 1 * time.Hour}),
+       // Window sits ~6 days back, well past the "test-trace-group" group's 
3-day
+       // TTL. The data seeded there (see test/cases/init.go) is in a fully 
expired
+       // segment and must be dropped by the retention filter, yielding an 
empty
+       // result. Without the filter the expired traces leak in and this case 
fails.
+       g.Entry("excludes data expired beyond TTL", helpers.Args{Input: "all", 
Offset: -156 * time.Hour, Duration: 24 * time.Hour, WantEmpty: true}),
        g.Entry("query by trace ids", helpers.Args{Input: "in_trace_ids", 
Duration: 1 * time.Hour}),
        g.Entry("query by empty span ids", helpers.Args{Input: 
"in_empty_span_ids", Duration: 1 * time.Hour, WantEmpty: true}),
        g.Entry("order by timestamp", helpers.Args{Input: 
"order_timestamp_desc", Duration: 1 * time.Hour}),

Reply via email to