This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch lifecyc-sync
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 7d69446bf90674735a6e3ac85ac45759a085df8b
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Wed Jul 30 16:53:02 2025 +0800

    Add series sync topic and enhance migration visitor functionality
    
    - Introduced `TopicStreamSeriesSync` and its associated versioning in 
`stream.go`.
    - Updated `MigrationVisitor` to handle series segment migration, including 
progress tracking and error management.
    - Enhanced `Progress` struct to support series-level tracking for stream 
migrations.
---
 api/data/data.go                                   |   4 +
 api/data/stream.go                                 |   9 +
 banyand/backup/lifecycle/file_migration_visitor.go | 233 ++++++++++++++++++++-
 banyand/backup/lifecycle/progress.go               | 184 ++++++++++++++--
 banyand/backup/lifecycle/progress_test.go          |  35 ++++
 5 files changed, 440 insertions(+), 25 deletions(-)

diff --git a/api/data/data.go b/api/data/data.go
index dc3ce00b..8a6876b8 100644
--- a/api/data/data.go
+++ b/api/data/data.go
@@ -45,6 +45,7 @@ var (
                TopicPropertyRepair.String():           TopicPropertyRepair,
                TopicStreamSeriesIndexWrite.String():   
TopicStreamSeriesIndexWrite,
                TopicStreamLocalIndexWrite.String():    
TopicStreamLocalIndexWrite,
+               TopicStreamSeriesSync.String():         TopicStreamSeriesSync,
        }
 
        // TopicRequestMap is the map of topic name to request message.
@@ -95,6 +96,9 @@ var (
                TopicStreamLocalIndexWrite: func() proto.Message {
                        return nil
                },
+               TopicStreamSeriesSync: func() proto.Message {
+                       return nil
+               },
        }
 
        // TopicResponseMap is the map of topic name to response message.
diff --git a/api/data/stream.go b/api/data/stream.go
index dd43c974..7067de9a 100644
--- a/api/data/stream.go
+++ b/api/data/stream.go
@@ -75,3 +75,12 @@ var StreamLocalIndexWriteKindVersion = common.KindVersion{
 
 // TopicStreamLocalIndexWrite is the stream local index write topic.
 var TopicStreamLocalIndexWrite = 
bus.BiTopic(StreamLocalIndexWriteKindVersion.String())
+
+// StreamSeriesSyncKindVersion is the version tag of series sync kind.
+var StreamSeriesSyncKindVersion = common.KindVersion{
+       Version: "v1",
+       Kind:    "series-sync",
+}
+
+// TopicStreamSeriesSync is the series sync topic.
+var TopicStreamSeriesSync = bus.BiTopic(StreamSeriesSyncKindVersion.String())
diff --git a/banyand/backup/lifecycle/file_migration_visitor.go 
b/banyand/backup/lifecycle/file_migration_visitor.go
index a4a9ae41..4d5620d7 100644
--- a/banyand/backup/lifecycle/file_migration_visitor.go
+++ b/banyand/backup/lifecycle/file_migration_visitor.go
@@ -97,11 +97,124 @@ func NewMigrationVisitor(group *commonv1.Group, nodeLabels 
map[string]string,
 }
 
 // VisitSeries implements stream.Visitor.
-func (mv *MigrationVisitor) VisitSeries(_ *timestamp.TimeRange, 
seriesIndexPath string) error {
-       // TODO: Implement series index migration if needed
-       mv.logger.Debug().
+func (mv *MigrationVisitor) VisitSeries(segmentTR *timestamp.TimeRange, 
seriesIndexPath string) error {
+       mv.logger.Info().
+               Str("path", seriesIndexPath).
+               Int64("min_timestamp", segmentTR.Start.UnixNano()).
+               Int64("max_timestamp", segmentTR.End.UnixNano()).
+               Str("stream", mv.streamName).
+               Str("group", mv.group).
+               Msg("migrating series index")
+
+       // Find all *.seg segment files in the seriesIndexPath
+       lfs := fs.NewLocalFileSystem()
+       entries := lfs.ReadDir(seriesIndexPath)
+
+       var segmentFiles []string
+       for _, entry := range entries {
+               if !entry.IsDir() && strings.HasSuffix(entry.Name(), ".seg") {
+                       segmentFiles = append(segmentFiles, entry.Name())
+               }
+       }
+
+       if len(segmentFiles) == 0 {
+               mv.logger.Debug().
+                       Str("path", seriesIndexPath).
+                       Msg("no .seg files found in series index path")
+               return nil
+       }
+
+       mv.logger.Info().
+               Int("segment_count", len(segmentFiles)).
                Str("path", seriesIndexPath).
-               Msg("skipping series index migration (not implemented)")
+               Msg("found segment files for migration")
+
+       // Set the total number of series segments for progress tracking
+       mv.SetStreamSeriesCount(len(segmentFiles))
+
+       // Process each segment file
+       for _, segmentFileName := range segmentFiles {
+               // Extract segment ID from filename (remove .seg extension)
+               segmentIDStr := strings.TrimSuffix(segmentFileName, ".seg")
+
+               // Parse hex segment ID
+               segmentID, err := strconv.ParseUint(segmentIDStr, 16, 64)
+               if err != nil {
+                       mv.logger.Error().
+                               Str("filename", segmentFileName).
+                               Str("id_str", segmentIDStr).
+                               Err(err).
+                               Msg("failed to parse segment ID from filename")
+                       continue
+               }
+
+               // Check if this segment has already been completed
+               if mv.progress.IsStreamSeriesCompleted(mv.group, mv.streamName, 
segmentID) {
+                       mv.logger.Debug().
+                               Uint64("segment_id", segmentID).
+                               Str("filename", segmentFileName).
+                               Str("stream", mv.streamName).
+                               Str("group", mv.group).
+                               Msg("segment already completed, skipping")
+                       continue
+               }
+
+               mv.logger.Info().
+                       Uint64("segment_id", segmentID).
+                       Str("filename", segmentFileName).
+                       Str("stream", mv.streamName).
+                       Str("group", mv.group).
+                       Msg("migrating segment file")
+
+               // Create file reader for the segment file
+               segmentFilePath := filepath.Join(seriesIndexPath, 
segmentFileName)
+               segmentFile, err := lfs.OpenFile(segmentFilePath)
+               if err != nil {
+                       errorMsg := fmt.Sprintf("failed to open segment file 
%s: %v", segmentFilePath, err)
+                       mv.progress.MarkStreamSeriesError(mv.group, 
mv.streamName, segmentID, errorMsg)
+                       mv.logger.Error().
+                               Str("path", segmentFilePath).
+                               Err(err).
+                               Msg("failed to open segment file")
+                       return fmt.Errorf("failed to open segment file %s: %w", 
segmentFilePath, err)
+               }
+
+               // Create StreamingPartData for this segment
+               files := []queue.FileInfo{
+                       {
+                               Name:   segmentFileName,
+                               Reader: segmentFile.SequentialRead(),
+                       },
+               }
+
+               // Calculate target shard ID (using a simple approach for 
series index)
+               targetShardID := uint32(segmentID) % mv.targetShardNum
+
+               // Stream segment to target shard replicas
+               if err := mv.streamSegmentToTargetShard(targetShardID, files, 
segmentTR, segmentID, segmentFileName); err != nil {
+                       errorMsg := fmt.Sprintf("failed to stream segment to 
target shard: %v", err)
+                       mv.progress.MarkStreamSeriesError(mv.group, 
mv.streamName, segmentID, errorMsg)
+                       // Close the file reader
+                       segmentFile.Close()
+                       return fmt.Errorf("failed to stream segment to target 
shard: %w", err)
+               }
+
+               // Close the file reader
+               segmentFile.Close()
+
+               // Mark segment as completed
+               mv.progress.MarkStreamSeriesCompleted(mv.group, mv.streamName, 
segmentID)
+
+               mv.logger.Info().
+                       Uint64("segment_id", segmentID).
+                       Str("filename", segmentFileName).
+                       Str("stream", mv.streamName).
+                       Str("group", mv.group).
+                       Int("completed_segments", 
mv.progress.GetStreamSeriesProgress(mv.group, mv.streamName)).
+                       Int("total_segments", 
mv.progress.GetStreamSeriesCount(mv.group, mv.streamName)).
+                       Msg("segment migration completed successfully")
+       }
+
        return nil
 }
 
@@ -397,6 +510,106 @@ func (mv *MigrationVisitor) Close() error {
        return nil
 }
 
+// streamSegmentToTargetShard sends segment data to all replicas of the target 
shard.
+func (mv *MigrationVisitor) streamSegmentToTargetShard(
+       targetShardID uint32,
+       files []queue.FileInfo,
+       segmentTR *timestamp.TimeRange,
+       segmentID uint64,
+       segmentFileName string,
+) error {
+       copies := mv.replicas + 1
+
+       // Send to all replicas using the exact pattern from steps.go:219-236
+       for replicaID := uint32(0); replicaID < copies; replicaID++ {
+               // Use selector.Pick exactly like steps.go:220
+               nodeID, err := mv.selector.Pick(mv.group, "", targetShardID, 
replicaID)
+               if err != nil {
+                       return fmt.Errorf("failed to pick node for shard %d 
replica %d: %w", targetShardID, replicaID, err)
+               }
+
+               // Stream segment data to target node using chunked sync
+               if err := mv.streamSegmentToNode(nodeID, targetShardID, files, 
segmentTR, segmentID, segmentFileName); err != nil {
+                       return fmt.Errorf("failed to stream segment to node %s: 
%w", nodeID, err)
+               }
+       }
+
+       return nil
+}
+
+// streamSegmentToNode streams segment data to a specific target node.
+func (mv *MigrationVisitor) streamSegmentToNode(
+       nodeID string,
+       targetShardID uint32,
+       files []queue.FileInfo,
+       segmentTR *timestamp.TimeRange,
+       segmentID uint64,
+       segmentFileName string,
+) error {
+       // Get or create chunked client for this node (cache hit optimization)
+       chunkedClient, exists := mv.chunkedClients[nodeID]
+       if !exists {
+               var err error
+               // Create new chunked sync client via queue.Client
+               chunkedClient, err = mv.client.NewChunkedSyncClient(nodeID, 
uint32(mv.chunkSize))
+               if err != nil {
+                       return fmt.Errorf("failed to create chunked sync client 
for node %s: %w", nodeID, err)
+               }
+               mv.chunkedClients[nodeID] = chunkedClient // Cache for reuse
+       }
+
+       // Create streaming part data from the segment files
+       streamingParts := mv.createStreamingSegmentFromFiles(targetShardID, 
files, segmentTR, segmentID)
+
+       // Stream using chunked transfer (same as syncer.go:202)
+       ctx := context.Background()
+       result, err := chunkedClient.SyncStreamingParts(ctx, streamingParts)
+       if err != nil {
+               return fmt.Errorf("failed to sync streaming segments to node 
%s: %w", nodeID, err)
+       }
+
+       if !result.Success {
+               return fmt.Errorf("chunked sync partially failed: %v", 
result.ErrorMessage)
+       }
+
+       // Log success metrics (same pattern as syncer.go:210-217)
+       mv.logger.Info().
+               Str("node", nodeID).
+               Str("session", result.SessionID).
+               Uint64("bytes", result.TotalBytes).
+               Int64("duration_ms", result.DurationMs).
+               Uint32("chunks", result.ChunksCount).
+               Uint32("parts", result.PartsCount).
+               Uint32("target_shard", targetShardID).
+               Uint64("segment_id", segmentID).
+               Str("segment_filename", segmentFileName).
+               Str("stream", mv.streamName).
+               Str("group", mv.group).
+               Msg("file-based migration segment completed successfully")
+
+       return nil
+}
+
+// createStreamingSegmentFromFiles creates StreamingPartData from segment 
files.
+func (mv *MigrationVisitor) createStreamingSegmentFromFiles(
+       targetShardID uint32,
+       files []queue.FileInfo,
+       segmentTR *timestamp.TimeRange,
+       segmentID uint64,
+) []queue.StreamingPartData {
+       segmentData := queue.StreamingPartData{
+               ID:           segmentID,
+               Group:        mv.group,
+               ShardID:      targetShardID,                       // Use 
calculated target shard
+               Topic:        data.TopicStreamSeriesSync.String(), // Use the 
new topic
+               Files:        files,
+               MinTimestamp: segmentTR.Start.UnixNano(),
+               MaxTimestamp: segmentTR.End.UnixNano(),
+       }
+
+       return []queue.StreamingPartData{segmentData}
+}
+
 // SetStreamPartCount sets the total number of parts for the current stream.
 func (mv *MigrationVisitor) SetStreamPartCount(totalParts int) {
        if mv.progress != nil {
@@ -408,3 +621,15 @@ func (mv *MigrationVisitor) SetStreamPartCount(totalParts 
int) {
                        Msg("set stream part count for progress tracking")
        }
 }
+
+// SetStreamSeriesCount sets the total number of series segments for the 
current stream.
+func (mv *MigrationVisitor) SetStreamSeriesCount(totalSegments int) {
+       if mv.progress != nil {
+               mv.progress.SetStreamSeriesCount(mv.group, mv.streamName, 
totalSegments)
+               mv.logger.Info().
+                       Str("stream", mv.streamName).
+                       Str("group", mv.group).
+                       Int("total_segments", totalSegments).
+                       Msg("set stream series count for progress tracking")
+       }
+}
diff --git a/banyand/backup/lifecycle/progress.go 
b/banyand/backup/lifecycle/progress.go
index 32d4c907..d8c38061 100644
--- a/banyand/backup/lifecycle/progress.go
+++ b/banyand/backup/lifecycle/progress.go
@@ -39,9 +39,14 @@ type Progress struct {
        StreamPartErrors     map[string]map[string]map[uint64]string 
`json:"stream_part_errors"`     // group -> stream -> partID -> error
        StreamPartCounts     map[string]map[string]int               
`json:"stream_part_counts"`     // group -> stream -> total parts
        StreamPartProgress   map[string]map[string]int               
`json:"stream_part_progress"`   // group -> stream -> completed parts count
-       SnapshotStreamDir    string                                  
`json:"snapshot_stream_dir"`
-       SnapshotMeasureDir   string                                  
`json:"snapshot_measure_dir"`
-       mu                   sync.Mutex                              `json:"-"`
+       // Series-level tracking for stream migration
+       CompletedStreamSeries map[string]map[string]map[uint64]bool   
`json:"completed_stream_series"` // group -> stream -> segmentID -> completed
+       StreamSeriesErrors    map[string]map[string]map[uint64]string 
`json:"stream_series_errors"`    // group -> stream -> segmentID -> error
+       StreamSeriesCounts    map[string]map[string]int               
`json:"stream_series_counts"`    // group -> stream -> total series segments
+       StreamSeriesProgress  map[string]map[string]int               
`json:"stream_series_progress"`  // group -> stream -> completed series 
segments count
+       SnapshotStreamDir     string                                  
`json:"snapshot_stream_dir"`
+       SnapshotMeasureDir    string                                  
`json:"snapshot_measure_dir"`
+       mu                    sync.Mutex                              `json:"-"`
 }
 
 // AllGroupsFullyCompleted checks if all groups are fully completed.
@@ -60,16 +65,20 @@ func (p *Progress) AllGroupsFullyCompleted(groups 
[]*commonv1.Group) bool {
 // NewProgress creates a new Progress tracker.
 func NewProgress() *Progress {
        return &Progress{
-               CompletedGroups:      make(map[string]bool),
-               CompletedMeasures:    make(map[string]map[string]bool),
-               DeletedStreamGroups:  make(map[string]bool),
-               DeletedMeasureGroups: make(map[string]bool),
-               MeasureErrors:        make(map[string]map[string]string),
-               MeasureCounts:        make(map[string]map[string]int),
-               CompletedStreamParts: 
make(map[string]map[string]map[uint64]bool),
-               StreamPartErrors:     
make(map[string]map[string]map[uint64]string),
-               StreamPartCounts:     make(map[string]map[string]int),
-               StreamPartProgress:   make(map[string]map[string]int),
+               CompletedGroups:       make(map[string]bool),
+               CompletedMeasures:     make(map[string]map[string]bool),
+               DeletedStreamGroups:   make(map[string]bool),
+               DeletedMeasureGroups:  make(map[string]bool),
+               MeasureErrors:         make(map[string]map[string]string),
+               MeasureCounts:         make(map[string]map[string]int),
+               CompletedStreamParts:  
make(map[string]map[string]map[uint64]bool),
+               StreamPartErrors:      
make(map[string]map[string]map[uint64]string),
+               StreamPartCounts:      make(map[string]map[string]int),
+               StreamPartProgress:    make(map[string]map[string]int),
+               CompletedStreamSeries: 
make(map[string]map[string]map[uint64]bool),
+               StreamSeriesErrors:    
make(map[string]map[string]map[uint64]string),
+               StreamSeriesCounts:    make(map[string]map[string]int),
+               StreamSeriesProgress:  make(map[string]map[string]int),
        }
 }
 
@@ -334,12 +343,12 @@ func (p *Progress) GetStreamPartErrors(group, stream 
string) map[uint64]string {
        p.mu.Lock()
        defer p.mu.Unlock()
 
-       if streams, ok := p.StreamPartErrors[group]; ok {
-               if parts, ok := streams[stream]; ok {
-                       // Return a copy to avoid race conditions
+       if errors, ok := p.StreamPartErrors[group]; ok {
+               if streamErrors, ok := errors[stream]; ok {
+                       // Return a copy to avoid concurrent access issues
                        result := make(map[uint64]string)
-                       for partID, errorMsg := range parts {
-                               result[partID] = errorMsg
+                       for k, v := range streamErrors {
+                               result[k] = v
                        }
                        return result
                }
@@ -347,12 +356,145 @@ func (p *Progress) GetStreamPartErrors(group, stream 
string) map[uint64]string {
        return nil
 }
 
-// ClearStreamPartErrors clears all part errors for a specific stream.
+// ClearStreamPartErrors clears all errors for a specific stream.
 func (p *Progress) ClearStreamPartErrors(group, stream string) {
        p.mu.Lock()
        defer p.mu.Unlock()
 
-       if streams, ok := p.StreamPartErrors[group]; ok {
-               delete(streams, stream)
+       if errors, ok := p.StreamPartErrors[group]; ok {
+               delete(errors, stream)
+       }
+}
+
+// MarkStreamSeriesCompleted marks a specific series segment of a stream as 
completed.
+func (p *Progress) MarkStreamSeriesCompleted(group, stream string, segmentID 
uint64) {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       // Initialize nested maps if they don't exist
+       if p.CompletedStreamSeries[group] == nil {
+               p.CompletedStreamSeries[group] = 
make(map[string]map[uint64]bool)
+       }
+       if p.CompletedStreamSeries[group][stream] == nil {
+               p.CompletedStreamSeries[group][stream] = make(map[uint64]bool)
+       }
+
+       // Mark series segment as completed
+       p.CompletedStreamSeries[group][stream][segmentID] = true
+
+       // Update progress count
+       if p.StreamSeriesProgress[group] == nil {
+               p.StreamSeriesProgress[group] = make(map[string]int)
+       }
+       p.StreamSeriesProgress[group][stream]++
+}
+
+// IsStreamSeriesCompleted checks if a specific series segment of a stream has 
been completed.
+func (p *Progress) IsStreamSeriesCompleted(group, stream string, segmentID 
uint64) bool {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       if streams, ok := p.CompletedStreamSeries[group]; ok {
+               if segments, ok := streams[stream]; ok {
+                       return segments[segmentID]
+               }
+       }
+       return false
+}
+
+// MarkStreamSeriesError records an error for a specific series segment of a 
stream.
+func (p *Progress) MarkStreamSeriesError(group, stream string, segmentID 
uint64, errorMsg string) {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       // Initialize nested maps if they don't exist
+       if p.StreamSeriesErrors[group] == nil {
+               p.StreamSeriesErrors[group] = make(map[string]map[uint64]string)
+       }
+       if p.StreamSeriesErrors[group][stream] == nil {
+               p.StreamSeriesErrors[group][stream] = make(map[uint64]string)
+       }
+
+       // Record the error
+       p.StreamSeriesErrors[group][stream][segmentID] = errorMsg
+}
+
+// SetStreamSeriesCount sets the total number of series segments for a stream.
+func (p *Progress) SetStreamSeriesCount(group, stream string, totalSegments 
int) {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       if p.StreamSeriesCounts[group] == nil {
+               p.StreamSeriesCounts[group] = make(map[string]int)
+       }
+       p.StreamSeriesCounts[group][stream] = totalSegments
+
+       // Initialize progress tracking
+       if p.StreamSeriesProgress[group] == nil {
+               p.StreamSeriesProgress[group] = make(map[string]int)
+       }
+       if p.StreamSeriesProgress[group][stream] == 0 {
+               p.StreamSeriesProgress[group][stream] = 0
+       }
+}
+
+// GetStreamSeriesCount returns the total number of series segments for a 
stream.
+func (p *Progress) GetStreamSeriesCount(group, stream string) int {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       if counts, ok := p.StreamSeriesCounts[group]; ok {
+               return counts[stream]
+       }
+       return 0
+}
+
+// GetStreamSeriesProgress returns the number of completed series segments for 
a stream.
+func (p *Progress) GetStreamSeriesProgress(group, stream string) int {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       if progress, ok := p.StreamSeriesProgress[group]; ok {
+               return progress[stream]
+       }
+       return 0
+}
+
+// IsStreamSeriesFullyCompleted checks if all series segments of a stream have 
been completed.
+func (p *Progress) IsStreamSeriesFullyCompleted(group, stream string) bool {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       totalSegments := p.StreamSeriesCounts[group][stream]
+       completedSegments := p.StreamSeriesProgress[group][stream]
+
+       return totalSegments > 0 && completedSegments >= totalSegments
+}
+
+// GetStreamSeriesErrors returns all errors for a specific stream series.
+func (p *Progress) GetStreamSeriesErrors(group, stream string) 
map[uint64]string {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       if errors, ok := p.StreamSeriesErrors[group]; ok {
+               if streamErrors, ok := errors[stream]; ok {
+                       // Return a copy to avoid concurrent access issues
+                       result := make(map[uint64]string)
+                       for k, v := range streamErrors {
+                               result[k] = v
+                       }
+                       return result
+               }
+       }
+       return nil
+}
+
+// ClearStreamSeriesErrors clears all errors for a specific stream series.
+func (p *Progress) ClearStreamSeriesErrors(group, stream string) {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       if errors, ok := p.StreamSeriesErrors[group]; ok {
+               delete(errors, stream)
        }
 }
diff --git a/banyand/backup/lifecycle/progress_test.go 
b/banyand/backup/lifecycle/progress_test.go
index 09f67151..d29dd798 100644
--- a/banyand/backup/lifecycle/progress_test.go
+++ b/banyand/backup/lifecycle/progress_test.go
@@ -97,6 +97,7 @@ func TestProgress(t *testing.T) {
 
        t.Run("RemoveProgressFile", func(t *testing.T) {
                progress := NewProgress()
+               progress.MarkGroupCompleted("group1")
                progress.Save(progressPath, l)
 
                _, err := os.Stat(progressPath)
@@ -107,4 +108,38 @@ func TestProgress(t *testing.T) {
                _, err = os.Stat(progressPath)
                assert.True(t, os.IsNotExist(err))
        })
+
+       t.Run("StreamSeriesProgress", func(t *testing.T) {
+               progress := NewProgress()
+
+               // Test series progress tracking
+               progress.SetStreamSeriesCount("group1", "stream1", 5)
+               assert.Equal(t, 5, progress.GetStreamSeriesCount("group1", 
"stream1"))
+               assert.Equal(t, 0, progress.GetStreamSeriesProgress("group1", 
"stream1"))
+
+               // Mark some series segments as completed
+               progress.MarkStreamSeriesCompleted("group1", "stream1", 1)
+               progress.MarkStreamSeriesCompleted("group1", "stream1", 2)
+               assert.Equal(t, 2, progress.GetStreamSeriesProgress("group1", 
"stream1"))
+               assert.True(t, progress.IsStreamSeriesCompleted("group1", 
"stream1", 1))
+               assert.True(t, progress.IsStreamSeriesCompleted("group1", 
"stream1", 2))
+               assert.False(t, progress.IsStreamSeriesCompleted("group1", 
"stream1", 3))
+
+               // Test error tracking
+               progress.MarkStreamSeriesError("group1", "stream1", 3, "test 
error")
+               errors := progress.GetStreamSeriesErrors("group1", "stream1")
+               assert.Equal(t, "test error", errors[3])
+
+               // Test completion check
+               assert.False(t, progress.IsStreamSeriesFullyCompleted("group1", 
"stream1"))
+               progress.MarkStreamSeriesCompleted("group1", "stream1", 3)
+               progress.MarkStreamSeriesCompleted("group1", "stream1", 4)
+               progress.MarkStreamSeriesCompleted("group1", "stream1", 5)
+               assert.True(t, progress.IsStreamSeriesFullyCompleted("group1", 
"stream1"))
+
+               // Test error clearing
+               progress.ClearStreamSeriesErrors("group1", "stream1")
+               errors = progress.GetStreamSeriesErrors("group1", "stream1")
+               assert.Nil(t, errors)
+       })
 }

Reply via email to