This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch lifecyc-sync
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit b9880c5cedd78395e47e1a90201e00a8c4bd192b Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Sat Aug 2 17:05:52 2025 +0800 Enhance progress tracking for migration visitors by incorporating shard IDs - Updated `measure_migration_visitor.go` and `stream_migration_visitor.go` to include `sourceShardID` in progress tracking methods, improving the accuracy of completed part checks and error handling. - Refactored the `Progress` struct to support nested maps for completed parts and errors, allowing for shard-specific tracking during migration processes. --- .../backup/lifecycle/measure_migration_visitor.go | 7 ++- banyand/backup/lifecycle/progress.go | 72 +++++++++++++--------- .../backup/lifecycle/stream_migration_visitor.go | 7 ++- 3 files changed, 52 insertions(+), 34 deletions(-) diff --git a/banyand/backup/lifecycle/measure_migration_visitor.go b/banyand/backup/lifecycle/measure_migration_visitor.go index a56b96bf..1413ec52 100644 --- a/banyand/backup/lifecycle/measure_migration_visitor.go +++ b/banyand/backup/lifecycle/measure_migration_visitor.go @@ -190,9 +190,10 @@ func (mv *measureMigrationVisitor) VisitPart(_ *timestamp.TimeRange, sourceShard } // Check if this part has already been completed - if mv.progress.IsMeasurePartCompleted(mv.group, partID) { + if mv.progress.IsMeasurePartCompleted(mv.group, sourceShardID, partID) { mv.logger.Warn(). Uint64("part_id", partID). + Uint32("source_shard", uint32(sourceShardID)). Str("group", mv.group). 
Msg("measure part already completed, skipping") return nil @@ -220,12 +221,12 @@ func (mv *measureMigrationVisitor) VisitPart(_ *timestamp.TimeRange, sourceShard // Stream entire part to target shard replicas if err := mv.streamPartToTargetShard(partData); err != nil { errorMsg := fmt.Sprintf("failed to stream measure part to target shard: %v", err) - mv.progress.MarkMeasurePartError(mv.group, partID, errorMsg) + mv.progress.MarkMeasurePartError(mv.group, sourceShardID, partID, errorMsg) return fmt.Errorf("failed to stream measure part to target shard: %w", err) } // Mark part as completed in progress tracker - mv.progress.MarkMeasurePartCompleted(mv.group, partID) + mv.progress.MarkMeasurePartCompleted(mv.group, sourceShardID, partID) mv.logger.Info(). Uint64("part_id", partID). diff --git a/banyand/backup/lifecycle/progress.go b/banyand/backup/lifecycle/progress.go index cb60e59d..609f7208 100644 --- a/banyand/backup/lifecycle/progress.go +++ b/banyand/backup/lifecycle/progress.go @@ -33,8 +33,8 @@ type Progress struct { CompletedGroups map[string]bool `json:"completed_groups"` DeletedStreamGroups map[string]bool `json:"deleted_stream_groups"` DeletedMeasureGroups map[string]bool `json:"deleted_measure_groups"` - CompletedStreamParts map[string]map[uint64]bool `json:"completed_stream_parts"` - StreamPartErrors map[string]map[uint64]string `json:"stream_part_errors"` + CompletedStreamParts map[string]map[common.ShardID]map[uint64]bool `json:"completed_stream_parts"` + StreamPartErrors map[string]map[common.ShardID]map[uint64]string `json:"stream_part_errors"` CompletedStreamSeries map[string]map[common.ShardID]bool `json:"completed_stream_series"` StreamSeriesErrors map[string]map[common.ShardID]string `json:"stream_series_errors"` CompletedStreamElementIndex map[string]bool `json:"completed_stream_element_index"` @@ -46,8 +46,8 @@ type Progress struct { StreamElementIndexCounts map[string]int `json:"stream_element_index_counts"` StreamElementIndexProgress 
map[string]int `json:"stream_element_index_progress"` // Measure part-specific progress tracking - CompletedMeasureParts map[string]map[uint64]bool `json:"completed_measure_parts"` - MeasurePartErrors map[string]map[uint64]string `json:"measure_part_errors"` + CompletedMeasureParts map[string]map[common.ShardID]map[uint64]bool `json:"completed_measure_parts"` + MeasurePartErrors map[string]map[common.ShardID]map[uint64]string `json:"measure_part_errors"` MeasurePartCounts map[string]int `json:"measure_part_counts"` MeasurePartProgress map[string]int `json:"measure_part_progress"` CompletedMeasureSeries map[string]map[common.ShardID]bool `json:"completed_measure_series"` @@ -79,8 +79,8 @@ func NewProgress(path string, l *logger.Logger) *Progress { CompletedGroups: make(map[string]bool), DeletedStreamGroups: make(map[string]bool), DeletedMeasureGroups: make(map[string]bool), - CompletedStreamParts: make(map[string]map[uint64]bool), - StreamPartErrors: make(map[string]map[uint64]string), + CompletedStreamParts: make(map[string]map[common.ShardID]map[uint64]bool), + StreamPartErrors: make(map[string]map[common.ShardID]map[uint64]string), StreamPartCounts: make(map[string]int), StreamPartProgress: make(map[string]int), CompletedStreamSeries: make(map[string]map[common.ShardID]bool), @@ -91,8 +91,8 @@ func NewProgress(path string, l *logger.Logger) *Progress { StreamElementIndexErrors: make(map[string]string), StreamElementIndexCounts: make(map[string]int), StreamElementIndexProgress: make(map[string]int), - CompletedMeasureParts: make(map[string]map[uint64]bool), - MeasurePartErrors: make(map[string]map[uint64]string), + CompletedMeasureParts: make(map[string]map[common.ShardID]map[uint64]bool), + MeasurePartErrors: make(map[string]map[common.ShardID]map[uint64]string), MeasurePartCounts: make(map[string]int), MeasurePartProgress: make(map[string]int), CompletedMeasureSeries: make(map[string]map[common.ShardID]bool), @@ -227,47 +227,55 @@ func (p *Progress) Remove(path 
string, l *logger.Logger) { } // MarkStreamPartCompleted marks a specific part of a stream as completed. -func (p *Progress) MarkStreamPartCompleted(group string, partID uint64) { +func (p *Progress) MarkStreamPartCompleted(group string, shardID common.ShardID, partID uint64) { defer p.saveProgress() p.mu.Lock() defer p.mu.Unlock() // Initialize nested maps if they don't exist if p.CompletedStreamParts[group] == nil { - p.CompletedStreamParts[group] = make(map[uint64]bool) + p.CompletedStreamParts[group] = make(map[common.ShardID]map[uint64]bool) + } + if p.CompletedStreamParts[group][shardID] == nil { + p.CompletedStreamParts[group][shardID] = make(map[uint64]bool) } // Mark part as completed - p.CompletedStreamParts[group][partID] = true + p.CompletedStreamParts[group][shardID][partID] = true // Update progress count p.StreamPartProgress[group]++ } // IsStreamPartCompleted checks if a specific part of a stream has been completed. -func (p *Progress) IsStreamPartCompleted(group string, partID uint64) bool { +func (p *Progress) IsStreamPartCompleted(group string, shardID common.ShardID, partID uint64) bool { p.mu.Lock() defer p.mu.Unlock() - if parts, ok := p.CompletedStreamParts[group]; ok { - return parts[partID] + if shards, ok := p.CompletedStreamParts[group]; ok { + if parts, ok := shards[shardID]; ok { + return parts[partID] + } } return false } // MarkStreamPartError records an error for a specific part of a stream. 
-func (p *Progress) MarkStreamPartError(group string, partID uint64, errorMsg string) { +func (p *Progress) MarkStreamPartError(group string, shardID common.ShardID, partID uint64, errorMsg string) { defer p.saveProgress() p.mu.Lock() defer p.mu.Unlock() // Initialize nested maps if they don't exist if p.StreamPartErrors[group] == nil { - p.StreamPartErrors[group] = make(map[uint64]string) + p.StreamPartErrors[group] = make(map[common.ShardID]map[uint64]string) + } + if p.StreamPartErrors[group][shardID] == nil { + p.StreamPartErrors[group][shardID] = make(map[uint64]string) } // Record the error - p.StreamPartErrors[group][partID] = errorMsg + p.StreamPartErrors[group][shardID][partID] = errorMsg } // SetStreamPartCount sets the total number of parts for a stream. @@ -463,47 +471,55 @@ func (p *Progress) GetStreamElementIndexProgress(group string) int { } // MarkMeasurePartCompleted marks a specific part of a measure as completed. -func (p *Progress) MarkMeasurePartCompleted(group string, partID uint64) { +func (p *Progress) MarkMeasurePartCompleted(group string, shardID common.ShardID, partID uint64) { defer p.saveProgress() p.mu.Lock() defer p.mu.Unlock() // Initialize nested maps if they don't exist if p.CompletedMeasureParts[group] == nil { - p.CompletedMeasureParts[group] = make(map[uint64]bool) + p.CompletedMeasureParts[group] = make(map[common.ShardID]map[uint64]bool) + } + if p.CompletedMeasureParts[group][shardID] == nil { + p.CompletedMeasureParts[group][shardID] = make(map[uint64]bool) } // Mark part as completed - p.CompletedMeasureParts[group][partID] = true + p.CompletedMeasureParts[group][shardID][partID] = true // Update progress count p.MeasurePartProgress[group]++ } // IsMeasurePartCompleted checks if a specific part of a measure has been completed. 
-func (p *Progress) IsMeasurePartCompleted(group string, partID uint64) bool { +func (p *Progress) IsMeasurePartCompleted(group string, shardID common.ShardID, partID uint64) bool { p.mu.Lock() defer p.mu.Unlock() - if parts, ok := p.CompletedMeasureParts[group]; ok { - return parts[partID] + if shards, ok := p.CompletedMeasureParts[group]; ok { + if parts, ok := shards[shardID]; ok { + return parts[partID] + } } return false } // MarkMeasurePartError records an error for a specific part of a measure. -func (p *Progress) MarkMeasurePartError(group string, partID uint64, errorMsg string) { +func (p *Progress) MarkMeasurePartError(group string, shardID common.ShardID, partID uint64, errorMsg string) { defer p.saveProgress() p.mu.Lock() defer p.mu.Unlock() // Initialize nested maps if they don't exist if p.MeasurePartErrors[group] == nil { - p.MeasurePartErrors[group] = make(map[uint64]string) + p.MeasurePartErrors[group] = make(map[common.ShardID]map[uint64]string) + } + if p.MeasurePartErrors[group][shardID] == nil { + p.MeasurePartErrors[group][shardID] = make(map[uint64]string) } // Record the error - p.MeasurePartErrors[group][partID] = errorMsg + p.MeasurePartErrors[group][shardID][partID] = errorMsg } // SetMeasurePartCount sets the total number of parts for a measure. 
@@ -541,10 +557,10 @@ func (p *Progress) GetMeasurePartProgress(group string) int { func (p *Progress) ClearErrors() { p.mu.Lock() defer p.mu.Unlock() - p.StreamPartErrors = make(map[string]map[uint64]string) + p.StreamPartErrors = make(map[string]map[common.ShardID]map[uint64]string) p.StreamSeriesErrors = make(map[string]map[common.ShardID]string) p.StreamElementIndexErrors = make(map[string]string) - p.MeasurePartErrors = make(map[string]map[uint64]string) + p.MeasurePartErrors = make(map[string]map[common.ShardID]map[uint64]string) p.MeasureSeriesErrors = make(map[string]map[common.ShardID]string) } diff --git a/banyand/backup/lifecycle/stream_migration_visitor.go b/banyand/backup/lifecycle/stream_migration_visitor.go index 77c295d6..47505e47 100644 --- a/banyand/backup/lifecycle/stream_migration_visitor.go +++ b/banyand/backup/lifecycle/stream_migration_visitor.go @@ -205,9 +205,10 @@ func (mv *streamMigrationVisitor) VisitPart(_ *timestamp.TimeRange, sourceShardI } // Check if this part has already been completed - if mv.progress.IsStreamPartCompleted(mv.group, partID) { + if mv.progress.IsStreamPartCompleted(mv.group, sourceShardID, partID) { mv.logger.Debug(). Uint64("part_id", partID). + Uint32("source_shard", uint32(sourceShardID)). Str("group", mv.group). 
Msg("part already completed, skipping") return nil @@ -235,12 +236,12 @@ func (mv *streamMigrationVisitor) VisitPart(_ *timestamp.TimeRange, sourceShardI // Stream entire part to target shard replicas if err := mv.streamPartToTargetShard(partData); err != nil { errorMsg := fmt.Sprintf("failed to stream part to target shard: %v", err) - mv.progress.MarkStreamPartError(mv.group, partID, errorMsg) + mv.progress.MarkStreamPartError(mv.group, sourceShardID, partID, errorMsg) return fmt.Errorf("failed to stream part to target shard: %w", err) } // Mark part as completed in progress tracker - mv.progress.MarkStreamPartCompleted(mv.group, partID) + mv.progress.MarkStreamPartCompleted(mv.group, sourceShardID, partID) mv.logger.Info(). Uint64("part_id", partID).