This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch lifecyc-sync
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit dd02d125d0d4ffbcecbc7256a5e0d2077802ac16
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Wed Jul 30 15:55:53 2025 +0800

    Add file-based stream migration implementation
    
    - Introduced `file_migration_integration.go` to handle file-based stream migration using a visitor pattern (see the usage sketch below).
    - Created `file_migration_visitor.go` to define the `MigrationVisitor` struct and its methods for processing stream parts.
    - Updated `progress.go` to track progress at the part level for stream migrations.
    - Refactored `service.go` to utilize the new file-based migration approach for stream groups.
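    
    A minimal usage sketch of the new `MigrateStreamWithFileBased` entry point
    (illustrative only: the caller-side variable names, the `lifecycle` import
    path alias, and the daily interval rule are assumptions, not part of this
    change; it mirrors the call made in `service.go`):
    
        import (
                "github.com/apache/skywalking-banyandb/banyand/backup/lifecycle"
                "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        )
    
        // Migrate expired stream parts under streamDir to the target nodes.
        if err := lifecycle.MigrateStreamWithFileBased(
                streamDir,    // snapshot stream directory (tsdbRootPath)
                *timeRange,   // segments within this range are migrated
                group,        // *commonv1.Group being processed
                nodeLabels,   // labels of the local node
                nodes,        // candidate target []*databasev1.Node
                metadataRepo, // metadata.Repo
                storage.IntervalRule{Unit: storage.DAY, Num: 1},
                log,          // *logger.Logger
                1024*1024,    // chunk size in bytes
        ); err != nil {
                log.Error().Err(err).Msg("file-based stream migration failed")
        }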
---
 .../backup/lifecycle/file_migration_integration.go | 126 +++++++
 banyand/backup/lifecycle/file_migration_visitor.go | 410 +++++++++++++++++++++
 .../lifecycle/file_migration_visitor_test.go       | 161 ++++++++
 banyand/backup/lifecycle/progress.go               | 195 +++++++---
 banyand/backup/lifecycle/progress_test.go          |   9 -
 banyand/backup/lifecycle/service.go                | 193 +++++-----
 lifecycle                                          | Bin 0 -> 38228265 bytes
 7 files changed, 929 insertions(+), 165 deletions(-)

diff --git a/banyand/backup/lifecycle/file_migration_integration.go b/banyand/backup/lifecycle/file_migration_integration.go
new file mode 100644
index 00000000..35c00482
--- /dev/null
+++ b/banyand/backup/lifecycle/file_migration_integration.go
@@ -0,0 +1,126 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package lifecycle
+
+import (
+       "github.com/apache/skywalking-banyandb/api/common"
+       commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/stream"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+// MigrateStreamWithFileBased performs file-based stream migration using the visitor pattern.
+func MigrateStreamWithFileBased(
+       tsdbRootPath string,
+       timeRange timestamp.TimeRange,
+       group *commonv1.Group,
+       nodeLabels map[string]string,
+       nodes []*databasev1.Node,
+       metadata metadata.Repo,
+       intervalRule storage.IntervalRule,
+       logger *logger.Logger,
+       chunkSize int,
+) error {
+       // Create file-based migration visitor
+       visitor, err := NewMigrationVisitor(group, nodeLabels, nodes, metadata, logger, nil, "", chunkSize)
+       if err != nil {
+               return err
+       }
+       defer visitor.Close()
+
+       // Use the existing VisitStreamsInTimeRange function with our file-based visitor
+       return stream.VisitStreamsInTimeRange(tsdbRootPath, timeRange, visitor, intervalRule)
+}
+
+// MigrateStreamWithFileBasedAndProgress performs file-based stream migration with progress tracking.
+func MigrateStreamWithFileBasedAndProgress(
+       tsdbRootPath string,
+       timeRange timestamp.TimeRange,
+       group *commonv1.Group,
+       nodeLabels map[string]string,
+       nodes []*databasev1.Node,
+       metadata metadata.Repo,
+       intervalRule storage.IntervalRule,
+       logger *logger.Logger,
+       progress *Progress,
+       streamName string,
+       chunkSize int,
+) error {
+       // Count total parts for this stream before starting migration
+       totalParts, err := countStreamParts(tsdbRootPath, timeRange, streamName, intervalRule)
+       if err != nil {
+               logger.Warn().Err(err).Str("stream", streamName).Msg("failed to count stream parts, proceeding without part count")
+       } else {
+               logger.Info().Str("stream", streamName).Int("total_parts", totalParts).Msg("counted stream parts for progress tracking")
+       }
+
+       // Create file-based migration visitor with progress tracking
+       visitor, err := NewMigrationVisitor(group, nodeLabels, nodes, metadata, logger, progress, streamName, chunkSize)
+       if err != nil {
+               return err
+       }
+       defer visitor.Close()
+
+       // Set the total part count for progress tracking
+       if totalParts > 0 {
+               visitor.SetStreamPartCount(totalParts)
+       }
+
+       // Use the existing VisitStreamsInTimeRange function with our file-based visitor
+       return stream.VisitStreamsInTimeRange(tsdbRootPath, timeRange, visitor, intervalRule)
+}
+
+// countStreamParts counts the total number of parts for a specific stream in the given time range.
+func countStreamParts(tsdbRootPath string, timeRange timestamp.TimeRange, streamName string, intervalRule storage.IntervalRule) (int, error) {
+       // Create a simple visitor to count parts
+       partCounter := &partCountVisitor{streamName: streamName}
+
+       // Use the existing VisitStreamsInTimeRange function to count parts
+       err := stream.VisitStreamsInTimeRange(tsdbRootPath, timeRange, partCounter, intervalRule)
+       if err != nil {
+               return 0, err
+       }
+
+       return partCounter.partCount, nil
+}
+
+// partCountVisitor is a simple visitor that counts parts for a specific stream.
+type partCountVisitor struct {
+       streamName string
+       partCount  int
+}
+
+// VisitSeries implements stream.Visitor.
+func (pcv *partCountVisitor) VisitSeries(_ *timestamp.TimeRange, _ string) error {
+       return nil
+}
+
+// VisitPart implements stream.Visitor.
+func (pcv *partCountVisitor) VisitPart(_ *timestamp.TimeRange, _ common.ShardID, _ string) error {
+       pcv.partCount++
+       return nil
+}
+
+// VisitElementIndex implements stream.Visitor.
+func (pcv *partCountVisitor) VisitElementIndex(_ *timestamp.TimeRange, _ common.ShardID, _ string) error {
+       return nil
+}
diff --git a/banyand/backup/lifecycle/file_migration_visitor.go b/banyand/backup/lifecycle/file_migration_visitor.go
new file mode 100644
index 00000000..a4a9ae41
--- /dev/null
+++ b/banyand/backup/lifecycle/file_migration_visitor.go
@@ -0,0 +1,410 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package lifecycle
+
+import (
+       "context"
+       "encoding/json"
+       "fmt"
+       "path/filepath"
+       "strconv"
+       "strings"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/api/data"
+       commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/node"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+const (
+       // Stream file names from banyand/stream/syncer.go.
+       streamMetaName          = "meta"
+       streamPrimaryName       = "primary"
+       streamTimestampsName    = "timestamps"
+       streamTagFamiliesPrefix = "tag_families_"
+       streamTagMetadataPrefix = "tag_metadata_"
+       streamTagFilterPrefix   = "tag_filter_"
+)
+
+// MigrationVisitor implements the stream.Visitor interface for file-based migration.
+type MigrationVisitor struct {
+       selector       node.Selector                      // From parseGroup - node selector
+       client         queue.Client                       // From parseGroup - queue client
+       chunkedClients map[string]queue.ChunkedSyncClient // Per-node chunked sync clients cache
+       logger         *logger.Logger
+       progress       *Progress // Progress tracker for migration states
+       group          string
+       streamName     string
+       targetShardNum uint32 // From parseGroup - target shard count
+       replicas       uint32 // From parseGroup - replica count
+       chunkSize      int    // Chunk size for streaming data
+}
+
+// partMetadata matches the structure in banyand/stream/part_metadata.go.
+type partMetadata struct {
+       CompressedSizeBytes   uint64 `json:"compressedSizeBytes"`
+       UncompressedSizeBytes uint64 `json:"uncompressedSizeBytes"`
+       TotalCount            uint64 `json:"totalCount"`
+       BlocksCount           uint64 `json:"blocksCount"`
+       MinTimestamp          int64  `json:"minTimestamp"`
+       MaxTimestamp          int64  `json:"maxTimestamp"`
+}
+
+// NewMigrationVisitor creates a new file-based migration visitor.
+func NewMigrationVisitor(group *commonv1.Group, nodeLabels map[string]string,
+       nodes []*databasev1.Node, metadata metadata.Repo, l *logger.Logger,
+       progress *Progress, streamName string, chunkSize int,
+) (*MigrationVisitor, error) {
+       // Use existing parseGroup function to get sharding parameters
+       shardNum, replicas, selector, client, err := parseGroup(group, nodeLabels, nodes, l, metadata)
+       if err != nil {
+               return nil, err
+       }
+
+       return &MigrationVisitor{
+               group:          group.Metadata.Name,
+               targetShardNum: shardNum,
+               replicas:       replicas,
+               selector:       selector,
+               client:         client,
+               chunkedClients: make(map[string]queue.ChunkedSyncClient),
+               logger:         l,
+               progress:       progress,
+               streamName:     streamName,
+               chunkSize:      chunkSize,
+       }, nil
+}
+
+// VisitSeries implements stream.Visitor.
+func (mv *MigrationVisitor) VisitSeries(_ *timestamp.TimeRange, seriesIndexPath string) error {
+       // TODO: Implement series index migration if needed
+       mv.logger.Debug().
+               Str("path", seriesIndexPath).
+               Msg("skipping series index migration (not implemented)")
+       return nil
+}
+
+// VisitPart implements stream.Visitor - core migration logic.
+func (mv *MigrationVisitor) VisitPart(_ *timestamp.TimeRange, sourceShardID common.ShardID, partPath string) error {
+       // Extract part ID from path for progress tracking
+       partID, err := mv.extractPartIDFromPath(partPath)
+       if err != nil {
+               errorMsg := fmt.Sprintf("failed to extract part ID from path %s: %v", partPath, err)
+               mv.progress.MarkStreamPartError(mv.group, mv.streamName, 0, errorMsg)
+               return fmt.Errorf("failed to extract part ID from path %s: %w", partPath, err)
+       }
+
+       // Check if this part has already been completed
+       if mv.progress.IsStreamPartCompleted(mv.group, mv.streamName, partID) {
+               mv.logger.Debug().
+                       Uint64("part_id", partID).
+                       Str("stream", mv.streamName).
+                       Str("group", mv.group).
+                       Msg("part already completed, skipping")
+               return nil
+       }
+
+       // Calculate target shard ID based on source shard ID mapping
+       targetShardID := mv.calculateTargetShardID(uint32(sourceShardID))
+
+       mv.logger.Info().
+               Uint64("part_id", partID).
+               Uint32("source_shard", uint32(sourceShardID)).
+               Uint32("target_shard", targetShardID).
+               Str("part_path", partPath).
+               Str("stream", mv.streamName).
+               Str("group", mv.group).
+               Msg("migrating part")
+
+       // Create file readers for the entire part (similar to syncer.go:79-132)
+       files, release := mv.createPartFileReaders(partPath)
+       defer release()
+
+       // Stream entire part to target shard replicas
+       if err := mv.streamPartToTargetShard(targetShardID, files, partPath, partID); err != nil {
+               errorMsg := fmt.Sprintf("failed to stream part to target shard: %v", err)
+               mv.progress.MarkStreamPartError(mv.group, mv.streamName, partID, errorMsg)
+               return fmt.Errorf("failed to stream part to target shard: %w", err)
+       }
+
+       // Mark part as completed in progress tracker
+       mv.progress.MarkStreamPartCompleted(mv.group, mv.streamName, partID)
+
+       mv.logger.Info().
+               Uint64("part_id", partID).
+               Str("stream", mv.streamName).
+               Str("group", mv.group).
+               Int("completed_parts", mv.progress.GetStreamPartProgress(mv.group, mv.streamName)).
+               Int("total_parts", mv.progress.GetStreamPartCount(mv.group, mv.streamName)).
+               Msg("part migration completed successfully")
+
+       return nil
+}
+
+// VisitElementIndex implements stream.Visitor.
+func (mv *MigrationVisitor) VisitElementIndex(_ *timestamp.TimeRange, _ common.ShardID, indexPath string) error {
+       // TODO: Implement element index migration if needed
+       mv.logger.Debug().
+               Str("path", indexPath).
+               Msg("skipping element index migration (not implemented)")
+       return nil
+}
+
+// calculateTargetShardID maps source shard ID to target shard ID.
+func (mv *MigrationVisitor) calculateTargetShardID(sourceShardID uint32) uint32 {
+       // Simple modulo-based mapping from source shard to target shard
+       // This ensures deterministic and balanced distribution
+       return sourceShardID % mv.targetShardNum
+}
+
+// streamPartToTargetShard sends part data to all replicas of the target shard.
+func (mv *MigrationVisitor) streamPartToTargetShard(targetShardID uint32, files []queue.FileInfo, partPath string, partID uint64) error {
+       copies := mv.replicas + 1
+
+       // Send to all replicas using the exact pattern from steps.go:219-236
+       for replicaID := uint32(0); replicaID < copies; replicaID++ {
+               // Use selector.Pick exactly like steps.go:220
+               nodeID, err := mv.selector.Pick(mv.group, "", targetShardID, replicaID)
+               if err != nil {
+                       return fmt.Errorf("failed to pick node for shard %d replica %d: %w", targetShardID, replicaID, err)
+               }
+
+               // Stream part data to target node using chunked sync
+               if err := mv.streamPartToNode(nodeID, targetShardID, files, partPath, partID); err != nil {
+                       return fmt.Errorf("failed to stream part to node %s: %w", nodeID, err)
+               }
+       }
+
+       return nil
+}
+
+// streamPartToNode streams part data to a specific target node.
+func (mv *MigrationVisitor) streamPartToNode(nodeID string, targetShardID uint32, files []queue.FileInfo, partPath string, partID uint64) error {
+       // Get or create chunked client for this node (cache hit optimization)
+       chunkedClient, exists := mv.chunkedClients[nodeID]
+       if !exists {
+               var err error
+               // Create new chunked sync client via queue.Client
+               chunkedClient, err = mv.client.NewChunkedSyncClient(nodeID, uint32(mv.chunkSize))
+               if err != nil {
+                       return fmt.Errorf("failed to create chunked sync client for node %s: %w", nodeID, err)
+               }
+               mv.chunkedClients[nodeID] = chunkedClient // Cache for reuse
+       }
+
+       // Create streaming part data from the part files
+       streamingParts, err := mv.createStreamingPartFromFiles(targetShardID, files, partPath)
+       if err != nil {
+               return fmt.Errorf("failed to create streaming parts: %w", err)
+       }
+
+       // Stream using chunked transfer (same as syncer.go:202)
+       ctx := context.Background()
+       result, err := chunkedClient.SyncStreamingParts(ctx, streamingParts)
+       if err != nil {
+               return fmt.Errorf("failed to sync streaming parts to node %s: %w", nodeID, err)
+       }
+
+       if !result.Success {
+               return fmt.Errorf("chunked sync partially failed: %v", result.ErrorMessage)
+       }
+
+       // Log success metrics (same pattern as syncer.go:210-217)
+       mv.logger.Info().
+               Str("node", nodeID).
+               Str("session", result.SessionID).
+               Uint64("bytes", result.TotalBytes).
+               Int64("duration_ms", result.DurationMs).
+               Uint32("chunks", result.ChunksCount).
+               Uint32("parts", result.PartsCount).
+               Uint32("target_shard", targetShardID).
+               Uint64("part_id", partID).
+               Str("part_path", partPath).
+               Str("stream", mv.streamName).
+               Str("group", mv.group).
+               Msg("file-based migration part completed successfully")
+
+       return nil
+}
+
+// createStreamingPartFromFiles creates StreamingPartData from part files and metadata.
+func (mv *MigrationVisitor) createStreamingPartFromFiles(targetShardID uint32, files []queue.FileInfo, partPath string) ([]queue.StreamingPartData, error) {
+       // Calculate part metadata from files
+       partID, err := mv.extractPartIDFromPath(partPath)
+       if err != nil {
+               return nil, fmt.Errorf("failed to extract part ID from path %s: %w", partPath, err)
+       }
+
+       // Extract metadata from metadata.json file in the part directory
+       metadata, err := mv.readPartMetadata(partPath)
+       if err != nil {
+               return nil, fmt.Errorf("failed to read part metadata: %w", err)
+       }
+
+       partData := queue.StreamingPartData{
+               ID:                    partID,
+               Group:                 mv.group,
+               ShardID:               targetShardID, // Use calculated target shard
+               Topic:                 data.TopicStreamPartSync.String(),
+               Files:                 files,
+               CompressedSizeBytes:   metadata.CompressedSizeBytes,
+               UncompressedSizeBytes: metadata.UncompressedSizeBytes,
+               TotalCount:            metadata.TotalCount,
+               BlocksCount:           metadata.BlocksCount,
+               MinTimestamp:          metadata.MinTimestamp,
+               MaxTimestamp:          metadata.MaxTimestamp,
+       }
+
+       return []queue.StreamingPartData{partData}, nil
+}
+
+// createPartFileReaders creates file readers for all files in a part.
+func (mv *MigrationVisitor) createPartFileReaders(partPath string) ([]queue.FileInfo, func()) {
+       // Read part files using the same pattern as createPartFileReaders in syncer.go:79-132
+       lfs := fs.NewLocalFileSystem()
+       var files []queue.FileInfo
+
+       // Read stream metadata file
+       metaFilePath := filepath.Join(partPath, streamMetaName)
+       if file, err := lfs.OpenFile(metaFilePath); err == nil {
+               files = append(files, queue.FileInfo{
+                       Name:   streamMetaName,
+                       Reader: file.SequentialRead(),
+               })
+       }
+
+       // Read primary data file
+       primaryFilePath := filepath.Join(partPath, streamPrimaryName)
+       if file, err := lfs.OpenFile(primaryFilePath); err == nil {
+               files = append(files, queue.FileInfo{
+                       Name:   streamPrimaryName,
+                       Reader: file.SequentialRead(),
+               })
+       }
+
+       // Read timestamps file
+       timestampsFilePath := filepath.Join(partPath, streamTimestampsName)
+       if file, err := lfs.OpenFile(timestampsFilePath); err == nil {
+               files = append(files, queue.FileInfo{
+                       Name:   streamTimestampsName,
+                       Reader: file.SequentialRead(),
+               })
+       }
+
+       // Read tag family files (similar to syncer.go:106-127)
+       entries := lfs.ReadDir(partPath)
+       for _, entry := range entries {
+               if !entry.IsDir() {
+                       name := entry.Name()
+                       if strings.HasPrefix(name, streamTagFamiliesPrefix) ||
+                               strings.HasPrefix(name, streamTagMetadataPrefix) ||
+                               strings.HasPrefix(name, streamTagFilterPrefix) {
+                               filePath := filepath.Join(partPath, name)
+                               if file, err := lfs.OpenFile(filePath); err == nil {
+                                       files = append(files, queue.FileInfo{
+                                               Name:   name,
+                                               Reader: file.SequentialRead(),
+                                       })
+                               }
+                       }
+               }
+       }
+
+       // Return cleanup function to close all readers
+       releaseFunc := func() {
+               for _, file := range files {
+                       if file.Reader != nil {
+                               file.Reader.Close()
+                       }
+               }
+       }
+
+       return files, releaseFunc
+}
+
+// readPartMetadata reads and parses the metadata.json file from a part directory.
+func (mv *MigrationVisitor) readPartMetadata(partPath string) (*partMetadata, error) {
+       lfs := fs.NewLocalFileSystem()
+       metadataPath := filepath.Join(partPath, "metadata.json")
+
+       // Read metadata.json file
+       data, err := lfs.Read(metadataPath)
+       if err != nil {
+               return nil, fmt.Errorf("cannot read metadata.json: %w", err)
+       }
+
+       // Unmarshal JSON data
+       var metadata partMetadata
+       if err := json.Unmarshal(data, &metadata); err != nil {
+               return nil, fmt.Errorf("cannot parse metadata.json: %w", err)
+       }
+
+       // Validate metadata (same validation as part_metadata.go:78-80)
+       if metadata.MinTimestamp > metadata.MaxTimestamp {
+               return nil, fmt.Errorf("invalid metadata: MinTimestamp (%d) cannot exceed MaxTimestamp (%d)",
+                       metadata.MinTimestamp, metadata.MaxTimestamp)
+       }
+
+       return &metadata, nil
+}
+
+// extractPartIDFromPath extracts the part ID from the part directory path.
+func (mv *MigrationVisitor) extractPartIDFromPath(partPath string) (uint64, error) {
+       // Extract the 16-character hex part ID from the path
+       // Part paths typically end with the hex part ID directory
+       partName := filepath.Base(partPath)
+       if len(partName) != 16 {
+               return 0, fmt.Errorf("invalid part path format: %s", partPath)
+       }
+
+       partID, err := strconv.ParseUint(partName, 16, 64)
+       if err != nil {
+               return 0, fmt.Errorf("failed to parse part ID from %s: %w", partName, err)
+       }
+
+       return partID, nil
+}
+
+// Close cleans up all chunked sync clients.
+func (mv *MigrationVisitor) Close() error {
+       for nodeID, client := range mv.chunkedClients {
+               if err := client.Close(); err != nil {
+                       mv.logger.Warn().Err(err).Str("node", nodeID).Msg("failed to close chunked sync client")
+               }
+       }
+       mv.chunkedClients = make(map[string]queue.ChunkedSyncClient)
+       return nil
+}
+
+// SetStreamPartCount sets the total number of parts for the current stream.
+func (mv *MigrationVisitor) SetStreamPartCount(totalParts int) {
+       if mv.progress != nil {
+               mv.progress.SetStreamPartCount(mv.group, mv.streamName, totalParts)
+               mv.logger.Info().
+                       Str("stream", mv.streamName).
+                       Str("group", mv.group).
+                       Int("total_parts", totalParts).
+                       Msg("set stream part count for progress tracking")
+       }
+}
diff --git a/banyand/backup/lifecycle/file_migration_visitor_test.go b/banyand/backup/lifecycle/file_migration_visitor_test.go
new file mode 100644
index 00000000..525f676c
--- /dev/null
+++ b/banyand/backup/lifecycle/file_migration_visitor_test.go
@@ -0,0 +1,161 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package lifecycle
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+       databasev1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+func TestMigrationVisitor_calculateTargetShardID(t *testing.T) {
+       tests := []struct {
+               name           string
+               targetShardNum uint32
+               sourceShardID  uint32
+               expectedTarget uint32
+       }{
+               {
+                       name:           "simple modulo mapping",
+                       targetShardNum: 4,
+                       sourceShardID:  7,
+                       expectedTarget: 3, // 7 % 4 = 3
+               },
+               {
+                       name:           "exact division",
+                       targetShardNum: 5,
+                       sourceShardID:  10,
+                       expectedTarget: 0, // 10 % 5 = 0
+               },
+               {
+                       name:           "same shard mapping",
+                       targetShardNum: 8,
+                       sourceShardID:  3,
+                       expectedTarget: 3, // 3 % 8 = 3
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       mv := &MigrationVisitor{
+                               targetShardNum: tt.targetShardNum,
+                               logger:         logger.GetLogger("test"),
+                       }
+
+                       result := mv.calculateTargetShardID(tt.sourceShardID)
+                       assert.Equal(t, tt.expectedTarget, result)
+               })
+       }
+}
+
+func TestMigrationVisitor_extractPartIDFromPath(t *testing.T) {
+       tests := []struct {
+               name        string
+               partPath    string
+               expectedID  uint64
+               expectError bool
+       }{
+               {
+                       name:        "valid 16-char hex path",
+                       partPath:    "/data/stream/shard0/seg-123/0123456789abcdef",
+                       expectedID:  0x0123456789abcdef,
+                       expectError: false,
+               },
+               {
+                       name:        "invalid short path",
+                       partPath:    "/data/stream/shard0/seg-123/abc",
+                       expectedID:  0,
+                       expectError: true,
+               },
+               {
+                       name:        "invalid non-hex path",
+                       partPath:    "/data/stream/shard0/seg-123/ghijklmnopqrstuv",
+                       expectedID:  0,
+                       expectError: true,
+               },
+       }
+
+       for _, tt := range tests {
+               t.Run(tt.name, func(t *testing.T) {
+                       mv := &MigrationVisitor{
+                               logger: logger.GetLogger("test"),
+                       }
+
+                       result, err := mv.extractPartIDFromPath(tt.partPath)
+
+                       if tt.expectError {
+                               assert.Error(t, err)
+                       } else {
+                               assert.NoError(t, err)
+                               assert.Equal(t, tt.expectedID, result)
+                       }
+               })
+       }
+}
+
+func TestNewMigrationVisitor_Construction(t *testing.T) {
+       // Create minimal test group
+       group := &commonv1.Group{
+               Metadata: &commonv1.Metadata{
+                       Name: "test-group",
+               },
+               Catalog: commonv1.Catalog_CATALOG_STREAM,
+               ResourceOpts: &commonv1.ResourceOpts{
+                       Stages: []*commonv1.LifecycleStage{
+                               {
+                                       Name:     "hot",
+                                       ShardNum: 4,
+                                       Replicas: 2,
+                               },
+                       },
+               },
+       }
+
+       // Create test node
+       nodes := []*databasev1.Node{
+               {
+                       Metadata: &commonv1.Metadata{
+                               Name: "test-node",
+                       },
+                       GrpcAddress: "localhost:17912",
+                       Roles:       []databasev1.Role{databasev1.Role_ROLE_DATA},
+               },
+       }
+
+       nodeLabels := map[string]string{
+               "nodeRole": "data",
+       }
+
+       // This test may fail without proper metadata setup, but validates structure
+       visitor, err := NewMigrationVisitor(group, nodeLabels, nodes, nil, logger.GetLogger("test"), nil, "", 1024*1024)
+       // We expect this to fail due to missing metadata, but the structure should be valid
+       if err != nil {
+               t.Logf("Expected error due to minimal test setup: %v", err)
+               return
+       }
+
+       require.NotNil(t, visitor)
+       assert.Equal(t, "test-group", visitor.group)
+       assert.NotNil(t, visitor.chunkedClients)
+       assert.NotNil(t, visitor.logger)
+}
diff --git a/banyand/backup/lifecycle/progress.go b/banyand/backup/lifecycle/progress.go
index 09bc6130..32d4c907 100644
--- a/banyand/backup/lifecycle/progress.go
+++ b/banyand/backup/lifecycle/progress.go
@@ -29,17 +29,19 @@ import (
 // Progress tracks the lifecycle migration progress to support resume after crash.
 type Progress struct {
        CompletedGroups      map[string]bool              `json:"completed_groups"`
-       CompletedStreams     map[string]map[string]bool   `json:"completed_streams"`
        CompletedMeasures    map[string]map[string]bool   `json:"completed_measures"`
        DeletedStreamGroups  map[string]bool              `json:"deleted_stream_groups"`
        DeletedMeasureGroups map[string]bool              `json:"deleted_measure_groups"`
-       StreamErrors         map[string]map[string]string `json:"stream_errors"`
        MeasureErrors        map[string]map[string]string `json:"measure_errors"`
-       StreamCounts         map[string]map[string]int    `json:"stream_counts"`
        MeasureCounts        map[string]map[string]int    `json:"measure_counts"`
-       SnapshotStreamDir    string                       `json:"snapshot_stream_dir"`
-       SnapshotMeasureDir   string                       `json:"snapshot_measure_dir"`
-       mu                   sync.Mutex                   `json:"-"`
+       // Part-level tracking for stream migration
+       CompletedStreamParts map[string]map[string]map[uint64]bool   `json:"completed_stream_parts"` // group -> stream -> partID -> completed
+       StreamPartErrors     map[string]map[string]map[uint64]string `json:"stream_part_errors"`     // group -> stream -> partID -> error
+       StreamPartCounts     map[string]map[string]int               `json:"stream_part_counts"`     // group -> stream -> total parts
+       StreamPartProgress   map[string]map[string]int               `json:"stream_part_progress"`   // group -> stream -> completed parts count
+       SnapshotStreamDir    string                                  `json:"snapshot_stream_dir"`
+       SnapshotMeasureDir   string                                  `json:"snapshot_measure_dir"`
+       mu                   sync.Mutex                              `json:"-"`
 }
 
 // AllGroupsFullyCompleted checks if all groups are fully completed.
@@ -59,14 +61,15 @@ func (p *Progress) AllGroupsFullyCompleted(groups []*commonv1.Group) bool {
 func NewProgress() *Progress {
        return &Progress{
                CompletedGroups:      make(map[string]bool),
-               CompletedStreams:     make(map[string]map[string]bool),
                CompletedMeasures:    make(map[string]map[string]bool),
                DeletedStreamGroups:  make(map[string]bool),
                DeletedMeasureGroups: make(map[string]bool),
-               StreamErrors:         make(map[string]map[string]string),
                MeasureErrors:        make(map[string]map[string]string),
-               StreamCounts:         make(map[string]map[string]int),
                MeasureCounts:        make(map[string]map[string]int),
+               CompletedStreamParts: make(map[string]map[string]map[uint64]bool),
+               StreamPartErrors:     make(map[string]map[string]map[uint64]string),
+               StreamPartCounts:     make(map[string]map[string]int),
+               StreamPartProgress:   make(map[string]map[string]int),
        }
 }
 
@@ -133,30 +136,6 @@ func (p *Progress) IsGroupCompleted(group string) bool {
        return p.CompletedGroups[group]
 }
 
-// MarkStreamCompleted marks a stream as completed.
-func (p *Progress) MarkStreamCompleted(group, stream string, count int) {
-       p.mu.Lock()
-       defer p.mu.Unlock()
-       if p.CompletedStreams[group] == nil {
-               p.CompletedStreams[group] = make(map[string]bool)
-       }
-       p.CompletedStreams[group][stream] = true
-       if p.StreamCounts[group] == nil {
-               p.StreamCounts[group] = make(map[string]int)
-       }
-       p.StreamCounts[group][stream] = count
-}
-
-// IsStreamCompleted checks if a stream has been completed.
-func (p *Progress) IsStreamCompleted(group, stream string) bool {
-       p.mu.Lock()
-       defer p.mu.Unlock()
-       if streams, ok := p.CompletedStreams[group]; ok {
-               return streams[stream]
-       }
-       return false
-}
-
 // MarkMeasureCompleted marks a measure as completed.
 func (p *Progress) MarkMeasureCompleted(group, measure string, count int) {
        p.mu.Lock()
@@ -209,28 +188,13 @@ func (p *Progress) IsMeasureGroupDeleted(group string) bool {
        return p.DeletedMeasureGroups[group]
 }
 
-// ClearErrors resets all prior stream/measure error records.
+// ClearErrors resets all prior measure error records.
 func (p *Progress) ClearErrors() {
        p.mu.Lock()
        defer p.mu.Unlock()
-       p.StreamErrors = make(map[string]map[string]string)
        p.MeasureErrors = make(map[string]map[string]string)
 }
 
-// MarkStreamError records an error message for a specific stream.
-func (p *Progress) MarkStreamError(group, stream, msg string) {
-       p.mu.Lock()
-       defer p.mu.Unlock()
-       if p.StreamErrors[group] == nil {
-               p.StreamErrors[group] = make(map[string]string)
-       }
-       p.StreamErrors[group][stream] = msg
-       if p.CompletedStreams[group] == nil {
-               p.CompletedStreams[group] = make(map[string]bool)
-       }
-       p.CompletedStreams[group][stream] = false
-}
-
 // MarkMeasureError records an error message for a specific measure.
 func (p *Progress) MarkMeasureError(group, measure, msg string) {
        p.mu.Lock()
@@ -259,3 +223,136 @@ func (p *Progress) Remove(path string, l *logger.Logger) {
        }
        l.Info().Msgf("Removed progress file at %s", path)
 }
+
+// MarkStreamPartCompleted marks a specific part of a stream as completed.
+func (p *Progress) MarkStreamPartCompleted(group, stream string, partID uint64) {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       // Initialize nested maps if they don't exist
+       if p.CompletedStreamParts[group] == nil {
+               p.CompletedStreamParts[group] = make(map[string]map[uint64]bool)
+       }
+       if p.CompletedStreamParts[group][stream] == nil {
+               p.CompletedStreamParts[group][stream] = make(map[uint64]bool)
+       }
+
+       // Mark part as completed
+       p.CompletedStreamParts[group][stream][partID] = true
+
+       // Update progress count
+       if p.StreamPartProgress[group] == nil {
+               p.StreamPartProgress[group] = make(map[string]int)
+       }
+       p.StreamPartProgress[group][stream]++
+}
+
+// IsStreamPartCompleted checks if a specific part of a stream has been completed.
+func (p *Progress) IsStreamPartCompleted(group, stream string, partID uint64) bool {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       if streams, ok := p.CompletedStreamParts[group]; ok {
+               if parts, ok := streams[stream]; ok {
+                       return parts[partID]
+               }
+       }
+       return false
+}
+
+// MarkStreamPartError records an error for a specific part of a stream.
+func (p *Progress) MarkStreamPartError(group, stream string, partID uint64, errorMsg string) {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       // Initialize nested maps if they don't exist
+       if p.StreamPartErrors[group] == nil {
+               p.StreamPartErrors[group] = make(map[string]map[uint64]string)
+       }
+       if p.StreamPartErrors[group][stream] == nil {
+               p.StreamPartErrors[group][stream] = make(map[uint64]string)
+       }
+
+       // Record the error
+       p.StreamPartErrors[group][stream][partID] = errorMsg
+}
+
+// SetStreamPartCount sets the total number of parts for a stream.
+func (p *Progress) SetStreamPartCount(group, stream string, totalParts int) {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       if p.StreamPartCounts[group] == nil {
+               p.StreamPartCounts[group] = make(map[string]int)
+       }
+       p.StreamPartCounts[group][stream] = totalParts
+
+       // Initialize progress tracking
+       if p.StreamPartProgress[group] == nil {
+               p.StreamPartProgress[group] = make(map[string]int)
+       }
+       if p.StreamPartProgress[group][stream] == 0 {
+               p.StreamPartProgress[group][stream] = 0
+       }
+}
+
+// GetStreamPartCount returns the total number of parts for a stream.
+func (p *Progress) GetStreamPartCount(group, stream string) int {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       if counts, ok := p.StreamPartCounts[group]; ok {
+               return counts[stream]
+       }
+       return 0
+}
+
+// GetStreamPartProgress returns the number of completed parts for a stream.
+func (p *Progress) GetStreamPartProgress(group, stream string) int {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       if progress, ok := p.StreamPartProgress[group]; ok {
+               return progress[stream]
+       }
+       return 0
+}
+
+// IsStreamFullyCompleted checks if all parts of a stream have been completed.
+func (p *Progress) IsStreamFullyCompleted(group, stream string) bool {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       totalParts := p.StreamPartCounts[group][stream]
+       completedParts := p.StreamPartProgress[group][stream]
+
+       return totalParts > 0 && completedParts >= totalParts
+}
+
+// GetStreamPartErrors returns all errors for a specific stream.
+func (p *Progress) GetStreamPartErrors(group, stream string) map[uint64]string {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       if streams, ok := p.StreamPartErrors[group]; ok {
+               if parts, ok := streams[stream]; ok {
+                       // Return a copy to avoid race conditions
+                       result := make(map[uint64]string)
+                       for partID, errorMsg := range parts {
+                               result[partID] = errorMsg
+                       }
+                       return result
+               }
+       }
+       return nil
+}
+
+// ClearStreamPartErrors clears all part errors for a specific stream.
+func (p *Progress) ClearStreamPartErrors(group, stream string) {
+       p.mu.Lock()
+       defer p.mu.Unlock()
+
+       if streams, ok := p.StreamPartErrors[group]; ok {
+               delete(streams, stream)
+       }
+}
diff --git a/banyand/backup/lifecycle/progress_test.go b/banyand/backup/lifecycle/progress_test.go
index 5d3943a2..09f67151 100644
--- a/banyand/backup/lifecycle/progress_test.go
+++ b/banyand/backup/lifecycle/progress_test.go
@@ -40,7 +40,6 @@ func TestProgress(t *testing.T) {
                progress := NewProgress()
                assert.NotNil(t, progress)
                assert.Empty(t, progress.CompletedGroups)
-               assert.Empty(t, progress.CompletedStreams)
                assert.Empty(t, progress.CompletedMeasures)
                assert.Empty(t, progress.DeletedStreamGroups)
                assert.Empty(t, progress.DeletedMeasureGroups)
@@ -53,11 +52,6 @@ func TestProgress(t *testing.T) {
                assert.True(t, progress.IsGroupCompleted("group1"))
                assert.False(t, progress.IsGroupCompleted("group2"))
 
-               progress.MarkStreamCompleted("group1", "stream1", 1)
-               assert.True(t, progress.IsStreamCompleted("group1", "stream1"))
-               assert.False(t, progress.IsStreamCompleted("group1", "stream2"))
-               assert.False(t, progress.IsStreamCompleted("group2", "stream1"))
-
                progress.MarkMeasureCompleted("group1", "measure1", 1)
                assert.True(t, progress.IsMeasureCompleted("group1", "measure1"))
                assert.False(t, progress.IsMeasureCompleted("group1", "measure2"))
@@ -75,7 +69,6 @@ func TestProgress(t *testing.T) {
        t.Run("SaveAndLoad", func(t *testing.T) {
                progress := NewProgress()
                progress.MarkGroupCompleted("group1")
-               progress.MarkStreamCompleted("group1", "stream1", 1)
                progress.MarkMeasureCompleted("group1", "measure1", 1)
                progress.MarkStreamGroupDeleted("group2")
                progress.MarkMeasureGroupDeleted("group2")
@@ -85,13 +78,11 @@ func TestProgress(t *testing.T) {
                loaded := LoadProgress(progressPath, l)
 
                assert.True(t, loaded.IsGroupCompleted("group1"))
-               assert.True(t, loaded.IsStreamCompleted("group1", "stream1"))
                assert.True(t, loaded.IsMeasureCompleted("group1", "measure1"))
                assert.True(t, loaded.IsStreamGroupDeleted("group2"))
                assert.True(t, loaded.IsMeasureGroupDeleted("group2"))
 
                assert.False(t, loaded.IsGroupCompleted("group3"))
-               assert.False(t, loaded.IsStreamCompleted("group1", "stream2"))
                assert.False(t, loaded.IsMeasureCompleted("group1", "measure2"))
                assert.False(t, loaded.IsStreamGroupDeleted("group3"))
                assert.False(t, loaded.IsMeasureGroupDeleted("group3"))
diff --git a/banyand/backup/lifecycle/service.go b/banyand/backup/lifecycle/service.go
index a1dafce0..862b2034 100644
--- a/banyand/backup/lifecycle/service.go
+++ b/banyand/backup/lifecycle/service.go
@@ -21,7 +21,6 @@ import (
        "context"
        "encoding/json"
        "fmt"
-       "math"
        "os"
        "path/filepath"
        "sort"
@@ -39,13 +38,13 @@ import (
        modelv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        streamv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
        "github.com/apache/skywalking-banyandb/banyand/backup/snapshot"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/banyand/measure"
        "github.com/apache/skywalking-banyandb/banyand/metadata"
        "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
        "github.com/apache/skywalking-banyandb/banyand/observability"
        "github.com/apache/skywalking-banyandb/banyand/protector"
        "github.com/apache/skywalking-banyandb/banyand/queue"
-       "github.com/apache/skywalking-banyandb/banyand/stream"
        "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/node"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
@@ -77,6 +76,7 @@ type lifecycleService struct {
        maxExecutionTimes int
        enableTLS         bool
        insecure          bool
+       chunkSize         run.Bytes
 }
 
 // NewService creates a new lifecycle service.
@@ -107,6 +107,8 @@ func (l *lifecycleService) FlagSet() *run.FlagSet {
                "Schedule expression for periodic backup. Options: @yearly, 
@monthly, @weekly, @daily, @hourly or @every <duration>",
        )
        flagS.IntVar(&l.maxExecutionTimes, "max-execution-times", 0, "Maximum number of times to execute the lifecycle migration. 0 means no limit.")
+       l.chunkSize = run.Bytes(1024 * 1024)
+       flagS.VarP(&l.chunkSize, "chunk-size", "", "Chunk size in bytes for streaming data during migration (default: 1MB)")
        return flagS
 }
 
@@ -212,13 +214,7 @@ func (l *lifecycleService) action() error {
        for _, g := range groups {
                switch g.Catalog {
                case commonv1.Catalog_CATALOG_STREAM:
-                       if streamSVC == nil {
-                               l.l.Error().Msgf("stream service is not available, skipping group: %s", g.Metadata.Name)
-                               progress.MarkStreamError(g.Metadata.Name, "", fmt.Sprintf("stream service unavailable for group %s", g.Metadata.Name))
-                               allGroupsCompleted = false
-                               continue
-                       }
-                       l.processStreamGroup(ctx, g, streamSVC, nodes, labels, progress)
+                       l.processStreamGroup(ctx, g, streamDir, nodes, labels, progress)
                case commonv1.Catalog_CATALOG_MEASURE:
                        if measureSVC == nil {
                                l.l.Error().Msgf("measure service is not 
available, skipping group: %s", g.Metadata.Name)
@@ -247,9 +243,7 @@ func (l *lifecycleService) action() error {
 // generateReport gathers counts & errors from Progress, writes one JSON file per run, and keeps only 5 latest.
 func (l *lifecycleService) generateReport(p *Progress) {
        type grp struct {
-               StreamCounts  map[string]int    `json:"stream_counts"`
                MeasureCounts map[string]int    `json:"measure_counts"`
-               StreamErrors  map[string]string `json:"stream_errors"`
                MeasureErrors map[string]string `json:"measure_errors"`
                Name          string            `json:"name"`
        }
@@ -261,25 +255,21 @@ func (l *lifecycleService) generateReport(p *Progress) {
 
        // build report
        var groups []grp
-       for name := range p.CompletedStreams {
+       for name := range p.CompletedGroups {
                groups = append(groups, grp{
                        Name:          name,
-                       StreamCounts:  p.StreamCounts[name],
                        MeasureCounts: p.MeasureCounts[name],
-                       StreamErrors:  p.StreamErrors[name],
                        MeasureErrors: p.MeasureErrors[name],
                })
        }
        // also include groups that had only measures
        for name := range p.CompletedMeasures {
-               if _, seen := p.CompletedStreams[name]; seen {
+               if _, seen := p.CompletedGroups[name]; seen {
                        continue
                }
                groups = append(groups, grp{
                        Name:          name,
-                       StreamCounts:  p.StreamCounts[name],
                        MeasureCounts: p.MeasureCounts[name],
-                       StreamErrors:  p.StreamErrors[name],
                        MeasureErrors: p.MeasureErrors[name],
                })
        }
@@ -367,23 +357,11 @@ func (l *lifecycleService) getGroupsToProcess(ctx context.Context, progress *Pro
        return groups, nil
 }
 
-func (l *lifecycleService) processStreamGroup(ctx context.Context, g *commonv1.Group, streamSVC stream.Service,
-       nodes []*databasev1.Node, labels map[string]string, progress *Progress,
+func (l *lifecycleService) processStreamGroup(ctx context.Context, g *commonv1.Group,
+       streamDir string, nodes []*databasev1.Node, labels map[string]string, progress *Progress,
 ) {
-       shardNum, replicas, selector, client, err := parseGroup(g, labels, nodes, l.l, l.metadata)
-       if err != nil {
-               l.l.Error().Err(err).Msgf("failed to parse group %s", g.Metadata.Name)
-               return
-       }
-       defer client.GracefulStop()
-
-       ss, err := l.metadata.StreamRegistry().ListStream(ctx, schema.ListOpt{Group: g.Metadata.Name})
-       if err != nil {
-               l.l.Error().Err(err).Msgf("failed to list streams in group %s", g.Metadata.Name)
-               return
-       }
-
-       tr := streamSVC.GetRemovalSegmentsTimeRange(g.Metadata.Name)
+       // Calculate removal segments time range based on group TTL configuration
+       tr := l.getRemovalSegmentsTimeRange(g)
        if tr.Start.IsZero() && tr.End.IsZero() {
                l.l.Info().Msgf("no removal segments time range for group %s, 
skipping stream migration", g.Metadata.Name)
                progress.MarkGroupCompleted(g.Metadata.Name)
@@ -391,92 +369,93 @@ func (l *lifecycleService) processStreamGroup(ctx context.Context, g *commonv1.G
                return
        }
 
-       l.processStreams(ctx, g, ss, streamSVC, tr, shardNum, replicas, selector, client, progress)
-
-       allStreamsDone := true
-       for _, s := range ss {
-               if !progress.IsStreamCompleted(g.Metadata.Name, s.Metadata.Name) {
-                       allStreamsDone = false
-               }
-       }
-       if allStreamsDone {
-               l.l.Info().Msgf("deleting expired stream segments for group: %s", g.Metadata.Name)
-               l.deleteExpiredStreamSegments(ctx, g, tr, progress)
-               progress.MarkGroupCompleted(g.Metadata.Name)
-               progress.Save(l.progressFilePath, l.l)
-       } else {
-               l.l.Info().Msgf("skipping delete expired stream segments for group %s: some streams not fully migrated", g.Metadata.Name)
+       // Use file-based migration instead of element-based
+       err := l.processStreamGroupFileBased(ctx, g, streamDir, tr, nodes, labels, progress)
+       if err != nil {
+               l.l.Error().Err(err).Msgf("failed to migrate stream group %s using file-based approach", g.Metadata.Name)
+               return
        }
+
+       l.l.Info().Msgf("deleting expired stream segments for group: %s", g.Metadata.Name)
+       l.deleteExpiredStreamSegments(ctx, g, tr, progress)
+       progress.MarkGroupCompleted(g.Metadata.Name)
+       progress.Save(l.progressFilePath, l.l)
 }
 
-func (l *lifecycleService) processStreams(
-       ctx context.Context,
-       g *commonv1.Group,
-       streams []*databasev1.Stream,
-       streamSVC stream.Service,
-       tr *timestamp.TimeRange,
-       shardNum uint32,
-       replicas uint32,
-       selector node.Selector,
-       client queue.Client,
-       progress *Progress,
-) {
-       for _, s := range streams {
-               if progress.IsStreamCompleted(g.Metadata.Name, s.Metadata.Name) {
-                       l.l.Debug().Msgf("skipping already completed stream: %s/%s", g.Metadata.Name, s.Metadata.Name)
-                       continue
-               }
-               sum, err := l.processSingleStream(ctx, s, streamSVC, tr, shardNum, replicas, selector, client)
-               if err != nil {
-                       progress.MarkStreamError(g.Metadata.Name, s.Metadata.Name, err.Error())
-               } else {
-                       if sum < 1 {
-                               l.l.Debug().Msgf("no elements migrated for stream %s", s.Metadata.Name)
-                       } else {
-                               l.l.Info().Msgf("migrated %d elements in stream %s", sum, s.Metadata.Name)
-                       }
-                       progress.MarkStreamCompleted(g.Metadata.Name, s.Metadata.Name, sum)
-               }
-               progress.Save(l.progressFilePath, l.l)
+// processStreamGroupFileBased uses file-based migration instead of element-based queries.
+func (l *lifecycleService) processStreamGroupFileBased(_ context.Context, g *commonv1.Group,
+       streamDir string, tr *timestamp.TimeRange, nodes []*databasev1.Node, labels map[string]string, progress *Progress,
+) error {
+       if progress.IsStreamGroupDeleted(g.Metadata.Name) {
+               l.l.Info().Msgf("skipping already completed file-based migration for group: %s", g.Metadata.Name)
+               return nil
        }
-}
 
-func (l *lifecycleService) processSingleStream(ctx context.Context, s *databasev1.Stream,
-       streamSVC stream.Service, tr *timestamp.TimeRange, shardNum uint32, replicas uint32, selector node.Selector, client queue.Client,
-) (int, error) {
-       q, err := streamSVC.Stream(s.Metadata)
+       l.l.Info().Msgf("starting file-based stream migration for group: %s", g.Metadata.Name)
+
+       // Use the file-based migration with existing visitor pattern
+       err := MigrateStreamWithFileBased(
+               streamDir,  // Use snapshot directory as source
+               *tr,        // Time range for segments to migrate
+               g,          // Group configuration
+               labels,     // Node labels
+               nodes,      // Target nodes
+               l.metadata, // Metadata repository
+               storage.IntervalRule{Unit: storage.DAY, Num: 1}, // Use daily segments as default
+               l.l,              // Logger
+               int(l.chunkSize), // Chunk size for streaming
+       )
        if err != nil {
-               l.l.Error().Err(err).Msgf("failed to get stream %s", s.Metadata.Name)
-               return 0, err
+               return fmt.Errorf("file-based stream migration failed: %w", err)
        }
 
-       tagProjection := make([]model.TagProjection, len(s.TagFamilies))
-       entity := make([]*modelv1.TagValue, len(s.Entity.TagNames))
-       for idx := range s.Entity.TagNames {
-               entity[idx] = pbv1.AnyTagValue
+       l.l.Info().Msgf("completed file-based stream migration for group: %s", g.Metadata.Name)
+       return nil
+}
+
+// getRemovalSegmentsTimeRange calculates the time range for segments that should be migrated
+// based on the group's TTL configuration, similar to storage.segmentController.getExpiredSegmentsTimeRange.
+func (l *lifecycleService) getRemovalSegmentsTimeRange(g *commonv1.Group) *timestamp.TimeRange {
+       if g.ResourceOpts == nil || g.ResourceOpts.Ttl == nil {
+               l.l.Debug().Msgf("no TTL configured for group %s", g.Metadata.Name)
+               return &timestamp.TimeRange{} // Return empty time range
        }
-       for i, tf := range s.TagFamilies {
-               tagProjection[i] = model.TagProjection{
-                       Family: tf.Name,
-                       Names:  make([]string, len(tf.Tags)),
-               }
-               for j, t := range tf.Tags {
-                       tagProjection[i].Names[j] = t.Name
-               }
+
+       // Convert TTL to storage.IntervalRule
+       ttl := storage.MustToIntervalRule(g.ResourceOpts.Ttl)
+
+       // Calculate deadline based on TTL (same logic as segmentController.getExpiredSegmentsTimeRange)
+       deadline := time.Now().Local().Add(-l.calculateTTLDuration(ttl))
+
+       // Create time range for segments before the deadline
+       timeRange := &timestamp.TimeRange{
+               Start:        time.Time{}, // Will be set to earliest segment start time
+               End:          deadline,    // All segments before this time should be migrated
+               IncludeStart: true,
+               IncludeEnd:   false,
        }
 
-       result, err := q.Query(ctx, model.StreamQueryOptions{
-               Name:           s.Metadata.Name,
-               TagProjection:  tagProjection,
-               Entities:       [][]*modelv1.TagValue{entity},
-               TimeRange:      tr,
-               MaxElementSize: math.MaxInt,
-       })
-       if err != nil {
-               l.l.Error().Err(err).Msgf("failed to query stream %s", s.Metadata.Name)
-               return 0, err
+       l.l.Info().
+               Str("group", g.Metadata.Name).
+               Time("deadline", deadline).
+               Str("ttl", fmt.Sprintf("%d %s", g.ResourceOpts.Ttl.Num, g.ResourceOpts.Ttl.Unit.String())).
+               Msg("calculated removal segments time range based on TTL")
+
+       return timeRange
+}
+
+// calculateTTLDuration calculates the duration for a TTL interval rule.
+// This implements the same logic as storage.IntervalRule.estimatedDuration().
+func (l *lifecycleService) calculateTTLDuration(ttl storage.IntervalRule) time.Duration {
+       switch ttl.Unit {
+       case storage.HOUR:
+               return time.Hour * time.Duration(ttl.Num)
+       case storage.DAY:
+               return 24 * time.Hour * time.Duration(ttl.Num)
+       default:
+               l.l.Warn().Msgf("unknown TTL unit %v, defaulting to 1 day", ttl.Unit)
+               return 24 * time.Hour
        }
-       return migrateStream(ctx, s, result, shardNum, replicas, selector, client, l.l), nil
 }
 
 func (l *lifecycleService) deleteExpiredStreamSegments(ctx context.Context, g *commonv1.Group, tr *timestamp.TimeRange, progress *Progress) {
diff --git a/lifecycle b/lifecycle
new file mode 100755
index 00000000..56e1ae3f
Binary files /dev/null and b/lifecycle differ
