Copilot commented on code in PR #836:
URL: https://github.com/apache/skywalking-banyandb/pull/836#discussion_r2497616376


##########
banyand/trace/syncer.go:
##########
@@ -226,49 +231,33 @@ func (tst *tsTable) needToSync(partsToSync []*part) bool {
        return len(partsToSync) > 0 && len(nodes) > 0
 }
 
-// executeSyncOperation performs the actual synchronization of parts to nodes.
-func (tst *tsTable) executeSyncOperation(partsToSync []*part, partIDsToSync 
map[uint64]struct{}) error {
-       sort.Slice(partsToSync, func(i, j int) bool {
-               return partsToSync[i].partMetadata.ID < 
partsToSync[j].partMetadata.ID
-       })
+// syncPartsToNodesHelper syncs given parts to all nodes and returns failed 
parts.
+// This helper is used for both initial sync and retry attempts.
+func (tst *tsTable) syncPartsToNodesHelper(
+       ctx context.Context, parts []*part, partIDsMap map[uint64]struct{},
+       nodes []string, sidxMap map[string]sidx.SIDX, releaseFuncs *[]func(),
+) ([]queue.FailedPart, error) {
+       var allFailedParts []queue.FailedPart
 
-       ctx := context.Background()
-       releaseFuncs := make([]func(), 0, len(partsToSync))
-       defer func() {
-               for _, release := range releaseFuncs {
-                       release()
-               }
-       }()
-
-       nodes := tst.getNodes()
-       if tst.loopCloser != nil && tst.loopCloser.Closed() {
-               return errClosed
-       }
-       sidxMap := tst.getAllSidx()
        for _, node := range nodes {
                if tst.loopCloser != nil && tst.loopCloser.Closed() {
-                       return errClosed
+                       return allFailedParts, errClosed
                }
-               // Prepare all streaming parts data
+
+               // Prepare streaming parts data
                streamingParts := make([]queue.StreamingPartData, 0)
+
                // Add sidx streaming parts
                for name, sidx := range sidxMap {
-                       sidxStreamingParts, sidxReleaseFuncs := 
sidx.StreamingParts(partIDsToSync, tst.group, uint32(tst.shardID), name)
-                       if len(sidxStreamingParts) != len(partIDsToSync) {
-                               logger.Panicf("sidx streaming parts count 
mismatch: %d != %d", len(sidxStreamingParts), len(partIDsToSync))
-                               return nil
-                       }
+                       sidxStreamingParts, sidxReleaseFuncs := 
sidx.StreamingParts(partIDsMap, tst.group, uint32(tst.shardID), name)
                        streamingParts = append(streamingParts, 
sidxStreamingParts...)
-                       releaseFuncs = append(releaseFuncs, sidxReleaseFuncs...)
-               }
-               if len(streamingParts) != len(partIDsToSync)*len(sidxMap) {
-                       logger.Panicf("streaming parts count mismatch: %d != 
%d", len(streamingParts), len(partIDsToSync)*len(sidxMap))
-                       return nil
+                       *releaseFuncs = append(*releaseFuncs, 
sidxReleaseFuncs...)

Review Comment:
   Missing validation that sidx.StreamingParts returns the expected number of 
parts. The original code at lines 253-255 (now removed) included panic checks 
to verify `len(sidxStreamingParts) == len(partIDsToSync)` and 
`len(streamingParts) == len(partIDsToSync)*len(sidxMap)`. These validation 
checks should be retained or replaced with appropriate error handling to detect 
mismatches early.



##########
banyand/queue/pub/chunked_sync.go:
##########
@@ -129,6 +144,9 @@ func (c *chunkedSyncClient) SyncStreamingParts(ctx 
context.Context, parts []queu
                result := finalResp.GetSyncResult()
                success = result.Success
        }
+       if !success && len(failedParts) < len(parts) {
+               success = true
+       }
 

Review Comment:
   The logic for determining partial success is unclear. When `result.Success` 
   is false but at least one part succeeded (`len(failedParts) < len(parts)`), 
   setting `success = true` changes the result semantics. This could mask actual 
   sync failures. Consider either keeping `Success = false` and letting callers 
   check `FailedParts`, or documenting that Success indicates 'at least one part 
   succeeded' rather than 'all parts succeeded'.
   ```suggestion
   
   ```



##########
banyand/internal/storage/failed_parts_handler.go:
##########
@@ -0,0 +1,409 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package storage
+
+import (
+       "context"
+       "errors"
+       "fmt"
+       iofs "io/fs"
+       "os"
+       "path/filepath"
+       "sort"
+       "strconv"
+       "sync"
+       "time"
+
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const (
+       // DefaultInitialRetryDelay is the initial delay before the first retry.
+       DefaultInitialRetryDelay = 1 * time.Second
+       // DefaultMaxRetries is the maximum number of retry attempts.
+       DefaultMaxRetries = 3
+       // DefaultBackoffMultiplier is the multiplier for exponential backoff.
+       DefaultBackoffMultiplier = 2
+       // FailedPartsDirName is the name of the directory for failed parts.
+       FailedPartsDirName = "failed-parts"
+)
+
+// FailedPartsHandler handles retry logic and filesystem fallback for failed 
parts.
+type FailedPartsHandler struct {
+       fileSystem        fs.FileSystem
+       l                 *logger.Logger
+       root              string
+       failedPartsDir    string
+       initialRetryDelay time.Duration
+       maxRetries        int
+       backoffMultiplier int
+       maxTotalSizeBytes uint64
+       sizeMu            sync.Mutex
+}
+
+// PartInfo contains information needed to retry or copy a failed part.
+type PartInfo struct {
+       SourcePath string
+       PartType   string
+       PartID     uint64
+}
+
+// NewFailedPartsHandler creates a new handler for failed parts.
+func NewFailedPartsHandler(fileSystem fs.FileSystem, root string, l 
*logger.Logger, maxTotalSizeBytes uint64) *FailedPartsHandler {
+       failedPartsDir := filepath.Join(root, FailedPartsDirName)
+       fileSystem.MkdirIfNotExist(failedPartsDir, DirPerm)
+
+       return &FailedPartsHandler{
+               fileSystem:        fileSystem,
+               root:              root,
+               failedPartsDir:    failedPartsDir,
+               l:                 l,
+               initialRetryDelay: DefaultInitialRetryDelay,
+               maxRetries:        DefaultMaxRetries,
+               backoffMultiplier: DefaultBackoffMultiplier,
+               maxTotalSizeBytes: maxTotalSizeBytes,
+       }
+}
+
+// RetryFailedParts attempts to retry failed parts with exponential backoff.
+// Returns the list of permanently failed part IDs after all retries are 
exhausted.
+func (h *FailedPartsHandler) RetryFailedParts(
+       ctx context.Context,
+       failedParts []queue.FailedPart,
+       partsInfo map[uint64][]*PartInfo,
+       syncFunc func(partIDs []uint64) ([]queue.FailedPart, error),
+) ([]uint64, error) {
+       if len(failedParts) == 0 {
+               return nil, nil
+       }
+
+       // Group failed parts by part ID
+       failedPartIDs := make(map[uint64]string)
+       for _, fp := range failedParts {
+               partID, err := strconv.ParseUint(fp.PartID, 10, 64)
+               if err != nil {
+                       h.l.Warn().Err(err).Str("partID", 
fp.PartID).Msg("failed to parse part ID, skipping")
+                       continue
+               }
+               failedPartIDs[partID] = fp.Error
+       }
+
+       h.l.Warn().
+               Int("count", len(failedPartIDs)).
+               Msg("starting retry process for failed parts")
+
+       // Retry with exponential backoff
+       var stillFailing []uint64
+       for partID, errMsg := range failedPartIDs {
+               if err := h.retryPartWithBackoff(ctx, partID, errMsg, 
syncFunc); err != nil {
+                       h.l.Error().
+                               Err(err).
+                               Uint64("partID", partID).
+                               Msg("part failed after all retries")
+                       stillFailing = append(stillFailing, partID)
+               }
+       }
+
+       // Copy permanently failed parts to failed-parts directory
+       var permanentlyFailed []uint64
+       for _, partID := range stillFailing {
+               partInfoList, exists := partsInfo[partID]
+               if !exists || len(partInfoList) == 0 {
+                       h.l.Warn().Uint64("partID", partID).Msg("no part info 
found for failed part")
+                       permanentlyFailed = append(permanentlyFailed, partID)
+                       continue
+               }
+
+               // Copy all parts with this ID (core + all SIDX parts)
+               allCopied := true
+               for _, partInfo := range partInfoList {
+                       destSubDir := fmt.Sprintf("%016x_%s", partID, 
partInfo.PartType)
+                       if err := h.CopyToFailedPartsDir(partID, 
partInfo.SourcePath, destSubDir); err != nil {
+                               h.l.Error().
+                                       Err(err).
+                                       Uint64("partID", partID).
+                                       Str("partType", partInfo.PartType).
+                                       Str("sourcePath", partInfo.SourcePath).
+                                       Msg("failed to copy part to 
failed-parts directory")
+                               allCopied = false
+                       } else {
+                               h.l.Info().
+                                       Uint64("partID", partID).
+                                       Str("partType", partInfo.PartType).
+                                       Str("destination", 
filepath.Join(h.failedPartsDir, destSubDir)).
+                                       Msg("successfully copied failed part to 
failed-parts directory")
+                       }
+               }
+               if !allCopied {
+                       h.l.Warn().Uint64("partID", partID).Msg("some parts 
failed to copy")
+               }
+               permanentlyFailed = append(permanentlyFailed, partID)
+       }
+
+       return permanentlyFailed, nil
+}
+
+// retryPartWithBackoff retries a single part with exponential backoff.
+func (h *FailedPartsHandler) retryPartWithBackoff(
+       ctx context.Context,
+       partID uint64,
+       initialError string,
+       syncFunc func(partIDs []uint64) ([]queue.FailedPart, error),
+) error {
+       delay := h.initialRetryDelay
+
+       for attempt := 1; attempt <= h.maxRetries; attempt++ {
+               // Wait before retry
+               select {
+               case <-ctx.Done():
+                       return fmt.Errorf("context canceled during retry: %w", 
ctx.Err())
+               case <-time.After(delay):
+               }
+
+               h.l.Info().
+                       Uint64("partID", partID).
+                       Int("attempt", attempt).
+                       Int("maxRetries", h.maxRetries).
+                       Dur("delay", delay).
+                       Msg("retrying failed part")
+
+               // Attempt to sync just this part
+               newFailedParts, err := syncFunc([]uint64{partID})
+               if err != nil {
+                       h.l.Warn().
+                               Err(err).
+                               Uint64("partID", partID).
+                               Int("attempt", attempt).
+                               Msg("retry attempt failed with error")
+                       delay *= time.Duration(h.backoffMultiplier)
+                       continue
+               }
+
+               // Check if this part still failed
+               partStillFailed := false
+               for _, fp := range newFailedParts {
+                       fpID, _ := strconv.ParseUint(fp.PartID, 10, 64)
+                       if fpID == partID {
+                               partStillFailed = true
+                               h.l.Warn().
+                                       Uint64("partID", partID).
+                                       Int("attempt", attempt).
+                                       Str("error", fp.Error).
+                                       Msg("retry attempt failed")
+                               break
+                       }
+               }
+
+               if !partStillFailed {
+                       h.l.Info().
+                               Uint64("partID", partID).
+                               Int("attempt", attempt).
+                               Msg("part successfully synced after retry")
+                       return nil
+               }
+
+               // Exponential backoff for next attempt
+               delay *= time.Duration(h.backoffMultiplier)
+       }
+
+       return fmt.Errorf("part failed after %d retry attempts, initial error: 
%s", h.maxRetries, initialError)
+}
+
+// CopyToFailedPartsDir copies a part to the failed-parts directory using hard 
links.
+func (h *FailedPartsHandler) CopyToFailedPartsDir(partID uint64, sourcePath 
string, destSubDir string) error {
+       destPath := filepath.Join(h.failedPartsDir, destSubDir)
+
+       if h.maxTotalSizeBytes > 0 {
+               h.sizeMu.Lock()
+               defer h.sizeMu.Unlock()
+       }
+
+       // Check if already exists
+       entries := h.fileSystem.ReadDir(h.failedPartsDir)
+       for _, entry := range entries {
+               if entry.Name() == destSubDir {
+                       h.l.Info().
+                               Uint64("partID", partID).
+                               Str("destSubDir", destSubDir).
+                               Msg("part already exists in failed-parts 
directory")
+                       return nil
+               }
+       }
+
+       if _, err := os.Stat(sourcePath); err != nil {
+               h.l.Error().
+                       Err(err).
+                       Uint64("partID", partID).
+                       Str("sourcePath", sourcePath).
+                       Msg("failed to stat source path before copying to 
failed-parts directory")
+               return fmt.Errorf("failed to stat source path %s: %w", 
sourcePath, err)
+       }
+
+       if h.maxTotalSizeBytes > 0 {
+               currentSize, err := calculatePathSize(h.failedPartsDir)
+               if err != nil {
+                       return fmt.Errorf("failed to calculate current 
failed-parts size: %w", err)
+               }
+               sourceSize, err := calculatePathSize(sourcePath)
+               if err != nil {
+                       return fmt.Errorf("failed to calculate size of source 
path %s: %w", sourcePath, err)
+               }
+
+               if currentSize+sourceSize > h.maxTotalSizeBytes {
+                       type partDir struct {
+                               modTime time.Time
+                               name    string
+                               path    string
+                               size    uint64
+                       }
+
+                       partDirs := make([]partDir, 0, len(entries))
+                       for _, entry := range entries {
+                               if !entry.IsDir() || entry.Name() == destSubDir 
{
+                                       continue
+                               }
+
+                               dirPath := filepath.Join(h.failedPartsDir, 
entry.Name())
+                               info, err := os.Stat(dirPath)
+                               if err != nil {
+                                       h.l.Warn().
+                                               Err(err).
+                                               Str("path", dirPath).
+                                               Msg("failed to stat existing 
failed part directory during eviction")
+                                       continue
+                               }
+
+                               dirSize, err := calculatePathSize(dirPath)
+                               if err != nil {
+                                       h.l.Warn().
+                                               Err(err).
+                                               Str("path", dirPath).
+                                               Msg("failed to calculate size 
for existing failed part directory during eviction")
+                                       continue
+                               }
+
+                               partDirs = append(partDirs, partDir{
+                                       name:    entry.Name(),
+                                       path:    dirPath,
+                                       modTime: info.ModTime(),
+                                       size:    dirSize,
+                               })
+                       }
+
+                       sort.Slice(partDirs, func(i, j int) bool {
+                               if 
partDirs[i].modTime.Equal(partDirs[j].modTime) {
+                                       return partDirs[i].name < 
partDirs[j].name
+                               }
+                               return 
partDirs[i].modTime.Before(partDirs[j].modTime)
+                       })
+
+                       for _, dir := range partDirs {
+                               if currentSize+sourceSize <= 
h.maxTotalSizeBytes {
+                                       break
+                               }
+
+                               if err := os.RemoveAll(dir.path); err != nil {
+                                       return fmt.Errorf("failed to remove 
oldest failed part %s: %w", dir.name, err)
+                               }
+
+                               h.l.Info().
+                                       Str("removedFailedPartDir", dir.name).
+                                       Uint64("freedBytes", dir.size).
+                                       Msg("removed oldest failed part to 
honor size limit")
+
+                               if currentSize >= dir.size {
+                                       currentSize -= dir.size
+                               } else {

Review Comment:
   The safety check for underflow (lines 331-335) suggests potential 
   calculation issues. If currentSize can be less than dir.size, the size 
   tracking is inconsistent. Consider investigating why this edge case exists, 
   and then either fixing the root cause or adding a warning log when this 
   condition occurs.
   ```suggestion
                                } else {
                                        h.l.Warn().
                                                Uint64("currentSize", 
currentSize).
                                                Uint64("dirSize", dir.size).
                                                Str("dirName", dir.name).
                                                Msg("currentSize is less than 
dir.size during failed-parts eviction; setting currentSize to 0. This indicates 
a possible size tracking inconsistency.")
   ```



##########
banyand/queue/pub/chunked_sync_test.go:
##########
@@ -228,8 +228,209 @@ var _ = ginkgo.Describe("Chunked Sync Retry Mechanism", 
func() {
                        
gomega.Expect(mockServer.checksumMismatchCount).To(gomega.Equal(0))
                })
        })
+
+       ginkgo.Context("when a part fails mid-read during chunk building", 
func() {
+               ginkgo.It("should discard incomplete chunk and continue with 
non-failed parts", func() {
+                       address := getAddress()
+
+                       // Track chunks received to verify no partial data is 
sent
+                       var receivedChunks []*clusterv1.SyncPartRequest
+                       var mu sync.Mutex
+
+                       // Custom mock server that records chunks
+                       s := grpc.NewServer()
+                       mockSvc := &recordingMockServer{
+                               receivedChunks: &receivedChunks,
+                               mu:             &mu,
+                       }
+                       clusterv1.RegisterChunkedSyncServiceServer(s, mockSvc)
+
+                       lis, err := net.Listen("tcp", address)
+                       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+                       go func() {
+                               _ = s.Serve(lis)
+                       }()
+                       defer s.GracefulStop()
+
+                       conn, err := grpc.NewClient(address, 
grpc.WithTransportCredentials(insecure.NewCredentials()))
+                       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+                       defer conn.Close()
+
+                       client := &chunkedSyncClient{
+                               conn:      conn,
+                               log:       
logger.GetLogger("test-chunked-sync"),
+                               chunkSize: 1024, // Small chunk size to force 
multiple chunks
+                       }
+
+                       // Create test data: Part 1 (will succeed), Part 2 
(will fail mid-read), Part 3 (will succeed)
+                       part1Data := make([]byte, 800)
+                       for i := range part1Data {
+                               part1Data[i] = byte('A')
+                       }
+
+                       part2Data := make([]byte, 1500) // Large enough to span 
multiple reads
+                       for i := range part2Data {
+                               part2Data[i] = byte('B')
+                       }
+
+                       part3Data := make([]byte, 600)
+                       for i := range part3Data {
+                               part3Data[i] = byte('C')
+                       }
+
+                       parts := []queue.StreamingPartData{
+                               {
+                                       ID:      1,
+                                       Group:   "test-group",
+                                       ShardID: 1,
+                                       Topic:   "stream_write",
+                                       Files: []queue.FileInfo{
+                                               
createFileInfo("part1-file1.dat", part1Data),
+                                       },
+                               },
+                               {
+                                       ID:      2,
+                                       Group:   "test-group",
+                                       ShardID: 1,
+                                       Topic:   "stream_write",
+                                       Files: []queue.FileInfo{
+                                               {
+                                                       Name: "part2-file1.dat",
+                                                       Reader: &failingReader{
+                                                               data:      
part2Data,
+                                                               failAfter: 500, 
// Fail after reading 500 bytes
+                                                       },
+                                               },
+                                       },
+                               },
+                               {
+                                       ID:      3,
+                                       Group:   "test-group",
+                                       ShardID: 1,
+                                       Topic:   "stream_write",
+                                       Files: []queue.FileInfo{
+                                               
createFileInfo("part3-file1.dat", part3Data),
+                                       },
+                               },
+                       }
+
+                       ctx, cancel := 
context.WithTimeout(context.Background(), 30*time.Second)
+                       defer cancel()
+
+                       result, err := client.SyncStreamingParts(ctx, parts)
+                       gomega.Expect(err).NotTo(gomega.HaveOccurred())
+                       gomega.Expect(result).NotTo(gomega.BeNil())
+
+                       // Verify that part 2 is marked as failed
+                       gomega.Expect(result.FailedParts).To(gomega.HaveLen(1))
+                       
gomega.Expect(result.FailedParts[0].PartID).To(gomega.Equal("2"))
+
+                       // Verify that the result is still considered 
successful (partial success)
+                       gomega.Expect(result.Success).To(gomega.BeTrue())
+
+                       // Verify chunks received
+                       mu.Lock()
+                       defer mu.Unlock()
+
+                       // Find where part 2 appears in chunks
+                       var part2FirstChunkIdx, part2LastChunkIdx int = -1, -1

Review Comment:
   [nitpick] Declaring and initializing both variables to -1 in a single 
   statement is less readable than separate declarations. Consider declaring 
   each on its own line, either as in the suggestion below or with the shorter 
   `part2FirstChunkIdx := -1` and `part2LastChunkIdx := -1`.
   ```suggestion
                        var part2FirstChunkIdx int = -1
                        var part2LastChunkIdx int = -1
   ```



##########
test/integration/distributed/sync_retry/sync_retry_test.go:
##########
@@ -0,0 +1,298 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package integration_sync_retry_test
+
+import (
+       "errors"
+       "fmt"
+       "io/fs"
+       "os"
+       "path/filepath"
+       "time"
+
+       g "github.com/onsi/ginkgo/v2"
+       "github.com/onsi/gomega"
+       "google.golang.org/grpc"
+       "google.golang.org/grpc/credentials/insecure"
+
+       "github.com/apache/skywalking-banyandb/api/data"
+       "github.com/apache/skywalking-banyandb/banyand/queue"
+       "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       casesmeasuredata 
"github.com/apache/skywalking-banyandb/test/cases/measure/data"
+       casesstreamdata 
"github.com/apache/skywalking-banyandb/test/cases/stream/data"
+       casestracedata 
"github.com/apache/skywalking-banyandb/test/cases/trace/data"
+)
+
+const (
+       DefaultMaxRetries  = 3
+       FailedPartsDirName = "failed-parts"
+)
+
+var _ = g.Describe("Chunked sync failure handling", g.Ordered, func() {
+       g.BeforeEach(func() {
+               gomega.Expect(clearFailedPartsDirs()).To(gomega.Succeed())
+       })
+
+       g.AfterEach(func() {
+               queue.ClearChunkedSyncFailureInjector()
+       })
+
+       g.It("retries stream parts and records failed segments", func() {
+               conn := dialLiaison()
+               defer conn.Close()
+
+               // With 2 data nodes and 3 max retries, we need 2 * (1 initial 
+ 3 retries) = 8 failures
+               // to ensure parts permanently fail
+               injector, cleanup := 
withChunkedSyncFailureInjector(map[string]int{
+                       data.TopicStreamPartSync.String(): 2 * 
(DefaultMaxRetries + 1),
+               })
+               defer cleanup()
+
+               baseTime := time.Now().Add(-5 * 
time.Minute).Truncate(time.Millisecond)
+               casesstreamdata.Write(conn, "sw", baseTime, 
500*time.Millisecond)
+
+               g.By("waiting for injected stream sync failures")
+               gomega.Eventually(func() int {
+                       return 
injector.attemptsFor(data.TopicStreamPartSync.String())
+               }, flags.EventuallyTimeout, 
500*time.Millisecond).Should(gomega.BeNumerically(">=", 
2*(DefaultMaxRetries+1)))
+
+               g.By("waiting for stream failed-parts directory to be 
populated")
+               gomega.Eventually(func() string {
+                       return locateFailedPartsDir("stream")
+               }, flags.EventuallyTimeout, 
time.Second).ShouldNot(gomega.BeEmpty())
+               dir := locateFailedPartsDir("stream")
+               gomega.Expect(dir).NotTo(gomega.BeEmpty())
+               g.By(fmt.Sprintf("stream failed parts recorded at %s", dir))
+       })
+
+       g.It("retries measure parts and records failed segments", func() {
+               conn := dialLiaison()
+               defer conn.Close()
+
+               // With 2 data nodes, 2 shards (2 parts), and 3 max retries:
+               // 2 parts * 2 nodes * (1 initial + 3 retries) = 16 failures 
needed
+               injector, cleanup := 
withChunkedSyncFailureInjector(map[string]int{
+                       data.TopicMeasurePartSync.String(): 2 * 2 * 
(DefaultMaxRetries + 1),
+               })
+               defer cleanup()
+
+               baseTime := time.Now().Add(-24 * 
time.Hour).Truncate(time.Millisecond)
+               casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric", 
"service_cpm_minute_data.json", baseTime, time.Minute)
+
+               g.By("waiting for injected measure sync failures")
+               gomega.Eventually(func() int {
+                       return 
injector.attemptsFor(data.TopicMeasurePartSync.String())
+               }, flags.EventuallyTimeout, 
time.Second).Should(gomega.BeNumerically(">=", 2*2*(DefaultMaxRetries+1)))
+
+               g.By("waiting for measure failed-parts directory to be 
populated")
+               gomega.Eventually(func() string {
+                       return locateFailedPartsDir("measure")
+               }, flags.EventuallyTimeout, 
2*time.Second).ShouldNot(gomega.BeEmpty())
+               dir := locateFailedPartsDir("measure")
+               gomega.Expect(dir).NotTo(gomega.BeEmpty())
+               g.By(fmt.Sprintf("measure failed parts recorded at %s", dir))
+       })
+
+       g.It("retries trace parts and records failed segments", func() {
+               conn := dialLiaison()
+               defer conn.Close()
+
+               // With 2 data nodes and 3 max retries, we need 2 * (1 initial 
+ 3 retries) = 8 failures
+               // to ensure parts permanently fail
+               injector, cleanup := 
withChunkedSyncFailureInjector(map[string]int{
+                       data.TopicTracePartSync.String(): 2 * 
(DefaultMaxRetries + 1),
+               })
+               defer cleanup()
+
+               baseTime := time.Now().Add(-10 * 
time.Minute).Truncate(time.Millisecond)
+               casestracedata.WriteToGroup(conn, "sw", "test-trace-group", 
"sw", baseTime, 500*time.Millisecond)
+
+               g.By("waiting for injected trace sync failures")
+               gomega.Eventually(func() int {
+                       return 
injector.attemptsFor(data.TopicTracePartSync.String())
+               }, flags.EventuallyTimeout, 
time.Second).Should(gomega.BeNumerically(">=", 2*(DefaultMaxRetries+1)))
+
+               g.By("waiting for trace failed-parts directory to be populated")
+               gomega.Eventually(func() string {
+                       return locateFailedPartsDir("trace")
+               }, flags.EventuallyTimeout, 
2*time.Second).ShouldNot(gomega.BeEmpty())
+               dir := locateFailedPartsDir("trace")
+               gomega.Expect(dir).NotTo(gomega.BeEmpty())
+               g.By(fmt.Sprintf("trace failed parts recorded at %s", dir))
+       })
+
+       g.It("handles stream connection errors and marks all parts as failed", 
func() {
+               conn := dialLiaison()
+               defer conn.Close()
+
+               // With 2 data nodes and 3 max retries, inject connection-level 
errors
+               // that cause syncPartsToNodesHelper to return an error
+               injector, cleanup := 
withChunkedSyncErrorInjector(map[string]int{
+                       data.TopicStreamPartSync.String(): 2 * 
(DefaultMaxRetries + 1),
+               })
+               defer cleanup()
+
+               // Use a different time range to create new parts
+               baseTime := time.Now().Add(-15 * 
time.Minute).Truncate(time.Millisecond)
+               casesstreamdata.Write(conn, "sw", baseTime, 
500*time.Millisecond)
+
+               g.By("waiting for injected stream connection errors")
+               gomega.Eventually(func() int {
+                       return 
injector.attemptsFor(data.TopicStreamPartSync.String())
+               }, flags.EventuallyTimeout, 
500*time.Millisecond).Should(gomega.BeNumerically(">=", 
2*(DefaultMaxRetries+1)))
+
+               g.By("waiting for stream failed-parts directory to be 
populated")
+               gomega.Eventually(func() string {
+                       return locateFailedPartsDir("stream")
+               }, flags.EventuallyTimeout, 
time.Second).ShouldNot(gomega.BeEmpty())
+               dir := locateFailedPartsDir("stream")
+               gomega.Expect(dir).NotTo(gomega.BeEmpty())
+               g.By(fmt.Sprintf("stream failed parts recorded at %s (from 
connection errors)", dir))
+       })
+
+       g.It("handles measure connection errors and marks all parts as failed", 
func() {
+               conn := dialLiaison()
+               defer conn.Close()
+
+               // With 2 data nodes, 2 shards, and 3 max retries, inject 
connection-level errors
+               injector, cleanup := 
withChunkedSyncErrorInjector(map[string]int{
+                       data.TopicMeasurePartSync.String(): 2 * 2 * 
(DefaultMaxRetries + 1),
+               })
+               defer cleanup()
+
+               // Use a different time range to create new parts
+               baseTime := time.Now().Add(-48 * 
time.Hour).Truncate(time.Millisecond)
+               casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric", 
"service_cpm_minute_data.json", baseTime, time.Minute)
+
+               g.By("waiting for injected measure connection errors")
+               gomega.Eventually(func() int {
+                       return 
injector.attemptsFor(data.TopicMeasurePartSync.String())
+               }, flags.EventuallyTimeout, 
time.Second).Should(gomega.BeNumerically(">=", 2*2*(DefaultMaxRetries+1)))
+
+               g.By("waiting for measure failed-parts directory to be 
populated")
+               gomega.Eventually(func() string {
+                       return locateFailedPartsDir("measure")
+               }, flags.EventuallyTimeout, 
2*time.Second).ShouldNot(gomega.BeEmpty())
+               dir := locateFailedPartsDir("measure")
+               gomega.Expect(dir).NotTo(gomega.BeEmpty())
+               g.By(fmt.Sprintf("measure failed parts recorded at %s (from 
connection errors)", dir))
+       })
+
+       g.It("handles trace connection errors and marks all parts as failed", 
func() {
+               conn := dialLiaison()
+               defer conn.Close()
+
+               // With 2 data nodes and 3 max retries, inject connection-level 
errors
+               injector, cleanup := 
withChunkedSyncErrorInjector(map[string]int{
+                       data.TopicTracePartSync.String(): 2 * 
(DefaultMaxRetries + 1),
+               })
+               defer cleanup()
+
+               // Use a different time range to create new parts
+               baseTime := time.Now().Add(-2 * 
time.Hour).Truncate(time.Millisecond)
+               casestracedata.WriteToGroup(conn, "sw", "test-trace-group", 
"sw", baseTime, 500*time.Millisecond)
+
+               g.By("waiting for injected trace connection errors")
+               gomega.Eventually(func() int {
+                       return 
injector.attemptsFor(data.TopicTracePartSync.String())
+               }, flags.EventuallyTimeout, 
time.Second).Should(gomega.BeNumerically(">=", 2*(DefaultMaxRetries+1)))
+
+               g.By("waiting for trace failed-parts directory to be populated")
+               gomega.Eventually(func() string {
+                       return locateFailedPartsDir("trace")
+               }, flags.EventuallyTimeout, 
2*time.Second).ShouldNot(gomega.BeEmpty())
+               dir := locateFailedPartsDir("trace")
+               gomega.Expect(dir).NotTo(gomega.BeEmpty())
+               g.By(fmt.Sprintf("trace failed parts recorded at %s (from 
connection errors)", dir))
+       })
+})
+
+func dialLiaison() *grpc.ClientConn {
+       var conn *grpc.ClientConn
+       var err error
+       // Retry connection with Eventually to handle liaison startup timing
+       gomega.Eventually(func() error {
+               conn, err = grpchelper.Conn(liaisonAddr, 10*time.Second, 
grpc.WithTransportCredentials(insecure.NewCredentials()))
+               return err
+       }, 30*time.Second, time.Second).Should(gomega.Succeed())
+       return conn
+}
+
+func clearFailedPartsDirs() error {
+       for _, root := range dataPaths {
+               if err := filepath.WalkDir(root, func(path string, d 
fs.DirEntry, err error) error {
+                       if err != nil {
+                               return nil
+                       }
+                       if d.IsDir() && d.Name() == FailedPartsDirName {
+                               if removeErr := os.RemoveAll(path); removeErr 
!= nil {
+                                       return removeErr
+                               }
+                               return fs.SkipDir
+                       }
+                       return nil
+               }); err != nil {
+                       return err
+               }
+       }
+       return nil
+}
+
+func locateFailedPartsDir(module string) string {
+       errFound := errors.New("found")
+
+       // Search both data node paths and any temp directories (for liaison)
+       searchPaths := append([]string{}, dataPaths...)
+
+       // Also search common temp directory patterns for liaison write queues
+       tmpDirs, _ := filepath.Glob("/tmp/banyandb-test-*")
+       searchPaths = append(searchPaths, tmpDirs...)
+       tmpDirs2, _ := filepath.Glob(os.TempDir() + "/banyandb-test-*")

Review Comment:
   The errors returned by `filepath.Glob` are silently discarded with `_`. 
   `filepath.Glob` ignores file system errors and only reports 
   `filepath.ErrBadPattern` for a malformed pattern, so if a pattern is ever 
   malformed the search will silently skip those paths. Consider logging or 
   handling these errors, especially since this is a test helper that should be 
   reliable.
   ```suggestion
        tmpDirs, err := filepath.Glob("/tmp/banyandb-test-*")
        if err != nil {
                fmt.Fprintf(os.Stderr, "Warning: failed to glob 
/tmp/banyandb-test-*: %v\n", err)
        }
        searchPaths = append(searchPaths, tmpDirs...)
        tmpDirs2, err := filepath.Glob(os.TempDir() + "/banyandb-test-*")
        if err != nil {
                fmt.Fprintf(os.Stderr, "Warning: failed to glob %s: %v\n", 
os.TempDir()+"/banyandb-test-*", err)
        }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to