This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch lifecyc-sync
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit b9880c5cedd78395e47e1a90201e00a8c4bd192b
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Sat Aug 2 17:05:52 2025 +0800

    Enhance progress tracking for migration visitors by incorporating shard IDs
    
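    - Updated `measure_migration_visitor.go` and `stream_migration_visitor.go`
      to pass `sourceShardID` to the progress-tracking methods, so that
      completed-part checks and part-error records are scoped to the source
      shard; the "already completed, skipping" log messages now also include
      the source shard.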
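    - Refactored the `Progress` struct so that completed parts and part errors
      are stored in nested maps keyed by group, then shard ID, then part ID,
      allowing shard-specific tracking during migration. Note that this
      changes the JSON layout of the `completed_*_parts` and `*_part_errors`
      fields in the persisted progress file.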
---
 .../backup/lifecycle/measure_migration_visitor.go  |  7 ++-
 banyand/backup/lifecycle/progress.go               | 72 +++++++++++++---------
 .../backup/lifecycle/stream_migration_visitor.go   |  7 ++-
 3 files changed, 52 insertions(+), 34 deletions(-)

diff --git a/banyand/backup/lifecycle/measure_migration_visitor.go 
b/banyand/backup/lifecycle/measure_migration_visitor.go
index a56b96bf..1413ec52 100644
--- a/banyand/backup/lifecycle/measure_migration_visitor.go
+++ b/banyand/backup/lifecycle/measure_migration_visitor.go
@@ -190,9 +190,10 @@ func (mv *measureMigrationVisitor) VisitPart(_ 
*timestamp.TimeRange, sourceShard
        }
 
        // Check if this part has already been completed
-       if mv.progress.IsMeasurePartCompleted(mv.group, partID) {
+       if mv.progress.IsMeasurePartCompleted(mv.group, sourceShardID, partID) {
                mv.logger.Warn().
                        Uint64("part_id", partID).
+                       Uint32("source_shard", uint32(sourceShardID)).
                        Str("group", mv.group).
                        Msg("measure part already completed, skipping")
                return nil
@@ -220,12 +221,12 @@ func (mv *measureMigrationVisitor) VisitPart(_ 
*timestamp.TimeRange, sourceShard
        // Stream entire part to target shard replicas
        if err := mv.streamPartToTargetShard(partData); err != nil {
                errorMsg := fmt.Sprintf("failed to stream measure part to 
target shard: %v", err)
-               mv.progress.MarkMeasurePartError(mv.group, partID, errorMsg)
+               mv.progress.MarkMeasurePartError(mv.group, sourceShardID, 
partID, errorMsg)
                return fmt.Errorf("failed to stream measure part to target 
shard: %w", err)
        }
 
        // Mark part as completed in progress tracker
-       mv.progress.MarkMeasurePartCompleted(mv.group, partID)
+       mv.progress.MarkMeasurePartCompleted(mv.group, sourceShardID, partID)
 
        mv.logger.Info().
                Uint64("part_id", partID).
diff --git a/banyand/backup/lifecycle/progress.go 
b/banyand/backup/lifecycle/progress.go
index cb60e59d..609f7208 100644
--- a/banyand/backup/lifecycle/progress.go
+++ b/banyand/backup/lifecycle/progress.go
@@ -33,8 +33,8 @@ type Progress struct {
        CompletedGroups             map[string]bool                      
`json:"completed_groups"`
        DeletedStreamGroups         map[string]bool                      
`json:"deleted_stream_groups"`
        DeletedMeasureGroups        map[string]bool                      
`json:"deleted_measure_groups"`
-       CompletedStreamParts        map[string]map[uint64]bool           
`json:"completed_stream_parts"`
-       StreamPartErrors            map[string]map[uint64]string         
`json:"stream_part_errors"`
+       CompletedStreamParts        
map[string]map[common.ShardID]map[uint64]bool           
`json:"completed_stream_parts"`
+       StreamPartErrors            
map[string]map[common.ShardID]map[uint64]string         
`json:"stream_part_errors"`
        CompletedStreamSeries       map[string]map[common.ShardID]bool   
`json:"completed_stream_series"`
        StreamSeriesErrors          map[string]map[common.ShardID]string 
`json:"stream_series_errors"`
        CompletedStreamElementIndex map[string]bool                      
`json:"completed_stream_element_index"`
@@ -46,8 +46,8 @@ type Progress struct {
        StreamElementIndexCounts    map[string]int                       
`json:"stream_element_index_counts"`
        StreamElementIndexProgress  map[string]int                       
`json:"stream_element_index_progress"`
        // Measure part-specific progress tracking
-       CompletedMeasureParts  map[string]map[uint64]bool           
`json:"completed_measure_parts"`
-       MeasurePartErrors      map[string]map[uint64]string         
`json:"measure_part_errors"`
+       CompletedMeasureParts  map[string]map[common.ShardID]map[uint64]bool    
       `json:"completed_measure_parts"`
+       MeasurePartErrors      map[string]map[common.ShardID]map[uint64]string  
       `json:"measure_part_errors"`
        MeasurePartCounts      map[string]int                       
`json:"measure_part_counts"`
        MeasurePartProgress    map[string]int                       
`json:"measure_part_progress"`
        CompletedMeasureSeries map[string]map[common.ShardID]bool   
`json:"completed_measure_series"`
@@ -79,8 +79,8 @@ func NewProgress(path string, l *logger.Logger) *Progress {
                CompletedGroups:             make(map[string]bool),
                DeletedStreamGroups:         make(map[string]bool),
                DeletedMeasureGroups:        make(map[string]bool),
-               CompletedStreamParts:        make(map[string]map[uint64]bool),
-               StreamPartErrors:            make(map[string]map[uint64]string),
+               CompletedStreamParts:        
make(map[string]map[common.ShardID]map[uint64]bool),
+               StreamPartErrors:            
make(map[string]map[common.ShardID]map[uint64]string),
                StreamPartCounts:            make(map[string]int),
                StreamPartProgress:          make(map[string]int),
                CompletedStreamSeries:       
make(map[string]map[common.ShardID]bool),
@@ -91,8 +91,8 @@ func NewProgress(path string, l *logger.Logger) *Progress {
                StreamElementIndexErrors:    make(map[string]string),
                StreamElementIndexCounts:    make(map[string]int),
                StreamElementIndexProgress:  make(map[string]int),
-               CompletedMeasureParts:       make(map[string]map[uint64]bool),
-               MeasurePartErrors:           make(map[string]map[uint64]string),
+               CompletedMeasureParts:       
make(map[string]map[common.ShardID]map[uint64]bool),
+               MeasurePartErrors:           
make(map[string]map[common.ShardID]map[uint64]string),
                MeasurePartCounts:           make(map[string]int),
                MeasurePartProgress:         make(map[string]int),
                CompletedMeasureSeries:      
make(map[string]map[common.ShardID]bool),
@@ -227,47 +227,55 @@ func (p *Progress) Remove(path string, l *logger.Logger) {
 }
 
 // MarkStreamPartCompleted marks a specific part of a stream as completed.
-func (p *Progress) MarkStreamPartCompleted(group string, partID uint64) {
+func (p *Progress) MarkStreamPartCompleted(group string, shardID 
common.ShardID, partID uint64) {
        defer p.saveProgress()
        p.mu.Lock()
        defer p.mu.Unlock()
 
        // Initialize nested maps if they don't exist
        if p.CompletedStreamParts[group] == nil {
-               p.CompletedStreamParts[group] = make(map[uint64]bool)
+               p.CompletedStreamParts[group] = 
make(map[common.ShardID]map[uint64]bool)
+       }
+       if p.CompletedStreamParts[group][shardID] == nil {
+               p.CompletedStreamParts[group][shardID] = make(map[uint64]bool)
        }
 
        // Mark part as completed
-       p.CompletedStreamParts[group][partID] = true
+       p.CompletedStreamParts[group][shardID][partID] = true
 
        // Update progress count
        p.StreamPartProgress[group]++
 }
 
 // IsStreamPartCompleted checks if a specific part of a stream has been 
completed.
-func (p *Progress) IsStreamPartCompleted(group string, partID uint64) bool {
+func (p *Progress) IsStreamPartCompleted(group string, shardID common.ShardID, 
partID uint64) bool {
        p.mu.Lock()
        defer p.mu.Unlock()
 
-       if parts, ok := p.CompletedStreamParts[group]; ok {
-               return parts[partID]
+       if shards, ok := p.CompletedStreamParts[group]; ok {
+               if parts, ok := shards[shardID]; ok {
+                       return parts[partID]
+               }
        }
        return false
 }
 
 // MarkStreamPartError records an error for a specific part of a stream.
-func (p *Progress) MarkStreamPartError(group string, partID uint64, errorMsg 
string) {
+func (p *Progress) MarkStreamPartError(group string, shardID common.ShardID, 
partID uint64, errorMsg string) {
        defer p.saveProgress()
        p.mu.Lock()
        defer p.mu.Unlock()
 
        // Initialize nested maps if they don't exist
        if p.StreamPartErrors[group] == nil {
-               p.StreamPartErrors[group] = make(map[uint64]string)
+               p.StreamPartErrors[group] = 
make(map[common.ShardID]map[uint64]string)
+       }
+       if p.StreamPartErrors[group][shardID] == nil {
+               p.StreamPartErrors[group][shardID] = make(map[uint64]string)
        }
 
        // Record the error
-       p.StreamPartErrors[group][partID] = errorMsg
+       p.StreamPartErrors[group][shardID][partID] = errorMsg
 }
 
 // SetStreamPartCount sets the total number of parts for a stream.
@@ -463,47 +471,55 @@ func (p *Progress) GetStreamElementIndexProgress(group 
string) int {
 }
 
 // MarkMeasurePartCompleted marks a specific part of a measure as completed.
-func (p *Progress) MarkMeasurePartCompleted(group string, partID uint64) {
+func (p *Progress) MarkMeasurePartCompleted(group string, shardID 
common.ShardID, partID uint64) {
        defer p.saveProgress()
        p.mu.Lock()
        defer p.mu.Unlock()
 
        // Initialize nested maps if they don't exist
        if p.CompletedMeasureParts[group] == nil {
-               p.CompletedMeasureParts[group] = make(map[uint64]bool)
+               p.CompletedMeasureParts[group] = 
make(map[common.ShardID]map[uint64]bool)
+       }
+       if p.CompletedMeasureParts[group][shardID] == nil {
+               p.CompletedMeasureParts[group][shardID] = make(map[uint64]bool)
        }
 
        // Mark part as completed
-       p.CompletedMeasureParts[group][partID] = true
+       p.CompletedMeasureParts[group][shardID][partID] = true
 
        // Update progress count
        p.MeasurePartProgress[group]++
 }
 
 // IsMeasurePartCompleted checks if a specific part of a measure has been 
completed.
-func (p *Progress) IsMeasurePartCompleted(group string, partID uint64) bool {
+func (p *Progress) IsMeasurePartCompleted(group string, shardID 
common.ShardID, partID uint64) bool {
        p.mu.Lock()
        defer p.mu.Unlock()
 
-       if parts, ok := p.CompletedMeasureParts[group]; ok {
-               return parts[partID]
+       if shards, ok := p.CompletedMeasureParts[group]; ok {
+               if parts, ok := shards[shardID]; ok {
+                       return parts[partID]
+               }
        }
        return false
 }
 
 // MarkMeasurePartError records an error for a specific part of a measure.
-func (p *Progress) MarkMeasurePartError(group string, partID uint64, errorMsg 
string) {
+func (p *Progress) MarkMeasurePartError(group string, shardID common.ShardID, 
partID uint64, errorMsg string) {
        defer p.saveProgress()
        p.mu.Lock()
        defer p.mu.Unlock()
 
        // Initialize nested maps if they don't exist
        if p.MeasurePartErrors[group] == nil {
-               p.MeasurePartErrors[group] = make(map[uint64]string)
+               p.MeasurePartErrors[group] = 
make(map[common.ShardID]map[uint64]string)
+       }
+       if p.MeasurePartErrors[group][shardID] == nil {
+               p.MeasurePartErrors[group][shardID] = make(map[uint64]string)
        }
 
        // Record the error
-       p.MeasurePartErrors[group][partID] = errorMsg
+       p.MeasurePartErrors[group][shardID][partID] = errorMsg
 }
 
 // SetMeasurePartCount sets the total number of parts for a measure.
@@ -541,10 +557,10 @@ func (p *Progress) GetMeasurePartProgress(group string) 
int {
 func (p *Progress) ClearErrors() {
        p.mu.Lock()
        defer p.mu.Unlock()
-       p.StreamPartErrors = make(map[string]map[uint64]string)
+       p.StreamPartErrors = 
make(map[string]map[common.ShardID]map[uint64]string)
        p.StreamSeriesErrors = make(map[string]map[common.ShardID]string)
        p.StreamElementIndexErrors = make(map[string]string)
-       p.MeasurePartErrors = make(map[string]map[uint64]string)
+       p.MeasurePartErrors = 
make(map[string]map[common.ShardID]map[uint64]string)
        p.MeasureSeriesErrors = make(map[string]map[common.ShardID]string)
 }
 
diff --git a/banyand/backup/lifecycle/stream_migration_visitor.go 
b/banyand/backup/lifecycle/stream_migration_visitor.go
index 77c295d6..47505e47 100644
--- a/banyand/backup/lifecycle/stream_migration_visitor.go
+++ b/banyand/backup/lifecycle/stream_migration_visitor.go
@@ -205,9 +205,10 @@ func (mv *streamMigrationVisitor) VisitPart(_ 
*timestamp.TimeRange, sourceShardI
        }
 
        // Check if this part has already been completed
-       if mv.progress.IsStreamPartCompleted(mv.group, partID) {
+       if mv.progress.IsStreamPartCompleted(mv.group, sourceShardID, partID) {
                mv.logger.Debug().
                        Uint64("part_id", partID).
+                       Uint32("source_shard", uint32(sourceShardID)).
                        Str("group", mv.group).
                        Msg("part already completed, skipping")
                return nil
@@ -235,12 +236,12 @@ func (mv *streamMigrationVisitor) VisitPart(_ 
*timestamp.TimeRange, sourceShardI
        // Stream entire part to target shard replicas
        if err := mv.streamPartToTargetShard(partData); err != nil {
                errorMsg := fmt.Sprintf("failed to stream part to target shard: 
%v", err)
-               mv.progress.MarkStreamPartError(mv.group, partID, errorMsg)
+               mv.progress.MarkStreamPartError(mv.group, sourceShardID, 
partID, errorMsg)
                return fmt.Errorf("failed to stream part to target shard: %w", 
err)
        }
 
        // Mark part as completed in progress tracker
-       mv.progress.MarkStreamPartCompleted(mv.group, partID)
+       mv.progress.MarkStreamPartCompleted(mv.group, sourceShardID, partID)
 
        mv.logger.Info().
                Uint64("part_id", partID).

Reply via email to