This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 96fbbc65 Add snapshot time retention policy and fix lifecycle panic 
issue (#950)
96fbbc65 is described below

commit 96fbbc65409763d0cc5b8c37339618aa89177c02
Author: mrproliu <[email protected]>
AuthorDate: Mon Jan 19 21:37:18 2026 +0800

    Add snapshot time retention policy and fix lifecycle panic issue (#950)
    
    * Add snapshot time retention policy and fix lifecycle panic issue
---
 CHANGES.md                                         |   2 +
 .../backup/lifecycle/trace_migration_visitor.go    |   4 +
 banyand/internal/storage/snapshot.go               |  36 ++++-
 banyand/internal/storage/snapshot_test.go          | 164 ++++++++++++++++++++-
 banyand/measure/snapshot.go                        |   4 +-
 banyand/measure/svc_data.go                        |   6 +-
 banyand/measure/svc_standalone.go                  |   2 +
 banyand/property/listener.go                       |   4 +-
 banyand/property/service.go                        |   4 +-
 banyand/stream/snapshot.go                         |   4 +-
 banyand/stream/svc_standalone.go                   |   4 +-
 banyand/trace/svc_standalone.go                    |   8 +-
 pkg/fs/file_system.go                              |   2 +
 pkg/fs/local_file_system.go                        |   5 +
 14 files changed, 232 insertions(+), 17 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 3f096083..3e70e8d2 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -17,6 +17,7 @@ Release Notes.
 - Persist series metadata in liaison queue for measure, stream and trace 
models.
 - Update the dump tool to support analyzing the parts with smeta files.
 - Activate the property repair mechanism by default.
+- Add a snapshot time retention policy to ensure a snapshot can only be deleted after it reaches the configured minimum age.
 
 ### Bug Fixes
 
@@ -27,6 +28,7 @@ Release Notes.
 - Fix unsupported empty string tag bug.
 - Fix duplicate elements in stream query results by implementing element 
ID-based deduplication across scan, merge, and result building stages.
 - Fix data written to the wrong shard and related stream queries.
+- Fix the lifecycle panic when the trace has no sidx.
 
 ### Document
 
diff --git a/banyand/backup/lifecycle/trace_migration_visitor.go 
b/banyand/backup/lifecycle/trace_migration_visitor.go
index 3b767a86..2936d7c1 100644
--- a/banyand/backup/lifecycle/trace_migration_visitor.go
+++ b/banyand/backup/lifecycle/trace_migration_visitor.go
@@ -289,6 +289,10 @@ func (mv *traceMigrationVisitor) generateAllSidxPartData(
        sourceShardID common.ShardID,
        sidxPath string,
 ) ([]queue.StreamingPartData, []func(), error) {
+       // If the sidx directory does not exist, there is nothing to migrate, so skip it.
+       if !mv.lfs.IsExist(sidxPath) {
+               return nil, nil, nil
+       }
        // Sidx structure: sidx/{index-name}/{part-id}/files
        // Find all index directories in the sidx directory
        entries := mv.lfs.ReadDir(sidxPath)
diff --git a/banyand/internal/storage/snapshot.go 
b/banyand/internal/storage/snapshot.go
index ff2fc615..e5cdd2c6 100644
--- a/banyand/internal/storage/snapshot.go
+++ b/banyand/internal/storage/snapshot.go
@@ -18,16 +18,34 @@
 package storage
 
 import (
+       "fmt"
        "os"
        "path/filepath"
        "sort"
        "time"
 
        "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
+// SnapshotTimeFormat is the time layout of the timestamp prefix of a snapshot directory name.
+const SnapshotTimeFormat = "20060102150405"
+
+// ParseSnapshotTimestamp extracts the creation time from a snapshot directory name.
+func ParseSnapshotTimestamp(name string) (time.Time, error) {
+       if len(name) < 14 {
+               return time.Time{}, fmt.Errorf("snapshot name too short: %s", 
name)
+       }
+       timestampStr := name[:14]
+       parsedTime, parseErr := time.Parse(SnapshotTimeFormat, timestampStr)
+       if parseErr != nil {
+               return time.Time{}, fmt.Errorf("failed to parse timestamp from 
snapshot name %s: %w", name, parseErr)
+       }
+       return parsedTime, nil
+}
+
 // DeleteStaleSnapshots deletes the stale snapshots in the root directory.
-func DeleteStaleSnapshots(root string, maxNum int, lfs fs.FileSystem) {
+func DeleteStaleSnapshots(root string, maxNum int, minAge time.Duration, lfs 
fs.FileSystem) {
        if maxNum <= 0 {
                return
        }
@@ -40,8 +58,22 @@ func DeleteStaleSnapshots(root string, maxNum int, lfs 
fs.FileSystem) {
        sort.Slice(snapshots, func(i, j int) bool {
                return snapshots[i].Name() < snapshots[j].Name()
        })
+       now := time.Now()
        for i := 0; i < len(snapshots)-maxNum; i++ {
-               lfs.MustRMAll(filepath.Join(root, snapshots[i].Name()))
+               snapshotName := snapshots[i].Name()
+               // If the min age is not set, fall back to deleting by the max num only.
+               if minAge == 0 {
+                       lfs.MustRMAll(filepath.Join(root, snapshotName))
+                       continue
+               }
+               snapshotTime, parseErr := ParseSnapshotTimestamp(snapshotName)
+               if parseErr != nil {
+                       logger.GetLogger().Warn().Err(parseErr).Str("snapshot", 
snapshotName).Msg("failed to parse snapshot timestamp, skipping")
+                       continue
+               }
+               if now.Sub(snapshotTime) >= minAge {
+                       lfs.MustRMAll(filepath.Join(root, snapshotName))
+               }
        }
 }
 
diff --git a/banyand/internal/storage/snapshot_test.go 
b/banyand/internal/storage/snapshot_test.go
index b3b4c0da..e4045903 100644
--- a/banyand/internal/storage/snapshot_test.go
+++ b/banyand/internal/storage/snapshot_test.go
@@ -18,9 +18,11 @@
 package storage
 
 import (
+       "fmt"
        "path/filepath"
        "sort"
        "testing"
+       "time"
 
        "github.com/stretchr/testify/assert"
        "github.com/stretchr/testify/require"
@@ -29,7 +31,7 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/test"
 )
 
-func TestDeleteStaleSnapshots(t *testing.T) {
+func TestDeleteStaleSnapshotsWithCount(t *testing.T) {
        fileSystem := fs.NewLocalFileSystem()
        tmpPath, deferFn := test.Space(require.New(t))
        defer deferFn()
@@ -48,7 +50,7 @@ func TestDeleteStaleSnapshots(t *testing.T) {
                fileSystem.MkdirIfNotExist(dirPath, 0o755)
        }
 
-       DeleteStaleSnapshots(snapshotsRoot, 2, fileSystem)
+       DeleteStaleSnapshots(snapshotsRoot, 2, 0, fileSystem)
 
        remaining := fileSystem.ReadDir(snapshotsRoot)
        require.Equal(t, 2, len(remaining))
@@ -60,3 +62,161 @@ func TestDeleteStaleSnapshots(t *testing.T) {
        sort.Strings(names)
        assert.Equal(t, []string{"20201010101010-00000003", 
"20201010101010-00000004"}, names)
 }
+
+func TestParseSnapshotTimestamp(t *testing.T) {
+       tests := []struct {
+               expectTime  time.Time
+               name        string
+               snapshotDir string
+               expectErr   bool
+       }{
+               {
+                       name:        "valid snapshot name",
+                       snapshotDir: "20201010101010-00000001",
+                       expectErr:   false,
+                       expectTime:  time.Date(2020, 10, 10, 10, 10, 10, 0, 
time.UTC),
+               },
+               {
+                       name:        "another valid snapshot",
+                       snapshotDir: "20231225153045-12345678",
+                       expectErr:   false,
+                       expectTime:  time.Date(2023, 12, 25, 15, 30, 45, 0, 
time.UTC),
+               },
+               {
+                       name:        "snapshot name too short",
+                       snapshotDir: "2020101010",
+                       expectErr:   true,
+               },
+               {
+                       name:        "invalid timestamp format",
+                       snapshotDir: "abcd1010101010-00000001",
+                       expectErr:   true,
+               },
+               {
+                       name:        "empty name",
+                       snapshotDir: "",
+                       expectErr:   true,
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       parsedTime, parseErr := 
ParseSnapshotTimestamp(tt.snapshotDir)
+                       if tt.expectErr {
+                               require.Error(t, parseErr)
+                       } else {
+                               require.NoError(t, parseErr)
+                               assert.Equal(t, tt.expectTime, parsedTime)
+                       }
+               })
+       }
+}
+
+func TestDeleteStaleSnapshotsWithMinAge(t *testing.T) {
+       tests := []struct {
+               name                  string
+               snapshotAges          []time.Duration
+               maxNum                int
+               minAge                time.Duration
+               expectedRemaining     int
+               oldestDeletedAge      time.Duration
+               validateOldestDeleted bool
+       }{
+               {
+                       name:              "all snapshots within age threshold 
- no deletion",
+                       maxNum:            2,
+                       minAge:            1 * time.Hour,
+                       snapshotAges:      []time.Duration{-30 * time.Minute, 
-45 * time.Minute, -50 * time.Minute},
+                       expectedRemaining: 3,
+               },
+               {
+                       name:                  "count exceeded and old 
snapshots exist - delete old only",
+                       maxNum:                2,
+                       minAge:                1 * time.Hour,
+                       snapshotAges:          []time.Duration{-3 * time.Hour, 
-2 * time.Hour, -30 * time.Minute},
+                       expectedRemaining:     2,
+                       validateOldestDeleted: true,
+                       oldestDeletedAge:      -3 * time.Hour,
+               },
+               {
+                       name:              "count not exceeded - no deletion 
despite old snapshots",
+                       maxNum:            5,
+                       minAge:            1 * time.Hour,
+                       snapshotAges:      []time.Duration{-3 * time.Hour, -2 * 
time.Hour, -30 * time.Minute},
+                       expectedRemaining: 3,
+               },
+               {
+                       name:                  "all snapshots old and count 
exceeded - delete to max",
+                       maxNum:                2,
+                       minAge:                1 * time.Hour,
+                       snapshotAges:          []time.Duration{-5 * time.Hour, 
-4 * time.Hour, -3 * time.Hour, -2 * time.Hour},
+                       expectedRemaining:     2,
+                       validateOldestDeleted: true,
+                       oldestDeletedAge:      -5 * time.Hour,
+               },
+               {
+                       name:              "mixed ages with high max - keep 
all",
+                       maxNum:            10,
+                       minAge:            1 * time.Hour,
+                       snapshotAges:      []time.Duration{-5 * time.Hour, -2 * 
time.Hour, -30 * time.Minute, -10 * time.Minute},
+                       expectedRemaining: 4,
+               },
+               {
+                       name:                  "boundary case - snapshot at 
threshold is deleted",
+                       maxNum:                1,
+                       minAge:                1 * time.Hour,
+                       snapshotAges:          []time.Duration{-1*time.Hour - 
1*time.Minute, -59 * time.Minute},
+                       expectedRemaining:     1,
+                       validateOldestDeleted: true,
+                       oldestDeletedAge:      -1*time.Hour - 1*time.Minute,
+               },
+               {
+                       name:                  "large number of old snapshots",
+                       maxNum:                3,
+                       minAge:                30 * time.Minute,
+                       snapshotAges:          []time.Duration{-5 * time.Hour, 
-4 * time.Hour, -3 * time.Hour, -2 * time.Hour, -1 * time.Hour, -45 * 
time.Minute, -20 * time.Minute},
+                       expectedRemaining:     3,
+                       validateOldestDeleted: true,
+                       oldestDeletedAge:      -5 * time.Hour,
+               },
+               {
+                       name:              "partial deletion - some candidates 
too young",
+                       maxNum:            2,
+                       minAge:            2 * time.Hour,
+                       snapshotAges:      []time.Duration{-5 * time.Hour, 
-1*time.Hour - 30*time.Minute, -45 * time.Minute, -30 * time.Minute},
+                       expectedRemaining: 3,
+               },
+       }
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       fileSystem := fs.NewLocalFileSystem()
+                       tmpPath, deferFn := test.Space(require.New(t))
+                       defer deferFn()
+                       snapshotsRoot := filepath.Join(tmpPath, "snapshots")
+                       fileSystem.MkdirIfNotExist(snapshotsRoot, 0o755)
+                       now := time.Now().UTC()
+                       createdSnapshots := make(map[string]time.Time)
+                       for idx, age := range tt.snapshotAges {
+                               snapshotTime := now.Add(age)
+                               snapshotName := fmt.Sprintf("%s-%08d", 
snapshotTime.Format(SnapshotTimeFormat), idx+1)
+                               dirPath := filepath.Join(snapshotsRoot, 
snapshotName)
+                               fileSystem.MkdirIfNotExist(dirPath, 0o755)
+                               createdSnapshots[snapshotName] = snapshotTime
+                       }
+                       DeleteStaleSnapshots(snapshotsRoot, tt.maxNum, 
tt.minAge, fileSystem)
+                       remaining := fileSystem.ReadDir(snapshotsRoot)
+                       require.Equal(t, tt.expectedRemaining, len(remaining), 
"unexpected number of remaining snapshots")
+                       if tt.validateOldestDeleted {
+                               oldestDeletedTime := 
now.Add(tt.oldestDeletedAge)
+                               oldestDeletedName := fmt.Sprintf("%s-%08d", 
oldestDeletedTime.Format(SnapshotTimeFormat), 1)
+                               found := false
+                               for _, info := range remaining {
+                                       if info.Name() == oldestDeletedName {
+                                               found = true
+                                               break
+                                       }
+                               }
+                               require.False(t, found, "oldest snapshot %s 
should have been deleted", oldestDeletedName)
+                       }
+               })
+       }
+}
diff --git a/banyand/measure/snapshot.go b/banyand/measure/snapshot.go
index 4389d7c0..71ccb244 100644
--- a/banyand/measure/snapshot.go
+++ b/banyand/measure/snapshot.go
@@ -359,7 +359,7 @@ func (s *snapshotListener) Rev(ctx context.Context, message 
bus.Message) bus.Mes
        }
        s.snapshotMux.Lock()
        defer s.snapshotMux.Unlock()
-       storage.DeleteStaleSnapshots(s.s.snapshotDir, s.s.maxFileSnapshotNum, 
s.s.lfs)
+       storage.DeleteStaleSnapshots(s.s.snapshotDir, s.s.maxFileSnapshotNum, 
s.s.minFileSnapshotAge, s.s.lfs)
        sn := s.snapshotName()
        var err error
        for _, g := range gg {
@@ -392,5 +392,5 @@ func (s *snapshotListener) Rev(ctx context.Context, message 
bus.Message) bus.Mes
 
 func (s *snapshotListener) snapshotName() string {
        s.snapshotSeq++
-       return fmt.Sprintf("%s-%08X", 
time.Now().UTC().Format("20060102150405"), s.snapshotSeq)
+       return fmt.Sprintf("%s-%08X", 
time.Now().UTC().Format(storage.SnapshotTimeFormat), s.snapshotSeq)
 }
diff --git a/banyand/measure/svc_data.go b/banyand/measure/svc_data.go
index e7a5a0b3..fee99941 100644
--- a/banyand/measure/svc_data.go
+++ b/banyand/measure/svc_data.go
@@ -75,6 +75,7 @@ type dataSVC struct {
        retentionConfig    storage.RetentionConfig
        cc                 storage.CacheConfig
        maxFileSnapshotNum int
+       minFileSnapshotAge time.Duration
 }
 
 func (s *dataSVC) Measure(metadata *commonv1.Metadata) (Measure, error) {
@@ -178,6 +179,7 @@ func (s *dataSVC) FlagSet() *run.FlagSet {
                "enable forced retention cleanup when disk usage exceeds high 
watermark")
 
        flagS.IntVar(&s.maxFileSnapshotNum, "measure-max-file-snapshot-num", 
10, "the maximum number of file snapshots allowed")
+       flagS.DurationVar(&s.minFileSnapshotAge, 
"measure-min-file-snapshot-age", time.Hour, "minimum age for file snapshots to 
be eligible for deletion")
        s.cc.MaxCacheSize = run.Bytes(100 * 1024 * 1024)
        flagS.VarP(&s.cc.MaxCacheSize, "service-cache-max-size", "", "maximum 
service cache size (e.g., 100M)")
        flagS.DurationVar(&s.cc.CleanupInterval, 
"service-cache-cleanup-interval", 30*time.Second, "service cache cleanup 
interval")
@@ -481,7 +483,7 @@ func (d *dataSnapshotListener) Rev(ctx context.Context, 
message bus.Message) bus
        }
        d.snapshotMux.Lock()
        defer d.snapshotMux.Unlock()
-       storage.DeleteStaleSnapshots(d.s.snapshotDir, d.s.maxFileSnapshotNum, 
d.s.lfs)
+       storage.DeleteStaleSnapshots(d.s.snapshotDir, d.s.maxFileSnapshotNum, 
d.s.minFileSnapshotAge, d.s.lfs)
        sn := d.snapshotName()
        var err error
        for _, g := range gg {
@@ -509,7 +511,7 @@ func (d *dataSnapshotListener) Rev(ctx context.Context, 
message bus.Message) bus
 
 func (d *dataSnapshotListener) snapshotName() string {
        d.snapshotSeq++
-       return fmt.Sprintf("%s-%08X", 
time.Now().UTC().Format("20060102150405"), d.snapshotSeq)
+       return fmt.Sprintf("%s-%08X", 
time.Now().UTC().Format(storage.SnapshotTimeFormat), d.snapshotSeq)
 }
 
 type dataDeleteStreamSegmentsListener struct {
diff --git a/banyand/measure/svc_standalone.go 
b/banyand/measure/svc_standalone.go
index 84064a2e..1c444380 100644
--- a/banyand/measure/svc_standalone.go
+++ b/banyand/measure/svc_standalone.go
@@ -79,6 +79,7 @@ type standalone struct {
        retentionConfig    storage.RetentionConfig
        cc                 storage.CacheConfig
        maxFileSnapshotNum int
+       minFileSnapshotAge time.Duration
 }
 
 func (s *standalone) Measure(metadata *commonv1.Metadata) (Measure, error) {
@@ -182,6 +183,7 @@ func (s *standalone) FlagSet() *run.FlagSet {
                "enable forced retention cleanup when disk usage exceeds high 
watermark")
 
        flagS.IntVar(&s.maxFileSnapshotNum, "measure-max-file-snapshot-num", 
10, "the maximum number of file snapshots allowed")
+       flagS.DurationVar(&s.minFileSnapshotAge, 
"measure-min-file-snapshot-age", time.Hour, "minimum age for file snapshots to 
be eligible for deletion")
        s.cc.MaxCacheSize = run.Bytes(100 * 1024 * 1024)
        flagS.VarP(&s.cc.MaxCacheSize, "service-cache-max-size", "", "maximum 
service cache size (e.g., 100M)")
        flagS.DurationVar(&s.cc.CleanupInterval, 
"service-cache-cleanup-interval", 30*time.Second, "service cache cleanup 
interval")
diff --git a/banyand/property/listener.go b/banyand/property/listener.go
index c381eaac..bfb13d03 100644
--- a/banyand/property/listener.go
+++ b/banyand/property/listener.go
@@ -232,7 +232,7 @@ func (s *snapshotListener) Rev(ctx context.Context, message 
bus.Message) bus.Mes
        }
        s.snapshotMux.Lock()
        defer s.snapshotMux.Unlock()
-       storage.DeleteStaleSnapshots(s.s.snapshotDir, s.s.maxFileSnapshotNum, 
s.s.lfs)
+       storage.DeleteStaleSnapshots(s.s.snapshotDir, s.s.maxFileSnapshotNum, 
s.s.minFileSnapshotAge, s.s.lfs)
        sn := s.snapshotName()
        shardsRef := s.s.db.sLst.Load()
        if shardsRef == nil {
@@ -266,7 +266,7 @@ func (s *snapshotListener) Rev(ctx context.Context, message 
bus.Message) bus.Mes
 
 func (s *snapshotListener) snapshotName() string {
        s.snapshotSeq++
-       return fmt.Sprintf("%s-%08X", 
time.Now().UTC().Format("20060102150405"), s.snapshotSeq)
+       return fmt.Sprintf("%s-%08X", 
time.Now().UTC().Format(storage.SnapshotTimeFormat), s.snapshotSeq)
 }
 
 type repairListener struct {
diff --git a/banyand/property/service.go b/banyand/property/service.go
index b3be9b1d..db239866 100644
--- a/banyand/property/service.go
+++ b/banyand/property/service.go
@@ -76,6 +76,7 @@ type service struct {
        repairTreeSlotCount      int
        maxDiskUsagePercent      int
        maxFileSnapshotNum       int
+       minFileSnapshotAge       time.Duration
        repairEnabled            bool
 }
 
@@ -84,7 +85,8 @@ func (s *service) FlagSet() *run.FlagSet {
        flagS.StringVar(&s.root, "property-root-path", "/tmp", "the root path 
of database")
        flagS.DurationVar(&s.flushTimeout, "property-flush-timeout", 
defaultFlushTimeout, "the memory data timeout of measure")
        flagS.IntVar(&s.maxDiskUsagePercent, "property-max-disk-usage-percent", 
95, "the maximum disk usage percentage allowed")
-       flagS.IntVar(&s.maxFileSnapshotNum, "property-max-file-snapshot-num", 
2, "the maximum number of file snapshots allowed")
+       flagS.IntVar(&s.maxFileSnapshotNum, "property-max-file-snapshot-num", 
10, "the maximum number of file snapshots allowed")
+       flagS.DurationVar(&s.minFileSnapshotAge, 
"property-min-file-snapshot-age", time.Hour, "the minimum age for file 
snapshots to be eligible for deletion")
        flagS.DurationVar(&s.expireTimeout, "property-expire-delete-timeout", 
time.Hour*24*7, "the duration of the expired data needs to be deleted")
        flagS.IntVar(&s.repairTreeSlotCount, "property-repair-tree-slot-count", 
32, "the slot count of the repair tree")
        flagS.StringVar(&s.repairBuildTreeCron, 
"property-repair-build-tree-cron", "@every 1h", "the cron expression for 
repairing the build tree")
diff --git a/banyand/stream/snapshot.go b/banyand/stream/snapshot.go
index 41008a5f..6e45a786 100644
--- a/banyand/stream/snapshot.go
+++ b/banyand/stream/snapshot.go
@@ -410,7 +410,7 @@ func (s *snapshotListener) Rev(ctx context.Context, message 
bus.Message) bus.Mes
        }
        s.snapshotMux.Lock()
        defer s.snapshotMux.Unlock()
-       storage.DeleteStaleSnapshots(s.s.snapshotDir, s.s.maxFileSnapshotNum, 
s.s.lfs)
+       storage.DeleteStaleSnapshots(s.s.snapshotDir, s.s.maxFileSnapshotNum, 
s.s.minFileSnapshotAge, s.s.lfs)
        sn := s.snapshotName()
        var err error
        for _, g := range gg {
@@ -443,5 +443,5 @@ func (s *snapshotListener) Rev(ctx context.Context, message 
bus.Message) bus.Mes
 
 func (s *snapshotListener) snapshotName() string {
        s.snapshotSeq++
-       return fmt.Sprintf("%s-%08X", 
time.Now().UTC().Format("20060102150405"), s.snapshotSeq)
+       return fmt.Sprintf("%s-%08X", 
time.Now().UTC().Format(storage.SnapshotTimeFormat), s.snapshotSeq)
 }
diff --git a/banyand/stream/svc_standalone.go b/banyand/stream/svc_standalone.go
index cc0dfc46..1be24bf7 100644
--- a/banyand/stream/svc_standalone.go
+++ b/banyand/stream/svc_standalone.go
@@ -78,6 +78,7 @@ type standalone struct {
        option                option
        retentionConfig       storage.RetentionConfig
        maxFileSnapshotNum    int
+       minFileSnapshotAge    time.Duration
 }
 
 func (s *standalone) Stream(metadata *commonv1.Metadata) (Stream, error) {
@@ -181,7 +182,8 @@ func (s *standalone) FlagSet() *run.FlagSet {
        flagS.BoolVar(&s.retentionConfig.ForceCleanupEnabled, 
"stream-retention-force-cleanup-enabled", false,
                "enable forced retention cleanup when disk usage exceeds high 
watermark")
 
-       flagS.IntVar(&s.maxFileSnapshotNum, "stream-max-file-snapshot-num", 2, 
"the maximum number of file snapshots allowed")
+       flagS.IntVar(&s.maxFileSnapshotNum, "stream-max-file-snapshot-num", 10, 
"the maximum number of file snapshots allowed")
+       flagS.DurationVar(&s.minFileSnapshotAge, 
"stream-min-file-snapshot-age", time.Hour, "minimum age for file snapshots to 
be eligible for deletion")
        return flagS
 }
 
diff --git a/banyand/trace/svc_standalone.go b/banyand/trace/svc_standalone.go
index 6902cab5..ec893bbf 100644
--- a/banyand/trace/svc_standalone.go
+++ b/banyand/trace/svc_standalone.go
@@ -74,6 +74,7 @@ type standalone struct {
        option             option
        retentionConfig    storage.RetentionConfig
        maxFileSnapshotNum int
+       minFileSnapshotAge time.Duration
 }
 
 func (s *standalone) FlagSet() *run.FlagSet {
@@ -90,7 +91,8 @@ func (s *standalone) FlagSet() *run.FlagSet {
        fs.BoolVar(&s.retentionConfig.ForceCleanupEnabled, 
"trace-retention-force-cleanup-enabled", false,
                "enable forced retention cleanup when disk usage exceeds high 
watermark")
 
-       fs.IntVar(&s.maxFileSnapshotNum, "trace-max-file-snapshot-num", 2, "the 
maximum number of file snapshots")
+       fs.IntVar(&s.maxFileSnapshotNum, "trace-max-file-snapshot-num", 10, 
"the maximum number of file snapshots")
+       fs.DurationVar(&s.minFileSnapshotAge, "trace-min-file-snapshot-age", 
time.Hour, "minimum age for file snapshots to be eligible for deletion")
        s.option.mergePolicy = newDefaultMergePolicy()
        fs.VarP(&s.option.mergePolicy.maxFanOutSize, "trace-max-fan-out-size", 
"", "the upper bound of a single file size after merge of trace")
        // Additional flags can be added here
@@ -446,7 +448,7 @@ func (d *standaloneSnapshotListener) Rev(ctx 
context.Context, message bus.Messag
        }
        d.snapshotMux.Lock()
        defer d.snapshotMux.Unlock()
-       storage.DeleteStaleSnapshots(d.s.snapshotDir, d.s.maxFileSnapshotNum, 
d.s.lfs)
+       storage.DeleteStaleSnapshots(d.s.snapshotDir, d.s.maxFileSnapshotNum, 
d.s.minFileSnapshotAge, d.s.lfs)
        sn := d.snapshotName()
        var err error
        for _, g := range gg {
@@ -479,7 +481,7 @@ func (d *standaloneSnapshotListener) Rev(ctx 
context.Context, message bus.Messag
 
 func (d *standaloneSnapshotListener) snapshotName() string {
        d.snapshotSeq++
-       return fmt.Sprintf("%s-%08X", 
time.Now().UTC().Format("20060102150405"), d.snapshotSeq)
+       return fmt.Sprintf("%s-%08X", 
time.Now().UTC().Format(storage.SnapshotTimeFormat), d.snapshotSeq)
 }
 
 type standaloneDeleteTraceSegmentsListener struct {
diff --git a/pkg/fs/file_system.go b/pkg/fs/file_system.go
index 57d46083..9bd585e8 100644
--- a/pkg/fs/file_system.go
+++ b/pkg/fs/file_system.go
@@ -124,6 +124,8 @@ type FileSystem interface {
        MustGetTotalSpace(path string) uint64
        // CreateHardLink creates hard links in destPath for files in srcPath 
that pass the filter.
        CreateHardLink(srcPath, destPath string, filter func(string) bool) error
+       // IsExist checks if the directory/file exists or not.
+       IsExist(path string) bool
 }
 
 // DirEntry is the interface that wraps the basic information about a file or 
directory.
diff --git a/pkg/fs/local_file_system.go b/pkg/fs/local_file_system.go
index 451f46c1..f025653f 100644
--- a/pkg/fs/local_file_system.go
+++ b/pkg/fs/local_file_system.go
@@ -309,6 +309,11 @@ func (fs *localFileSystem) MustGetTotalSpace(path string) 
uint64 {
        return usage.Total
 }
 
+func (fs *localFileSystem) IsExist(path string) bool {
+       _, err := os.Stat(path)
+       return err == nil
+}
+
 func (fs *localFileSystem) CreateHardLink(srcPath, destPath string, filter 
func(string) bool) error {
        fi, err := os.Stat(srcPath)
        if err != nil {

Reply via email to