Copilot commented on code in PR #836:
URL: https://github.com/apache/skywalking-banyandb/pull/836#discussion_r2497616376
##########
banyand/trace/syncer.go:
##########
@@ -226,49 +231,33 @@ func (tst *tsTable) needToSync(partsToSync []*part) bool {
return len(partsToSync) > 0 && len(nodes) > 0
}
-// executeSyncOperation performs the actual synchronization of parts to nodes.
-func (tst *tsTable) executeSyncOperation(partsToSync []*part, partIDsToSync map[uint64]struct{}) error {
- sort.Slice(partsToSync, func(i, j int) bool {
- return partsToSync[i].partMetadata.ID < partsToSync[j].partMetadata.ID
- })
+// syncPartsToNodesHelper syncs given parts to all nodes and returns failed parts.
+// This helper is used for both initial sync and retry attempts.
+func (tst *tsTable) syncPartsToNodesHelper(
+ ctx context.Context, parts []*part, partIDsMap map[uint64]struct{},
+ nodes []string, sidxMap map[string]sidx.SIDX, releaseFuncs *[]func(),
+) ([]queue.FailedPart, error) {
+ var allFailedParts []queue.FailedPart
- ctx := context.Background()
- releaseFuncs := make([]func(), 0, len(partsToSync))
- defer func() {
- for _, release := range releaseFuncs {
- release()
- }
- }()
-
- nodes := tst.getNodes()
- if tst.loopCloser != nil && tst.loopCloser.Closed() {
- return errClosed
- }
- sidxMap := tst.getAllSidx()
 for _, node := range nodes {
 if tst.loopCloser != nil && tst.loopCloser.Closed() {
- return errClosed
+ return allFailedParts, errClosed
 }
- // Prepare all streaming parts data
+
+ // Prepare streaming parts data
 streamingParts := make([]queue.StreamingPartData, 0)
+
 // Add sidx streaming parts
 for name, sidx := range sidxMap {
- sidxStreamingParts, sidxReleaseFuncs := sidx.StreamingParts(partIDsToSync, tst.group, uint32(tst.shardID), name)
- if len(sidxStreamingParts) != len(partIDsToSync) {
- logger.Panicf("sidx streaming parts count mismatch: %d != %d", len(sidxStreamingParts), len(partIDsToSync))
- return nil
- }
+ sidxStreamingParts, sidxReleaseFuncs := sidx.StreamingParts(partIDsMap, tst.group, uint32(tst.shardID), name)
 streamingParts = append(streamingParts, sidxStreamingParts...)
- releaseFuncs = append(releaseFuncs, sidxReleaseFuncs...)
- }
- if len(streamingParts) != len(partIDsToSync)*len(sidxMap) {
- logger.Panicf("streaming parts count mismatch: %d != %d", len(streamingParts), len(partIDsToSync)*len(sidxMap))
- return nil
+ *releaseFuncs = append(*releaseFuncs, sidxReleaseFuncs...)
Review Comment:
Missing validation that sidx.StreamingParts returns the expected number of
parts. The original code at lines 253-255 (now removed) included panic checks
to verify `len(sidxStreamingParts) == len(partIDsToSync)` and
`len(streamingParts) == len(partIDsToSync)*len(sidxMap)`. These validation
checks should be retained or replaced with appropriate error handling to detect
mismatches early.
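
A minimal sketch of how those checks could return as errors rather than panics, written against the helper signature shown in the hunk above (the `fmt.Errorf` usage and exact variable names are illustrative assumptions, not part of this PR):

```go
// Inside the per-sidx loop of syncPartsToNodesHelper (sketch only).
sidxStreamingParts, sidxReleaseFuncs := sidx.StreamingParts(partIDsMap, tst.group, uint32(tst.shardID), name)
// Validate that every requested part produced a streaming part.
if len(sidxStreamingParts) != len(partIDsMap) {
	return allFailedParts, fmt.Errorf("sidx %q streaming parts count mismatch: %d != %d",
		name, len(sidxStreamingParts), len(partIDsMap))
}
streamingParts = append(streamingParts, sidxStreamingParts...)
*releaseFuncs = append(*releaseFuncs, sidxReleaseFuncs...)

// After the loop: validate the aggregate count across all sidx instances.
if len(streamingParts) != len(partIDsMap)*len(sidxMap) {
	return allFailedParts, fmt.Errorf("streaming parts count mismatch: %d != %d",
		len(streamingParts), len(partIDsMap)*len(sidxMap))
}
```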
##########
banyand/queue/pub/chunked_sync.go:
##########
@@ -129,6 +144,9 @@ func (c *chunkedSyncClient) SyncStreamingParts(ctx context.Context, parts []queu
 result := finalResp.GetSyncResult()
 success = result.Success
 }
+ if !success && len(failedParts) < len(parts) {
+ success = true
+ }
Review Comment:
The logic for determining partial success is unclear. When `result.Success`
is false but some parts succeeded (fewer than all parts failed), setting
`success = true` changes the result semantics. This could mask actual sync
failures. Consider either keeping `Success = false` and letting callers check
`FailedParts`, or documenting that Success indicates 'at least one part
succeeded' rather than 'all parts succeeded'.
```suggestion
```
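
If the override is dropped as suggested, `Success` keeps its "all parts synced" meaning and callers inspect `FailedParts` for partial failures. A hypothetical caller-side sketch (only the `Success`, `FailedParts`, `PartID`, and `Error` fields are taken from this PR; the surrounding names are illustrative):

```go
result, err := client.SyncStreamingParts(ctx, parts)
if err != nil {
	return err
}
if !result.Success {
	if len(result.FailedParts) == len(parts) {
		// Nothing made it through; surface a hard failure.
		return fmt.Errorf("sync failed for all %d parts", len(parts))
	}
	// Partial success: retry or persist only the parts that failed.
	for _, fp := range result.FailedParts {
		log.Warn().Str("partID", fp.PartID).Str("error", fp.Error).Msg("part failed to sync")
	}
}
```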
##########
banyand/internal/storage/failed_parts_handler.go:
##########
@@ -0,0 +1,409 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package storage
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ iofs "io/fs"
+ "os"
+ "path/filepath"
+ "sort"
+ "strconv"
+ "sync"
+ "time"
+
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+)
+
+const (
+ // DefaultInitialRetryDelay is the initial delay before the first retry.
+ DefaultInitialRetryDelay = 1 * time.Second
+ // DefaultMaxRetries is the maximum number of retry attempts.
+ DefaultMaxRetries = 3
+ // DefaultBackoffMultiplier is the multiplier for exponential backoff.
+ DefaultBackoffMultiplier = 2
+ // FailedPartsDirName is the name of the directory for failed parts.
+ FailedPartsDirName = "failed-parts"
+)
+
+// FailedPartsHandler handles retry logic and filesystem fallback for failed parts.
+type FailedPartsHandler struct {
+ fileSystem fs.FileSystem
+ l *logger.Logger
+ root string
+ failedPartsDir string
+ initialRetryDelay time.Duration
+ maxRetries int
+ backoffMultiplier int
+ maxTotalSizeBytes uint64
+ sizeMu sync.Mutex
+}
+
+// PartInfo contains information needed to retry or copy a failed part.
+type PartInfo struct {
+ SourcePath string
+ PartType string
+ PartID uint64
+}
+
+// NewFailedPartsHandler creates a new handler for failed parts.
+func NewFailedPartsHandler(fileSystem fs.FileSystem, root string, l *logger.Logger, maxTotalSizeBytes uint64) *FailedPartsHandler {
+ failedPartsDir := filepath.Join(root, FailedPartsDirName)
+ fileSystem.MkdirIfNotExist(failedPartsDir, DirPerm)
+
+ return &FailedPartsHandler{
+ fileSystem: fileSystem,
+ root: root,
+ failedPartsDir: failedPartsDir,
+ l: l,
+ initialRetryDelay: DefaultInitialRetryDelay,
+ maxRetries: DefaultMaxRetries,
+ backoffMultiplier: DefaultBackoffMultiplier,
+ maxTotalSizeBytes: maxTotalSizeBytes,
+ }
+}
+
+// RetryFailedParts attempts to retry failed parts with exponential backoff.
+// Returns the list of permanently failed part IDs after all retries are exhausted.
+func (h *FailedPartsHandler) RetryFailedParts(
+ ctx context.Context,
+ failedParts []queue.FailedPart,
+ partsInfo map[uint64][]*PartInfo,
+ syncFunc func(partIDs []uint64) ([]queue.FailedPart, error),
+) ([]uint64, error) {
+ if len(failedParts) == 0 {
+ return nil, nil
+ }
+
+ // Group failed parts by part ID
+ failedPartIDs := make(map[uint64]string)
+ for _, fp := range failedParts {
+ partID, err := strconv.ParseUint(fp.PartID, 10, 64)
+ if err != nil {
+ h.l.Warn().Err(err).Str("partID", fp.PartID).Msg("failed to parse part ID, skipping")
+ continue
+ }
+ failedPartIDs[partID] = fp.Error
+ }
+
+ h.l.Warn().
+ Int("count", len(failedPartIDs)).
+ Msg("starting retry process for failed parts")
+
+ // Retry with exponential backoff
+ var stillFailing []uint64
+ for partID, errMsg := range failedPartIDs {
+ if err := h.retryPartWithBackoff(ctx, partID, errMsg, syncFunc); err != nil {
+ h.l.Error().
+ Err(err).
+ Uint64("partID", partID).
+ Msg("part failed after all retries")
+ stillFailing = append(stillFailing, partID)
+ }
+ }
+
+ // Copy permanently failed parts to failed-parts directory
+ var permanentlyFailed []uint64
+ for _, partID := range stillFailing {
+ partInfoList, exists := partsInfo[partID]
+ if !exists || len(partInfoList) == 0 {
+ h.l.Warn().Uint64("partID", partID).Msg("no part info found for failed part")
+ permanentlyFailed = append(permanentlyFailed, partID)
+ continue
+ }
+
+ // Copy all parts with this ID (core + all SIDX parts)
+ allCopied := true
+ for _, partInfo := range partInfoList {
+ destSubDir := fmt.Sprintf("%016x_%s", partID, partInfo.PartType)
+ if err := h.CopyToFailedPartsDir(partID, partInfo.SourcePath, destSubDir); err != nil {
+ h.l.Error().
+ Err(err).
+ Uint64("partID", partID).
+ Str("partType", partInfo.PartType).
+ Str("sourcePath", partInfo.SourcePath).
+ Msg("failed to copy part to
failed-parts directory")
+ allCopied = false
+ } else {
+ h.l.Info().
+ Uint64("partID", partID).
+ Str("partType", partInfo.PartType).
+ Str("destination",
filepath.Join(h.failedPartsDir, destSubDir)).
+ Msg("successfully copied failed part to
failed-parts directory")
+ }
+ }
+ if !allCopied {
+ h.l.Warn().Uint64("partID", partID).Msg("some parts
failed to copy")
+ }
+ permanentlyFailed = append(permanentlyFailed, partID)
+ }
+
+ return permanentlyFailed, nil
+}
+
+// retryPartWithBackoff retries a single part with exponential backoff.
+func (h *FailedPartsHandler) retryPartWithBackoff(
+ ctx context.Context,
+ partID uint64,
+ initialError string,
+ syncFunc func(partIDs []uint64) ([]queue.FailedPart, error),
+) error {
+ delay := h.initialRetryDelay
+
+ for attempt := 1; attempt <= h.maxRetries; attempt++ {
+ // Wait before retry
+ select {
+ case <-ctx.Done():
+ return fmt.Errorf("context canceled during retry: %w",
ctx.Err())
+ case <-time.After(delay):
+ }
+
+ h.l.Info().
+ Uint64("partID", partID).
+ Int("attempt", attempt).
+ Int("maxRetries", h.maxRetries).
+ Dur("delay", delay).
+ Msg("retrying failed part")
+
+ // Attempt to sync just this part
+ newFailedParts, err := syncFunc([]uint64{partID})
+ if err != nil {
+ h.l.Warn().
+ Err(err).
+ Uint64("partID", partID).
+ Int("attempt", attempt).
+ Msg("retry attempt failed with error")
+ delay *= time.Duration(h.backoffMultiplier)
+ continue
+ }
+
+ // Check if this part still failed
+ partStillFailed := false
+ for _, fp := range newFailedParts {
+ fpID, _ := strconv.ParseUint(fp.PartID, 10, 64)
+ if fpID == partID {
+ partStillFailed = true
+ h.l.Warn().
+ Uint64("partID", partID).
+ Int("attempt", attempt).
+ Str("error", fp.Error).
+ Msg("retry attempt failed")
+ break
+ }
+ }
+
+ if !partStillFailed {
+ h.l.Info().
+ Uint64("partID", partID).
+ Int("attempt", attempt).
+ Msg("part successfully synced after retry")
+ return nil
+ }
+
+ // Exponential backoff for next attempt
+ delay *= time.Duration(h.backoffMultiplier)
+ }
+
+ return fmt.Errorf("part failed after %d retry attempts, initial error:
%s", h.maxRetries, initialError)
+}
+
+// CopyToFailedPartsDir copies a part to the failed-parts directory using hard links.
+func (h *FailedPartsHandler) CopyToFailedPartsDir(partID uint64, sourcePath string, destSubDir string) error {
+ destPath := filepath.Join(h.failedPartsDir, destSubDir)
+
+ if h.maxTotalSizeBytes > 0 {
+ h.sizeMu.Lock()
+ defer h.sizeMu.Unlock()
+ }
+
+ // Check if already exists
+ entries := h.fileSystem.ReadDir(h.failedPartsDir)
+ for _, entry := range entries {
+ if entry.Name() == destSubDir {
+ h.l.Info().
+ Uint64("partID", partID).
+ Str("destSubDir", destSubDir).
+ Msg("part already exists in failed-parts
directory")
+ return nil
+ }
+ }
+
+ if _, err := os.Stat(sourcePath); err != nil {
+ h.l.Error().
+ Err(err).
+ Uint64("partID", partID).
+ Str("sourcePath", sourcePath).
+ Msg("failed to stat source path before copying to
failed-parts directory")
+ return fmt.Errorf("failed to stat source path %s: %w",
sourcePath, err)
+ }
+
+ if h.maxTotalSizeBytes > 0 {
+ currentSize, err := calculatePathSize(h.failedPartsDir)
+ if err != nil {
+ return fmt.Errorf("failed to calculate current
failed-parts size: %w", err)
+ }
+ sourceSize, err := calculatePathSize(sourcePath)
+ if err != nil {
+ return fmt.Errorf("failed to calculate size of source
path %s: %w", sourcePath, err)
+ }
+
+ if currentSize+sourceSize > h.maxTotalSizeBytes {
+ type partDir struct {
+ modTime time.Time
+ name string
+ path string
+ size uint64
+ }
+
+ partDirs := make([]partDir, 0, len(entries))
+ for _, entry := range entries {
if !entry.IsDir() || entry.Name() == destSubDir {
+ continue
+ }
+
dirPath := filepath.Join(h.failedPartsDir, entry.Name())
+ info, err := os.Stat(dirPath)
+ if err != nil {
+ h.l.Warn().
+ Err(err).
+ Str("path", dirPath).
+ Msg("failed to stat existing
failed part directory during eviction")
+ continue
+ }
+
+ dirSize, err := calculatePathSize(dirPath)
+ if err != nil {
+ h.l.Warn().
+ Err(err).
+ Str("path", dirPath).
+ Msg("failed to calculate size
for existing failed part directory during eviction")
+ continue
+ }
+
+ partDirs = append(partDirs, partDir{
+ name: entry.Name(),
+ path: dirPath,
+ modTime: info.ModTime(),
+ size: dirSize,
+ })
+ }
+
+ sort.Slice(partDirs, func(i, j int) bool {
if partDirs[i].modTime.Equal(partDirs[j].modTime) {
return partDirs[i].name < partDirs[j].name
}
return partDirs[i].modTime.Before(partDirs[j].modTime)
+ })
+
+ for _, dir := range partDirs {
if currentSize+sourceSize <= h.maxTotalSizeBytes {
+ break
+ }
+
+ if err := os.RemoveAll(dir.path); err != nil {
+ return fmt.Errorf("failed to remove
oldest failed part %s: %w", dir.name, err)
+ }
+
+ h.l.Info().
+ Str("removedFailedPartDir", dir.name).
+ Uint64("freedBytes", dir.size).
+ Msg("removed oldest failed part to
honor size limit")
+
+ if currentSize >= dir.size {
+ currentSize -= dir.size
+ } else {
Review Comment:
The safety check for underflow (lines 331-335) suggests potential
calculation issues. If currentSize can be less than dir.size, this indicates
the size tracking is inconsistent. Consider investigating why this edge case
exists and either fix the root cause or add a warning log when this condition
occurs.
```suggestion
 } else {
 h.l.Warn().
 Uint64("currentSize", currentSize).
 Uint64("dirSize", dir.size).
 Str("dirName", dir.name).
 Msg("currentSize is less than dir.size during failed-parts eviction; setting currentSize to 0. This indicates a possible size tracking inconsistency.")
```
##########
banyand/queue/pub/chunked_sync_test.go:
##########
@@ -228,8 +228,209 @@ var _ = ginkgo.Describe("Chunked Sync Retry Mechanism", func() {
gomega.Expect(mockServer.checksumMismatchCount).To(gomega.Equal(0))
})
})
+
+ ginkgo.Context("when a part fails mid-read during chunk building",
func() {
+ ginkgo.It("should discard incomplete chunk and continue with
non-failed parts", func() {
+ address := getAddress()
+
+ // Track chunks received to verify no partial data is sent
+ var receivedChunks []*clusterv1.SyncPartRequest
+ var mu sync.Mutex
+
+ // Custom mock server that records chunks
+ s := grpc.NewServer()
+ mockSvc := &recordingMockServer{
+ receivedChunks: &receivedChunks,
+ mu: &mu,
+ }
+ clusterv1.RegisterChunkedSyncServiceServer(s, mockSvc)
+
+ lis, err := net.Listen("tcp", address)
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+
+ go func() {
+ _ = s.Serve(lis)
+ }()
+ defer s.GracefulStop()
+
+ conn, err := grpc.NewClient(address, grpc.WithTransportCredentials(insecure.NewCredentials()))
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ defer conn.Close()
+
+ client := &chunkedSyncClient{
+ conn: conn,
+ log: logger.GetLogger("test-chunked-sync"),
+ chunkSize: 1024, // Small chunk size to force multiple chunks
+ }
+
+ // Create test data: Part 1 (will succeed), Part 2 (will fail mid-read), Part 3 (will succeed)
+ part1Data := make([]byte, 800)
+ for i := range part1Data {
+ part1Data[i] = byte('A')
+ }
+
+ part2Data := make([]byte, 1500) // Large enough to span multiple reads
+ for i := range part2Data {
+ part2Data[i] = byte('B')
+ }
+
+ part3Data := make([]byte, 600)
+ for i := range part3Data {
+ part3Data[i] = byte('C')
+ }
+
+ parts := []queue.StreamingPartData{
+ {
+ ID: 1,
+ Group: "test-group",
+ ShardID: 1,
+ Topic: "stream_write",
+ Files: []queue.FileInfo{
+ createFileInfo("part1-file1.dat", part1Data),
+ },
+ },
+ {
+ ID: 2,
+ Group: "test-group",
+ ShardID: 1,
+ Topic: "stream_write",
+ Files: []queue.FileInfo{
+ {
+ Name: "part2-file1.dat",
+ Reader: &failingReader{
+ data: part2Data,
+ failAfter: 500, // Fail after reading 500 bytes
+ },
+ },
+ },
+ },
+ {
+ ID: 3,
+ Group: "test-group",
+ ShardID: 1,
+ Topic: "stream_write",
+ Files: []queue.FileInfo{
+ createFileInfo("part3-file1.dat", part3Data),
+ },
+ },
+ }
+
+ ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+ defer cancel()
+
+ result, err := client.SyncStreamingParts(ctx, parts)
+ gomega.Expect(err).NotTo(gomega.HaveOccurred())
+ gomega.Expect(result).NotTo(gomega.BeNil())
+
+ // Verify that part 2 is marked as failed
+ gomega.Expect(result.FailedParts).To(gomega.HaveLen(1))
+ gomega.Expect(result.FailedParts[0].PartID).To(gomega.Equal("2"))
+
+ // Verify that the result is still considered successful (partial success)
+ gomega.Expect(result.Success).To(gomega.BeTrue())
+
+ // Verify chunks received
+ mu.Lock()
+ defer mu.Unlock()
+
+ // Find where part 2 appears in chunks
+ var part2FirstChunkIdx, part2LastChunkIdx int = -1, -1
Review Comment:
[nitpick] Initializing both variables to -1 on the same line is less
readable than separate declarations. Consider using separate lines for clarity:
`part2FirstChunkIdx := -1` and `part2LastChunkIdx := -1`.
```suggestion
 part2FirstChunkIdx := -1
 part2LastChunkIdx := -1
```
##########
test/integration/distributed/sync_retry/sync_retry_test.go:
##########
@@ -0,0 +1,298 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package integration_sync_retry_test
+
+import (
+ "errors"
+ "fmt"
+ "io/fs"
+ "os"
+ "path/filepath"
+ "time"
+
+ g "github.com/onsi/ginkgo/v2"
+ "github.com/onsi/gomega"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+
+ "github.com/apache/skywalking-banyandb/api/data"
+ "github.com/apache/skywalking-banyandb/banyand/queue"
+ "github.com/apache/skywalking-banyandb/pkg/grpchelper"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
casesmeasuredata "github.com/apache/skywalking-banyandb/test/cases/measure/data"
+ casesstreamdata "github.com/apache/skywalking-banyandb/test/cases/stream/data"
+ casestracedata "github.com/apache/skywalking-banyandb/test/cases/trace/data"
+)
+
+const (
+ DefaultMaxRetries = 3
+ FailedPartsDirName = "failed-parts"
+)
+
+var _ = g.Describe("Chunked sync failure handling", g.Ordered, func() {
+ g.BeforeEach(func() {
+ gomega.Expect(clearFailedPartsDirs()).To(gomega.Succeed())
+ })
+
+ g.AfterEach(func() {
+ queue.ClearChunkedSyncFailureInjector()
+ })
+
+ g.It("retries stream parts and records failed segments", func() {
+ conn := dialLiaison()
+ defer conn.Close()
+
+ // With 2 data nodes and 3 max retries, we need 2 * (1 initial + 3 retries) = 8 failures
+ // to ensure parts permanently fail
+ injector, cleanup := withChunkedSyncFailureInjector(map[string]int{
+ data.TopicStreamPartSync.String(): 2 * (DefaultMaxRetries + 1),
+ })
+ defer cleanup()
+
+ baseTime := time.Now().Add(-5 * time.Minute).Truncate(time.Millisecond)
+ casesstreamdata.Write(conn, "sw", baseTime, 500*time.Millisecond)
+
+ g.By("waiting for injected stream sync failures")
+ gomega.Eventually(func() int {
+ return injector.attemptsFor(data.TopicStreamPartSync.String())
+ }, flags.EventuallyTimeout, 500*time.Millisecond).Should(gomega.BeNumerically(">=", 2*(DefaultMaxRetries+1)))
+
+ g.By("waiting for stream failed-parts directory to be populated")
+ gomega.Eventually(func() string {
+ return locateFailedPartsDir("stream")
+ }, flags.EventuallyTimeout, time.Second).ShouldNot(gomega.BeEmpty())
+ dir := locateFailedPartsDir("stream")
+ gomega.Expect(dir).NotTo(gomega.BeEmpty())
+ g.By(fmt.Sprintf("stream failed parts recorded at %s", dir))
+ })
+
+ g.It("retries measure parts and records failed segments", func() {
+ conn := dialLiaison()
+ defer conn.Close()
+
+ // With 2 data nodes, 2 shards (2 parts), and 3 max retries:
+ // 2 parts * 2 nodes * (1 initial + 3 retries) = 16 failures needed
+ injector, cleanup := withChunkedSyncFailureInjector(map[string]int{
+ data.TopicMeasurePartSync.String(): 2 * 2 * (DefaultMaxRetries + 1),
+ })
+ defer cleanup()
+
+ baseTime := time.Now().Add(-24 * time.Hour).Truncate(time.Millisecond)
+ casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric", "service_cpm_minute_data.json", baseTime, time.Minute)
+
+ g.By("waiting for injected measure sync failures")
+ gomega.Eventually(func() int {
+ return injector.attemptsFor(data.TopicMeasurePartSync.String())
+ }, flags.EventuallyTimeout, time.Second).Should(gomega.BeNumerically(">=", 2*2*(DefaultMaxRetries+1)))
+
+ g.By("waiting for measure failed-parts directory to be populated")
+ gomega.Eventually(func() string {
+ return locateFailedPartsDir("measure")
+ }, flags.EventuallyTimeout, 2*time.Second).ShouldNot(gomega.BeEmpty())
+ dir := locateFailedPartsDir("measure")
+ gomega.Expect(dir).NotTo(gomega.BeEmpty())
+ g.By(fmt.Sprintf("measure failed parts recorded at %s", dir))
+ })
+
+ g.It("retries trace parts and records failed segments", func() {
+ conn := dialLiaison()
+ defer conn.Close()
+
+ // With 2 data nodes and 3 max retries, we need 2 * (1 initial + 3 retries) = 8 failures
+ // to ensure parts permanently fail
+ injector, cleanup := withChunkedSyncFailureInjector(map[string]int{
+ data.TopicTracePartSync.String(): 2 * (DefaultMaxRetries + 1),
+ })
+ defer cleanup()
+
+ baseTime := time.Now().Add(-10 * time.Minute).Truncate(time.Millisecond)
+ casestracedata.WriteToGroup(conn, "sw", "test-trace-group", "sw", baseTime, 500*time.Millisecond)
+
+ g.By("waiting for injected trace sync failures")
+ gomega.Eventually(func() int {
+ return injector.attemptsFor(data.TopicTracePartSync.String())
+ }, flags.EventuallyTimeout, time.Second).Should(gomega.BeNumerically(">=", 2*(DefaultMaxRetries+1)))
+
+ g.By("waiting for trace failed-parts directory to be populated")
+ gomega.Eventually(func() string {
+ return locateFailedPartsDir("trace")
+ }, flags.EventuallyTimeout, 2*time.Second).ShouldNot(gomega.BeEmpty())
+ dir := locateFailedPartsDir("trace")
+ gomega.Expect(dir).NotTo(gomega.BeEmpty())
+ g.By(fmt.Sprintf("trace failed parts recorded at %s", dir))
+ })
+
+ g.It("handles stream connection errors and marks all parts as failed",
func() {
+ conn := dialLiaison()
+ defer conn.Close()
+
+ // With 2 data nodes and 3 max retries, inject connection-level
errors
+ // that cause syncPartsToNodesHelper to return an error
+ injector, cleanup :=
withChunkedSyncErrorInjector(map[string]int{
+ data.TopicStreamPartSync.String(): 2 *
(DefaultMaxRetries + 1),
+ })
+ defer cleanup()
+
+ // Use a different time range to create new parts
+ baseTime := time.Now().Add(-15 *
time.Minute).Truncate(time.Millisecond)
+ casesstreamdata.Write(conn, "sw", baseTime,
500*time.Millisecond)
+
+ g.By("waiting for injected stream connection errors")
+ gomega.Eventually(func() int {
+ return
injector.attemptsFor(data.TopicStreamPartSync.String())
+ }, flags.EventuallyTimeout,
500*time.Millisecond).Should(gomega.BeNumerically(">=",
2*(DefaultMaxRetries+1)))
+
+ g.By("waiting for stream failed-parts directory to be
populated")
+ gomega.Eventually(func() string {
+ return locateFailedPartsDir("stream")
+ }, flags.EventuallyTimeout,
time.Second).ShouldNot(gomega.BeEmpty())
+ dir := locateFailedPartsDir("stream")
+ gomega.Expect(dir).NotTo(gomega.BeEmpty())
+ g.By(fmt.Sprintf("stream failed parts recorded at %s (from
connection errors)", dir))
+ })
+
+ g.It("handles measure connection errors and marks all parts as failed",
func() {
+ conn := dialLiaison()
+ defer conn.Close()
+
+ // With 2 data nodes, 2 shards, and 3 max retries, inject
connection-level errors
+ injector, cleanup :=
withChunkedSyncErrorInjector(map[string]int{
+ data.TopicMeasurePartSync.String(): 2 * 2 *
(DefaultMaxRetries + 1),
+ })
+ defer cleanup()
+
+ // Use a different time range to create new parts
+ baseTime := time.Now().Add(-48 *
time.Hour).Truncate(time.Millisecond)
+ casesmeasuredata.Write(conn, "service_cpm_minute", "sw_metric",
"service_cpm_minute_data.json", baseTime, time.Minute)
+
+ g.By("waiting for injected measure connection errors")
+ gomega.Eventually(func() int {
+ return
injector.attemptsFor(data.TopicMeasurePartSync.String())
+ }, flags.EventuallyTimeout,
time.Second).Should(gomega.BeNumerically(">=", 2*2*(DefaultMaxRetries+1)))
+
+ g.By("waiting for measure failed-parts directory to be
populated")
+ gomega.Eventually(func() string {
+ return locateFailedPartsDir("measure")
+ }, flags.EventuallyTimeout,
2*time.Second).ShouldNot(gomega.BeEmpty())
+ dir := locateFailedPartsDir("measure")
+ gomega.Expect(dir).NotTo(gomega.BeEmpty())
+ g.By(fmt.Sprintf("measure failed parts recorded at %s (from
connection errors)", dir))
+ })
+
+ g.It("handles trace connection errors and marks all parts as failed",
func() {
+ conn := dialLiaison()
+ defer conn.Close()
+
+ // With 2 data nodes and 3 max retries, inject connection-level
errors
+ injector, cleanup :=
withChunkedSyncErrorInjector(map[string]int{
+ data.TopicTracePartSync.String(): 2 *
(DefaultMaxRetries + 1),
+ })
+ defer cleanup()
+
+ // Use a different time range to create new parts
+ baseTime := time.Now().Add(-2 *
time.Hour).Truncate(time.Millisecond)
+ casestracedata.WriteToGroup(conn, "sw", "test-trace-group",
"sw", baseTime, 500*time.Millisecond)
+
+ g.By("waiting for injected trace connection errors")
+ gomega.Eventually(func() int {
+ return
injector.attemptsFor(data.TopicTracePartSync.String())
+ }, flags.EventuallyTimeout,
time.Second).Should(gomega.BeNumerically(">=", 2*(DefaultMaxRetries+1)))
+
+ g.By("waiting for trace failed-parts directory to be populated")
+ gomega.Eventually(func() string {
+ return locateFailedPartsDir("trace")
+ }, flags.EventuallyTimeout,
2*time.Second).ShouldNot(gomega.BeEmpty())
+ dir := locateFailedPartsDir("trace")
+ gomega.Expect(dir).NotTo(gomega.BeEmpty())
+ g.By(fmt.Sprintf("trace failed parts recorded at %s (from
connection errors)", dir))
+ })
+})
+
+func dialLiaison() *grpc.ClientConn {
+ var conn *grpc.ClientConn
+ var err error
+ // Retry connection with Eventually to handle liaison startup timing
+ gomega.Eventually(func() error {
conn, err = grpchelper.Conn(liaisonAddr, 10*time.Second, grpc.WithTransportCredentials(insecure.NewCredentials()))
+ return err
+ }, 30*time.Second, time.Second).Should(gomega.Succeed())
+ return conn
+}
+
+func clearFailedPartsDirs() error {
+ for _, root := range dataPaths {
+ if err := filepath.WalkDir(root, func(path string, d fs.DirEntry, err error) error {
+ if err != nil {
+ return nil
+ }
+ if d.IsDir() && d.Name() == FailedPartsDirName {
+ if removeErr := os.RemoveAll(path); removeErr != nil {
+ return removeErr
+ }
+ return fs.SkipDir
+ }
+ return nil
+ }); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func locateFailedPartsDir(module string) string {
+ errFound := errors.New("found")
+
+ // Search both data node paths and any temp directories (for liaison)
+ searchPaths := append([]string{}, dataPaths...)
+
+ // Also search common temp directory patterns for liaison write queues
+ tmpDirs, _ := filepath.Glob("/tmp/banyandb-test-*")
+ searchPaths = append(searchPaths, tmpDirs...)
+ tmpDirs2, _ := filepath.Glob(os.TempDir() + "/banyandb-test-*")
Review Comment:
Glob errors are silently ignored with `_`. If glob pattern evaluation fails
(e.g., malformed pattern), the search will silently skip those paths. Consider
logging or handling these errors, especially since this is a test helper that
should be reliable.
```suggestion
 tmpDirs, err := filepath.Glob("/tmp/banyandb-test-*")
 if err != nil {
 fmt.Fprintf(os.Stderr, "Warning: failed to glob /tmp/banyandb-test-*: %v\n", err)
 }
 searchPaths = append(searchPaths, tmpDirs...)
 tmpDirs2, err := filepath.Glob(os.TempDir() + "/banyandb-test-*")
 if err != nil {
 fmt.Fprintf(os.Stderr, "Warning: failed to glob %s: %v\n", os.TempDir()+"/banyandb-test-*", err)
 }
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]