This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new ee009edd Support lifecycle to trace data (#852)
ee009edd is described below

commit ee009edd711642197c7d8579669611fab9811f52
Author: mrproliu <[email protected]>
AuthorDate: Thu Nov 27 10:13:23 2025 +0800

    Support lifecycle to trace data (#852)
    
    * Support lifecycle to trace data
    
    
    ---------
    
    Co-authored-by: 吴晟 Wu Sheng <[email protected]>
    Co-authored-by: Gao Hongtao <[email protected]>
---
 api/data/data.go                                   |   4 +
 api/data/trace.go                                  |   9 +
 api/proto/banyandb/trace/v1/rpc.proto              |  11 +
 banyand/backup/lifecycle/migration_integration.go  |  70 +++
 banyand/backup/lifecycle/progress.go               | 192 ++++++-
 banyand/backup/lifecycle/service.go                | 103 +++-
 banyand/backup/lifecycle/steps.go                  |  16 +-
 .../backup/lifecycle/trace_migration_visitor.go    | 593 +++++++++++++++++++++
 banyand/internal/sidx/part.go                      | 136 +++++
 banyand/internal/storage/tsdb.go                   |   2 +
 banyand/measure/snapshot.go                        | 124 ++++-
 banyand/measure/svc_data.go                        |  11 +
 banyand/queue/sub/segments.go                      |  28 +
 banyand/queue/sub/server.go                        |   2 +
 banyand/stream/snapshot.go                         | 123 ++++-
 banyand/trace/part.go                              | 104 ++++
 banyand/trace/part_metadata.go                     |  24 +
 banyand/trace/snapshot.go                          |   2 +-
 banyand/trace/svc_standalone.go                    | 150 +++++-
 banyand/trace/visitor.go                           |  57 ++
 banyand/trace/write_data.go                        | 123 ++++-
 docs/api-reference.md                              |  35 ++
 pkg/schema/cache.go                                |   2 +
 pkg/test/trace/etcd.go                             |  12 +-
 .../testdata/groups_stages/test-trace-group.json   |  34 ++
 .../testdata/groups_stages/test-trace-updated.json |  34 ++
 .../testdata/groups_stages/zipkin-trace-group.json |  34 ++
 test/cases/lifecycle/lifecycle.go                  |  17 +
 .../distributed/lifecycle/lifecycle_suite_test.go  |   8 +-
 29 files changed, 2024 insertions(+), 36 deletions(-)

diff --git a/api/data/data.go b/api/data/data.go
index 5d142277..956c9e33 100644
--- a/api/data/data.go
+++ b/api/data/data.go
@@ -52,6 +52,7 @@ var (
                TopicTraceWrite.String():               TopicTraceWrite,
                TopicTraceQuery.String():               TopicTraceQuery,
                TopicTracePartSync.String():            TopicTracePartSync,
+               TopicTraceSeriesSync.String():          TopicTraceSeriesSync,
                TopicTraceSidxSeriesWrite.String():     
TopicTraceSidxSeriesWrite,
        }
 
@@ -121,6 +122,9 @@ var (
                TopicTracePartSync: func() proto.Message {
                        return nil
                },
+               TopicTraceSeriesSync: func() proto.Message {
+                       return nil
+               },
                TopicTraceSidxSeriesWrite: func() proto.Message {
                        return nil
                },
diff --git a/api/data/trace.go b/api/data/trace.go
index ee309058..3918e3f7 100644
--- a/api/data/trace.go
+++ b/api/data/trace.go
@@ -58,6 +58,15 @@ var TracePartSyncKindVersion = common.KindVersion{
 // TopicTracePartSync is the part sync topic.
 var TopicTracePartSync = bus.BiTopic(TracePartSyncKindVersion.String())
 
+// TraceSeriesSyncKindVersion is the version tag of series sync kind.
+var TraceSeriesSyncKindVersion = common.KindVersion{
+       Version: "v1",
+       Kind:    "trace-series-sync",
+}
+
+// TopicTraceSeriesSync is the series sync topic.
+var TopicTraceSeriesSync = bus.BiTopic(TraceSeriesSyncKindVersion.String())
+
 // TraceSidxSeriesWriteKindVersion is the version tag of trace sidx series 
write kind.
 var TraceSidxSeriesWriteKindVersion = common.KindVersion{
        Version: "v1",
diff --git a/api/proto/banyandb/trace/v1/rpc.proto 
b/api/proto/banyandb/trace/v1/rpc.proto
index 0c889388..62e04ada 100644
--- a/api/proto/banyandb/trace/v1/rpc.proto
+++ b/api/proto/banyandb/trace/v1/rpc.proto
@@ -28,6 +28,15 @@ option go_package = 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/tr
 option java_package = "org.apache.skywalking.banyandb.trace.v1";
 option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_swagger) = 
{base_path: "/api"};
 
+message DeleteExpiredSegmentsRequest {
+  string group = 1;
+  repeated string segment_suffixes = 2;
+}
+
+message DeleteExpiredSegmentsResponse {
+  int64 deleted = 1;
+}
+
 service TraceService {
   rpc Query(QueryRequest) returns (QueryResponse) {
     option (google.api.http) = {
@@ -37,4 +46,6 @@ service TraceService {
   }
 
   rpc Write(stream WriteRequest) returns (stream WriteResponse);
+
+  rpc DeleteExpiredSegments(DeleteExpiredSegmentsRequest) returns 
(DeleteExpiredSegmentsResponse);
 }
diff --git a/banyand/backup/lifecycle/migration_integration.go 
b/banyand/backup/lifecycle/migration_integration.go
index b36ae1e5..8036f95d 100644
--- a/banyand/backup/lifecycle/migration_integration.go
+++ b/banyand/backup/lifecycle/migration_integration.go
@@ -22,6 +22,7 @@ import (
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/banyand/measure"
        "github.com/apache/skywalking-banyandb/banyand/stream"
+       "github.com/apache/skywalking-banyandb/banyand/trace"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
@@ -168,3 +169,72 @@ func (pcv *measurePartCountVisitor) VisitPart(_ 
*timestamp.TimeRange, _ common.S
        pcv.partCount++
        return nil
 }
+
+// migrateTraceWithFileBasedAndProgress performs file-based trace migration 
with progress tracking.
+func migrateTraceWithFileBasedAndProgress(tsdbRootPath string, timeRange 
timestamp.TimeRange, group *GroupConfig,
+       logger *logger.Logger, progress *Progress, chunkSize int,
+) ([]string, error) {
+       // Convert segment interval to IntervalRule using 
storage.MustToIntervalRule
+       segmentIntervalRule := storage.MustToIntervalRule(group.SegmentInterval)
+
+       // Get target stage configuration
+       targetStageInterval := getTargetStageInterval(group)
+
+       // Count total shards before starting migration
+       totalShards, segmentSuffixes, err := countTraceShards(tsdbRootPath, 
timeRange, segmentIntervalRule)
+       if err != nil {
+               logger.Warn().Err(err).Msg("failed to count trace parts, 
proceeding without part count")
+       } else {
+               logger.Info().Int("total_shards", 
totalShards).Strs("segment_suffixes", segmentSuffixes).
+                       Msg("counted trace parts for progress tracking")
+       }
+
+       // Create file-based migration visitor with progress tracking and 
target stage interval
+       visitor := newTraceMigrationVisitor(
+               group.Group, group.TargetShardNum, group.TargetReplicas, 
group.NodeSelector, group.QueueClient,
+               logger, progress, chunkSize, targetStageInterval,
+       )
+       defer visitor.Close()
+
+       // Set the total shard count for progress tracking
+       if totalShards > 0 {
+               visitor.SetTraceShardCount(totalShards)
+       }
+
+       // Use the existing VisitTracesInTimeRange function with our file-based 
visitor
+       _, err = trace.VisitTracesInTimeRange(tsdbRootPath, timeRange, visitor, 
segmentIntervalRule)
+       if err != nil {
+               return nil, err
+       }
+       return segmentSuffixes, nil
+}
+
+// countTraceShards counts the total number of shards in the given time range.
+func countTraceShards(tsdbRootPath string, timeRange timestamp.TimeRange, 
segmentInterval storage.IntervalRule) (int, []string, error) {
+       // Create a simple visitor to count shards
+       shardCounter := &traceShardsCountVisitor{}
+
+       // Use the existing VisitTracesInTimeRange function to count shards
+       segmentSuffixes, err := trace.VisitTracesInTimeRange(tsdbRootPath, 
timeRange, shardCounter, segmentInterval)
+       if err != nil {
+               return 0, nil, err
+       }
+
+       return shardCounter.shardCount, segmentSuffixes, nil
+}
+
+// traceShardsCountVisitor is a simple visitor that counts trace shards.
+type traceShardsCountVisitor struct {
+       shardCount int
+}
+
+// VisitSeries implements trace.Visitor.
+func (pcv *traceShardsCountVisitor) VisitSeries(_ *timestamp.TimeRange, _ 
string, _ []common.ShardID) error {
+       return nil
+}
+
+// VisitShard implements trace.Visitor.
+func (pcv *traceShardsCountVisitor) VisitShard(_ *timestamp.TimeRange, _ 
common.ShardID, _ string) error {
+       pcv.shardCount++
+       return nil
+}
diff --git a/banyand/backup/lifecycle/progress.go 
b/banyand/backup/lifecycle/progress.go
index 7baf214b..99f7083d 100644
--- a/banyand/backup/lifecycle/progress.go
+++ b/banyand/backup/lifecycle/progress.go
@@ -33,6 +33,7 @@ type Progress struct {
        CompletedGroups             map[string]bool                             
               `json:"completed_groups"`
        DeletedStreamGroups         map[string]bool                             
               `json:"deleted_stream_groups"`
        DeletedMeasureGroups        map[string]bool                             
               `json:"deleted_measure_groups"`
+       DeletedTraceGroups          map[string]bool                             
               `json:"deleted_trace_groups"`
        CompletedStreamParts        
map[string]map[string]map[common.ShardID]map[uint64]bool   
`json:"completed_stream_parts"`
        StreamPartErrors            
map[string]map[string]map[common.ShardID]map[uint64]string 
`json:"stream_part_errors"`
        CompletedStreamSeries       
map[string]map[string]map[common.ShardID]bool              
`json:"completed_stream_series"`
@@ -54,10 +55,20 @@ type Progress struct {
        MeasureSeriesErrors    map[string]map[string]map[common.ShardID]string  
          `json:"measure_series_errors"`
        MeasureSeriesCounts    map[string]int                                   
          `json:"measure_series_counts"`
        MeasureSeriesProgress  map[string]int                                   
          `json:"measure_series_progress"`
-       progressFilePath       string                                           
          `json:"-"`
-       SnapshotStreamDir      string                                           
          `json:"snapshot_stream_dir"`
-       SnapshotMeasureDir     string                                           
          `json:"snapshot_measure_dir"`
-       mu                     sync.Mutex                                       
          `json:"-"`
+       // Trace shard- and series-specific progress tracking
+       CompletedTraceShards map[string]map[string]map[common.ShardID]bool   
`json:"completed_trace_shards"`
+       TraceShardErrors     map[string]map[string]map[common.ShardID]string 
`json:"trace_shard_errors"`
+       TraceShardCounts     map[string]int                                  
`json:"trace_shard_counts"`
+       TraceShardProgress   map[string]int                                  
`json:"trace_shard_progress"`
+       CompletedTraceSeries map[string]map[string]map[common.ShardID]bool   
`json:"completed_trace_series"`
+       TraceSeriesErrors    map[string]map[string]map[common.ShardID]string 
`json:"trace_series_errors"`
+       TraceSeriesCounts    map[string]int                                  
`json:"trace_series_counts"`
+       TraceSeriesProgress  map[string]int                                  
`json:"trace_series_progress"`
+       progressFilePath     string                                          
`json:"-"`
+       SnapshotStreamDir    string                                          
`json:"snapshot_stream_dir"`
+       SnapshotMeasureDir   string                                          
`json:"snapshot_measure_dir"`
+       SnapshotTraceDir     string                                          
`json:"snapshot_trace_dir"`
+       mu                   sync.Mutex                                      
`json:"-"`
 }
 
 // AllGroupsNotFullyCompleted find is there have any group not fully completed.
@@ -80,6 +91,7 @@ func NewProgress(path string, l *logger.Logger) *Progress {
                CompletedGroups:             make(map[string]bool),
                DeletedStreamGroups:         make(map[string]bool),
                DeletedMeasureGroups:        make(map[string]bool),
+               DeletedTraceGroups:          make(map[string]bool),
                CompletedStreamParts:        
make(map[string]map[string]map[common.ShardID]map[uint64]bool),
                StreamPartErrors:            
make(map[string]map[string]map[common.ShardID]map[uint64]string),
                StreamPartCounts:            make(map[string]int),
@@ -100,6 +112,14 @@ func NewProgress(path string, l *logger.Logger) *Progress {
                MeasureSeriesErrors:         
make(map[string]map[string]map[common.ShardID]string),
                MeasureSeriesCounts:         make(map[string]int),
                MeasureSeriesProgress:       make(map[string]int),
+               CompletedTraceShards:        
make(map[string]map[string]map[common.ShardID]bool),
+               TraceShardErrors:            
make(map[string]map[string]map[common.ShardID]string),
+               TraceShardCounts:            make(map[string]int),
+               TraceShardProgress:          make(map[string]int),
+               CompletedTraceSeries:        
make(map[string]map[string]map[common.ShardID]bool),
+               TraceSeriesErrors:           
make(map[string]map[string]map[common.ShardID]string),
+               TraceSeriesCounts:           make(map[string]int),
+               TraceSeriesProgress:         make(map[string]int),
                progressFilePath:            path,
                logger:                      l,
        }
@@ -697,3 +717,167 @@ func (p *Progress) GetMeasureSeriesProgress(group string) 
int {
        }
        return 0
 }
+
+// MarkTraceGroupDeleted marks a trace group as deleted.
+func (p *Progress) MarkTraceGroupDeleted(group string) {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       p.DeletedTraceGroups[group] = true
+}
+
+// IsTraceGroupDeleted checks if a trace group has been deleted.
+func (p *Progress) IsTraceGroupDeleted(group string) bool {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       return p.DeletedTraceGroups[group]
+}
+
+// MarkTraceShardCompleted marks a specific shard of a trace as completed.
+func (p *Progress) MarkTraceShardCompleted(group string, segmentID string, 
shardID common.ShardID) {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       if p.CompletedTraceShards[group] == nil {
+               p.CompletedTraceShards[group] = 
make(map[string]map[common.ShardID]bool)
+       }
+       if p.CompletedTraceShards[group][segmentID] == nil {
+               p.CompletedTraceShards[group][segmentID] = 
make(map[common.ShardID]bool)
+       }
+       p.CompletedTraceShards[group][segmentID][shardID] = true
+
+       // Increment progress
+       p.TraceShardProgress[group]++
+}
+
+// IsTraceShardCompleted checks if a specific shard of a trace has been 
completed.
+func (p *Progress) IsTraceShardCompleted(group string, segmentID string, 
shardID common.ShardID) bool {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       if segments, ok := p.CompletedTraceShards[group]; ok {
+               if shards, ok := segments[segmentID]; ok {
+                       return shards[shardID]
+               }
+       }
+       return false
+}
+
+// MarkTraceShardError marks an error for a specific shard of a trace.
+func (p *Progress) MarkTraceShardError(group string, segmentID string, shardID 
common.ShardID, errorMsg string) {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       if p.TraceShardErrors[group] == nil {
+               p.TraceShardErrors[group] = 
make(map[string]map[common.ShardID]string)
+       }
+       if p.TraceShardErrors[group][segmentID] == nil {
+               p.TraceShardErrors[group][segmentID] = 
make(map[common.ShardID]string)
+       }
+       p.TraceShardErrors[group][segmentID][shardID] = errorMsg
+}
+
+// SetTraceShardCount sets the total number of shards for the current trace.
+func (p *Progress) SetTraceShardCount(group string, totalShards int) {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       p.TraceShardCounts[group] = totalShards
+}
+
+// GetTraceShards gets the total number of shards for the current trace.
+func (p *Progress) GetTraceShards(group string) int {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       if count, ok := p.TraceShardCounts[group]; ok {
+               return count
+       }
+       return 0
+}
+
+// GetTraceShardProgress gets the number of completed shards for the current 
trace.
+func (p *Progress) GetTraceShardProgress(group string) int {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       if progress, ok := p.TraceShardProgress[group]; ok {
+               return progress
+       }
+       return 0
+}
+
+// MarkTraceSeriesCompleted marks a specific series segment of a trace as 
completed.
+func (p *Progress) MarkTraceSeriesCompleted(group string, segmentID string, 
shardID common.ShardID) {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       if p.CompletedTraceSeries[group] == nil {
+               p.CompletedTraceSeries[group] = 
make(map[string]map[common.ShardID]bool)
+       }
+       if p.CompletedTraceSeries[group][segmentID] == nil {
+               p.CompletedTraceSeries[group][segmentID] = 
make(map[common.ShardID]bool)
+       }
+       p.CompletedTraceSeries[group][segmentID][shardID] = true
+
+       // Increment progress
+       p.TraceSeriesProgress[group]++
+}
+
+// IsTraceSeriesCompleted checks if a specific series segment of a trace has 
been completed.
+func (p *Progress) IsTraceSeriesCompleted(group string, segmentID string, 
shardID common.ShardID) bool {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       if segments, ok := p.CompletedTraceSeries[group]; ok {
+               if shards, ok := segments[segmentID]; ok {
+                       return shards[shardID]
+               }
+       }
+       return false
+}
+
+// MarkTraceSeriesError marks an error for a specific series segment of a 
trace.
+func (p *Progress) MarkTraceSeriesError(group string, segmentID string, 
shardID common.ShardID, errorMsg string) {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       if p.TraceSeriesErrors[group] == nil {
+               p.TraceSeriesErrors[group] = 
make(map[string]map[common.ShardID]string)
+       }
+       if p.TraceSeriesErrors[group][segmentID] == nil {
+               p.TraceSeriesErrors[group][segmentID] = 
make(map[common.ShardID]string)
+       }
+       p.TraceSeriesErrors[group][segmentID][shardID] = errorMsg
+}
+
+// SetTraceSeriesCount sets the total number of series segments for the 
current trace.
+func (p *Progress) SetTraceSeriesCount(group string, totalSegments int) {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       p.TraceSeriesCounts[group] = totalSegments
+}
+
+// GetTraceSeriesCount gets the total number of series segments for the 
current trace.
+func (p *Progress) GetTraceSeriesCount(group string) int {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       if count, ok := p.TraceSeriesCounts[group]; ok {
+               return count
+       }
+       return 0
+}
+
+// GetTraceSeriesProgress gets the number of completed series segments for the 
current trace.
+func (p *Progress) GetTraceSeriesProgress(group string) int {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       if progress, ok := p.TraceSeriesProgress[group]; ok {
+               return progress
+       }
+       return 0
+}
diff --git a/banyand/backup/lifecycle/service.go 
b/banyand/backup/lifecycle/service.go
index 243893f0..c001d72f 100644
--- a/banyand/backup/lifecycle/service.go
+++ b/banyand/backup/lifecycle/service.go
@@ -36,6 +36,7 @@ import (
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+       tracev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
        "github.com/apache/skywalking-banyandb/banyand/backup/snapshot"
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
@@ -61,6 +62,7 @@ type lifecycleService struct {
        sch               *timestamp.Scheduler
        measureRoot       string
        streamRoot        string
+       traceRoot         string
        progressFilePath  string
        reportDir         string
        schedule          string
@@ -90,6 +92,7 @@ func (l *lifecycleService) FlagSet() *run.FlagSet {
        flagS.BoolVar(&l.insecure, "insecure", false, "Skip server certificate 
verification")
        flagS.StringVar(&l.cert, "cert", "", "Path to the gRPC server 
certificate")
        flagS.StringVar(&l.streamRoot, "stream-root-path", "/tmp", "Root 
directory for stream catalog")
+       flagS.StringVar(&l.traceRoot, "trace-root-path", "/tmp", "Root 
directory for trace catalog")
        flagS.StringVar(&l.measureRoot, "measure-root-path", "/tmp", "Root 
directory for measure catalog")
        flagS.StringVar(&l.progressFilePath, "progress-file", 
"/tmp/lifecycle-progress.json", "Path to store progress for crash recovery")
        flagS.StringVar(&l.reportDir, "report-dir", "/tmp/lifecycle-reports", 
"Directory to store migration reports")
@@ -174,12 +177,12 @@ func (l *lifecycleService) action() error {
        }
 
        // Pass progress to getSnapshots
-       streamDir, measureDir, err := l.getSnapshots(groups, progress)
+       streamDir, measureDir, traceDir, err := l.getSnapshots(groups, progress)
        if err != nil {
                l.l.Error().Err(err).Msg("failed to get snapshots")
                return err
        }
-       if streamDir == "" && measureDir == "" {
+       if streamDir == "" && measureDir == "" && traceDir == "" {
                l.l.Warn().Msg("no snapshots found, skipping lifecycle 
migration")
                l.generateReport(progress)
                return nil
@@ -187,6 +190,7 @@ func (l *lifecycleService) action() error {
        l.l.Info().
                Str("stream_snapshot", streamDir).
                Str("measure_snapshot", measureDir).
+               Str("trace_snapshot", traceDir).
                Msg("created snapshots")
        progress.Save(l.progressFilePath, l.l)
 
@@ -215,9 +219,12 @@ func (l *lifecycleService) action() error {
                        }
                        l.processMeasureGroup(ctx, g, measureDir, nodes, 
labels, progress)
                case commonv1.Catalog_CATALOG_TRACE:
-                       progress.MarkGroupCompleted(g.Metadata.Name)
-                       l.l.Info().Msgf("group trace not supported, skipping 
group: %s", g.Metadata.Name)
-                       continue
+                       if traceDir == "" {
+                               l.l.Warn().Msgf("trace snapshot directory is 
not available, skipping group: %s", g.Metadata.Name)
+                               progress.MarkGroupCompleted(g.Metadata.Name)
+                               continue
+                       }
+                       l.processTraceGroup(ctx, g, traceDir, nodes, labels, 
progress)
                default:
                        l.l.Info().Msgf("group catalog: %s doesn't support 
lifecycle management", g.Catalog)
                }
@@ -531,13 +538,16 @@ func (l *lifecycleService) getGroupsToProcess(ctx 
context.Context, progress *Pro
                l.l.Error().Err(err).Msg("failed to list groups")
                return nil, err
        }
+       allGroupNames := getGroupNames(gg)
 
        groups := make([]*commonv1.Group, 0, len(gg))
        for _, g := range gg {
                if g.ResourceOpts == nil {
+                       l.l.Debug().Msgf("skipping group %s because resource 
opts is nil", g.Metadata.Name)
                        continue
                }
                if len(g.ResourceOpts.Stages) == 0 {
+                       l.l.Debug().Msgf("skipping group %s because stages is 
empty", g.Metadata.Name)
                        continue
                }
                if progress.IsGroupCompleted(g.Metadata.Name) {
@@ -546,6 +556,7 @@ func (l *lifecycleService) getGroupsToProcess(ctx 
context.Context, progress *Pro
                }
                groups = append(groups, g)
        }
+       l.l.Info().Msgf("found groups needs to do lifecycle processing: %v, all 
groups: %v", getGroupNames(groups), allGroupNames)
 
        return groups, nil
 }
@@ -751,3 +762,85 @@ func (l *lifecycleService) 
deleteExpiredMeasureSegments(ctx context.Context, g *
        progress.MarkMeasureGroupDeleted(g.Metadata.Name)
        progress.Save(l.progressFilePath, l.l)
 }
+
+func (l *lifecycleService) deleteExpiredTraceSegments(ctx context.Context, g 
*commonv1.Group, segmentSuffixes []string, progress *Progress) {
+       if progress.IsTraceGroupDeleted(g.Metadata.Name) {
+               l.l.Info().Msgf("skipping already deleted trace group segments: 
%s", g.Metadata.Name)
+               return
+       }
+
+       resp, err := snapshot.Conn(l.gRPCAddr, l.enableTLS, l.insecure, l.cert, 
func(conn *grpc.ClientConn) (*tracev1.DeleteExpiredSegmentsResponse, error) {
+               client := tracev1.NewTraceServiceClient(conn)
+               return client.DeleteExpiredSegments(ctx, 
&tracev1.DeleteExpiredSegmentsRequest{
+                       Group:           g.Metadata.Name,
+                       SegmentSuffixes: segmentSuffixes,
+               })
+       })
+       if err != nil {
+               l.l.Error().Err(err).Msgf("failed to delete expired segments in 
group %s, suffixes: %s", g.Metadata.Name, segmentSuffixes)
+               return
+       }
+
+       l.l.Warn().Msgf("deleted %d expired segments in group %s, suffixes: 
%s", resp.Deleted, g.Metadata.Name, segmentSuffixes)
+       progress.MarkTraceGroupDeleted(g.Metadata.Name)
+       progress.Save(l.progressFilePath, l.l)
+}
+
+func (l *lifecycleService) processTraceGroup(ctx context.Context, g 
*commonv1.Group, traceDir string,
+       nodes []*databasev1.Node, labels map[string]string, progress *Progress,
+) {
+       group, err := parseGroup(g, labels, nodes, l.l, l.metadata)
+       if err != nil {
+               l.l.Error().Err(err).Msgf("failed to parse group %s", 
g.Metadata.Name)
+               return
+       }
+       defer group.Close()
+
+       tr := l.getRemovalSegmentsTimeRange(group)
+       if tr.Start.IsZero() && tr.End.IsZero() {
+               l.l.Info().Msgf("no removal segments time range for group %s, 
skipping trace migration", g.Metadata.Name)
+               progress.MarkGroupCompleted(g.Metadata.Name)
+               progress.Save(l.progressFilePath, l.l)
+               return
+       }
+
+       // Run the file-based migration; on failure we log and return (no fallback)
+       segmentSuffixes, err := l.processTraceGroupFileBased(ctx, group, 
traceDir, tr, progress)
+       if err != nil {
+               l.l.Error().Err(err).Msgf("failed to migrate trace group %s 
using file-based approach", g.Metadata.Name)
+               return
+       }
+
+       l.l.Info().Msgf("deleting expired trace segments for group: %s", 
g.Metadata.Name)
+       l.deleteExpiredTraceSegments(ctx, g, segmentSuffixes, progress)
+       progress.MarkGroupCompleted(g.Metadata.Name)
+       progress.Save(l.progressFilePath, l.l)
+}
+
+func (l *lifecycleService) processTraceGroupFileBased(_ context.Context, g 
*GroupConfig, traceDir string,
+       tr *timestamp.TimeRange, progress *Progress,
+) ([]string, error) {
+       if progress.IsTraceGroupDeleted(g.Metadata.Name) {
+               l.l.Info().Msgf("skipping already completed file-based trace 
migration for group: %s", g.Metadata.Name)
+               return nil, nil
+       }
+
+       l.l.Info().Msgf("starting file-based trace migration for group: %s", 
g.Metadata.Name)
+
+       rootDir := filepath.Join(traceDir, g.Metadata.Name)
+       // skip the migration if the tsdb root path does not exist;
+       // the snapshot may contain no data for this group
+       if _, err := os.Stat(rootDir); err != nil && errors.Is(err, 
os.ErrNotExist) {
+               l.l.Info().Msgf("skipping file-based trace migration for group 
because is empty in the snapshot dir: %s", g.Metadata.Name)
+               return nil, nil
+       }
+
+       // Use the file-based migration with existing visitor pattern
+       segmentSuffixes, err := migrateTraceWithFileBasedAndProgress(rootDir, 
*tr, g, l.l, progress, int(l.chunkSize))
+       if err != nil {
+               return nil, fmt.Errorf("file-based trace migration failed: %w", 
err)
+       }
+
+       l.l.Info().Msgf("completed file-based trace migration for group: %s", 
g.Metadata.Name)
+       return segmentSuffixes, nil
+}
diff --git a/banyand/backup/lifecycle/steps.go 
b/banyand/backup/lifecycle/steps.go
index e54d8a49..6e405e87 100644
--- a/banyand/backup/lifecycle/steps.go
+++ b/banyand/backup/lifecycle/steps.go
@@ -38,10 +38,10 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/node"
 )
 
-func (l *lifecycleService) getSnapshots(groups []*commonv1.Group, p *Progress) 
(streamDir string, measureDir string, err error) {
+func (l *lifecycleService) getSnapshots(groups []*commonv1.Group, p *Progress) 
(streamDir string, measureDir string, traceDir string, err error) {
        // If we already have snapshot dirs in Progress, reuse them
-       if p.SnapshotStreamDir != "" || p.SnapshotMeasureDir != "" {
-               return p.SnapshotStreamDir, p.SnapshotMeasureDir, nil
+       if p.SnapshotStreamDir != "" || p.SnapshotMeasureDir != "" || 
p.SnapshotTraceDir != "" {
+               return p.SnapshotStreamDir, p.SnapshotMeasureDir, 
p.SnapshotTraceDir, nil
        }
 
        snapshotGroups := make([]*databasev1.SnapshotRequest_Group, 0, 
len(groups))
@@ -53,10 +53,10 @@ func (l *lifecycleService) getSnapshots(groups 
[]*commonv1.Group, p *Progress) (
        }
        snn, err := snapshot.Get(l.gRPCAddr, l.enableTLS, l.insecure, l.cert, 
snapshotGroups...)
        if err != nil {
-               return "", "", err
+               return "", "", "", err
        }
        for _, snp := range snn {
-               snapshotDir, errDir := snapshot.Dir(snp, l.streamRoot, 
l.measureRoot, "", "")
+               snapshotDir, errDir := snapshot.Dir(snp, l.streamRoot, 
l.measureRoot, "", l.traceRoot)
                if errDir != nil {
                        l.l.Error().Err(errDir).Msgf("Failed to get snapshot 
directory for %s", snp.Name)
                        continue
@@ -71,11 +71,15 @@ func (l *lifecycleService) getSnapshots(groups 
[]*commonv1.Group, p *Progress) (
                if snp.Catalog == commonv1.Catalog_CATALOG_MEASURE {
                        measureDir = snapshotDir
                }
+               if snp.Catalog == commonv1.Catalog_CATALOG_TRACE {
+                       traceDir = snapshotDir
+               }
        }
        // Save the new snapshot paths into Progress
        p.SnapshotStreamDir = streamDir
        p.SnapshotMeasureDir = measureDir
-       return streamDir, measureDir, nil
+       p.SnapshotTraceDir = traceDir
+       return streamDir, measureDir, traceDir, nil
 }
 
 // GroupConfig encapsulates the parsed lifecycle configuration for a Group.
diff --git a/banyand/backup/lifecycle/trace_migration_visitor.go 
b/banyand/backup/lifecycle/trace_migration_visitor.go
new file mode 100644
index 00000000..3b767a86
--- /dev/null
+++ b/banyand/backup/lifecycle/trace_migration_visitor.go
@@ -0,0 +1,593 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package lifecycle
+
+import (
+       "context"
+       "fmt"
+       "path/filepath"
+       "sort"
+       "strconv"
+       "strings"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/api/data"
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       "github.com/apache/skywalking-banyandb/banyand/internal/sidx"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/banyand/trace"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/node"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+// traceMigrationVisitor implements the trace.Visitor interface for file-based migration.
+type traceMigrationVisitor struct {
+       selector            node.Selector                      // Picks the node for each target shard replica.
+       client              queue.Client                       // Creates chunked sync clients for target nodes.
+       chunkedClients      map[string]queue.ChunkedSyncClient // Chunked sync clients cached by node ID.
+       logger              *logger.Logger
+       progress            *Progress // Progress tracker for migration states
+       lfs                 fs.FileSystem
+       group               string
+       targetShardNum      uint32               // Target shard count, used to map source shards to target shards.
+       replicas            uint32               // Replica count; data is streamed to replicas+1 nodes per shard.
+       chunkSize           int                  // Chunk size passed to newly created chunked sync clients.
+       targetStageInterval storage.IntervalRule // Target stage's segment interval, used to calculate target segments.
+}
+
+// newTraceMigrationVisitor builds a traceMigrationVisitor for the given group.
+// The returned visitor starts with an empty chunked sync client cache and reads
+// files through a local file system.
+func newTraceMigrationVisitor(group *commonv1.Group, shardNum, replicas uint32, selector node.Selector, client queue.Client,
+       l *logger.Logger, progress *Progress, chunkSize int, targetStageInterval storage.IntervalRule,
+) *traceMigrationVisitor {
+       visitor := &traceMigrationVisitor{
+               // Identity and target topology.
+               group:               group.Metadata.Name,
+               targetShardNum:      shardNum,
+               replicas:            replicas,
+               targetStageInterval: targetStageInterval,
+               // Transport.
+               selector:       selector,
+               client:         client,
+               chunkedClients: make(map[string]queue.ChunkedSyncClient),
+               chunkSize:      chunkSize,
+               // Bookkeeping and local I/O.
+               logger:   l,
+               progress: progress,
+               lfs:      fs.NewLocalFileSystem(),
+       }
+       return visitor
+}
+
+// VisitSeries implements trace.Visitor.
+//
+// It lists the *.seg files under seriesIndexPath and, for every target segment
+// returned by calculateTargetSegments for segmentTR, streams the files that are
+// not yet marked completed to each shard in shardIDs.
+//
+// Segment files are opened once per target segment and are closed as soon as
+// that target segment has been streamed (or has failed), so no file handle
+// outlives its transfer. An error is returned on the first file that cannot be
+// opened or the first shard that cannot be streamed to.
+func (mv *traceMigrationVisitor) VisitSeries(segmentTR *timestamp.TimeRange, seriesIndexPath string, shardIDs []common.ShardID) error {
+       mv.logger.Info().
+               Str("path", seriesIndexPath).
+               Int64("min_timestamp", segmentTR.Start.UnixNano()).
+               Int64("max_timestamp", segmentTR.End.UnixNano()).
+               Str("group", mv.group).
+               Msg("migrating trace series index")
+
+       // Find all *.seg segment files in the seriesIndexPath
+       entries := mv.lfs.ReadDir(seriesIndexPath)
+
+       var segmentFiles []string
+       for _, entry := range entries {
+               if !entry.IsDir() && strings.HasSuffix(entry.Name(), ".seg") {
+                       segmentFiles = append(segmentFiles, entry.Name())
+               }
+       }
+
+       if len(segmentFiles) == 0 {
+               mv.logger.Debug().
+                       Str("path", seriesIndexPath).
+                       Msg("no .seg files found in trace series index path")
+               return nil
+       }
+
+       mv.logger.Info().
+               Int("segment_count", len(segmentFiles)).
+               Str("path", seriesIndexPath).
+               Msg("found trace segment files for migration")
+
+       // Set the total number of series segments for progress tracking
+       mv.SetTraceSeriesCount(len(segmentFiles))
+
+       // Calculate ALL target segments this series index should go to
+       targetSegments := calculateTargetSegments(
+               segmentTR.Start.UnixNano(),
+               segmentTR.End.UnixNano(),
+               mv.targetStageInterval,
+       )
+
+       mv.logger.Info().
+               Int("target_segments_count", len(targetSegments)).
+               Int64("series_min_ts", segmentTR.Start.UnixNano()).
+               Int64("series_max_ts", segmentTR.End.UnixNano()).
+               Str("group", mv.group).
+               Msg("migrating trace series index to multiple target segments")
+
+       // Send series index to EACH target segment
+       for i, targetSegmentTime := range targetSegments {
+               // The progress key is identical for every file of this target segment.
+               segmentIDStr := getSegmentTimeRange(targetSegmentTime, mv.targetStageInterval).String()
+
+               files := make([]fileInfo, 0, len(segmentFiles))
+               for _, segmentFileName := range segmentFiles {
+                       // Extract segment ID from filename (remove .seg extension)
+                       fileSegmentIDStr := strings.TrimSuffix(segmentFileName, ".seg")
+
+                       // Parse hex segment ID
+                       segmentID, err := strconv.ParseUint(fileSegmentIDStr, 16, 64)
+                       if err != nil {
+                               mv.logger.Error().
+                                       Str("filename", segmentFileName).
+                                       Str("id_str", fileSegmentIDStr).
+                                       Err(err).
+                                       Msg("failed to parse segment ID from filename")
+                               continue
+                       }
+
+                       // Convert segmentID to ShardID for progress tracking
+                       shardID := common.ShardID(segmentID)
+
+                       // Skip files already completed for this target segment
+                       if mv.progress.IsTraceSeriesCompleted(mv.group, segmentIDStr, shardID) {
+                               mv.logger.Debug().
+                                       Uint64("segment_id", segmentID).
+                                       Str("filename", segmentFileName).
+                                       Time("target_segment", targetSegmentTime).
+                                       Str("group", mv.group).
+                                       Msg("trace series segment already completed for this target segment, skipping")
+                               continue
+                       }
+
+                       mv.logger.Info().
+                               Uint64("segment_id", segmentID).
+                               Str("filename", segmentFileName).
+                               Time("target_segment", targetSegmentTime).
+                               Str("group", mv.group).
+                               Msg("migrating trace series segment file")
+
+                       // Open the segment file for streaming
+                       segmentFilePath := filepath.Join(seriesIndexPath, segmentFileName)
+                       segmentFile, err := mv.lfs.OpenFile(segmentFilePath)
+                       if err != nil {
+                               // Do not leak the files already opened for this target segment.
+                               mv.closeSeriesFiles(files)
+                               errorMsg := fmt.Sprintf("failed to open trace segment file %s: %v", segmentFilePath, err)
+                               mv.progress.MarkTraceSeriesError(mv.group, segmentIDStr, shardID, errorMsg)
+                               mv.logger.Error().
+                                       Str("path", segmentFilePath).
+                                       Err(err).
+                                       Msg("failed to open trace segment file")
+                               return fmt.Errorf("failed to open trace segment file %s: %w", segmentFilePath, err)
+                       }
+
+                       files = append(files, fileInfo{
+                               name: segmentFileName,
+                               file: segmentFile,
+                       })
+
+                       mv.logger.Info().
+                               Uint64("segment_id", segmentID).
+                               Str("filename", segmentFileName).
+                               Time("target_segment", targetSegmentTime).
+                               Str("group", mv.group).
+                               Int("completed_segments", mv.progress.GetTraceSeriesProgress(mv.group)).
+                               Int("total_segments", mv.progress.GetTraceSeriesCount(mv.group)).
+                               Msgf("trace series segment migration completed for target segment %d/%d", i+1, len(targetSegments))
+               }
+
+               // Stream, then close this target segment's files whether or not streaming succeeded.
+               streamErr := mv.streamSeriesFiles(files, shardIDs, segmentTR, segmentIDStr)
+               mv.closeSeriesFiles(files)
+               if streamErr != nil {
+                       return streamErr
+               }
+       }
+
+       return nil
+}
+
+// streamSeriesFiles sends the opened series segment files to every shard in shardIDs
+// and records each shard's outcome in the progress tracker under segmentIDStr.
+func (mv *traceMigrationVisitor) streamSeriesFiles(
+       files []fileInfo,
+       shardIDs []common.ShardID,
+       segmentTR *timestamp.TimeRange,
+       segmentIDStr string,
+) error {
+       for _, shardID := range shardIDs {
+               targetShardID := mv.calculateTargetShardID(uint32(shardID))
+               // Each shard gets its own sequential readers over the shared open files.
+               ff := make([]queue.FileInfo, 0, len(files))
+               for _, file := range files {
+                       ff = append(ff, queue.FileInfo{
+                               Name:   file.name,
+                               Reader: file.file.SequentialRead(),
+                       })
+               }
+               partData := mv.createStreamingSegmentFromFiles(targetShardID, ff, segmentTR, data.TopicTraceSeriesSync.String())
+
+               // Stream segment to target shard replicas
+               if err := mv.streamPartToTargetShard(targetShardID, []queue.StreamingPartData{partData}); err != nil {
+                       errorMsg := fmt.Sprintf("failed to stream trace segment to target shard %d: %v", targetShardID, err)
+                       mv.progress.MarkTraceSeriesError(mv.group, segmentIDStr, shardID, errorMsg)
+                       return fmt.Errorf("failed to stream trace segment to target shard %d: %w", targetShardID, err)
+               }
+               // Mark segment as completed for this specific target segment
+               mv.progress.MarkTraceSeriesCompleted(mv.group, segmentIDStr, shardID)
+       }
+       return nil
+}
+
+// closeSeriesFiles closes every opened series segment file.
+// Close failures are logged as warnings and not returned.
+func (mv *traceMigrationVisitor) closeSeriesFiles(files []fileInfo) {
+       for _, f := range files {
+               if err := f.file.Close(); err != nil {
+                       mv.logger.Warn().Err(err).Str("filename", f.name).Msg("failed to close trace segment file")
+               }
+       }
+}
+
+// VisitShard implements trace.Visitor - core and sidx migration logic.
+func (mv *traceMigrationVisitor) VisitShard(timestampTR *timestamp.TimeRange, 
sourceShardID common.ShardID, shardPath string) error {
+       segmentIDStr := timestampTR.String()
+       if mv.progress.IsTraceShardCompleted(mv.group, shardPath, 
sourceShardID) {
+               mv.logger.Debug().
+                       Str("shard_path", shardPath).
+                       Str("group", mv.group).
+                       Uint32("source_shard", uint32(sourceShardID)).
+                       Msg("trace shard already completed for this target 
segment, skipping")
+               return nil
+       }
+       allParts := make([]queue.StreamingPartData, 0)
+
+       sidxPartData, sidxReleases, err := 
mv.generateAllSidxPartData(timestampTR, sourceShardID, filepath.Join(shardPath, 
"sidx"))
+       if err != nil {
+               return fmt.Errorf("failed to generate sidx part data: %s: %w", 
shardPath, err)
+       }
+       defer func() {
+               for _, release := range sidxReleases {
+                       release()
+               }
+       }()
+       allParts = append(allParts, sidxPartData...)
+
+       partDatas, partDataReleases, err := 
mv.generateAllPartData(sourceShardID, shardPath)
+       if err != nil {
+               return fmt.Errorf("failed to generate core par data: %s: %w", 
shardPath, err)
+       }
+       defer func() {
+               for _, release := range partDataReleases {
+                       release()
+               }
+       }()
+       allParts = append(allParts, partDatas...)
+
+       targetShardID := mv.calculateTargetShardID(uint32(sourceShardID))
+
+       sort.Slice(allParts, func(i, j int) bool {
+               if allParts[i].ID == allParts[j].ID {
+                       return allParts[i].PartType < allParts[j].PartType
+               }
+               return allParts[i].ID < allParts[j].ID
+       })
+
+       // Stream part to target segment
+       if err := mv.streamPartToTargetShard(targetShardID, allParts); err != 
nil {
+               errorMsg := fmt.Errorf("failed to stream to target shard %d: 
%w", targetShardID, err)
+               mv.progress.MarkTraceShardError(mv.group, segmentIDStr, 
sourceShardID, errorMsg.Error())
+               return fmt.Errorf("failed to stream trace shard to target 
segment shard: %w", err)
+       }
+
+       // Mark shard as completed for this target segment
+       mv.progress.MarkTraceShardCompleted(mv.group, segmentIDStr, 
sourceShardID)
+       mv.logger.Info().
+               Str("group", mv.group).
+               Msgf("trace shard migration completed for target segment")
+
+       return nil
+}
+
+// generateAllSidxPartData builds streaming part data for every sidx part found
+// under sidxPath, laid out as sidx/{index-name}/{part-id}/files.
+//
+// Each returned part uses the index name as its PartType and the segment time
+// range as its min/max timestamps. The returned release functions close the file
+// readers backing the parts and must be called by the caller once streaming is
+// done. On error no parts are returned and every reader opened so far has
+// already been released.
+func (mv *traceMigrationVisitor) generateAllSidxPartData(
+       segmentTR *timestamp.TimeRange,
+       sourceShardID common.ShardID,
+       sidxPath string,
+) ([]queue.StreamingPartData, []func(), error) {
+       // Find all index directories in the sidx directory
+       entries := mv.lfs.ReadDir(sidxPath)
+
+       parts := make([]queue.StreamingPartData, 0, len(entries))
+       releases := make([]func(), 0, len(entries))
+
+       var indexDirs []string
+       for _, entry := range entries {
+               if entry.IsDir() {
+                       indexDirs = append(indexDirs, entry.Name())
+               }
+       }
+
+       if len(indexDirs) == 0 {
+               mv.logger.Debug().
+                       Str("path", sidxPath).
+                       Msg("no index directories found in trace sidx directory")
+               return nil, nil, nil
+       }
+
+       mv.logger.Info().
+               Int("index_count", len(indexDirs)).
+               Str("path", sidxPath).
+               Uint32("source_shard", uint32(sourceShardID)).
+               Msg("found trace sidx indexes for migration")
+
+       // Calculate target shard ID
+       targetShardID := mv.calculateTargetShardID(uint32(sourceShardID))
+
+       // Process each index directory
+       for _, indexName := range indexDirs {
+               indexDirPath := filepath.Join(sidxPath, indexName)
+
+               // Find all part directories in this index
+               partEntries := mv.lfs.ReadDir(indexDirPath)
+               var partDirs []string
+               for _, entry := range partEntries {
+                       if entry.IsDir() {
+                               // Validate it's a valid hex string (part ID)
+                               if _, err := strconv.ParseUint(entry.Name(), 16, 64); err == nil {
+                                       partDirs = append(partDirs, entry.Name())
+                               }
+                       }
+               }
+
+               if len(partDirs) == 0 {
+                       mv.logger.Debug().
+                               Str("index_name", indexName).
+                               Str("path", indexDirPath).
+                               Msg("no part directories found in trace sidx index")
+                       continue
+               }
+
+               mv.logger.Info().
+                       Str("index_name", indexName).
+                       Int("part_count", len(partDirs)).
+                       Str("path", indexDirPath).
+                       Msg("found trace sidx parts for index")
+
+               // Process each part directory
+               for _, partDirName := range partDirs {
+                       // The name was validated as hex above, so the parse error can be ignored here.
+                       partID, _ := strconv.ParseUint(partDirName, 16, 64)
+                       partPath := filepath.Join(indexDirPath, partDirName)
+
+                       // Parse the metadata before opening readers so a failure leaves
+                       // nothing to clean up for this part.
+                       partData, err := sidx.ParsePartMetadata(mv.lfs, partPath)
+                       if err != nil {
+                               // The caller gets no release functions on error, so release
+                               // the readers opened for earlier parts here.
+                               for _, release := range releases {
+                                       release()
+                               }
+                               return nil, nil, fmt.Errorf("failed to parse sidx part metadata: %s: %w", partPath, err)
+                       }
+
+                       // Create file readers for this sidx part
+                       files, release := sidx.CreatePartFileReaderFromPath(partPath, mv.lfs)
+
+                       partData.Group = mv.group
+                       partData.ShardID = targetShardID
+                       partData.Topic = data.TopicTracePartSync.String()
+                       partData.ID = partID
+                       partData.PartType = indexName // Use index name as PartType (not "core")
+                       partData.Files = files
+                       partData.MinTimestamp = segmentTR.Start.UnixNano()
+                       partData.MaxTimestamp = segmentTR.End.UnixNano()
+
+                       mv.logger.Debug().
+                               Str("index_name", indexName).
+                               Uint64("part_id", partID).
+                               Uint32("source_shard", uint32(sourceShardID)).
+                               Uint32("target_shard", targetShardID).
+                               Str("group", mv.group).
+                               Msg("generated trace sidx part data")
+
+                       parts = append(parts, *partData)
+                       releases = append(releases, release)
+               }
+       }
+
+       return parts, releases, nil
+}
+
+func (mv *traceMigrationVisitor) generateAllPartData(sourceShardID 
common.ShardID, shardPath string) ([]queue.StreamingPartData, []func(), error) {
+       entries := mv.lfs.ReadDir(shardPath)
+
+       allParts := make([]queue.StreamingPartData, 0)
+       allReleases := make([]func(), 0)
+       for _, entry := range entries {
+               if !entry.IsDir() {
+                       continue
+               }
+
+               name := entry.Name()
+               // Check if this is a part directory (16-character hex string)
+               if len(name) != 16 {
+                       continue // Skip non-part entries
+               }
+
+               // Validate it's a valid hex string (part ID)
+               if _, err := strconv.ParseUint(name, 16, 64); err != nil {
+                       continue // Skip invalid part entries
+               }
+
+               partPath := filepath.Join(shardPath, name)
+               parts, releases, err := mv.generatePartData(sourceShardID, 
partPath)
+               if err != nil {
+                       return nil, nil, fmt.Errorf("failed to generate part 
data for path %s: %w", partPath, err)
+               }
+               allParts = append(allParts, parts...)
+               allReleases = append(allReleases, releases...)
+       }
+
+       return allParts, allReleases, nil
+}
+
+func (mv *traceMigrationVisitor) generatePartData(sourceShardID 
common.ShardID, partPath string) ([]queue.StreamingPartData, []func(), error) {
+       partData, err := trace.ParsePartMetadata(mv.lfs, partPath)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to parse trace part 
metadata: %w", err)
+       }
+       partID, err := parsePartIDFromPath(partPath)
+       if err != nil {
+               return nil, nil, fmt.Errorf("failed to parse part ID from path: 
%w", err)
+       }
+       // Calculate ALL target segments this part should go to
+       targetSegments := calculateTargetSegments(
+               partData.MinTimestamp,
+               partData.MaxTimestamp,
+               mv.targetStageInterval,
+       )
+
+       mv.logger.Info().
+               Uint64("part_id", partID).
+               Uint32("source_shard", uint32(sourceShardID)).
+               Int("target_segments_count", len(targetSegments)).
+               Int64("part_min_ts", partData.MinTimestamp).
+               Int64("part_max_ts", partData.MaxTimestamp).
+               Str("group", mv.group).
+               Msg("generate trace part files to multiple target segments")
+
+       parts := make([]queue.StreamingPartData, 0)
+       releases := make([]func(), 0)
+       // Send part to EACH target segment that overlaps with its time range
+       for i, targetSegmentTime := range targetSegments {
+               targetShardID := 
mv.calculateTargetShardID(uint32(sourceShardID))
+
+               // Create file readers for this part
+               files, release := trace.CreatePartFileReaderFromPath(partPath, 
mv.lfs)
+
+               // Clone part data for this target segment
+               targetPartData := partData
+               targetPartData.ID = partID
+               targetPartData.Group = mv.group
+               targetPartData.ShardID = targetShardID
+               targetPartData.Topic = data.TopicTracePartSync.String()
+               targetPartData.Files = files
+               targetPartData.PartType = trace.PartTypeCore
+               parts = append(parts, targetPartData)
+               releases = append(releases, release)
+
+               mv.logger.Info().
+                       Uint64("part_id", partID).
+                       Time("target_segment", targetSegmentTime).
+                       Str("group", mv.group).
+                       Msgf("generated trace part file migration completed for 
target segment %d/%d", i+1, len(targetSegments))
+       }
+
+       return parts, releases, nil
+}
+
+// calculateTargetShardID maps source shard ID to target shard ID.
+// It delegates to the package-level calculateTargetShardID, passing the
+// visitor's target shard count.
+func (mv *traceMigrationVisitor) calculateTargetShardID(sourceShardID uint32) 
uint32 {
+       return calculateTargetShardID(sourceShardID, mv.targetShardNum)
+}
+
+// streamPartToTargetShard sends part data to all replicas of the target shard.
+func (mv *traceMigrationVisitor) streamPartToTargetShard(targetShardID uint32, 
partData []queue.StreamingPartData) error {
+       copies := mv.replicas + 1
+
+       // Send to all replicas using the exact pattern from steps.go:219-236
+       for replicaID := uint32(0); replicaID < copies; replicaID++ {
+               // Use selector.Pick exactly like steps.go:220
+               nodeID, err := mv.selector.Pick(mv.group, "", targetShardID, 
replicaID)
+               if err != nil {
+                       return fmt.Errorf("failed to pick node for shard %d 
replica %d: %w", targetShardID, replicaID, err)
+               }
+
+               // Stream part data to target node using chunked sync
+               if err := mv.streamPartToNode(nodeID, targetShardID, partData); 
err != nil {
+                       return fmt.Errorf("failed to stream trace part to node 
%s: %w", nodeID, err)
+               }
+       }
+
+       return nil
+}
+
+// streamPartToNode streams part data to a specific target node.
+func (mv *traceMigrationVisitor) streamPartToNode(nodeID string, targetShardID 
uint32, partData []queue.StreamingPartData) error {
+       // Get or create chunked client for this node (cache hit optimization)
+       chunkedClient, exists := mv.chunkedClients[nodeID]
+       if !exists {
+               var err error
+               // Create new chunked sync client via queue.Client
+               chunkedClient, err = mv.client.NewChunkedSyncClient(nodeID, 
uint32(mv.chunkSize))
+               if err != nil {
+                       return fmt.Errorf("failed to create chunked sync client 
for node %s: %w", nodeID, err)
+               }
+               mv.chunkedClients[nodeID] = chunkedClient // Cache for reuse
+       }
+
+       // Stream using chunked transfer (same as syncer.go:202)
+       ctx := context.Background()
+       result, err := chunkedClient.SyncStreamingParts(ctx, partData)
+       if err != nil {
+               return fmt.Errorf("failed to sync streaming parts to node %s: 
%w", nodeID, err)
+       }
+
+       if !result.Success {
+               return fmt.Errorf("chunked sync partially failed: %v", 
result.FailedParts)
+       }
+
+       mv.logger.Info().
+               Str("node", nodeID).
+               Str("session", result.SessionID).
+               Uint64("bytes", result.TotalBytes).
+               Int64("duration_ms", result.DurationMs).
+               Uint32("chunks", result.ChunksCount).
+               Uint32("parts", result.PartsCount).
+               Uint32("target_shard", targetShardID).
+               Str("group", mv.group).
+               Msg("file-based trace migration shard completed successfully")
+
+       return nil
+}
+
+// Close closes every cached chunked sync client and resets the cache to an
+// empty map. Close failures are logged as warnings; the method always returns nil.
+func (mv *traceMigrationVisitor) Close() error {
+       for id, syncClient := range mv.chunkedClients {
+               closeErr := syncClient.Close()
+               if closeErr == nil {
+                       continue
+               }
+               mv.logger.Warn().Err(closeErr).Str("node", id).Msg("failed to close chunked sync client")
+       }
+       mv.chunkedClients = make(map[string]queue.ChunkedSyncClient)
+       return nil
+}
+
+// createStreamingSegmentFromFiles wraps segment files into a StreamingPartData
+// for the visitor's group, addressed to targetShardID on the given topic and
+// stamped with the start and end of segmentTR as min/max timestamps.
+func (mv *traceMigrationVisitor) createStreamingSegmentFromFiles(
+       targetShardID uint32,
+       files []queue.FileInfo,
+       segmentTR *timestamp.TimeRange,
+       topic string,
+) queue.StreamingPartData {
+       minTS, maxTS := segmentTR.Start.UnixNano(), segmentTR.End.UnixNano()
+       return queue.StreamingPartData{
+               Group:        mv.group,
+               Topic:        topic,
+               ShardID:      targetShardID,
+               MinTimestamp: minTS,
+               MaxTimestamp: maxTS,
+               Files:        files,
+       }
+}
+
+// SetTraceShardCount records the total number of shards of this group in the
+// progress tracker and logs it. It does nothing when no progress tracker is set.
+func (mv *traceMigrationVisitor) SetTraceShardCount(totalShards int) {
+       if mv.progress == nil {
+               return
+       }
+       mv.progress.SetTraceShardCount(mv.group, totalShards)
+       mv.logger.Info().
+               Str("group", mv.group).
+               Int("total_shards", totalShards).
+               Msg("set trace part count for progress tracking")
+}
+
+// SetTraceSeriesCount records the total number of series segments of this group
+// in the progress tracker and logs it. It does nothing when no progress tracker is set.
+func (mv *traceMigrationVisitor) SetTraceSeriesCount(totalSegments int) {
+       if mv.progress == nil {
+               return
+       }
+       mv.progress.SetTraceSeriesCount(mv.group, totalSegments)
+       mv.logger.Info().
+               Str("group", mv.group).
+               Int("total_segments", totalSegments).
+               Msg("set trace series count for progress tracking")
+}
diff --git a/banyand/internal/sidx/part.go b/banyand/internal/sidx/part.go
index 8447c575..96be5642 100644
--- a/banyand/internal/sidx/part.go
+++ b/banyand/internal/sidx/part.go
@@ -20,6 +20,7 @@ package sidx
 import (
        "encoding/json"
        "fmt"
+       "path"
        "path/filepath"
        "sort"
        "strings"
@@ -27,6 +28,7 @@ import (
        "github.com/apache/skywalking-banyandb/api/common"
        internalencoding 
"github.com/apache/skywalking-banyandb/banyand/internal/encoding"
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/bytes"
        "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
@@ -163,6 +165,136 @@ func (p *part) loadPartMetadata() error {
        return nil
 }
 
+// ParsePartMetadata reads and parses part metadata from manifest.json in the specified parent path.
+func ParsePartMetadata(fs fs.FileSystem, parentPath string) 
(*queue.StreamingPartData, error) {
+       manifestData, err := fs.Read(filepath.Join(parentPath, 
manifestFilename))
+       if err != nil {
+               return nil, fmt.Errorf("failed to read manifest.json: %w", err)
+       }
+       // Parse JSON manifest
+       pm := &partMetadata{}
+       if unmarshalErr := json.Unmarshal(manifestData, pm); unmarshalErr != 
nil {
+               return nil, fmt.Errorf("failed to unmarshal manifest.json: %w", 
unmarshalErr)
+       }
+       return &queue.StreamingPartData{
+               ID:                    pm.ID,
+               CompressedSizeBytes:   pm.CompressedSizeBytes,
+               UncompressedSizeBytes: pm.UncompressedSizeBytes,
+               TotalCount:            pm.TotalCount,
+               BlocksCount:           pm.BlocksCount,
+               MinKey:                pm.MinKey,
+               MaxKey:                pm.MaxKey,
+       }, nil
+}
+
+// CreatePartFileReaderFromPath opens all files in a part directory and returns their FileInfo and a cleanup function.
+func CreatePartFileReaderFromPath(partPath string, lfs fs.FileSystem) 
([]queue.FileInfo, func()) {
+       var files []queue.FileInfo
+       var readers []fs.Reader
+
+       // Core sidx files (required files)
+       metaPath := path.Join(partPath, metaFilename)
+       metaReader, err := lfs.OpenFile(metaPath)
+       if err != nil {
+               logger.Panicf("cannot open trace meta file %q: %s", metaPath, 
err)
+       }
+       readers = append(readers, metaReader)
+       files = append(files, queue.FileInfo{
+               Name:   SidxMetaName,
+               Reader: metaReader.SequentialRead(),
+       })
+
+       primaryPath := path.Join(partPath, primaryFilename)
+       primaryReader, err := lfs.OpenFile(primaryPath)
+       if err != nil {
+               logger.Panicf("cannot open trace primary file %q: %s", 
primaryPath, err)
+       }
+       readers = append(readers, primaryReader)
+       files = append(files, queue.FileInfo{
+               Name:   SidxPrimaryName,
+               Reader: primaryReader.SequentialRead(),
+       })
+
+       keysPath := path.Join(partPath, keysFilename)
+       if keysReader, err := lfs.OpenFile(keysPath); err == nil {
+               readers = append(readers, keysReader)
+               files = append(files, queue.FileInfo{
+                       Name:   SidxKeysName,
+                       Reader: keysReader.SequentialRead(),
+               })
+       }
+
+       dataPath := path.Join(partPath, dataFilename)
+       if dataReader, err := lfs.OpenFile(dataPath); err == nil {
+               readers = append(readers, dataReader)
+               files = append(files, queue.FileInfo{
+                       Name:   SidxDataName,
+                       Reader: dataReader.SequentialRead(),
+               })
+       }
+
+       // Dynamic tag files (*.tm, *.tf and *.td)
+       ee := lfs.ReadDir(partPath)
+       for _, e := range ee {
+               if e.IsDir() {
+                       continue
+               }
+
+               // Tag metadata files (.tm)
+               if filepath.Ext(e.Name()) == tagMetadataExtension {
+                       tmPath := path.Join(partPath, e.Name())
+                       tmReader, err := lfs.OpenFile(tmPath)
+                       if err != nil {
+                               logger.Panicf("cannot open trace tag metadata 
file %q: %s", tmPath, err)
+                       }
+                       readers = append(readers, tmReader)
+                       tagName := removeExt(e.Name(), tagMetadataExtension)
+                       files = append(files, queue.FileInfo{
+                               Name:   TagMetadataPrefix + tagName,
+                               Reader: tmReader.SequentialRead(),
+                       })
+               }
+
+               // Tag filter files (.tf)
+               if filepath.Ext(e.Name()) == tagFilterExtension {
+                       tPath := path.Join(partPath, e.Name())
+                       tReader, err := lfs.OpenFile(tPath)
+                       if err != nil {
+                               logger.Panicf("cannot open trace tag file %q: 
%s", tPath, err)
+                       }
+                       readers = append(readers, tReader)
+                       tagName := removeExt(e.Name(), tagFilterExtension)
+                       files = append(files, queue.FileInfo{
+                               Name:   TagFilterPrefix + tagName,
+                               Reader: tReader.SequentialRead(),
+                       })
+               }
+
+               // Tag data files (.td)
+               if filepath.Ext(e.Name()) == tagDataExtension {
+                       tdPath := path.Join(partPath, e.Name())
+                       tdReader, err := lfs.OpenFile(tdPath)
+                       if err != nil {
+                               logger.Panicf("cannot open trace tag data file 
%q: %s", tdPath, err)
+                       }
+                       readers = append(readers, tdReader)
+                       tagName := removeExt(e.Name(), tagDataExtension)
+                       files = append(files, queue.FileInfo{
+                               Name:   TagDataPrefix + tagName,
+                               Reader: tdReader.SequentialRead(),
+                       })
+               }
+       }
+
+       cleanup := func() {
+               for _, reader := range readers {
+                       fs.MustClose(reader)
+               }
+       }
+
+       return files, cleanup
+}
+
 // loadPrimaryBlockMetadata reads and parses primary block metadata from 
meta.bin.
 func (p *part) loadPrimaryBlockMetadata() {
        // Load primary block metadata from meta.bin file (compressed 
primaryBlockMetadata)
@@ -784,3 +916,7 @@ func (spc *SyncPartContext) Close() {
        spc.writers = nil
        spc.memPart = nil
 }
+
+func removeExt(nameWithExt, ext string) string {
+       return nameWithExt[:len(nameWithExt)-len(ext)]
+}
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index a047bb7b..3754000e 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -262,6 +262,8 @@ func (d *database[T, O]) TakeFileSnapshot(dst string) error 
{
                }
        }()
 
+       log.Info().Int("segment_count", len(segments)).Str("db_location", 
d.location).
+               Msgf("taking file snapshot for %s", dst)
        for _, seg := range segments {
                segDir := filepath.Base(seg.location)
                segPath := filepath.Join(dst, segDir)
diff --git a/banyand/measure/snapshot.go b/banyand/measure/snapshot.go
index 3a3a9972..4389d7c0 100644
--- a/banyand/measure/snapshot.go
+++ b/banyand/measure/snapshot.go
@@ -21,7 +21,10 @@ import (
        "context"
        "encoding/json"
        "fmt"
+       "os"
        "path/filepath"
+       "sort"
+       "strings"
        "sync"
        "sync/atomic"
        "time"
@@ -215,6 +218,117 @@ func (s *standalone) takeGroupSnapshot(dstDir string, 
groupName string) error {
        return nil
 }
 
+// collectSegDirs walks a directory tree and collects all seg-* directory paths.
+// It only collects directories matching the "seg-*" pattern and ignores all files.
+// Returns a set of seg-* directory paths relative to rootDir (map keys; every value is true).
+func collectSegDirs(rootDir string) (map[string]bool, error) {
+       segDirs := make(map[string]bool)
+
+       walkErr := filepath.Walk(rootDir, func(currentPath string, info 
os.FileInfo, err error) error {
+               if err != nil {
+                       // If we can't read an entry, ignore the error (nothing is logged) and continue
+                       return nil
+               }
+
+               // Skip files, only process directories
+               if !info.IsDir() {
+                       return nil
+               }
+
+               // Get the directory name
+               dirName := filepath.Base(currentPath)
+
+               // Check if this is a seg-* directory
+               if strings.HasPrefix(dirName, "seg-") {
+                       // Get relative path from root
+                       relPath, relErr := filepath.Rel(rootDir, currentPath)
+                       if relErr != nil {
+                               return fmt.Errorf("failed to get relative path 
for %s: %w", currentPath, relErr)
+                       }
+
+                       // Add to our collection
+                       segDirs[relPath] = true
+
+                       // Don't recurse into seg-* directories
+                       return filepath.SkipDir
+               }
+
+               return nil
+       })
+
+       if walkErr != nil {
+               return nil, fmt.Errorf("failed to walk directory %s: %w", 
rootDir, walkErr)
+       }
+
+       return segDirs, nil
+}
+
+// compareSegDirs compares two sets of seg-* directories and returns three 
slices:
+// matched (in both), onlyInSnapshot (only in snapshot), onlyInData (only in 
data).
+func compareSegDirs(snapshotDirs, dataDirs map[string]bool) (matched, 
onlyInSnapshot, onlyInData []string) {
+       // Find directories in both sets (matched)
+       for dir := range snapshotDirs {
+               if dataDirs[dir] {
+                       matched = append(matched, dir)
+               } else {
+                       onlyInSnapshot = append(onlyInSnapshot, dir)
+               }
+       }
+
+       // Find directories only in data
+       for dir := range dataDirs {
+               if !snapshotDirs[dir] {
+                       onlyInData = append(onlyInData, dir)
+               }
+       }
+
+       // Sort for consistent output
+       sort.Strings(matched)
+       sort.Strings(onlyInSnapshot)
+       sort.Strings(onlyInData)
+
+       return matched, onlyInSnapshot, onlyInData
+}
+
+// compareSnapshotWithData compares the snapshot directory with the data directory
+// to verify that seg-* directories are consistent. It only logs the comparison result
+// (or a warning when a directory cannot be walked) and does not return errors.
+func (s *snapshotListener) compareSnapshotWithData(snapshotDir, dataDir, 
groupName string) {
+       // Collect seg-* directories from snapshot
+       snapshotDirs, snapshotErr := collectSegDirs(snapshotDir)
+       if snapshotErr != nil {
+               s.s.l.Warn().Err(snapshotErr).
+                       Str("group", groupName).
+                       Str("snapshotDir", snapshotDir).
+                       Msg("failed to collect seg-* directories from snapshot, 
skipping comparison")
+               return
+       }
+
+       // Collect seg-* directories from data
+       dataDirs, dataErr := collectSegDirs(dataDir)
+       if dataErr != nil {
+               s.s.l.Warn().Err(dataErr).
+                       Str("group", groupName).
+                       Str("dataDir", dataDir).
+                       Msg("failed to collect seg-* directories from data, 
skipping comparison")
+               return
+       }
+
+       // Compare the directories
+       matched, onlyInSnapshot, onlyInData := compareSegDirs(snapshotDirs, 
dataDirs)
+
+       // Log consolidated comparison results at Info level
+       s.s.l.Info().
+               Str("group", groupName).
+               Int("matched", len(matched)).
+               Int("onlyInSnapshot", len(onlyInSnapshot)).
+               Int("onlyInData", len(onlyInData)).
+               Strs("matchedDirs", matched).
+               Strs("onlyInSnapshotDirs", onlyInSnapshot).
+               Strs("onlyInDataDirs", onlyInData).
+               Msgf("snapshot comparison for group %s, data dir: %s, snapshot 
dir: %s",
+                       groupName, dataDir, snapshotDir)
+}
+
 type snapshotListener struct {
        *bus.UnImplementedHealthyListener
        s           *standalone
@@ -254,11 +368,17 @@ func (s *snapshotListener) Rev(ctx context.Context, 
message bus.Message) bus.Mes
                        return 
bus.NewMessage(bus.MessageID(time.Now().UnixNano()), nil)
                default:
                }
-               if errGroup := 
s.s.takeGroupSnapshot(filepath.Join(s.s.snapshotDir, sn, 
g.GetSchema().Metadata.Name), g.GetSchema().Metadata.Name); err != nil {
-                       s.s.l.Error().Err(errGroup).Str("group", 
g.GetSchema().Metadata.Name).Msg("fail to take group snapshot")
+               groupName := g.GetSchema().Metadata.Name
+               snapshotPath := filepath.Join(s.s.snapshotDir, sn, groupName)
+               if errGroup := s.s.takeGroupSnapshot(snapshotPath, groupName); 
errGroup != nil {
+                       s.s.l.Error().Err(errGroup).Str("group", 
groupName).Msg("fail to take group snapshot")
                        err = multierr.Append(err, errGroup)
                        continue
                }
+
+               // Compare snapshot with data directory to verify consistency
+               dataPath := filepath.Join(s.s.dataPath, groupName)
+               s.compareSnapshotWithData(snapshotPath, dataPath, groupName)
        }
        snp := &databasev1.Snapshot{
                Name:    sn,
diff --git a/banyand/measure/svc_data.go b/banyand/measure/svc_data.go
index 86358908..0d191027 100644
--- a/banyand/measure/svc_data.go
+++ b/banyand/measure/svc_data.go
@@ -453,6 +453,15 @@ func (d *dataSnapshotListener) Rev(ctx context.Context, 
message bus.Message) bus
        var gg []resourceSchema.Group
        if len(groups) == 0 {
                gg = d.s.schemaRepo.LoadAllGroups()
+               n := ""
+               for _, g := range gg {
+                       if n != "" {
+                               n += ","
+                       }
+                       n += g.GetSchema().Metadata.Name
+               }
+
+               log.Info().Msgf("loaded all snapshots: %s", n)
        } else {
                for _, g := range groups {
                        if g.Catalog != commonv1.Catalog_CATALOG_MEASURE {
@@ -464,6 +473,7 @@ func (d *dataSnapshotListener) Rev(ctx context.Context, 
message bus.Message) bus
                        }
                        gg = append(gg, group)
                }
+               log.Info().Msgf("loaded groups: %s", gg)
        }
        if len(gg) == 0 {
                return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), nil)
@@ -489,6 +499,7 @@ func (d *dataSnapshotListener) Rev(ctx context.Context, 
message bus.Message) bus
                Name:    sn,
                Catalog: commonv1.Catalog_CATALOG_MEASURE,
        }
+       log.Info().Msgf("snapshot %s created", sn)
        if err != nil {
                snp.Error = err.Error()
        }
diff --git a/banyand/queue/sub/segments.go b/banyand/queue/sub/segments.go
index 02b27856..1895b6ea 100644
--- a/banyand/queue/sub/segments.go
+++ b/banyand/queue/sub/segments.go
@@ -23,6 +23,7 @@ import (
        "github.com/apache/skywalking-banyandb/api/data"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+       tracev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
        "github.com/apache/skywalking-banyandb/pkg/bus"
        "github.com/apache/skywalking-banyandb/pkg/logger"
 )
@@ -80,3 +81,30 @@ func (s *measureService) DeleteExpiredSegments(ctx 
context.Context, request *mea
        }
        return &measurev1.DeleteExpiredSegmentsResponse{Deleted: deleted}, nil
 }
+
+type traceService struct {
+       tracev1.UnimplementedTraceServiceServer
+       ser *server
+}
+
+func (s *traceService) DeleteExpiredSegments(ctx context.Context, request 
*tracev1.DeleteExpiredSegmentsRequest) (*tracev1.DeleteExpiredSegmentsResponse, 
error) {
+       s.ser.listenersLock.RLock()
+       defer s.ser.listenersLock.RUnlock()
+       ll := s.ser.getListeners(data.TopicDeleteExpiredTraceSegments)
+       if len(ll) == 0 {
+               logger.Panicf("no listener found for topic %s", 
data.TopicDeleteExpiredTraceSegments)
+       }
+       var deleted int64
+       for _, l := range ll {
+               message := l.Rev(ctx, bus.NewMessage(bus.MessageID(0), request))
+               data := message.Data()
+               if data != nil {
+                       d, ok := data.(int64)
+                       if !ok {
+                               logger.Panicf("invalid data type %T", data)
+                       }
+                       deleted += d
+               }
+       }
+       return &tracev1.DeleteExpiredSegmentsResponse{Deleted: deleted}, nil
+}
diff --git a/banyand/queue/sub/server.go b/banyand/queue/sub/server.go
index 842770a8..c1949b20 100644
--- a/banyand/queue/sub/server.go
+++ b/banyand/queue/sub/server.go
@@ -43,6 +43,7 @@ import (
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
        measurev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+       tracev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/bus"
@@ -225,6 +226,7 @@ func (s *server) Serve() run.StopNotify {
        databasev1.RegisterSnapshotServiceServer(s.ser, s)
        streamv1.RegisterStreamServiceServer(s.ser, &streamService{ser: s})
        measurev1.RegisterMeasureServiceServer(s.ser, &measureService{ser: s})
+       tracev1.RegisterTraceServiceServer(s.ser, &traceService{ser: s})
 
        var ctx context.Context
        ctx, s.clientCloser = context.WithCancel(context.Background())
diff --git a/banyand/stream/snapshot.go b/banyand/stream/snapshot.go
index 3e3731e8..41008a5f 100644
--- a/banyand/stream/snapshot.go
+++ b/banyand/stream/snapshot.go
@@ -21,8 +21,10 @@ import (
        "context"
        "encoding/json"
        "fmt"
+       "os"
        "path/filepath"
        "sort" // added for sorting parts
+       "strings"
        "sync"
        "sync/atomic"
        "time"
@@ -267,6 +269,117 @@ func (s *standalone) takeGroupSnapshot(dstDir string, 
groupName string) error {
        return nil
 }
 
+// collectSegDirs walks a directory tree and collects all seg-* directory paths.
+// It only collects directories matching the "seg-*" pattern and ignores all files.
+// Returns a set of seg-* directory paths relative to rootDir (map keys; every value is true).
+func collectSegDirs(rootDir string) (map[string]bool, error) {
+       segDirs := make(map[string]bool)
+
+       walkErr := filepath.Walk(rootDir, func(currentPath string, info 
os.FileInfo, err error) error {
+               if err != nil {
+                       // If we can't read an entry, ignore the error (nothing is logged) and continue
+                       return nil
+               }
+
+               // Skip files, only process directories
+               if !info.IsDir() {
+                       return nil
+               }
+
+               // Get the directory name
+               dirName := filepath.Base(currentPath)
+
+               // Check if this is a seg-* directory
+               if strings.HasPrefix(dirName, "seg-") {
+                       // Get relative path from root
+                       relPath, relErr := filepath.Rel(rootDir, currentPath)
+                       if relErr != nil {
+                               return fmt.Errorf("failed to get relative path 
for %s: %w", currentPath, relErr)
+                       }
+
+                       // Add to our collection
+                       segDirs[relPath] = true
+
+                       // Don't recurse into seg-* directories
+                       return filepath.SkipDir
+               }
+
+               return nil
+       })
+
+       if walkErr != nil {
+               return nil, fmt.Errorf("failed to walk directory %s: %w", 
rootDir, walkErr)
+       }
+
+       return segDirs, nil
+}
+
+// compareSegDirs compares two sets of seg-* directories and returns three 
slices:
+// matched (in both), onlyInSnapshot (only in snapshot), onlyInData (only in 
data).
+func compareSegDirs(snapshotDirs, dataDirs map[string]bool) (matched, 
onlyInSnapshot, onlyInData []string) {
+       // Find directories in both sets (matched)
+       for dir := range snapshotDirs {
+               if dataDirs[dir] {
+                       matched = append(matched, dir)
+               } else {
+                       onlyInSnapshot = append(onlyInSnapshot, dir)
+               }
+       }
+
+       // Find directories only in data
+       for dir := range dataDirs {
+               if !snapshotDirs[dir] {
+                       onlyInData = append(onlyInData, dir)
+               }
+       }
+
+       // Sort for consistent output
+       sort.Strings(matched)
+       sort.Strings(onlyInSnapshot)
+       sort.Strings(onlyInData)
+
+       return matched, onlyInSnapshot, onlyInData
+}
+
+// compareSnapshotWithData compares the snapshot directory with the data directory
+// to verify that seg-* directories are consistent. It only logs the comparison result
+// (or a warning when a directory cannot be walked) and does not return errors.
+func (s *snapshotListener) compareSnapshotWithData(snapshotDir, dataDir, 
groupName string) {
+       // Collect seg-* directories from snapshot
+       snapshotDirs, snapshotErr := collectSegDirs(snapshotDir)
+       if snapshotErr != nil {
+               s.s.l.Warn().Err(snapshotErr).
+                       Str("group", groupName).
+                       Str("snapshotDir", snapshotDir).
+                       Msg("failed to collect seg-* directories from snapshot, 
skipping comparison")
+               return
+       }
+
+       // Collect seg-* directories from data
+       dataDirs, dataErr := collectSegDirs(dataDir)
+       if dataErr != nil {
+               s.s.l.Warn().Err(dataErr).
+                       Str("group", groupName).
+                       Str("dataDir", dataDir).
+                       Msg("failed to collect seg-* directories from data, 
skipping comparison")
+               return
+       }
+
+       // Compare the directories
+       matched, onlyInSnapshot, onlyInData := compareSegDirs(snapshotDirs, 
dataDirs)
+
+       // Log consolidated comparison results at Info level
+       s.s.l.Info().
+               Str("group", groupName).
+               Int("matched", len(matched)).
+               Int("onlyInSnapshot", len(onlyInSnapshot)).
+               Int("onlyInData", len(onlyInData)).
+               Strs("matchedDirs", matched).
+               Strs("onlyInSnapshotDirs", onlyInSnapshot).
+               Strs("onlyInDataDirs", onlyInData).
+               Msgf("snapshot comparison for group %s, data dir: %s, snapshot 
dir: %s",
+                       groupName, dataDir, snapshotDir)
+}
+
 type snapshotListener struct {
        *bus.UnImplementedHealthyListener
        s           *standalone
@@ -306,11 +419,17 @@ func (s *snapshotListener) Rev(ctx context.Context, 
message bus.Message) bus.Mes
                        return 
bus.NewMessage(bus.MessageID(time.Now().UnixNano()), nil)
                default:
                }
-               if errGroup := 
s.s.takeGroupSnapshot(filepath.Join(s.s.snapshotDir, sn, 
g.GetSchema().Metadata.Name), g.GetSchema().Metadata.Name); err != nil {
-                       s.s.l.Error().Err(errGroup).Str("group", 
g.GetSchema().Metadata.Name).Msg("fail to take group snapshot")
+               groupName := g.GetSchema().Metadata.Name
+               snapshotPath := filepath.Join(s.s.snapshotDir, sn, groupName)
+               if errGroup := s.s.takeGroupSnapshot(snapshotPath, groupName); 
errGroup != nil {
+                       s.s.l.Error().Err(errGroup).Str("group", 
groupName).Msg("fail to take group snapshot")
                        err = multierr.Append(err, errGroup)
                        continue
                }
+
+               // Compare snapshot with data directory to verify consistency
+               dataPath := filepath.Join(s.s.dataPath, groupName)
+               s.compareSnapshotWithData(snapshotPath, dataPath, groupName)
        }
        snp := &databasev1.Snapshot{
                Name:    sn,
diff --git a/banyand/trace/part.go b/banyand/trace/part.go
index 2f744b61..0e2c50be 100644
--- a/banyand/trace/part.go
+++ b/banyand/trace/part.go
@@ -26,6 +26,7 @@ import (
        "sync/atomic"
 
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/bytes"
        "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
        "github.com/apache/skywalking-banyandb/pkg/fs"
@@ -409,3 +410,106 @@ func partPath(root string, epoch uint64) string {
 func partName(epoch uint64) string {
        return fmt.Sprintf("%016x", epoch)
 }
+
+// CreatePartFileReaderFromPath opens all files in a part directory and returns their FileInfo and a cleanup function.
+func CreatePartFileReaderFromPath(partPath string, lfs fs.FileSystem) 
([]queue.FileInfo, func()) {
+       var files []queue.FileInfo
+       var readers []fs.Reader
+
+       // Core trace files (required files)
+       metaPath := path.Join(partPath, metaFilename)
+       metaReader, err := lfs.OpenFile(metaPath)
+       if err != nil {
+               logger.Panicf("cannot open trace meta file %q: %s", metaPath, 
err)
+       }
+       readers = append(readers, metaReader)
+       files = append(files, queue.FileInfo{
+               Name:   traceMetaName,
+               Reader: metaReader.SequentialRead(),
+       })
+
+       primaryPath := path.Join(partPath, primaryFilename)
+       primaryReader, err := lfs.OpenFile(primaryPath)
+       if err != nil {
+               logger.Panicf("cannot open trace primary file %q: %s", 
primaryPath, err)
+       }
+       readers = append(readers, primaryReader)
+       files = append(files, queue.FileInfo{
+               Name:   tracePrimaryName,
+               Reader: primaryReader.SequentialRead(),
+       })
+
+       spansPath := path.Join(partPath, spansFilename)
+       if spansReader, err := lfs.OpenFile(spansPath); err == nil {
+               readers = append(readers, spansReader)
+               files = append(files, queue.FileInfo{
+                       Name:   traceSpansName,
+                       Reader: spansReader.SequentialRead(),
+               })
+       }
+
+       // Special trace files: traceID.filter and tag.type
+       traceIDFilterPath := path.Join(partPath, traceIDFilterFilename)
+       if filterReader, err := lfs.OpenFile(traceIDFilterPath); err == nil {
+               readers = append(readers, filterReader)
+               files = append(files, queue.FileInfo{
+                       Name:   traceIDFilterFilename,
+                       Reader: filterReader.SequentialRead(),
+               })
+       }
+
+       tagTypePath := path.Join(partPath, tagTypeFilename)
+       if tagTypeReader, err := lfs.OpenFile(tagTypePath); err == nil {
+               readers = append(readers, tagTypeReader)
+               files = append(files, queue.FileInfo{
+                       Name:   tagTypeFilename,
+                       Reader: tagTypeReader.SequentialRead(),
+               })
+       }
+
+       // Dynamic tag files (*.t and *.tm)
+       ee := lfs.ReadDir(partPath)
+       for _, e := range ee {
+               if e.IsDir() {
+                       continue
+               }
+
+               // Tag metadata files (.tm)
+               if filepath.Ext(e.Name()) == tagsMetadataFilenameExt {
+                       tmPath := path.Join(partPath, e.Name())
+                       tmReader, err := lfs.OpenFile(tmPath)
+                       if err != nil {
+                               logger.Panicf("cannot open trace tag metadata 
file %q: %s", tmPath, err)
+                       }
+                       readers = append(readers, tmReader)
+                       tagName := removeExt(e.Name(), tagsMetadataFilenameExt)
+                       files = append(files, queue.FileInfo{
+                               Name:   traceTagMetadataPrefix + tagName,
+                               Reader: tmReader.SequentialRead(),
+                       })
+               }
+
+               // Tag data files (.t)
+               if filepath.Ext(e.Name()) == tagsFilenameExt {
+                       tPath := path.Join(partPath, e.Name())
+                       tReader, err := lfs.OpenFile(tPath)
+                       if err != nil {
+                               logger.Panicf("cannot open trace tag file %q: 
%s", tPath, err)
+                       }
+                       readers = append(readers, tReader)
+                       tagName := removeExt(e.Name(), tagsFilenameExt)
+                       files = append(files, queue.FileInfo{
+                               Name:   traceTagsPrefix + tagName,
+                               Reader: tReader.SequentialRead(),
+                       })
+               }
+       }
+
+       cleanup := func() {
+               for _, reader := range readers {
+                       fs.MustClose(reader)
+               }
+       }
+
+       return files, cleanup
+}
diff --git a/banyand/trace/part_metadata.go b/banyand/trace/part_metadata.go
index b4f5189f..bd3aec7a 100644
--- a/banyand/trace/part_metadata.go
+++ b/banyand/trace/part_metadata.go
@@ -25,6 +25,7 @@ import (
        "github.com/pkg/errors"
 
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/filter"
@@ -248,3 +249,26 @@ func (tf *traceIDFilter) mustWriteTraceIDFilter(fileSystem 
fs.FileSystem, partPa
                logger.Panicf("unexpected number of bytes written to %s; got 
%d; want %d", traceIDFilterPath, n, len(data))
        }
 }
+
+// ParsePartMetadata parses the part metadata from the metadata.json file.
+func ParsePartMetadata(fileSystem fs.FileSystem, partPath string) 
(queue.StreamingPartData, error) {
+       metadataPath := filepath.Join(partPath, metadataFilename)
+       metadata, err := fileSystem.Read(metadataPath)
+       if err != nil {
+               return queue.StreamingPartData{}, errors.WithMessage(err, 
"cannot read metadata.json")
+       }
+       var pm partMetadata
+       if err := json.Unmarshal(metadata, &pm); err != nil {
+               return queue.StreamingPartData{}, errors.WithMessage(err, 
"cannot parse metadata.json")
+       }
+
+       return queue.StreamingPartData{
+               ID:                    pm.ID,
+               CompressedSizeBytes:   pm.CompressedSizeBytes,
+               UncompressedSizeBytes: pm.UncompressedSpanSizeBytes,
+               TotalCount:            pm.TotalCount,
+               BlocksCount:           pm.BlocksCount,
+               MinTimestamp:          pm.MinTimestamp,
+               MaxTimestamp:          pm.MaxTimestamp,
+       }, nil
+}
diff --git a/banyand/trace/snapshot.go b/banyand/trace/snapshot.go
index 03eed89f..479cc214 100644
--- a/banyand/trace/snapshot.go
+++ b/banyand/trace/snapshot.go
@@ -205,7 +205,7 @@ func parseSnapshot(name string) (uint64, error) {
 
 func (tst *tsTable) TakeFileSnapshot(dst string) error {
        for k, v := range tst.sidxMap {
-               indexDir := filepath.Join(dst, k)
+               indexDir := filepath.Join(dst, sidxDirName, k)
                tst.fileSystem.MkdirPanicIfExist(indexDir, storage.DirPerm)
                if err := v.TakeFileSnapshot(indexDir); err != nil {
                        return fmt.Errorf("failed to take file snapshot for 
index, %s: %w", k, err)
diff --git a/banyand/trace/svc_standalone.go b/banyand/trace/svc_standalone.go
index 085ba984..8f14c950 100644
--- a/banyand/trace/svc_standalone.go
+++ b/banyand/trace/svc_standalone.go
@@ -20,8 +20,10 @@ package trace
 import (
        "context"
        "fmt"
+       "os"
        "path"
        "path/filepath"
+       "sort"
        "strings"
        "sync"
        "time"
@@ -33,6 +35,7 @@ import (
        "github.com/apache/skywalking-banyandb/api/data"
        commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       tracev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/observability"
@@ -163,7 +166,12 @@ func (s *standalone) PreRun(ctx context.Context) error {
        if err != nil {
                return err
        }
+       err = s.pipeline.Subscribe(data.TopicDeleteExpiredTraceSegments, 
&standaloneDeleteTraceSegmentsListener{s: s})
+       if err != nil {
+               return err
+       }
        s.pipeline.RegisterChunkedSyncHandler(data.TopicTracePartSync, 
setUpChunkedSyncCallback(s.l, &s.schemaRepo))
+       s.pipeline.RegisterChunkedSyncHandler(data.TopicTraceSeriesSync, 
setUpSeriesSyncCallback(s.l, &s.schemaRepo))
        err = s.pipeline.Subscribe(data.TopicTraceSidxSeriesWrite, 
setUpSidxSeriesIndexCallback(s.l, &s.schemaRepo))
        if err != nil {
                return err
@@ -297,6 +305,117 @@ func (s *standalone) takeGroupSnapshot(dstDir string, 
groupName string) error {
        return nil
 }
 
+// collectSegDirs scans the tree rooted at rootDir and gathers every directory
+// whose base name starts with "seg-". Regular files are ignored, entries that
+// cannot be read are skipped without being reported, and the walk does not
+// descend into a seg-* directory once it has been recorded.
+// The returned map is keyed by the seg-* directory path relative to rootDir;
+// every stored value is true.
+func collectSegDirs(rootDir string) (map[string]bool, error) {
+       found := map[string]bool{}
+
+       visit := func(currentPath string, info os.FileInfo, err error) error {
+               // Best effort: unreadable entries and plain files are skipped.
+               if err != nil || !info.IsDir() {
+                       return nil
+               }
+               // Only directories named seg-* are of interest.
+               if !strings.HasPrefix(filepath.Base(currentPath), "seg-") {
+                       return nil
+               }
+
+               rel, relErr := filepath.Rel(rootDir, currentPath)
+               if relErr != nil {
+                       return fmt.Errorf("failed to get relative path for %s: %w", currentPath, relErr)
+               }
+               found[rel] = true
+
+               // A seg-* directory is treated as a leaf for this scan.
+               return filepath.SkipDir
+       }
+
+       if walkErr := filepath.Walk(rootDir, visit); walkErr != nil {
+               return nil, fmt.Errorf("failed to walk directory %s: %w", rootDir, walkErr)
+       }
+       return found, nil
+}
+
+// compareSegDirs partitions the seg-* directory names of a snapshot and of the
+// data directory into three sorted lists: names present in both sets
+// (matched), names present only in the snapshot (onlyInSnapshot) and names
+// present only in the data directory (onlyInData).
+// A list stays nil when no name falls into it.
+func compareSegDirs(snapshotDirs, dataDirs map[string]bool) (matched, onlyInSnapshot, onlyInData []string) {
+       // Classify every snapshot entry by whether the data side has it too.
+       for name := range snapshotDirs {
+               if !dataDirs[name] {
+                       onlyInSnapshot = append(onlyInSnapshot, name)
+                       continue
+               }
+               matched = append(matched, name)
+       }
+
+       // Whatever the data side has that the snapshot lacks.
+       for name := range dataDirs {
+               if snapshotDirs[name] {
+                       continue
+               }
+               onlyInData = append(onlyInData, name)
+       }
+
+       // Map iteration order is random; sort in place for stable output.
+       for _, names := range [][]string{matched, onlyInSnapshot, onlyInData} {
+               sort.Strings(names)
+       }
+
+       return matched, onlyInSnapshot, onlyInData
+}
+
+// compareSnapshotWithData is a diagnostic helper. It gathers the seg-*
+// directories found under snapshotDir and under dataDir and logs, for the
+// given group, which of them appear on both sides and which on one side only.
+// It returns nothing: when either scan fails a warning is logged and the
+// comparison is skipped.
+func (d *standaloneSnapshotListener) compareSnapshotWithData(snapshotDir, dataDir, groupName string) {
+       lg := d.s.l
+
+       // Snapshot side first.
+       snapshotSegs, err := collectSegDirs(snapshotDir)
+       if err != nil {
+               lg.Warn().Err(err).
+                       Str("group", groupName).
+                       Str("snapshotDir", snapshotDir).
+                       Msg("failed to collect seg-* directories from snapshot, skipping comparison")
+               return
+       }
+
+       // Then the live data side.
+       dataSegs, err := collectSegDirs(dataDir)
+       if err != nil {
+               lg.Warn().Err(err).
+                       Str("group", groupName).
+                       Str("dataDir", dataDir).
+                       Msg("failed to collect seg-* directories from data, skipping comparison")
+               return
+       }
+
+       both, snapshotOnly, dataOnly := compareSegDirs(snapshotSegs, dataSegs)
+
+       // One consolidated Info record carrying both the counts and the names.
+       lg.Info().
+               Str("group", groupName).
+               Int("matched", len(both)).
+               Int("onlyInSnapshot", len(snapshotOnly)).
+               Int("onlyInData", len(dataOnly)).
+               Strs("matchedDirs", both).
+               Strs("onlyInSnapshotDirs", snapshotOnly).
+               Strs("onlyInDataDirs", dataOnly).
+               Msgf("snapshot comparison for group %s, data dir: %s, snapshot dir: %s",
+                       groupName, dataDir, snapshotDir)
+}
+
 type standaloneSnapshotListener struct {
        *bus.UnImplementedHealthyListener
        s           *standalone
@@ -335,11 +454,17 @@ func (d *standaloneSnapshotListener) Rev(ctx 
context.Context, message bus.Messag
                        return 
bus.NewMessage(bus.MessageID(time.Now().UnixNano()), nil)
                default:
                }
-               if errGroup := 
d.s.takeGroupSnapshot(filepath.Join(d.s.snapshotDir, sn, 
g.GetSchema().Metadata.Name), g.GetSchema().Metadata.Name); err != nil {
-                       d.s.l.Error().Err(errGroup).Str("group", 
g.GetSchema().Metadata.Name).Msg("fail to take group snapshot")
+               groupName := g.GetSchema().Metadata.Name
+               snapshotPath := filepath.Join(d.s.snapshotDir, sn, groupName)
+               if errGroup := d.s.takeGroupSnapshot(snapshotPath, groupName); 
errGroup != nil {
+                       d.s.l.Error().Err(errGroup).Str("group", 
groupName).Msg("fail to take group snapshot")
                        err = multierr.Append(err, errGroup)
                        continue
                }
+
+               // Compare snapshot with data directory to verify consistency
+               dataPath := filepath.Join(d.s.dataPath, groupName)
+               d.compareSnapshotWithData(snapshotPath, dataPath, groupName)
        }
        snp := &databasev1.Snapshot{
                Name:    sn,
@@ -356,6 +481,27 @@ func (d *standaloneSnapshotListener) snapshotName() string 
{
        return fmt.Sprintf("%s-%08X", 
time.Now().UTC().Format("20060102150405"), d.snapshotSeq)
 }
 
+// standaloneDeleteTraceSegmentsListener handles the messages published on
+// data.TopicDeleteExpiredTraceSegments (it is subscribed in PreRun) and
+// removes the requested segments from the group's TSDB.
+type standaloneDeleteTraceSegmentsListener struct {
+       *bus.UnImplementedHealthyListener
+       s *standalone
+}
+
+// Rev expects a *tracev1.DeleteExpiredSegmentsRequest payload, loads the TSDB
+// of the requested group and deletes the segments named by SegmentSuffixes.
+// The reply carries the value returned by DeleteExpiredSegments. It carries
+// int64(0) when the payload is missing, has an unexpected type, or the TSDB
+// cannot be loaded.
+func (d *standaloneDeleteTraceSegmentsListener) Rev(_ context.Context, message bus.Message) bus.Message {
+       // Use the checked form of the assertion so an unexpected payload is
+       // reported instead of panicking inside the bus.
+       req, ok := message.Data().(*tracev1.DeleteExpiredSegmentsRequest)
+       if !ok {
+               d.s.l.Warn().
+                       Str("type", fmt.Sprintf("%T", message.Data())).
+                       Msg("unexpected payload for deleting expired trace segments")
+               return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), int64(0))
+       }
+       if req == nil {
+               return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), int64(0))
+       }
+
+       db, err := d.s.schemaRepo.loadTSDB(req.Group)
+       if err != nil {
+               d.s.l.Error().Err(err).Str("group", req.Group).Msg("fail to load tsdb")
+               return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), int64(0))
+       }
+       deleted := db.DeleteExpiredSegments(req.SegmentSuffixes)
+       return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), deleted)
+}
+
 // NewService returns a new service.
 func NewService(metadata metadata.Repo, pipeline queue.Server, omr 
observability.MetricsRegistry, pm protector.Memory) (Service, error) {
        return &standalone{
diff --git a/banyand/trace/visitor.go b/banyand/trace/visitor.go
new file mode 100644
index 00000000..8beff6d8
--- /dev/null
+++ b/banyand/trace/visitor.go
@@ -0,0 +1,57 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+// Visitor defines the interface for visiting trace components.
+// VisitTracesInTimeRange wraps a Visitor in traceSegmentVisitor and hands it to
+// storage.VisitSegmentsInTimeRange, which drives the callbacks below.
+type Visitor interface {
+       // VisitSeries visits the series index directory for a segment.
+       // A non-nil error is passed back to the caller of the adapter unchanged.
+       VisitSeries(segmentTR *timestamp.TimeRange, seriesIndexPath string, 
shardIDs []common.ShardID) error
+       // VisitShard visits the shard directory for a segment.
+       // NOTE(review): the last parameter is named segmentPath here but shardPath
+       // in traceSegmentVisitor.VisitShard — confirm which directory is passed.
+       VisitShard(segmentTR *timestamp.TimeRange, shardID common.ShardID, 
segmentPath string) error
+}
+
+// traceSegmentVisitor adapts Visitor to work with storage.SegmentVisitor.
+// It holds no state besides the wrapped Visitor and forwards every call to it.
+type traceSegmentVisitor struct {
+       visitor Visitor
+}
+
+// VisitSeries implements storage.SegmentVisitor.
+// It forwards the arguments to the wrapped Visitor and returns its error.
+func (tv *traceSegmentVisitor) VisitSeries(segmentTR *timestamp.TimeRange, 
seriesIndexPath string, shardIDs []common.ShardID) error {
+       return tv.visitor.VisitSeries(segmentTR, seriesIndexPath, shardIDs)
+}
+
+// VisitShard implements storage.SegmentVisitor.
+// It forwards the arguments to the wrapped Visitor and returns its error.
+func (tv *traceSegmentVisitor) VisitShard(segmentTR *timestamp.TimeRange, 
shardID common.ShardID, shardPath string) error {
+       // Pure delegation: any per-part handling is up to the wrapped Visitor.
+       return tv.visitor.VisitShard(segmentTR, shardID, shardPath)
+}
+
+// VisitTracesInTimeRange visits the trace segments under tsdbRootPath that fall
+// within timeRange. It wraps visitor in a traceSegmentVisitor and delegates to
+// storage.VisitSegmentsInTimeRange, so the callbacks invoked are
+// Visitor.VisitSeries and Visitor.VisitShard.
+// It takes a root path rather than a database handle; presumably the traversal
+// works directly on the filesystem — confirm against
+// storage.VisitSegmentsInTimeRange.
+// The results of storage.VisitSegmentsInTimeRange are returned unchanged; the
+// original documentation describes the slice as the suffixes of the segments
+// that were visited.
+func VisitTracesInTimeRange(tsdbRootPath string, timeRange 
timestamp.TimeRange, visitor Visitor, intervalRule storage.IntervalRule) 
([]string, error) {
+       adapter := &traceSegmentVisitor{visitor: visitor}
+       return storage.VisitSegmentsInTimeRange(tsdbRootPath, timeRange, 
adapter, intervalRule)
+}
diff --git a/banyand/trace/write_data.go b/banyand/trace/write_data.go
index 5fdefc6e..6e44cbb7 100644
--- a/banyand/trace/write_data.go
+++ b/banyand/trace/write_data.go
@@ -23,8 +23,11 @@ import (
        "time"
 
        "github.com/apache/skywalking-banyandb/api/common"
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        "github.com/apache/skywalking-banyandb/banyand/internal/sidx"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/pkg/index"
        "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
@@ -114,24 +117,126 @@ func (s *syncPartContext) Close() error {
        return nil
 }
 
-type syncCallback struct {
+// syncSeriesContext is the per-part handler state used while series index
+// files are streamed into a segment. It keeps the external segment streamer
+// that is currently open, the segment whose reference it holds, and the name
+// of the file being received.
+type syncSeriesContext struct {
+       streamer index.ExternalSegmentStreamer
+       segment  storage.Segment[*tsTable, *commonv1.ResourceOpts]
+       l        *logger.Logger
+       fileName string
+}
+
+// NewPartType is not supported by the series sync handler: it calls
+// logger.Panicf. The trailing return only satisfies the signature.
+func (s *syncSeriesContext) NewPartType(_ *queue.ChunkedSyncPartContext) error {
+       logger.Panicf("new part type is not supported for trace")
+       return nil
+}
+
+// FinishSync completes the external segment that is still open, if any, and
+// then releases the context through Close.
+// When completing the segment fails the error is logged and returned, and the
+// context is still closed so that the segment reference is not leaked.
+func (s *syncSeriesContext) FinishSync() error {
+       if s.streamer != nil {
+               if err := s.streamer.CompleteSegment(); err != nil {
+                       s.l.Error().Err(err).Msg("failed to complete external segment")
+                       // Close always returns nil and is idempotent, so a later
+                       // Close by the caller will not release the segment twice.
+                       _ = s.Close()
+                       return err
+               }
+       }
+       return s.Close()
+}
+
+// Close drops the reference held on the segment (if any) and clears the
+// streamer, file name and segment fields. It is safe to call more than once
+// and always returns nil.
+func (s *syncSeriesContext) Close() error {
+       if s.segment != nil {
+               s.segment.DecRef()
+       }
+       s.streamer = nil
+       s.fileName = ""
+       s.segment = nil
+       return nil
+}
+
+// syncSeriesCallback is the chunked-sync handler for series index files.
+// PreRun registers it for data.TopicTraceSeriesSync.
+type syncSeriesCallback struct {
+       l          *logger.Logger
+       schemaRepo *schemaRepo
+}
+
+// setUpSeriesSyncCallback builds a syncSeriesCallback that logs through l and
+// resolves groups through s, and returns it as a queue.ChunkedSyncHandler.
+func setUpSeriesSyncCallback(l *logger.Logger, s *schemaRepo) 
queue.ChunkedSyncHandler {
+       return &syncSeriesCallback{
+               l:          l,
+               schemaRepo: s,
+       }
+}
+
+// CheckHealth always returns nil; this handler performs no health check.
+func (s *syncSeriesCallback) CheckHealth() *common.Error {
+       return nil
+}
+
+// CreatePartHandler implements queue.ChunkedSyncHandler.
+// It loads the TSDB of ctx.Group, makes sure the segment covering
+// ctx.MinTimestamp exists, and returns a syncSeriesContext bound to that
+// segment. Failures are logged and returned to the caller.
+func (s *syncSeriesCallback) CreatePartHandler(ctx *queue.ChunkedSyncPartContext) (queue.PartHandler, error) {
+       db, loadErr := s.schemaRepo.loadTSDB(ctx.Group)
+       if loadErr != nil {
+               s.l.Error().Err(loadErr).Str("group", ctx.Group).Msg("failed to load TSDB for group")
+               return nil, loadErr
+       }
+
+       // time.Unix(0, n) reads MinTimestamp as nanoseconds since the Unix epoch.
+       ts := time.Unix(0, ctx.MinTimestamp)
+       seg, createErr := db.CreateSegmentIfNotExist(ts)
+       if createErr != nil {
+               s.l.Error().Err(createErr).Str("group", ctx.Group).Time("segmentTime", ts).Msg("failed to create segment")
+               return nil, createErr
+       }
+
+       handler := &syncSeriesContext{segment: seg, l: s.l}
+       return handler, nil
+}
+
+// HandleFileChunk implements queue.ChunkedSyncHandler for streaming series 
index chunks.
+// Each chunk is written to the external segment streamer of the segment's
+// index DB. Whenever ctx.FileName differs from the file seen last, the
+// streamer in use is completed and a new external segment is started before
+// the chunk is written.
+// It returns an error when the handler or its segment is missing, or when
+// completing, enabling, starting or writing an external segment fails.
+func (s *syncSeriesCallback) HandleFileChunk(ctx 
*queue.ChunkedSyncPartContext, chunk []byte) error {
+       if ctx.Handler == nil {
+               return fmt.Errorf("part handler is nil")
+       }
+       // NOTE(review): unchecked assertion — this panics if the handler was not
+       // created by syncSeriesCallback.CreatePartHandler.
+       seriesCtx := ctx.Handler.(*syncSeriesContext)
+
+       if seriesCtx.segment == nil {
+               return fmt.Errorf("segment is nil")
+       }
+       // A new file name marks the start of another external segment.
+       if seriesCtx.fileName != ctx.FileName {
+               // Finalize the segment of the previous file before switching.
+               if seriesCtx.streamer != nil {
+                       if err := seriesCtx.streamer.CompleteSegment(); err != 
nil {
+                               s.l.Error().Err(err).Str("group", 
ctx.Group).Msg("failed to complete external segment")
+                               return err
+                       }
+               }
+               // NOTE(review): if one of the two calls below fails, seriesCtx still
+               // references the streamer that was just completed — confirm that a
+               // later FinishSync completing it a second time is harmless.
+               indexDB := seriesCtx.segment.IndexDB()
+               streamer, err := indexDB.EnableExternalSegments()
+               if err != nil {
+                       s.l.Error().Err(err).Str("group", 
ctx.Group).Msg("failed to enable external segments")
+                       return err
+               }
+               if err := streamer.StartSegment(); err != nil {
+                       s.l.Error().Err(err).Str("group", 
ctx.Group).Msg("failed to start external segment")
+                       return err
+               }
+               // Record the switch only after the new segment has been started.
+               seriesCtx.fileName = ctx.FileName
+               seriesCtx.streamer = streamer
+       }
+       if err := seriesCtx.streamer.WriteChunk(chunk); err != nil {
+               return fmt.Errorf("failed to write chunk (size: %d) to file %q: 
%w", len(chunk), ctx.FileName, err)
+       }
+       return nil
+}
+
+type syncChunkCallback struct {
        l          *logger.Logger
        schemaRepo *schemaRepo
 }
 
 func setUpChunkedSyncCallback(l *logger.Logger, schemaRepo *schemaRepo) 
queue.ChunkedSyncHandler {
-       return &syncCallback{
+       return &syncChunkCallback{
                l:          l,
                schemaRepo: schemaRepo,
        }
 }
 
-func (s *syncCallback) CheckHealth() *common.Error {
+func (s *syncChunkCallback) CheckHealth() *common.Error {
        return nil
 }
 
 // CreatePartHandler implements queue.ChunkedSyncHandler.
-func (s *syncCallback) CreatePartHandler(ctx *queue.ChunkedSyncPartContext) 
(queue.PartHandler, error) {
+func (s *syncChunkCallback) CreatePartHandler(ctx 
*queue.ChunkedSyncPartContext) (queue.PartHandler, error) {
        tsdb, err := s.schemaRepo.loadTSDB(ctx.Group)
        if err != nil {
                s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to 
load TSDB for group")
@@ -159,7 +264,7 @@ func (s *syncCallback) CreatePartHandler(ctx 
*queue.ChunkedSyncPartContext) (que
 }
 
 // HandleFileChunk implements queue.ChunkedSyncHandler for streaming file 
chunks.
-func (s *syncCallback) HandleFileChunk(ctx *queue.ChunkedSyncPartContext, 
chunk []byte) error {
+func (s *syncChunkCallback) HandleFileChunk(ctx *queue.ChunkedSyncPartContext, 
chunk []byte) error {
        if ctx.Handler == nil {
                return fmt.Errorf("part handler is nil")
        }
@@ -169,7 +274,7 @@ func (s *syncCallback) HandleFileChunk(ctx 
*queue.ChunkedSyncPartContext, chunk
        return s.handleTraceFileChunk(ctx, chunk)
 }
 
-func (s *syncCallback) handleSidxFileChunk(ctx *queue.ChunkedSyncPartContext, 
chunk []byte) error {
+func (s *syncChunkCallback) handleSidxFileChunk(ctx 
*queue.ChunkedSyncPartContext, chunk []byte) error {
        sidxName := ctx.PartType
        fileName := ctx.FileName
        partCtx := ctx.Handler.(*syncPartContext)
@@ -202,7 +307,7 @@ func (s *syncCallback) handleSidxFileChunk(ctx 
*queue.ChunkedSyncPartContext, ch
        return nil
 }
 
-func (s *syncCallback) handleTraceFileChunk(ctx *queue.ChunkedSyncPartContext, 
chunk []byte) error {
+func (s *syncChunkCallback) handleTraceFileChunk(ctx 
*queue.ChunkedSyncPartContext, chunk []byte) error {
        fileName := ctx.FileName
        partCtx := ctx.Handler.(*syncPartContext)
        switch {
@@ -235,10 +340,10 @@ func (s *syncCallback) handleTraceFileChunk(ctx 
*queue.ChunkedSyncPartContext, c
        return nil
 }
 
-func (s *syncCallback) handleTraceIDFilterChunk(partCtx *syncPartContext, 
chunk []byte) {
+func (s *syncChunkCallback) handleTraceIDFilterChunk(partCtx *syncPartContext, 
chunk []byte) {
        partCtx.traceIDFilterBuffer = append(partCtx.traceIDFilterBuffer, 
chunk...)
 }
 
-func (s *syncCallback) handleTagTypeChunk(partCtx *syncPartContext, chunk 
[]byte) {
+func (s *syncChunkCallback) handleTagTypeChunk(partCtx *syncPartContext, chunk 
[]byte) {
        partCtx.tagTypeBuffer = append(partCtx.tagTypeBuffer, chunk...)
 }
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 18047c3d..ff86ff51 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -335,6 +335,9 @@
     - [WriteResponse](#banyandb-trace-v1-WriteResponse)
   
 - [banyandb/trace/v1/rpc.proto](#banyandb_trace_v1_rpc-proto)
+    - 
[DeleteExpiredSegmentsRequest](#banyandb-trace-v1-DeleteExpiredSegmentsRequest)
+    - 
[DeleteExpiredSegmentsResponse](#banyandb-trace-v1-DeleteExpiredSegmentsResponse)
+  
     - [TraceService](#banyandb-trace-v1-TraceService)
   
 - [Scalar Value Types](#scalar-value-types)
@@ -4947,6 +4950,37 @@ TagFamilySpec defines the specification of a tag family.
 ## banyandb/trace/v1/rpc.proto
 
 
+
+<a name="banyandb-trace-v1-DeleteExpiredSegmentsRequest"></a>
+
+### DeleteExpiredSegmentsRequest
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| group | [string](#string) |  |  |
+| segment_suffixes | [string](#string) | repeated |  |
+
+
+
+
+
+
+<a name="banyandb-trace-v1-DeleteExpiredSegmentsResponse"></a>
+
+### DeleteExpiredSegmentsResponse
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| deleted | [int64](#int64) |  |  |
+
+
+
+
+
  
 
  
@@ -4963,6 +4997,7 @@ TagFamilySpec defines the specification of a tag family.
 | ----------- | ------------ | ------------- | ------------|
 | Query | [QueryRequest](#banyandb-trace-v1-QueryRequest) | 
[QueryResponse](#banyandb-trace-v1-QueryResponse) |  |
 | Write | [WriteRequest](#banyandb-trace-v1-WriteRequest) stream | 
[WriteResponse](#banyandb-trace-v1-WriteResponse) stream |  |
+| DeleteExpiredSegments | 
[DeleteExpiredSegmentsRequest](#banyandb-trace-v1-DeleteExpiredSegmentsRequest) 
| 
[DeleteExpiredSegmentsResponse](#banyandb-trace-v1-DeleteExpiredSegmentsResponse)
 |  |
 
  
 
diff --git a/pkg/schema/cache.go b/pkg/schema/cache.go
index 73d15beb..e085f814 100644
--- a/pkg/schema/cache.go
+++ b/pkg/schema/cache.go
@@ -26,6 +26,7 @@ import (
        "time"
 
        "github.com/pkg/errors"
+       "github.com/rs/zerolog/log"
        "go.uber.org/multierr"
        "google.golang.org/protobuf/proto"
 
@@ -294,6 +295,7 @@ func (sr *schemaRepo) createGroup(name string) (g *group) {
 func (sr *schemaRepo) deleteGroup(groupMeta *commonv1.Metadata) error {
        name := groupMeta.GetName()
        g, loaded := sr.groupMap.LoadAndDelete(name)
+       log.Info().Str("group", name).Bool("loaded", loaded).Msg("deleting 
group")
        if !loaded {
                return nil
        }
diff --git a/pkg/test/trace/etcd.go b/pkg/test/trace/etcd.go
index 275c1765..414e8ef6 100644
--- a/pkg/test/trace/etcd.go
+++ b/pkg/test/trace/etcd.go
@@ -34,6 +34,7 @@ import (
 
 const (
        groupDir            = "testdata/groups"
+       groupStagesDir      = "testdata/groups_stages"
        traceDir            = "testdata/traces"
        indexRuleDir        = "testdata/index_rules"
        indexRuleBindingDir = "testdata/index_rule_bindings"
@@ -44,14 +45,19 @@ var store embed.FS
 
 // PreloadSchema loads schemas from files in the booting process.
 func PreloadSchema(ctx context.Context, e schema.Registry) error {
-       return loadAllSchemas(ctx, e)
+       return loadAllSchemas(ctx, e, groupDir)
+}
+
+// PreloadSchemaWithStages loads group schemas with stages from files in the 
booting process.
+func PreloadSchemaWithStages(ctx context.Context, e schema.Registry) error {
+       return loadAllSchemas(ctx, e, groupStagesDir)
 }
 
 // loadAllSchemas loads all trace-related schemas from the testdata directory.
-func loadAllSchemas(ctx context.Context, e schema.Registry) error {
+func loadAllSchemas(ctx context.Context, e schema.Registry, group string) 
error {
        return preloadSchemaWithFuncs(ctx, e,
                func(ctx context.Context, e schema.Registry) error {
-                       return loadSchema(groupDir, &commonv1.Group{}, 
func(group *commonv1.Group) error {
+                       return loadSchema(group, &commonv1.Group{}, func(group 
*commonv1.Group) error {
                                return e.CreateGroup(ctx, group)
                        })
                },
diff --git a/pkg/test/trace/testdata/groups_stages/test-trace-group.json 
b/pkg/test/trace/testdata/groups_stages/test-trace-group.json
new file mode 100644
index 00000000..4a1b35ba
--- /dev/null
+++ b/pkg/test/trace/testdata/groups_stages/test-trace-group.json
@@ -0,0 +1,34 @@
+{
+    "metadata": {
+        "name": "test-trace-group"
+    },
+    "catalog": "CATALOG_TRACE",
+    "resource_opts": {
+        "shard_num": 2,
+        "replicas": 0,
+        "segment_interval": {
+            "unit": "UNIT_DAY",
+            "num": 1
+        },
+        "ttl": {
+            "unit": "UNIT_DAY",
+            "num": 3
+        },
+        "stages": [
+            {
+                "name": "warm",
+                "shard_num": 2,
+                "segment_interval": {
+                    "unit": "UNIT_DAY",
+                    "num": 3
+                },
+                "ttl": {
+                    "unit": "UNIT_DAY",
+                    "num": 7
+                },
+                "node_selector": "type=warm"
+            }
+        ]
+    },
+    "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/trace/testdata/groups_stages/test-trace-updated.json 
b/pkg/test/trace/testdata/groups_stages/test-trace-updated.json
new file mode 100644
index 00000000..2e016960
--- /dev/null
+++ b/pkg/test/trace/testdata/groups_stages/test-trace-updated.json
@@ -0,0 +1,34 @@
+{
+    "metadata": {
+        "name": "test-trace-updated"
+    },
+    "catalog": "CATALOG_TRACE",
+    "resource_opts": {
+        "shard_num": 2,
+        "replicas": 0,
+        "segment_interval": {
+            "unit": "UNIT_DAY",
+            "num": 1
+        },
+        "ttl": {
+            "unit": "UNIT_DAY",
+            "num": 3
+        },
+        "stages": [
+            {
+                "name": "warm",
+                "shard_num": 2,
+                "segment_interval": {
+                    "unit": "UNIT_DAY",
+                    "num": 3
+                },
+                "ttl": {
+                    "unit": "UNIT_DAY",
+                    "num": 7
+                },
+                "node_selector": "type=warm"
+            }
+        ]
+    },
+    "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/trace/testdata/groups_stages/zipkin-trace-group.json 
b/pkg/test/trace/testdata/groups_stages/zipkin-trace-group.json
new file mode 100644
index 00000000..25004d1c
--- /dev/null
+++ b/pkg/test/trace/testdata/groups_stages/zipkin-trace-group.json
@@ -0,0 +1,34 @@
+{
+    "metadata": {
+        "name": "zipkinTrace"
+    },
+    "catalog": "CATALOG_TRACE",
+    "resource_opts": {
+        "shard_num": 2,
+        "replicas": 1,
+        "segment_interval": {
+            "unit": "UNIT_DAY",
+            "num": 1
+        },
+        "ttl": {
+            "unit": "UNIT_DAY",
+            "num": 7
+        },
+        "stages": [
+            {
+                "name": "warm",
+                "shard_num": 2,
+                "segment_interval": {
+                    "unit": "UNIT_DAY",
+                    "num": 3
+                },
+                "ttl": {
+                    "unit": "UNIT_DAY",
+                    "num": 7
+                },
+                "node_selector": "type=warm"
+            }
+        ]
+    },
+    "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/test/cases/lifecycle/lifecycle.go 
b/test/cases/lifecycle/lifecycle.go
index f831e252..00f0185e 100644
--- a/test/cases/lifecycle/lifecycle.go
+++ b/test/cases/lifecycle/lifecycle.go
@@ -36,6 +36,7 @@ import (
        measureTestData 
"github.com/apache/skywalking-banyandb/test/cases/measure/data"
        streamTestData 
"github.com/apache/skywalking-banyandb/test/cases/stream/data"
        topNTestData 
"github.com/apache/skywalking-banyandb/test/cases/topn/data"
+       traceTestData 
"github.com/apache/skywalking-banyandb/test/cases/trace/data"
 )
 
 // SharedContext is the shared context for the snapshot test cases.
@@ -53,6 +54,7 @@ var _ = ginkgo.Describe("Lifecycle", func() {
                        "--grpc-addr", SharedContext.DataAddr,
                        "--stream-root-path", SharedContext.SrcDir,
                        "--measure-root-path", SharedContext.SrcDir,
+                       "--trace-root-path", SharedContext.SrcDir,
                        "--etcd-endpoints", SharedContext.EtcdAddr,
                        "--progress-file", pf,
                        "--report-dir", rf,
@@ -97,6 +99,13 @@ var _ = ginkgo.Describe("Lifecycle", func() {
                        Duration: 25 * time.Minute,
                        Offset:   -20 * time.Minute,
                })
+
+               // Verify trace data lifecycle stages
+               verifyLifecycleStages(sc, traceTestData.VerifyFn, helpers.Args{
+                       Input:    "having_query_tag",
+                       Duration: 25 * time.Minute,
+                       Offset:   -20 * time.Minute,
+               })
        })
        ginkgo.It("should migrate data correctly with a scheduler", func() {
                dir, err := os.MkdirTemp("", "lifecycle-restore-dest")
@@ -109,6 +118,7 @@ var _ = ginkgo.Describe("Lifecycle", func() {
                        "--grpc-addr", SharedContext.DataAddr,
                        "--stream-root-path", SharedContext.SrcDir,
                        "--measure-root-path", SharedContext.SrcDir,
+                       "--trace-root-path", SharedContext.SrcDir,
                        "--etcd-endpoints", SharedContext.EtcdAddr,
                        "--progress-file", pf,
                        "--report-dir", rf,
@@ -155,6 +165,13 @@ var _ = ginkgo.Describe("Lifecycle", func() {
                        Duration: 25 * time.Minute,
                        Offset:   -20 * time.Minute,
                })
+
+               // Verify trace data lifecycle stages
+               verifyLifecycleStages(sc, traceTestData.VerifyFn, helpers.Args{
+                       Input:    "having_query_tag",
+                       Duration: 25 * time.Minute,
+                       Offset:   -20 * time.Minute,
+               })
        })
 })
 
diff --git a/test/integration/distributed/lifecycle/lifecycle_suite_test.go 
b/test/integration/distributed/lifecycle/lifecycle_suite_test.go
index 8b7b2923..13287bdf 100644
--- a/test/integration/distributed/lifecycle/lifecycle_suite_test.go
+++ b/test/integration/distributed/lifecycle/lifecycle_suite_test.go
@@ -41,6 +41,7 @@ import (
        test_property "github.com/apache/skywalking-banyandb/pkg/test/property"
        "github.com/apache/skywalking-banyandb/pkg/test/setup"
        test_stream "github.com/apache/skywalking-banyandb/pkg/test/stream"
+       test_trace "github.com/apache/skywalking-banyandb/pkg/test/trace"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
        test_cases "github.com/apache/skywalking-banyandb/test/cases"
        caseslifecycle 
"github.com/apache/skywalking-banyandb/test/cases/lifecycle"
@@ -95,13 +96,16 @@ var _ = SynchronizedBeforeSuite(func() []byte {
        ctx := context.Background()
        test_stream.LoadSchemaWithStages(ctx, schemaRegistry)
        test_measure.LoadSchemaWithStages(ctx, schemaRegistry)
+       test_trace.PreloadSchemaWithStages(ctx, schemaRegistry)
        test_property.PreloadSchema(ctx, schemaRegistry)
        By("Starting hot data node")
        var closeDataNode0 func()
-       dataAddr, srcDir, closeDataNode0 = setup.DataNodeWithAddrAndDir(ep, 
"--node-labels", "type=hot", "--measure-flush-timeout", "0s", 
"--stream-flush-timeout", "0s")
+       dataAddr, srcDir, closeDataNode0 = setup.DataNodeWithAddrAndDir(ep, 
"--node-labels", "type=hot",
+               "--measure-flush-timeout", "0s", "--stream-flush-timeout", 
"0s", "--trace-flush-timeout", "0s")
        By("Starting warm data node")
        var closeDataNode1 func()
-       _, destDir, closeDataNode1 = setup.DataNodeWithAddrAndDir(ep, 
"--node-labels", "type=warm", "--measure-flush-timeout", "0s", 
"--stream-flush-timeout", "0s")
+       _, destDir, closeDataNode1 = setup.DataNodeWithAddrAndDir(ep, 
"--node-labels", "type=warm",
+               "--measure-flush-timeout", "0s", "--stream-flush-timeout", 
"0s", "--trace-flush-timeout", "0s")
        By("Starting liaison node")
        var closerLiaisonNode func()
        liaisonAddr, closerLiaisonNode = setup.LiaisonNode(ep, 
"--data-node-selector", "type=hot")

Reply via email to