This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch feature/snapshot-refactor
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 32616dbbf5335e3ada18ff6aea894883f9bcf86e
Author: Hongtao Gao <[email protected]>
AuthorDate: Sat Feb 7 08:30:10 2026 +0800

    feat(trace): enhance error handling during flush and merge operations
    
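    - Introduced ReleaseFlushedParts (sidx.FlusherIntroduction) and
      ReleaseNewPart (sidx.MergerIntroduction), which close part file
      handles when a flush or merge is abandoned so the caller can remove
      the part directories from disk.
    - Updated the trace flush and merge paths to remove newly created trace
      and sidx part directories when a sidx Flush or Merge fails.
    - sidx.Merge now returns (nil, nil) when it finds no parts to merge; the
      trace merger skips sidx instances that return a nil introduction.
    - Fixed TestDeleteExpiredSegmentsWithClosedSegments to use the same UTC
      segment dates used at creation instead of time.Now(), avoiding a
      timezone mismatch.
    - Improved test coverage for error scenarios to validate cleanup behavior.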
---
 banyand/internal/sidx/introducer.go      |  20 +++++
 banyand/internal/sidx/merge.go           |   3 +
 banyand/internal/storage/segment_test.go |   8 +-
 banyand/trace/flusher.go                 |  13 ++++
 banyand/trace/merger.go                  |  41 +++++++++-
 banyand/trace/merger_test.go             | 127 +++++++++++++++++++++++++++++++
 6 files changed, 204 insertions(+), 8 deletions(-)

diff --git a/banyand/internal/sidx/introducer.go 
b/banyand/internal/sidx/introducer.go
index 5856828f9..d7b0b2ccc 100644
--- a/banyand/internal/sidx/introducer.go
+++ b/banyand/internal/sidx/introducer.go
@@ -34,6 +34,17 @@ func (i *FlusherIntroduction) Release() {
        releaseFlusherIntroduction(i)
 }
 
+// ReleaseFlushedParts releases all flushed part wrappers (closes file 
handles).
+// Call this when a flush is abandoned so the caller can remove part 
directories from disk before calling Release().
+func (i *FlusherIntroduction) ReleaseFlushedParts() {
+       for _, pw := range i.flushed {
+               pw.release()
+       }
+       for k := range i.flushed {
+               delete(i.flushed, k)
+       }
+}
+
 func (i *FlusherIntroduction) reset() {
        for k := range i.flushed {
                delete(i.flushed, k)
@@ -69,6 +80,15 @@ func (i *MergerIntroduction) Release() {
        releaseMergerIntroduction(i)
 }
 
+// ReleaseNewPart releases the newPart from this introduction (closes file 
handles).
+// Call this when the merge is abandoned so the part can be removed from disk 
by the caller.
+func (i *MergerIntroduction) ReleaseNewPart() {
+       if i.newPart != nil {
+               i.newPart.release()
+               i.newPart = nil
+       }
+}
+
 func (i *MergerIntroduction) reset() {
        for k := range i.merged {
                delete(i.merged, k)
diff --git a/banyand/internal/sidx/merge.go b/banyand/internal/sidx/merge.go
index d5b64f14a..a537e3881 100644
--- a/banyand/internal/sidx/merge.go
+++ b/banyand/internal/sidx/merge.go
@@ -45,6 +45,9 @@ func (s *sidx) Merge(closeCh <-chan struct{}, partIDtoMerge 
map[uint64]struct{},
                        partsToMerge = append(partsToMerge, pw)
                }
        }
+       if len(partsToMerge) == 0 {
+               return nil, nil
+       }
        if d := s.l.Debug(); d.Enabled() {
                if len(partsToMerge) != len(partIDtoMerge) {
                        d.Int("parts_to_merge_count", len(partsToMerge)).
diff --git a/banyand/internal/storage/segment_test.go 
b/banyand/internal/storage/segment_test.go
index b7cd892d5..cb331568c 100644
--- a/banyand/internal/storage/segment_test.go
+++ b/banyand/internal/storage/segment_test.go
@@ -618,11 +618,11 @@ func TestDeleteExpiredSegmentsWithClosedSegments(t 
*testing.T) {
        assert.NotNil(t, segments[5].index, "Segment 5 should remain open")
 
        // Now delete expired segments
-       // Get the time range for segments 0, 1, and 2 (the expired ones)
+       // Use the same segment dates (UTC) used when creating segments 0, 1, 2 
to avoid timezone mismatch with time.Now()
        deletedCount := sc.deleteExpiredSegments([]string{
-               time.Now().AddDate(0, 0, -6).Format(dayFormat),
-               time.Now().AddDate(0, 0, -5).Format(dayFormat),
-               time.Now().AddDate(0, 0, -4).Format(dayFormat),
+               segmentDates[0].Format(dayFormat),
+               segmentDates[1].Format(dayFormat),
+               segmentDates[2].Format(dayFormat),
        })
        assert.Equal(t, int64(3), deletedCount, "Should have deleted 3 expired 
segments")
 
diff --git a/banyand/trace/flusher.go b/banyand/trace/flusher.go
index 51d82dc5e..8ebe7baf0 100644
--- a/banyand/trace/flusher.go
+++ b/banyand/trace/flusher.go
@@ -236,6 +236,19 @@ func (tst *tsTable) flush(snapshot *snapshot, flushCh chan 
*flusherIntroduction)
                sidxFlusherIntroduced, err := sidxInstance.Flush(partIDMap)
                if err != nil {
                        tst.l.Warn().Err(err).Str("sidx", name).Msg("sidx flush 
failed")
+                       for _, sidxIntro := range ind.sidxFlusherIntroduced {
+                               sidxIntro.ReleaseFlushedParts()
+                               sidxIntro.Release()
+                       }
+                       for partID := range partIDMap {
+                               for sidxName := range ind.sidxFlusherIntroduced 
{
+                                       tst.removeSidxPartOnFailure(sidxName, 
partID)
+                               }
+                               tst.removeSidxPartOnFailure(name, partID)
+                       }
+                       for _, pw := range ind.flushed {
+                               tst.removeTracePartOnFailure(pw)
+                       }
                        return
                }
                ind.sidxFlusherIntroduced[name] = sidxFlusherIntroduced
diff --git a/banyand/trace/merger.go b/banyand/trace/merger.go
index 55a9d2c81..941ee9878 100644
--- a/banyand/trace/merger.go
+++ b/banyand/trace/merger.go
@@ -20,6 +20,7 @@ package trace
 import (
        "errors"
        "fmt"
+       "path/filepath"
        "sync/atomic"
        "time"
 
@@ -164,10 +165,19 @@ func (tst *tsTable) 
mergePartsThenSendIntroduction(creator snapshotCreator, part
        mergerIntroductionMap := make(map[string]*sidx.MergerIntroduction)
        for sidxName, sidxInstance := range tst.getAllSidx() {
                start = time.Now()
-               mergerIntroduction, err := sidxInstance.Merge(closeCh, 
partIDMap, newPartID)
-               if err != nil {
-                       tst.l.Warn().Err(err).Msg("sidx merge mem parts failed")
-                       return nil, err
+               mergerIntroduction, mergeErr := sidxInstance.Merge(closeCh, 
partIDMap, newPartID)
+               if mergeErr != nil {
+                       tst.l.Warn().Err(mergeErr).Msg("sidx merge mem parts 
failed")
+                       tst.removeTracePartOnFailure(newPart)
+                       for doneSidxName, intro := range mergerIntroductionMap {
+                               intro.ReleaseNewPart()
+                               tst.removeSidxPartOnFailure(doneSidxName, 
newPartID)
+                               intro.Release()
+                       }
+                       return nil, mergeErr
+               }
+               if mergerIntroduction == nil {
+                       continue
                }
                mergerIntroductionMap[sidxName] = mergerIntroduction
                elapsed = time.Since(start)
@@ -270,6 +280,29 @@ func (tst *tsTable) reserveSpace(parts []*partWrapper) 
uint64 {
 
 var errNoPartToMerge = fmt.Errorf("no part to merge")
 
+// removeTracePartOnFailure closes the part and removes its directory from 
disk.
+// Used when a merge fails after the trace part was created so the directory 
is not left as trash.
+func (tst *tsTable) removeTracePartOnFailure(pw *partWrapper) {
+       if pw == nil {
+               return
+       }
+       pathToRemove := pw.p.path
+       pw.decRef()
+       tst.fileSystem.MustRMAll(pathToRemove)
+}
+
+// sidxPartPath returns the on-disk path for a sidx part (same layout as sidx 
package).
+func sidxPartPath(traceRoot, sidxName string, partID uint64) string {
+       return filepath.Join(traceRoot, sidxDirName, sidxName, 
fmt.Sprintf("%016x", partID))
+}
+
+// removeSidxPartOnFailure removes a sidx part directory from disk.
+// Used when a merge fails after one or more sidx parts were created.
+func (tst *tsTable) removeSidxPartOnFailure(sidxName string, partID uint64) {
+       pathToRemove := sidxPartPath(tst.root, sidxName, partID)
+       tst.fileSystem.MustRMAll(pathToRemove)
+}
+
 func (tst *tsTable) mergeParts(fileSystem fs.FileSystem, closeCh <-chan 
struct{}, parts []*partWrapper, partID uint64, root string) (*partWrapper, 
error) {
        if len(parts) == 0 {
                return nil, errNoPartToMerge
diff --git a/banyand/trace/merger_test.go b/banyand/trace/merger_test.go
index a01f2c470..f162b400e 100644
--- a/banyand/trace/merger_test.go
+++ b/banyand/trace/merger_test.go
@@ -19,6 +19,8 @@ package trace
 
 import (
        "errors"
+       "fmt"
+       "path/filepath"
        "reflect"
        "testing"
 
@@ -26,11 +28,14 @@ import (
        "github.com/google/go-cmp/cmp/cmpopts"
        "github.com/stretchr/testify/require"
 
+       "github.com/apache/skywalking-banyandb/banyand/internal/sidx"
        "github.com/apache/skywalking-banyandb/banyand/protector"
        "github.com/apache/skywalking-banyandb/pkg/convert"
        "github.com/apache/skywalking-banyandb/pkg/encoding"
        "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
        pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/run"
        "github.com/apache/skywalking-banyandb/pkg/test"
 )
 
@@ -1329,3 +1334,125 @@ func Test_mergeParts(t *testing.T) {
                })
        }
 }
+
+// sidxMergeErrFake is a SIDX that returns an error from Merge to test cleanup 
on failure.
+type sidxMergeErrFake struct {
+       fakeSIDX
+}
+
+func (f *sidxMergeErrFake) Merge(<-chan struct{}, map[uint64]struct{}, uint64) 
(*sidx.MergerIntroduction, error) {
+       return nil, errors.New("sidx merge failed")
+}
+
+// Test_mergePartsThenSendIntroduction_cleansUpOnSidxMergeError verifies that 
when sidx.Merge
+// returns an error, the newly created trace part and any sidx parts are 
removed from disk.
+func Test_mergePartsThenSendIntroduction_cleansUpOnSidxMergeError(t 
*testing.T) {
+       tmpPath, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       fileSystem := fs.NewLocalFileSystem()
+       // Create two file parts with IDs 10 and 11 so the merged part gets 
newPartID 1 (curPartID starts at 0).
+       var parts []*partWrapper
+       for i, ts := range []*traces{tsTS1, tsTS2} {
+               partID := uint64(10 + i)
+               mp := generateMemPart()
+               mp.mustInitFromTraces(ts)
+               mp.mustFlush(fileSystem, partPath(tmpPath, partID))
+               p := mustOpenFilePart(partID, tmpPath, fileSystem)
+               p.partMetadata.ID = partID
+               parts = append(parts, newPartWrapper(nil, p))
+               releaseMemPart(mp)
+       }
+       defer func() {
+               for _, pw := range parts {
+                       pw.decRef()
+               }
+       }()
+
+       closer := run.NewCloser(1)
+       defer closer.Done()
+       l := logger.GetLogger("trace-merger-test")
+       tst := &tsTable{
+               pm:         protector.Nop{},
+               fileSystem: fileSystem,
+               root:       tmpPath,
+               loopCloser: closer,
+               l:          l,
+               curPartID:  0,
+               sidxMap: map[string]sidx.SIDX{
+                       "idx1": &sidxMergeErrFake{fakeSIDX{}},
+               },
+       }
+
+       merged := make(map[uint64]struct{})
+       for _, pw := range parts {
+               merged[pw.ID()] = struct{}{}
+       }
+       merges := make(chan *mergerIntroduction, 1)
+       closeCh := make(chan struct{})
+
+       _, err := tst.mergePartsThenSendIntroduction(snapshotCreatorMerger, 
parts, merged, merges, closeCh, mergeTypeFile)
+       require.Error(t, err)
+       require.Contains(t, err.Error(), "sidx merge failed")
+
+       // New part ID is 1 (curPartID was 0, then AddUint64(..., 1)).
+       newPartID := uint64(1)
+       tracePartPath := partPath(tmpPath, newPartID)
+       require.False(t, fileSystem.IsExist(tracePartPath), "trace part 
directory should be removed on sidx merge failure")
+
+       sidxPartPath := filepath.Join(tmpPath, sidxDirName, "idx1", 
fmt.Sprintf("%016x", newPartID))
+       require.False(t, fileSystem.IsExist(sidxPartPath), "sidx part directory 
should not exist (no sidx part created before first Merge)")
+}
+
+// sidxFlushErrFake is a SIDX that returns an error from Flush to test cleanup 
on failure.
+type sidxFlushErrFake struct {
+       fakeSIDX
+}
+
+func (f *sidxFlushErrFake) Flush(map[uint64]struct{}) 
(*sidx.FlusherIntroduction, error) {
+       return nil, errors.New("sidx flush failed")
+}
+
+// Test_flush_cleansUpOnSidxFlushError verifies that when any sidx.Flush 
returns an error,
+// the newly flushed trace parts and any sidx parts are removed from disk.
+func Test_flush_cleansUpOnSidxFlushError(t *testing.T) {
+       tmpPath, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       fileSystem := fs.NewLocalFileSystem()
+       partID := uint64(100)
+       mp := generateMemPart()
+       mp.mustInitFromTraces(tsTS1)
+       mp.partMetadata.ID = partID
+       pw := newPartWrapper(mp, openMemPart(mp))
+       pw.p.partMetadata.ID = partID
+       pw.incRef()
+       snp := &snapshot{
+               parts: []*partWrapper{pw},
+               epoch: 1,
+               ref:   1,
+       }
+       defer snp.decRef()
+
+       closer := run.NewCloser(1)
+       defer closer.Done()
+       l := logger.GetLogger("trace-flusher-test")
+       tst := &tsTable{
+               pm:         protector.Nop{},
+               fileSystem: fileSystem,
+               root:       tmpPath,
+               loopCloser: closer,
+               l:          l,
+               sidxMap: map[string]sidx.SIDX{
+                       "idx1": &sidxFlushErrFake{fakeSIDX{}},
+               },
+       }
+
+       flushCh := make(chan *flusherIntroduction, 1)
+       tst.flush(snp, flushCh)
+
+       tracePartPath := partPath(tmpPath, partID)
+       require.False(t, fileSystem.IsExist(tracePartPath), "trace part 
directory should be removed on sidx flush failure")
+       sidxPartPath := filepath.Join(tmpPath, sidxDirName, "idx1", 
fmt.Sprintf("%016x", partID))
+       require.False(t, fileSystem.IsExist(sidxPartPath), "sidx part directory 
should be removed on sidx flush failure")
+}

Reply via email to