This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch lifecyc-sync
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 31043820f3420a59774d028bf259ad5ecf8841f3
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Wed Jul 30 17:26:46 2025 +0800

    Implement element index migration in MigrationVisitor
    
    - Implement `VisitElementIndex` (previously a no-op stub) to stream element
      index segment (`.seg`) files to every replica of the target shard, with
      logging and error handling.
    - Add `Progress` methods that track per-file completion, totals, and errors
      for element index files during migration.
---
 banyand/backup/lifecycle/file_migration_visitor.go | 235 ++++++++++++++++++++-
 banyand/backup/lifecycle/progress.go               | 176 +++++++++++++--
 2 files changed, 390 insertions(+), 21 deletions(-)

diff --git a/banyand/backup/lifecycle/file_migration_visitor.go 
b/banyand/backup/lifecycle/file_migration_visitor.go
index 4d5620d7..7a52ca5b 100644
--- a/banyand/backup/lifecycle/file_migration_visitor.go
+++ b/banyand/backup/lifecycle/file_migration_visitor.go
@@ -276,11 +276,126 @@ func (mv *MigrationVisitor) VisitPart(_ 
*timestamp.TimeRange, sourceShardID comm
 }
 
 // VisitElementIndex implements stream.Visitor.
-func (mv *MigrationVisitor) VisitElementIndex(_ *timestamp.TimeRange, _ common.ShardID, indexPath string) error {
-       // TODO: Implement element index migration if needed
-       mv.logger.Debug().
+//
+// It migrates every "<hex id>.seg" file found directly under indexPath to all
+// replicas of the shard returned by calculateTargetShardID. Files already
+// marked completed in mv.progress are skipped so an interrupted run can
+// resume; progress is only read or written when mv.progress is non-nil.
+// The first open or streaming failure is recorded and returned.
+//
+// NOTE(review): progress is keyed by (group, stream, file ID) only. The IDs
+// come from file names inside one element index directory, so directories of
+// other shards or time segments may reuse them and be skipped as "already
+// completed" — confirm whether the key must also include the shard and the
+// segment time range.
+func (mv *MigrationVisitor) VisitElementIndex(segmentTR *timestamp.TimeRange, sourceShardID common.ShardID, indexPath string) error {
+       mv.logger.Info().
                Str("path", indexPath).
-               Msg("skipping element index migration (not implemented)")
+               Uint32("shard_id", uint32(sourceShardID)).
+               Int64("min_timestamp", segmentTR.Start.UnixNano()).
+               Int64("max_timestamp", segmentTR.End.UnixNano()).
+               Str("stream", mv.streamName).
+               Str("group", mv.group).
+               Msg("migrating element index")
+
+       // Find all .seg files in the element index directory.
+       lfs := fs.NewLocalFileSystem()
+       var segmentFiles []string
+       for _, entry := range lfs.ReadDir(indexPath) {
+               if !entry.IsDir() && strings.HasSuffix(entry.Name(), ".seg") {
+                       segmentFiles = append(segmentFiles, entry.Name())
+               }
+       }
+       if len(segmentFiles) == 0 {
+               mv.logger.Debug().
+                       Str("path", indexPath).
+                       Msg("no .seg files found in element index directory")
+               return nil
+       }
+
+       // Resolve each file's segment ID (its hex name without the extension)
+       // before setting the progress total, so the total only counts files that
+       // can actually be migrated. Counting an unparseable file would keep the
+       // stream from ever being reported as fully completed.
+       segmentNames := make([]string, 0, len(segmentFiles))
+       segmentIDs := make([]uint64, 0, len(segmentFiles))
+       for _, segmentFileName := range segmentFiles {
+               segmentIDStr := strings.TrimSuffix(segmentFileName, ".seg")
+               segmentID, err := strconv.ParseUint(segmentIDStr, 16, 64)
+               if err != nil {
+                       mv.logger.Error().
+                               Str("filename", segmentFileName).
+                               Str("id_str", segmentIDStr).
+                               Err(err).
+                               Msg("failed to parse segment ID from filename")
+                       continue
+               }
+               segmentNames = append(segmentNames, segmentFileName)
+               segmentIDs = append(segmentIDs, segmentID)
+       }
+       if len(segmentIDs) == 0 {
+               return nil
+       }
+
+       // NOTE(review): this replaces the stream-wide total on every call while
+       // the completed count keeps accumulating across calls; verify how totals
+       // should combine when one stream has several element index directories.
+       mv.SetStreamElementIndexCount(len(segmentIDs))
+
+       targetShardID := mv.calculateTargetShardID(uint32(sourceShardID))
+       mv.logger.Info().
+               Int("segment_count", len(segmentIDs)).
+               Str("path", indexPath).
+               Uint32("source_shard", uint32(sourceShardID)).
+               Uint32("target_shard", targetShardID).
+               Msg("found element index segment files for migration")
+
+       // migrateSegment opens one segment file and streams it to the target
+       // shard replicas. It is a closure so the deferred Close runs when each
+       // file is done (success or failure), not when VisitElementIndex returns.
+       migrateSegment := func(segmentFileName string, segmentID uint64) error {
+               segmentFilePath := filepath.Join(indexPath, segmentFileName)
+               segmentFile, err := lfs.OpenFile(segmentFilePath)
+               if err != nil {
+                       errorMsg := fmt.Sprintf("failed to open element index segment file %s: %v", segmentFilePath, err)
+                       if mv.progress != nil {
+                               mv.progress.MarkStreamElementIndexError(mv.group, mv.streamName, segmentID, errorMsg)
+                       }
+                       mv.logger.Error().
+                               Str("path", segmentFilePath).
+                               Err(err).
+                               Msg("failed to open element index segment file")
+                       return fmt.Errorf("failed to open element index segment file %s: %w", segmentFilePath, err)
+               }
+               defer segmentFile.Close()
+
+               files := []queue.FileInfo{
+                       {
+                               Name:   segmentFileName,
+                               Reader: segmentFile.SequentialRead(),
+                       },
+               }
+               if streamErr := mv.streamElementIndexToTargetShard(targetShardID, files, segmentTR, segmentID, segmentFileName); streamErr != nil {
+                       errorMsg := fmt.Sprintf("failed to stream element index to target shard: %v", streamErr)
+                       if mv.progress != nil {
+                               mv.progress.MarkStreamElementIndexError(mv.group, mv.streamName, segmentID, errorMsg)
+                       }
+                       return fmt.Errorf("failed to stream element index to target shard: %w", streamErr)
+               }
+               return nil
+       }
+
+       for i, segmentFileName := range segmentNames {
+               segmentID := segmentIDs[i]
+
+               if mv.progress != nil && mv.progress.IsStreamElementIndexCompleted(mv.group, mv.streamName, segmentID) {
+                       mv.logger.Debug().
+                               Uint64("segment_id", segmentID).
+                               Str("filename", segmentFileName).
+                               Str("stream", mv.streamName).
+                               Str("group", mv.group).
+                               Msg("element index segment already completed, skipping")
+                       continue
+               }
+
+               mv.logger.Info().
+                       Uint64("segment_id", segmentID).
+                       Str("filename", segmentFileName).
+                       Str("stream", mv.streamName).
+                       Str("group", mv.group).
+                       Msg("migrating element index segment file")
+
+               if err := migrateSegment(segmentFileName, segmentID); err != nil {
+                       return err
+               }
+
+               // The file is closed at this point; record completion and report it.
+               completed := mv.logger.Info().
+                       Uint64("segment_id", segmentID).
+                       Str("filename", segmentFileName).
+                       Str("stream", mv.streamName).
+                       Str("group", mv.group)
+               if mv.progress != nil {
+                       mv.progress.MarkStreamElementIndexCompleted(mv.group, mv.streamName, segmentID)
+                       completed = completed.
+                               Int("completed_segments", mv.progress.GetStreamElementIndexProgress(mv.group, mv.streamName)).
+                               Int("total_segments", mv.progress.GetStreamElementIndexCount(mv.group, mv.streamName))
+               }
+               completed.Msg("element index segment migration completed successfully")
+       }
+
        return nil
 }
 
@@ -633,3 +748,115 @@ func (mv *MigrationVisitor) 
SetStreamSeriesCount(totalSegments int) {
                        Msg("set stream series count for progress tracking")
        }
 }
+
+// SetStreamElementIndexCount stores totalSegmentFiles as the number of element
+// index segment files for the visitor's current group and stream, and logs
+// it. Without a progress tracker it does nothing.
+func (mv *MigrationVisitor) SetStreamElementIndexCount(totalSegmentFiles int) {
+       if mv.progress == nil {
+               return
+       }
+
+       mv.progress.SetStreamElementIndexCount(mv.group, mv.streamName, totalSegmentFiles)
+       mv.logger.Info().
+               Str("stream", mv.streamName).
+               Str("group", mv.group).
+               Int("total_segment_files", totalSegmentFiles).
+               Msg("set stream element index segment count for progress tracking")
+}
+
+// streamElementIndexToTargetShard delivers the element index files to each of
+// the mv.replicas+1 copies of targetShardID in replica order, returning the
+// first node-selection or streaming failure.
+//
+// NOTE(review): the same files slice — and therefore the same readers — is
+// handed to every replica in turn. If FileInfo.Reader is a one-shot sequential
+// reader, replicas after the first may receive an already-drained stream;
+// confirm, or reopen the file per replica.
+func (mv *MigrationVisitor) streamElementIndexToTargetShard(
+       targetShardID uint32,
+       files []queue.FileInfo,
+       segmentTR *timestamp.TimeRange,
+       segmentID uint64,
+       segmentFileName string,
+) error {
+       for replicaID, copies := uint32(0), mv.replicas+1; replicaID < copies; replicaID++ {
+               // Choose the node for this replica with the same selector.Pick call
+               // that steps.go uses.
+               nodeID, err := mv.selector.Pick(mv.group, "", targetShardID, replicaID)
+               if err != nil {
+                       return fmt.Errorf("failed to pick node for shard %d replica %d: %w", targetShardID, replicaID, err)
+               }
+
+               err = mv.streamElementIndexToNode(nodeID, targetShardID, files, segmentTR, segmentID, segmentFileName)
+               if err != nil {
+                       return fmt.Errorf("failed to stream element index to node %s: %w", nodeID, err)
+               }
+       }
+       return nil
+}
+
+// streamElementIndexToNode syncs element index files to a single node through a
+// chunked sync client. The client for nodeID is created on first use and kept
+// in mv.chunkedClients so later segments sent to the same node reuse it.
+// Both a sync error and a result whose Success flag is false are returned as
+// errors; on success the transfer statistics are logged.
+func (mv *MigrationVisitor) streamElementIndexToNode(
+       nodeID string,
+       targetShardID uint32,
+       files []queue.FileInfo,
+       segmentTR *timestamp.TimeRange,
+       segmentID uint64,
+       segmentFileName string,
+) error {
+       chunkedClient, cached := mv.chunkedClients[nodeID]
+       if !cached {
+               created, err := mv.client.NewChunkedSyncClient(nodeID, uint32(mv.chunkSize))
+               if err != nil {
+                       return fmt.Errorf("failed to create chunked sync client for node %s: %w", nodeID, err)
+               }
+               mv.chunkedClients[nodeID] = created
+               chunkedClient = created
+       }
+
+       parts := mv.createStreamingElementIndexFromFiles(targetShardID, files, segmentTR, segmentID)
+
+       // NOTE(review): context.Background() puts no deadline or cancellation on
+       // the transfer.
+       result, err := chunkedClient.SyncStreamingParts(context.Background(), parts)
+       switch {
+       case err != nil:
+               return fmt.Errorf("failed to sync streaming element index to node %s: %w", nodeID, err)
+       case !result.Success:
+               return fmt.Errorf("chunked sync partially failed: %v", result.ErrorMessage)
+       }
+
+       mv.logger.Info().
+               Str("node", nodeID).
+               Str("session", result.SessionID).
+               Uint64("bytes", result.TotalBytes).
+               Int64("duration_ms", result.DurationMs).
+               Uint32("chunks", result.ChunksCount).
+               Uint32("parts", result.PartsCount).
+               Uint32("target_shard", targetShardID).
+               Uint64("segment_id", segmentID).
+               Str("segment_filename", segmentFileName).
+               Str("stream", mv.streamName).
+               Str("group", mv.group).
+               Msg("file-based migration element index segment completed successfully")
+
+       return nil
+}
+
+// createStreamingElementIndexFromFiles wraps the element index files in a
+// single StreamingPartData addressed to targetShardID. The part reuses
+// segmentID as its ID, is sent on the stream local-index-write topic, and
+// carries the segment time range (UnixNano) as its min/max timestamps.
+func (mv *MigrationVisitor) createStreamingElementIndexFromFiles(
+       targetShardID uint32,
+       files []queue.FileInfo,
+       segmentTR *timestamp.TimeRange,
+       segmentID uint64,
+) []queue.StreamingPartData {
+       return []queue.StreamingPartData{{
+               ID:           segmentID,
+               Group:        mv.group,
+               ShardID:      targetShardID,
+               Topic:        data.TopicStreamLocalIndexWrite.String(),
+               Files:        files,
+               MinTimestamp: segmentTR.Start.UnixNano(),
+               MaxTimestamp: segmentTR.End.UnixNano(),
+       }}
+}
diff --git a/banyand/backup/lifecycle/progress.go 
b/banyand/backup/lifecycle/progress.go
index d8c38061..acb8e493 100644
--- a/banyand/backup/lifecycle/progress.go
+++ b/banyand/backup/lifecycle/progress.go
@@ -44,9 +44,14 @@ type Progress struct {
        StreamSeriesErrors    map[string]map[string]map[uint64]string 
`json:"stream_series_errors"`    // group -> stream -> segmentID -> error
        StreamSeriesCounts    map[string]map[string]int               
`json:"stream_series_counts"`    // group -> stream -> total series segments
        StreamSeriesProgress  map[string]map[string]int               
`json:"stream_series_progress"`  // group -> stream -> completed series 
segments count
-       SnapshotStreamDir     string                                  
`json:"snapshot_stream_dir"`
-       SnapshotMeasureDir    string                                  
`json:"snapshot_measure_dir"`
-       mu                    sync.Mutex                              `json:"-"`
+       // Element index-level tracking for stream migration
+       CompletedStreamElementIndex map[string]map[string]map[uint64]bool   
`json:"completed_stream_element_index"` // group -> stream -> indexFileID -> 
completed
+       StreamElementIndexErrors    map[string]map[string]map[uint64]string 
`json:"stream_element_index_errors"`    // group -> stream -> indexFileID -> 
error
+       StreamElementIndexCounts    map[string]map[string]int               
`json:"stream_element_index_counts"`    // group -> stream -> total index files
+       StreamElementIndexProgress  map[string]map[string]int               
`json:"stream_element_index_progress"`  // group -> stream -> completed index 
files count
+       SnapshotStreamDir           string                                  
`json:"snapshot_stream_dir"`
+       SnapshotMeasureDir          string                                  
`json:"snapshot_measure_dir"`
+       mu                          sync.Mutex                              
`json:"-"`
 }
 
 // AllGroupsFullyCompleted checks if all groups are fully completed.
@@ -65,20 +70,24 @@ func (p *Progress) AllGroupsFullyCompleted(groups 
[]*commonv1.Group) bool {
 // NewProgress creates a new Progress tracker.
+//
+// It allocates every map field, including the element index tracking maps,
+// so the returned tracker starts empty and ready to record progress.
 func NewProgress() *Progress {
        return &Progress{
-               CompletedGroups:       make(map[string]bool),
-               CompletedMeasures:     make(map[string]map[string]bool),
-               DeletedStreamGroups:   make(map[string]bool),
-               DeletedMeasureGroups:  make(map[string]bool),
-               MeasureErrors:         make(map[string]map[string]string),
-               MeasureCounts:         make(map[string]map[string]int),
-               CompletedStreamParts:  
make(map[string]map[string]map[uint64]bool),
-               StreamPartErrors:      
make(map[string]map[string]map[uint64]string),
-               StreamPartCounts:      make(map[string]map[string]int),
-               StreamPartProgress:    make(map[string]map[string]int),
-               CompletedStreamSeries: 
make(map[string]map[string]map[uint64]bool),
-               StreamSeriesErrors:    
make(map[string]map[string]map[uint64]string),
-               StreamSeriesCounts:    make(map[string]map[string]int),
-               StreamSeriesProgress:  make(map[string]map[string]int),
+               CompletedGroups:             make(map[string]bool),
+               CompletedMeasures:           make(map[string]map[string]bool),
+               DeletedStreamGroups:         make(map[string]bool),
+               DeletedMeasureGroups:        make(map[string]bool),
+               MeasureErrors:               make(map[string]map[string]string),
+               MeasureCounts:               make(map[string]map[string]int),
+               CompletedStreamParts:        
make(map[string]map[string]map[uint64]bool),
+               StreamPartErrors:            
make(map[string]map[string]map[uint64]string),
+               StreamPartCounts:            make(map[string]map[string]int),
+               StreamPartProgress:          make(map[string]map[string]int),
+               CompletedStreamSeries:       
make(map[string]map[string]map[uint64]bool),
+               StreamSeriesErrors:          
make(map[string]map[string]map[uint64]string),
+               StreamSeriesCounts:          make(map[string]map[string]int),
+               StreamSeriesProgress:        make(map[string]map[string]int),
+               CompletedStreamElementIndex: 
make(map[string]map[string]map[uint64]bool),
+               StreamElementIndexErrors:    
make(map[string]map[string]map[uint64]string),
+               StreamElementIndexCounts:    make(map[string]map[string]int),
+               StreamElementIndexProgress:  make(map[string]map[string]int),
        }
 }
 
@@ -498,3 +507,136 @@ func (p *Progress) ClearStreamSeriesErrors(group, stream 
string) {
                delete(errors, stream)
        }
 }
+
+// MarkStreamElementIndexCompleted marks a specific element index file of a
+// stream as completed and bumps the stream's completed-file counter.
+//
+// Marking a file that is already completed is a no-op, so the counter equals
+// the number of distinct completed files and cannot overshoot the total used
+// by IsStreamElementIndexFullyCompleted. Missing maps are created on demand,
+// including the top-level ones of a Progress not built by NewProgress.
+func (p *Progress) MarkStreamElementIndexCompleted(group, stream string, indexFileID uint64) {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       if p.CompletedStreamElementIndex == nil {
+               p.CompletedStreamElementIndex = make(map[string]map[string]map[uint64]bool)
+       }
+       streams := p.CompletedStreamElementIndex[group]
+       if streams == nil {
+               streams = make(map[string]map[uint64]bool)
+               p.CompletedStreamElementIndex[group] = streams
+       }
+       completedFiles := streams[stream]
+       if completedFiles == nil {
+               completedFiles = make(map[uint64]bool)
+               streams[stream] = completedFiles
+       }
+
+       // Count each file only once; a repeated mark (e.g. after a retry) must
+       // not inflate the progress counter.
+       if completedFiles[indexFileID] {
+               return
+       }
+       completedFiles[indexFileID] = true
+
+       if p.StreamElementIndexProgress == nil {
+               p.StreamElementIndexProgress = make(map[string]map[string]int)
+       }
+       if p.StreamElementIndexProgress[group] == nil {
+               p.StreamElementIndexProgress[group] = make(map[string]int)
+       }
+       p.StreamElementIndexProgress[group][stream]++
+}
+
+// IsStreamElementIndexCompleted reports whether the given element index file
+// of a stream has been marked completed. Unknown groups, streams, or files
+// report false.
+func (p *Progress) IsStreamElementIndexCompleted(group, stream string, indexFileID uint64) bool {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       // Indexing a nil map yields the zero value, so any missing level reads
+       // as false without explicit presence checks.
+       return p.CompletedStreamElementIndex[group][stream][indexFileID]
+}
+
+// MarkStreamElementIndexError records errorMsg for a specific element index
+// file of a stream, replacing any earlier message for that file. Missing maps
+// are created on demand, including the top-level map of a Progress that was
+// not built by NewProgress (assigning into a nil map would panic).
+func (p *Progress) MarkStreamElementIndexError(group, stream string, indexFileID uint64, errorMsg string) {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       if p.StreamElementIndexErrors == nil {
+               p.StreamElementIndexErrors = make(map[string]map[string]map[uint64]string)
+       }
+       streams := p.StreamElementIndexErrors[group]
+       if streams == nil {
+               streams = make(map[string]map[uint64]string)
+               p.StreamElementIndexErrors[group] = streams
+       }
+       fileErrors := streams[stream]
+       if fileErrors == nil {
+               fileErrors = make(map[uint64]string)
+               streams[stream] = fileErrors
+       }
+
+       fileErrors[indexFileID] = errorMsg
+}
+
+// SetStreamElementIndexCount sets the total number of element index files for
+// a stream, overwriting any previous total. It also makes sure the stream has
+// an entry in the completed-file counter (starting at 0) without resetting a
+// count that was already recorded. Missing maps are created on demand.
+func (p *Progress) SetStreamElementIndexCount(group, stream string, totalIndexFiles int) {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       if p.StreamElementIndexCounts == nil {
+               p.StreamElementIndexCounts = make(map[string]map[string]int)
+       }
+       if p.StreamElementIndexCounts[group] == nil {
+               p.StreamElementIndexCounts[group] = make(map[string]int)
+       }
+       p.StreamElementIndexCounts[group][stream] = totalIndexFiles
+
+       if p.StreamElementIndexProgress == nil {
+               p.StreamElementIndexProgress = make(map[string]map[string]int)
+       }
+       if p.StreamElementIndexProgress[group] == nil {
+               p.StreamElementIndexProgress[group] = make(map[string]int)
+       }
+       // Create the counter entry only if it is missing, so the stream shows up
+       // with 0 completed files when the tracker is serialized; an existing
+       // count is left untouched.
+       if _, ok := p.StreamElementIndexProgress[group][stream]; !ok {
+               p.StreamElementIndexProgress[group][stream] = 0
+       }
+}
+
+// GetStreamElementIndexCount reports the recorded total of element index files
+// for a stream, or 0 when no total has been set.
+func (p *Progress) GetStreamElementIndexCount(group, stream string) int {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       // A missing group yields a nil inner map, whose lookup returns 0.
+       return p.StreamElementIndexCounts[group][stream]
+}
+
+// GetStreamElementIndexProgress reports how many element index files of a
+// stream have been counted as completed, or 0 when nothing is recorded.
+func (p *Progress) GetStreamElementIndexProgress(group, stream string) int {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       // A missing group yields a nil inner map, whose lookup returns 0.
+       return p.StreamElementIndexProgress[group][stream]
+}
+
+// IsStreamElementIndexFullyCompleted reports whether a stream has a positive
+// element index file total and its completed count has reached that total.
+func (p *Progress) IsStreamElementIndexFullyCompleted(group, stream string) bool {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       total := p.StreamElementIndexCounts[group][stream]
+       if total <= 0 {
+               // Without a known total, completion cannot be claimed.
+               return false
+       }
+       return p.StreamElementIndexProgress[group][stream] >= total
+}
+
+// GetStreamElementIndexErrors returns a copy of the recorded errors (keyed by
+// element index file ID) for a stream, or nil when the stream has no entry.
+func (p *Progress) GetStreamElementIndexErrors(group, stream string) map[uint64]string {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       streamErrors, found := p.StreamElementIndexErrors[group][stream]
+       if !found {
+               return nil
+       }
+
+       // Hand back a copy so callers never share the map guarded by p.mu.
+       snapshot := make(map[uint64]string, len(streamErrors))
+       for fileID, msg := range streamErrors {
+               snapshot[fileID] = msg
+       }
+       return snapshot
+}
+
+// ClearStreamElementIndexErrors drops every recorded element index error for
+// a stream; it is a no-op when none are recorded.
+func (p *Progress) ClearStreamElementIndexErrors(group, stream string) {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       // delete on a nil map does nothing, so an unknown group needs no check.
+       delete(p.StreamElementIndexErrors[group], stream)
+}

Reply via email to