This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch lifecyc-sync in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 31043820f3420a59774d028bf259ad5ecf8841f3
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Wed Jul 30 17:26:46 2025 +0800

    Implement element index migration in MigrationVisitor
    
    - Enhanced the `VisitElementIndex` method to handle migration of element index segment files, including logging and error management.
    - Introduced new methods in the `Progress` struct to track the completion status and errors for element index files during migration.
---
 banyand/backup/lifecycle/file_migration_visitor.go | 235 ++++++++++++++++++++-
 banyand/backup/lifecycle/progress.go               | 176 +++++++++++++--
 2 files changed, 390 insertions(+), 21 deletions(-)

diff --git a/banyand/backup/lifecycle/file_migration_visitor.go b/banyand/backup/lifecycle/file_migration_visitor.go
index 4d5620d7..7a52ca5b 100644
--- a/banyand/backup/lifecycle/file_migration_visitor.go
+++ b/banyand/backup/lifecycle/file_migration_visitor.go
@@ -276,11 +276,126 @@ func (mv *MigrationVisitor) VisitPart(_ *timestamp.TimeRange, sourceShardID comm
 }
 
 // VisitElementIndex implements stream.Visitor.
-func (mv *MigrationVisitor) VisitElementIndex(_ *timestamp.TimeRange, _ common.ShardID, indexPath string) error {
-	// TODO: Implement element index migration if needed
-	mv.logger.Debug().
+//
+// Every .seg file directly under indexPath is streamed to all replicas of the
+// target shard. Files already recorded as completed in mv.progress are skipped,
+// and migration stops at the first file that cannot be streamed.
+func (mv *MigrationVisitor) VisitElementIndex(segmentTR *timestamp.TimeRange, sourceShardID common.ShardID, indexPath string) error {
+	mv.logger.Info().
 		Str("path", indexPath).
-		Msg("skipping element index migration (not implemented)")
+		Uint32("shard_id", uint32(sourceShardID)).
+		Int64("min_timestamp", segmentTR.Start.UnixNano()).
+		Int64("max_timestamp", segmentTR.End.UnixNano()).
+		Str("stream", mv.streamName).
+		Str("group", mv.group).
+		Msg("migrating element index")
+
+	// elementIndexFile is a .seg file name together with the segment ID parsed from it.
+	type elementIndexFile struct {
+		name string
+		id   uint64
+	}
+
+	// Find all .seg files in the element index directory. The segment ID is the
+	// hex-encoded file name without the ".seg" extension. Files whose names do
+	// not parse are logged and left out here, before counting, so they can never
+	// keep this directory from reaching "fully completed".
+	lfs := fs.NewLocalFileSystem()
+	var segmentFiles []elementIndexFile
+	for _, entry := range lfs.ReadDir(indexPath) {
+		if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".seg") {
+			continue
+		}
+		segmentIDStr := strings.TrimSuffix(entry.Name(), ".seg")
+		segmentID, err := strconv.ParseUint(segmentIDStr, 16, 64)
+		if err != nil {
+			mv.logger.Error().
+				Str("filename", entry.Name()).
+				Str("id_str", segmentIDStr).
+				Err(err).
+				Msg("failed to parse segment ID from filename")
+			continue
+		}
+		segmentFiles = append(segmentFiles, elementIndexFile{name: entry.Name(), id: segmentID})
+	}
+
+	if len(segmentFiles) == 0 {
+		mv.logger.Debug().
+			Str("path", indexPath).
+			Msg("no .seg files found in element index directory")
+		return nil
+	}
+
+	// The segmentTR/sourceShardID parameters indicate this visitor runs once per
+	// (time segment, shard) index directory, and .seg names are presumably only
+	// unique within one directory (NOTE(review): confirm). Progress is therefore
+	// scoped to the directory. A stream-only key would let every visit overwrite
+	// the stream's total, and would let a completed ID from one directory skip a
+	// same-named file in another.
+	progressKey := fmt.Sprintf("%s/shard-%d/%d", mv.streamName, uint32(sourceShardID), segmentTR.Start.UnixNano())
+	if mv.progress != nil {
+		mv.progress.SetStreamElementIndexCount(mv.group, progressKey, len(segmentFiles))
+	}
+
+	// Calculate target shard ID (using a simple approach for element index)
+	targetShardID := mv.calculateTargetShardID(uint32(sourceShardID))
+	mv.logger.Info().
+		Int("segment_count", len(segmentFiles)).
+		Str("path", indexPath).
+		Uint32("source_shard", uint32(sourceShardID)).
+		Uint32("target_shard", targetShardID).
+		Msg("found element index segment files for migration")
+
+	for _, segment := range segmentFiles {
+		if mv.progress != nil && mv.progress.IsStreamElementIndexCompleted(mv.group, progressKey, segment.id) {
+			mv.logger.Debug().
+				Uint64("segment_id", segment.id).
+				Str("filename", segment.name).
+				Str("stream", mv.streamName).
+				Str("group", mv.group).
+				Msg("element index segment already completed, skipping")
+			continue
+		}
+
+		mv.logger.Info().
+			Uint64("segment_id", segment.id).
+			Str("filename", segment.name).
+			Str("stream", mv.streamName).
+			Str("group", mv.group).
+			Msg("migrating element index segment file")
+
+		// The file itself is opened (once per replica) by streamElementIndexToTargetShard.
+		segmentFilePath := filepath.Join(indexPath, segment.name)
+		if err := mv.streamElementIndexToTargetShard(targetShardID, segmentFilePath, segmentTR, segment.id, segment.name); err != nil {
+			if mv.progress != nil {
+				errorMsg := fmt.Sprintf("failed to stream element index to target shard: %v", err)
+				mv.progress.MarkStreamElementIndexError(mv.group, progressKey, segment.id, errorMsg)
+			}
+			return fmt.Errorf("failed to stream element index to target shard: %w", err)
+		}
+
+		// Without a progress tracker there is nothing to record; counts are logged as 0.
+		completedSegments, totalSegments := 0, 0
+		if mv.progress != nil {
+			mv.progress.MarkStreamElementIndexCompleted(mv.group, progressKey, segment.id)
+			completedSegments = mv.progress.GetStreamElementIndexProgress(mv.group, progressKey)
+			totalSegments = mv.progress.GetStreamElementIndexCount(mv.group, progressKey)
+		}
+
+		mv.logger.Info().
+			Uint64("segment_id", segment.id).
+			Str("filename", segment.name).
+			Str("stream", mv.streamName).
+			Str("group", mv.group).
+			Int("completed_segments", completedSegments).
+			Int("total_segments", totalSegments).
+			Msg("element index segment migration completed successfully")
+	}
+
 	return nil
 }
 
@@ -633,3 +748,115 @@ func (mv *MigrationVisitor) SetStreamSeriesCount(totalSegments int) {
 		Msg("set stream series count for progress tracking")
 	}
 }
+
+// SetStreamElementIndexCount sets the total number of element index segment files for the current stream.
+// It is a no-op when the visitor has no progress tracker.
+func (mv *MigrationVisitor) SetStreamElementIndexCount(totalSegmentFiles int) {
+	if mv.progress != nil {
+		mv.progress.SetStreamElementIndexCount(mv.group, mv.streamName, totalSegmentFiles)
+		mv.logger.Info().
+			Str("stream", mv.streamName).
+			Str("group", mv.group).
+			Int("total_segment_files", totalSegmentFiles).
+			Msg("set stream element index segment count for progress tracking")
+	}
+}
+
+// streamElementIndexToTargetShard sends one element index segment file to all
+// replicas of the target shard.
+//
+// The file is reopened for every replica (see streamElementIndexFileToNode). The
+// reader returned by SequentialRead is consumed by the first transfer, so sharing
+// one reader would hand every later replica an already-exhausted stream.
+func (mv *MigrationVisitor) streamElementIndexToTargetShard(
+	targetShardID uint32,
+	segmentFilePath string,
+	segmentTR *timestamp.TimeRange,
+	segmentID uint64,
+	segmentFileName string,
+) error {
+	copies := mv.replicas + 1
+
+	// Send to all replicas using the exact pattern from steps.go:219-236
+	for replicaID := uint32(0); replicaID < copies; replicaID++ {
+		// Use selector.Pick exactly like steps.go:220
+		nodeID, err := mv.selector.Pick(mv.group, "", targetShardID, replicaID)
+		if err != nil {
+			return fmt.Errorf("failed to pick node for shard %d replica %d: %w", targetShardID, replicaID, err)
+		}
+
+		if err := mv.streamElementIndexFileToNode(nodeID, targetShardID, segmentFilePath, segmentTR, segmentID, segmentFileName); err != nil {
+			return fmt.Errorf("failed to stream element index to node %s: %w", nodeID, err)
+		}
+	}
+
+	return nil
+}
+
+// streamElementIndexFileToNode opens segmentFilePath, streams it to nodeID as a
+// single file via streamElementIndexToNode, and closes it afterwards. A close
+// failure is only logged: it cannot undo a transfer that already succeeded, and
+// a transfer error is returned unchanged.
+func (mv *MigrationVisitor) streamElementIndexFileToNode(
+	nodeID string,
+	targetShardID uint32,
+	segmentFilePath string,
+	segmentTR *timestamp.TimeRange,
+	segmentID uint64,
+	segmentFileName string,
+) error {
+	segmentFile, err := fs.NewLocalFileSystem().OpenFile(segmentFilePath)
+	if err != nil {
+		return fmt.Errorf("failed to open element index segment file %s: %w", segmentFilePath, err)
+	}
+	defer func() {
+		if closeErr := segmentFile.Close(); closeErr != nil {
+			mv.logger.Warn().
+				Str("path", segmentFilePath).
+				Err(closeErr).
+				Msg("failed to close element index segment file")
+		}
+	}()
+
+	files := []queue.FileInfo{
+		{
+			Name:   segmentFileName,
+			Reader: segmentFile.SequentialRead(),
+		},
+	}
+	return mv.streamElementIndexToNode(nodeID, targetShardID, files, segmentTR, segmentID, segmentFileName)
+}
+
+// streamElementIndexToNode streams element index data to a specific target node.
+func (mv *MigrationVisitor) streamElementIndexToNode(
+	nodeID string,
+	targetShardID uint32,
+	files []queue.FileInfo,
+	segmentTR *timestamp.TimeRange,
+	segmentID uint64,
+	segmentFileName string,
+) error {
+	// Reuse the chunked sync client cached for this node. On a cache miss, build
+	// one through mv.client and remember it for the node's later files.
+	syncClient, cached := mv.chunkedClients[nodeID]
+	if !cached {
+		var err error
+		if syncClient, err = mv.client.NewChunkedSyncClient(nodeID, uint32(mv.chunkSize)); err != nil {
+			return fmt.Errorf("failed to create chunked sync client for node %s: %w", nodeID, err)
+		}
+		mv.chunkedClients[nodeID] = syncClient
+	}
+
+	// Wrap the files as streaming part data and push them in chunks.
+	result, err := syncClient.SyncStreamingParts(
+		context.Background(),
+		mv.createStreamingElementIndexFromFiles(targetShardID, files, segmentTR, segmentID),
+	)
+	switch {
+	case err != nil:
+		return fmt.Errorf("failed to sync streaming element index to node %s: %w", nodeID, err)
+	case !result.Success:
+		return fmt.Errorf("chunked sync partially failed: %v", result.ErrorMessage)
+	}
+
+	// Report transfer metrics for the completed sync session.
+	mv.logger.Info().
+		Str("node", nodeID).
+		Str("session", result.SessionID).
+		Uint64("bytes", result.TotalBytes).
+		Int64("duration_ms", result.DurationMs).
+		Uint32("chunks", result.ChunksCount).
+		Uint32("parts", result.PartsCount).
+		Uint32("target_shard", targetShardID).
+		Uint64("segment_id", segmentID).
+		Str("segment_filename", segmentFileName).
+		Str("stream", mv.streamName).
+		Str("group", mv.group).
+		Msg("file-based migration element index segment completed successfully")
+	return nil
+}
+
+// createStreamingElementIndexFromFiles creates StreamingPartData from element index files.
+func (mv *MigrationVisitor) createStreamingElementIndexFromFiles(
+	targetShardID uint32,
+	files []queue.FileInfo,
+	segmentTR *timestamp.TimeRange,
+	segmentID uint64,
+) []queue.StreamingPartData {
+	// The files are shipped as a single streaming part. Its ID is the segment ID
+	// passed in, and its time bounds are those of segmentTR.
+	elementIndexData := queue.StreamingPartData{
+		ID:           segmentID,
+		Group:        mv.group,
+		ShardID:      targetShardID,
+		Topic:        data.TopicStreamLocalIndexWrite.String(), // Use local index write topic for element indices
+		Files:        files,
+		MinTimestamp: segmentTR.Start.UnixNano(),
+		MaxTimestamp: segmentTR.End.UnixNano(),
+	}
+
+	return []queue.StreamingPartData{elementIndexData}
+}
diff --git a/banyand/backup/lifecycle/progress.go b/banyand/backup/lifecycle/progress.go
index d8c38061..acb8e493 100644
--- a/banyand/backup/lifecycle/progress.go
+++ b/banyand/backup/lifecycle/progress.go
@@ -44,9 +44,14 @@ type Progress struct {
 	StreamSeriesErrors    map[string]map[string]map[uint64]string `json:"stream_series_errors"`    // group -> stream -> segmentID -> error
 	StreamSeriesCounts    map[string]map[string]int               `json:"stream_series_counts"`    // group -> stream -> total series segments
 	StreamSeriesProgress  map[string]map[string]int               `json:"stream_series_progress"`  // group -> stream -> completed series segments count
-	SnapshotStreamDir     string                                  `json:"snapshot_stream_dir"`
-	SnapshotMeasureDir    string                                  `json:"snapshot_measure_dir"`
-	mu                    sync.Mutex                              `json:"-"`
+	// Element index-level tracking for stream migration
+	CompletedStreamElementIndex map[string]map[string]map[uint64]bool   `json:"completed_stream_element_index"` // group -> stream -> indexFileID -> completed
+	StreamElementIndexErrors    map[string]map[string]map[uint64]string `json:"stream_element_index_errors"`    // group -> stream -> indexFileID -> error
+	StreamElementIndexCounts    map[string]map[string]int               `json:"stream_element_index_counts"`    // group -> stream -> total index files
+	StreamElementIndexProgress  map[string]map[string]int               `json:"stream_element_index_progress"`  // group -> stream -> completed index files count
+	SnapshotStreamDir           string                                  `json:"snapshot_stream_dir"`
+	SnapshotMeasureDir          string                                  `json:"snapshot_measure_dir"`
+	mu                          sync.Mutex                              `json:"-"`
 }
 
 // AllGroupsFullyCompleted checks if all groups are fully completed.
@@ -65,20 +70,24 @@ func (p *Progress) AllGroupsFullyCompleted(groups []*commonv1.Group) bool {
 // NewProgress creates a new Progress tracker.
 func NewProgress() *Progress {
 	return &Progress{
-		CompletedGroups:       make(map[string]bool),
-		CompletedMeasures:     make(map[string]map[string]bool),
-		DeletedStreamGroups:   make(map[string]bool),
-		DeletedMeasureGroups:  make(map[string]bool),
-		MeasureErrors:         make(map[string]map[string]string),
-		MeasureCounts:         make(map[string]map[string]int),
-		CompletedStreamParts:  make(map[string]map[string]map[uint64]bool),
-		StreamPartErrors:      make(map[string]map[string]map[uint64]string),
-		StreamPartCounts:      make(map[string]map[string]int),
-		StreamPartProgress:    make(map[string]map[string]int),
-		CompletedStreamSeries: make(map[string]map[string]map[uint64]bool),
-		StreamSeriesErrors:    make(map[string]map[string]map[uint64]string),
-		StreamSeriesCounts:    make(map[string]map[string]int),
-		StreamSeriesProgress:  make(map[string]map[string]int),
+		CompletedGroups:             make(map[string]bool),
+		CompletedMeasures:           make(map[string]map[string]bool),
+		DeletedStreamGroups:         make(map[string]bool),
+		DeletedMeasureGroups:        make(map[string]bool),
+		MeasureErrors:               make(map[string]map[string]string),
+		MeasureCounts:               make(map[string]map[string]int),
+		CompletedStreamParts:        make(map[string]map[string]map[uint64]bool),
+		StreamPartErrors:            make(map[string]map[string]map[uint64]string),
+		StreamPartCounts:            make(map[string]map[string]int),
+		StreamPartProgress:          make(map[string]map[string]int),
+		CompletedStreamSeries:       make(map[string]map[string]map[uint64]bool),
+		StreamSeriesErrors:          make(map[string]map[string]map[uint64]string),
+		StreamSeriesCounts:          make(map[string]map[string]int),
+		StreamSeriesProgress:        make(map[string]map[string]int),
+		CompletedStreamElementIndex: make(map[string]map[string]map[uint64]bool),
+		StreamElementIndexErrors:    make(map[string]map[string]map[uint64]string),
+		StreamElementIndexCounts:    make(map[string]map[string]int),
+		StreamElementIndexProgress:  make(map[string]map[string]int),
 	}
 }
 
@@ -498,3 +507,136 @@ func (p *Progress) ClearStreamSeriesErrors(group, stream string) {
 		delete(errors, stream)
 	}
 }
+
+// MarkStreamElementIndexCompleted marks a specific element index file of a stream as completed.
+//
+// The call is idempotent: marking a file that is already completed changes
+// nothing. This keeps the completed count in StreamElementIndexProgress equal
+// to the number of distinct completed files.
+func (p *Progress) MarkStreamElementIndexCompleted(group, stream string, indexFileID uint64) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	// Initialize nested maps if they don't exist. The top-level map is checked
+	// too, so a Progress not built by NewProgress cannot panic here by
+	// assigning into a nil map.
+	if p.CompletedStreamElementIndex == nil {
+		p.CompletedStreamElementIndex = make(map[string]map[string]map[uint64]bool)
+	}
+	if p.CompletedStreamElementIndex[group] == nil {
+		p.CompletedStreamElementIndex[group] = make(map[string]map[uint64]bool)
+	}
+	if p.CompletedStreamElementIndex[group][stream] == nil {
+		p.CompletedStreamElementIndex[group][stream] = make(map[uint64]bool)
+	}
+	if p.CompletedStreamElementIndex[group][stream][indexFileID] {
+		// Already counted; incrementing again would over-report progress.
+		return
+	}
+
+	// Mark element index file as completed
+	p.CompletedStreamElementIndex[group][stream][indexFileID] = true
+
+	// Update progress count
+	if p.StreamElementIndexProgress == nil {
+		p.StreamElementIndexProgress = make(map[string]map[string]int)
+	}
+	if p.StreamElementIndexProgress[group] == nil {
+		p.StreamElementIndexProgress[group] = make(map[string]int)
+	}
+	p.StreamElementIndexProgress[group][stream]++
+}
+
+// IsStreamElementIndexCompleted reports whether a specific element index file of a stream has been completed.
+func (p *Progress) IsStreamElementIndexCompleted(group, stream string, indexFileID uint64) bool {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	// Indexing a missing (nil) map yields the zero value, so an unknown group,
+	// stream, or file all read as false.
+	return p.CompletedStreamElementIndex[group][stream][indexFileID]
+}
+
+// MarkStreamElementIndexError records an error for a specific element index file of a stream.
+func (p *Progress) MarkStreamElementIndexError(group, stream string, indexFileID uint64, errorMsg string) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	// Initialize nested maps if they don't exist. The top-level map is checked
+	// too, so a Progress not built by NewProgress cannot panic here by
+	// assigning into a nil map.
+	if p.StreamElementIndexErrors == nil {
+		p.StreamElementIndexErrors = make(map[string]map[string]map[uint64]string)
+	}
+	if p.StreamElementIndexErrors[group] == nil {
+		p.StreamElementIndexErrors[group] = make(map[string]map[uint64]string)
+	}
+	if p.StreamElementIndexErrors[group][stream] == nil {
+		p.StreamElementIndexErrors[group][stream] = make(map[uint64]string)
+	}
+
+	// Record the error; a later error for the same file replaces the earlier one.
+	p.StreamElementIndexErrors[group][stream][indexFileID] = errorMsg
+}
+
+// SetStreamElementIndexCount sets the total number of element index files for a stream.
+//
+// It also ensures a completed-count entry exists for the stream, starting at 0,
+// without resetting a count that is already being tracked (e.g. on a resumed run).
+func (p *Progress) SetStreamElementIndexCount(group, stream string, totalIndexFiles int) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	if p.StreamElementIndexCounts == nil {
+		p.StreamElementIndexCounts = make(map[string]map[string]int)
+	}
+	if p.StreamElementIndexCounts[group] == nil {
+		p.StreamElementIndexCounts[group] = make(map[string]int)
+	}
+	p.StreamElementIndexCounts[group][stream] = totalIndexFiles
+
+	// Initialize progress tracking
+	if p.StreamElementIndexProgress == nil {
+		p.StreamElementIndexProgress = make(map[string]map[string]int)
+	}
+	if p.StreamElementIndexProgress[group] == nil {
+		p.StreamElementIndexProgress[group] = make(map[string]int)
+	}
+	// Insert an explicit zero only when the stream has no entry yet. The key then
+	// exists in the map (and in its JSON encoding), and an existing count is kept.
+	if _, tracked := p.StreamElementIndexProgress[group][stream]; !tracked {
+		p.StreamElementIndexProgress[group][stream] = 0
+	}
+}
+
+// GetStreamElementIndexCount returns the total number of element index files for a stream,
+// or 0 if no total has been set.
+func (p *Progress) GetStreamElementIndexCount(group, stream string) int {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	if counts, ok := p.StreamElementIndexCounts[group]; ok {
+		return counts[stream]
+	}
+	return 0
+}
+
+// GetStreamElementIndexProgress returns the number of completed element index files for a stream,
+// or 0 if none have been recorded.
+func (p *Progress) GetStreamElementIndexProgress(group, stream string) int {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	if progress, ok := p.StreamElementIndexProgress[group]; ok {
+		return progress[stream]
+	}
+	return 0
+}
+
+// IsStreamElementIndexFullyCompleted checks if all element index files of a stream have been completed.
+func (p *Progress) IsStreamElementIndexFullyCompleted(group, stream string) bool {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	// A stream whose total was never set (or is not positive) is never reported complete.
+	total := p.StreamElementIndexCounts[group][stream]
+	if total <= 0 {
+		return false
+	}
+	return p.StreamElementIndexProgress[group][stream] >= total
+}
+
+// GetStreamElementIndexErrors returns a copy of the recorded errors for a stream's
+// element index files, keyed by index file ID, or nil when the stream has no entry.
+func (p *Progress) GetStreamElementIndexErrors(group, stream string) map[uint64]string {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	recorded, found := p.StreamElementIndexErrors[group][stream]
+	if !found {
+		return nil
+	}
+	// Hand out a copy so callers never touch the map guarded by p.mu.
+	snapshot := make(map[uint64]string, len(recorded))
+	for indexFileID, errorMsg := range recorded {
+		snapshot[indexFileID] = errorMsg
+	}
+	return snapshot
+}
+
+// ClearStreamElementIndexErrors drops every recorded error for a stream's element index files.
+func (p *Progress) ClearStreamElementIndexErrors(group, stream string) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	// delete on a nil (absent) group map is a no-op, so no presence check is needed.
+	delete(p.StreamElementIndexErrors[group], stream)
+}