This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 775148e7 Refactor lifecycle logical (#848)
775148e7 is described below
commit 775148e72c8b937ee68ed923526dfcf93a5bd43b
Author: mrproliu <[email protected]>
AuthorDate: Mon Nov 17 19:30:43 2025 +0900
Refactor lifecycle logical (#848)
* Refactor lifecycle logical
---------
Co-authored-by: Gao Hongtao <[email protected]>
---
api/proto/banyandb/measure/v1/rpc.proto | 3 +-
api/proto/banyandb/stream/v1/rpc.proto | 3 +-
banyand/backup/lifecycle/migration_integration.go | 101 +++++++----------
banyand/backup/lifecycle/segment_boundary_utils.go | 3 +-
banyand/backup/lifecycle/service.go | 123 +++++++++------------
banyand/backup/lifecycle/steps.go | 56 ++++++++--
banyand/internal/storage/segment.go | 23 +++-
banyand/internal/storage/segment_test.go | 11 +-
banyand/internal/storage/storage.go | 2 +-
banyand/internal/storage/tsdb.go | 4 +-
banyand/internal/storage/visitor.go | 33 ++++--
banyand/internal/storage/visitor_test.go | 22 ++--
banyand/measure/svc_data.go | 2 +-
banyand/measure/visitor.go | 7 +-
banyand/stream/svc_standalone.go | 2 +-
banyand/stream/visitor.go | 5 +-
banyand/stream/visitor_test.go | 3 +-
docs/api-reference.md | 4 +-
18 files changed, 224 insertions(+), 183 deletions(-)
diff --git a/api/proto/banyandb/measure/v1/rpc.proto b/api/proto/banyandb/measure/v1/rpc.proto
index 22e71173..0f1d1bf5 100644
--- a/api/proto/banyandb/measure/v1/rpc.proto
+++ b/api/proto/banyandb/measure/v1/rpc.proto
@@ -22,7 +22,6 @@ package banyandb.measure.v1;
import "banyandb/measure/v1/query.proto";
import "banyandb/measure/v1/topn.proto";
import "banyandb/measure/v1/write.proto";
-import "banyandb/model/v1/query.proto";
import "google/api/annotations.proto";
import "protoc-gen-openapiv2/options/annotations.proto";
@@ -32,7 +31,7 @@ option
(grpc.gateway.protoc_gen_openapiv2.options.openapiv2_swagger) = {base_pat
message DeleteExpiredSegmentsRequest {
string group = 1;
- model.v1.TimeRange time_range = 2;
+ repeated string segment_suffixes = 2;
}
message DeleteExpiredSegmentsResponse {
diff --git a/api/proto/banyandb/stream/v1/rpc.proto b/api/proto/banyandb/stream/v1/rpc.proto
index dcbe266e..1712a5f9 100644
--- a/api/proto/banyandb/stream/v1/rpc.proto
+++ b/api/proto/banyandb/stream/v1/rpc.proto
@@ -19,7 +19,6 @@ syntax = "proto3";
package banyandb.stream.v1;
-import "banyandb/model/v1/query.proto";
import "banyandb/stream/v1/query.proto";
import "banyandb/stream/v1/write.proto";
import "google/api/annotations.proto";
@@ -31,7 +30,7 @@ option
(grpc.gateway.protoc_gen_openapiv2.options.openapiv2_swagger) = {base_pat
message DeleteExpiredSegmentsRequest {
string group = 1;
- model.v1.TimeRange time_range = 2;
+ repeated string segment_suffixes = 2;
}
message DeleteExpiredSegmentsResponse {
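
Note: after this change, the stream and measure DeleteExpiredSegments RPCs identify the segments to drop by their directory suffixes rather than by a model.v1.TimeRange. A minimal client-side sketch of the new request shape follows; the helper function, connection handling, and example values are illustrative assumptions, not part of this commit.

    import (
        "context"

        streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
        "google.golang.org/grpc"
    )

    // deleteExpiredStreamSegments sends the reworked request, which now carries
    // the exact segment suffixes to delete (e.g. "20251114" for a daily segment).
    func deleteExpiredStreamSegments(ctx context.Context, conn *grpc.ClientConn, group string, suffixes []string) (int64, error) {
        client := streamv1.NewStreamServiceClient(conn)
        resp, err := client.DeleteExpiredSegments(ctx, &streamv1.DeleteExpiredSegmentsRequest{
            Group:           group,
            SegmentSuffixes: suffixes,
        })
        if err != nil {
            return 0, err
        }
        return resp.Deleted, nil
    }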
diff --git a/banyand/backup/lifecycle/migration_integration.go b/banyand/backup/lifecycle/migration_integration.go
index d51d9eea..b36ae1e5 100644
--- a/banyand/backup/lifecycle/migration_integration.go
+++ b/banyand/backup/lifecycle/migration_integration.go
@@ -19,51 +19,37 @@ package lifecycle
import (
"github.com/apache/skywalking-banyandb/api/common"
- commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
- databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/banyand/measure"
- "github.com/apache/skywalking-banyandb/banyand/metadata"
"github.com/apache/skywalking-banyandb/banyand/stream"
"github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
// migrateStreamWithFileBasedAndProgress performs file-based stream migration with progress tracking.
-func migrateStreamWithFileBasedAndProgress(
- tsdbRootPath string,
- timeRange timestamp.TimeRange,
- group *commonv1.Group,
- nodeLabels map[string]string,
- nodes []*databasev1.Node,
- metadata metadata.Repo,
- logger *logger.Logger,
- progress *Progress,
- chunkSize int,
-) error {
- // Use parseGroup function to get sharding parameters and TTL
- shardNum, replicas, ttl, selector, client, err := parseGroup(group, nodeLabels, nodes, logger, metadata)
- if err != nil {
- return err
- }
- defer client.GracefulStop()
-
- // Convert TTL to IntervalRule using storage.MustToIntervalRule
- intervalRule := storage.MustToIntervalRule(ttl)
+func migrateStreamWithFileBasedAndProgress(tsdbRootPath string, timeRange timestamp.TimeRange, group *GroupConfig,
+ logger *logger.Logger, progress *Progress, chunkSize int,
+) ([]string, error) {
+ // Convert segment Interval to IntervalRule using storage.MustToIntervalRule
+ segmentIntervalRule := storage.MustToIntervalRule(group.SegmentInterval)
// Get target stage configuration
targetStageInterval := getTargetStageInterval(group)
// Count total parts before starting migration
- totalParts, err := countStreamParts(tsdbRootPath, timeRange, intervalRule)
+ totalParts, segmentSuffixes, err := countStreamParts(tsdbRootPath, timeRange, segmentIntervalRule)
if err != nil {
logger.Warn().Err(err).Msg("failed to count stream parts,
proceeding without part count")
} else {
- logger.Info().Int("total_parts", totalParts).Msg("counted
stream parts for progress tracking")
+ logger.Info().Int("total_parts",
totalParts).Strs("segment_suffixes", segmentSuffixes).
+ Msg("counted stream parts for progres tracking")
}
// Create file-based migration visitor with progress tracking and target stage interval
- visitor := newStreamMigrationVisitor(group, shardNum, replicas, selector, client, logger, progress, chunkSize, targetStageInterval)
+ visitor := newStreamMigrationVisitor(
+ group.Group, group.TargetShardNum, group.TargetReplicas, group.NodeSelector, group.QueueClient,
+ logger, progress, chunkSize, targetStageInterval,
+ )
defer visitor.Close()
// Set the total part count for progress tracking
@@ -72,21 +58,25 @@ func migrateStreamWithFileBasedAndProgress(
}
// Use the existing VisitStreamsInTimeRange function with our file-based visitor
- return stream.VisitStreamsInTimeRange(tsdbRootPath, timeRange, visitor, intervalRule)
+ _, err = stream.VisitStreamsInTimeRange(tsdbRootPath, timeRange, visitor, segmentIntervalRule)
+ if err != nil {
+ return nil, err
+ }
+ return segmentSuffixes, nil
}
// countStreamParts counts the total number of parts in the given time range.
-func countStreamParts(tsdbRootPath string, timeRange timestamp.TimeRange, intervalRule storage.IntervalRule) (int, error) {
+func countStreamParts(tsdbRootPath string, timeRange timestamp.TimeRange, segmentInterval storage.IntervalRule) (int, []string, error) {
// Create a simple visitor to count parts
partCounter := &partCountVisitor{}
// Use the existing VisitStreamsInTimeRange function to count parts
- err := stream.VisitStreamsInTimeRange(tsdbRootPath, timeRange, partCounter, intervalRule)
+ segmentSuffixes, err := stream.VisitStreamsInTimeRange(tsdbRootPath, timeRange, partCounter, segmentInterval)
if err != nil {
- return 0, err
+ return 0, nil, err
}
- return partCounter.partCount, nil
+ return partCounter.partCount, segmentSuffixes, nil
}
// partCountVisitor is a simple visitor that counts parts.
@@ -111,40 +101,29 @@ func (pcv *partCountVisitor) VisitElementIndex(_ *timestamp.TimeRange, _ common.
}
// migrateMeasureWithFileBasedAndProgress performs file-based measure migration with progress tracking.
-func migrateMeasureWithFileBasedAndProgress(
- tsdbRootPath string,
- timeRange timestamp.TimeRange,
- group *commonv1.Group,
- nodeLabels map[string]string,
- nodes []*databasev1.Node,
- metadata metadata.Repo,
- logger *logger.Logger,
- progress *Progress,
- chunkSize int,
-) error {
- // Use parseGroup function to get sharding parameters and TTL
- shardNum, replicas, ttl, selector, client, err := parseGroup(group, nodeLabels, nodes, logger, metadata)
- if err != nil {
- return err
- }
- defer client.GracefulStop()
-
- // Convert TTL to IntervalRule using storage.MustToIntervalRule
- intervalRule := storage.MustToIntervalRule(ttl)
+func migrateMeasureWithFileBasedAndProgress(tsdbRootPath string, timeRange timestamp.TimeRange, group *GroupConfig,
+ logger *logger.Logger, progress *Progress, chunkSize int,
+) ([]string, error) {
+ // Convert segment interval to IntervalRule using storage.MustToIntervalRule
+ segmentIntervalRule := storage.MustToIntervalRule(group.SegmentInterval)
// Get target stage configuration
targetStageInterval := getTargetStageInterval(group)
// Count total parts before starting migration
- totalParts, err := countMeasureParts(tsdbRootPath, timeRange, intervalRule)
+ totalParts, segmentSuffixes, err := countMeasureParts(tsdbRootPath, timeRange, segmentIntervalRule)
if err != nil {
logger.Warn().Err(err).Msg("failed to count measure parts,
proceeding without part count")
} else {
- logger.Info().Int("total_parts", totalParts).Msg("counted
measure parts for progress tracking")
+ logger.Info().Int("total_parts",
totalParts).Strs("segment_suffixes", segmentSuffixes).
+ Msg("counted measure parts for progress tracking")
}
// Create file-based migration visitor with progress tracking and target stage interval
- visitor := newMeasureMigrationVisitor(group, shardNum, replicas, selector, client, logger, progress, chunkSize, targetStageInterval)
+ visitor := newMeasureMigrationVisitor(
+ group.Group, group.TargetShardNum, group.TargetReplicas, group.NodeSelector, group.QueueClient,
+ logger, progress, chunkSize, targetStageInterval,
+ )
defer visitor.Close()
// Set the total part count for progress tracking
@@ -153,21 +132,25 @@ func migrateMeasureWithFileBasedAndProgress(
}
// Use the existing VisitMeasuresInTimeRange function with our file-based visitor
- return measure.VisitMeasuresInTimeRange(tsdbRootPath, timeRange, visitor, intervalRule)
+ _, err = measure.VisitMeasuresInTimeRange(tsdbRootPath, timeRange, visitor, segmentIntervalRule)
+ if err != nil {
+ return nil, err
+ }
+ return segmentSuffixes, nil
}
// countMeasureParts counts the total number of parts in the given time range.
-func countMeasureParts(tsdbRootPath string, timeRange timestamp.TimeRange, intervalRule storage.IntervalRule) (int, error) {
+func countMeasureParts(tsdbRootPath string, timeRange timestamp.TimeRange, segmentInterval storage.IntervalRule) (int, []string, error) {
// Create a simple visitor to count parts
partCounter := &measurePartCountVisitor{}
// Use the existing VisitMeasuresInTimeRange function to count parts
- err := measure.VisitMeasuresInTimeRange(tsdbRootPath, timeRange, partCounter, intervalRule)
+ segmentSuffixes, err := measure.VisitMeasuresInTimeRange(tsdbRootPath, timeRange, partCounter, segmentInterval)
if err != nil {
- return 0, err
+ return 0, nil, err
}
- return partCounter.partCount, nil
+ return partCounter.partCount, segmentSuffixes, nil
}
// measurePartCountVisitor is a simple visitor that counts measure parts.
diff --git a/banyand/backup/lifecycle/segment_boundary_utils.go b/banyand/backup/lifecycle/segment_boundary_utils.go
index 243bb4a9..c3f0d6a1 100644
--- a/banyand/backup/lifecycle/segment_boundary_utils.go
+++ b/banyand/backup/lifecycle/segment_boundary_utils.go
@@ -20,7 +20,6 @@ package lifecycle
import (
"time"
- commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
@@ -62,7 +61,7 @@ func getSegmentTimeRange(segmentStart time.Time, interval storage.IntervalRule)
return timestamp.NewSectionTimeRange(segmentStart, segmentEnd)
}
-func getTargetStageInterval(group *commonv1.Group) storage.IntervalRule {
+func getTargetStageInterval(group *GroupConfig) storage.IntervalRule {
if group.ResourceOpts != nil && len(group.ResourceOpts.Stages) > 0 {
stage := group.ResourceOpts.Stages[0]
if stage.SegmentInterval != nil {
diff --git a/banyand/backup/lifecycle/service.go b/banyand/backup/lifecycle/service.go
index afa77715..243893f0 100644
--- a/banyand/backup/lifecycle/service.go
+++ b/banyand/backup/lifecycle/service.go
@@ -30,13 +30,11 @@ import (
"github.com/benbjohnson/clock"
"github.com/robfig/cron/v3"
"google.golang.org/grpc"
- "google.golang.org/protobuf/types/known/timestamppb"
"github.com/apache/skywalking-banyandb/api/common"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
measurev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/measure/v1"
- modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/banyand/backup/snapshot"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
@@ -555,76 +553,67 @@ func (l *lifecycleService) getGroupsToProcess(ctx context.Context, progress *Pro
func (l *lifecycleService) processStreamGroup(ctx context.Context, g *commonv1.Group,
streamDir string, nodes []*databasev1.Node, labels map[string]string, progress *Progress,
) {
- tr := l.getRemovalSegmentsTimeRange(g)
+ group, err := parseGroup(g, labels, nodes, l.l, l.metadata)
+ if err != nil {
+ l.l.Error().Err(err).Msgf("failed to parse group %s",
g.Metadata.Name)
+ return
+ }
+ defer group.Close()
+ tr := l.getRemovalSegmentsTimeRange(group)
if tr.Start.IsZero() && tr.End.IsZero() {
l.l.Info().Msgf("no removal segments time range for group %s,
skipping stream migration", g.Metadata.Name)
progress.MarkGroupCompleted(g.Metadata.Name)
return
}
- err := l.processStreamGroupFileBased(ctx, g, streamDir, tr, nodes, labels, progress)
+ segmentSuffixes, err := l.processStreamGroupFileBased(ctx, group, streamDir, tr, progress)
if err != nil {
l.l.Error().Err(err).Msgf("failed to migrate stream group %s
using file-based approach", g.Metadata.Name)
return
}
- l.l.Info().Msgf("deleting expired stream segments for group: %s",
g.Metadata.Name)
- l.deleteExpiredStreamSegments(ctx, g, tr, progress)
+ l.l.Info().Msgf("deleting expired stream segments for group: %s, time
range: %s", g.Metadata.Name, tr.String())
+ l.deleteExpiredStreamSegments(ctx, g, segmentSuffixes, progress)
progress.MarkGroupCompleted(g.Metadata.Name)
}
// processStreamGroupFileBased uses file-based migration instead of element-based queries.
-func (l *lifecycleService) processStreamGroupFileBased(_ context.Context, g *commonv1.Group,
- streamDir string, tr *timestamp.TimeRange, nodes []*databasev1.Node, labels map[string]string, progress *Progress,
-) error {
+func (l *lifecycleService) processStreamGroupFileBased(_ context.Context, g *GroupConfig, streamDir string,
+ tr *timestamp.TimeRange, progress *Progress,
+) ([]string, error) {
if progress.IsStreamGroupDeleted(g.Metadata.Name) {
l.l.Info().Msgf("skipping already completed file-based
migration for group: %s", g.Metadata.Name)
- return nil
+ return nil, nil
}
- l.l.Info().Msgf("starting file-based stream migration for group: %s",
g.Metadata.Name)
+ l.l.Info().Msgf("starting file-based stream migration for group: %s,
time range: %s", g.Metadata.Name, tr.String())
rootDir := filepath.Join(streamDir, g.Metadata.Name)
// skip the counting if the tsdb root path does not exist
// may no data found in the snapshot
if _, err := os.Stat(rootDir); err != nil && errors.Is(err, os.ErrNotExist) {
l.l.Info().Msgf("skipping file-based stream migration for group because is empty in the snapshot dir: %s", g.Metadata.Name)
- return nil
+ return nil, nil
}
// Use the file-based migration with existing visitor pattern
- err := migrateStreamWithFileBasedAndProgress(
- rootDir, // Use snapshot directory as source
- *tr, // Time range for segments to migrate
- g, // Group configuration
- labels, // Node labels
- nodes, // Target nodes
- l.metadata, // Metadata repository
- l.l, // Logger
- progress, // Progress tracking
- int(l.chunkSize), // Chunk size for streaming
- )
+ segmentSuffixes, err := migrateStreamWithFileBasedAndProgress(rootDir, *tr, g, l.l, progress, int(l.chunkSize))
if err != nil {
- return fmt.Errorf("file-based stream migration failed: %w", err)
+ return nil, fmt.Errorf("file-based stream migration failed:
%w", err)
}
l.l.Info().Msgf("completed file-based stream migration for group: %s",
g.Metadata.Name)
- return nil
+ return segmentSuffixes, nil
}
// getRemovalSegmentsTimeRange calculates the time range for segments that should be migrated
// based on the group's TTL configuration, similar to storage.segmentController.getExpiredSegmentsTimeRange.
-func (l *lifecycleService) getRemovalSegmentsTimeRange(g *commonv1.Group) *timestamp.TimeRange {
- if g.ResourceOpts == nil || g.ResourceOpts.Ttl == nil {
- l.l.Debug().Msgf("no TTL configured for group %s", g.Metadata.Name)
- return &timestamp.TimeRange{} // Return empty time range
- }
-
+func (l *lifecycleService) getRemovalSegmentsTimeRange(g *GroupConfig) *timestamp.TimeRange {
// Convert TTL to storage.IntervalRule
- ttl := storage.MustToIntervalRule(g.ResourceOpts.Ttl)
+ ttlRule := storage.MustToIntervalRule(g.AccumulatedTTL)
// Calculate deadline based on TTL (same logic as segmentController.getExpiredSegmentsTimeRange)
- deadline := time.Now().Local().Add(-l.calculateTTLDuration(ttl))
+ deadline := time.Now().Local().Add(-l.calculateTTLDuration(ttlRule))
// Create time range for segments before the deadline
timeRange := &timestamp.TimeRange{
@@ -637,7 +626,7 @@ func (l *lifecycleService) getRemovalSegmentsTimeRange(g *commonv1.Group) *times
l.l.Info().
Str("group", g.Metadata.Name).
Time("deadline", deadline).
- Str("ttl", fmt.Sprintf("%d %s", g.ResourceOpts.Ttl.Num,
g.ResourceOpts.Ttl.Unit.String())).
+ Str("ttl", fmt.Sprintf("%d %s", g.AccumulatedTTL.Num,
g.AccumulatedTTL.Unit)).
Msg("calculated removal segments time range based on TTL")
return timeRange
@@ -657,7 +646,7 @@ func (l *lifecycleService) calculateTTLDuration(ttl storage.IntervalRule) time.D
}
}
-func (l *lifecycleService) deleteExpiredStreamSegments(ctx context.Context, g *commonv1.Group, tr *timestamp.TimeRange, progress *Progress) {
+func (l *lifecycleService) deleteExpiredStreamSegments(ctx context.Context, g *commonv1.Group, segmentSuffixes []string, progress *Progress) {
if progress.IsStreamGroupDeleted(g.Metadata.Name) {
l.l.Info().Msgf("skipping already deleted stream group
segments: %s", g.Metadata.Name)
return
@@ -666,19 +655,16 @@ func (l *lifecycleService) deleteExpiredStreamSegments(ctx context.Context, g *c
resp, err := snapshot.Conn(l.gRPCAddr, l.enableTLS, l.insecure, l.cert, func(conn *grpc.ClientConn) (*streamv1.DeleteExpiredSegmentsResponse, error) {
client := streamv1.NewStreamServiceClient(conn)
return client.DeleteExpiredSegments(ctx, &streamv1.DeleteExpiredSegmentsRequest{
- Group: g.Metadata.Name,
- TimeRange: &modelv1.TimeRange{
- Begin: timestamppb.New(tr.Start),
- End: timestamppb.New(tr.End),
- },
+ Group: g.Metadata.Name,
+ SegmentSuffixes: segmentSuffixes,
})
})
if err != nil {
- l.l.Error().Err(err).Msgf("failed to delete expired segments in
group %s", g.Metadata.Name)
+ l.l.Error().Err(err).Msgf("failed to delete expired segments in
group %s, segments: %s", g.Metadata.Name, segmentSuffixes)
return
}
- l.l.Info().Msgf("deleted %d expired segments in group %s",
resp.Deleted, g.Metadata.Name)
+ l.l.Info().Msgf("deleted %d expired segments in group %s, segments:
%s", resp.Deleted, g.Metadata.Name, segmentSuffixes)
progress.MarkStreamGroupDeleted(g.Metadata.Name)
progress.Save(l.progressFilePath, l.l)
}
@@ -686,7 +672,14 @@ func (l *lifecycleService) deleteExpiredStreamSegments(ctx context.Context, g *c
func (l *lifecycleService) processMeasureGroup(ctx context.Context, g *commonv1.Group, measureDir string,
nodes []*databasev1.Node, labels map[string]string, progress *Progress,
) {
- tr := l.getRemovalSegmentsTimeRange(g)
+ group, err := parseGroup(g, labels, nodes, l.l, l.metadata)
+ if err != nil {
+ l.l.Error().Err(err).Msgf("failed to parse group %s",
g.Metadata.Name)
+ return
+ }
+ defer group.Close()
+
+ tr := l.getRemovalSegmentsTimeRange(group)
if tr.Start.IsZero() && tr.End.IsZero() {
l.l.Info().Msgf("no removal segments time range for group %s,
skipping measure migration", g.Metadata.Name)
progress.MarkGroupCompleted(g.Metadata.Name)
@@ -695,24 +688,25 @@ func (l *lifecycleService) processMeasureGroup(ctx context.Context, g *commonv1.
}
// Try file-based migration first
- if err := l.processMeasureGroupFileBased(ctx, g, measureDir, tr, nodes, labels, progress); err != nil {
+ segmentSuffixes, err := l.processMeasureGroupFileBased(ctx, group, measureDir, tr, progress)
+ if err != nil {
+ if err != nil {
l.l.Error().Err(err).Msgf("failed to migrate measure group %s
using file-based approach", g.Metadata.Name)
return
}
l.l.Info().Msgf("deleting expired measure segments for group: %s",
g.Metadata.Name)
- l.deleteExpiredMeasureSegments(ctx, g, tr, progress)
+ l.deleteExpiredMeasureSegments(ctx, g, segmentSuffixes, progress)
progress.MarkGroupCompleted(g.Metadata.Name)
progress.Save(l.progressFilePath, l.l)
}
// processMeasureGroupFileBased uses file-based migration instead of query-based migration.
-func (l *lifecycleService) processMeasureGroupFileBased(_ context.Context, g *commonv1.Group,
- measureDir string, tr *timestamp.TimeRange, nodes []*databasev1.Node, labels map[string]string, progress *Progress,
-) error {
+func (l *lifecycleService) processMeasureGroupFileBased(_ context.Context, g *GroupConfig, measureDir string,
+ tr *timestamp.TimeRange, progress *Progress,
+) ([]string, error) {
if progress.IsMeasureGroupDeleted(g.Metadata.Name) {
l.l.Info().Msgf("skipping already completed file-based measure
migration for group: %s", g.Metadata.Name)
- return nil
+ return nil, nil
}
l.l.Info().Msgf("starting file-based measure migration for group: %s",
g.Metadata.Name)
@@ -722,30 +716,20 @@ func (l *lifecycleService) processMeasureGroupFileBased(_
context.Context, g *co
// may no data found in the snapshot
if _, err := os.Stat(rootDir); err != nil && errors.Is(err, os.ErrNotExist) {
l.l.Info().Msgf("skipping file-based measure migration for group because is empty in the snapshot dir: %s", g.Metadata.Name)
- return nil
+ return nil, nil
}
// Use the file-based migration with existing visitor pattern
- err := migrateMeasureWithFileBasedAndProgress(
- rootDir, // Use snapshot directory as source
- *tr, // Time range for segments to migrate
- g, // Group configuration
- labels, // Node labels
- nodes, // Target nodes
- l.metadata, // Metadata repository
- l.l, // Logger
- progress, // Progress tracking
- int(l.chunkSize), // Chunk size for streaming
- )
+ segmentSuffixes, err := migrateMeasureWithFileBasedAndProgress(rootDir, *tr, g, l.l, progress, int(l.chunkSize))
if err != nil {
- return fmt.Errorf("file-based measure migration failed: %w",
err)
+ return nil, fmt.Errorf("file-based measure migration failed:
%w", err)
}
l.l.Info().Msgf("completed file-based measure migration for group: %s",
g.Metadata.Name)
- return nil
+ return segmentSuffixes, nil
}
-func (l *lifecycleService) deleteExpiredMeasureSegments(ctx context.Context, g *commonv1.Group, tr *timestamp.TimeRange, progress *Progress) {
+func (l *lifecycleService) deleteExpiredMeasureSegments(ctx context.Context, g *commonv1.Group, segmentSuffixes []string, progress *Progress) {
if progress.IsMeasureGroupDeleted(g.Metadata.Name) {
l.l.Info().Msgf("skipping already deleted measure group
segments: %s", g.Metadata.Name)
return
@@ -754,19 +738,16 @@ func (l *lifecycleService) deleteExpiredMeasureSegments(ctx context.Context, g *
resp, err := snapshot.Conn(l.gRPCAddr, l.enableTLS, l.insecure, l.cert, func(conn *grpc.ClientConn) (*measurev1.DeleteExpiredSegmentsResponse, error) {
client := measurev1.NewMeasureServiceClient(conn)
return client.DeleteExpiredSegments(ctx, &measurev1.DeleteExpiredSegmentsRequest{
- Group: g.Metadata.Name,
- TimeRange: &modelv1.TimeRange{
- Begin: timestamppb.New(tr.Start),
- End: timestamppb.New(tr.End),
- },
+ Group: g.Metadata.Name,
+ SegmentSuffixes: segmentSuffixes,
})
})
if err != nil {
- l.l.Error().Err(err).Msgf("failed to delete expired segments in
group %s", g.Metadata.Name)
+ l.l.Error().Err(err).Msgf("failed to delete expired segments in
group %s, suffixes: %s", g.Metadata.Name, segmentSuffixes)
return
}
- l.l.Info().Msgf("deleted %d expired segments in group %s",
resp.Deleted, g.Metadata.Name)
+ l.l.Info().Msgf("deleted %d expired segments in group %s, suffixes:
%s", resp.Deleted, g.Metadata.Name, segmentSuffixes)
progress.MarkMeasureGroupDeleted(g.Metadata.Name)
progress.Save(l.progressFilePath, l.l)
}
diff --git a/banyand/backup/lifecycle/steps.go b/banyand/backup/lifecycle/steps.go
index 12beb51d..e54d8a49 100644
--- a/banyand/backup/lifecycle/steps.go
+++ b/banyand/backup/lifecycle/steps.go
@@ -22,6 +22,7 @@ import (
"os"
"github.com/pkg/errors"
+ "google.golang.org/protobuf/proto"
"github.com/apache/skywalking-banyandb/api/data"
commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
@@ -77,43 +78,70 @@ func (l *lifecycleService) getSnapshots(groups []*commonv1.Group, p *Progress) (
return streamDir, measureDir, nil
}
+// GroupConfig encapsulates the parsed lifecycle configuration for a Group.
+// It contains all necessary information for migration and deletion operations.
+type GroupConfig struct {
+ *commonv1.Group
+ NodeSelector node.Selector
+ QueueClient queue.Client
+ AccumulatedTTL *commonv1.IntervalRule
+ SegmentInterval *commonv1.IntervalRule
+ TargetShardNum uint32
+ TargetReplicas uint32
+}
+
+// Close releases resources held by the GroupConfig.
+func (gc *GroupConfig) Close() {
+ if gc.QueueClient != nil {
+ gc.QueueClient.GracefulStop()
+ }
+}
+
func parseGroup(g *commonv1.Group, nodeLabels map[string]string, nodes []*databasev1.Node,
l *logger.Logger, metadata metadata.Repo,
-) (uint32, uint32, *commonv1.IntervalRule, node.Selector, queue.Client, error) {
+) (*GroupConfig, error) {
ro := g.ResourceOpts
if ro == nil {
- return 0, 0, nil, nil, nil, fmt.Errorf("no resource opts in
group %s", g.Metadata.Name)
+ return nil, fmt.Errorf("no resource opts in group %s",
g.Metadata.Name)
}
if len(ro.Stages) == 0 {
- return 0, 0, nil, nil, nil, fmt.Errorf("no stages in group %s",
g.Metadata.Name)
+ return nil, fmt.Errorf("no stages in group %s", g.Metadata.Name)
}
+ ttlTime := proto.Clone(ro.Ttl).(*commonv1.IntervalRule)
+ segmentInterval := proto.Clone(ro.SegmentInterval).(*commonv1.IntervalRule)
var nst *commonv1.LifecycleStage
for i, st := range ro.Stages {
selector, err := pub.ParseLabelSelector(st.NodeSelector)
if err != nil {
- return 0, 0, nil, nil, nil, errors.WithMessagef(err, "failed to parse node selector %s", st.NodeSelector)
+ return nil, errors.WithMessagef(err, "failed to parse node selector %s", st.NodeSelector)
}
+ ttlTime.Num += st.Ttl.Num
if !selector.Matches(nodeLabels) {
continue
}
if i+1 >= len(ro.Stages) {
l.Info().Msgf("no next stage for group %s at stage %s",
g.Metadata.Name, st.Name)
- return 0, 0, nil, nil, nil, nil
+ return nil, nil
}
nst = ro.Stages[i+1]
- l.Info().Msgf("migrating group %s at stage %s to stage %s",
g.Metadata.Name, st.Name, nst.Name)
+ segmentInterval = st.SegmentInterval
+ l.Info().Msgf("migrating group %s at stage %s to stage %s,
segment interval: %d(%s), total ttl needs: %d(%s)",
+ g.Metadata.Name, st.Name, nst.Name,
segmentInterval.Num, segmentInterval.Unit.String(), ttlTime.Num,
ttlTime.Unit.String())
break
}
if nst == nil {
nst = ro.Stages[0]
+ ttlTime = proto.Clone(ro.Ttl).(*commonv1.IntervalRule)
+ l.Info().Msgf("no matching stage for group %s, defaulting to
first stage %s segment interval: %d(%s), total ttl needs: %d(%s)",
+ g.Metadata.Name, nst.Name, segmentInterval.Num,
segmentInterval.Unit.String(), ttlTime.Num, ttlTime.Unit.String())
}
nsl, err := pub.ParseLabelSelector(nst.NodeSelector)
if err != nil {
- return 0, 0, nil, nil, nil, errors.WithMessagef(err, "failed to
parse node selector %s", nst.NodeSelector)
+ return nil, errors.WithMessagef(err, "failed to parse node
selector %s", nst.NodeSelector)
}
nodeSel := node.NewRoundRobinSelector("", metadata)
if ok, _ := nodeSel.OnInit([]schema.Kind{schema.KindGroup}); !ok {
- return 0, 0, nil, nil, nil, fmt.Errorf("failed to initialize
node selector for group %s", g.Metadata.Name)
+ return nil, fmt.Errorf("failed to initialize node selector for
group %s", g.Metadata.Name)
}
client := pub.NewWithoutMetadata()
if g.Catalog == commonv1.Catalog_CATALOG_STREAM {
@@ -138,9 +166,17 @@ func parseGroup(g *commonv1.Group, nodeLabels map[string]string, nodes []*databa
}
}
if !existed {
- return 0, 0, nil, nil, nil, errors.New("no nodes matched")
+ return nil, errors.New("no nodes matched")
}
- return nst.ShardNum, nst.Replicas, nst.Ttl, nodeSel, client, nil
+ return &GroupConfig{
+ Group: g,
+ TargetShardNum: nst.ShardNum,
+ TargetReplicas: nst.Replicas,
+ AccumulatedTTL: ttlTime,
+ SegmentInterval: segmentInterval,
+ NodeSelector: nodeSel,
+ QueueClient: client,
+ }, nil
}
type fileInfo struct {
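
For reference, parseGroup now returns the consolidated *GroupConfig defined above instead of six separate values, and callers release the queue client through Close(). A hedged sketch of the intended call pattern, mirroring processStreamGroup (the variables g, labels, nodes, l, and metadataRepo are assumed to be in scope):

    // parseGroup may also return (nil, nil) when the node's labels match a stage
    // that has no next stage; guard before using the result.
    group, err := parseGroup(g, labels, nodes, l, metadataRepo)
    if err != nil {
        return err
    }
    if group == nil {
        return nil // nothing to migrate from this node
    }
    defer group.Close() // gracefully stops the underlying queue client

    segInterval := storage.MustToIntervalRule(group.SegmentInterval) // current stage's segment layout
    ttl := storage.MustToIntervalRule(group.AccumulatedTTL)          // summed TTL across the traversed stages
    _, _ = segInterval, ttl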
diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go
index 8c164b03..b0609049 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -590,17 +590,36 @@ func (sc *segmentController[T, O]) getExpiredSegmentsTimeRange() *timestamp.Time
return timeRange
}
-func (sc *segmentController[T, O]) deleteExpiredSegments(timeRange timestamp.TimeRange) int64 {
+func (sc *segmentController[T, O]) deleteExpiredSegments(segmentSuffixes []string) int64 {
deadline := time.Now().Local().Add(-sc.opts.TTL.estimatedDuration())
var count int64
ss, _ := sc.segments(false)
+ sc.l.Info().Str("segment_suffixes", fmt.Sprintf("%s", segmentSuffixes)).
+ Str("ttl", fmt.Sprintf("%d(%s)", sc.opts.TTL.Num,
sc.opts.TTL.Unit)).
+ Str("deadline", deadline.String()).
+ Int("total_segment_count", len(ss)).Msg("deleting expired
segments")
+ shouldDeleteSuffixes := make(map[string]bool)
+ for _, s := range segmentSuffixes {
+ shouldDeleteSuffixes[s] = true
+ }
for _, s := range ss {
- if s.Before(deadline) && s.Overlapping(timeRange) {
+ if shouldDeleteSuffixes[s.suffix] {
+ sc.l.Info().Str("suffix", s.suffix).
+ Str("deadline", deadline.String()).
+ Str("segment_name", s.String()).
+ Str("segment_time_range",
s.GetTimeRange().String()).
+ Msg("deleting an expired segment")
s.delete()
sc.Lock()
sc.removeSeg(s.id)
sc.Unlock()
count++
+ } else {
+ sc.l.Info().Str("suffix", s.suffix).
+ Str("deadline", deadline.String()).
+ Str("segment_name", s.String()).
+ Str("segment_time_range",
s.GetTimeRange().String()).
+ Msg("segment is not expired or not in the time
range, skipping deletion")
}
s.DecRef()
}
diff --git a/banyand/internal/storage/segment_test.go b/banyand/internal/storage/segment_test.go
index 1fc8691a..b7cd892d 100644
--- a/banyand/internal/storage/segment_test.go
+++ b/banyand/internal/storage/segment_test.go
@@ -619,12 +619,11 @@ func TestDeleteExpiredSegmentsWithClosedSegments(t *testing.T) {
// Now delete expired segments
// Get the time range for segments 0, 1, and 2 (the expired ones)
- timeRange := timestamp.NewInclusiveTimeRange(
- segments[0].Start,
- segments[2].End,
- )
-
- deletedCount := sc.deleteExpiredSegments(timeRange)
+ deletedCount := sc.deleteExpiredSegments([]string{
+ time.Now().AddDate(0, 0, -6).Format(dayFormat),
+ time.Now().AddDate(0, 0, -5).Format(dayFormat),
+ time.Now().AddDate(0, 0, -4).Format(dayFormat),
+ })
assert.Equal(t, int64(3), deletedCount, "Should have deleted 3 expired
segments")
// Verify segment controller's segment list
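
As the updated test shows, a segment suffix is just the time-formatted part of the segment directory name (seg-<suffix>), so callers can enumerate exactly which segments to drop. A small illustrative sketch of producing day-granularity suffixes, assuming dayFormat is the package's daily layout constant ("20060102"):

    // lastNDaySuffixes builds suffixes for the n days preceding now,
    // in the same way the test above names its expired segments.
    func lastNDaySuffixes(now time.Time, n int) []string {
        suffixes := make([]string, 0, n)
        for i := n; i >= 1; i-- {
            suffixes = append(suffixes, now.AddDate(0, 0, -i).Format(dayFormat))
        }
        return suffixes
    }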
diff --git a/banyand/internal/storage/storage.go b/banyand/internal/storage/storage.go
index 2dd40331..e4cb4d5b 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -113,7 +113,7 @@ type TSDB[T TSTable, O any] interface {
UpdateOptions(opts *commonv1.ResourceOpts)
TakeFileSnapshot(dst string) error
GetExpiredSegmentsTimeRange() *timestamp.TimeRange
- DeleteExpiredSegments(timeRange timestamp.TimeRange) int64
+ DeleteExpiredSegments(segmentSuffixes []string) int64
// PeekOldestSegmentEndTime returns the end time of the oldest segment.
// Returns the zero time and false if no segments exist or retention gate cannot be acquired.
PeekOldestSegmentEndTime() (time.Time, bool)
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index 66d9f129..a047bb7b 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -300,8 +300,8 @@ func (d *database[T, O]) GetExpiredSegmentsTimeRange() *timestamp.TimeRange {
return d.segmentController.getExpiredSegmentsTimeRange()
}
-func (d *database[T, O]) DeleteExpiredSegments(timeRange timestamp.TimeRange) int64 {
- return d.segmentController.deleteExpiredSegments(timeRange)
+func (d *database[T, O]) DeleteExpiredSegments(segmentSuffixes []string) int64 {
+ return d.segmentController.deleteExpiredSegments(segmentSuffixes)
}
// PeekOldestSegmentEndTime returns the end time of the oldest segment.
diff --git a/banyand/internal/storage/visitor.go b/banyand/internal/storage/visitor.go
index 40e14e1a..2a058b1e 100644
--- a/banyand/internal/storage/visitor.go
+++ b/banyand/internal/storage/visitor.go
@@ -25,9 +25,12 @@ import (
"github.com/pkg/errors"
"github.com/apache/skywalking-banyandb/api/common"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
+var log = logger.GetLogger("storage", "visitor")
+
// SegmentVisitor defines the interface for visiting segment components.
type SegmentVisitor interface {
// VisitSeries visits the series index directory for a segment.
@@ -39,24 +42,32 @@ type SegmentVisitor interface {
// VisitSegmentsInTimeRange traverses segments within the specified time range
// and calls the visitor methods for series index and shard directories.
// This function works directly with the filesystem without requiring a database instance.
-func VisitSegmentsInTimeRange(tsdbRootPath string, timeRange timestamp.TimeRange, visitor SegmentVisitor, intervalRule IntervalRule) error {
+// Returns a list of segment suffixes that were visited.
+func VisitSegmentsInTimeRange(tsdbRootPath string, timeRange timestamp.TimeRange, visitor SegmentVisitor, segmentInterval IntervalRule) ([]string, error) {
// Parse segment directories in the root path
var segmentPaths []segmentInfo
err := walkDir(tsdbRootPath, segPathPrefix, func(suffix string) error {
- startTime, err := parseSegmentTime(suffix, intervalRule.Unit)
+ startTime, err := parseSegmentTime(suffix, segmentInterval.Unit)
if err != nil {
return err
}
// Calculate end time based on interval rule
- endTime := intervalRule.NextTime(startTime)
+ endTime := segmentInterval.NextTime(startTime)
segTR := timestamp.NewSectionTimeRange(startTime, endTime)
// Check if segment is completely included in the requested time range
+ logEntry := log.Info().Str("segment_suffix", suffix).
+ Str("tsdb_root_path", tsdbRootPath).
+ Str("segment_interval_rule", fmt.Sprintf("%d(%s)",
segmentInterval.Num, segmentInterval.Unit)).
+ Str("total_time_range", timeRange.String()).
+ Str("segment_time_range", segTR.String())
if !timeRange.Include(segTR) {
+ logEntry.Msg("segment time range is not included")
return nil // Skip segments not fully contained in the time range
}
+ logEntry.Msg("segment time range is include")
segmentPath := filepath.Join(tsdbRootPath, fmt.Sprintf(segTemplate, suffix))
segmentPaths = append(segmentPaths, segmentInfo{
path: segmentPath,
@@ -66,30 +77,36 @@ func VisitSegmentsInTimeRange(tsdbRootPath string, timeRange timestamp.TimeRange
return nil
})
if err != nil {
- return errors.Wrap(err, "failed to walk segment directories")
+ return nil, errors.Wrap(err, "failed to walk segment
directories")
}
+ // Collect segment suffixes to return
+ var visitedSuffixes []string
+
// Visit each matching segment
for _, segInfo := range segmentPaths {
// Collect shard IDs for this segment
shardIDs, err := collectSegmentShardIDs(segInfo.path)
if err != nil {
- return errors.Wrapf(err, "failed to collect shard IDs
for segment %s", segInfo.suffix)
+ return nil, errors.Wrapf(err, "failed to collect shard
IDs for segment %s", segInfo.suffix)
}
// Visit series index directory
seriesIndexPath := filepath.Join(segInfo.path, seriesIndexDirName)
if err := visitor.VisitSeries(&segInfo.timeRange, seriesIndexPath, shardIDs); err != nil {
- return errors.Wrapf(err, "failed to visit series index for segment (suffix: %s, path: %s, timeRange: %v)", segInfo.suffix, segInfo.path, segInfo.timeRange)
+ return nil, errors.Wrapf(err, "failed to visit series index for segment (suffix: %s, path: %s, timeRange: %v)", segInfo.suffix, segInfo.path, segInfo.timeRange)
}
// Visit shard directories
if err := visitSegmentShards(segInfo.path, &segInfo.timeRange, visitor); err != nil {
- return errors.Wrapf(err, "failed to visit shards for segment (suffix: %s, path: %s, timeRange: %v)", segInfo.suffix, segInfo.path, segInfo.timeRange)
+ return nil, errors.Wrapf(err, "failed to visit shards for segment (suffix: %s, path: %s, timeRange: %v)", segInfo.suffix, segInfo.path, segInfo.timeRange)
+
+ // Add visited suffix to result
+ visitedSuffixes = append(visitedSuffixes, segInfo.suffix)
}
- return nil
+ return visitedSuffixes, nil
}
// segmentInfo holds information about a segment directory.
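
VisitSegmentsInTimeRange (and the stream/measure wrappers built on it) now reports which segment suffixes were actually traversed, and the lifecycle service forwards exactly that list to DeleteExpiredSegments. A minimal sketch of the new call shape; myVisitor stands for any SegmentVisitor implementation and is an assumption here:

    suffixes, err := storage.VisitSegmentsInTimeRange(tsdbRootPath, timeRange, myVisitor, storage.IntervalRule{Unit: storage.DAY, Num: 1})
    if err != nil {
        return err
    }
    // suffixes (e.g. ["20251114", "20251115"]) later become SegmentSuffixes in the delete request.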
diff --git a/banyand/internal/storage/visitor_test.go b/banyand/internal/storage/visitor_test.go
index b9b6fc3a..1c1c7fb6 100644
--- a/banyand/internal/storage/visitor_test.go
+++ b/banyand/internal/storage/visitor_test.go
@@ -135,8 +135,9 @@ func TestVisitSegmentsInTimeRange(t *testing.T) {
visitor := NewTestVisitor()
timeRange := timestamp.NewSectionTimeRange(ts, ts.Add(24*time.Hour))
- err = VisitSegmentsInTimeRange(dir, timeRange, visitor, opts.SegmentInterval)
+ suffixes, err := VisitSegmentsInTimeRange(dir, timeRange, visitor, opts.SegmentInterval)
require.NoError(t, err)
+ require.Len(t, suffixes, 1)
// Verify series index was visited
require.Len(t, visitor.visitedSeries, 1)
@@ -202,8 +203,9 @@ func TestVisitSegmentsInTimeRange(t *testing.T) {
visitor := NewTestVisitor()
timeRange := timestamp.NewSectionTimeRange(baseDate, baseDate.AddDate(0, 0, 3))
- err = VisitSegmentsInTimeRange(dir, timeRange, visitor, opts.SegmentInterval)
+ suffixes, err := VisitSegmentsInTimeRange(dir, timeRange, visitor, opts.SegmentInterval)
require.NoError(t, err)
+ require.Len(t, suffixes, 3)
// Verify all series indices were visited
require.Len(t, visitor.visitedSeries, 3)
@@ -285,8 +287,9 @@ func TestVisitSegmentsInTimeRange(t *testing.T) {
endTime := baseDate.AddDate(0, 0, 4) // 2024-05-05 (exclusive)
timeRange := timestamp.NewSectionTimeRange(startTime, endTime)
- err = VisitSegmentsInTimeRange(dir, timeRange, visitor, opts.SegmentInterval)
+ suffixes, err := VisitSegmentsInTimeRange(dir, timeRange, visitor, opts.SegmentInterval)
require.NoError(t, err)
+ require.Len(t, suffixes, 3)
// Verify only middle segments were visited (3 segments)
require.Len(t, visitor.visitedSeries, 3)
@@ -311,8 +314,9 @@ func TestVisitSegmentsInTimeRange(t *testing.T) {
timeRange := timestamp.NewSectionTimeRange(time.Now(), time.Now().Add(24*time.Hour))
intervalRule := IntervalRule{Unit: DAY, Num: 1}
- err := VisitSegmentsInTimeRange(dir, timeRange, visitor, intervalRule)
+ suffixes, err := VisitSegmentsInTimeRange(dir, timeRange, visitor, intervalRule)
require.NoError(t, err)
+ require.Len(t, suffixes, 0)
// Verify nothing was visited
require.Len(t, visitor.visitedSeries, 0)
@@ -363,7 +367,7 @@ func TestVisitSegmentsInTimeRange(t *testing.T) {
timeRange := timestamp.NewSectionTimeRange(ts, ts.Add(24*time.Hour))
- err = VisitSegmentsInTimeRange(dir, timeRange, visitor, opts.SegmentInterval)
+ _, err = VisitSegmentsInTimeRange(dir, timeRange, visitor, opts.SegmentInterval)
require.Error(t, err)
require.Contains(t, err.Error(), "series access error")
require.Contains(t, err.Error(), "failed to visit series index
for segment")
@@ -414,7 +418,7 @@ func TestVisitSegmentsInTimeRange(t *testing.T) {
timeRange := timestamp.NewSectionTimeRange(ts, ts.Add(24*time.Hour))
- err = VisitSegmentsInTimeRange(dir, timeRange, visitor, opts.SegmentInterval)
+ _, err = VisitSegmentsInTimeRange(dir, timeRange, visitor, opts.SegmentInterval)
require.Error(t, err)
require.Contains(t, err.Error(), "shard access error")
require.Contains(t, err.Error(), "failed to visit shards for
segment")
@@ -470,8 +474,9 @@ func TestVisitSegmentsInTimeRange(t *testing.T) {
visitor := NewTestVisitor()
timeRange := timestamp.NewSectionTimeRange(baseTime, baseTime.Add(3*time.Hour))
- err = VisitSegmentsInTimeRange(dir, timeRange, visitor, opts.SegmentInterval)
+ suffixes, err := VisitSegmentsInTimeRange(dir, timeRange, visitor, opts.SegmentInterval)
require.NoError(t, err)
+ require.Len(t, suffixes, 3)
// Verify all series indices were visited
require.Len(t, visitor.visitedSeries, 3)
@@ -540,8 +545,9 @@ func TestVisitSegmentsInTimeRange(t *testing.T) {
tsPlus1Min, err := time.ParseInLocation("2006-01-02 15:04:05",
"2025-11-01 00:01:00", time.Local)
require.NoError(t, err)
timeRange := timestamp.NewSectionTimeRange(time.Time{},
tsPlus1Min)
- err = VisitSegmentsInTimeRange(dir, timeRange, visitor,
opts.SegmentInterval)
+ suffixes, err := VisitSegmentsInTimeRange(dir, timeRange,
visitor, opts.SegmentInterval)
require.NoError(t, err)
+ require.Len(t, suffixes, 1)
require.Len(t, visitor.visitedSeries, 1)
// Should only oct 31 be including
expectedSeriesPath := filepath.Join(dir, "seg-20251031",
seriesIndexDirName)
diff --git a/banyand/measure/svc_data.go b/banyand/measure/svc_data.go
index 3ab3e56c..86358908 100644
--- a/banyand/measure/svc_data.go
+++ b/banyand/measure/svc_data.go
@@ -516,6 +516,6 @@ func (d *dataDeleteStreamSegmentsListener) Rev(_ context.Context, message bus.Me
d.s.l.Error().Err(err).Str("group", req.Group).Msg("failed to load tsdb")
return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), int64(0))
}
- deleted := db.DeleteExpiredSegments(timestamp.NewSectionTimeRange(req.TimeRange.Begin.AsTime(), req.TimeRange.End.AsTime()))
+ deleted := db.DeleteExpiredSegments(req.SegmentSuffixes)
return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), deleted)
}
diff --git a/banyand/measure/visitor.go b/banyand/measure/visitor.go
index 44edd1ee..3c011b87 100644
--- a/banyand/measure/visitor.go
+++ b/banyand/measure/visitor.go
@@ -40,7 +40,7 @@ type measureSegmentVisitor struct {
visitor Visitor
}
-// VisitSeries implements Visitor.
+// VisitSeries implements storage.SegmentVisitor.
func (mv *measureSegmentVisitor) VisitSeries(segmentTR *timestamp.TimeRange, seriesIndexPath string, shardIDs []common.ShardID) error {
return mv.visitor.VisitSeries(segmentTR, seriesIndexPath, shardIDs)
}
@@ -84,7 +84,8 @@ func (mv *measureSegmentVisitor) visitShardParts(segmentTR *timestamp.TimeRange,
// VisitMeasuresInTimeRange traverses measure segments within the specified time range
// and calls the visitor methods for parts within shards.
// This function works directly with the filesystem without requiring a database instance.
-func VisitMeasuresInTimeRange(tsdbRootPath string, timeRange timestamp.TimeRange, visitor Visitor, intervalRule storage.IntervalRule) error {
+// Returns a list of segment suffixes that were visited.
+func VisitMeasuresInTimeRange(tsdbRootPath string, timeRange timestamp.TimeRange, visitor Visitor, segmentInterval storage.IntervalRule) ([]string, error) {
adapter := &measureSegmentVisitor{visitor: visitor}
- return storage.VisitSegmentsInTimeRange(tsdbRootPath, timeRange, adapter, intervalRule)
+ return storage.VisitSegmentsInTimeRange(tsdbRootPath, timeRange, adapter, segmentInterval)
}
diff --git a/banyand/stream/svc_standalone.go b/banyand/stream/svc_standalone.go
index 7b41a194..bfb2e54a 100644
--- a/banyand/stream/svc_standalone.go
+++ b/banyand/stream/svc_standalone.go
@@ -341,6 +341,6 @@ func (d *deleteStreamSegmentsListener) Rev(_ context.Context, message bus.Messag
d.s.l.Error().Err(err).Str("group", req.Group).Msg("failed to load tsdb")
return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), int64(0))
}
- deleted := db.DeleteExpiredSegments(timestamp.NewSectionTimeRange(req.TimeRange.Begin.AsTime(), req.TimeRange.End.AsTime()))
+ deleted := db.DeleteExpiredSegments(req.SegmentSuffixes)
return bus.NewMessage(bus.MessageID(time.Now().UnixNano()), deleted)
}
diff --git a/banyand/stream/visitor.go b/banyand/stream/visitor.go
index 4959e604..19ab1d58 100644
--- a/banyand/stream/visitor.go
+++ b/banyand/stream/visitor.go
@@ -97,7 +97,8 @@ func (sv *streamSegmentVisitor) visitShardElementIndex(segmentTR *timestamp.Time
// VisitStreamsInTimeRange traverses stream segments within the specified time range
// and calls the visitor methods for series index, parts, and element indexes.
// This function works directly with the filesystem without requiring a database instance.
-func VisitStreamsInTimeRange(tsdbRootPath string, timeRange timestamp.TimeRange, visitor Visitor, intervalRule storage.IntervalRule) error {
+// Returns a list of segment suffixes that were visited.
+func VisitStreamsInTimeRange(tsdbRootPath string, timeRange timestamp.TimeRange, visitor Visitor, segmentInterval storage.IntervalRule) ([]string, error) {
adapter := &streamSegmentVisitor{visitor: visitor}
- return storage.VisitSegmentsInTimeRange(tsdbRootPath, timeRange, adapter, intervalRule)
+ return storage.VisitSegmentsInTimeRange(tsdbRootPath, timeRange, adapter, segmentInterval)
}
diff --git a/banyand/stream/visitor_test.go b/banyand/stream/visitor_test.go
index 3d489088..687ac69f 100644
--- a/banyand/stream/visitor_test.go
+++ b/banyand/stream/visitor_test.go
@@ -104,8 +104,9 @@ func TestVisitStreamsInTimeRange(t *testing.T) {
}
intervalRule := storage.IntervalRule{Unit: storage.HOUR, Num: 1}
- err = VisitStreamsInTimeRange(tmpDir, timeRange, visitor, intervalRule)
+ suffixes, err := VisitStreamsInTimeRange(tmpDir, timeRange, visitor, intervalRule)
require.NoError(t, err)
+ assert.NotEmpty(t, suffixes)
// Verify visits occurred
assert.Len(t, visitor.visitedSeries, 1)
diff --git a/docs/api-reference.md b/docs/api-reference.md
index ab7fb8ba..f18bad4d 100644
--- a/docs/api-reference.md
+++ b/docs/api-reference.md
@@ -4299,7 +4299,7 @@ WriteResponse is the response contract for write
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| group | [string](#string) | | |
-| time_range | [banyandb.model.v1.TimeRange](#banyandb-model-v1-TimeRange) | | |
+| segment_suffixes | [string](#string) | repeated | |
@@ -4761,7 +4761,7 @@ WriteResponse is the response contract for write
| Field | Type | Label | Description |
| ----- | ---- | ----- | ----------- |
| group | [string](#string) | | |
-| time_range | [banyandb.model.v1.TimeRange](#banyandb-model-v1-TimeRange) | | |
+| segment_suffixes | [string](#string) | repeated | |