This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch lifecyc-sync
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 264e1974543b80885af629aacbaa477ce7adb131
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Fri Aug 1 10:01:27 2025 +0800

    Refactor lifecycle management and enhance error handling
    
    - Added a `defer` statement to `migrateStreamWithFileBasedAndProgress` for graceful client shutdown.
    - Updated `lifecycleService` to warn when no snapshots are found and skip migration accordingly.
    - Adjusted file reading logic in `CreatePartFileReaderFromPath` to use more descriptive naming for file info.
---
 .../backup/lifecycle/file_migration_integration.go   |   1 +
 banyand/backup/lifecycle/file_migration_visitor.go   |   5 +++++
 banyand/backup/lifecycle/service.go                  |  12 +++++++++++-
 banyand/backup/lifecycle/steps.go                    |   2 +-
 banyand/backup/snapshot/snapshot.go                  |   7 ++++++-
 banyand/internal/storage/index.go                    |   1 +
 banyand/queue/pub/chunked_sync.go                    |   7 +++++++
 banyand/stream/part.go                               |  15 +++++++++------
 lifecycle                                            | Bin 38228245 -> 0 bytes
 9 files changed, 41 insertions(+), 9 deletions(-)

diff --git a/banyand/backup/lifecycle/file_migration_integration.go 
b/banyand/backup/lifecycle/file_migration_integration.go
index ba5c4f8b..3aacc949 100644
--- a/banyand/backup/lifecycle/file_migration_integration.go
+++ b/banyand/backup/lifecycle/file_migration_integration.go
@@ -45,6 +45,7 @@ func migrateStreamWithFileBasedAndProgress(
        if err != nil {
                return err
        }
+       defer client.GracefulStop()
 
        // Convert TTL to IntervalRule using storage.MustToIntervalRule
        intervalRule := storage.MustToIntervalRule(ttl)
diff --git a/banyand/backup/lifecycle/file_migration_visitor.go 
b/banyand/backup/lifecycle/file_migration_visitor.go
index 874f6e58..c5ea084e 100644
--- a/banyand/backup/lifecycle/file_migration_visitor.go
+++ b/banyand/backup/lifecycle/file_migration_visitor.go
@@ -326,6 +326,11 @@ func (mv *streamMigrationVisitor) 
VisitElementIndex(segmentTR *timestamp.TimeRan
                // Close the file reader
                defer segmentFile.Close()
 
+               files = append(files, queue.FileInfo{
+                       Name:   segmentFileName,
+                       Reader: segmentFile.SequentialRead(),
+               })
+
                mv.logger.Info().
                        Uint64("segment_id", segmentID).
                        Str("filename", segmentFileName).
diff --git a/banyand/backup/lifecycle/service.go 
b/banyand/backup/lifecycle/service.go
index 4eb05b8e..90a5ccd0 100644
--- a/banyand/backup/lifecycle/service.go
+++ b/banyand/backup/lifecycle/service.go
@@ -186,6 +186,11 @@ func (l *lifecycleService) action() error {
                l.l.Error().Err(err).Msg("failed to get snapshots")
                return err
        }
+       if streamDir == "" && measureDir == "" {
+               l.l.Warn().Msg("no snapshots found, skipping lifecycle 
migration")
+               l.generateReport(progress)
+               return nil
+       }
        l.l.Info().
                Str("stream_snapshot", streamDir).
                Str("measure_snapshot", measureDir).
@@ -211,6 +216,11 @@ func (l *lifecycleService) action() error {
        for _, g := range groups {
                switch g.Catalog {
                case commonv1.Catalog_CATALOG_STREAM:
+                       if streamDir == "" {
+                               l.l.Warn().Msgf("stream snapshot directory is 
not available, skipping group: %s", g.Metadata.Name)
+                               progress.MarkGroupCompleted(g.Metadata.Name)
+                               continue
+                       }
                        l.processStreamGroup(ctx, g, streamDir, nodes, labels, 
progress)
                case commonv1.Catalog_CATALOG_MEASURE:
                        if measureSVC == nil {
@@ -388,7 +398,7 @@ func (l *lifecycleService) processStreamGroupFileBased(_ 
context.Context, g *com
 
        // Use the file-based migration with existing visitor pattern
        err := migrateStreamWithFileBasedAndProgress(
-               streamDir,        // Use snapshot directory as source
+               filepath.Join(streamDir, g.Metadata.Name), // Use snapshot 
directory as source
                *tr,              // Time range for segments to migrate
                g,                // Group configuration
                labels,           // Node labels
diff --git a/banyand/backup/lifecycle/steps.go 
b/banyand/backup/lifecycle/steps.go
index e9b5291b..ce76a222 100644
--- a/banyand/backup/lifecycle/steps.go
+++ b/banyand/backup/lifecycle/steps.go
@@ -68,7 +68,7 @@ func (l *lifecycleService) getSnapshots(groups 
[]*commonv1.Group, p *Progress) (
        for _, snp := range snn {
                snapshotDir, errDir := snapshot.Dir(snp, l.streamRoot, 
l.measureRoot, "")
                if errDir != nil {
-                       logger.Warningf("Failed to get snapshot directory for 
%s: %v", snp.Name, errDir)
+                       l.l.Error().Err(errDir).Msgf("Failed to get snapshot 
directory for %s", snp.Name)
                        continue
                }
                if snp.Catalog == commonv1.Catalog_CATALOG_STREAM {
diff --git a/banyand/backup/snapshot/snapshot.go 
b/banyand/backup/snapshot/snapshot.go
index edcd17b5..981890e5 100644
--- a/banyand/backup/snapshot/snapshot.go
+++ b/banyand/backup/snapshot/snapshot.go
@@ -22,6 +22,7 @@ import (
        "context"
        "errors"
        "fmt"
+       "os"
        "path/filepath"
        "time"
 
@@ -76,7 +77,11 @@ func Dir(snapshot *databasev1.Snapshot, streamRoot, 
measureRoot, propertyRoot st
        default:
                return "", errors.New("unknown catalog type")
        }
-       return filepath.Join(baseDir, storage.SnapshotsDir, snapshot.Name), nil
+       snpPath := filepath.Join(baseDir, storage.SnapshotsDir, snapshot.Name)
+       if _, err := os.Stat(snpPath); os.IsNotExist(err) {
+               return "", fmt.Errorf("snapshot directory %s does not exist", 
snpPath)
+       }
+       return snpPath, nil
 }
 
 // LocalDir returns the local directory path of the snapshot.
diff --git a/banyand/internal/storage/index.go 
b/banyand/internal/storage/index.go
index 8a728408..b844cd06 100644
--- a/banyand/internal/storage/index.go
+++ b/banyand/internal/storage/index.go
@@ -64,6 +64,7 @@ func newSeriesIndex(ctx context.Context, root string, 
flushTimeoutSeconds int64,
                Logger:        si.l,
                BatchWaitSec:  flushTimeoutSeconds,
                CacheMaxBytes: cacheMaxBytes,
+               // EnableDeduplication: true,
        }
        if metrics != nil {
                opts.Metrics = metrics
diff --git a/banyand/queue/pub/chunked_sync.go 
b/banyand/queue/pub/chunked_sync.go
index f4c1cc7b..8e6cb679 100644
--- a/banyand/queue/pub/chunked_sync.go
+++ b/banyand/queue/pub/chunked_sync.go
@@ -97,6 +97,13 @@ func (c *chunkedSyncClient) SyncStreamingParts(ctx 
context.Context, parts []queu
        if err != nil {
                return nil, fmt.Errorf("failed to stream parts: %w", err)
        }
+       if totalChunks == 0 {
+               return &queue.SyncResult{
+                       Success:    true,
+                       SessionID:  sessionID,
+                       PartsCount: uint32(len(parts)),
+               }, nil
+       }
 
        var finalResp *clusterv1.SyncPartResponse
        for {
diff --git a/banyand/stream/part.go b/banyand/stream/part.go
index e0120456..e9fac1cd 100644
--- a/banyand/stream/part.go
+++ b/banyand/stream/part.go
@@ -646,7 +646,7 @@ func CreatePartFileReaderFromPath(partPath string, lfs 
fs.FileSystem) ([]queue.F
        }
        readers = append(readers, metaReader)
        files = append(files, queue.FileInfo{
-               Name:   metaFilename,
+               Name:   streamMetaName,
                Reader: metaReader.SequentialRead(),
        })
 
@@ -657,7 +657,7 @@ func CreatePartFileReaderFromPath(partPath string, lfs 
fs.FileSystem) ([]queue.F
        }
        readers = append(readers, primaryReader)
        files = append(files, queue.FileInfo{
-               Name:   primaryFilename,
+               Name:   streamPrimaryName,
                Reader: primaryReader.SequentialRead(),
        })
 
@@ -668,7 +668,7 @@ func CreatePartFileReaderFromPath(partPath string, lfs 
fs.FileSystem) ([]queue.F
        }
        readers = append(readers, timestampsReader)
        files = append(files, queue.FileInfo{
-               Name:   timestampsFilename,
+               Name:   streamTimestampsName,
                Reader: timestampsReader.SequentialRead(),
        })
 
@@ -684,8 +684,9 @@ func CreatePartFileReaderFromPath(partPath string, lfs 
fs.FileSystem) ([]queue.F
                                logger.Panicf("cannot open tag family metadata 
file %q: %s", tfmPath, err)
                        }
                        readers = append(readers, tfmReader)
+                       tagName := removeExt(e.Name(), 
tagFamiliesMetadataFilenameExt)
                        files = append(files, queue.FileInfo{
-                               Name:   e.Name(),
+                               Name:   streamTagMetadataPrefix + tagName,
                                Reader: tfmReader.SequentialRead(),
                        })
                }
@@ -696,8 +697,9 @@ func CreatePartFileReaderFromPath(partPath string, lfs 
fs.FileSystem) ([]queue.F
                                logger.Panicf("cannot open tag family file %q: 
%s", tfPath, err)
                        }
                        readers = append(readers, tfReader)
+                       tagName := removeExt(e.Name(), tagFamiliesFilenameExt)
                        files = append(files, queue.FileInfo{
-                               Name:   e.Name(),
+                               Name:   streamTagFamiliesPrefix + tagName,
                                Reader: tfReader.SequentialRead(),
                        })
                }
@@ -708,8 +710,9 @@ func CreatePartFileReaderFromPath(partPath string, lfs 
fs.FileSystem) ([]queue.F
                                logger.Panicf("cannot open tag family filter 
file %q: %s", tffPath, err)
                        }
                        readers = append(readers, tffReader)
+                       tagName := removeExt(e.Name(), 
tagFamiliesFilterFilenameExt)
                        files = append(files, queue.FileInfo{
-                               Name:   e.Name(),
+                               Name:   streamTagFilterPrefix + tagName,
                                Reader: tffReader.SequentialRead(),
                        })
                }
diff --git a/lifecycle b/lifecycle
deleted file mode 100755
index 3182a18c..00000000
Binary files a/lifecycle and /dev/null differ

Reply via email to