Copilot commented on code in PR #1181:
URL: 
https://github.com/apache/skywalking-banyandb/pull/1181#discussion_r3427024208


##########
banyand/backup/backup.go:
##########
@@ -233,28 +256,78 @@ func getTimeDir(style string) string {
        }
 }
 
-func backupSnapshot(ctx context.Context, fs remote.FS, snapshotDir, catalog, 
timeDir string) error {
-       localFiles, err := getAllFiles(snapshotDir)
+func backupSnapshot(ctx context.Context, fs remote.FS, snapshotDir, catalog, 
timeDir string, concurrency int) error {
+       prefix := path.Join(timeDir, catalog)
+
+       remoteFiles, err := fs.List(ctx, prefix+"/")
        if err != nil {
                return err
        }
+       // Build a set of existing remote files for O(1) lookups. The local 
file list
+       // is never materialized; instead the snapshot is walked file by file 
so memory
+       // stays bounded by the remote file count rather than the (much larger) 
local
+       // file count.
+       remoteSet := make(map[string]struct{}, len(remoteFiles))
+       for _, remoteFile := range remoteFiles {
+               remoteSet[remoteFile] = struct{}{}
+       }
 
-       remotePrefix := path.Join(timeDir, catalog) + "/"
-
-       remoteFiles, err := fs.List(ctx, remotePrefix)
-       if err != nil {
-               return err
+       if concurrency < 1 {
+               concurrency = 1
        }
-       for _, relPath := range localFiles {
-               remotePath := path.Join(timeDir, catalog, relPath)
-               if !contains(remoteFiles, remotePath) {
-                       if err := uploadFile(ctx, fs, snapshotDir, relPath, 
remotePath); err != nil {
-                               return err
-                       }
+       g, gctx := errgroup.WithContext(ctx)
+       g.SetLimit(concurrency)
+
+       walkErr := filepath.Walk(snapshotDir, func(filePath string, info 
os.FileInfo, iterErr error) error {
+               if iterErr != nil {
+                       return iterErr
+               }
+               if info.IsDir() {
+                       return nil
+               }
+               // Stop walking promptly if a concurrent upload has already 
failed.
+               if gctx.Err() != nil {
+                       return gctx.Err()
+               }
+               relPath, relErr := filepath.Rel(snapshotDir, filePath)
+               if relErr != nil {
+                       return relErr
+               }
+               relPath = filepath.ToSlash(relPath)
+               remotePath := path.Join(prefix, relPath)
+               if _, ok := remoteSet[remotePath]; ok {
+                       // Present both locally and remotely: keep it and drop 
it from the
+                       // set so that whatever remains is exactly the orphaned 
remote files.
+                       delete(remoteSet, remotePath)
+                       return nil
+               }
+               if info.Size() < smallFileThreshold {
+                       // Small files dominate and are latency-bound: upload 
them concurrently.
+                       // relPath/remotePath are per-callback locals, so 
capturing them is safe.
+                       // g.Go blocks once the limit is reached, providing 
natural backpressure.
+                       g.Go(func() error {
+                               return uploadFile(gctx, fs, snapshotDir, 
relPath, remotePath)
+                       })
+                       return nil
                }
+               // Large files are uploaded sequentially so at most one large 
write buffer
+               // is held at a time, keeping peak memory bounded.
+               return uploadFile(gctx, fs, snapshotDir, relPath, remotePath)
+       })
+       // Always wait for in-flight uploads before returning, even on a walk 
error.
+       if waitErr := g.Wait(); waitErr != nil {
+               return waitErr
+       }
+       if walkErr != nil {
+               return walkErr

Review Comment:
   If `filepath.Walk` fails (e.g., on a permission or read error), in-flight
   uploads are not canceled, because `gctx` is canceled only when an errgroup
   goroutine fails or the parent context is canceled. The same applies when a
   sequential large-file upload fails inside the walk callback, since that
   error is returned through `filepath.Walk` rather than through the errgroup.
   Either way, remote writes continue after the backup is already doomed to
   fail, extending runtime and cost. Consider creating a cancellable parent
   context for the errgroup and calling `cancel()` when `walkErr != nil`
   before `g.Wait()`, then prefer returning `walkErr` over a resulting
   `context.Canceled` from uploads (while still prioritizing real upload
   errors).



##########
banyand/backup/backup.go:
##########
@@ -94,7 +104,19 @@ func NewBackupCommand() *cobra.Command {
                        schedLogger.Info().Msgf("backup to %s will run with 
schedule: %s", backupOpts.dest, backupOpts.schedule)
                        clockInstance := clock.New()
                        sch := timestamp.NewScheduler(schedLogger, 
clockInstance)
+                       // A full backup may legitimately run longer than the 
schedule interval.
+                       // The scheduler abandons (but does not cancel) an 
action that exceeds its
+                       // internal timeout, so without this guard a slow run 
would overlap with the
+                       // next scheduled run, stacking concurrent uploads 
until the process is
+                       // OOM-killed. backupInFlight ensures only one backup 
runs at a time: a tick
+                       // that fires while the previous run is still in 
progress is skipped.
+                       var backupInFlight atomic.Bool
                        err := sch.Register(cmd.Context(), "backup", 
cron.Descriptor, backupOpts.schedule, func(ctx context.Context, _ time.Time, l 
*logger.Logger) bool {
+                               if !backupInFlight.CompareAndSwap(false, true) {
+                                       l.Warn().Msg("previous backup is still 
running; skipping this scheduled run")
+                                       return true
+                               }
+                               defer backupInFlight.Store(false)

Review Comment:
   The in-flight guard may not be reset if `backupAction` (or anything it
   calls) panics, which could permanently wedge scheduled backups into
   “skipping” mode. Note that `defer backupInFlight.Store(false)` already runs
   while a panic unwinds in this goroutine, so the flag itself is cleared in
   that case; what the callback lacks is a `recover`. Consider adding a
   deferred `recover` handler in this callback that resets `backupInFlight`
   before re-panicking (or converting the panic into an error/log + return),
   so the scheduler can continue operating after unexpected failures.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to