This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch lifecyc-sync
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit dd02d125d0d4ffbcecbc7256a5e0d2077802ac16
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Wed Jul 30 15:55:53 2025 +0800

    Add file-based stream migration implementation

    - Introduced `file_migration_integration.go` to handle file-based stream migration using a visitor pattern.
    - Created `file_migration_visitor.go` to define the `MigrationVisitor` struct and its methods for processing stream parts.
    - Updated `progress.go` to track progress at the part level for stream migrations.
    - Refactored `service.go` to utilize the new file-based migration approach for stream groups.
---
 .../backup/lifecycle/file_migration_integration.go | 126 +++++++
 banyand/backup/lifecycle/file_migration_visitor.go | 410 +++++++++++++++++++++
 .../lifecycle/file_migration_visitor_test.go       | 161 ++++++++
 banyand/backup/lifecycle/progress.go               | 195 +++++++---
 banyand/backup/lifecycle/progress_test.go          |   9 -
 banyand/backup/lifecycle/service.go                | 193 +++++-----
 lifecycle                                          | Bin 0 -> 38228265 bytes
 7 files changed, 929 insertions(+), 165 deletions(-)

diff --git a/banyand/backup/lifecycle/file_migration_integration.go b/banyand/backup/lifecycle/file_migration_integration.go
new file mode 100644
index 00000000..35c00482
--- /dev/null
+++ b/banyand/backup/lifecycle/file_migration_integration.go
@@ -0,0 +1,126 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package lifecycle
+
+import (
+	"github.com/apache/skywalking-banyandb/api/common"
+	commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+	databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+	"github.com/apache/skywalking-banyandb/banyand/internal/storage"
+	"github.com/apache/skywalking-banyandb/banyand/metadata"
+	"github.com/apache/skywalking-banyandb/banyand/stream"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+// MigrateStreamWithFileBased performs file-based stream migration using the visitor pattern.
+func MigrateStreamWithFileBased(
+	tsdbRootPath string,
+	timeRange timestamp.TimeRange,
+	group *commonv1.Group,
+	nodeLabels map[string]string,
+	nodes []*databasev1.Node,
+	metadata metadata.Repo,
+	intervalRule storage.IntervalRule,
+	logger *logger.Logger,
+	chunkSize int,
+) error {
+	// Create file-based migration visitor
+	visitor, err := NewMigrationVisitor(group, nodeLabels, nodes, metadata, logger, nil, "", chunkSize)
+	if err != nil {
+		return err
+	}
+	defer visitor.Close()
+
+	// Use the existing VisitStreamsInTimeRange function with our file-based visitor
+	return stream.VisitStreamsInTimeRange(tsdbRootPath, timeRange, visitor, intervalRule)
+}
+
+// MigrateStreamWithFileBasedAndProgress performs file-based stream migration with progress tracking.
+func MigrateStreamWithFileBasedAndProgress( + tsdbRootPath string, + timeRange timestamp.TimeRange, + group *commonv1.Group, + nodeLabels map[string]string, + nodes []*databasev1.Node, + metadata metadata.Repo, + intervalRule storage.IntervalRule, + logger *logger.Logger, + progress *Progress, + streamName string, + chunkSize int, +) error { + // Count total parts for this stream before starting migration + totalParts, err := countStreamParts(tsdbRootPath, timeRange, streamName, intervalRule) + if err != nil { + logger.Warn().Err(err).Str("stream", streamName).Msg("failed to count stream parts, proceeding without part count") + } else { + logger.Info().Str("stream", streamName).Int("total_parts", totalParts).Msg("counted stream parts for progress tracking") + } + + // Create file-based migration visitor with progress tracking + visitor, err := NewMigrationVisitor(group, nodeLabels, nodes, metadata, logger, progress, streamName, chunkSize) + if err != nil { + return err + } + defer visitor.Close() + + // Set the total part count for progress tracking + if totalParts > 0 { + visitor.SetStreamPartCount(totalParts) + } + + // Use the existing VisitStreamsInTimeRange function with our file-based visitor + return stream.VisitStreamsInTimeRange(tsdbRootPath, timeRange, visitor, intervalRule) +} + +// countStreamParts counts the total number of parts for a specific stream in the given time range. +func countStreamParts(tsdbRootPath string, timeRange timestamp.TimeRange, streamName string, intervalRule storage.IntervalRule) (int, error) { + // Create a simple visitor to count parts + partCounter := &partCountVisitor{streamName: streamName} + + // Use the existing VisitStreamsInTimeRange function to count parts + err := stream.VisitStreamsInTimeRange(tsdbRootPath, timeRange, partCounter, intervalRule) + if err != nil { + return 0, err + } + + return partCounter.partCount, nil +} + +// partCountVisitor is a simple visitor that counts parts for a specific stream. +type partCountVisitor struct { + streamName string + partCount int +} + +// VisitSeries implements stream.Visitor. +func (pcv *partCountVisitor) VisitSeries(_ *timestamp.TimeRange, _ string) error { + return nil +} + +// VisitPart implements stream.Visitor. +func (pcv *partCountVisitor) VisitPart(_ *timestamp.TimeRange, _ common.ShardID, _ string) error { + pcv.partCount++ + return nil +} + +// VisitElementIndex implements stream.Visitor. +func (pcv *partCountVisitor) VisitElementIndex(_ *timestamp.TimeRange, _ common.ShardID, _ string) error { + return nil +} diff --git a/banyand/backup/lifecycle/file_migration_visitor.go b/banyand/backup/lifecycle/file_migration_visitor.go new file mode 100644 index 00000000..a4a9ae41 --- /dev/null +++ b/banyand/backup/lifecycle/file_migration_visitor.go @@ -0,0 +1,410 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package lifecycle + +import ( + "context" + "encoding/json" + "fmt" + "path/filepath" + "strconv" + "strings" + + "github.com/apache/skywalking-banyandb/api/common" + "github.com/apache/skywalking-banyandb/api/data" + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + "github.com/apache/skywalking-banyandb/banyand/metadata" + "github.com/apache/skywalking-banyandb/banyand/queue" + "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/logger" + "github.com/apache/skywalking-banyandb/pkg/node" + "github.com/apache/skywalking-banyandb/pkg/timestamp" +) + +const ( + // Stream file names from banyand/stream/syncer.go. + streamMetaName = "meta" + streamPrimaryName = "primary" + streamTimestampsName = "timestamps" + streamTagFamiliesPrefix = "tag_families_" + streamTagMetadataPrefix = "tag_metadata_" + streamTagFilterPrefix = "tag_filter_" +) + +// MigrationVisitor implements the stream.Visitor interface for file-based migration. +type MigrationVisitor struct { + selector node.Selector // From parseGroup - node selector + client queue.Client // From parseGroup - queue client + chunkedClients map[string]queue.ChunkedSyncClient // Per-node chunked sync clients cache + logger *logger.Logger + progress *Progress // Progress tracker for migration states + group string + streamName string + targetShardNum uint32 // From parseGroup - target shard count + replicas uint32 // From parseGroup - replica count + chunkSize int // Chunk size for streaming data +} + +// partMetadata matches the structure in banyand/stream/part_metadata.go. +type partMetadata struct { + CompressedSizeBytes uint64 `json:"compressedSizeBytes"` + UncompressedSizeBytes uint64 `json:"uncompressedSizeBytes"` + TotalCount uint64 `json:"totalCount"` + BlocksCount uint64 `json:"blocksCount"` + MinTimestamp int64 `json:"minTimestamp"` + MaxTimestamp int64 `json:"maxTimestamp"` +} + +// NewMigrationVisitor creates a new file-based migration visitor. +func NewMigrationVisitor(group *commonv1.Group, nodeLabels map[string]string, + nodes []*databasev1.Node, metadata metadata.Repo, l *logger.Logger, + progress *Progress, streamName string, chunkSize int, +) (*MigrationVisitor, error) { + // Use existing parseGroup function to get sharding parameters + shardNum, replicas, selector, client, err := parseGroup(group, nodeLabels, nodes, l, metadata) + if err != nil { + return nil, err + } + + return &MigrationVisitor{ + group: group.Metadata.Name, + targetShardNum: shardNum, + replicas: replicas, + selector: selector, + client: client, + chunkedClients: make(map[string]queue.ChunkedSyncClient), + logger: l, + progress: progress, + streamName: streamName, + chunkSize: chunkSize, + }, nil +} + +// VisitSeries implements stream.Visitor. +func (mv *MigrationVisitor) VisitSeries(_ *timestamp.TimeRange, seriesIndexPath string) error { + // TODO: Implement series index migration if needed + mv.logger.Debug(). + Str("path", seriesIndexPath). + Msg("skipping series index migration (not implemented)") + return nil +} + +// VisitPart implements stream.Visitor - core migration logic. 
+func (mv *MigrationVisitor) VisitPart(_ *timestamp.TimeRange, sourceShardID common.ShardID, partPath string) error { + // Extract part ID from path for progress tracking + partID, err := mv.extractPartIDFromPath(partPath) + if err != nil { + errorMsg := fmt.Sprintf("failed to extract part ID from path %s: %v", partPath, err) + mv.progress.MarkStreamPartError(mv.group, mv.streamName, 0, errorMsg) + return fmt.Errorf("failed to extract part ID from path %s: %w", partPath, err) + } + + // Check if this part has already been completed + if mv.progress.IsStreamPartCompleted(mv.group, mv.streamName, partID) { + mv.logger.Debug(). + Uint64("part_id", partID). + Str("stream", mv.streamName). + Str("group", mv.group). + Msg("part already completed, skipping") + return nil + } + + // Calculate target shard ID based on source shard ID mapping + targetShardID := mv.calculateTargetShardID(uint32(sourceShardID)) + + mv.logger.Info(). + Uint64("part_id", partID). + Uint32("source_shard", uint32(sourceShardID)). + Uint32("target_shard", targetShardID). + Str("part_path", partPath). + Str("stream", mv.streamName). + Str("group", mv.group). + Msg("migrating part") + + // Create file readers for the entire part (similar to syncer.go:79-132) + files, release := mv.createPartFileReaders(partPath) + defer release() + + // Stream entire part to target shard replicas + if err := mv.streamPartToTargetShard(targetShardID, files, partPath, partID); err != nil { + errorMsg := fmt.Sprintf("failed to stream part to target shard: %v", err) + mv.progress.MarkStreamPartError(mv.group, mv.streamName, partID, errorMsg) + return fmt.Errorf("failed to stream part to target shard: %w", err) + } + + // Mark part as completed in progress tracker + mv.progress.MarkStreamPartCompleted(mv.group, mv.streamName, partID) + + mv.logger.Info(). + Uint64("part_id", partID). + Str("stream", mv.streamName). + Str("group", mv.group). + Int("completed_parts", mv.progress.GetStreamPartProgress(mv.group, mv.streamName)). + Int("total_parts", mv.progress.GetStreamPartCount(mv.group, mv.streamName)). + Msg("part migration completed successfully") + + return nil +} + +// VisitElementIndex implements stream.Visitor. +func (mv *MigrationVisitor) VisitElementIndex(_ *timestamp.TimeRange, _ common.ShardID, indexPath string) error { + // TODO: Implement element index migration if needed + mv.logger.Debug(). + Str("path", indexPath). + Msg("skipping element index migration (not implemented)") + return nil +} + +// calculateTargetShardID maps source shard ID to target shard ID. +func (mv *MigrationVisitor) calculateTargetShardID(sourceShardID uint32) uint32 { + // Simple modulo-based mapping from source shard to target shard + // This ensures deterministic and balanced distribution + return sourceShardID % mv.targetShardNum +} + +// streamPartToTargetShard sends part data to all replicas of the target shard. 
+func (mv *MigrationVisitor) streamPartToTargetShard(targetShardID uint32, files []queue.FileInfo, partPath string, partID uint64) error { + copies := mv.replicas + 1 + + // Send to all replicas using the exact pattern from steps.go:219-236 + for replicaID := uint32(0); replicaID < copies; replicaID++ { + // Use selector.Pick exactly like steps.go:220 + nodeID, err := mv.selector.Pick(mv.group, "", targetShardID, replicaID) + if err != nil { + return fmt.Errorf("failed to pick node for shard %d replica %d: %w", targetShardID, replicaID, err) + } + + // Stream part data to target node using chunked sync + if err := mv.streamPartToNode(nodeID, targetShardID, files, partPath, partID); err != nil { + return fmt.Errorf("failed to stream part to node %s: %w", nodeID, err) + } + } + + return nil +} + +// streamPartToNode streams part data to a specific target node. +func (mv *MigrationVisitor) streamPartToNode(nodeID string, targetShardID uint32, files []queue.FileInfo, partPath string, partID uint64) error { + // Get or create chunked client for this node (cache hit optimization) + chunkedClient, exists := mv.chunkedClients[nodeID] + if !exists { + var err error + // Create new chunked sync client via queue.Client + chunkedClient, err = mv.client.NewChunkedSyncClient(nodeID, uint32(mv.chunkSize)) + if err != nil { + return fmt.Errorf("failed to create chunked sync client for node %s: %w", nodeID, err) + } + mv.chunkedClients[nodeID] = chunkedClient // Cache for reuse + } + + // Create streaming part data from the part files + streamingParts, err := mv.createStreamingPartFromFiles(targetShardID, files, partPath) + if err != nil { + return fmt.Errorf("failed to create streaming parts: %w", err) + } + + // Stream using chunked transfer (same as syncer.go:202) + ctx := context.Background() + result, err := chunkedClient.SyncStreamingParts(ctx, streamingParts) + if err != nil { + return fmt.Errorf("failed to sync streaming parts to node %s: %w", nodeID, err) + } + + if !result.Success { + return fmt.Errorf("chunked sync partially failed: %v", result.ErrorMessage) + } + + // Log success metrics (same pattern as syncer.go:210-217) + mv.logger.Info(). + Str("node", nodeID). + Str("session", result.SessionID). + Uint64("bytes", result.TotalBytes). + Int64("duration_ms", result.DurationMs). + Uint32("chunks", result.ChunksCount). + Uint32("parts", result.PartsCount). + Uint32("target_shard", targetShardID). + Uint64("part_id", partID). + Str("part_path", partPath). + Str("stream", mv.streamName). + Str("group", mv.group). + Msg("file-based migration part completed successfully") + + return nil +} + +// createStreamingPartFromFiles creates StreamingPartData from part files and metadata. 
+func (mv *MigrationVisitor) createStreamingPartFromFiles(targetShardID uint32, files []queue.FileInfo, partPath string) ([]queue.StreamingPartData, error) { + // Calculate part metadata from files + partID, err := mv.extractPartIDFromPath(partPath) + if err != nil { + return nil, fmt.Errorf("failed to extract part ID from path %s: %w", partPath, err) + } + + // Extract metadata from metadata.json file in the part directory + metadata, err := mv.readPartMetadata(partPath) + if err != nil { + return nil, fmt.Errorf("failed to read part metadata: %w", err) + } + + partData := queue.StreamingPartData{ + ID: partID, + Group: mv.group, + ShardID: targetShardID, // Use calculated target shard + Topic: data.TopicStreamPartSync.String(), + Files: files, + CompressedSizeBytes: metadata.CompressedSizeBytes, + UncompressedSizeBytes: metadata.UncompressedSizeBytes, + TotalCount: metadata.TotalCount, + BlocksCount: metadata.BlocksCount, + MinTimestamp: metadata.MinTimestamp, + MaxTimestamp: metadata.MaxTimestamp, + } + + return []queue.StreamingPartData{partData}, nil +} + +// createPartFileReaders creates file readers for all files in a part. +func (mv *MigrationVisitor) createPartFileReaders(partPath string) ([]queue.FileInfo, func()) { + // Read part files using the same pattern as createPartFileReaders in syncer.go:79-132 + lfs := fs.NewLocalFileSystem() + var files []queue.FileInfo + + // Read stream metadata file + metaFilePath := filepath.Join(partPath, streamMetaName) + if file, err := lfs.OpenFile(metaFilePath); err == nil { + files = append(files, queue.FileInfo{ + Name: streamMetaName, + Reader: file.SequentialRead(), + }) + } + + // Read primary data file + primaryFilePath := filepath.Join(partPath, streamPrimaryName) + if file, err := lfs.OpenFile(primaryFilePath); err == nil { + files = append(files, queue.FileInfo{ + Name: streamPrimaryName, + Reader: file.SequentialRead(), + }) + } + + // Read timestamps file + timestampsFilePath := filepath.Join(partPath, streamTimestampsName) + if file, err := lfs.OpenFile(timestampsFilePath); err == nil { + files = append(files, queue.FileInfo{ + Name: streamTimestampsName, + Reader: file.SequentialRead(), + }) + } + + // Read tag family files (similar to syncer.go:106-127) + entries := lfs.ReadDir(partPath) + for _, entry := range entries { + if !entry.IsDir() { + name := entry.Name() + if strings.HasPrefix(name, streamTagFamiliesPrefix) || + strings.HasPrefix(name, streamTagMetadataPrefix) || + strings.HasPrefix(name, streamTagFilterPrefix) { + filePath := filepath.Join(partPath, name) + if file, err := lfs.OpenFile(filePath); err == nil { + files = append(files, queue.FileInfo{ + Name: name, + Reader: file.SequentialRead(), + }) + } + } + } + } + + // Return cleanup function to close all readers + releaseFunc := func() { + for _, file := range files { + if file.Reader != nil { + file.Reader.Close() + } + } + } + + return files, releaseFunc +} + +// readPartMetadata reads and parses the metadata.json file from a part directory. 
+func (mv *MigrationVisitor) readPartMetadata(partPath string) (*partMetadata, error) { + lfs := fs.NewLocalFileSystem() + metadataPath := filepath.Join(partPath, "metadata.json") + + // Read metadata.json file + data, err := lfs.Read(metadataPath) + if err != nil { + return nil, fmt.Errorf("cannot read metadata.json: %w", err) + } + + // Unmarshal JSON data + var metadata partMetadata + if err := json.Unmarshal(data, &metadata); err != nil { + return nil, fmt.Errorf("cannot parse metadata.json: %w", err) + } + + // Validate metadata (same validation as part_metadata.go:78-80) + if metadata.MinTimestamp > metadata.MaxTimestamp { + return nil, fmt.Errorf("invalid metadata: MinTimestamp (%d) cannot exceed MaxTimestamp (%d)", + metadata.MinTimestamp, metadata.MaxTimestamp) + } + + return &metadata, nil +} + +// extractPartIDFromPath extracts the part ID from the part directory path. +func (mv *MigrationVisitor) extractPartIDFromPath(partPath string) (uint64, error) { + // Extract the 16-character hex part ID from the path + // Part paths typically end with the hex part ID directory + partName := filepath.Base(partPath) + if len(partName) != 16 { + return 0, fmt.Errorf("invalid part path format: %s", partPath) + } + + partID, err := strconv.ParseUint(partName, 16, 64) + if err != nil { + return 0, fmt.Errorf("failed to parse part ID from %s: %w", partName, err) + } + + return partID, nil +} + +// Close cleans up all chunked sync clients. +func (mv *MigrationVisitor) Close() error { + for nodeID, client := range mv.chunkedClients { + if err := client.Close(); err != nil { + mv.logger.Warn().Err(err).Str("node", nodeID).Msg("failed to close chunked sync client") + } + } + mv.chunkedClients = make(map[string]queue.ChunkedSyncClient) + return nil +} + +// SetStreamPartCount sets the total number of parts for the current stream. +func (mv *MigrationVisitor) SetStreamPartCount(totalParts int) { + if mv.progress != nil { + mv.progress.SetStreamPartCount(mv.group, mv.streamName, totalParts) + mv.logger.Info(). + Str("stream", mv.streamName). + Str("group", mv.group). + Int("total_parts", totalParts). + Msg("set stream part count for progress tracking") + } +} diff --git a/banyand/backup/lifecycle/file_migration_visitor_test.go b/banyand/backup/lifecycle/file_migration_visitor_test.go new file mode 100644 index 00000000..525f676c --- /dev/null +++ b/banyand/backup/lifecycle/file_migration_visitor_test.go @@ -0,0 +1,161 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package lifecycle + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" + databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1" + "github.com/apache/skywalking-banyandb/pkg/logger" +) + +func TestMigrationVisitor_calculateTargetShardID(t *testing.T) { + tests := []struct { + name string + targetShardNum uint32 + sourceShardID uint32 + expectedTarget uint32 + }{ + { + name: "simple modulo mapping", + targetShardNum: 4, + sourceShardID: 7, + expectedTarget: 3, // 7 % 4 = 3 + }, + { + name: "exact division", + targetShardNum: 5, + sourceShardID: 10, + expectedTarget: 0, // 10 % 5 = 0 + }, + { + name: "same shard mapping", + targetShardNum: 8, + sourceShardID: 3, + expectedTarget: 3, // 3 % 8 = 3 + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mv := &MigrationVisitor{ + targetShardNum: tt.targetShardNum, + logger: logger.GetLogger("test"), + } + + result := mv.calculateTargetShardID(tt.sourceShardID) + assert.Equal(t, tt.expectedTarget, result) + }) + } +} + +func TestMigrationVisitor_extractPartIDFromPath(t *testing.T) { + tests := []struct { + name string + partPath string + expectedID uint64 + expectError bool + }{ + { + name: "valid 16-char hex path", + partPath: "/data/stream/shard0/seg-123/0123456789abcdef", + expectedID: 0x0123456789abcdef, + expectError: false, + }, + { + name: "invalid short path", + partPath: "/data/stream/shard0/seg-123/abc", + expectedID: 0, + expectError: true, + }, + { + name: "invalid non-hex path", + partPath: "/data/stream/shard0/seg-123/ghijklmnopqrstuv", + expectedID: 0, + expectError: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mv := &MigrationVisitor{ + logger: logger.GetLogger("test"), + } + + result, err := mv.extractPartIDFromPath(tt.partPath) + + if tt.expectError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, tt.expectedID, result) + } + }) + } +} + +func TestNewMigrationVisitor_Construction(t *testing.T) { + // Create minimal test group + group := &commonv1.Group{ + Metadata: &commonv1.Metadata{ + Name: "test-group", + }, + Catalog: commonv1.Catalog_CATALOG_STREAM, + ResourceOpts: &commonv1.ResourceOpts{ + Stages: []*commonv1.LifecycleStage{ + { + Name: "hot", + ShardNum: 4, + Replicas: 2, + }, + }, + }, + } + + // Create test node + nodes := []*databasev1.Node{ + { + Metadata: &commonv1.Metadata{ + Name: "test-node", + }, + GrpcAddress: "localhost:17912", + Roles: []databasev1.Role{databasev1.Role_ROLE_DATA}, + }, + } + + nodeLabels := map[string]string{ + "nodeRole": "data", + } + + // This test may fail without proper metadata setup, but validates structure + visitor, err := NewMigrationVisitor(group, nodeLabels, nodes, nil, logger.GetLogger("test"), nil, "", 1024*1024) + // We expect this to fail due to missing metadata, but the structure should be valid + if err != nil { + t.Logf("Expected error due to minimal test setup: %v", err) + return + } + + require.NotNil(t, visitor) + assert.Equal(t, "test-group", visitor.group) + assert.NotNil(t, visitor.chunkedClients) + assert.NotNil(t, visitor.logger) +} diff --git a/banyand/backup/lifecycle/progress.go b/banyand/backup/lifecycle/progress.go index 09bc6130..32d4c907 100644 --- a/banyand/backup/lifecycle/progress.go +++ b/banyand/backup/lifecycle/progress.go @@ -29,17 +29,19 @@ import ( // Progress tracks the lifecycle migration 
progress to support resume after crash. type Progress struct { CompletedGroups map[string]bool `json:"completed_groups"` - CompletedStreams map[string]map[string]bool `json:"completed_streams"` CompletedMeasures map[string]map[string]bool `json:"completed_measures"` DeletedStreamGroups map[string]bool `json:"deleted_stream_groups"` DeletedMeasureGroups map[string]bool `json:"deleted_measure_groups"` - StreamErrors map[string]map[string]string `json:"stream_errors"` MeasureErrors map[string]map[string]string `json:"measure_errors"` - StreamCounts map[string]map[string]int `json:"stream_counts"` MeasureCounts map[string]map[string]int `json:"measure_counts"` - SnapshotStreamDir string `json:"snapshot_stream_dir"` - SnapshotMeasureDir string `json:"snapshot_measure_dir"` - mu sync.Mutex `json:"-"` + // Part-level tracking for stream migration + CompletedStreamParts map[string]map[string]map[uint64]bool `json:"completed_stream_parts"` // group -> stream -> partID -> completed + StreamPartErrors map[string]map[string]map[uint64]string `json:"stream_part_errors"` // group -> stream -> partID -> error + StreamPartCounts map[string]map[string]int `json:"stream_part_counts"` // group -> stream -> total parts + StreamPartProgress map[string]map[string]int `json:"stream_part_progress"` // group -> stream -> completed parts count + SnapshotStreamDir string `json:"snapshot_stream_dir"` + SnapshotMeasureDir string `json:"snapshot_measure_dir"` + mu sync.Mutex `json:"-"` } // AllGroupsFullyCompleted checks if all groups are fully completed. @@ -59,14 +61,15 @@ func (p *Progress) AllGroupsFullyCompleted(groups []*commonv1.Group) bool { func NewProgress() *Progress { return &Progress{ CompletedGroups: make(map[string]bool), - CompletedStreams: make(map[string]map[string]bool), CompletedMeasures: make(map[string]map[string]bool), DeletedStreamGroups: make(map[string]bool), DeletedMeasureGroups: make(map[string]bool), - StreamErrors: make(map[string]map[string]string), MeasureErrors: make(map[string]map[string]string), - StreamCounts: make(map[string]map[string]int), MeasureCounts: make(map[string]map[string]int), + CompletedStreamParts: make(map[string]map[string]map[uint64]bool), + StreamPartErrors: make(map[string]map[string]map[uint64]string), + StreamPartCounts: make(map[string]map[string]int), + StreamPartProgress: make(map[string]map[string]int), } } @@ -133,30 +136,6 @@ func (p *Progress) IsGroupCompleted(group string) bool { return p.CompletedGroups[group] } -// MarkStreamCompleted marks a stream as completed. -func (p *Progress) MarkStreamCompleted(group, stream string, count int) { - p.mu.Lock() - defer p.mu.Unlock() - if p.CompletedStreams[group] == nil { - p.CompletedStreams[group] = make(map[string]bool) - } - p.CompletedStreams[group][stream] = true - if p.StreamCounts[group] == nil { - p.StreamCounts[group] = make(map[string]int) - } - p.StreamCounts[group][stream] = count -} - -// IsStreamCompleted checks if a stream has been completed. -func (p *Progress) IsStreamCompleted(group, stream string) bool { - p.mu.Lock() - defer p.mu.Unlock() - if streams, ok := p.CompletedStreams[group]; ok { - return streams[stream] - } - return false -} - // MarkMeasureCompleted marks a measure as completed. func (p *Progress) MarkMeasureCompleted(group, measure string, count int) { p.mu.Lock() @@ -209,28 +188,13 @@ func (p *Progress) IsMeasureGroupDeleted(group string) bool { return p.DeletedMeasureGroups[group] } -// ClearErrors resets all prior stream/measure error records. 
+// ClearErrors resets all prior measure error records. func (p *Progress) ClearErrors() { p.mu.Lock() defer p.mu.Unlock() - p.StreamErrors = make(map[string]map[string]string) p.MeasureErrors = make(map[string]map[string]string) } -// MarkStreamError records an error message for a specific stream. -func (p *Progress) MarkStreamError(group, stream, msg string) { - p.mu.Lock() - defer p.mu.Unlock() - if p.StreamErrors[group] == nil { - p.StreamErrors[group] = make(map[string]string) - } - p.StreamErrors[group][stream] = msg - if p.CompletedStreams[group] == nil { - p.CompletedStreams[group] = make(map[string]bool) - } - p.CompletedStreams[group][stream] = false -} - // MarkMeasureError records an error message for a specific measure. func (p *Progress) MarkMeasureError(group, measure, msg string) { p.mu.Lock() @@ -259,3 +223,136 @@ func (p *Progress) Remove(path string, l *logger.Logger) { } l.Info().Msgf("Removed progress file at %s", path) } + +// MarkStreamPartCompleted marks a specific part of a stream as completed. +func (p *Progress) MarkStreamPartCompleted(group, stream string, partID uint64) { + p.mu.Lock() + defer p.mu.Unlock() + + // Initialize nested maps if they don't exist + if p.CompletedStreamParts[group] == nil { + p.CompletedStreamParts[group] = make(map[string]map[uint64]bool) + } + if p.CompletedStreamParts[group][stream] == nil { + p.CompletedStreamParts[group][stream] = make(map[uint64]bool) + } + + // Mark part as completed + p.CompletedStreamParts[group][stream][partID] = true + + // Update progress count + if p.StreamPartProgress[group] == nil { + p.StreamPartProgress[group] = make(map[string]int) + } + p.StreamPartProgress[group][stream]++ +} + +// IsStreamPartCompleted checks if a specific part of a stream has been completed. +func (p *Progress) IsStreamPartCompleted(group, stream string, partID uint64) bool { + p.mu.Lock() + defer p.mu.Unlock() + + if streams, ok := p.CompletedStreamParts[group]; ok { + if parts, ok := streams[stream]; ok { + return parts[partID] + } + } + return false +} + +// MarkStreamPartError records an error for a specific part of a stream. +func (p *Progress) MarkStreamPartError(group, stream string, partID uint64, errorMsg string) { + p.mu.Lock() + defer p.mu.Unlock() + + // Initialize nested maps if they don't exist + if p.StreamPartErrors[group] == nil { + p.StreamPartErrors[group] = make(map[string]map[uint64]string) + } + if p.StreamPartErrors[group][stream] == nil { + p.StreamPartErrors[group][stream] = make(map[uint64]string) + } + + // Record the error + p.StreamPartErrors[group][stream][partID] = errorMsg +} + +// SetStreamPartCount sets the total number of parts for a stream. +func (p *Progress) SetStreamPartCount(group, stream string, totalParts int) { + p.mu.Lock() + defer p.mu.Unlock() + + if p.StreamPartCounts[group] == nil { + p.StreamPartCounts[group] = make(map[string]int) + } + p.StreamPartCounts[group][stream] = totalParts + + // Initialize progress tracking + if p.StreamPartProgress[group] == nil { + p.StreamPartProgress[group] = make(map[string]int) + } + if p.StreamPartProgress[group][stream] == 0 { + p.StreamPartProgress[group][stream] = 0 + } +} + +// GetStreamPartCount returns the total number of parts for a stream. +func (p *Progress) GetStreamPartCount(group, stream string) int { + p.mu.Lock() + defer p.mu.Unlock() + + if counts, ok := p.StreamPartCounts[group]; ok { + return counts[stream] + } + return 0 +} + +// GetStreamPartProgress returns the number of completed parts for a stream. 
+func (p *Progress) GetStreamPartProgress(group, stream string) int { + p.mu.Lock() + defer p.mu.Unlock() + + if progress, ok := p.StreamPartProgress[group]; ok { + return progress[stream] + } + return 0 +} + +// IsStreamFullyCompleted checks if all parts of a stream have been completed. +func (p *Progress) IsStreamFullyCompleted(group, stream string) bool { + p.mu.Lock() + defer p.mu.Unlock() + + totalParts := p.StreamPartCounts[group][stream] + completedParts := p.StreamPartProgress[group][stream] + + return totalParts > 0 && completedParts >= totalParts +} + +// GetStreamPartErrors returns all errors for a specific stream. +func (p *Progress) GetStreamPartErrors(group, stream string) map[uint64]string { + p.mu.Lock() + defer p.mu.Unlock() + + if streams, ok := p.StreamPartErrors[group]; ok { + if parts, ok := streams[stream]; ok { + // Return a copy to avoid race conditions + result := make(map[uint64]string) + for partID, errorMsg := range parts { + result[partID] = errorMsg + } + return result + } + } + return nil +} + +// ClearStreamPartErrors clears all part errors for a specific stream. +func (p *Progress) ClearStreamPartErrors(group, stream string) { + p.mu.Lock() + defer p.mu.Unlock() + + if streams, ok := p.StreamPartErrors[group]; ok { + delete(streams, stream) + } +} diff --git a/banyand/backup/lifecycle/progress_test.go b/banyand/backup/lifecycle/progress_test.go index 5d3943a2..09f67151 100644 --- a/banyand/backup/lifecycle/progress_test.go +++ b/banyand/backup/lifecycle/progress_test.go @@ -40,7 +40,6 @@ func TestProgress(t *testing.T) { progress := NewProgress() assert.NotNil(t, progress) assert.Empty(t, progress.CompletedGroups) - assert.Empty(t, progress.CompletedStreams) assert.Empty(t, progress.CompletedMeasures) assert.Empty(t, progress.DeletedStreamGroups) assert.Empty(t, progress.DeletedMeasureGroups) @@ -53,11 +52,6 @@ func TestProgress(t *testing.T) { assert.True(t, progress.IsGroupCompleted("group1")) assert.False(t, progress.IsGroupCompleted("group2")) - progress.MarkStreamCompleted("group1", "stream1", 1) - assert.True(t, progress.IsStreamCompleted("group1", "stream1")) - assert.False(t, progress.IsStreamCompleted("group1", "stream2")) - assert.False(t, progress.IsStreamCompleted("group2", "stream1")) - progress.MarkMeasureCompleted("group1", "measure1", 1) assert.True(t, progress.IsMeasureCompleted("group1", "measure1")) assert.False(t, progress.IsMeasureCompleted("group1", "measure2")) @@ -75,7 +69,6 @@ func TestProgress(t *testing.T) { t.Run("SaveAndLoad", func(t *testing.T) { progress := NewProgress() progress.MarkGroupCompleted("group1") - progress.MarkStreamCompleted("group1", "stream1", 1) progress.MarkMeasureCompleted("group1", "measure1", 1) progress.MarkStreamGroupDeleted("group2") progress.MarkMeasureGroupDeleted("group2") @@ -85,13 +78,11 @@ func TestProgress(t *testing.T) { loaded := LoadProgress(progressPath, l) assert.True(t, loaded.IsGroupCompleted("group1")) - assert.True(t, loaded.IsStreamCompleted("group1", "stream1")) assert.True(t, loaded.IsMeasureCompleted("group1", "measure1")) assert.True(t, loaded.IsStreamGroupDeleted("group2")) assert.True(t, loaded.IsMeasureGroupDeleted("group2")) assert.False(t, loaded.IsGroupCompleted("group3")) - assert.False(t, loaded.IsStreamCompleted("group1", "stream2")) assert.False(t, loaded.IsMeasureCompleted("group1", "measure2")) assert.False(t, loaded.IsStreamGroupDeleted("group3")) assert.False(t, loaded.IsMeasureGroupDeleted("group3")) diff --git a/banyand/backup/lifecycle/service.go 
b/banyand/backup/lifecycle/service.go index a1dafce0..862b2034 100644 --- a/banyand/backup/lifecycle/service.go +++ b/banyand/backup/lifecycle/service.go @@ -21,7 +21,6 @@ import ( "context" "encoding/json" "fmt" - "math" "os" "path/filepath" "sort" @@ -39,13 +38,13 @@ import ( modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1" streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1" "github.com/apache/skywalking-banyandb/banyand/backup/snapshot" + "github.com/apache/skywalking-banyandb/banyand/internal/storage" "github.com/apache/skywalking-banyandb/banyand/measure" "github.com/apache/skywalking-banyandb/banyand/metadata" "github.com/apache/skywalking-banyandb/banyand/metadata/schema" "github.com/apache/skywalking-banyandb/banyand/observability" "github.com/apache/skywalking-banyandb/banyand/protector" "github.com/apache/skywalking-banyandb/banyand/queue" - "github.com/apache/skywalking-banyandb/banyand/stream" "github.com/apache/skywalking-banyandb/pkg/logger" "github.com/apache/skywalking-banyandb/pkg/node" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" @@ -77,6 +76,7 @@ type lifecycleService struct { maxExecutionTimes int enableTLS bool insecure bool + chunkSize run.Bytes } // NewService creates a new lifecycle service. @@ -107,6 +107,8 @@ func (l *lifecycleService) FlagSet() *run.FlagSet { "Schedule expression for periodic backup. Options: @yearly, @monthly, @weekly, @daily, @hourly or @every <duration>", ) flagS.IntVar(&l.maxExecutionTimes, "max-execution-times", 0, "Maximum number of times to execute the lifecycle migration. 0 means no limit.") + l.chunkSize = run.Bytes(1024 * 1024) + flagS.VarP(&l.chunkSize, "chunk-size", "", "Chunk size in bytes for streaming data during migration (default: 1MB)") return flagS } @@ -212,13 +214,7 @@ func (l *lifecycleService) action() error { for _, g := range groups { switch g.Catalog { case commonv1.Catalog_CATALOG_STREAM: - if streamSVC == nil { - l.l.Error().Msgf("stream service is not available, skipping group: %s", g.Metadata.Name) - progress.MarkStreamError(g.Metadata.Name, "", fmt.Sprintf("stream service unavailable for group %s", g.Metadata.Name)) - allGroupsCompleted = false - continue - } - l.processStreamGroup(ctx, g, streamSVC, nodes, labels, progress) + l.processStreamGroup(ctx, g, streamDir, nodes, labels, progress) case commonv1.Catalog_CATALOG_MEASURE: if measureSVC == nil { l.l.Error().Msgf("measure service is not available, skipping group: %s", g.Metadata.Name) @@ -247,9 +243,7 @@ func (l *lifecycleService) action() error { // generateReport gathers counts & errors from Progress, writes one JSON file per run, and keeps only 5 latest. 
func (l *lifecycleService) generateReport(p *Progress) { type grp struct { - StreamCounts map[string]int `json:"stream_counts"` MeasureCounts map[string]int `json:"measure_counts"` - StreamErrors map[string]string `json:"stream_errors"` MeasureErrors map[string]string `json:"measure_errors"` Name string `json:"name"` } @@ -261,25 +255,21 @@ func (l *lifecycleService) generateReport(p *Progress) { // build report var groups []grp - for name := range p.CompletedStreams { + for name := range p.CompletedGroups { groups = append(groups, grp{ Name: name, - StreamCounts: p.StreamCounts[name], MeasureCounts: p.MeasureCounts[name], - StreamErrors: p.StreamErrors[name], MeasureErrors: p.MeasureErrors[name], }) } // also include groups that had only measures for name := range p.CompletedMeasures { - if _, seen := p.CompletedStreams[name]; seen { + if _, seen := p.CompletedGroups[name]; seen { continue } groups = append(groups, grp{ Name: name, - StreamCounts: p.StreamCounts[name], MeasureCounts: p.MeasureCounts[name], - StreamErrors: p.StreamErrors[name], MeasureErrors: p.MeasureErrors[name], }) } @@ -367,23 +357,11 @@ func (l *lifecycleService) getGroupsToProcess(ctx context.Context, progress *Pro return groups, nil } -func (l *lifecycleService) processStreamGroup(ctx context.Context, g *commonv1.Group, streamSVC stream.Service, - nodes []*databasev1.Node, labels map[string]string, progress *Progress, +func (l *lifecycleService) processStreamGroup(ctx context.Context, g *commonv1.Group, + streamDir string, nodes []*databasev1.Node, labels map[string]string, progress *Progress, ) { - shardNum, replicas, selector, client, err := parseGroup(g, labels, nodes, l.l, l.metadata) - if err != nil { - l.l.Error().Err(err).Msgf("failed to parse group %s", g.Metadata.Name) - return - } - defer client.GracefulStop() - - ss, err := l.metadata.StreamRegistry().ListStream(ctx, schema.ListOpt{Group: g.Metadata.Name}) - if err != nil { - l.l.Error().Err(err).Msgf("failed to list streams in group %s", g.Metadata.Name) - return - } - - tr := streamSVC.GetRemovalSegmentsTimeRange(g.Metadata.Name) + // Calculate removal segments time range based on group TTL configuration + tr := l.getRemovalSegmentsTimeRange(g) if tr.Start.IsZero() && tr.End.IsZero() { l.l.Info().Msgf("no removal segments time range for group %s, skipping stream migration", g.Metadata.Name) progress.MarkGroupCompleted(g.Metadata.Name) @@ -391,92 +369,93 @@ func (l *lifecycleService) processStreamGroup(ctx context.Context, g *commonv1.G return } - l.processStreams(ctx, g, ss, streamSVC, tr, shardNum, replicas, selector, client, progress) - - allStreamsDone := true - for _, s := range ss { - if !progress.IsStreamCompleted(g.Metadata.Name, s.Metadata.Name) { - allStreamsDone = false - } - } - if allStreamsDone { - l.l.Info().Msgf("deleting expired stream segments for group: %s", g.Metadata.Name) - l.deleteExpiredStreamSegments(ctx, g, tr, progress) - progress.MarkGroupCompleted(g.Metadata.Name) - progress.Save(l.progressFilePath, l.l) - } else { - l.l.Info().Msgf("skipping delete expired stream segments for group %s: some streams not fully migrated", g.Metadata.Name) + // Use file-based migration instead of element-based + err := l.processStreamGroupFileBased(ctx, g, streamDir, tr, nodes, labels, progress) + if err != nil { + l.l.Error().Err(err).Msgf("failed to migrate stream group %s using file-based approach", g.Metadata.Name) + return } + + l.l.Info().Msgf("deleting expired stream segments for group: %s", g.Metadata.Name) + 
l.deleteExpiredStreamSegments(ctx, g, tr, progress) + progress.MarkGroupCompleted(g.Metadata.Name) + progress.Save(l.progressFilePath, l.l) } -func (l *lifecycleService) processStreams( - ctx context.Context, - g *commonv1.Group, - streams []*databasev1.Stream, - streamSVC stream.Service, - tr *timestamp.TimeRange, - shardNum uint32, - replicas uint32, - selector node.Selector, - client queue.Client, - progress *Progress, -) { - for _, s := range streams { - if progress.IsStreamCompleted(g.Metadata.Name, s.Metadata.Name) { - l.l.Debug().Msgf("skipping already completed stream: %s/%s", g.Metadata.Name, s.Metadata.Name) - continue - } - sum, err := l.processSingleStream(ctx, s, streamSVC, tr, shardNum, replicas, selector, client) - if err != nil { - progress.MarkStreamError(g.Metadata.Name, s.Metadata.Name, err.Error()) - } else { - if sum < 1 { - l.l.Debug().Msgf("no elements migrated for stream %s", s.Metadata.Name) - } else { - l.l.Info().Msgf("migrated %d elements in stream %s", sum, s.Metadata.Name) - } - progress.MarkStreamCompleted(g.Metadata.Name, s.Metadata.Name, sum) - } - progress.Save(l.progressFilePath, l.l) +// processStreamGroupFileBased uses file-based migration instead of element-based queries. +func (l *lifecycleService) processStreamGroupFileBased(_ context.Context, g *commonv1.Group, + streamDir string, tr *timestamp.TimeRange, nodes []*databasev1.Node, labels map[string]string, progress *Progress, +) error { + if progress.IsStreamGroupDeleted(g.Metadata.Name) { + l.l.Info().Msgf("skipping already completed file-based migration for group: %s", g.Metadata.Name) + return nil } -} -func (l *lifecycleService) processSingleStream(ctx context.Context, s *databasev1.Stream, - streamSVC stream.Service, tr *timestamp.TimeRange, shardNum uint32, replicas uint32, selector node.Selector, client queue.Client, -) (int, error) { - q, err := streamSVC.Stream(s.Metadata) + l.l.Info().Msgf("starting file-based stream migration for group: %s", g.Metadata.Name) + + // Use the file-based migration with existing visitor pattern + err := MigrateStreamWithFileBased( + streamDir, // Use snapshot directory as source + *tr, // Time range for segments to migrate + g, // Group configuration + labels, // Node labels + nodes, // Target nodes + l.metadata, // Metadata repository + storage.IntervalRule{Unit: storage.DAY, Num: 1}, // Use daily segments as default + l.l, // Logger + int(l.chunkSize), // Chunk size for streaming + ) if err != nil { - l.l.Error().Err(err).Msgf("failed to get stream %s", s.Metadata.Name) - return 0, err + return fmt.Errorf("file-based stream migration failed: %w", err) } - tagProjection := make([]model.TagProjection, len(s.TagFamilies)) - entity := make([]*modelv1.TagValue, len(s.Entity.TagNames)) - for idx := range s.Entity.TagNames { - entity[idx] = pbv1.AnyTagValue + l.l.Info().Msgf("completed file-based stream migration for group: %s", g.Metadata.Name) + return nil +} + +// getRemovalSegmentsTimeRange calculates the time range for segments that should be migrated +// based on the group's TTL configuration, similar to storage.segmentController.getExpiredSegmentsTimeRange. 
+func (l *lifecycleService) getRemovalSegmentsTimeRange(g *commonv1.Group) *timestamp.TimeRange { + if g.ResourceOpts == nil || g.ResourceOpts.Ttl == nil { + l.l.Debug().Msgf("no TTL configured for group %s", g.Metadata.Name) + return ×tamp.TimeRange{} // Return empty time range } - for i, tf := range s.TagFamilies { - tagProjection[i] = model.TagProjection{ - Family: tf.Name, - Names: make([]string, len(tf.Tags)), - } - for j, t := range tf.Tags { - tagProjection[i].Names[j] = t.Name - } + + // Convert TTL to storage.IntervalRule + ttl := storage.MustToIntervalRule(g.ResourceOpts.Ttl) + + // Calculate deadline based on TTL (same logic as segmentController.getExpiredSegmentsTimeRange) + deadline := time.Now().Local().Add(-l.calculateTTLDuration(ttl)) + + // Create time range for segments before the deadline + timeRange := ×tamp.TimeRange{ + Start: time.Time{}, // Will be set to earliest segment start time + End: deadline, // All segments before this time should be migrated + IncludeStart: true, + IncludeEnd: false, } - result, err := q.Query(ctx, model.StreamQueryOptions{ - Name: s.Metadata.Name, - TagProjection: tagProjection, - Entities: [][]*modelv1.TagValue{entity}, - TimeRange: tr, - MaxElementSize: math.MaxInt, - }) - if err != nil { - l.l.Error().Err(err).Msgf("failed to query stream %s", s.Metadata.Name) - return 0, err + l.l.Info(). + Str("group", g.Metadata.Name). + Time("deadline", deadline). + Str("ttl", fmt.Sprintf("%d %s", g.ResourceOpts.Ttl.Num, g.ResourceOpts.Ttl.Unit.String())). + Msg("calculated removal segments time range based on TTL") + + return timeRange +} + +// calculateTTLDuration calculates the duration for a TTL interval rule. +// This implements the same logic as storage.IntervalRule.estimatedDuration(). +func (l *lifecycleService) calculateTTLDuration(ttl storage.IntervalRule) time.Duration { + switch ttl.Unit { + case storage.HOUR: + return time.Hour * time.Duration(ttl.Num) + case storage.DAY: + return 24 * time.Hour * time.Duration(ttl.Num) + default: + l.l.Warn().Msgf("unknown TTL unit %v, defaulting to 1 day", ttl.Unit) + return 24 * time.Hour } - return migrateStream(ctx, s, result, shardNum, replicas, selector, client, l.l), nil } func (l *lifecycleService) deleteExpiredStreamSegments(ctx context.Context, g *commonv1.Group, tr *timestamp.TimeRange, progress *Progress) { diff --git a/lifecycle b/lifecycle new file mode 100755 index 00000000..56e1ae3f Binary files /dev/null and b/lifecycle differ
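
A minimal usage sketch of the part-level progress API added in progress.go, assuming hypothetical group/stream names and a made-up part ID; in practice these methods are driven by MigrationVisitor during migration rather than called directly:

package main

import (
	"fmt"

	"github.com/apache/skywalking-banyandb/banyand/backup/lifecycle"
)

func main() {
	p := lifecycle.NewProgress()

	// Record the total number of parts discovered for this stream before migration starts.
	// "hypothetical-group" and "hypothetical-stream" are illustrative names only.
	p.SetStreamPartCount("hypothetical-group", "hypothetical-stream", 2)

	// Part IDs come from the 16-character hex directory names of the part folders.
	p.MarkStreamPartCompleted("hypothetical-group", "hypothetical-stream", 0x0123456789abcdef)

	// A resumed run consults the tracker to skip parts that were already migrated.
	if p.IsStreamPartCompleted("hypothetical-group", "hypothetical-stream", 0x0123456789abcdef) {
		fmt.Println("part already migrated, skipping")
	}

	// The stream only counts as fully completed once every recorded part is done.
	fmt.Println("fully completed:", p.IsStreamFullyCompleted("hypothetical-group", "hypothetical-stream"))
}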