This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new bdd2eaa3 Skip trace catalog and empty directory in the lifecycle
process (#842)
bdd2eaa3 is described below
commit bdd2eaa3401e756531c3fafe080b944f993eb6fd
Author: mrproliu <[email protected]>
AuthorDate: Mon Nov 10 14:26:28 2025 +0900
Skip trace catalog and empty directory in the lifecycle process (#842)
* Skip trace when executing lifecycle
---
banyand/backup/lifecycle/progress.go | 9 +++++----
banyand/backup/lifecycle/service.go | 30 ++++++++++++++++++++++++++----
2 files changed, 31 insertions(+), 8 deletions(-)
diff --git a/banyand/backup/lifecycle/progress.go
b/banyand/backup/lifecycle/progress.go
index 984389f9..7baf214b 100644
--- a/banyand/backup/lifecycle/progress.go
+++ b/banyand/backup/lifecycle/progress.go
@@ -60,17 +60,18 @@ type Progress struct {
mu sync.Mutex
`json:"-"`
}
-// AllGroupsFullyCompleted checks if all groups are fully completed.
-func (p *Progress) AllGroupsFullyCompleted(groups []*commonv1.Group) bool {
+// AllGroupsNotFullyCompleted find is there have any group not fully completed.
+func (p *Progress) AllGroupsNotFullyCompleted(groups []*commonv1.Group)
[]string {
p.mu.Lock()
defer p.mu.Unlock()
+ result := make([]string, 0)
for _, group := range groups {
if !p.CompletedGroups[group.Metadata.Name] {
- return false
+ result = append(result, group.Metadata.Name)
}
}
- return true
+ return result
}
// NewProgress creates a new Progress tracker.
diff --git a/banyand/backup/lifecycle/service.go
b/banyand/backup/lifecycle/service.go
index 60a04e31..afa77715 100644
--- a/banyand/backup/lifecycle/service.go
+++ b/banyand/backup/lifecycle/service.go
@@ -20,6 +20,7 @@ package lifecycle
import (
"context"
"encoding/json"
+ "errors"
"fmt"
"os"
"path/filepath"
@@ -215,6 +216,10 @@ func (l *lifecycleService) action() error {
continue
}
l.processMeasureGroup(ctx, g, measureDir, nodes,
labels, progress)
+ case commonv1.Catalog_CATALOG_TRACE:
+ progress.MarkGroupCompleted(g.Metadata.Name)
+ l.l.Info().Msgf("group trace not supported, skipping
group: %s", g.Metadata.Name)
+ continue
default:
l.l.Info().Msgf("group catalog: %s doesn't support
lifecycle management", g.Catalog)
}
@@ -222,14 +227,15 @@ func (l *lifecycleService) action() error {
}
// Only remove progress file if ALL groups are fully completed
- if allGroupsCompleted && progress.AllGroupsFullyCompleted(groups) {
+ notCompleteGroups := progress.AllGroupsNotFullyCompleted(groups)
+ if allGroupsCompleted && len(notCompleteGroups) == 0 {
progress.Remove(l.progressFilePath, l.l)
l.l.Info().Msg("lifecycle migration completed successfully")
l.generateReport(progress)
return nil
}
l.l.Info().Msg("lifecycle migration partially completed, progress file
retained")
- return fmt.Errorf("lifecycle migration partially completed, progress
file retained; %d groups not fully completed",
len(groups)-len(progress.CompletedGroups))
+ return fmt.Errorf("lifecycle migration partially completed, progress
file retained; %v groups not fully completed", notCompleteGroups)
}
// generateReport gathers detailed counts & errors from Progress, writes
comprehensive JSON file per run, and keeps only 5 latest.
@@ -578,9 +584,17 @@ func (l *lifecycleService) processStreamGroupFileBased(_
context.Context, g *com
l.l.Info().Msgf("starting file-based stream migration for group: %s",
g.Metadata.Name)
+ rootDir := filepath.Join(streamDir, g.Metadata.Name)
+ // skip the counting if the tsdb root path does not exist
+ // may no data found in the snapshot
+ if _, err := os.Stat(rootDir); err != nil && errors.Is(err,
os.ErrNotExist) {
+ l.l.Info().Msgf("skipping file-based stream migration for group
because is empty in the snapshot dir: %s", g.Metadata.Name)
+ return nil
+ }
+
// Use the file-based migration with existing visitor pattern
err := migrateStreamWithFileBasedAndProgress(
- filepath.Join(streamDir, g.Metadata.Name), // Use snapshot
directory as source
+ rootDir, // Use snapshot directory as source
*tr, // Time range for segments to migrate
g, // Group configuration
labels, // Node labels
@@ -703,9 +717,17 @@ func (l *lifecycleService) processMeasureGroupFileBased(_
context.Context, g *co
l.l.Info().Msgf("starting file-based measure migration for group: %s",
g.Metadata.Name)
+ rootDir := filepath.Join(measureDir, g.Metadata.Name)
+ // skip the counting if the tsdb root path does not exist
+ // may no data found in the snapshot
+ if _, err := os.Stat(rootDir); err != nil && errors.Is(err,
os.ErrNotExist) {
+ l.l.Info().Msgf("skipping file-based measure migration for
group because is empty in the snapshot dir: %s", g.Metadata.Name)
+ return nil
+ }
+
// Use the file-based migration with existing visitor pattern
err := migrateMeasureWithFileBasedAndProgress(
- filepath.Join(measureDir, g.Metadata.Name), // Use snapshot
directory as source
+ rootDir, // Use snapshot directory as source
*tr, // Time range for segments to migrate
g, // Group configuration
labels, // Node labels