This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch lifecyc-sync
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit e20e2d47b80aa262849a556d5359a11e133f29ea
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Wed Jul 30 20:54:21 2025 +0800

    Enhance VisitSeries method to include shard IDs
    
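    - Added a `shardIDs []common.ShardID` parameter to `VisitSeries` in the
      `storage.SegmentVisitor` and `stream.Visitor` interfaces, and updated every
      implementation (the migration, part-count, stream-forwarding, and test
      visitors) to match, so series index segments can be migrated per source shard.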
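    - `VisitSegmentsInTimeRange` now collects each segment's shard IDs with the
      new `collectSegmentShardIDs` helper and passes them to `VisitSeries`.
      `MigrationVisitor.VisitSeries` streams the series segment to the target
      shard computed for each source shard, instead of deriving a single target
      shard from the segment ID.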
---
 .../backup/lifecycle/file_migration_integration.go |  2 +-
 banyand/backup/lifecycle/file_migration_visitor.go | 24 ++++++++++++----------
 banyand/internal/storage/visitor.go                | 24 ++++++++++++++++++++--
 banyand/internal/storage/visitor_test.go           |  2 +-
 banyand/stream/visitor.go                          |  6 +++---
 banyand/stream/visitor_test.go                     |  2 +-
 6 files changed, 41 insertions(+), 19 deletions(-)

diff --git a/banyand/backup/lifecycle/file_migration_integration.go 
b/banyand/backup/lifecycle/file_migration_integration.go
index 35c00482..afef5cc9 100644
--- a/banyand/backup/lifecycle/file_migration_integration.go
+++ b/banyand/backup/lifecycle/file_migration_integration.go
@@ -110,7 +110,7 @@ type partCountVisitor struct {
 }
 
 // VisitSeries implements stream.Visitor.
-func (pcv *partCountVisitor) VisitSeries(_ *timestamp.TimeRange, _ string) 
error {
+func (pcv *partCountVisitor) VisitSeries(_ *timestamp.TimeRange, _ string, _ 
[]common.ShardID) error {
        return nil
 }
 
diff --git a/banyand/backup/lifecycle/file_migration_visitor.go 
b/banyand/backup/lifecycle/file_migration_visitor.go
index 7a52ca5b..0f69561d 100644
--- a/banyand/backup/lifecycle/file_migration_visitor.go
+++ b/banyand/backup/lifecycle/file_migration_visitor.go
@@ -97,7 +97,7 @@ func NewMigrationVisitor(group *commonv1.Group, nodeLabels 
map[string]string,
 }
 
 // VisitSeries implements stream.Visitor.
-func (mv *MigrationVisitor) VisitSeries(segmentTR *timestamp.TimeRange, 
seriesIndexPath string) error {
+func (mv *MigrationVisitor) VisitSeries(segmentTR *timestamp.TimeRange, 
seriesIndexPath string, shardIDs []common.ShardID) error {
        mv.logger.Info().
                Str("path", seriesIndexPath).
                Int64("min_timestamp", segmentTR.Start.UnixNano()).
@@ -187,16 +187,18 @@ func (mv *MigrationVisitor) VisitSeries(segmentTR 
*timestamp.TimeRange, seriesIn
                        },
                }
 
-               // Calculate target shard ID (using a simple approach for 
series index)
-               targetShardID := uint32(segmentID) % mv.targetShardNum
-
-               // Stream segment to target shard replicas
-               if err := mv.streamSegmentToTargetShard(targetShardID, files, 
segmentTR, segmentID, segmentFileName); err != nil {
-                       errorMsg := fmt.Sprintf("failed to stream segment to 
target shard: %v", err)
-                       mv.progress.MarkStreamSeriesError(mv.group, 
mv.streamName, segmentID, errorMsg)
-                       // Close the file reader
-                       segmentFile.Close()
-                       return fmt.Errorf("failed to stream segment to target 
shard: %w", err)
+               // Send segment file to each shard in shardIDs
+               for _, shardID := range shardIDs {
+                       targetShardID := 
mv.calculateTargetShardID(uint32(shardID))
+
+                       // Stream segment to target shard replicas
+                       if err := mv.streamSegmentToTargetShard(targetShardID, 
files, segmentTR, segmentID, segmentFileName); err != nil {
+                               errorMsg := fmt.Sprintf("failed to stream 
segment to target shard %d: %v", targetShardID, err)
+                               mv.progress.MarkStreamSeriesError(mv.group, 
mv.streamName, segmentID, errorMsg)
+                               // Close the file reader
+                               segmentFile.Close()
+                               return fmt.Errorf("failed to stream segment to 
target shard %d: %w", targetShardID, err)
+                       }
                }
 
                // Close the file reader
diff --git a/banyand/internal/storage/visitor.go 
b/banyand/internal/storage/visitor.go
index bd82036a..96a3615f 100644
--- a/banyand/internal/storage/visitor.go
+++ b/banyand/internal/storage/visitor.go
@@ -31,7 +31,7 @@ import (
 // SegmentVisitor defines the interface for visiting segment components.
 type SegmentVisitor interface {
        // VisitSeries visits the series index directory for a segment.
-       VisitSeries(segmentTR *timestamp.TimeRange, seriesIndexPath string) 
error
+       VisitSeries(segmentTR *timestamp.TimeRange, seriesIndexPath string, 
shardIDs []common.ShardID) error
        // VisitShard visits a shard directory within a segment.
        VisitShard(segmentTR *timestamp.TimeRange, shardID common.ShardID, 
shardPath string) error
 }
@@ -71,9 +71,15 @@ func VisitSegmentsInTimeRange(tsdbRootPath string, timeRange 
timestamp.TimeRange
 
        // Visit each matching segment
        for _, segInfo := range segmentPaths {
+               // Collect shard IDs for this segment
+               shardIDs, err := collectSegmentShardIDs(segInfo.path)
+               if err != nil {
+                       return errors.Wrapf(err, "failed to collect shard IDs 
for segment %s", segInfo.suffix)
+               }
+
                // Visit series index directory
                seriesIndexPath := filepath.Join(segInfo.path, 
seriesIndexDirName)
-               if err := visitor.VisitSeries(&segInfo.timeRange, 
seriesIndexPath); err != nil {
+               if err := visitor.VisitSeries(&segInfo.timeRange, 
seriesIndexPath, shardIDs); err != nil {
                        return errors.Wrapf(err, "failed to visit series index 
for segment %s", segInfo.suffix)
                }
 
@@ -93,6 +99,20 @@ type segmentInfo struct {
        timeRange timestamp.TimeRange
 }
 
+// collectSegmentShardIDs collects all shard IDs within a segment.
+func collectSegmentShardIDs(segmentPath string) ([]common.ShardID, error) {
+       var shardIDs []common.ShardID
+       err := walkDir(segmentPath, shardPathPrefix, func(suffix string) error {
+               shardID, err := strconv.Atoi(suffix)
+               if err != nil {
+                       return errors.Wrapf(err, "invalid shard suffix: %s", 
suffix)
+               }
+               shardIDs = append(shardIDs, common.ShardID(shardID))
+               return nil
+       })
+       return shardIDs, err
+}
+
 // visitSegmentShards traverses shard directories within a segment.
 func visitSegmentShards(segmentPath string, segmentTR *timestamp.TimeRange, 
visitor SegmentVisitor) error {
        return walkDir(segmentPath, shardPathPrefix, func(suffix string) error {
diff --git a/banyand/internal/storage/visitor_test.go 
b/banyand/internal/storage/visitor_test.go
index 149bdad2..8be90c6a 100644
--- a/banyand/internal/storage/visitor_test.go
+++ b/banyand/internal/storage/visitor_test.go
@@ -58,7 +58,7 @@ func NewTestVisitor() *TestVisitor {
 }
 
 // VisitSeries implements SegmentVisitor.VisitSeries.
-func (v *TestVisitor) VisitSeries(_ *timestamp.TimeRange, seriesIndexPath 
string) error {
+func (v *TestVisitor) VisitSeries(_ *timestamp.TimeRange, seriesIndexPath 
string, _ []common.ShardID) error {
        if err, exists := v.seriesErrors[seriesIndexPath]; exists {
                return err
        }
diff --git a/banyand/stream/visitor.go b/banyand/stream/visitor.go
index 586742c8..4959e604 100644
--- a/banyand/stream/visitor.go
+++ b/banyand/stream/visitor.go
@@ -30,7 +30,7 @@ import (
 // Visitor defines the interface for visiting stream components.
 type Visitor interface {
        // VisitSeries visits the series index directory for a segment.
-       VisitSeries(segmentTR *timestamp.TimeRange, seriesIndexPath string) 
error
+       VisitSeries(segmentTR *timestamp.TimeRange, seriesIndexPath string, 
shardIDs []common.ShardID) error
        // VisitPart visits a part directory within a shard.
        VisitPart(segmentTR *timestamp.TimeRange, shardID common.ShardID, 
partPath string) error
        // VisitElementIndex visits the element index directory within a shard.
@@ -43,8 +43,8 @@ type streamSegmentVisitor struct {
 }
 
 // VisitSeries implements storage.SegmentVisitor.
-func (sv *streamSegmentVisitor) VisitSeries(segmentTR *timestamp.TimeRange, 
seriesIndexPath string) error {
-       return sv.visitor.VisitSeries(segmentTR, seriesIndexPath)
+func (sv *streamSegmentVisitor) VisitSeries(segmentTR *timestamp.TimeRange, 
seriesIndexPath string, shardIDs []common.ShardID) error {
+       return sv.visitor.VisitSeries(segmentTR, seriesIndexPath, shardIDs)
 }
 
 // VisitShard implements storage.SegmentVisitor.
diff --git a/banyand/stream/visitor_test.go b/banyand/stream/visitor_test.go
index a1c41d78..3d489088 100644
--- a/banyand/stream/visitor_test.go
+++ b/banyand/stream/visitor_test.go
@@ -45,7 +45,7 @@ type TestVisitor struct {
 }
 
 // VisitSeries records the visited series path.
-func (tv *TestVisitor) VisitSeries(_ *timestamp.TimeRange, seriesIndexPath 
string) error {
+func (tv *TestVisitor) VisitSeries(_ *timestamp.TimeRange, seriesIndexPath 
string, _ []common.ShardID) error {
        tv.visitedSeries = append(tv.visitedSeries, seriesIndexPath)
        return nil
 }

Reply via email to