This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new ee009edd Support lifecycle to trace data (#852)
ee009edd is described below
commit ee009edd711642197c7d8579669611fab9811f52
Author: mrproliu <[email protected]>
AuthorDate: Thu Nov 27 10:13:23 2025 +0800
Support lifecycle to trace data (#852)
* Support lifecycle to trace data
---------
Co-authored-by: 吴晟 Wu Sheng <[email protected]>
Co-authored-by: Gao Hongtao <[email protected]>
---
api/data/data.go | 4 +
api/data/trace.go | 9 +
api/proto/banyandb/trace/v1/rpc.proto | 11 +
banyand/backup/lifecycle/migration_integration.go | 70 +++
banyand/backup/lifecycle/progress.go | 192 ++++++-
banyand/backup/lifecycle/service.go | 103 +++-
banyand/backup/lifecycle/steps.go | 16 +-
.../backup/lifecycle/trace_migration_visitor.go | 593 +++++++++++++++++++++
banyand/internal/sidx/part.go | 136 +++++
banyand/internal/storage/tsdb.go | 2 +
banyand/measure/snapshot.go | 124 ++++-
banyand/measure/svc_data.go | 11 +
banyand/queue/sub/segments.go | 28 +
banyand/queue/sub/server.go | 2 +
banyand/stream/snapshot.go | 123 ++++-
banyand/trace/part.go | 104 ++++
banyand/trace/part_metadata.go | 24 +
banyand/trace/snapshot.go | 2 +-
banyand/trace/svc_standalone.go | 150 +++++-
banyand/trace/visitor.go | 57 ++
banyand/trace/write_data.go | 123 ++++-
docs/api-reference.md | 35 ++
pkg/schema/cache.go | 2 +
pkg/test/trace/etcd.go | 12 +-
.../testdata/groups_stages/test-trace-group.json | 34 ++
.../testdata/groups_stages/test-trace-updated.json | 34 ++
.../testdata/groups_stages/zipkin-trace-group.json | 34 ++
test/cases/lifecycle/lifecycle.go | 17 +
.../distributed/lifecycle/lifecycle_suite_test.go | 8 +-
29 files changed, 2024 insertions(+), 36 deletions(-)
diff --git a/api/data/data.go b/api/data/data.go
index 5d142277..956c9e33 100644
--- a/api/data/data.go
+++ b/api/data/data.go
@@ -52,6 +52,7 @@ var (
TopicTraceWrite.String(): TopicTraceWrite,
TopicTraceQuery.String(): TopicTraceQuery,
TopicTracePartSync.String(): TopicTracePartSync,
+ TopicTraceSeriesSync.String(): TopicTraceSeriesSync,
TopicTraceSidxSeriesWrite.String(): TopicTraceSidxSeriesWrite,
}
@@ -121,6 +122,9 @@ var (
TopicTracePartSync: func() proto.Message {
return nil
},
+ TopicTraceSeriesSync: func() proto.Message {
+ return nil
+ },
TopicTraceSidxSeriesWrite: func() proto.Message {
return nil
},
diff --git a/api/data/trace.go b/api/data/trace.go
index ee309058..3918e3f7 100644
--- a/api/data/trace.go
+++ b/api/data/trace.go
@@ -58,6 +58,15 @@ var TracePartSyncKindVersion = common.KindVersion{
// TopicTracePartSync is the part sync topic.
var TopicTracePartSync = bus.BiTopic(TracePartSyncKindVersion.String())
+// TraceSeriesSyncKindVersion is the version tag of series sync kind.
+var TraceSeriesSyncKindVersion = common.KindVersion{
+ Version: "v1",
+ Kind: "trace-series-sync",
+}
+
+// TopicTraceSeriesSync is the series sync topic.
+var TopicTraceSeriesSync = bus.BiTopic(TraceSeriesSyncKindVersion.String())
+
// TraceSidxSeriesWriteKindVersion is the version tag of trace sidx series write kind.
var TraceSidxSeriesWriteKindVersion = common.KindVersion{
Version: "v1",
diff --git a/api/proto/banyandb/trace/v1/rpc.proto b/api/proto/banyandb/trace/v1/rpc.proto
index 0c889388..62e04ada 100644
--- a/api/proto/banyandb/trace/v1/rpc.proto
+++ b/api/proto/banyandb/trace/v1/rpc.proto
@@ -28,6 +28,15 @@ option go_package = "github.com/apache/skywalking-banyandb/api/proto/banyandb/tr
option java_package = "org.apache.skywalking.banyandb.trace.v1";
option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_swagger) = {base_path: "/api"};
+message DeleteExpiredSegmentsRequest {
+ string group = 1;
+ repeated string segment_suffixes = 2;
+}
+
+message DeleteExpiredSegmentsResponse {
+ int64 deleted = 1;
+}
+
service TraceService {
rpc Query(QueryRequest) returns (QueryResponse) {
option (google.api.http) = {
@@ -37,4 +46,6 @@ service TraceService {
}
rpc Write(stream WriteRequest) returns (stream WriteResponse);
+
+ rpc DeleteExpiredSegments(DeleteExpiredSegmentsRequest) returns (DeleteExpiredSegmentsResponse);
}
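For reference, the new RPC can be exercised with an ordinary generated client. Below is a minimal, hypothetical sketch (standalone program, insecure local connection; the address, group name, and segment suffixes are made up for illustration):

    package main

    import (
    	"context"
    	"fmt"
    	"log"

    	tracev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
    	"google.golang.org/grpc"
    	"google.golang.org/grpc/credentials/insecure"
    )

    func main() {
    	// Dial a BanyanDB gRPC endpoint (address is illustrative).
    	conn, err := grpc.NewClient("127.0.0.1:17912", grpc.WithTransportCredentials(insecure.NewCredentials()))
    	if err != nil {
    		log.Fatalf("dial: %v", err)
    	}
    	defer conn.Close()

    	client := tracev1.NewTraceServiceClient(conn)
    	// Ask the trace service to drop the segments whose suffixes were already migrated.
    	resp, err := client.DeleteExpiredSegments(context.Background(), &tracev1.DeleteExpiredSegmentsRequest{
    		Group:           "test-trace-group",
    		SegmentSuffixes: []string{"20251120", "20251121"},
    	})
    	if err != nil {
    		log.Fatalf("delete expired segments: %v", err)
    	}
    	fmt.Printf("deleted %d segments\n", resp.Deleted)
    }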
diff --git a/banyand/backup/lifecycle/migration_integration.go b/banyand/backup/lifecycle/migration_integration.go
index b36ae1e5..8036f95d 100644
--- a/banyand/backup/lifecycle/migration_integration.go
+++ b/banyand/backup/lifecycle/migration_integration.go
@@ -22,6 +22,7 @@ import (
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/banyand/measure"
"github.com/apache/skywalking-banyandb/banyand/stream"
+ "github.com/apache/skywalking-banyandb/banyand/trace"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
@@ -168,3 +169,72 @@ func (pcv *measurePartCountVisitor) VisitPart(_ *timestamp.TimeRange, _ common.S
pcv.partCount++
return nil
}
+
+// migrateTraceWithFileBasedAndProgress performs file-based trace migration with progress tracking.
+func migrateTraceWithFileBasedAndProgress(tsdbRootPath string, timeRange timestamp.TimeRange, group *GroupConfig,
+ logger *logger.Logger, progress *Progress, chunkSize int,
+) ([]string, error) {
+ // Convert segment interval to IntervalRule using storage.MustToIntervalRule
+ segmentIntervalRule := storage.MustToIntervalRule(group.SegmentInterval)
+
+ // Get target stage configuration
+ targetStageInterval := getTargetStageInterval(group)
+
+ // Count total shards before starting migration
+ totalShards, segmentSuffixes, err := countTraceShards(tsdbRootPath, timeRange, segmentIntervalRule)
+ if err != nil {
+ logger.Warn().Err(err).Msg("failed to count trace parts, proceeding without part count")
+ } else {
+ logger.Info().Int("total_shards", totalShards).Strs("segment_suffixes", segmentSuffixes).
+ Msg("counted trace parts for progress tracking")
+ }
+
+ // Create file-based migration visitor with progress tracking and target stage interval
+ visitor := newTraceMigrationVisitor(
+ group.Group, group.TargetShardNum, group.TargetReplicas, group.NodeSelector, group.QueueClient,
+ logger, progress, chunkSize, targetStageInterval,
+ )
+ defer visitor.Close()
+
+ // Set the total part count for progress tracking
+ if totalShards > 0 {
+ visitor.SetTraceShardCount(totalShards)
+ }
+
+ // Use the existing VisitTracesInTimeRange function with our file-based visitor
+ _, err = trace.VisitTracesInTimeRange(tsdbRootPath, timeRange, visitor, segmentIntervalRule)
+ if err != nil {
+ return nil, err
+ }
+ return segmentSuffixes, nil
+}
+
+// countTraceShards counts the total number of shards in the given time range.
+func countTraceShards(tsdbRootPath string, timeRange timestamp.TimeRange, segmentInterval storage.IntervalRule) (int, []string, error) {
+ // Create a simple visitor to count shards
+ shardCounter := &traceShardsCountVisitor{}
+
+ // Use the existing VisitTracesInTimeRange function to count parts
+ segmentSuffixes, err := trace.VisitTracesInTimeRange(tsdbRootPath, timeRange, shardCounter, segmentInterval)
+ if err != nil {
+ return 0, nil, err
+ }
+
+ return shardCounter.shardCount, segmentSuffixes, nil
+}
+
+// traceShardsCountVisitor is a simple visitor that counts trace shards.
+type traceShardsCountVisitor struct {
+ shardCount int
+}
+
+// VisitSeries implements trace.Visitor.
+func (pcv *traceShardsCountVisitor) VisitSeries(_ *timestamp.TimeRange, _ string, _ []common.ShardID) error {
+ return nil
+}
+
+// VisitShard implements trace.Visitor.
+func (pcv *traceShardsCountVisitor) VisitShard(_ *timestamp.TimeRange, _ common.ShardID, _ string) error {
+ pcv.shardCount++
+ return nil
+}
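Both visitors above (this counting visitor and the migration visitor added later in this commit) implement the trace.Visitor contract. That interface lives in banyand/trace and is not part of this diff; judging from the method sets used here, it is roughly:

    // Approximate shape of the visitor contract consumed by trace.VisitTracesInTimeRange,
    // reconstructed from the call sites in this change rather than copied from banyand/trace.
    type Visitor interface {
    	// VisitSeries is called once per segment series-index directory.
    	VisitSeries(segmentTR *timestamp.TimeRange, seriesIndexPath string, shardIDs []common.ShardID) error
    	// VisitShard is called once per shard directory inside a segment.
    	VisitShard(segmentTR *timestamp.TimeRange, shardID common.ShardID, shardPath string) error
    }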
diff --git a/banyand/backup/lifecycle/progress.go b/banyand/backup/lifecycle/progress.go
index 7baf214b..99f7083d 100644
--- a/banyand/backup/lifecycle/progress.go
+++ b/banyand/backup/lifecycle/progress.go
@@ -33,6 +33,7 @@ type Progress struct {
CompletedGroups map[string]bool `json:"completed_groups"`
DeletedStreamGroups map[string]bool `json:"deleted_stream_groups"`
DeletedMeasureGroups map[string]bool `json:"deleted_measure_groups"`
+ DeletedTraceGroups map[string]bool `json:"deleted_trace_groups"`
CompletedStreamParts map[string]map[string]map[common.ShardID]map[uint64]bool `json:"completed_stream_parts"`
StreamPartErrors map[string]map[string]map[common.ShardID]map[uint64]string `json:"stream_part_errors"`
CompletedStreamSeries map[string]map[string]map[common.ShardID]bool `json:"completed_stream_series"`
@@ -54,10 +55,20 @@ type Progress struct {
MeasureSeriesErrors map[string]map[string]map[common.ShardID]string `json:"measure_series_errors"`
MeasureSeriesCounts map[string]int `json:"measure_series_counts"`
MeasureSeriesProgress map[string]int `json:"measure_series_progress"`
- progressFilePath string `json:"-"`
- SnapshotStreamDir string `json:"snapshot_stream_dir"`
- SnapshotMeasureDir string `json:"snapshot_measure_dir"`
- mu sync.Mutex `json:"-"`
+ // Trace part-specific progress tracking
+ CompletedTraceShards map[string]map[string]map[common.ShardID]bool `json:"completed_trace_shards"`
+ TraceShardErrors map[string]map[string]map[common.ShardID]string `json:"trace_shard_errors"`
+ TraceShardCounts map[string]int `json:"trace_shard_counts"`
+ TraceShardProgress map[string]int `json:"trace_shard_progress"`
+ CompletedTraceSeries map[string]map[string]map[common.ShardID]bool `json:"completed_trace_series"`
+ TraceSeriesErrors map[string]map[string]map[common.ShardID]string `json:"trace_series_errors"`
+ TraceSeriesCounts map[string]int `json:"trace_series_counts"`
+ TraceSeriesProgress map[string]int `json:"trace_series_progress"`
+ progressFilePath string `json:"-"`
+ SnapshotStreamDir string `json:"snapshot_stream_dir"`
+ SnapshotMeasureDir string `json:"snapshot_measure_dir"`
+ SnapshotTraceDir string `json:"snapshot_trace_dir"`
+ mu sync.Mutex `json:"-"`
}
// AllGroupsNotFullyCompleted find is there have any group not fully completed.
@@ -80,6 +91,7 @@ func NewProgress(path string, l *logger.Logger) *Progress {
CompletedGroups: make(map[string]bool),
DeletedStreamGroups: make(map[string]bool),
DeletedMeasureGroups: make(map[string]bool),
+ DeletedTraceGroups: make(map[string]bool),
CompletedStreamParts: make(map[string]map[string]map[common.ShardID]map[uint64]bool),
StreamPartErrors: make(map[string]map[string]map[common.ShardID]map[uint64]string),
StreamPartCounts: make(map[string]int),
@@ -100,6 +112,14 @@ func NewProgress(path string, l *logger.Logger) *Progress {
MeasureSeriesErrors: make(map[string]map[string]map[common.ShardID]string),
MeasureSeriesCounts: make(map[string]int),
MeasureSeriesProgress: make(map[string]int),
+ CompletedTraceShards: make(map[string]map[string]map[common.ShardID]bool),
+ TraceShardErrors: make(map[string]map[string]map[common.ShardID]string),
+ TraceShardCounts: make(map[string]int),
+ TraceShardProgress: make(map[string]int),
+ CompletedTraceSeries: make(map[string]map[string]map[common.ShardID]bool),
+ TraceSeriesErrors: make(map[string]map[string]map[common.ShardID]string),
+ TraceSeriesCounts: make(map[string]int),
+ TraceSeriesProgress: make(map[string]int),
progressFilePath: path,
logger: l,
}
@@ -697,3 +717,167 @@ func (p *Progress) GetMeasureSeriesProgress(group string) int {
}
return 0
}
+
+// MarkTraceGroupDeleted marks a trace group as deleted.
+func (p *Progress) MarkTraceGroupDeleted(group string) {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ p.DeletedTraceGroups[group] = true
+}
+
+// IsTraceGroupDeleted checks if a trace group has been deleted.
+func (p *Progress) IsTraceGroupDeleted(group string) bool {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ return p.DeletedTraceGroups[group]
+}
+
+// MarkTraceShardCompleted marks a specific shard of a trace as completed.
+func (p *Progress) MarkTraceShardCompleted(group string, segmentID string, shardID common.ShardID) {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ if p.CompletedTraceShards[group] == nil {
+ p.CompletedTraceShards[group] = make(map[string]map[common.ShardID]bool)
+ }
+ if p.CompletedTraceShards[group][segmentID] == nil {
+ p.CompletedTraceShards[group][segmentID] = make(map[common.ShardID]bool)
+ }
+ p.CompletedTraceShards[group][segmentID][shardID] = true
+
+ // Increment progress
+ p.TraceShardProgress[group]++
+}
+
+// IsTraceShardCompleted checks if a specific part of a trace has been completed.
+func (p *Progress) IsTraceShardCompleted(group string, segmentID string, shardID common.ShardID) bool {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ if segments, ok := p.CompletedTraceShards[group]; ok {
+ if shards, ok := segments[segmentID]; ok {
+ return shards[shardID]
+ }
+ }
+ return false
+}
+
+// MarkTraceShardError marks an error for a specific part of a trace.
+func (p *Progress) MarkTraceShardError(group string, segmentID string, shardID common.ShardID, errorMsg string) {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ if p.TraceShardErrors[group] == nil {
+ p.TraceShardErrors[group] = make(map[string]map[common.ShardID]string)
+ }
+ if p.TraceShardErrors[group][segmentID] == nil {
+ p.TraceShardErrors[group][segmentID] = make(map[common.ShardID]string)
+ }
+ p.TraceShardErrors[group][segmentID][shardID] = errorMsg
+}
+
+// SetTraceShardCount sets the total number of shards for the current trace.
+func (p *Progress) SetTraceShardCount(group string, totalShards int) {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ p.TraceShardCounts[group] = totalShards
+}
+
+// GetTraceShards gets the total number of shards for the current trace.
+func (p *Progress) GetTraceShards(group string) int {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ if count, ok := p.TraceShardCounts[group]; ok {
+ return count
+ }
+ return 0
+}
+
+// GetTraceShardProgress gets the number of completed shards for the current trace.
+func (p *Progress) GetTraceShardProgress(group string) int {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ if progress, ok := p.TraceShardProgress[group]; ok {
+ return progress
+ }
+ return 0
+}
+
+// MarkTraceSeriesCompleted marks a specific series segment of a trace as completed.
+func (p *Progress) MarkTraceSeriesCompleted(group string, segmentID string, shardID common.ShardID) {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ if p.CompletedTraceSeries[group] == nil {
+ p.CompletedTraceSeries[group] = make(map[string]map[common.ShardID]bool)
+ }
+ if p.CompletedTraceSeries[group][segmentID] == nil {
+ p.CompletedTraceSeries[group][segmentID] = make(map[common.ShardID]bool)
+ }
+ p.CompletedTraceSeries[group][segmentID][shardID] = true
+
+ // Increment progress
+ p.TraceSeriesProgress[group]++
+}
+
+// IsTraceSeriesCompleted checks if a specific series segment of a trace has been completed.
+func (p *Progress) IsTraceSeriesCompleted(group string, segmentID string, shardID common.ShardID) bool {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ if segments, ok := p.CompletedTraceSeries[group]; ok {
+ if shards, ok := segments[segmentID]; ok {
+ return shards[shardID]
+ }
+ }
+ return false
+}
+
+// MarkTraceSeriesError marks an error for a specific series segment of a trace.
+func (p *Progress) MarkTraceSeriesError(group string, segmentID string, shardID common.ShardID, errorMsg string) {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ if p.TraceSeriesErrors[group] == nil {
+ p.TraceSeriesErrors[group] = make(map[string]map[common.ShardID]string)
+ }
+ if p.TraceSeriesErrors[group][segmentID] == nil {
+ p.TraceSeriesErrors[group][segmentID] = make(map[common.ShardID]string)
+ }
+ p.TraceSeriesErrors[group][segmentID][shardID] = errorMsg
+}
+
+// SetTraceSeriesCount sets the total number of series segments for the current trace.
+func (p *Progress) SetTraceSeriesCount(group string, totalSegments int) {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ p.TraceSeriesCounts[group] = totalSegments
+}
+
+// GetTraceSeriesCount gets the total number of series segments for the current trace.
+func (p *Progress) GetTraceSeriesCount(group string) int {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ if count, ok := p.TraceSeriesCounts[group]; ok {
+ return count
+ }
+ return 0
+}
+
+// GetTraceSeriesProgress gets the number of completed series segments for the current trace.
+func (p *Progress) GetTraceSeriesProgress(group string) int {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ if progress, ok := p.TraceSeriesProgress[group]; ok {
+ return progress
+ }
+ return 0
+}
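The new trace maps mirror the existing stream/measure bookkeeping: callers check completion before doing work, record errors, and mark success so an interrupted run can resume from the progress file. A hedged usage sketch (the identifiers and the migrate callback are illustrative, assumed to live in the lifecycle package):

    // Illustrative resume-safe wrapper around one trace shard migration.
    func migrateShardOnce(p *Progress, group, segmentID string, shardID common.ShardID, migrate func() error) error {
    	if p.IsTraceShardCompleted(group, segmentID, shardID) {
    		return nil // finished in a previous run; skip on resume
    	}
    	if err := migrate(); err != nil {
    		p.MarkTraceShardError(group, segmentID, shardID, err.Error())
    		return err
    	}
    	p.MarkTraceShardCompleted(group, segmentID, shardID)
    	return nil
    }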
diff --git a/banyand/backup/lifecycle/service.go b/banyand/backup/lifecycle/service.go
index 243893f0..c001d72f 100644
--- a/banyand/backup/lifecycle/service.go
+++ b/banyand/backup/lifecycle/service.go
@@ -36,6 +36,7 @@ import (
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+ tracev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
"github.com/apache/skywalking-banyandb/banyand/backup/snapshot"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/banyand/metadata"
@@ -61,6 +62,7 @@ type lifecycleService struct {
sch *timestamp.Scheduler
measureRoot string
streamRoot string
+ traceRoot string
progressFilePath string
reportDir string
schedule string
@@ -90,6 +92,7 @@ func (l *lifecycleService) FlagSet() *run.FlagSet {
flagS.BoolVar(&l.insecure, "insecure", false, "Skip server certificate verification")
flagS.StringVar(&l.cert, "cert", "", "Path to the gRPC server certificate")
flagS.StringVar(&l.streamRoot, "stream-root-path", "/tmp", "Root directory for stream catalog")
+ flagS.StringVar(&l.traceRoot, "trace-root-path", "/tmp", "Root directory for trace catalog")
flagS.StringVar(&l.measureRoot, "measure-root-path", "/tmp", "Root directory for measure catalog")
flagS.StringVar(&l.progressFilePath, "progress-file", "/tmp/lifecycle-progress.json", "Path to store progress for crash recovery")
flagS.StringVar(&l.reportDir, "report-dir", "/tmp/lifecycle-reports", "Directory to store migration reports")
@@ -174,12 +177,12 @@ func (l *lifecycleService) action() error {
}
// Pass progress to getSnapshots
- streamDir, measureDir, err := l.getSnapshots(groups, progress)
+ streamDir, measureDir, traceDir, err := l.getSnapshots(groups, progress)
if err != nil {
l.l.Error().Err(err).Msg("failed to get snapshots")
return err
}
- if streamDir == "" && measureDir == "" {
+ if streamDir == "" && measureDir == "" && traceDir == "" {
l.l.Warn().Msg("no snapshots found, skipping lifecycle migration")
l.generateReport(progress)
return nil
@@ -187,6 +190,7 @@ func (l *lifecycleService) action() error {
l.l.Info().
Str("stream_snapshot", streamDir).
Str("measure_snapshot", measureDir).
+ Str("trace_snapshot", traceDir).
Msg("created snapshots")
progress.Save(l.progressFilePath, l.l)
@@ -215,9 +219,12 @@ func (l *lifecycleService) action() error {
}
l.processMeasureGroup(ctx, g, measureDir, nodes, labels, progress)
case commonv1.Catalog_CATALOG_TRACE:
- progress.MarkGroupCompleted(g.Metadata.Name)
- l.l.Info().Msgf("group trace not supported, skipping group: %s", g.Metadata.Name)
- continue
+ if traceDir == "" {
+ l.l.Warn().Msgf("trace snapshot directory is not available, skipping group: %s", g.Metadata.Name)
+ progress.MarkGroupCompleted(g.Metadata.Name)
+ continue
+ }
+ l.processTraceGroup(ctx, g, traceDir, nodes, labels, progress)
default:
l.l.Info().Msgf("group catalog: %s doesn't support lifecycle management", g.Catalog)
}
@@ -531,13 +538,16 @@ func (l *lifecycleService) getGroupsToProcess(ctx context.Context, progress *Pro
l.l.Error().Err(err).Msg("failed to list groups")
return nil, err
}
+ allGroupNames := getGroupNames(gg)
groups := make([]*commonv1.Group, 0, len(gg))
for _, g := range gg {
if g.ResourceOpts == nil {
+ l.l.Debug().Msgf("skipping group %s because resource opts is nil", g.Metadata.Name)
continue
}
if len(g.ResourceOpts.Stages) == 0 {
+ l.l.Debug().Msgf("skipping group %s because stages is empty", g.Metadata.Name)
continue
}
if progress.IsGroupCompleted(g.Metadata.Name) {
@@ -546,6 +556,7 @@ func (l *lifecycleService) getGroupsToProcess(ctx context.Context, progress *Pro
}
groups = append(groups, g)
}
+ l.l.Info().Msgf("found groups needs to do lifecycle processing: %v, all groups: %v", getGroupNames(groups), allGroupNames)
return groups, nil
}
@@ -751,3 +762,85 @@ func (l *lifecycleService) deleteExpiredMeasureSegments(ctx context.Context, g *
progress.MarkMeasureGroupDeleted(g.Metadata.Name)
progress.Save(l.progressFilePath, l.l)
}
+
+func (l *lifecycleService) deleteExpiredTraceSegments(ctx context.Context, g *commonv1.Group, segmentSuffixes []string, progress *Progress) {
+ if progress.IsTraceGroupDeleted(g.Metadata.Name) {
+ l.l.Info().Msgf("skipping already deleted trace group segments: %s", g.Metadata.Name)
+ return
+ }
+
+ resp, err := snapshot.Conn(l.gRPCAddr, l.enableTLS, l.insecure, l.cert, func(conn *grpc.ClientConn) (*tracev1.DeleteExpiredSegmentsResponse, error) {
+ client := tracev1.NewTraceServiceClient(conn)
+ return client.DeleteExpiredSegments(ctx, &tracev1.DeleteExpiredSegmentsRequest{
+ Group: g.Metadata.Name,
+ SegmentSuffixes: segmentSuffixes,
+ })
+ })
+ if err != nil {
+ l.l.Error().Err(err).Msgf("failed to delete expired segments in group %s, suffixes: %s", g.Metadata.Name, segmentSuffixes)
+ return
+ }
+
+ l.l.Warn().Msgf("deleted %d expired segments in group %s, suffixes: %s", resp.Deleted, g.Metadata.Name, segmentSuffixes)
+ progress.MarkTraceGroupDeleted(g.Metadata.Name)
+ progress.Save(l.progressFilePath, l.l)
+}
+
+func (l *lifecycleService) processTraceGroup(ctx context.Context, g *commonv1.Group, traceDir string,
+ nodes []*databasev1.Node, labels map[string]string, progress *Progress,
+) {
+ group, err := parseGroup(g, labels, nodes, l.l, l.metadata)
+ if err != nil {
+ l.l.Error().Err(err).Msgf("failed to parse group %s", g.Metadata.Name)
+ return
+ }
+ defer group.Close()
+
+ tr := l.getRemovalSegmentsTimeRange(group)
+ if tr.Start.IsZero() && tr.End.IsZero() {
+ l.l.Info().Msgf("no removal segments time range for group %s, skipping trace migration", g.Metadata.Name)
+ progress.MarkGroupCompleted(g.Metadata.Name)
+ progress.Save(l.progressFilePath, l.l)
+ return
+ }
+
+ // Try file-based migration first
+ segmentSuffixes, err := l.processTraceGroupFileBased(ctx, group, traceDir, tr, progress)
+ if err != nil {
+ l.l.Error().Err(err).Msgf("failed to migrate trace group %s using file-based approach", g.Metadata.Name)
+ return
+ }
+
+ l.l.Info().Msgf("deleting expired trace segments for group: %s", g.Metadata.Name)
+ l.deleteExpiredTraceSegments(ctx, g, segmentSuffixes, progress)
+ progress.MarkGroupCompleted(g.Metadata.Name)
+ progress.Save(l.progressFilePath, l.l)
+}
+
+func (l *lifecycleService) processTraceGroupFileBased(_ context.Context, g *GroupConfig, traceDir string,
+ tr *timestamp.TimeRange, progress *Progress,
+) ([]string, error) {
+ if progress.IsTraceGroupDeleted(g.Metadata.Name) {
+ l.l.Info().Msgf("skipping already completed file-based trace migration for group: %s", g.Metadata.Name)
+ return nil, nil
+ }
+
+ l.l.Info().Msgf("starting file-based trace migration for group: %s", g.Metadata.Name)
+
+ rootDir := filepath.Join(traceDir, g.Metadata.Name)
+ // skip the counting if the tsdb root path does not exist
+ // may no data found in the snapshot
+ if _, err := os.Stat(rootDir); err != nil && errors.Is(err, os.ErrNotExist) {
+ l.l.Info().Msgf("skipping file-based trace migration for group because is empty in the snapshot dir: %s", g.Metadata.Name)
+ return nil, nil
+ }
+
+ // Use the file-based migration with existing visitor pattern
+ segmentSuffixes, err := migrateTraceWithFileBasedAndProgress(rootDir, *tr, g, l.l, progress, int(l.chunkSize))
+ if err != nil {
+ return nil, fmt.Errorf("file-based trace migration failed: %w", err)
+ }
+
+ l.l.Info().Msgf("completed file-based trace migration for group: %s", g.Metadata.Name)
+ return segmentSuffixes, nil
+}
diff --git a/banyand/backup/lifecycle/steps.go b/banyand/backup/lifecycle/steps.go
index e54d8a49..6e405e87 100644
--- a/banyand/backup/lifecycle/steps.go
+++ b/banyand/backup/lifecycle/steps.go
@@ -38,10 +38,10 @@ import (
"github.com/apache/skywalking-banyandb/pkg/node"
)
-func (l *lifecycleService) getSnapshots(groups []*commonv1.Group, p *Progress) (streamDir string, measureDir string, err error) {
+func (l *lifecycleService) getSnapshots(groups []*commonv1.Group, p *Progress) (streamDir string, measureDir string, traceDir string, err error) {
// If we already have snapshot dirs in Progress, reuse them
- if p.SnapshotStreamDir != "" || p.SnapshotMeasureDir != "" {
- return p.SnapshotStreamDir, p.SnapshotMeasureDir, nil
+ if p.SnapshotStreamDir != "" || p.SnapshotMeasureDir != "" || p.SnapshotTraceDir != "" {
+ return p.SnapshotStreamDir, p.SnapshotMeasureDir, p.SnapshotTraceDir, nil
}
snapshotGroups := make([]*databasev1.SnapshotRequest_Group, 0, len(groups))
@@ -53,10 +53,10 @@ func (l *lifecycleService) getSnapshots(groups []*commonv1.Group, p *Progress) (
}
snn, err := snapshot.Get(l.gRPCAddr, l.enableTLS, l.insecure, l.cert, snapshotGroups...)
if err != nil {
- return "", "", err
+ return "", "", "", err
}
for _, snp := range snn {
- snapshotDir, errDir := snapshot.Dir(snp, l.streamRoot, l.measureRoot, "", "")
+ snapshotDir, errDir := snapshot.Dir(snp, l.streamRoot, l.measureRoot, "", l.traceRoot)
if errDir != nil {
l.l.Error().Err(errDir).Msgf("Failed to get snapshot directory for %s", snp.Name)
continue
@@ -71,11 +71,15 @@ func (l *lifecycleService) getSnapshots(groups []*commonv1.Group, p *Progress) (
if snp.Catalog == commonv1.Catalog_CATALOG_MEASURE {
measureDir = snapshotDir
}
+ if snp.Catalog == commonv1.Catalog_CATALOG_TRACE {
+ traceDir = snapshotDir
+ }
}
// Save the new snapshot paths into Progress
p.SnapshotStreamDir = streamDir
p.SnapshotMeasureDir = measureDir
- return streamDir, measureDir, nil
+ p.SnapshotTraceDir = traceDir
+ return streamDir, measureDir, traceDir, nil
}
// GroupConfig encapsulates the parsed lifecycle configuration for a Group.
diff --git a/banyand/backup/lifecycle/trace_migration_visitor.go b/banyand/backup/lifecycle/trace_migration_visitor.go
new file mode 100644
index 00000000..3b767a86
--- /dev/null
+++ b/banyand/backup/lifecycle/trace_migration_visitor.go
@@ -0,0 +1,593 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package lifecycle
+
+import (
+ "context"
+ "fmt"
+ "path/filepath"
+ "sort"
+ "strconv"
+ "strings"
+
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/api/data"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+ "github.com/apache/skywalking-banyandb/banyand/internal/sidx"
+ "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/banyand/trace"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/node"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+// traceMigrationVisitor implements the trace.Visitor interface for file-based migration.
+type traceMigrationVisitor struct {
+ selector node.Selector // From parseGroup - node selector
+ client queue.Client // From parseGroup - queue client
+ chunkedClients map[string]queue.ChunkedSyncClient // Per-node chunked sync clients cache
+ logger *logger.Logger
+ progress *Progress // Progress tracker for migration states
+ lfs fs.FileSystem
+ group string
+ targetShardNum uint32 // From parseGroup - target shard count
+ replicas uint32 // From parseGroup - replica count
+ chunkSize int // Chunk size for streaming data
+ targetStageInterval storage.IntervalRule // NEW: target stage's segment interval
+}
+
+// newTraceMigrationVisitor creates a new file-based migration visitor.
+func newTraceMigrationVisitor(group *commonv1.Group, shardNum, replicas uint32, selector node.Selector, client queue.Client,
+ l *logger.Logger, progress *Progress, chunkSize int, targetStageInterval storage.IntervalRule,
+) *traceMigrationVisitor {
+ return &traceMigrationVisitor{
+ group: group.Metadata.Name,
+ targetShardNum: shardNum,
+ replicas: replicas,
+ selector: selector,
+ client: client,
+ chunkedClients: make(map[string]queue.ChunkedSyncClient),
+ logger: l,
+ progress: progress,
+ chunkSize: chunkSize,
+ targetStageInterval: targetStageInterval,
+ lfs: fs.NewLocalFileSystem(),
+ }
+}
+
+// VisitSeries implements trace.Visitor.
+func (mv *traceMigrationVisitor) VisitSeries(segmentTR *timestamp.TimeRange, seriesIndexPath string, shardIDs []common.ShardID) error {
+ mv.logger.Info().
+ Str("path", seriesIndexPath).
+ Int64("min_timestamp", segmentTR.Start.UnixNano()).
+ Int64("max_timestamp", segmentTR.End.UnixNano()).
+ Str("group", mv.group).
+ Msg("migrating trace series index")
+
+ // Find all *.seg segment files in the seriesIndexPath
+ entries := mv.lfs.ReadDir(seriesIndexPath)
+
+ var segmentFiles []string
+ for _, entry := range entries {
+ if !entry.IsDir() && strings.HasSuffix(entry.Name(), ".seg") {
+ segmentFiles = append(segmentFiles, entry.Name())
+ }
+ }
+
+ if len(segmentFiles) == 0 {
+ mv.logger.Debug().
+ Str("path", seriesIndexPath).
+ Msg("no .seg files found in trace series index path")
+ return nil
+ }
+
+ mv.logger.Info().
+ Int("segment_count", len(segmentFiles)).
+ Str("path", seriesIndexPath).
+ Msg("found trace segment files for migration")
+
+ // Set the total number of series segments for progress tracking
+ mv.SetTraceSeriesCount(len(segmentFiles))
+
+ // Calculate ALL target segments this series index should go to
+ targetSegments := calculateTargetSegments(
+ segmentTR.Start.UnixNano(),
+ segmentTR.End.UnixNano(),
+ mv.targetStageInterval,
+ )
+
+ mv.logger.Info().
+ Int("target_segments_count", len(targetSegments)).
+ Int64("series_min_ts", segmentTR.Start.UnixNano()).
+ Int64("series_max_ts", segmentTR.End.UnixNano()).
+ Str("group", mv.group).
+ Msg("migrating trace series index to multiple target segments")
+
+ // Send series index to EACH target segment that overlaps with its time range
+ for i, targetSegmentTime := range targetSegments {
+ // Create StreamingPartData for this segment
+ files := make([]fileInfo, 0, len(segmentFiles))
+ // Process each segment file
+ for _, segmentFileName := range segmentFiles {
+ // Extract segment ID from filename (remove .seg extension)
+ fileSegmentIDStr := strings.TrimSuffix(segmentFileName, ".seg")
+
+ // Parse hex segment ID
+ segmentID, err := strconv.ParseUint(fileSegmentIDStr, 16, 64)
+ if err != nil {
+ mv.logger.Error().
+ Str("filename", segmentFileName).
+ Str("id_str", fileSegmentIDStr).
+ Err(err).
+ Msg("failed to parse segment ID from filename")
+ continue
+ }
+
+ // Convert segmentID to ShardID for progress tracking
+ shardID := common.ShardID(segmentID)
+
+ // Check if this segment has already been completed for this target segment
+ segmentIDStr := getSegmentTimeRange(targetSegmentTime, mv.targetStageInterval).String()
+ if mv.progress.IsTraceSeriesCompleted(mv.group, segmentIDStr, shardID) {
+ mv.logger.Debug().
+ Uint64("segment_id", segmentID).
+ Str("filename", segmentFileName).
+ Time("target_segment", targetSegmentTime).
+ Str("group", mv.group).
+ Msg("trace series segment already completed for this target segment, skipping")
+ continue
+ }
+
+ mv.logger.Info().
+ Uint64("segment_id", segmentID).
+ Str("filename", segmentFileName).
+ Time("target_segment", targetSegmentTime).
+ Str("group", mv.group).
+ Msg("migrating trace series segment file")
+
+ // Create file reader for the segment file
+ segmentFilePath := filepath.Join(seriesIndexPath, segmentFileName)
+ segmentFile, err := mv.lfs.OpenFile(segmentFilePath)
+ if err != nil {
+ errorMsg := fmt.Sprintf("failed to open trace segment file %s: %v", segmentFilePath, err)
+ mv.progress.MarkTraceSeriesError(mv.group, segmentIDStr, shardID, errorMsg)
+ mv.logger.Error().
+ Str("path", segmentFilePath).
+ Err(err).
+ Msg("failed to open trace segment file")
+ return fmt.Errorf("failed to open trace segment file %s: %w", segmentFilePath, err)
+ }
+
+ // Close the file reader
+ defer func() {
+ if i == len(targetSegments)-1 { // Only close on last iteration
+ segmentFile.Close()
+ }
+ }()
+
+ files = append(files, fileInfo{
+ name: segmentFileName,
+ file: segmentFile,
+ })
+
+ mv.logger.Info().
+ Uint64("segment_id", segmentID).
+ Str("filename", segmentFileName).
+ Time("target_segment", targetSegmentTime).
+ Str("group", mv.group).
+ Int("completed_segments", mv.progress.GetTraceSeriesProgress(mv.group)).
+ Int("total_segments", mv.progress.GetTraceSeriesCount(mv.group)).
+ Msgf("trace series segment migration completed for target segment %d/%d", i+1, len(targetSegments))
+ }
+
+ // Send segment file to each shard in shardIDs for this target segment
+ segmentIDStr := getSegmentTimeRange(targetSegmentTime, mv.targetStageInterval).String()
+ for _, shardID := range shardIDs {
+ targetShardID := mv.calculateTargetShardID(uint32(shardID))
+ ff := make([]queue.FileInfo, 0, len(files))
+ for _, file := range files {
+ ff = append(ff, queue.FileInfo{
+ Name: file.name,
+ Reader: file.file.SequentialRead(),
+ })
+ }
+ partData := mv.createStreamingSegmentFromFiles(targetShardID, ff, segmentTR, data.TopicTraceSeriesSync.String())
+
+ // Stream segment to target shard replicas
+ if err := mv.streamPartToTargetShard(targetShardID, []queue.StreamingPartData{partData}); err != nil {
+ errorMsg := fmt.Sprintf("failed to stream trace segment to target shard %d: %v", targetShardID, err)
+ mv.progress.MarkTraceSeriesError(mv.group, segmentIDStr, shardID, errorMsg)
+ return fmt.Errorf("failed to stream trace segment to target shard %d: %w", targetShardID, err)
+ }
+ // Mark segment as completed for this specific target segment
+ mv.progress.MarkTraceSeriesCompleted(mv.group, segmentIDStr, shardID)
+ }
+ }
+
+ return nil
+}
+
+// VisitShard implements trace.Visitor - core and sidx migration logic.
+func (mv *traceMigrationVisitor) VisitShard(timestampTR *timestamp.TimeRange, sourceShardID common.ShardID, shardPath string) error {
+ segmentIDStr := timestampTR.String()
+ if mv.progress.IsTraceShardCompleted(mv.group, shardPath, sourceShardID) {
+ mv.logger.Debug().
+ Str("shard_path", shardPath).
+ Str("group", mv.group).
+ Uint32("source_shard", uint32(sourceShardID)).
+ Msg("trace shard already completed for this target segment, skipping")
+ return nil
+ }
+ allParts := make([]queue.StreamingPartData, 0)
+
+ sidxPartData, sidxReleases, err := mv.generateAllSidxPartData(timestampTR, sourceShardID, filepath.Join(shardPath, "sidx"))
+ if err != nil {
+ return fmt.Errorf("failed to generate sidx part data: %s: %w", shardPath, err)
+ }
+ defer func() {
+ for _, release := range sidxReleases {
+ release()
+ }
+ }()
+ allParts = append(allParts, sidxPartData...)
+
+ partDatas, partDataReleases, err := mv.generateAllPartData(sourceShardID, shardPath)
+ if err != nil {
+ return fmt.Errorf("failed to generate core par data: %s: %w", shardPath, err)
+ }
+ defer func() {
+ for _, release := range partDataReleases {
+ release()
+ }
+ }()
+ allParts = append(allParts, partDatas...)
+
+ targetShardID := mv.calculateTargetShardID(uint32(sourceShardID))
+
+ sort.Slice(allParts, func(i, j int) bool {
+ if allParts[i].ID == allParts[j].ID {
+ return allParts[i].PartType < allParts[j].PartType
+ }
+ return allParts[i].ID < allParts[j].ID
+ })
+
+ // Stream part to target segment
+ if err := mv.streamPartToTargetShard(targetShardID, allParts); err != nil {
+ errorMsg := fmt.Errorf("failed to stream to target shard %d: %w", targetShardID, err)
+ mv.progress.MarkTraceShardError(mv.group, segmentIDStr, sourceShardID, errorMsg.Error())
+ return fmt.Errorf("failed to stream trace shard to target segment shard: %w", err)
+ }
+
+ // Mark shard as completed for this target segment
+ mv.progress.MarkTraceShardCompleted(mv.group, segmentIDStr, sourceShardID)
+ mv.logger.Info().
+ Str("group", mv.group).
+ Msgf("trace shard migration completed for target segment")
+
+ return nil
+}
+
+func (mv *traceMigrationVisitor) generateAllSidxPartData(
+ segmentTR *timestamp.TimeRange,
+ sourceShardID common.ShardID,
+ sidxPath string,
+) ([]queue.StreamingPartData, []func(), error) {
+ // Sidx structure: sidx/{index-name}/{part-id}/files
+ // Find all index directories in the sidx directory
+ entries := mv.lfs.ReadDir(sidxPath)
+
+ parts := make([]queue.StreamingPartData, 0, len(entries))
+ releases := make([]func(), 0, len(entries))
+
+ var indexDirs []string
+ for _, entry := range entries {
+ if entry.IsDir() {
+ indexDirs = append(indexDirs, entry.Name())
+ }
+ }
+
+ if len(indexDirs) == 0 {
+ mv.logger.Debug().
+ Str("path", sidxPath).
+ Msg("no index directories found in trace sidx directory")
+ return nil, nil, nil
+ }
+
+ mv.logger.Info().
+ Int("index_count", len(indexDirs)).
+ Str("path", sidxPath).
+ Uint32("source_shard", uint32(sourceShardID)).
+ Msg("found trace sidx indexes for migration")
+
+ // Calculate target shard ID
+ targetShardID := mv.calculateTargetShardID(uint32(sourceShardID))
+
+ // Process each index directory
+ for _, indexName := range indexDirs {
+ indexDirPath := filepath.Join(sidxPath, indexName)
+
+ // Find all part directories in this index
+ partEntries := mv.lfs.ReadDir(indexDirPath)
+ var partDirs []string
+ for _, entry := range partEntries {
+ if entry.IsDir() {
+ // Validate it's a valid hex string (part ID)
+ if _, err := strconv.ParseUint(entry.Name(), 16, 64); err == nil {
+ partDirs = append(partDirs, entry.Name())
+ }
+ }
+ }
+
+ if len(partDirs) == 0 {
+ mv.logger.Debug().
+ Str("index_name", indexName).
+ Str("path", indexDirPath).
+ Msg("no part directories found in trace sidx index")
+ continue
+ }
+
+ mv.logger.Info().
+ Str("index_name", indexName).
+ Int("part_count", len(partDirs)).
+ Str("path", indexDirPath).
+ Msg("found trace sidx parts for index")
+
+ // Process each part directory
+ for _, partDirName := range partDirs {
+ partID, _ := strconv.ParseUint(partDirName, 16, 64)
+ partPath := filepath.Join(indexDirPath, partDirName)
+
+ // Create file readers for this sidx part
+ files, release := sidx.CreatePartFileReaderFromPath(partPath, mv.lfs)
+
+ // Create StreamingPartData with PartType = index name
+ partData, err := sidx.ParsePartMetadata(mv.lfs, partPath)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to parse sidx part metadata: %s: %w", partPath, err)
+ }
+ partData.Group = mv.group
+ partData.ShardID = targetShardID
+ partData.Topic = data.TopicTracePartSync.String()
+ partData.ID = partID
+ partData.PartType = indexName // Use index name as PartType (not "core")
+ partData.Files = files
+ partData.MinTimestamp = segmentTR.Start.UnixNano()
+ partData.MaxTimestamp = segmentTR.End.UnixNano()
+
+ mv.logger.Debug().
+ Str("index_name", indexName).
+ Uint64("part_id", partID).
+ Uint32("source_shard", uint32(sourceShardID)).
+ Uint32("target_shard", targetShardID).
+ Str("group", mv.group).
+ Msg("generated trace sidx part data")
+
+ parts = append(parts, *partData)
+ releases = append(releases, release)
+ }
+ }
+
+ return parts, releases, nil
+}
+
+func (mv *traceMigrationVisitor) generateAllPartData(sourceShardID common.ShardID, shardPath string) ([]queue.StreamingPartData, []func(), error) {
+ entries := mv.lfs.ReadDir(shardPath)
+
+ allParts := make([]queue.StreamingPartData, 0)
+ allReleases := make([]func(), 0)
+ for _, entry := range entries {
+ if !entry.IsDir() {
+ continue
+ }
+
+ name := entry.Name()
+ // Check if this is a part directory (16-character hex string)
+ if len(name) != 16 {
+ continue // Skip non-part entries
+ }
+
+ // Validate it's a valid hex string (part ID)
+ if _, err := strconv.ParseUint(name, 16, 64); err != nil {
+ continue // Skip invalid part entries
+ }
+
+ partPath := filepath.Join(shardPath, name)
+ parts, releases, err := mv.generatePartData(sourceShardID, partPath)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to generate part data for path %s: %w", partPath, err)
+ }
+ allParts = append(allParts, parts...)
+ allReleases = append(allReleases, releases...)
+ }
+
+ return allParts, allReleases, nil
+}
+
+func (mv *traceMigrationVisitor) generatePartData(sourceShardID common.ShardID, partPath string) ([]queue.StreamingPartData, []func(), error) {
+ partData, err := trace.ParsePartMetadata(mv.lfs, partPath)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to parse trace part metadata: %w", err)
+ }
+ partID, err := parsePartIDFromPath(partPath)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to parse part ID from path: %w", err)
+ }
+ // Calculate ALL target segments this part should go to
+ targetSegments := calculateTargetSegments(
+ partData.MinTimestamp,
+ partData.MaxTimestamp,
+ mv.targetStageInterval,
+ )
+
+ mv.logger.Info().
+ Uint64("part_id", partID).
+ Uint32("source_shard", uint32(sourceShardID)).
+ Int("target_segments_count", len(targetSegments)).
+ Int64("part_min_ts", partData.MinTimestamp).
+ Int64("part_max_ts", partData.MaxTimestamp).
+ Str("group", mv.group).
+ Msg("generate trace part files to multiple target segments")
+
+ parts := make([]queue.StreamingPartData, 0)
+ releases := make([]func(), 0)
+ // Send part to EACH target segment that overlaps with its time range
+ for i, targetSegmentTime := range targetSegments {
+ targetShardID := mv.calculateTargetShardID(uint32(sourceShardID))
+
+ // Create file readers for this part
+ files, release := trace.CreatePartFileReaderFromPath(partPath, mv.lfs)
+
+ // Clone part data for this target segment
+ targetPartData := partData
+ targetPartData.ID = partID
+ targetPartData.Group = mv.group
+ targetPartData.ShardID = targetShardID
+ targetPartData.Topic = data.TopicTracePartSync.String()
+ targetPartData.Files = files
+ targetPartData.PartType = trace.PartTypeCore
+ parts = append(parts, targetPartData)
+ releases = append(releases, release)
+
+ mv.logger.Info().
+ Uint64("part_id", partID).
+ Time("target_segment", targetSegmentTime).
+ Str("group", mv.group).
+ Msgf("generated trace part file migration completed for target segment %d/%d", i+1, len(targetSegments))
+ }
+
+ return parts, releases, nil
+}
+
+// calculateTargetShardID maps source shard ID to target shard ID.
+func (mv *traceMigrationVisitor) calculateTargetShardID(sourceShardID uint32) uint32 {
+ return calculateTargetShardID(sourceShardID, mv.targetShardNum)
+}
+
+// streamPartToTargetShard sends part data to all replicas of the target shard.
+func (mv *traceMigrationVisitor) streamPartToTargetShard(targetShardID uint32, partData []queue.StreamingPartData) error {
+ copies := mv.replicas + 1
+
+ // Send to all replicas using the exact pattern from steps.go:219-236
+ for replicaID := uint32(0); replicaID < copies; replicaID++ {
+ // Use selector.Pick exactly like steps.go:220
+ nodeID, err := mv.selector.Pick(mv.group, "", targetShardID, replicaID)
+ if err != nil {
+ return fmt.Errorf("failed to pick node for shard %d replica %d: %w", targetShardID, replicaID, err)
+ }
+
+ // Stream part data to target node using chunked sync
+ if err := mv.streamPartToNode(nodeID, targetShardID, partData); err != nil {
+ return fmt.Errorf("failed to stream trace part to node %s: %w", nodeID, err)
+ }
+ }
+
+ return nil
+}
+
+// streamPartToNode streams part data to a specific target node.
+func (mv *traceMigrationVisitor) streamPartToNode(nodeID string, targetShardID uint32, partData []queue.StreamingPartData) error {
+ // Get or create chunked client for this node (cache hit optimization)
+ chunkedClient, exists := mv.chunkedClients[nodeID]
+ if !exists {
+ var err error
+ // Create new chunked sync client via queue.Client
+ chunkedClient, err = mv.client.NewChunkedSyncClient(nodeID, uint32(mv.chunkSize))
+ if err != nil {
+ return fmt.Errorf("failed to create chunked sync client for node %s: %w", nodeID, err)
+ }
+ mv.chunkedClients[nodeID] = chunkedClient // Cache for reuse
+ }
+
+ // Stream using chunked transfer (same as syncer.go:202)
+ ctx := context.Background()
+ result, err := chunkedClient.SyncStreamingParts(ctx, partData)
+ if err != nil {
+ return fmt.Errorf("failed to sync streaming parts to node %s: %w", nodeID, err)
+ }
+
+ if !result.Success {
+ return fmt.Errorf("chunked sync partially failed: %v", result.FailedParts)
+ }
+
+ mv.logger.Info().
+ Str("node", nodeID).
+ Str("session", result.SessionID).
+ Uint64("bytes", result.TotalBytes).
+ Int64("duration_ms", result.DurationMs).
+ Uint32("chunks", result.ChunksCount).
+ Uint32("parts", result.PartsCount).
+ Uint32("target_shard", targetShardID).
+ Str("group", mv.group).
+ Msg("file-based trace migration shard completed successfully")
+
+ return nil
+}
+
+// Close cleans up all chunked sync clients.
+func (mv *traceMigrationVisitor) Close() error {
+ for nodeID, client := range mv.chunkedClients {
+ if err := client.Close(); err != nil {
+ mv.logger.Warn().Err(err).Str("node", nodeID).Msg("failed to close chunked sync client")
+ }
+ }
+ mv.chunkedClients = make(map[string]queue.ChunkedSyncClient)
+ return nil
+}
+
+// createStreamingSegmentFromFiles creates StreamingPartData from segment files.
+func (mv *traceMigrationVisitor) createStreamingSegmentFromFiles(
+ targetShardID uint32,
+ files []queue.FileInfo,
+ segmentTR *timestamp.TimeRange,
+ topic string,
+) queue.StreamingPartData {
+ segmentData := queue.StreamingPartData{
+ Group: mv.group,
+ ShardID: targetShardID, // Use calculated target shard
+ Topic: topic, // Use the new topic
+ Files: files,
+ MinTimestamp: segmentTR.Start.UnixNano(),
+ MaxTimestamp: segmentTR.End.UnixNano(),
+ }
+
+ return segmentData
+}
+
+// SetTraceShardCount sets the total number of shards for the current trace.
+func (mv *traceMigrationVisitor) SetTraceShardCount(totalShards int) {
+ if mv.progress != nil {
+ mv.progress.SetTraceShardCount(mv.group, totalShards)
+ mv.logger.Info().
+ Str("group", mv.group).
+ Int("total_shards", totalShards).
+ Msg("set trace part count for progress tracking")
+ }
+}
+
+// SetTraceSeriesCount sets the total number of series segments for the current trace.
+func (mv *traceMigrationVisitor) SetTraceSeriesCount(totalSegments int) {
+ if mv.progress != nil {
+ mv.progress.SetTraceSeriesCount(mv.group, totalSegments)
+ mv.logger.Info().
+ Str("group", mv.group).
+ Int("total_segments", totalSegments).
+ Msg("set trace series count for progress tracking")
+ }
+}
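calculateTargetSegments and calculateTargetShardID are reused from the existing stream/measure migration code and are not shown in this diff. Conceptually, the former enumerates every target-stage segment window that overlaps a part's [min, max] timestamp range. A purely illustrative sketch, substituting time.Duration for storage.IntervalRule:

    // Illustrative only: list the target segment start times covered by a part's time range.
    // The real helper operates on storage.IntervalRule; this sketch uses a plain duration.
    func targetSegmentsForRange(minTS, maxTS int64, segmentLen time.Duration) []time.Time {
    	var out []time.Time
    	end := time.Unix(0, maxTS)
    	for t := time.Unix(0, minTS).Truncate(segmentLen); !t.After(end); t = t.Add(segmentLen) {
    		out = append(out, t)
    	}
    	return out
    }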
diff --git a/banyand/internal/sidx/part.go b/banyand/internal/sidx/part.go
index 8447c575..96be5642 100644
--- a/banyand/internal/sidx/part.go
+++ b/banyand/internal/sidx/part.go
@@ -20,6 +20,7 @@ package sidx
import (
"encoding/json"
"fmt"
+ "path"
"path/filepath"
"sort"
"strings"
@@ -27,6 +28,7 @@ import (
"github.com/apache/skywalking-banyandb/api/common"
internalencoding "github.com/apache/skywalking-banyandb/banyand/internal/encoding"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/bytes"
"github.com/apache/skywalking-banyandb/pkg/compress/zstd"
"github.com/apache/skywalking-banyandb/pkg/encoding"
@@ -163,6 +165,136 @@ func (p *part) loadPartMetadata() error {
return nil
}
+// ParsePartMetadata reads and parses part metadata from manifest.json in the specified parent path.
+func ParsePartMetadata(fs fs.FileSystem, parentPath string) (*queue.StreamingPartData, error) {
+ manifestData, err := fs.Read(filepath.Join(parentPath, manifestFilename))
+ if err != nil {
+ return nil, fmt.Errorf("failed to read manifest.json: %w", err)
+ }
+ // Parse JSON manifest
+ pm := &partMetadata{}
+ if unmarshalErr := json.Unmarshal(manifestData, pm); unmarshalErr != nil {
+ return nil, fmt.Errorf("failed to unmarshal manifest.json: %w", unmarshalErr)
+ }
+ return &queue.StreamingPartData{
+ ID: pm.ID,
+ CompressedSizeBytes: pm.CompressedSizeBytes,
+ UncompressedSizeBytes: pm.UncompressedSizeBytes,
+ TotalCount: pm.TotalCount,
+ BlocksCount: pm.BlocksCount,
+ MinKey: pm.MinKey,
+ MaxKey: pm.MaxKey,
+ }, nil
+}
+
+// CreatePartFileReaderFromPath opens all files in a part directory and returns their FileInfo and a cleanup function.
+func CreatePartFileReaderFromPath(partPath string, lfs fs.FileSystem) ([]queue.FileInfo, func()) {
+ var files []queue.FileInfo
+ var readers []fs.Reader
+
+ // Core trace files (required files)
+ metaPath := path.Join(partPath, metaFilename)
+ metaReader, err := lfs.OpenFile(metaPath)
+ if err != nil {
+ logger.Panicf("cannot open trace meta file %q: %s", metaPath, err)
+ }
+ readers = append(readers, metaReader)
+ files = append(files, queue.FileInfo{
+ Name: SidxMetaName,
+ Reader: metaReader.SequentialRead(),
+ })
+
+ primaryPath := path.Join(partPath, primaryFilename)
+ primaryReader, err := lfs.OpenFile(primaryPath)
+ if err != nil {
+ logger.Panicf("cannot open trace primary file %q: %s", primaryPath, err)
+ }
+ readers = append(readers, primaryReader)
+ files = append(files, queue.FileInfo{
+ Name: SidxPrimaryName,
+ Reader: primaryReader.SequentialRead(),
+ })
+
+ keysPath := path.Join(partPath, keysFilename)
+ if keysReader, err := lfs.OpenFile(keysPath); err == nil {
+ readers = append(readers, keysReader)
+ files = append(files, queue.FileInfo{
+ Name: SidxKeysName,
+ Reader: keysReader.SequentialRead(),
+ })
+ }
+
+ dataPath := path.Join(partPath, dataFilename)
+ if dataReader, err := lfs.OpenFile(dataPath); err == nil {
+ readers = append(readers, dataReader)
+ files = append(files, queue.FileInfo{
+ Name: SidxDataName,
+ Reader: dataReader.SequentialRead(),
+ })
+ }
+
+ // Dynamic tag files (*.t and *.tm)
+ ee := lfs.ReadDir(partPath)
+ for _, e := range ee {
+ if e.IsDir() {
+ continue
+ }
+
+ // Tag metadata files (.tm)
+ if filepath.Ext(e.Name()) == tagMetadataExtension {
+ tmPath := path.Join(partPath, e.Name())
+ tmReader, err := lfs.OpenFile(tmPath)
+ if err != nil {
+ logger.Panicf("cannot open trace tag metadata file %q: %s", tmPath, err)
+ }
+ readers = append(readers, tmReader)
+ tagName := removeExt(e.Name(), tagMetadataExtension)
+ files = append(files, queue.FileInfo{
+ Name: TagMetadataPrefix + tagName,
+ Reader: tmReader.SequentialRead(),
+ })
+ }
+
+ // Tag family files (.tf)
+ if filepath.Ext(e.Name()) == tagFilterExtension {
+ tPath := path.Join(partPath, e.Name())
+ tReader, err := lfs.OpenFile(tPath)
+ if err != nil {
+ logger.Panicf("cannot open trace tag file %q: %s", tPath, err)
+ }
+ readers = append(readers, tReader)
+ tagName := removeExt(e.Name(), tagFilterExtension)
+ files = append(files, queue.FileInfo{
+ Name: TagFilterPrefix + tagName,
+ Reader: tReader.SequentialRead(),
+ })
+ }
+
+ // Tag data files (.td)
+ if filepath.Ext(e.Name()) == tagDataExtension {
+ tdPath := path.Join(partPath, e.Name())
+ tdReader, err := lfs.OpenFile(tdPath)
+ if err != nil {
+ logger.Panicf("cannot open trace tag data file %q: %s", tdPath, err)
+ }
+ readers = append(readers, tdReader)
+ tagName := removeExt(e.Name(), tagDataExtension)
+ files = append(files, queue.FileInfo{
+ Name: TagDataPrefix + tagName,
+ Reader: tdReader.SequentialRead(),
+ })
+ }
+ }
+
+ cleanup := func() {
+ for _, reader := range readers {
+ fs.MustClose(reader)
+ }
+ }
+
+ return files, cleanup
+}
+
// loadPrimaryBlockMetadata reads and parses primary block metadata from meta.bin.
func (p *part) loadPrimaryBlockMetadata() {
// Load primary block metadata from meta.bin file (compressed primaryBlockMetadata)
@@ -784,3 +916,7 @@ func (spc *SyncPartContext) Close() {
spc.writers = nil
spc.memPart = nil
}
+
+func removeExt(nameWithExt, ext string) string {
+ return nameWithExt[:len(nameWithExt)-len(ext)]
+}
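The two exported helpers are meant to be used together, as the lifecycle visitor above does: parse manifest.json into a StreamingPartData skeleton, then attach sequential readers for every file in the part. A condensed sketch of that pairing (the part path is hypothetical; imports from banyand/internal/sidx, banyand/queue, and pkg/fs are assumed):

    // Sketch: load one sidx part for streaming; the caller syncs partData via
    // queue.ChunkedSyncClient.SyncStreamingParts and then invokes release().
    func loadSidxPart(partPath string) (*queue.StreamingPartData, func(), error) {
    	lfs := fs.NewLocalFileSystem()
    	partData, err := sidx.ParsePartMetadata(lfs, partPath)
    	if err != nil {
    		return nil, nil, err
    	}
    	files, release := sidx.CreatePartFileReaderFromPath(partPath, lfs)
    	partData.Files = files
    	return partData, release, nil
    }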
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index a047bb7b..3754000e 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -262,6 +262,8 @@ func (d *database[T, O]) TakeFileSnapshot(dst string) error {
}
}()
+ log.Info().Int("segment_count", len(segments)).Str("db_location", d.location).
+ Msgf("taking file snapshot for %s", dst)
for _, seg := range segments {
segDir := filepath.Base(seg.location)
segPath := filepath.Join(dst, segDir)
diff --git a/banyand/measure/snapshot.go b/banyand/measure/snapshot.go
index 3a3a9972..4389d7c0 100644
--- a/banyand/measure/snapshot.go
+++ b/banyand/measure/snapshot.go
@@ -21,7 +21,10 @@ import (
"context"
"encoding/json"
"fmt"
+ "os"
"path/filepath"
+ "sort"
+ "strings"
"sync"
"sync/atomic"
"time"
@@ -215,6 +218,117 @@ func (s *standalone) takeGroupSnapshot(dstDir string, groupName string) error {
return nil
}
+// collectSegDirs walks a directory tree and collects all seg-* directory paths.
+// It only collects directories matching the "seg-*" pattern and ignores all files.
+// Returns a map of relative paths to seg-* directories.
+func collectSegDirs(rootDir string) (map[string]bool, error) {
+ segDirs := make(map[string]bool)
+
+ walkErr := filepath.Walk(rootDir, func(currentPath string, info os.FileInfo, err error) error {
+ if err != nil {
+ // If we can't read a directory, log but continue
+ return nil
+ }
+
+ // Skip files, only process directories
+ if !info.IsDir() {
+ return nil
+ }
+
+ // Get the directory name
+ dirName := filepath.Base(currentPath)
+
+ // Check if this is a seg-* directory
+ if strings.HasPrefix(dirName, "seg-") {
+ // Get relative path from root
+ relPath, relErr := filepath.Rel(rootDir, currentPath)
+ if relErr != nil {
+ return fmt.Errorf("failed to get relative path for %s: %w", currentPath, relErr)
+ }
+
+ // Add to our collection
+ segDirs[relPath] = true
+
+ // Don't recurse into seg-* directories
+ return filepath.SkipDir
+ }
+
+ return nil
+ })
+
+ if walkErr != nil {
+ return nil, fmt.Errorf("failed to walk directory %s: %w", rootDir, walkErr)
+ }
+
+ return segDirs, nil
+}
+
+// compareSegDirs compares two sets of seg-* directories and returns three slices:
+// matched (in both), onlyInSnapshot (only in snapshot), onlyInData (only in data).
+func compareSegDirs(snapshotDirs, dataDirs map[string]bool) (matched, onlyInSnapshot, onlyInData []string) {
+ // Find directories in both sets (matched)
+ for dir := range snapshotDirs {
+ if dataDirs[dir] {
+ matched = append(matched, dir)
+ } else {
+ onlyInSnapshot = append(onlyInSnapshot, dir)
+ }
+ }
+
+ // Find directories only in data
+ for dir := range dataDirs {
+ if !snapshotDirs[dir] {
+ onlyInData = append(onlyInData, dir)
+ }
+ }
+
+ // Sort for consistent output
+ sort.Strings(matched)
+ sort.Strings(onlyInSnapshot)
+ sort.Strings(onlyInData)
+
+ return matched, onlyInSnapshot, onlyInData
+}
+
+// compareSnapshotWithData compares the snapshot directory with the data directory
+// to verify that seg-* directories are consistent. Only logs differences, does not return errors.
+func (s *snapshotListener) compareSnapshotWithData(snapshotDir, dataDir, groupName string) {
+ // Collect seg-* directories from snapshot
+ snapshotDirs, snapshotErr := collectSegDirs(snapshotDir)
+ if snapshotErr != nil {
+ s.s.l.Warn().Err(snapshotErr).
+ Str("group", groupName).
+ Str("snapshotDir", snapshotDir).
+ Msg("failed to collect seg-* directories from snapshot, skipping comparison")
+ return
+ }
+
+ // Collect seg-* directories from data
+ dataDirs, dataErr := collectSegDirs(dataDir)
+ if dataErr != nil {
+ s.s.l.Warn().Err(dataErr).
+ Str("group", groupName).
+ Str("dataDir", dataDir).
+ Msg("failed to collect seg-* directories from data, skipping comparison")
+ return
+ }
+
+ // Compare the directories
+ matched, onlyInSnapshot, onlyInData := compareSegDirs(snapshotDirs, dataDirs)
+
+ // Log consolidated comparison results at Info level
+ s.s.l.Info().
+ Str("group", groupName).
+ Int("matched", len(matched)).
+ Int("onlyInSnapshot", len(onlyInSnapshot)).
+ Int("onlyInData", len(onlyInData)).
+ Strs("matchedDirs", matched).
+ Strs("onlyInSnapshotDirs", onlyInSnapshot).
+ Strs("onlyInDataDirs", onlyInData).
+ Msgf("snapshot comparison for group %s, data dir: %s, snapshot dir: %s",
+ groupName, dataDir, snapshotDir)
+}
+
type snapshotListener struct {
*bus.UnImplementedHealthyListener
s *standalone
@@ -254,11 +368,17 @@ func (s *snapshotListener) Rev(ctx context.Context,
message bus.Message) bus.Mes
return
bus.NewMessage(bus.MessageID(time.Now().UnixNano()), nil)
default:
}
- if errGroup :=
s.s.takeGroupSnapshot(filepath.Join(s.s.snapshotDir, sn,
g.GetSchema().Metadata.Name), g.GetSchema().Metadata.Name); err != nil {
- s.s.l.Error().Err(errGroup).Str("group",
g.GetSchema().Metadata.Name).Msg("fail to take group snapshot")
+ groupName := g.GetSchema().Metadata.Name
+ snapshotPath := filepath.Join(s.s.snapshotDir, sn, groupName)
+ if errGroup := s.s.takeGroupSnapshot(snapshotPath, groupName);
errGroup != nil {
+ s.s.l.Error().Err(errGroup).Str("group",
groupName).Msg("fail to take group snapshot")
err = multierr.Append(err, errGroup)
continue
}
+
+ // Compare snapshot with data directory to verify consistency
+ dataPath := filepath.Join(s.s.dataPath, groupName)
+ s.compareSnapshotWithData(snapshotPath, dataPath, groupName)
}
snp := &databasev1.Snapshot{
Name: sn,
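
The seg-* comparison helpers added above compare directory names only, never file contents: collectSegDirs gathers the relative paths of seg-* directories on each side, and compareSegDirs buckets them into matched, snapshot-only, and data-only sets. A minimal sketch of that classification, assuming it runs inside the same package (the segment names are illustrative):

    package measure // sketch only; assumes access to the unexported helpers above

    import "fmt"

    func exampleCompareSegDirs() {
        // Hypothetical segment directory names following the on-disk seg-<suffix> layout.
        snapshotDirs := map[string]bool{"seg-20251127": true, "seg-20251126": true}
        dataDirs := map[string]bool{"seg-20251127": true, "seg-20251125": true}

        matched, onlyInSnapshot, onlyInData := compareSegDirs(snapshotDirs, dataDirs)
        fmt.Println(matched)        // [seg-20251127]
        fmt.Println(onlyInSnapshot) // [seg-20251126]
        fmt.Println(onlyInData)     // [seg-20251125]
    }
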
diff --git a/banyand/measure/svc_data.go b/banyand/measure/svc_data.go
index 86358908..0d191027 100644
--- a/banyand/measure/svc_data.go
+++ b/banyand/measure/svc_data.go
@@ -453,6 +453,15 @@ func (d *dataSnapshotListener) Rev(ctx context.Context,
message bus.Message) bus
var gg []resourceSchema.Group
if len(groups) == 0 {
gg = d.s.schemaRepo.LoadAllGroups()
+ n := ""
+ for _, g := range gg {
+ if n != "" {
+ n += ","
+ }
+ n += g.GetSchema().Metadata.Name
+ }
+
+ log.Info().Msgf("loaded all snapshots: %s", n)
} else {
for _, g := range groups {
if g.Catalog != commonv1.Catalog_CATALOG_MEASURE {
@@ -464,6 +473,7 @@ func (d *dataSnapshotListener) Rev(ctx context.Context,
message bus.Message) bus
}
gg = append(gg, group)
}
+ log.Info().Msgf("loaded groups: %s", gg)
}
if len(gg) == 0 {
return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), nil)
@@ -489,6 +499,7 @@ func (d *dataSnapshotListener) Rev(ctx context.Context,
message bus.Message) bus
Name: sn,
Catalog: commonv1.Catalog_CATALOG_MEASURE,
}
+ log.Info().Msgf("snapshot %s created", sn)
if err != nil {
snp.Error = err.Error()
}
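
The first hunk of this file joins the group names by hand. An equivalent sketch with strings.Join (not part of the patch, and assuming the file imports "strings"):

    names := make([]string, 0, len(gg))
    for _, g := range gg {
        names = append(names, g.GetSchema().Metadata.Name)
    }
    log.Info().Msgf("loaded all groups: %s", strings.Join(names, ","))
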
diff --git a/banyand/queue/sub/segments.go b/banyand/queue/sub/segments.go
index 02b27856..1895b6ea 100644
--- a/banyand/queue/sub/segments.go
+++ b/banyand/queue/sub/segments.go
@@ -23,6 +23,7 @@ import (
"github.com/apache/skywalking-banyandb/api/data"
measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
streamv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+ tracev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
"github.com/apache/skywalking-banyandb/pkg/bus"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
@@ -80,3 +81,30 @@ func (s *measureService) DeleteExpiredSegments(ctx
context.Context, request *mea
}
return &measurev1.DeleteExpiredSegmentsResponse{Deleted: deleted}, nil
}
+
+type traceService struct {
+ tracev1.UnimplementedTraceServiceServer
+ ser *server
+}
+
+func (s *traceService) DeleteExpiredSegments(ctx context.Context, request
*tracev1.DeleteExpiredSegmentsRequest) (*tracev1.DeleteExpiredSegmentsResponse,
error) {
+ s.ser.listenersLock.RLock()
+ defer s.ser.listenersLock.RUnlock()
+ ll := s.ser.getListeners(data.TopicDeleteExpiredTraceSegments)
+ if len(ll) == 0 {
+ logger.Panicf("no listener found for topic %s",
data.TopicDeleteExpiredTraceSegments)
+ }
+ var deleted int64
+ for _, l := range ll {
+ message := l.Rev(ctx, bus.NewMessage(bus.MessageID(0), request))
+ data := message.Data()
+ if data != nil {
+ d, ok := data.(int64)
+ if !ok {
+ logger.Panicf("invalid data type %T", data)
+ }
+ deleted += d
+ }
+ }
+ return &tracev1.DeleteExpiredSegmentsResponse{Deleted: deleted}, nil
+}
diff --git a/banyand/queue/sub/server.go b/banyand/queue/sub/server.go
index 842770a8..c1949b20 100644
--- a/banyand/queue/sub/server.go
+++ b/banyand/queue/sub/server.go
@@ -43,6 +43,7 @@ import (
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
measurev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
streamv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
+ tracev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
"github.com/apache/skywalking-banyandb/banyand/observability"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/bus"
@@ -225,6 +226,7 @@ func (s *server) Serve() run.StopNotify {
databasev1.RegisterSnapshotServiceServer(s.ser, s)
streamv1.RegisterStreamServiceServer(s.ser, &streamService{ser: s})
measurev1.RegisterMeasureServiceServer(s.ser, &measureService{ser: s})
+ tracev1.RegisterTraceServiceServer(s.ser, &traceService{ser: s})
var ctx context.Context
ctx, s.clientCloser = context.WithCancel(context.Background())
diff --git a/banyand/stream/snapshot.go b/banyand/stream/snapshot.go
index 3e3731e8..41008a5f 100644
--- a/banyand/stream/snapshot.go
+++ b/banyand/stream/snapshot.go
@@ -21,8 +21,10 @@ import (
"context"
"encoding/json"
"fmt"
+ "os"
"path/filepath"
"sort" // added for sorting parts
+ "strings"
"sync"
"sync/atomic"
"time"
@@ -267,6 +269,117 @@ func (s *standalone) takeGroupSnapshot(dstDir string,
groupName string) error {
return nil
}
+// collectSegDirs walks a directory tree and collects all seg-* directory
paths.
+// It only collects directories matching the "seg-*" pattern and ignores all
files.
+// Returns a map of relative paths to seg-* directories.
+func collectSegDirs(rootDir string) (map[string]bool, error) {
+ segDirs := make(map[string]bool)
+
+ walkErr := filepath.Walk(rootDir, func(currentPath string, info
os.FileInfo, err error) error {
+ if err != nil {
+			// If we can't read a directory, skip it and continue
+ return nil
+ }
+
+ // Skip files, only process directories
+ if !info.IsDir() {
+ return nil
+ }
+
+ // Get the directory name
+ dirName := filepath.Base(currentPath)
+
+ // Check if this is a seg-* directory
+ if strings.HasPrefix(dirName, "seg-") {
+ // Get relative path from root
+ relPath, relErr := filepath.Rel(rootDir, currentPath)
+ if relErr != nil {
+ return fmt.Errorf("failed to get relative path
for %s: %w", currentPath, relErr)
+ }
+
+ // Add to our collection
+ segDirs[relPath] = true
+
+ // Don't recurse into seg-* directories
+ return filepath.SkipDir
+ }
+
+ return nil
+ })
+
+ if walkErr != nil {
+ return nil, fmt.Errorf("failed to walk directory %s: %w",
rootDir, walkErr)
+ }
+
+ return segDirs, nil
+}
+
+// compareSegDirs compares two sets of seg-* directories and returns three
slices:
+// matched (in both), onlyInSnapshot (only in snapshot), onlyInData (only in
data).
+func compareSegDirs(snapshotDirs, dataDirs map[string]bool) (matched,
onlyInSnapshot, onlyInData []string) {
+ // Find directories in both sets (matched)
+ for dir := range snapshotDirs {
+ if dataDirs[dir] {
+ matched = append(matched, dir)
+ } else {
+ onlyInSnapshot = append(onlyInSnapshot, dir)
+ }
+ }
+
+ // Find directories only in data
+ for dir := range dataDirs {
+ if !snapshotDirs[dir] {
+ onlyInData = append(onlyInData, dir)
+ }
+ }
+
+ // Sort for consistent output
+ sort.Strings(matched)
+ sort.Strings(onlyInSnapshot)
+ sort.Strings(onlyInData)
+
+ return matched, onlyInSnapshot, onlyInData
+}
+
+// compareSnapshotWithData compares the snapshot directory with the data
directory
+// to verify that seg-* directories are consistent. Only logs differences,
does not return errors.
+func (s *snapshotListener) compareSnapshotWithData(snapshotDir, dataDir,
groupName string) {
+ // Collect seg-* directories from snapshot
+ snapshotDirs, snapshotErr := collectSegDirs(snapshotDir)
+ if snapshotErr != nil {
+ s.s.l.Warn().Err(snapshotErr).
+ Str("group", groupName).
+ Str("snapshotDir", snapshotDir).
+ Msg("failed to collect seg-* directories from snapshot,
skipping comparison")
+ return
+ }
+
+ // Collect seg-* directories from data
+ dataDirs, dataErr := collectSegDirs(dataDir)
+ if dataErr != nil {
+ s.s.l.Warn().Err(dataErr).
+ Str("group", groupName).
+ Str("dataDir", dataDir).
+ Msg("failed to collect seg-* directories from data,
skipping comparison")
+ return
+ }
+
+ // Compare the directories
+ matched, onlyInSnapshot, onlyInData := compareSegDirs(snapshotDirs,
dataDirs)
+
+ // Log consolidated comparison results at Info level
+ s.s.l.Info().
+ Str("group", groupName).
+ Int("matched", len(matched)).
+ Int("onlyInSnapshot", len(onlyInSnapshot)).
+ Int("onlyInData", len(onlyInData)).
+ Strs("matchedDirs", matched).
+ Strs("onlyInSnapshotDirs", onlyInSnapshot).
+ Strs("onlyInDataDirs", onlyInData).
+ Msgf("snapshot comparison for group %s, data dir: %s, snapshot
dir: %s",
+ groupName, dataDir, snapshotDir)
+}
+
type snapshotListener struct {
*bus.UnImplementedHealthyListener
s *standalone
@@ -306,11 +419,17 @@ func (s *snapshotListener) Rev(ctx context.Context,
message bus.Message) bus.Mes
return
bus.NewMessage(bus.MessageID(time.Now().UnixNano()), nil)
default:
}
- if errGroup :=
s.s.takeGroupSnapshot(filepath.Join(s.s.snapshotDir, sn,
g.GetSchema().Metadata.Name), g.GetSchema().Metadata.Name); err != nil {
- s.s.l.Error().Err(errGroup).Str("group",
g.GetSchema().Metadata.Name).Msg("fail to take group snapshot")
+ groupName := g.GetSchema().Metadata.Name
+ snapshotPath := filepath.Join(s.s.snapshotDir, sn, groupName)
+ if errGroup := s.s.takeGroupSnapshot(snapshotPath, groupName);
errGroup != nil {
+ s.s.l.Error().Err(errGroup).Str("group",
groupName).Msg("fail to take group snapshot")
err = multierr.Append(err, errGroup)
continue
}
+
+ // Compare snapshot with data directory to verify consistency
+ dataPath := filepath.Join(s.s.dataPath, groupName)
+ s.compareSnapshotWithData(snapshotPath, dataPath, groupName)
}
snp := &databasev1.Snapshot{
Name: sn,
diff --git a/banyand/trace/part.go b/banyand/trace/part.go
index 2f744b61..0e2c50be 100644
--- a/banyand/trace/part.go
+++ b/banyand/trace/part.go
@@ -26,6 +26,7 @@ import (
"sync/atomic"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/bytes"
"github.com/apache/skywalking-banyandb/pkg/compress/zstd"
"github.com/apache/skywalking-banyandb/pkg/fs"
@@ -409,3 +410,106 @@ func partPath(root string, epoch uint64) string {
func partName(epoch uint64) string {
return fmt.Sprintf("%016x", epoch)
}
+
+// CreatePartFileReaderFromPath opens all files in a part directory and
returns their FileInfo and a cleanup function.
+func CreatePartFileReaderFromPath(partPath string, lfs fs.FileSystem)
([]queue.FileInfo, func()) {
+ var files []queue.FileInfo
+ var readers []fs.Reader
+
+ // Core trace files (required files)
+ metaPath := path.Join(partPath, metaFilename)
+ metaReader, err := lfs.OpenFile(metaPath)
+ if err != nil {
+ logger.Panicf("cannot open trace meta file %q: %s", metaPath,
err)
+ }
+ readers = append(readers, metaReader)
+ files = append(files, queue.FileInfo{
+ Name: traceMetaName,
+ Reader: metaReader.SequentialRead(),
+ })
+
+ primaryPath := path.Join(partPath, primaryFilename)
+ primaryReader, err := lfs.OpenFile(primaryPath)
+ if err != nil {
+ logger.Panicf("cannot open trace primary file %q: %s",
primaryPath, err)
+ }
+ readers = append(readers, primaryReader)
+ files = append(files, queue.FileInfo{
+ Name: tracePrimaryName,
+ Reader: primaryReader.SequentialRead(),
+ })
+
+ spansPath := path.Join(partPath, spansFilename)
+ if spansReader, err := lfs.OpenFile(spansPath); err == nil {
+ readers = append(readers, spansReader)
+ files = append(files, queue.FileInfo{
+ Name: traceSpansName,
+ Reader: spansReader.SequentialRead(),
+ })
+ }
+
+ // Special trace files: traceID.filter and tag.type
+ traceIDFilterPath := path.Join(partPath, traceIDFilterFilename)
+ if filterReader, err := lfs.OpenFile(traceIDFilterPath); err == nil {
+ readers = append(readers, filterReader)
+ files = append(files, queue.FileInfo{
+ Name: traceIDFilterFilename,
+ Reader: filterReader.SequentialRead(),
+ })
+ }
+
+ tagTypePath := path.Join(partPath, tagTypeFilename)
+ if tagTypeReader, err := lfs.OpenFile(tagTypePath); err == nil {
+ readers = append(readers, tagTypeReader)
+ files = append(files, queue.FileInfo{
+ Name: tagTypeFilename,
+ Reader: tagTypeReader.SequentialRead(),
+ })
+ }
+
+ // Dynamic tag files (*.t and *.tm)
+ ee := lfs.ReadDir(partPath)
+ for _, e := range ee {
+ if e.IsDir() {
+ continue
+ }
+
+ // Tag metadata files (.tm)
+ if filepath.Ext(e.Name()) == tagsMetadataFilenameExt {
+ tmPath := path.Join(partPath, e.Name())
+ tmReader, err := lfs.OpenFile(tmPath)
+ if err != nil {
+ logger.Panicf("cannot open trace tag metadata
file %q: %s", tmPath, err)
+ }
+ readers = append(readers, tmReader)
+ tagName := removeExt(e.Name(), tagsMetadataFilenameExt)
+ files = append(files, queue.FileInfo{
+ Name: traceTagMetadataPrefix + tagName,
+ Reader: tmReader.SequentialRead(),
+ })
+ }
+
+ // Tag data files (.t)
+ if filepath.Ext(e.Name()) == tagsFilenameExt {
+ tPath := path.Join(partPath, e.Name())
+ tReader, err := lfs.OpenFile(tPath)
+ if err != nil {
+ logger.Panicf("cannot open trace tag file %q:
%s", tPath, err)
+ }
+ readers = append(readers, tReader)
+ tagName := removeExt(e.Name(), tagsFilenameExt)
+ files = append(files, queue.FileInfo{
+ Name: traceTagsPrefix + tagName,
+ Reader: tReader.SequentialRead(),
+ })
+ }
+ }
+
+ cleanup := func() {
+ for _, reader := range readers {
+ fs.MustClose(reader)
+ }
+ }
+
+ return files, cleanup
+}
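
CreatePartFileReaderFromPath hands back every opened reader together with a single cleanup function, so the returned queue.FileInfo readers are only valid until cleanup runs. A hedged usage sketch, assuming a helper with the fs, fmt and trace packages imported and an illustrative part directory:

    // sketch: stream every file of one on-disk trace part.
    func streamPartFiles(lfs fs.FileSystem, partDir string) {
        files, cleanup := trace.CreatePartFileReaderFromPath(partDir, lfs)
        defer cleanup() // closes every reader the call opened
        for _, f := range files {
            // f.Reader is a sequential reader positioned at the start of the file.
            fmt.Println("streaming part file:", f.Name)
        }
    }
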
diff --git a/banyand/trace/part_metadata.go b/banyand/trace/part_metadata.go
index b4f5189f..bd3aec7a 100644
--- a/banyand/trace/part_metadata.go
+++ b/banyand/trace/part_metadata.go
@@ -25,6 +25,7 @@ import (
"github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/convert"
"github.com/apache/skywalking-banyandb/pkg/encoding"
"github.com/apache/skywalking-banyandb/pkg/filter"
@@ -248,3 +249,26 @@ func (tf *traceIDFilter) mustWriteTraceIDFilter(fileSystem
fs.FileSystem, partPa
logger.Panicf("unexpected number of bytes written to %s; got
%d; want %d", traceIDFilterPath, n, len(data))
}
}
+
+// ParsePartMetadata parses the part metadata from the metadata.json file.
+func ParsePartMetadata(fileSystem fs.FileSystem, partPath string)
(queue.StreamingPartData, error) {
+ metadataPath := filepath.Join(partPath, metadataFilename)
+ metadata, err := fileSystem.Read(metadataPath)
+ if err != nil {
+ return queue.StreamingPartData{}, errors.WithMessage(err,
"cannot read metadata.json")
+ }
+ var pm partMetadata
+ if err := json.Unmarshal(metadata, &pm); err != nil {
+ return queue.StreamingPartData{}, errors.WithMessage(err,
"cannot parse metadata.json")
+ }
+
+ return queue.StreamingPartData{
+ ID: pm.ID,
+ CompressedSizeBytes: pm.CompressedSizeBytes,
+ UncompressedSizeBytes: pm.UncompressedSpanSizeBytes,
+ TotalCount: pm.TotalCount,
+ BlocksCount: pm.BlocksCount,
+ MinTimestamp: pm.MinTimestamp,
+ MaxTimestamp: pm.MaxTimestamp,
+ }, nil
+}
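
ParsePartMetadata only translates metadata.json into the queue.StreamingPartData header that the chunked sync pipeline expects; it does not touch the data files. A hedged sketch of reading that header, under the same import assumptions as the previous example:

    // sketch: read the part header the chunked sync pipeline needs.
    func describePart(lfs fs.FileSystem, partDir string) error {
        partData, err := trace.ParsePartMetadata(lfs, partDir)
        if err != nil {
            return fmt.Errorf("cannot parse part %s: %w", partDir, err)
        }
        fmt.Printf("part %d: %d items in %d blocks, window [%d, %d]\n",
            partData.ID, partData.TotalCount, partData.BlocksCount,
            partData.MinTimestamp, partData.MaxTimestamp)
        return nil
    }
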
diff --git a/banyand/trace/snapshot.go b/banyand/trace/snapshot.go
index 03eed89f..479cc214 100644
--- a/banyand/trace/snapshot.go
+++ b/banyand/trace/snapshot.go
@@ -205,7 +205,7 @@ func parseSnapshot(name string) (uint64, error) {
func (tst *tsTable) TakeFileSnapshot(dst string) error {
for k, v := range tst.sidxMap {
- indexDir := filepath.Join(dst, k)
+ indexDir := filepath.Join(dst, sidxDirName, k)
tst.fileSystem.MkdirPanicIfExist(indexDir, storage.DirPerm)
if err := v.TakeFileSnapshot(indexDir); err != nil {
return fmt.Errorf("failed to take file snapshot for
index, %s: %w", k, err)
diff --git a/banyand/trace/svc_standalone.go b/banyand/trace/svc_standalone.go
index 085ba984..8f14c950 100644
--- a/banyand/trace/svc_standalone.go
+++ b/banyand/trace/svc_standalone.go
@@ -20,8 +20,10 @@ package trace
import (
"context"
"fmt"
+ "os"
"path"
"path/filepath"
+ "sort"
"strings"
"sync"
"time"
@@ -33,6 +35,7 @@ import (
"github.com/apache/skywalking-banyandb/api/data"
commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ tracev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/observability"
@@ -163,7 +166,12 @@ func (s *standalone) PreRun(ctx context.Context) error {
if err != nil {
return err
}
+ err = s.pipeline.Subscribe(data.TopicDeleteExpiredTraceSegments,
&standaloneDeleteTraceSegmentsListener{s: s})
+ if err != nil {
+ return err
+ }
s.pipeline.RegisterChunkedSyncHandler(data.TopicTracePartSync,
setUpChunkedSyncCallback(s.l, &s.schemaRepo))
+ s.pipeline.RegisterChunkedSyncHandler(data.TopicTraceSeriesSync,
setUpSeriesSyncCallback(s.l, &s.schemaRepo))
err = s.pipeline.Subscribe(data.TopicTraceSidxSeriesWrite,
setUpSidxSeriesIndexCallback(s.l, &s.schemaRepo))
if err != nil {
return err
@@ -297,6 +305,117 @@ func (s *standalone) takeGroupSnapshot(dstDir string,
groupName string) error {
return nil
}
+// collectSegDirs walks a directory tree and collects all seg-* directory
paths.
+// It only collects directories matching the "seg-*" pattern and ignores all
files.
+// Returns a map of relative paths to seg-* directories.
+func collectSegDirs(rootDir string) (map[string]bool, error) {
+ segDirs := make(map[string]bool)
+
+ walkErr := filepath.Walk(rootDir, func(currentPath string, info
os.FileInfo, err error) error {
+ if err != nil {
+			// If we can't read a directory, skip it and continue
+ return nil
+ }
+
+ // Skip files, only process directories
+ if !info.IsDir() {
+ return nil
+ }
+
+ // Get the directory name
+ dirName := filepath.Base(currentPath)
+
+ // Check if this is a seg-* directory
+ if strings.HasPrefix(dirName, "seg-") {
+ // Get relative path from root
+ relPath, relErr := filepath.Rel(rootDir, currentPath)
+ if relErr != nil {
+ return fmt.Errorf("failed to get relative path
for %s: %w", currentPath, relErr)
+ }
+
+ // Add to our collection
+ segDirs[relPath] = true
+
+ // Don't recurse into seg-* directories
+ return filepath.SkipDir
+ }
+
+ return nil
+ })
+
+ if walkErr != nil {
+ return nil, fmt.Errorf("failed to walk directory %s: %w",
rootDir, walkErr)
+ }
+
+ return segDirs, nil
+}
+
+// compareSegDirs compares two sets of seg-* directories and returns three
slices:
+// matched (in both), onlyInSnapshot (only in snapshot), onlyInData (only in
data).
+func compareSegDirs(snapshotDirs, dataDirs map[string]bool) (matched,
onlyInSnapshot, onlyInData []string) {
+ // Find directories in both sets (matched)
+ for dir := range snapshotDirs {
+ if dataDirs[dir] {
+ matched = append(matched, dir)
+ } else {
+ onlyInSnapshot = append(onlyInSnapshot, dir)
+ }
+ }
+
+ // Find directories only in data
+ for dir := range dataDirs {
+ if !snapshotDirs[dir] {
+ onlyInData = append(onlyInData, dir)
+ }
+ }
+
+ // Sort for consistent output
+ sort.Strings(matched)
+ sort.Strings(onlyInSnapshot)
+ sort.Strings(onlyInData)
+
+ return matched, onlyInSnapshot, onlyInData
+}
+
+// compareSnapshotWithData compares the snapshot directory with the data
directory
+// to verify that seg-* directories are consistent. Only logs differences,
does not return errors.
+func (d *standaloneSnapshotListener) compareSnapshotWithData(snapshotDir,
dataDir, groupName string) {
+ // Collect seg-* directories from snapshot
+ snapshotDirs, snapshotErr := collectSegDirs(snapshotDir)
+ if snapshotErr != nil {
+ d.s.l.Warn().Err(snapshotErr).
+ Str("group", groupName).
+ Str("snapshotDir", snapshotDir).
+ Msg("failed to collect seg-* directories from snapshot,
skipping comparison")
+ return
+ }
+
+ // Collect seg-* directories from data
+ dataDirs, dataErr := collectSegDirs(dataDir)
+ if dataErr != nil {
+ d.s.l.Warn().Err(dataErr).
+ Str("group", groupName).
+ Str("dataDir", dataDir).
+ Msg("failed to collect seg-* directories from data,
skipping comparison")
+ return
+ }
+
+ // Compare the directories
+ matched, onlyInSnapshot, onlyInData := compareSegDirs(snapshotDirs,
dataDirs)
+
+ // Log consolidated comparison results at Info level
+ d.s.l.Info().
+ Str("group", groupName).
+ Int("matched", len(matched)).
+ Int("onlyInSnapshot", len(onlyInSnapshot)).
+ Int("onlyInData", len(onlyInData)).
+ Strs("matchedDirs", matched).
+ Strs("onlyInSnapshotDirs", onlyInSnapshot).
+ Strs("onlyInDataDirs", onlyInData).
+ Msgf("snapshot comparison for group %s, data dir: %s, snapshot
dir: %s",
+ groupName, dataDir, snapshotDir)
+}
+
type standaloneSnapshotListener struct {
*bus.UnImplementedHealthyListener
s *standalone
@@ -335,11 +454,17 @@ func (d *standaloneSnapshotListener) Rev(ctx
context.Context, message bus.Messag
return
bus.NewMessage(bus.MessageID(time.Now().UnixNano()), nil)
default:
}
- if errGroup :=
d.s.takeGroupSnapshot(filepath.Join(d.s.snapshotDir, sn,
g.GetSchema().Metadata.Name), g.GetSchema().Metadata.Name); err != nil {
- d.s.l.Error().Err(errGroup).Str("group",
g.GetSchema().Metadata.Name).Msg("fail to take group snapshot")
+ groupName := g.GetSchema().Metadata.Name
+ snapshotPath := filepath.Join(d.s.snapshotDir, sn, groupName)
+ if errGroup := d.s.takeGroupSnapshot(snapshotPath, groupName);
errGroup != nil {
+ d.s.l.Error().Err(errGroup).Str("group",
groupName).Msg("fail to take group snapshot")
err = multierr.Append(err, errGroup)
continue
}
+
+ // Compare snapshot with data directory to verify consistency
+ dataPath := filepath.Join(d.s.dataPath, groupName)
+ d.compareSnapshotWithData(snapshotPath, dataPath, groupName)
}
snp := &databasev1.Snapshot{
Name: sn,
@@ -356,6 +481,27 @@ func (d *standaloneSnapshotListener) snapshotName() string
{
return fmt.Sprintf("%s-%08X",
time.Now().UTC().Format("20060102150405"), d.snapshotSeq)
}
+type standaloneDeleteTraceSegmentsListener struct {
+ *bus.UnImplementedHealthyListener
+ s *standalone
+}
+
+func (s *standaloneDeleteTraceSegmentsListener) Rev(_ context.Context, message
bus.Message) bus.Message {
+	req, ok := message.Data().(*tracev1.DeleteExpiredSegmentsRequest)
+	if !ok || req == nil {
+ return bus.NewMessage(bus.MessageID(time.Now().UnixNano()),
int64(0))
+ }
+
+ db, err := s.s.schemaRepo.loadTSDB(req.Group)
+ if err != nil {
+ s.s.l.Error().Err(err).Str("group", req.Group).Msg("fail to
load tsdb")
+ return bus.NewMessage(bus.MessageID(time.Now().UnixNano()),
int64(0))
+ }
+ s.s.l.Info().Msg("test")
+ deleted := db.DeleteExpiredSegments(req.SegmentSuffixes)
+ return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), deleted)
+}
+
// NewService returns a new service.
func NewService(metadata metadata.Repo, pipeline queue.Server, omr
observability.MetricsRegistry, pm protector.Memory) (Service, error) {
return &standalone{
diff --git a/banyand/trace/visitor.go b/banyand/trace/visitor.go
new file mode 100644
index 00000000..8beff6d8
--- /dev/null
+++ b/banyand/trace/visitor.go
@@ -0,0 +1,57 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+ "github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+// Visitor defines the interface for visiting trace components.
+type Visitor interface {
+ // VisitSeries visits the series index directory for a segment.
+ VisitSeries(segmentTR *timestamp.TimeRange, seriesIndexPath string,
shardIDs []common.ShardID) error
+ // VisitShard visits the shard directory for a segment.
+ VisitShard(segmentTR *timestamp.TimeRange, shardID common.ShardID,
segmentPath string) error
+}
+
+// traceSegmentVisitor adapts Visitor to work with storage.SegmentVisitor.
+type traceSegmentVisitor struct {
+ visitor Visitor
+}
+
+// VisitSeries implements storage.SegmentVisitor.
+func (tv *traceSegmentVisitor) VisitSeries(segmentTR *timestamp.TimeRange,
seriesIndexPath string, shardIDs []common.ShardID) error {
+ return tv.visitor.VisitSeries(segmentTR, seriesIndexPath, shardIDs)
+}
+
+// VisitShard implements storage.SegmentVisitor.
+func (tv *traceSegmentVisitor) VisitShard(segmentTR *timestamp.TimeRange,
shardID common.ShardID, shardPath string) error {
+ // Visit parts within the shard
+ return tv.visitor.VisitShard(segmentTR, shardID, shardPath)
+}
+
+// VisitTracesInTimeRange traverses trace segments within the specified time range
+// and calls the visitor methods for shard and series index directories.
+// This function works directly with the filesystem without requiring a
database instance.
+// Returns a list of segment suffixes that were visited.
+func VisitTracesInTimeRange(tsdbRootPath string, timeRange
timestamp.TimeRange, visitor Visitor, intervalRule storage.IntervalRule)
([]string, error) {
+ adapter := &traceSegmentVisitor{visitor: visitor}
+ return storage.VisitSegmentsInTimeRange(tsdbRootPath, timeRange,
adapter, intervalRule)
+}
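
VisitTracesInTimeRange is a thin adapter over storage.VisitSegmentsInTimeRange, so a migration tool only has to supply a Visitor. A minimal sketch, assuming the caller already holds a timestamp.TimeRange and the group's storage.IntervalRule (both are taken as parameters to avoid guessing their constructors; the package name is hypothetical):

    package lifecyclesketch // hypothetical package name for this sketch

    import (
        "fmt"

        "github.com/apache/skywalking-banyandb/api/common"
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/banyand/trace"
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
    )

    type printingVisitor struct{}

    func (printingVisitor) VisitSeries(_ *timestamp.TimeRange, seriesIndexPath string, shardIDs []common.ShardID) error {
        fmt.Println("series index:", seriesIndexPath, "shards:", shardIDs)
        return nil
    }

    func (printingVisitor) VisitShard(_ *timestamp.TimeRange, shardID common.ShardID, shardPath string) error {
        fmt.Println("shard", shardID, "at", shardPath)
        return nil
    }

    // walkTraceSegments returns the segment suffixes that were visited.
    func walkTraceSegments(root string, tr timestamp.TimeRange, rule storage.IntervalRule) ([]string, error) {
        return trace.VisitTracesInTimeRange(root, tr, printingVisitor{}, rule)
    }
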
diff --git a/banyand/trace/write_data.go b/banyand/trace/write_data.go
index 5fdefc6e..6e44cbb7 100644
--- a/banyand/trace/write_data.go
+++ b/banyand/trace/write_data.go
@@ -23,8 +23,11 @@ import (
"time"
"github.com/apache/skywalking-banyandb/api/common"
+ commonv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
"github.com/apache/skywalking-banyandb/banyand/internal/sidx"
+ "github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/pkg/index"
"github.com/apache/skywalking-banyandb/pkg/logger"
)
@@ -114,24 +117,126 @@ func (s *syncPartContext) Close() error {
return nil
}
-type syncCallback struct {
+type syncSeriesContext struct {
+ streamer index.ExternalSegmentStreamer
+ segment storage.Segment[*tsTable, *commonv1.ResourceOpts]
+ l *logger.Logger
+ fileName string
+}
+
+func (s *syncSeriesContext) NewPartType(_ *queue.ChunkedSyncPartContext) error
{
+ logger.Panicf("new part type is not supported for trace")
+ return nil
+}
+
+func (s *syncSeriesContext) FinishSync() error {
+ if s.streamer != nil {
+ if err := s.streamer.CompleteSegment(); err != nil {
+ s.l.Error().Err(err).Msg("failed to complete external
segment")
+ return err
+ }
+ }
+ return s.Close()
+}
+
+func (s *syncSeriesContext) Close() error {
+ if s.segment != nil {
+ s.segment.DecRef()
+ }
+ s.streamer = nil
+ s.fileName = ""
+ s.segment = nil
+ return nil
+}
+
+type syncSeriesCallback struct {
+ l *logger.Logger
+ schemaRepo *schemaRepo
+}
+
+func setUpSeriesSyncCallback(l *logger.Logger, s *schemaRepo)
queue.ChunkedSyncHandler {
+ return &syncSeriesCallback{
+ l: l,
+ schemaRepo: s,
+ }
+}
+
+func (s *syncSeriesCallback) CheckHealth() *common.Error {
+ return nil
+}
+
+func (s *syncSeriesCallback) CreatePartHandler(ctx
*queue.ChunkedSyncPartContext) (queue.PartHandler, error) {
+ tsdb, err := s.schemaRepo.loadTSDB(ctx.Group)
+ if err != nil {
+ s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to
load TSDB for group")
+ return nil, err
+ }
+ segmentTime := time.Unix(0, ctx.MinTimestamp)
+ segment, err := tsdb.CreateSegmentIfNotExist(segmentTime)
+ if err != nil {
+ s.l.Error().Err(err).Str("group",
ctx.Group).Time("segmentTime", segmentTime).Msg("failed to create segment")
+ return nil, err
+ }
+ return &syncSeriesContext{
+ l: s.l,
+ segment: segment,
+ }, nil
+}
+
+// HandleFileChunk implements queue.ChunkedSyncHandler for streaming series
index chunks.
+func (s *syncSeriesCallback) HandleFileChunk(ctx
*queue.ChunkedSyncPartContext, chunk []byte) error {
+ if ctx.Handler == nil {
+ return fmt.Errorf("part handler is nil")
+ }
+ seriesCtx := ctx.Handler.(*syncSeriesContext)
+
+ if seriesCtx.segment == nil {
+ return fmt.Errorf("segment is nil")
+ }
+ if seriesCtx.fileName != ctx.FileName {
+ if seriesCtx.streamer != nil {
+ if err := seriesCtx.streamer.CompleteSegment(); err !=
nil {
+ s.l.Error().Err(err).Str("group",
ctx.Group).Msg("failed to complete external segment")
+ return err
+ }
+ }
+ indexDB := seriesCtx.segment.IndexDB()
+ streamer, err := indexDB.EnableExternalSegments()
+ if err != nil {
+ s.l.Error().Err(err).Str("group",
ctx.Group).Msg("failed to enable external segments")
+ return err
+ }
+ if err := streamer.StartSegment(); err != nil {
+ s.l.Error().Err(err).Str("group",
ctx.Group).Msg("failed to start external segment")
+ return err
+ }
+ seriesCtx.fileName = ctx.FileName
+ seriesCtx.streamer = streamer
+ }
+ if err := seriesCtx.streamer.WriteChunk(chunk); err != nil {
+ return fmt.Errorf("failed to write chunk (size: %d) to file %q:
%w", len(chunk), ctx.FileName, err)
+ }
+ return nil
+}
+
+type syncChunkCallback struct {
l *logger.Logger
schemaRepo *schemaRepo
}
func setUpChunkedSyncCallback(l *logger.Logger, schemaRepo *schemaRepo)
queue.ChunkedSyncHandler {
- return &syncCallback{
+ return &syncChunkCallback{
l: l,
schemaRepo: schemaRepo,
}
}
-func (s *syncCallback) CheckHealth() *common.Error {
+func (s *syncChunkCallback) CheckHealth() *common.Error {
return nil
}
// CreatePartHandler implements queue.ChunkedSyncHandler.
-func (s *syncCallback) CreatePartHandler(ctx *queue.ChunkedSyncPartContext)
(queue.PartHandler, error) {
+func (s *syncChunkCallback) CreatePartHandler(ctx
*queue.ChunkedSyncPartContext) (queue.PartHandler, error) {
tsdb, err := s.schemaRepo.loadTSDB(ctx.Group)
if err != nil {
s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to
load TSDB for group")
@@ -159,7 +264,7 @@ func (s *syncCallback) CreatePartHandler(ctx
*queue.ChunkedSyncPartContext) (que
}
// HandleFileChunk implements queue.ChunkedSyncHandler for streaming file
chunks.
-func (s *syncCallback) HandleFileChunk(ctx *queue.ChunkedSyncPartContext,
chunk []byte) error {
+func (s *syncChunkCallback) HandleFileChunk(ctx *queue.ChunkedSyncPartContext,
chunk []byte) error {
if ctx.Handler == nil {
return fmt.Errorf("part handler is nil")
}
@@ -169,7 +274,7 @@ func (s *syncCallback) HandleFileChunk(ctx
*queue.ChunkedSyncPartContext, chunk
return s.handleTraceFileChunk(ctx, chunk)
}
-func (s *syncCallback) handleSidxFileChunk(ctx *queue.ChunkedSyncPartContext,
chunk []byte) error {
+func (s *syncChunkCallback) handleSidxFileChunk(ctx
*queue.ChunkedSyncPartContext, chunk []byte) error {
sidxName := ctx.PartType
fileName := ctx.FileName
partCtx := ctx.Handler.(*syncPartContext)
@@ -202,7 +307,7 @@ func (s *syncCallback) handleSidxFileChunk(ctx
*queue.ChunkedSyncPartContext, ch
return nil
}
-func (s *syncCallback) handleTraceFileChunk(ctx *queue.ChunkedSyncPartContext,
chunk []byte) error {
+func (s *syncChunkCallback) handleTraceFileChunk(ctx
*queue.ChunkedSyncPartContext, chunk []byte) error {
fileName := ctx.FileName
partCtx := ctx.Handler.(*syncPartContext)
switch {
@@ -235,10 +340,10 @@ func (s *syncCallback) handleTraceFileChunk(ctx
*queue.ChunkedSyncPartContext, c
return nil
}
-func (s *syncCallback) handleTraceIDFilterChunk(partCtx *syncPartContext,
chunk []byte) {
+func (s *syncChunkCallback) handleTraceIDFilterChunk(partCtx *syncPartContext,
chunk []byte) {
partCtx.traceIDFilterBuffer = append(partCtx.traceIDFilterBuffer,
chunk...)
}
-func (s *syncCallback) handleTagTypeChunk(partCtx *syncPartContext, chunk
[]byte) {
+func (s *syncChunkCallback) handleTagTypeChunk(partCtx *syncPartContext, chunk
[]byte) {
partCtx.tagTypeBuffer = append(partCtx.tagTypeBuffer, chunk...)
}
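
The series sync handler keys its streaming state on ctx.FileName: when a chunk for a new file arrives it completes the previous external segment, enables a fresh streamer on the segment's index DB, starts a new segment, and only then writes. The sketch below restates that per-file lifecycle against a stand-in interface; it is not the real index.ExternalSegmentStreamer, only the subset of calls used above:

    // segmentStreamer mirrors the subset of index.ExternalSegmentStreamer used above.
    type segmentStreamer interface {
        StartSegment() error
        WriteChunk(chunk []byte) error
        CompleteSegment() error
    }

    // streamOneFile shows the per-file order the receiver expects:
    // StartSegment once, WriteChunk per incoming chunk, CompleteSegment at the end.
    func streamOneFile(s segmentStreamer, chunks [][]byte) error {
        if err := s.StartSegment(); err != nil {
            return err
        }
        for _, c := range chunks {
            if err := s.WriteChunk(c); err != nil {
                return err
            }
        }
        return s.CompleteSegment()
    }
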
diff --git a/docs/api-reference.md b/docs/api-reference.md
index 18047c3d..ff86ff51 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -335,6 +335,9 @@
- [WriteResponse](#banyandb-trace-v1-WriteResponse)
- [banyandb/trace/v1/rpc.proto](#banyandb_trace_v1_rpc-proto)
+ -
[DeleteExpiredSegmentsRequest](#banyandb-trace-v1-DeleteExpiredSegmentsRequest)
+ -
[DeleteExpiredSegmentsResponse](#banyandb-trace-v1-DeleteExpiredSegmentsResponse)
+
- [TraceService](#banyandb-trace-v1-TraceService)
- [Scalar Value Types](#scalar-value-types)
@@ -4947,6 +4950,37 @@ TagFamilySpec defines the specification of a tag family.
## banyandb/trace/v1/rpc.proto
+
+<a name="banyandb-trace-v1-DeleteExpiredSegmentsRequest"></a>
+
+### DeleteExpiredSegmentsRequest
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| group | [string](#string) | | |
+| segment_suffixes | [string](#string) | repeated | |
+
+
+
+
+
+
+<a name="banyandb-trace-v1-DeleteExpiredSegmentsResponse"></a>
+
+### DeleteExpiredSegmentsResponse
+
+
+
+| Field | Type | Label | Description |
+| ----- | ---- | ----- | ----------- |
+| deleted | [int64](#int64) | | |
+
+
+
+
+
@@ -4963,6 +4997,7 @@ TagFamilySpec defines the specification of a tag family.
| ----------- | ------------ | ------------- | ------------|
| Query | [QueryRequest](#banyandb-trace-v1-QueryRequest) |
[QueryResponse](#banyandb-trace-v1-QueryResponse) | |
| Write | [WriteRequest](#banyandb-trace-v1-WriteRequest) stream |
[WriteResponse](#banyandb-trace-v1-WriteResponse) stream | |
+| DeleteExpiredSegments |
[DeleteExpiredSegmentsRequest](#banyandb-trace-v1-DeleteExpiredSegmentsRequest)
|
[DeleteExpiredSegmentsResponse](#banyandb-trace-v1-DeleteExpiredSegmentsResponse)
| |
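
The new RPC can be called like any other generated TraceService method. A hedged client-side sketch, assuming the standard protoc-generated Go client (tracev1.NewTraceServiceClient), grpc-go 1.63+ for grpc.NewClient (older versions would use grpc.Dial), the default 17912 gRPC port, and illustrative group and segment suffixes:

    package main

    import (
        "context"
        "log"

        "google.golang.org/grpc"
        "google.golang.org/grpc/credentials/insecure"

        tracev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/trace/v1"
    )

    func main() {
        conn, err := grpc.NewClient("127.0.0.1:17912", grpc.WithTransportCredentials(insecure.NewCredentials()))
        if err != nil {
            log.Fatal(err)
        }
        defer conn.Close()

        client := tracev1.NewTraceServiceClient(conn)
        resp, err := client.DeleteExpiredSegments(context.Background(), &tracev1.DeleteExpiredSegmentsRequest{
            Group:           "test-trace-group",
            SegmentSuffixes: []string{"20251120"},
        })
        if err != nil {
            log.Fatal(err)
        }
        log.Printf("deleted %d expired trace segments", resp.Deleted)
    }
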
diff --git a/pkg/schema/cache.go b/pkg/schema/cache.go
index 73d15beb..e085f814 100644
--- a/pkg/schema/cache.go
+++ b/pkg/schema/cache.go
@@ -26,6 +26,7 @@ import (
"time"
"github.com/pkg/errors"
+ "github.com/rs/zerolog/log"
"go.uber.org/multierr"
"google.golang.org/protobuf/proto"
@@ -294,6 +295,7 @@ func (sr *schemaRepo) createGroup(name string) (g *group) {
func (sr *schemaRepo) deleteGroup(groupMeta *commonv1.Metadata) error {
name := groupMeta.GetName()
g, loaded := sr.groupMap.LoadAndDelete(name)
+ log.Info().Str("group", name).Bool("loaded", loaded).Msg("deleting
group")
if !loaded {
return nil
}
diff --git a/pkg/test/trace/etcd.go b/pkg/test/trace/etcd.go
index 275c1765..414e8ef6 100644
--- a/pkg/test/trace/etcd.go
+++ b/pkg/test/trace/etcd.go
@@ -34,6 +34,7 @@ import (
const (
groupDir = "testdata/groups"
+ groupStagesDir = "testdata/groups_stages"
traceDir = "testdata/traces"
indexRuleDir = "testdata/index_rules"
indexRuleBindingDir = "testdata/index_rule_bindings"
@@ -44,14 +45,19 @@ var store embed.FS
// PreloadSchema loads schemas from files in the booting process.
func PreloadSchema(ctx context.Context, e schema.Registry) error {
- return loadAllSchemas(ctx, e)
+ return loadAllSchemas(ctx, e, groupDir)
+}
+
+// PreloadSchemaWithStages loads group schemas with stages from files in the
booting process.
+func PreloadSchemaWithStages(ctx context.Context, e schema.Registry) error {
+ return loadAllSchemas(ctx, e, groupStagesDir)
}
// loadAllSchemas loads all trace-related schemas from the testdata directory.
-func loadAllSchemas(ctx context.Context, e schema.Registry) error {
+func loadAllSchemas(ctx context.Context, e schema.Registry, group string)
error {
return preloadSchemaWithFuncs(ctx, e,
func(ctx context.Context, e schema.Registry) error {
- return loadSchema(groupDir, &commonv1.Group{},
func(group *commonv1.Group) error {
+ return loadSchema(group, &commonv1.Group{}, func(group
*commonv1.Group) error {
return e.CreateGroup(ctx, group)
})
},
diff --git a/pkg/test/trace/testdata/groups_stages/test-trace-group.json
b/pkg/test/trace/testdata/groups_stages/test-trace-group.json
new file mode 100644
index 00000000..4a1b35ba
--- /dev/null
+++ b/pkg/test/trace/testdata/groups_stages/test-trace-group.json
@@ -0,0 +1,34 @@
+{
+ "metadata": {
+ "name": "test-trace-group"
+ },
+ "catalog": "CATALOG_TRACE",
+ "resource_opts": {
+ "shard_num": 2,
+ "replicas": 0,
+ "segment_interval": {
+ "unit": "UNIT_DAY",
+ "num": 1
+ },
+ "ttl": {
+ "unit": "UNIT_DAY",
+ "num": 3
+ },
+ "stages": [
+ {
+ "name": "warm",
+ "shard_num": 2,
+ "segment_interval": {
+ "unit": "UNIT_DAY",
+ "num": 3
+ },
+ "ttl": {
+ "unit": "UNIT_DAY",
+ "num": 7
+ },
+ "node_selector": "type=warm"
+ }
+ ]
+ },
+ "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/trace/testdata/groups_stages/test-trace-updated.json
b/pkg/test/trace/testdata/groups_stages/test-trace-updated.json
new file mode 100644
index 00000000..2e016960
--- /dev/null
+++ b/pkg/test/trace/testdata/groups_stages/test-trace-updated.json
@@ -0,0 +1,34 @@
+{
+ "metadata": {
+ "name": "test-trace-updated"
+ },
+ "catalog": "CATALOG_TRACE",
+ "resource_opts": {
+ "shard_num": 2,
+ "replicas": 0,
+ "segment_interval": {
+ "unit": "UNIT_DAY",
+ "num": 1
+ },
+ "ttl": {
+ "unit": "UNIT_DAY",
+ "num": 3
+ },
+ "stages": [
+ {
+ "name": "warm",
+ "shard_num": 2,
+ "segment_interval": {
+ "unit": "UNIT_DAY",
+ "num": 3
+ },
+ "ttl": {
+ "unit": "UNIT_DAY",
+ "num": 7
+ },
+ "node_selector": "type=warm"
+ }
+ ]
+ },
+ "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/pkg/test/trace/testdata/groups_stages/zipkin-trace-group.json
b/pkg/test/trace/testdata/groups_stages/zipkin-trace-group.json
new file mode 100644
index 00000000..25004d1c
--- /dev/null
+++ b/pkg/test/trace/testdata/groups_stages/zipkin-trace-group.json
@@ -0,0 +1,34 @@
+{
+ "metadata": {
+ "name": "zipkinTrace"
+ },
+ "catalog": "CATALOG_TRACE",
+ "resource_opts": {
+ "shard_num": 2,
+ "replicas": 1,
+ "segment_interval": {
+ "unit": "UNIT_DAY",
+ "num": 1
+ },
+ "ttl": {
+ "unit": "UNIT_DAY",
+ "num": 7
+ },
+ "stages": [
+ {
+ "name": "warm",
+ "shard_num": 2,
+ "segment_interval": {
+ "unit": "UNIT_DAY",
+ "num": 3
+ },
+ "ttl": {
+ "unit": "UNIT_DAY",
+ "num": 7
+ },
+ "node_selector": "type=warm"
+ }
+ ]
+ },
+ "updated_at": "2021-04-15T01:30:15.01Z"
+}
\ No newline at end of file
diff --git a/test/cases/lifecycle/lifecycle.go
b/test/cases/lifecycle/lifecycle.go
index f831e252..00f0185e 100644
--- a/test/cases/lifecycle/lifecycle.go
+++ b/test/cases/lifecycle/lifecycle.go
@@ -36,6 +36,7 @@ import (
measureTestData
"github.com/apache/skywalking-banyandb/test/cases/measure/data"
streamTestData
"github.com/apache/skywalking-banyandb/test/cases/stream/data"
topNTestData
"github.com/apache/skywalking-banyandb/test/cases/topn/data"
+ traceTestData
"github.com/apache/skywalking-banyandb/test/cases/trace/data"
)
// SharedContext is the shared context for the snapshot test cases.
@@ -53,6 +54,7 @@ var _ = ginkgo.Describe("Lifecycle", func() {
"--grpc-addr", SharedContext.DataAddr,
"--stream-root-path", SharedContext.SrcDir,
"--measure-root-path", SharedContext.SrcDir,
+ "--trace-root-path", SharedContext.SrcDir,
"--etcd-endpoints", SharedContext.EtcdAddr,
"--progress-file", pf,
"--report-dir", rf,
@@ -97,6 +99,13 @@ var _ = ginkgo.Describe("Lifecycle", func() {
Duration: 25 * time.Minute,
Offset: -20 * time.Minute,
})
+
+ // Verify trace data lifecycle stages
+ verifyLifecycleStages(sc, traceTestData.VerifyFn, helpers.Args{
+ Input: "having_query_tag",
+ Duration: 25 * time.Minute,
+ Offset: -20 * time.Minute,
+ })
})
ginkgo.It("should migrate data correctly with a scheduler", func() {
dir, err := os.MkdirTemp("", "lifecycle-restore-dest")
@@ -109,6 +118,7 @@ var _ = ginkgo.Describe("Lifecycle", func() {
"--grpc-addr", SharedContext.DataAddr,
"--stream-root-path", SharedContext.SrcDir,
"--measure-root-path", SharedContext.SrcDir,
+ "--trace-root-path", SharedContext.SrcDir,
"--etcd-endpoints", SharedContext.EtcdAddr,
"--progress-file", pf,
"--report-dir", rf,
@@ -155,6 +165,13 @@ var _ = ginkgo.Describe("Lifecycle", func() {
Duration: 25 * time.Minute,
Offset: -20 * time.Minute,
})
+
+ // Verify trace data lifecycle stages
+ verifyLifecycleStages(sc, traceTestData.VerifyFn, helpers.Args{
+ Input: "having_query_tag",
+ Duration: 25 * time.Minute,
+ Offset: -20 * time.Minute,
+ })
})
})
diff --git a/test/integration/distributed/lifecycle/lifecycle_suite_test.go
b/test/integration/distributed/lifecycle/lifecycle_suite_test.go
index 8b7b2923..13287bdf 100644
--- a/test/integration/distributed/lifecycle/lifecycle_suite_test.go
+++ b/test/integration/distributed/lifecycle/lifecycle_suite_test.go
@@ -41,6 +41,7 @@ import (
test_property "github.com/apache/skywalking-banyandb/pkg/test/property"
"github.com/apache/skywalking-banyandb/pkg/test/setup"
test_stream "github.com/apache/skywalking-banyandb/pkg/test/stream"
+ test_trace "github.com/apache/skywalking-banyandb/pkg/test/trace"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
test_cases "github.com/apache/skywalking-banyandb/test/cases"
caseslifecycle
"github.com/apache/skywalking-banyandb/test/cases/lifecycle"
@@ -95,13 +96,16 @@ var _ = SynchronizedBeforeSuite(func() []byte {
ctx := context.Background()
test_stream.LoadSchemaWithStages(ctx, schemaRegistry)
test_measure.LoadSchemaWithStages(ctx, schemaRegistry)
+ test_trace.PreloadSchemaWithStages(ctx, schemaRegistry)
test_property.PreloadSchema(ctx, schemaRegistry)
By("Starting hot data node")
var closeDataNode0 func()
- dataAddr, srcDir, closeDataNode0 = setup.DataNodeWithAddrAndDir(ep,
"--node-labels", "type=hot", "--measure-flush-timeout", "0s",
"--stream-flush-timeout", "0s")
+ dataAddr, srcDir, closeDataNode0 = setup.DataNodeWithAddrAndDir(ep,
"--node-labels", "type=hot",
+ "--measure-flush-timeout", "0s", "--stream-flush-timeout",
"0s", "--trace-flush-timeout", "0s")
By("Starting warm data node")
var closeDataNode1 func()
- _, destDir, closeDataNode1 = setup.DataNodeWithAddrAndDir(ep,
"--node-labels", "type=warm", "--measure-flush-timeout", "0s",
"--stream-flush-timeout", "0s")
+ _, destDir, closeDataNode1 = setup.DataNodeWithAddrAndDir(ep,
"--node-labels", "type=warm",
+ "--measure-flush-timeout", "0s", "--stream-flush-timeout",
"0s", "--trace-flush-timeout", "0s")
By("Starting liaison node")
var closerLiaisonNode func()
liaisonAddr, closerLiaisonNode = setup.LiaisonNode(ep,
"--data-node-selector", "type=hot")