This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch lifecyc-sync in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 264e1974543b80885af629aacbaa477ce7adb131 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Fri Aug 1 10:01:27 2025 +0800 Refactor lifecycle management and enhance error handling - Added a `defer` statement to `migrateStreamWithFileBasedAndProgress` for graceful client shutdown. - Updated `lifecycleService` to warn when no snapshots are found and skip migration accordingly. - Changed the `queue.FileInfo` names produced by `CreatePartFileReaderFromPath` from the on-disk file names to stream-specific names (`streamMetaName`, `streamPrimaryName`, `streamTimestampsName`, and `streamTagMetadataPrefix`/`streamTagFamiliesPrefix`/`streamTagFilterPrefix` plus the tag family name); this changes the names handed to the sync queue, not only local naming. - Made `snapshot.Dir` return an error when the snapshot directory does not exist, and skip (marking as completed) stream groups when no stream snapshot is available. - Migrated each stream group from its own subdirectory of the stream snapshot. - Included the element-index segment file in the files collected by `VisitElementIndex`. - Made `SyncStreamingParts` return success immediately when zero chunks were streamed. - Removed the `lifecycle` binary from the repository root. --- .../backup/lifecycle/file_migration_integration.go | 1 + banyand/backup/lifecycle/file_migration_visitor.go | 5 +++++ banyand/backup/lifecycle/service.go | 12 +++++++++++- banyand/backup/lifecycle/steps.go | 2 +- banyand/backup/snapshot/snapshot.go | 7 ++++++- banyand/internal/storage/index.go | 1 + banyand/queue/pub/chunked_sync.go | 7 +++++++ banyand/stream/part.go | 15 +++++++++------ lifecycle | Bin 38228245 -> 0 bytes 9 files changed, 41 insertions(+), 9 deletions(-) diff --git a/banyand/backup/lifecycle/file_migration_integration.go b/banyand/backup/lifecycle/file_migration_integration.go index ba5c4f8b..3aacc949 100644 --- a/banyand/backup/lifecycle/file_migration_integration.go +++ b/banyand/backup/lifecycle/file_migration_integration.go @@ -45,6 +45,7 @@ func migrateStreamWithFileBasedAndProgress( if err != nil { return err } + defer client.GracefulStop() // Convert TTL to IntervalRule using storage.MustToIntervalRule intervalRule := storage.MustToIntervalRule(ttl) diff --git a/banyand/backup/lifecycle/file_migration_visitor.go b/banyand/backup/lifecycle/file_migration_visitor.go index 874f6e58..c5ea084e 100644 --- a/banyand/backup/lifecycle/file_migration_visitor.go +++ b/banyand/backup/lifecycle/file_migration_visitor.go @@ -326,6 +326,11 @@ func (mv *streamMigrationVisitor) VisitElementIndex(segmentTR *timestamp.TimeRan // Close the file reader defer segmentFile.Close() + files = append(files, queue.FileInfo{ + Name: segmentFileName, + Reader: 
segmentFile.SequentialRead(), + }) + mv.logger.Info(). Uint64("segment_id", segmentID). Str("filename", segmentFileName). diff --git a/banyand/backup/lifecycle/service.go b/banyand/backup/lifecycle/service.go index 4eb05b8e..90a5ccd0 100644 --- a/banyand/backup/lifecycle/service.go +++ b/banyand/backup/lifecycle/service.go @@ -186,6 +186,11 @@ func (l *lifecycleService) action() error { l.l.Error().Err(err).Msg("failed to get snapshots") return err } + if streamDir == "" && measureDir == "" { + l.l.Warn().Msg("no snapshots found, skipping lifecycle migration") + l.generateReport(progress) + return nil + } l.l.Info(). Str("stream_snapshot", streamDir). Str("measure_snapshot", measureDir). @@ -211,6 +216,11 @@ func (l *lifecycleService) action() error { for _, g := range groups { switch g.Catalog { case commonv1.Catalog_CATALOG_STREAM: + if streamDir == "" { + l.l.Warn().Msgf("stream snapshot directory is not available, skipping group: %s", g.Metadata.Name) + progress.MarkGroupCompleted(g.Metadata.Name) + continue + } l.processStreamGroup(ctx, g, streamDir, nodes, labels, progress) case commonv1.Catalog_CATALOG_MEASURE: if measureSVC == nil { @@ -388,7 +398,7 @@ func (l *lifecycleService) processStreamGroupFileBased(_ context.Context, g *com // Use the file-based migration with existing visitor pattern err := migrateStreamWithFileBasedAndProgress( - streamDir, // Use snapshot directory as source + filepath.Join(streamDir, g.Metadata.Name), // Use snapshot directory as source *tr, // Time range for segments to migrate g, // Group configuration labels, // Node labels diff --git a/banyand/backup/lifecycle/steps.go b/banyand/backup/lifecycle/steps.go index e9b5291b..ce76a222 100644 --- a/banyand/backup/lifecycle/steps.go +++ b/banyand/backup/lifecycle/steps.go @@ -68,7 +68,7 @@ func (l *lifecycleService) getSnapshots(groups []*commonv1.Group, p *Progress) ( for _, snp := range snn { snapshotDir, errDir := snapshot.Dir(snp, l.streamRoot, l.measureRoot, "") if errDir != 
nil { - logger.Warningf("Failed to get snapshot directory for %s: %v", snp.Name, errDir) + l.l.Error().Err(errDir).Msgf("Failed to get snapshot directory for %s", snp.Name) continue } if snp.Catalog == commonv1.Catalog_CATALOG_STREAM { diff --git a/banyand/backup/snapshot/snapshot.go b/banyand/backup/snapshot/snapshot.go index edcd17b5..981890e5 100644 --- a/banyand/backup/snapshot/snapshot.go +++ b/banyand/backup/snapshot/snapshot.go @@ -22,6 +22,7 @@ import ( "context" "errors" "fmt" + "os" "path/filepath" "time" @@ -76,7 +77,11 @@ func Dir(snapshot *databasev1.Snapshot, streamRoot, measureRoot, propertyRoot st default: return "", errors.New("unknown catalog type") } - return filepath.Join(baseDir, storage.SnapshotsDir, snapshot.Name), nil + snpPath := filepath.Join(baseDir, storage.SnapshotsDir, snapshot.Name) + if _, err := os.Stat(snpPath); os.IsNotExist(err) { + return "", fmt.Errorf("snapshot directory %s does not exist", snpPath) + } + return snpPath, nil } // LocalDir returns the local directory path of the snapshot. 
diff --git a/banyand/internal/storage/index.go b/banyand/internal/storage/index.go index 8a728408..b844cd06 100644 --- a/banyand/internal/storage/index.go +++ b/banyand/internal/storage/index.go @@ -64,6 +64,7 @@ func newSeriesIndex(ctx context.Context, root string, flushTimeoutSeconds int64, Logger: si.l, BatchWaitSec: flushTimeoutSeconds, CacheMaxBytes: cacheMaxBytes, + // EnableDeduplication: true, } if metrics != nil { opts.Metrics = metrics diff --git a/banyand/queue/pub/chunked_sync.go b/banyand/queue/pub/chunked_sync.go index f4c1cc7b..8e6cb679 100644 --- a/banyand/queue/pub/chunked_sync.go +++ b/banyand/queue/pub/chunked_sync.go @@ -97,6 +97,13 @@ func (c *chunkedSyncClient) SyncStreamingParts(ctx context.Context, parts []queu if err != nil { return nil, fmt.Errorf("failed to stream parts: %w", err) } + if totalChunks == 0 { + return &queue.SyncResult{ + Success: true, + SessionID: sessionID, + PartsCount: uint32(len(parts)), + }, nil + } var finalResp *clusterv1.SyncPartResponse for { diff --git a/banyand/stream/part.go b/banyand/stream/part.go index e0120456..e9fac1cd 100644 --- a/banyand/stream/part.go +++ b/banyand/stream/part.go @@ -646,7 +646,7 @@ func CreatePartFileReaderFromPath(partPath string, lfs fs.FileSystem) ([]queue.F } readers = append(readers, metaReader) files = append(files, queue.FileInfo{ - Name: metaFilename, + Name: streamMetaName, Reader: metaReader.SequentialRead(), }) @@ -657,7 +657,7 @@ func CreatePartFileReaderFromPath(partPath string, lfs fs.FileSystem) ([]queue.F } readers = append(readers, primaryReader) files = append(files, queue.FileInfo{ - Name: primaryFilename, + Name: streamPrimaryName, Reader: primaryReader.SequentialRead(), }) @@ -668,7 +668,7 @@ func CreatePartFileReaderFromPath(partPath string, lfs fs.FileSystem) ([]queue.F } readers = append(readers, timestampsReader) files = append(files, queue.FileInfo{ - Name: timestampsFilename, + Name: streamTimestampsName, Reader: timestampsReader.SequentialRead(), }) @@ 
-684,8 +684,9 @@ func CreatePartFileReaderFromPath(partPath string, lfs fs.FileSystem) ([]queue.F logger.Panicf("cannot open tag family metadata file %q: %s", tfmPath, err) } readers = append(readers, tfmReader) + tagName := removeExt(e.Name(), tagFamiliesMetadataFilenameExt) files = append(files, queue.FileInfo{ - Name: e.Name(), + Name: streamTagMetadataPrefix + tagName, Reader: tfmReader.SequentialRead(), }) } @@ -696,8 +697,9 @@ func CreatePartFileReaderFromPath(partPath string, lfs fs.FileSystem) ([]queue.F logger.Panicf("cannot open tag family file %q: %s", tfPath, err) } readers = append(readers, tfReader) + tagName := removeExt(e.Name(), tagFamiliesFilenameExt) files = append(files, queue.FileInfo{ - Name: e.Name(), + Name: streamTagFamiliesPrefix + tagName, Reader: tfReader.SequentialRead(), }) } @@ -708,8 +710,9 @@ func CreatePartFileReaderFromPath(partPath string, lfs fs.FileSystem) ([]queue.F logger.Panicf("cannot open tag family filter file %q: %s", tffPath, err) } readers = append(readers, tffReader) + tagName := removeExt(e.Name(), tagFamiliesFilterFilenameExt) files = append(files, queue.FileInfo{ - Name: e.Name(), + Name: streamTagFilterPrefix + tagName, Reader: tffReader.SequentialRead(), }) } diff --git a/lifecycle b/lifecycle deleted file mode 100755 index 3182a18c..00000000 Binary files a/lifecycle and /dev/null differ