This is an automated email from the ASF dual-hosted git repository.

ButterBright pushed a commit to branch v0.10.x
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 7cd2616f66bd2da68358782dcc2510429633d496
Author: mrproliu <[email protected]>
AuthorDate: Fri May 22 14:04:24 2026 +0800

    Fix incorrect counts, completion rate and missing trace fields in the lifecycle migration report (#1133)
---
 CHANGES.md                                         |   1 +
 .../backup/lifecycle/measure_migration_visitor.go  |  14 +-
 banyand/backup/lifecycle/migration_integration.go  | 148 +++++++-----
 banyand/backup/lifecycle/progress.go               | 198 +++++++++++++---
 banyand/backup/lifecycle/progress_mark_test.go     | 192 ++++++++++++++++
 .../lifecycle/report_partial_failure_test.go       | 255 +++++++++++++++++++++
 banyand/backup/lifecycle/service.go                |  69 +++++-
 .../backup/lifecycle/stream_migration_visitor.go   |  21 +-
 .../backup/lifecycle/trace_migration_visitor.go    |  21 +-
 test/cases/lifecycle/lifecycle.go                  | 141 +++++++++++-
 10 files changed, 947 insertions(+), 113 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 8a64b558d..0f007b0aa 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -41,6 +41,7 @@ Release Notes.
 - Fix lifecycle migration where the receiving node could create segments 
shorter than the configured `SegmentInterval`.
 - Fail fast on incompatible storage version at boot. Previously the server 
would start in a degraded `SERVING` state with affected groups un-loaded 
because the property schema-registry retry loop swallowed the 
version-incompatibility panic. Compatible versions are listed in 
`banyand/internal/storage/versions.yml`.
 - Release bluge index writers on segment rotation so `analysisWorker` pools 
sized from `GOMAXPROCS` don't accumulate across rotations. Two layered defects 
kept the existing idle-segment reclaim path from running: `segmentIdleTimeout` 
defaulted to `0` (which disabled the 10-minute reclaim ticker), and `incRef` 
refreshed `lastAccessed` on every rotation tick so `closeIdleSegments` never 
observed an idle segment. Defaults to `time.Hour`, moves the `lastAccessed` 
bump to real read/write call [...]
+- Fix incorrect counts, completion rate and missing trace fields in the lifecycle migration report.
 
 ### Chores
 
diff --git a/banyand/backup/lifecycle/measure_migration_visitor.go 
b/banyand/backup/lifecycle/measure_migration_visitor.go
index fd12b85ba..7a72e8394 100644
--- a/banyand/backup/lifecycle/measure_migration_visitor.go
+++ b/banyand/backup/lifecycle/measure_migration_visitor.go
@@ -102,9 +102,6 @@ func (mv *measureMigrationVisitor) VisitSeries(segmentTR 
*timestamp.TimeRange, s
                Str("path", seriesIndexPath).
                Msg("found measure segment files for migration")
 
-       // Set the total number of series segments for progress tracking
-       mv.SetMeasureSeriesCount(len(segmentFiles))
-
        // Calculate ALL target segments this series index should go to
        targetSegments := calculateTargetSegments(
                segmentTR.Start.UnixNano(),
@@ -213,6 +210,15 @@ func (mv *measureMigrationVisitor) VisitSeries(segmentTR 
*timestamp.TimeRange, s
                }
        }
 
+       // Mark each source series file as fully migrated (idempotent — bumps 
Progress once per source).
+       for _, segmentFileName := range segmentFiles {
+               fileSegmentIDStr := strings.TrimSuffix(segmentFileName, ".seg")
+               segmentID, parseErr := strconv.ParseUint(fileSegmentIDStr, 16, 
64)
+               if parseErr != nil {
+                       continue
+               }
+               mv.progress.MarkSourceMeasureSeriesCompleted(mv.group, 
seriesIndexPath, common.ShardID(segmentID))
+       }
        return nil
 }
 
@@ -226,7 +232,6 @@ func (mv *measureMigrationVisitor) VisitPart(_ 
*timestamp.TimeRange, sourceShard
        if err != nil {
                return fmt.Errorf("failed to parse part ID from path: %w", err)
        }
-
        // Calculate ALL target segments this part should go to
        targetSegments := calculateTargetSegments(
                partData.MinTimestamp,
@@ -291,6 +296,7 @@ func (mv *measureMigrationVisitor) VisitPart(_ 
*timestamp.TimeRange, sourceShard
                        Msgf("measure part migration completed for target 
segment %d/%d", i+1, len(targetSegments))
        }
 
+       mv.progress.MarkSourceMeasurePartCompleted(mv.group, partPath, 
sourceShardID, partID)
        return nil
 }
 
diff --git a/banyand/backup/lifecycle/migration_integration.go 
b/banyand/backup/lifecycle/migration_integration.go
index 8036f95d8..742bfeef6 100644
--- a/banyand/backup/lifecycle/migration_integration.go
+++ b/banyand/backup/lifecycle/migration_integration.go
@@ -18,11 +18,14 @@
 package lifecycle
 
 import (
+       "strings"
+
        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/banyand/measure"
        "github.com/apache/skywalking-banyandb/banyand/stream"
        "github.com/apache/skywalking-banyandb/banyand/trace"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
@@ -37,13 +40,16 @@ func migrateStreamWithFileBasedAndProgress(tsdbRootPath 
string, timeRange timest
        // Get target stage configuration
        targetStageInterval := getTargetStageInterval(group)
 
-       // Count total parts before starting migration
-       totalParts, segmentSuffixes, err := countStreamParts(tsdbRootPath, 
timeRange, segmentIntervalRule)
+       // Pre-walk source items so each resource has a fixed planned 
denominator.
+       counter, segmentSuffixes, err := countStreamParts(tsdbRootPath, 
timeRange, segmentIntervalRule)
        if err != nil {
-               logger.Warn().Err(err).Msg("failed to count stream parts, 
proceeding without part count")
+               logger.Warn().Err(err).Msg("failed to count stream source 
items, proceeding without planned counts")
        } else {
-               logger.Info().Int("total_parts", 
totalParts).Strs("segment_suffixes", segmentSuffixes).
-                       Msg("counted stream parts for progres tracking")
+               logger.Info().Int("total_parts", counter.partCount).
+                       Int("total_series_files", counter.seriesFileCount).
+                       Int("total_element_index_visits", 
counter.elementIndexCount).
+                       Strs("segment_suffixes", segmentSuffixes).
+                       Msg("counted stream source items for progress tracking")
        }
 
        // Create file-based migration visitor with progress tracking and 
target stage interval
@@ -53,9 +59,11 @@ func migrateStreamWithFileBasedAndProgress(tsdbRootPath 
string, timeRange timest
        )
        defer visitor.Close()
 
-       // Set the total part count for progress tracking
-       if totalParts > 0 {
-               visitor.SetStreamPartCount(totalParts)
+       // Set the planned source-item counts for progress tracking.
+       if counter != nil {
+               visitor.SetStreamPartCount(counter.partCount)
+               visitor.SetStreamSeriesCount(counter.seriesFileCount)
+               visitor.SetStreamElementIndexCount(counter.elementIndexCount)
        }
 
        // Use the existing VisitStreamsInTimeRange function with our 
file-based visitor
@@ -66,27 +74,36 @@ func migrateStreamWithFileBasedAndProgress(tsdbRootPath 
string, timeRange timest
        return segmentSuffixes, nil
 }
 
-// countStreamParts counts the total number of parts in the given time range.
-func countStreamParts(tsdbRootPath string, timeRange timestamp.TimeRange, 
segmentInterval storage.IntervalRule) (int, []string, error) {
-       // Create a simple visitor to count parts
-       partCounter := &partCountVisitor{}
+// countStreamParts counts the total number of source items in the given time 
range.
+func countStreamParts(tsdbRootPath string, timeRange timestamp.TimeRange, 
segmentInterval storage.IntervalRule) (*partCountVisitor, []string, error) {
+       // Create a simple visitor to count source items
+       partCounter := &partCountVisitor{lfs: fs.NewLocalFileSystem()}
 
        // Use the existing VisitStreamsInTimeRange function to count parts
        segmentSuffixes, err := stream.VisitStreamsInTimeRange(tsdbRootPath, 
timeRange, partCounter, segmentInterval)
        if err != nil {
-               return 0, nil, err
+               return nil, nil, err
        }
 
-       return partCounter.partCount, segmentSuffixes, nil
+       return partCounter, segmentSuffixes, nil
 }
 
-// partCountVisitor is a simple visitor that counts parts.
+// partCountVisitor is a simple visitor that counts source parts, series files 
and element-index visits.
 type partCountVisitor struct {
-       partCount int
+       lfs               fs.FileSystem
+       partCount         int
+       seriesFileCount   int
+       elementIndexCount int
 }
 
-// VisitSeries implements stream.Visitor.
-func (pcv *partCountVisitor) VisitSeries(_ *timestamp.TimeRange, _ string, _ 
[]common.ShardID) error {
+// VisitSeries implements stream.Visitor; counts .seg files under the source 
series index directory.
+func (pcv *partCountVisitor) VisitSeries(_ *timestamp.TimeRange, 
seriesIndexPath string, _ []common.ShardID) error {
+       entries := pcv.lfs.ReadDir(seriesIndexPath)
+       for _, e := range entries {
+               if !e.IsDir() && strings.HasSuffix(e.Name(), ".seg") {
+                       pcv.seriesFileCount++
+               }
+       }
        return nil
 }
 
@@ -98,6 +115,7 @@ func (pcv *partCountVisitor) VisitPart(_ 
*timestamp.TimeRange, _ common.ShardID,
 
 // VisitElementIndex implements stream.Visitor.
 func (pcv *partCountVisitor) VisitElementIndex(_ *timestamp.TimeRange, _ 
common.ShardID, _ string) error {
+       pcv.elementIndexCount++
        return nil
 }
 
@@ -111,13 +129,15 @@ func migrateMeasureWithFileBasedAndProgress(tsdbRootPath 
string, timeRange times
        // Get target stage configuration
        targetStageInterval := getTargetStageInterval(group)
 
-       // Count total parts before starting migration
-       totalParts, segmentSuffixes, err := countMeasureParts(tsdbRootPath, 
timeRange, segmentIntervalRule)
+       // Pre-walk source items so each resource has a fixed planned 
denominator.
+       counter, segmentSuffixes, err := countMeasureParts(tsdbRootPath, 
timeRange, segmentIntervalRule)
        if err != nil {
-               logger.Warn().Err(err).Msg("failed to count measure parts, 
proceeding without part count")
+               logger.Warn().Err(err).Msg("failed to count measure source 
items, proceeding without planned counts")
        } else {
-               logger.Info().Int("total_parts", 
totalParts).Strs("segment_suffixes", segmentSuffixes).
-                       Msg("counted measure parts for progress tracking")
+               logger.Info().Int("total_parts", counter.partCount).
+                       Int("total_series_files", counter.seriesFileCount).
+                       Strs("segment_suffixes", segmentSuffixes).
+                       Msg("counted measure source items for progress 
tracking")
        }
 
        // Create file-based migration visitor with progress tracking and 
target stage interval
@@ -127,9 +147,10 @@ func migrateMeasureWithFileBasedAndProgress(tsdbRootPath 
string, timeRange times
        )
        defer visitor.Close()
 
-       // Set the total part count for progress tracking
-       if totalParts > 0 {
-               visitor.SetMeasurePartCount(totalParts)
+       // Set the planned source-item counts for progress tracking.
+       if counter != nil {
+               visitor.SetMeasurePartCount(counter.partCount)
+               visitor.SetMeasureSeriesCount(counter.seriesFileCount)
        }
 
        // Use the existing VisitMeasuresInTimeRange function with our 
file-based visitor
@@ -140,27 +161,35 @@ func migrateMeasureWithFileBasedAndProgress(tsdbRootPath 
string, timeRange times
        return segmentSuffixes, nil
 }
 
-// countMeasureParts counts the total number of parts in the given time range.
-func countMeasureParts(tsdbRootPath string, timeRange timestamp.TimeRange, 
segmentInterval storage.IntervalRule) (int, []string, error) {
-       // Create a simple visitor to count parts
-       partCounter := &measurePartCountVisitor{}
+// countMeasureParts counts the total number of source items in the given time 
range.
+func countMeasureParts(tsdbRootPath string, timeRange timestamp.TimeRange, 
segmentInterval storage.IntervalRule) (*measurePartCountVisitor, []string, 
error) {
+       // Create a simple visitor to count source items
+       partCounter := &measurePartCountVisitor{lfs: fs.NewLocalFileSystem()}
 
        // Use the existing VisitMeasuresInTimeRange function to count parts
        segmentSuffixes, err := measure.VisitMeasuresInTimeRange(tsdbRootPath, 
timeRange, partCounter, segmentInterval)
        if err != nil {
-               return 0, nil, err
+               return nil, nil, err
        }
 
-       return partCounter.partCount, segmentSuffixes, nil
+       return partCounter, segmentSuffixes, nil
 }
 
-// measurePartCountVisitor is a simple visitor that counts measure parts.
+// measurePartCountVisitor is a simple visitor that counts measure source 
parts and series files.
 type measurePartCountVisitor struct {
-       partCount int
+       lfs             fs.FileSystem
+       partCount       int
+       seriesFileCount int
 }
 
-// VisitSeries implements measure.Visitor.
-func (pcv *measurePartCountVisitor) VisitSeries(_ *timestamp.TimeRange, _ 
string, _ []common.ShardID) error {
+// VisitSeries implements measure.Visitor; counts .seg files under the source 
series index directory.
+func (pcv *measurePartCountVisitor) VisitSeries(_ *timestamp.TimeRange, 
seriesIndexPath string, _ []common.ShardID) error {
+       entries := pcv.lfs.ReadDir(seriesIndexPath)
+       for _, e := range entries {
+               if !e.IsDir() && strings.HasSuffix(e.Name(), ".seg") {
+                       pcv.seriesFileCount++
+               }
+       }
        return nil
 }
 
@@ -180,13 +209,15 @@ func migrateTraceWithFileBasedAndProgress(tsdbRootPath 
string, timeRange timesta
        // Get target stage configuration
        targetStageInterval := getTargetStageInterval(group)
 
-       // Count total shards before starting migration
-       totalShards, segmentSuffixes, err := countTraceShards(tsdbRootPath, 
timeRange, segmentIntervalRule)
+       // Pre-walk source items so each resource has a fixed planned 
denominator.
+       counter, segmentSuffixes, err := countTraceShards(tsdbRootPath, 
timeRange, segmentIntervalRule)
        if err != nil {
-               logger.Warn().Err(err).Msg("failed to count trace parts, 
proceeding without part count")
+               logger.Warn().Err(err).Msg("failed to count trace source items, 
proceeding without planned counts")
        } else {
-               logger.Info().Int("total_shards", 
totalShards).Strs("segment_suffixes", segmentSuffixes).
-                       Msg("counted trace parts for progress tracking")
+               logger.Info().Int("total_shards", counter.shardCount).
+                       Int("total_series_files", counter.seriesFileCount).
+                       Strs("segment_suffixes", segmentSuffixes).
+                       Msg("counted trace source items for progress tracking")
        }
 
        // Create file-based migration visitor with progress tracking and 
target stage interval
@@ -196,9 +227,10 @@ func migrateTraceWithFileBasedAndProgress(tsdbRootPath 
string, timeRange timesta
        )
        defer visitor.Close()
 
-       // Set the total part count for progress tracking
-       if totalShards > 0 {
-               visitor.SetTraceShardCount(totalShards)
+       // Set the planned source-item counts for progress tracking.
+       if counter != nil {
+               visitor.SetTraceShardCount(counter.shardCount)
+               visitor.SetTraceSeriesCount(counter.seriesFileCount)
        }
 
        // Use the existing VisitTracesInTimeRange function with our file-based 
visitor
@@ -209,27 +241,35 @@ func migrateTraceWithFileBasedAndProgress(tsdbRootPath 
string, timeRange timesta
        return segmentSuffixes, nil
 }
 
-// countTraceShards counts the total number of shards in the given time range.
-func countTraceShards(tsdbRootPath string, timeRange timestamp.TimeRange, 
segmentInterval storage.IntervalRule) (int, []string, error) {
-       // Create a simple visitor to count shards
-       shardCounter := &traceShardsCountVisitor{}
+// countTraceShards counts the total number of source items in the given time 
range.
+func countTraceShards(tsdbRootPath string, timeRange timestamp.TimeRange, 
segmentInterval storage.IntervalRule) (*traceShardsCountVisitor, []string, 
error) {
+       // Create a simple visitor to count source items
+       shardCounter := &traceShardsCountVisitor{lfs: fs.NewLocalFileSystem()}
 
        // Use the existing VisitTracesInTimeRange function to count parts
        segmentSuffixes, err := trace.VisitTracesInTimeRange(tsdbRootPath, 
timeRange, shardCounter, segmentInterval)
        if err != nil {
-               return 0, nil, err
+               return nil, nil, err
        }
 
-       return shardCounter.shardCount, segmentSuffixes, nil
+       return shardCounter, segmentSuffixes, nil
 }
 
-// traceShardsCountVisitor is a simple visitor that counts trace shards.
+// traceShardsCountVisitor is a simple visitor that counts trace source shards 
and series files.
 type traceShardsCountVisitor struct {
-       shardCount int
+       lfs             fs.FileSystem
+       shardCount      int
+       seriesFileCount int
 }
 
-// VisitSeries implements trace.Visitor.
-func (pcv *traceShardsCountVisitor) VisitSeries(_ *timestamp.TimeRange, _ 
string, _ []common.ShardID) error {
+// VisitSeries implements trace.Visitor; counts .seg files under the source 
series index directory.
+func (pcv *traceShardsCountVisitor) VisitSeries(_ *timestamp.TimeRange, 
seriesIndexPath string, _ []common.ShardID) error {
+       entries := pcv.lfs.ReadDir(seriesIndexPath)
+       for _, e := range entries {
+               if !e.IsDir() && strings.HasSuffix(e.Name(), ".seg") {
+                       pcv.seriesFileCount++
+               }
+       }
        return nil
 }
 
diff --git a/banyand/backup/lifecycle/progress.go 
b/banyand/backup/lifecycle/progress.go
index 99f7083d1..91deb265f 100644
--- a/banyand/backup/lifecycle/progress.go
+++ b/banyand/backup/lifecycle/progress.go
@@ -64,11 +64,31 @@ type Progress struct {
        TraceSeriesErrors    map[string]map[string]map[common.ShardID]string 
`json:"trace_series_errors"`
        TraceSeriesCounts    map[string]int                                  
`json:"trace_series_counts"`
        TraceSeriesProgress  map[string]int                                  
`json:"trace_series_progress"`
-       progressFilePath     string                                          
`json:"-"`
-       SnapshotStreamDir    string                                          
`json:"snapshot_stream_dir"`
-       SnapshotMeasureDir   string                                          
`json:"snapshot_measure_dir"`
-       SnapshotTraceDir     string                                          
`json:"snapshot_trace_dir"`
-       mu                   sync.Mutex                                      
`json:"-"`
+
+       // Per-source completion tracking (advances *Progress idempotently once 
per source item).
+       SourceCompletedStreamParts        
map[string]map[string]map[common.ShardID]map[uint64]bool 
`json:"source_completed_stream_parts"`
+       SourceCompletedStreamSeries       
map[string]map[string]map[common.ShardID]bool            
`json:"source_completed_stream_series"`
+       SourceCompletedStreamElementIndex 
map[string]map[string]map[common.ShardID]bool            
`json:"source_completed_stream_element_index"`
+       SourceCompletedMeasureParts       
map[string]map[string]map[common.ShardID]map[uint64]bool 
`json:"source_completed_measure_parts"`
+       SourceCompletedMeasureSeries      
map[string]map[string]map[common.ShardID]bool            
`json:"source_completed_measure_series"`
+       SourceCompletedTraceShards        
map[string]map[string]map[common.ShardID]bool            
`json:"source_completed_trace_shards"`
+       SourceCompletedTraceSeries        
map[string]map[string]map[common.ShardID]bool            
`json:"source_completed_trace_series"`
+
+       progressFilePath   string     `json:"-"`
+       SnapshotStreamDir  string     `json:"snapshot_stream_dir"`
+       SnapshotMeasureDir string     `json:"snapshot_measure_dir"`
+       SnapshotTraceDir   string     `json:"snapshot_trace_dir"`
+       GroupsToProcess    []string   `json:"groups_to_process"`
+       mu                 sync.Mutex `json:"-"`
+}
+
+// SetGroupsToProcess records the set of groups picked up by this migration 
cycle.
+// Used as the denominator for migration_status.completion_rate so the report
+// reflects "completed / scheduled" rather than overlapping per-catalog 
buckets.
+func (p *Progress) SetGroupsToProcess(groups []string) {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+       p.GroupsToProcess = append(p.GroupsToProcess[:0], groups...)
 }
 
 // AllGroupsNotFullyCompleted find is there have any group not fully completed.
@@ -120,8 +140,17 @@ func NewProgress(path string, l *logger.Logger) *Progress {
                TraceSeriesErrors:           
make(map[string]map[string]map[common.ShardID]string),
                TraceSeriesCounts:           make(map[string]int),
                TraceSeriesProgress:         make(map[string]int),
-               progressFilePath:            path,
-               logger:                      l,
+
+               SourceCompletedStreamParts:        
make(map[string]map[string]map[common.ShardID]map[uint64]bool),
+               SourceCompletedStreamSeries:       
make(map[string]map[string]map[common.ShardID]bool),
+               SourceCompletedStreamElementIndex: 
make(map[string]map[string]map[common.ShardID]bool),
+               SourceCompletedMeasureParts:       
make(map[string]map[string]map[common.ShardID]map[uint64]bool),
+               SourceCompletedMeasureSeries:      
make(map[string]map[string]map[common.ShardID]bool),
+               SourceCompletedTraceShards:        
make(map[string]map[string]map[common.ShardID]bool),
+               SourceCompletedTraceSeries:        
make(map[string]map[string]map[common.ShardID]bool),
+
+               progressFilePath: path,
+               logger:           l,
        }
 }
 
@@ -266,9 +295,6 @@ func (p *Progress) MarkStreamPartCompleted(group string, 
segmentID string, shard
 
        // Mark part as completed
        p.CompletedStreamParts[group][segmentID][shardID][partID] = true
-
-       // Update progress count
-       p.StreamPartProgress[group]++
 }
 
 // IsStreamPartCompleted checks if a specific part of a stream has been 
completed.
@@ -354,9 +380,6 @@ func (p *Progress) MarkStreamSeriesCompleted(group string, 
segmentID string, sha
 
        // Mark series segment as completed
        p.CompletedStreamSeries[group][segmentID][shardID] = true
-
-       // Update progress count
-       p.StreamSeriesProgress[group]++
 }
 
 // IsStreamSeriesCompleted checks if a specific series segment of a stream has 
been completed.
@@ -466,9 +489,6 @@ func (p *Progress) MarkStreamElementIndexCompleted(group 
string, segmentID strin
 
        // Mark shard as completed
        p.CompletedStreamElementIndex[group][segmentID][shardID] = true
-
-       // Update progress count
-       p.StreamElementIndexProgress[group]++
 }
 
 // IsStreamElementIndexCompleted checks if a specific element index file of a 
stream has been completed.
@@ -552,9 +572,6 @@ func (p *Progress) MarkMeasurePartCompleted(group string, 
segmentID string, shar
 
        // Mark part as completed
        p.CompletedMeasureParts[group][segmentID][shardID][partID] = true
-
-       // Update progress count
-       p.MeasurePartProgress[group]++
 }
 
 // IsMeasurePartCompleted checks if a specific part of a measure has been 
completed.
@@ -633,6 +650,8 @@ func (p *Progress) ClearErrors() {
        p.StreamElementIndexErrors = 
make(map[string]map[string]map[common.ShardID]string)
        p.MeasurePartErrors = 
make(map[string]map[string]map[common.ShardID]map[uint64]string)
        p.MeasureSeriesErrors = 
make(map[string]map[string]map[common.ShardID]string)
+       p.TraceShardErrors = 
make(map[string]map[string]map[common.ShardID]string)
+       p.TraceSeriesErrors = 
make(map[string]map[string]map[common.ShardID]string)
 }
 
 // MarkMeasureSeriesCompleted marks a specific series segment of a measure as 
completed.
@@ -651,9 +670,6 @@ func (p *Progress) MarkMeasureSeriesCompleted(group string, 
segmentID string, sh
 
        // Mark series segment as completed
        p.CompletedMeasureSeries[group][segmentID][shardID] = true
-
-       // Update progress count
-       p.MeasureSeriesProgress[group]++
 }
 
 // IsMeasureSeriesCompleted checks if a specific series segment of a measure 
has been completed.
@@ -746,9 +762,6 @@ func (p *Progress) MarkTraceShardCompleted(group string, 
segmentID string, shard
                p.CompletedTraceShards[group][segmentID] = 
make(map[common.ShardID]bool)
        }
        p.CompletedTraceShards[group][segmentID][shardID] = true
-
-       // Increment progress
-       p.TraceShardProgress[group]++
 }
 
 // IsTraceShardCompleted checks if a specific part of a trace has been 
completed.
@@ -820,9 +833,6 @@ func (p *Progress) MarkTraceSeriesCompleted(group string, 
segmentID string, shar
                p.CompletedTraceSeries[group][segmentID] = 
make(map[common.ShardID]bool)
        }
        p.CompletedTraceSeries[group][segmentID][shardID] = true
-
-       // Increment progress
-       p.TraceSeriesProgress[group]++
 }
 
 // IsTraceSeriesCompleted checks if a specific series segment of a trace has 
been completed.
@@ -881,3 +891,135 @@ func (p *Progress) GetTraceSeriesProgress(group string) 
int {
        }
        return 0
 }
+
+// MarkSourceStreamPartCompleted records that one source stream part finished
+// every target write successfully; idempotent (++Progress on first call only).
+func (p *Progress) MarkSourceStreamPartCompleted(group string, sourceSegmentID 
string, sourceShardID common.ShardID, partID uint64) {
+       defer p.saveProgress()
+       p.mu.Lock()
+       defer p.mu.Unlock()
+       if p.SourceCompletedStreamParts[group] == nil {
+               p.SourceCompletedStreamParts[group] = 
make(map[string]map[common.ShardID]map[uint64]bool)
+       }
+       if p.SourceCompletedStreamParts[group][sourceSegmentID] == nil {
+               p.SourceCompletedStreamParts[group][sourceSegmentID] = 
make(map[common.ShardID]map[uint64]bool)
+       }
+       if p.SourceCompletedStreamParts[group][sourceSegmentID][sourceShardID] 
== nil {
+               
p.SourceCompletedStreamParts[group][sourceSegmentID][sourceShardID] = 
make(map[uint64]bool)
+       }
+       if 
!p.SourceCompletedStreamParts[group][sourceSegmentID][sourceShardID][partID] {
+               
p.SourceCompletedStreamParts[group][sourceSegmentID][sourceShardID][partID] = 
true
+               p.StreamPartProgress[group]++
+       }
+}
+
+// MarkSourceStreamSeriesCompleted records that one source stream series file 
finished
+// every target write successfully; idempotent.
+func (p *Progress) MarkSourceStreamSeriesCompleted(group string, 
sourceSegmentID string, sourceShardID common.ShardID) {
+       defer p.saveProgress()
+       p.mu.Lock()
+       defer p.mu.Unlock()
+       if p.SourceCompletedStreamSeries[group] == nil {
+               p.SourceCompletedStreamSeries[group] = 
make(map[string]map[common.ShardID]bool)
+       }
+       if p.SourceCompletedStreamSeries[group][sourceSegmentID] == nil {
+               p.SourceCompletedStreamSeries[group][sourceSegmentID] = 
make(map[common.ShardID]bool)
+       }
+       if 
!p.SourceCompletedStreamSeries[group][sourceSegmentID][sourceShardID] {
+               
p.SourceCompletedStreamSeries[group][sourceSegmentID][sourceShardID] = true
+               p.StreamSeriesProgress[group]++
+       }
+}
+
+// MarkSourceStreamElementIndexCompleted records that one source stream 
element-index visit finished
+// every target write successfully; idempotent.
+func (p *Progress) MarkSourceStreamElementIndexCompleted(group string, 
sourceSegmentID string, sourceShardID common.ShardID) {
+       defer p.saveProgress()
+       p.mu.Lock()
+       defer p.mu.Unlock()
+       if p.SourceCompletedStreamElementIndex[group] == nil {
+               p.SourceCompletedStreamElementIndex[group] = 
make(map[string]map[common.ShardID]bool)
+       }
+       if p.SourceCompletedStreamElementIndex[group][sourceSegmentID] == nil {
+               p.SourceCompletedStreamElementIndex[group][sourceSegmentID] = 
make(map[common.ShardID]bool)
+       }
+       if 
!p.SourceCompletedStreamElementIndex[group][sourceSegmentID][sourceShardID] {
+               
p.SourceCompletedStreamElementIndex[group][sourceSegmentID][sourceShardID] = 
true
+               p.StreamElementIndexProgress[group]++
+       }
+}
+
+// MarkSourceMeasurePartCompleted records that one source measure part finished
+// every target write successfully; idempotent.
+func (p *Progress) MarkSourceMeasurePartCompleted(group string, 
sourceSegmentID string, sourceShardID common.ShardID, partID uint64) {
+       defer p.saveProgress()
+       p.mu.Lock()
+       defer p.mu.Unlock()
+       if p.SourceCompletedMeasureParts[group] == nil {
+               p.SourceCompletedMeasureParts[group] = 
make(map[string]map[common.ShardID]map[uint64]bool)
+       }
+       if p.SourceCompletedMeasureParts[group][sourceSegmentID] == nil {
+               p.SourceCompletedMeasureParts[group][sourceSegmentID] = 
make(map[common.ShardID]map[uint64]bool)
+       }
+       if p.SourceCompletedMeasureParts[group][sourceSegmentID][sourceShardID] 
== nil {
+               
p.SourceCompletedMeasureParts[group][sourceSegmentID][sourceShardID] = 
make(map[uint64]bool)
+       }
+       if 
!p.SourceCompletedMeasureParts[group][sourceSegmentID][sourceShardID][partID] {
+               
p.SourceCompletedMeasureParts[group][sourceSegmentID][sourceShardID][partID] = 
true
+               p.MeasurePartProgress[group]++
+       }
+}
+
+// MarkSourceMeasureSeriesCompleted records that one source measure series 
file finished
+// every target write successfully; idempotent.
+func (p *Progress) MarkSourceMeasureSeriesCompleted(group string, 
sourceSegmentID string, sourceShardID common.ShardID) {
+       defer p.saveProgress()
+       p.mu.Lock()
+       defer p.mu.Unlock()
+       if p.SourceCompletedMeasureSeries[group] == nil {
+               p.SourceCompletedMeasureSeries[group] = 
make(map[string]map[common.ShardID]bool)
+       }
+       if p.SourceCompletedMeasureSeries[group][sourceSegmentID] == nil {
+               p.SourceCompletedMeasureSeries[group][sourceSegmentID] = 
make(map[common.ShardID]bool)
+       }
+       if 
!p.SourceCompletedMeasureSeries[group][sourceSegmentID][sourceShardID] {
+               
p.SourceCompletedMeasureSeries[group][sourceSegmentID][sourceShardID] = true
+               p.MeasureSeriesProgress[group]++
+       }
+}
+
+// MarkSourceTraceShardCompleted records that one source trace shard finished
+// every target write successfully; idempotent.
+func (p *Progress) MarkSourceTraceShardCompleted(group string, sourceSegmentID 
string, sourceShardID common.ShardID) {
+       defer p.saveProgress()
+       p.mu.Lock()
+       defer p.mu.Unlock()
+       if p.SourceCompletedTraceShards[group] == nil {
+               p.SourceCompletedTraceShards[group] = 
make(map[string]map[common.ShardID]bool)
+       }
+       if p.SourceCompletedTraceShards[group][sourceSegmentID] == nil {
+               p.SourceCompletedTraceShards[group][sourceSegmentID] = 
make(map[common.ShardID]bool)
+       }
+       if !p.SourceCompletedTraceShards[group][sourceSegmentID][sourceShardID] 
{
+               
p.SourceCompletedTraceShards[group][sourceSegmentID][sourceShardID] = true
+               p.TraceShardProgress[group]++
+       }
+}
+
+// MarkSourceTraceSeriesCompleted records that one source trace series file 
finished
+// every target write successfully; idempotent.
+func (p *Progress) MarkSourceTraceSeriesCompleted(group string, 
sourceSegmentID string, sourceShardID common.ShardID) {
+       defer p.saveProgress()
+       p.mu.Lock()
+       defer p.mu.Unlock()
+       if p.SourceCompletedTraceSeries[group] == nil {
+               p.SourceCompletedTraceSeries[group] = 
make(map[string]map[common.ShardID]bool)
+       }
+       if p.SourceCompletedTraceSeries[group][sourceSegmentID] == nil {
+               p.SourceCompletedTraceSeries[group][sourceSegmentID] = 
make(map[common.ShardID]bool)
+       }
+       if !p.SourceCompletedTraceSeries[group][sourceSegmentID][sourceShardID] 
{
+               
p.SourceCompletedTraceSeries[group][sourceSegmentID][sourceShardID] = true
+               p.TraceSeriesProgress[group]++
+       }
+}
diff --git a/banyand/backup/lifecycle/progress_mark_test.go 
b/banyand/backup/lifecycle/progress_mark_test.go
new file mode 100644
index 000000000..367330d70
--- /dev/null
+++ b/banyand/backup/lifecycle/progress_mark_test.go
@@ -0,0 +1,192 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package lifecycle
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// TestMarkPerTargetDoesNotMoveCounters pins the contract that per-target
+// Mark*Completed / Mark*Error only maintain the per-target dedup / error
+// maps and do NOT advance Counts or Progress. Counts is set by pre-walk
+// (Set*Count) and Progress is advanced exclusively by MarkSource*Completed.
+func TestMarkPerTargetDoesNotMoveCounters(t *testing.T) {
+       p := NewProgress("", logger.GetLogger("test"))
+
+       // Stream part: per-target mark several times — counters must stay 0.
+       p.MarkStreamPartCompleted("g", "tgt-seg-1", 0, 1)
+       p.MarkStreamPartCompleted("g", "tgt-seg-2", 0, 1)
+       p.MarkStreamPartError("g", "tgt-seg-3", 0, 1, "err")
+       assert.Equal(t, 0, p.StreamPartCounts["g"])
+       assert.Equal(t, 0, p.StreamPartProgress["g"])
+
+       // Per-target dedup map populated.
+       assert.True(t, p.IsStreamPartCompleted("g", "tgt-seg-1", 0, 1))
+       assert.True(t, p.IsStreamPartCompleted("g", "tgt-seg-2", 0, 1))
+
+       // Same for series / element_index / measure / trace.
+       p.MarkStreamSeriesCompleted("g", "tgt-seg-1", 0)
+       p.MarkStreamElementIndexCompleted("g", "tgt-seg-1", 0)
+       p.MarkMeasurePartCompleted("g", "tgt-seg-1", 0, 1)
+       p.MarkMeasureSeriesCompleted("g", "tgt-seg-1", 0)
+       p.MarkTraceShardCompleted("g", "tgt-seg-1", 0)
+       p.MarkTraceSeriesCompleted("g", "tgt-seg-1", 0)
+       for _, counter := range []int{
+               p.StreamSeriesCounts["g"], p.StreamSeriesProgress["g"],
+               p.StreamElementIndexCounts["g"], 
p.StreamElementIndexProgress["g"],
+               p.MeasurePartCounts["g"], p.MeasurePartProgress["g"],
+               p.MeasureSeriesCounts["g"], p.MeasureSeriesProgress["g"],
+               p.TraceShardCounts["g"], p.TraceShardProgress["g"],
+               p.TraceSeriesCounts["g"], p.TraceSeriesProgress["g"],
+       } {
+               assert.Equal(t, 0, counter, "per-target marks must not move 
counters")
+       }
+}
+
+// TestSetCountIsThePlannedDenominator pins that Set*Count is the only writer
+// of the per-resource Counts denominator (set by pre-walk).
+func TestSetCountIsThePlannedDenominator(t *testing.T) {
+       p := NewProgress("", logger.GetLogger("test"))
+
+       p.SetStreamPartCount("g", 100)
+       p.SetStreamSeriesCount("g", 50)
+       p.SetStreamElementIndexCount("g", 10)
+       p.SetMeasurePartCount("g", 70)
+       p.SetMeasureSeriesCount("g", 35)
+       p.SetTraceShardCount("g", 5)
+       p.SetTraceSeriesCount("g", 8)
+
+       assert.Equal(t, 100, p.StreamPartCounts["g"])
+       assert.Equal(t, 50, p.StreamSeriesCounts["g"])
+       assert.Equal(t, 10, p.StreamElementIndexCounts["g"])
+       assert.Equal(t, 70, p.MeasurePartCounts["g"])
+       assert.Equal(t, 35, p.MeasureSeriesCounts["g"])
+       assert.Equal(t, 5, p.TraceShardCounts["g"])
+       assert.Equal(t, 8, p.TraceSeriesCounts["g"])
+
+       // Set is overwrite semantics — last call wins.
+       p.SetStreamPartCount("g", 200)
+       assert.Equal(t, 200, p.StreamPartCounts["g"])
+}
+
+// TestMarkSourceCompletedIdempotent pins that MarkSource*Completed advances
+// Progress exactly once per unique source key across all 7 resources.
+func TestMarkSourceCompletedIdempotent(t *testing.T) {
+       p := NewProgress("", logger.GetLogger("test"))
+
+       // Stream part: same key 3 times → Progress=1.
+       p.MarkSourceStreamPartCompleted("g", "src-seg", 0, 1)
+       p.MarkSourceStreamPartCompleted("g", "src-seg", 0, 1)
+       p.MarkSourceStreamPartCompleted("g", "src-seg", 0, 1)
+       assert.Equal(t, 1, p.StreamPartProgress["g"])
+       assert.True(t, p.SourceCompletedStreamParts["g"]["src-seg"][0][1])
+       // Different partID = different source = +1.
+       p.MarkSourceStreamPartCompleted("g", "src-seg", 0, 2)
+       assert.Equal(t, 2, p.StreamPartProgress["g"])
+
+       // Stream series, element_index, measure, trace: same pattern, +1 once.
+       p.MarkSourceStreamSeriesCompleted("g", "src-seg", 0)
+       p.MarkSourceStreamSeriesCompleted("g", "src-seg", 0)
+       assert.Equal(t, 1, p.StreamSeriesProgress["g"])
+
+       p.MarkSourceStreamElementIndexCompleted("g", "src-seg", 0)
+       p.MarkSourceStreamElementIndexCompleted("g", "src-seg", 0)
+       assert.Equal(t, 1, p.StreamElementIndexProgress["g"])
+
+       p.MarkSourceMeasurePartCompleted("g", "src-seg", 0, 1)
+       p.MarkSourceMeasurePartCompleted("g", "src-seg", 0, 1)
+       assert.Equal(t, 1, p.MeasurePartProgress["g"])
+
+       p.MarkSourceMeasureSeriesCompleted("g", "src-seg", 0)
+       p.MarkSourceMeasureSeriesCompleted("g", "src-seg", 0)
+       assert.Equal(t, 1, p.MeasureSeriesProgress["g"])
+
+       p.MarkSourceTraceShardCompleted("g", "src-seg", 0)
+       p.MarkSourceTraceShardCompleted("g", "src-seg", 0)
+       assert.Equal(t, 1, p.TraceShardProgress["g"])
+
+       p.MarkSourceTraceSeriesCompleted("g", "src-seg", 0)
+       p.MarkSourceTraceSeriesCompleted("g", "src-seg", 0)
+       assert.Equal(t, 1, p.TraceSeriesProgress["g"])
+}
+
+// TestClearErrors_AllSevenBuckets pins that ClearErrors resets every error
+// map (including trace) and does NOT touch Counts/Progress (which now
+// derive from pre-walk + MarkSource and are immutable to error replay).
+func TestClearErrors_AllSevenBuckets(t *testing.T) {
+       p := NewProgress("", logger.GetLogger("test"))
+
+       // Set planned counts (pre-walk).
+       p.SetStreamPartCount("g", 10)
+       p.SetStreamSeriesCount("g", 10)
+       p.SetStreamElementIndexCount("g", 10)
+       p.SetMeasurePartCount("g", 10)
+       p.SetMeasureSeriesCount("g", 10)
+       p.SetTraceShardCount("g", 10)
+       p.SetTraceSeriesCount("g", 10)
+
+       // Drop one error per bucket.
+       p.MarkStreamPartError("g", "s", 0, 1, "x")
+       p.MarkStreamSeriesError("g", "s", 0, "x")
+       p.MarkStreamElementIndexError("g", "s", 0, "x")
+       p.MarkMeasurePartError("g", "s", 0, 1, "x")
+       p.MarkMeasureSeriesError("g", "s", 0, "x")
+       p.MarkTraceShardError("g", "s", 0, "x")
+       p.MarkTraceSeriesError("g", "s", 0, "x")
+
+       p.ClearErrors()
+
+       assert.Empty(t, p.StreamPartErrors["g"])
+       assert.Empty(t, p.StreamSeriesErrors["g"])
+       assert.Empty(t, p.StreamElementIndexErrors["g"])
+       assert.Empty(t, p.MeasurePartErrors["g"])
+       assert.Empty(t, p.MeasureSeriesErrors["g"])
+       assert.Empty(t, p.TraceShardErrors["g"])
+       assert.Empty(t, p.TraceSeriesErrors["g"])
+
+       // Counts are pre-walk planned — must survive ClearErrors.
+       for _, c := range []int{
+               p.StreamPartCounts["g"], p.StreamSeriesCounts["g"], 
p.StreamElementIndexCounts["g"],
+               p.MeasurePartCounts["g"], p.MeasureSeriesCounts["g"],
+               p.TraceShardCounts["g"], p.TraceSeriesCounts["g"],
+       } {
+               assert.Equal(t, 10, c, "Counts must not be touched by 
ClearErrors")
+       }
+}
+
+// TestPerTargetAndPerSourceAreIndependent pins that per-target dedup and
+// per-source progress are tracked independently: per-target Mark does not
+// move source progress, and MarkSource does not populate the per-target map.
+func TestPerTargetAndPerSourceAreIndependent(t *testing.T) {
+       p := NewProgress("", logger.GetLogger("test"))
+
+       p.MarkStreamPartCompleted("g", "tgt-1", 0, 1)
+       assert.True(t, p.IsStreamPartCompleted("g", "tgt-1", 0, 1))
+       assert.False(t, p.SourceCompletedStreamParts["g"]["src-1"][0][1])
+       assert.Equal(t, 0, p.StreamPartProgress["g"])
+
+       p.MarkSourceStreamPartCompleted("g", "src-1", 0, 1)
+       assert.True(t, p.SourceCompletedStreamParts["g"]["src-1"][0][1])
+       // Per-target map untouched by source mark.
+       assert.False(t, p.IsStreamPartCompleted("g", "src-1", 0, 1))
+       assert.Equal(t, 1, p.StreamPartProgress["g"])
+}
diff --git a/banyand/backup/lifecycle/report_partial_failure_test.go 
b/banyand/backup/lifecycle/report_partial_failure_test.go
new file mode 100644
index 000000000..1fd3a9683
--- /dev/null
+++ b/banyand/backup/lifecycle/report_partial_failure_test.go
@@ -0,0 +1,255 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package lifecycle
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+// TestBuildMigrationReport_PartialFailure pins down the JSON shape and
+// numeric values that buildMigrationReport must produce under a partial
+// migration: one stream group has a failed part write while every other
+// group and every other resource within the failing group completes. The
+// shapes and counts are source-level — Counts comes from the pre-walk
+// (Set*Count), Progress is bumped per source by MarkSource*Completed, and
+// completion_rate = Progress / Counts is always in [0, 100] with 100 %
+// meaning every source item migrated.
+func TestBuildMigrationReport_PartialFailure(t *testing.T) {
+       p := NewProgress("", logger.GetLogger("test"))
+       p.SetGroupsToProcess([]string{"sw_a", "sw_b", "sw_metrics", "sw_trace"})
+
+       p.SnapshotStreamDir = "/tmp/stream/snapshots/example"
+       p.SnapshotMeasureDir = "/tmp/measure/snapshots/example"
+       p.SnapshotTraceDir = "/tmp/trace/snapshots/example"
+
+       // === sw_a (stream): full success on 2 source parts, 2 source series 
files, 1 source element_index visit. ===
+       p.SetStreamPartCount("sw_a", 2)
+       p.MarkSourceStreamPartCompleted("sw_a", "20260513", 0, 1)
+       p.MarkSourceStreamPartCompleted("sw_a", "20260513", 0, 2)
+       p.SetStreamSeriesCount("sw_a", 2)
+       p.MarkSourceStreamSeriesCompleted("sw_a", "20260513", 100)
+       p.MarkSourceStreamSeriesCompleted("sw_a", "20260513", 101)
+       p.SetStreamElementIndexCount("sw_a", 1)
+       p.MarkSourceStreamElementIndexCompleted("sw_a", "20260513", 0)
+       p.MarkGroupCompleted("sw_a")
+       p.MarkStreamGroupDeleted("sw_a")
+
+       // === sw_b (stream): 4 source parts planned, 3 complete + 1 fails 
(partID 7 on shard 1). ===
+       // series + element_index all complete.
+       p.SetStreamPartCount("sw_b", 4)
+       p.MarkSourceStreamPartCompleted("sw_b", "20260512", 0, 1)
+       p.MarkSourceStreamPartCompleted("sw_b", "20260512", 0, 2)
+       p.MarkSourceStreamPartCompleted("sw_b", "20260512", 1, 1)
+       // partID 7 fails on shard 1 — record the per-target error, source not 
advanced.
+       p.MarkStreamPartError("sw_b", "20260512", 1, 7, "failed to stream part 
to target node-cold-0: connection refused")
+       p.SetStreamSeriesCount("sw_b", 2)
+       p.MarkSourceStreamSeriesCompleted("sw_b", "20260512", 0)
+       p.MarkSourceStreamSeriesCompleted("sw_b", "20260512", 1)
+       p.SetStreamElementIndexCount("sw_b", 2)
+       p.MarkSourceStreamElementIndexCompleted("sw_b", "20260512", 0)
+       p.MarkSourceStreamElementIndexCompleted("sw_b", "20260512", 1)
+       // sw_b is NOT marked completed (processStreamGroup early-returns on 
the failure).
+
+       // === sw_metrics (measure): full success. ===
+       p.SetMeasurePartCount("sw_metrics", 3)
+       p.MarkSourceMeasurePartCompleted("sw_metrics", "20260513", 0, 1)
+       p.MarkSourceMeasurePartCompleted("sw_metrics", "20260513", 0, 2)
+       p.MarkSourceMeasurePartCompleted("sw_metrics", "20260513", 0, 3)
+       p.SetMeasureSeriesCount("sw_metrics", 1)
+       p.MarkSourceMeasureSeriesCompleted("sw_metrics", "20260513", 100)
+       p.MarkGroupCompleted("sw_metrics")
+       p.MarkMeasureGroupDeleted("sw_metrics")
+
+       // === sw_trace: full success. ===
+       p.SetTraceShardCount("sw_trace", 1)
+       p.MarkSourceTraceShardCompleted("sw_trace", "20260513", 0)
+       p.SetTraceSeriesCount("sw_trace", 1)
+       p.MarkSourceTraceSeriesCompleted("sw_trace", "20260513", 100)
+       p.MarkGroupCompleted("sw_trace")
+       p.MarkTraceGroupDeleted("sw_trace")
+
+       // --- Build report ---
+       svc := &lifecycleService{l: logger.GetLogger("test")}
+       report := svc.buildMigrationReport(p)
+
+       require.Equal(t, "2.0", report["report_version"])
+       summary, ok := report["summary"].(map[string]interface{})
+       require.True(t, ok)
+       errs, ok := report["errors"].(map[string]interface{})
+       require.True(t, ok)
+
+       // migration_status: 4 scheduled, 3 completed (sw_a, sw_metrics, 
sw_trace), 75 %.
+       ms, ok := summary["migration_status"].(map[string]interface{})
+       require.True(t, ok)
+       assert.Equal(t, 4, ms["total_groups"])
+       assert.Equal(t, 3, ms["completed_groups"])
+       assert.InDelta(t, 75.0, ms["completion_rate"], 1e-9)
+
+       // stream parts: 2+4=6 planned, 2+3=5 source-completed.
+       assertResource(t, summary, "stream_migration", "parts", 6, 5, 
83.33333333333334, 1)
+       assertResource(t, summary, "stream_migration", "series", 4, 4, 100.0, 0)
+       assertResource(t, summary, "stream_migration", "element_index", 3, 3, 
100.0, 0)
+
+       // measure
+       assertResource(t, summary, "measure_migration", "parts", 3, 3, 100.0, 0)
+       assertResource(t, summary, "measure_migration", "series", 1, 1, 100.0, 
0)
+
+       // trace
+       assertResource(t, summary, "trace_migration", "parts", 1, 1, 100.0, 0)
+       assertResource(t, summary, "trace_migration", "series", 1, 1, 100.0, 0)
+
+       // errors: stream_parts has one entry for sw_b; everything else empty.
+       for _, key := range []string{
+               "stream_parts", "stream_series", "stream_element_index",
+               "measure_parts", "measure_series",
+               "trace_parts", "trace_series",
+       } {
+               v, found := errs[key]
+               require.Truef(t, found, "errors.%s must be present", key)
+               _, isMap := v.(map[string]interface{})
+               require.Truef(t, isMap, "errors.%s must be 
map[string]interface{}", key)
+       }
+       streamPartErrs := errs["stream_parts"].(map[string]interface{})
+       require.Lenf(t, streamPartErrs, 1, "errors.stream_parts must hold one 
group entry, got %v", streamPartErrs)
+       require.Contains(t, streamPartErrs, "sw_b")
+
+       for _, key := range []string{
+               "stream_series", "stream_element_index",
+               "measure_parts", "measure_series",
+               "trace_parts", "trace_series",
+       } {
+               v := errs[key].(map[string]interface{})
+               assert.Emptyf(t, v, "errors.%s must be empty for this 
scenario", key)
+       }
+}
+
+// TestBuildMigrationReport_CompletedScopedToScheduledSet pins the
+// resume-safety invariant: CompletedGroups carrying entries from a prior
+// cycle that are no longer in the current GroupsToProcess set must NOT
+// inflate completed_groups beyond total_groups.
+func TestBuildMigrationReport_CompletedScopedToScheduledSet(t *testing.T) {
+       p := NewProgress("", logger.GetLogger("test"))
+       p.MarkGroupCompleted("sw_a")
+       p.MarkGroupCompleted("sw_b")
+       p.MarkGroupCompleted("sw_c")
+       p.MarkGroupCompleted("sw_d")
+       p.MarkGroupCompleted("sw_e")
+       p.SetGroupsToProcess([]string{"sw_f", "sw_g", "sw_h"})
+       p.MarkGroupCompleted("sw_f")
+       p.MarkGroupCompleted("sw_g")
+       p.MarkGroupCompleted("sw_h")
+
+       svc := &lifecycleService{l: logger.GetLogger("test")}
+       report := svc.buildMigrationReport(p)
+       summary := report["summary"].(map[string]interface{})
+       ms := summary["migration_status"].(map[string]interface{})
+
+       assert.Equal(t, 3, ms["total_groups"])
+       assert.Equal(t, 3, ms["completed_groups"])
+       assert.InDelta(t, 100.0, ms["completion_rate"], 1e-9)
+}
+
+// TestBuildMigrationReport_EmptyCycleHonestTotalGroups pins that an
+// empty-snapshot cycle reports total_groups=0 / rate=0 instead of inheriting
+// a stale prior-cycle GroupsToProcess.
+func TestBuildMigrationReport_EmptyCycleHonestTotalGroups(t *testing.T) {
+       p := NewProgress("", logger.GetLogger("test"))
+       p.MarkGroupCompleted("sw_a")
+       p.MarkGroupCompleted("sw_b")
+       p.SetGroupsToProcess(nil)
+
+       svc := &lifecycleService{l: logger.GetLogger("test")}
+       report := svc.buildMigrationReport(p)
+       summary := report["summary"].(map[string]interface{})
+       ms := summary["migration_status"].(map[string]interface{})
+
+       assert.Equal(t, 0, ms["total_groups"])
+       assert.Equal(t, 0, ms["completed_groups"])
+       assert.InDelta(t, 0.0, ms["completion_rate"], 1e-9)
+}
+
+// assertResource pins down a single resource sub-block under a catalog
+// (stream | measure | trace)_migration. total = pre-walk source count;
+// completed = source items fully migrated; errors = per-target error count;
+// rate = completed / total ∈ [0, 100].
+func assertResource(t *testing.T, summary map[string]interface{}, catalog, 
resource string, total, completed int, rate float64, errors int) {
+       t.Helper()
+       cat, ok := summary[catalog].(map[string]interface{})
+       require.Truef(t, ok, "summary.%s must be a map", catalog)
+       res, ok := cat[resource].(map[string]interface{})
+       require.Truef(t, ok, "summary.%s.%s must be a map", catalog, resource)
+       assert.Equalf(t, total, res["total"], "summary.%s.%s.total", catalog, 
resource)
+       assert.Equalf(t, completed, res["completed"], 
"summary.%s.%s.completed", catalog, resource)
+       assert.Equalf(t, errors, res["errors"], "summary.%s.%s.errors", 
catalog, resource)
+       assert.InDeltaf(t, rate, res["completion_rate"], 1e-9, 
"summary.%s.%s.completion_rate", catalog, resource)
+}
+
+// TestCompletionRateNeverExceeds100UnderFanOut replays the original 266 %
+// regression scenario: per-target Mark*Completed is invoked once per
+// (source × target_segment × target_shard) tuple while MarkSource*Completed
+// is invoked once per source. Under Option 3 the per-target Marks must NOT
+// move counters and MarkSource bumps Progress exactly once per source, so
+// completion_rate = Progress / pre-walk-Counts stays in [0, 100] no matter
+// how large the fan-out is. This is the regression test that pins the 266 %
+// bug as fixed.
+func TestCompletionRateNeverExceeds100UnderFanOut(t *testing.T) {
+       p := NewProgress("", logger.GetLogger("test"))
+       p.SetGroupsToProcess([]string{"sw_fanout"})
+
+       // 4 source series files × 2 target segments × 3 target shards = 24
+       // per-target Mark calls. Pre-walk says 4 source items.
+       const sourceCount = 4
+       p.SetStreamSeriesCount("sw_fanout", sourceCount)
+       targetSegs := []string{"tgt-seg-0", "tgt-seg-1"}
+
+       for src := 0; src < sourceCount; src++ {
+               sourceShard := common.ShardID(src)
+               // Fan-out: emit per-target Marks for every (target_segment, 
target_shard).
+               for _, tgtSeg := range targetSegs {
+                       for tgtShard := common.ShardID(0); tgtShard < 3; 
tgtShard++ {
+                               p.MarkStreamSeriesCompleted("sw_fanout", 
tgtSeg, tgtShard)
+                       }
+               }
+               // Per-source: advance Progress exactly once per source file.
+               p.MarkSourceStreamSeriesCompleted("sw_fanout", "src-segment", 
sourceShard)
+       }
+       p.MarkGroupCompleted("sw_fanout")
+
+       svc := &lifecycleService{l: logger.GetLogger("test")}
+       report := svc.buildMigrationReport(p)
+       summary := report["summary"].(map[string]interface{})
+
+       // Counts comes from pre-walk Set*Count, not from per-target Marks.
+       // Progress is one bump per source MarkSource, not one per fan-out Mark.
+       // rate = 4/4 = 100 % (NOT 24/4 = 600 %).
+       streamMigration := summary["stream_migration"].(map[string]interface{})
+       series := streamMigration["series"].(map[string]interface{})
+       assert.Equal(t, sourceCount, series["total"], "Counts must come from 
Set*Count, not from per-target Marks")
+       assert.Equal(t, sourceCount, series["completed"], "Progress must be one 
bump per source, not per fan-out target")
+
+       rate := series["completion_rate"].(float64)
+       assert.GreaterOrEqualf(t, rate, 0.0, "completion_rate must be >= 0, got 
%v", rate)
+       assert.LessOrEqualf(t, rate, 100.0, "completion_rate must stay <= 100%% 
under fan-out (266%% regression), got %v", rate)
+       assert.InDelta(t, 100.0, rate, 1e-9)
+}
diff --git a/banyand/backup/lifecycle/service.go 
b/banyand/backup/lifecycle/service.go
index 85a78bf01..61eb8eae6 100644
--- a/banyand/backup/lifecycle/service.go
+++ b/banyand/backup/lifecycle/service.go
@@ -429,6 +429,10 @@ func (l *lifecycleService) action() error {
        }
        if streamDir == "" && measureDir == "" && traceDir == "" {
                l.l.Warn().Msg("no snapshots found, skipping lifecycle 
migration")
+               // Clear any GroupsToProcess persisted from a prior cycle so the
+               // emitted report honestly reports total_groups=0 for this empty
+               // cycle instead of inheriting a stale denominator.
+               progress.SetGroupsToProcess(nil)
                l.generateReport(progress)
                return nil
        }
@@ -437,6 +441,10 @@ func (l *lifecycleService) action() error {
                Str("measure_snapshot", measureDir).
                Str("trace_snapshot", traceDir).
                Msg("created snapshots")
+       // Record the scheduled group set only after snapshots are confirmed so
+       // the report distinguishes "no work this cycle" (total_groups=0) from a
+       // real cycle that processed N groups (total_groups=N).
+       progress.SetGroupsToProcess(getGroupNames(groups))
        progress.Save(l.progressFilePath, l.l)
 
        nodes, err := l.metadata.NodeRegistry().ListNode(ctx, 
databasev1.Role_ROLE_DATA)
@@ -446,7 +454,6 @@ func (l *lifecycleService) action() error {
        }
        labels := common.ParseNodeFlags()
 
-       allGroupsCompleted := true
        for _, g := range groups {
                switch g.Catalog {
                case commonv1.Catalog_CATALOG_STREAM:
@@ -478,13 +485,18 @@ func (l *lifecycleService) action() error {
 
        // Only remove progress file if ALL groups are fully completed
        notCompleteGroups := progress.AllGroupsNotFullyCompleted(groups)
-       if allGroupsCompleted && len(notCompleteGroups) == 0 {
+       if len(notCompleteGroups) == 0 {
                progress.Remove(l.progressFilePath, l.l)
                l.l.Info().Msg("lifecycle migration completed successfully")
                l.generateReport(progress)
                return nil
        }
+       // Partial-failure path: also emit a report so operators can inspect
+       // errors.* and per-resource completion_rate without parsing raw
+       // progress.json. The report distinguishes itself from a clean cycle
+       // by having errors.* non-empty and completion_rate < 100.
        l.l.Info().Msg("lifecycle migration partially completed, progress file 
retained")
+       l.generateReport(progress)
        return fmt.Errorf("lifecycle migration partially completed, progress 
file retained; %v groups not fully completed", notCompleteGroups)
 }
 
@@ -531,6 +543,7 @@ func (l *lifecycleService) buildMigrationReport(p 
*Progress) map[string]interfac
                "snapshot_info": map[string]interface{}{
                        "stream_dir":  p.SnapshotStreamDir,
                        "measure_dir": p.SnapshotMeasureDir,
+                       "trace_dir":   p.SnapshotTraceDir,
                },
        }
 
@@ -539,8 +552,20 @@ func (l *lifecycleService) buildMigrationReport(p 
*Progress) map[string]interfac
 
 // buildSummaryStats creates overall migration statistics.
 func (l *lifecycleService) buildSummaryStats(p *Progress) 
map[string]interface{} {
-       totalGroups := len(p.CompletedGroups) + len(p.DeletedStreamGroups) + 
len(p.DeletedMeasureGroups)
-       completedGroups := len(p.CompletedGroups)
+       // total_groups reflects the cycle's scheduled set captured at the top 
of
+       // action(); completed_groups counts groups that ran the full
+       // processXxxGroup → MarkGroupCompleted path. The intersection guards
+       // against resume from a partial-failure progress.json where
+       // CompletedGroups carries entries from prior cycles that are not in
+       // the current scheduled set, which would otherwise yield
+       // completed_groups > total_groups.
+       totalGroups := len(p.GroupsToProcess)
+       completedGroups := 0
+       for _, name := range p.GroupsToProcess {
+               if p.CompletedGroups[name] {
+                       completedGroups++
+               }
+       }
 
        // Calculate total parts and series across all groups
        totalStreamParts, completedStreamParts := 
l.calculateTotalCounts(p.StreamPartCounts, p.StreamPartProgress)
@@ -548,6 +573,8 @@ func (l *lifecycleService) buildSummaryStats(p *Progress) 
map[string]interface{}
        totalStreamElementIndex, completedStreamElementIndex := 
l.calculateTotalCounts(p.StreamElementIndexCounts, p.StreamElementIndexProgress)
        totalMeasureParts, completedMeasureParts := 
l.calculateTotalCounts(p.MeasurePartCounts, p.MeasurePartProgress)
        totalMeasureSeries, completedMeasureSeries := 
l.calculateTotalCounts(p.MeasureSeriesCounts, p.MeasureSeriesProgress)
+       totalTraceShards, completedTraceShards := 
l.calculateTotalCounts(p.TraceShardCounts, p.TraceShardProgress)
+       totalTraceSeries, completedTraceSeries := 
l.calculateTotalCounts(p.TraceSeriesCounts, p.TraceSeriesProgress)
 
        // Calculate error counts
        streamPartErrors := l.countErrors(p.StreamPartErrors)
@@ -555,6 +582,8 @@ func (l *lifecycleService) buildSummaryStats(p *Progress) 
map[string]interface{}
        streamElementIndexErrors := l.countErrors(p.StreamElementIndexErrors)
        measurePartErrors := l.countErrors(p.MeasurePartErrors)
        measureSeriesErrors := l.countErrors(p.MeasureSeriesErrors)
+       traceShardErrors := l.countErrors(p.TraceShardErrors)
+       traceSeriesErrors := l.countErrors(p.TraceSeriesErrors)
 
        return map[string]interface{}{
                "migration_status": map[string]interface{}{
@@ -596,20 +625,37 @@ func (l *lifecycleService) buildSummaryStats(p *Progress) 
map[string]interface{}
                                "completion_rate": 
l.calculatePercentage(completedMeasureSeries, totalMeasureSeries),
                        },
                },
+               // Field names parts/series mirror stream_migration / 
measure_migration.
+               // "parts" is fed by the per-shard TraceShard counters because 
trace
+               // migration is shard-batched (sidx + core parts streamed 
together).
+               "trace_migration": map[string]interface{}{
+                       "parts": map[string]interface{}{
+                               "total":           totalTraceShards,
+                               "completed":       completedTraceShards,
+                               "errors":          traceShardErrors,
+                               "completion_rate": 
l.calculatePercentage(completedTraceShards, totalTraceShards),
+                       },
+                       "series": map[string]interface{}{
+                               "total":           totalTraceSeries,
+                               "completed":       completedTraceSeries,
+                               "errors":          traceSeriesErrors,
+                               "completion_rate": 
l.calculatePercentage(completedTraceSeries, totalTraceSeries),
+                       },
+               },
        }
 }
 
 // buildErrorSummary creates detailed error information.
 func (l *lifecycleService) buildErrorSummary(p *Progress) 
map[string]interface{} {
-       errors := map[string]interface{}{
+       return map[string]interface{}{
                "stream_parts":         l.buildErrorDetails(p.StreamPartErrors),
                "stream_series":        
l.buildErrorDetails(p.StreamSeriesErrors),
                "stream_element_index": 
l.buildErrorDetails(p.StreamElementIndexErrors),
                "measure_parts":        
l.buildErrorDetails(p.MeasurePartErrors),
                "measure_series":       
l.buildErrorDetails(p.MeasureSeriesErrors),
+               "trace_parts":          l.buildErrorDetails(p.TraceShardErrors),
+               "trace_series":         
l.buildErrorDetails(p.TraceSeriesErrors),
        }
-
-       return errors
 }
 
 // Helper functions.
@@ -684,7 +730,14 @@ func (l *lifecycleService) buildErrorDetails(errorMaps 
interface{}) map[string]i
                                segmentDetails := make(map[string]interface{})
                                for shardID, parts := range shards {
                                        if len(parts) > 0 {
-                                               
segmentDetails[fmt.Sprintf("shard_%d", shardID)] = parts
+                                               // Copy the inner map so the 
report does not alias
+                                               // Progress state; aliasing 
would race with a
+                                               // concurrent Mark*Error during 
JSON marshaling.
+                                               partDetails := 
make(map[uint64]string, len(parts))
+                                               for partID, errorMsg := range 
parts {
+                                                       partDetails[partID] = 
errorMsg
+                                               }
+                                               
segmentDetails[fmt.Sprintf("shard_%d", shardID)] = partDetails
                                        }
                                }
                                if len(segmentDetails) > 0 {
diff --git a/banyand/backup/lifecycle/stream_migration_visitor.go 
b/banyand/backup/lifecycle/stream_migration_visitor.go
index 71b03e5fd..42ca53405 100644
--- a/banyand/backup/lifecycle/stream_migration_visitor.go
+++ b/banyand/backup/lifecycle/stream_migration_visitor.go
@@ -102,9 +102,6 @@ func (mv *streamMigrationVisitor) VisitSeries(segmentTR 
*timestamp.TimeRange, se
                Str("path", seriesIndexPath).
                Msg("found segment files for migration")
 
-       // Set the total number of series segments for progress tracking
-       mv.SetStreamSeriesCount(len(segmentFiles))
-
        // Calculate ALL target segments this series index should go to
        targetSegments := calculateTargetSegments(
                segmentTR.Start.UnixNano(),
@@ -220,6 +217,15 @@ func (mv *streamMigrationVisitor) VisitSeries(segmentTR 
*timestamp.TimeRange, se
                }
        }
 
+       // Mark each source series file as fully migrated (idempotent — bumps 
Progress once per source).
+       for _, segmentFileName := range segmentFiles {
+               fileSegmentIDStr := strings.TrimSuffix(segmentFileName, ".seg")
+               segmentID, parseErr := strconv.ParseUint(fileSegmentIDStr, 16, 
64)
+               if parseErr != nil {
+                       continue
+               }
+               mv.progress.MarkSourceStreamSeriesCompleted(mv.group, 
seriesIndexPath, common.ShardID(segmentID))
+       }
        return nil
 }
 
@@ -233,7 +239,6 @@ func (mv *streamMigrationVisitor) VisitPart(_ 
*timestamp.TimeRange, sourceShardI
        if err != nil {
                return fmt.Errorf("failed to parse part ID from path: %w", err)
        }
-
        // Calculate ALL target segments this part should go to
        targetSegments := calculateTargetSegments(
                partData.MinTimestamp,
@@ -298,6 +303,7 @@ func (mv *streamMigrationVisitor) VisitPart(_ 
*timestamp.TimeRange, sourceShardI
                        Msgf("part migration completed for target segment 
%d/%d", i+1, len(targetSegments))
        }
 
+       mv.progress.MarkSourceStreamPartCompleted(mv.group, partPath, 
sourceShardID, partID)
        return nil
 }
 
@@ -309,6 +315,9 @@ func (mv *streamMigrationVisitor) 
VisitElementIndex(segmentTR *timestamp.TimeRan
                        Str("group", mv.group).
                        Uint32("source_shard", uint32(sourceShardID)).
                        Msg("element index segment already completed, skipping")
+               // Per-target write done; ensure source progress is recorded to 
survive
+               // a crash between MarkStreamElementIndexCompleted and 
MarkSource (idempotent).
+               mv.progress.MarkSourceStreamElementIndexCompleted(mv.group, 
indexPath, sourceShardID)
                return nil
        }
 
@@ -337,9 +346,6 @@ func (mv *streamMigrationVisitor) 
VisitElementIndex(segmentTR *timestamp.TimeRan
                return nil
        }
 
-       // Set the total number of element index segment files for progress 
tracking
-       mv.SetStreamElementIndexCount(len(segmentFiles))
-
        // Calculate target shard ID (using a simple approach for element index)
        targetShardID := mv.calculateTargetShardID(uint32(sourceShardID))
        mv.logger.Info().
@@ -411,6 +417,7 @@ func (mv *streamMigrationVisitor) 
VisitElementIndex(segmentTR *timestamp.TimeRan
 
        // Mark segment as completed
        mv.progress.MarkStreamElementIndexCompleted(mv.group, segmentIDStr, 
sourceShardID)
+       mv.progress.MarkSourceStreamElementIndexCompleted(mv.group, indexPath, 
sourceShardID)
 
        return nil
 }
diff --git a/banyand/backup/lifecycle/trace_migration_visitor.go 
b/banyand/backup/lifecycle/trace_migration_visitor.go
index 2936d7c14..10d4a2d00 100644
--- a/banyand/backup/lifecycle/trace_migration_visitor.go
+++ b/banyand/backup/lifecycle/trace_migration_visitor.go
@@ -103,9 +103,6 @@ func (mv *traceMigrationVisitor) VisitSeries(segmentTR 
*timestamp.TimeRange, ser
                Str("path", seriesIndexPath).
                Msg("found trace segment files for migration")
 
-       // Set the total number of series segments for progress tracking
-       mv.SetTraceSeriesCount(len(segmentFiles))
-
        // Calculate ALL target segments this series index should go to
        targetSegments := calculateTargetSegments(
                segmentTR.Start.UnixNano(),
@@ -221,18 +218,33 @@ func (mv *traceMigrationVisitor) VisitSeries(segmentTR 
*timestamp.TimeRange, ser
                }
        }
 
+       // Mark each source series file as fully migrated (idempotent — bumps 
Progress once per source).
+       for _, segmentFileName := range segmentFiles {
+               fileSegmentIDStr := strings.TrimSuffix(segmentFileName, ".seg")
+               segmentID, parseErr := strconv.ParseUint(fileSegmentIDStr, 16, 
64)
+               if parseErr != nil {
+                       continue
+               }
+               mv.progress.MarkSourceTraceSeriesCompleted(mv.group, 
seriesIndexPath, common.ShardID(segmentID))
+       }
        return nil
 }
 
 // VisitShard implements trace.Visitor - core and sidx migration logic.
 func (mv *traceMigrationVisitor) VisitShard(timestampTR *timestamp.TimeRange, 
sourceShardID common.ShardID, shardPath string) error {
        segmentIDStr := timestampTR.String()
-       if mv.progress.IsTraceShardCompleted(mv.group, shardPath, 
sourceShardID) {
+       // Match the key used by Mark below; using shardPath here would always
+       // miss on resume and re-run the migration even though the shard had
+       // completed, wasting bandwidth and writing duplicates to the target.
+       if mv.progress.IsTraceShardCompleted(mv.group, segmentIDStr, 
sourceShardID) {
                mv.logger.Debug().
                        Str("shard_path", shardPath).
                        Str("group", mv.group).
                        Uint32("source_shard", uint32(sourceShardID)).
                        Msg("trace shard already completed for this target 
segment, skipping")
+               // Per-target write done; ensure source progress is recorded to 
survive
+               // a crash between MarkTraceShardCompleted and MarkSource 
(idempotent).
+               mv.progress.MarkSourceTraceShardCompleted(mv.group, 
segmentIDStr, sourceShardID)
                return nil
        }
        allParts := make([]queue.StreamingPartData, 0)
@@ -277,6 +289,7 @@ func (mv *traceMigrationVisitor) VisitShard(timestampTR 
*timestamp.TimeRange, so
 
        // Mark shard as completed for this target segment
        mv.progress.MarkTraceShardCompleted(mv.group, segmentIDStr, 
sourceShardID)
+       mv.progress.MarkSourceTraceShardCompleted(mv.group, segmentIDStr, 
sourceShardID)
        mv.logger.Info().
                Str("group", mv.group).
                Msgf("trace shard migration completed for target segment")
diff --git a/test/cases/lifecycle/lifecycle.go 
b/test/cases/lifecycle/lifecycle.go
index cecc841e1..40c7c682b 100644
--- a/test/cases/lifecycle/lifecycle.go
+++ b/test/cases/lifecycle/lifecycle.go
@@ -19,6 +19,7 @@
 package lifecycle_test
 
 import (
+       "encoding/json"
        "io/fs"
        "os"
        "path/filepath"
@@ -64,10 +65,7 @@ var _ = ginkgo.Describe("Lifecycle", func() {
                gomega.Expect(err).NotTo(gomega.HaveOccurred())
                verifySourceDirectoriesAfterMigration()
                verifyDestinationDirectoriesAfterMigration()
-               // Check report directory has files
-               rEntries, err := os.ReadDir(rf)
-               gomega.Expect(err).NotTo(gomega.HaveOccurred(), "Report 
directory should exist")
-               gomega.Expect(len(rEntries)).To(gomega.BeNumerically(">", 0), 
"Report directory should contain files")
+               verifyMigrationReport(rf)
                conn, err := grpchelper.Conn(SharedContext.LiaisonAddr, 
10*time.Second,
                        
grpc.WithTransportCredentials(insecure.NewCredentials()))
                defer func() {
@@ -131,10 +129,7 @@ var _ = ginkgo.Describe("Lifecycle", func() {
                gomega.Expect(err).NotTo(gomega.HaveOccurred())
                verifySourceDirectoriesAfterMigration()
                verifyDestinationDirectoriesAfterMigration()
-               // Check report directory has files
-               rEntries, err := os.ReadDir(rf)
-               gomega.Expect(err).NotTo(gomega.HaveOccurred(), "Report 
directory should exist")
-               gomega.Expect(len(rEntries)).To(gomega.BeNumerically(">", 0), 
"Report directory should contain files")
+               verifyMigrationReport(rf)
                conn, err := grpchelper.Conn(SharedContext.LiaisonAddr, 
10*time.Second,
                        
grpc.WithTransportCredentials(insecure.NewCredentials()))
                defer func() {
@@ -273,3 +268,133 @@ func verifyLockFileAndSegFolder(entries []fs.DirEntry) 
(hasLockFile bool, hasSeg
        }
        return hasLockFile, hasSegFolder
 }
+
+// verifyMigrationReport reads every JSON report file generated under rf
+// and asserts the invariants of the comprehensive migration report:
+//
+//   - migration_status.completion_rate is 100 when total_groups > 0, and
+//     0 when total_groups == 0 (no scheduled groups in this cycle).
+//   - Every per-resource block (parts / series / element_index) reaches
+//     100 % when the catalog had work to do, or 0 % when the cycle had
+//     nothing to migrate (total == 0).
+//   - The trace_migration block is present with parts + series fields
+//     aligned to stream_migration / measure_migration; snapshot_info
+//     includes trace_dir; errors includes trace_parts / trace_series keys.
+//   - All errors.* maps are empty and every per-resource errors count is 0.
+func verifyMigrationReport(rf string) {
+       rEntries, err := os.ReadDir(rf)
+       gomega.Expect(err).NotTo(gomega.HaveOccurred(), "Report directory 
should exist")
+       gomega.Expect(rEntries).NotTo(gomega.BeEmpty(), "Report directory 
should contain files")
+
+       jsonReports := 0
+       for _, entry := range rEntries {
+               if entry.IsDir() || filepath.Ext(entry.Name()) != ".json" {
+                       continue
+               }
+               jsonReports++
+               path := filepath.Join(rf, entry.Name())
+               raw, readErr := os.ReadFile(path)
+               gomega.Expect(readErr).NotTo(gomega.HaveOccurred(), "Report 
file %s should be readable", path)
+
+               var report map[string]interface{}
+               gomega.Expect(json.Unmarshal(raw, 
&report)).To(gomega.Succeed(), "Report file %s should be valid JSON", path)
+
+               gomega.Expect(report).To(gomega.HaveKey("summary"), "report %s 
missing summary", path)
+               gomega.Expect(report).To(gomega.HaveKey("errors"), "report %s 
missing errors", path)
+               gomega.Expect(report).To(gomega.HaveKey("snapshot_info"), 
"report %s missing snapshot_info", path)
+               gomega.Expect(report["report_version"]).To(gomega.Equal("2.0"), 
"report %s has unexpected report_version", path)
+
+               // snapshot_info: only trace_dir is checked, not the keys for 
stream/measure.
+               snap, ok := report["snapshot_info"].(map[string]interface{})
+               gomega.Expect(ok).To(gomega.BeTrue(), "snapshot_info must be a 
map in %s", path)
+               gomega.Expect(snap).To(gomega.HaveKey("trace_dir"), 
"snapshot_info must include trace_dir in %s", path)
+
+               summary, ok := report["summary"].(map[string]interface{})
+               gomega.Expect(ok).To(gomega.BeTrue(), "summary must be a map in 
%s", path)
+
+               // migration_status: when total_groups=0 the cycle had no 
scheduled
+               // work (e.g. snapshots were empty); the rate is 0 by 
construction.
+               // Otherwise the report reflects a fully completed cycle, so the
+               // rate must be 100. NOTE: a missing total_groups key also reads as 0 via asInt.
+               ms, ok := summary["migration_status"].(map[string]interface{})
+               gomega.Expect(ok).To(gomega.BeTrue(), "summary.migration_status 
must be a map in %s", path)
+               if asInt(ms["total_groups"]) == 0 {
+                       
gomega.Expect(asFloat(ms["completion_rate"])).To(gomega.BeNumerically("~", 0.0, 
1e-9),
+                               "migration_status.completion_rate must be 0 
when total_groups=0 in %s", path)
+               } else {
+                       
gomega.Expect(asFloat(ms["completion_rate"])).To(gomega.BeNumerically("~", 
100.0, 1e-9),
+                               "migration_status.completion_rate should be 100 
in %s", path)
+               }
+
+               // Per-resource invariants (see verifyAllRatesAt100).
+               verifyAllRatesAt100(summary, "stream_migration", 
[]string{"parts", "series", "element_index"}, path)
+               verifyAllRatesAt100(summary, "measure_migration", 
[]string{"parts", "series"}, path)
+
+               // trace_migration: block must exist with parts + series.
+               verifyAllRatesAt100(summary, "trace_migration", 
[]string{"parts", "series"}, path)
+
+               // Every errors.<key> below (stream, measure and trace) must be present, a map, and empty.
+               errs, ok := report["errors"].(map[string]interface{})
+               gomega.Expect(ok).To(gomega.BeTrue(), "errors must be a map in 
%s", path)
+               for _, key := range []string{
+                       "stream_parts", "stream_series", "stream_element_index",
+                       "measure_parts", "measure_series",
+                       "trace_parts", "trace_series",
+               } {
+                       v, found := errs[key]
+                       gomega.Expect(found).To(gomega.BeTrue(), "errors.%s 
must be present in %s", key, path)
+                       errMap, isMap := v.(map[string]interface{})
+                       gomega.Expect(isMap).To(gomega.BeTrue(), "errors.%s 
must be a map in %s", key, path)
+                       gomega.Expect(errMap).To(gomega.BeEmpty(), "errors.%s 
must be empty for a clean cycle in %s", key, path)
+               }
+       }
+       gomega.Expect(jsonReports).To(gomega.BeNumerically(">", 0), "no JSON 
report files found under %s", rf)
+}
+
+// verifyAllRatesAt100 checks each resource under summary[catalog]: it must
+// have total/completed/errors/completion_rate with errors == 0; if total == 0
+// the rate must be 0 (despite the name), else completed == total and rate 100.
+func verifyAllRatesAt100(summary map[string]interface{}, catalog string, 
resources []string, path string) {
+       cat, ok := summary[catalog].(map[string]interface{})
+       gomega.Expect(ok).To(gomega.BeTrue(), "summary.%s must be a map in %s", 
catalog, path)
+       for _, r := range resources {
+               res, isMap := cat[r].(map[string]interface{})
+               gomega.Expect(isMap).To(gomega.BeTrue(), "summary.%s.%s must be 
a map in %s", catalog, r, path)
+               gomega.Expect(res).To(gomega.HaveKey("total"), "summary.%s.%s 
missing total in %s", catalog, r, path)
+               gomega.Expect(res).To(gomega.HaveKey("completed"), 
"summary.%s.%s missing completed in %s", catalog, r, path)
+               gomega.Expect(res).To(gomega.HaveKey("errors"), "summary.%s.%s 
missing errors in %s", catalog, r, path)
+               gomega.Expect(res).To(gomega.HaveKey("completion_rate"), 
"summary.%s.%s missing completion_rate in %s", catalog, r, path)
+
+               total := asInt(res["total"])
+               completed := asInt(res["completed"])
+               errors := asInt(res["errors"])
+               rate := asFloat(res["completion_rate"])
+
+               gomega.Expect(errors).To(gomega.Equal(0), "summary.%s.%s.errors 
must be 0 in %s", catalog, r, path)
+               if total == 0 { // no work for this resource this cycle
+                       gomega.Expect(rate).To(gomega.BeNumerically("~", 0.0, 
1e-9),
+                               "summary.%s.%s.completion_rate must be 0 when 
total=0 in %s", catalog, r, path)
+                       continue
+               }
+               gomega.Expect(completed).To(gomega.Equal(total),
+                       "summary.%s.%s.completed must equal total in %s", 
catalog, r, path)
+               gomega.Expect(rate).To(gomega.BeNumerically("~", 100.0, 1e-9),
+                       "summary.%s.%s.completion_rate must be 100 in %s", 
catalog, r, path)
+       }
+}
+
+// asFloat returns v when it is a float64 (how encoding/json decodes JSON numbers into interface{}) and 0 for any other value, including nil from a missing map key.
+func asFloat(v interface{}) float64 {
+       if f, ok := v.(float64); ok {
+               return f
+       }
+       return 0 // absent or non-numeric value; callers cannot distinguish this from a real 0
+}
+
+// asInt converts a JSON-decoded float64 to int, truncating toward zero; any other value (including nil from a missing map key) yields 0.
+func asInt(v interface{}) int {
+       if f, ok := v.(float64); ok {
+               return int(f)
+       }
+       return 0 // absent or non-numeric value; callers cannot distinguish this from a real 0
+}

Reply via email to