This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch lifecyc-sync
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit b95bcae4cbeb91724ec53972f92ec7f7d1db5e3f
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Fri Aug 1 11:32:27 2025 +0800

    Add measure design

    Signed-off-by: Gao Hongtao <hanahm...@gmail.com>
---
 .../backup/lifecycle/MEASURE_MIGRATION_DESIGN.md | 420 +++++++++++++++++++++
 1 file changed, 420 insertions(+)

diff --git a/banyand/backup/lifecycle/MEASURE_MIGRATION_DESIGN.md b/banyand/backup/lifecycle/MEASURE_MIGRATION_DESIGN.md
new file mode 100644
index 00000000..b2e03016
--- /dev/null
+++ b/banyand/backup/lifecycle/MEASURE_MIGRATION_DESIGN.md
@@ -0,0 +1,420 @@

# File-Based Measure Migration Design

## Overview

This document outlines the design for refactoring measure migration from a query-based approach to a file-based streaming approach, following the successful pattern established in stream migration.

## Current Architecture Analysis

### Current Query-Based Approach
- **File**: `steps.go:230-296` (`migrateMeasure` function)
- **Method**: Individual record querying via `MeasureQueryOptions`
- **Transfer**: Message-based publishing via `queue.BatchPublisher`
- **Granularity**: Per-record processing
- **Performance**: High overhead due to individual record serialization

### Target File-Based Approach
- **Pattern**: Similar to the `file_migration_visitor.go` stream migration
- **Method**: Direct file streaming of measure parts
- **Transfer**: Chunked streaming via `queue.ChunkedSyncClient`
- **Granularity**: Per-part file processing
- **Performance**: Bulk file transfer optimization

## Measure Storage Architecture

Based on analysis of `measure/metadata.go` and `measure/tstable.go`, measures are stored at **group level**, not per-measure:

### Group-Level Storage Structure
```
measure_data/
└── <group_name>/            # Group directory (e.g., "sw_metric")
    ├── 000000000000abc1/    # Part directory (epoch-based ID)
    │   ├── metadata.json    # Part metadata
    │   ├── primary.bin      # Primary index
    │   ├── meta.bin         # Block metadata (compressed)
    │   ├── timestamps.bin   # Timestamp data
    │   ├── fv.bin           # Field values
    │   ├── <tagfamily>.tf   # Tag family data
    │   └── <tagfamily>.tfm  # Tag family metadata
    ├── 000000000000abc2/    # Another part directory
    └── snapshot_xyz.snp     # Snapshot files
```

### Key Architecture Points
1. **Group-Level Storage**: All measures in a group share the same TSDB storage (`path.Join(s.path, group)`)
2. **Single TSDB per Group**: `g.SupplyTSDB()` returns one TSDB instance per group
3. **Part-Based Organization**: Data is stored in epoch-based part directories (see the sketch after this list)
4. **Mixed Data**: Each part contains data from **all measures** in the group
5. **No Per-Measure Separation**: Individual measures cannot be distinguished at the file level
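The epoch-based part IDs above are hexadecimal directory names, which is how `parsePartIDFromPath` later in this document decodes them. As a minimal, self-contained sketch of the layout (standard library only; the `listPartIDs` helper is illustrative and not part of the codebase):

```go
package main

import (
    "fmt"
    "os"
    "path/filepath"
    "strconv"
)

// listPartIDs returns the part IDs found under measure_data/<group>.
func listPartIDs(root, group string) ([]uint64, error) {
    entries, err := os.ReadDir(filepath.Join(root, group))
    if err != nil {
        return nil, err
    }
    var ids []uint64
    for _, e := range entries {
        if !e.IsDir() {
            continue // snapshot files such as snapshot_xyz.snp live alongside parts
        }
        // Part directories are hex-encoded epochs, e.g. 000000000000abc1.
        id, err := strconv.ParseUint(e.Name(), 16, 64)
        if err != nil {
            continue // not a part directory
        }
        ids = append(ids, id)
    }
    return ids, nil
}

func main() {
    ids, err := listPartIDs("measure_data", "sw_metric")
    if err != nil {
        fmt.Println(err)
        return
    }
    fmt.Printf("found %d parts: %x\n", len(ids), ids)
}
```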
## Proposed Design

### 1. File Structure: `measure_migration_visitor.go`

Create a new visitor implementing measure-specific file migration, following the exact pattern of `streamMigrationVisitor.VisitPart`:

```go
// measureMigrationVisitor implements file-based migration for measure data.
type measureMigrationVisitor struct {
    selector       node.Selector                      // Node selector
    client         queue.Client                       // Queue client
    chunkedClients map[string]queue.ChunkedSyncClient // Per-node chunked sync clients
    logger         *logger.Logger
    progress       *Progress // Progress tracker
    lfs            fs.FileSystem
    group          string
    targetShardNum uint32 // Target shard count
    replicas       uint32 // Replica count
    chunkSize      int    // Chunk size for streaming
}

// Key methods based on the streamMigrationVisitor pattern:
func (mv *measureMigrationVisitor) VisitSeries(segmentTR *timestamp.TimeRange, seriesIndexPath string, shardIDs []common.ShardID) error // Not used for measures
func (mv *measureMigrationVisitor) VisitPart(segmentTR *timestamp.TimeRange, sourceShardID common.ShardID, partPath string) error       // Core migration logic
```
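Not shown above is how the per-node chunked sync clients get populated. A minimal sketch of that lookup, where `newChunkedSyncClient` is a hypothetical constructor standing in for whatever the `queue` package actually exposes:

```go
// getOrCreateChunkedClient returns a cached chunked-sync client for a target
// node, creating one on first use so each node gets exactly one stream.
// newChunkedSyncClient is hypothetical; the real constructor may differ.
func (mv *measureMigrationVisitor) getOrCreateChunkedClient(nodeID string) (queue.ChunkedSyncClient, error) {
    if c, ok := mv.chunkedClients[nodeID]; ok {
        return c, nil
    }
    c, err := newChunkedSyncClient(mv.client, nodeID, mv.chunkSize) // hypothetical helper
    if err != nil {
        return nil, fmt.Errorf("failed to create chunked sync client for node %s: %w", nodeID, err)
    }
    mv.chunkedClients[nodeID] = c
    return c, nil
}
```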
### 2. Integration Function: `measure_migration_integration.go`

```go
// migrateMeasureWithFileBasedAndProgress performs file-based measure migration.
// NOTE: the measures parameter is removed since all measures in a group share the same parts.
func migrateMeasureWithFileBasedAndProgress(
    tsdbRootPath string,
    timeRange timestamp.TimeRange,
    group *commonv1.Group,
    nodeLabels map[string]string,
    nodes []*databasev1.Node,
    metadata metadata.Repo,
    logger *logger.Logger,
    progress *Progress,
    chunkSize int,
) error
```

### 3. Core VisitPart Implementation

The key difference from stream migration is that measures use the existing visitor pattern with `storage.VisitSegmentsInTimeRange`, which calls `VisitPart` for each part. We adapt `streamMigrationVisitor.VisitPart` for measure data:

#### VisitPart Method (adapted from `streamMigrationVisitor.VisitPart:189-242`)
```go
func (mv *measureMigrationVisitor) VisitPart(segmentTR *timestamp.TimeRange, sourceShardID common.ShardID, partPath string) error {
    // 1. Parse part ID from directory name (similar to stream.ParsePartMetadata but for measures)
    partID, err := mv.parsePartIDFromPath(partPath)
    if err != nil {
        return fmt.Errorf("failed to parse part ID from path %s: %w", partPath, err)
    }

    // 2. Check if this part has already been completed
    if mv.progress.IsMeasurePartCompleted(mv.group, partID) {
        mv.logger.Debug().Uint64("part_id", partID).Str("group", mv.group).Msg("part already completed, skipping")
        return nil
    }

    // 3. Calculate target shard ID based on source shard ID mapping
    targetShardID := mv.calculateTargetShardID(uint32(sourceShardID))

    // 4. Log migration start
    mv.logger.Info().
        Uint64("part_id", partID).
        Uint32("source_shard", uint32(sourceShardID)).
        Uint32("target_shard", targetShardID).
        Str("part_path", partPath).
        Str("group", mv.group).
        Msg("migrating measure part")

    // 5. Create file readers (adapted from stream.CreatePartFileReaderFromPath)
    files, release := mv.createMeasurePartFileReaders(partPath)
    defer release()

    // 6. Create streaming part data
    partData := mv.createStreamingPartData(partID, targetShardID, files, segmentTR)

    // 7. Stream entire part to target shard replicas
    if err := mv.streamPartToTargetShard(partData); err != nil {
        errorMsg := fmt.Sprintf("failed to stream part to target shard: %v", err)
        mv.progress.MarkMeasurePartError(mv.group, partID, errorMsg)
        return fmt.Errorf("failed to stream part to target shard: %w", err)
    }

    // 8. Mark part as completed in progress tracker
    mv.progress.MarkMeasurePartCompleted(mv.group, partID)

    // 9. Log completion
    mv.logger.Info().
        Uint64("part_id", partID).
        Str("group", mv.group).
        Int("completed_parts", mv.progress.GetMeasurePartProgress(mv.group)).
        Int("total_parts", mv.progress.GetMeasurePartCount(mv.group)).
        Msg("measure part migration completed successfully")

    return nil
}
```

#### Supporting Methods
```go
// parsePartIDFromPath extracts the part ID from the part directory name.
func (mv *measureMigrationVisitor) parsePartIDFromPath(partPath string) (uint64, error) {
    partDirName := filepath.Base(partPath)
    return strconv.ParseUint(partDirName, 16, 64)
}

// calculateTargetShardID maps a source shard ID to a target shard ID (same as stream).
func (mv *measureMigrationVisitor) calculateTargetShardID(sourceShardID uint32) uint32 {
    return sourceShardID % mv.targetShardNum
}

// createMeasurePartFileReaders creates file readers for all files in a measure part.
// Based on the measure/part.go structure: metadata.json, primary.bin, meta.bin, timestamps.bin, fv.bin, *.tf, *.tfm.
func (mv *measureMigrationVisitor) createMeasurePartFileReaders(partPath string) ([]queue.FileInfo, func()) {
    var files []queue.FileInfo
    var readers []fs.SequentialReader

    // Core measure files
    coreFiles := []string{"metadata.json", "primary.bin", "meta.bin", "timestamps.bin", "fv.bin"}
    for _, filename := range coreFiles {
        if reader := mv.openFileReader(partPath, filename); reader != nil {
            readers = append(readers, reader)
            files = append(files, queue.FileInfo{Name: filename, Reader: reader})
        }
    }

    // Tag family files (*.tf and *.tfm)
    entries := mv.lfs.ReadDir(partPath)
    for _, entry := range entries {
        if !entry.IsDir() && (strings.HasSuffix(entry.Name(), ".tf") || strings.HasSuffix(entry.Name(), ".tfm")) {
            if reader := mv.openFileReader(partPath, entry.Name()); reader != nil {
                readers = append(readers, reader)
                files = append(files, queue.FileInfo{Name: entry.Name(), Reader: reader})
            }
        }
    }

    // Return cleanup function
    release := func() {
        for _, reader := range readers {
            reader.Close()
        }
    }

    return files, release
}
```
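`openFileReader` is referenced above but never defined. A sketch of what it could look like, assuming a hypothetical `OpenSequentialReader` helper on `fs.FileSystem` (the actual `pkg/fs` API may differ):

```go
// openFileReader opens one part file for sequential reading, returning nil if
// the file is absent or unreadable so optional files are simply skipped.
// OpenSequentialReader is illustrative; substitute the actual pkg/fs call.
func (mv *measureMigrationVisitor) openFileReader(partPath, filename string) fs.SequentialReader {
    filePath := filepath.Join(partPath, filename)
    reader, err := mv.lfs.OpenSequentialReader(filePath) // hypothetical helper
    if err != nil {
        mv.logger.Debug().Err(err).Str("file", filePath).Msg("skipping unreadable part file")
        return nil
    }
    return reader
}
```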
### 4. Integration with Existing Visitor Pattern

The measure migration leverages the existing `storage.VisitSegmentsInTimeRange` infrastructure:

#### Integration Function
```go
// migrateMeasureWithFileBasedAndProgress performs file-based measure migration.
func migrateMeasureWithFileBasedAndProgress(
    tsdbRootPath string,
    timeRange timestamp.TimeRange,
    group *commonv1.Group,
    nodeLabels map[string]string,
    nodes []*databasev1.Node,
    metadata metadata.Repo,
    logger *logger.Logger,
    progress *Progress,
    chunkSize int,
) error {
    // Parse group configuration
    shardNum, replicas, ttl, selector, client, err := parseGroup(group, nodeLabels, nodes, logger, metadata)
    if err != nil {
        return err
    }
    defer client.GracefulStop()

    // Convert TTL to IntervalRule
    intervalRule := storage.MustToIntervalRule(ttl)

    // Create measure migration visitor
    visitor := newMeasureMigrationVisitor(group, shardNum, replicas, selector, client, logger, progress, chunkSize)
    defer visitor.Close()

    // Use existing segment visitor infrastructure
    return storage.VisitSegmentsInTimeRange(tsdbRootPath, timeRange, visitor, intervalRule)
}
```

### 5. Service Integration

The measure migration integrates seamlessly with the existing lifecycle service by replacing the query-based approach:

#### Update `service.go`
```go
func (l *lifecycleService) processMeasureGroup(ctx context.Context, g *commonv1.Group, measureSVC measure.Service,
    nodes []*databasev1.Node, labels map[string]string, progress *Progress,
) {
    // Check if file-based migration is enabled
    if l.useFileBasedMigration {
        l.processMeasureGroupFileBased(ctx, g, measureSVC, nodes, labels, progress)
    } else {
        l.processMeasureGroupQueryBased(ctx, g, measureSVC, nodes, labels, progress) // Keep existing logic
    }
}

func (l *lifecycleService) processMeasureGroupFileBased(ctx context.Context, g *commonv1.Group, measureSVC measure.Service,
    nodes []*databasev1.Node, labels map[string]string, progress *Progress,
) {
    tr := measureSVC.GetRemovalSegmentsTimeRange(g.Metadata.Name)
    if tr.Start.IsZero() && tr.End.IsZero() {
        l.l.Info().Msgf("no removal segments time range for group %s, skipping measure migration", g.Metadata.Name)
        progress.MarkGroupCompleted(g.Metadata.Name)
        return
    }

    // Get measure snapshot directory
    measureDir := progress.SnapshotMeasureDir
    if measureDir == "" {
        l.l.Error().Msgf("no measure snapshot directory available for group: %s", g.Metadata.Name)
        return
    }

    // Perform file-based migration (no measure list needed since all measures share the same parts)
    err := migrateMeasureWithFileBasedAndProgress(
        filepath.Join(measureDir, g.Metadata.Name),
        *tr,
        g,
        labels,
        nodes,
        l.metadata,
        l.l,
        progress,
        int(l.chunkSize),
    )
    if err != nil {
        l.l.Error().Err(err).Msgf("file-based measure migration failed for group: %s", g.Metadata.Name)
        return
    }

    l.l.Info().Msgf("completed file-based measure migration for group: %s", g.Metadata.Name)
    l.deleteExpiredMeasureSegments(ctx, g, tr, progress)
    progress.MarkGroupCompleted(g.Metadata.Name)
}
```

### 6. Progress Tracking Enhancement

#### Extend `progress.go`
```go
// Add measure part-specific progress tracking.
type Progress struct {
    // ... existing fields ...

    // Measure part progress tracking
    MeasurePartCounts    map[string]int               `json:"measure_part_counts"`
    MeasurePartProgress  map[string]int               `json:"measure_part_progress"`
    MeasurePartCompleted map[string]map[uint64]bool   `json:"measure_part_completed"`
    MeasurePartErrors    map[string]map[uint64]string `json:"measure_part_errors"`
}

func (p *Progress) IsMeasurePartCompleted(group string, partID uint64) bool
func (p *Progress) MarkMeasurePartCompleted(group string, partID uint64)
func (p *Progress) MarkMeasurePartError(group string, partID uint64, errMsg string)
func (p *Progress) SetMeasurePartCount(group string, totalParts int)
func (p *Progress) GetMeasurePartProgress(group string) int
```
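The declarations above omit bodies. A minimal sketch of two of them, assuming a `sync.Mutex` field (called `mu` here) guards the maps; the real `Progress` type may synchronize and persist itself differently:

```go
// IsMeasurePartCompleted reports whether a part has already been migrated.
func (p *Progress) IsMeasurePartCompleted(group string, partID uint64) bool {
    p.mu.Lock() // mu is an assumed sync.Mutex field, not shown above
    defer p.mu.Unlock()
    return p.MeasurePartCompleted[group][partID] // nil-map reads safely return false
}

// MarkMeasurePartCompleted records a finished part and bumps the group's counter.
func (p *Progress) MarkMeasurePartCompleted(group string, partID uint64) {
    p.mu.Lock()
    defer p.mu.Unlock()
    if p.MeasurePartCompleted == nil {
        p.MeasurePartCompleted = make(map[string]map[uint64]bool)
    }
    if p.MeasurePartCompleted[group] == nil {
        p.MeasurePartCompleted[group] = make(map[uint64]bool)
    }
    if p.MeasurePartCompleted[group][partID] {
        return // already recorded; keep the counter idempotent
    }
    p.MeasurePartCompleted[group][partID] = true
    if p.MeasurePartProgress == nil {
        p.MeasurePartProgress = make(map[string]int)
    }
    p.MeasurePartProgress[group]++
}
```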
### 7. Queue Topic Extension

#### Add a New Topic in `api/data/topics.go`
```go
const (
    // ... existing topics ...
    TopicMeasurePartSync = Topic("measure-part-sync")
)
```

### 8. Configuration Flag

#### Add a File-Based Migration Toggle
```go
func (l *lifecycleService) FlagSet() *run.FlagSet {
    flagS := run.NewFlagSet(l.Name())
    // ... existing flags ...
    flagS.BoolVar(&l.useFileBasedMigration, "file-based-migration", true,
        "Use file-based migration for measures (default: true, fallback to query-based)")
    return flagS
}
```

## Implementation Benefits

### Corrected Understanding Impact
With the corrected understanding that **all measures in a group share the same parts**, the file-based approach becomes even more beneficial:

- **Massive efficiency gain**: Instead of querying each measure individually and reconstructing write requests, we stream the entire part containing **all measures at once**
- **Large performance improvement expected**: on the order of 100-1000x, since per-measure processing overhead is eliminated entirely
- **Single part = all measures**: Each part migration covers all measures in the group simultaneously
- **Reduced complexity**: No need to track per-measure progress, only per-part progress
- **Better resource utilization**: Stream complete data files rather than rebuilding data point by point

### Performance Advantages
1. **Bulk Transfer**: Stream entire part files containing all measures instead of individual records
2. **Eliminated Per-Measure Processing**: No need to query and process each measure separately
3. **Parallel Processing**: Multiple parts can be streamed concurrently
4. **Atomic Group Operations**: A single part transfer covers the entire group's measures

### Reliability Improvements
1. **Atomic Operations**: Either the entire part (with all measures) succeeds or it fails
2. **Resume Capability**: Migration can restart from failed parts
3. **Error Isolation**: Part-level error handling covers all measures
4. **Consistent State**: No partial measure migrations within a part

### Scalability Benefits
1. **Memory Efficiency**: Stream files directly without data reconstruction
2. **Network Optimization**: Bulk transfer vs. thousands of individual messages
3. **Resource Management**: Better control over concurrent transfers
4. **Storage Efficiency**: Direct file-to-file transfers

### Real-World Impact
For a group with 10 measures and 1,000 parts:
- **Query-based**: 10,000 individual measure queries (10 measures x 1,000 parts) plus on the order of 1,000,000 write requests
- **File-based**: 1,000 part file transfers (covering all measures automatically)

This represents a roughly **1000x reduction** in operations and network round-trips.

## Migration Strategy

### Phase 1: Implementation
1. Create `measure_migration_visitor.go`
2. Create `measure_migration_integration.go`
3. Extend progress tracking for measure parts
4. Add the new queue topic for measure part sync

### Phase 2: Integration
1. Update `service.go` with file-based measure processing
2. Add a configuration flag for migration method selection
3. Implement fallback to the query-based approach

### Phase 3: Testing & Validation
1. Unit tests for measure part discovery and streaming
2. Integration tests comparing query-based vs. file-based results
3. Performance benchmarks and memory usage analysis
4. End-to-end lifecycle migration testing

### Phase 4: Rollout
1. Default to file-based migration with query-based fallback
2. Monitor performance improvements and error rates
3. Deprecate the query-based approach after a validation period

## Risk Mitigation

### Compatibility
- Maintain the query-based fallback for compatibility
- Validate that file-based results match query-based results
- Support rollback via the configuration flag

### Error Handling
- Implement robust error recovery at the part level
- Maintain detailed error logging and progress tracking
- Support manual intervention for failed parts

### Performance
- Implement streaming throttling to prevent resource exhaustion
- Monitor memory usage during large part transfers
- Add metrics for transfer rates and success rates

This design provides a comprehensive approach to migrating measure data using the proven file-based streaming pattern while maintaining compatibility and reliability.