This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch lifecyc-sync in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit e20e2d47b80aa262849a556d5359a11e133f29ea Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Wed Jul 30 20:54:21 2025 +0800 Enhance VisitSeries method to include shard IDs - Updated the `VisitSeries` method in multiple visitor implementations to accept an additional parameter for shard IDs, improving the handling of series segment migrations. - Refactored related methods to collect and pass shard IDs during series visits, ensuring better integration with the migration process. --- .../backup/lifecycle/file_migration_integration.go | 2 +- banyand/backup/lifecycle/file_migration_visitor.go | 24 ++++++++++++---------- banyand/internal/storage/visitor.go | 24 ++++++++++++++++++++-- banyand/internal/storage/visitor_test.go | 2 +- banyand/stream/visitor.go | 6 +++--- banyand/stream/visitor_test.go | 2 +- 6 files changed, 41 insertions(+), 19 deletions(-) diff --git a/banyand/backup/lifecycle/file_migration_integration.go b/banyand/backup/lifecycle/file_migration_integration.go index 35c00482..afef5cc9 100644 --- a/banyand/backup/lifecycle/file_migration_integration.go +++ b/banyand/backup/lifecycle/file_migration_integration.go @@ -110,7 +110,7 @@ type partCountVisitor struct { } // VisitSeries implements stream.Visitor. -func (pcv *partCountVisitor) VisitSeries(_ *timestamp.TimeRange, _ string) error { +func (pcv *partCountVisitor) VisitSeries(_ *timestamp.TimeRange, _ string, _ []common.ShardID) error { return nil } diff --git a/banyand/backup/lifecycle/file_migration_visitor.go b/banyand/backup/lifecycle/file_migration_visitor.go index 7a52ca5b..0f69561d 100644 --- a/banyand/backup/lifecycle/file_migration_visitor.go +++ b/banyand/backup/lifecycle/file_migration_visitor.go @@ -97,7 +97,7 @@ func NewMigrationVisitor(group *commonv1.Group, nodeLabels map[string]string, } // VisitSeries implements stream.Visitor. -func (mv *MigrationVisitor) VisitSeries(segmentTR *timestamp.TimeRange, seriesIndexPath string) error { +func (mv *MigrationVisitor) VisitSeries(segmentTR *timestamp.TimeRange, seriesIndexPath string, shardIDs []common.ShardID) error { mv.logger.Info(). Str("path", seriesIndexPath). Int64("min_timestamp", segmentTR.Start.UnixNano()). @@ -187,16 +187,18 @@ func (mv *MigrationVisitor) VisitSeries(segmentTR *timestamp.TimeRange, seriesIn }, } - // Calculate target shard ID (using a simple approach for series index) - targetShardID := uint32(segmentID) % mv.targetShardNum - - // Stream segment to target shard replicas - if err := mv.streamSegmentToTargetShard(targetShardID, files, segmentTR, segmentID, segmentFileName); err != nil { - errorMsg := fmt.Sprintf("failed to stream segment to target shard: %v", err) - mv.progress.MarkStreamSeriesError(mv.group, mv.streamName, segmentID, errorMsg) - // Close the file reader - segmentFile.Close() - return fmt.Errorf("failed to stream segment to target shard: %w", err) + // Send segment file to each shard in shardIDs + for _, shardID := range shardIDs { + targetShardID := mv.calculateTargetShardID(uint32(shardID)) + + // Stream segment to target shard replicas + if err := mv.streamSegmentToTargetShard(targetShardID, files, segmentTR, segmentID, segmentFileName); err != nil { + errorMsg := fmt.Sprintf("failed to stream segment to target shard %d: %v", targetShardID, err) + mv.progress.MarkStreamSeriesError(mv.group, mv.streamName, segmentID, errorMsg) + // Close the file reader + segmentFile.Close() + return fmt.Errorf("failed to stream segment to target shard %d: %w", targetShardID, err) + } } // Close the file reader diff --git a/banyand/internal/storage/visitor.go b/banyand/internal/storage/visitor.go index bd82036a..96a3615f 100644 --- a/banyand/internal/storage/visitor.go +++ b/banyand/internal/storage/visitor.go @@ -31,7 +31,7 @@ import ( // SegmentVisitor defines the interface for visiting segment components. type SegmentVisitor interface { // VisitSeries visits the series index directory for a segment. - VisitSeries(segmentTR *timestamp.TimeRange, seriesIndexPath string) error + VisitSeries(segmentTR *timestamp.TimeRange, seriesIndexPath string, shardIDs []common.ShardID) error // VisitShard visits a shard directory within a segment. VisitShard(segmentTR *timestamp.TimeRange, shardID common.ShardID, shardPath string) error } @@ -71,9 +71,15 @@ func VisitSegmentsInTimeRange(tsdbRootPath string, timeRange timestamp.TimeRange // Visit each matching segment for _, segInfo := range segmentPaths { + // Collect shard IDs for this segment + shardIDs, err := collectSegmentShardIDs(segInfo.path) + if err != nil { + return errors.Wrapf(err, "failed to collect shard IDs for segment %s", segInfo.suffix) + } + // Visit series index directory seriesIndexPath := filepath.Join(segInfo.path, seriesIndexDirName) - if err := visitor.VisitSeries(&segInfo.timeRange, seriesIndexPath); err != nil { + if err := visitor.VisitSeries(&segInfo.timeRange, seriesIndexPath, shardIDs); err != nil { return errors.Wrapf(err, "failed to visit series index for segment %s", segInfo.suffix) } @@ -93,6 +99,20 @@ type segmentInfo struct { timeRange timestamp.TimeRange } +// collectSegmentShardIDs collects all shard IDs within a segment. +func collectSegmentShardIDs(segmentPath string) ([]common.ShardID, error) { + var shardIDs []common.ShardID + err := walkDir(segmentPath, shardPathPrefix, func(suffix string) error { + shardID, err := strconv.Atoi(suffix) + if err != nil { + return errors.Wrapf(err, "invalid shard suffix: %s", suffix) + } + shardIDs = append(shardIDs, common.ShardID(shardID)) + return nil + }) + return shardIDs, err +} + // visitSegmentShards traverses shard directories within a segment. func visitSegmentShards(segmentPath string, segmentTR *timestamp.TimeRange, visitor SegmentVisitor) error { return walkDir(segmentPath, shardPathPrefix, func(suffix string) error { diff --git a/banyand/internal/storage/visitor_test.go b/banyand/internal/storage/visitor_test.go index 149bdad2..8be90c6a 100644 --- a/banyand/internal/storage/visitor_test.go +++ b/banyand/internal/storage/visitor_test.go @@ -58,7 +58,7 @@ func NewTestVisitor() *TestVisitor { } // VisitSeries implements SegmentVisitor.VisitSeries. -func (v *TestVisitor) VisitSeries(_ *timestamp.TimeRange, seriesIndexPath string) error { +func (v *TestVisitor) VisitSeries(_ *timestamp.TimeRange, seriesIndexPath string, _ []common.ShardID) error { if err, exists := v.seriesErrors[seriesIndexPath]; exists { return err } diff --git a/banyand/stream/visitor.go b/banyand/stream/visitor.go index 586742c8..4959e604 100644 --- a/banyand/stream/visitor.go +++ b/banyand/stream/visitor.go @@ -30,7 +30,7 @@ import ( // Visitor defines the interface for visiting stream components. type Visitor interface { // VisitSeries visits the series index directory for a segment. - VisitSeries(segmentTR *timestamp.TimeRange, seriesIndexPath string) error + VisitSeries(segmentTR *timestamp.TimeRange, seriesIndexPath string, shardIDs []common.ShardID) error // VisitPart visits a part directory within a shard. VisitPart(segmentTR *timestamp.TimeRange, shardID common.ShardID, partPath string) error // VisitElementIndex visits the element index directory within a shard. @@ -43,8 +43,8 @@ type streamSegmentVisitor struct { } // VisitSeries implements storage.SegmentVisitor. -func (sv *streamSegmentVisitor) VisitSeries(segmentTR *timestamp.TimeRange, seriesIndexPath string) error { - return sv.visitor.VisitSeries(segmentTR, seriesIndexPath) +func (sv *streamSegmentVisitor) VisitSeries(segmentTR *timestamp.TimeRange, seriesIndexPath string, shardIDs []common.ShardID) error { + return sv.visitor.VisitSeries(segmentTR, seriesIndexPath, shardIDs) } // VisitShard implements storage.SegmentVisitor. diff --git a/banyand/stream/visitor_test.go b/banyand/stream/visitor_test.go index a1c41d78..3d489088 100644 --- a/banyand/stream/visitor_test.go +++ b/banyand/stream/visitor_test.go @@ -45,7 +45,7 @@ type TestVisitor struct { } // VisitSeries records the visited series path. -func (tv *TestVisitor) VisitSeries(_ *timestamp.TimeRange, seriesIndexPath string) error { +func (tv *TestVisitor) VisitSeries(_ *timestamp.TimeRange, seriesIndexPath string, _ []common.ShardID) error { tv.visitedSeries = append(tv.visitedSeries, seriesIndexPath) return nil }