This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 775148e7 Refactor lifecycle logical (#848)
775148e7 is described below

commit 775148e72c8b937ee68ed923526dfcf93a5bd43b
Author: mrproliu <[email protected]>
AuthorDate: Mon Nov 17 19:30:43 2025 +0900

    Refactor lifecycle logical (#848)
    
    * Refactor lifecycle logical
    
    ---------
    
    Co-authored-by: Gao Hongtao <[email protected]>
---
 api/proto/banyandb/measure/v1/rpc.proto            |   3 +-
 api/proto/banyandb/stream/v1/rpc.proto             |   3 +-
 banyand/backup/lifecycle/migration_integration.go  | 101 +++++++----------
 banyand/backup/lifecycle/segment_boundary_utils.go |   3 +-
 banyand/backup/lifecycle/service.go                | 123 +++++++++------------
 banyand/backup/lifecycle/steps.go                  |  56 ++++++++--
 banyand/internal/storage/segment.go                |  23 +++-
 banyand/internal/storage/segment_test.go           |  11 +-
 banyand/internal/storage/storage.go                |   2 +-
 banyand/internal/storage/tsdb.go                   |   4 +-
 banyand/internal/storage/visitor.go                |  33 ++++--
 banyand/internal/storage/visitor_test.go           |  22 ++--
 banyand/measure/svc_data.go                        |   2 +-
 banyand/measure/visitor.go                         |   7 +-
 banyand/stream/svc_standalone.go                   |   2 +-
 banyand/stream/visitor.go                          |   5 +-
 banyand/stream/visitor_test.go                     |   3 +-
 docs/api-reference.md                              |   4 +-
 18 files changed, 224 insertions(+), 183 deletions(-)

diff --git a/api/proto/banyandb/measure/v1/rpc.proto 
b/api/proto/banyandb/measure/v1/rpc.proto
index 22e71173..0f1d1bf5 100644
--- a/api/proto/banyandb/measure/v1/rpc.proto
+++ b/api/proto/banyandb/measure/v1/rpc.proto
@@ -22,7 +22,6 @@ package banyandb.measure.v1;
 import "banyandb/measure/v1/query.proto";
 import "banyandb/measure/v1/topn.proto";
 import "banyandb/measure/v1/write.proto";
-import "banyandb/model/v1/query.proto";
 import "google/api/annotations.proto";
 import "protoc-gen-openapiv2/options/annotations.proto";
 
@@ -32,7 +31,7 @@ option 
(grpc.gateway.protoc_gen_openapiv2.options.openapiv2_swagger) = {base_pat
 
 message DeleteExpiredSegmentsRequest {
   string group = 1;
-  model.v1.TimeRange time_range = 2;
+  repeated string segment_suffixes = 2;
 }
 
 message DeleteExpiredSegmentsResponse {
diff --git a/api/proto/banyandb/stream/v1/rpc.proto 
b/api/proto/banyandb/stream/v1/rpc.proto
index dcbe266e..1712a5f9 100644
--- a/api/proto/banyandb/stream/v1/rpc.proto
+++ b/api/proto/banyandb/stream/v1/rpc.proto
@@ -19,7 +19,6 @@ syntax = "proto3";
 
 package banyandb.stream.v1;
 
-import "banyandb/model/v1/query.proto";
 import "banyandb/stream/v1/query.proto";
 import "banyandb/stream/v1/write.proto";
 import "google/api/annotations.proto";
@@ -31,7 +30,7 @@ option 
(grpc.gateway.protoc_gen_openapiv2.options.openapiv2_swagger) = {base_pat
 
 message DeleteExpiredSegmentsRequest {
   string group = 1;
-  model.v1.TimeRange time_range = 2;
+  repeated string segment_suffixes = 2;
 }
 
 message DeleteExpiredSegmentsResponse {
diff --git a/banyand/backup/lifecycle/migration_integration.go 
b/banyand/backup/lifecycle/migration_integration.go
index d51d9eea..b36ae1e5 100644
--- a/banyand/backup/lifecycle/migration_integration.go
+++ b/banyand/backup/lifecycle/migration_integration.go
@@ -19,51 +19,37 @@ package lifecycle
 
 import (
        "github.com/apache/skywalking-banyandb/api/common"
-       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
-       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/banyand/measure"
-       "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/stream"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
 // migrateStreamWithFileBasedAndProgress performs file-based stream migration 
with progress tracking.
-func migrateStreamWithFileBasedAndProgress(
-       tsdbRootPath string,
-       timeRange timestamp.TimeRange,
-       group *commonv1.Group,
-       nodeLabels map[string]string,
-       nodes []*databasev1.Node,
-       metadata metadata.Repo,
-       logger *logger.Logger,
-       progress *Progress,
-       chunkSize int,
-) error {
-       // Use parseGroup function to get sharding parameters and TTL
-       shardNum, replicas, ttl, selector, client, err := parseGroup(group, 
nodeLabels, nodes, logger, metadata)
-       if err != nil {
-               return err
-       }
-       defer client.GracefulStop()
-
-       // Convert TTL to IntervalRule using storage.MustToIntervalRule
-       intervalRule := storage.MustToIntervalRule(ttl)
+func migrateStreamWithFileBasedAndProgress(tsdbRootPath string, timeRange 
timestamp.TimeRange, group *GroupConfig,
+       logger *logger.Logger, progress *Progress, chunkSize int,
+) ([]string, error) {
+       // Convert segment Interval to IntervalRule using 
storage.MustToIntervalRule
+       segmentIntervalRule := storage.MustToIntervalRule(group.SegmentInterval)
 
        // Get target stage configuration
        targetStageInterval := getTargetStageInterval(group)
 
        // Count total parts before starting migration
-       totalParts, err := countStreamParts(tsdbRootPath, timeRange, 
intervalRule)
+       totalParts, segmentSuffixes, err := countStreamParts(tsdbRootPath, 
timeRange, segmentIntervalRule)
        if err != nil {
                logger.Warn().Err(err).Msg("failed to count stream parts, 
proceeding without part count")
        } else {
-               logger.Info().Int("total_parts", totalParts).Msg("counted 
stream parts for progress tracking")
+               logger.Info().Int("total_parts", 
totalParts).Strs("segment_suffixes", segmentSuffixes).
+                       Msg("counted stream parts for progres tracking")
        }
 
        // Create file-based migration visitor with progress tracking and 
target stage interval
-       visitor := newStreamMigrationVisitor(group, shardNum, replicas, 
selector, client, logger, progress, chunkSize, targetStageInterval)
+       visitor := newStreamMigrationVisitor(
+               group.Group, group.TargetShardNum, group.TargetReplicas, 
group.NodeSelector, group.QueueClient,
+               logger, progress, chunkSize, targetStageInterval,
+       )
        defer visitor.Close()
 
        // Set the total part count for progress tracking
@@ -72,21 +58,25 @@ func migrateStreamWithFileBasedAndProgress(
        }
 
        // Use the existing VisitStreamsInTimeRange function with our 
file-based visitor
-       return stream.VisitStreamsInTimeRange(tsdbRootPath, timeRange, visitor, 
intervalRule)
+       _, err = stream.VisitStreamsInTimeRange(tsdbRootPath, timeRange, 
visitor, segmentIntervalRule)
+       if err != nil {
+               return nil, err
+       }
+       return segmentSuffixes, nil
 }
 
 // countStreamParts counts the total number of parts in the given time range.
-func countStreamParts(tsdbRootPath string, timeRange timestamp.TimeRange, 
intervalRule storage.IntervalRule) (int, error) {
+func countStreamParts(tsdbRootPath string, timeRange timestamp.TimeRange, 
segmentInterval storage.IntervalRule) (int, []string, error) {
        // Create a simple visitor to count parts
        partCounter := &partCountVisitor{}
 
        // Use the existing VisitStreamsInTimeRange function to count parts
-       err := stream.VisitStreamsInTimeRange(tsdbRootPath, timeRange, 
partCounter, intervalRule)
+       segmentSuffixes, err := stream.VisitStreamsInTimeRange(tsdbRootPath, 
timeRange, partCounter, segmentInterval)
        if err != nil {
-               return 0, err
+               return 0, nil, err
        }
 
-       return partCounter.partCount, nil
+       return partCounter.partCount, segmentSuffixes, nil
 }
 
 // partCountVisitor is a simple visitor that counts parts.
@@ -111,40 +101,29 @@ func (pcv *partCountVisitor) VisitElementIndex(_ 
*timestamp.TimeRange, _ common.
 }
 
 // migrateMeasureWithFileBasedAndProgress performs file-based measure 
migration with progress tracking.
-func migrateMeasureWithFileBasedAndProgress(
-       tsdbRootPath string,
-       timeRange timestamp.TimeRange,
-       group *commonv1.Group,
-       nodeLabels map[string]string,
-       nodes []*databasev1.Node,
-       metadata metadata.Repo,
-       logger *logger.Logger,
-       progress *Progress,
-       chunkSize int,
-) error {
-       // Use parseGroup function to get sharding parameters and TTL
-       shardNum, replicas, ttl, selector, client, err := parseGroup(group, 
nodeLabels, nodes, logger, metadata)
-       if err != nil {
-               return err
-       }
-       defer client.GracefulStop()
-
-       // Convert TTL to IntervalRule using storage.MustToIntervalRule
-       intervalRule := storage.MustToIntervalRule(ttl)
+func migrateMeasureWithFileBasedAndProgress(tsdbRootPath string, timeRange 
timestamp.TimeRange, group *GroupConfig,
+       logger *logger.Logger, progress *Progress, chunkSize int,
+) ([]string, error) {
+       // Convert segment interval to IntervalRule using 
storage.MustToIntervalRule
+       segmentIntervalRule := storage.MustToIntervalRule(group.SegmentInterval)
 
        // Get target stage configuration
        targetStageInterval := getTargetStageInterval(group)
 
        // Count total parts before starting migration
-       totalParts, err := countMeasureParts(tsdbRootPath, timeRange, 
intervalRule)
+       totalParts, segmentSuffixes, err := countMeasureParts(tsdbRootPath, 
timeRange, segmentIntervalRule)
        if err != nil {
                logger.Warn().Err(err).Msg("failed to count measure parts, 
proceeding without part count")
        } else {
-               logger.Info().Int("total_parts", totalParts).Msg("counted 
measure parts for progress tracking")
+               logger.Info().Int("total_parts", 
totalParts).Strs("segment_suffixes", segmentSuffixes).
+                       Msg("counted measure parts for progress tracking")
        }
 
        // Create file-based migration visitor with progress tracking and 
target stage interval
-       visitor := newMeasureMigrationVisitor(group, shardNum, replicas, 
selector, client, logger, progress, chunkSize, targetStageInterval)
+       visitor := newMeasureMigrationVisitor(
+               group.Group, group.TargetShardNum, group.TargetReplicas, 
group.NodeSelector, group.QueueClient,
+               logger, progress, chunkSize, targetStageInterval,
+       )
        defer visitor.Close()
 
        // Set the total part count for progress tracking
@@ -153,21 +132,25 @@ func migrateMeasureWithFileBasedAndProgress(
        }
 
        // Use the existing VisitMeasuresInTimeRange function with our 
file-based visitor
-       return measure.VisitMeasuresInTimeRange(tsdbRootPath, timeRange, 
visitor, intervalRule)
+       _, err = measure.VisitMeasuresInTimeRange(tsdbRootPath, timeRange, 
visitor, segmentIntervalRule)
+       if err != nil {
+               return nil, err
+       }
+       return segmentSuffixes, nil
 }
 
 // countMeasureParts counts the total number of parts in the given time range.
-func countMeasureParts(tsdbRootPath string, timeRange timestamp.TimeRange, 
intervalRule storage.IntervalRule) (int, error) {
+func countMeasureParts(tsdbRootPath string, timeRange timestamp.TimeRange, 
segmentInterval storage.IntervalRule) (int, []string, error) {
        // Create a simple visitor to count parts
        partCounter := &measurePartCountVisitor{}
 
        // Use the existing VisitMeasuresInTimeRange function to count parts
-       err := measure.VisitMeasuresInTimeRange(tsdbRootPath, timeRange, 
partCounter, intervalRule)
+       segmentSuffixes, err := measure.VisitMeasuresInTimeRange(tsdbRootPath, 
timeRange, partCounter, segmentInterval)
        if err != nil {
-               return 0, err
+               return 0, nil, err
        }
 
-       return partCounter.partCount, nil
+       return partCounter.partCount, segmentSuffixes, nil
 }
 
 // measurePartCountVisitor is a simple visitor that counts measure parts.
diff --git a/banyand/backup/lifecycle/segment_boundary_utils.go 
b/banyand/backup/lifecycle/segment_boundary_utils.go
index 243bb4a9..c3f0d6a1 100644
--- a/banyand/backup/lifecycle/segment_boundary_utils.go
+++ b/banyand/backup/lifecycle/segment_boundary_utils.go
@@ -20,7 +20,6 @@ package lifecycle
 import (
        "time"
 
-       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
@@ -62,7 +61,7 @@ func getSegmentTimeRange(segmentStart time.Time, interval 
storage.IntervalRule)
        return timestamp.NewSectionTimeRange(segmentStart, segmentEnd)
 }
 
-func getTargetStageInterval(group *commonv1.Group) storage.IntervalRule {
+func getTargetStageInterval(group *GroupConfig) storage.IntervalRule {
        if group.ResourceOpts != nil && len(group.ResourceOpts.Stages) > 0 {
                stage := group.ResourceOpts.Stages[0]
                if stage.SegmentInterval != nil {
diff --git a/banyand/backup/lifecycle/service.go 
b/banyand/backup/lifecycle/service.go
index afa77715..243893f0 100644
--- a/banyand/backup/lifecycle/service.go
+++ b/banyand/backup/lifecycle/service.go
@@ -30,13 +30,11 @@ import (
        "github.com/benbjohnson/clock"
        "github.com/robfig/cron/v3"
        "google.golang.org/grpc"
-       "google.golang.org/protobuf/types/known/timestamppb"
 
        "github.com/apache/skywalking-banyandb/api/common"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
-       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
        "github.com/apache/skywalking-banyandb/banyand/backup/snapshot"
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
@@ -555,76 +553,67 @@ func (l *lifecycleService) getGroupsToProcess(ctx 
context.Context, progress *Pro
 func (l *lifecycleService) processStreamGroup(ctx context.Context, g 
*commonv1.Group,
        streamDir string, nodes []*databasev1.Node, labels map[string]string, 
progress *Progress,
 ) {
-       tr := l.getRemovalSegmentsTimeRange(g)
+       group, err := parseGroup(g, labels, nodes, l.l, l.metadata)
+       if err != nil {
+               l.l.Error().Err(err).Msgf("failed to parse group %s", 
g.Metadata.Name)
+               return
+       }
+       defer group.Close()
+       tr := l.getRemovalSegmentsTimeRange(group)
        if tr.Start.IsZero() && tr.End.IsZero() {
                l.l.Info().Msgf("no removal segments time range for group %s, 
skipping stream migration", g.Metadata.Name)
                progress.MarkGroupCompleted(g.Metadata.Name)
                return
        }
 
-       err := l.processStreamGroupFileBased(ctx, g, streamDir, tr, nodes, 
labels, progress)
+       segmentSuffixes, err := l.processStreamGroupFileBased(ctx, group, 
streamDir, tr, progress)
        if err != nil {
                l.l.Error().Err(err).Msgf("failed to migrate stream group %s 
using file-based approach", g.Metadata.Name)
                return
        }
 
-       l.l.Info().Msgf("deleting expired stream segments for group: %s", 
g.Metadata.Name)
-       l.deleteExpiredStreamSegments(ctx, g, tr, progress)
+       l.l.Info().Msgf("deleting expired stream segments for group: %s, time 
range: %s", g.Metadata.Name, tr.String())
+       l.deleteExpiredStreamSegments(ctx, g, segmentSuffixes, progress)
        progress.MarkGroupCompleted(g.Metadata.Name)
 }
 
 // processStreamGroupFileBased uses file-based migration instead of 
element-based queries.
-func (l *lifecycleService) processStreamGroupFileBased(_ context.Context, g 
*commonv1.Group,
-       streamDir string, tr *timestamp.TimeRange, nodes []*databasev1.Node, 
labels map[string]string, progress *Progress,
-) error {
+func (l *lifecycleService) processStreamGroupFileBased(_ context.Context, g 
*GroupConfig, streamDir string,
+       tr *timestamp.TimeRange, progress *Progress,
+) ([]string, error) {
        if progress.IsStreamGroupDeleted(g.Metadata.Name) {
                l.l.Info().Msgf("skipping already completed file-based 
migration for group: %s", g.Metadata.Name)
-               return nil
+               return nil, nil
        }
 
-       l.l.Info().Msgf("starting file-based stream migration for group: %s", 
g.Metadata.Name)
+       l.l.Info().Msgf("starting file-based stream migration for group: %s, 
time range: %s", g.Metadata.Name, tr.String())
 
        rootDir := filepath.Join(streamDir, g.Metadata.Name)
        // skip the counting if the tsdb root path does not exist
        // may no data found in the snapshot
        if _, err := os.Stat(rootDir); err != nil && errors.Is(err, 
os.ErrNotExist) {
                l.l.Info().Msgf("skipping file-based stream migration for group 
because is empty in the snapshot dir: %s", g.Metadata.Name)
-               return nil
+               return nil, nil
        }
 
        // Use the file-based migration with existing visitor pattern
-       err := migrateStreamWithFileBasedAndProgress(
-               rootDir,          // Use snapshot directory as source
-               *tr,              // Time range for segments to migrate
-               g,                // Group configuration
-               labels,           // Node labels
-               nodes,            // Target nodes
-               l.metadata,       // Metadata repository
-               l.l,              // Logger
-               progress,         // Progress tracking
-               int(l.chunkSize), // Chunk size for streaming
-       )
+       segmentSuffixes, err := migrateStreamWithFileBasedAndProgress(rootDir, 
*tr, g, l.l, progress, int(l.chunkSize))
        if err != nil {
-               return fmt.Errorf("file-based stream migration failed: %w", err)
+               return nil, fmt.Errorf("file-based stream migration failed: 
%w", err)
        }
 
        l.l.Info().Msgf("completed file-based stream migration for group: %s", 
g.Metadata.Name)
-       return nil
+       return segmentSuffixes, nil
 }
 
 // getRemovalSegmentsTimeRange calculates the time range for segments that 
should be migrated
 // based on the group's TTL configuration, similar to 
storage.segmentController.getExpiredSegmentsTimeRange.
-func (l *lifecycleService) getRemovalSegmentsTimeRange(g *commonv1.Group) 
*timestamp.TimeRange {
-       if g.ResourceOpts == nil || g.ResourceOpts.Ttl == nil {
-               l.l.Debug().Msgf("no TTL configured for group %s", 
g.Metadata.Name)
-               return &timestamp.TimeRange{} // Return empty time range
-       }
-
+func (l *lifecycleService) getRemovalSegmentsTimeRange(g *GroupConfig) 
*timestamp.TimeRange {
        // Convert TTL to storage.IntervalRule
-       ttl := storage.MustToIntervalRule(g.ResourceOpts.Ttl)
+       ttlRule := storage.MustToIntervalRule(g.AccumulatedTTL)
 
        // Calculate deadline based on TTL (same logic as 
segmentController.getExpiredSegmentsTimeRange)
-       deadline := time.Now().Local().Add(-l.calculateTTLDuration(ttl))
+       deadline := time.Now().Local().Add(-l.calculateTTLDuration(ttlRule))
 
        // Create time range for segments before the deadline
        timeRange := &timestamp.TimeRange{
@@ -637,7 +626,7 @@ func (l *lifecycleService) getRemovalSegmentsTimeRange(g 
*commonv1.Group) *times
        l.l.Info().
                Str("group", g.Metadata.Name).
                Time("deadline", deadline).
-               Str("ttl", fmt.Sprintf("%d %s", g.ResourceOpts.Ttl.Num, 
g.ResourceOpts.Ttl.Unit.String())).
+               Str("ttl", fmt.Sprintf("%d %s", g.AccumulatedTTL.Num, 
g.AccumulatedTTL.Unit)).
                Msg("calculated removal segments time range based on TTL")
 
        return timeRange
@@ -657,7 +646,7 @@ func (l *lifecycleService) calculateTTLDuration(ttl 
storage.IntervalRule) time.D
        }
 }
 
-func (l *lifecycleService) deleteExpiredStreamSegments(ctx context.Context, g 
*commonv1.Group, tr *timestamp.TimeRange, progress *Progress) {
+func (l *lifecycleService) deleteExpiredStreamSegments(ctx context.Context, g 
*commonv1.Group, segmentSuffixes []string, progress *Progress) {
        if progress.IsStreamGroupDeleted(g.Metadata.Name) {
                l.l.Info().Msgf("skipping already deleted stream group 
segments: %s", g.Metadata.Name)
                return
@@ -666,19 +655,16 @@ func (l *lifecycleService) 
deleteExpiredStreamSegments(ctx context.Context, g *c
        resp, err := snapshot.Conn(l.gRPCAddr, l.enableTLS, l.insecure, l.cert, 
func(conn *grpc.ClientConn) (*streamv1.DeleteExpiredSegmentsResponse, error) {
                client := streamv1.NewStreamServiceClient(conn)
                return client.DeleteExpiredSegments(ctx, 
&streamv1.DeleteExpiredSegmentsRequest{
-                       Group: g.Metadata.Name,
-                       TimeRange: &modelv1.TimeRange{
-                               Begin: timestamppb.New(tr.Start),
-                               End:   timestamppb.New(tr.End),
-                       },
+                       Group:           g.Metadata.Name,
+                       SegmentSuffixes: segmentSuffixes,
                })
        })
        if err != nil {
-               l.l.Error().Err(err).Msgf("failed to delete expired segments in 
group %s", g.Metadata.Name)
+               l.l.Error().Err(err).Msgf("failed to delete expired segments in 
group %s, segments: %s", g.Metadata.Name, segmentSuffixes)
                return
        }
 
-       l.l.Info().Msgf("deleted %d expired segments in group %s", 
resp.Deleted, g.Metadata.Name)
+       l.l.Info().Msgf("deleted %d expired segments in group %s, segments: 
%s", resp.Deleted, g.Metadata.Name, segmentSuffixes)
        progress.MarkStreamGroupDeleted(g.Metadata.Name)
        progress.Save(l.progressFilePath, l.l)
 }
@@ -686,7 +672,14 @@ func (l *lifecycleService) deleteExpiredStreamSegments(ctx 
context.Context, g *c
 func (l *lifecycleService) processMeasureGroup(ctx context.Context, g 
*commonv1.Group, measureDir string,
        nodes []*databasev1.Node, labels map[string]string, progress *Progress,
 ) {
-       tr := l.getRemovalSegmentsTimeRange(g)
+       group, err := parseGroup(g, labels, nodes, l.l, l.metadata)
+       if err != nil {
+               l.l.Error().Err(err).Msgf("failed to parse group %s", 
g.Metadata.Name)
+               return
+       }
+       defer group.Close()
+
+       tr := l.getRemovalSegmentsTimeRange(group)
        if tr.Start.IsZero() && tr.End.IsZero() {
                l.l.Info().Msgf("no removal segments time range for group %s, 
skipping measure migration", g.Metadata.Name)
                progress.MarkGroupCompleted(g.Metadata.Name)
@@ -695,24 +688,25 @@ func (l *lifecycleService) processMeasureGroup(ctx 
context.Context, g *commonv1.
        }
 
        // Try file-based migration first
-       if err := l.processMeasureGroupFileBased(ctx, g, measureDir, tr, nodes, 
labels, progress); err != nil {
+       segmentSuffixes, err := l.processMeasureGroupFileBased(ctx, group, 
measureDir, tr, progress)
+       if err != nil {
                l.l.Error().Err(err).Msgf("failed to migrate measure group %s 
using file-based approach", g.Metadata.Name)
                return
        }
 
        l.l.Info().Msgf("deleting expired measure segments for group: %s", 
g.Metadata.Name)
-       l.deleteExpiredMeasureSegments(ctx, g, tr, progress)
+       l.deleteExpiredMeasureSegments(ctx, g, segmentSuffixes, progress)
        progress.MarkGroupCompleted(g.Metadata.Name)
        progress.Save(l.progressFilePath, l.l)
 }
 
 // processMeasureGroupFileBased uses file-based migration instead of 
query-based migration.
-func (l *lifecycleService) processMeasureGroupFileBased(_ context.Context, g 
*commonv1.Group,
-       measureDir string, tr *timestamp.TimeRange, nodes []*databasev1.Node, 
labels map[string]string, progress *Progress,
-) error {
+func (l *lifecycleService) processMeasureGroupFileBased(_ context.Context, g 
*GroupConfig, measureDir string,
+       tr *timestamp.TimeRange, progress *Progress,
+) ([]string, error) {
        if progress.IsMeasureGroupDeleted(g.Metadata.Name) {
                l.l.Info().Msgf("skipping already completed file-based measure 
migration for group: %s", g.Metadata.Name)
-               return nil
+               return nil, nil
        }
 
        l.l.Info().Msgf("starting file-based measure migration for group: %s", 
g.Metadata.Name)
@@ -722,30 +716,20 @@ func (l *lifecycleService) processMeasureGroupFileBased(_ 
context.Context, g *co
        // may no data found in the snapshot
        if _, err := os.Stat(rootDir); err != nil && errors.Is(err, 
os.ErrNotExist) {
                l.l.Info().Msgf("skipping file-based measure migration for 
group because is empty in the snapshot dir: %s", g.Metadata.Name)
-               return nil
+               return nil, nil
        }
 
        // Use the file-based migration with existing visitor pattern
-       err := migrateMeasureWithFileBasedAndProgress(
-               rootDir,          // Use snapshot directory as source
-               *tr,              // Time range for segments to migrate
-               g,                // Group configuration
-               labels,           // Node labels
-               nodes,            // Target nodes
-               l.metadata,       // Metadata repository
-               l.l,              // Logger
-               progress,         // Progress tracking
-               int(l.chunkSize), // Chunk size for streaming
-       )
+       segmentSuffixes, err := migrateMeasureWithFileBasedAndProgress(rootDir, 
*tr, g, l.l, progress, int(l.chunkSize))
        if err != nil {
-               return fmt.Errorf("file-based measure migration failed: %w", 
err)
+               return nil, fmt.Errorf("file-based measure migration failed: 
%w", err)
        }
 
        l.l.Info().Msgf("completed file-based measure migration for group: %s", 
g.Metadata.Name)
-       return nil
+       return segmentSuffixes, nil
 }
 
-func (l *lifecycleService) deleteExpiredMeasureSegments(ctx context.Context, g 
*commonv1.Group, tr *timestamp.TimeRange, progress *Progress) {
+func (l *lifecycleService) deleteExpiredMeasureSegments(ctx context.Context, g 
*commonv1.Group, segmentSuffixes []string, progress *Progress) {
        if progress.IsMeasureGroupDeleted(g.Metadata.Name) {
                l.l.Info().Msgf("skipping already deleted measure group 
segments: %s", g.Metadata.Name)
                return
@@ -754,19 +738,16 @@ func (l *lifecycleService) 
deleteExpiredMeasureSegments(ctx context.Context, g *
        resp, err := snapshot.Conn(l.gRPCAddr, l.enableTLS, l.insecure, l.cert, 
func(conn *grpc.ClientConn) (*measurev1.DeleteExpiredSegmentsResponse, error) {
                client := measurev1.NewMeasureServiceClient(conn)
                return client.DeleteExpiredSegments(ctx, 
&measurev1.DeleteExpiredSegmentsRequest{
-                       Group: g.Metadata.Name,
-                       TimeRange: &modelv1.TimeRange{
-                               Begin: timestamppb.New(tr.Start),
-                               End:   timestamppb.New(tr.End),
-                       },
+                       Group:           g.Metadata.Name,
+                       SegmentSuffixes: segmentSuffixes,
                })
        })
        if err != nil {
-               l.l.Error().Err(err).Msgf("failed to delete expired segments in 
group %s", g.Metadata.Name)
+               l.l.Error().Err(err).Msgf("failed to delete expired segments in 
group %s, suffixes: %s", g.Metadata.Name, segmentSuffixes)
                return
        }
 
-       l.l.Info().Msgf("deleted %d expired segments in group %s", 
resp.Deleted, g.Metadata.Name)
+       l.l.Info().Msgf("deleted %d expired segments in group %s, suffixes: 
%s", resp.Deleted, g.Metadata.Name, segmentSuffixes)
        progress.MarkMeasureGroupDeleted(g.Metadata.Name)
        progress.Save(l.progressFilePath, l.l)
 }
diff --git a/banyand/backup/lifecycle/steps.go 
b/banyand/backup/lifecycle/steps.go
index 12beb51d..e54d8a49 100644
--- a/banyand/backup/lifecycle/steps.go
+++ b/banyand/backup/lifecycle/steps.go
@@ -22,6 +22,7 @@ import (
        "os"
 
        "github.com/pkg/errors"
+       "google.golang.org/protobuf/proto"
 
        "github.com/apache/skywalking-banyandb/api/data"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -77,43 +78,70 @@ func (l *lifecycleService) getSnapshots(groups 
[]*commonv1.Group, p *Progress) (
        return streamDir, measureDir, nil
 }
 
+// GroupConfig encapsulates the parsed lifecycle configuration for a Group.
+// It contains all necessary information for migration and deletion operations.
+type GroupConfig struct {
+       *commonv1.Group
+       NodeSelector    node.Selector
+       QueueClient     queue.Client
+       AccumulatedTTL  *commonv1.IntervalRule
+       SegmentInterval *commonv1.IntervalRule
+       TargetShardNum  uint32
+       TargetReplicas  uint32
+}
+
+// Close releases resources held by the GroupConfig.
+func (gc *GroupConfig) Close() {
+       if gc.QueueClient != nil {
+               gc.QueueClient.GracefulStop()
+       }
+}
+
 func parseGroup(g *commonv1.Group, nodeLabels map[string]string, nodes 
[]*databasev1.Node,
        l *logger.Logger, metadata metadata.Repo,
-) (uint32, uint32, *commonv1.IntervalRule, node.Selector, queue.Client, error) 
{
+) (*GroupConfig, error) {
        ro := g.ResourceOpts
        if ro == nil {
-               return 0, 0, nil, nil, nil, fmt.Errorf("no resource opts in 
group %s", g.Metadata.Name)
+               return nil, fmt.Errorf("no resource opts in group %s", 
g.Metadata.Name)
        }
        if len(ro.Stages) == 0 {
-               return 0, 0, nil, nil, nil, fmt.Errorf("no stages in group %s", 
g.Metadata.Name)
+               return nil, fmt.Errorf("no stages in group %s", g.Metadata.Name)
        }
+       ttlTime := proto.Clone(ro.Ttl).(*commonv1.IntervalRule)
+       segmentInterval := 
proto.Clone(ro.SegmentInterval).(*commonv1.IntervalRule)
        var nst *commonv1.LifecycleStage
        for i, st := range ro.Stages {
                selector, err := pub.ParseLabelSelector(st.NodeSelector)
                if err != nil {
-                       return 0, 0, nil, nil, nil, errors.WithMessagef(err, 
"failed to parse node selector %s", st.NodeSelector)
+                       return nil, errors.WithMessagef(err, "failed to parse 
node selector %s", st.NodeSelector)
                }
+               ttlTime.Num += st.Ttl.Num
                if !selector.Matches(nodeLabels) {
                        continue
                }
                if i+1 >= len(ro.Stages) {
                        l.Info().Msgf("no next stage for group %s at stage %s", 
g.Metadata.Name, st.Name)
-                       return 0, 0, nil, nil, nil, nil
+                       return nil, nil
                }
                nst = ro.Stages[i+1]
-               l.Info().Msgf("migrating group %s at stage %s to stage %s", 
g.Metadata.Name, st.Name, nst.Name)
+               segmentInterval = st.SegmentInterval
+               l.Info().Msgf("migrating group %s at stage %s to stage %s, 
segment interval: %d(%s), total ttl needs: %d(%s)",
+                       g.Metadata.Name, st.Name, nst.Name, 
segmentInterval.Num, segmentInterval.Unit.String(), ttlTime.Num, 
ttlTime.Unit.String())
                break
        }
        if nst == nil {
                nst = ro.Stages[0]
+               ttlTime = proto.Clone(ro.Ttl).(*commonv1.IntervalRule)
+               l.Info().Msgf("no matching stage for group %s, defaulting to 
first stage %s segment interval: %d(%s), total ttl needs: %d(%s)",
+                       g.Metadata.Name, nst.Name, segmentInterval.Num, 
segmentInterval.Unit.String(), ttlTime.Num, ttlTime.Unit.String())
        }
        nsl, err := pub.ParseLabelSelector(nst.NodeSelector)
        if err != nil {
-               return 0, 0, nil, nil, nil, errors.WithMessagef(err, "failed to 
parse node selector %s", nst.NodeSelector)
+               return nil, errors.WithMessagef(err, "failed to parse node 
selector %s", nst.NodeSelector)
        }
        nodeSel := node.NewRoundRobinSelector("", metadata)
        if ok, _ := nodeSel.OnInit([]schema.Kind{schema.KindGroup}); !ok {
-               return 0, 0, nil, nil, nil, fmt.Errorf("failed to initialize 
node selector for group %s", g.Metadata.Name)
+               return nil, fmt.Errorf("failed to initialize node selector for 
group %s", g.Metadata.Name)
        }
        client := pub.NewWithoutMetadata()
        if g.Catalog == commonv1.Catalog_CATALOG_STREAM {
@@ -138,9 +166,17 @@ func parseGroup(g *commonv1.Group, nodeLabels 
map[string]string, nodes []*databa
                }
        }
        if !existed {
-               return 0, 0, nil, nil, nil, errors.New("no nodes matched")
+               return nil, errors.New("no nodes matched")
        }
-       return nst.ShardNum, nst.Replicas, nst.Ttl, nodeSel, client, nil
+       return &GroupConfig{
+               Group:           g,
+               TargetShardNum:  nst.ShardNum,
+               TargetReplicas:  nst.Replicas,
+               AccumulatedTTL:  ttlTime,
+               SegmentInterval: segmentInterval,
+               NodeSelector:    nodeSel,
+               QueueClient:     client,
+       }, nil
 }
 
 type fileInfo struct {
diff --git a/banyand/internal/storage/segment.go 
b/banyand/internal/storage/segment.go
index 8c164b03..b0609049 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -590,17 +590,36 @@ func (sc *segmentController[T, O]) 
getExpiredSegmentsTimeRange() *timestamp.Time
        return timeRange
 }
 
-func (sc *segmentController[T, O]) deleteExpiredSegments(timeRange 
timestamp.TimeRange) int64 {
+func (sc *segmentController[T, O]) deleteExpiredSegments(segmentSuffixes 
[]string) int64 {
        deadline := time.Now().Local().Add(-sc.opts.TTL.estimatedDuration())
        var count int64
        ss, _ := sc.segments(false)
+       sc.l.Info().Str("segment_suffixes", fmt.Sprintf("%s", segmentSuffixes)).
+               Str("ttl", fmt.Sprintf("%d(%s)", sc.opts.TTL.Num, 
sc.opts.TTL.Unit)).
+               Str("deadline", deadline.String()).
+               Int("total_segment_count", len(ss)).Msg("deleting expired 
segments")
+       shouldDeleteSuffixes := make(map[string]bool)
+       for _, s := range segmentSuffixes {
+               shouldDeleteSuffixes[s] = true
+       }
        for _, s := range ss {
-               if s.Before(deadline) && s.Overlapping(timeRange) {
+               if shouldDeleteSuffixes[s.suffix] {
+                       sc.l.Info().Str("suffix", s.suffix).
+                               Str("deadline", deadline.String()).
+                               Str("segment_name", s.String()).
+                               Str("segment_time_range", 
s.GetTimeRange().String()).
+                               Msg("deleting an expired segment")
                        s.delete()
                        sc.Lock()
                        sc.removeSeg(s.id)
                        sc.Unlock()
                        count++
+               } else {
+                       sc.l.Info().Str("suffix", s.suffix).
+                               Str("deadline", deadline.String()).
+                               Str("segment_name", s.String()).
+                               Str("segment_time_range", 
s.GetTimeRange().String()).
+                               Msg("segment is not expired or not in the time 
range, skipping deletion")
                }
                s.DecRef()
        }
diff --git a/banyand/internal/storage/segment_test.go 
b/banyand/internal/storage/segment_test.go
index 1fc8691a..b7cd892d 100644
--- a/banyand/internal/storage/segment_test.go
+++ b/banyand/internal/storage/segment_test.go
@@ -619,12 +619,11 @@ func TestDeleteExpiredSegmentsWithClosedSegments(t 
*testing.T) {
 
        // Now delete expired segments
        // Get the time range for segments 0, 1, and 2 (the expired ones)
-       timeRange := timestamp.NewInclusiveTimeRange(
-               segments[0].Start,
-               segments[2].End,
-       )
-
-       deletedCount := sc.deleteExpiredSegments(timeRange)
+       deletedCount := sc.deleteExpiredSegments([]string{
+               time.Now().AddDate(0, 0, -6).Format(dayFormat),
+               time.Now().AddDate(0, 0, -5).Format(dayFormat),
+               time.Now().AddDate(0, 0, -4).Format(dayFormat),
+       })
        assert.Equal(t, int64(3), deletedCount, "Should have deleted 3 expired 
segments")
 
        // Verify segment controller's segment list
diff --git a/banyand/internal/storage/storage.go 
b/banyand/internal/storage/storage.go
index 2dd40331..e4cb4d5b 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -113,7 +113,7 @@ type TSDB[T TSTable, O any] interface {
        UpdateOptions(opts *commonv1.ResourceOpts)
        TakeFileSnapshot(dst string) error
        GetExpiredSegmentsTimeRange() *timestamp.TimeRange
-       DeleteExpiredSegments(timeRange timestamp.TimeRange) int64
+       DeleteExpiredSegments(segmentSuffixes []string) int64
        // PeekOldestSegmentEndTime returns the end time of the oldest segment.
        // Returns the zero time and false if no segments exist or retention 
gate cannot be acquired.
        PeekOldestSegmentEndTime() (time.Time, bool)
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index 66d9f129..a047bb7b 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -300,8 +300,8 @@ func (d *database[T, O]) GetExpiredSegmentsTimeRange() 
*timestamp.TimeRange {
        return d.segmentController.getExpiredSegmentsTimeRange()
 }
 
-func (d *database[T, O]) DeleteExpiredSegments(timeRange timestamp.TimeRange) 
int64 {
-       return d.segmentController.deleteExpiredSegments(timeRange)
+func (d *database[T, O]) DeleteExpiredSegments(segmentSuffixes []string) int64 
{
+       return d.segmentController.deleteExpiredSegments(segmentSuffixes)
 }
 
 // PeekOldestSegmentEndTime returns the end time of the oldest segment.
diff --git a/banyand/internal/storage/visitor.go 
b/banyand/internal/storage/visitor.go
index 40e14e1a..2a058b1e 100644
--- a/banyand/internal/storage/visitor.go
+++ b/banyand/internal/storage/visitor.go
@@ -25,9 +25,12 @@ import (
        "github.com/pkg/errors"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
+var log = logger.GetLogger("storage", "visitor")
+
 // SegmentVisitor defines the interface for visiting segment components.
 type SegmentVisitor interface {
        // VisitSeries visits the series index directory for a segment.
@@ -39,24 +42,32 @@ type SegmentVisitor interface {
 // VisitSegmentsInTimeRange traverses segments within the specified time range
 // and calls the visitor methods for series index and shard directories.
 // This function works directly with the filesystem without requiring a 
database instance.
-func VisitSegmentsInTimeRange(tsdbRootPath string, timeRange 
timestamp.TimeRange, visitor SegmentVisitor, intervalRule IntervalRule) error {
+// Returns a list of segment suffixes that were visited.
+func VisitSegmentsInTimeRange(tsdbRootPath string, timeRange 
timestamp.TimeRange, visitor SegmentVisitor, segmentInterval IntervalRule) 
([]string, error) {
        // Parse segment directories in the root path
        var segmentPaths []segmentInfo
        err := walkDir(tsdbRootPath, segPathPrefix, func(suffix string) error {
-               startTime, err := parseSegmentTime(suffix, intervalRule.Unit)
+               startTime, err := parseSegmentTime(suffix, segmentInterval.Unit)
                if err != nil {
                        return err
                }
 
                // Calculate end time based on interval rule
-               endTime := intervalRule.NextTime(startTime)
+               endTime := segmentInterval.NextTime(startTime)
                segTR := timestamp.NewSectionTimeRange(startTime, endTime)
 
                // Check if segment is completely included in the requested 
time range
+               logEntry := log.Info().Str("segment_suffix", suffix).
+                       Str("tsdb_root_path", tsdbRootPath).
+                       Str("segment_interval_rule", fmt.Sprintf("%d(%s)", 
segmentInterval.Num, segmentInterval.Unit)).
+                       Str("total_time_range", timeRange.String()).
+                       Str("segment_time_range", segTR.String())
                if !timeRange.Include(segTR) {
+                       logEntry.Msg("segment time range is not included")
                        return nil // Skip segments not fully contained in the 
time range
                }
 
+               logEntry.Msg("segment time range is include")
                segmentPath := filepath.Join(tsdbRootPath, 
fmt.Sprintf(segTemplate, suffix))
                segmentPaths = append(segmentPaths, segmentInfo{
                        path:      segmentPath,
@@ -66,30 +77,36 @@ func VisitSegmentsInTimeRange(tsdbRootPath string, 
timeRange timestamp.TimeRange
                return nil
        })
        if err != nil {
-               return errors.Wrap(err, "failed to walk segment directories")
+               return nil, errors.Wrap(err, "failed to walk segment 
directories")
        }
 
+       // Collect segment suffixes to return
+       var visitedSuffixes []string
+
        // Visit each matching segment
        for _, segInfo := range segmentPaths {
                // Collect shard IDs for this segment
                shardIDs, err := collectSegmentShardIDs(segInfo.path)
                if err != nil {
-                       return errors.Wrapf(err, "failed to collect shard IDs 
for segment %s", segInfo.suffix)
+                       return nil, errors.Wrapf(err, "failed to collect shard 
IDs for segment %s", segInfo.suffix)
                }
 
                // Visit series index directory
                seriesIndexPath := filepath.Join(segInfo.path, 
seriesIndexDirName)
                if err := visitor.VisitSeries(&segInfo.timeRange, 
seriesIndexPath, shardIDs); err != nil {
-                       return errors.Wrapf(err, "failed to visit series index 
for segment (suffix: %s, path: %s, timeRange: %v)", segInfo.suffix, 
segInfo.path, segInfo.timeRange)
+                       return nil, errors.Wrapf(err, "failed to visit series 
index for segment (suffix: %s, path: %s, timeRange: %v)", segInfo.suffix, 
segInfo.path, segInfo.timeRange)
                }
 
                // Visit shard directories
                if err := visitSegmentShards(segInfo.path, &segInfo.timeRange, 
visitor); err != nil {
-                       return errors.Wrapf(err, "failed to visit shards for 
segment (suffix: %s, path: %s, timeRange: %v)", segInfo.suffix, segInfo.path, 
segInfo.timeRange)
+                       return nil, errors.Wrapf(err, "failed to visit shards 
for segment (suffix: %s, path: %s, timeRange: %v)", segInfo.suffix, 
segInfo.path, segInfo.timeRange)
                }
+
+               // Add visited suffix to result
+               visitedSuffixes = append(visitedSuffixes, segInfo.suffix)
        }
 
-       return nil
+       return visitedSuffixes, nil
 }
 
 // segmentInfo holds information about a segment directory.
diff --git a/banyand/internal/storage/visitor_test.go 
b/banyand/internal/storage/visitor_test.go
index b9b6fc3a..1c1c7fb6 100644
--- a/banyand/internal/storage/visitor_test.go
+++ b/banyand/internal/storage/visitor_test.go
@@ -135,8 +135,9 @@ func TestVisitSegmentsInTimeRange(t *testing.T) {
                visitor := NewTestVisitor()
                timeRange := timestamp.NewSectionTimeRange(ts, 
ts.Add(24*time.Hour))
 
-               err = VisitSegmentsInTimeRange(dir, timeRange, visitor, 
opts.SegmentInterval)
+               suffixes, err := VisitSegmentsInTimeRange(dir, timeRange, 
visitor, opts.SegmentInterval)
                require.NoError(t, err)
+               require.Len(t, suffixes, 1)
 
                // Verify series index was visited
                require.Len(t, visitor.visitedSeries, 1)
@@ -202,8 +203,9 @@ func TestVisitSegmentsInTimeRange(t *testing.T) {
                visitor := NewTestVisitor()
                timeRange := timestamp.NewSectionTimeRange(baseDate, 
baseDate.AddDate(0, 0, 3))
 
-               err = VisitSegmentsInTimeRange(dir, timeRange, visitor, 
opts.SegmentInterval)
+               suffixes, err := VisitSegmentsInTimeRange(dir, timeRange, 
visitor, opts.SegmentInterval)
                require.NoError(t, err)
+               require.Len(t, suffixes, 3)
 
                // Verify all series indices were visited
                require.Len(t, visitor.visitedSeries, 3)
@@ -285,8 +287,9 @@ func TestVisitSegmentsInTimeRange(t *testing.T) {
                endTime := baseDate.AddDate(0, 0, 4)   // 2024-05-05 (exclusive)
                timeRange := timestamp.NewSectionTimeRange(startTime, endTime)
 
-               err = VisitSegmentsInTimeRange(dir, timeRange, visitor, 
opts.SegmentInterval)
+               suffixes, err := VisitSegmentsInTimeRange(dir, timeRange, 
visitor, opts.SegmentInterval)
                require.NoError(t, err)
+               require.Len(t, suffixes, 3)
 
                // Verify only middle segments were visited (3 segments)
                require.Len(t, visitor.visitedSeries, 3)
@@ -311,8 +314,9 @@ func TestVisitSegmentsInTimeRange(t *testing.T) {
                timeRange := timestamp.NewSectionTimeRange(time.Now(), 
time.Now().Add(24*time.Hour))
                intervalRule := IntervalRule{Unit: DAY, Num: 1}
 
-               err := VisitSegmentsInTimeRange(dir, timeRange, visitor, 
intervalRule)
+               suffixes, err := VisitSegmentsInTimeRange(dir, timeRange, 
visitor, intervalRule)
                require.NoError(t, err)
+               require.Len(t, suffixes, 0)
 
                // Verify nothing was visited
                require.Len(t, visitor.visitedSeries, 0)
@@ -363,7 +367,7 @@ func TestVisitSegmentsInTimeRange(t *testing.T) {
 
                timeRange := timestamp.NewSectionTimeRange(ts, 
ts.Add(24*time.Hour))
 
-               err = VisitSegmentsInTimeRange(dir, timeRange, visitor, 
opts.SegmentInterval)
+               _, err = VisitSegmentsInTimeRange(dir, timeRange, visitor, 
opts.SegmentInterval)
                require.Error(t, err)
                require.Contains(t, err.Error(), "series access error")
                require.Contains(t, err.Error(), "failed to visit series index 
for segment")
@@ -414,7 +418,7 @@ func TestVisitSegmentsInTimeRange(t *testing.T) {
 
                timeRange := timestamp.NewSectionTimeRange(ts, 
ts.Add(24*time.Hour))
 
-               err = VisitSegmentsInTimeRange(dir, timeRange, visitor, 
opts.SegmentInterval)
+               _, err = VisitSegmentsInTimeRange(dir, timeRange, visitor, 
opts.SegmentInterval)
                require.Error(t, err)
                require.Contains(t, err.Error(), "shard access error")
                require.Contains(t, err.Error(), "failed to visit shards for 
segment")
@@ -470,8 +474,9 @@ func TestVisitSegmentsInTimeRange(t *testing.T) {
                visitor := NewTestVisitor()
                timeRange := timestamp.NewSectionTimeRange(baseTime, 
baseTime.Add(3*time.Hour))
 
-               err = VisitSegmentsInTimeRange(dir, timeRange, visitor, 
opts.SegmentInterval)
+               suffixes, err := VisitSegmentsInTimeRange(dir, timeRange, 
visitor, opts.SegmentInterval)
                require.NoError(t, err)
+               require.Len(t, suffixes, 3)
 
                // Verify all series indices were visited
                require.Len(t, visitor.visitedSeries, 3)
@@ -540,8 +545,9 @@ func TestVisitSegmentsInTimeRange(t *testing.T) {
                tsPlus1Min, err := time.ParseInLocation("2006-01-02 15:04:05", 
"2025-11-01 00:01:00", time.Local)
                require.NoError(t, err)
                timeRange := timestamp.NewSectionTimeRange(time.Time{}, 
tsPlus1Min)
-               err = VisitSegmentsInTimeRange(dir, timeRange, visitor, 
opts.SegmentInterval)
+               suffixes, err := VisitSegmentsInTimeRange(dir, timeRange, 
visitor, opts.SegmentInterval)
                require.NoError(t, err)
+               require.Len(t, suffixes, 1)
                require.Len(t, visitor.visitedSeries, 1)
                // Should only oct 31 be including
                expectedSeriesPath := filepath.Join(dir, "seg-20251031", 
seriesIndexDirName)
diff --git a/banyand/measure/svc_data.go b/banyand/measure/svc_data.go
index 3ab3e56c..86358908 100644
--- a/banyand/measure/svc_data.go
+++ b/banyand/measure/svc_data.go
@@ -516,6 +516,6 @@ func (d *dataDeleteStreamSegmentsListener) Rev(_ 
context.Context, message bus.Me
                d.s.l.Error().Err(err).Str("group", req.Group).Msg("failed to 
load tsdb")
                return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), 
int64(0))
        }
-       deleted := 
db.DeleteExpiredSegments(timestamp.NewSectionTimeRange(req.TimeRange.Begin.AsTime(),
 req.TimeRange.End.AsTime()))
+       deleted := db.DeleteExpiredSegments(req.SegmentSuffixes)
        return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), deleted)
 }
diff --git a/banyand/measure/visitor.go b/banyand/measure/visitor.go
index 44edd1ee..3c011b87 100644
--- a/banyand/measure/visitor.go
+++ b/banyand/measure/visitor.go
@@ -40,7 +40,7 @@ type measureSegmentVisitor struct {
        visitor Visitor
 }
 
-// VisitSeries implements Visitor.
+// VisitSeries implements storage.SegmentVisitor.
 func (mv *measureSegmentVisitor) VisitSeries(segmentTR *timestamp.TimeRange, 
seriesIndexPath string, shardIDs []common.ShardID) error {
        return mv.visitor.VisitSeries(segmentTR, seriesIndexPath, shardIDs)
 }
@@ -84,7 +84,8 @@ func (mv *measureSegmentVisitor) visitShardParts(segmentTR 
*timestamp.TimeRange,
 // VisitMeasuresInTimeRange traverses measure segments within the specified 
time range
 // and calls the visitor methods for parts within shards.
 // This function works directly with the filesystem without requiring a 
database instance.
-func VisitMeasuresInTimeRange(tsdbRootPath string, timeRange 
timestamp.TimeRange, visitor Visitor, intervalRule storage.IntervalRule) error {
+// Returns a list of segment suffixes that were visited.
+func VisitMeasuresInTimeRange(tsdbRootPath string, timeRange 
timestamp.TimeRange, visitor Visitor, segmentInterval storage.IntervalRule) 
([]string, error) {
        adapter := &measureSegmentVisitor{visitor: visitor}
-       return storage.VisitSegmentsInTimeRange(tsdbRootPath, timeRange, 
adapter, intervalRule)
+       return storage.VisitSegmentsInTimeRange(tsdbRootPath, timeRange, 
adapter, segmentInterval)
 }
diff --git a/banyand/stream/svc_standalone.go b/banyand/stream/svc_standalone.go
index 7b41a194..bfb2e54a 100644
--- a/banyand/stream/svc_standalone.go
+++ b/banyand/stream/svc_standalone.go
@@ -341,6 +341,6 @@ func (d *deleteStreamSegmentsListener) Rev(_ 
context.Context, message bus.Messag
                d.s.l.Error().Err(err).Str("group", req.Group).Msg("failed to 
load tsdb")
                return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), 
int64(0))
        }
-       deleted := 
db.DeleteExpiredSegments(timestamp.NewSectionTimeRange(req.TimeRange.Begin.AsTime(),
 req.TimeRange.End.AsTime()))
+       deleted := db.DeleteExpiredSegments(req.SegmentSuffixes)
        return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), deleted)
 }
diff --git a/banyand/stream/visitor.go b/banyand/stream/visitor.go
index 4959e604..19ab1d58 100644
--- a/banyand/stream/visitor.go
+++ b/banyand/stream/visitor.go
@@ -97,7 +97,8 @@ func (sv *streamSegmentVisitor) 
visitShardElementIndex(segmentTR *timestamp.Time
 // VisitStreamsInTimeRange traverses stream segments within the specified time 
range
 // and calls the visitor methods for series index, parts, and element indexes.
 // This function works directly with the filesystem without requiring a 
database instance.
-func VisitStreamsInTimeRange(tsdbRootPath string, timeRange 
timestamp.TimeRange, visitor Visitor, intervalRule storage.IntervalRule) error {
+// Returns a list of segment suffixes that were visited.
+func VisitStreamsInTimeRange(tsdbRootPath string, timeRange 
timestamp.TimeRange, visitor Visitor, segmentInterval storage.IntervalRule) 
([]string, error) {
        adapter := &streamSegmentVisitor{visitor: visitor}
-       return storage.VisitSegmentsInTimeRange(tsdbRootPath, timeRange, 
adapter, intervalRule)
+       return storage.VisitSegmentsInTimeRange(tsdbRootPath, timeRange, 
adapter, segmentInterval)
 }
diff --git a/banyand/stream/visitor_test.go b/banyand/stream/visitor_test.go
index 3d489088..687ac69f 100644
--- a/banyand/stream/visitor_test.go
+++ b/banyand/stream/visitor_test.go
@@ -104,8 +104,9 @@ func TestVisitStreamsInTimeRange(t *testing.T) {
        }
        intervalRule := storage.IntervalRule{Unit: storage.HOUR, Num: 1}
 
-       err = VisitStreamsInTimeRange(tmpDir, timeRange, visitor, intervalRule)
+       suffixes, err := VisitStreamsInTimeRange(tmpDir, timeRange, visitor, 
intervalRule)
        require.NoError(t, err)
+       assert.NotEmpty(t, suffixes)
 
        // Verify visits occurred
        assert.Len(t, visitor.visitedSeries, 1)
diff --git a/docs/api-reference.md b/docs/api-reference.md
index ab7fb8ba..f18bad4d 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -4299,7 +4299,7 @@ WriteResponse is the response contract for write
 | Field | Type | Label | Description |
 | ----- | ---- | ----- | ----------- |
 | group | [string](#string) |  |  |
-| time_range | [banyandb.model.v1.TimeRange](#banyandb-model-v1-TimeRange) |  
|  |
+| segment_suffixes | [string](#string) | repeated |  |
 
 
 
@@ -4761,7 +4761,7 @@ WriteResponse is the response contract for write
 | Field | Type | Label | Description |
 | ----- | ---- | ----- | ----------- |
 | group | [string](#string) |  |  |
-| time_range | [banyandb.model.v1.TimeRange](#banyandb-model-v1-TimeRange) |  
|  |
+| segment_suffixes | [string](#string) | repeated |  |
 
 
 


Reply via email to