This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 079898ac Implement Backoff Retry Mechanism for Sending Queue Failures (#836)
079898ac is described below

commit 079898acb7572803fc27df6d75971e3036e814ab
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Nov 6 14:15:34 2025 +0800

    Implement Backoff Retry Mechanism for Sending Queue Failures (#836)
---
 CHANGES.md                                         |   1 +
 .../backup/lifecycle/measure_migration_visitor.go  |   2 +-
 .../backup/lifecycle/stream_migration_visitor.go   |   2 +-
 banyand/internal/storage/failed_parts_handler.go   | 409 +++++++++++++++
 .../internal/storage/failed_parts_handler_test.go  | 574 +++++++++++++++++++++
 banyand/internal/storage/segment.go                |  13 -
 banyand/measure/measure.go                         |  13 +-
 banyand/measure/svc_liaison.go                     |  48 +-
 banyand/measure/syncer.go                          | 280 ++++++++--
 banyand/measure/tstable.go                         |  15 +-
 banyand/queue/local.go                             |  16 +
 banyand/queue/pub/chunked_sync.go                  |  70 ++-
 banyand/queue/pub/chunked_sync_test.go             | 234 +++++++++
 banyand/queue/pub/pub_tls_test.go                  |  80 ++-
 banyand/queue/queue.go                             |  54 +-
 banyand/stream/stream.go                           |  15 +-
 banyand/stream/svc_liaison.go                      |  50 +-
 banyand/stream/syncer.go                           | 286 ++++++++--
 banyand/stream/tstable.go                          |   3 +
 banyand/trace/handoff_controller.go                |   2 +-
 banyand/trace/svc_liaison.go                       |  57 +-
 banyand/trace/syncer.go                            | 250 +++++++--
 banyand/trace/trace.go                             |  13 +-
 banyand/trace/tstable.go                           |  13 +-
 .../integration/distributed/sync_retry/injector.go | 159 ++++++
 .../sync_retry/sync_retry_suite_test.go            | 136 +++++
 .../distributed/sync_retry/sync_retry_test.go      | 298 +++++++++++
 27 files changed, 2831 insertions(+), 262 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 1817a601..53c23f35 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -56,6 +56,7 @@ Release Notes.
 - Refactor router for better usability.
 - Implement the handoff queue for Trace.
 - Add dump command-line tool to parse and display trace part data with support for CSV export and human-readable timestamp formatting.
+- Implement backoff retry mechanism for sending queue failures.
 
 ### Bug Fixes
 
diff --git a/banyand/backup/lifecycle/measure_migration_visitor.go b/banyand/backup/lifecycle/measure_migration_visitor.go
index 2a6588de..fd12b85b 100644
--- a/banyand/backup/lifecycle/measure_migration_visitor.go
+++ b/banyand/backup/lifecycle/measure_migration_visitor.go
@@ -343,7 +343,7 @@ func (mv *measureMigrationVisitor) streamPartToNode(nodeID string, targetShardID
        }
 
        if !result.Success {
-               return fmt.Errorf("chunked sync partially failed: %v", result.ErrorMessage)
+               return fmt.Errorf("chunked sync partially failed: %v", result.FailedParts)
        }
 
        // Log success metrics (same pattern as syncer.go:210-217)
diff --git a/banyand/backup/lifecycle/stream_migration_visitor.go b/banyand/backup/lifecycle/stream_migration_visitor.go
index 6c3fff81..71b03e5f 100644
--- a/banyand/backup/lifecycle/stream_migration_visitor.go
+++ b/banyand/backup/lifecycle/stream_migration_visitor.go
@@ -464,7 +464,7 @@ func (mv *streamMigrationVisitor) streamPartToNode(nodeID string, targetShardID
        }
 
        if !result.Success {
-               return fmt.Errorf("chunked sync partially failed: %v", result.ErrorMessage)
+               return fmt.Errorf("chunked sync partially failed: %v", result.FailedParts)
        }
 
        // Log success metrics (same pattern as syncer.go:210-217)
diff --git a/banyand/internal/storage/failed_parts_handler.go b/banyand/internal/storage/failed_parts_handler.go
new file mode 100644
index 00000000..700b6635
--- /dev/null
+++ b/banyand/internal/storage/failed_parts_handler.go
@@ -0,0 +1,409 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package storage
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       iofs "io/fs"
+       "os"
+       "path/filepath"
+       "sort"
+       "strconv"
+       "sync"
+       "time"
+
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const (
+       // DefaultInitialRetryDelay is the initial delay before the first retry.
+       DefaultInitialRetryDelay = 1 * time.Second
+       // DefaultMaxRetries is the maximum number of retry attempts.
+       DefaultMaxRetries = 3
+       // DefaultBackoffMultiplier is the multiplier for exponential backoff.
+       DefaultBackoffMultiplier = 2
+       // FailedPartsDirName is the name of the directory for failed parts.
+       FailedPartsDirName = "failed-parts"
+)
+
+// FailedPartsHandler handles retry logic and filesystem fallback for failed parts.
+type FailedPartsHandler struct {
+       fileSystem        fs.FileSystem
+       l                 *logger.Logger
+       root              string
+       failedPartsDir    string
+       initialRetryDelay time.Duration
+       maxRetries        int
+       backoffMultiplier int
+       maxTotalSizeBytes uint64
+       sizeMu            sync.Mutex
+}
+
+// PartInfo contains information needed to retry or copy a failed part.
+type PartInfo struct {
+       SourcePath string
+       PartType   string
+       PartID     uint64
+}
+
+// NewFailedPartsHandler creates a new handler for failed parts.
+func NewFailedPartsHandler(fileSystem fs.FileSystem, root string, l *logger.Logger, maxTotalSizeBytes uint64) *FailedPartsHandler {
+       failedPartsDir := filepath.Join(root, FailedPartsDirName)
+       fileSystem.MkdirIfNotExist(failedPartsDir, DirPerm)
+
+       return &FailedPartsHandler{
+               fileSystem:        fileSystem,
+               root:              root,
+               failedPartsDir:    failedPartsDir,
+               l:                 l,
+               initialRetryDelay: DefaultInitialRetryDelay,
+               maxRetries:        DefaultMaxRetries,
+               backoffMultiplier: DefaultBackoffMultiplier,
+               maxTotalSizeBytes: maxTotalSizeBytes,
+       }
+}
+
+// RetryFailedParts attempts to retry failed parts with exponential backoff.
+// Returns the list of permanently failed part IDs after all retries are exhausted.
+func (h *FailedPartsHandler) RetryFailedParts(
+       ctx context.Context,
+       failedParts []queue.FailedPart,
+       partsInfo map[uint64][]*PartInfo,
+       syncFunc func(partIDs []uint64) ([]queue.FailedPart, error),
+) ([]uint64, error) {
+       if len(failedParts) == 0 {
+               return nil, nil
+       }
+
+       // Group failed parts by part ID
+       failedPartIDs := make(map[uint64]string)
+       for _, fp := range failedParts {
+               partID, err := strconv.ParseUint(fp.PartID, 10, 64)
+               if err != nil {
+                       h.l.Warn().Err(err).Str("partID", fp.PartID).Msg("failed to parse part ID, skipping")
+                       continue
+               }
+               failedPartIDs[partID] = fp.Error
+       }
+
+       h.l.Warn().
+               Int("count", len(failedPartIDs)).
+               Msg("starting retry process for failed parts")
+
+       // Retry with exponential backoff
+       var stillFailing []uint64
+       for partID, errMsg := range failedPartIDs {
+               if err := h.retryPartWithBackoff(ctx, partID, errMsg, syncFunc); err != nil {
+                       h.l.Error().
+                               Err(err).
+                               Uint64("partID", partID).
+                               Msg("part failed after all retries")
+                       stillFailing = append(stillFailing, partID)
+               }
+       }
+
+       // Copy permanently failed parts to failed-parts directory
+       var permanentlyFailed []uint64
+       for _, partID := range stillFailing {
+               partInfoList, exists := partsInfo[partID]
+               if !exists || len(partInfoList) == 0 {
+                       h.l.Warn().Uint64("partID", partID).Msg("no part info found for failed part")
+                       permanentlyFailed = append(permanentlyFailed, partID)
+                       continue
+               }
+
+               // Copy all parts with this ID (core + all SIDX parts)
+               allCopied := true
+               for _, partInfo := range partInfoList {
+                       destSubDir := fmt.Sprintf("%016x_%s", partID, partInfo.PartType)
+                       if err := h.CopyToFailedPartsDir(partID, partInfo.SourcePath, destSubDir); err != nil {
+                               h.l.Error().
+                                       Err(err).
+                                       Uint64("partID", partID).
+                                       Str("partType", partInfo.PartType).
+                                       Str("sourcePath", partInfo.SourcePath).
+                                       Msg("failed to copy part to failed-parts directory")
+                               allCopied = false
+                       } else {
+                               h.l.Info().
+                                       Uint64("partID", partID).
+                                       Str("partType", partInfo.PartType).
+                                       Str("destination", filepath.Join(h.failedPartsDir, destSubDir)).
+                                       Msg("successfully copied failed part to failed-parts directory")
+                       }
+               }
+               if !allCopied {
+                       h.l.Warn().Uint64("partID", partID).Msg("some parts failed to copy")
+               }
+               permanentlyFailed = append(permanentlyFailed, partID)
+       }
+
+       return permanentlyFailed, nil
+}
+
+// retryPartWithBackoff retries a single part with exponential backoff.
+func (h *FailedPartsHandler) retryPartWithBackoff(
+       ctx context.Context,
+       partID uint64,
+       initialError string,
+       syncFunc func(partIDs []uint64) ([]queue.FailedPart, error),
+) error {
+       delay := h.initialRetryDelay
+
+       for attempt := 1; attempt <= h.maxRetries; attempt++ {
+               // Wait before retry
+               select {
+               case <-ctx.Done():
+                       return fmt.Errorf("context canceled during retry: %w", ctx.Err())
+               case <-time.After(delay):
+               }
+
+               h.l.Info().
+                       Uint64("partID", partID).
+                       Int("attempt", attempt).
+                       Int("maxRetries", h.maxRetries).
+                       Dur("delay", delay).
+                       Msg("retrying failed part")
+
+               // Attempt to sync just this part
+               newFailedParts, err := syncFunc([]uint64{partID})
+               if err != nil {
+                       h.l.Warn().
+                               Err(err).
+                               Uint64("partID", partID).
+                               Int("attempt", attempt).
+                               Msg("retry attempt failed with error")
+                       delay *= time.Duration(h.backoffMultiplier)
+                       continue
+               }
+
+               // Check if this part still failed
+               partStillFailed := false
+               for _, fp := range newFailedParts {
+                       fpID, _ := strconv.ParseUint(fp.PartID, 10, 64)
+                       if fpID == partID {
+                               partStillFailed = true
+                               h.l.Warn().
+                                       Uint64("partID", partID).
+                                       Int("attempt", attempt).
+                                       Str("error", fp.Error).
+                                       Msg("retry attempt failed")
+                               break
+                       }
+               }
+
+               if !partStillFailed {
+                       h.l.Info().
+                               Uint64("partID", partID).
+                               Int("attempt", attempt).
+                               Msg("part successfully synced after retry")
+                       return nil
+               }
+
+               // Exponential backoff for next attempt
+               delay *= time.Duration(h.backoffMultiplier)
+       }
+
+       return fmt.Errorf("part failed after %d retry attempts, initial error: %s", h.maxRetries, initialError)
+}
+
+// CopyToFailedPartsDir copies a part to the failed-parts directory using hard links.
+func (h *FailedPartsHandler) CopyToFailedPartsDir(partID uint64, sourcePath string, destSubDir string) error {
+       destPath := filepath.Join(h.failedPartsDir, destSubDir)
+
+       if h.maxTotalSizeBytes > 0 {
+               h.sizeMu.Lock()
+               defer h.sizeMu.Unlock()
+       }
+
+       // Check if already exists
+       entries := h.fileSystem.ReadDir(h.failedPartsDir)
+       for _, entry := range entries {
+               if entry.Name() == destSubDir {
+                       h.l.Info().
+                               Uint64("partID", partID).
+                               Str("destSubDir", destSubDir).
+                               Msg("part already exists in failed-parts directory")
+                       return nil
+               }
+       }
+
+       if _, err := os.Stat(sourcePath); err != nil {
+               h.l.Error().
+                       Err(err).
+                       Uint64("partID", partID).
+                       Str("sourcePath", sourcePath).
+                       Msg("failed to stat source path before copying to failed-parts directory")
+               return fmt.Errorf("failed to stat source path %s: %w", sourcePath, err)
+       }
+
+       if h.maxTotalSizeBytes > 0 {
+               currentSize, err := calculatePathSize(h.failedPartsDir)
+               if err != nil {
+                       return fmt.Errorf("failed to calculate current failed-parts size: %w", err)
+               }
+               sourceSize, err := calculatePathSize(sourcePath)
+               if err != nil {
+                       return fmt.Errorf("failed to calculate size of source path %s: %w", sourcePath, err)
+               }
+
+               if currentSize+sourceSize > h.maxTotalSizeBytes {
+                       type partDir struct {
+                               modTime time.Time
+                               name    string
+                               path    string
+                               size    uint64
+                       }
+
+                       partDirs := make([]partDir, 0, len(entries))
+                       for _, entry := range entries {
+                               if !entry.IsDir() || entry.Name() == destSubDir {
+                                       continue
+                               }
+
+                               dirPath := filepath.Join(h.failedPartsDir, entry.Name())
+                               info, err := os.Stat(dirPath)
+                               if err != nil {
+                                       h.l.Warn().
+                                               Err(err).
+                                               Str("path", dirPath).
+                                               Msg("failed to stat existing failed part directory during eviction")
+                                       continue
+                               }
+
+                               dirSize, err := calculatePathSize(dirPath)
+                               if err != nil {
+                                       h.l.Warn().
+                                               Err(err).
+                                               Str("path", dirPath).
+                                               Msg("failed to calculate size for existing failed part directory during eviction")
+                                       continue
+                               }
+
+                               partDirs = append(partDirs, partDir{
+                                       name:    entry.Name(),
+                                       path:    dirPath,
+                                       modTime: info.ModTime(),
+                                       size:    dirSize,
+                               })
+                       }
+
+                       sort.Slice(partDirs, func(i, j int) bool {
+                               if partDirs[i].modTime.Equal(partDirs[j].modTime) {
+                                       return partDirs[i].name < partDirs[j].name
+                               }
+                               return partDirs[i].modTime.Before(partDirs[j].modTime)
+                       })
+
+                       for _, dir := range partDirs {
+                               if currentSize+sourceSize <= h.maxTotalSizeBytes {
+                                       break
+                               }
+
+                               if err := os.RemoveAll(dir.path); err != nil {
+                                       return fmt.Errorf("failed to remove oldest failed part %s: %w", dir.name, err)
+                               }
+
+                               h.l.Info().
+                                       Str("removedFailedPartDir", dir.name).
+                                       Uint64("freedBytes", dir.size).
+                                       Msg("removed oldest failed part to honor size limit")
+
+                               if currentSize >= dir.size {
+                                       currentSize -= dir.size
+                               } else {
+                                       currentSize = 0
+                               }
+                       }
+
+                       if currentSize+sourceSize > h.maxTotalSizeBytes {
+                               return fmt.Errorf("failed to free space in failed-parts directory for part %d", partID)
+                       }
+               }
+       }
+
+       h.l.Info().
+               Uint64("partID", partID).
+               Str("sourcePath", sourcePath).
+               Str("destPath", destPath).
+               Msg("creating hard links to failed-parts directory")
+
+       // Create hard links from source to destination
+       if err := h.fileSystem.CreateHardLink(sourcePath, destPath, nil); err != nil {
+               h.l.Error().
+                       Err(err).
+                       Uint64("partID", partID).
+                       Str("sourcePath", sourcePath).
+                       Str("destPath", destPath).
+                       Msg("failed to create hard links")
+               return fmt.Errorf("failed to create hard links: %w", err)
+       }
+
+       h.fileSystem.SyncPath(destPath)
+
+       h.l.Info().
+               Uint64("partID", partID).
+               Str("destPath", destPath).
+               Msg("successfully created hard links to failed-parts directory")
+
+       return nil
+}
+
+func calculatePathSize(path string) (uint64, error) {
+       info, err := os.Stat(path)
+       if err != nil {
+               if os.IsNotExist(err) {
+                       return 0, nil
+               }
+               return 0, err
+       }
+
+       if !info.IsDir() {
+               return uint64(info.Size()), nil
+       }
+
+       var size uint64
+       walkErr := filepath.WalkDir(path, func(_ string, d iofs.DirEntry, walkErr error) error {
+               if walkErr != nil {
+                       if errors.Is(walkErr, os.ErrNotExist) {
+                               return nil
+                       }
+                       return walkErr
+               }
+               if !d.Type().IsRegular() {
+                       return nil
+               }
+               info, err := d.Info()
+               if err != nil {
+                       if errors.Is(err, os.ErrNotExist) {
+                               return nil
+                       }
+                       return err
+               }
+               size += uint64(info.Size())
+               return nil
+       })
+       if walkErr != nil {
+               return 0, walkErr
+       }
+       return size, nil
+}
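As a quick orientation to the new API above, here is a minimal sketch of how a syncer could drive FailedPartsHandler; the wiring (the retryExample function, the paths, and the no-op syncFunc) is illustrative and not part of this commit:

    // Sketch only: assumes the constructor and RetryFailedParts signature
    // introduced in failed_parts_handler.go above.
    package example

    import (
            "context"

            "github.com/apache/skywalking-banyandb/banyand/internal/storage"
            "github.com/apache/skywalking-banyandb/banyand/queue"
            "github.com/apache/skywalking-banyandb/pkg/fs"
            "github.com/apache/skywalking-banyandb/pkg/logger"
    )

    func retryExample(ctx context.Context, failed []queue.FailedPart) ([]uint64, error) {
            lfs := fs.NewLocalFileSystem()
            l := logger.GetLogger("example")
            // A maxTotalSizeBytes of 0 disables the failed-parts size cap.
            h := storage.NewFailedPartsHandler(lfs, "/tmp/banyandb", l, 0)

            // Map each part ID to its on-disk location so a permanently failed
            // part can be hard-linked into <root>/failed-parts.
            partsInfo := map[uint64][]*storage.PartInfo{
                    42: {{PartID: 42, SourcePath: "/tmp/banyandb/parts/000000000000002a", PartType: "core"}},
            }

            // syncFunc re-sends only the listed part IDs and reports which still fail.
            syncFunc := func(partIDs []uint64) ([]queue.FailedPart, error) {
                    return nil, nil // hypothetical: every retried part succeeds
            }

            return h.RetryFailedParts(ctx, failed, partsInfo, syncFunc)
    }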
diff --git a/banyand/internal/storage/failed_parts_handler_test.go b/banyand/internal/storage/failed_parts_handler_test.go
new file mode 100644
index 00000000..0db3cc04
--- /dev/null
+++ b/banyand/internal/storage/failed_parts_handler_test.go
@@ -0,0 +1,574 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package storage
+
+import (
+       "bytes"
+       "context"
+       "errors"
+       "fmt"
+       "os"
+       "path/filepath"
+       "strconv"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+func TestNewFailedPartsHandler(t *testing.T) {
+       tempDir := t.TempDir()
+       fileSystem := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+
+       handler := NewFailedPartsHandler(fileSystem, tempDir, l, 1024)
+
+       assert.NotNil(t, handler)
+       assert.Equal(t, fileSystem, handler.fileSystem)
+       assert.Equal(t, tempDir, handler.root)
+       assert.Equal(t, filepath.Join(tempDir, FailedPartsDirName), handler.failedPartsDir)
+       assert.Equal(t, DefaultInitialRetryDelay, handler.initialRetryDelay)
+       assert.Equal(t, DefaultMaxRetries, handler.maxRetries)
+       assert.Equal(t, DefaultBackoffMultiplier, handler.backoffMultiplier)
+       assert.Equal(t, uint64(1024), handler.maxTotalSizeBytes)
+
+       // Check that failed-parts directory was created
+       entries := fileSystem.ReadDir(tempDir)
+       found := false
+       for _, entry := range entries {
+               if entry.Name() == FailedPartsDirName && entry.IsDir() {
+                       found = true
+                       break
+               }
+       }
+       assert.True(t, found, "failed-parts directory should be created")
+}
+
+func TestRetryFailedParts_EmptyList(t *testing.T) {
+       tempDir := t.TempDir()
+       fileSystem := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+       handler := NewFailedPartsHandler(fileSystem, tempDir, l, 0)
+
+       ctx := context.Background()
+       failedParts := []queue.FailedPart{}
+       partsInfo := make(map[uint64][]*PartInfo)
+       syncFunc := func([]uint64) ([]queue.FailedPart, error) {
+               t.Fatal("syncFunc should not be called for empty list")
+               return nil, nil
+       }
+
+       result, err := handler.RetryFailedParts(ctx, failedParts, partsInfo, syncFunc)
+
+       assert.NoError(t, err)
+       assert.Empty(t, result)
+}
+
+func TestRetryFailedParts_SuccessOnFirstRetry(t *testing.T) {
+       tempDir := t.TempDir()
+       fileSystem := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+       handler := NewFailedPartsHandler(fileSystem, tempDir, l, 0)
+
+       ctx := context.Background()
+       failedParts := []queue.FailedPart{
+               {PartID: "123", Error: "network error"},
+               {PartID: "456", Error: "timeout"},
+       }
+
+       // Create test part directories
+       part123Path := filepath.Join(tempDir, "0000000000000123")
+       part456Path := filepath.Join(tempDir, "0000000000000456")
+       fileSystem.MkdirIfNotExist(part123Path, DirPerm)
+       fileSystem.MkdirIfNotExist(part456Path, DirPerm)
+
+       partsInfo := map[uint64][]*PartInfo{
+               123: {{PartID: 123, SourcePath: part123Path, PartType: "core"}},
+               456: {{PartID: 456, SourcePath: part456Path, PartType: "core"}},
+       }
+
+       callCount := 0
+       syncFunc := func([]uint64) ([]queue.FailedPart, error) {
+               callCount++
+               // All parts succeed on first retry
+               return []queue.FailedPart{}, nil
+       }
+
+       result, err := handler.RetryFailedParts(ctx, failedParts, partsInfo, syncFunc)
+
+       assert.NoError(t, err)
+       assert.Empty(t, result, "no parts should fail after successful retry")
+       // syncFunc is called once per unique failed part on first attempt
+       assert.GreaterOrEqual(t, callCount, 2, "syncFunc should be called at least once per part")
+}
+
+func TestRetryFailedParts_AllRetriesFail(t *testing.T) {
+       tempDir := t.TempDir()
+       fileSystem := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+       handler := NewFailedPartsHandler(fileSystem, tempDir, l, 0)
+       handler.initialRetryDelay = 10 * time.Millisecond // Speed up test
+
+       ctx := context.Background()
+       failedParts := []queue.FailedPart{
+               {PartID: "789", Error: "persistent error"},
+       }
+
+       // Create test part directory
+       part789Path := filepath.Join(tempDir, "parts", "0000000000000789")
+       fileSystem.MkdirIfNotExist(filepath.Join(tempDir, "parts"), DirPerm)
+       fileSystem.MkdirIfNotExist(part789Path, DirPerm)
+       testFile := filepath.Join(part789Path, "test.dat")
+       _, err := fileSystem.Write([]byte("test data"), testFile, FilePerm)
+       require.NoError(t, err)
+
+       partsInfo := map[uint64][]*PartInfo{
+               789: {{PartID: 789, SourcePath: part789Path, PartType: "core"}},
+       }
+
+       // Sanity check: ensure local filesystem CreateHardLink works in test environment
+       directCopyDest := filepath.Join(tempDir, "direct-copy")
+       err = fileSystem.CreateHardLink(part789Path, directCopyDest, nil)
+       require.NoError(t, err, "local filesystem hard link creation should succeed")
+       // Clean up the direct copy to avoid interfering with handler logic
+       fileSystem.MustRMAll(directCopyDest)
+
+       attemptCount := 0
+       syncFunc := func([]uint64) ([]queue.FailedPart, error) {
+               attemptCount++
+               // Always fail
+               return []queue.FailedPart{
+                       {PartID: "789", Error: fmt.Sprintf("attempt %d failed", attemptCount)},
+               }, nil
+       }
+
+       result, err := handler.RetryFailedParts(ctx, failedParts, partsInfo, syncFunc)
+
+       assert.NoError(t, err)
+       assert.Len(t, result, 1, "part should be permanently failed")
+       assert.Contains(t, result, uint64(789))
+       assert.Equal(t, DefaultMaxRetries, attemptCount, "should retry max times")
+
+       // Directly test CopyToFailedPartsDir to verify it works
+       destSubDir := "0000000000000789_core"
+       err = handler.CopyToFailedPartsDir(789, part789Path, destSubDir)
+       require.NoError(t, err, "CopyToFailedPartsDir should succeed")
+
+       // Check that part was copied to failed-parts directory
+       failedPartsDir := filepath.Join(tempDir, FailedPartsDirName)
+
+       // Check for the specific part directory
+       entries := fileSystem.ReadDir(failedPartsDir)
+       found := false
+       for _, entry := range entries {
+               if entry.Name() == destSubDir && entry.IsDir() {
+                       found = true
+                       break
+               }
+       }
+       if !found {
+               t.Skipf("failed-parts directory hard link not created; likely unsupported on this platform")
+       }
+
+       // Also verify the file exists in the copied directory
+       copiedFile := filepath.Join(failedPartsDir, destSubDir, "test.dat")
+       copiedData, err := fileSystem.Read(copiedFile)
+       if err != nil {
+               var fsErr *fs.FileSystemError
+               if errors.As(err, &fsErr) && fsErr.Code == fs.IsNotExistError {
+                       t.Skipf("hard link copy not supported on this platform: %v", fsErr)
+               }
+       }
+       assert.NoError(t, err, "should be able to read file from failed-parts directory")
+       assert.Equal(t, []byte("test data"), copiedData, "copied file should have same content")
+}
+
+func TestRetryFailedParts_SuccessOnSecondRetry(t *testing.T) {
+       tempDir := t.TempDir()
+       fileSystem := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+       handler := NewFailedPartsHandler(fileSystem, tempDir, l, 0)
+       handler.initialRetryDelay = 10 * time.Millisecond // Speed up test
+
+       ctx := context.Background()
+       failedParts := []queue.FailedPart{
+               {PartID: "999", Error: "transient error"},
+       }
+
+       part999Path := filepath.Join(tempDir, "0000000000000999")
+       fileSystem.MkdirIfNotExist(part999Path, DirPerm)
+       // Create a test file
+       testFile := filepath.Join(part999Path, "data.bin")
+       _, err := fileSystem.Write([]byte("data"), testFile, FilePerm)
+       require.NoError(t, err)
+
+       partsInfo := map[uint64][]*PartInfo{
+               999: {{PartID: 999, SourcePath: part999Path, PartType: "core"}},
+       }
+
+       attemptCount := 0
+       syncFunc := func([]uint64) ([]queue.FailedPart, error) {
+               attemptCount++
+               if attemptCount == 1 {
+                       // First retry fails
+                       return []queue.FailedPart{
+                               {PartID: "999", Error: "still failing"},
+                       }, nil
+               }
+               // Second retry succeeds
+               return []queue.FailedPart{}, nil
+       }
+
+       result, err := handler.RetryFailedParts(ctx, failedParts, partsInfo, syncFunc)
+
+       assert.NoError(t, err)
+       assert.Empty(t, result, "part should succeed on second retry")
+       assert.Equal(t, 2, attemptCount)
+}
+
+func TestRetryFailedParts_SyncFuncError(t *testing.T) {
+       tempDir := t.TempDir()
+       fileSystem := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+       handler := NewFailedPartsHandler(fileSystem, tempDir, l, 0)
+       handler.initialRetryDelay = 10 * time.Millisecond
+
+       ctx := context.Background()
+       failedParts := []queue.FailedPart{
+               {PartID: "111", Error: "initial error"},
+       }
+
+       part111Path := filepath.Join(tempDir, "0000000000000111")
+       fileSystem.MkdirIfNotExist(part111Path, DirPerm)
+       // Create a test file
+       testFile := filepath.Join(part111Path, "data.bin")
+       _, err := fileSystem.Write([]byte("data"), testFile, FilePerm)
+       require.NoError(t, err)
+
+       partsInfo := map[uint64][]*PartInfo{
+               111: {{PartID: 111, SourcePath: part111Path, PartType: "core"}},
+       }
+
+       syncFunc := func([]uint64) ([]queue.FailedPart, error) {
+               return nil, fmt.Errorf("sync function error")
+       }
+
+       result, err := handler.RetryFailedParts(ctx, failedParts, partsInfo, syncFunc)
+
+       assert.NoError(t, err)
+       assert.Len(t, result, 1)
+       assert.Contains(t, result, uint64(111))
+}
+
+func TestRetryFailedParts_ContextCancellation(t *testing.T) {
+       tempDir := t.TempDir()
+       fileSystem := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+       handler := NewFailedPartsHandler(fileSystem, tempDir, l, 0)
+       handler.initialRetryDelay = 100 * time.Millisecond // Longer delay
+
+       ctx, cancel := context.WithCancel(context.Background())
+       defer cancel()
+
+       failedParts := []queue.FailedPart{
+               {PartID: "222", Error: "error"},
+       }
+
+       part222Path := filepath.Join(tempDir, "0000000000000222")
+       fileSystem.MkdirIfNotExist(part222Path, DirPerm)
+       // Create a test file
+       testFile := filepath.Join(part222Path, "data.bin")
+       _, err := fileSystem.Write([]byte("data"), testFile, FilePerm)
+       require.NoError(t, err)
+
+       partsInfo := map[uint64][]*PartInfo{
+               222: {{PartID: 222, SourcePath: part222Path, PartType: "core"}},
+       }
+
+       syncFunc := func([]uint64) ([]queue.FailedPart, error) {
+               return []queue.FailedPart{{PartID: "222", Error: "still failing"}}, nil
+       }
+
+       // Cancel context after a short delay
+       go func() {
+               time.Sleep(50 * time.Millisecond)
+               cancel()
+       }()
+
+       result, err := handler.RetryFailedParts(ctx, failedParts, partsInfo, syncFunc)
+
+       assert.NoError(t, err)
+       assert.Len(t, result, 1)
+       assert.Contains(t, result, uint64(222))
+}
+
+func TestRetryFailedParts_InvalidPartID(t *testing.T) {
+       tempDir := t.TempDir()
+       fileSystem := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+       handler := NewFailedPartsHandler(fileSystem, tempDir, l, 0)
+
+       ctx := context.Background()
+       failedParts := []queue.FailedPart{
+               {PartID: "invalid", Error: "error"},
+               {PartID: "456", Error: "error"},
+       }
+
+       part456Path := filepath.Join(tempDir, "0000000000000456")
+       fileSystem.MkdirIfNotExist(part456Path, DirPerm)
+       // Create a test file
+       testFile := filepath.Join(part456Path, "data.bin")
+       _, err := fileSystem.Write([]byte("data"), testFile, FilePerm)
+       require.NoError(t, err)
+
+       partsInfo := map[uint64][]*PartInfo{
+               456: {{PartID: 456, SourcePath: part456Path, PartType: "core"}},
+       }
+
+       callCount := 0
+       syncFunc := func(partIDs []uint64) ([]queue.FailedPart, error) {
+               callCount++
+               // Only valid part ID should be retried
+               assert.Equal(t, []uint64{456}, partIDs)
+               return []queue.FailedPart{}, nil
+       }
+
+       result, err := handler.RetryFailedParts(ctx, failedParts, partsInfo, syncFunc)
+
+       assert.NoError(t, err)
+       assert.Empty(t, result)
+       assert.Equal(t, 1, callCount, "only valid part should be retried")
+}
+
+func TestCopyToFailedPartsDir_Success(t *testing.T) {
+       tempDir := t.TempDir()
+       fileSystem := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+       handler := NewFailedPartsHandler(fileSystem, tempDir, l, 0)
+
+       // Create source part directory with files
+       sourcePath := filepath.Join(tempDir, "source_part")
+       fileSystem.MkdirIfNotExist(sourcePath, DirPerm)
+       testFile := filepath.Join(sourcePath, "data.bin")
+       testData := []byte("test data content")
+       _, err := fileSystem.Write(testData, testFile, FilePerm)
+       require.NoError(t, err)
+
+       partID := uint64(12345)
+       destSubDir := "0000000000012345_core"
+
+       err = handler.CopyToFailedPartsDir(partID, sourcePath, destSubDir)
+       assert.NoError(t, err)
+
+       // Verify hard link was created
+       destPath := filepath.Join(handler.failedPartsDir, destSubDir)
+       entries := fileSystem.ReadDir(destPath)
+       assert.NotEmpty(t, entries, "destination should have files")
+
+       // Verify file content
+       destFile := filepath.Join(destPath, "data.bin")
+       content, err := fileSystem.Read(destFile)
+       assert.NoError(t, err)
+       assert.Equal(t, testData, content)
+}
+
+func TestCopyToFailedPartsDir_AlreadyExists(t *testing.T) {
+       tempDir := t.TempDir()
+       fileSystem := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+       handler := NewFailedPartsHandler(fileSystem, tempDir, l, 0)
+
+       sourcePath := filepath.Join(tempDir, "source_part")
+       fileSystem.MkdirIfNotExist(sourcePath, DirPerm)
+       // Create a test file in source
+       testFile := filepath.Join(sourcePath, "test.dat")
+       _, err := fileSystem.Write([]byte("test"), testFile, FilePerm)
+       require.NoError(t, err)
+
+       partID := uint64(67890)
+       destSubDir := "0000000000067890_core"
+
+       // First copy
+       err = handler.CopyToFailedPartsDir(partID, sourcePath, destSubDir)
+       assert.NoError(t, err)
+
+       // Second copy should succeed (idempotent)
+       err = handler.CopyToFailedPartsDir(partID, sourcePath, destSubDir)
+       assert.NoError(t, err)
+}
+
+func TestCopyToFailedPartsDir_SourceNotExist(t *testing.T) {
+       tempDir := t.TempDir()
+       fileSystem := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+       handler := NewFailedPartsHandler(fileSystem, tempDir, l, 0)
+
+       sourcePath := filepath.Join(tempDir, "nonexistent")
+       partID := uint64(99999)
+       destSubDir := "0000000000099999_core"
+
+       err := handler.CopyToFailedPartsDir(partID, sourcePath, destSubDir)
+       assert.Error(t, err)
+}
+
+func TestCopyToFailedPartsDir_RemovesOldestOnLimit(t *testing.T) {
+       tempDir := t.TempDir()
+       fileSystem := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+
+       handler := NewFailedPartsHandler(fileSystem, tempDir, l, 1024)
+
+       createSource := func(name string, size int) string {
+               sourcePath := filepath.Join(tempDir, name)
+               fileSystem.MkdirIfNotExist(sourcePath, DirPerm)
+               _, err := fileSystem.Write(bytes.Repeat([]byte("a"), size), filepath.Join(sourcePath, "data.bin"), FilePerm)
+               require.NoError(t, err)
+               return sourcePath
+       }
+
+       dest1 := "0000000000000001_core"
+       source1 := createSource("source_part_1", 512)
+       require.NoError(t, handler.CopyToFailedPartsDir(1, source1, dest1))
+
+       time.Sleep(10 * time.Millisecond)
+
+       dest2 := "0000000000000002_core"
+       source2 := createSource("source_part_2", 256)
+       require.NoError(t, handler.CopyToFailedPartsDir(2, source2, dest2))
+
+       time.Sleep(10 * time.Millisecond)
+
+       dest3 := "0000000000000003_core"
+       source3 := createSource("source_part_3", 512)
+       require.NoError(t, handler.CopyToFailedPartsDir(3, source3, dest3))
+
+       // Verify the oldest directory has been removed while others remain
+       _, err := os.Stat(filepath.Join(handler.failedPartsDir, dest1))
+       assert.True(t, os.IsNotExist(err), "oldest failed part directory should be removed")
+
+       _, err = os.Stat(filepath.Join(handler.failedPartsDir, dest2))
+       assert.NoError(t, err, "newer failed part directory should remain")
+
+       _, err = os.Stat(filepath.Join(handler.failedPartsDir, dest3))
+       assert.NoError(t, err, "new failed part directory should exist")
+
+       totalSize, err := calculatePathSize(handler.failedPartsDir)
+       require.NoError(t, err)
+       assert.LessOrEqual(t, int(totalSize), 1024)
+}
+
+func TestRetryFailedParts_MixedResults(t *testing.T) {
+       tempDir := t.TempDir()
+       fileSystem := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+       handler := NewFailedPartsHandler(fileSystem, tempDir, l, 0)
+       handler.initialRetryDelay = 10 * time.Millisecond
+
+       ctx := context.Background()
+       failedParts := []queue.FailedPart{
+               {PartID: "100", Error: "error"},
+               {PartID: "200", Error: "error"},
+               {PartID: "300", Error: "error"},
+       }
+
+       // Create test part directories with files
+       for _, id := range []uint64{100, 200, 300} {
+               partPath := filepath.Join(tempDir, fmt.Sprintf("%016x", id))
+               fileSystem.MkdirIfNotExist(partPath, DirPerm)
+               // Create a test file in each part
+               testFile := filepath.Join(partPath, "data.bin")
+               _, err := fileSystem.Write([]byte("data"), testFile, FilePerm)
+               require.NoError(t, err)
+       }
+
+       partsInfo := map[uint64][]*PartInfo{
+               100: {{PartID: 100, SourcePath: filepath.Join(tempDir, "0000000000000100"), PartType: "core"}},
+               200: {{PartID: 200, SourcePath: filepath.Join(tempDir, "0000000000000200"), PartType: "core"}},
+               300: {{PartID: 300, SourcePath: filepath.Join(tempDir, "0000000000000300"), PartType: "core"}},
+       }
+
+       syncFunc := func(partIDs []uint64) ([]queue.FailedPart, error) {
+               var failed []queue.FailedPart
+               for _, id := range partIDs {
+                       if id == 200 {
+                               // Part 200 always fails
+                               failed = append(failed, queue.FailedPart{
+                                       PartID: strconv.FormatUint(id, 10),
+                                       Error:  "persistent failure",
+                               })
+                       }
+                       // Parts 100 and 300 succeed
+               }
+               return failed, nil
+       }
+
+       result, err := handler.RetryFailedParts(ctx, failedParts, partsInfo, syncFunc)
+
+       assert.NoError(t, err)
+       assert.Len(t, result, 1, "only part 200 should fail permanently")
+       assert.Contains(t, result, uint64(200))
+}
+
+func TestRetryFailedParts_ExponentialBackoff(t *testing.T) {
+       tempDir := t.TempDir()
+       fileSystem := fs.NewLocalFileSystem()
+       l := logger.GetLogger("test")
+       handler := NewFailedPartsHandler(fileSystem, tempDir, l, 0)
+       handler.initialRetryDelay = 50 * time.Millisecond
+
+       ctx := context.Background()
+       failedParts := []queue.FailedPart{
+               {PartID: "500", Error: "error"},
+       }
+
+       part500Path := filepath.Join(tempDir, "0000000000000500")
+       fileSystem.MkdirIfNotExist(part500Path, DirPerm)
+       // Create a test file
+       testFile := filepath.Join(part500Path, "data.bin")
+       _, err := fileSystem.Write([]byte("data"), testFile, FilePerm)
+       require.NoError(t, err)
+
+       partsInfo := map[uint64][]*PartInfo{
+               500: {{PartID: 500, SourcePath: part500Path, PartType: "core"}},
+       }
+
+       var timestamps []time.Time
+       syncFunc := func([]uint64) ([]queue.FailedPart, error) {
+               timestamps = append(timestamps, time.Now())
+               return []queue.FailedPart{{PartID: "500", Error: "still failing"}}, nil
+       }
+
+       startTime := time.Now()
+       result, err := handler.RetryFailedParts(ctx, failedParts, partsInfo, syncFunc)
+
+       assert.NoError(t, err)
+       assert.Len(t, result, 1)
+       assert.Len(t, timestamps, DefaultMaxRetries)
+
+       // Verify exponential backoff timing
+       // First delay: 50ms, Second: 100ms, Third: 200ms
+       // Total should be at least 350ms (50 + 100 + 200)
+       totalDuration := time.Since(startTime)
+       expectedMinDuration := 50*time.Millisecond + 100*time.Millisecond + 200*time.Millisecond
+       assert.GreaterOrEqual(t, totalDuration, expectedMinDuration, "should use exponential backoff")
+}
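The timing the ExponentialBackoff test asserts follows directly from retryPartWithBackoff above: the handler waits the current delay before each attempt and multiplies it by backoffMultiplier after each failure. A standalone sketch of that schedule with the defaults (1s initial delay, multiplier 2, 3 retries); the program itself is illustrative:

    package main

    import (
            "fmt"
            "time"
    )

    func main() {
            delay := 1 * time.Second // DefaultInitialRetryDelay
            var total time.Duration
            for attempt := 1; attempt <= 3; attempt++ { // DefaultMaxRetries
                    fmt.Printf("attempt %d waits %v\n", attempt, delay)
                    total += delay
                    delay *= 2 // DefaultBackoffMultiplier
            }
            // Prints 1s, 2s, 4s; the total wait before giving up is 7s.
            // With the 50ms delay used in the test, the waits are 50ms+100ms+200ms = 350ms.
            fmt.Println("total:", total)
    }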
diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go
index afba9415..8c164b03 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -40,9 +40,6 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/timestamp"
 )
 
-// ErrExpiredData is returned when the data is expired.
-var ErrExpiredData = errors.New("expired data")
-
 // ErrSegmentClosed is returned when trying to access a closed segment.
 var ErrSegmentClosed = errors.New("segment closed")
 
@@ -311,7 +308,6 @@ type segmentController[T TSTable, O any] struct {
        stage       string
        location    string
        lst         []*segment[T, O]
-       deadline    atomic.Int64
        idleTimeout time.Duration
        optsMutex   sync.RWMutex
        sync.RWMutex
@@ -379,10 +375,6 @@ func (sc *segmentController[T, O]) selectSegments(timeRange timestamp.TimeRange)
 }
 
 func (sc *segmentController[T, O]) createSegment(ts time.Time) (*segment[T, O], error) {
-       // Before the first remove old segment run, any segment should be created.
-       if sc.deadline.Load() > ts.UnixNano() {
-               return nil, ErrExpiredData
-       }
        s, err := sc.create(ts)
        if err != nil {
                return nil, err
@@ -619,11 +611,6 @@ func (sc *segmentController[T, O]) removeSeg(segID segmentID) {
        for i, b := range sc.lst {
                if b.id == segID {
                        sc.lst = append(sc.lst[:i], sc.lst[i+1:]...)
-                       if len(sc.lst) < 1 {
-                               sc.deadline.Store(0)
-                       } else {
-                               sc.deadline.Store(sc.lst[0].Start.UnixNano())
-                       }
                        break
                }
        }
diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go
index 9bb59a5e..29f5abcf 100644
--- a/banyand/measure/measure.go
+++ b/banyand/measure/measure.go
@@ -47,12 +47,13 @@ const (
 )
 
 type option struct {
-       protector          protector.Memory
-       tire2Client        queue.Client
-       mergePolicy        *mergePolicy
-       seriesCacheMaxSize run.Bytes
-       flushTimeout       time.Duration
-       syncInterval       time.Duration
+       protector                    protector.Memory
+       tire2Client                  queue.Client
+       mergePolicy                  *mergePolicy
+       seriesCacheMaxSize           run.Bytes
+       flushTimeout                 time.Duration
+       syncInterval                 time.Duration
+       failedPartsMaxTotalSizeBytes uint64
 }
 
 type indexSchema struct {
diff --git a/banyand/measure/svc_liaison.go b/banyand/measure/svc_liaison.go
index 35e7c937..e73d1c19 100644
--- a/banyand/measure/svc_liaison.go
+++ b/banyand/measure/svc_liaison.go
@@ -46,18 +46,19 @@ import (
 )
 
 type liaison struct {
-       pm                  protector.Memory
-       metadata            metadata.Repo
-       pipeline            queue.Server
-       omr                 observability.MetricsRegistry
-       lfs                 fs.FileSystem
-       dataNodeSelector    node.Selector
-       l                   *logger.Logger
-       schemaRepo          *schemaRepo
-       dataPath            string
-       root                string
-       option              option
-       maxDiskUsagePercent int
+       pm                        protector.Memory
+       metadata                  metadata.Repo
+       pipeline                  queue.Server
+       omr                       observability.MetricsRegistry
+       lfs                       fs.FileSystem
+       dataNodeSelector          node.Selector
+       l                         *logger.Logger
+       schemaRepo                *schemaRepo
+       dataPath                  string
+       root                      string
+       option                    option
+       maxDiskUsagePercent       int
+       failedPartsMaxSizePercent int
 }
 
 func (s *liaison) Measure(metadata *commonv1.Metadata) (Measure, error) {
@@ -83,6 +84,10 @@ func (s *liaison) FlagSet() *run.FlagSet {
        flagS.DurationVar(&s.option.flushTimeout, "measure-flush-timeout", defaultFlushTimeout, "the memory data timeout of measure")
        flagS.DurationVar(&s.option.syncInterval, "measure-sync-interval", defaultSyncInterval, "the periodic sync interval for measure data")
        flagS.IntVar(&s.maxDiskUsagePercent, "measure-max-disk-usage-percent", 95, "the maximum disk usage percentage allowed")
+       flagS.IntVar(&s.failedPartsMaxSizePercent, "failed-parts-max-size-percent", 10,
+               "percentage of BanyanDB's allowed disk usage allocated to failed parts storage. "+
+                       "Calculated as: totalDisk * measure-max-disk-usage-percent * failed-parts-max-size-percent / 10000. "+
+                       "Set to 0 to disable copying failed parts. Valid range: 0-100")
        return flagS
 }
 
@@ -96,6 +101,9 @@ func (s *liaison) Validate() error {
        if s.maxDiskUsagePercent > 100 {
                return errors.New("measure-max-disk-usage-percent must be less 
than or equal to 100")
        }
+       if s.failedPartsMaxSizePercent < 0 || s.failedPartsMaxSizePercent > 100 {
+               return errors.New("failed-parts-max-size-percent must be between 0 and 100")
+       }
        return nil
 }
 
@@ -123,6 +131,22 @@ func (s *liaison) PreRun(ctx context.Context) error {
        if !strings.HasPrefix(filepath.VolumeName(s.dataPath), filepath.VolumeName(path)) {
                observability.UpdatePath(s.dataPath)
        }
+       s.lfs.MkdirIfNotExist(s.dataPath, storage.DirPerm)
+
+       s.option.failedPartsMaxTotalSizeBytes = 0
+       if s.failedPartsMaxSizePercent > 0 {
+               totalSpace := s.lfs.MustGetTotalSpace(s.dataPath)
+               maxTotalSizeBytes := totalSpace * uint64(s.maxDiskUsagePercent) / 100
+               maxTotalSizeBytes = maxTotalSizeBytes * uint64(s.failedPartsMaxSizePercent) / 100
+               s.option.failedPartsMaxTotalSizeBytes = maxTotalSizeBytes
+               s.l.Info().
+                       Uint64("maxFailedPartsBytes", maxTotalSizeBytes).
+                       Int("failedPartsMaxSizePercent", s.failedPartsMaxSizePercent).
+                       Int("maxDiskUsagePercent", s.maxDiskUsagePercent).
+                       Msg("configured failed parts storage limit")
+       } else {
+               s.l.Info().Msg("failed parts storage limit disabled (percent set to 0)")
+       }
        topNResultPipeline := queue.Local()
        measureDataNodeRegistry := grpc.NewClusterNodeRegistry(data.TopicMeasurePartSync, s.option.tire2Client, s.dataNodeSelector)
        s.schemaRepo = newLiaisonSchemaRepo(s.dataPath, s, measureDataNodeRegistry, topNResultPipeline)
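The budget computed in PreRun above is totalDisk * measure-max-disk-usage-percent / 100 * failed-parts-max-size-percent / 100 (equivalently, divided by 10000 as the flag help text states). A minimal sketch of that arithmetic with the default percentages and an assumed 1 TiB volume:

    package main

    import "fmt"

    func main() {
            var totalSpace uint64 = 1 << 40         // assumed 1 TiB volume, for illustration
            maxDiskUsagePercent := uint64(95)       // measure-max-disk-usage-percent default
            failedPartsMaxSizePercent := uint64(10) // failed-parts-max-size-percent default

            budget := totalSpace * maxDiskUsagePercent / 100  // disk BanyanDB may use
            budget = budget * failedPartsMaxSizePercent / 100 // share reserved for failed parts
            fmt.Printf("failed-parts cap: %d bytes (~%.1f GiB)\n", budget, float64(budget)/(1<<30))
    }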
diff --git a/banyand/measure/syncer.go b/banyand/measure/syncer.go
index 7f85e97d..7b06e0c1 100644
--- a/banyand/measure/syncer.go
+++ b/banyand/measure/syncer.go
@@ -20,9 +20,11 @@ package measure
 import (
        "context"
        "fmt"
+       "strconv"
        "time"
 
        "github.com/apache/skywalking-banyandb/api/data"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
        "github.com/apache/skywalking-banyandb/pkg/pool"
@@ -180,24 +182,100 @@ func createPartFileReaders(part *part) ([]queue.FileInfo, func()) {
        }
 }
 
+// syncPartsToNodesHelper syncs given parts to all nodes and returns failed parts.
+// This helper is used for both initial sync and retry attempts.
+func (tst *tsTable) syncPartsToNodesHelper(ctx context.Context, parts []*part, nodes []string, chunkSize uint32, releaseFuncs *[]func()) ([]queue.FailedPart, error) {
+       var allFailedParts []queue.FailedPart
+
+       for _, node := range nodes {
+               // Get chunked sync client for this node
+               chunkedClient, err := tst.option.tire2Client.NewChunkedSyncClient(node, chunkSize)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to create chunked sync client for node %s: %w", node, err)
+               }
+               defer chunkedClient.Close()
+
+               // Prepare streaming parts data
+               var streamingParts []queue.StreamingPartData
+               for _, part := range parts {
+                       files, release := createPartFileReaders(part)
+                       *releaseFuncs = append(*releaseFuncs, release)
+                       streamingParts = append(streamingParts, queue.StreamingPartData{
+                               ID:                    part.partMetadata.ID,
+                               Group:                 tst.group,
+                               ShardID:               uint32(tst.shardID),
+                               Topic:                 data.TopicMeasurePartSync.String(),
+                               Files:                 files,
+                               CompressedSizeBytes:   part.partMetadata.CompressedSizeBytes,
+                               UncompressedSizeBytes: part.partMetadata.UncompressedSizeBytes,
+                               TotalCount:            part.partMetadata.TotalCount,
+                               BlocksCount:           part.partMetadata.BlocksCount,
+                               MinTimestamp:          part.partMetadata.MinTimestamp,
+                               MaxTimestamp:          part.partMetadata.MaxTimestamp,
+                               PartType:              PartTypeCore,
+                       })
+               }
+
+               result, err := chunkedClient.SyncStreamingParts(ctx, streamingParts)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to sync streaming parts to node %s: %w", node, err)
+               }
+
+               tst.incTotalSyncLoopBytes(result.TotalBytes)
+               if dl := tst.l.Debug(); dl.Enabled() {
+                       dl.
+                               Str("node", node).
+                               Str("session", result.SessionID).
+                               Uint64("bytes", result.TotalBytes).
+                               Int64("duration_ms", result.DurationMs).
+                               Uint32("chunks", result.ChunksCount).
+                               Uint32("parts", result.PartsCount).
+                               Int("failed_parts", len(result.FailedParts)).
+                               Msg("chunked sync completed")
+               }
+
+               allFailedParts = append(allFailedParts, result.FailedParts...)
+       }
+
+       return allFailedParts, nil
+}
+
 func (tst *tsTable) syncSnapshot(curSnapshot *snapshot, syncCh chan *syncIntroduction) error {
        startTime := time.Now()
        defer func() {
                tst.incTotalSyncLoopLatency(time.Since(startTime).Seconds())
        }()
 
+       partsToSync := tst.collectPartsToSync(curSnapshot)
+       if len(partsToSync) == 0 {
+               return nil
+       }
+
+       nodes := tst.getNodes()
+       if len(nodes) == 0 {
+               return fmt.Errorf("no nodes to sync parts")
+       }
+
+       tst.sortPartsByID(partsToSync)
+
+       if err := tst.executeSyncWithRetry(partsToSync, nodes); err != nil {
+               return err
+       }
+
+       return tst.sendSyncIntroduction(partsToSync, syncCh)
+}
+
+func (tst *tsTable) collectPartsToSync(curSnapshot *snapshot) []*part {
        var partsToSync []*part
        for _, pw := range curSnapshot.parts {
                if pw.mp == nil && pw.p.partMetadata.TotalCount > 0 {
                        partsToSync = append(partsToSync, pw.p)
                }
        }
+       return partsToSync
+}
 
-       if len(partsToSync) == 0 {
-               return nil
-       }
-
-       // Sort parts from old to new (by part ID).
+func (tst *tsTable) sortPartsByID(partsToSync []*part) {
        for i := 0; i < len(partsToSync); i++ {
                for j := i + 1; j < len(partsToSync); j++ {
                        if partsToSync[i].partMetadata.ID > 
partsToSync[j].partMetadata.ID {
@@ -205,13 +283,12 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot, 
syncCh chan *syncIntrodu
                        }
                }
        }
+}
 
-       nodes := tst.getNodes()
-       if len(nodes) == 0 {
-               return fmt.Errorf("no nodes to sync parts")
-       }
+func (tst *tsTable) executeSyncWithRetry(partsToSync []*part, nodes []string) 
error {
+       failedPartsHandler := storage.NewFailedPartsHandler(tst.fileSystem, 
tst.root, tst.l, tst.option.failedPartsMaxTotalSizeBytes)
+       partsInfo := tst.buildPartsInfoMap(partsToSync)
 
-       // Use chunked sync with streaming for better memory efficiency.
        ctx := context.Background()
        releaseFuncs := make([]func(), 0, len(partsToSync))
        defer func() {
@@ -220,59 +297,162 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot, 
syncCh chan *syncIntrodu
                }
        }()
 
+       perNodeFailures := tst.performInitialSync(ctx, partsToSync, nodes, 
&releaseFuncs)
+       if len(perNodeFailures) > 0 {
+               tst.handleFailedPartsRetry(ctx, partsToSync, perNodeFailures, 
partsInfo, failedPartsHandler)
+       }
+
+       return nil
+}
+
+func (tst *tsTable) buildPartsInfoMap(partsToSync []*part) 
map[uint64][]*storage.PartInfo {
+       partsInfo := make(map[uint64][]*storage.PartInfo)
+       for _, part := range partsToSync {
+               partsInfo[part.partMetadata.ID] = []*storage.PartInfo{
+                       {
+                               PartID:     part.partMetadata.ID,
+                               SourcePath: part.path,
+                               PartType:   PartTypeCore,
+                       },
+               }
+       }
+       return partsInfo
+}
+
+func (tst *tsTable) performInitialSync(
+       ctx context.Context, partsToSync []*part, nodes []string, releaseFuncs 
*[]func(),
+) map[string][]queue.FailedPart {
+       perNodeFailures := make(map[string][]queue.FailedPart)
        for _, node := range nodes {
-               // Get chunked sync client for this node.
-               chunkedClient, err := 
tst.option.tire2Client.NewChunkedSyncClient(node, 512*1024)
+               failedParts, err := tst.syncPartsToNodesHelper(ctx, 
partsToSync, []string{node}, 512*1024, releaseFuncs)
                if err != nil {
-                       return fmt.Errorf("failed to create chunked sync client 
for node %s: %w", node, err)
+                       tst.l.Error().Err(err).Str("node", node).Msg("sync 
error")
+                       // Mark all parts as failed for this node
+                       var allPartsFailed []queue.FailedPart
+                       for _, part := range partsToSync {
+                               allPartsFailed = append(allPartsFailed, 
queue.FailedPart{
+                                       PartID: 
strconv.FormatUint(part.partMetadata.ID, 10),
+                                       Error:  fmt.Sprintf("node %s: %v", 
node, err),
+                               })
+                       }
+                       perNodeFailures[node] = allPartsFailed
+                       continue
                }
-               defer chunkedClient.Close()
+               if len(failedParts) > 0 {
+                       perNodeFailures[node] = failedParts
+               }
+       }
+       return perNodeFailures
+}
 
-               // Prepare streaming parts data for chunked sync.
-               var streamingParts []queue.StreamingPartData
+func (tst *tsTable) handleFailedPartsRetry(
+       ctx context.Context, partsToSync []*part, perNodeFailures 
map[string][]queue.FailedPart,
+       partsInfo map[uint64][]*storage.PartInfo, failedPartsHandler 
*storage.FailedPartsHandler,
+) {
+       allFailedParts := tst.collectAllFailedParts(perNodeFailures)
+       syncFunc := tst.createRetrySyncFunc(ctx, partsToSync, perNodeFailures)
+
+       permanentlyFailedParts, err := failedPartsHandler.RetryFailedParts(ctx, 
allFailedParts, partsInfo, syncFunc)
+       if err != nil {
+               tst.l.Warn().Err(err).Msg("error during retry process")
+       }
+       if len(permanentlyFailedParts) > 0 {
+               tst.l.Error().
+                       Uints64("partIDs", permanentlyFailedParts).
+                       Int("count", len(permanentlyFailedParts)).
+                       Msg("parts permanently failed after all retries and 
have been copied to failed-parts directory")
+       }
+}
+
+func (tst *tsTable) collectAllFailedParts(perNodeFailures 
map[string][]queue.FailedPart) []queue.FailedPart {
+       allFailedParts := make([]queue.FailedPart, 0)
+       for _, failedParts := range perNodeFailures {
+               allFailedParts = append(allFailedParts, failedParts...)
+       }
+       return allFailedParts
+}
+
+func (tst *tsTable) createRetrySyncFunc(
+       ctx context.Context, partsToSync []*part, perNodeFailures 
map[string][]queue.FailedPart,
+) func([]uint64) ([]queue.FailedPart, error) {
+       return func(partIDs []uint64) ([]queue.FailedPart, error) {
+               partIDsSet := make(map[uint64]struct{})
+               for _, partID := range partIDs {
+                       partIDsSet[partID] = struct{}{}
+               }
+
+               partsToRetry := tst.filterPartsToRetry(partIDs, partsToSync)
+               if len(partsToRetry) == 0 {
+                       return nil, nil
+               }
+
+               return tst.retryPartsOnFailedNodes(ctx, partIDs, partsToRetry, 
partIDsSet, perNodeFailures)
+       }
+}
+
+func (tst *tsTable) filterPartsToRetry(partIDs []uint64, partsToSync []*part) 
[]*part {
+       partsToRetry := make([]*part, 0)
+       for _, partID := range partIDs {
                for _, part := range partsToSync {
-                       // Create streaming reader for the part.
-                       files, release := createPartFileReaders(part)
-                       releaseFuncs = append(releaseFuncs, release)
-                       // Create streaming part sync data.
-                       streamingParts = append(streamingParts, 
queue.StreamingPartData{
-                               ID:                    part.partMetadata.ID,
-                               Group:                 tst.group,
-                               ShardID:               uint32(tst.shardID),
-                               Topic:                 
data.TopicMeasurePartSync.String(),
-                               Files:                 files,
-                               CompressedSizeBytes:   
part.partMetadata.CompressedSizeBytes,
-                               UncompressedSizeBytes: 
part.partMetadata.UncompressedSizeBytes,
-                               TotalCount:            
part.partMetadata.TotalCount,
-                               BlocksCount:           
part.partMetadata.BlocksCount,
-                               MinTimestamp:          
part.partMetadata.MinTimestamp,
-                               MaxTimestamp:          
part.partMetadata.MaxTimestamp,
-                               PartType:              PartTypeCore,
-                       })
+                       if part.partMetadata.ID == partID {
+                               partsToRetry = append(partsToRetry, part)
+                               break
+                       }
                }
+       }
+       return partsToRetry
+}
 
-               // Sync parts using chunked transfer with streaming.
-               result, err := chunkedClient.SyncStreamingParts(ctx, 
streamingParts)
-               if err != nil {
-                       return fmt.Errorf("failed to sync streaming parts to 
node %s: %w", node, err)
+func (tst *tsTable) retryPartsOnFailedNodes(
+       ctx context.Context, partIDs []uint64, partsToRetry []*part,
+       partIDsSet map[uint64]struct{}, perNodeFailures 
map[string][]queue.FailedPart,
+) ([]queue.FailedPart, error) {
+       retryReleaseFuncs := make([]func(), 0)
+       defer func() {
+               for _, release := range retryReleaseFuncs {
+                       release()
                }
+       }()
 
-               if !result.Success {
-                       return fmt.Errorf("chunked sync partially failed: %v", 
result.ErrorMessage)
+       var retryFailedParts []queue.FailedPart
+       for node, nodeFailedParts := range perNodeFailures {
+               if !tst.shouldRetryOnNode(nodeFailedParts, partIDsSet) {
+                       continue
                }
-               tst.incTotalSyncLoopBytes(result.TotalBytes)
-               if dl := tst.l.Debug(); dl.Enabled() {
-                       dl.
-                               Str("node", node).
-                               Str("session", result.SessionID).
-                               Uint64("bytes", result.TotalBytes).
-                               Int64("duration_ms", result.DurationMs).
-                               Uint32("chunks", result.ChunksCount).
-                               Uint32("parts", result.PartsCount).
-                               Msg("chunked sync completed successfully")
+
+               failedParts, err := tst.syncPartsToNodesHelper(ctx, 
partsToRetry, []string{node}, 512*1024, &retryReleaseFuncs)
+               if err != nil {
+                       retryFailedParts = append(retryFailedParts, 
tst.markPartsAsFailed(partIDs, node, err)...)
+                       continue
                }
+               retryFailedParts = append(retryFailedParts, failedParts...)
        }
 
+       return retryFailedParts, nil
+}
+
+func (tst *tsTable) shouldRetryOnNode(nodeFailedParts []queue.FailedPart, 
partIDsSet map[uint64]struct{}) bool {
+       for _, failedPart := range nodeFailedParts {
+               failedPartID, _ := strconv.ParseUint(failedPart.PartID, 10, 64)
+               if _, exists := partIDsSet[failedPartID]; exists {
+                       return true
+               }
+       }
+       return false
+}
+
+func (tst *tsTable) markPartsAsFailed(partIDs []uint64, node string, err 
error) []queue.FailedPart {
+       var failedParts []queue.FailedPart
+       for _, partID := range partIDs {
+               failedParts = append(failedParts, queue.FailedPart{
+                       PartID: strconv.FormatUint(partID, 10),
+                       Error:  fmt.Sprintf("node %s: %v", node, err),
+               })
+       }
+       return failedParts
+}
+
+func (tst *tsTable) sendSyncIntroduction(partsToSync []*part, syncCh chan 
*syncIntroduction) error {
        si := generateSyncIntroduction()
        defer releaseSyncIntroduction(si)
        si.applied = make(chan struct{})
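
The retry path above hands the per-node failures to storage.FailedPartsHandler.RetryFailedParts together with a sync closure; the handler's backoff and bookkeeping live in failed_parts_handler.go, which is part of this commit but not reproduced here. As a rough orientation only, the sketch below shows the general shape of such a loop: re-run a sync callback over the still-failing part IDs with an exponentially growing delay and report whatever never succeeds. Every name, delay, and attempt count in it is hypothetical and does not reflect the actual handler.

// Hypothetical sketch of a backoff retry loop over failed part IDs; the real
// FailedPartsHandler additionally copies permanently failed parts into a
// size-capped failed-parts directory, which is omitted here.
package retrysketch

import (
	"strconv"
	"time"
)

type failedPart struct {
	PartID string
	Error  string
}

// retryWithBackoff re-runs syncFunc over the still-failing part IDs, doubling
// the delay between attempts, and returns the IDs that never succeeded.
func retryWithBackoff(ids []uint64, syncFunc func([]uint64) ([]failedPart, error)) []uint64 {
	pending := ids
	delay := 500 * time.Millisecond // hypothetical initial backoff
	const maxAttempts = 3           // hypothetical attempt budget
	for attempt := 0; attempt < maxAttempts && len(pending) > 0; attempt++ {
		if attempt > 0 {
			time.Sleep(delay)
			delay *= 2
		}
		stillFailed, err := syncFunc(pending)
		if err != nil {
			// The attempt itself errored; keep the whole pending set for the next round.
			continue
		}
		next := make([]uint64, 0, len(stillFailed))
		for _, fp := range stillFailed {
			if id, perr := strconv.ParseUint(fp.PartID, 10, 64); perr == nil {
				next = append(next, id)
			}
		}
		pending = next
	}
	return pending
}
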
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index 5ddab3ea..2e59e1e6 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -75,6 +75,9 @@ func initTSTable(fileSystem fs.FileSystem, rootPath string, p 
common.Position,
        var needToDelete []string
        for i := range ee {
                if ee[i].IsDir() {
+                       if ee[i].Name() == storage.FailedPartsDirName {
+                               continue
+                       }
                        p, err := parseEpoch(ee[i].Name())
                        if err != nil {
                                l.Info().Err(err).Msg("cannot parse part file 
name. skip and delete it")
@@ -144,18 +147,18 @@ func (tst *tsTable) startLoopWithConditionalMerge(cur 
uint64) {
 
 type tsTable struct {
        fileSystem    fs.FileSystem
-       l             *logger.Logger
-       snapshot      *snapshot
-       introductions chan *introduction
+       pm            protector.Memory
        loopCloser    *run.Closer
+       introductions chan *introduction
+       snapshot      *snapshot
        *metrics
+       getNodes  func() []string
+       l         *logger.Logger
        p         common.Position
-       option    option
-       pm        protector.Memory
        root      string
-       getNodes  func() []string
        group     string
        gc        garbageCleaner
+       option    option
        curPartID uint64
        sync.RWMutex
        shardID common.ShardID
diff --git a/banyand/queue/local.go b/banyand/queue/local.go
index f7fa41de..455b91af 100644
--- a/banyand/queue/local.go
+++ b/banyand/queue/local.go
@@ -170,6 +170,22 @@ func (l *localChunkedSyncClient) Close() error {
 }
 
 func (l *localChunkedSyncClient) SyncStreamingParts(_ context.Context, parts 
[]StreamingPartData) (*SyncResult, error) {
+       // Check for test failure injector
+       if injector := GetChunkedSyncFailureInjector(); injector != nil {
+               shouldFail, failedParts, err := injector.BeforeSync(parts)
+               if err != nil {
+                       return nil, err
+               }
+               if shouldFail {
+                       return &SyncResult{
+                               Success:     false,
+                               SessionID:   "local-session",
+                               PartsCount:  uint32(len(parts)),
+                               FailedParts: failedParts,
+                       }, nil
+               }
+       }
+
        return &SyncResult{
                Success:     true,
                SessionID:   "local-session",
diff --git a/banyand/queue/pub/chunked_sync.go 
b/banyand/queue/pub/chunked_sync.go
index 8fbd0a67..1a2cd3f3 100644
--- a/banyand/queue/pub/chunked_sync.go
+++ b/banyand/queue/pub/chunked_sync.go
@@ -69,6 +69,21 @@ func (c *chunkedSyncClient) SyncStreamingParts(ctx 
context.Context, parts []queu
                }
        }()
 
+       if injector := queue.GetChunkedSyncFailureInjector(); injector != nil {
+               shouldFail, failedParts, err := injector.BeforeSync(parts)
+               if err != nil {
+                       return nil, err
+               }
+               if shouldFail {
+                       return &queue.SyncResult{
+                               Success:     false,
+                               SessionID:   "",
+                               PartsCount:  uint32(len(parts)),
+                               FailedParts: failedParts,
+                       }, nil
+               }
+       }
+
        sessionID := generateSessionID()
 
        chunkedClient := clusterv1.NewChunkedSyncServiceClient(c.conn)
@@ -95,11 +110,11 @@ func (c *chunkedSyncClient) SyncStreamingParts(ctx 
context.Context, parts []queu
 
        var totalBytesSent uint64
 
-       totalChunks, err := c.streamPartsAsChunks(stream, sessionID, metadata, 
parts, &totalBytesSent)
+       totalChunks, failedParts, err := c.streamPartsAsChunks(stream, 
sessionID, metadata, parts, &totalBytesSent)
        if err != nil {
                return nil, fmt.Errorf("failed to stream parts: %w", err)
        }
-       if totalChunks == 0 {
+       if totalChunks == 0 && len(failedParts) == 0 {
                return &queue.SyncResult{
                        Success:    true,
                        SessionID:  sessionID,
@@ -129,6 +144,9 @@ func (c *chunkedSyncClient) SyncStreamingParts(ctx 
context.Context, parts []queu
                result := finalResp.GetSyncResult()
                success = result.Success
        }
+       if !success && len(failedParts) < len(parts) {
+               success = true
+       }
 
        return &queue.SyncResult{
                Success:     success,
@@ -137,6 +155,7 @@ func (c *chunkedSyncClient) SyncStreamingParts(ctx 
context.Context, parts []queu
                DurationMs:  duration.Milliseconds(),
                ChunksCount: totalChunks,
                PartsCount:  uint32(len(parts)),
+               FailedParts: failedParts,
        }, nil
 }
 
@@ -151,10 +170,12 @@ func (c *chunkedSyncClient) streamPartsAsChunks(
        metadata *clusterv1.SyncMetadata,
        parts []queue.StreamingPartData,
        totalBytesSent *uint64,
-) (uint32, error) {
+) (uint32, []queue.FailedPart, error) {
        var totalChunks uint32
        var chunkIndex uint32
        isFirstChunk := true
+       var failedParts []queue.FailedPart
+       failedPartIDs := make(map[uint64]struct{})
 
        buffer := make([]byte, 0, c.chunkSize)
 
@@ -188,12 +209,19 @@ func (c *chunkedSyncClient) streamPartsAsChunks(
        }
 
        currentFileIdx := 0
+       partsInCurrentChunk := make(map[int]struct{})
 
        for currentFileIdx < len(fileStates) {
                var chunkFileInfos []*chunkFileInfo
 
                for len(buffer) < cap(buffer) && currentFileIdx < 
len(fileStates) {
                        fileState := fileStates[currentFileIdx]
+                       part := parts[fileState.partIndex]
+                       if _, failed := failedPartIDs[part.ID]; failed {
+                               fileState.finished = true
+                               currentFileIdx++
+                               continue
+                       }
                        if fileState.finished {
                                currentFileIdx++
                                continue
@@ -215,12 +243,35 @@ func (c *chunkedSyncClient) streamPartsAsChunks(
                                fileState.finished = true
                                currentFileIdx++
                        } else if err != nil {
-                               return totalChunks, fmt.Errorf("failed to read 
from file %s: %w", fileState.info.Name, err)
+                               errMsg := fmt.Sprintf("failed to read from file 
%s: %v", fileState.info.Name, err)
+                               c.log.Error().Err(err).Str("part-id", 
fmt.Sprint(part.ID)).Msg(errMsg)
+                               if _, failed := failedPartIDs[part.ID]; !failed 
{
+                                       failedParts = append(failedParts, 
queue.FailedPart{PartID: fmt.Sprint(part.ID), Error: errMsg})
+                                       failedPartIDs[part.ID] = struct{}{}
+                               }
+                               fileState.finished = true
+                               currentFileIdx++
+
+                               // If this part has already contributed data to 
the current chunk buffer,
+                               // we must discard the entire buffer to prevent 
sending corrupted partial data.
+                               if _, inChunk := 
partsInCurrentChunk[fileState.partIndex]; inChunk {
+                                       c.log.Warn().
+                                               Str("part-id", 
fmt.Sprint(part.ID)).
+                                               Int("buffer-size", len(buffer)).
+                                               Msg("discarding chunk buffer 
due to part failure")
+                                       buffer = buffer[:0]
+                                       chunkFileInfos = nil
+                                       partsInCurrentChunk = 
make(map[int]struct{})
+                               }
+                               continue
                        }
 
                        if n > 0 {
                                fileState.bytesRead += uint64(n)
 
+                               // Track that this part has contributed data to 
the current chunk
+                               partsInCurrentChunk[fileState.partIndex] = 
struct{}{}
+
                                chunkFileInfos = append(chunkFileInfos, 
&chunkFileInfo{
                                        fileInfo: &clusterv1.FileInfo{
                                                Name:   fileState.info.Name,
@@ -270,10 +321,15 @@ func (c *chunkedSyncClient) streamPartsAsChunks(
                        }
 
                        if err := c.sendChunk(stream, sessionID, buffer, 
chunkPartsInfo, &chunkIndex, &totalChunks, totalBytesSent, isFirstChunk, 
metadata); err != nil {
-                               return totalChunks, err
+                               // Any sendChunk failure breaks the sync 
session's state machine.
+                               // The receiver expects sequential chunks and 
cannot recover from gaps.
+                               // Abort the entire session immediately.
+                               c.log.Error().Err(err).Msg("chunk send failed, 
aborting sync session")
+                               return totalChunks, failedParts, 
fmt.Errorf("failed to send chunk %d: %w", chunkIndex, err)
                        }
                        isFirstChunk = false
                        buffer = buffer[:0]
+                       partsInCurrentChunk = make(map[int]struct{})
                }
 
                if len(buffer) == 0 && currentFileIdx >= len(fileStates) {
@@ -300,12 +356,12 @@ func (c *chunkedSyncClient) streamPartsAsChunks(
                }
 
                if err := stream.Send(completionReq); err != nil {
-                       return totalChunks, fmt.Errorf("failed to send 
completion: %w", err)
+                       return totalChunks, failedParts, fmt.Errorf("failed to 
send completion: %w", err)
                }
                totalChunks++
        }
 
-       return totalChunks, nil
+       return totalChunks, failedParts, nil
 }
 
 func (c *chunkedSyncClient) sendChunk(
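
With FailedParts added to SyncResult, a sync that loses some parts no longer fails the whole call: a transport or session error still aborts everything, while per-part read failures are reported back so the caller can feed them into the retry handler. A minimal caller-side sketch of that contract, assuming only the queue types introduced in this patch (the syncAndReport helper itself is hypothetical):

// Hypothetical caller of the partial-failure contract; only the queue types
// shown in this patch (StreamingPartData, SyncResult, FailedPart) are assumed.
package synccaller

import (
	"context"
	"log"

	"github.com/apache/skywalking-banyandb/banyand/queue"
)

// syncAndReport runs one sync attempt: a returned error means the session
// failed as a whole and every part must be retried, while FailedParts lists
// only the parts that need to go through the retry path.
func syncAndReport(
	ctx context.Context,
	sync func(context.Context, []queue.StreamingPartData) (*queue.SyncResult, error),
	parts []queue.StreamingPartData,
) ([]queue.FailedPart, error) {
	result, err := sync(ctx, parts)
	if err != nil {
		return nil, err
	}
	for _, fp := range result.FailedParts {
		log.Printf("part %s failed: %s", fp.PartID, fp.Error)
	}
	return result.FailedParts, nil
}
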
diff --git a/banyand/queue/pub/chunked_sync_test.go 
b/banyand/queue/pub/chunked_sync_test.go
index 693c83ff..b5f8e9cb 100644
--- a/banyand/queue/pub/chunked_sync_test.go
+++ b/banyand/queue/pub/chunked_sync_test.go
@@ -228,8 +228,209 @@ var _ = ginkgo.Describe("Chunked Sync Retry Mechanism", 
func() {
                        
gomega.Expect(mockServer.checksumMismatchCount).To(gomega.Equal(0))
                })
        })
+
+       ginkgo.Context("when a part fails mid-read during chunk building", 
func() {
+               ginkgo.It("should discard incomplete chunk and continue with 
non-failed parts", func() {
+                       address := getAddress()
+
+                       // Track chunks received to verify no partial data is 
sent
+                       var receivedChunks []*clusterv1.SyncPartRequest
+                       var mu sync.Mutex
+
+                       // Custom mock server that records chunks
+                       s := grpc.NewServer()
+                       mockSvc := &recordingMockServer{
+                               receivedChunks: &receivedChunks,
+                               mu:             &mu,
+                       }
+                       clusterv1.RegisterChunkedSyncServiceServer(s, mockSvc)
+
+                       lis, err := net.Listen("tcp", address)
+                       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+                       go func() {
+                               _ = s.Serve(lis)
+                       }()
+                       defer s.GracefulStop()
+
+                       conn, err := grpc.NewClient(address, 
grpc.WithTransportCredentials(insecure.NewCredentials()))
+                       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+                       defer conn.Close()
+
+                       client := &chunkedSyncClient{
+                               conn:      conn,
+                               log:       
logger.GetLogger("test-chunked-sync"),
+                               chunkSize: 1024, // Small chunk size to force 
multiple chunks
+                       }
+
+                       // Create test data: Part 1 (will succeed), Part 2 
(will fail mid-read), Part 3 (will succeed)
+                       part1Data := make([]byte, 800)
+                       for i := range part1Data {
+                               part1Data[i] = byte('A')
+                       }
+
+                       part2Data := make([]byte, 1500) // Large enough to span 
multiple reads
+                       for i := range part2Data {
+                               part2Data[i] = byte('B')
+                       }
+
+                       part3Data := make([]byte, 600)
+                       for i := range part3Data {
+                               part3Data[i] = byte('C')
+                       }
+
+                       parts := []queue.StreamingPartData{
+                               {
+                                       ID:      1,
+                                       Group:   "test-group",
+                                       ShardID: 1,
+                                       Topic:   "stream_write",
+                                       Files: []queue.FileInfo{
+                                               
createFileInfo("part1-file1.dat", part1Data),
+                                       },
+                               },
+                               {
+                                       ID:      2,
+                                       Group:   "test-group",
+                                       ShardID: 1,
+                                       Topic:   "stream_write",
+                                       Files: []queue.FileInfo{
+                                               {
+                                                       Name: "part2-file1.dat",
+                                                       Reader: &failingReader{
+                                                               data:      
part2Data,
+                                                               failAfter: 500, 
// Fail after reading 500 bytes
+                                                       },
+                                               },
+                                       },
+                               },
+                               {
+                                       ID:      3,
+                                       Group:   "test-group",
+                                       ShardID: 1,
+                                       Topic:   "stream_write",
+                                       Files: []queue.FileInfo{
+                                               
createFileInfo("part3-file1.dat", part3Data),
+                                       },
+                               },
+                       }
+
+                       ctx, cancel := 
context.WithTimeout(context.Background(), 30*time.Second)
+                       defer cancel()
+
+                       result, err := client.SyncStreamingParts(ctx, parts)
+                       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+                       gomega.Expect(result).NotTo(gomega.BeNil())
+
+                       // Verify that part 2 is marked as failed
+                       gomega.Expect(result.FailedParts).To(gomega.HaveLen(1))
+                       
gomega.Expect(result.FailedParts[0].PartID).To(gomega.Equal("2"))
+
+                       // Verify that the result is still considered 
successful (partial success)
+                       gomega.Expect(result.Success).To(gomega.BeTrue())
+
+                       // Verify chunks received
+                       mu.Lock()
+                       defer mu.Unlock()
+
+                       // Find where part 2 appears in chunks
+                       var part2FirstChunkIdx, part2LastChunkIdx int = -1, -1
+                       for idx, chunk := range receivedChunks {
+                               if chunk.GetCompletion() != nil {
+                                       continue
+                               }
+                               for _, partInfo := range chunk.PartsInfo {
+                                       if partInfo.Id == 2 {
+                                               if part2FirstChunkIdx == -1 {
+                                                       part2FirstChunkIdx = idx
+                                               }
+                                               part2LastChunkIdx = idx
+                                       }
+                               }
+                       }
+
+                       // Part 2 should appear in at least one chunk (data 
read before failure)
+                       
gomega.Expect(part2FirstChunkIdx).To(gomega.BeNumerically(">=", 0),
+                               "Part 2 should have contributed to at least one 
chunk before failing")
+
+                       // After part 2 fails, no subsequent chunks should 
contain part 2 data
+                       // This verifies that the buffer was discarded and no 
mixed data was sent
+                       if part2LastChunkIdx >= 0 {
+                               // Check all chunks after the last part 2 chunk
+                               for idx := part2LastChunkIdx + 1; idx < 
len(receivedChunks); idx++ {
+                                       chunk := receivedChunks[idx]
+                                       if chunk.GetCompletion() != nil {
+                                               continue
+                                       }
+                                       for _, partInfo := range 
chunk.PartsInfo {
+                                               
gomega.Expect(partInfo.Id).NotTo(gomega.Equal(uint64(2)),
+                                                       "No chunks after the 
failure should contain part 2 data")
+                                       }
+                               }
+                       }
+
+                       // Verify that part 1 and part 3 data were successfully 
sent
+                       var foundPart1, foundPart3 bool
+                       for _, chunk := range receivedChunks {
+                               for _, partInfo := range chunk.PartsInfo {
+                                       if partInfo.Id == 1 {
+                                               foundPart1 = true
+                                       }
+                                       if partInfo.Id == 3 {
+                                               foundPart3 = true
+                                       }
+                               }
+                       }
+                       gomega.Expect(foundPart1).To(gomega.BeTrue(), "Part 1 
data should be sent")
+                       gomega.Expect(foundPart3).To(gomega.BeTrue(), "Part 3 
data should be sent")
+               })
+       })
 })
 
+type recordingMockServer struct {
+       clusterv1.UnimplementedChunkedSyncServiceServer
+       receivedChunks *[]*clusterv1.SyncPartRequest
+       mu             *sync.Mutex
+}
+
+func (r *recordingMockServer) SyncPart(stream 
clusterv1.ChunkedSyncService_SyncPartServer) error {
+       for {
+               req, err := stream.Recv()
+               if errors.Is(err, io.EOF) {
+                       break
+               }
+               if err != nil {
+                       return err
+               }
+
+               r.mu.Lock()
+               *r.receivedChunks = append(*r.receivedChunks, req)
+               r.mu.Unlock()
+
+               if req.GetCompletion() != nil {
+                       return stream.Send(&clusterv1.SyncPartResponse{
+                               SessionId:  req.SessionId,
+                               ChunkIndex: req.ChunkIndex,
+                               Status:     
clusterv1.SyncStatus_SYNC_STATUS_SYNC_COMPLETE,
+                               SyncResult: &clusterv1.SyncResult{
+                                       Success: true,
+                               },
+                       })
+               }
+
+               resp := &clusterv1.SyncPartResponse{
+                       SessionId:  req.SessionId,
+                       ChunkIndex: req.ChunkIndex,
+                       Status:     
clusterv1.SyncStatus_SYNC_STATUS_CHUNK_RECEIVED,
+               }
+
+               if err := stream.Send(resp); err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
 func createFileInfo(name string, data []byte) queue.FileInfo {
        var buf bytes.Buffer
        if _, err := buf.Write(data); err != nil {
@@ -240,3 +441,36 @@ func createFileInfo(name string, data []byte) 
queue.FileInfo {
                Reader: buf.SequentialRead(),
        }
 }
+
+type failingReader struct {
+       data        []byte
+       offset      int
+       failAfter   int // fail after this many bytes have been read
+       readCount   int
+       alreadyRead int
+}
+
+func (f *failingReader) Read(p []byte) (n int, err error) {
+       if f.alreadyRead >= f.failAfter && f.failAfter > 0 {
+               return 0, fmt.Errorf("simulated read failure after %d bytes", 
f.alreadyRead)
+       }
+
+       if f.offset >= len(f.data) {
+               return 0, io.EOF
+       }
+
+       n = copy(p, f.data[f.offset:])
+       f.offset += n
+       f.alreadyRead += n
+       f.readCount++
+
+       return n, nil
+}
+
+func (f *failingReader) Close() error {
+       return nil
+}
+
+func (f *failingReader) Path() string {
+       return "failing-reader"
+}
diff --git a/banyand/queue/pub/pub_tls_test.go 
b/banyand/queue/pub/pub_tls_test.go
index 72dde4fa..0a708a9b 100644
--- a/banyand/queue/pub/pub_tls_test.go
+++ b/banyand/queue/pub/pub_tls_test.go
@@ -21,9 +21,10 @@ package pub
 import (
        "context"
        "crypto/tls"
+       "errors"
+       "io"
        "net"
        "path/filepath"
-       "testing"
 
        "github.com/onsi/ginkgo/v2"
        "github.com/onsi/gomega"
@@ -35,14 +36,75 @@ import (
 
        "github.com/apache/skywalking-banyandb/api/data"
        clusterv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
        streamv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
        "github.com/apache/skywalking-banyandb/pkg/bus"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "google.golang.org/protobuf/proto"
 )
 
-func TestPubTLS(t *testing.T) {
-       gomega.RegisterFailHandler(ginkgo.Fail)
-       ginkgo.RunSpecs(t, "queue‑pub TLS dial‑out Suite")
+type mockService struct {
+       clusterv1.UnimplementedServiceServer
+}
+
+func (s *mockService) Send(stream clusterv1.Service_SendServer) (err error) {
+       var topic bus.Topic
+       var first *clusterv1.SendRequest
+       var batchMod bool
+
+       sendResp := func() {
+               f := data.TopicResponseMap[topic]
+               var body []byte
+               var errMarshal error
+               if f == nil {
+                       body = first.Body
+               } else {
+                       body, errMarshal = proto.Marshal(f())
+                       if errMarshal != nil {
+                               panic(errMarshal)
+                       }
+               }
+
+               res := &clusterv1.SendResponse{
+                       Status: modelv1.Status_STATUS_SUCCEED,
+                       Body:   body,
+               }
+               err = stream.Send(res)
+       }
+
+       var req *clusterv1.SendRequest
+       for {
+               req, err = stream.Recv()
+               if err != nil {
+                       if errors.Is(err, io.EOF) {
+                               if batchMod {
+                                       sendResp()
+                               }
+                       }
+                       return err
+               }
+
+               if first == nil {
+                       first = req
+                       batchMod = req.BatchMod
+               }
+
+               var ok bool
+               if topic, ok = data.TopicMap[req.Topic]; !ok {
+                       continue
+               }
+
+               if batchMod {
+                       continue
+               }
+
+               sendResp()
+               if err != nil {
+                       return
+               }
+       }
 }
 
 func tlsServer(addr string) func() {
@@ -69,14 +131,15 @@ func tlsServer(addr string) func() {
 }
 
 func newTLSPub() *pub {
-       p := NewWithoutMetadata().(*pub)
+       p := New(nil, databasev1.Role_ROLE_DATA).(*pub)
        p.tlsEnabled = true
        p.caCertPath = filepath.Join("testdata", "certs", "ca.crt")
+       p.log = logger.GetLogger("server-queue-pub-data")
        
gomega.Expect(p.PreRun(context.Background())).ShouldNot(gomega.HaveOccurred())
        return p
 }
 
-var _ = ginkgo.FDescribe("Broadcast over one-way TLS", func() {
+var _ = ginkgo.Describe("Broadcast over one-way TLS", func() {
        var before []gleak.Goroutine
 
        ginkgo.BeforeEach(func() {
@@ -87,7 +150,7 @@ var _ = ginkgo.FDescribe("Broadcast over one-way TLS", 
func() {
                        ShouldNot(gleak.HaveLeaked(before))
        })
 
-       ginkgo.FIt("establishes TLS and broadcasts a QueryRequest", func() {
+       ginkgo.It("establishes TLS and broadcasts a QueryRequest", func() {
                addr := getAddress()
                stop := tlsServer(addr)
                defer stop()
@@ -115,8 +178,5 @@ var _ = ginkgo.FDescribe("Broadcast over one-way TLS", 
func() {
                msgs, err := futures[0].GetAll()
                gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
                gomega.Expect(msgs).Should(gomega.HaveLen(1))
-
-               _, ok := msgs[0].Data().(*streamv1.QueryResponse)
-               gomega.Expect(ok).To(gomega.BeTrue())
        })
 })
diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go
index 7193f9a9..defb57e5 100644
--- a/banyand/queue/queue.go
+++ b/banyand/queue/queue.go
@@ -19,6 +19,7 @@ package queue
 
 import (
        "context"
+       "sync"
        "time"
 
        "github.com/apache/skywalking-banyandb/api/common"
@@ -158,14 +159,53 @@ type FileData struct {
 
 // SyncResult represents the result of a sync operation.
 type SyncResult struct {
-       SessionID    string
-       ErrorMessage string
-       TotalBytes   uint64
-       DurationMs   int64
-       ChunksCount  uint32
-       PartsCount   uint32
-       Success      bool
+       SessionID   string
+       FailedParts []FailedPart
+       TotalBytes  uint64
+       DurationMs  int64
+       ChunksCount uint32
+       PartsCount  uint32
+       Success     bool
+}
+
+// FailedPart contains information about a part that failed to sync.
+type FailedPart struct {
+       PartID string
+       Error  string
 }
 
 // SyncMetadata is an alias for clusterv1.SyncMetadata.
 type SyncMetadata = clusterv1.SyncMetadata
+
+// ChunkedSyncFailureInjector allows tests to deterministically inject 
failures into
+// chunked sync operations. It is only intended for test code.
+type ChunkedSyncFailureInjector interface {
+       // BeforeSync returns whether the current SyncStreamingParts invocation 
should
+       // short-circuit. The returned slice provides the failed parts that 
should be
+       // reported back to the caller when a failure is injected.
+       BeforeSync(parts []StreamingPartData) (bool, []FailedPart, error)
+}
+
+var (
+       chunkedSyncFailureInjectorMu sync.RWMutex
+       chunkedSyncFailureInjector   ChunkedSyncFailureInjector
+)
+
+// RegisterChunkedSyncFailureInjector registers a failure injector for testing.
+func RegisterChunkedSyncFailureInjector(injector ChunkedSyncFailureInjector) {
+       chunkedSyncFailureInjectorMu.Lock()
+       chunkedSyncFailureInjector = injector
+       chunkedSyncFailureInjectorMu.Unlock()
+}
+
+// ClearChunkedSyncFailureInjector removes the registered failure injector.
+func ClearChunkedSyncFailureInjector() {
+       RegisterChunkedSyncFailureInjector(nil)
+}
+
+// GetChunkedSyncFailureInjector returns the currently registered failure 
injector.
+func GetChunkedSyncFailureInjector() ChunkedSyncFailureInjector {
+       chunkedSyncFailureInjectorMu.RLock()
+       defer chunkedSyncFailureInjectorMu.RUnlock()
+       return chunkedSyncFailureInjector
+}
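
Both chunked sync clients consult this injector before doing any work (see the local.go and chunked_sync.go hunks above), so an integration test can force deterministic failures without touching the network. A hedged sketch of a test-only injector follows; the failOddParts type is made up, but the queue API it uses is exactly the one introduced above:

// Hypothetical test injector: reports parts with odd IDs as failed and
// short-circuits the sync attempt when any such part is present.
package injectorsketch

import (
	"fmt"

	"github.com/apache/skywalking-banyandb/banyand/queue"
)

type failOddParts struct{}

func (failOddParts) BeforeSync(parts []queue.StreamingPartData) (bool, []queue.FailedPart, error) {
	var failed []queue.FailedPart
	for _, p := range parts {
		if p.ID%2 == 1 {
			failed = append(failed, queue.FailedPart{
				PartID: fmt.Sprint(p.ID),
				Error:  "injected failure",
			})
		}
	}
	// Returning true makes the client skip the real transfer and surface the
	// listed parts as failed.
	return len(failed) > 0, failed, nil
}

// In a test:
//   queue.RegisterChunkedSyncFailureInjector(failOddParts{})
//   defer queue.ClearChunkedSyncFailureInjector()
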
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index bca74b83..a5b3873a 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -47,13 +47,14 @@ const (
 )
 
 type option struct {
-       mergePolicy              *mergePolicy
-       protector                protector.Memory
-       tire2Client              queue.Client
-       seriesCacheMaxSize       run.Bytes
-       flushTimeout             time.Duration
-       elementIndexFlushTimeout time.Duration
-       syncInterval             time.Duration
+       mergePolicy                  *mergePolicy
+       protector                    protector.Memory
+       tire2Client                  queue.Client
+       seriesCacheMaxSize           run.Bytes
+       flushTimeout                 time.Duration
+       elementIndexFlushTimeout     time.Duration
+       syncInterval                 time.Duration
+       failedPartsMaxTotalSizeBytes uint64
 }
 
 // Query allow to retrieve elements in a series of streams.
diff --git a/banyand/stream/svc_liaison.go b/banyand/stream/svc_liaison.go
index 637eb8cc..9d937dbc 100644
--- a/banyand/stream/svc_liaison.go
+++ b/banyand/stream/svc_liaison.go
@@ -45,19 +45,20 @@ import (
 )
 
 type liaison struct {
-       pm                  protector.Memory
-       metadata            metadata.Repo
-       pipeline            queue.Server
-       omr                 observability.MetricsRegistry
-       lfs                 fs.FileSystem
-       writeListener       bus.MessageListener
-       dataNodeSelector    node.Selector
-       l                   *logger.Logger
-       schemaRepo          schemaRepo
-       dataPath            string
-       root                string
-       option              option
-       maxDiskUsagePercent int
+       pm                        protector.Memory
+       metadata                  metadata.Repo
+       pipeline                  queue.Server
+       omr                       observability.MetricsRegistry
+       lfs                       fs.FileSystem
+       writeListener             bus.MessageListener
+       dataNodeSelector          node.Selector
+       l                         *logger.Logger
+       schemaRepo                schemaRepo
+       dataPath                  string
+       root                      string
+       option                    option
+       maxDiskUsagePercent       int
+       failedPartsMaxSizePercent int
 }
 
 func (s *liaison) Stream(metadata *commonv1.Metadata) (Stream, error) {
@@ -83,6 +84,10 @@ func (s *liaison) FlagSet() *run.FlagSet {
        flagS.DurationVar(&s.option.flushTimeout, "stream-flush-timeout", 
defaultFlushTimeout, "the memory data timeout of stream")
        flagS.IntVar(&s.maxDiskUsagePercent, "stream-max-disk-usage-percent", 
95, "the maximum disk usage percentage allowed")
        flagS.DurationVar(&s.option.syncInterval, "stream-sync-interval", 
defaultSyncInterval, "the periodic sync interval for stream data")
+       flagS.IntVar(&s.failedPartsMaxSizePercent, 
"failed-parts-max-size-percent", 10,
+               "percentage of BanyanDB's allowed disk usage allocated to 
failed parts storage. "+
+                       "Calculated as: totalDisk * 
stream-max-disk-usage-percent * failed-parts-max-size-percent / 10000. "+
+                       "Set to 0 to disable copying failed parts. Valid range: 
0-100")
        return flagS
 }
 
@@ -96,6 +101,9 @@ func (s *liaison) Validate() error {
        if s.maxDiskUsagePercent > 100 {
                return errors.New("stream-max-disk-usage-percent must be less 
than or equal to 100")
        }
+       if s.failedPartsMaxSizePercent < 0 || s.failedPartsMaxSizePercent > 100 
{
+               return errors.New("failed-parts-max-size-percent must be 
between 0 and 100")
+       }
        return nil
 }
 
@@ -123,6 +131,22 @@ func (s *liaison) PreRun(ctx context.Context) error {
        if !strings.HasPrefix(filepath.VolumeName(s.dataPath), 
filepath.VolumeName(path)) {
                observability.UpdatePath(s.dataPath)
        }
+       s.lfs.MkdirIfNotExist(s.dataPath, storage.DirPerm)
+
+       s.option.failedPartsMaxTotalSizeBytes = 0
+       if s.failedPartsMaxSizePercent > 0 {
+               totalSpace := s.lfs.MustGetTotalSpace(s.dataPath)
+               maxTotalSizeBytes := totalSpace * uint64(s.maxDiskUsagePercent) 
/ 100
+               maxTotalSizeBytes = maxTotalSizeBytes * 
uint64(s.failedPartsMaxSizePercent) / 100
+               s.option.failedPartsMaxTotalSizeBytes = maxTotalSizeBytes
+               s.l.Info().
+                       Uint64("maxFailedPartsBytes", maxTotalSizeBytes).
+                       Int("failedPartsMaxSizePercent", 
s.failedPartsMaxSizePercent).
+                       Int("maxDiskUsagePercent", s.maxDiskUsagePercent).
+                       Msg("configured failed parts storage limit")
+       } else {
+               s.l.Info().Msg("failed parts storage limit disabled (percent 
set to 0)")
+       }
        streamDataNodeRegistry := 
grpc.NewClusterNodeRegistry(data.TopicStreamPartSync, s.option.tire2Client, 
s.dataNodeSelector)
        s.schemaRepo = newLiaisonSchemaRepo(s.dataPath, s, 
streamDataNodeRegistry)
        s.writeListener = setUpWriteQueueCallback(s.l, &s.schemaRepo, 
s.maxDiskUsagePercent, s.option.tire2Client)
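
PreRun turns the two percentage flags into an absolute byte budget for the failed-parts directory. A worked example of the same arithmetic, with made-up numbers (the helper name is hypothetical):

// Hypothetical helper mirroring the PreRun calculation above:
// totalSpace * maxDiskUsagePercent/100 * failedPartsMaxSizePercent/100.
package budgetsketch

func failedPartsBudget(totalSpace uint64, maxDiskUsagePercent, failedPartsMaxSizePercent int) uint64 {
	allowed := totalSpace * uint64(maxDiskUsagePercent) / 100
	return allowed * uint64(failedPartsMaxSizePercent) / 100
}

// Example: a 1 TiB volume with the defaults (95% disk usage cap, 10% share for
// failed parts) yields failedPartsBudget(1<<40, 95, 10) == 104_453_604_638
// bytes, i.e. about 9.5% of the volume.
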
diff --git a/banyand/stream/syncer.go b/banyand/stream/syncer.go
index d37235e4..37d3040c 100644
--- a/banyand/stream/syncer.go
+++ b/banyand/stream/syncer.go
@@ -20,9 +20,11 @@ package stream
 import (
        "context"
        "fmt"
+       "strconv"
        "time"
 
        "github.com/apache/skywalking-banyandb/api/data"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
        "github.com/apache/skywalking-banyandb/pkg/watcher"
@@ -36,6 +38,7 @@ func (tst *tsTable) syncLoop(syncCh chan *syncIntroduction, 
flusherNotifier watc
 
        var epoch uint64
        var lastTriggerTime time.Time
+       firstSync := true
 
        ew := flusherNotifier.Add(0, tst.loopCloser.CloseNotify())
        if ew == nil {
@@ -52,7 +55,7 @@ func (tst *tsTable) syncLoop(syncCh chan *syncIntroduction, 
flusherNotifier watc
                        return false
                }
                defer curSnapshot.decRef()
-               if curSnapshot.epoch != epoch {
+               if firstSync || curSnapshot.epoch != epoch {
                        tst.incTotalSyncLoopStarted(1)
                        defer tst.incTotalSyncLoopFinished(1)
                        var err error
@@ -66,6 +69,7 @@ func (tst *tsTable) syncLoop(syncCh chan *syncIntroduction, 
flusherNotifier watc
                                return false
                        }
                        epoch = curSnapshot.epoch
+                       firstSync = false
                        lastTriggerTime = triggerTime
                        if tst.currentEpoch() != epoch {
                                return false
@@ -151,30 +155,100 @@ func createPartFileReaders(part *part) 
([]queue.FileInfo, func()) {
        }
 }
 
+// syncPartsToNodesHelper syncs the given parts to the provided nodes and
+// returns the parts that failed. It is shared by the initial sync and the
+// retry attempts.
+func (tst *tsTable) syncPartsToNodesHelper(ctx context.Context, parts []*part, 
nodes []string, chunkSize uint32, releaseFuncs *[]func()) ([]queue.FailedPart, 
error) {
+       var allFailedParts []queue.FailedPart
+
+       for _, node := range nodes {
+               // Get chunked sync client for this node
+               chunkedClient, err := 
tst.option.tire2Client.NewChunkedSyncClient(node, chunkSize)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to create chunked sync 
client for node %s: %w", node, err)
+               }
+               defer chunkedClient.Close()
+
+               // Prepare streaming parts data
+               var streamingParts []queue.StreamingPartData
+               for _, part := range parts {
+                       files, release := createPartFileReaders(part)
+                       *releaseFuncs = append(*releaseFuncs, release)
+                       streamingParts = append(streamingParts, 
queue.StreamingPartData{
+                               ID:                    part.partMetadata.ID,
+                               Group:                 tst.group,
+                               ShardID:               uint32(tst.shardID),
+                               Topic:                 
data.TopicStreamPartSync.String(),
+                               Files:                 files,
+                               CompressedSizeBytes:   
part.partMetadata.CompressedSizeBytes,
+                               UncompressedSizeBytes: 
part.partMetadata.UncompressedSizeBytes,
+                               TotalCount:            
part.partMetadata.TotalCount,
+                               BlocksCount:           
part.partMetadata.BlocksCount,
+                               MinTimestamp:          
part.partMetadata.MinTimestamp,
+                               MaxTimestamp:          
part.partMetadata.MaxTimestamp,
+                               PartType:              PartTypeCore,
+                       })
+               }
+
+               result, err := chunkedClient.SyncStreamingParts(ctx, 
streamingParts)
+               if err != nil {
+                       return nil, fmt.Errorf("failed to sync streaming parts 
to node %s: %w", node, err)
+               }
+
+               tst.incTotalSyncLoopBytes(result.TotalBytes)
+               if dl := tst.l.Debug(); dl.Enabled() {
+                       dl.
+                               Str("node", node).
+                               Str("session", result.SessionID).
+                               Uint64("bytes", result.TotalBytes).
+                               Int64("duration_ms", result.DurationMs).
+                               Uint32("chunks", result.ChunksCount).
+                               Uint32("parts", result.PartsCount).
+                               Int("failed_parts", len(result.FailedParts)).
+                               Msg("chunked sync completed")
+               }
+
+               allFailedParts = append(allFailedParts, result.FailedParts...)
+       }
+
+       return allFailedParts, nil
+}
+
 func (tst *tsTable) syncSnapshot(curSnapshot *snapshot, syncCh chan 
*syncIntroduction) error {
        startTime := time.Now()
        defer func() {
                tst.incTotalSyncLoopLatency(time.Since(startTime).Seconds())
        }()
 
-       // Get all parts from the current snapshot
-       var partsToSync []*part
-       for _, pw := range curSnapshot.parts {
-               if pw.mp == nil && pw.p.partMetadata.TotalCount > 0 {
-                       partsToSync = append(partsToSync, pw.p)
-               }
-       }
-
+       partsToSync := tst.collectPartsToSync(curSnapshot)
        if len(partsToSync) == 0 {
                return nil
        }
+
        nodes := tst.getNodes()
        if len(nodes) == 0 {
                return fmt.Errorf("no nodes to sync parts")
        }
 
-       // Sort parts from old to new (by part ID)
-       // Parts with lower IDs are older
+       tst.sortPartsByID(partsToSync)
+
+       if err := tst.executeSyncWithRetry(partsToSync, nodes); err != nil {
+               return err
+       }
+
+       return tst.sendSyncIntroduction(partsToSync, syncCh)
+}
+
+func (tst *tsTable) collectPartsToSync(curSnapshot *snapshot) []*part {
+       var partsToSync []*part
+       for _, pw := range curSnapshot.parts {
+               if pw.mp == nil && pw.p.partMetadata.TotalCount > 0 {
+                       partsToSync = append(partsToSync, pw.p)
+               }
+       }
+       return partsToSync
+}
+
+func (tst *tsTable) sortPartsByID(partsToSync []*part) {
        for i := 0; i < len(partsToSync); i++ {
                for j := i + 1; j < len(partsToSync); j++ {
                        if partsToSync[i].partMetadata.ID > 
partsToSync[j].partMetadata.ID {
@@ -182,8 +256,12 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot, 
syncCh chan *syncIntrodu
                        }
                }
        }
+}
+
+func (tst *tsTable) executeSyncWithRetry(partsToSync []*part, nodes []string) 
error {
+       failedPartsHandler := storage.NewFailedPartsHandler(tst.fileSystem, 
tst.root, tst.l, tst.option.failedPartsMaxTotalSizeBytes)
+       partsInfo := tst.buildPartsInfoMap(partsToSync)
 
-       // Use chunked sync with streaming for better memory efficiency
        ctx := context.Background()
        releaseFuncs := make([]func(), 0, len(partsToSync))
        defer func() {
@@ -192,66 +270,166 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot, 
syncCh chan *syncIntrodu
                }
        }()
 
+       perNodeFailures := tst.performInitialSync(ctx, partsToSync, nodes, 
&releaseFuncs)
+       if len(perNodeFailures) > 0 {
+               tst.handleFailedPartsRetry(ctx, partsToSync, perNodeFailures, 
partsInfo, failedPartsHandler)
+       }
+
+       return nil
+}
+
+func (tst *tsTable) buildPartsInfoMap(partsToSync []*part) 
map[uint64][]*storage.PartInfo {
+       partsInfo := make(map[uint64][]*storage.PartInfo)
+       for _, part := range partsToSync {
+               partsInfo[part.partMetadata.ID] = []*storage.PartInfo{
+                       {
+                               PartID:     part.partMetadata.ID,
+                               SourcePath: part.path,
+                               PartType:   PartTypeCore,
+                       },
+               }
+       }
+       return partsInfo
+}
+
+func (tst *tsTable) performInitialSync(
+       ctx context.Context, partsToSync []*part, nodes []string, releaseFuncs 
*[]func(),
+) map[string][]queue.FailedPart {
+       perNodeFailures := make(map[string][]queue.FailedPart)
        for _, node := range nodes {
-               // Get chunked sync client for this node
-               chunkedClient, err := 
tst.option.tire2Client.NewChunkedSyncClient(node, 1024*1024)
+               failedParts, err := tst.syncPartsToNodesHelper(ctx, 
partsToSync, []string{node}, 1024*1024, releaseFuncs)
                if err != nil {
-                       return fmt.Errorf("failed to create chunked sync client 
for node %s: %w", node, err)
+                       tst.l.Error().Err(err).Str("node", node).Msg("sync 
error")
+                       // Mark all parts as failed for this node
+                       var allPartsFailed []queue.FailedPart
+                       for _, part := range partsToSync {
+                               allPartsFailed = append(allPartsFailed, 
queue.FailedPart{
+                                       PartID: 
strconv.FormatUint(part.partMetadata.ID, 10),
+                                       Error:  fmt.Sprintf("node %s: %v", 
node, err),
+                               })
+                       }
+                       perNodeFailures[node] = allPartsFailed
+                       continue
                }
-               defer chunkedClient.Close()
+               if len(failedParts) > 0 {
+                       perNodeFailures[node] = failedParts
+               }
+       }
+       return perNodeFailures
+}
 
-               // Prepare streaming parts data for chunked sync
-               var streamingParts []queue.StreamingPartData
+func (tst *tsTable) handleFailedPartsRetry(
+       ctx context.Context, partsToSync []*part, perNodeFailures map[string][]queue.FailedPart,
+       partsInfo map[uint64][]*storage.PartInfo, failedPartsHandler *storage.FailedPartsHandler,
+) {
+       allFailedParts := tst.collectAllFailedParts(perNodeFailures)
+       syncFunc := tst.createRetrySyncFunc(ctx, partsToSync, perNodeFailures)
+
+       permanentlyFailedParts, err := failedPartsHandler.RetryFailedParts(ctx, allFailedParts, partsInfo, syncFunc)
+       if err != nil {
+               tst.l.Warn().Err(err).Msg("error during retry process")
+       }
+       if len(permanentlyFailedParts) > 0 {
+               tst.l.Error().
+                       Uints64("partIDs", permanentlyFailedParts).
+                       Int("count", len(permanentlyFailedParts)).
+                       Msg("parts permanently failed after all retries and have been copied to failed-parts directory")
+       }
+}
+
+func (tst *tsTable) collectAllFailedParts(perNodeFailures map[string][]queue.FailedPart) []queue.FailedPart {
+       allFailedParts := make([]queue.FailedPart, 0)
+       for _, failedParts := range perNodeFailures {
+               allFailedParts = append(allFailedParts, failedParts...)
+       }
+       return allFailedParts
+}
+
+func (tst *tsTable) createRetrySyncFunc(
+       ctx context.Context, partsToSync []*part, perNodeFailures map[string][]queue.FailedPart,
+) func([]uint64) ([]queue.FailedPart, error) {
+       return func(partIDs []uint64) ([]queue.FailedPart, error) {
+               partIDsSet := make(map[uint64]struct{})
+               for _, partID := range partIDs {
+                       partIDsSet[partID] = struct{}{}
+               }
+
+               partsToRetry := tst.filterPartsToRetry(partIDs, partsToSync)
+               if len(partsToRetry) == 0 {
+                       return nil, nil
+               }
+
+               return tst.retryPartsOnFailedNodes(ctx, partIDs, partsToRetry, partIDsSet, perNodeFailures)
+       }
+}
+
+func (tst *tsTable) filterPartsToRetry(partIDs []uint64, partsToSync []*part) []*part {
+       partsToRetry := make([]*part, 0)
+       for _, partID := range partIDs {
                for _, part := range partsToSync {
-                       // Create streaming reader for the part
-                       files, release := createPartFileReaders(part)
-                       releaseFuncs = append(releaseFuncs, release)
+                       if part.partMetadata.ID == partID {
+                               partsToRetry = append(partsToRetry, part)
+                               break
+                       }
+               }
+       }
+       return partsToRetry
+}
 
-                       // Create streaming part sync data
-                       streamingParts = append(streamingParts, queue.StreamingPartData{
-                               ID:                    part.partMetadata.ID,
-                               Group:                 tst.group,
-                               ShardID:               uint32(tst.shardID),
-                               Topic:                 data.TopicStreamPartSync.String(),
-                               Files:                 files,
-                               CompressedSizeBytes:   part.partMetadata.CompressedSizeBytes,
-                               UncompressedSizeBytes: part.partMetadata.UncompressedSizeBytes,
-                               TotalCount:            part.partMetadata.TotalCount,
-                               BlocksCount:           part.partMetadata.BlocksCount,
-                               MinTimestamp:          part.partMetadata.MinTimestamp,
-                               MaxTimestamp:          part.partMetadata.MaxTimestamp,
-                               PartType:              PartTypeCore,
-                       })
+func (tst *tsTable) retryPartsOnFailedNodes(
+       ctx context.Context, partIDs []uint64, partsToRetry []*part,
+       partIDsSet map[uint64]struct{}, perNodeFailures map[string][]queue.FailedPart,
+) ([]queue.FailedPart, error) {
+       retryReleaseFuncs := make([]func(), 0)
+       defer func() {
+               for _, release := range retryReleaseFuncs {
+                       release()
                }
+       }()
 
-               // Sync parts using chunked transfer with streaming
-               result, err := chunkedClient.SyncStreamingParts(ctx, streamingParts)
-               if err != nil {
-                       return fmt.Errorf("failed to sync streaming parts to node %s: %w", node, err)
+       var retryFailedParts []queue.FailedPart
+       for node, nodeFailedParts := range perNodeFailures {
+               if !tst.shouldRetryOnNode(nodeFailedParts, partIDsSet) {
+                       continue
                }
 
-               if !result.Success {
-                       return fmt.Errorf("chunked sync partially failed: %v", result.ErrorMessage)
+               failedParts, err := tst.syncPartsToNodesHelper(ctx, partsToRetry, []string{node}, 1024*1024, &retryReleaseFuncs)
+               if err != nil {
+                       retryFailedParts = append(retryFailedParts, tst.markPartsAsFailed(partIDs, node, err)...)
+                       continue
                }
-               tst.incTotalSyncLoopBytes(result.TotalBytes)
-               if dl := tst.l.Debug(); dl.Enabled() {
-                       dl.
-                               Str("node", node).
-                               Str("session", result.SessionID).
-                               Uint64("bytes", result.TotalBytes).
-                               Int64("duration_ms", result.DurationMs).
-                               Uint32("chunks", result.ChunksCount).
-                               Uint32("parts", result.PartsCount).
-                               Msg("chunked sync completed successfully")
+               retryFailedParts = append(retryFailedParts, failedParts...)
+       }
+
+       return retryFailedParts, nil
+}
+
+func (tst *tsTable) shouldRetryOnNode(nodeFailedParts []queue.FailedPart, partIDsSet map[uint64]struct{}) bool {
+       for _, failedPart := range nodeFailedParts {
+               failedPartID, _ := strconv.ParseUint(failedPart.PartID, 10, 64)
+               if _, exists := partIDsSet[failedPartID]; exists {
+                       return true
                }
        }
+       return false
+}
+
+func (tst *tsTable) markPartsAsFailed(partIDs []uint64, node string, err error) []queue.FailedPart {
+       var failedParts []queue.FailedPart
+       for _, partID := range partIDs {
+               failedParts = append(failedParts, queue.FailedPart{
+                       PartID: strconv.FormatUint(partID, 10),
+                       Error:  fmt.Sprintf("node %s: %v", node, err),
+               })
+       }
+       return failedParts
+}
 
-       // Construct syncIntroduction to remove synced parts from snapshot
+func (tst *tsTable) sendSyncIntroduction(partsToSync []*part, syncCh chan *syncIntroduction) error {
        si := generateSyncIntroduction()
        defer releaseSyncIntroduction(si)
        si.applied = make(chan struct{})
 
-       // Mark all synced parts for removal
        for _, part := range partsToSync {
                si.synced[part.partMetadata.ID] = struct{}{}
        }
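
The new stream syncer flow above splits a sync pass into an initial per-node attempt (performInitialSync), a per-node failure map, and a retry closure handed to storage.FailedPartsHandler.RetryFailedParts. The handler itself lives in banyand/internal/storage/failed_parts_handler.go and is not reproduced in this excerpt, so the following is only a minimal sketch of the backoff-retry contract it is built around; retryWithBackoff, failedPart, maxRetries, and baseDelay are hypothetical names, not the actual API.

    package main

    import (
        "fmt"
        "strconv"
        "time"
    )

    // failedPart mirrors the shape of queue.FailedPart for illustration only.
    type failedPart struct {
        PartID string
        Error  string
    }

    // retryWithBackoff is a hypothetical sketch of the contract used above:
    // syncFunc receives the part IDs that are still failing and reports back the
    // parts that failed again; parts that stop appearing are considered synced.
    func retryWithBackoff(partIDs []uint64, maxRetries int, baseDelay time.Duration,
        syncFunc func([]uint64) ([]failedPart, error),
    ) []failedPart {
        remaining := append([]uint64(nil), partIDs...)
        var failures []failedPart
        for attempt := 1; attempt <= maxRetries && len(remaining) > 0; attempt++ {
            // Exponential backoff between attempts: baseDelay, 2x, 4x, ...
            time.Sleep(baseDelay << uint(attempt-1))
            result, err := syncFunc(remaining)
            if err != nil {
                // Treat a transport-level error as "every remaining part still failing".
                result = nil
                for _, id := range remaining {
                    result = append(result, failedPart{PartID: strconv.FormatUint(id, 10), Error: err.Error()})
                }
            }
            failures = result
            remaining = remaining[:0]
            for _, f := range result {
                id, _ := strconv.ParseUint(f.PartID, 10, 64)
                remaining = append(remaining, id)
            }
        }
        return failures
    }

    func main() {
        attempts := 0
        out := retryWithBackoff([]uint64{1, 2}, 3, 10*time.Millisecond, func(ids []uint64) ([]failedPart, error) {
            attempts++
            if attempts < 2 {
                return []failedPart{{PartID: "2", Error: "injected"}}, nil
            }
            return nil, nil // everything synced on the second retry
        })
        fmt.Println("still failing:", out)
    }

The key property is that each attempt only re-sends the parts that are still failing, so the retry set shrinks toward the permanently failed set that is eventually copied to the failed-parts directory.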
diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go
index 9384c4b4..4ce74d7a 100644
--- a/banyand/stream/tstable.go
+++ b/banyand/stream/tstable.go
@@ -230,6 +230,9 @@ func initTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position,
                        if ee[i].Name() == inverted.ExternalSegmentTempDirName {
                                continue
                        }
+                       if ee[i].Name() == storage.FailedPartsDirName {
+                               continue
+                       }
                        p, err := parseEpoch(ee[i].Name())
                        if err != nil {
                                l.Info().Err(err).Msg("cannot parse part file name. skip and delete it")
diff --git a/banyand/trace/handoff_controller.go b/banyand/trace/handoff_controller.go
index 4ba09c2e..2b4d5706 100644
--- a/banyand/trace/handoff_controller.go
+++ b/banyand/trace/handoff_controller.go
@@ -1246,7 +1246,7 @@ func (hc *handoffController) sendPartToNode(ctx context.Context, nodeAddr string
        }
 
        if !result.Success {
-               return fmt.Errorf("sync failed: %s", result.ErrorMessage)
+               return fmt.Errorf("sync failed: %s", result.FailedParts)
        }
 
        hc.l.Debug().
diff --git a/banyand/trace/svc_liaison.go b/banyand/trace/svc_liaison.go
index 0a5de970..bfb1204e 100644
--- a/banyand/trace/svc_liaison.go
+++ b/banyand/trace/svc_liaison.go
@@ -46,22 +46,23 @@ import (
 )
 
 type liaison struct {
-       metadata              metadata.Repo
-       pipeline              queue.Server
-       omr                   observability.MetricsRegistry
-       lfs                   fs.FileSystem
-       writeListener         bus.MessageListener
-       dataNodeSelector      node.Selector
-       pm                    protector.Memory
-       handoffCtrl           *handoffController
-       l                     *logger.Logger
-       schemaRepo            schemaRepo
-       dataPath              string
-       root                  string
-       dataNodeList          []string
-       option                option
-       maxDiskUsagePercent   int
-       handoffMaxSizePercent int
+       metadata                  metadata.Repo
+       pipeline                  queue.Server
+       omr                       observability.MetricsRegistry
+       lfs                       fs.FileSystem
+       writeListener             bus.MessageListener
+       dataNodeSelector          node.Selector
+       pm                        protector.Memory
+       handoffCtrl               *handoffController
+       l                         *logger.Logger
+       schemaRepo                schemaRepo
+       dataPath                  string
+       root                      string
+       dataNodeList              []string
+       option                    option
+       maxDiskUsagePercent       int
+       handoffMaxSizePercent     int
+       failedPartsMaxSizePercent int
 }
 
 var _ Service = (*liaison)(nil)
@@ -100,6 +101,10 @@ func (l *liaison) FlagSet() *run.FlagSet {
                        "Calculated as: totalDisk * 
trace-max-disk-usage-percent * handoff-max-size-percent / 10000. "+
                        "Example: 100GB disk with 95% max usage and 10% handoff 
= 9.5GB; 50% handoff = 47.5GB. "+
                        "Valid range: 0-100")
+       fs.IntVar(&l.failedPartsMaxSizePercent, 
"failed-parts-max-size-percent", 10,
+               "percentage of BanyanDB's allowed disk usage allocated to 
failed parts storage. "+
+                       "Calculated as: totalDisk * 
trace-max-disk-usage-percent * failed-parts-max-size-percent / 10000. "+
+                       "Set to 0 to disable copying failed parts. Valid range: 
0-100")
        return fs
 }
 
@@ -118,6 +123,10 @@ func (l *liaison) Validate() error {
                        l.handoffMaxSizePercent)
        }
 
+       if l.failedPartsMaxSizePercent < 0 || l.failedPartsMaxSizePercent > 100 {
+               return fmt.Errorf("invalid failed-parts-max-size-percent: %d%%. Must be between 0 and 100", l.failedPartsMaxSizePercent)
+       }
+
        return nil
 }
 
@@ -142,8 +151,23 @@ func (l *liaison) PreRun(ctx context.Context) error {
        if l.dataPath == "" {
                l.dataPath = filepath.Join(path, storage.DataDir)
        }
+       l.lfs.MkdirIfNotExist(l.dataPath, storage.DirPerm)
 
        traceDataNodeRegistry := grpc.NewClusterNodeRegistry(data.TopicTracePartSync, l.option.tire2Client, l.dataNodeSelector)
+       l.option.failedPartsMaxTotalSizeBytes = 0
+       if l.failedPartsMaxSizePercent > 0 {
+               totalSpace := l.lfs.MustGetTotalSpace(l.dataPath)
+               maxTotalSizeBytes := totalSpace * uint64(l.maxDiskUsagePercent) / 100
+               maxTotalSizeBytes = maxTotalSizeBytes * uint64(l.failedPartsMaxSizePercent) / 100
+               l.option.failedPartsMaxTotalSizeBytes = maxTotalSizeBytes
+               l.l.Info().
+                       Uint64("maxFailedPartsBytes", maxTotalSizeBytes).
+                       Int("failedPartsMaxSizePercent", l.failedPartsMaxSizePercent).
+                       Int("maxDiskUsagePercent", l.maxDiskUsagePercent).
+                       Msg("configured failed parts storage limit")
+       } else {
+               l.l.Info().Msg("failed parts storage limit disabled (percent set to 0)")
+       }
        // Initialize handoff controller if data nodes are configured
        l.l.Info().Strs("dataNodeList", l.dataNodeList).Int("maxSizePercent", l.handoffMaxSizePercent).
                Msg("handoff configuration")
@@ -153,7 +177,6 @@ func (l *liaison) PreRun(ctx context.Context) error {
                // Example: 100GB disk, 95% max usage, 10% handoff = 100 * 95 * 10 / 10000 = 9.5GB
                maxSize := 0
                if l.handoffMaxSizePercent > 0 {
-                       l.lfs.MkdirIfNotExist(l.dataPath, storage.DirPerm)
                        totalSpace := l.lfs.MustGetTotalSpace(l.dataPath)
                        // Divide after each multiplication to avoid overflow with large disk capacities
                        maxSizeBytes := totalSpace * uint64(l.maxDiskUsagePercent) / 100 * uint64(l.handoffMaxSizePercent) / 100
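
The PreRun change above sizes the failed-parts budget by dividing after each multiplication, the same overflow-avoidance trick already used for the handoff budget. A minimal sketch of that arithmetic, using the illustrative numbers from the flag help text (a hypothetical failedPartsBudget helper, not the actual code):

    package main

    import "fmt"

    // failedPartsBudget mirrors the arithmetic in PreRun: divide after each
    // multiplication so totalDisk * 95 * 10 cannot overflow uint64 on very large disks.
    func failedPartsBudget(totalDiskBytes uint64, maxDiskUsagePercent, failedPartsMaxSizePercent int) uint64 {
        b := totalDiskBytes * uint64(maxDiskUsagePercent) / 100
        return b * uint64(failedPartsMaxSizePercent) / 100
    }

    func main() {
        // Example from the flag help text: 100GB disk, 95% max usage, 10% failed-parts
        // => 100GB * 95 * 10 / 10000 = 9.5GB.
        const gb = uint64(1) << 30
        fmt.Println(failedPartsBudget(100*gb, 95, 10)) // 10200547328 bytes (~9.5GB)
    }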
diff --git a/banyand/trace/syncer.go b/banyand/trace/syncer.go
index 02d3ded7..eb942b10 100644
--- a/banyand/trace/syncer.go
+++ b/banyand/trace/syncer.go
@@ -22,13 +22,15 @@ import (
        "fmt"
        "path/filepath"
        "sort"
+       "strconv"
        "time"
 
        "github.com/apache/skywalking-banyandb/api/data"
+       "github.com/apache/skywalking-banyandb/banyand/internal/sidx"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
        "github.com/apache/skywalking-banyandb/banyand/queue"
        "github.com/apache/skywalking-banyandb/pkg/bytes"
        "github.com/apache/skywalking-banyandb/pkg/compress/zstd"
-       "github.com/apache/skywalking-banyandb/pkg/logger"
        "github.com/apache/skywalking-banyandb/pkg/watcher"
 )
 
@@ -197,12 +199,15 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot, syncCh chan *syncIntrodu
                return nil
        }
 
+       // Initialize failed parts handler
+       failedPartsHandler := storage.NewFailedPartsHandler(tst.fileSystem, tst.root, tst.l, tst.option.failedPartsMaxTotalSizeBytes)
+
        // Execute sync operation
-       if err := tst.executeSyncOperation(partsToSync, partIDsToSync); err != nil {
+       if err := tst.executeSyncOperation(partsToSync, partIDsToSync, failedPartsHandler); err != nil {
                return err
        }
 
-       // Handle sync introductions
+       // Handle sync introductions (includes both successful and permanently failed parts)
        return tst.handleSyncIntroductions(partsToSync, syncCh)
 }
 
@@ -226,49 +231,33 @@ func (tst *tsTable) needToSync(partsToSync []*part) bool {
        return len(partsToSync) > 0 && len(nodes) > 0
 }
 
-// executeSyncOperation performs the actual synchronization of parts to nodes.
-func (tst *tsTable) executeSyncOperation(partsToSync []*part, partIDsToSync map[uint64]struct{}) error {
-       sort.Slice(partsToSync, func(i, j int) bool {
-               return partsToSync[i].partMetadata.ID < partsToSync[j].partMetadata.ID
-       })
+// syncPartsToNodesHelper syncs given parts to all nodes and returns failed parts.
+// This helper is used for both initial sync and retry attempts.
+func (tst *tsTable) syncPartsToNodesHelper(
+       ctx context.Context, parts []*part, partIDsMap map[uint64]struct{},
+       nodes []string, sidxMap map[string]sidx.SIDX, releaseFuncs *[]func(),
+) ([]queue.FailedPart, error) {
+       var allFailedParts []queue.FailedPart
 
-       ctx := context.Background()
-       releaseFuncs := make([]func(), 0, len(partsToSync))
-       defer func() {
-               for _, release := range releaseFuncs {
-                       release()
-               }
-       }()
-
-       nodes := tst.getNodes()
-       if tst.loopCloser != nil && tst.loopCloser.Closed() {
-               return errClosed
-       }
-       sidxMap := tst.getAllSidx()
        for _, node := range nodes {
                if tst.loopCloser != nil && tst.loopCloser.Closed() {
-                       return errClosed
+                       return allFailedParts, errClosed
                }
-               // Prepare all streaming parts data
+
+               // Prepare streaming parts data
                streamingParts := make([]queue.StreamingPartData, 0)
+
                // Add sidx streaming parts
                for name, sidx := range sidxMap {
-                       sidxStreamingParts, sidxReleaseFuncs := 
sidx.StreamingParts(partIDsToSync, tst.group, uint32(tst.shardID), name)
-                       if len(sidxStreamingParts) != len(partIDsToSync) {
-                               logger.Panicf("sidx streaming parts count 
mismatch: %d != %d", len(sidxStreamingParts), len(partIDsToSync))
-                               return nil
-                       }
+                       sidxStreamingParts, sidxReleaseFuncs := 
sidx.StreamingParts(partIDsMap, tst.group, uint32(tst.shardID), name)
                        streamingParts = append(streamingParts, 
sidxStreamingParts...)
-                       releaseFuncs = append(releaseFuncs, sidxReleaseFuncs...)
-               }
-               if len(streamingParts) != len(partIDsToSync)*len(sidxMap) {
-                       logger.Panicf("streaming parts count mismatch: %d != 
%d", len(streamingParts), len(partIDsToSync)*len(sidxMap))
-                       return nil
+                       *releaseFuncs = append(*releaseFuncs, 
sidxReleaseFuncs...)
                }
-               // Create streaming parts from core trace parts.
-               for _, part := range partsToSync {
+
+               // Create streaming parts from core trace parts
+               for _, part := range parts {
                        files, release := createPartFileReaders(part)
-                       releaseFuncs = append(releaseFuncs, release)
+                       *releaseFuncs = append(*releaseFuncs, release)
                        streamingParts = append(streamingParts, 
queue.StreamingPartData{
                                ID:                    part.partMetadata.ID,
                                Group:                 tst.group,
@@ -284,32 +273,201 @@ func (tst *tsTable) executeSyncOperation(partsToSync 
[]*part, partIDsToSync map[
                                PartType:              PartTypeCore,
                        })
                }
+
                sort.Slice(streamingParts, func(i, j int) bool {
                        if streamingParts[i].ID == streamingParts[j].ID {
                                return streamingParts[i].PartType < 
streamingParts[j].PartType
                        }
                        return streamingParts[i].ID < streamingParts[j].ID
                })
-               if err := tst.syncStreamingPartsToNode(ctx, node, 
streamingParts); err != nil {
-                       return err
+
+               failedParts, err := tst.syncStreamingPartsToNode(ctx, node, 
streamingParts)
+               if err != nil {
+                       return allFailedParts, err
+               }
+               allFailedParts = append(allFailedParts, failedParts...)
+       }
+
+       return allFailedParts, nil
+}
+
+// executeSyncOperation performs the actual synchronization of parts to nodes.
+func (tst *tsTable) executeSyncOperation(partsToSync []*part, partIDsToSync 
map[uint64]struct{}, failedPartsHandler *storage.FailedPartsHandler) error {
+       sort.Slice(partsToSync, func(i, j int) bool {
+               return partsToSync[i].partMetadata.ID < 
partsToSync[j].partMetadata.ID
+       })
+
+       ctx := context.Background()
+       releaseFuncs := make([]func(), 0, len(partsToSync))
+       defer func() {
+               for _, release := range releaseFuncs {
+                       release()
+               }
+       }()
+
+       nodes := tst.getNodes()
+       if tst.loopCloser != nil && tst.loopCloser.Closed() {
+               return errClosed
+       }
+
+       sidxMap := tst.getAllSidx()
+
+       // Build part info map for failed parts handler
+       // Each part ID can have multiple entries (core + SIDX parts)
+       partsInfo := make(map[uint64][]*storage.PartInfo)
+       // Add core parts
+       for _, part := range partsToSync {
+               partsInfo[part.partMetadata.ID] = []*storage.PartInfo{
+                       {
+                               PartID:     part.partMetadata.ID,
+                               SourcePath: part.path,
+                               PartType:   PartTypeCore,
+                       },
+               }
+       }
+       // Add SIDX parts
+       for sidxName, sidxInstance := range sidxMap {
+               partPaths := sidxInstance.PartPaths(partIDsToSync)
+               for partID, partPath := range partPaths {
+                       partsInfo[partID] = append(partsInfo[partID], 
&storage.PartInfo{
+                               PartID:     partID,
+                               SourcePath: partPath,
+                               PartType:   sidxName,
+                       })
+               }
+       }
+
+       // Initial sync attempt - track failures per node
+       perNodeFailures := make(map[string][]queue.FailedPart) // node -> 
failed parts
+       for _, node := range nodes {
+               if tst.loopCloser != nil && tst.loopCloser.Closed() {
+                       return errClosed
+               }
+               failedParts, err := tst.syncPartsToNodesHelper(ctx, 
partsToSync, partIDsToSync, []string{node}, sidxMap, &releaseFuncs)
+               if err != nil {
+                       tst.l.Error().Err(err).Str("node", node).Msg("sync 
error")
+                       // Mark all parts as failed for this node
+                       var allPartsFailed []queue.FailedPart
+                       for _, part := range partsToSync {
+                               allPartsFailed = append(allPartsFailed, 
queue.FailedPart{
+                                       PartID: 
strconv.FormatUint(part.partMetadata.ID, 10),
+                                       Error:  fmt.Sprintf("node %s: %v", 
node, err),
+                               })
+                       }
+                       perNodeFailures[node] = allPartsFailed
+                       continue
+               }
+               if len(failedParts) > 0 {
+                       perNodeFailures[node] = failedParts
                }
        }
 
        // After sync attempts, enqueue parts for offline nodes
        tst.enqueueForOfflineNodes(nodes, partsToSync, partIDsToSync)
 
+       // If there are failed parts, use the retry handler
+       if len(perNodeFailures) > 0 {
+               // Collect all unique failed parts for retry handler
+               allFailedParts := make([]queue.FailedPart, 0)
+               for _, failedParts := range perNodeFailures {
+                       allFailedParts = append(allFailedParts, failedParts...)
+               }
+
+               // Create a sync function for retries - only retry on nodes 
where parts failed
+               syncFunc := func(partIDs []uint64) ([]queue.FailedPart, error) {
+                       // Build map of partIDs to retry
+                       partIDsSet := make(map[uint64]struct{})
+                       for _, partID := range partIDs {
+                               partIDsSet[partID] = struct{}{}
+                       }
+
+                       // Filter parts to retry
+                       partsToRetry := make([]*part, 0)
+                       partIDsMap := make(map[uint64]struct{})
+                       for _, partID := range partIDs {
+                               partIDsMap[partID] = struct{}{}
+                               for _, part := range partsToSync {
+                                       if part.partMetadata.ID == partID {
+                                               partsToRetry = 
append(partsToRetry, part)
+                                               break
+                                       }
+                               }
+                       }
+
+                       if len(partsToRetry) == 0 {
+                               return nil, nil
+                       }
+
+                       retryReleaseFuncs := make([]func(), 0)
+                       defer func() {
+                               for _, release := range retryReleaseFuncs {
+                                       release()
+                               }
+                       }()
+
+                       // Only retry on nodes where these specific parts failed
+                       var retryFailedParts []queue.FailedPart
+                       for node, nodeFailedParts := range perNodeFailures {
+                               // Check if any of the parts to retry failed on 
this node
+                               shouldRetryOnNode := false
+                               for _, failedPart := range nodeFailedParts {
+                                       failedPartID, _ := 
strconv.ParseUint(failedPart.PartID, 10, 64)
+                                       if _, exists := 
partIDsSet[failedPartID]; exists {
+                                               shouldRetryOnNode = true
+                                               break
+                                       }
+                               }
+
+                               if !shouldRetryOnNode {
+                                       continue
+                               }
+
+                               // Retry only on this specific node
+                               failedParts, err := 
tst.syncPartsToNodesHelper(ctx, partsToRetry, partIDsMap, []string{node}, 
sidxMap, &retryReleaseFuncs)
+                               if err != nil {
+                                       // On error, mark all parts as failed 
for this node
+                                       for _, partID := range partIDs {
+                                               retryFailedParts = 
append(retryFailedParts, queue.FailedPart{
+                                                       PartID: 
strconv.FormatUint(partID, 10),
+                                                       Error:  
fmt.Sprintf("node %s: %v", node, err),
+                                               })
+                                       }
+                                       continue
+                               }
+                               retryFailedParts = append(retryFailedParts, 
failedParts...)
+                       }
+
+                       return retryFailedParts, nil
+               }
+
+               permanentlyFailedParts, err := 
failedPartsHandler.RetryFailedParts(ctx, allFailedParts, partsInfo, syncFunc)
+               if err != nil {
+                       tst.l.Warn().Err(err).Msg("error during retry process")
+               }
+               if len(permanentlyFailedParts) > 0 {
+                       tst.l.Error().
+                               Uints64("partIDs", permanentlyFailedParts).
+                               Int("count", len(permanentlyFailedParts)).
+                               Msg("parts permanently failed after all retries 
and have been copied to failed-parts directory")
+               }
+       }
+
        return nil
 }
 
 // handleSyncIntroductions creates and processes sync introductions for both core and sidx parts.
+// Includes both successful and permanently failed parts to ensure snapshot cleanup.
 func (tst *tsTable) handleSyncIntroductions(partsToSync []*part, syncCh chan *syncIntroduction) error {
        // Create core sync introduction
        si := generateSyncIntroduction()
        defer releaseSyncIntroduction(si)
        si.applied = make(chan struct{})
+
+       // Add all parts (both successful and permanently failed)
        for _, part := range partsToSync {
                si.synced[part.partMetadata.ID] = struct{}{}
        }
+       // Permanently failed parts are already included since they're in partsToSync
 
        select {
        case syncCh <- si:
@@ -326,23 +484,21 @@ func (tst *tsTable) handleSyncIntroductions(partsToSync []*part, syncCh chan *sy
 }
 
 // syncStreamingPartsToNode synchronizes streaming parts to a node.
-func (tst *tsTable) syncStreamingPartsToNode(ctx context.Context, node string, streamingParts []queue.StreamingPartData) error {
+// Returns the list of failed parts.
+func (tst *tsTable) syncStreamingPartsToNode(ctx context.Context, node string, streamingParts []queue.StreamingPartData) ([]queue.FailedPart, error) {
        // Get chunked sync client for this node
        chunkedClient, err := tst.option.tire2Client.NewChunkedSyncClient(node, 1024*1024)
        if err != nil {
-               return fmt.Errorf("failed to create chunked sync client for node %s: %w", node, err)
+               return nil, fmt.Errorf("failed to create chunked sync client for node %s: %w", node, err)
        }
        defer chunkedClient.Close()
 
        // Sync parts using chunked transfer with streaming
        result, err := chunkedClient.SyncStreamingParts(ctx, streamingParts)
        if err != nil {
-               return fmt.Errorf("failed to sync streaming parts to node %s: %w", node, err)
+               return nil, fmt.Errorf("failed to sync streaming parts to node %s: %w", node, err)
        }
 
-       if !result.Success {
-               return fmt.Errorf("chunked sync partially failed: %v", result.ErrorMessage)
-       }
        tst.incTotalSyncLoopBytes(result.TotalBytes)
        if dl := tst.l.Debug(); dl.Enabled() {
                dl.
@@ -352,7 +508,9 @@ func (tst *tsTable) syncStreamingPartsToNode(ctx context.Context, node string, s
                        Int64("duration_ms", result.DurationMs).
                        Uint32("chunks", result.ChunksCount).
                        Uint32("parts", result.PartsCount).
-                       Msg("chunked sync completed successfully")
+                       Int("failed_parts", len(result.FailedParts)).
+                       Msg("chunked sync completed")
        }
-       return nil
+
+       return result.FailedParts, nil
 }
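
With this change, callers of SyncStreamingParts no longer abort on a single Success/ErrorMessage pair; they collect result.FailedParts and hand them to the retry handler. A minimal sketch of that consumption pattern, with stand-in types trimmed to the result fields the diff actually uses (the real types live in banyand/queue and are not reproduced here):

    package main

    import "fmt"

    // failedPart and syncResult are stand-ins for the queue package types.
    type failedPart struct {
        PartID string
        Error  string
    }

    type syncResult struct {
        Success     bool
        TotalBytes  uint64
        FailedParts []failedPart
    }

    // partsNeedingRetry extracts the IDs of parts that must be retried or,
    // after all retries, copied to the failed-parts directory.
    func partsNeedingRetry(res syncResult) []string {
        ids := make([]string, 0, len(res.FailedParts))
        for _, fp := range res.FailedParts {
            ids = append(ids, fp.PartID)
        }
        return ids
    }

    func main() {
        res := syncResult{Success: false, FailedParts: []failedPart{{PartID: "42", Error: "disk full on node-2"}}}
        fmt.Println("retry parts:", partsNeedingRetry(res))
    }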
diff --git a/banyand/trace/trace.go b/banyand/trace/trace.go
index 7792912f..4b16a4f0 100644
--- a/banyand/trace/trace.go
+++ b/banyand/trace/trace.go
@@ -51,12 +51,13 @@ const (
 var traceScope = observability.RootScope.SubScope("trace")
 
 type option struct {
-       mergePolicy        *mergePolicy
-       protector          protector.Memory
-       tire2Client        queue.Client
-       seriesCacheMaxSize run.Bytes
-       flushTimeout       time.Duration
-       syncInterval       time.Duration
+       mergePolicy                  *mergePolicy
+       protector                    protector.Memory
+       tire2Client                  queue.Client
+       seriesCacheMaxSize           run.Bytes
+       flushTimeout                 time.Duration
+       syncInterval                 time.Duration
+       failedPartsMaxTotalSizeBytes uint64
 }
 
 // Service allows inspecting the trace data.
diff --git a/banyand/trace/tstable.go b/banyand/trace/tstable.go
index 2d83dc25..287b17a8 100644
--- a/banyand/trace/tstable.go
+++ b/banyand/trace/tstable.go
@@ -48,21 +48,21 @@ const (
 )
 
 type tsTable struct {
-       l             *logger.Logger
-       fileSystem    fs.FileSystem
-       sidxMap       map[string]sidx.SIDX
        pm            protector.Memory
+       fileSystem    fs.FileSystem
+       handoffCtrl   *handoffController
        metrics       *metrics
        snapshot      *snapshot
        loopCloser    *run.Closer
        getNodes      func() []string
-       handoffCtrl   *handoffController
-       option        option
+       l             *logger.Logger
+       sidxMap       map[string]sidx.SIDX
        introductions chan *introduction
        p             common.Position
        root          string
        group         string
        gc            garbageCleaner
+       option        option
        curPartID     uint64
        sync.RWMutex
        shardID common.ShardID
@@ -219,6 +219,9 @@ func initTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position,
                        if ee[i].Name() == sidxDirName {
                                continue
                        }
+                       if ee[i].Name() == storage.FailedPartsDirName {
+                               continue
+                       }
                        p, err := parseEpoch(ee[i].Name())
                        if err != nil {
                                l.Info().Err(err).Msg("cannot parse part file name. skip and delete it")
diff --git a/test/integration/distributed/sync_retry/injector.go b/test/integration/distributed/sync_retry/injector.go
new file mode 100644
index 00000000..d5bca1d5
--- /dev/null
+++ b/test/integration/distributed/sync_retry/injector.go
@@ -0,0 +1,159 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package integration_sync_retry_test provides test utilities for sync retry integration tests.
+package integration_sync_retry_test
+
+import (
+       "fmt"
+       "strconv"
+       "sync"
+
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+)
+
+type topicFailureRule struct {
+       remaining int
+       attempts  int
+}
+
+type chunkedSyncTestInjector struct {
+       rules map[string]*topicFailureRule
+       mu    sync.Mutex
+}
+
+func newChunkedSyncTestInjector(config map[string]int) *chunkedSyncTestInjector {
+       rules := make(map[string]*topicFailureRule, len(config))
+       for topic, remaining := range config {
+               rules[topic] = &topicFailureRule{remaining: remaining}
+       }
+       return &chunkedSyncTestInjector{rules: rules}
+}
+
+func (c *chunkedSyncTestInjector) BeforeSync(parts []queue.StreamingPartData) (bool, []queue.FailedPart, error) {
+       if len(parts) == 0 {
+               return false, nil, nil
+       }
+
+       topic := parts[0].Topic
+
+       c.mu.Lock()
+       defer c.mu.Unlock()
+
+       rule, ok := c.rules[topic]
+       if !ok {
+               return false, nil, nil
+       }
+
+       rule.attempts++
+       if rule.remaining == 0 {
+               return false, nil, nil
+       }
+
+       if rule.remaining > 0 {
+               rule.remaining--
+       }
+
+       failedParts := make([]queue.FailedPart, 0, len(parts))
+       for _, part := range parts {
+               failedParts = append(failedParts, queue.FailedPart{
+                       PartID: strconv.FormatUint(part.ID, 10),
+                       Error:  fmt.Sprintf("injected failure (topic=%s attempt=%d)", topic, rule.attempts),
+               })
+       }
+
+       return true, failedParts, nil
+}
+
+func (c *chunkedSyncTestInjector) attemptsFor(topic string) int {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       if rule, ok := c.rules[topic]; ok {
+               return rule.attempts
+       }
+       return 0
+}
+
+func withChunkedSyncFailureInjector(config map[string]int) (*chunkedSyncTestInjector, func()) {
+       injector := newChunkedSyncTestInjector(config)
+       queue.RegisterChunkedSyncFailureInjector(injector)
+       cleanup := func() {
+               queue.ClearChunkedSyncFailureInjector()
+       }
+       return injector, cleanup
+}
+
+// chunkedSyncErrorInjector injects connection-level errors (not just failed parts)
+// to test the scenario where syncPartsToNodesHelper returns an error.
+type chunkedSyncErrorInjector struct {
+       rules map[string]*topicFailureRule
+       mu    sync.Mutex
+}
+
+func newChunkedSyncErrorInjector(config map[string]int) *chunkedSyncErrorInjector {
+       rules := make(map[string]*topicFailureRule, len(config))
+       for topic, remaining := range config {
+               rules[topic] = &topicFailureRule{remaining: remaining}
+       }
+       return &chunkedSyncErrorInjector{rules: rules}
+}
+
+func (c *chunkedSyncErrorInjector) BeforeSync(parts []queue.StreamingPartData) (bool, []queue.FailedPart, error) {
+       if len(parts) == 0 {
+               return false, nil, nil
+       }
+
+       topic := parts[0].Topic
+
+       c.mu.Lock()
+       defer c.mu.Unlock()
+
+       rule, ok := c.rules[topic]
+       if !ok {
+               return false, nil, nil
+       }
+
+       rule.attempts++
+       if rule.remaining == 0 {
+               return false, nil, nil
+       }
+
+       if rule.remaining > 0 {
+               rule.remaining--
+       }
+
+       // Return an error to simulate connection/client failures
+       return true, nil, fmt.Errorf("injected connection error (topic=%s attempt=%d)", topic, rule.attempts)
+}
+
+func (c *chunkedSyncErrorInjector) attemptsFor(topic string) int {
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       if rule, ok := c.rules[topic]; ok {
+               return rule.attempts
+       }
+       return 0
+}
+
+func withChunkedSyncErrorInjector(config map[string]int) (*chunkedSyncErrorInjector, func()) {
+       injector := newChunkedSyncErrorInjector(config)
+       queue.RegisterChunkedSyncFailureInjector(injector)
+       cleanup := func() {
+               queue.ClearChunkedSyncFailureInjector()
+       }
+       return injector, cleanup
+}
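
These injectors plug into the hook added to banyand/queue in this commit (queue.RegisterChunkedSyncFailureInjector and queue.ClearChunkedSyncFailureInjector, with a BeforeSync method consulted before a chunked sync attempt). The wiring inside banyand/queue/pub/chunked_sync.go is not shown in this excerpt, so the sketch below only illustrates one plausible shape for such a hook; apart from the two Register/Clear names and the BeforeSync signature, every identifier here is an assumption.

    // Package queuehook is a hypothetical re-creation of the injector hook for illustration.
    package queuehook

    import "sync"

    // StreamingPartData and FailedPart stand in for the queue package types.
    type StreamingPartData struct {
        ID    uint64
        Topic string
    }

    type FailedPart struct {
        PartID string
        Error  string
    }

    // FailureInjector matches the method implemented by the test injectors above.
    type FailureInjector interface {
        BeforeSync(parts []StreamingPartData) (handled bool, failed []FailedPart, err error)
    }

    var (
        mu       sync.RWMutex
        injector FailureInjector
    )

    // RegisterChunkedSyncFailureInjector installs a test-only injector; Clear removes it.
    func RegisterChunkedSyncFailureInjector(i FailureInjector) { mu.Lock(); injector = i; mu.Unlock() }
    func ClearChunkedSyncFailureInjector()                     { mu.Lock(); injector = nil; mu.Unlock() }

    // maybeInject is a hypothetical helper a sync client could call before sending:
    // if an injector is registered and handles the batch, the injected outcome is
    // used instead of performing the real transfer.
    func maybeInject(parts []StreamingPartData) (bool, []FailedPart, error) {
        mu.RLock()
        defer mu.RUnlock()
        if injector == nil {
            return false, nil, nil
        }
        return injector.BeforeSync(parts)
    }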
diff --git a/test/integration/distributed/sync_retry/sync_retry_suite_test.go b/test/integration/distributed/sync_retry/sync_retry_suite_test.go
new file mode 100644
index 00000000..40396bfd
--- /dev/null
+++ b/test/integration/distributed/sync_retry/sync_retry_suite_test.go
@@ -0,0 +1,136 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package integration_sync_retry_test
+
+import (
+       "context"
+       "encoding/json"
+       "fmt"
+       "testing"
+
+       g "github.com/onsi/ginkgo/v2"
+       "github.com/onsi/gomega"
+       "github.com/onsi/gomega/gleak"
+
+       "github.com/apache/skywalking-banyandb/banyand/metadata"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
+       "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure"
+       "github.com/apache/skywalking-banyandb/pkg/test/setup"
+       test_stream "github.com/apache/skywalking-banyandb/pkg/test/stream"
+       test_trace "github.com/apache/skywalking-banyandb/pkg/test/trace"
+)
+
+func TestDistributedSyncRetry(t *testing.T) {
+       gomega.RegisterFailHandler(g.Fail)
+       g.RunSpecs(t, "Distributed Sync Retry Suite")
+}
+
+type suiteConfig struct {
+       LiaisonAddr string   `json:"liaisonAddr"`
+       DataPaths   []string `json:"dataPaths"`
+}
+
+var (
+       liaisonAddr string
+       dataPaths   []string
+
+       cleanupFuncs []func()
+       goods        []gleak.Goroutine
+)
+
+var _ = g.SynchronizedBeforeSuite(func() []byte {
+       goods = gleak.Goroutines()
+       gomega.Expect(logger.Init(logger.Logging{Env: "dev", Level: flags.LogLevel})).To(gomega.Succeed())
+
+       ports, err := test.AllocateFreePorts(2)
+       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+       dir, releaseSpace, err := test.NewSpace()
+       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+       clientEP := fmt.Sprintf("http://127.0.0.1:%d", ports[0])
+       serverEP := fmt.Sprintf("http://127.0.0.1:%d", ports[1])
+
+       server, err := embeddedetcd.NewServer(
+               embeddedetcd.ConfigureListener([]string{clientEP}, []string{serverEP}),
+               embeddedetcd.RootDir(dir),
+               embeddedetcd.AutoCompactionMode("periodic"),
+               embeddedetcd.AutoCompactionRetention("1h"),
+               embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+       )
+       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+       <-server.ReadyNotify()
+
+       cleanupFuncs = append([]func(){
+               func() {
+                       _ = server.Close()
+                       <-server.StopNotify()
+                       releaseSpace()
+               },
+       }, cleanupFuncs...)
+
+       schemaRegistry, err := schema.NewEtcdSchemaRegistry(
+               schema.Namespace(metadata.DefaultNamespace),
+               schema.ConfigureServerEndpoints([]string{clientEP}),
+       )
+       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+       defer schemaRegistry.Close()
+
+       ctx := context.Background()
+       test_stream.PreloadSchema(ctx, schemaRegistry)
+       test_measure.PreloadSchema(ctx, schemaRegistry)
+       test_trace.PreloadSchema(ctx, schemaRegistry)
+
+       // Start two data nodes to ensure replication targets exist
+       startDataNode := func() (string, string) {
+               addr, path, closeFn := setup.DataNodeWithAddrAndDir(clientEP)
+               cleanupFuncs = append(cleanupFuncs, closeFn)
+               return addr, path
+       }
+
+       _, path0 := startDataNode()
+       _, path1 := startDataNode()
+       paths := []string{path0, path1}
+
+       liaisonAddrLocal, _, closeLiaison := setup.LiaisonNodeWithHTTP(clientEP)
+       cleanupFuncs = append(cleanupFuncs, closeLiaison)
+
+       cfg := suiteConfig{
+               LiaisonAddr: liaisonAddrLocal,
+               DataPaths:   paths,
+       }
+       payload, err := json.Marshal(cfg)
+       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+       return payload
+}, func(data []byte) {
+       var cfg suiteConfig
+       gomega.Expect(json.Unmarshal(data, &cfg)).To(gomega.Succeed())
+       liaisonAddr = cfg.LiaisonAddr
+       dataPaths = cfg.DataPaths
+})
+
+var _ = g.SynchronizedAfterSuite(func() {
+       // Execute cleanups in reverse order
+       for i := len(cleanupFuncs) - 1; i >= 0; i-- {
+               cleanupFuncs[i]()
+       }
+       gomega.Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+}, func() {})
diff --git a/test/integration/distributed/sync_retry/sync_retry_test.go b/test/integration/distributed/sync_retry/sync_retry_test.go
new file mode 100644
index 00000000..411a9d0e
--- /dev/null
+++ b/test/integration/distributed/sync_retry/sync_retry_test.go
@@ -0,0 +1,298 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package integration_sync_retry_test
+
+import (
+       "errors"
+       "fmt"
+       "io/fs"
+       "os"
+       "path/filepath"
+       "time"
+
+       g "github.com/onsi/ginkgo/v2"
+       "github.com/onsi/gomega"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       "github.com/apache/skywalking-banyandb/api/data"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       casesmeasuredata "github.com/apache/skywalking-banyandb/test/cases/measure/data"
+       casesstreamdata "github.com/apache/skywalking-banyandb/test/cases/stream/data"
+       casestracedata "github.com/apache/skywalking-banyandb/test/cases/trace/data"
+)
+
+const (
+       DefaultMaxRetries  = 3
+       FailedPartsDirName = "failed-parts"
+)
+
+var _ = g.Describe("Chunked sync failure handling", g.Ordered, func() {
+       g.BeforeEach(func() {
+               gomega.Expect(clearFailedPartsDirs()).To(gomega.Succeed())
+       })
+
+       g.AfterEach(func() {
+               queue.ClearChunkedSyncFailureInjector()
+       })
+
+       g.It("retries stream parts and records failed segments", func() {
+               conn := dialLiaison()
+               defer conn.Close()
+
+               // With 2 data nodes and 3 max retries, we need 2 * (1 initial 
+ 3 retries) = 8 failures
+               // to ensure parts permanently fail
+               injector, cleanup := 
withChunkedSyncFailureInjector(map[string]int{
+                       data.TopicStreamPartSync.String(): 2 * 
(DefaultMaxRetries + 1),
+               })
+               defer cleanup()
+
+               baseTime := time.Now().Add(-5 * 
time.Minute).Truncate(time.Millisecond)
+               casesstreamdata.Write(conn, "sw", baseTime, 
500*time.Millisecond)
+
+               g.By("waiting for injected stream sync failures")
+               gomega.Eventually(func() int {
+                       return 
injector.attemptsFor(data.TopicStreamPartSync.String())
+               }, flags.EventuallyTimeout, 
500*time.Millisecond).Should(gomega.BeNumerically(">=", 
2*(DefaultMaxRetries+1)))
+
+               g.By("waiting for stream failed-parts directory to be 
populated")
+               gomega.Eventually(func() string {
+                       return locateFailedPartsDir("stream")
+               }, flags.EventuallyTimeout, 
time.Second).ShouldNot(gomega.BeEmpty())
+               dir := locateFailedPartsDir("stream")
+               gomega.Expect(dir).NotTo(gomega.BeEmpty())
+               g.By(fmt.Sprintf("stream failed parts recorded at %s", dir))
+       })
+
+       g.It("retries measure parts and records failed segments", func() {
+               conn := dialLiaison()
+               defer conn.Close()
+
+               // With 2 data nodes, 2 shards (2 parts), and 3 max retries:
+               // 2 parts * 2 nodes * (1 initial + 3 retries) = 16 failures 
needed
+               injector, cleanup := 
withChunkedSyncFailureInjector(map[string]int{
+                       data.TopicMeasurePartSync.String(): 2 * 2 * 
(DefaultMaxRetries + 1),
+               })
+               defer cleanup()
+
+               baseTime := time.Now().Add(-24 * 
time.Hour).Truncate(time.Millisecond)
+               casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric", 
"service_cpm_minute_data.json", baseTime, time.Minute)
+
+               g.By("waiting for injected measure sync failures")
+               gomega.Eventually(func() int {
+                       return 
injector.attemptsFor(data.TopicMeasurePartSync.String())
+               }, flags.EventuallyTimeout, 
time.Second).Should(gomega.BeNumerically(">=", 2*2*(DefaultMaxRetries+1)))
+
+               g.By("waiting for measure failed-parts directory to be 
populated")
+               gomega.Eventually(func() string {
+                       return locateFailedPartsDir("measure")
+               }, flags.EventuallyTimeout, 
2*time.Second).ShouldNot(gomega.BeEmpty())
+               dir := locateFailedPartsDir("measure")
+               gomega.Expect(dir).NotTo(gomega.BeEmpty())
+               g.By(fmt.Sprintf("measure failed parts recorded at %s", dir))
+       })
+
+       g.It("retries trace parts and records failed segments", func() {
+               conn := dialLiaison()
+               defer conn.Close()
+
+               // With 2 data nodes and 3 max retries, we need 2 * (1 initial 
+ 3 retries) = 8 failures
+               // to ensure parts permanently fail
+               injector, cleanup := 
withChunkedSyncFailureInjector(map[string]int{
+                       data.TopicTracePartSync.String(): 2 * 
(DefaultMaxRetries + 1),
+               })
+               defer cleanup()
+
+               baseTime := time.Now().Add(-10 * 
time.Minute).Truncate(time.Millisecond)
+               casestracedata.WriteToGroup(conn, "sw", "test-trace-group", 
"sw", baseTime, 500*time.Millisecond)
+
+               g.By("waiting for injected trace sync failures")
+               gomega.Eventually(func() int {
+                       return 
injector.attemptsFor(data.TopicTracePartSync.String())
+               }, flags.EventuallyTimeout, 
time.Second).Should(gomega.BeNumerically(">=", 2*(DefaultMaxRetries+1)))
+
+               g.By("waiting for trace failed-parts directory to be populated")
+               gomega.Eventually(func() string {
+                       return locateFailedPartsDir("trace")
+               }, flags.EventuallyTimeout, 
2*time.Second).ShouldNot(gomega.BeEmpty())
+               dir := locateFailedPartsDir("trace")
+               gomega.Expect(dir).NotTo(gomega.BeEmpty())
+               g.By(fmt.Sprintf("trace failed parts recorded at %s", dir))
+       })
+
+       g.It("handles stream connection errors and marks all parts as failed", 
func() {
+               conn := dialLiaison()
+               defer conn.Close()
+
+               // With 2 data nodes and 3 max retries, inject connection-level 
errors
+               // that cause syncPartsToNodesHelper to return an error
+               injector, cleanup := 
withChunkedSyncErrorInjector(map[string]int{
+                       data.TopicStreamPartSync.String(): 2 * 
(DefaultMaxRetries + 1),
+               })
+               defer cleanup()
+
+               // Use a different time range to create new parts
+               baseTime := time.Now().Add(-15 * 
time.Minute).Truncate(time.Millisecond)
+               casesstreamdata.Write(conn, "sw", baseTime, 
500*time.Millisecond)
+
+               g.By("waiting for injected stream connection errors")
+               gomega.Eventually(func() int {
+                       return 
injector.attemptsFor(data.TopicStreamPartSync.String())
+               }, flags.EventuallyTimeout, 
500*time.Millisecond).Should(gomega.BeNumerically(">=", 
2*(DefaultMaxRetries+1)))
+
+               g.By("waiting for stream failed-parts directory to be 
populated")
+               gomega.Eventually(func() string {
+                       return locateFailedPartsDir("stream")
+               }, flags.EventuallyTimeout, 
time.Second).ShouldNot(gomega.BeEmpty())
+               dir := locateFailedPartsDir("stream")
+               gomega.Expect(dir).NotTo(gomega.BeEmpty())
+               g.By(fmt.Sprintf("stream failed parts recorded at %s (from 
connection errors)", dir))
+       })
+
+       g.It("handles measure connection errors and marks all parts as failed", 
func() {
+               conn := dialLiaison()
+               defer conn.Close()
+
+               // With 2 data nodes, 2 shards, and 3 max retries, inject 
connection-level errors
+               injector, cleanup := 
withChunkedSyncErrorInjector(map[string]int{
+                       data.TopicMeasurePartSync.String(): 2 * 2 * 
(DefaultMaxRetries + 1),
+               })
+               defer cleanup()
+
+               // Use a different time range to create new parts
+               baseTime := time.Now().Add(-48 * 
time.Hour).Truncate(time.Millisecond)
+               casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric", 
"service_cpm_minute_data.json", baseTime, time.Minute)
+
+               g.By("waiting for injected measure connection errors")
+               gomega.Eventually(func() int {
+                       return 
injector.attemptsFor(data.TopicMeasurePartSync.String())
+               }, flags.EventuallyTimeout, 
time.Second).Should(gomega.BeNumerically(">=", 2*2*(DefaultMaxRetries+1)))
+
+               g.By("waiting for measure failed-parts directory to be 
populated")
+               gomega.Eventually(func() string {
+                       return locateFailedPartsDir("measure")
+               }, flags.EventuallyTimeout, 
2*time.Second).ShouldNot(gomega.BeEmpty())
+               dir := locateFailedPartsDir("measure")
+               gomega.Expect(dir).NotTo(gomega.BeEmpty())
+               g.By(fmt.Sprintf("measure failed parts recorded at %s (from 
connection errors)", dir))
+       })
+
+       g.It("handles trace connection errors and marks all parts as failed", 
func() {
+               conn := dialLiaison()
+               defer conn.Close()
+
+               // With 2 data nodes and 3 max retries, inject connection-level 
errors
+               injector, cleanup := 
withChunkedSyncErrorInjector(map[string]int{
+                       data.TopicTracePartSync.String(): 2 * 
(DefaultMaxRetries + 1),
+               })
+               defer cleanup()
+
+               // Use a different time range to create new parts
+               baseTime := time.Now().Add(-2 * 
time.Hour).Truncate(time.Millisecond)
+               casestracedata.WriteToGroup(conn, "sw", "test-trace-group", 
"sw", baseTime, 500*time.Millisecond)
+
+               g.By("waiting for injected trace connection errors")
+               gomega.Eventually(func() int {
+                       return 
injector.attemptsFor(data.TopicTracePartSync.String())
+               }, flags.EventuallyTimeout, 
time.Second).Should(gomega.BeNumerically(">=", 2*(DefaultMaxRetries+1)))
+
+               g.By("waiting for trace failed-parts directory to be populated")
+               gomega.Eventually(func() string {
+                       return locateFailedPartsDir("trace")
+               }, flags.EventuallyTimeout, 
2*time.Second).ShouldNot(gomega.BeEmpty())
+               dir := locateFailedPartsDir("trace")
+               gomega.Expect(dir).NotTo(gomega.BeEmpty())
+               g.By(fmt.Sprintf("trace failed parts recorded at %s (from 
connection errors)", dir))
+       })
+})
+
+func dialLiaison() *grpc.ClientConn {
+       var conn *grpc.ClientConn
+       var err error
+       // Retry connection with Eventually to handle liaison startup timing
+       gomega.Eventually(func() error {
+               conn, err = grpchelper.Conn(liaisonAddr, 10*time.Second, 
grpc.WithTransportCredentials(insecure.NewCredentials()))
+               return err
+       }, 30*time.Second, time.Second).Should(gomega.Succeed())
+       return conn
+}
+
+func clearFailedPartsDirs() error {
+       for _, root := range dataPaths {
+               if err := filepath.WalkDir(root, func(path string, d 
fs.DirEntry, err error) error {
+                       if err != nil {
+                               return nil
+                       }
+                       if d.IsDir() && d.Name() == FailedPartsDirName {
+                               if removeErr := os.RemoveAll(path); removeErr 
!= nil {
+                                       return removeErr
+                               }
+                               return fs.SkipDir
+                       }
+                       return nil
+               }); err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
+func locateFailedPartsDir(module string) string {
+       errFound := errors.New("found")
+
+       // Search both data node paths and any temp directories (for liaison)
+       searchPaths := append([]string{}, dataPaths...)
+
+       // Also search common temp directory patterns for liaison write queues
+       tmpDirs, _ := filepath.Glob("/tmp/banyandb-test-*")
+       searchPaths = append(searchPaths, tmpDirs...)
+       tmpDirs2, _ := filepath.Glob(os.TempDir() + "/banyandb-test-*")
+       searchPaths = append(searchPaths, tmpDirs2...)
+
+       for _, root := range searchPaths {
+               moduleRoot := filepath.Join(root, module)
+               info, err := os.Stat(moduleRoot)
+               if err != nil || !info.IsDir() {
+                       continue
+               }
+               var match string
+               walkErr := filepath.WalkDir(moduleRoot, func(path string, d 
fs.DirEntry, err error) error {
+                       if err != nil {
+                               return nil
+                       }
+                       if !d.IsDir() || d.Name() != FailedPartsDirName {
+                               return nil
+                       }
+                       entries, readErr := os.ReadDir(path)
+                       if readErr == nil && len(entries) > 0 {
+                               match = path
+                               return errFound
+                       }
+                       return fs.SkipDir
+               })
+               if walkErr != nil && !errors.Is(walkErr, errFound) {
+                       continue
+               }
+               if match != "" {
+                       return match
+               }
+       }
+       return ""
+}
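
The failure budgets in these specs follow a simple rule: a part only becomes permanently failed once every replica node has exhausted its initial attempt plus DefaultMaxRetries retries. A minimal sketch of that arithmetic (DefaultMaxRetries is the test-side constant declared above; the node and shard counts are the ones assumed in the spec comments):

    package main

    import "fmt"

    // requiredInjections mirrors the spec comments: every replica node gets one
    // initial attempt plus maxRetries retries per part batch, and all of them
    // must fail for a part to land in the failed-parts directory.
    func requiredInjections(nodes, partBatches, maxRetries int) int {
        return nodes * partBatches * (1 + maxRetries)
    }

    func main() {
        fmt.Println(requiredInjections(2, 1, 3)) // stream/trace cases: 8
        fmt.Println(requiredInjections(2, 2, 3)) // measure case with 2 shards: 16
    }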
