Copilot commented on code in PR #836:
URL: https://github.com/apache/skywalking-banyandb/pull/836#discussion_r2494162702
##########
banyand/stream/syncer.go:
##########
@@ -52,7 +55,7 @@ func (tst *tsTable) syncLoop(syncCh chan *syncIntroduction, flusherNotifier watc
return false
}
defer curSnapshot.decRef()
- if curSnapshot.epoch != epoch {
+ if firstSync || curSnapshot.epoch != epoch {
Review Comment:
The `firstSync` flag forces a sync on the first iteration regardless of
epoch changes. While this ensures initial synchronization, the comment on line
210 ('Handle sync introductions (includes both successful and permanently
failed parts)') and the retry logic suggest that permanently failed parts from
previous runs might not be handled properly on restart. Consider documenting
why the first sync is needed, or clarifying whether there is a risk of
re-attempting failed parts that were already copied to the failed-parts directory.
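For illustration only, a minimal sketch of one way to avoid re-attempting parts that a previous run already handed off; `alreadyInFailedPartsDir` is a hypothetical lookup, not code from this PR:
```go
// Sketch only: before the forced first sync, drop parts that a previous run
// already copied to the failed-parts directory, so they are not retried and
// copied a second time. alreadyInFailedPartsDir is a hypothetical helper.
func filterPreviouslyFailed(parts []*part, alreadyInFailedPartsDir func(id uint64) bool) []*part {
	kept := make([]*part, 0, len(parts))
	for _, p := range parts {
		if alreadyInFailedPartsDir(p.partMetadata.ID) {
			continue // already handed off on a previous run
		}
		kept = append(kept, p)
	}
	return kept
}
```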
##########
banyand/queue/pub/chunked_sync.go:
##########
@@ -129,6 +144,9 @@ func (c *chunkedSyncClient) SyncStreamingParts(ctx context.Context, parts []queu
result := finalResp.GetSyncResult()
success = result.Success
}
+ if !success && len(failedParts) < len(parts) {
Review Comment:
This logic is incorrect. Setting `success = true` when some (but not all)
parts fail creates an ambiguous state. If `failedParts` contains any entries,
the sync should be considered unsuccessful. This condition should either be
removed or the logic should be: if `len(failedParts) == 0 { success = true }`.
```suggestion
if !success && len(failedParts) == 0 {
```
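To make the intended semantics concrete, a minimal sketch of the end-of-stream bookkeeping, using only variables already present in `SyncStreamingParts` (a sketch, not the suggested change itself):
```go
// Sketch only: treat the sync as successful only when no part failed.
// success starts from the server's final response; failedParts is the
// client-side list accumulated while streaming.
if !success && len(failedParts) == 0 {
	success = true
}
if len(failedParts) > 0 {
	success = false // any failed part makes the overall sync unsuccessful
}
```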
##########
banyand/measure/syncer.go:
##########
@@ -220,59 +297,153 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot, syncCh chan *syncIntrodu
}
}()
+ perNodeFailures := tst.performInitialSync(ctx, partsToSync, nodes, &releaseFuncs)
+ if len(perNodeFailures) > 0 {
+ tst.handleFailedPartsRetry(ctx, partsToSync, perNodeFailures, partsInfo, failedPartsHandler)
+ }
+
+ return nil
+}
+
+func (tst *tsTable) buildPartsInfoMap(partsToSync []*part) map[uint64][]*storage.PartInfo {
+ partsInfo := make(map[uint64][]*storage.PartInfo)
+ for _, part := range partsToSync {
+ partsInfo[part.partMetadata.ID] = []*storage.PartInfo{
+ {
+ PartID: part.partMetadata.ID,
+ SourcePath: part.path,
+ PartType: PartTypeCore,
+ },
+ }
+ }
+ return partsInfo
+}
+
+func (tst *tsTable) performInitialSync(
+ ctx context.Context, partsToSync []*part, nodes []string, releaseFuncs *[]func(),
+) map[string][]queue.FailedPart {
+ perNodeFailures := make(map[string][]queue.FailedPart)
for _, node := range nodes {
- // Get chunked sync client for this node.
- chunkedClient, err := tst.option.tire2Client.NewChunkedSyncClient(node, 512*1024)
+ failedParts, err := tst.syncPartsToNodesHelper(ctx, partsToSync, []string{node}, 512*1024, releaseFuncs)
if err != nil {
- return fmt.Errorf("failed to create chunked sync client for node %s: %w", node, err)
+ tst.l.Error().Err(err).Str("node", node).Msg("sync error")
Review Comment:
When `syncPartsToNodesHelper` returns an error, the parts that were supposed
to sync to that node are not added to `perNodeFailures`. This means they won't
be retried and won't be copied to the failed-parts directory. All parts should
be marked as failed for that node when the sync operation returns an error.
```suggestion
tst.l.Error().Err(err).Str("node", node).Msg("sync error")
// Mark all parts as failed for this node
failedPartsForNode := make([]queue.FailedPart, len(partsToSync))
for i, part := range partsToSync {
	failedPartsForNode[i] = queue.FailedPart{
		PartID: fmt.Sprint(part.partMetadata.ID),
		Node:   node,
	}
}
perNodeFailures[node] = failedPartsForNode
```
##########
banyand/queue/pub/chunked_sync.go:
##########
@@ -194,6 +215,12 @@ func (c *chunkedSyncClient) streamPartsAsChunks(
for len(buffer) < cap(buffer) && currentFileIdx < len(fileStates) {
fileState := fileStates[currentFileIdx]
+ part := parts[fileState.partIndex]
+ if _, failed := failedPartIDs[part.ID]; failed {
+ fileState.finished = true
+ currentFileIdx++
+ continue
+ }
Review Comment:
Once a part's file fails to read, all remaining files for that part are
skipped. However, the chunk being built may contain data from other parts that
have not failed. This could result in incomplete chunks being sent, potentially
corrupting the sync state. Consider either: (1) failing the entire chunk if any
part in it has failed, or (2) ensuring chunks only contain data from non-failed
parts.
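For illustration, a sketch of option (2): roll the chunk buffer back to where the failing part's bytes began, so a chunk never carries partial data from a failed part. `readPart` and `appendPartToChunk` are assumptions for the sketch, not code from this PR:
```go
// Sketch only (option 2): grow the chunk buffer with one part's data and roll
// back to the pre-read offset on failure, so the buffer only ever holds bytes
// from parts that have not failed.
func appendPartToChunk(
	buffer []byte, partID uint64,
	readPart func([]byte) ([]byte, error), // hypothetical read helper
	failedPartIDs map[uint64]struct{}, failedParts *[]queue.FailedPart,
) []byte {
	partStart := len(buffer) // rollback point: nothing from this part yet
	grown, err := readPart(buffer)
	if err != nil {
		failedPartIDs[partID] = struct{}{} // remaining files of this part get skipped
		*failedParts = append(*failedParts, queue.FailedPart{
			PartID: fmt.Sprint(partID),
			Error:  err.Error(),
		})
		return buffer[:partStart] // discard this part's partial bytes
	}
	return grown
}
```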
##########
banyand/queue/pub/chunked_sync.go:
##########
@@ -270,7 +305,16 @@ func (c *chunkedSyncClient) streamPartsAsChunks(
}
if err := c.sendChunk(stream, sessionID, buffer, chunkPartsInfo, &chunkIndex, &totalChunks, totalBytesSent, isFirstChunk, metadata); err != nil {
- return totalChunks, err
+ errMsg := fmt.Sprintf("failed to send chunk: %v", err)
+ c.log.Error().Err(err).Msg(errMsg)
+ for _, p := range chunkPartsInfo {
+ if _, ok := failedPartIDs[p.Id]; !ok {
+ failedPartIDs[p.Id] = struct{}{}
+ failedParts = append(failedParts, queue.FailedPart{PartID: fmt.Sprint(p.Id), Error: errMsg})
+ }
+ }
+ buffer = buffer[:0]
+ continue
Review Comment:
When `sendChunk` fails, the function marks all parts in the chunk as failed
and continues processing. However, the remote receiver may be in an
inconsistent state after a failed chunk send. The sync session should likely be
aborted entirely rather than continuing to send subsequent chunks, as the
receiver's state machine may not handle partial chunk sequences correctly.
```suggestion
return totalChunks, failedParts, fmt.Errorf("failed to send chunk: %w", err)
```
##########
banyand/internal/storage/failed_parts_handler.go:
##########
@@ -0,0 +1,262 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package storage
+
+import (
+ "context"
+ "fmt"
+ "path/filepath"
+ "strconv"
+ "time"
+
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const (
+ // DefaultInitialRetryDelay is the initial delay before the first retry.
+ DefaultInitialRetryDelay = 1 * time.Second
+ // DefaultMaxRetries is the maximum number of retry attempts.
+ DefaultMaxRetries = 3
+ // DefaultBackoffMultiplier is the multiplier for exponential backoff.
+ DefaultBackoffMultiplier = 2
+ // FailedPartsDirName is the name of the directory for failed parts.
+ FailedPartsDirName = "failed-parts"
+)
+
+// FailedPartsHandler handles retry logic and filesystem fallback for failed parts.
+type FailedPartsHandler struct {
+ fileSystem fs.FileSystem
+ l *logger.Logger
+ root string
+ failedPartsDir string
+ initialRetryDelay time.Duration
+ maxRetries int
+ backoffMultiplier int
+}
+
+// PartInfo contains information needed to retry or copy a failed part.
+type PartInfo struct {
+ SourcePath string
+ PartType string
+ PartID uint64
+}
+
+// NewFailedPartsHandler creates a new handler for failed parts.
+func NewFailedPartsHandler(fileSystem fs.FileSystem, root string, l
*logger.Logger) *FailedPartsHandler {
+ failedPartsDir := filepath.Join(root, FailedPartsDirName)
+ fileSystem.MkdirIfNotExist(failedPartsDir, DirPerm)
+
+ return &FailedPartsHandler{
+ fileSystem: fileSystem,
+ root: root,
+ failedPartsDir: failedPartsDir,
+ l: l,
+ initialRetryDelay: DefaultInitialRetryDelay,
+ maxRetries: DefaultMaxRetries,
+ backoffMultiplier: DefaultBackoffMultiplier,
+ }
+}
+
+// RetryFailedParts attempts to retry failed parts with exponential backoff.
+// Returns the list of permanently failed part IDs after all retries are exhausted.
+func (h *FailedPartsHandler) RetryFailedParts(
+ ctx context.Context,
+ failedParts []queue.FailedPart,
+ partsInfo map[uint64][]*PartInfo,
+ syncFunc func(partIDs []uint64) ([]queue.FailedPart, error),
+) ([]uint64, error) {
+ if len(failedParts) == 0 {
+ return nil, nil
+ }
+
+ // Group failed parts by part ID
+ failedPartIDs := make(map[uint64]string)
+ for _, fp := range failedParts {
+ partID, err := strconv.ParseUint(fp.PartID, 10, 64)
+ if err != nil {
+ h.l.Warn().Err(err).Str("partID", fp.PartID).Msg("failed to parse part ID, skipping")
+ continue
+ }
+ failedPartIDs[partID] = fp.Error
+ }
+
+ h.l.Warn().
+ Int("count", len(failedPartIDs)).
+ Msg("starting retry process for failed parts")
+
+ // Retry with exponential backoff
+ var stillFailing []uint64
+ for partID, errMsg := range failedPartIDs {
+ if err := h.retryPartWithBackoff(ctx, partID, errMsg, syncFunc); err != nil {
+ h.l.Error().
+ Err(err).
+ Uint64("partID", partID).
+ Msg("part failed after all retries")
+ stillFailing = append(stillFailing, partID)
+ }
+ }
+
+ // Copy permanently failed parts to failed-parts directory
+ var permanentlyFailed []uint64
+ for _, partID := range stillFailing {
+ partInfoList, exists := partsInfo[partID]
+ if !exists || len(partInfoList) == 0 {
+ h.l.Warn().Uint64("partID", partID).Msg("no part info found for failed part")
+ permanentlyFailed = append(permanentlyFailed, partID)
+ continue
+ }
+
+ // Copy all parts with this ID (core + all SIDX parts)
+ allCopied := true
+ for _, partInfo := range partInfoList {
+ destSubDir := fmt.Sprintf("%016x_%s", partID, partInfo.PartType)
+ if err := h.CopyToFailedPartsDir(partID, partInfo.SourcePath, destSubDir); err != nil {
+ h.l.Error().
+ Err(err).
+ Uint64("partID", partID).
+ Str("partType", partInfo.PartType).
+ Str("sourcePath", partInfo.SourcePath).
+ Msg("failed to copy part to
failed-parts directory")
+ allCopied = false
+ } else {
+ h.l.Info().
+ Uint64("partID", partID).
+ Str("partType", partInfo.PartType).
+ Str("destination",
filepath.Join(h.failedPartsDir, destSubDir)).
+ Msg("successfully copied failed part to
failed-parts directory")
+ }
+ }
+ if !allCopied {
+ h.l.Warn().Uint64("partID", partID).Msg("some parts
failed to copy")
+ }
+ permanentlyFailed = append(permanentlyFailed, partID)
+ }
+
+ return permanentlyFailed, nil
+}
+
+// retryPartWithBackoff retries a single part with exponential backoff.
+func (h *FailedPartsHandler) retryPartWithBackoff(
+ ctx context.Context,
+ partID uint64,
+ initialError string,
+ syncFunc func(partIDs []uint64) ([]queue.FailedPart, error),
+) error {
+ delay := h.initialRetryDelay
+
+ for attempt := 1; attempt <= h.maxRetries; attempt++ {
+ // Wait before retry
+ select {
+ case <-ctx.Done():
+ return fmt.Errorf("context canceled during retry: %w",
ctx.Err())
+ case <-time.After(delay):
+ }
+
+ h.l.Info().
+ Uint64("partID", partID).
+ Int("attempt", attempt).
+ Int("maxRetries", h.maxRetries).
+ Dur("delay", delay).
+ Msg("retrying failed part")
+
+ // Attempt to sync just this part
+ newFailedParts, err := syncFunc([]uint64{partID})
+ if err != nil {
+ h.l.Warn().
+ Err(err).
+ Uint64("partID", partID).
+ Int("attempt", attempt).
+ Msg("retry attempt failed with error")
+ delay *= time.Duration(h.backoffMultiplier)
+ continue
+ }
+
+ // Check if this part still failed
+ partStillFailed := false
+ for _, fp := range newFailedParts {
+ fpID, _ := strconv.ParseUint(fp.PartID, 10, 64)
+ if fpID == partID {
+ partStillFailed = true
+ h.l.Warn().
+ Uint64("partID", partID).
+ Int("attempt", attempt).
+ Str("error", fp.Error).
+ Msg("retry attempt failed")
+ break
+ }
+ }
+
+ if !partStillFailed {
+ h.l.Info().
+ Uint64("partID", partID).
+ Int("attempt", attempt).
+ Msg("part successfully synced after retry")
+ return nil
+ }
+
+ // Exponential backoff for next attempt
+ delay *= time.Duration(h.backoffMultiplier)
+ }
+
+ return fmt.Errorf("part failed after %d retry attempts, initial error:
%s", h.maxRetries, initialError)
+}
+
+// CopyToFailedPartsDir copies a part to the failed-parts directory using hard links.
+func (h *FailedPartsHandler) CopyToFailedPartsDir(partID uint64, sourcePath string, destSubDir string) error {
+ destPath := filepath.Join(h.failedPartsDir, destSubDir)
+
+ // Check if already exists
+ entries := h.fileSystem.ReadDir(h.failedPartsDir)
+ for _, entry := range entries {
+ if entry.Name() == destSubDir {
+ h.l.Info().
+ Uint64("partID", partID).
+ Str("destSubDir", destSubDir).
+ Msg("part already exists in failed-parts
directory")
+ return nil
+ }
+ }
+
+ h.l.Info().
+ Uint64("partID", partID).
+ Str("sourcePath", sourcePath).
+ Str("destPath", destPath).
+ Msg("creating hard links to failed-parts directory")
+
+ // Create hard links from source to destination
+ if err := h.fileSystem.CreateHardLink(sourcePath, destPath, nil); err != nil {
Review Comment:
The `CreateHardLink` operation may fail on filesystems that don't support
hard links (e.g., some network filesystems, or when source and destination are
on different filesystems). Consider adding a fallback to regular file copy when
hard linking fails, or at minimum, document this limitation and the expected
filesystem requirements.
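A possible shape for such a fallback, sketched with the standard library (`os`, `path/filepath`) rather than the `pkg/fs` abstraction, whose copy API is not shown in this diff; `copyDir` is a hypothetical helper and assumes a flat part directory of regular files:
```go
// Sketch only: plain-copy fallback for filesystems without hard-link support,
// or when source and destination live on different filesystems.
func copyDir(src, dst string) error {
	entries, err := os.ReadDir(src)
	if err != nil {
		return err
	}
	if err := os.MkdirAll(dst, 0o755); err != nil {
		return err
	}
	for _, e := range entries {
		if e.IsDir() {
			continue // assume flat part directories for this sketch
		}
		data, err := os.ReadFile(filepath.Join(src, e.Name()))
		if err != nil {
			return err
		}
		if err := os.WriteFile(filepath.Join(dst, e.Name()), data, 0o644); err != nil {
			return err
		}
	}
	return nil
}
```
`CopyToFailedPartsDir` could then retry with `copyDir(sourcePath, destPath)` before treating the hard-link error as fatal.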
##########
banyand/trace/syncer.go:
##########
@@ -226,49 +231,33 @@ func (tst *tsTable) needToSync(partsToSync []*part) bool {
return len(partsToSync) > 0 && len(nodes) > 0
}
-// executeSyncOperation performs the actual synchronization of parts to nodes.
-func (tst *tsTable) executeSyncOperation(partsToSync []*part, partIDsToSync map[uint64]struct{}) error {
- sort.Slice(partsToSync, func(i, j int) bool {
- return partsToSync[i].partMetadata.ID < partsToSync[j].partMetadata.ID
- })
+// syncPartsToNodesHelper syncs given parts to all nodes and returns failed parts.
+// This helper is used for both initial sync and retry attempts.
+func (tst *tsTable) syncPartsToNodesHelper(
+ ctx context.Context, parts []*part, partIDsMap map[uint64]struct{},
+ nodes []string, sidxMap map[string]sidx.SIDX, releaseFuncs *[]func(),
+) ([]queue.FailedPart, error) {
+ var allFailedParts []queue.FailedPart
- ctx := context.Background()
- releaseFuncs := make([]func(), 0, len(partsToSync))
- defer func() {
- for _, release := range releaseFuncs {
- release()
- }
- }()
-
- nodes := tst.getNodes()
- if tst.loopCloser != nil && tst.loopCloser.Closed() {
- return errClosed
- }
- sidxMap := tst.getAllSidx()
for _, node := range nodes {
if tst.loopCloser != nil && tst.loopCloser.Closed() {
- return errClosed
+ return allFailedParts, errClosed
}
- // Prepare all streaming parts data
+
+ // Prepare streaming parts data
streamingParts := make([]queue.StreamingPartData, 0)
+
// Add sidx streaming parts
for name, sidx := range sidxMap {
- sidxStreamingParts, sidxReleaseFuncs := sidx.StreamingParts(partIDsToSync, tst.group, uint32(tst.shardID), name)
- if len(sidxStreamingParts) != len(partIDsToSync) {
- logger.Panicf("sidx streaming parts count mismatch: %d != %d", len(sidxStreamingParts), len(partIDsToSync))
- return nil
- }
+ sidxStreamingParts, sidxReleaseFuncs := sidx.StreamingParts(partIDsMap, tst.group, uint32(tst.shardID), name)
Review Comment:
The panic checks that verified `len(sidxStreamingParts) ==
len(partIDsToSync)` have been removed. Without this validation, a
mismatch between the expected and actual streaming parts will silently produce
incorrect results. If the validation is no longer needed, this should be
documented; otherwise, restore the checks or add equivalent validation.
```suggestion
sidxStreamingParts, sidxReleaseFuncs := sidx.StreamingParts(partIDsMap, tst.group, uint32(tst.shardID), name)
if len(sidxStreamingParts) != len(partIDsMap) {
	return nil, fmt.Errorf("mismatch between expected and actual streaming parts for sidx '%s': expected %d, got %d", name, len(partIDsMap), len(sidxStreamingParts))
}
```
##########
banyand/stream/syncer.go:
##########
@@ -192,66 +270,157 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot, syncCh chan *syncIntrodu
}
}()
+ perNodeFailures := tst.performInitialSync(ctx, partsToSync, nodes, &releaseFuncs)
+ if len(perNodeFailures) > 0 {
+ tst.handleFailedPartsRetry(ctx, partsToSync, perNodeFailures, partsInfo, failedPartsHandler)
+ }
+
+ return nil
+}
+
+func (tst *tsTable) buildPartsInfoMap(partsToSync []*part) map[uint64][]*storage.PartInfo {
+ partsInfo := make(map[uint64][]*storage.PartInfo)
+ for _, part := range partsToSync {
+ partsInfo[part.partMetadata.ID] = []*storage.PartInfo{
+ {
+ PartID: part.partMetadata.ID,
+ SourcePath: part.path,
+ PartType: PartTypeCore,
+ },
+ }
+ }
+ return partsInfo
+}
+
+func (tst *tsTable) performInitialSync(
+ ctx context.Context, partsToSync []*part, nodes []string, releaseFuncs *[]func(),
+) map[string][]queue.FailedPart {
+ perNodeFailures := make(map[string][]queue.FailedPart)
for _, node := range nodes {
- // Get chunked sync client for this node
- chunkedClient, err := tst.option.tire2Client.NewChunkedSyncClient(node, 1024*1024)
+ failedParts, err := tst.syncPartsToNodesHelper(ctx, partsToSync, []string{node}, 1024*1024, releaseFuncs)
if err != nil {
- return fmt.Errorf("failed to create chunked sync client for node %s: %w", node, err)
+ tst.l.Error().Err(err).Str("node", node).Msg("sync error")
Review Comment:
When `syncPartsToNodesHelper` returns an error, the parts that were supposed
to sync to that node are not added to `perNodeFailures`. This means they won't
be retried and won't be copied to the failed-parts directory. All parts should
be marked as failed for that node when the sync operation returns an error.
```suggestion
tst.l.Error().Err(err).Str("node", node).Msg("sync error")
// Mark all parts as failed for this node
failedPartsForNode := make([]queue.FailedPart, len(partsToSync))
for i, part := range partsToSync {
	failedPartsForNode[i] = queue.FailedPart{
		PartID: fmt.Sprint(part.partMetadata.ID),
		Node:   node,
	}
}
perNodeFailures[node] = failedPartsForNode
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]