This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 079898ac Implement Backoff Retry Mechanism for Sending Queue Failures (#836)
079898ac is described below
commit 079898acb7572803fc27df6d75971e3036e814ab
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu Nov 6 14:15:34 2025 +0800
Implement Backoff Retry Mechanism for Sending Queue Failures (#836)
---
CHANGES.md | 1 +
.../backup/lifecycle/measure_migration_visitor.go | 2 +-
.../backup/lifecycle/stream_migration_visitor.go | 2 +-
banyand/internal/storage/failed_parts_handler.go | 409 +++++++++++++++
.../internal/storage/failed_parts_handler_test.go | 574 +++++++++++++++++++++
banyand/internal/storage/segment.go | 13 -
banyand/measure/measure.go | 13 +-
banyand/measure/svc_liaison.go | 48 +-
banyand/measure/syncer.go | 280 ++++++++--
banyand/measure/tstable.go | 15 +-
banyand/queue/local.go | 16 +
banyand/queue/pub/chunked_sync.go | 70 ++-
banyand/queue/pub/chunked_sync_test.go | 234 +++++++++
banyand/queue/pub/pub_tls_test.go | 80 ++-
banyand/queue/queue.go | 54 +-
banyand/stream/stream.go | 15 +-
banyand/stream/svc_liaison.go | 50 +-
banyand/stream/syncer.go | 286 ++++++++--
banyand/stream/tstable.go | 3 +
banyand/trace/handoff_controller.go | 2 +-
banyand/trace/svc_liaison.go | 57 +-
banyand/trace/syncer.go | 250 +++++++--
banyand/trace/trace.go | 13 +-
banyand/trace/tstable.go | 13 +-
.../integration/distributed/sync_retry/injector.go | 159 ++++++
.../sync_retry/sync_retry_suite_test.go | 136 +++++
.../distributed/sync_retry/sync_retry_test.go | 298 +++++++++++
27 files changed, 2831 insertions(+), 262 deletions(-)
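
A note on the retry schedule implemented below: with the defaults added in banyand/internal/storage/failed_parts_handler.go (1s initial delay, backoff multiplier 2, 3 attempts), a part that never syncs waits 1s, 2s, and 4s between attempts and is then hard-linked into the failed-parts directory. A standalone Go sketch of that schedule (illustration only, not code from this commit):

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        delay := 1 * time.Second // DefaultInitialRetryDelay
        const maxRetries = 3     // DefaultMaxRetries
        const multiplier = 2     // DefaultBackoffMultiplier

        var total time.Duration
        for attempt := 1; attempt <= maxRetries; attempt++ {
            fmt.Printf("attempt %d: wait %v, then retry the part\n", attempt, delay)
            total += delay
            delay *= multiplier // exponential backoff, mirroring retryPartWithBackoff
        }
        fmt.Printf("worst case: %v of waiting before the part is copied to failed-parts/\n", total)
    }
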
diff --git a/CHANGES.md b/CHANGES.md
index 1817a601..53c23f35 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -56,6 +56,7 @@ Release Notes.
- Refactor router for better usability.
- Implement the handoff queue for Trace.
- Add dump command-line tool to parse and display trace part data with support for CSV export and human-readable timestamp formatting.
+- Implement backoff retry mechanism for sending queue failures.
### Bug Fixes
diff --git a/banyand/backup/lifecycle/measure_migration_visitor.go b/banyand/backup/lifecycle/measure_migration_visitor.go
index 2a6588de..fd12b85b 100644
--- a/banyand/backup/lifecycle/measure_migration_visitor.go
+++ b/banyand/backup/lifecycle/measure_migration_visitor.go
@@ -343,7 +343,7 @@ func (mv *measureMigrationVisitor) streamPartToNode(nodeID string, targetShardID
}
if !result.Success {
- return fmt.Errorf("chunked sync partially failed: %v", result.ErrorMessage)
+ return fmt.Errorf("chunked sync partially failed: %v", result.FailedParts)
}
// Log success metrics (same pattern as syncer.go:210-217)
diff --git a/banyand/backup/lifecycle/stream_migration_visitor.go b/banyand/backup/lifecycle/stream_migration_visitor.go
index 6c3fff81..71b03e5f 100644
--- a/banyand/backup/lifecycle/stream_migration_visitor.go
+++ b/banyand/backup/lifecycle/stream_migration_visitor.go
@@ -464,7 +464,7 @@ func (mv *streamMigrationVisitor) streamPartToNode(nodeID string, targetShardID
}
if !result.Success {
- return fmt.Errorf("chunked sync partially failed: %v", result.ErrorMessage)
+ return fmt.Errorf("chunked sync partially failed: %v", result.FailedParts)
}
// Log success metrics (same pattern as syncer.go:210-217)
diff --git a/banyand/internal/storage/failed_parts_handler.go b/banyand/internal/storage/failed_parts_handler.go
new file mode 100644
index 00000000..700b6635
--- /dev/null
+++ b/banyand/internal/storage/failed_parts_handler.go
@@ -0,0 +1,409 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package storage
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ iofs "io/fs"
+ "os"
+ "path/filepath"
+ "sort"
+ "strconv"
+ "sync"
+ "time"
+
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const (
+ // DefaultInitialRetryDelay is the initial delay before the first retry.
+ DefaultInitialRetryDelay = 1 * time.Second
+ // DefaultMaxRetries is the maximum number of retry attempts.
+ DefaultMaxRetries = 3
+ // DefaultBackoffMultiplier is the multiplier for exponential backoff.
+ DefaultBackoffMultiplier = 2
+ // FailedPartsDirName is the name of the directory for failed parts.
+ FailedPartsDirName = "failed-parts"
+)
+
+// FailedPartsHandler handles retry logic and filesystem fallback for failed parts.
+type FailedPartsHandler struct {
+ fileSystem fs.FileSystem
+ l *logger.Logger
+ root string
+ failedPartsDir string
+ initialRetryDelay time.Duration
+ maxRetries int
+ backoffMultiplier int
+ maxTotalSizeBytes uint64
+ sizeMu sync.Mutex
+}
+
+// PartInfo contains information needed to retry or copy a failed part.
+type PartInfo struct {
+ SourcePath string
+ PartType string
+ PartID uint64
+}
+
+// NewFailedPartsHandler creates a new handler for failed parts.
+func NewFailedPartsHandler(fileSystem fs.FileSystem, root string, l *logger.Logger, maxTotalSizeBytes uint64) *FailedPartsHandler {
+ failedPartsDir := filepath.Join(root, FailedPartsDirName)
+ fileSystem.MkdirIfNotExist(failedPartsDir, DirPerm)
+
+ return &FailedPartsHandler{
+ fileSystem: fileSystem,
+ root: root,
+ failedPartsDir: failedPartsDir,
+ l: l,
+ initialRetryDelay: DefaultInitialRetryDelay,
+ maxRetries: DefaultMaxRetries,
+ backoffMultiplier: DefaultBackoffMultiplier,
+ maxTotalSizeBytes: maxTotalSizeBytes,
+ }
+}
+
+// RetryFailedParts attempts to retry failed parts with exponential backoff.
+// Returns the list of permanently failed part IDs after all retries are exhausted.
+func (h *FailedPartsHandler) RetryFailedParts(
+ ctx context.Context,
+ failedParts []queue.FailedPart,
+ partsInfo map[uint64][]*PartInfo,
+ syncFunc func(partIDs []uint64) ([]queue.FailedPart, error),
+) ([]uint64, error) {
+ if len(failedParts) == 0 {
+ return nil, nil
+ }
+
+ // Group failed parts by part ID
+ failedPartIDs := make(map[uint64]string)
+ for _, fp := range failedParts {
+ partID, err := strconv.ParseUint(fp.PartID, 10, 64)
+ if err != nil {
+ h.l.Warn().Err(err).Str("partID", fp.PartID).Msg("failed to parse part ID, skipping")
+ continue
+ }
+ failedPartIDs[partID] = fp.Error
+ }
+
+ h.l.Warn().
+ Int("count", len(failedPartIDs)).
+ Msg("starting retry process for failed parts")
+
+ // Retry with exponential backoff
+ var stillFailing []uint64
+ for partID, errMsg := range failedPartIDs {
+ if err := h.retryPartWithBackoff(ctx, partID, errMsg, syncFunc); err != nil {
+ h.l.Error().
+ Err(err).
+ Uint64("partID", partID).
+ Msg("part failed after all retries")
+ stillFailing = append(stillFailing, partID)
+ }
+ }
+
+ // Copy permanently failed parts to failed-parts directory
+ var permanentlyFailed []uint64
+ for _, partID := range stillFailing {
+ partInfoList, exists := partsInfo[partID]
+ if !exists || len(partInfoList) == 0 {
+ h.l.Warn().Uint64("partID", partID).Msg("no part info found for failed part")
+ permanentlyFailed = append(permanentlyFailed, partID)
+ continue
+ }
+
+ // Copy all parts with this ID (core + all SIDX parts)
+ allCopied := true
+ for _, partInfo := range partInfoList {
+ destSubDir := fmt.Sprintf("%016x_%s", partID, partInfo.PartType)
+ if err := h.CopyToFailedPartsDir(partID, partInfo.SourcePath, destSubDir); err != nil {
+ h.l.Error().
+ Err(err).
+ Uint64("partID", partID).
+ Str("partType", partInfo.PartType).
+ Str("sourcePath", partInfo.SourcePath).
+ Msg("failed to copy part to failed-parts directory")
+ allCopied = false
+ } else {
+ h.l.Info().
+ Uint64("partID", partID).
+ Str("partType", partInfo.PartType).
+ Str("destination", filepath.Join(h.failedPartsDir, destSubDir)).
+ Msg("successfully copied failed part to failed-parts directory")
+ }
+ }
+ if !allCopied {
+ h.l.Warn().Uint64("partID", partID).Msg("some parts failed to copy")
+ }
+ permanentlyFailed = append(permanentlyFailed, partID)
+ }
+
+ return permanentlyFailed, nil
+}
+
+// retryPartWithBackoff retries a single part with exponential backoff.
+func (h *FailedPartsHandler) retryPartWithBackoff(
+ ctx context.Context,
+ partID uint64,
+ initialError string,
+ syncFunc func(partIDs []uint64) ([]queue.FailedPart, error),
+) error {
+ delay := h.initialRetryDelay
+
+ for attempt := 1; attempt <= h.maxRetries; attempt++ {
+ // Wait before retry
+ select {
+ case <-ctx.Done():
+ return fmt.Errorf("context canceled during retry: %w",
ctx.Err())
+ case <-time.After(delay):
+ }
+
+ h.l.Info().
+ Uint64("partID", partID).
+ Int("attempt", attempt).
+ Int("maxRetries", h.maxRetries).
+ Dur("delay", delay).
+ Msg("retrying failed part")
+
+ // Attempt to sync just this part
+ newFailedParts, err := syncFunc([]uint64{partID})
+ if err != nil {
+ h.l.Warn().
+ Err(err).
+ Uint64("partID", partID).
+ Int("attempt", attempt).
+ Msg("retry attempt failed with error")
+ delay *= time.Duration(h.backoffMultiplier)
+ continue
+ }
+
+ // Check if this part still failed
+ partStillFailed := false
+ for _, fp := range newFailedParts {
+ fpID, _ := strconv.ParseUint(fp.PartID, 10, 64)
+ if fpID == partID {
+ partStillFailed = true
+ h.l.Warn().
+ Uint64("partID", partID).
+ Int("attempt", attempt).
+ Str("error", fp.Error).
+ Msg("retry attempt failed")
+ break
+ }
+ }
+
+ if !partStillFailed {
+ h.l.Info().
+ Uint64("partID", partID).
+ Int("attempt", attempt).
+ Msg("part successfully synced after retry")
+ return nil
+ }
+
+ // Exponential backoff for next attempt
+ delay *= time.Duration(h.backoffMultiplier)
+ }
+
+ return fmt.Errorf("part failed after %d retry attempts, initial error:
%s", h.maxRetries, initialError)
+}
+
+// CopyToFailedPartsDir copies a part to the failed-parts directory using hard links.
+func (h *FailedPartsHandler) CopyToFailedPartsDir(partID uint64, sourcePath string, destSubDir string) error {
+ destPath := filepath.Join(h.failedPartsDir, destSubDir)
+
+ if h.maxTotalSizeBytes > 0 {
+ h.sizeMu.Lock()
+ defer h.sizeMu.Unlock()
+ }
+
+ // Check if already exists
+ entries := h.fileSystem.ReadDir(h.failedPartsDir)
+ for _, entry := range entries {
+ if entry.Name() == destSubDir {
+ h.l.Info().
+ Uint64("partID", partID).
+ Str("destSubDir", destSubDir).
+ Msg("part already exists in failed-parts
directory")
+ return nil
+ }
+ }
+
+ if _, err := os.Stat(sourcePath); err != nil {
+ h.l.Error().
+ Err(err).
+ Uint64("partID", partID).
+ Str("sourcePath", sourcePath).
+ Msg("failed to stat source path before copying to
failed-parts directory")
+ return fmt.Errorf("failed to stat source path %s: %w",
sourcePath, err)
+ }
+
+ if h.maxTotalSizeBytes > 0 {
+ currentSize, err := calculatePathSize(h.failedPartsDir)
+ if err != nil {
+ return fmt.Errorf("failed to calculate current
failed-parts size: %w", err)
+ }
+ sourceSize, err := calculatePathSize(sourcePath)
+ if err != nil {
+ return fmt.Errorf("failed to calculate size of source
path %s: %w", sourcePath, err)
+ }
+
+ if currentSize+sourceSize > h.maxTotalSizeBytes {
+ type partDir struct {
+ modTime time.Time
+ name string
+ path string
+ size uint64
+ }
+
+ partDirs := make([]partDir, 0, len(entries))
+ for _, entry := range entries {
+ if !entry.IsDir() || entry.Name() == destSubDir {
+ continue
+ }
+
+ dirPath := filepath.Join(h.failedPartsDir, entry.Name())
+ info, err := os.Stat(dirPath)
+ if err != nil {
+ h.l.Warn().
+ Err(err).
+ Str("path", dirPath).
+ Msg("failed to stat existing
failed part directory during eviction")
+ continue
+ }
+
+ dirSize, err := calculatePathSize(dirPath)
+ if err != nil {
+ h.l.Warn().
+ Err(err).
+ Str("path", dirPath).
+ Msg("failed to calculate size
for existing failed part directory during eviction")
+ continue
+ }
+
+ partDirs = append(partDirs, partDir{
+ name: entry.Name(),
+ path: dirPath,
+ modTime: info.ModTime(),
+ size: dirSize,
+ })
+ }
+
+ sort.Slice(partDirs, func(i, j int) bool {
+ if partDirs[i].modTime.Equal(partDirs[j].modTime) {
+ return partDirs[i].name < partDirs[j].name
+ }
+ return partDirs[i].modTime.Before(partDirs[j].modTime)
+ })
+
+ for _, dir := range partDirs {
+ if currentSize+sourceSize <= h.maxTotalSizeBytes {
+ break
+ }
+
+ if err := os.RemoveAll(dir.path); err != nil {
+ return fmt.Errorf("failed to remove
oldest failed part %s: %w", dir.name, err)
+ }
+
+ h.l.Info().
+ Str("removedFailedPartDir", dir.name).
+ Uint64("freedBytes", dir.size).
+ Msg("removed oldest failed part to
honor size limit")
+
+ if currentSize >= dir.size {
+ currentSize -= dir.size
+ } else {
+ currentSize = 0
+ }
+ }
+
+ if currentSize+sourceSize > h.maxTotalSizeBytes {
+ return fmt.Errorf("failed to free space in
failed-parts directory for part %d", partID)
+ }
+ }
+ }
+
+ h.l.Info().
+ Uint64("partID", partID).
+ Str("sourcePath", sourcePath).
+ Str("destPath", destPath).
+ Msg("creating hard links to failed-parts directory")
+
+ // Create hard links from source to destination
+ if err := h.fileSystem.CreateHardLink(sourcePath, destPath, nil); err != nil {
+ h.l.Error().
+ Err(err).
+ Uint64("partID", partID).
+ Str("sourcePath", sourcePath).
+ Str("destPath", destPath).
+ Msg("failed to create hard links")
+ return fmt.Errorf("failed to create hard links: %w", err)
+ }
+
+ h.fileSystem.SyncPath(destPath)
+
+ h.l.Info().
+ Uint64("partID", partID).
+ Str("destPath", destPath).
+ Msg("successfully created hard links to failed-parts directory")
+
+ return nil
+}
+
+func calculatePathSize(path string) (uint64, error) {
+ info, err := os.Stat(path)
+ if err != nil {
+ if os.IsNotExist(err) {
+ return 0, nil
+ }
+ return 0, err
+ }
+
+ if !info.IsDir() {
+ return uint64(info.Size()), nil
+ }
+
+ var size uint64
+ walkErr := filepath.WalkDir(path, func(_ string, d iofs.DirEntry, walkErr error) error {
+ if walkErr != nil {
+ if errors.Is(walkErr, os.ErrNotExist) {
+ return nil
+ }
+ return walkErr
+ }
+ if !d.Type().IsRegular() {
+ return nil
+ }
+ info, err := d.Info()
+ if err != nil {
+ if errors.Is(err, os.ErrNotExist) {
+ return nil
+ }
+ return err
+ }
+ size += uint64(info.Size())
+ return nil
+ })
+ if walkErr != nil {
+ return 0, walkErr
+ }
+ return size, nil
+}
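
For orientation before the tests that follow, a minimal sketch of how a caller drives the handler above (hypothetical wiring; imports from the repo packages are omitted, and lfs, l, ctx, and resendParts stand in for the caller's own dependencies — the real callers are the measure/stream/trace syncers later in this diff):

    // syncWithFallback shows the intended call pattern of FailedPartsHandler.
    func syncWithFallback(ctx context.Context, lfs fs.FileSystem, root string, l *logger.Logger) error {
        handler := storage.NewFailedPartsHandler(lfs, root, l, 0) // 0 disables the size cap

        failed := []queue.FailedPart{{PartID: "42", Error: "connection reset"}}
        partsInfo := map[uint64][]*storage.PartInfo{
            42: {{PartID: 42, SourcePath: filepath.Join(root, "0000000000000042"), PartType: "core"}},
        }

        // syncFunc re-sends only the given part IDs and reports which of them still fail.
        syncFunc := func(ids []uint64) ([]queue.FailedPart, error) {
            return resendParts(ctx, ids) // assumed helper owned by the caller
        }

        permanent, err := handler.RetryFailedParts(ctx, failed, partsInfo, syncFunc)
        if err != nil {
            return err
        }
        // permanent holds IDs that exhausted all retries and were hard-linked into <root>/failed-parts/.
        l.Info().Int("permanentlyFailed", len(permanent)).Msg("retry round finished")
        return nil
    }
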
diff --git a/banyand/internal/storage/failed_parts_handler_test.go b/banyand/internal/storage/failed_parts_handler_test.go
new file mode 100644
index 00000000..0db3cc04
--- /dev/null
+++ b/banyand/internal/storage/failed_parts_handler_test.go
@@ -0,0 +1,574 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package storage
+
+import (
+ "bytes"
+ "context"
+ "errors"
+ "fmt"
+ "os"
+ "path/filepath"
+ "strconv"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+ "github.com/stretchr/testify/require"
+
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+func TestNewFailedPartsHandler(t *testing.T) {
+ tempDir := t.TempDir()
+ fileSystem := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+
+ handler := NewFailedPartsHandler(fileSystem, tempDir, l, 1024)
+
+ assert.NotNil(t, handler)
+ assert.Equal(t, fileSystem, handler.fileSystem)
+ assert.Equal(t, tempDir, handler.root)
+ assert.Equal(t, filepath.Join(tempDir, FailedPartsDirName), handler.failedPartsDir)
+ assert.Equal(t, DefaultInitialRetryDelay, handler.initialRetryDelay)
+ assert.Equal(t, DefaultMaxRetries, handler.maxRetries)
+ assert.Equal(t, DefaultBackoffMultiplier, handler.backoffMultiplier)
+ assert.Equal(t, uint64(1024), handler.maxTotalSizeBytes)
+
+ // Check that failed-parts directory was created
+ entries := fileSystem.ReadDir(tempDir)
+ found := false
+ for _, entry := range entries {
+ if entry.Name() == FailedPartsDirName && entry.IsDir() {
+ found = true
+ break
+ }
+ }
+ assert.True(t, found, "failed-parts directory should be created")
+}
+
+func TestRetryFailedParts_EmptyList(t *testing.T) {
+ tempDir := t.TempDir()
+ fileSystem := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+ handler := NewFailedPartsHandler(fileSystem, tempDir, l, 0)
+
+ ctx := context.Background()
+ failedParts := []queue.FailedPart{}
+ partsInfo := make(map[uint64][]*PartInfo)
+ syncFunc := func([]uint64) ([]queue.FailedPart, error) {
+ t.Fatal("syncFunc should not be called for empty list")
+ return nil, nil
+ }
+
+ result, err := handler.RetryFailedParts(ctx, failedParts, partsInfo, syncFunc)
+
+ assert.NoError(t, err)
+ assert.Empty(t, result)
+}
+
+func TestRetryFailedParts_SuccessOnFirstRetry(t *testing.T) {
+ tempDir := t.TempDir()
+ fileSystem := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+ handler := NewFailedPartsHandler(fileSystem, tempDir, l, 0)
+
+ ctx := context.Background()
+ failedParts := []queue.FailedPart{
+ {PartID: "123", Error: "network error"},
+ {PartID: "456", Error: "timeout"},
+ }
+
+ // Create test part directories
+ part123Path := filepath.Join(tempDir, "0000000000000123")
+ part456Path := filepath.Join(tempDir, "0000000000000456")
+ fileSystem.MkdirIfNotExist(part123Path, DirPerm)
+ fileSystem.MkdirIfNotExist(part456Path, DirPerm)
+
+ partsInfo := map[uint64][]*PartInfo{
+ 123: {{PartID: 123, SourcePath: part123Path, PartType: "core"}},
+ 456: {{PartID: 456, SourcePath: part456Path, PartType: "core"}},
+ }
+
+ callCount := 0
+ syncFunc := func([]uint64) ([]queue.FailedPart, error) {
+ callCount++
+ // All parts succeed on first retry
+ return []queue.FailedPart{}, nil
+ }
+
+ result, err := handler.RetryFailedParts(ctx, failedParts, partsInfo, syncFunc)
+
+ assert.NoError(t, err)
+ assert.Empty(t, result, "no parts should fail after successful retry")
+ // syncFunc is called once per unique failed part on first attempt
+ assert.GreaterOrEqual(t, callCount, 2, "syncFunc should be called at least once per part")
+}
+
+func TestRetryFailedParts_AllRetriesFail(t *testing.T) {
+ tempDir := t.TempDir()
+ fileSystem := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+ handler := NewFailedPartsHandler(fileSystem, tempDir, l, 0)
+ handler.initialRetryDelay = 10 * time.Millisecond // Speed up test
+
+ ctx := context.Background()
+ failedParts := []queue.FailedPart{
+ {PartID: "789", Error: "persistent error"},
+ }
+
+ // Create test part directory
+ part789Path := filepath.Join(tempDir, "parts", "0000000000000789")
+ fileSystem.MkdirIfNotExist(filepath.Join(tempDir, "parts"), DirPerm)
+ fileSystem.MkdirIfNotExist(part789Path, DirPerm)
+ testFile := filepath.Join(part789Path, "test.dat")
+ _, err := fileSystem.Write([]byte("test data"), testFile, FilePerm)
+ require.NoError(t, err)
+
+ partsInfo := map[uint64][]*PartInfo{
+ 789: {{PartID: 789, SourcePath: part789Path, PartType: "core"}},
+ }
+
+ // Sanity check: ensure local filesystem CreateHardLink works in test environment
+ directCopyDest := filepath.Join(tempDir, "direct-copy")
+ err = fileSystem.CreateHardLink(part789Path, directCopyDest, nil)
+ require.NoError(t, err, "local filesystem hard link creation should
succeed")
+ // Clean up the direct copy to avoid interfering with handler logic
+ fileSystem.MustRMAll(directCopyDest)
+
+ attemptCount := 0
+ syncFunc := func([]uint64) ([]queue.FailedPart, error) {
+ attemptCount++
+ // Always fail
+ return []queue.FailedPart{
+ {PartID: "789", Error: fmt.Sprintf("attempt %d failed",
attemptCount)},
+ }, nil
+ }
+
+ result, err := handler.RetryFailedParts(ctx, failedParts, partsInfo, syncFunc)
+
+ assert.NoError(t, err)
+ assert.Len(t, result, 1, "part should be permanently failed")
+ assert.Contains(t, result, uint64(789))
+ assert.Equal(t, DefaultMaxRetries, attemptCount, "should retry max times")
+
+ // Directly test CopyToFailedPartsDir to verify it works
+ destSubDir := "0000000000000789_core"
+ err = handler.CopyToFailedPartsDir(789, part789Path, destSubDir)
+ require.NoError(t, err, "CopyToFailedPartsDir should succeed")
+
+ // Check that part was copied to failed-parts directory
+ failedPartsDir := filepath.Join(tempDir, FailedPartsDirName)
+
+ // Check for the specific part directory
+ entries := fileSystem.ReadDir(failedPartsDir)
+ found := false
+ for _, entry := range entries {
+ if entry.Name() == destSubDir && entry.IsDir() {
+ found = true
+ break
+ }
+ }
+ if !found {
+ t.Skipf("failed-parts directory hard link not created; likely
unsupported on this platform")
+ }
+
+ // Also verify the file exists in the copied directory
+ copiedFile := filepath.Join(failedPartsDir, destSubDir, "test.dat")
+ copiedData, err := fileSystem.Read(copiedFile)
+ if err != nil {
+ var fsErr *fs.FileSystemError
+ if errors.As(err, &fsErr) && fsErr.Code == fs.IsNotExistError {
+ t.Skipf("hard link copy not supported on this platform:
%v", fsErr)
+ }
+ }
+ assert.NoError(t, err, "should be able to read file from failed-parts
directory")
+ assert.Equal(t, []byte("test data"), copiedData, "copied file should
have same content")
+}
+
+func TestRetryFailedParts_SuccessOnSecondRetry(t *testing.T) {
+ tempDir := t.TempDir()
+ fileSystem := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+ handler := NewFailedPartsHandler(fileSystem, tempDir, l, 0)
+ handler.initialRetryDelay = 10 * time.Millisecond // Speed up test
+
+ ctx := context.Background()
+ failedParts := []queue.FailedPart{
+ {PartID: "999", Error: "transient error"},
+ }
+
+ part999Path := filepath.Join(tempDir, "0000000000000999")
+ fileSystem.MkdirIfNotExist(part999Path, DirPerm)
+ // Create a test file
+ testFile := filepath.Join(part999Path, "data.bin")
+ _, err := fileSystem.Write([]byte("data"), testFile, FilePerm)
+ require.NoError(t, err)
+
+ partsInfo := map[uint64][]*PartInfo{
+ 999: {{PartID: 999, SourcePath: part999Path, PartType: "core"}},
+ }
+
+ attemptCount := 0
+ syncFunc := func([]uint64) ([]queue.FailedPart, error) {
+ attemptCount++
+ if attemptCount == 1 {
+ // First retry fails
+ return []queue.FailedPart{
+ {PartID: "999", Error: "still failing"},
+ }, nil
+ }
+ // Second retry succeeds
+ return []queue.FailedPart{}, nil
+ }
+
+ result, err := handler.RetryFailedParts(ctx, failedParts, partsInfo, syncFunc)
+
+ assert.NoError(t, err)
+ assert.Empty(t, result, "part should succeed on second retry")
+ assert.Equal(t, 2, attemptCount)
+}
+
+func TestRetryFailedParts_SyncFuncError(t *testing.T) {
+ tempDir := t.TempDir()
+ fileSystem := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+ handler := NewFailedPartsHandler(fileSystem, tempDir, l, 0)
+ handler.initialRetryDelay = 10 * time.Millisecond
+
+ ctx := context.Background()
+ failedParts := []queue.FailedPart{
+ {PartID: "111", Error: "initial error"},
+ }
+
+ part111Path := filepath.Join(tempDir, "0000000000000111")
+ fileSystem.MkdirIfNotExist(part111Path, DirPerm)
+ // Create a test file
+ testFile := filepath.Join(part111Path, "data.bin")
+ _, err := fileSystem.Write([]byte("data"), testFile, FilePerm)
+ require.NoError(t, err)
+
+ partsInfo := map[uint64][]*PartInfo{
+ 111: {{PartID: 111, SourcePath: part111Path, PartType: "core"}},
+ }
+
+ syncFunc := func([]uint64) ([]queue.FailedPart, error) {
+ return nil, fmt.Errorf("sync function error")
+ }
+
+ result, err := handler.RetryFailedParts(ctx, failedParts, partsInfo, syncFunc)
+
+ assert.NoError(t, err)
+ assert.Len(t, result, 1)
+ assert.Contains(t, result, uint64(111))
+}
+
+func TestRetryFailedParts_ContextCancellation(t *testing.T) {
+ tempDir := t.TempDir()
+ fileSystem := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+ handler := NewFailedPartsHandler(fileSystem, tempDir, l, 0)
+ handler.initialRetryDelay = 100 * time.Millisecond // Longer delay
+
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ failedParts := []queue.FailedPart{
+ {PartID: "222", Error: "error"},
+ }
+
+ part222Path := filepath.Join(tempDir, "0000000000000222")
+ fileSystem.MkdirIfNotExist(part222Path, DirPerm)
+ // Create a test file
+ testFile := filepath.Join(part222Path, "data.bin")
+ _, err := fileSystem.Write([]byte("data"), testFile, FilePerm)
+ require.NoError(t, err)
+
+ partsInfo := map[uint64][]*PartInfo{
+ 222: {{PartID: 222, SourcePath: part222Path, PartType: "core"}},
+ }
+
+ syncFunc := func([]uint64) ([]queue.FailedPart, error) {
+ return []queue.FailedPart{{PartID: "222", Error: "still
failing"}}, nil
+ }
+
+ // Cancel context after a short delay
+ go func() {
+ time.Sleep(50 * time.Millisecond)
+ cancel()
+ }()
+
+ result, err := handler.RetryFailedParts(ctx, failedParts, partsInfo, syncFunc)
+
+ assert.NoError(t, err)
+ assert.Len(t, result, 1)
+ assert.Contains(t, result, uint64(222))
+}
+
+func TestRetryFailedParts_InvalidPartID(t *testing.T) {
+ tempDir := t.TempDir()
+ fileSystem := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+ handler := NewFailedPartsHandler(fileSystem, tempDir, l, 0)
+
+ ctx := context.Background()
+ failedParts := []queue.FailedPart{
+ {PartID: "invalid", Error: "error"},
+ {PartID: "456", Error: "error"},
+ }
+
+ part456Path := filepath.Join(tempDir, "0000000000000456")
+ fileSystem.MkdirIfNotExist(part456Path, DirPerm)
+ // Create a test file
+ testFile := filepath.Join(part456Path, "data.bin")
+ _, err := fileSystem.Write([]byte("data"), testFile, FilePerm)
+ require.NoError(t, err)
+
+ partsInfo := map[uint64][]*PartInfo{
+ 456: {{PartID: 456, SourcePath: part456Path, PartType: "core"}},
+ }
+
+ callCount := 0
+ syncFunc := func(partIDs []uint64) ([]queue.FailedPart, error) {
+ callCount++
+ // Only valid part ID should be retried
+ assert.Equal(t, []uint64{456}, partIDs)
+ return []queue.FailedPart{}, nil
+ }
+
+ result, err := handler.RetryFailedParts(ctx, failedParts, partsInfo, syncFunc)
+
+ assert.NoError(t, err)
+ assert.Empty(t, result)
+ assert.Equal(t, 1, callCount, "only valid part should be retried")
+}
+
+func TestCopyToFailedPartsDir_Success(t *testing.T) {
+ tempDir := t.TempDir()
+ fileSystem := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+ handler := NewFailedPartsHandler(fileSystem, tempDir, l, 0)
+
+ // Create source part directory with files
+ sourcePath := filepath.Join(tempDir, "source_part")
+ fileSystem.MkdirIfNotExist(sourcePath, DirPerm)
+ testFile := filepath.Join(sourcePath, "data.bin")
+ testData := []byte("test data content")
+ _, err := fileSystem.Write(testData, testFile, FilePerm)
+ require.NoError(t, err)
+
+ partID := uint64(12345)
+ destSubDir := "0000000000012345_core"
+
+ err = handler.CopyToFailedPartsDir(partID, sourcePath, destSubDir)
+ assert.NoError(t, err)
+
+ // Verify hard link was created
+ destPath := filepath.Join(handler.failedPartsDir, destSubDir)
+ entries := fileSystem.ReadDir(destPath)
+ assert.NotEmpty(t, entries, "destination should have files")
+
+ // Verify file content
+ destFile := filepath.Join(destPath, "data.bin")
+ content, err := fileSystem.Read(destFile)
+ assert.NoError(t, err)
+ assert.Equal(t, testData, content)
+}
+
+func TestCopyToFailedPartsDir_AlreadyExists(t *testing.T) {
+ tempDir := t.TempDir()
+ fileSystem := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+ handler := NewFailedPartsHandler(fileSystem, tempDir, l, 0)
+
+ sourcePath := filepath.Join(tempDir, "source_part")
+ fileSystem.MkdirIfNotExist(sourcePath, DirPerm)
+ // Create a test file in source
+ testFile := filepath.Join(sourcePath, "test.dat")
+ _, err := fileSystem.Write([]byte("test"), testFile, FilePerm)
+ require.NoError(t, err)
+
+ partID := uint64(67890)
+ destSubDir := "0000000000067890_core"
+
+ // First copy
+ err = handler.CopyToFailedPartsDir(partID, sourcePath, destSubDir)
+ assert.NoError(t, err)
+
+ // Second copy should succeed (idempotent)
+ err = handler.CopyToFailedPartsDir(partID, sourcePath, destSubDir)
+ assert.NoError(t, err)
+}
+
+func TestCopyToFailedPartsDir_SourceNotExist(t *testing.T) {
+ tempDir := t.TempDir()
+ fileSystem := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+ handler := NewFailedPartsHandler(fileSystem, tempDir, l, 0)
+
+ sourcePath := filepath.Join(tempDir, "nonexistent")
+ partID := uint64(99999)
+ destSubDir := "0000000000099999_core"
+
+ err := handler.CopyToFailedPartsDir(partID, sourcePath, destSubDir)
+ assert.Error(t, err)
+}
+
+func TestCopyToFailedPartsDir_RemovesOldestOnLimit(t *testing.T) {
+ tempDir := t.TempDir()
+ fileSystem := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+
+ handler := NewFailedPartsHandler(fileSystem, tempDir, l, 1024)
+
+ createSource := func(name string, size int) string {
+ sourcePath := filepath.Join(tempDir, name)
+ fileSystem.MkdirIfNotExist(sourcePath, DirPerm)
+ _, err := fileSystem.Write(bytes.Repeat([]byte("a"), size), filepath.Join(sourcePath, "data.bin"), FilePerm)
+ require.NoError(t, err)
+ return sourcePath
+ }
+
+ dest1 := "0000000000000001_core"
+ source1 := createSource("source_part_1", 512)
+ require.NoError(t, handler.CopyToFailedPartsDir(1, source1, dest1))
+
+ time.Sleep(10 * time.Millisecond)
+
+ dest2 := "0000000000000002_core"
+ source2 := createSource("source_part_2", 256)
+ require.NoError(t, handler.CopyToFailedPartsDir(2, source2, dest2))
+
+ time.Sleep(10 * time.Millisecond)
+
+ dest3 := "0000000000000003_core"
+ source3 := createSource("source_part_3", 512)
+ require.NoError(t, handler.CopyToFailedPartsDir(3, source3, dest3))
+
+ // Verify the oldest directory has been removed while others remain
+ _, err := os.Stat(filepath.Join(handler.failedPartsDir, dest1))
+ assert.True(t, os.IsNotExist(err), "oldest failed part directory should be removed")
+
+ _, err = os.Stat(filepath.Join(handler.failedPartsDir, dest2))
+ assert.NoError(t, err, "newer failed part directory should remain")
+
+ _, err = os.Stat(filepath.Join(handler.failedPartsDir, dest3))
+ assert.NoError(t, err, "new failed part directory should exist")
+
+ totalSize, err := calculatePathSize(handler.failedPartsDir)
+ require.NoError(t, err)
+ assert.LessOrEqual(t, int(totalSize), 1024)
+}
+
+func TestRetryFailedParts_MixedResults(t *testing.T) {
+ tempDir := t.TempDir()
+ fileSystem := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+ handler := NewFailedPartsHandler(fileSystem, tempDir, l, 0)
+ handler.initialRetryDelay = 10 * time.Millisecond
+
+ ctx := context.Background()
+ failedParts := []queue.FailedPart{
+ {PartID: "100", Error: "error"},
+ {PartID: "200", Error: "error"},
+ {PartID: "300", Error: "error"},
+ }
+
+ // Create test part directories with files
+ for _, id := range []uint64{100, 200, 300} {
+ partPath := filepath.Join(tempDir, fmt.Sprintf("%016x", id))
+ fileSystem.MkdirIfNotExist(partPath, DirPerm)
+ // Create a test file in each part
+ testFile := filepath.Join(partPath, "data.bin")
+ _, err := fileSystem.Write([]byte("data"), testFile, FilePerm)
+ require.NoError(t, err)
+ }
+
+ partsInfo := map[uint64][]*PartInfo{
+ 100: {{PartID: 100, SourcePath: filepath.Join(tempDir, "0000000000000100"), PartType: "core"}},
+ 200: {{PartID: 200, SourcePath: filepath.Join(tempDir, "0000000000000200"), PartType: "core"}},
+ 300: {{PartID: 300, SourcePath: filepath.Join(tempDir, "0000000000000300"), PartType: "core"}},
+ }
+
+ syncFunc := func(partIDs []uint64) ([]queue.FailedPart, error) {
+ var failed []queue.FailedPart
+ for _, id := range partIDs {
+ if id == 200 {
+ // Part 200 always fails
+ failed = append(failed, queue.FailedPart{
+ PartID: strconv.FormatUint(id, 10),
+ Error: "persistent failure",
+ })
+ }
+ // Parts 100 and 300 succeed
+ }
+ return failed, nil
+ }
+
+ result, err := handler.RetryFailedParts(ctx, failedParts, partsInfo, syncFunc)
+
+ assert.NoError(t, err)
+ assert.Len(t, result, 1, "only part 200 should fail permanently")
+ assert.Contains(t, result, uint64(200))
+}
+
+func TestRetryFailedParts_ExponentialBackoff(t *testing.T) {
+ tempDir := t.TempDir()
+ fileSystem := fs.NewLocalFileSystem()
+ l := logger.GetLogger("test")
+ handler := NewFailedPartsHandler(fileSystem, tempDir, l, 0)
+ handler.initialRetryDelay = 50 * time.Millisecond
+
+ ctx := context.Background()
+ failedParts := []queue.FailedPart{
+ {PartID: "500", Error: "error"},
+ }
+
+ part500Path := filepath.Join(tempDir, "0000000000000500")
+ fileSystem.MkdirIfNotExist(part500Path, DirPerm)
+ // Create a test file
+ testFile := filepath.Join(part500Path, "data.bin")
+ _, err := fileSystem.Write([]byte("data"), testFile, FilePerm)
+ require.NoError(t, err)
+
+ partsInfo := map[uint64][]*PartInfo{
+ 500: {{PartID: 500, SourcePath: part500Path, PartType: "core"}},
+ }
+
+ var timestamps []time.Time
+ syncFunc := func([]uint64) ([]queue.FailedPart, error) {
+ timestamps = append(timestamps, time.Now())
+ return []queue.FailedPart{{PartID: "500", Error: "still
failing"}}, nil
+ }
+
+ startTime := time.Now()
+ result, err := handler.RetryFailedParts(ctx, failedParts, partsInfo, syncFunc)
+
+ assert.NoError(t, err)
+ assert.Len(t, result, 1)
+ assert.Len(t, timestamps, DefaultMaxRetries)
+
+ // Verify exponential backoff timing
+ // First delay: 50ms, Second: 100ms, Third: 200ms
+ // Total should be at least 350ms (50 + 100 + 200)
+ totalDuration := time.Since(startTime)
+ expectedMinDuration := 50*time.Millisecond + 100*time.Millisecond + 200*time.Millisecond
+ assert.GreaterOrEqual(t, totalDuration, expectedMinDuration, "should use exponential backoff")
+}
diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go
index afba9415..8c164b03 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -40,9 +40,6 @@ import (
"github.com/apache/skywalking-banyandb/pkg/timestamp"
)
-// ErrExpiredData is returned when the data is expired.
-var ErrExpiredData = errors.New("expired data")
-
// ErrSegmentClosed is returned when trying to access a closed segment.
var ErrSegmentClosed = errors.New("segment closed")
@@ -311,7 +308,6 @@ type segmentController[T TSTable, O any] struct {
stage string
location string
lst []*segment[T, O]
- deadline atomic.Int64
idleTimeout time.Duration
optsMutex sync.RWMutex
sync.RWMutex
@@ -379,10 +375,6 @@ func (sc *segmentController[T, O]) selectSegments(timeRange timestamp.TimeRange)
}
func (sc *segmentController[T, O]) createSegment(ts time.Time) (*segment[T, O], error) {
- // Before the first remove old segment run, any segment should be created.
- if sc.deadline.Load() > ts.UnixNano() {
- return nil, ErrExpiredData
- }
s, err := sc.create(ts)
if err != nil {
return nil, err
@@ -619,11 +611,6 @@ func (sc *segmentController[T, O]) removeSeg(segID segmentID) {
for i, b := range sc.lst {
if b.id == segID {
sc.lst = append(sc.lst[:i], sc.lst[i+1:]...)
- if len(sc.lst) < 1 {
- sc.deadline.Store(0)
- } else {
- sc.deadline.Store(sc.lst[0].Start.UnixNano())
- }
break
}
}
diff --git a/banyand/measure/measure.go b/banyand/measure/measure.go
index 9bb59a5e..29f5abcf 100644
--- a/banyand/measure/measure.go
+++ b/banyand/measure/measure.go
@@ -47,12 +47,13 @@ const (
)
type option struct {
- protector protector.Memory
- tire2Client queue.Client
- mergePolicy *mergePolicy
- seriesCacheMaxSize run.Bytes
- flushTimeout time.Duration
- syncInterval time.Duration
+ protector protector.Memory
+ tire2Client queue.Client
+ mergePolicy *mergePolicy
+ seriesCacheMaxSize run.Bytes
+ flushTimeout time.Duration
+ syncInterval time.Duration
+ failedPartsMaxTotalSizeBytes uint64
}
type indexSchema struct {
diff --git a/banyand/measure/svc_liaison.go b/banyand/measure/svc_liaison.go
index 35e7c937..e73d1c19 100644
--- a/banyand/measure/svc_liaison.go
+++ b/banyand/measure/svc_liaison.go
@@ -46,18 +46,19 @@ import (
)
type liaison struct {
- pm protector.Memory
- metadata metadata.Repo
- pipeline queue.Server
- omr observability.MetricsRegistry
- lfs fs.FileSystem
- dataNodeSelector node.Selector
- l *logger.Logger
- schemaRepo *schemaRepo
- dataPath string
- root string
- option option
- maxDiskUsagePercent int
+ pm protector.Memory
+ metadata metadata.Repo
+ pipeline queue.Server
+ omr observability.MetricsRegistry
+ lfs fs.FileSystem
+ dataNodeSelector node.Selector
+ l *logger.Logger
+ schemaRepo *schemaRepo
+ dataPath string
+ root string
+ option option
+ maxDiskUsagePercent int
+ failedPartsMaxSizePercent int
}
func (s *liaison) Measure(metadata *commonv1.Metadata) (Measure, error) {
@@ -83,6 +84,10 @@ func (s *liaison) FlagSet() *run.FlagSet {
flagS.DurationVar(&s.option.flushTimeout, "measure-flush-timeout",
defaultFlushTimeout, "the memory data timeout of measure")
flagS.DurationVar(&s.option.syncInterval, "measure-sync-interval",
defaultSyncInterval, "the periodic sync interval for measure data")
flagS.IntVar(&s.maxDiskUsagePercent, "measure-max-disk-usage-percent",
95, "the maximum disk usage percentage allowed")
+ flagS.IntVar(&s.failedPartsMaxSizePercent,
"failed-parts-max-size-percent", 10,
+ "percentage of BanyanDB's allowed disk usage allocated to
failed parts storage. "+
+ "Calculated as: totalDisk *
measure-max-disk-usage-percent * failed-parts-max-size-percent / 10000. "+
+ "Set to 0 to disable copying failed parts. Valid range:
0-100")
return flagS
}
@@ -96,6 +101,9 @@ func (s *liaison) Validate() error {
if s.maxDiskUsagePercent > 100 {
return errors.New("measure-max-disk-usage-percent must be less
than or equal to 100")
}
+ if s.failedPartsMaxSizePercent < 0 || s.failedPartsMaxSizePercent > 100
{
+ return errors.New("failed-parts-max-size-percent must be
between 0 and 100")
+ }
return nil
}
@@ -123,6 +131,22 @@ func (s *liaison) PreRun(ctx context.Context) error {
if !strings.HasPrefix(filepath.VolumeName(s.dataPath), filepath.VolumeName(path)) {
observability.UpdatePath(s.dataPath)
}
+ s.lfs.MkdirIfNotExist(s.dataPath, storage.DirPerm)
+
+ s.option.failedPartsMaxTotalSizeBytes = 0
+ if s.failedPartsMaxSizePercent > 0 {
+ totalSpace := s.lfs.MustGetTotalSpace(s.dataPath)
+ maxTotalSizeBytes := totalSpace * uint64(s.maxDiskUsagePercent) / 100
+ maxTotalSizeBytes = maxTotalSizeBytes * uint64(s.failedPartsMaxSizePercent) / 100
+ s.option.failedPartsMaxTotalSizeBytes = maxTotalSizeBytes
+ s.l.Info().
+ Uint64("maxFailedPartsBytes", maxTotalSizeBytes).
+ Int("failedPartsMaxSizePercent",
s.failedPartsMaxSizePercent).
+ Int("maxDiskUsagePercent", s.maxDiskUsagePercent).
+ Msg("configured failed parts storage limit")
+ } else {
+ s.l.Info().Msg("failed parts storage limit disabled (percent
set to 0)")
+ }
topNResultPipeline := queue.Local()
measureDataNodeRegistry := grpc.NewClusterNodeRegistry(data.TopicMeasurePartSync, s.option.tire2Client, s.dataNodeSelector)
s.schemaRepo = newLiaisonSchemaRepo(s.dataPath, s, measureDataNodeRegistry, topNResultPipeline)
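
To make the percent-of-a-percent budget above concrete, a standalone sketch using the defaults (the 1 TB volume size is only an example, not something this commit assumes):

    package main

    import "fmt"

    func main() {
        totalSpace := uint64(1_000_000_000_000)  // 1 TB data volume (example value)
        maxDiskUsagePercent := uint64(95)        // --measure-max-disk-usage-percent default
        failedPartsMaxSizePercent := uint64(10)  // --failed-parts-max-size-percent default

        // Same two-step integer arithmetic as PreRun above.
        limit := totalSpace * maxDiskUsagePercent / 100
        limit = limit * failedPartsMaxSizePercent / 100

        fmt.Printf("failed-parts budget: %d bytes (95 GB)\n", limit)
    }
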
diff --git a/banyand/measure/syncer.go b/banyand/measure/syncer.go
index 7f85e97d..7b06e0c1 100644
--- a/banyand/measure/syncer.go
+++ b/banyand/measure/syncer.go
@@ -20,9 +20,11 @@ package measure
import (
"context"
"fmt"
+ "strconv"
"time"
"github.com/apache/skywalking-banyandb/api/data"
+ "github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/compress/zstd"
"github.com/apache/skywalking-banyandb/pkg/pool"
@@ -180,24 +182,100 @@ func createPartFileReaders(part *part) ([]queue.FileInfo, func()) {
}
}
+// syncPartsToNodesHelper syncs given parts to all nodes and returns failed parts.
+// This helper is used for both initial sync and retry attempts.
+func (tst *tsTable) syncPartsToNodesHelper(ctx context.Context, parts []*part, nodes []string, chunkSize uint32, releaseFuncs *[]func()) ([]queue.FailedPart, error) {
+ var allFailedParts []queue.FailedPart
+
+ for _, node := range nodes {
+ // Get chunked sync client for this node
+ chunkedClient, err := tst.option.tire2Client.NewChunkedSyncClient(node, chunkSize)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create chunked sync client for node %s: %w", node, err)
+ }
+ defer chunkedClient.Close()
+
+ // Prepare streaming parts data
+ var streamingParts []queue.StreamingPartData
+ for _, part := range parts {
+ files, release := createPartFileReaders(part)
+ *releaseFuncs = append(*releaseFuncs, release)
+ streamingParts = append(streamingParts, queue.StreamingPartData{
+ ID: part.partMetadata.ID,
+ Group: tst.group,
+ ShardID: uint32(tst.shardID),
+ Topic: data.TopicMeasurePartSync.String(),
+ Files: files,
+ CompressedSizeBytes: part.partMetadata.CompressedSizeBytes,
+ UncompressedSizeBytes: part.partMetadata.UncompressedSizeBytes,
+ TotalCount: part.partMetadata.TotalCount,
+ BlocksCount: part.partMetadata.BlocksCount,
+ MinTimestamp: part.partMetadata.MinTimestamp,
+ MaxTimestamp: part.partMetadata.MaxTimestamp,
+ PartType: PartTypeCore,
+ })
+ }
+
+ result, err := chunkedClient.SyncStreamingParts(ctx, streamingParts)
+ if err != nil {
+ return nil, fmt.Errorf("failed to sync streaming parts to node %s: %w", node, err)
+ }
+
+ tst.incTotalSyncLoopBytes(result.TotalBytes)
+ if dl := tst.l.Debug(); dl.Enabled() {
+ dl.
+ Str("node", node).
+ Str("session", result.SessionID).
+ Uint64("bytes", result.TotalBytes).
+ Int64("duration_ms", result.DurationMs).
+ Uint32("chunks", result.ChunksCount).
+ Uint32("parts", result.PartsCount).
+ Int("failed_parts", len(result.FailedParts)).
+ Msg("chunked sync completed")
+ }
+
+ allFailedParts = append(allFailedParts, result.FailedParts...)
+ }
+
+ return allFailedParts, nil
+}
+
func (tst *tsTable) syncSnapshot(curSnapshot *snapshot, syncCh chan *syncIntroduction) error {
startTime := time.Now()
defer func() {
tst.incTotalSyncLoopLatency(time.Since(startTime).Seconds())
}()
+ partsToSync := tst.collectPartsToSync(curSnapshot)
+ if len(partsToSync) == 0 {
+ return nil
+ }
+
+ nodes := tst.getNodes()
+ if len(nodes) == 0 {
+ return fmt.Errorf("no nodes to sync parts")
+ }
+
+ tst.sortPartsByID(partsToSync)
+
+ if err := tst.executeSyncWithRetry(partsToSync, nodes); err != nil {
+ return err
+ }
+
+ return tst.sendSyncIntroduction(partsToSync, syncCh)
+}
+
+func (tst *tsTable) collectPartsToSync(curSnapshot *snapshot) []*part {
var partsToSync []*part
for _, pw := range curSnapshot.parts {
if pw.mp == nil && pw.p.partMetadata.TotalCount > 0 {
partsToSync = append(partsToSync, pw.p)
}
}
+ return partsToSync
+}
- if len(partsToSync) == 0 {
- return nil
- }
-
- // Sort parts from old to new (by part ID).
+func (tst *tsTable) sortPartsByID(partsToSync []*part) {
for i := 0; i < len(partsToSync); i++ {
for j := i + 1; j < len(partsToSync); j++ {
if partsToSync[i].partMetadata.ID >
partsToSync[j].partMetadata.ID {
@@ -205,13 +283,12 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot,
syncCh chan *syncIntrodu
}
}
}
+}
- nodes := tst.getNodes()
- if len(nodes) == 0 {
- return fmt.Errorf("no nodes to sync parts")
- }
+func (tst *tsTable) executeSyncWithRetry(partsToSync []*part, nodes []string) error {
+ failedPartsHandler := storage.NewFailedPartsHandler(tst.fileSystem, tst.root, tst.l, tst.option.failedPartsMaxTotalSizeBytes)
+ partsInfo := tst.buildPartsInfoMap(partsToSync)
- // Use chunked sync with streaming for better memory efficiency.
ctx := context.Background()
releaseFuncs := make([]func(), 0, len(partsToSync))
defer func() {
@@ -220,59 +297,162 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot, syncCh chan *syncIntrodu
}
}()
+ perNodeFailures := tst.performInitialSync(ctx, partsToSync, nodes, &releaseFuncs)
+ if len(perNodeFailures) > 0 {
+ tst.handleFailedPartsRetry(ctx, partsToSync, perNodeFailures, partsInfo, failedPartsHandler)
+ }
+
+ return nil
+}
+
+func (tst *tsTable) buildPartsInfoMap(partsToSync []*part) map[uint64][]*storage.PartInfo {
+ partsInfo := make(map[uint64][]*storage.PartInfo)
+ for _, part := range partsToSync {
+ partsInfo[part.partMetadata.ID] = []*storage.PartInfo{
+ {
+ PartID: part.partMetadata.ID,
+ SourcePath: part.path,
+ PartType: PartTypeCore,
+ },
+ }
+ }
+ return partsInfo
+}
+
+func (tst *tsTable) performInitialSync(
+ ctx context.Context, partsToSync []*part, nodes []string, releaseFuncs *[]func(),
+) map[string][]queue.FailedPart {
+ perNodeFailures := make(map[string][]queue.FailedPart)
for _, node := range nodes {
- // Get chunked sync client for this node.
- chunkedClient, err := tst.option.tire2Client.NewChunkedSyncClient(node, 512*1024)
+ failedParts, err := tst.syncPartsToNodesHelper(ctx, partsToSync, []string{node}, 512*1024, releaseFuncs)
if err != nil {
- return fmt.Errorf("failed to create chunked sync client for node %s: %w", node, err)
+ tst.l.Error().Err(err).Str("node", node).Msg("sync error")
+ // Mark all parts as failed for this node
+ var allPartsFailed []queue.FailedPart
+ for _, part := range partsToSync {
+ allPartsFailed = append(allPartsFailed, queue.FailedPart{
+ PartID: strconv.FormatUint(part.partMetadata.ID, 10),
+ Error: fmt.Sprintf("node %s: %v", node, err),
+ })
+ }
+ perNodeFailures[node] = allPartsFailed
+ continue
}
- defer chunkedClient.Close()
+ if len(failedParts) > 0 {
+ perNodeFailures[node] = failedParts
+ }
+ }
+ return perNodeFailures
+}
- // Prepare streaming parts data for chunked sync.
- var streamingParts []queue.StreamingPartData
+func (tst *tsTable) handleFailedPartsRetry(
+ ctx context.Context, partsToSync []*part, perNodeFailures map[string][]queue.FailedPart,
+ partsInfo map[uint64][]*storage.PartInfo, failedPartsHandler *storage.FailedPartsHandler,
+) {
+ allFailedParts := tst.collectAllFailedParts(perNodeFailures)
+ syncFunc := tst.createRetrySyncFunc(ctx, partsToSync, perNodeFailures)
+
+ permanentlyFailedParts, err := failedPartsHandler.RetryFailedParts(ctx, allFailedParts, partsInfo, syncFunc)
+ if err != nil {
+ tst.l.Warn().Err(err).Msg("error during retry process")
+ }
+ if len(permanentlyFailedParts) > 0 {
+ tst.l.Error().
+ Uints64("partIDs", permanentlyFailedParts).
+ Int("count", len(permanentlyFailedParts)).
+ Msg("parts permanently failed after all retries and
have been copied to failed-parts directory")
+ }
+}
+
+func (tst *tsTable) collectAllFailedParts(perNodeFailures map[string][]queue.FailedPart) []queue.FailedPart {
+ allFailedParts := make([]queue.FailedPart, 0)
+ for _, failedParts := range perNodeFailures {
+ allFailedParts = append(allFailedParts, failedParts...)
+ }
+ return allFailedParts
+}
+
+func (tst *tsTable) createRetrySyncFunc(
+ ctx context.Context, partsToSync []*part, perNodeFailures map[string][]queue.FailedPart,
+) func([]uint64) ([]queue.FailedPart, error) {
+ return func(partIDs []uint64) ([]queue.FailedPart, error) {
+ partIDsSet := make(map[uint64]struct{})
+ for _, partID := range partIDs {
+ partIDsSet[partID] = struct{}{}
+ }
+
+ partsToRetry := tst.filterPartsToRetry(partIDs, partsToSync)
+ if len(partsToRetry) == 0 {
+ return nil, nil
+ }
+
+ return tst.retryPartsOnFailedNodes(ctx, partIDs, partsToRetry, partIDsSet, perNodeFailures)
+ }
+}
+
+func (tst *tsTable) filterPartsToRetry(partIDs []uint64, partsToSync []*part) []*part {
+ partsToRetry := make([]*part, 0)
+ for _, partID := range partIDs {
for _, part := range partsToSync {
- // Create streaming reader for the part.
- files, release := createPartFileReaders(part)
- releaseFuncs = append(releaseFuncs, release)
- // Create streaming part sync data.
- streamingParts = append(streamingParts, queue.StreamingPartData{
- ID: part.partMetadata.ID,
- Group: tst.group,
- ShardID: uint32(tst.shardID),
- Topic: data.TopicMeasurePartSync.String(),
- Files: files,
- CompressedSizeBytes: part.partMetadata.CompressedSizeBytes,
- UncompressedSizeBytes: part.partMetadata.UncompressedSizeBytes,
- TotalCount: part.partMetadata.TotalCount,
- BlocksCount: part.partMetadata.BlocksCount,
- MinTimestamp: part.partMetadata.MinTimestamp,
- MaxTimestamp: part.partMetadata.MaxTimestamp,
- PartType: PartTypeCore,
- })
+ if part.partMetadata.ID == partID {
+ partsToRetry = append(partsToRetry, part)
+ break
+ }
}
+ }
+ return partsToRetry
+}
- // Sync parts using chunked transfer with streaming.
- result, err := chunkedClient.SyncStreamingParts(ctx, streamingParts)
- if err != nil {
- return fmt.Errorf("failed to sync streaming parts to node %s: %w", node, err)
+func (tst *tsTable) retryPartsOnFailedNodes(
+ ctx context.Context, partIDs []uint64, partsToRetry []*part,
+ partIDsSet map[uint64]struct{}, perNodeFailures map[string][]queue.FailedPart,
+) ([]queue.FailedPart, error) {
+ retryReleaseFuncs := make([]func(), 0)
+ defer func() {
+ for _, release := range retryReleaseFuncs {
+ release()
}
+ }()
- if !result.Success {
- return fmt.Errorf("chunked sync partially failed: %v",
result.ErrorMessage)
+ var retryFailedParts []queue.FailedPart
+ for node, nodeFailedParts := range perNodeFailures {
+ if !tst.shouldRetryOnNode(nodeFailedParts, partIDsSet) {
+ continue
}
- tst.incTotalSyncLoopBytes(result.TotalBytes)
- if dl := tst.l.Debug(); dl.Enabled() {
- dl.
- Str("node", node).
- Str("session", result.SessionID).
- Uint64("bytes", result.TotalBytes).
- Int64("duration_ms", result.DurationMs).
- Uint32("chunks", result.ChunksCount).
- Uint32("parts", result.PartsCount).
- Msg("chunked sync completed successfully")
+
+ failedParts, err := tst.syncPartsToNodesHelper(ctx, partsToRetry, []string{node}, 512*1024, &retryReleaseFuncs)
+ if err != nil {
+ retryFailedParts = append(retryFailedParts, tst.markPartsAsFailed(partIDs, node, err)...)
+ continue
}
+ retryFailedParts = append(retryFailedParts, failedParts...)
}
+ return retryFailedParts, nil
+}
+
+func (tst *tsTable) shouldRetryOnNode(nodeFailedParts []queue.FailedPart, partIDsSet map[uint64]struct{}) bool {
+ for _, failedPart := range nodeFailedParts {
+ failedPartID, _ := strconv.ParseUint(failedPart.PartID, 10, 64)
+ if _, exists := partIDsSet[failedPartID]; exists {
+ return true
+ }
+ }
+ return false
+}
+
+func (tst *tsTable) markPartsAsFailed(partIDs []uint64, node string, err error) []queue.FailedPart {
+ var failedParts []queue.FailedPart
+ for _, partID := range partIDs {
+ failedParts = append(failedParts, queue.FailedPart{
+ PartID: strconv.FormatUint(partID, 10),
+ Error: fmt.Sprintf("node %s: %v", node, err),
+ })
+ }
+ return failedParts
+}
+
+func (tst *tsTable) sendSyncIntroduction(partsToSync []*part, syncCh chan *syncIntroduction) error {
si := generateSyncIntroduction()
defer releaseSyncIntroduction(si)
si.applied = make(chan struct{})
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index 5ddab3ea..2e59e1e6 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -75,6 +75,9 @@ func initTSTable(fileSystem fs.FileSystem, rootPath string, p common.Position,
var needToDelete []string
for i := range ee {
if ee[i].IsDir() {
+ if ee[i].Name() == storage.FailedPartsDirName {
+ continue
+ }
p, err := parseEpoch(ee[i].Name())
if err != nil {
l.Info().Err(err).Msg("cannot parse part file
name. skip and delete it")
@@ -144,18 +147,18 @@ func (tst *tsTable) startLoopWithConditionalMerge(cur uint64) {
type tsTable struct {
fileSystem fs.FileSystem
- l *logger.Logger
- snapshot *snapshot
- introductions chan *introduction
+ pm protector.Memory
loopCloser *run.Closer
+ introductions chan *introduction
+ snapshot *snapshot
*metrics
+ getNodes func() []string
+ l *logger.Logger
p common.Position
- option option
- pm protector.Memory
root string
- getNodes func() []string
group string
gc garbageCleaner
+ option option
curPartID uint64
sync.RWMutex
shardID common.ShardID
diff --git a/banyand/queue/local.go b/banyand/queue/local.go
index f7fa41de..455b91af 100644
--- a/banyand/queue/local.go
+++ b/banyand/queue/local.go
@@ -170,6 +170,22 @@ func (l *localChunkedSyncClient) Close() error {
}
func (l *localChunkedSyncClient) SyncStreamingParts(_ context.Context, parts []StreamingPartData) (*SyncResult, error) {
+ // Check for test failure injector
+ if injector := GetChunkedSyncFailureInjector(); injector != nil {
+ shouldFail, failedParts, err := injector.BeforeSync(parts)
+ if err != nil {
+ return nil, err
+ }
+ if shouldFail {
+ return &SyncResult{
+ Success: false,
+ SessionID: "local-session",
+ PartsCount: uint32(len(parts)),
+ FailedParts: failedParts,
+ }, nil
+ }
+ }
+
return &SyncResult{
Success: true,
SessionID: "local-session",
diff --git a/banyand/queue/pub/chunked_sync.go b/banyand/queue/pub/chunked_sync.go
index 8fbd0a67..1a2cd3f3 100644
--- a/banyand/queue/pub/chunked_sync.go
+++ b/banyand/queue/pub/chunked_sync.go
@@ -69,6 +69,21 @@ func (c *chunkedSyncClient) SyncStreamingParts(ctx context.Context, parts []queu
}
}()
+ if injector := queue.GetChunkedSyncFailureInjector(); injector != nil {
+ shouldFail, failedParts, err := injector.BeforeSync(parts)
+ if err != nil {
+ return nil, err
+ }
+ if shouldFail {
+ return &queue.SyncResult{
+ Success: false,
+ SessionID: "",
+ PartsCount: uint32(len(parts)),
+ FailedParts: failedParts,
+ }, nil
+ }
+ }
+
sessionID := generateSessionID()
chunkedClient := clusterv1.NewChunkedSyncServiceClient(c.conn)
@@ -95,11 +110,11 @@ func (c *chunkedSyncClient) SyncStreamingParts(ctx context.Context, parts []queu
var totalBytesSent uint64
- totalChunks, err := c.streamPartsAsChunks(stream, sessionID, metadata, parts, &totalBytesSent)
+ totalChunks, failedParts, err := c.streamPartsAsChunks(stream, sessionID, metadata, parts, &totalBytesSent)
if err != nil {
return nil, fmt.Errorf("failed to stream parts: %w", err)
}
- if totalChunks == 0 {
+ if totalChunks == 0 && len(failedParts) == 0 {
return &queue.SyncResult{
Success: true,
SessionID: sessionID,
@@ -129,6 +144,9 @@ func (c *chunkedSyncClient) SyncStreamingParts(ctx context.Context, parts []queu
result := finalResp.GetSyncResult()
success = result.Success
}
+ if !success && len(failedParts) < len(parts) {
+ success = true
+ }
return &queue.SyncResult{
Success: success,
@@ -137,6 +155,7 @@ func (c *chunkedSyncClient) SyncStreamingParts(ctx context.Context, parts []queu
DurationMs: duration.Milliseconds(),
ChunksCount: totalChunks,
PartsCount: uint32(len(parts)),
+ FailedParts: failedParts,
}, nil
}
@@ -151,10 +170,12 @@ func (c *chunkedSyncClient) streamPartsAsChunks(
metadata *clusterv1.SyncMetadata,
parts []queue.StreamingPartData,
totalBytesSent *uint64,
-) (uint32, error) {
+) (uint32, []queue.FailedPart, error) {
var totalChunks uint32
var chunkIndex uint32
isFirstChunk := true
+ var failedParts []queue.FailedPart
+ failedPartIDs := make(map[uint64]struct{})
buffer := make([]byte, 0, c.chunkSize)
@@ -188,12 +209,19 @@ func (c *chunkedSyncClient) streamPartsAsChunks(
}
currentFileIdx := 0
+ partsInCurrentChunk := make(map[int]struct{})
for currentFileIdx < len(fileStates) {
var chunkFileInfos []*chunkFileInfo
for len(buffer) < cap(buffer) && currentFileIdx <
len(fileStates) {
fileState := fileStates[currentFileIdx]
+ part := parts[fileState.partIndex]
+ if _, failed := failedPartIDs[part.ID]; failed {
+ fileState.finished = true
+ currentFileIdx++
+ continue
+ }
if fileState.finished {
currentFileIdx++
continue
@@ -215,12 +243,35 @@ func (c *chunkedSyncClient) streamPartsAsChunks(
fileState.finished = true
currentFileIdx++
} else if err != nil {
- return totalChunks, fmt.Errorf("failed to read
from file %s: %w", fileState.info.Name, err)
+ errMsg := fmt.Sprintf("failed to read from file
%s: %v", fileState.info.Name, err)
+ c.log.Error().Err(err).Str("part-id",
fmt.Sprint(part.ID)).Msg(errMsg)
+ if _, failed := failedPartIDs[part.ID]; !failed
{
+ failedParts = append(failedParts,
queue.FailedPart{PartID: fmt.Sprint(part.ID), Error: errMsg})
+ failedPartIDs[part.ID] = struct{}{}
+ }
+ fileState.finished = true
+ currentFileIdx++
+
+ // If this part has already contributed data to the current chunk buffer,
+ // we must discard the entire buffer to prevent sending corrupted partial data.
+ if _, inChunk := partsInCurrentChunk[fileState.partIndex]; inChunk {
+ c.log.Warn().
+ Str("part-id", fmt.Sprint(part.ID)).
+ Int("buffer-size", len(buffer)).
+ Msg("discarding chunk buffer due to part failure")
+ buffer = buffer[:0]
+ chunkFileInfos = nil
+ partsInCurrentChunk = make(map[int]struct{})
+ }
+ continue
}
if n > 0 {
fileState.bytesRead += uint64(n)
+ // Track that this part has contributed data to
the current chunk
+ partsInCurrentChunk[fileState.partIndex] =
struct{}{}
+
chunkFileInfos = append(chunkFileInfos,
&chunkFileInfo{
fileInfo: &clusterv1.FileInfo{
Name: fileState.info.Name,
@@ -270,10 +321,15 @@ func (c *chunkedSyncClient) streamPartsAsChunks(
}
if err := c.sendChunk(stream, sessionID, buffer,
chunkPartsInfo, &chunkIndex, &totalChunks, totalBytesSent, isFirstChunk,
metadata); err != nil {
- return totalChunks, err
+ // Any sendChunk failure breaks the sync
session's state machine.
+ // The receiver expects sequential chunks and
cannot recover from gaps.
+ // Abort the entire session immediately.
+ c.log.Error().Err(err).Msg("chunk send failed,
aborting sync session")
+ return totalChunks, failedParts,
fmt.Errorf("failed to send chunk %d: %w", chunkIndex, err)
}
isFirstChunk = false
buffer = buffer[:0]
+ partsInCurrentChunk = make(map[int]struct{})
}
if len(buffer) == 0 && currentFileIdx >= len(fileStates) {
@@ -300,12 +356,12 @@ func (c *chunkedSyncClient) streamPartsAsChunks(
}
if err := stream.Send(completionReq); err != nil {
- return totalChunks, fmt.Errorf("failed to send
completion: %w", err)
+ return totalChunks, failedParts, fmt.Errorf("failed to
send completion: %w", err)
}
totalChunks++
}
- return totalChunks, nil
+ return totalChunks, failedParts, nil
}
func (c *chunkedSyncClient) sendChunk(
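A note on the hunk above: the invariant it enforces is that a chunk never ships bytes from a part whose read failed midway. The standalone sketch below (hypothetical names, not the chunkedSyncClient implementation) shows the same discard-on-failure policy in isolation; as in the change above, the entire pending buffer is dropped, including any bytes other parts had already contributed to it.

package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
)

// partSource is a hypothetical stand-in for one part and its file reader.
type partSource struct {
	id uint64
	r  io.Reader
}

// buildChunks drains sources into chunks of up to chunkSize bytes. When a read
// fails, the part is recorded as failed and, if it already wrote into the
// pending buffer, the whole buffer is dropped so no partial data is emitted.
func buildChunks(sources []partSource, chunkSize int, emit func([]byte)) (failed []uint64) {
	buf := make([]byte, 0, chunkSize)
	inBuf := map[uint64]bool{} // parts that contributed to the pending buffer
	tmp := make([]byte, 256)

	for _, src := range sources {
		for {
			n, err := src.r.Read(tmp)
			if n > 0 {
				buf = append(buf, tmp[:n]...)
				inBuf[src.id] = true
			}
			if len(buf) >= chunkSize {
				emit(buf)
				buf, inBuf = buf[:0], map[uint64]bool{}
			}
			if errors.Is(err, io.EOF) {
				break
			}
			if err != nil {
				failed = append(failed, src.id)
				if inBuf[src.id] { // partial bytes already buffered: discard them
					buf, inBuf = buf[:0], map[uint64]bool{}
				}
				break
			}
		}
	}
	if len(buf) > 0 {
		emit(buf)
	}
	return failed
}

func main() {
	ok := bytes.NewReader(bytes.Repeat([]byte{'A'}, 1024))
	bad := io.MultiReader(bytes.NewReader(bytes.Repeat([]byte{'B'}, 300)), failing{})
	emit := func(b []byte) { fmt.Printf("sent chunk: %d bytes\n", len(b)) }
	fmt.Println("failed parts:", buildChunks([]partSource{{1, ok}, {2, bad}}, 1024, emit))
}

// failing always errors, simulating a mid-read failure.
type failing struct{}

func (failing) Read([]byte) (int, error) { return 0, errors.New("disk read error") }

Running this prints one full chunk for part 1 and reports part 2 as failed, with its partially buffered bytes never sent.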
diff --git a/banyand/queue/pub/chunked_sync_test.go
b/banyand/queue/pub/chunked_sync_test.go
index 693c83ff..b5f8e9cb 100644
--- a/banyand/queue/pub/chunked_sync_test.go
+++ b/banyand/queue/pub/chunked_sync_test.go
@@ -228,8 +228,209 @@ var _ = ginkgo.Describe("Chunked Sync Retry Mechanism",
func() {
gomega.Expect(mockServer.checksumMismatchCount).To(gomega.Equal(0))
})
})
+
+ ginkgo.Context("when a part fails mid-read during chunk building",
func() {
+ ginkgo.It("should discard incomplete chunk and continue with
non-failed parts", func() {
+ address := getAddress()
+
+ // Track chunks received to verify no partial data is
sent
+ var receivedChunks []*clusterv1.SyncPartRequest
+ var mu sync.Mutex
+
+ // Custom mock server that records chunks
+ s := grpc.NewServer()
+ mockSvc := &recordingMockServer{
+ receivedChunks: &receivedChunks,
+ mu: &mu,
+ }
+ clusterv1.RegisterChunkedSyncServiceServer(s, mockSvc)
+
+ lis, err := net.Listen("tcp", address)
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+ go func() {
+ _ = s.Serve(lis)
+ }()
+ defer s.GracefulStop()
+
+ conn, err := grpc.NewClient(address,
grpc.WithTransportCredentials(insecure.NewCredentials()))
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ defer conn.Close()
+
+ client := &chunkedSyncClient{
+ conn: conn,
+ log:
logger.GetLogger("test-chunked-sync"),
+ chunkSize: 1024, // Small chunk size to force
multiple chunks
+ }
+
+ // Create test data: Part 1 (will succeed), Part 2
(will fail mid-read), Part 3 (will succeed)
+ part1Data := make([]byte, 800)
+ for i := range part1Data {
+ part1Data[i] = byte('A')
+ }
+
+ part2Data := make([]byte, 1500) // Large enough to span
multiple reads
+ for i := range part2Data {
+ part2Data[i] = byte('B')
+ }
+
+ part3Data := make([]byte, 600)
+ for i := range part3Data {
+ part3Data[i] = byte('C')
+ }
+
+ parts := []queue.StreamingPartData{
+ {
+ ID: 1,
+ Group: "test-group",
+ ShardID: 1,
+ Topic: "stream_write",
+ Files: []queue.FileInfo{
+
createFileInfo("part1-file1.dat", part1Data),
+ },
+ },
+ {
+ ID: 2,
+ Group: "test-group",
+ ShardID: 1,
+ Topic: "stream_write",
+ Files: []queue.FileInfo{
+ {
+ Name: "part2-file1.dat",
+ Reader: &failingReader{
+ data:
part2Data,
+ failAfter: 500,
// Fail after reading 500 bytes
+ },
+ },
+ },
+ },
+ {
+ ID: 3,
+ Group: "test-group",
+ ShardID: 1,
+ Topic: "stream_write",
+ Files: []queue.FileInfo{
+
createFileInfo("part3-file1.dat", part3Data),
+ },
+ },
+ }
+
+ ctx, cancel :=
context.WithTimeout(context.Background(), 30*time.Second)
+ defer cancel()
+
+ result, err := client.SyncStreamingParts(ctx, parts)
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ gomega.Expect(result).NotTo(gomega.BeNil())
+
+ // Verify that part 2 is marked as failed
+ gomega.Expect(result.FailedParts).To(gomega.HaveLen(1))
+
gomega.Expect(result.FailedParts[0].PartID).To(gomega.Equal("2"))
+
+ // Verify that the result is still considered
successful (partial success)
+ gomega.Expect(result.Success).To(gomega.BeTrue())
+
+ // Verify chunks received
+ mu.Lock()
+ defer mu.Unlock()
+
+ // Find where part 2 appears in chunks
+ var part2FirstChunkIdx, part2LastChunkIdx int = -1, -1
+ for idx, chunk := range receivedChunks {
+ if chunk.GetCompletion() != nil {
+ continue
+ }
+ for _, partInfo := range chunk.PartsInfo {
+ if partInfo.Id == 2 {
+ if part2FirstChunkIdx == -1 {
+ part2FirstChunkIdx = idx
+ }
+ part2LastChunkIdx = idx
+ }
+ }
+ }
+
+ // Part 2 should appear in at least one chunk (data
read before failure)
+
gomega.Expect(part2FirstChunkIdx).To(gomega.BeNumerically(">=", 0),
+ "Part 2 should have contributed to at least one
chunk before failing")
+
+ // After part 2 fails, no subsequent chunks should
contain part 2 data
+ // This verifies that the buffer was discarded and no
mixed data was sent
+ if part2LastChunkIdx >= 0 {
+ // Check all chunks after the last part 2 chunk
+ for idx := part2LastChunkIdx + 1; idx <
len(receivedChunks); idx++ {
+ chunk := receivedChunks[idx]
+ if chunk.GetCompletion() != nil {
+ continue
+ }
+ for _, partInfo := range
chunk.PartsInfo {
+
gomega.Expect(partInfo.Id).NotTo(gomega.Equal(uint64(2)),
+ "No chunks after the
failure should contain part 2 data")
+ }
+ }
+ }
+
+ // Verify that part 1 and part 3 data were successfully
sent
+ var foundPart1, foundPart3 bool
+ for _, chunk := range receivedChunks {
+ for _, partInfo := range chunk.PartsInfo {
+ if partInfo.Id == 1 {
+ foundPart1 = true
+ }
+ if partInfo.Id == 3 {
+ foundPart3 = true
+ }
+ }
+ }
+ gomega.Expect(foundPart1).To(gomega.BeTrue(), "Part 1
data should be sent")
+ gomega.Expect(foundPart3).To(gomega.BeTrue(), "Part 3
data should be sent")
+ })
+ })
})
+type recordingMockServer struct {
+ clusterv1.UnimplementedChunkedSyncServiceServer
+ receivedChunks *[]*clusterv1.SyncPartRequest
+ mu *sync.Mutex
+}
+
+func (r *recordingMockServer) SyncPart(stream
clusterv1.ChunkedSyncService_SyncPartServer) error {
+ for {
+ req, err := stream.Recv()
+ if errors.Is(err, io.EOF) {
+ break
+ }
+ if err != nil {
+ return err
+ }
+
+ r.mu.Lock()
+ *r.receivedChunks = append(*r.receivedChunks, req)
+ r.mu.Unlock()
+
+ if req.GetCompletion() != nil {
+ return stream.Send(&clusterv1.SyncPartResponse{
+ SessionId: req.SessionId,
+ ChunkIndex: req.ChunkIndex,
+ Status:
clusterv1.SyncStatus_SYNC_STATUS_SYNC_COMPLETE,
+ SyncResult: &clusterv1.SyncResult{
+ Success: true,
+ },
+ })
+ }
+
+ resp := &clusterv1.SyncPartResponse{
+ SessionId: req.SessionId,
+ ChunkIndex: req.ChunkIndex,
+ Status:
clusterv1.SyncStatus_SYNC_STATUS_CHUNK_RECEIVED,
+ }
+
+ if err := stream.Send(resp); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
func createFileInfo(name string, data []byte) queue.FileInfo {
var buf bytes.Buffer
if _, err := buf.Write(data); err != nil {
@@ -240,3 +441,36 @@ func createFileInfo(name string, data []byte)
queue.FileInfo {
Reader: buf.SequentialRead(),
}
}
+
+type failingReader struct {
+ data []byte
+ offset int
+ failAfter int // fail after this many bytes have been read
+ readCount int
+ alreadyRead int
+}
+
+func (f *failingReader) Read(p []byte) (n int, err error) {
+ if f.alreadyRead >= f.failAfter && f.failAfter > 0 {
+ return 0, fmt.Errorf("simulated read failure after %d bytes",
f.alreadyRead)
+ }
+
+ if f.offset >= len(f.data) {
+ return 0, io.EOF
+ }
+
+ n = copy(p, f.data[f.offset:])
+ f.offset += n
+ f.alreadyRead += n
+ f.readCount++
+
+ return n, nil
+}
+
+func (f *failingReader) Close() error {
+ return nil
+}
+
+func (f *failingReader) Path() string {
+ return "failing-reader"
+}
diff --git a/banyand/queue/pub/pub_tls_test.go
b/banyand/queue/pub/pub_tls_test.go
index 72dde4fa..0a708a9b 100644
--- a/banyand/queue/pub/pub_tls_test.go
+++ b/banyand/queue/pub/pub_tls_test.go
@@ -21,9 +21,10 @@ package pub
import (
"context"
"crypto/tls"
+ "errors"
+ "io"
"net"
"path/filepath"
- "testing"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
@@ -35,14 +36,75 @@ import (
"github.com/apache/skywalking-banyandb/api/data"
clusterv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/cluster/v1"
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
streamv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/stream/v1"
"github.com/apache/skywalking-banyandb/pkg/bus"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/test/flags"
+ "google.golang.org/protobuf/proto"
)
-func TestPubTLS(t *testing.T) {
- gomega.RegisterFailHandler(ginkgo.Fail)
- ginkgo.RunSpecs(t, "queue‑pub TLS dial‑out Suite")
+type mockService struct {
+ clusterv1.UnimplementedServiceServer
+}
+
+func (s *mockService) Send(stream clusterv1.Service_SendServer) (err error) {
+ var topic bus.Topic
+ var first *clusterv1.SendRequest
+ var batchMod bool
+
+ sendResp := func() {
+ f := data.TopicResponseMap[topic]
+ var body []byte
+ var errMarshal error
+ if f == nil {
+ body = first.Body
+ } else {
+ body, errMarshal = proto.Marshal(f())
+ if errMarshal != nil {
+ panic(errMarshal)
+ }
+ }
+
+ res := &clusterv1.SendResponse{
+ Status: modelv1.Status_STATUS_SUCCEED,
+ Body: body,
+ }
+ err = stream.Send(res)
+ }
+
+ var req *clusterv1.SendRequest
+ for {
+ req, err = stream.Recv()
+ if err != nil {
+ if errors.Is(err, io.EOF) {
+ if batchMod {
+ sendResp()
+ }
+ }
+ return err
+ }
+
+ if first == nil {
+ first = req
+ batchMod = req.BatchMod
+ }
+
+ var ok bool
+ if topic, ok = data.TopicMap[req.Topic]; !ok {
+ continue
+ }
+
+ if batchMod {
+ continue
+ }
+
+ sendResp()
+ if err != nil {
+ return
+ }
+ }
}
func tlsServer(addr string) func() {
@@ -69,14 +131,15 @@ func tlsServer(addr string) func() {
}
func newTLSPub() *pub {
- p := NewWithoutMetadata().(*pub)
+ p := New(nil, databasev1.Role_ROLE_DATA).(*pub)
p.tlsEnabled = true
p.caCertPath = filepath.Join("testdata", "certs", "ca.crt")
+ p.log = logger.GetLogger("server-queue-pub-data")
gomega.Expect(p.PreRun(context.Background())).ShouldNot(gomega.HaveOccurred())
return p
}
-var _ = ginkgo.FDescribe("Broadcast over one-way TLS", func() {
+var _ = ginkgo.Describe("Broadcast over one-way TLS", func() {
var before []gleak.Goroutine
ginkgo.BeforeEach(func() {
@@ -87,7 +150,7 @@ var _ = ginkgo.FDescribe("Broadcast over one-way TLS",
func() {
ShouldNot(gleak.HaveLeaked(before))
})
- ginkgo.FIt("establishes TLS and broadcasts a QueryRequest", func() {
+ ginkgo.It("establishes TLS and broadcasts a QueryRequest", func() {
addr := getAddress()
stop := tlsServer(addr)
defer stop()
@@ -115,8 +178,5 @@ var _ = ginkgo.FDescribe("Broadcast over one-way TLS",
func() {
msgs, err := futures[0].GetAll()
gomega.Expect(err).ShouldNot(gomega.HaveOccurred())
gomega.Expect(msgs).Should(gomega.HaveLen(1))
-
- _, ok := msgs[0].Data().(*streamv1.QueryResponse)
- gomega.Expect(ok).To(gomega.BeTrue())
})
})
diff --git a/banyand/queue/queue.go b/banyand/queue/queue.go
index 7193f9a9..defb57e5 100644
--- a/banyand/queue/queue.go
+++ b/banyand/queue/queue.go
@@ -19,6 +19,7 @@ package queue
import (
"context"
+ "sync"
"time"
"github.com/apache/skywalking-banyandb/api/common"
@@ -158,14 +159,53 @@ type FileData struct {
// SyncResult represents the result of a sync operation.
type SyncResult struct {
- SessionID string
- ErrorMessage string
- TotalBytes uint64
- DurationMs int64
- ChunksCount uint32
- PartsCount uint32
- Success bool
+ SessionID string
+ FailedParts []FailedPart
+ TotalBytes uint64
+ DurationMs int64
+ ChunksCount uint32
+ PartsCount uint32
+ Success bool
+}
+
+// FailedPart contains information about a part that failed to sync.
+type FailedPart struct {
+ PartID string
+ Error string
}
// SyncMetadata is an alias for clusterv1.SyncMetadata.
type SyncMetadata = clusterv1.SyncMetadata
+
+// ChunkedSyncFailureInjector allows tests to deterministically inject
failures into
+// chunked sync operations. It is only intended for test code.
+type ChunkedSyncFailureInjector interface {
+ // BeforeSync returns whether the current SyncStreamingParts invocation
should
+ // short-circuit. The returned slice provides the failed parts that
should be
+ // reported back to the caller when a failure is injected.
+ BeforeSync(parts []StreamingPartData) (bool, []FailedPart, error)
+}
+
+var (
+ chunkedSyncFailureInjectorMu sync.RWMutex
+ chunkedSyncFailureInjector ChunkedSyncFailureInjector
+)
+
+// RegisterChunkedSyncFailureInjector registers a failure injector for testing.
+func RegisterChunkedSyncFailureInjector(injector ChunkedSyncFailureInjector) {
+ chunkedSyncFailureInjectorMu.Lock()
+ chunkedSyncFailureInjector = injector
+ chunkedSyncFailureInjectorMu.Unlock()
+}
+
+// ClearChunkedSyncFailureInjector removes the registered failure injector.
+func ClearChunkedSyncFailureInjector() {
+ RegisterChunkedSyncFailureInjector(nil)
+}
+
+// GetChunkedSyncFailureInjector returns the currently registered failure
injector.
+func GetChunkedSyncFailureInjector() ChunkedSyncFailureInjector {
+ chunkedSyncFailureInjectorMu.RLock()
+ defer chunkedSyncFailureInjectorMu.RUnlock()
+ return chunkedSyncFailureInjector
+}
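For context on how the registration hooks above are meant to be used: a test registers an injector before exercising a syncer and clears it afterwards, and the pub client consults it at the start of each SyncStreamingParts call. A minimal, hypothetical sketch (failFirstN and the test body are illustrative; only the queue package identifiers are real):

package queue_test

import (
	"fmt"
	"testing"

	"github.com/apache/skywalking-banyandb/banyand/queue"
)

// failFirstN is a hypothetical injector that fails the first n sync attempts.
type failFirstN struct{ n int }

func (f *failFirstN) BeforeSync(parts []queue.StreamingPartData) (bool, []queue.FailedPart, error) {
	if f.n == 0 {
		return false, nil, nil // stop injecting; let the sync proceed normally
	}
	f.n--
	failed := make([]queue.FailedPart, 0, len(parts))
	for _, p := range parts {
		failed = append(failed, queue.FailedPart{PartID: fmt.Sprint(p.ID), Error: "injected"})
	}
	return true, failed, nil
}

func TestWithInjectedFailures(t *testing.T) {
	queue.RegisterChunkedSyncFailureInjector(&failFirstN{n: 2})
	defer queue.ClearChunkedSyncFailureInjector()
	// ... exercise a syncer here; the first two SyncStreamingParts calls
	// short-circuit and report every part as failed ...
}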
diff --git a/banyand/stream/stream.go b/banyand/stream/stream.go
index bca74b83..a5b3873a 100644
--- a/banyand/stream/stream.go
+++ b/banyand/stream/stream.go
@@ -47,13 +47,14 @@ const (
)
type option struct {
- mergePolicy *mergePolicy
- protector protector.Memory
- tire2Client queue.Client
- seriesCacheMaxSize run.Bytes
- flushTimeout time.Duration
- elementIndexFlushTimeout time.Duration
- syncInterval time.Duration
+ mergePolicy *mergePolicy
+ protector protector.Memory
+ tire2Client queue.Client
+ seriesCacheMaxSize run.Bytes
+ flushTimeout time.Duration
+ elementIndexFlushTimeout time.Duration
+ syncInterval time.Duration
+ failedPartsMaxTotalSizeBytes uint64
}
// Query allow to retrieve elements in a series of streams.
diff --git a/banyand/stream/svc_liaison.go b/banyand/stream/svc_liaison.go
index 637eb8cc..9d937dbc 100644
--- a/banyand/stream/svc_liaison.go
+++ b/banyand/stream/svc_liaison.go
@@ -45,19 +45,20 @@ import (
)
type liaison struct {
- pm protector.Memory
- metadata metadata.Repo
- pipeline queue.Server
- omr observability.MetricsRegistry
- lfs fs.FileSystem
- writeListener bus.MessageListener
- dataNodeSelector node.Selector
- l *logger.Logger
- schemaRepo schemaRepo
- dataPath string
- root string
- option option
- maxDiskUsagePercent int
+ pm protector.Memory
+ metadata metadata.Repo
+ pipeline queue.Server
+ omr observability.MetricsRegistry
+ lfs fs.FileSystem
+ writeListener bus.MessageListener
+ dataNodeSelector node.Selector
+ l *logger.Logger
+ schemaRepo schemaRepo
+ dataPath string
+ root string
+ option option
+ maxDiskUsagePercent int
+ failedPartsMaxSizePercent int
}
func (s *liaison) Stream(metadata *commonv1.Metadata) (Stream, error) {
@@ -83,6 +84,10 @@ func (s *liaison) FlagSet() *run.FlagSet {
flagS.DurationVar(&s.option.flushTimeout, "stream-flush-timeout",
defaultFlushTimeout, "the memory data timeout of stream")
flagS.IntVar(&s.maxDiskUsagePercent, "stream-max-disk-usage-percent",
95, "the maximum disk usage percentage allowed")
flagS.DurationVar(&s.option.syncInterval, "stream-sync-interval",
defaultSyncInterval, "the periodic sync interval for stream data")
+ flagS.IntVar(&s.failedPartsMaxSizePercent,
"failed-parts-max-size-percent", 10,
+ "percentage of BanyanDB's allowed disk usage allocated to
failed parts storage. "+
+ "Calculated as: totalDisk *
stream-max-disk-usage-percent * failed-parts-max-size-percent / 10000. "+
+ "Set to 0 to disable copying failed parts. Valid range:
0-100")
return flagS
}
@@ -96,6 +101,9 @@ func (s *liaison) Validate() error {
if s.maxDiskUsagePercent > 100 {
return errors.New("stream-max-disk-usage-percent must be less
than or equal to 100")
}
+ if s.failedPartsMaxSizePercent < 0 || s.failedPartsMaxSizePercent > 100
{
+ return errors.New("failed-parts-max-size-percent must be
between 0 and 100")
+ }
return nil
}
@@ -123,6 +131,22 @@ func (s *liaison) PreRun(ctx context.Context) error {
if !strings.HasPrefix(filepath.VolumeName(s.dataPath),
filepath.VolumeName(path)) {
observability.UpdatePath(s.dataPath)
}
+ s.lfs.MkdirIfNotExist(s.dataPath, storage.DirPerm)
+
+ s.option.failedPartsMaxTotalSizeBytes = 0
+ if s.failedPartsMaxSizePercent > 0 {
+ totalSpace := s.lfs.MustGetTotalSpace(s.dataPath)
+ maxTotalSizeBytes := totalSpace * uint64(s.maxDiskUsagePercent)
/ 100
+ maxTotalSizeBytes = maxTotalSizeBytes *
uint64(s.failedPartsMaxSizePercent) / 100
+ s.option.failedPartsMaxTotalSizeBytes = maxTotalSizeBytes
+ s.l.Info().
+ Uint64("maxFailedPartsBytes", maxTotalSizeBytes).
+ Int("failedPartsMaxSizePercent",
s.failedPartsMaxSizePercent).
+ Int("maxDiskUsagePercent", s.maxDiskUsagePercent).
+ Msg("configured failed parts storage limit")
+ } else {
+ s.l.Info().Msg("failed parts storage limit disabled (percent
set to 0)")
+ }
streamDataNodeRegistry :=
grpc.NewClusterNodeRegistry(data.TopicStreamPartSync, s.option.tire2Client,
s.dataNodeSelector)
s.schemaRepo = newLiaisonSchemaRepo(s.dataPath, s,
streamDataNodeRegistry)
s.writeListener = setUpWriteQueueCallback(s.l, &s.schemaRepo,
s.maxDiskUsagePercent, s.option.tire2Client)
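The flag help above compresses the budget rule into a single formula; spelled out with example numbers, a 100 GiB volume with stream-max-disk-usage-percent=95 and failed-parts-max-size-percent=10 yields 100 GiB * 95 / 100 * 10 / 100 = 9.5 GiB. A small sketch of that arithmetic (an illustration, not the exported API); dividing after each multiplication keeps the intermediates inside uint64, mirroring the PreRun code above:

package budget

// failedPartsBudget mirrors the PreRun calculation: divide after each
// multiplication so the intermediate product stays within uint64 even for
// very large volumes. failedPartsBudget(100<<30, 95, 10) == 10200547328,
// i.e. exactly 9.5 GiB.
func failedPartsBudget(totalSpace uint64, maxDiskUsagePercent, failedPartsMaxSizePercent int) uint64 {
	if failedPartsMaxSizePercent <= 0 {
		return 0 // copying of failed parts is disabled
	}
	b := totalSpace * uint64(maxDiskUsagePercent) / 100
	return b * uint64(failedPartsMaxSizePercent) / 100
}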
diff --git a/banyand/stream/syncer.go b/banyand/stream/syncer.go
index d37235e4..37d3040c 100644
--- a/banyand/stream/syncer.go
+++ b/banyand/stream/syncer.go
@@ -20,9 +20,11 @@ package stream
import (
"context"
"fmt"
+ "strconv"
"time"
"github.com/apache/skywalking-banyandb/api/data"
+ "github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/compress/zstd"
"github.com/apache/skywalking-banyandb/pkg/watcher"
@@ -36,6 +38,7 @@ func (tst *tsTable) syncLoop(syncCh chan *syncIntroduction,
flusherNotifier watc
var epoch uint64
var lastTriggerTime time.Time
+ firstSync := true
ew := flusherNotifier.Add(0, tst.loopCloser.CloseNotify())
if ew == nil {
@@ -52,7 +55,7 @@ func (tst *tsTable) syncLoop(syncCh chan *syncIntroduction,
flusherNotifier watc
return false
}
defer curSnapshot.decRef()
- if curSnapshot.epoch != epoch {
+ if firstSync || curSnapshot.epoch != epoch {
tst.incTotalSyncLoopStarted(1)
defer tst.incTotalSyncLoopFinished(1)
var err error
@@ -66,6 +69,7 @@ func (tst *tsTable) syncLoop(syncCh chan *syncIntroduction,
flusherNotifier watc
return false
}
epoch = curSnapshot.epoch
+ firstSync = false
lastTriggerTime = triggerTime
if tst.currentEpoch() != epoch {
return false
@@ -151,30 +155,100 @@ func createPartFileReaders(part *part)
([]queue.FileInfo, func()) {
}
}
+// syncPartsToNodesHelper syncs given parts to all nodes and returns failed
parts.
+// This helper is used for both initial sync and retry attempts.
+func (tst *tsTable) syncPartsToNodesHelper(ctx context.Context, parts []*part,
nodes []string, chunkSize uint32, releaseFuncs *[]func()) ([]queue.FailedPart,
error) {
+ var allFailedParts []queue.FailedPart
+
+ for _, node := range nodes {
+ // Get chunked sync client for this node
+ chunkedClient, err :=
tst.option.tire2Client.NewChunkedSyncClient(node, chunkSize)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create chunked sync
client for node %s: %w", node, err)
+ }
+ defer chunkedClient.Close()
+
+ // Prepare streaming parts data
+ var streamingParts []queue.StreamingPartData
+ for _, part := range parts {
+ files, release := createPartFileReaders(part)
+ *releaseFuncs = append(*releaseFuncs, release)
+ streamingParts = append(streamingParts,
queue.StreamingPartData{
+ ID: part.partMetadata.ID,
+ Group: tst.group,
+ ShardID: uint32(tst.shardID),
+ Topic:
data.TopicStreamPartSync.String(),
+ Files: files,
+ CompressedSizeBytes:
part.partMetadata.CompressedSizeBytes,
+ UncompressedSizeBytes:
part.partMetadata.UncompressedSizeBytes,
+ TotalCount:
part.partMetadata.TotalCount,
+ BlocksCount:
part.partMetadata.BlocksCount,
+ MinTimestamp:
part.partMetadata.MinTimestamp,
+ MaxTimestamp:
part.partMetadata.MaxTimestamp,
+ PartType: PartTypeCore,
+ })
+ }
+
+ result, err := chunkedClient.SyncStreamingParts(ctx,
streamingParts)
+ if err != nil {
+ return nil, fmt.Errorf("failed to sync streaming parts
to node %s: %w", node, err)
+ }
+
+ tst.incTotalSyncLoopBytes(result.TotalBytes)
+ if dl := tst.l.Debug(); dl.Enabled() {
+ dl.
+ Str("node", node).
+ Str("session", result.SessionID).
+ Uint64("bytes", result.TotalBytes).
+ Int64("duration_ms", result.DurationMs).
+ Uint32("chunks", result.ChunksCount).
+ Uint32("parts", result.PartsCount).
+ Int("failed_parts", len(result.FailedParts)).
+ Msg("chunked sync completed")
+ }
+
+ allFailedParts = append(allFailedParts, result.FailedParts...)
+ }
+
+ return allFailedParts, nil
+}
+
func (tst *tsTable) syncSnapshot(curSnapshot *snapshot, syncCh chan
*syncIntroduction) error {
startTime := time.Now()
defer func() {
tst.incTotalSyncLoopLatency(time.Since(startTime).Seconds())
}()
- // Get all parts from the current snapshot
- var partsToSync []*part
- for _, pw := range curSnapshot.parts {
- if pw.mp == nil && pw.p.partMetadata.TotalCount > 0 {
- partsToSync = append(partsToSync, pw.p)
- }
- }
-
+ partsToSync := tst.collectPartsToSync(curSnapshot)
if len(partsToSync) == 0 {
return nil
}
+
nodes := tst.getNodes()
if len(nodes) == 0 {
return fmt.Errorf("no nodes to sync parts")
}
- // Sort parts from old to new (by part ID)
- // Parts with lower IDs are older
+ tst.sortPartsByID(partsToSync)
+
+ if err := tst.executeSyncWithRetry(partsToSync, nodes); err != nil {
+ return err
+ }
+
+ return tst.sendSyncIntroduction(partsToSync, syncCh)
+}
+
+func (tst *tsTable) collectPartsToSync(curSnapshot *snapshot) []*part {
+ var partsToSync []*part
+ for _, pw := range curSnapshot.parts {
+ if pw.mp == nil && pw.p.partMetadata.TotalCount > 0 {
+ partsToSync = append(partsToSync, pw.p)
+ }
+ }
+ return partsToSync
+}
+
+func (tst *tsTable) sortPartsByID(partsToSync []*part) {
for i := 0; i < len(partsToSync); i++ {
for j := i + 1; j < len(partsToSync); j++ {
if partsToSync[i].partMetadata.ID >
partsToSync[j].partMetadata.ID {
@@ -182,8 +256,12 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot,
syncCh chan *syncIntrodu
}
}
}
+}
+
+func (tst *tsTable) executeSyncWithRetry(partsToSync []*part, nodes []string)
error {
+ failedPartsHandler := storage.NewFailedPartsHandler(tst.fileSystem,
tst.root, tst.l, tst.option.failedPartsMaxTotalSizeBytes)
+ partsInfo := tst.buildPartsInfoMap(partsToSync)
- // Use chunked sync with streaming for better memory efficiency
ctx := context.Background()
releaseFuncs := make([]func(), 0, len(partsToSync))
defer func() {
@@ -192,66 +270,166 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot,
syncCh chan *syncIntrodu
}
}()
+ perNodeFailures := tst.performInitialSync(ctx, partsToSync, nodes,
&releaseFuncs)
+ if len(perNodeFailures) > 0 {
+ tst.handleFailedPartsRetry(ctx, partsToSync, perNodeFailures,
partsInfo, failedPartsHandler)
+ }
+
+ return nil
+}
+
+func (tst *tsTable) buildPartsInfoMap(partsToSync []*part)
map[uint64][]*storage.PartInfo {
+ partsInfo := make(map[uint64][]*storage.PartInfo)
+ for _, part := range partsToSync {
+ partsInfo[part.partMetadata.ID] = []*storage.PartInfo{
+ {
+ PartID: part.partMetadata.ID,
+ SourcePath: part.path,
+ PartType: PartTypeCore,
+ },
+ }
+ }
+ return partsInfo
+}
+
+func (tst *tsTable) performInitialSync(
+ ctx context.Context, partsToSync []*part, nodes []string, releaseFuncs
*[]func(),
+) map[string][]queue.FailedPart {
+ perNodeFailures := make(map[string][]queue.FailedPart)
for _, node := range nodes {
- // Get chunked sync client for this node
- chunkedClient, err :=
tst.option.tire2Client.NewChunkedSyncClient(node, 1024*1024)
+ failedParts, err := tst.syncPartsToNodesHelper(ctx,
partsToSync, []string{node}, 1024*1024, releaseFuncs)
if err != nil {
- return fmt.Errorf("failed to create chunked sync client
for node %s: %w", node, err)
+ tst.l.Error().Err(err).Str("node", node).Msg("sync
error")
+ // Mark all parts as failed for this node
+ var allPartsFailed []queue.FailedPart
+ for _, part := range partsToSync {
+ allPartsFailed = append(allPartsFailed,
queue.FailedPart{
+ PartID:
strconv.FormatUint(part.partMetadata.ID, 10),
+ Error: fmt.Sprintf("node %s: %v",
node, err),
+ })
+ }
+ perNodeFailures[node] = allPartsFailed
+ continue
}
- defer chunkedClient.Close()
+ if len(failedParts) > 0 {
+ perNodeFailures[node] = failedParts
+ }
+ }
+ return perNodeFailures
+}
- // Prepare streaming parts data for chunked sync
- var streamingParts []queue.StreamingPartData
+func (tst *tsTable) handleFailedPartsRetry(
+ ctx context.Context, partsToSync []*part, perNodeFailures
map[string][]queue.FailedPart,
+ partsInfo map[uint64][]*storage.PartInfo, failedPartsHandler
*storage.FailedPartsHandler,
+) {
+ allFailedParts := tst.collectAllFailedParts(perNodeFailures)
+ syncFunc := tst.createRetrySyncFunc(ctx, partsToSync, perNodeFailures)
+
+ permanentlyFailedParts, err := failedPartsHandler.RetryFailedParts(ctx,
allFailedParts, partsInfo, syncFunc)
+ if err != nil {
+ tst.l.Warn().Err(err).Msg("error during retry process")
+ }
+ if len(permanentlyFailedParts) > 0 {
+ tst.l.Error().
+ Uints64("partIDs", permanentlyFailedParts).
+ Int("count", len(permanentlyFailedParts)).
+ Msg("parts permanently failed after all retries and
have been copied to failed-parts directory")
+ }
+}
+
+func (tst *tsTable) collectAllFailedParts(perNodeFailures
map[string][]queue.FailedPart) []queue.FailedPart {
+ allFailedParts := make([]queue.FailedPart, 0)
+ for _, failedParts := range perNodeFailures {
+ allFailedParts = append(allFailedParts, failedParts...)
+ }
+ return allFailedParts
+}
+
+func (tst *tsTable) createRetrySyncFunc(
+ ctx context.Context, partsToSync []*part, perNodeFailures
map[string][]queue.FailedPart,
+) func([]uint64) ([]queue.FailedPart, error) {
+ return func(partIDs []uint64) ([]queue.FailedPart, error) {
+ partIDsSet := make(map[uint64]struct{})
+ for _, partID := range partIDs {
+ partIDsSet[partID] = struct{}{}
+ }
+
+ partsToRetry := tst.filterPartsToRetry(partIDs, partsToSync)
+ if len(partsToRetry) == 0 {
+ return nil, nil
+ }
+
+ return tst.retryPartsOnFailedNodes(ctx, partIDs, partsToRetry,
partIDsSet, perNodeFailures)
+ }
+}
+
+func (tst *tsTable) filterPartsToRetry(partIDs []uint64, partsToSync []*part)
[]*part {
+ partsToRetry := make([]*part, 0)
+ for _, partID := range partIDs {
for _, part := range partsToSync {
- // Create streaming reader for the part
- files, release := createPartFileReaders(part)
- releaseFuncs = append(releaseFuncs, release)
+ if part.partMetadata.ID == partID {
+ partsToRetry = append(partsToRetry, part)
+ break
+ }
+ }
+ }
+ return partsToRetry
+}
- // Create streaming part sync data
- streamingParts = append(streamingParts,
queue.StreamingPartData{
- ID: part.partMetadata.ID,
- Group: tst.group,
- ShardID: uint32(tst.shardID),
- Topic:
data.TopicStreamPartSync.String(),
- Files: files,
- CompressedSizeBytes:
part.partMetadata.CompressedSizeBytes,
- UncompressedSizeBytes:
part.partMetadata.UncompressedSizeBytes,
- TotalCount:
part.partMetadata.TotalCount,
- BlocksCount:
part.partMetadata.BlocksCount,
- MinTimestamp:
part.partMetadata.MinTimestamp,
- MaxTimestamp:
part.partMetadata.MaxTimestamp,
- PartType: PartTypeCore,
- })
+func (tst *tsTable) retryPartsOnFailedNodes(
+ ctx context.Context, partIDs []uint64, partsToRetry []*part,
+ partIDsSet map[uint64]struct{}, perNodeFailures
map[string][]queue.FailedPart,
+) ([]queue.FailedPart, error) {
+ retryReleaseFuncs := make([]func(), 0)
+ defer func() {
+ for _, release := range retryReleaseFuncs {
+ release()
}
+ }()
- // Sync parts using chunked transfer with streaming
- result, err := chunkedClient.SyncStreamingParts(ctx,
streamingParts)
- if err != nil {
- return fmt.Errorf("failed to sync streaming parts to
node %s: %w", node, err)
+ var retryFailedParts []queue.FailedPart
+ for node, nodeFailedParts := range perNodeFailures {
+ if !tst.shouldRetryOnNode(nodeFailedParts, partIDsSet) {
+ continue
}
- if !result.Success {
- return fmt.Errorf("chunked sync partially failed: %v",
result.ErrorMessage)
+ failedParts, err := tst.syncPartsToNodesHelper(ctx,
partsToRetry, []string{node}, 1024*1024, &retryReleaseFuncs)
+ if err != nil {
+ retryFailedParts = append(retryFailedParts,
tst.markPartsAsFailed(partIDs, node, err)...)
+ continue
}
- tst.incTotalSyncLoopBytes(result.TotalBytes)
- if dl := tst.l.Debug(); dl.Enabled() {
- dl.
- Str("node", node).
- Str("session", result.SessionID).
- Uint64("bytes", result.TotalBytes).
- Int64("duration_ms", result.DurationMs).
- Uint32("chunks", result.ChunksCount).
- Uint32("parts", result.PartsCount).
- Msg("chunked sync completed successfully")
+ retryFailedParts = append(retryFailedParts, failedParts...)
+ }
+
+ return retryFailedParts, nil
+}
+
+func (tst *tsTable) shouldRetryOnNode(nodeFailedParts []queue.FailedPart,
partIDsSet map[uint64]struct{}) bool {
+ for _, failedPart := range nodeFailedParts {
+ failedPartID, _ := strconv.ParseUint(failedPart.PartID, 10, 64)
+ if _, exists := partIDsSet[failedPartID]; exists {
+ return true
}
}
+ return false
+}
+
+func (tst *tsTable) markPartsAsFailed(partIDs []uint64, node string, err
error) []queue.FailedPart {
+ var failedParts []queue.FailedPart
+ for _, partID := range partIDs {
+ failedParts = append(failedParts, queue.FailedPart{
+ PartID: strconv.FormatUint(partID, 10),
+ Error: fmt.Sprintf("node %s: %v", node, err),
+ })
+ }
+ return failedParts
+}
- // Construct syncIntroduction to remove synced parts from snapshot
+func (tst *tsTable) sendSyncIntroduction(partsToSync []*part, syncCh chan
*syncIntroduction) error {
si := generateSyncIntroduction()
defer releaseSyncIntroduction(si)
si.applied = make(chan struct{})
- // Mark all synced parts for removal
for _, part := range partsToSync {
si.synced[part.partMetadata.ID] = struct{}{}
}
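The refactor above only wires failures into storage.FailedPartsHandler; the backoff loop itself lives in failed_parts_handler.go, which this excerpt does not show. As a rough illustration of the contract the syncer relies on — bounded retries with a growing delay, survivors reported back as permanently failed — here is a hypothetical sketch (the real handler's signature and behavior may differ):

package retrysketch

import (
	"context"
	"time"
)

// retryWithBackoff is a hypothetical sketch: syncFunc re-attempts the given
// part IDs and reports the ones that still fail; whatever survives every
// attempt is returned as permanently failed.
func retryWithBackoff(
	ctx context.Context,
	failed []uint64,
	maxRetries int,
	baseDelay time.Duration,
	syncFunc func([]uint64) ([]uint64, error),
) []uint64 {
	delay := baseDelay
	for attempt := 0; attempt < maxRetries && len(failed) > 0; attempt++ {
		select {
		case <-ctx.Done():
			return failed
		case <-time.After(delay):
		}
		still, err := syncFunc(failed)
		if err != nil {
			still = failed // a transport-level error leaves everything failed
		}
		failed = still
		delay *= 2 // exponential backoff between attempts
	}
	return failed
}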
diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go
index 9384c4b4..4ce74d7a 100644
--- a/banyand/stream/tstable.go
+++ b/banyand/stream/tstable.go
@@ -230,6 +230,9 @@ func initTSTable(fileSystem fs.FileSystem, rootPath string,
p common.Position,
if ee[i].Name() == inverted.ExternalSegmentTempDirName {
continue
}
+ if ee[i].Name() == storage.FailedPartsDirName {
+ continue
+ }
p, err := parseEpoch(ee[i].Name())
if err != nil {
l.Info().Err(err).Msg("cannot parse part file
name. skip and delete it")
diff --git a/banyand/trace/handoff_controller.go
b/banyand/trace/handoff_controller.go
index 4ba09c2e..2b4d5706 100644
--- a/banyand/trace/handoff_controller.go
+++ b/banyand/trace/handoff_controller.go
@@ -1246,7 +1246,7 @@ func (hc *handoffController) sendPartToNode(ctx
context.Context, nodeAddr string
}
if !result.Success {
- return fmt.Errorf("sync failed: %s", result.ErrorMessage)
+ return fmt.Errorf("sync failed: %s", result.FailedParts)
}
hc.l.Debug().
diff --git a/banyand/trace/svc_liaison.go b/banyand/trace/svc_liaison.go
index 0a5de970..bfb1204e 100644
--- a/banyand/trace/svc_liaison.go
+++ b/banyand/trace/svc_liaison.go
@@ -46,22 +46,23 @@ import (
)
type liaison struct {
- metadata metadata.Repo
- pipeline queue.Server
- omr observability.MetricsRegistry
- lfs fs.FileSystem
- writeListener bus.MessageListener
- dataNodeSelector node.Selector
- pm protector.Memory
- handoffCtrl *handoffController
- l *logger.Logger
- schemaRepo schemaRepo
- dataPath string
- root string
- dataNodeList []string
- option option
- maxDiskUsagePercent int
- handoffMaxSizePercent int
+ metadata metadata.Repo
+ pipeline queue.Server
+ omr observability.MetricsRegistry
+ lfs fs.FileSystem
+ writeListener bus.MessageListener
+ dataNodeSelector node.Selector
+ pm protector.Memory
+ handoffCtrl *handoffController
+ l *logger.Logger
+ schemaRepo schemaRepo
+ dataPath string
+ root string
+ dataNodeList []string
+ option option
+ maxDiskUsagePercent int
+ handoffMaxSizePercent int
+ failedPartsMaxSizePercent int
}
var _ Service = (*liaison)(nil)
@@ -100,6 +101,10 @@ func (l *liaison) FlagSet() *run.FlagSet {
"Calculated as: totalDisk *
trace-max-disk-usage-percent * handoff-max-size-percent / 10000. "+
"Example: 100GB disk with 95% max usage and 10% handoff
= 9.5GB; 50% handoff = 47.5GB. "+
"Valid range: 0-100")
+ fs.IntVar(&l.failedPartsMaxSizePercent,
"failed-parts-max-size-percent", 10,
+ "percentage of BanyanDB's allowed disk usage allocated to
failed parts storage. "+
+ "Calculated as: totalDisk *
trace-max-disk-usage-percent * failed-parts-max-size-percent / 10000. "+
+ "Set to 0 to disable copying failed parts. Valid range:
0-100")
return fs
}
@@ -118,6 +123,10 @@ func (l *liaison) Validate() error {
l.handoffMaxSizePercent)
}
+ if l.failedPartsMaxSizePercent < 0 || l.failedPartsMaxSizePercent > 100
{
+ return fmt.Errorf("invalid failed-parts-max-size-percent: %d%%.
Must be between 0 and 100", l.failedPartsMaxSizePercent)
+ }
+
return nil
}
@@ -142,8 +151,23 @@ func (l *liaison) PreRun(ctx context.Context) error {
if l.dataPath == "" {
l.dataPath = filepath.Join(path, storage.DataDir)
}
+ l.lfs.MkdirIfNotExist(l.dataPath, storage.DirPerm)
traceDataNodeRegistry :=
grpc.NewClusterNodeRegistry(data.TopicTracePartSync, l.option.tire2Client,
l.dataNodeSelector)
+ l.option.failedPartsMaxTotalSizeBytes = 0
+ if l.failedPartsMaxSizePercent > 0 {
+ totalSpace := l.lfs.MustGetTotalSpace(l.dataPath)
+ maxTotalSizeBytes := totalSpace * uint64(l.maxDiskUsagePercent)
/ 100
+ maxTotalSizeBytes = maxTotalSizeBytes *
uint64(l.failedPartsMaxSizePercent) / 100
+ l.option.failedPartsMaxTotalSizeBytes = maxTotalSizeBytes
+ l.l.Info().
+ Uint64("maxFailedPartsBytes", maxTotalSizeBytes).
+ Int("failedPartsMaxSizePercent",
l.failedPartsMaxSizePercent).
+ Int("maxDiskUsagePercent", l.maxDiskUsagePercent).
+ Msg("configured failed parts storage limit")
+ } else {
+ l.l.Info().Msg("failed parts storage limit disabled (percent
set to 0)")
+ }
// Initialize handoff controller if data nodes are configured
l.l.Info().Strs("dataNodeList", l.dataNodeList).Int("maxSizePercent",
l.handoffMaxSizePercent).
Msg("handoff configuration")
@@ -153,7 +177,6 @@ func (l *liaison) PreRun(ctx context.Context) error {
// Example: 100GB disk, 95% max usage, 10% handoff = 100 * 95 *
10 / 10000 = 9.5GB
maxSize := 0
if l.handoffMaxSizePercent > 0 {
- l.lfs.MkdirIfNotExist(l.dataPath, storage.DirPerm)
totalSpace := l.lfs.MustGetTotalSpace(l.dataPath)
// Divide after each multiplication to avoid overflow
with large disk capacities
maxSizeBytes := totalSpace *
uint64(l.maxDiskUsagePercent) / 100 * uint64(l.handoffMaxSizePercent) / 100
diff --git a/banyand/trace/syncer.go b/banyand/trace/syncer.go
index 02d3ded7..eb942b10 100644
--- a/banyand/trace/syncer.go
+++ b/banyand/trace/syncer.go
@@ -22,13 +22,15 @@ import (
"fmt"
"path/filepath"
"sort"
+ "strconv"
"time"
"github.com/apache/skywalking-banyandb/api/data"
+ "github.com/apache/skywalking-banyandb/banyand/internal/sidx"
+ "github.com/apache/skywalking-banyandb/banyand/internal/storage"
"github.com/apache/skywalking-banyandb/banyand/queue"
"github.com/apache/skywalking-banyandb/pkg/bytes"
"github.com/apache/skywalking-banyandb/pkg/compress/zstd"
- "github.com/apache/skywalking-banyandb/pkg/logger"
"github.com/apache/skywalking-banyandb/pkg/watcher"
)
@@ -197,12 +199,15 @@ func (tst *tsTable) syncSnapshot(curSnapshot *snapshot,
syncCh chan *syncIntrodu
return nil
}
+ // Initialize failed parts handler
+ failedPartsHandler := storage.NewFailedPartsHandler(tst.fileSystem,
tst.root, tst.l, tst.option.failedPartsMaxTotalSizeBytes)
+
// Execute sync operation
- if err := tst.executeSyncOperation(partsToSync, partIDsToSync); err !=
nil {
+ if err := tst.executeSyncOperation(partsToSync, partIDsToSync,
failedPartsHandler); err != nil {
return err
}
- // Handle sync introductions
+ // Handle sync introductions (includes both successful and permanently
failed parts)
return tst.handleSyncIntroductions(partsToSync, syncCh)
}
@@ -226,49 +231,33 @@ func (tst *tsTable) needToSync(partsToSync []*part) bool {
return len(partsToSync) > 0 && len(nodes) > 0
}
-// executeSyncOperation performs the actual synchronization of parts to nodes.
-func (tst *tsTable) executeSyncOperation(partsToSync []*part, partIDsToSync
map[uint64]struct{}) error {
- sort.Slice(partsToSync, func(i, j int) bool {
- return partsToSync[i].partMetadata.ID <
partsToSync[j].partMetadata.ID
- })
+// syncPartsToNodesHelper syncs given parts to all nodes and returns failed
parts.
+// This helper is used for both initial sync and retry attempts.
+func (tst *tsTable) syncPartsToNodesHelper(
+ ctx context.Context, parts []*part, partIDsMap map[uint64]struct{},
+ nodes []string, sidxMap map[string]sidx.SIDX, releaseFuncs *[]func(),
+) ([]queue.FailedPart, error) {
+ var allFailedParts []queue.FailedPart
- ctx := context.Background()
- releaseFuncs := make([]func(), 0, len(partsToSync))
- defer func() {
- for _, release := range releaseFuncs {
- release()
- }
- }()
-
- nodes := tst.getNodes()
- if tst.loopCloser != nil && tst.loopCloser.Closed() {
- return errClosed
- }
- sidxMap := tst.getAllSidx()
for _, node := range nodes {
if tst.loopCloser != nil && tst.loopCloser.Closed() {
- return errClosed
+ return allFailedParts, errClosed
}
- // Prepare all streaming parts data
+
+ // Prepare streaming parts data
streamingParts := make([]queue.StreamingPartData, 0)
+
// Add sidx streaming parts
for name, sidx := range sidxMap {
- sidxStreamingParts, sidxReleaseFuncs :=
sidx.StreamingParts(partIDsToSync, tst.group, uint32(tst.shardID), name)
- if len(sidxStreamingParts) != len(partIDsToSync) {
- logger.Panicf("sidx streaming parts count
mismatch: %d != %d", len(sidxStreamingParts), len(partIDsToSync))
- return nil
- }
+ sidxStreamingParts, sidxReleaseFuncs :=
sidx.StreamingParts(partIDsMap, tst.group, uint32(tst.shardID), name)
streamingParts = append(streamingParts,
sidxStreamingParts...)
- releaseFuncs = append(releaseFuncs, sidxReleaseFuncs...)
- }
- if len(streamingParts) != len(partIDsToSync)*len(sidxMap) {
- logger.Panicf("streaming parts count mismatch: %d !=
%d", len(streamingParts), len(partIDsToSync)*len(sidxMap))
- return nil
+ *releaseFuncs = append(*releaseFuncs,
sidxReleaseFuncs...)
}
- // Create streaming parts from core trace parts.
- for _, part := range partsToSync {
+
+ // Create streaming parts from core trace parts
+ for _, part := range parts {
files, release := createPartFileReaders(part)
- releaseFuncs = append(releaseFuncs, release)
+ *releaseFuncs = append(*releaseFuncs, release)
streamingParts = append(streamingParts,
queue.StreamingPartData{
ID: part.partMetadata.ID,
Group: tst.group,
@@ -284,32 +273,201 @@ func (tst *tsTable) executeSyncOperation(partsToSync
[]*part, partIDsToSync map[
PartType: PartTypeCore,
})
}
+
sort.Slice(streamingParts, func(i, j int) bool {
if streamingParts[i].ID == streamingParts[j].ID {
return streamingParts[i].PartType <
streamingParts[j].PartType
}
return streamingParts[i].ID < streamingParts[j].ID
})
- if err := tst.syncStreamingPartsToNode(ctx, node,
streamingParts); err != nil {
- return err
+
+ failedParts, err := tst.syncStreamingPartsToNode(ctx, node,
streamingParts)
+ if err != nil {
+ return allFailedParts, err
+ }
+ allFailedParts = append(allFailedParts, failedParts...)
+ }
+
+ return allFailedParts, nil
+}
+
+// executeSyncOperation performs the actual synchronization of parts to nodes.
+func (tst *tsTable) executeSyncOperation(partsToSync []*part, partIDsToSync
map[uint64]struct{}, failedPartsHandler *storage.FailedPartsHandler) error {
+ sort.Slice(partsToSync, func(i, j int) bool {
+ return partsToSync[i].partMetadata.ID <
partsToSync[j].partMetadata.ID
+ })
+
+ ctx := context.Background()
+ releaseFuncs := make([]func(), 0, len(partsToSync))
+ defer func() {
+ for _, release := range releaseFuncs {
+ release()
+ }
+ }()
+
+ nodes := tst.getNodes()
+ if tst.loopCloser != nil && tst.loopCloser.Closed() {
+ return errClosed
+ }
+
+ sidxMap := tst.getAllSidx()
+
+ // Build part info map for failed parts handler
+ // Each part ID can have multiple entries (core + SIDX parts)
+ partsInfo := make(map[uint64][]*storage.PartInfo)
+ // Add core parts
+ for _, part := range partsToSync {
+ partsInfo[part.partMetadata.ID] = []*storage.PartInfo{
+ {
+ PartID: part.partMetadata.ID,
+ SourcePath: part.path,
+ PartType: PartTypeCore,
+ },
+ }
+ }
+ // Add SIDX parts
+ for sidxName, sidxInstance := range sidxMap {
+ partPaths := sidxInstance.PartPaths(partIDsToSync)
+ for partID, partPath := range partPaths {
+ partsInfo[partID] = append(partsInfo[partID],
&storage.PartInfo{
+ PartID: partID,
+ SourcePath: partPath,
+ PartType: sidxName,
+ })
+ }
+ }
+
+ // Initial sync attempt - track failures per node
+ perNodeFailures := make(map[string][]queue.FailedPart) // node ->
failed parts
+ for _, node := range nodes {
+ if tst.loopCloser != nil && tst.loopCloser.Closed() {
+ return errClosed
+ }
+ failedParts, err := tst.syncPartsToNodesHelper(ctx,
partsToSync, partIDsToSync, []string{node}, sidxMap, &releaseFuncs)
+ if err != nil {
+ tst.l.Error().Err(err).Str("node", node).Msg("sync
error")
+ // Mark all parts as failed for this node
+ var allPartsFailed []queue.FailedPart
+ for _, part := range partsToSync {
+ allPartsFailed = append(allPartsFailed,
queue.FailedPart{
+ PartID:
strconv.FormatUint(part.partMetadata.ID, 10),
+ Error: fmt.Sprintf("node %s: %v",
node, err),
+ })
+ }
+ perNodeFailures[node] = allPartsFailed
+ continue
+ }
+ if len(failedParts) > 0 {
+ perNodeFailures[node] = failedParts
}
}
// After sync attempts, enqueue parts for offline nodes
tst.enqueueForOfflineNodes(nodes, partsToSync, partIDsToSync)
+ // If there are failed parts, use the retry handler
+ if len(perNodeFailures) > 0 {
+ // Collect all unique failed parts for retry handler
+ allFailedParts := make([]queue.FailedPart, 0)
+ for _, failedParts := range perNodeFailures {
+ allFailedParts = append(allFailedParts, failedParts...)
+ }
+
+ // Create a sync function for retries - only retry on nodes
where parts failed
+ syncFunc := func(partIDs []uint64) ([]queue.FailedPart, error) {
+ // Build map of partIDs to retry
+ partIDsSet := make(map[uint64]struct{})
+ for _, partID := range partIDs {
+ partIDsSet[partID] = struct{}{}
+ }
+
+ // Filter parts to retry
+ partsToRetry := make([]*part, 0)
+ partIDsMap := make(map[uint64]struct{})
+ for _, partID := range partIDs {
+ partIDsMap[partID] = struct{}{}
+ for _, part := range partsToSync {
+ if part.partMetadata.ID == partID {
+ partsToRetry =
append(partsToRetry, part)
+ break
+ }
+ }
+ }
+
+ if len(partsToRetry) == 0 {
+ return nil, nil
+ }
+
+ retryReleaseFuncs := make([]func(), 0)
+ defer func() {
+ for _, release := range retryReleaseFuncs {
+ release()
+ }
+ }()
+
+ // Only retry on nodes where these specific parts failed
+ var retryFailedParts []queue.FailedPart
+ for node, nodeFailedParts := range perNodeFailures {
+ // Check if any of the parts to retry failed on
this node
+ shouldRetryOnNode := false
+ for _, failedPart := range nodeFailedParts {
+ failedPartID, _ :=
strconv.ParseUint(failedPart.PartID, 10, 64)
+ if _, exists :=
partIDsSet[failedPartID]; exists {
+ shouldRetryOnNode = true
+ break
+ }
+ }
+
+ if !shouldRetryOnNode {
+ continue
+ }
+
+ // Retry only on this specific node
+ failedParts, err :=
tst.syncPartsToNodesHelper(ctx, partsToRetry, partIDsMap, []string{node},
sidxMap, &retryReleaseFuncs)
+ if err != nil {
+ // On error, mark all parts as failed
for this node
+ for _, partID := range partIDs {
+ retryFailedParts =
append(retryFailedParts, queue.FailedPart{
+ PartID:
strconv.FormatUint(partID, 10),
+ Error:
fmt.Sprintf("node %s: %v", node, err),
+ })
+ }
+ continue
+ }
+ retryFailedParts = append(retryFailedParts,
failedParts...)
+ }
+
+ return retryFailedParts, nil
+ }
+
+ permanentlyFailedParts, err :=
failedPartsHandler.RetryFailedParts(ctx, allFailedParts, partsInfo, syncFunc)
+ if err != nil {
+ tst.l.Warn().Err(err).Msg("error during retry process")
+ }
+ if len(permanentlyFailedParts) > 0 {
+ tst.l.Error().
+ Uints64("partIDs", permanentlyFailedParts).
+ Int("count", len(permanentlyFailedParts)).
+ Msg("parts permanently failed after all retries
and have been copied to failed-parts directory")
+ }
+ }
+
return nil
}
// handleSyncIntroductions creates and processes sync introductions for both
core and sidx parts.
+// Includes both successful and permanently failed parts to ensure snapshot
cleanup.
func (tst *tsTable) handleSyncIntroductions(partsToSync []*part, syncCh chan
*syncIntroduction) error {
// Create core sync introduction
si := generateSyncIntroduction()
defer releaseSyncIntroduction(si)
si.applied = make(chan struct{})
+
+ // Add all parts (both successful and permanently failed)
for _, part := range partsToSync {
si.synced[part.partMetadata.ID] = struct{}{}
}
+ // Permanently failed parts are already included since they're in
partsToSync
select {
case syncCh <- si:
@@ -326,23 +484,21 @@ func (tst *tsTable) handleSyncIntroductions(partsToSync
[]*part, syncCh chan *sy
}
// syncStreamingPartsToNode synchronizes streaming parts to a node.
-func (tst *tsTable) syncStreamingPartsToNode(ctx context.Context, node string,
streamingParts []queue.StreamingPartData) error {
+// Returns the list of failed parts.
+func (tst *tsTable) syncStreamingPartsToNode(ctx context.Context, node string,
streamingParts []queue.StreamingPartData) ([]queue.FailedPart, error) {
// Get chunked sync client for this node
chunkedClient, err := tst.option.tire2Client.NewChunkedSyncClient(node,
1024*1024)
if err != nil {
- return fmt.Errorf("failed to create chunked sync client for
node %s: %w", node, err)
+ return nil, fmt.Errorf("failed to create chunked sync client
for node %s: %w", node, err)
}
defer chunkedClient.Close()
// Sync parts using chunked transfer with streaming
result, err := chunkedClient.SyncStreamingParts(ctx, streamingParts)
if err != nil {
- return fmt.Errorf("failed to sync streaming parts to node %s:
%w", node, err)
+ return nil, fmt.Errorf("failed to sync streaming parts to node
%s: %w", node, err)
}
- if !result.Success {
- return fmt.Errorf("chunked sync partially failed: %v",
result.ErrorMessage)
- }
tst.incTotalSyncLoopBytes(result.TotalBytes)
if dl := tst.l.Debug(); dl.Enabled() {
dl.
@@ -352,7 +508,9 @@ func (tst *tsTable) syncStreamingPartsToNode(ctx
context.Context, node string, s
Int64("duration_ms", result.DurationMs).
Uint32("chunks", result.ChunksCount).
Uint32("parts", result.PartsCount).
- Msg("chunked sync completed successfully")
+ Int("failed_parts", len(result.FailedParts)).
+ Msg("chunked sync completed")
}
- return nil
+
+ return result.FailedParts, nil
}
diff --git a/banyand/trace/trace.go b/banyand/trace/trace.go
index 7792912f..4b16a4f0 100644
--- a/banyand/trace/trace.go
+++ b/banyand/trace/trace.go
@@ -51,12 +51,13 @@ const (
var traceScope = observability.RootScope.SubScope("trace")
type option struct {
- mergePolicy *mergePolicy
- protector protector.Memory
- tire2Client queue.Client
- seriesCacheMaxSize run.Bytes
- flushTimeout time.Duration
- syncInterval time.Duration
+ mergePolicy *mergePolicy
+ protector protector.Memory
+ tire2Client queue.Client
+ seriesCacheMaxSize run.Bytes
+ flushTimeout time.Duration
+ syncInterval time.Duration
+ failedPartsMaxTotalSizeBytes uint64
}
// Service allows inspecting the trace data.
diff --git a/banyand/trace/tstable.go b/banyand/trace/tstable.go
index 2d83dc25..287b17a8 100644
--- a/banyand/trace/tstable.go
+++ b/banyand/trace/tstable.go
@@ -48,21 +48,21 @@ const (
)
type tsTable struct {
- l *logger.Logger
- fileSystem fs.FileSystem
- sidxMap map[string]sidx.SIDX
pm protector.Memory
+ fileSystem fs.FileSystem
+ handoffCtrl *handoffController
metrics *metrics
snapshot *snapshot
loopCloser *run.Closer
getNodes func() []string
- handoffCtrl *handoffController
- option option
+ l *logger.Logger
+ sidxMap map[string]sidx.SIDX
introductions chan *introduction
p common.Position
root string
group string
gc garbageCleaner
+ option option
curPartID uint64
sync.RWMutex
shardID common.ShardID
@@ -219,6 +219,9 @@ func initTSTable(fileSystem fs.FileSystem, rootPath string,
p common.Position,
if ee[i].Name() == sidxDirName {
continue
}
+ if ee[i].Name() == storage.FailedPartsDirName {
+ continue
+ }
p, err := parseEpoch(ee[i].Name())
if err != nil {
l.Info().Err(err).Msg("cannot parse part file
name. skip and delete it")
diff --git a/test/integration/distributed/sync_retry/injector.go
b/test/integration/distributed/sync_retry/injector.go
new file mode 100644
index 00000000..d5bca1d5
--- /dev/null
+++ b/test/integration/distributed/sync_retry/injector.go
@@ -0,0 +1,159 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Package integration_sync_retry_test provides test utilities for sync retry
integration tests.
+package integration_sync_retry_test
+
+import (
+ "fmt"
+ "strconv"
+ "sync"
+
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+)
+
+type topicFailureRule struct {
+ remaining int
+ attempts int
+}
+
+type chunkedSyncTestInjector struct {
+ rules map[string]*topicFailureRule
+ mu sync.Mutex
+}
+
+func newChunkedSyncTestInjector(config map[string]int)
*chunkedSyncTestInjector {
+ rules := make(map[string]*topicFailureRule, len(config))
+ for topic, remaining := range config {
+ rules[topic] = &topicFailureRule{remaining: remaining}
+ }
+ return &chunkedSyncTestInjector{rules: rules}
+}
+
+func (c *chunkedSyncTestInjector) BeforeSync(parts []queue.StreamingPartData)
(bool, []queue.FailedPart, error) {
+ if len(parts) == 0 {
+ return false, nil, nil
+ }
+
+ topic := parts[0].Topic
+
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ rule, ok := c.rules[topic]
+ if !ok {
+ return false, nil, nil
+ }
+
+ rule.attempts++
+ if rule.remaining == 0 {
+ return false, nil, nil
+ }
+
+ if rule.remaining > 0 {
+ rule.remaining--
+ }
+
+ failedParts := make([]queue.FailedPart, 0, len(parts))
+ for _, part := range parts {
+ failedParts = append(failedParts, queue.FailedPart{
+ PartID: strconv.FormatUint(part.ID, 10),
+ Error: fmt.Sprintf("injected failure (topic=%s
attempt=%d)", topic, rule.attempts),
+ })
+ }
+
+ return true, failedParts, nil
+}
+
+func (c *chunkedSyncTestInjector) attemptsFor(topic string) int {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ if rule, ok := c.rules[topic]; ok {
+ return rule.attempts
+ }
+ return 0
+}
+
+func withChunkedSyncFailureInjector(config map[string]int)
(*chunkedSyncTestInjector, func()) {
+ injector := newChunkedSyncTestInjector(config)
+ queue.RegisterChunkedSyncFailureInjector(injector)
+ cleanup := func() {
+ queue.ClearChunkedSyncFailureInjector()
+ }
+ return injector, cleanup
+}
+
+// chunkedSyncErrorInjector injects connection-level errors (not just failed
parts)
+// to test the scenario where syncPartsToNodesHelper returns an error.
+type chunkedSyncErrorInjector struct {
+ rules map[string]*topicFailureRule
+ mu sync.Mutex
+}
+
+func newChunkedSyncErrorInjector(config map[string]int)
*chunkedSyncErrorInjector {
+ rules := make(map[string]*topicFailureRule, len(config))
+ for topic, remaining := range config {
+ rules[topic] = &topicFailureRule{remaining: remaining}
+ }
+ return &chunkedSyncErrorInjector{rules: rules}
+}
+
+func (c *chunkedSyncErrorInjector) BeforeSync(parts []queue.StreamingPartData)
(bool, []queue.FailedPart, error) {
+ if len(parts) == 0 {
+ return false, nil, nil
+ }
+
+ topic := parts[0].Topic
+
+ c.mu.Lock()
+ defer c.mu.Unlock()
+
+ rule, ok := c.rules[topic]
+ if !ok {
+ return false, nil, nil
+ }
+
+ rule.attempts++
+ if rule.remaining == 0 {
+ return false, nil, nil
+ }
+
+ if rule.remaining > 0 {
+ rule.remaining--
+ }
+
+ // Return an error to simulate connection/client failures
+ return true, nil, fmt.Errorf("injected connection error (topic=%s
attempt=%d)", topic, rule.attempts)
+}
+
+func (c *chunkedSyncErrorInjector) attemptsFor(topic string) int {
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ if rule, ok := c.rules[topic]; ok {
+ return rule.attempts
+ }
+ return 0
+}
+
+func withChunkedSyncErrorInjector(config map[string]int)
(*chunkedSyncErrorInjector, func()) {
+ injector := newChunkedSyncErrorInjector(config)
+ queue.RegisterChunkedSyncFailureInjector(injector)
+ cleanup := func() {
+ queue.ClearChunkedSyncFailureInjector()
+ }
+ return injector, cleanup
+}
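The two injectors above differ only in whether BeforeSync reports failed parts or returns an outright error. A hedged sketch of how a spec in this package might use the part-failure variant (the spec body is illustrative; only the helpers above and data.TopicStreamPartSync are real):

var _ = g.Describe("stream sync retry (sketch)", func() {
	g.It("retries syncs that initially fail", func() {
		injector, cleanup := withChunkedSyncFailureInjector(map[string]int{
			data.TopicStreamPartSync.String(): 2, // fail the first two attempts
		})
		defer cleanup()

		// ... write stream data through the liaison and wait for it to land
		// on the data nodes; sync_retry_test.go holds the real assertions ...

		gomega.Expect(injector.attemptsFor(data.TopicStreamPartSync.String())).
			To(gomega.BeNumerically(">=", 3),
				"two injected failures plus at least one successful retry")
	})
})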
diff --git a/test/integration/distributed/sync_retry/sync_retry_suite_test.go
b/test/integration/distributed/sync_retry/sync_retry_suite_test.go
new file mode 100644
index 00000000..40396bfd
--- /dev/null
+++ b/test/integration/distributed/sync_retry/sync_retry_suite_test.go
@@ -0,0 +1,136 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package integration_sync_retry_test
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "testing"
+
+ g "github.com/onsi/ginkgo/v2"
+ "github.com/onsi/gomega"
+ "github.com/onsi/gomega/gleak"
+
+ "github.com/apache/skywalking-banyandb/banyand/metadata"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/embeddedetcd"
+ "github.com/apache/skywalking-banyandb/banyand/metadata/schema"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
+ test_measure "github.com/apache/skywalking-banyandb/pkg/test/measure"
+ "github.com/apache/skywalking-banyandb/pkg/test/setup"
+ test_stream "github.com/apache/skywalking-banyandb/pkg/test/stream"
+ test_trace "github.com/apache/skywalking-banyandb/pkg/test/trace"
+)
+
+func TestDistributedSyncRetry(t *testing.T) {
+ gomega.RegisterFailHandler(g.Fail)
+ g.RunSpecs(t, "Distributed Sync Retry Suite")
+}
+
+type suiteConfig struct {
+ LiaisonAddr string `json:"liaisonAddr"`
+ DataPaths []string `json:"dataPaths"`
+}
+
+var (
+ liaisonAddr string
+ dataPaths []string
+
+ cleanupFuncs []func()
+ goods []gleak.Goroutine
+)
+
+var _ = g.SynchronizedBeforeSuite(func() []byte {
+ goods = gleak.Goroutines()
+ gomega.Expect(logger.Init(logger.Logging{Env: "dev", Level:
flags.LogLevel})).To(gomega.Succeed())
+
+ ports, err := test.AllocateFreePorts(2)
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ dir, releaseSpace, err := test.NewSpace()
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+ clientEP := fmt.Sprintf("http://127.0.0.1:%d", ports[0])
+ serverEP := fmt.Sprintf("http://127.0.0.1:%d", ports[1])
+
+ server, err := embeddedetcd.NewServer(
+ embeddedetcd.ConfigureListener([]string{clientEP},
[]string{serverEP}),
+ embeddedetcd.RootDir(dir),
+ embeddedetcd.AutoCompactionMode("periodic"),
+ embeddedetcd.AutoCompactionRetention("1h"),
+ embeddedetcd.QuotaBackendBytes(2*1024*1024*1024),
+ )
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ <-server.ReadyNotify()
+
+ cleanupFuncs = append([]func(){
+ func() {
+ _ = server.Close()
+ <-server.StopNotify()
+ releaseSpace()
+ },
+ }, cleanupFuncs...)
+
+ schemaRegistry, err := schema.NewEtcdSchemaRegistry(
+ schema.Namespace(metadata.DefaultNamespace),
+ schema.ConfigureServerEndpoints([]string{clientEP}),
+ )
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ defer schemaRegistry.Close()
+
+ ctx := context.Background()
+ test_stream.PreloadSchema(ctx, schemaRegistry)
+ test_measure.PreloadSchema(ctx, schemaRegistry)
+ test_trace.PreloadSchema(ctx, schemaRegistry)
+
+ // Start two data nodes to ensure replication targets exist
+ startDataNode := func() (string, string) {
+ addr, path, closeFn := setup.DataNodeWithAddrAndDir(clientEP)
+ cleanupFuncs = append(cleanupFuncs, closeFn)
+ return addr, path
+ }
+
+ _, path0 := startDataNode()
+ _, path1 := startDataNode()
+ paths := []string{path0, path1}
+
+ liaisonAddrLocal, _, closeLiaison := setup.LiaisonNodeWithHTTP(clientEP)
+ cleanupFuncs = append(cleanupFuncs, closeLiaison)
+
+ cfg := suiteConfig{
+ LiaisonAddr: liaisonAddrLocal,
+ DataPaths: paths,
+ }
+ payload, err := json.Marshal(cfg)
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ return payload
+}, func(data []byte) {
+ var cfg suiteConfig
+ gomega.Expect(json.Unmarshal(data, &cfg)).To(gomega.Succeed())
+ liaisonAddr = cfg.LiaisonAddr
+ dataPaths = cfg.DataPaths
+})
+
+var _ = g.SynchronizedAfterSuite(func() {
+ // Execute cleanups in reverse order
+ for i := len(cleanupFuncs) - 1; i >= 0; i-- {
+ cleanupFuncs[i]()
+ }
+	gomega.Eventually(gleak.Goroutines, flags.EventuallyTimeout).ShouldNot(gleak.HaveLeaked(goods))
+}, func() {})
diff --git a/test/integration/distributed/sync_retry/sync_retry_test.go b/test/integration/distributed/sync_retry/sync_retry_test.go
new file mode 100644
index 00000000..411a9d0e
--- /dev/null
+++ b/test/integration/distributed/sync_retry/sync_retry_test.go
@@ -0,0 +1,298 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package integration_sync_retry_test
+
+import (
+ "errors"
+ "fmt"
+ "io/fs"
+ "os"
+ "path/filepath"
+ "time"
+
+ g "github.com/onsi/ginkgo/v2"
+ "github.com/onsi/gomega"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+
+ "github.com/apache/skywalking-banyandb/api/data"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
+	casesmeasuredata "github.com/apache/skywalking-banyandb/test/cases/measure/data"
+	casesstreamdata "github.com/apache/skywalking-banyandb/test/cases/stream/data"
+	casestracedata "github.com/apache/skywalking-banyandb/test/cases/trace/data"
+)
+
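+// DefaultMaxRetries is the retry budget the specs assume when computing how
+// many injected failures are needed; FailedPartsDirName is the directory name
+// the specs search for when verifying that permanently failed parts were recorded.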
+const (
+ DefaultMaxRetries = 3
+ FailedPartsDirName = "failed-parts"
+)
+
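+// The specs below drive writes through the liaison while an injector forces
+// chunked-sync failures, then assert that the retry budget is exhausted and
+// the affected parts end up in a failed-parts directory.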
+var _ = g.Describe("Chunked sync failure handling", g.Ordered, func() {
+ g.BeforeEach(func() {
+ gomega.Expect(clearFailedPartsDirs()).To(gomega.Succeed())
+ })
+
+ g.AfterEach(func() {
+ queue.ClearChunkedSyncFailureInjector()
+ })
+
+ g.It("retries stream parts and records failed segments", func() {
+ conn := dialLiaison()
+ defer conn.Close()
+
+		// With 2 data nodes and 3 max retries, we need 2 * (1 initial + 3 retries) = 8 failures
+		// to ensure parts permanently fail
+		injector, cleanup := withChunkedSyncFailureInjector(map[string]int{
+			data.TopicStreamPartSync.String(): 2 * (DefaultMaxRetries + 1),
+		})
+		defer cleanup()
+
+		baseTime := time.Now().Add(-5 * time.Minute).Truncate(time.Millisecond)
+		casesstreamdata.Write(conn, "sw", baseTime, 500*time.Millisecond)
+
+		g.By("waiting for injected stream sync failures")
+		gomega.Eventually(func() int {
+			return injector.attemptsFor(data.TopicStreamPartSync.String())
+		}, flags.EventuallyTimeout, 500*time.Millisecond).Should(gomega.BeNumerically(">=", 2*(DefaultMaxRetries+1)))
+
+		g.By("waiting for stream failed-parts directory to be populated")
+		gomega.Eventually(func() string {
+			return locateFailedPartsDir("stream")
+		}, flags.EventuallyTimeout, time.Second).ShouldNot(gomega.BeEmpty())
+ dir := locateFailedPartsDir("stream")
+ gomega.Expect(dir).NotTo(gomega.BeEmpty())
+ g.By(fmt.Sprintf("stream failed parts recorded at %s", dir))
+ })
+
+ g.It("retries measure parts and records failed segments", func() {
+ conn := dialLiaison()
+ defer conn.Close()
+
+ // With 2 data nodes, 2 shards (2 parts), and 3 max retries:
+		// 2 parts * 2 nodes * (1 initial + 3 retries) = 16 failures needed
+		injector, cleanup := withChunkedSyncFailureInjector(map[string]int{
+			data.TopicMeasurePartSync.String(): 2 * 2 * (DefaultMaxRetries + 1),
+		})
+		defer cleanup()
+
+		baseTime := time.Now().Add(-24 * time.Hour).Truncate(time.Millisecond)
+		casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", baseTime, time.Minute)
+
+		g.By("waiting for injected measure sync failures")
+		gomega.Eventually(func() int {
+			return injector.attemptsFor(data.TopicMeasurePartSync.String())
+		}, flags.EventuallyTimeout, time.Second).Should(gomega.BeNumerically(">=", 2*2*(DefaultMaxRetries+1)))
+
+		g.By("waiting for measure failed-parts directory to be populated")
+		gomega.Eventually(func() string {
+			return locateFailedPartsDir("measure")
+		}, flags.EventuallyTimeout, 2*time.Second).ShouldNot(gomega.BeEmpty())
+ dir := locateFailedPartsDir("measure")
+ gomega.Expect(dir).NotTo(gomega.BeEmpty())
+ g.By(fmt.Sprintf("measure failed parts recorded at %s", dir))
+ })
+
+ g.It("retries trace parts and records failed segments", func() {
+ conn := dialLiaison()
+ defer conn.Close()
+
+		// With 2 data nodes and 3 max retries, we need 2 * (1 initial + 3 retries) = 8 failures
+		// to ensure parts permanently fail
+		injector, cleanup := withChunkedSyncFailureInjector(map[string]int{
+			data.TopicTracePartSync.String(): 2 * (DefaultMaxRetries + 1),
+		})
+		defer cleanup()
+
+		baseTime := time.Now().Add(-10 * time.Minute).Truncate(time.Millisecond)
+		casestracedata.WriteToGroup(conn, "sw", "test-trace-group", "sw", baseTime, 500*time.Millisecond)
+
+		g.By("waiting for injected trace sync failures")
+		gomega.Eventually(func() int {
+			return injector.attemptsFor(data.TopicTracePartSync.String())
+		}, flags.EventuallyTimeout, time.Second).Should(gomega.BeNumerically(">=", 2*(DefaultMaxRetries+1)))
+
+		g.By("waiting for trace failed-parts directory to be populated")
+		gomega.Eventually(func() string {
+			return locateFailedPartsDir("trace")
+		}, flags.EventuallyTimeout, 2*time.Second).ShouldNot(gomega.BeEmpty())
+ dir := locateFailedPartsDir("trace")
+ gomega.Expect(dir).NotTo(gomega.BeEmpty())
+ g.By(fmt.Sprintf("trace failed parts recorded at %s", dir))
+ })
+
+ g.It("handles stream connection errors and marks all parts as failed",
func() {
+ conn := dialLiaison()
+ defer conn.Close()
+
+ // With 2 data nodes and 3 max retries, inject connection-level
errors
+ // that cause syncPartsToNodesHelper to return an error
+ injector, cleanup :=
withChunkedSyncErrorInjector(map[string]int{
+ data.TopicStreamPartSync.String(): 2 *
(DefaultMaxRetries + 1),
+ })
+ defer cleanup()
+
+ // Use a different time range to create new parts
+ baseTime := time.Now().Add(-15 *
time.Minute).Truncate(time.Millisecond)
+ casesstreamdata.Write(conn, "sw", baseTime,
500*time.Millisecond)
+
+ g.By("waiting for injected stream connection errors")
+ gomega.Eventually(func() int {
+ return
injector.attemptsFor(data.TopicStreamPartSync.String())
+ }, flags.EventuallyTimeout,
500*time.Millisecond).Should(gomega.BeNumerically(">=",
2*(DefaultMaxRetries+1)))
+
+ g.By("waiting for stream failed-parts directory to be
populated")
+ gomega.Eventually(func() string {
+ return locateFailedPartsDir("stream")
+ }, flags.EventuallyTimeout,
time.Second).ShouldNot(gomega.BeEmpty())
+ dir := locateFailedPartsDir("stream")
+ gomega.Expect(dir).NotTo(gomega.BeEmpty())
+ g.By(fmt.Sprintf("stream failed parts recorded at %s (from
connection errors)", dir))
+ })
+
+ g.It("handles measure connection errors and marks all parts as failed",
func() {
+ conn := dialLiaison()
+ defer conn.Close()
+
+ // With 2 data nodes, 2 shards, and 3 max retries, inject
connection-level errors
+ injector, cleanup :=
withChunkedSyncErrorInjector(map[string]int{
+ data.TopicMeasurePartSync.String(): 2 * 2 *
(DefaultMaxRetries + 1),
+ })
+ defer cleanup()
+
+ // Use a different time range to create new parts
+ baseTime := time.Now().Add(-48 *
time.Hour).Truncate(time.Millisecond)
+ casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric",
"service_cpm_minute_data.json", baseTime, time.Minute)
+
+ g.By("waiting for injected measure connection errors")
+ gomega.Eventually(func() int {
+ return
injector.attemptsFor(data.TopicMeasurePartSync.String())
+ }, flags.EventuallyTimeout,
time.Second).Should(gomega.BeNumerically(">=", 2*2*(DefaultMaxRetries+1)))
+
+ g.By("waiting for measure failed-parts directory to be
populated")
+ gomega.Eventually(func() string {
+ return locateFailedPartsDir("measure")
+ }, flags.EventuallyTimeout,
2*time.Second).ShouldNot(gomega.BeEmpty())
+ dir := locateFailedPartsDir("measure")
+ gomega.Expect(dir).NotTo(gomega.BeEmpty())
+ g.By(fmt.Sprintf("measure failed parts recorded at %s (from
connection errors)", dir))
+ })
+
+ g.It("handles trace connection errors and marks all parts as failed",
func() {
+ conn := dialLiaison()
+ defer conn.Close()
+
+ // With 2 data nodes and 3 max retries, inject connection-level
errors
+ injector, cleanup :=
withChunkedSyncErrorInjector(map[string]int{
+ data.TopicTracePartSync.String(): 2 *
(DefaultMaxRetries + 1),
+ })
+ defer cleanup()
+
+ // Use a different time range to create new parts
+ baseTime := time.Now().Add(-2 *
time.Hour).Truncate(time.Millisecond)
+ casestracedata.WriteToGroup(conn, "sw", "test-trace-group",
"sw", baseTime, 500*time.Millisecond)
+
+ g.By("waiting for injected trace connection errors")
+ gomega.Eventually(func() int {
+ return
injector.attemptsFor(data.TopicTracePartSync.String())
+ }, flags.EventuallyTimeout,
time.Second).Should(gomega.BeNumerically(">=", 2*(DefaultMaxRetries+1)))
+
+ g.By("waiting for trace failed-parts directory to be populated")
+ gomega.Eventually(func() string {
+ return locateFailedPartsDir("trace")
+ }, flags.EventuallyTimeout,
2*time.Second).ShouldNot(gomega.BeEmpty())
+ dir := locateFailedPartsDir("trace")
+ gomega.Expect(dir).NotTo(gomega.BeEmpty())
+ g.By(fmt.Sprintf("trace failed parts recorded at %s (from
connection errors)", dir))
+ })
+})
+
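+// dialLiaison opens a gRPC connection to the liaison node, retrying until the
+// node is ready to accept connections.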
+func dialLiaison() *grpc.ClientConn {
+ var conn *grpc.ClientConn
+ var err error
+ // Retry connection with Eventually to handle liaison startup timing
+ gomega.Eventually(func() error {
+		conn, err = grpchelper.Conn(liaisonAddr, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials()))
+ return err
+ }, 30*time.Second, time.Second).Should(gomega.Succeed())
+ return conn
+}
+
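+// clearFailedPartsDirs removes any pre-existing failed-parts directories under
+// the data-node paths so each spec starts from a clean slate.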
+func clearFailedPartsDirs() error {
+ for _, root := range dataPaths {
+		if err := filepath.WalkDir(root, func(path string, d fs.DirEntry, err error) error {
+			if err != nil {
+				return nil
+			}
+			if d.IsDir() && d.Name() == FailedPartsDirName {
+				if removeErr := os.RemoveAll(path); removeErr != nil {
+ return removeErr
+ }
+ return fs.SkipDir
+ }
+ return nil
+ }); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
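+// locateFailedPartsDir walks the data-node paths (and candidate temp directories
+// used by the liaison) for the given module and returns the first failed-parts
+// directory that contains entries, or an empty string if none is found yet.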
+func locateFailedPartsDir(module string) string {
+ errFound := errors.New("found")
+
+ // Search both data node paths and any temp directories (for liaison)
+ searchPaths := append([]string{}, dataPaths...)
+
+ // Also search common temp directory patterns for liaison write queues
+ tmpDirs, _ := filepath.Glob("/tmp/banyandb-test-*")
+ searchPaths = append(searchPaths, tmpDirs...)
+ tmpDirs2, _ := filepath.Glob(os.TempDir() + "/banyandb-test-*")
+ searchPaths = append(searchPaths, tmpDirs2...)
+
+ for _, root := range searchPaths {
+ moduleRoot := filepath.Join(root, module)
+ info, err := os.Stat(moduleRoot)
+ if err != nil || !info.IsDir() {
+ continue
+ }
+ var match string
+		walkErr := filepath.WalkDir(moduleRoot, func(path string, d fs.DirEntry, err error) error {
+ if err != nil {
+ return nil
+ }
+ if !d.IsDir() || d.Name() != FailedPartsDirName {
+ return nil
+ }
+ entries, readErr := os.ReadDir(path)
+ if readErr == nil && len(entries) > 0 {
+ match = path
+ return errFound
+ }
+ return fs.SkipDir
+ })
+ if walkErr != nil && !errors.Is(walkErr, errFound) {
+ continue
+ }
+ if match != "" {
+ return match
+ }
+ }
+ return ""
+}