This is an automated email from the ASF dual-hosted git repository. ButterBright pushed a commit to branch v0.10.x in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 7cd2616f66bd2da68358782dcc2510429633d496 Author: mrproliu <[email protected]> AuthorDate: Fri May 22 14:04:24 2026 +0800 Fix incorrect counts, rate and missing trace fields in the lifecycle migration report (#1133) --- CHANGES.md | 1 + .../backup/lifecycle/measure_migration_visitor.go | 14 +- banyand/backup/lifecycle/migration_integration.go | 148 +++++++----- banyand/backup/lifecycle/progress.go | 198 +++++++++++++--- banyand/backup/lifecycle/progress_mark_test.go | 192 ++++++++++++++++ .../lifecycle/report_partial_failure_test.go | 255 +++++++++++++++++++++ banyand/backup/lifecycle/service.go | 69 +++++- .../backup/lifecycle/stream_migration_visitor.go | 21 +- .../backup/lifecycle/trace_migration_visitor.go | 21 +- test/cases/lifecycle/lifecycle.go | 141 +++++++++++- 10 files changed, 947 insertions(+), 113 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 8a64b558d..0f007b0aa 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -41,6 +41,7 @@ Release Notes. - Fix lifecycle migration where the receiving node could create segments shorter than the configured `SegmentInterval`. - Fail fast on incompatible storage version at boot. Previously the server would start in a degraded `SERVING` state with affected groups un-loaded because the property schema-registry retry loop swallowed the version-incompatibility panic. Compatible versions are listed in `banyand/internal/storage/versions.yml`. - Release bluge index writers on segment rotation so `analysisWorker` pools sized from `GOMAXPROCS` don't accumulate across rotations. Two layered defects kept the existing idle-segment reclaim path from running: `segmentIdleTimeout` defaulted to `0` (which disabled the 10-minute reclaim ticker), and `incRef` refreshed `lastAccessed` on every rotation tick so `closeIdleSegments` never observed an idle segment. Defaults to `time.Hour`, moves the `lastAccessed` bump to real read/write call [...] +- Fix incorrect counts and missing trace fields in the lifecycle migration report. ### Chores diff --git a/banyand/backup/lifecycle/measure_migration_visitor.go b/banyand/backup/lifecycle/measure_migration_visitor.go index fd12b85ba..7a72e8394 100644 --- a/banyand/backup/lifecycle/measure_migration_visitor.go +++ b/banyand/backup/lifecycle/measure_migration_visitor.go @@ -102,9 +102,6 @@ func (mv *measureMigrationVisitor) VisitSeries(segmentTR *timestamp.TimeRange, s Str("path", seriesIndexPath). Msg("found measure segment files for migration") - // Set the total number of series segments for progress tracking - mv.SetMeasureSeriesCount(len(segmentFiles)) - // Calculate ALL target segments this series index should go to targetSegments := calculateTargetSegments( segmentTR.Start.UnixNano(), @@ -213,6 +210,15 @@ func (mv *measureMigrationVisitor) VisitSeries(segmentTR *timestamp.TimeRange, s } } + // Mark each source series file as fully migrated (idempotent — bumps Progress once per source). + for _, segmentFileName := range segmentFiles { + fileSegmentIDStr := strings.TrimSuffix(segmentFileName, ".seg") + segmentID, parseErr := strconv.ParseUint(fileSegmentIDStr, 16, 64) + if parseErr != nil { + continue + } + mv.progress.MarkSourceMeasureSeriesCompleted(mv.group, seriesIndexPath, common.ShardID(segmentID)) + } return nil } @@ -226,7 +232,6 @@ func (mv *measureMigrationVisitor) VisitPart(_ *timestamp.TimeRange, sourceShard if err != nil { return fmt.Errorf("failed to parse part ID from path: %w", err) } - // Calculate ALL target segments this part should go to targetSegments := calculateTargetSegments( partData.MinTimestamp, @@ -291,6 +296,7 @@ func (mv *measureMigrationVisitor) VisitPart(_ *timestamp.TimeRange, sourceShard Msgf("measure part migration completed for target segment %d/%d", i+1, len(targetSegments)) } + mv.progress.MarkSourceMeasurePartCompleted(mv.group, partPath, sourceShardID, partID) return nil } diff --git a/banyand/backup/lifecycle/migration_integration.go b/banyand/backup/lifecycle/migration_integration.go index 8036f95d8..742bfeef6 100644 --- a/banyand/backup/lifecycle/migration_integration.go +++ b/banyand/backup/lifecycle/migration_integration.go @@ -18,11 +18,14 @@ package lifecycle import ( + "strings" + "github.com/apache/skywalking-banyandb/api/common" "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/banyand/measure" "github.com/apache/skywalking-banyandb/banyand/stream" "github.com/apache/skywalking-banyandb/banyand/trace" + "github.com/apache/skywalking-banyandb/pkg/fs" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/timestamp" ) @@ -37,13 +40,16 @@ func migrateStreamWithFileBasedAndProgress(tsdbRootPath string, timeRange timest // Get target stage configuration targetStageInterval := getTargetStageInterval(group) - // Count total parts before starting migration - totalParts, segmentSuffixes, err := countStreamParts(tsdbRootPath, timeRange, segmentIntervalRule) + // Pre-walk source items so each resource has a fixed planned denominator. + counter, segmentSuffixes, err := countStreamParts(tsdbRootPath, timeRange, segmentIntervalRule) if err != nil { - logger.Warn().Err(err).Msg("failed to count stream parts, proceeding without part count") + logger.Warn().Err(err).Msg("failed to count stream source items, proceeding without planned counts") } else { - logger.Info().Int("total_parts", totalParts).Strs("segment_suffixes", segmentSuffixes). - Msg("counted stream parts for progres tracking") + logger.Info().Int("total_parts", counter.partCount). + Int("total_series_files", counter.seriesFileCount). + Int("total_element_index_visits", counter.elementIndexCount). + Strs("segment_suffixes", segmentSuffixes). + Msg("counted stream source items for progress tracking") } // Create file-based migration visitor with progress tracking and target stage interval @@ -53,9 +59,11 @@ func migrateStreamWithFileBasedAndProgress(tsdbRootPath string, timeRange timest ) defer visitor.Close() - // Set the total part count for progress tracking - if totalParts > 0 { - visitor.SetStreamPartCount(totalParts) + // Set the planned source-item counts for progress tracking. + if counter != nil { + visitor.SetStreamPartCount(counter.partCount) + visitor.SetStreamSeriesCount(counter.seriesFileCount) + visitor.SetStreamElementIndexCount(counter.elementIndexCount) } // Use the existing VisitStreamsInTimeRange function with our file-based visitor @@ -66,27 +74,36 @@ func migrateStreamWithFileBasedAndProgress(tsdbRootPath string, timeRange timest return segmentSuffixes, nil } -// countStreamParts counts the total number of parts in the given time range. -func countStreamParts(tsdbRootPath string, timeRange timestamp.TimeRange, segmentInterval storage.IntervalRule) (int, []string, error) { - // Create a simple visitor to count parts - partCounter := &partCountVisitor{} +// countStreamParts counts the total number of source items in the given time range. +func countStreamParts(tsdbRootPath string, timeRange timestamp.TimeRange, segmentInterval storage.IntervalRule) (*partCountVisitor, []string, error) { + // Create a simple visitor to count source items + partCounter := &partCountVisitor{lfs: fs.NewLocalFileSystem()} // Use the existing VisitStreamsInTimeRange function to count parts segmentSuffixes, err := stream.VisitStreamsInTimeRange(tsdbRootPath, timeRange, partCounter, segmentInterval) if err != nil { - return 0, nil, err + return nil, nil, err } - return partCounter.partCount, segmentSuffixes, nil + return partCounter, segmentSuffixes, nil } -// partCountVisitor is a simple visitor that counts parts. +// partCountVisitor is a simple visitor that counts source parts, series files and element-index visits. type partCountVisitor struct { - partCount int + lfs fs.FileSystem + partCount int + seriesFileCount int + elementIndexCount int } -// VisitSeries implements stream.Visitor. -func (pcv *partCountVisitor) VisitSeries(_ *timestamp.TimeRange, _ string, _ []common.ShardID) error { +// VisitSeries implements stream.Visitor; counts .seg files under the source series index directory. +func (pcv *partCountVisitor) VisitSeries(_ *timestamp.TimeRange, seriesIndexPath string, _ []common.ShardID) error { + entries := pcv.lfs.ReadDir(seriesIndexPath) + for _, e := range entries { + if !e.IsDir() && strings.HasSuffix(e.Name(), ".seg") { + pcv.seriesFileCount++ + } + } return nil } @@ -98,6 +115,7 @@ func (pcv *partCountVisitor) VisitPart(_ *timestamp.TimeRange, _ common.ShardID, // VisitElementIndex implements stream.Visitor. func (pcv *partCountVisitor) VisitElementIndex(_ *timestamp.TimeRange, _ common.ShardID, _ string) error { + pcv.elementIndexCount++ return nil } @@ -111,13 +129,15 @@ func migrateMeasureWithFileBasedAndProgress(tsdbRootPath string, timeRange times // Get target stage configuration targetStageInterval := getTargetStageInterval(group) - // Count total parts before starting migration - totalParts, segmentSuffixes, err := countMeasureParts(tsdbRootPath, timeRange, segmentIntervalRule) + // Pre-walk source items so each resource has a fixed planned denominator. + counter, segmentSuffixes, err := countMeasureParts(tsdbRootPath, timeRange, segmentIntervalRule) if err != nil { - logger.Warn().Err(err).Msg("failed to count measure parts, proceeding without part count") + logger.Warn().Err(err).Msg("failed to count measure source items, proceeding without planned counts") } else { - logger.Info().Int("total_parts", totalParts).Strs("segment_suffixes", segmentSuffixes). - Msg("counted measure parts for progress tracking") + logger.Info().Int("total_parts", counter.partCount). + Int("total_series_files", counter.seriesFileCount). + Strs("segment_suffixes", segmentSuffixes). + Msg("counted measure source items for progress tracking") } // Create file-based migration visitor with progress tracking and target stage interval @@ -127,9 +147,10 @@ func migrateMeasureWithFileBasedAndProgress(tsdbRootPath string, timeRange times ) defer visitor.Close() - // Set the total part count for progress tracking - if totalParts > 0 { - visitor.SetMeasurePartCount(totalParts) + // Set the planned source-item counts for progress tracking. + if counter != nil { + visitor.SetMeasurePartCount(counter.partCount) + visitor.SetMeasureSeriesCount(counter.seriesFileCount) } // Use the existing VisitMeasuresInTimeRange function with our file-based visitor @@ -140,27 +161,35 @@ func migrateMeasureWithFileBasedAndProgress(tsdbRootPath string, timeRange times return segmentSuffixes, nil } -// countMeasureParts counts the total number of parts in the given time range. -func countMeasureParts(tsdbRootPath string, timeRange timestamp.TimeRange, segmentInterval storage.IntervalRule) (int, []string, error) { - // Create a simple visitor to count parts - partCounter := &measurePartCountVisitor{} +// countMeasureParts counts the total number of source items in the given time range. +func countMeasureParts(tsdbRootPath string, timeRange timestamp.TimeRange, segmentInterval storage.IntervalRule) (*measurePartCountVisitor, []string, error) { + // Create a simple visitor to count source items + partCounter := &measurePartCountVisitor{lfs: fs.NewLocalFileSystem()} // Use the existing VisitMeasuresInTimeRange function to count parts segmentSuffixes, err := measure.VisitMeasuresInTimeRange(tsdbRootPath, timeRange, partCounter, segmentInterval) if err != nil { - return 0, nil, err + return nil, nil, err } - return partCounter.partCount, segmentSuffixes, nil + return partCounter, segmentSuffixes, nil } -// measurePartCountVisitor is a simple visitor that counts measure parts. +// measurePartCountVisitor is a simple visitor that counts measure source parts and series files. type measurePartCountVisitor struct { - partCount int + lfs fs.FileSystem + partCount int + seriesFileCount int } -// VisitSeries implements measure.Visitor. -func (pcv *measurePartCountVisitor) VisitSeries(_ *timestamp.TimeRange, _ string, _ []common.ShardID) error { +// VisitSeries implements measure.Visitor; counts .seg files under the source series index directory. +func (pcv *measurePartCountVisitor) VisitSeries(_ *timestamp.TimeRange, seriesIndexPath string, _ []common.ShardID) error { + entries := pcv.lfs.ReadDir(seriesIndexPath) + for _, e := range entries { + if !e.IsDir() && strings.HasSuffix(e.Name(), ".seg") { + pcv.seriesFileCount++ + } + } return nil } @@ -180,13 +209,15 @@ func migrateTraceWithFileBasedAndProgress(tsdbRootPath string, timeRange timesta // Get target stage configuration targetStageInterval := getTargetStageInterval(group) - // Count total shards before starting migration - totalShards, segmentSuffixes, err := countTraceShards(tsdbRootPath, timeRange, segmentIntervalRule) + // Pre-walk source items so each resource has a fixed planned denominator. + counter, segmentSuffixes, err := countTraceShards(tsdbRootPath, timeRange, segmentIntervalRule) if err != nil { - logger.Warn().Err(err).Msg("failed to count trace parts, proceeding without part count") + logger.Warn().Err(err).Msg("failed to count trace source items, proceeding without planned counts") } else { - logger.Info().Int("total_shards", totalShards).Strs("segment_suffixes", segmentSuffixes). - Msg("counted trace parts for progress tracking") + logger.Info().Int("total_shards", counter.shardCount). + Int("total_series_files", counter.seriesFileCount). + Strs("segment_suffixes", segmentSuffixes). + Msg("counted trace source items for progress tracking") } // Create file-based migration visitor with progress tracking and target stage interval @@ -196,9 +227,10 @@ func migrateTraceWithFileBasedAndProgress(tsdbRootPath string, timeRange timesta ) defer visitor.Close() - // Set the total part count for progress tracking - if totalShards > 0 { - visitor.SetTraceShardCount(totalShards) + // Set the planned source-item counts for progress tracking. + if counter != nil { + visitor.SetTraceShardCount(counter.shardCount) + visitor.SetTraceSeriesCount(counter.seriesFileCount) } // Use the existing VisitTracesInTimeRange function with our file-based visitor @@ -209,27 +241,35 @@ func migrateTraceWithFileBasedAndProgress(tsdbRootPath string, timeRange timesta return segmentSuffixes, nil } -// countTraceShards counts the total number of shards in the given time range. -func countTraceShards(tsdbRootPath string, timeRange timestamp.TimeRange, segmentInterval storage.IntervalRule) (int, []string, error) { - // Create a simple visitor to count shards - shardCounter := &traceShardsCountVisitor{} +// countTraceShards counts the total number of source items in the given time range. +func countTraceShards(tsdbRootPath string, timeRange timestamp.TimeRange, segmentInterval storage.IntervalRule) (*traceShardsCountVisitor, []string, error) { + // Create a simple visitor to count source items + shardCounter := &traceShardsCountVisitor{lfs: fs.NewLocalFileSystem()} // Use the existing VisitTracesInTimeRange function to count parts segmentSuffixes, err := trace.VisitTracesInTimeRange(tsdbRootPath, timeRange, shardCounter, segmentInterval) if err != nil { - return 0, nil, err + return nil, nil, err } - return shardCounter.shardCount, segmentSuffixes, nil + return shardCounter, segmentSuffixes, nil } -// traceShardsCountVisitor is a simple visitor that counts trace shards. +// traceShardsCountVisitor is a simple visitor that counts trace source shards and series files. type traceShardsCountVisitor struct { - shardCount int + lfs fs.FileSystem + shardCount int + seriesFileCount int } -// VisitSeries implements trace.Visitor. -func (pcv *traceShardsCountVisitor) VisitSeries(_ *timestamp.TimeRange, _ string, _ []common.ShardID) error { +// VisitSeries implements trace.Visitor; counts .seg files under the source series index directory. +func (pcv *traceShardsCountVisitor) VisitSeries(_ *timestamp.TimeRange, seriesIndexPath string, _ []common.ShardID) error { + entries := pcv.lfs.ReadDir(seriesIndexPath) + for _, e := range entries { + if !e.IsDir() && strings.HasSuffix(e.Name(), ".seg") { + pcv.seriesFileCount++ + } + } return nil } diff --git a/banyand/backup/lifecycle/progress.go b/banyand/backup/lifecycle/progress.go index 99f7083d1..91deb265f 100644 --- a/banyand/backup/lifecycle/progress.go +++ b/banyand/backup/lifecycle/progress.go @@ -64,11 +64,31 @@ type Progress struct { TraceSeriesErrors map[string]map[string]map[common.ShardID]string `json:"trace_series_errors"` TraceSeriesCounts map[string]int `json:"trace_series_counts"` TraceSeriesProgress map[string]int `json:"trace_series_progress"` - progressFilePath string `json:"-"` - SnapshotStreamDir string `json:"snapshot_stream_dir"` - SnapshotMeasureDir string `json:"snapshot_measure_dir"` - SnapshotTraceDir string `json:"snapshot_trace_dir"` - mu sync.Mutex `json:"-"` + + // Per-source completion tracking (advances *Progress idempotently once per source item). + SourceCompletedStreamParts map[string]map[string]map[common.ShardID]map[uint64]bool `json:"source_completed_stream_parts"` + SourceCompletedStreamSeries map[string]map[string]map[common.ShardID]bool `json:"source_completed_stream_series"` + SourceCompletedStreamElementIndex map[string]map[string]map[common.ShardID]bool `json:"source_completed_stream_element_index"` + SourceCompletedMeasureParts map[string]map[string]map[common.ShardID]map[uint64]bool `json:"source_completed_measure_parts"` + SourceCompletedMeasureSeries map[string]map[string]map[common.ShardID]bool `json:"source_completed_measure_series"` + SourceCompletedTraceShards map[string]map[string]map[common.ShardID]bool `json:"source_completed_trace_shards"` + SourceCompletedTraceSeries map[string]map[string]map[common.ShardID]bool `json:"source_completed_trace_series"` + + progressFilePath string `json:"-"` + SnapshotStreamDir string `json:"snapshot_stream_dir"` + SnapshotMeasureDir string `json:"snapshot_measure_dir"` + SnapshotTraceDir string `json:"snapshot_trace_dir"` + GroupsToProcess []string `json:"groups_to_process"` + mu sync.Mutex `json:"-"` +} + +// SetGroupsToProcess records the set of groups picked up by this migration cycle. +// Used as the denominator for migration_status.completion_rate so the report +// reflects "completed / scheduled" rather than overlapping per-catalog buckets. +func (p *Progress) SetGroupsToProcess(groups []string) { + p.mu.Lock() + defer p.mu.Unlock() + p.GroupsToProcess = append(p.GroupsToProcess[:0], groups...) } // AllGroupsNotFullyCompleted find is there have any group not fully completed. @@ -120,8 +140,17 @@ func NewProgress(path string, l *logger.Logger) *Progress { TraceSeriesErrors: make(map[string]map[string]map[common.ShardID]string), TraceSeriesCounts: make(map[string]int), TraceSeriesProgress: make(map[string]int), - progressFilePath: path, - logger: l, + + SourceCompletedStreamParts: make(map[string]map[string]map[common.ShardID]map[uint64]bool), + SourceCompletedStreamSeries: make(map[string]map[string]map[common.ShardID]bool), + SourceCompletedStreamElementIndex: make(map[string]map[string]map[common.ShardID]bool), + SourceCompletedMeasureParts: make(map[string]map[string]map[common.ShardID]map[uint64]bool), + SourceCompletedMeasureSeries: make(map[string]map[string]map[common.ShardID]bool), + SourceCompletedTraceShards: make(map[string]map[string]map[common.ShardID]bool), + SourceCompletedTraceSeries: make(map[string]map[string]map[common.ShardID]bool), + + progressFilePath: path, + logger: l, } } @@ -266,9 +295,6 @@ func (p *Progress) MarkStreamPartCompleted(group string, segmentID string, shard // Mark part as completed p.CompletedStreamParts[group][segmentID][shardID][partID] = true - - // Update progress count - p.StreamPartProgress[group]++ } // IsStreamPartCompleted checks if a specific part of a stream has been completed. @@ -354,9 +380,6 @@ func (p *Progress) MarkStreamSeriesCompleted(group string, segmentID string, sha // Mark series segment as completed p.CompletedStreamSeries[group][segmentID][shardID] = true - - // Update progress count - p.StreamSeriesProgress[group]++ } // IsStreamSeriesCompleted checks if a specific series segment of a stream has been completed. @@ -466,9 +489,6 @@ func (p *Progress) MarkStreamElementIndexCompleted(group string, segmentID strin // Mark shard as completed p.CompletedStreamElementIndex[group][segmentID][shardID] = true - - // Update progress count - p.StreamElementIndexProgress[group]++ } // IsStreamElementIndexCompleted checks if a specific element index file of a stream has been completed. @@ -552,9 +572,6 @@ func (p *Progress) MarkMeasurePartCompleted(group string, segmentID string, shar // Mark part as completed p.CompletedMeasureParts[group][segmentID][shardID][partID] = true - - // Update progress count - p.MeasurePartProgress[group]++ } // IsMeasurePartCompleted checks if a specific part of a measure has been completed. @@ -633,6 +650,8 @@ func (p *Progress) ClearErrors() { p.StreamElementIndexErrors = make(map[string]map[string]map[common.ShardID]string) p.MeasurePartErrors = make(map[string]map[string]map[common.ShardID]map[uint64]string) p.MeasureSeriesErrors = make(map[string]map[string]map[common.ShardID]string) + p.TraceShardErrors = make(map[string]map[string]map[common.ShardID]string) + p.TraceSeriesErrors = make(map[string]map[string]map[common.ShardID]string) } // MarkMeasureSeriesCompleted marks a specific series segment of a measure as completed. @@ -651,9 +670,6 @@ func (p *Progress) MarkMeasureSeriesCompleted(group string, segmentID string, sh // Mark series segment as completed p.CompletedMeasureSeries[group][segmentID][shardID] = true - - // Update progress count - p.MeasureSeriesProgress[group]++ } // IsMeasureSeriesCompleted checks if a specific series segment of a measure has been completed. @@ -746,9 +762,6 @@ func (p *Progress) MarkTraceShardCompleted(group string, segmentID string, shard p.CompletedTraceShards[group][segmentID] = make(map[common.ShardID]bool) } p.CompletedTraceShards[group][segmentID][shardID] = true - - // Increment progress - p.TraceShardProgress[group]++ } // IsTraceShardCompleted checks if a specific part of a trace has been completed. @@ -820,9 +833,6 @@ func (p *Progress) MarkTraceSeriesCompleted(group string, segmentID string, shar p.CompletedTraceSeries[group][segmentID] = make(map[common.ShardID]bool) } p.CompletedTraceSeries[group][segmentID][shardID] = true - - // Increment progress - p.TraceSeriesProgress[group]++ } // IsTraceSeriesCompleted checks if a specific series segment of a trace has been completed. @@ -881,3 +891,135 @@ func (p *Progress) GetTraceSeriesProgress(group string) int { } return 0 } + +// MarkSourceStreamPartCompleted records that one source stream part finished +// every target write successfully; idempotent (++Progress on first call only). +func (p *Progress) MarkSourceStreamPartCompleted(group string, sourceSegmentID string, sourceShardID common.ShardID, partID uint64) { + defer p.saveProgress() + p.mu.Lock() + defer p.mu.Unlock() + if p.SourceCompletedStreamParts[group] == nil { + p.SourceCompletedStreamParts[group] = make(map[string]map[common.ShardID]map[uint64]bool) + } + if p.SourceCompletedStreamParts[group][sourceSegmentID] == nil { + p.SourceCompletedStreamParts[group][sourceSegmentID] = make(map[common.ShardID]map[uint64]bool) + } + if p.SourceCompletedStreamParts[group][sourceSegmentID][sourceShardID] == nil { + p.SourceCompletedStreamParts[group][sourceSegmentID][sourceShardID] = make(map[uint64]bool) + } + if !p.SourceCompletedStreamParts[group][sourceSegmentID][sourceShardID][partID] { + p.SourceCompletedStreamParts[group][sourceSegmentID][sourceShardID][partID] = true + p.StreamPartProgress[group]++ + } +} + +// MarkSourceStreamSeriesCompleted records that one source stream series file finished +// every target write successfully; idempotent. +func (p *Progress) MarkSourceStreamSeriesCompleted(group string, sourceSegmentID string, sourceShardID common.ShardID) { + defer p.saveProgress() + p.mu.Lock() + defer p.mu.Unlock() + if p.SourceCompletedStreamSeries[group] == nil { + p.SourceCompletedStreamSeries[group] = make(map[string]map[common.ShardID]bool) + } + if p.SourceCompletedStreamSeries[group][sourceSegmentID] == nil { + p.SourceCompletedStreamSeries[group][sourceSegmentID] = make(map[common.ShardID]bool) + } + if !p.SourceCompletedStreamSeries[group][sourceSegmentID][sourceShardID] { + p.SourceCompletedStreamSeries[group][sourceSegmentID][sourceShardID] = true + p.StreamSeriesProgress[group]++ + } +} + +// MarkSourceStreamElementIndexCompleted records that one source stream element-index visit finished +// every target write successfully; idempotent. +func (p *Progress) MarkSourceStreamElementIndexCompleted(group string, sourceSegmentID string, sourceShardID common.ShardID) { + defer p.saveProgress() + p.mu.Lock() + defer p.mu.Unlock() + if p.SourceCompletedStreamElementIndex[group] == nil { + p.SourceCompletedStreamElementIndex[group] = make(map[string]map[common.ShardID]bool) + } + if p.SourceCompletedStreamElementIndex[group][sourceSegmentID] == nil { + p.SourceCompletedStreamElementIndex[group][sourceSegmentID] = make(map[common.ShardID]bool) + } + if !p.SourceCompletedStreamElementIndex[group][sourceSegmentID][sourceShardID] { + p.SourceCompletedStreamElementIndex[group][sourceSegmentID][sourceShardID] = true + p.StreamElementIndexProgress[group]++ + } +} + +// MarkSourceMeasurePartCompleted records that one source measure part finished +// every target write successfully; idempotent. +func (p *Progress) MarkSourceMeasurePartCompleted(group string, sourceSegmentID string, sourceShardID common.ShardID, partID uint64) { + defer p.saveProgress() + p.mu.Lock() + defer p.mu.Unlock() + if p.SourceCompletedMeasureParts[group] == nil { + p.SourceCompletedMeasureParts[group] = make(map[string]map[common.ShardID]map[uint64]bool) + } + if p.SourceCompletedMeasureParts[group][sourceSegmentID] == nil { + p.SourceCompletedMeasureParts[group][sourceSegmentID] = make(map[common.ShardID]map[uint64]bool) + } + if p.SourceCompletedMeasureParts[group][sourceSegmentID][sourceShardID] == nil { + p.SourceCompletedMeasureParts[group][sourceSegmentID][sourceShardID] = make(map[uint64]bool) + } + if !p.SourceCompletedMeasureParts[group][sourceSegmentID][sourceShardID][partID] { + p.SourceCompletedMeasureParts[group][sourceSegmentID][sourceShardID][partID] = true + p.MeasurePartProgress[group]++ + } +} + +// MarkSourceMeasureSeriesCompleted records that one source measure series file finished +// every target write successfully; idempotent. +func (p *Progress) MarkSourceMeasureSeriesCompleted(group string, sourceSegmentID string, sourceShardID common.ShardID) { + defer p.saveProgress() + p.mu.Lock() + defer p.mu.Unlock() + if p.SourceCompletedMeasureSeries[group] == nil { + p.SourceCompletedMeasureSeries[group] = make(map[string]map[common.ShardID]bool) + } + if p.SourceCompletedMeasureSeries[group][sourceSegmentID] == nil { + p.SourceCompletedMeasureSeries[group][sourceSegmentID] = make(map[common.ShardID]bool) + } + if !p.SourceCompletedMeasureSeries[group][sourceSegmentID][sourceShardID] { + p.SourceCompletedMeasureSeries[group][sourceSegmentID][sourceShardID] = true + p.MeasureSeriesProgress[group]++ + } +} + +// MarkSourceTraceShardCompleted records that one source trace shard finished +// every target write successfully; idempotent. +func (p *Progress) MarkSourceTraceShardCompleted(group string, sourceSegmentID string, sourceShardID common.ShardID) { + defer p.saveProgress() + p.mu.Lock() + defer p.mu.Unlock() + if p.SourceCompletedTraceShards[group] == nil { + p.SourceCompletedTraceShards[group] = make(map[string]map[common.ShardID]bool) + } + if p.SourceCompletedTraceShards[group][sourceSegmentID] == nil { + p.SourceCompletedTraceShards[group][sourceSegmentID] = make(map[common.ShardID]bool) + } + if !p.SourceCompletedTraceShards[group][sourceSegmentID][sourceShardID] { + p.SourceCompletedTraceShards[group][sourceSegmentID][sourceShardID] = true + p.TraceShardProgress[group]++ + } +} + +// MarkSourceTraceSeriesCompleted records that one source trace series file finished +// every target write successfully; idempotent. +func (p *Progress) MarkSourceTraceSeriesCompleted(group string, sourceSegmentID string, sourceShardID common.ShardID) { + defer p.saveProgress() + p.mu.Lock() + defer p.mu.Unlock() + if p.SourceCompletedTraceSeries[group] == nil { + p.SourceCompletedTraceSeries[group] = make(map[string]map[common.ShardID]bool) + } + if p.SourceCompletedTraceSeries[group][sourceSegmentID] == nil { + p.SourceCompletedTraceSeries[group][sourceSegmentID] = make(map[common.ShardID]bool) + } + if !p.SourceCompletedTraceSeries[group][sourceSegmentID][sourceShardID] { + p.SourceCompletedTraceSeries[group][sourceSegmentID][sourceShardID] = true + p.TraceSeriesProgress[group]++ + } +} diff --git a/banyand/backup/lifecycle/progress_mark_test.go b/banyand/backup/lifecycle/progress_mark_test.go new file mode 100644 index 000000000..367330d70 --- /dev/null +++ b/banyand/backup/lifecycle/progress_mark_test.go @@ -0,0 +1,192 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package lifecycle + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/apache/skywalking-banyandb/pkg/logger" +) + +// TestMarkPerTargetDoesNotMoveCounters pins the contract that per-target +// Mark*Completed / Mark*Error only maintain the per-target dedup / error +// maps and do NOT advance Counts or Progress. Counts is set by pre-walk +// (Set*Count) and Progress is advanced exclusively by MarkSource*Completed. +func TestMarkPerTargetDoesNotMoveCounters(t *testing.T) { + p := NewProgress("", logger.GetLogger("test")) + + // Stream part: per-target mark several times — counters must stay 0. + p.MarkStreamPartCompleted("g", "tgt-seg-1", 0, 1) + p.MarkStreamPartCompleted("g", "tgt-seg-2", 0, 1) + p.MarkStreamPartError("g", "tgt-seg-3", 0, 1, "err") + assert.Equal(t, 0, p.StreamPartCounts["g"]) + assert.Equal(t, 0, p.StreamPartProgress["g"]) + + // Per-target dedup map populated. + assert.True(t, p.IsStreamPartCompleted("g", "tgt-seg-1", 0, 1)) + assert.True(t, p.IsStreamPartCompleted("g", "tgt-seg-2", 0, 1)) + + // Same for series / element_index / measure / trace. + p.MarkStreamSeriesCompleted("g", "tgt-seg-1", 0) + p.MarkStreamElementIndexCompleted("g", "tgt-seg-1", 0) + p.MarkMeasurePartCompleted("g", "tgt-seg-1", 0, 1) + p.MarkMeasureSeriesCompleted("g", "tgt-seg-1", 0) + p.MarkTraceShardCompleted("g", "tgt-seg-1", 0) + p.MarkTraceSeriesCompleted("g", "tgt-seg-1", 0) + for _, counter := range []int{ + p.StreamSeriesCounts["g"], p.StreamSeriesProgress["g"], + p.StreamElementIndexCounts["g"], p.StreamElementIndexProgress["g"], + p.MeasurePartCounts["g"], p.MeasurePartProgress["g"], + p.MeasureSeriesCounts["g"], p.MeasureSeriesProgress["g"], + p.TraceShardCounts["g"], p.TraceShardProgress["g"], + p.TraceSeriesCounts["g"], p.TraceSeriesProgress["g"], + } { + assert.Equal(t, 0, counter, "per-target marks must not move counters") + } +} + +// TestSetCountIsThePlannedDenominator pins that Set*Count is the only writer +// of the per-resource Counts denominator (set by pre-walk). +func TestSetCountIsThePlannedDenominator(t *testing.T) { + p := NewProgress("", logger.GetLogger("test")) + + p.SetStreamPartCount("g", 100) + p.SetStreamSeriesCount("g", 50) + p.SetStreamElementIndexCount("g", 10) + p.SetMeasurePartCount("g", 70) + p.SetMeasureSeriesCount("g", 35) + p.SetTraceShardCount("g", 5) + p.SetTraceSeriesCount("g", 8) + + assert.Equal(t, 100, p.StreamPartCounts["g"]) + assert.Equal(t, 50, p.StreamSeriesCounts["g"]) + assert.Equal(t, 10, p.StreamElementIndexCounts["g"]) + assert.Equal(t, 70, p.MeasurePartCounts["g"]) + assert.Equal(t, 35, p.MeasureSeriesCounts["g"]) + assert.Equal(t, 5, p.TraceShardCounts["g"]) + assert.Equal(t, 8, p.TraceSeriesCounts["g"]) + + // Set is overwrite semantics — last call wins. + p.SetStreamPartCount("g", 200) + assert.Equal(t, 200, p.StreamPartCounts["g"]) +} + +// TestMarkSourceCompletedIdempotent pins that MarkSource*Completed advances +// Progress exactly once per unique source key across all 7 resources. +func TestMarkSourceCompletedIdempotent(t *testing.T) { + p := NewProgress("", logger.GetLogger("test")) + + // Stream part: same key 3 times → Progress=1. + p.MarkSourceStreamPartCompleted("g", "src-seg", 0, 1) + p.MarkSourceStreamPartCompleted("g", "src-seg", 0, 1) + p.MarkSourceStreamPartCompleted("g", "src-seg", 0, 1) + assert.Equal(t, 1, p.StreamPartProgress["g"]) + assert.True(t, p.SourceCompletedStreamParts["g"]["src-seg"][0][1]) + // Different partID = different source = +1. + p.MarkSourceStreamPartCompleted("g", "src-seg", 0, 2) + assert.Equal(t, 2, p.StreamPartProgress["g"]) + + // Stream series, element_index, measure, trace: same pattern, +1 once. + p.MarkSourceStreamSeriesCompleted("g", "src-seg", 0) + p.MarkSourceStreamSeriesCompleted("g", "src-seg", 0) + assert.Equal(t, 1, p.StreamSeriesProgress["g"]) + + p.MarkSourceStreamElementIndexCompleted("g", "src-seg", 0) + p.MarkSourceStreamElementIndexCompleted("g", "src-seg", 0) + assert.Equal(t, 1, p.StreamElementIndexProgress["g"]) + + p.MarkSourceMeasurePartCompleted("g", "src-seg", 0, 1) + p.MarkSourceMeasurePartCompleted("g", "src-seg", 0, 1) + assert.Equal(t, 1, p.MeasurePartProgress["g"]) + + p.MarkSourceMeasureSeriesCompleted("g", "src-seg", 0) + p.MarkSourceMeasureSeriesCompleted("g", "src-seg", 0) + assert.Equal(t, 1, p.MeasureSeriesProgress["g"]) + + p.MarkSourceTraceShardCompleted("g", "src-seg", 0) + p.MarkSourceTraceShardCompleted("g", "src-seg", 0) + assert.Equal(t, 1, p.TraceShardProgress["g"]) + + p.MarkSourceTraceSeriesCompleted("g", "src-seg", 0) + p.MarkSourceTraceSeriesCompleted("g", "src-seg", 0) + assert.Equal(t, 1, p.TraceSeriesProgress["g"]) +} + +// TestClearErrors_AllSevenBuckets pins that ClearErrors resets every error +// map (including trace) and does NOT touch Counts/Progress (which now +// derive from pre-walk + MarkSource and are immutable to error replay). +func TestClearErrors_AllSevenBuckets(t *testing.T) { + p := NewProgress("", logger.GetLogger("test")) + + // Set planned counts (pre-walk). + p.SetStreamPartCount("g", 10) + p.SetStreamSeriesCount("g", 10) + p.SetStreamElementIndexCount("g", 10) + p.SetMeasurePartCount("g", 10) + p.SetMeasureSeriesCount("g", 10) + p.SetTraceShardCount("g", 10) + p.SetTraceSeriesCount("g", 10) + + // Drop one error per bucket. + p.MarkStreamPartError("g", "s", 0, 1, "x") + p.MarkStreamSeriesError("g", "s", 0, "x") + p.MarkStreamElementIndexError("g", "s", 0, "x") + p.MarkMeasurePartError("g", "s", 0, 1, "x") + p.MarkMeasureSeriesError("g", "s", 0, "x") + p.MarkTraceShardError("g", "s", 0, "x") + p.MarkTraceSeriesError("g", "s", 0, "x") + + p.ClearErrors() + + assert.Empty(t, p.StreamPartErrors["g"]) + assert.Empty(t, p.StreamSeriesErrors["g"]) + assert.Empty(t, p.StreamElementIndexErrors["g"]) + assert.Empty(t, p.MeasurePartErrors["g"]) + assert.Empty(t, p.MeasureSeriesErrors["g"]) + assert.Empty(t, p.TraceShardErrors["g"]) + assert.Empty(t, p.TraceSeriesErrors["g"]) + + // Counts are pre-walk planned — must survive ClearErrors. + for _, c := range []int{ + p.StreamPartCounts["g"], p.StreamSeriesCounts["g"], p.StreamElementIndexCounts["g"], + p.MeasurePartCounts["g"], p.MeasureSeriesCounts["g"], + p.TraceShardCounts["g"], p.TraceSeriesCounts["g"], + } { + assert.Equal(t, 10, c, "Counts must not be touched by ClearErrors") + } +} + +// TestPerTargetAndPerSourceAreIndependent pins that per-target dedup and +// per-source progress are tracked independently: per-target Mark does not +// move source progress, and MarkSource does not populate the per-target map. +func TestPerTargetAndPerSourceAreIndependent(t *testing.T) { + p := NewProgress("", logger.GetLogger("test")) + + p.MarkStreamPartCompleted("g", "tgt-1", 0, 1) + assert.True(t, p.IsStreamPartCompleted("g", "tgt-1", 0, 1)) + assert.False(t, p.SourceCompletedStreamParts["g"]["src-1"][0][1]) + assert.Equal(t, 0, p.StreamPartProgress["g"]) + + p.MarkSourceStreamPartCompleted("g", "src-1", 0, 1) + assert.True(t, p.SourceCompletedStreamParts["g"]["src-1"][0][1]) + // Per-target map untouched by source mark. + assert.False(t, p.IsStreamPartCompleted("g", "src-1", 0, 1)) + assert.Equal(t, 1, p.StreamPartProgress["g"]) +} diff --git a/banyand/backup/lifecycle/report_partial_failure_test.go b/banyand/backup/lifecycle/report_partial_failure_test.go new file mode 100644 index 000000000..1fd3a9683 --- /dev/null +++ b/banyand/backup/lifecycle/report_partial_failure_test.go @@ -0,0 +1,255 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package lifecycle + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/pkg/logger" +) + +// TestBuildMigrationReport_PartialFailure pins down the JSON shape and +// numeric values that buildMigrationReport must produce under a partial +// migration: one stream group has a failed part write while every other +// group and every other resource within the failing group completes. The +// shapes and counts are source-level — Counts comes from the pre-walk +// (Set*Count), Progress is bumped per source by MarkSource*Completed, and +// completion_rate = Progress / Counts is always in [0, 100] with 100 % +// meaning every source item migrated. +func TestBuildMigrationReport_PartialFailure(t *testing.T) { + p := NewProgress("", logger.GetLogger("test")) + p.SetGroupsToProcess([]string{"sw_a", "sw_b", "sw_metrics", "sw_trace"}) + + p.SnapshotStreamDir = "/tmp/stream/snapshots/example" + p.SnapshotMeasureDir = "/tmp/measure/snapshots/example" + p.SnapshotTraceDir = "/tmp/trace/snapshots/example" + + // === sw_a (stream): full success on 2 source parts, 2 source series files, 1 source element_index visit. === + p.SetStreamPartCount("sw_a", 2) + p.MarkSourceStreamPartCompleted("sw_a", "20260513", 0, 1) + p.MarkSourceStreamPartCompleted("sw_a", "20260513", 0, 2) + p.SetStreamSeriesCount("sw_a", 2) + p.MarkSourceStreamSeriesCompleted("sw_a", "20260513", 100) + p.MarkSourceStreamSeriesCompleted("sw_a", "20260513", 101) + p.SetStreamElementIndexCount("sw_a", 1) + p.MarkSourceStreamElementIndexCompleted("sw_a", "20260513", 0) + p.MarkGroupCompleted("sw_a") + p.MarkStreamGroupDeleted("sw_a") + + // === sw_b (stream): 4 source parts planned, 3 complete + 1 fails (partID 7 on shard 1). === + // series + element_index all complete. + p.SetStreamPartCount("sw_b", 4) + p.MarkSourceStreamPartCompleted("sw_b", "20260512", 0, 1) + p.MarkSourceStreamPartCompleted("sw_b", "20260512", 0, 2) + p.MarkSourceStreamPartCompleted("sw_b", "20260512", 1, 1) + // partID 7 fails on shard 1 — record the per-target error, source not advanced. + p.MarkStreamPartError("sw_b", "20260512", 1, 7, "failed to stream part to target node-cold-0: connection refused") + p.SetStreamSeriesCount("sw_b", 2) + p.MarkSourceStreamSeriesCompleted("sw_b", "20260512", 0) + p.MarkSourceStreamSeriesCompleted("sw_b", "20260512", 1) + p.SetStreamElementIndexCount("sw_b", 2) + p.MarkSourceStreamElementIndexCompleted("sw_b", "20260512", 0) + p.MarkSourceStreamElementIndexCompleted("sw_b", "20260512", 1) + // sw_b is NOT marked completed (processStreamGroup early-returns on the failure). + + // === sw_metrics (measure): full success. === + p.SetMeasurePartCount("sw_metrics", 3) + p.MarkSourceMeasurePartCompleted("sw_metrics", "20260513", 0, 1) + p.MarkSourceMeasurePartCompleted("sw_metrics", "20260513", 0, 2) + p.MarkSourceMeasurePartCompleted("sw_metrics", "20260513", 0, 3) + p.SetMeasureSeriesCount("sw_metrics", 1) + p.MarkSourceMeasureSeriesCompleted("sw_metrics", "20260513", 100) + p.MarkGroupCompleted("sw_metrics") + p.MarkMeasureGroupDeleted("sw_metrics") + + // === sw_trace: full success. === + p.SetTraceShardCount("sw_trace", 1) + p.MarkSourceTraceShardCompleted("sw_trace", "20260513", 0) + p.SetTraceSeriesCount("sw_trace", 1) + p.MarkSourceTraceSeriesCompleted("sw_trace", "20260513", 100) + p.MarkGroupCompleted("sw_trace") + p.MarkTraceGroupDeleted("sw_trace") + + // --- Build report --- + svc := &lifecycleService{l: logger.GetLogger("test")} + report := svc.buildMigrationReport(p) + + require.Equal(t, "2.0", report["report_version"]) + summary, ok := report["summary"].(map[string]interface{}) + require.True(t, ok) + errs, ok := report["errors"].(map[string]interface{}) + require.True(t, ok) + + // migration_status: 4 scheduled, 3 completed (sw_a, sw_metrics, sw_trace), 75 %. + ms, ok := summary["migration_status"].(map[string]interface{}) + require.True(t, ok) + assert.Equal(t, 4, ms["total_groups"]) + assert.Equal(t, 3, ms["completed_groups"]) + assert.InDelta(t, 75.0, ms["completion_rate"], 1e-9) + + // stream parts: 2+4=6 planned, 2+3=5 source-completed. + assertResource(t, summary, "stream_migration", "parts", 6, 5, 83.33333333333334, 1) + assertResource(t, summary, "stream_migration", "series", 4, 4, 100.0, 0) + assertResource(t, summary, "stream_migration", "element_index", 3, 3, 100.0, 0) + + // measure + assertResource(t, summary, "measure_migration", "parts", 3, 3, 100.0, 0) + assertResource(t, summary, "measure_migration", "series", 1, 1, 100.0, 0) + + // trace + assertResource(t, summary, "trace_migration", "parts", 1, 1, 100.0, 0) + assertResource(t, summary, "trace_migration", "series", 1, 1, 100.0, 0) + + // errors: stream_parts has one entry for sw_b; everything else empty. + for _, key := range []string{ + "stream_parts", "stream_series", "stream_element_index", + "measure_parts", "measure_series", + "trace_parts", "trace_series", + } { + v, found := errs[key] + require.Truef(t, found, "errors.%s must be present", key) + _, isMap := v.(map[string]interface{}) + require.Truef(t, isMap, "errors.%s must be map[string]interface{}", key) + } + streamPartErrs := errs["stream_parts"].(map[string]interface{}) + require.Lenf(t, streamPartErrs, 1, "errors.stream_parts must hold one group entry, got %v", streamPartErrs) + require.Contains(t, streamPartErrs, "sw_b") + + for _, key := range []string{ + "stream_series", "stream_element_index", + "measure_parts", "measure_series", + "trace_parts", "trace_series", + } { + v := errs[key].(map[string]interface{}) + assert.Emptyf(t, v, "errors.%s must be empty for this scenario", key) + } +} + +// TestBuildMigrationReport_CompletedScopedToScheduledSet pins the +// resume-safety invariant: CompletedGroups carrying entries from a prior +// cycle that are no longer in the current GroupsToProcess set must NOT +// inflate completed_groups beyond total_groups. +func TestBuildMigrationReport_CompletedScopedToScheduledSet(t *testing.T) { + p := NewProgress("", logger.GetLogger("test")) + p.MarkGroupCompleted("sw_a") + p.MarkGroupCompleted("sw_b") + p.MarkGroupCompleted("sw_c") + p.MarkGroupCompleted("sw_d") + p.MarkGroupCompleted("sw_e") + p.SetGroupsToProcess([]string{"sw_f", "sw_g", "sw_h"}) + p.MarkGroupCompleted("sw_f") + p.MarkGroupCompleted("sw_g") + p.MarkGroupCompleted("sw_h") + + svc := &lifecycleService{l: logger.GetLogger("test")} + report := svc.buildMigrationReport(p) + summary := report["summary"].(map[string]interface{}) + ms := summary["migration_status"].(map[string]interface{}) + + assert.Equal(t, 3, ms["total_groups"]) + assert.Equal(t, 3, ms["completed_groups"]) + assert.InDelta(t, 100.0, ms["completion_rate"], 1e-9) +} + +// TestBuildMigrationReport_EmptyCycleHonestTotalGroups pins that an +// empty-snapshot cycle reports total_groups=0 / rate=0 instead of inheriting +// a stale prior-cycle GroupsToProcess. +func TestBuildMigrationReport_EmptyCycleHonestTotalGroups(t *testing.T) { + p := NewProgress("", logger.GetLogger("test")) + p.MarkGroupCompleted("sw_a") + p.MarkGroupCompleted("sw_b") + p.SetGroupsToProcess(nil) + + svc := &lifecycleService{l: logger.GetLogger("test")} + report := svc.buildMigrationReport(p) + summary := report["summary"].(map[string]interface{}) + ms := summary["migration_status"].(map[string]interface{}) + + assert.Equal(t, 0, ms["total_groups"]) + assert.Equal(t, 0, ms["completed_groups"]) + assert.InDelta(t, 0.0, ms["completion_rate"], 1e-9) +} + +// assertResource pins down a single resource sub-block under a catalog +// (stream | measure | trace)_migration. total = pre-walk source count; +// completed = source items fully migrated; errors = per-target error count; +// rate = completed / total ∈ [0, 100]. +func assertResource(t *testing.T, summary map[string]interface{}, catalog, resource string, total, completed int, rate float64, errors int) { + t.Helper() + cat, ok := summary[catalog].(map[string]interface{}) + require.Truef(t, ok, "summary.%s must be a map", catalog) + res, ok := cat[resource].(map[string]interface{}) + require.Truef(t, ok, "summary.%s.%s must be a map", catalog, resource) + assert.Equalf(t, total, res["total"], "summary.%s.%s.total", catalog, resource) + assert.Equalf(t, completed, res["completed"], "summary.%s.%s.completed", catalog, resource) + assert.Equalf(t, errors, res["errors"], "summary.%s.%s.errors", catalog, resource) + assert.InDeltaf(t, rate, res["completion_rate"], 1e-9, "summary.%s.%s.completion_rate", catalog, resource) +} + +// TestCompletionRateNeverExceeds100UnderFanOut replays the original 266 % +// regression scenario: per-target Mark*Completed is invoked once per +// (source × target_segment × target_shard) tuple while MarkSource*Completed +// is invoked once per source. Under Option 3 the per-target Marks must NOT +// move counters and MarkSource bumps Progress exactly once per source, so +// completion_rate = Progress / pre-walk-Counts stays in [0, 100] no matter +// how large the fan-out is. This is the regression test that pins the 266 % +// bug as fixed. +func TestCompletionRateNeverExceeds100UnderFanOut(t *testing.T) { + p := NewProgress("", logger.GetLogger("test")) + p.SetGroupsToProcess([]string{"sw_fanout"}) + + // 4 source series files × 2 target segments × 3 target shards = 24 + // per-target Mark calls. Pre-walk says 4 source items. + const sourceCount = 4 + p.SetStreamSeriesCount("sw_fanout", sourceCount) + targetSegs := []string{"tgt-seg-0", "tgt-seg-1"} + + for src := 0; src < sourceCount; src++ { + sourceShard := common.ShardID(src) + // Fan-out: emit per-target Marks for every (target_segment, target_shard). + for _, tgtSeg := range targetSegs { + for tgtShard := common.ShardID(0); tgtShard < 3; tgtShard++ { + p.MarkStreamSeriesCompleted("sw_fanout", tgtSeg, tgtShard) + } + } + // Per-source: advance Progress exactly once per source file. + p.MarkSourceStreamSeriesCompleted("sw_fanout", "src-segment", sourceShard) + } + p.MarkGroupCompleted("sw_fanout") + + svc := &lifecycleService{l: logger.GetLogger("test")} + report := svc.buildMigrationReport(p) + summary := report["summary"].(map[string]interface{}) + + // Counts comes from pre-walk Set*Count, not from per-target Marks. + // Progress is one bump per source MarkSource, not one per fan-out Mark. + // rate = 4/4 = 100 % (NOT 24/4 = 600 %). + streamMigration := summary["stream_migration"].(map[string]interface{}) + series := streamMigration["series"].(map[string]interface{}) + assert.Equal(t, sourceCount, series["total"], "Counts must come from Set*Count, not from per-target Marks") + assert.Equal(t, sourceCount, series["completed"], "Progress must be one bump per source, not per fan-out target") + + rate := series["completion_rate"].(float64) + assert.GreaterOrEqualf(t, rate, 0.0, "completion_rate must be >= 0, got %v", rate) + assert.LessOrEqualf(t, rate, 100.0, "completion_rate must stay <= 100%% under fan-out (266%% regression), got %v", rate) + assert.InDelta(t, 100.0, rate, 1e-9) +} diff --git a/banyand/backup/lifecycle/service.go b/banyand/backup/lifecycle/service.go index 85a78bf01..61eb8eae6 100644 --- a/banyand/backup/lifecycle/service.go +++ b/banyand/backup/lifecycle/service.go @@ -429,6 +429,10 @@ func (l *lifecycleService) action() error { } if streamDir == "" && measureDir == "" && traceDir == "" { l.l.Warn().Msg("no snapshots found, skipping lifecycle migration") + // Clear any GroupsToProcess persisted from a prior cycle so the + // emitted report honestly reports total_groups=0 for this empty + // cycle instead of inheriting a stale denominator. + progress.SetGroupsToProcess(nil) l.generateReport(progress) return nil } @@ -437,6 +441,10 @@ func (l *lifecycleService) action() error { Str("measure_snapshot", measureDir). Str("trace_snapshot", traceDir). Msg("created snapshots") + // Record the scheduled group set only after snapshots are confirmed so + // the report distinguishes "no work this cycle" (total_groups=0) from a + // real cycle that processed N groups (total_groups=N). + progress.SetGroupsToProcess(getGroupNames(groups)) progress.Save(l.progressFilePath, l.l) nodes, err := l.metadata.NodeRegistry().ListNode(ctx, databasev1.Role_ROLE_DATA) @@ -446,7 +454,6 @@ func (l *lifecycleService) action() error { } labels := common.ParseNodeFlags() - allGroupsCompleted := true for _, g := range groups { switch g.Catalog { case commonv1.Catalog_CATALOG_STREAM: @@ -478,13 +485,18 @@ func (l *lifecycleService) action() error { // Only remove progress file if ALL groups are fully completed notCompleteGroups := progress.AllGroupsNotFullyCompleted(groups) - if allGroupsCompleted && len(notCompleteGroups) == 0 { + if len(notCompleteGroups) == 0 { progress.Remove(l.progressFilePath, l.l) l.l.Info().Msg("lifecycle migration completed successfully") l.generateReport(progress) return nil } + // Partial-failure path: also emit a report so operators can inspect + // errors.* and per-resource completion_rate without parsing raw + // progress.json. The report distinguishes itself from a clean cycle + // by having errors.* non-empty and completion_rate < 100. l.l.Info().Msg("lifecycle migration partially completed, progress file retained") + l.generateReport(progress) return fmt.Errorf("lifecycle migration partially completed, progress file retained; %v groups not fully completed", notCompleteGroups) } @@ -531,6 +543,7 @@ func (l *lifecycleService) buildMigrationReport(p *Progress) map[string]interfac "snapshot_info": map[string]interface{}{ "stream_dir": p.SnapshotStreamDir, "measure_dir": p.SnapshotMeasureDir, + "trace_dir": p.SnapshotTraceDir, }, } @@ -539,8 +552,20 @@ func (l *lifecycleService) buildMigrationReport(p *Progress) map[string]interfac // buildSummaryStats creates overall migration statistics. func (l *lifecycleService) buildSummaryStats(p *Progress) map[string]interface{} { - totalGroups := len(p.CompletedGroups) + len(p.DeletedStreamGroups) + len(p.DeletedMeasureGroups) - completedGroups := len(p.CompletedGroups) + // total_groups reflects the cycle's scheduled set captured at the top of + // action(); completed_groups counts groups that ran the full + // processXxxGroup → MarkGroupCompleted path. The intersection guards + // against resume from a partial-failure progress.json where + // CompletedGroups carries entries from prior cycles that are not in + // the current scheduled set, which would otherwise yield + // completed_groups > total_groups. + totalGroups := len(p.GroupsToProcess) + completedGroups := 0 + for _, name := range p.GroupsToProcess { + if p.CompletedGroups[name] { + completedGroups++ + } + } // Calculate total parts and series across all groups totalStreamParts, completedStreamParts := l.calculateTotalCounts(p.StreamPartCounts, p.StreamPartProgress) @@ -548,6 +573,8 @@ func (l *lifecycleService) buildSummaryStats(p *Progress) map[string]interface{} totalStreamElementIndex, completedStreamElementIndex := l.calculateTotalCounts(p.StreamElementIndexCounts, p.StreamElementIndexProgress) totalMeasureParts, completedMeasureParts := l.calculateTotalCounts(p.MeasurePartCounts, p.MeasurePartProgress) totalMeasureSeries, completedMeasureSeries := l.calculateTotalCounts(p.MeasureSeriesCounts, p.MeasureSeriesProgress) + totalTraceShards, completedTraceShards := l.calculateTotalCounts(p.TraceShardCounts, p.TraceShardProgress) + totalTraceSeries, completedTraceSeries := l.calculateTotalCounts(p.TraceSeriesCounts, p.TraceSeriesProgress) // Calculate error counts streamPartErrors := l.countErrors(p.StreamPartErrors) @@ -555,6 +582,8 @@ func (l *lifecycleService) buildSummaryStats(p *Progress) map[string]interface{} streamElementIndexErrors := l.countErrors(p.StreamElementIndexErrors) measurePartErrors := l.countErrors(p.MeasurePartErrors) measureSeriesErrors := l.countErrors(p.MeasureSeriesErrors) + traceShardErrors := l.countErrors(p.TraceShardErrors) + traceSeriesErrors := l.countErrors(p.TraceSeriesErrors) return map[string]interface{}{ "migration_status": map[string]interface{}{ @@ -596,20 +625,37 @@ func (l *lifecycleService) buildSummaryStats(p *Progress) map[string]interface{} "completion_rate": l.calculatePercentage(completedMeasureSeries, totalMeasureSeries), }, }, + // Field names parts/series mirror stream_migration / measure_migration. + // "parts" is fed by the per-shard TraceShard counters because trace + // migration is shard-batched (sidx + core parts streamed together). + "trace_migration": map[string]interface{}{ + "parts": map[string]interface{}{ + "total": totalTraceShards, + "completed": completedTraceShards, + "errors": traceShardErrors, + "completion_rate": l.calculatePercentage(completedTraceShards, totalTraceShards), + }, + "series": map[string]interface{}{ + "total": totalTraceSeries, + "completed": completedTraceSeries, + "errors": traceSeriesErrors, + "completion_rate": l.calculatePercentage(completedTraceSeries, totalTraceSeries), + }, + }, } } // buildErrorSummary creates detailed error information. func (l *lifecycleService) buildErrorSummary(p *Progress) map[string]interface{} { - errors := map[string]interface{}{ + return map[string]interface{}{ "stream_parts": l.buildErrorDetails(p.StreamPartErrors), "stream_series": l.buildErrorDetails(p.StreamSeriesErrors), "stream_element_index": l.buildErrorDetails(p.StreamElementIndexErrors), "measure_parts": l.buildErrorDetails(p.MeasurePartErrors), "measure_series": l.buildErrorDetails(p.MeasureSeriesErrors), + "trace_parts": l.buildErrorDetails(p.TraceShardErrors), + "trace_series": l.buildErrorDetails(p.TraceSeriesErrors), } - - return errors } // Helper functions. @@ -684,7 +730,14 @@ func (l *lifecycleService) buildErrorDetails(errorMaps interface{}) map[string]i segmentDetails := make(map[string]interface{}) for shardID, parts := range shards { if len(parts) > 0 { - segmentDetails[fmt.Sprintf("shard_%d", shardID)] = parts + // Copy the inner map so the report does not alias + // Progress state; aliasing would race with a + // concurrent Mark*Error during JSON marshaling. + partDetails := make(map[uint64]string, len(parts)) + for partID, errorMsg := range parts { + partDetails[partID] = errorMsg + } + segmentDetails[fmt.Sprintf("shard_%d", shardID)] = partDetails } } if len(segmentDetails) > 0 { diff --git a/banyand/backup/lifecycle/stream_migration_visitor.go b/banyand/backup/lifecycle/stream_migration_visitor.go index 71b03e5fd..42ca53405 100644 --- a/banyand/backup/lifecycle/stream_migration_visitor.go +++ b/banyand/backup/lifecycle/stream_migration_visitor.go @@ -102,9 +102,6 @@ func (mv *streamMigrationVisitor) VisitSeries(segmentTR *timestamp.TimeRange, se Str("path", seriesIndexPath). Msg("found segment files for migration") - // Set the total number of series segments for progress tracking - mv.SetStreamSeriesCount(len(segmentFiles)) - // Calculate ALL target segments this series index should go to targetSegments := calculateTargetSegments( segmentTR.Start.UnixNano(), @@ -220,6 +217,15 @@ func (mv *streamMigrationVisitor) VisitSeries(segmentTR *timestamp.TimeRange, se } } + // Mark each source series file as fully migrated (idempotent — bumps Progress once per source). + for _, segmentFileName := range segmentFiles { + fileSegmentIDStr := strings.TrimSuffix(segmentFileName, ".seg") + segmentID, parseErr := strconv.ParseUint(fileSegmentIDStr, 16, 64) + if parseErr != nil { + continue + } + mv.progress.MarkSourceStreamSeriesCompleted(mv.group, seriesIndexPath, common.ShardID(segmentID)) + } return nil } @@ -233,7 +239,6 @@ func (mv *streamMigrationVisitor) VisitPart(_ *timestamp.TimeRange, sourceShardI if err != nil { return fmt.Errorf("failed to parse part ID from path: %w", err) } - // Calculate ALL target segments this part should go to targetSegments := calculateTargetSegments( partData.MinTimestamp, @@ -298,6 +303,7 @@ func (mv *streamMigrationVisitor) VisitPart(_ *timestamp.TimeRange, sourceShardI Msgf("part migration completed for target segment %d/%d", i+1, len(targetSegments)) } + mv.progress.MarkSourceStreamPartCompleted(mv.group, partPath, sourceShardID, partID) return nil } @@ -309,6 +315,9 @@ func (mv *streamMigrationVisitor) VisitElementIndex(segmentTR *timestamp.TimeRan Str("group", mv.group). Uint32("source_shard", uint32(sourceShardID)). Msg("element index segment already completed, skipping") + // Per-target write done; ensure source progress is recorded to survive + // a crash between MarkStreamElementIndexCompleted and MarkSource (idempotent). + mv.progress.MarkSourceStreamElementIndexCompleted(mv.group, indexPath, sourceShardID) return nil } @@ -337,9 +346,6 @@ func (mv *streamMigrationVisitor) VisitElementIndex(segmentTR *timestamp.TimeRan return nil } - // Set the total number of element index segment files for progress tracking - mv.SetStreamElementIndexCount(len(segmentFiles)) - // Calculate target shard ID (using a simple approach for element index) targetShardID := mv.calculateTargetShardID(uint32(sourceShardID)) mv.logger.Info(). @@ -411,6 +417,7 @@ func (mv *streamMigrationVisitor) VisitElementIndex(segmentTR *timestamp.TimeRan // Mark segment as completed mv.progress.MarkStreamElementIndexCompleted(mv.group, segmentIDStr, sourceShardID) + mv.progress.MarkSourceStreamElementIndexCompleted(mv.group, indexPath, sourceShardID) return nil } diff --git a/banyand/backup/lifecycle/trace_migration_visitor.go b/banyand/backup/lifecycle/trace_migration_visitor.go index 2936d7c14..10d4a2d00 100644 --- a/banyand/backup/lifecycle/trace_migration_visitor.go +++ b/banyand/backup/lifecycle/trace_migration_visitor.go @@ -103,9 +103,6 @@ func (mv *traceMigrationVisitor) VisitSeries(segmentTR *timestamp.TimeRange, ser Str("path", seriesIndexPath). Msg("found trace segment files for migration") - // Set the total number of series segments for progress tracking - mv.SetTraceSeriesCount(len(segmentFiles)) - // Calculate ALL target segments this series index should go to targetSegments := calculateTargetSegments( segmentTR.Start.UnixNano(), @@ -221,18 +218,33 @@ func (mv *traceMigrationVisitor) VisitSeries(segmentTR *timestamp.TimeRange, ser } } + // Mark each source series file as fully migrated (idempotent — bumps Progress once per source). + for _, segmentFileName := range segmentFiles { + fileSegmentIDStr := strings.TrimSuffix(segmentFileName, ".seg") + segmentID, parseErr := strconv.ParseUint(fileSegmentIDStr, 16, 64) + if parseErr != nil { + continue + } + mv.progress.MarkSourceTraceSeriesCompleted(mv.group, seriesIndexPath, common.ShardID(segmentID)) + } return nil } // VisitShard implements trace.Visitor - core and sidx migration logic. func (mv *traceMigrationVisitor) VisitShard(timestampTR *timestamp.TimeRange, sourceShardID common.ShardID, shardPath string) error { segmentIDStr := timestampTR.String() - if mv.progress.IsTraceShardCompleted(mv.group, shardPath, sourceShardID) { + // Match the key used by Mark below; using shardPath here would always + // miss on resume and re-run the migration even though the shard had + // completed, wasting bandwidth and writing duplicates to the target. + if mv.progress.IsTraceShardCompleted(mv.group, segmentIDStr, sourceShardID) { mv.logger.Debug(). Str("shard_path", shardPath). Str("group", mv.group). Uint32("source_shard", uint32(sourceShardID)). Msg("trace shard already completed for this target segment, skipping") + // Per-target write done; ensure source progress is recorded to survive + // a crash between MarkTraceShardCompleted and MarkSource (idempotent). + mv.progress.MarkSourceTraceShardCompleted(mv.group, segmentIDStr, sourceShardID) return nil } allParts := make([]queue.StreamingPartData, 0) @@ -277,6 +289,7 @@ func (mv *traceMigrationVisitor) VisitShard(timestampTR *timestamp.TimeRange, so // Mark shard as completed for this target segment mv.progress.MarkTraceShardCompleted(mv.group, segmentIDStr, sourceShardID) + mv.progress.MarkSourceTraceShardCompleted(mv.group, segmentIDStr, sourceShardID) mv.logger.Info(). Str("group", mv.group). Msgf("trace shard migration completed for target segment") diff --git a/test/cases/lifecycle/lifecycle.go b/test/cases/lifecycle/lifecycle.go index cecc841e1..40c7c682b 100644 --- a/test/cases/lifecycle/lifecycle.go +++ b/test/cases/lifecycle/lifecycle.go @@ -19,6 +19,7 @@ package lifecycle_test import ( + "encoding/json" "io/fs" "os" "path/filepath" @@ -64,10 +65,7 @@ var _ = ginkgo.Describe("Lifecycle", func() { gomega.Expect(err).NotTo(gomega.HaveOccurred()) verifySourceDirectoriesAfterMigration() verifyDestinationDirectoriesAfterMigration() - // Check report directory has files - rEntries, err := os.ReadDir(rf) - gomega.Expect(err).NotTo(gomega.HaveOccurred(), "Report directory should exist") - gomega.Expect(len(rEntries)).To(gomega.BeNumerically(">", 0), "Report directory should contain files") + verifyMigrationReport(rf) conn, err := grpchelper.Conn(SharedContext.LiaisonAddr, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())) defer func() { @@ -131,10 +129,7 @@ var _ = ginkgo.Describe("Lifecycle", func() { gomega.Expect(err).NotTo(gomega.HaveOccurred()) verifySourceDirectoriesAfterMigration() verifyDestinationDirectoriesAfterMigration() - // Check report directory has files - rEntries, err := os.ReadDir(rf) - gomega.Expect(err).NotTo(gomega.HaveOccurred(), "Report directory should exist") - gomega.Expect(len(rEntries)).To(gomega.BeNumerically(">", 0), "Report directory should contain files") + verifyMigrationReport(rf) conn, err := grpchelper.Conn(SharedContext.LiaisonAddr, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials())) defer func() { @@ -273,3 +268,133 @@ func verifyLockFileAndSegFolder(entries []fs.DirEntry) (hasLockFile bool, hasSeg } return hasLockFile, hasSegFolder } + +// verifyMigrationReport reads every JSON report file generated under rf +// and asserts the invariants of the comprehensive migration report: +// +// - migration_status.completion_rate is 100 when the cycle had scheduled +// groups; the denominator equals the scheduled group set. +// - Every per-resource block (parts / series / element_index) reaches +// 100 % when the catalog had work to do, or 0 % when the cycle had +// nothing to migrate (total == 0). +// - The trace_migration block is present with parts + series fields +// aligned to stream_migration / measure_migration; snapshot_info +// includes trace_dir; errors includes trace_parts / trace_series keys. +// - All errors.* maps are empty for a clean cycle. +func verifyMigrationReport(rf string) { + rEntries, err := os.ReadDir(rf) + gomega.Expect(err).NotTo(gomega.HaveOccurred(), "Report directory should exist") + gomega.Expect(rEntries).NotTo(gomega.BeEmpty(), "Report directory should contain files") + + jsonReports := 0 + for _, entry := range rEntries { + if entry.IsDir() || filepath.Ext(entry.Name()) != ".json" { + continue + } + jsonReports++ + path := filepath.Join(rf, entry.Name()) + raw, readErr := os.ReadFile(path) + gomega.Expect(readErr).NotTo(gomega.HaveOccurred(), "Report file %s should be readable", path) + + var report map[string]interface{} + gomega.Expect(json.Unmarshal(raw, &report)).To(gomega.Succeed(), "Report file %s should be valid JSON", path) + + gomega.Expect(report).To(gomega.HaveKey("summary"), "report %s missing summary", path) + gomega.Expect(report).To(gomega.HaveKey("errors"), "report %s missing errors", path) + gomega.Expect(report).To(gomega.HaveKey("snapshot_info"), "report %s missing snapshot_info", path) + gomega.Expect(report["report_version"]).To(gomega.Equal("2.0"), "report %s has unexpected report_version", path) + + // snapshot_info: trace_dir must surface alongside stream/measure. + snap, ok := report["snapshot_info"].(map[string]interface{}) + gomega.Expect(ok).To(gomega.BeTrue(), "snapshot_info must be a map in %s", path) + gomega.Expect(snap).To(gomega.HaveKey("trace_dir"), "snapshot_info must include trace_dir in %s", path) + + summary, ok := report["summary"].(map[string]interface{}) + gomega.Expect(ok).To(gomega.BeTrue(), "summary must be a map in %s", path) + + // migration_status: when total_groups=0 the cycle had no scheduled + // work (e.g. snapshots were empty); the rate is 0 by construction. + // Otherwise the report reflects a fully completed cycle, so the + // rate must be 100. + ms, ok := summary["migration_status"].(map[string]interface{}) + gomega.Expect(ok).To(gomega.BeTrue(), "summary.migration_status must be a map in %s", path) + if asInt(ms["total_groups"]) == 0 { + gomega.Expect(asFloat(ms["completion_rate"])).To(gomega.BeNumerically("~", 0.0, 1e-9), + "migration_status.completion_rate must be 0 when total_groups=0 in %s", path) + } else { + gomega.Expect(asFloat(ms["completion_rate"])).To(gomega.BeNumerically("~", 100.0, 1e-9), + "migration_status.completion_rate should be 100 in %s", path) + } + + // Per-resource invariants. + verifyAllRatesAt100(summary, "stream_migration", []string{"parts", "series", "element_index"}, path) + verifyAllRatesAt100(summary, "measure_migration", []string{"parts", "series"}, path) + + // trace_migration: block must exist with parts + series. + verifyAllRatesAt100(summary, "trace_migration", []string{"parts", "series"}, path) + + // errors must include trace_* keys and stay empty. + errs, ok := report["errors"].(map[string]interface{}) + gomega.Expect(ok).To(gomega.BeTrue(), "errors must be a map in %s", path) + for _, key := range []string{ + "stream_parts", "stream_series", "stream_element_index", + "measure_parts", "measure_series", + "trace_parts", "trace_series", + } { + v, found := errs[key] + gomega.Expect(found).To(gomega.BeTrue(), "errors.%s must be present in %s", key, path) + errMap, isMap := v.(map[string]interface{}) + gomega.Expect(isMap).To(gomega.BeTrue(), "errors.%s must be a map in %s", key, path) + gomega.Expect(errMap).To(gomega.BeEmpty(), "errors.%s must be empty for a clean cycle in %s", key, path) + } + } + gomega.Expect(jsonReports).To(gomega.BeNumerically(">", 0), "no JSON report files found under %s", rf) +} + +// verifyAllRatesAt100 enforces the per-resource accounting invariant: when +// total == 0 the cycle had no work for that resource and the rate stays at +// 0; otherwise completed must equal total and the rate must be 100. +func verifyAllRatesAt100(summary map[string]interface{}, catalog string, resources []string, path string) { + cat, ok := summary[catalog].(map[string]interface{}) + gomega.Expect(ok).To(gomega.BeTrue(), "summary.%s must be a map in %s", catalog, path) + for _, r := range resources { + res, isMap := cat[r].(map[string]interface{}) + gomega.Expect(isMap).To(gomega.BeTrue(), "summary.%s.%s must be a map in %s", catalog, r, path) + gomega.Expect(res).To(gomega.HaveKey("total"), "summary.%s.%s missing total in %s", catalog, r, path) + gomega.Expect(res).To(gomega.HaveKey("completed"), "summary.%s.%s missing completed in %s", catalog, r, path) + gomega.Expect(res).To(gomega.HaveKey("errors"), "summary.%s.%s missing errors in %s", catalog, r, path) + gomega.Expect(res).To(gomega.HaveKey("completion_rate"), "summary.%s.%s missing completion_rate in %s", catalog, r, path) + + total := asInt(res["total"]) + completed := asInt(res["completed"]) + errors := asInt(res["errors"]) + rate := asFloat(res["completion_rate"]) + + gomega.Expect(errors).To(gomega.Equal(0), "summary.%s.%s.errors must be 0 in %s", catalog, r, path) + if total == 0 { + gomega.Expect(rate).To(gomega.BeNumerically("~", 0.0, 1e-9), + "summary.%s.%s.completion_rate must be 0 when total=0 in %s", catalog, r, path) + continue + } + gomega.Expect(completed).To(gomega.Equal(total), + "summary.%s.%s.completed must equal total in %s", catalog, r, path) + gomega.Expect(rate).To(gomega.BeNumerically("~", 100.0, 1e-9), + "summary.%s.%s.completion_rate must be 100 in %s", catalog, r, path) + } +} + +// asFloat coerces a JSON-decoded numeric value to float64. +func asFloat(v interface{}) float64 { + if f, ok := v.(float64); ok { + return f + } + return 0 +} + +// asInt coerces a JSON-decoded numeric value to int. +func asInt(v interface{}) int { + if f, ok := v.(float64); ok { + return int(f) + } + return 0 +}
