This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch feature/snapshot-refactor in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 32616dbbf5335e3ada18ff6aea894883f9bcf86e Author: Hongtao Gao <[email protected]> AuthorDate: Sat Feb 7 08:30:10 2026 +0800 feat(trace): enhance error handling during flush and merge operations - Introduced ReleaseFlushedParts and ReleaseNewPart methods to manage resource cleanup when flush or merge operations fail. - Updated merge and flush logic to ensure proper cleanup of trace and sidx parts on errors. - Improved test coverage for error scenarios to validate cleanup behavior. --- banyand/internal/sidx/introducer.go | 20 +++++ banyand/internal/sidx/merge.go | 3 + banyand/internal/storage/segment_test.go | 8 +- banyand/trace/flusher.go | 13 ++++ banyand/trace/merger.go | 41 +++++++++- banyand/trace/merger_test.go | 127 +++++++++++++++++++++++++++++++ 6 files changed, 204 insertions(+), 8 deletions(-) diff --git a/banyand/internal/sidx/introducer.go b/banyand/internal/sidx/introducer.go index 5856828f9..d7b0b2ccc 100644 --- a/banyand/internal/sidx/introducer.go +++ b/banyand/internal/sidx/introducer.go @@ -34,6 +34,17 @@ func (i *FlusherIntroduction) Release() { releaseFlusherIntroduction(i) } +// ReleaseFlushedParts releases all flushed part wrappers (closes file handles). +// Call this when a flush is abandoned so the caller can remove part directories from disk before calling Release(). +func (i *FlusherIntroduction) ReleaseFlushedParts() { + for _, pw := range i.flushed { + pw.release() + } + for k := range i.flushed { + delete(i.flushed, k) + } +} + func (i *FlusherIntroduction) reset() { for k := range i.flushed { delete(i.flushed, k) @@ -69,6 +80,15 @@ func (i *MergerIntroduction) Release() { releaseMergerIntroduction(i) } +// ReleaseNewPart releases the newPart from this introduction (closes file handles). +// Call this when the merge is abandoned so the part can be removed from disk by the caller. +func (i *MergerIntroduction) ReleaseNewPart() { + if i.newPart != nil { + i.newPart.release() + i.newPart = nil + } +} + func (i *MergerIntroduction) reset() { for k := range i.merged { delete(i.merged, k) diff --git a/banyand/internal/sidx/merge.go b/banyand/internal/sidx/merge.go index d5b64f14a..a537e3881 100644 --- a/banyand/internal/sidx/merge.go +++ b/banyand/internal/sidx/merge.go @@ -45,6 +45,9 @@ func (s *sidx) Merge(closeCh <-chan struct{}, partIDtoMerge map[uint64]struct{}, partsToMerge = append(partsToMerge, pw) } } + if len(partsToMerge) == 0 { + return nil, nil + } if d := s.l.Debug(); d.Enabled() { if len(partsToMerge) != len(partIDtoMerge) { d.Int("parts_to_merge_count", len(partsToMerge)). diff --git a/banyand/internal/storage/segment_test.go b/banyand/internal/storage/segment_test.go index b7cd892d5..cb331568c 100644 --- a/banyand/internal/storage/segment_test.go +++ b/banyand/internal/storage/segment_test.go @@ -618,11 +618,11 @@ func TestDeleteExpiredSegmentsWithClosedSegments(t *testing.T) { assert.NotNil(t, segments[5].index, "Segment 5 should remain open") // Now delete expired segments - // Get the time range for segments 0, 1, and 2 (the expired ones) + // Use the same segment dates (UTC) used when creating segments 0, 1, 2 to avoid timezone mismatch with time.Now() deletedCount := sc.deleteExpiredSegments([]string{ - time.Now().AddDate(0, 0, -6).Format(dayFormat), - time.Now().AddDate(0, 0, -5).Format(dayFormat), - time.Now().AddDate(0, 0, -4).Format(dayFormat), + segmentDates[0].Format(dayFormat), + segmentDates[1].Format(dayFormat), + segmentDates[2].Format(dayFormat), }) assert.Equal(t, int64(3), deletedCount, "Should have deleted 3 expired segments") diff --git a/banyand/trace/flusher.go b/banyand/trace/flusher.go index 51d82dc5e..8ebe7baf0 100644 --- a/banyand/trace/flusher.go +++ b/banyand/trace/flusher.go @@ -236,6 +236,19 @@ func (tst *tsTable) flush(snapshot *snapshot, flushCh chan *flusherIntroduction) sidxFlusherIntroduced, err := sidxInstance.Flush(partIDMap) if err != nil { tst.l.Warn().Err(err).Str("sidx", name).Msg("sidx flush failed") + for _, sidxIntro := range ind.sidxFlusherIntroduced { + sidxIntro.ReleaseFlushedParts() + sidxIntro.Release() + } + for partID := range partIDMap { + for sidxName := range ind.sidxFlusherIntroduced { + tst.removeSidxPartOnFailure(sidxName, partID) + } + tst.removeSidxPartOnFailure(name, partID) + } + for _, pw := range ind.flushed { + tst.removeTracePartOnFailure(pw) + } return } ind.sidxFlusherIntroduced[name] = sidxFlusherIntroduced diff --git a/banyand/trace/merger.go b/banyand/trace/merger.go index 55a9d2c81..941ee9878 100644 --- a/banyand/trace/merger.go +++ b/banyand/trace/merger.go @@ -20,6 +20,7 @@ package trace import ( "errors" "fmt" + "path/filepath" "sync/atomic" "time" @@ -164,10 +165,19 @@ func (tst *tsTable) mergePartsThenSendIntroduction(creator snapshotCreator, part mergerIntroductionMap := make(map[string]*sidx.MergerIntroduction) for sidxName, sidxInstance := range tst.getAllSidx() { start = time.Now() - mergerIntroduction, err := sidxInstance.Merge(closeCh, partIDMap, newPartID) - if err != nil { - tst.l.Warn().Err(err).Msg("sidx merge mem parts failed") - return nil, err + mergerIntroduction, mergeErr := sidxInstance.Merge(closeCh, partIDMap, newPartID) + if mergeErr != nil { + tst.l.Warn().Err(mergeErr).Msg("sidx merge mem parts failed") + tst.removeTracePartOnFailure(newPart) + for doneSidxName, intro := range mergerIntroductionMap { + intro.ReleaseNewPart() + tst.removeSidxPartOnFailure(doneSidxName, newPartID) + intro.Release() + } + return nil, mergeErr + } + if mergerIntroduction == nil { + continue } mergerIntroductionMap[sidxName] = mergerIntroduction elapsed = time.Since(start) @@ -270,6 +280,29 @@ func (tst *tsTable) reserveSpace(parts []*partWrapper) uint64 { var errNoPartToMerge = fmt.Errorf("no part to merge") +// removeTracePartOnFailure closes the part and removes its directory from disk. +// Used when a merge fails after the trace part was created so the directory is not left as trash. +func (tst *tsTable) removeTracePartOnFailure(pw *partWrapper) { + if pw == nil { + return + } + pathToRemove := pw.p.path + pw.decRef() + tst.fileSystem.MustRMAll(pathToRemove) +} + +// sidxPartPath returns the on-disk path for a sidx part (same layout as sidx package). +func sidxPartPath(traceRoot, sidxName string, partID uint64) string { + return filepath.Join(traceRoot, sidxDirName, sidxName, fmt.Sprintf("%016x", partID)) +} + +// removeSidxPartOnFailure removes a sidx part directory from disk. +// Used when a merge fails after one or more sidx parts were created. +func (tst *tsTable) removeSidxPartOnFailure(sidxName string, partID uint64) { + pathToRemove := sidxPartPath(tst.root, sidxName, partID) + tst.fileSystem.MustRMAll(pathToRemove) +} + func (tst *tsTable) mergeParts(fileSystem fs.FileSystem, closeCh <-chan struct{}, parts []*partWrapper, partID uint64, root string) (*partWrapper, error) { if len(parts) == 0 { return nil, errNoPartToMerge diff --git a/banyand/trace/merger_test.go b/banyand/trace/merger_test.go index a01f2c470..f162b400e 100644 --- a/banyand/trace/merger_test.go +++ b/banyand/trace/merger_test.go @@ -19,6 +19,8 @@ package trace import ( "errors" + "fmt" + "path/filepath" "reflect" "testing" @@ -26,11 +28,14 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" "github.com/stretchr/testify/require" + "github.com/apache/skywalking-banyandb/banyand/internal/sidx" "github.com/apache/skywalking-banyandb/banyand/protector" "github.com/apache/skywalking-banyandb/pkg/convert" "github.com/apache/skywalking-banyandb/pkg/encoding" "github.com/apache/skywalking-banyandb/pkg/fs" + "github.com/apache/skywalking-banyandb/pkg/logger" pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1" + "github.com/apache/skywalking-banyandb/pkg/run" "github.com/apache/skywalking-banyandb/pkg/test" ) @@ -1329,3 +1334,125 @@ func Test_mergeParts(t *testing.T) { }) } } + +// sidxMergeErrFake is a SIDX that returns an error from Merge to test cleanup on failure. +type sidxMergeErrFake struct { + fakeSIDX +} + +func (f *sidxMergeErrFake) Merge(<-chan struct{}, map[uint64]struct{}, uint64) (*sidx.MergerIntroduction, error) { + return nil, errors.New("sidx merge failed") +} + +// Test_mergePartsThenSendIntroduction_cleansUpOnSidxMergeError verifies that when sidx.Merge +// returns an error, the newly created trace part and any sidx parts are removed from disk. +func Test_mergePartsThenSendIntroduction_cleansUpOnSidxMergeError(t *testing.T) { + tmpPath, defFn := test.Space(require.New(t)) + defer defFn() + + fileSystem := fs.NewLocalFileSystem() + // Create two file parts with IDs 10 and 11 so the merged part gets newPartID 1 (curPartID starts at 0). + var parts []*partWrapper + for i, ts := range []*traces{tsTS1, tsTS2} { + partID := uint64(10 + i) + mp := generateMemPart() + mp.mustInitFromTraces(ts) + mp.mustFlush(fileSystem, partPath(tmpPath, partID)) + p := mustOpenFilePart(partID, tmpPath, fileSystem) + p.partMetadata.ID = partID + parts = append(parts, newPartWrapper(nil, p)) + releaseMemPart(mp) + } + defer func() { + for _, pw := range parts { + pw.decRef() + } + }() + + closer := run.NewCloser(1) + defer closer.Done() + l := logger.GetLogger("trace-merger-test") + tst := &tsTable{ + pm: protector.Nop{}, + fileSystem: fileSystem, + root: tmpPath, + loopCloser: closer, + l: l, + curPartID: 0, + sidxMap: map[string]sidx.SIDX{ + "idx1": &sidxMergeErrFake{fakeSIDX{}}, + }, + } + + merged := make(map[uint64]struct{}) + for _, pw := range parts { + merged[pw.ID()] = struct{}{} + } + merges := make(chan *mergerIntroduction, 1) + closeCh := make(chan struct{}) + + _, err := tst.mergePartsThenSendIntroduction(snapshotCreatorMerger, parts, merged, merges, closeCh, mergeTypeFile) + require.Error(t, err) + require.Contains(t, err.Error(), "sidx merge failed") + + // New part ID is 1 (curPartID was 0, then AddUint64(..., 1)). + newPartID := uint64(1) + tracePartPath := partPath(tmpPath, newPartID) + require.False(t, fileSystem.IsExist(tracePartPath), "trace part directory should be removed on sidx merge failure") + + sidxPartPath := filepath.Join(tmpPath, sidxDirName, "idx1", fmt.Sprintf("%016x", newPartID)) + require.False(t, fileSystem.IsExist(sidxPartPath), "sidx part directory should not exist (no sidx part created before first Merge)") +} + +// sidxFlushErrFake is a SIDX that returns an error from Flush to test cleanup on failure. +type sidxFlushErrFake struct { + fakeSIDX +} + +func (f *sidxFlushErrFake) Flush(map[uint64]struct{}) (*sidx.FlusherIntroduction, error) { + return nil, errors.New("sidx flush failed") +} + +// Test_flush_cleansUpOnSidxFlushError verifies that when any sidx.Flush returns an error, +// the newly flushed trace parts and any sidx parts are removed from disk. +func Test_flush_cleansUpOnSidxFlushError(t *testing.T) { + tmpPath, defFn := test.Space(require.New(t)) + defer defFn() + + fileSystem := fs.NewLocalFileSystem() + partID := uint64(100) + mp := generateMemPart() + mp.mustInitFromTraces(tsTS1) + mp.partMetadata.ID = partID + pw := newPartWrapper(mp, openMemPart(mp)) + pw.p.partMetadata.ID = partID + pw.incRef() + snp := &snapshot{ + parts: []*partWrapper{pw}, + epoch: 1, + ref: 1, + } + defer snp.decRef() + + closer := run.NewCloser(1) + defer closer.Done() + l := logger.GetLogger("trace-flusher-test") + tst := &tsTable{ + pm: protector.Nop{}, + fileSystem: fileSystem, + root: tmpPath, + loopCloser: closer, + l: l, + sidxMap: map[string]sidx.SIDX{ + "idx1": &sidxFlushErrFake{fakeSIDX{}}, + }, + } + + flushCh := make(chan *flusherIntroduction, 1) + tst.flush(snp, flushCh) + + tracePartPath := partPath(tmpPath, partID) + require.False(t, fileSystem.IsExist(tracePartPath), "trace part directory should be removed on sidx flush failure") + sidxPartPath := filepath.Join(tmpPath, sidxDirName, "idx1", fmt.Sprintf("%016x", partID)) + require.False(t, fileSystem.IsExist(sidxPartPath), "sidx part directory should be removed on sidx flush failure") +}
