This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch lifecyc-sync
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 7d69446bf90674735a6e3ac85ac45759a085df8b
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Wed Jul 30 16:53:02 2025 +0800

    Add series sync topic and enhance migration visitor functionality

    - Introduced `TopicStreamSeriesSync` and its associated versioning in `stream.go`.
    - Updated `MigrationVisitor` to handle series segment migration, including progress tracking and error management.
    - Enhanced `Progress` struct to support series-level tracking for stream migrations.
---
 api/data/data.go                                   |   4 +
 api/data/stream.go                                 |   9 +
 banyand/backup/lifecycle/file_migration_visitor.go | 233 ++++++++++++++++++++-
 banyand/backup/lifecycle/progress.go               | 184 ++++++++++++++--
 banyand/backup/lifecycle/progress_test.go          |  35 ++++
 5 files changed, 440 insertions(+), 25 deletions(-)

diff --git a/api/data/data.go b/api/data/data.go
index dc3ce00b..8a6876b8 100644
--- a/api/data/data.go
+++ b/api/data/data.go
@@ -45,6 +45,7 @@ var (
 		TopicPropertyRepair.String():         TopicPropertyRepair,
 		TopicStreamSeriesIndexWrite.String(): TopicStreamSeriesIndexWrite,
 		TopicStreamLocalIndexWrite.String():  TopicStreamLocalIndexWrite,
+		TopicStreamSeriesSync.String():       TopicStreamSeriesSync,
 	}

 	// TopicRequestMap is the map of topic name to request message.
@@ -95,6 +96,9 @@ var (
 	TopicStreamLocalIndexWrite: func() proto.Message {
 		return nil
 	},
+	TopicStreamSeriesSync: func() proto.Message {
+		return nil
+	},
 }

 // TopicResponseMap is the map of topic name to response message.
diff --git a/api/data/stream.go b/api/data/stream.go
index dd43c974..7067de9a 100644
--- a/api/data/stream.go
+++ b/api/data/stream.go
@@ -75,3 +75,12 @@ var StreamLocalIndexWriteKindVersion = common.KindVersion{

 // TopicStreamLocalIndexWrite is the stream local index write topic.
 var TopicStreamLocalIndexWrite = bus.BiTopic(StreamLocalIndexWriteKindVersion.String())
+
+// StreamSeriesSyncKindVersion is the version tag of series sync kind.
+var StreamSeriesSyncKindVersion = common.KindVersion{
+	Version: "v1",
+	Kind:    "series-sync",
+}
+
+// TopicStreamSeriesSync is the series sync topic.
+var TopicStreamSeriesSync = bus.BiTopic(StreamSeriesSyncKindVersion.String())
diff --git a/banyand/backup/lifecycle/file_migration_visitor.go b/banyand/backup/lifecycle/file_migration_visitor.go
index a4a9ae41..4d5620d7 100644
--- a/banyand/backup/lifecycle/file_migration_visitor.go
+++ b/banyand/backup/lifecycle/file_migration_visitor.go
@@ -97,11 +97,124 @@ func NewMigrationVisitor(group *commonv1.Group, nodeLabels map[string]string,
 }

 // VisitSeries implements stream.Visitor.
-func (mv *MigrationVisitor) VisitSeries(_ *timestamp.TimeRange, seriesIndexPath string) error {
-	// TODO: Implement series index migration if needed
-	mv.logger.Debug().
+func (mv *MigrationVisitor) VisitSeries(segmentTR *timestamp.TimeRange, seriesIndexPath string) error {
+	mv.logger.Info().
+		Str("path", seriesIndexPath).
+		Int64("min_timestamp", segmentTR.Start.UnixNano()).
+		Int64("max_timestamp", segmentTR.End.UnixNano()).
+		Str("stream", mv.streamName).
+		Str("group", mv.group).
+		Msg("migrating series index")
+
+	// Find all *.seg segment files in the seriesIndexPath
+	lfs := fs.NewLocalFileSystem()
+	entries := lfs.ReadDir(seriesIndexPath)
+
+	var segmentFiles []string
+	for _, entry := range entries {
+		if !entry.IsDir() && strings.HasSuffix(entry.Name(), ".seg") {
+			segmentFiles = append(segmentFiles, entry.Name())
+		}
+	}
+
+	if len(segmentFiles) == 0 {
+		mv.logger.Debug().
+			Str("path", seriesIndexPath).
+ Msg("no .seg files found in series index path") + return nil + } + + mv.logger.Info(). + Int("segment_count", len(segmentFiles)). Str("path", seriesIndexPath). - Msg("skipping series index migration (not implemented)") + Msg("found segment files for migration") + + // Set the total number of series segments for progress tracking + mv.SetStreamSeriesCount(len(segmentFiles)) + + // Process each segment file + for _, segmentFileName := range segmentFiles { + // Extract segment ID from filename (remove .seg extension) + segmentIDStr := strings.TrimSuffix(segmentFileName, ".seg") + + // Parse hex segment ID + segmentID, err := strconv.ParseUint(segmentIDStr, 16, 64) + if err != nil { + mv.logger.Error(). + Str("filename", segmentFileName). + Str("id_str", segmentIDStr). + Err(err). + Msg("failed to parse segment ID from filename") + continue + } + + // Check if this segment has already been completed + if mv.progress.IsStreamSeriesCompleted(mv.group, mv.streamName, segmentID) { + mv.logger.Debug(). + Uint64("segment_id", segmentID). + Str("filename", segmentFileName). + Str("stream", mv.streamName). + Str("group", mv.group). + Msg("segment already completed, skipping") + continue + } + + mv.logger.Info(). + Uint64("segment_id", segmentID). + Str("filename", segmentFileName). + Str("stream", mv.streamName). + Str("group", mv.group). + Msg("migrating segment file") + + // Create file reader for the segment file + segmentFilePath := filepath.Join(seriesIndexPath, segmentFileName) + segmentFile, err := lfs.OpenFile(segmentFilePath) + if err != nil { + errorMsg := fmt.Sprintf("failed to open segment file %s: %v", segmentFilePath, err) + mv.progress.MarkStreamSeriesError(mv.group, mv.streamName, segmentID, errorMsg) + mv.logger.Error(). + Str("path", segmentFilePath). + Err(err). + Msg("failed to open segment file") + return fmt.Errorf("failed to open segment file %s: %w", segmentFilePath, err) + } + + // Create StreamingPartData for this segment + files := []queue.FileInfo{ + { + Name: segmentFileName, + Reader: segmentFile.SequentialRead(), + }, + } + + // Calculate target shard ID (using a simple approach for series index) + targetShardID := uint32(segmentID) % mv.targetShardNum + + // Stream segment to target shard replicas + if err := mv.streamSegmentToTargetShard(targetShardID, files, segmentTR, segmentID, segmentFileName); err != nil { + errorMsg := fmt.Sprintf("failed to stream segment to target shard: %v", err) + mv.progress.MarkStreamSeriesError(mv.group, mv.streamName, segmentID, errorMsg) + // Close the file reader + segmentFile.Close() + return fmt.Errorf("failed to stream segment to target shard: %w", err) + } + + // Close the file reader + segmentFile.Close() + + // Mark segment as completed + mv.progress.MarkStreamSeriesCompleted(mv.group, mv.streamName, segmentID) + + mv.logger.Info(). + Uint64("segment_id", segmentID). + Str("filename", segmentFileName). + Str("stream", mv.streamName). + Str("group", mv.group). + Int("completed_segments", mv.progress.GetStreamSeriesProgress(mv.group, mv.streamName)). + Int("total_segments", mv.progress.GetStreamSeriesCount(mv.group, mv.streamName)). + Msg("segment migration completed successfully") + } + return nil } @@ -397,6 +510,106 @@ func (mv *MigrationVisitor) Close() error { return nil } +// streamSegmentToTargetShard sends segment data to all replicas of the target shard. 
+func (mv *MigrationVisitor) streamSegmentToTargetShard(
+	targetShardID uint32,
+	files []queue.FileInfo,
+	segmentTR *timestamp.TimeRange,
+	segmentID uint64,
+	segmentFileName string,
+) error {
+	copies := mv.replicas + 1
+
+	// Send to all replicas using the exact pattern from steps.go:219-236
+	for replicaID := uint32(0); replicaID < copies; replicaID++ {
+		// Use selector.Pick exactly like steps.go:220
+		nodeID, err := mv.selector.Pick(mv.group, "", targetShardID, replicaID)
+		if err != nil {
+			return fmt.Errorf("failed to pick node for shard %d replica %d: %w", targetShardID, replicaID, err)
+		}
+
+		// Stream segment data to target node using chunked sync
+		if err := mv.streamSegmentToNode(nodeID, targetShardID, files, segmentTR, segmentID, segmentFileName); err != nil {
+			return fmt.Errorf("failed to stream segment to node %s: %w", nodeID, err)
+		}
+	}
+
+	return nil
+}
+
+// streamSegmentToNode streams segment data to a specific target node.
+func (mv *MigrationVisitor) streamSegmentToNode(
+	nodeID string,
+	targetShardID uint32,
+	files []queue.FileInfo,
+	segmentTR *timestamp.TimeRange,
+	segmentID uint64,
+	segmentFileName string,
+) error {
+	// Get or create chunked client for this node (cache hit optimization)
+	chunkedClient, exists := mv.chunkedClients[nodeID]
+	if !exists {
+		var err error
+		// Create new chunked sync client via queue.Client
+		chunkedClient, err = mv.client.NewChunkedSyncClient(nodeID, uint32(mv.chunkSize))
+		if err != nil {
+			return fmt.Errorf("failed to create chunked sync client for node %s: %w", nodeID, err)
+		}
+		mv.chunkedClients[nodeID] = chunkedClient // Cache for reuse
+	}
+
+	// Create streaming part data from the segment files
+	streamingParts := mv.createStreamingSegmentFromFiles(targetShardID, files, segmentTR, segmentID)
+
+	// Stream using chunked transfer (same as syncer.go:202)
+	ctx := context.Background()
+	result, err := chunkedClient.SyncStreamingParts(ctx, streamingParts)
+	if err != nil {
+		return fmt.Errorf("failed to sync streaming segments to node %s: %w", nodeID, err)
+	}
+
+	if !result.Success {
+		return fmt.Errorf("chunked sync partially failed: %v", result.ErrorMessage)
+	}
+
+	// Log success metrics (same pattern as syncer.go:210-217)
+	mv.logger.Info().
+		Str("node", nodeID).
+		Str("session", result.SessionID).
+		Uint64("bytes", result.TotalBytes).
+		Int64("duration_ms", result.DurationMs).
+		Uint32("chunks", result.ChunksCount).
+		Uint32("parts", result.PartsCount).
+		Uint32("target_shard", targetShardID).
+		Uint64("segment_id", segmentID).
+		Str("segment_filename", segmentFileName).
+		Str("stream", mv.streamName).
+		Str("group", mv.group).
+		Msg("file-based migration segment completed successfully")
+
+	return nil
+}
+
+// createStreamingSegmentFromFiles creates StreamingPartData from segment files.
+func (mv *MigrationVisitor) createStreamingSegmentFromFiles(
+	targetShardID uint32,
+	files []queue.FileInfo,
+	segmentTR *timestamp.TimeRange,
+	segmentID uint64,
+) []queue.StreamingPartData {
+	segmentData := queue.StreamingPartData{
+		ID:           segmentID,
+		Group:        mv.group,
+		ShardID:      targetShardID,                       // Use calculated target shard
+		Topic:        data.TopicStreamSeriesSync.String(), // Use the new topic
+		Files:        files,
+		MinTimestamp: segmentTR.Start.UnixNano(),
+		MaxTimestamp: segmentTR.End.UnixNano(),
+	}
+
+	return []queue.StreamingPartData{segmentData}
+}
+
 // SetStreamPartCount sets the total number of parts for the current stream.
 func (mv *MigrationVisitor) SetStreamPartCount(totalParts int) {
 	if mv.progress != nil {
@@ -408,3 +621,15 @@ func (mv *MigrationVisitor) SetStreamPartCount(totalParts int) {
 			Msg("set stream part count for progress tracking")
 	}
 }
+
+// SetStreamSeriesCount sets the total number of series segments for the current stream.
+func (mv *MigrationVisitor) SetStreamSeriesCount(totalSegments int) {
+	if mv.progress != nil {
+		mv.progress.SetStreamSeriesCount(mv.group, mv.streamName, totalSegments)
+		mv.logger.Info().
+			Str("stream", mv.streamName).
+			Str("group", mv.group).
+			Int("total_segments", totalSegments).
+			Msg("set stream series count for progress tracking")
+	}
+}
diff --git a/banyand/backup/lifecycle/progress.go b/banyand/backup/lifecycle/progress.go
index 32d4c907..d8c38061 100644
--- a/banyand/backup/lifecycle/progress.go
+++ b/banyand/backup/lifecycle/progress.go
@@ -39,9 +39,14 @@ type Progress struct {
 	StreamPartErrors   map[string]map[string]map[uint64]string `json:"stream_part_errors"`   // group -> stream -> partID -> error
 	StreamPartCounts   map[string]map[string]int               `json:"stream_part_counts"`   // group -> stream -> total parts
 	StreamPartProgress map[string]map[string]int               `json:"stream_part_progress"` // group -> stream -> completed parts count
-	SnapshotStreamDir  string                                  `json:"snapshot_stream_dir"`
-	SnapshotMeasureDir string                                  `json:"snapshot_measure_dir"`
-	mu                 sync.Mutex                              `json:"-"`
+	// Series-level tracking for stream migration
+	CompletedStreamSeries map[string]map[string]map[uint64]bool   `json:"completed_stream_series"` // group -> stream -> segmentID -> completed
+	StreamSeriesErrors    map[string]map[string]map[uint64]string `json:"stream_series_errors"`    // group -> stream -> segmentID -> error
+	StreamSeriesCounts    map[string]map[string]int               `json:"stream_series_counts"`    // group -> stream -> total series segments
+	StreamSeriesProgress  map[string]map[string]int               `json:"stream_series_progress"`  // group -> stream -> completed series segments count
+	SnapshotStreamDir     string                                  `json:"snapshot_stream_dir"`
+	SnapshotMeasureDir    string                                  `json:"snapshot_measure_dir"`
+	mu                    sync.Mutex                              `json:"-"`
 }

 // AllGroupsFullyCompleted checks if all groups are fully completed.
@@ -60,16 +65,20 @@ func (p *Progress) AllGroupsFullyCompleted(groups []*commonv1.Group) bool {

 // NewProgress creates a new Progress tracker.
 func NewProgress() *Progress {
 	return &Progress{
-		CompletedGroups:      make(map[string]bool),
-		CompletedMeasures:    make(map[string]map[string]bool),
-		DeletedStreamGroups:  make(map[string]bool),
-		DeletedMeasureGroups: make(map[string]bool),
-		MeasureErrors:        make(map[string]map[string]string),
-		MeasureCounts:        make(map[string]map[string]int),
-		CompletedStreamParts: make(map[string]map[string]map[uint64]bool),
-		StreamPartErrors:     make(map[string]map[string]map[uint64]string),
-		StreamPartCounts:     make(map[string]map[string]int),
-		StreamPartProgress:   make(map[string]map[string]int),
+		CompletedGroups:       make(map[string]bool),
+		CompletedMeasures:     make(map[string]map[string]bool),
+		DeletedStreamGroups:   make(map[string]bool),
+		DeletedMeasureGroups:  make(map[string]bool),
+		MeasureErrors:         make(map[string]map[string]string),
+		MeasureCounts:         make(map[string]map[string]int),
+		CompletedStreamParts:  make(map[string]map[string]map[uint64]bool),
+		StreamPartErrors:      make(map[string]map[string]map[uint64]string),
+		StreamPartCounts:      make(map[string]map[string]int),
+		StreamPartProgress:    make(map[string]map[string]int),
+		CompletedStreamSeries: make(map[string]map[string]map[uint64]bool),
+		StreamSeriesErrors:    make(map[string]map[string]map[uint64]string),
+		StreamSeriesCounts:    make(map[string]map[string]int),
+		StreamSeriesProgress:  make(map[string]map[string]int),
 	}
 }

@@ -334,12 +343,12 @@ func (p *Progress) GetStreamPartErrors(group, stream string) map[uint64]string {
 	p.mu.Lock()
 	defer p.mu.Unlock()

-	if streams, ok := p.StreamPartErrors[group]; ok {
-		if parts, ok := streams[stream]; ok {
-			// Return a copy to avoid race conditions
+	if errors, ok := p.StreamPartErrors[group]; ok {
+		if streamErrors, ok := errors[stream]; ok {
+			// Return a copy to avoid concurrent access issues
 			result := make(map[uint64]string)
-			for partID, errorMsg := range parts {
-				result[partID] = errorMsg
+			for k, v := range streamErrors {
+				result[k] = v
 			}
 			return result
 		}
@@ -347,12 +356,145 @@ func (p *Progress) GetStreamPartErrors(group, stream string) map[uint64]string {
 	return nil
 }

-// ClearStreamPartErrors clears all part errors for a specific stream.
+// ClearStreamPartErrors clears all errors for a specific stream.
 func (p *Progress) ClearStreamPartErrors(group, stream string) {
 	p.mu.Lock()
 	defer p.mu.Unlock()

-	if streams, ok := p.StreamPartErrors[group]; ok {
-		delete(streams, stream)
+	if errors, ok := p.StreamPartErrors[group]; ok {
+		delete(errors, stream)
+	}
+}
+
+// MarkStreamSeriesCompleted marks a specific series segment of a stream as completed.
+func (p *Progress) MarkStreamSeriesCompleted(group, stream string, segmentID uint64) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	// Initialize nested maps if they don't exist
+	if p.CompletedStreamSeries[group] == nil {
+		p.CompletedStreamSeries[group] = make(map[string]map[uint64]bool)
+	}
+	if p.CompletedStreamSeries[group][stream] == nil {
+		p.CompletedStreamSeries[group][stream] = make(map[uint64]bool)
+	}
+
+	// Mark series segment as completed
+	p.CompletedStreamSeries[group][stream][segmentID] = true
+
+	// Update progress count
+	if p.StreamSeriesProgress[group] == nil {
+		p.StreamSeriesProgress[group] = make(map[string]int)
+	}
+	p.StreamSeriesProgress[group][stream]++
+}
+
+// IsStreamSeriesCompleted checks if a specific series segment of a stream has been completed.
+func (p *Progress) IsStreamSeriesCompleted(group, stream string, segmentID uint64) bool {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	if streams, ok := p.CompletedStreamSeries[group]; ok {
+		if segments, ok := streams[stream]; ok {
+			return segments[segmentID]
+		}
+	}
+	return false
+}
+
+// MarkStreamSeriesError records an error for a specific series segment of a stream.
+func (p *Progress) MarkStreamSeriesError(group, stream string, segmentID uint64, errorMsg string) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	// Initialize nested maps if they don't exist
+	if p.StreamSeriesErrors[group] == nil {
+		p.StreamSeriesErrors[group] = make(map[string]map[uint64]string)
+	}
+	if p.StreamSeriesErrors[group][stream] == nil {
+		p.StreamSeriesErrors[group][stream] = make(map[uint64]string)
+	}
+
+	// Record the error
+	p.StreamSeriesErrors[group][stream][segmentID] = errorMsg
+}
+
+// SetStreamSeriesCount sets the total number of series segments for a stream.
+func (p *Progress) SetStreamSeriesCount(group, stream string, totalSegments int) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	if p.StreamSeriesCounts[group] == nil {
+		p.StreamSeriesCounts[group] = make(map[string]int)
+	}
+	p.StreamSeriesCounts[group][stream] = totalSegments
+
+	// Initialize progress tracking
+	if p.StreamSeriesProgress[group] == nil {
+		p.StreamSeriesProgress[group] = make(map[string]int)
+	}
+	if p.StreamSeriesProgress[group][stream] == 0 {
+		p.StreamSeriesProgress[group][stream] = 0
+	}
+}
+
+// GetStreamSeriesCount returns the total number of series segments for a stream.
+func (p *Progress) GetStreamSeriesCount(group, stream string) int {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	if counts, ok := p.StreamSeriesCounts[group]; ok {
+		return counts[stream]
+	}
+	return 0
+}
+
+// GetStreamSeriesProgress returns the number of completed series segments for a stream.
+func (p *Progress) GetStreamSeriesProgress(group, stream string) int {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	if progress, ok := p.StreamSeriesProgress[group]; ok {
+		return progress[stream]
+	}
+	return 0
+}
+
+// IsStreamSeriesFullyCompleted checks if all series segments of a stream have been completed.
+func (p *Progress) IsStreamSeriesFullyCompleted(group, stream string) bool {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	totalSegments := p.StreamSeriesCounts[group][stream]
+	completedSegments := p.StreamSeriesProgress[group][stream]
+
+	return totalSegments > 0 && completedSegments >= totalSegments
+}
+
+// GetStreamSeriesErrors returns all errors for a specific stream series.
+func (p *Progress) GetStreamSeriesErrors(group, stream string) map[uint64]string {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	if errors, ok := p.StreamSeriesErrors[group]; ok {
+		if streamErrors, ok := errors[stream]; ok {
+			// Return a copy to avoid concurrent access issues
+			result := make(map[uint64]string)
+			for k, v := range streamErrors {
+				result[k] = v
+			}
+			return result
+		}
+	}
+	return nil
+}
+
+// ClearStreamSeriesErrors clears all errors for a specific stream series.
+func (p *Progress) ClearStreamSeriesErrors(group, stream string) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	if errors, ok := p.StreamSeriesErrors[group]; ok {
+		delete(errors, stream)
 	}
 }
diff --git a/banyand/backup/lifecycle/progress_test.go b/banyand/backup/lifecycle/progress_test.go
index 09f67151..d29dd798 100644
--- a/banyand/backup/lifecycle/progress_test.go
+++ b/banyand/backup/lifecycle/progress_test.go
@@ -97,6 +97,7 @@ func TestProgress(t *testing.T) {

 	t.Run("RemoveProgressFile", func(t *testing.T) {
 		progress := NewProgress()
+		progress.MarkGroupCompleted("group1")
 		progress.Save(progressPath, l)

 		_, err := os.Stat(progressPath)
@@ -107,4 +108,38 @@ func TestProgress(t *testing.T) {
 		_, err = os.Stat(progressPath)
 		assert.True(t, os.IsNotExist(err))
 	})
+
+	t.Run("StreamSeriesProgress", func(t *testing.T) {
+		progress := NewProgress()
+
+		// Test series progress tracking
+		progress.SetStreamSeriesCount("group1", "stream1", 5)
+		assert.Equal(t, 5, progress.GetStreamSeriesCount("group1", "stream1"))
+		assert.Equal(t, 0, progress.GetStreamSeriesProgress("group1", "stream1"))
+
+		// Mark some series segments as completed
+		progress.MarkStreamSeriesCompleted("group1", "stream1", 1)
+		progress.MarkStreamSeriesCompleted("group1", "stream1", 2)
+		assert.Equal(t, 2, progress.GetStreamSeriesProgress("group1", "stream1"))
+		assert.True(t, progress.IsStreamSeriesCompleted("group1", "stream1", 1))
+		assert.True(t, progress.IsStreamSeriesCompleted("group1", "stream1", 2))
+		assert.False(t, progress.IsStreamSeriesCompleted("group1", "stream1", 3))
+
+		// Test error tracking
+		progress.MarkStreamSeriesError("group1", "stream1", 3, "test error")
+		errors := progress.GetStreamSeriesErrors("group1", "stream1")
+		assert.Equal(t, "test error", errors[3])
+
+		// Test completion check
+		assert.False(t, progress.IsStreamSeriesFullyCompleted("group1", "stream1"))
+		progress.MarkStreamSeriesCompleted("group1", "stream1", 3)
+		progress.MarkStreamSeriesCompleted("group1", "stream1", 4)
+		progress.MarkStreamSeriesCompleted("group1", "stream1", 5)
+		assert.True(t, progress.IsStreamSeriesFullyCompleted("group1", "stream1"))
+
+		// Test error clearing
+		progress.ClearStreamSeriesErrors("group1", "stream1")
+		errors = progress.GetStreamSeriesErrors("group1", "stream1")
+		assert.Nil(t, errors)
+	})
 }
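
A note for reviewers on the routing rule in `VisitSeries`: series segment files are named by hex-encoded segment ID, and each segment lands on shard `uint32(segmentID) % targetShardNum`. The stand-alone sketch below shows just that rule; it is illustrative only and not part of the patch, and the helper name `targetShard` is made up for this example.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// targetShard applies the same rule as VisitSeries: strip the ".seg"
// extension, parse the remainder as a hex segment ID, then take the ID
// modulo the target shard count.
func targetShard(segmentFileName string, targetShardNum uint32) (uint32, error) {
	idStr := strings.TrimSuffix(segmentFileName, ".seg")
	segmentID, err := strconv.ParseUint(idStr, 16, 64) // base 16, as in the patch
	if err != nil {
		return 0, fmt.Errorf("failed to parse segment ID from %q: %w", segmentFileName, err)
	}
	return uint32(segmentID) % targetShardNum, nil
}

func main() {
	shard, err := targetShard("0000001a.seg", 3)
	if err != nil {
		panic(err)
	}
	fmt.Println(shard) // 0x1a = 26; 26 % 3 = 2
}
```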
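And a minimal sketch of how the new series-level `Progress` API composes end to end (set the count, skip already-completed segments, then mark completion or record an error), mirroring the `StreamSeriesProgress` test above. Illustrative only: the import path is assumed from the repository layout, `transferSegment` is a hypothetical stand-in for the chunked-sync transfer in `streamSegmentToTargetShard`, and the real visitor returns on transfer errors rather than continuing.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"

	// Assumed import path, inferred from the diff's directory layout.
	"github.com/apache/skywalking-banyandb/banyand/backup/lifecycle"
)

func main() {
	progress := lifecycle.NewProgress()
	group, stream := "group1", "stream1"

	// Hex-named segment files, as VisitSeries discovers them on disk.
	segmentFiles := []string{"19.seg", "1a.seg", "1b.seg"}

	// Record the total first so IsStreamSeriesFullyCompleted has a denominator.
	progress.SetStreamSeriesCount(group, stream, len(segmentFiles))

	for _, name := range segmentFiles {
		segmentID, err := strconv.ParseUint(strings.TrimSuffix(name, ".seg"), 16, 64)
		if err != nil {
			continue // VisitSeries logs the bad filename and moves on
		}
		// Resume support: a rerun skips segments a previous pass already migrated.
		if progress.IsStreamSeriesCompleted(group, stream, segmentID) {
			continue
		}
		if err := transferSegment(segmentID); err != nil {
			// Record the failure; the visitor itself aborts here instead.
			progress.MarkStreamSeriesError(group, stream, segmentID, err.Error())
			continue
		}
		progress.MarkStreamSeriesCompleted(group, stream, segmentID)
	}

	fmt.Printf("%d/%d migrated, done=%v, errors=%v\n",
		progress.GetStreamSeriesProgress(group, stream),
		progress.GetStreamSeriesCount(group, stream),
		progress.IsStreamSeriesFullyCompleted(group, stream),
		progress.GetStreamSeriesErrors(group, stream))
}

// transferSegment is a placeholder for the per-replica chunked-sync transfer.
func transferSegment(segmentID uint64) error {
	_ = segmentID
	return nil
}
```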