This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new aa87b5f9 Support backup and restore trace data (#831)
aa87b5f9 is described below
commit aa87b5f9481af9e4de8782abb8c2d3867bd70739
Author: mrproliu <[email protected]>
AuthorDate: Thu Oct 30 19:45:15 2025 +0900
Support backup and restore trace data (#831)
* Support backup trace data
* update test case
---
banyand/backup/backup.go | 4 +-
banyand/backup/backup_test.go | 14 ++++-
banyand/backup/lifecycle/steps.go | 2 +-
banyand/backup/restore.go | 19 +++++-
banyand/backup/snapshot/snapshot.go | 6 +-
banyand/backup/timedir.go | 25 ++++----
banyand/backup/timedir_test.go | 15 +++--
banyand/trace/svc_standalone.go | 85 ++++++++++++++++++++++++++-
docs/operation/backup.md | 58 +++++++++---------
docs/operation/restore.md | 25 +++++---
test/cases/backup/all.go | 7 ++-
test/cases/backup/backup.go | 15 +++--
test/integration/distributed/backup/common.go | 3 +
13 files changed, 213 insertions(+), 65 deletions(-)
diff --git a/banyand/backup/backup.go b/banyand/backup/backup.go
index 5413fa19..2054127b 100644
--- a/banyand/backup/backup.go
+++ b/banyand/backup/backup.go
@@ -57,6 +57,7 @@ type backupOptions struct {
streamRoot string
measureRoot string
propertyRoot string
+ traceRoot string
dest string
enableTLS bool
insecure bool
@@ -122,6 +123,7 @@ func NewBackupCommand() *cobra.Command {
cmd.Flags().StringVar(&backupOpts.streamRoot, "stream-root-path",
"/tmp", "Root directory for stream catalog")
cmd.Flags().StringVar(&backupOpts.measureRoot, "measure-root-path",
"/tmp", "Root directory for measure catalog")
cmd.Flags().StringVar(&backupOpts.propertyRoot, "property-root-path",
"/tmp", "Root directory for property catalog")
+ cmd.Flags().StringVar(&backupOpts.traceRoot, "trace-root-path", "/tmp",
"Root directory for trace catalog")
cmd.Flags().StringVar(&backupOpts.dest, "dest", "", "Destination URL
(e.g., file:///backups)")
cmd.Flags().StringVar(&backupOpts.timeStyle, "time-style", "daily",
"Time directory style (daily|hourly)")
cmd.Flags().StringVar(
@@ -164,7 +166,7 @@ func backupAction(options backupOptions) error {
for _, snp := range snapshots {
var snapshotDir string
- snapshotDir, err = snapshot.Dir(snp, options.streamRoot,
options.measureRoot, options.propertyRoot)
+ snapshotDir, err = snapshot.Dir(snp, options.streamRoot,
options.measureRoot, options.propertyRoot, options.traceRoot)
if err != nil {
logger.Warningf("Failed to get snapshot directory for
%s: %v", snp.Name, err)
continue
diff --git a/banyand/backup/backup_test.go b/banyand/backup/backup_test.go
index 0d60f0f4..786e143d 100644
--- a/banyand/backup/backup_test.go
+++ b/banyand/backup/backup_test.go
@@ -60,27 +60,35 @@ func TestGetSnapshotDir(t *testing.T) {
streamRoot string
measureRoot string
propRoot string
+ traceRoot string
want string
wantErr bool
}{
{
"stream catalog",
&databasev1.Snapshot{Catalog:
commonv1.Catalog_CATALOG_STREAM, Name: "test"},
- "/tmp", "/tmp", "/tmp",
+ "/tmp", "/tmp", "/tmp", "/tmp",
filepath.Join("/tmp/stream", storage.SnapshotsDir,
"test"),
false,
},
+ {
+ "trace catalog",
+ &databasev1.Snapshot{Catalog:
commonv1.Catalog_CATALOG_TRACE, Name: "test"},
+ "/tmp", "/tmp", "/tmp", "/tmp",
+ filepath.Join("/tmp/trace", storage.SnapshotsDir,
"test"),
+ false,
+ },
{
"unknown catalog",
&databasev1.Snapshot{Catalog:
commonv1.Catalog_CATALOG_UNSPECIFIED, Name: "test"},
- "", "", "",
+ "", "", "", "",
"",
true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- got, err := snapshot.Dir(tt.snapshot, tt.streamRoot,
tt.measureRoot, tt.propRoot)
+ got, err := snapshot.Dir(tt.snapshot, tt.streamRoot,
tt.measureRoot, tt.propRoot, tt.traceRoot)
if (err != nil) != tt.wantErr {
t.Errorf("getSnapshotDir() error = %v, wantErr
%v", err, tt.wantErr)
return
diff --git a/banyand/backup/lifecycle/steps.go
b/banyand/backup/lifecycle/steps.go
index ebf860ea..12beb51d 100644
--- a/banyand/backup/lifecycle/steps.go
+++ b/banyand/backup/lifecycle/steps.go
@@ -55,7 +55,7 @@ func (l *lifecycleService) getSnapshots(groups
[]*commonv1.Group, p *Progress) (
return "", "", err
}
for _, snp := range snn {
- snapshotDir, errDir := snapshot.Dir(snp, l.streamRoot,
l.measureRoot, "")
+ snapshotDir, errDir := snapshot.Dir(snp, l.streamRoot,
l.measureRoot, "", "")
if errDir != nil {
l.l.Error().Err(errDir).Msgf("Failed to get snapshot
directory for %s", snp.Name)
continue
diff --git a/banyand/backup/restore.go b/banyand/backup/restore.go
index 4bec7e57..afb7f803 100644
--- a/banyand/backup/restore.go
+++ b/banyand/backup/restore.go
@@ -67,6 +67,7 @@ func newRunCommand() *cobra.Command {
streamRoot string
measureRoot string
propertyRoot string
+ traceRoot string
fsConfig remoteconfig.FsConfig
)
// Initialize nested structs to avoid nil pointer during flag binding
@@ -78,8 +79,8 @@ func newRunCommand() *cobra.Command {
Use: "run",
Short: "Restore BanyanDB data from remote storage",
RunE: func(_ *cobra.Command, _ []string) error {
- if streamRoot == "" && measureRoot == "" &&
propertyRoot == "" {
- return errors.New("at least one of
stream-root-path, measure-root-path, or property-root-path is required")
+ if streamRoot == "" && measureRoot == "" &&
propertyRoot == "" && traceRoot == "" {
+ return errors.New("at least one of
stream-root-path, measure-root-path, property-root-path or trace-root-path is
required")
}
if source == "" {
return errors.New("source is required")
@@ -131,6 +132,19 @@ func newRunCommand() *cobra.Command {
return err
}
}
+ if traceRoot != "" {
+ timeDirPath := filepath.Join(traceRoot,
"trace", "time-dir")
+ if data, err := os.ReadFile(timeDirPath); err
== nil {
+ timeDir :=
strings.TrimSpace(string(data))
+ if err = restoreCatalog(fs, timeDir,
traceRoot, commonv1.Catalog_CATALOG_TRACE); err != nil {
+ errs = multierr.Append(errs,
fmt.Errorf("trace restore failed: %w", err))
+ } else {
+ _ = os.Remove(timeDirPath)
+ }
+ } else if !errors.Is(err, os.ErrNotExist) {
+ return err
+ }
+ }
return errs
},
@@ -139,6 +153,7 @@ func newRunCommand() *cobra.Command {
cmd.Flags().StringVar(&streamRoot, "stream-root-path", "/tmp", "Root
directory for stream catalog")
cmd.Flags().StringVar(&measureRoot, "measure-root-path", "/tmp", "Root
directory for measure catalog")
cmd.Flags().StringVar(&propertyRoot, "property-root-path", "/tmp",
"Root directory for property catalog")
+ cmd.Flags().StringVar(&traceRoot, "trace-root-path", "/tmp", "Root
directory for trace catalog")
cmd.Flags().StringVar(&fsConfig.S3.S3ConfigFilePath, "s3-config-file",
"", "Path to the s3 configuration file")
cmd.Flags().StringVar(&fsConfig.S3.S3CredentialFilePath,
"s3-credential-file", "", "Path to the s3 credential file")
cmd.Flags().StringVar(&fsConfig.S3.S3ProfileName, "s3-profile", "", "S3
profile name")
diff --git a/banyand/backup/snapshot/snapshot.go
b/banyand/backup/snapshot/snapshot.go
index edcd17b5..3baadc6c 100644
--- a/banyand/backup/snapshot/snapshot.go
+++ b/banyand/backup/snapshot/snapshot.go
@@ -64,7 +64,7 @@ func Get(gRPCAddr string, enableTLS, insecure bool, cert
string, groups ...*data
}
// Dir returns the directory path of the snapshot.
-func Dir(snapshot *databasev1.Snapshot, streamRoot, measureRoot, propertyRoot
string) (string, error) {
+func Dir(snapshot *databasev1.Snapshot, streamRoot, measureRoot, propertyRoot,
traceRoot string) (string, error) {
var baseDir string
switch snapshot.Catalog {
case commonv1.Catalog_CATALOG_STREAM:
@@ -73,6 +73,8 @@ func Dir(snapshot *databasev1.Snapshot, streamRoot,
measureRoot, propertyRoot st
baseDir = LocalDir(measureRoot, snapshot.Catalog)
case commonv1.Catalog_CATALOG_PROPERTY:
baseDir = LocalDir(propertyRoot, snapshot.Catalog)
+ case commonv1.Catalog_CATALOG_TRACE:
+ baseDir = LocalDir(traceRoot, snapshot.Catalog)
default:
return "", errors.New("unknown catalog type")
}
@@ -93,6 +95,8 @@ func CatalogName(catalog commonv1.Catalog) string {
return "measure"
case commonv1.Catalog_CATALOG_PROPERTY:
return "property"
+ case commonv1.Catalog_CATALOG_TRACE:
+ return "trace"
default:
logger.Panicf("unknown catalog type: %v", catalog)
return ""
diff --git a/banyand/backup/timedir.go b/banyand/backup/timedir.go
index a7bce852..66fcf1e1 100644
--- a/banyand/backup/timedir.go
+++ b/banyand/backup/timedir.go
@@ -120,7 +120,7 @@ func newListCmd() *cobra.Command {
func newCreateCmd() *cobra.Command {
var catalogs []string
- var streamRoot, measureRoot, propertyRoot string
+ var streamRoot, measureRoot, propertyRoot, traceRoot string
var timeStyle string
cmd := &cobra.Command{
@@ -128,7 +128,7 @@ func newCreateCmd() *cobra.Command {
Short: "Create local 'time-dir' file(s) in catalog directories",
RunE: func(cmd *cobra.Command, args []string) error {
if len(catalogs) == 0 {
- catalogs = []string{"stream", "measure",
"property"}
+ catalogs = []string{"stream", "measure",
"property", "trace"}
}
var tValue string
if len(args) > 0 {
@@ -138,7 +138,7 @@ func newCreateCmd() *cobra.Command {
}
for _, cat := range catalogs {
- filePath, err := getLocalTimeDirFilePath(cat,
streamRoot, measureRoot, propertyRoot)
+ filePath, err := getLocalTimeDirFilePath(cat,
streamRoot, measureRoot, propertyRoot, traceRoot)
if err != nil {
fmt.Fprintf(cmd.OutOrStdout(),
"Skipping unknown catalog '%s': %v\n", cat, err)
continue
@@ -157,17 +157,18 @@ func newCreateCmd() *cobra.Command {
},
}
- cmd.Flags().StringSliceVar(&catalogs, "catalog", nil, "Catalog(s) to
create time-dir file (e.g., stream, measure, property). Defaults to all if not
provided.")
+ cmd.Flags().StringSliceVar(&catalogs, "catalog", nil, "Catalog(s) to
create time-dir file (e.g., stream, measure, property, trace). Defaults to all
if not provided.")
cmd.Flags().StringVar(&streamRoot, "stream-root", "/tmp", "Local root
directory for stream catalog")
cmd.Flags().StringVar(&measureRoot, "measure-root", "/tmp", "Local root
directory for measure catalog")
cmd.Flags().StringVar(&propertyRoot, "property-root", "/tmp", "Local
root directory for property catalog")
+ cmd.Flags().StringVar(&traceRoot, "trace-root", "/tmp", "Local root
directory for trace catalog")
cmd.Flags().StringVar(&timeStyle, "time-style", "daily", "Time style to
compute time string (daily or hourly)")
return cmd
}
func newReadCmd() *cobra.Command {
var catalogs []string
- var streamRoot, measureRoot, propertyRoot string
+ var streamRoot, measureRoot, propertyRoot, traceRoot string
cmd := &cobra.Command{
Use: "read",
@@ -175,11 +176,11 @@ func newReadCmd() *cobra.Command {
RunE: func(cmd *cobra.Command, _ []string) error {
                        // If no catalog is specified, process all catalogs.
if len(catalogs) == 0 {
- catalogs = []string{"stream", "measure",
"property"}
+ catalogs = []string{"stream", "measure",
"property", "trace"}
}
for _, cat := range catalogs {
- filePath, err := getLocalTimeDirFilePath(cat,
streamRoot, measureRoot, propertyRoot)
+ filePath, err := getLocalTimeDirFilePath(cat,
streamRoot, measureRoot, propertyRoot, traceRoot)
if err != nil {
fmt.Fprintf(cmd.OutOrStdout(),
"Skipping unknown catalog '%s': %v\n", cat, err)
continue
@@ -203,12 +204,13 @@ func newReadCmd() *cobra.Command {
cmd.Flags().StringVar(&streamRoot, "stream-root", "/tmp", "Local root
directory for stream catalog")
cmd.Flags().StringVar(&measureRoot, "measure-root", "/tmp", "Local root
directory for measure catalog")
cmd.Flags().StringVar(&propertyRoot, "property-root", "/tmp", "Local
root directory for property catalog")
+ cmd.Flags().StringVar(&traceRoot, "trace-root", "/tmp", "Local root
directory for trace catalog")
return cmd
}
func newDeleteCmd() *cobra.Command {
var catalogs []string
- var streamRoot, measureRoot, propertyRoot string
+ var streamRoot, measureRoot, propertyRoot, traceRoot string
cmd := &cobra.Command{
Use: "delete",
@@ -219,7 +221,7 @@ func newDeleteCmd() *cobra.Command {
                                catalogs = []string{"stream", "measure",
"property", "trace"}
}
for _, cat := range catalogs {
- filePath, err := getLocalTimeDirFilePath(cat,
streamRoot, measureRoot, propertyRoot)
+ filePath, err := getLocalTimeDirFilePath(cat,
streamRoot, measureRoot, propertyRoot, traceRoot)
if err != nil {
fmt.Fprintf(cmd.ErrOrStderr(),
"Skipping unknown catalog '%s': %v\n", cat, err)
continue
@@ -243,10 +245,11 @@ func newDeleteCmd() *cobra.Command {
cmd.Flags().StringVar(&streamRoot, "stream-root", "/tmp", "Local root
directory for stream catalog")
cmd.Flags().StringVar(&measureRoot, "measure-root", "/tmp", "Local root
directory for measure catalog")
cmd.Flags().StringVar(&propertyRoot, "property-root", "/tmp", "Local
root directory for property catalog")
+       cmd.Flags().StringVar(&traceRoot, "trace-root", "/tmp", "Local root
directory for trace catalog")
return cmd
}
-func getLocalTimeDirFilePath(catalog, streamRoot, measureRoot, propertyRoot
string) (string, error) {
+func getLocalTimeDirFilePath(catalog, streamRoot, measureRoot, propertyRoot,
traceRoot string) (string, error) {
switch strings.ToLower(catalog) {
case "stream":
return filepath.Join(streamRoot, "stream", "time-dir"), nil
@@ -254,6 +257,8 @@ func getLocalTimeDirFilePath(catalog, streamRoot,
measureRoot, propertyRoot stri
return filepath.Join(measureRoot, "measure", "time-dir"), nil
case "property":
return filepath.Join(propertyRoot, "property", "time-dir"), nil
+ case "trace":
+ return filepath.Join(traceRoot, "trace", "time-dir"), nil
default:
return "", fmt.Errorf("unknown catalog type: %s", catalog)
}
diff --git a/banyand/backup/timedir_test.go b/banyand/backup/timedir_test.go
index e4fb4e38..9e098b19 100644
--- a/banyand/backup/timedir_test.go
+++ b/banyand/backup/timedir_test.go
@@ -33,6 +33,7 @@ func TestNewCreateCmd(t *testing.T) {
streamDir := filepath.Join(baseDir, "stream")
measureDir := filepath.Join(baseDir, "measure")
propertyDir := filepath.Join(baseDir, "property")
+ traceDir := filepath.Join(baseDir, "trace")
content := "2023-10-12"
cmd := newCreateCmd()
@@ -41,6 +42,7 @@ func TestNewCreateCmd(t *testing.T) {
"--stream-root", baseDir,
"--measure-root", baseDir,
"--property-root", baseDir,
+ "--trace-root", baseDir,
})
if err := cmd.Execute(); err != nil {
t.Fatalf("newCreateCmd.Execute() error: %v", err)
@@ -53,6 +55,7 @@ func TestNewCreateCmd(t *testing.T) {
{"stream", streamDir},
{"measure", measureDir},
{"property", propertyDir},
+ {"trace", traceDir},
} {
fp := filepath.Join(dir.root, "time-dir")
data, err := os.ReadFile(fp)
@@ -71,7 +74,8 @@ func TestNewReadCmd(t *testing.T) {
streamDir := filepath.Join(baseDir, "stream")
measureDir := filepath.Join(baseDir, "measure")
propertyDir := filepath.Join(baseDir, "property")
- for _, dir := range []string{streamDir, measureDir, propertyDir} {
+ traceDir := filepath.Join(baseDir, "trace")
+ for _, dir := range []string{streamDir, measureDir, propertyDir,
traceDir} {
if err := os.MkdirAll(dir, storage.DirPerm); err != nil {
t.Fatalf("Failed to create dir %s: %v", dir, err)
}
@@ -88,13 +92,14 @@ func TestNewReadCmd(t *testing.T) {
"--stream-root", baseDir,
"--measure-root", baseDir,
"--property-root", baseDir,
+ "--trace-root", baseDir,
})
if err := cmd.Execute(); err != nil {
t.Fatalf("newReadCmd.Execute() error: %v", err)
}
output := outBuf.String()
- for _, catalog := range []string{"stream", "measure", "property"} {
+ for _, catalog := range []string{"stream", "measure", "property",
"trace"} {
if !strings.Contains(output, "Catalog '"+catalog+"'") {
t.Errorf("Output missing expected catalog '%s'",
catalog)
}
@@ -106,7 +111,8 @@ func TestNewDeleteCmd(t *testing.T) {
streamDir := filepath.Join(baseDir, "stream")
measureDir := filepath.Join(baseDir, "measure")
propertyDir := filepath.Join(baseDir, "property")
- for _, dir := range []string{streamDir, measureDir, propertyDir} {
+ traceDir := filepath.Join(baseDir, "trace")
+ for _, dir := range []string{streamDir, measureDir, propertyDir,
traceDir} {
if err := os.MkdirAll(dir, storage.DirPerm); err != nil {
t.Fatalf("Failed to create dir %s: %v", dir, err)
}
@@ -121,12 +127,13 @@ func TestNewDeleteCmd(t *testing.T) {
"--stream-root", baseDir,
"--measure-root", baseDir,
"--property-root", baseDir,
+ "--trace-root", baseDir,
})
if err := cmd.Execute(); err != nil {
t.Fatalf("newDeleteCmd.Execute() error: %v", err)
}
- for _, dir := range []string{streamDir, measureDir, propertyDir} {
+ for _, dir := range []string{streamDir, measureDir, propertyDir,
traceDir} {
catalog := filepath.Base(dir)
fp := filepath.Join(dir, fmt.Sprintf("%s-time-dir", catalog))
if _, err := os.Stat(fp); !os.IsNotExist(err) {
diff --git a/banyand/trace/svc_standalone.go b/banyand/trace/svc_standalone.go
index 3484605e..085ba984 100644
--- a/banyand/trace/svc_standalone.go
+++ b/banyand/trace/svc_standalone.go
@@ -19,12 +19,15 @@ package trace
import (
"context"
+ "fmt"
"path"
"path/filepath"
"strings"
+ "sync"
"time"
"github.com/pkg/errors"
+ "go.uber.org/multierr"
"github.com/apache/skywalking-banyandb/api/common"
"github.com/apache/skywalking-banyandb/api/data"
@@ -35,6 +38,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/protector"
"github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/fs"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/run"
@@ -143,7 +147,7 @@ func (s *standalone) PreRun(ctx context.Context) error {
s.schemaRepo = newSchemaRepo(s.dataPath, s, node.Labels)
// Initialize snapshot directory
- s.snapshotDir = filepath.Join(s.dataPath, "snapshot")
+ s.snapshotDir = filepath.Join(path, "snapshots")
// Initialize disk monitor for forced retention
s.diskMonitor = storage.NewDiskMonitor(s, s.retentionConfig, s.omr)
@@ -155,6 +159,10 @@ func (s *standalone) PreRun(ctx context.Context) error {
if err != nil {
return err
}
+ err = s.pipeline.Subscribe(data.TopicSnapshot,
&standaloneSnapshotListener{s: s})
+ if err != nil {
+ return err
+ }
s.pipeline.RegisterChunkedSyncHandler(data.TopicTracePartSync,
setUpChunkedSyncCallback(s.l, &s.schemaRepo))
err = s.pipeline.Subscribe(data.TopicTraceSidxSeriesWrite,
setUpSidxSeriesIndexCallback(s.l, &s.schemaRepo))
if err != nil {
@@ -273,6 +281,81 @@ func (s *standalone) GetServiceName() string {
return s.Name()
}
+func (s *standalone) takeGroupSnapshot(dstDir string, groupName string) error {
+ group, ok := s.schemaRepo.LoadGroup(groupName)
+ if !ok {
+ return errors.Errorf("group %s not found", groupName)
+ }
+ db := group.SupplyTSDB()
+ if db == nil {
+ return errors.Errorf("group %s has no tsdb",
group.GetSchema().Metadata.Name)
+ }
+ tsdb := db.(storage.TSDB[*tsTable, option])
+ if err := tsdb.TakeFileSnapshot(dstDir); err != nil {
+ return errors.WithMessagef(err, "snapshot %s fail to take file
snapshot for group %s", dstDir, group.GetSchema().Metadata.Name)
+ }
+ return nil
+}
+
+type standaloneSnapshotListener struct {
+ *bus.UnImplementedHealthyListener
+ s *standalone
+ snapshotSeq uint64
+ snapshotMux sync.Mutex
+}
+
+func (d *standaloneSnapshotListener) Rev(ctx context.Context, message
bus.Message) bus.Message {
+ groups := message.Data().([]*databasev1.SnapshotRequest_Group)
+ var gg []resourceSchema.Group
+ if len(groups) == 0 {
+ gg = d.s.schemaRepo.LoadAllGroups()
+ } else {
+ for _, g := range groups {
+ if g.Catalog != commonv1.Catalog_CATALOG_TRACE {
+ continue
+ }
+ group, ok := d.s.schemaRepo.LoadGroup(g.Group)
+ if !ok {
+ continue
+ }
+ gg = append(gg, group)
+ }
+ }
+ if len(gg) == 0 {
+ return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), nil)
+ }
+ d.snapshotMux.Lock()
+ defer d.snapshotMux.Unlock()
+ storage.DeleteStaleSnapshots(d.s.snapshotDir, d.s.maxFileSnapshotNum,
d.s.lfs)
+ sn := d.snapshotName()
+ var err error
+ for _, g := range gg {
+ select {
+ case <-ctx.Done():
+ return
bus.NewMessage(bus.MessageID(time.Now().UnixNano()), nil)
+ default:
+ }
+               if errGroup :=
d.s.takeGroupSnapshot(filepath.Join(d.s.snapshotDir, sn,
g.GetSchema().Metadata.Name), g.GetSchema().Metadata.Name); errGroup != nil {
+ d.s.l.Error().Err(errGroup).Str("group",
g.GetSchema().Metadata.Name).Msg("fail to take group snapshot")
+ err = multierr.Append(err, errGroup)
+ continue
+ }
+ }
+ snp := &databasev1.Snapshot{
+ Name: sn,
+ Catalog: commonv1.Catalog_CATALOG_TRACE,
+ }
+ if err != nil {
+ snp.Error = err.Error()
+ }
+ return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), snp)
+}
+
+func (d *standaloneSnapshotListener) snapshotName() string {
+ d.snapshotSeq++
+ return fmt.Sprintf("%s-%08X",
time.Now().UTC().Format("20060102150405"), d.snapshotSeq)
+}
+
// NewService returns a new service.
func NewService(metadata metadata.Repo, pipeline queue.Server, omr
observability.MetricsRegistry, pm protector.Memory) (Service, error) {
return &standalone{
diff --git a/docs/operation/backup.md b/docs/operation/backup.md
index fa73491d..5a448904 100644
--- a/docs/operation/backup.md
+++ b/docs/operation/backup.md
@@ -11,6 +11,7 @@ The backup tool performs the following operations:
- **Stream Catalog:** Uses the `stream-root-path`.
- **Measure Catalog:** Uses the `measure-root-path`.
- **Property Catalog:** Uses the `property-root-path`.
+ - **Trace Catalog:** Uses the `trace-root-path`.
- Computes a time-based directory name (formatted as daily or hourly).
- Uploads files that are not found in the remote storage to the specified
destination.
- Deletes orphaned files in the remote destination that no longer exist
locally.
@@ -28,7 +29,7 @@ Before running the backup tool, ensure you have:
- Azure Blob Storage: Azure URLs with account credentials
- Google Cloud Storage: GCS URLs with service account
- Necessary access rights for writing to the destination.
-- Sufficient permissions to access the snapshots directories for **Stream**,
**Measure**, and **Property** catalogs on the data node.
+- Sufficient permissions to access the snapshots directories for **Stream**,
**Measure**, **Property**, **Trace** catalogs on the data node.
## Command-Line Usage
@@ -96,32 +97,33 @@ When a schedule is provided, the tool:
## Detailed Options
-| Flag | Description
| Default Value |
-| ------------------- |
-----------------------------------------------------------------------------------------
| --------------------- |
-| `--grpc-addr` | gRPC address of the data node.
| `127.0.0.1:17912` |
-| `--enable-tls` | Enable TLS for the gRPC connection.
| `false` |
-| `--insecure` | Skip server certificate verification.
| `false` |
-| `--cert` | Path to the gRPC server certificate.
| _empty_ |
-| `--dest` | Destination URL for backup data (`file:///`, `s3://`,
`azure://`, `gs://`) | _required_ |
-| `--stream-root-path`| Root directory for the stream catalog snapshots.
| `/tmp` |
-| `--measure-root-path`| Root directory for the measure catalog snapshots.
| `/tmp` |
-| `--property-root-path`| Root directory for the property catalog snapshots.
| `/tmp` |
-| `--time-style` | Directory naming style based on time (`daily` or
`hourly`) | `daily` |
-| `--schedule` | Schedule expression for periodic backup. Options:
`@yearly`, `@monthly`, `@weekly`, `@daily`, `@hourly`, `@every <duration>` |
_empty_ |
-| `--logging-level` | Root logging level (`debug`, `info`, `warn`, `error`)
| `info` |
-| `--logging-env` | Logging environment (`dev` or `prod`)
| `prod` |
-| **AWS S3 specific** | | |
-| `--s3-profile` | AWS profile name
| _empty_ |
-| `--s3-config-file` | Path to the AWS config file
| _empty_ |
-| `--s3-credential-file`| Path to the AWS credential file
| _empty_ |
-| `--s3-storage-class`| AWS S3 storage class for uploaded objects
| _empty_ |
-| `--s3-checksum-algorithm`| Checksum algorithm when uploading to S3
| _empty_ |
-| **Azure Blob specific** | | |
-| `--azure-account-name`| Azure storage account name
| _empty_ |
-| `--azure-account-key`| Azure storage account key
| _empty_ |
-| `--azure-sas-token` | Azure SAS token (alternative to account key)
| _empty_ |
-| `--azure-endpoint` | Azure blob service endpoint
| _empty_ |
-| **Google Cloud Storage specific** | | |
-| `--gcp-service-account-file`| Path to the GCP service account JSON file
| _empty_ |
+| Flag | Description
| Default Value |
+|-----------------------------------|------------------------------------------------------------------------------------------------------------------------------|-------------------|
+| `--grpc-addr` | gRPC address of the data node.
| `127.0.0.1:17912` |
+| `--enable-tls` | Enable TLS for the gRPC connection.
| `false` |
+| `--insecure` | Skip server certificate verification.
| `false` |
+| `--cert` | Path to the gRPC server certificate.
| _empty_ |
+| `--dest` | Destination URL for backup data
(`file:///`, `s3://`, `azure://`, `gs://`)
| _required_ |
+| `--stream-root-path` | Root directory for the stream catalog
snapshots.
| `/tmp` |
+| `--measure-root-path` | Root directory for the measure catalog
snapshots.
| `/tmp` |
+| `--property-root-path` | Root directory for the property catalog
snapshots.
| `/tmp` |
+| `--trace-root-path` | Root directory for the trace catalog
snapshots.
| `/tmp` |
+| `--time-style` | Directory naming style based on time
(`daily` or `hourly`)
| `daily` |
+| `--schedule` | Schedule expression for periodic backup.
Options: `@yearly`, `@monthly`, `@weekly`, `@daily`, `@hourly`, `@every
<duration>` | _empty_ |
+| `--logging-level` | Root logging level (`debug`, `info`,
`warn`, `error`)
| `info` |
+| `--logging-env` | Logging environment (`dev` or `prod`)
| `prod` |
+| **AWS S3 specific** |
| |
+| `--s3-profile` | AWS profile name
| _empty_ |
+| `--s3-config-file` | Path to the AWS config file
| _empty_ |
+| `--s3-credential-file` | Path to the AWS credential file
| _empty_ |
+| `--s3-storage-class` | AWS S3 storage class for uploaded
objects
| _empty_ |
+| `--s3-checksum-algorithm` | Checksum algorithm when uploading to S3
| _empty_ |
+| **Azure Blob specific** |
| |
+| `--azure-account-name` | Azure storage account name
| _empty_ |
+| `--azure-account-key` | Azure storage account key
| _empty_ |
+| `--azure-sas-token` | Azure SAS token (alternative to account
key)
| _empty_ |
+| `--azure-endpoint` | Azure blob service endpoint
| _empty_ |
+| **Google Cloud Storage specific** |
| |
+| `--gcp-service-account-file` | Path to the GCP service account JSON
file
| _empty_ |
This guide should provide you with the necessary steps and information to
effectively use the backup tool for your data backup operations.
diff --git a/docs/operation/restore.md b/docs/operation/restore.md
index c0c7dfc9..013afadc 100644
--- a/docs/operation/restore.md
+++ b/docs/operation/restore.md
@@ -11,7 +11,7 @@ This document explains how to use the backup and restore
command line tools for
Reads the *timedir* files from local catalog directories and uses the
timestamp within to determine which remote backup snapshot should be applied.
Once restoration is successful, it removes all *timedir* files to avoid
repeated restore operations, especially during unexpected scenarios.
- **Timedir Utility:**
- Provides commands to create, list, read, and delete *time-dir* marker files
for each catalog (e.g., _stream_, _measure_, and _property_).
+ Provides commands to create, list, read, and delete *time-dir* marker files
for each catalog (e.g., _stream_, _measure_, _property_, and _trace_).
## Restore Workflow
@@ -29,6 +29,7 @@ backup run \
--stream-root-path /data \
--measure-root-path /data \
--property-root-path /data \
+ --trace-root-path /data \
--dest file:///backups \
--time-style daily
```
@@ -36,7 +37,7 @@ backup run \
**Notes:**
- `--grpc-addr`: gRPC address for the data node.
-- `--stream-root-path`, `--measure-root-path`, `--property-root-path`: Local
directories for the respective catalogs.
+- `--stream-root-path`, `--measure-root-path`, `--property-root-path`,
`--trace-root-path`: Local directories for the respective catalogs.
- `--dest`: Remote destination URL where backups will be stored (e.g., local
filesystem path with the `file://` scheme).
- `--time-style`: Defines the time directory style, such as `daily` or
`hourly`.
@@ -72,10 +73,11 @@ Before undertaking a pod or service restart, an
administrator may create timedir
restore timedir create 2025-02-12 \
--stream-root /data \
--measure-root /data \
- --property-root /data
+ --property-root /data \
+ --trace-root /data
```
-This command writes the specified timestamp (e.g., 2025-02-12) into files
named `/data/stream/time-dir`, `/data/measure/time-dir`, and
`/data/property/time-dir` in the designated root directories.
+This command writes the specified timestamp (e.g., 2025-02-12) into files
named `/data/stream/time-dir`, `/data/measure/time-dir`,
`/data/property/time-dir`, and `/data/trace/time-dir` in the designated root
directories.
#### Verifying Timedir Files
@@ -85,7 +87,8 @@ Ensure that the timedir files have been created with the
correct content by read
restore timedir read \
--stream-root /data \
--measure-root /data \
- --property-root /data
+ --property-root /data \
+ --trace-root /data
```
The output should display the timedir content for each catalog.
@@ -103,7 +106,8 @@ restore run \
--source file:///backups \
--stream-root-path /data \
--measure-root-path /data \
- --property-root-path /data
+ --property-root-path /data \
+ --trace-root-path /data
```
**Key Points:**
@@ -124,7 +128,8 @@ restore run \
--s3-profile "my-profile" \
--stream-root-path /data \
--measure-root-path /data \
- --property-root-path /data
+ --property-root-path /data \
+ --trace-root-path /data
# Google Cloud Storage
restore run \
@@ -132,7 +137,8 @@ restore run \
--gcp-service-account-file "/path/to/service-account.json" \
--stream-root-path /data \
--measure-root-path /data \
- --property-root-path /data
+ --property-root-path /data \
+ --trace-root-path /data
# Azure Blob Storage (using SAS token)
restore run \
@@ -141,7 +147,8 @@ restore run \
--azure-sas-token "mysastoken" \
--stream-root-path /data \
--measure-root-path /data \
- --property-root-path /data
+ --property-root-path /data \
+ --trace-root-path /data
```
## Kubernetes Deployment
diff --git a/test/cases/backup/all.go b/test/cases/backup/all.go
index 87104bba..d06aa780 100644
--- a/test/cases/backup/all.go
+++ b/test/cases/backup/all.go
@@ -48,6 +48,7 @@ var _ = ginkgo.Describe("Backup All", func() {
"--stream-root-path", SharedContext.RootDir,
"--measure-root-path", SharedContext.RootDir,
"--property-root-path", SharedContext.RootDir,
+ "--trace-root-path", SharedContext.RootDir,
"--dest", destURL,
"--time-style", "daily",
}, SharedContext.S3Args...))
@@ -110,6 +111,7 @@ var _ = ginkgo.Describe("Backup All", func() {
"--stream-root", newCatalogDir,
"--measure-root", newCatalogDir,
"--property-root", newCatalogDir,
+ "--trace-root", newCatalogDir,
latestTimedir,
})
createOut := &bytes.Buffer{}
@@ -125,6 +127,7 @@ var _ = ginkgo.Describe("Backup All", func() {
"--stream-root", newCatalogDir,
"--measure-root", newCatalogDir,
"--property-root", newCatalogDir,
+ "--trace-root", newCatalogDir,
})
readOut := &bytes.Buffer{}
readCmd.SetOut(readOut)
@@ -135,7 +138,7 @@ var _ = ginkgo.Describe("Backup All", func() {
gomega.Expect(readResult).To(gomega.ContainSubstring(latestTimedir))
ginkgo.By("Create random files in the data directories
of the new catalog")
- catalogs := []string{"stream", "measure", "property"}
+ catalogs := []string{"stream", "measure", "property",
"trace"}
for _, cat := range catalogs {
dataDir := filepath.Join(newCatalogDir, cat,
"data")
err = os.MkdirAll(dataDir, 0o755)
@@ -154,6 +157,7 @@ var _ = ginkgo.Describe("Backup All", func() {
"--stream-root-path", newCatalogDir,
"--measure-root-path", newCatalogDir,
"--property-root-path", newCatalogDir,
+ "--trace-root-path", newCatalogDir,
}, SharedContext.S3Args...))
restoreOut := &bytes.Buffer{}
restoreCmd.SetOut(restoreOut)
@@ -229,6 +233,7 @@ func clearSnapshotDirs() {
filepath.Join(SharedContext.RootDir, "measure", "snapshots"),
filepath.Join(SharedContext.RootDir, "stream", "snapshots"),
filepath.Join(SharedContext.RootDir, "property", "snapshots"),
+ filepath.Join(SharedContext.RootDir, "trace", "snapshots"),
}
for _, dir := range snpDirs {
diff --git a/test/cases/backup/backup.go b/test/cases/backup/backup.go
index c3a9c53a..589be3e9 100644
--- a/test/cases/backup/backup.go
+++ b/test/cases/backup/backup.go
@@ -55,7 +55,7 @@ var _ = ginkgo.Describe("Backup", func() {
resp, err := client.Snapshot(context.Background(),
&databasev1.SnapshotRequest{})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
gomega.Expect(resp).NotTo(gomega.BeNil())
- gomega.Expect(resp.Snapshots).To(gomega.HaveLen(3))
+ gomega.Expect(resp.Snapshots).To(gomega.HaveLen(4))
catalogNumMap := make(map[commonv1.Catalog]int)
for _, snp := range resp.Snapshots {
var snpDir string
@@ -65,6 +65,8 @@ var _ = ginkgo.Describe("Backup", func() {
snpDir = filepath.Join(SharedContext.RootDir,
"stream", "snapshots")
} else if snp.Catalog ==
commonv1.Catalog_CATALOG_PROPERTY {
snpDir = filepath.Join(SharedContext.RootDir,
"property", "snapshots")
+ } else if snp.Catalog == commonv1.Catalog_CATALOG_TRACE
{
+ snpDir = filepath.Join(SharedContext.RootDir,
"trace", "snapshots")
} else {
ginkgo.Fail("unexpected snapshot catalog")
}
@@ -74,7 +76,7 @@ var _ = ginkgo.Describe("Backup", func() {
resp, err = client.Snapshot(context.Background(),
&databasev1.SnapshotRequest{})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
gomega.Expect(resp).NotTo(gomega.BeNil())
- gomega.Expect(resp.Snapshots).To(gomega.HaveLen(3))
+ gomega.Expect(resp.Snapshots).To(gomega.HaveLen(4))
for _, snp := range resp.Snapshots {
if snp.Catalog == commonv1.Catalog_CATALOG_MEASURE {
measureSnapshotDir :=
filepath.Join(SharedContext.RootDir, "measure", "snapshots")
@@ -88,6 +90,10 @@ var _ = ginkgo.Describe("Backup", func() {
propertySnapshotDir :=
filepath.Join(SharedContext.RootDir, "property", "snapshots")
entries := lfs.ReadDir(propertySnapshotDir)
gomega.Expect(entries).To(gomega.HaveLen(catalogNumMap[snp.GetCatalog()] + 1))
+ } else if snp.Catalog == commonv1.Catalog_CATALOG_TRACE
{
+ traceSnapshotDir :=
filepath.Join(SharedContext.RootDir, "trace", "snapshots")
+ entries := lfs.ReadDir(traceSnapshotDir)
+
gomega.Expect(entries).To(gomega.HaveLen(catalogNumMap[snp.GetCatalog()] + 1))
} else {
ginkgo.Fail("unexpected snapshot catalog")
}
@@ -106,6 +112,7 @@ var _ = ginkgo.Describe("Backup", func() {
"--stream-root-path", SharedContext.RootDir,
"--measure-root-path", SharedContext.RootDir,
"--property-root-path", SharedContext.RootDir,
+ "--trace-root-path", SharedContext.RootDir,
"--dest", destURL,
"--time-style", "daily",
})
@@ -114,9 +121,9 @@ var _ = ginkgo.Describe("Backup", func() {
timeDir := time.Now().Format("2006-01-02")
entries := lfs.ReadDir(filepath.Join(destDir, timeDir))
- gomega.Expect(entries).To(gomega.HaveLen(3))
+ gomega.Expect(entries).To(gomega.HaveLen(4))
for _, entry := range entries {
-
gomega.Expect(entry.Name()).To(gomega.BeElementOf([]string{"stream", "measure",
"property"}))
+
gomega.Expect(entry.Name()).To(gomega.BeElementOf([]string{"stream", "measure",
"property", "trace"}))
}
})
})
diff --git a/test/integration/distributed/backup/common.go
b/test/integration/distributed/backup/common.go
index 6029fdcd..3a9a840c 100644
--- a/test/integration/distributed/backup/common.go
+++ b/test/integration/distributed/backup/common.go
@@ -49,6 +49,7 @@ import (
test_property "github.com/apache/skywalking-banyandb/pkg/test/property"
"github.com/apache/skywalking-banyandb/pkg/test/setup"
test_stream "github.com/apache/skywalking-banyandb/pkg/test/stream"
+ test_trace "github.com/apache/skywalking-banyandb/pkg/test/trace"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
test_cases "github.com/apache/skywalking-banyandb/test/cases"
casesbackup "github.com/apache/skywalking-banyandb/test/cases/backup"
@@ -105,6 +106,8 @@ func InitializeTestSuite() (*CommonTestVars, error) {
gomega.Expect(err).NotTo(gomega.HaveOccurred())
err = test_measure.PreloadSchema(ctx, schemaRegistry)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ err = test_trace.PreloadSchema(ctx, schemaRegistry)
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
err = test_property.PreloadSchema(ctx, schemaRegistry)
gomega.Expect(err).NotTo(gomega.HaveOccurred())