This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 310432329 Fix backup container OOM and speed up GCS snapshot uploads (#1181)
310432329 is described below
commit 31043232977d147578e581511239bf24811d6f81
Author: mrproliu <[email protected]>
AuthorDate: Wed Jun 17 22:18:36 2026 +0800
Fix backup container OOM and speed up GCS snapshot uploads (#1181)
---
CHANGES.md | 2 +
banyand/backup/backup.go | 158 +++++++++++++++------
banyand/backup/backup_test.go | 81 ++++++++++-
docs/operation/backup.md | 1 +
pkg/fs/remote/checksum/interface.go | 3 +
pkg/fs/remote/checksum/sha256.go | 8 ++
.../checksum/{interface.go => sha256_test.go} | 41 ++++--
pkg/fs/remote/gcp/gcs.go | 64 ++++++++-
8 files changed, 301 insertions(+), 57 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 3bdeb2623..a254f3fc2 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -66,6 +66,7 @@ Release Notes.
- Snapshot/backup and data inspection no longer reopen idle-closed segments,
avoiding cold-segment nil-index panics and index lock-file churn.
- Add opt-in vectorized measure query tracing over raw-frame distributed
queries, including a trace envelope and fixed trace-label vocabulary.
- Enhance segment lifecycle: `refCount` now counts only active users,
decoupled from "open" (`index != nil`), adding a "dormant" state (open,
`refCount == 0`). A `DecRef` to zero no longer closes a segment; idle reclaim
and retention delete act only at `refCount == 0`, so an in-flight
snapshot/inspect is no longer torn down mid-operation (fixing the cold-node
nil-index panic and bluge lock churn) while idle segments still release their
bluge writers.
+- Speed up GCS backup uploads: write each object and its checksum metadata in
one request, dropping the per-object `Update` round-trip.
- Lifecycle migration now archives rows whose measure/stream schema was
deleted from the registry, instead of aborting the group.
### Bug Fixes
@@ -117,6 +118,7 @@ Release Notes.
- Fix FODC agent labeling metrics with `node_role="ROLE_UNSPECIFIED"`. The
agent resolved the node role exactly once at startup via a single
`GetCurrentNode` poll whose endpoint retries spanned only ~1s; when the sibling
lifecycle/banyandb gRPC server was not yet listening (`connect: cannot assign
requested address`) the role fell back to `ROLE_UNSPECIFIED` permanently, so
most nodes never reported their real `ROLE_DATA`/`ROLE_LIAISON`. Retry the
initial node-role resolution with exponen [...]
- Fix lifecycle row-replay OOM on large measure parts by streaming the dump
reader, pooling size-classed marshal buffers, and bounding in-flight batch
bytes (default 32 MiB); peak heap drops ~1.5 GB→~296 MB.
- Consolidate lifecycle migration report errors into a single flat list of
structured, stage-aware entries.
+- Fix backup container OOM from overlapping scheduled runs; serialize runs and
upload small snapshot files concurrently.
### Document
diff --git a/banyand/backup/backup.go b/banyand/backup/backup.go
index 9fd35cbe1..68ac063ea 100644
--- a/banyand/backup/backup.go
+++ b/banyand/backup/backup.go
@@ -28,6 +28,7 @@ import (
"path"
"path/filepath"
"strings"
+ "sync/atomic"
"syscall"
"time"
@@ -35,6 +36,7 @@ import (
"github.com/robfig/cron/v3"
"github.com/spf13/cobra"
"go.uber.org/multierr"
+ "golang.org/x/sync/errgroup"
"github.com/apache/skywalking-banyandb/banyand/backup/snapshot"
cfg "github.com/apache/skywalking-banyandb/pkg/config"
@@ -50,20 +52,28 @@ import (
"github.com/apache/skywalking-banyandb/pkg/version"
)
+// smallFileThreshold is the size below which files are uploaded concurrently.
+// Backup snapshots are dominated by tiny index files whose upload cost is bound
+// by per-request latency rather than bandwidth, so parallelizing them shortens
+// the overall backup well within the schedule interval. Larger files are uploaded
+// sequentially to keep the concurrent write-buffer memory bounded.
+const smallFileThreshold = 5 << 20 // 5 MiB
+
type backupOptions struct {
- fsConfig remoteconfig.FsConfig
- gRPCAddr string
- cert string
- timeStyle string
- schedule string
- streamRoot string
- measureRoot string
- propertyRoot string
- traceRoot string
- schemaRoot string
- dest string
- enableTLS bool
- insecure bool
+ fsConfig remoteconfig.FsConfig
+ gRPCAddr string
+ cert string
+ timeStyle string
+ schedule string
+ streamRoot string
+ measureRoot string
+ propertyRoot string
+ traceRoot string
+ schemaRoot string
+ dest string
+ uploadConcurrency int
+ enableTLS bool
+ insecure bool
}
// NewBackupCommand creates a new backup command.
@@ -94,7 +104,19 @@ func NewBackupCommand() *cobra.Command {
schedLogger.Info().Msgf("backup to %s will run with
schedule: %s", backupOpts.dest, backupOpts.schedule)
clockInstance := clock.New()
sch := timestamp.NewScheduler(schedLogger,
clockInstance)
+ // A full backup may legitimately run longer than the schedule interval.
+ // The scheduler abandons (but does not cancel) an action that exceeds its
+ // internal timeout, so without this guard a slow run would overlap with the
+ // next scheduled run, stacking concurrent uploads until the process is
+ // OOM-killed. backupInFlight ensures only one backup runs at a time: a tick
+ // that fires while the previous run is still in progress is skipped.
+ var backupInFlight atomic.Bool
err := sch.Register(cmd.Context(), "backup",
cron.Descriptor, backupOpts.schedule, func(ctx context.Context, _ time.Time, l
*logger.Logger) bool {
+ if !backupInFlight.CompareAndSwap(false, true) {
+ l.Warn().Msg("previous backup is still
running; skipping this scheduled run")
+ return true
+ }
+ defer backupInFlight.Store(false)
err := backupAction(ctx, backupOpts)
if err != nil {
l.Error().Err(err).Msg("backup failed")
@@ -130,6 +152,7 @@ func NewBackupCommand() *cobra.Command {
cmd.Flags().StringVar(&backupOpts.schemaRoot, "schema-root-path",
"/tmp", "Root directory for schema property catalog")
cmd.Flags().StringVar(&backupOpts.dest, "dest", "", "Destination URL
(e.g., file:///backups)")
cmd.Flags().StringVar(&backupOpts.timeStyle, "time-style", "daily",
"Time directory style (daily|hourly)")
+ cmd.Flags().IntVar(&backupOpts.uploadConcurrency, "upload-concurrency",
8, "Number of concurrent uploads for small files (<5MiB)")
cmd.Flags().StringVar(
&backupOpts.schedule,
"schedule",
@@ -198,7 +221,7 @@ func backupAction(ctx context.Context, options
backupOptions) error {
if strings.HasPrefix(snp.Name,
snapshot.SchemaPropertyCatalogName+"/") {
catalogName = snapshot.SchemaPropertyCatalogName
}
- multierr.AppendInto(&err, backupSnapshot(ctx, fs, snapshotDir,
catalogName, timeDir))
+ multierr.AppendInto(&err, backupSnapshot(ctx, fs, snapshotDir,
catalogName, timeDir, options.uploadConcurrency))
}
return err
}
@@ -233,28 +256,92 @@ func getTimeDir(style string) string {
}
}
-func backupSnapshot(ctx context.Context, fs remote.FS, snapshotDir, catalog,
timeDir string) error {
- localFiles, err := getAllFiles(snapshotDir)
+func backupSnapshot(ctx context.Context, fs remote.FS, snapshotDir, catalog,
timeDir string, concurrency int) error {
+ prefix := path.Join(timeDir, catalog)
+
+ remoteFiles, err := fs.List(ctx, prefix+"/")
if err != nil {
return err
}
+ // Build a set of existing remote files for O(1) lookups. The local file list
+ // is never materialized; instead the snapshot is walked file by file so memory
+ // stays bounded by the remote file count rather than the (much larger) local
+ // file count.
+ remoteSet := make(map[string]struct{}, len(remoteFiles))
+ for _, remoteFile := range remoteFiles {
+ remoteSet[remoteFile] = struct{}{}
+ }
- remotePrefix := path.Join(timeDir, catalog) + "/"
-
- remoteFiles, err := fs.List(ctx, remotePrefix)
- if err != nil {
- return err
+ if concurrency < 1 {
+ concurrency = 1
}
- for _, relPath := range localFiles {
- remotePath := path.Join(timeDir, catalog, relPath)
- if !contains(remoteFiles, remotePath) {
- if err := uploadFile(ctx, fs, snapshotDir, relPath,
remotePath); err != nil {
- return err
- }
+ // A dedicated cancellable context lets us stop in-flight uploads as
soon as the
+ // walk fails, instead of letting them run to completion for a doomed
backup.
+ uploadCtx, cancelUploads := context.WithCancel(ctx)
+ defer cancelUploads()
+ g, gctx := errgroup.WithContext(uploadCtx)
+ g.SetLimit(concurrency)
+
+ walkErr := filepath.Walk(snapshotDir, func(filePath string, info
os.FileInfo, iterErr error) error {
+ if iterErr != nil {
+ return iterErr
+ }
+ if info.IsDir() {
+ return nil
+ }
+ // Stop walking promptly if a concurrent upload has already
failed.
+ if gctx.Err() != nil {
+ return gctx.Err()
+ }
+ relPath, relErr := filepath.Rel(snapshotDir, filePath)
+ if relErr != nil {
+ return relErr
+ }
+ relPath = filepath.ToSlash(relPath)
+ remotePath := path.Join(prefix, relPath)
+ if _, ok := remoteSet[remotePath]; ok {
+ // Present both locally and remotely: keep it and drop
it from the
+ // set so that whatever remains is exactly the orphaned
remote files.
+ delete(remoteSet, remotePath)
+ return nil
}
+ if info.Size() < smallFileThreshold {
+ // Small files dominate and are latency-bound: upload
them concurrently.
+ // relPath/remotePath are per-callback locals, so
capturing them is safe.
+ // g.Go blocks once the limit is reached, providing
natural backpressure.
+ g.Go(func() error {
+ return uploadFile(gctx, fs, snapshotDir,
relPath, remotePath)
+ })
+ return nil
+ }
+ // Large files are uploaded sequentially so at most one large
write buffer
+ // is held at a time, keeping peak memory bounded.
+ return uploadFile(gctx, fs, snapshotDir, relPath, remotePath)
+ })
+ if walkErr != nil {
+ // The backup is already failing, so stop in-flight uploads
rather than
+ // letting them run to completion.
+ cancelUploads()
+ }
+ waitErr := g.Wait()
+ // A real upload failure takes priority; a context.Canceled that we
induced via
+ // cancelUploads above is not itself a reportable error.
+ if waitErr != nil && !errors.Is(waitErr, context.Canceled) {
+ return waitErr
+ }
+ if walkErr != nil {
+ return walkErr
+ }
+ if waitErr != nil {
+ return waitErr
}
- deleteOrphanedFiles(ctx, fs, localFiles, remoteFiles, timeDir, catalog)
+ // Remaining entries exist remotely but no longer locally: delete them.
+ for orphan := range remoteSet {
+ if delErr := fs.Delete(ctx, orphan); delErr != nil {
+ logger.Warningf("Warning: failed to delete orphaned
file %s: %v\n", orphan, delErr)
+ }
+ }
return nil
}
@@ -287,21 +374,6 @@ func uploadFile(ctx context.Context, fs remote.FS,
snapshotDir, relPath, remoteP
return fs.Upload(ctx, remotePath, file)
}
-func deleteOrphanedFiles(ctx context.Context, fs remote.FS, localFiles,
remoteFiles []string, timeDir, snapshotName string) {
- expected := make(map[string]struct{})
- for _, f := range localFiles {
- expected[path.Join(timeDir, snapshotName, f)] = struct{}{}
- }
-
- for _, remoteFile := range remoteFiles {
- if _, exists := expected[remoteFile]; !exists {
- if err := fs.Delete(ctx, remoteFile); err != nil {
- logger.Warningf("Warning: failed to delete
orphaned file %s: %v\n", remoteFile, err)
- }
- }
- }
-}
-
func contains(slice []string, s string) bool {
for _, item := range slice {
if item == s {
diff --git a/banyand/backup/backup_test.go b/banyand/backup/backup_test.go
index f5c533dac..1b5ccafd7 100644
--- a/banyand/backup/backup_test.go
+++ b/banyand/backup/backup_test.go
@@ -19,10 +19,13 @@ package backup
import (
"context"
+ "fmt"
"io"
"os"
"path"
"path/filepath"
+ "strings"
+ "sync"
"testing"
"time"
@@ -158,8 +161,10 @@ func TestGetAllFiles(t *testing.T) {
}
type mockFS struct {
- uploaded []string
- deleted []string
+ uploadErrOn string
+ uploaded []string
+ deleted []string
+ mu sync.Mutex
}
func (m *mockFS) List(_ context.Context, prefix string) ([]string, error) {
@@ -167,11 +172,18 @@ func (m *mockFS) List(_ context.Context, prefix string)
([]string, error) {
}
func (m *mockFS) Upload(_ context.Context, p string, _ io.Reader) error {
+ if m.uploadErrOn != "" && strings.Contains(p, m.uploadErrOn) {
+ return fmt.Errorf("mock upload failure for %s", p)
+ }
+ m.mu.Lock()
+ defer m.mu.Unlock()
m.uploaded = append(m.uploaded, p)
return nil
}
func (m *mockFS) Delete(_ context.Context, p string) error {
+ m.mu.Lock()
+ defer m.mu.Unlock()
m.deleted = append(m.deleted, p)
return nil
}
@@ -185,7 +197,7 @@ func TestBackupSnapshot(t *testing.T) {
os.WriteFile(filepath.Join(tmpDir, "newfile.txt"), nil, 0o600)
m := &mockFS{}
- err := backupSnapshot(context.Background(), m, tmpDir, "test-snapshot",
"daily")
+ err := backupSnapshot(context.Background(), m, tmpDir, "test-snapshot",
"daily", 4)
if err != nil {
t.Fatal(err)
}
@@ -201,6 +213,69 @@ func TestBackupSnapshot(t *testing.T) {
}
}
+// TestBackupSnapshotConcurrent exercises the concurrent small-file path, the
+// sequential large-file path (>= smallFileThreshold), and orphan deletion all
+// at once. Run with -race to catch data races in the upload fan-out.
+func TestBackupSnapshotConcurrent(t *testing.T) {
+ tmpDir := t.TempDir()
+ const numSmall = 50
+ for i := 0; i < numSmall; i++ {
+ sub := filepath.Join(tmpDir, fmt.Sprintf("seg-%d", i%5))
+ if err := os.MkdirAll(sub, 0o750); err != nil {
+ t.Fatal(err)
+ }
+ if err := os.WriteFile(filepath.Join(sub,
fmt.Sprintf("f-%d.tm", i)), []byte("x"), 0o600); err != nil {
+ t.Fatal(err)
+ }
+ }
+ // A file at exactly smallFileThreshold takes the sequential branch
(size is
+ // not strictly less than the threshold).
+ if err := os.WriteFile(filepath.Join(tmpDir, "big.bin"), make([]byte,
smallFileThreshold), 0o600); err != nil {
+ t.Fatal(err)
+ }
+
+ m := &mockFS{}
+ if err := backupSnapshot(context.Background(), m, tmpDir,
"test-snapshot", "daily", 8); err != nil {
+ t.Fatal(err)
+ }
+
+ if len(m.uploaded) != numSmall+1 {
+ t.Fatalf("uploaded %d files, want %d", len(m.uploaded),
numSmall+1)
+ }
+ uploaded := make(map[string]struct{}, len(m.uploaded))
+ for _, p := range m.uploaded {
+ uploaded[p] = struct{}{}
+ }
+ if _, ok := uploaded["daily/test-snapshot/big.bin"]; !ok {
+ t.Errorf("large file not uploaded; uploaded=%v", m.uploaded)
+ }
+ wantDelete := "daily/test-snapshot/existing.txt"
+ if len(m.deleted) != 1 || m.deleted[0] != wantDelete {
+ t.Errorf("deleted = %v, want [%s]", m.deleted, wantDelete)
+ }
+}
+
+// TestBackupSnapshotUploadError verifies that a failed upload surfaces an error
+// and that orphaned remote files are NOT deleted when the backup did not fully
+// succeed.
+func TestBackupSnapshotUploadError(t *testing.T) {
+ tmpDir := t.TempDir()
+ for _, name := range []string{"a.tm", "b.tm", "boom.tm"} {
+ if err := os.WriteFile(filepath.Join(tmpDir, name),
[]byte("x"), 0o600); err != nil {
+ t.Fatal(err)
+ }
+ }
+
+ m := &mockFS{uploadErrOn: "boom.tm"}
+ err := backupSnapshot(context.Background(), m, tmpDir, "test-snapshot",
"daily", 4)
+ if err == nil {
+ t.Fatal("expected an error when an upload fails, got nil")
+ }
+ if len(m.deleted) != 0 {
+ t.Errorf("orphans must not be deleted on a failed backup,
deleted = %v", m.deleted)
+ }
+}
+
func TestContains(t *testing.T) {
tests := []struct {
s string
diff --git a/docs/operation/backup.md b/docs/operation/backup.md
index 5a4489042..c69b28b3a 100644
--- a/docs/operation/backup.md
+++ b/docs/operation/backup.md
@@ -110,6 +110,7 @@ When a schedule is provided, the tool:
| `--trace-root-path` | Root directory for the trace catalog
snapshots.
| `/tmp` |
| `--time-style` | Directory naming style based on time
(`daily` or `hourly`)
| `daily` |
| `--schedule` | Schedule expression for periodic backup.
Options: `@yearly`, `@monthly`, `@weekly`, `@daily`, `@hourly`, `@every
<duration>` | _empty_ |
+| `--upload-concurrency` | Number of concurrent uploads for small snapshot files (<5MiB). | `8` |
| `--logging-level` | Root logging level (`debug`, `info`,
`warn`, `error`)
| `info` |
| `--logging-env` | Logging environment (`dev` or `prod`)
| `prod` |
| **AWS S3 specific** |
| |
diff --git a/pkg/fs/remote/checksum/interface.go
b/pkg/fs/remote/checksum/interface.go
index c36184910..d91d909cb 100644
--- a/pkg/fs/remote/checksum/interface.go
+++ b/pkg/fs/remote/checksum/interface.go
@@ -29,4 +29,7 @@ type Verifier interface {
// ComputeAndWrap returns a reader that computes the checksum while
reading.
ComputeAndWrap(r io.Reader) (wrappedReader io.Reader, getHash func()
(string, error))
+
+ // Sum reads r to completion and returns the hex-encoded checksum of
its contents.
+ Sum(r io.Reader) (string, error)
}
diff --git a/pkg/fs/remote/checksum/sha256.go b/pkg/fs/remote/checksum/sha256.go
index a286fd574..bd534238f 100644
--- a/pkg/fs/remote/checksum/sha256.go
+++ b/pkg/fs/remote/checksum/sha256.go
@@ -79,6 +79,14 @@ func (v *sha256Verifier) ComputeAndWrap(r io.Reader)
(io.Reader, func() (string,
return hr, hr.getHash
}
+func (v *sha256Verifier) Sum(r io.Reader) (string, error) {
+ wrapped, getHash := v.ComputeAndWrap(r)
+ if _, err := io.Copy(io.Discard, wrapped); err != nil {
+ return "", err
+ }
+ return getHash()
+}
+
func (v *sha256Verifier) Wrap(rc io.ReadCloser, expected string) io.ReadCloser
{
h := sha256.New()
return &verifyingReadCloser{
diff --git a/pkg/fs/remote/checksum/interface.go
b/pkg/fs/remote/checksum/sha256_test.go
similarity index 51%
copy from pkg/fs/remote/checksum/interface.go
copy to pkg/fs/remote/checksum/sha256_test.go
index c36184910..40fd66040 100644
--- a/pkg/fs/remote/checksum/interface.go
+++ b/pkg/fs/remote/checksum/sha256_test.go
@@ -15,18 +15,39 @@
// specific language governing permissions and limitations
// under the License.
-// Package checksum provides functions for computing checksums algorithms and
verifying.
package checksum
-import "io"
+import (
+ "io"
+ "strings"
+ "testing"
+)
-// Verifier defines the interface for computing and verifying checksums.
-type Verifier interface {
- // Wrap returns an io.ReadCloser that transparently verifies the
checksum
- // when the returned reader is closed. This enables streaming
verification
- // without buffering the entire content in memory.
- Wrap(io.ReadCloser, string) io.ReadCloser
+func TestSum(t *testing.T) {
+ v, err := DefaultSHA256Verifier()
+ if err != nil {
+ t.Fatal(err)
+ }
+ // Well-known SHA-256 of "hello".
+ const want =
"2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824"
+ got, err := v.Sum(strings.NewReader("hello"))
+ if err != nil {
+ t.Fatalf("Sum: %v", err)
+ }
+ if got != want {
+ t.Errorf("Sum = %s, want %s", got, want)
+ }
- // ComputeAndWrap returns a reader that computes the checksum while
reading.
- ComputeAndWrap(r io.Reader) (wrappedReader io.Reader, getHash func()
(string, error))
+ // Sum must agree with the streaming ComputeAndWrap path on the same
input.
+ wrapped, getHash := v.ComputeAndWrap(strings.NewReader("hello"))
+ if _, err = io.Copy(io.Discard, wrapped); err != nil {
+ t.Fatal(err)
+ }
+ streamed, err := getHash()
+ if err != nil {
+ t.Fatal(err)
+ }
+ if streamed != got {
+ t.Errorf("Sum (%s) disagrees with ComputeAndWrap (%s)", got,
streamed)
+ }
}
diff --git a/pkg/fs/remote/gcp/gcs.go b/pkg/fs/remote/gcp/gcs.go
index ecdaf9120..234d05b3c 100644
--- a/pkg/fs/remote/gcp/gcs.go
+++ b/pkg/fs/remote/gcp/gcs.go
@@ -41,6 +41,12 @@ import (
const checksumSha256Key = "checksum_sha256"
+// singleRequestChunkThreshold is the object size below which the GCS writer is
+// configured for a single-request (non-resumable) upload. Objects smaller than
+// one default chunk gain nothing from resumable chunking, and a single request
+// avoids the extra round-trip of establishing a resumable session.
+const singleRequestChunkThreshold = 16 << 20 // 16 MiB
+
var _ remote.FS = (*gcsFS)(nil)
// gcsFS implements remote.FS backed by Google Cloud Storage.
@@ -144,9 +150,21 @@ func (g *gcsFS) Upload(ctx context.Context, p string, data
io.Reader) error {
objPath := g.getFullPath(p)
logger.Infof("GCS Upload: bucket=%s, path=%s, fullPath=%s", g.bucket,
p, objPath)
+
+ // When the source is seekable (e.g. a local *os.File, as used by the backup
+ // tool), compute the checksum in a first pass, then upload it together with the
+ // object in a single request. This halves the per-object round-trips by removing
+ // the follow-up metadata Update call, which dominates backup time for the many
+ // tiny snapshot files.
+ if seeker, ok := data.(io.ReadSeeker); ok &&
os.Getenv("STORAGE_EMULATOR_HOST") == "" {
+ return g.uploadSeekable(ctx, objPath, seeker)
+ }
+
wrappedReader, getHash := g.verifier.ComputeAndWrap(data)
- w := g.client.Bucket(g.bucket).Object(objPath).NewWriter(ctx)
+ // Size is unknown for a non-seekable stream, so the default resumable
upload
+ // is kept.
+ w := g.newWriter(ctx, objPath, -1)
if _, err := io.Copy(w, wrappedReader); err != nil {
_ = w.Close()
return fmt.Errorf("failed to write object: %w", err)
@@ -176,6 +194,50 @@ func (g *gcsFS) Upload(ctx context.Context, p string, data
io.Reader) error {
return nil
}
+// newWriter creates an object writer. When the object size is known and smaller
+// than one default chunk, it switches to a single-request (non-resumable) upload
+// to avoid the extra round-trip of establishing a resumable session. A negative
+// size means the size is unknown (e.g. a non-seekable stream), keeping the
+// default resumable behavior.
+func (g *gcsFS) newWriter(ctx context.Context, objPath string, size int64)
*storage.Writer {
+ w := g.client.Bucket(g.bucket).Object(objPath).NewWriter(ctx)
+ if size >= 0 && size < singleRequestChunkThreshold {
+ w.ChunkSize = 0
+ }
+ return w
+}
+
+// uploadSeekable uploads a seekable source in a single request: it computes the
+// checksum in a first pass, rewinds, then writes the object with the checksum
+// already attached as metadata, avoiding the follow-up metadata Update call.
+func (g *gcsFS) uploadSeekable(ctx context.Context, objPath string, seeker
io.ReadSeeker) error {
+ size, err := seeker.Seek(0, io.SeekEnd)
+ if err != nil {
+ return fmt.Errorf("failed to size object: %w", err)
+ }
+ if _, err = seeker.Seek(0, io.SeekStart); err != nil {
+ return fmt.Errorf("failed to rewind object: %w", err)
+ }
+ hash, err := g.verifier.Sum(seeker)
+ if err != nil {
+ return fmt.Errorf("failed to compute hash: %w", err)
+ }
+ if _, err = seeker.Seek(0, io.SeekStart); err != nil {
+ return fmt.Errorf("failed to rewind object: %w", err)
+ }
+
+ w := g.newWriter(ctx, objPath, size)
+ w.Metadata = map[string]string{checksumSha256Key: hash}
+ if _, err = io.Copy(w, seeker); err != nil {
+ _ = w.Close()
+ return fmt.Errorf("failed to write object: %w", err)
+ }
+ if err = w.Close(); err != nil {
+ return fmt.Errorf("failed to close writer: %w", err)
+ }
+ return nil
+}
+
func (g *gcsFS) Download(ctx context.Context, p string) (io.ReadCloser, error)
{
if g.verifier == nil {
return nil, fmt.Errorf("verifier not initialized")