Copilot commented on code in PR #1118:
URL: 
https://github.com/apache/skywalking-banyandb/pull/1118#discussion_r3205269448


##########
banyand/trace/merger_durability_test.go:
##########
@@ -0,0 +1,174 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+       "fmt"
+       "os"
+       "path/filepath"
+       "strings"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/banyand/protector"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+// Test_partFlush_atomicMetadata_noTmpLeftover asserts that after a successful
+// part flush, no <file>.tmp sibling survives anywhere under the part 
directory.
+// This is the post-condition of mustWriteMetadata, mustWriteTraceIDFilter, and
+// mustWriteTagType all going through fileSystem.WriteAtomic.
+func Test_partFlush_atomicMetadata_noTmpLeftover(t *testing.T) {
+       tmpPath, defFn := test.Space(require.New(t))
+       defer defFn()
+       fileSystem := fs.NewLocalFileSystem()
+       epoch := uint64(1)
+
+       mp := generateMemPart()
+       defer releaseMemPart(mp)
+       mp.mustInitFromTraces(ts)
+       mp.mustFlush(fileSystem, partPath(tmpPath, epoch))
+
+       require.NoError(t, filepath.Walk(tmpPath, func(p string, info 
os.FileInfo, err error) error {
+               if err != nil {
+                       return err
+               }
+               if info.IsDir() {
+                       return nil
+               }
+               assert.Falsef(t, strings.HasSuffix(p, ".tmp"), "leftover tmp at 
%s", p)
+               return nil
+       }))
+}
+
+// Test_mustOpenFilePart_removesLeftoverTmp asserts that mustOpenFilePart
+// removes a stray <file>.tmp sibling whose matching final exists. This is the
+// post-rename cleanup hook into fs.CleanupLeftoverTmp. Trace writes three
+// metadata files via WriteAtomic — metadata.json, traceID.filter, tag.type —
+// so all three .tmp shapes must be cleaned up on open.
+func Test_mustOpenFilePart_removesLeftoverTmp(t *testing.T) {
+       cases := []string{"metadata.json.tmp", "traceID.filter.tmp", 
"tag.type.tmp"}
+       for _, strayName := range cases {
+               t.Run(strayName, func(t *testing.T) {
+                       tmpPath, defFn := test.Space(require.New(t))
+                       defer defFn()
+                       fileSystem := fs.NewLocalFileSystem()
+                       epoch := uint64(1)
+
+                       mp := generateMemPart()
+                       mp.mustInitFromTraces(ts)
+                       mp.mustFlush(fileSystem, partPath(tmpPath, epoch))
+                       releaseMemPart(mp)
+
+                       pp := partPath(tmpPath, epoch)
+                       // Skip the case where the matching final file doesn't 
exist on
+                       // disk for this fixture — without a healthy <file>, 
the cleanup
+                       // helper is correct to leave the .tmp in place.
+                       final := strings.TrimSuffix(strayName, ".tmp")
+                       if _, err := os.Stat(filepath.Join(pp, final)); err != 
nil {
+                               t.Skipf("fixture did not produce %s; cleanup 
behavior covered by other cases", final)
+                       }
+                       stray := filepath.Join(pp, strayName)
+                       require.NoError(t, os.WriteFile(stray, 
[]byte("stale-leftover"), 0o600))
+
+                       p := mustOpenFilePart(epoch, tmpPath, fileSystem)
+                       defer p.close()
+
+                       _, statErr := os.Stat(stray)
+                       assert.Truef(t, os.IsNotExist(statErr), "expected %s 
removed, got err=%v", strayName, statErr)
+               })
+       }
+}
+
+// Test_merger_tornSpansBin_stillPanics is the reproducer described in the body
+// of apache/skywalking#13862. It forges the production-shape on-disk state —
+// a part directory whose metadata.json claims N blocks but whose spans.bin is
+// truncated — and feeds it to mergeParts. The expected outcome under the
+// merge-durability fix is *unchanged* from before the fix: the merger panics
+// with "offset must be equal to bytesRead", because that is BanyanDB's
+// canonical fail-fast contract for a corrupt source part.
+//
+// The fix prevents torn parts from being CREATED via a clean-crash path
+// (atomic metadata commit makes that impossible). It does NOT prevent the
+// panic when an externally-corrupt part is forced onto disk (rsync, manual
+// file copy, host disk failure, pre-fix-deployment leftovers). This test
+// guards against accidental panic suppression by future refactors.
+func Test_merger_tornSpansBin_stillPanics(t *testing.T) {
+       tmpPath, defFn := test.Space(require.New(t))
+       defer defFn()
+       fileSystem := fs.NewLocalFileSystem()
+
+       // Build two healthy parts with multiple blocks.
+       mp1 := generateMemPart()
+       mp1.mustInitFromTraces(ts)
+       mp1.mustFlush(fileSystem, partPath(tmpPath, 1))
+       releaseMemPart(mp1)
+
+       mp2 := generateMemPart()
+       mp2.mustInitFromTraces(ts)
+       mp2.mustFlush(fileSystem, partPath(tmpPath, 2))
+       releaseMemPart(mp2)
+
+       // Forge the torn shape: truncate part 1's spans.bin so primary.bin's
+       // block-metadata claims more bytes than spans.bin actually contains.
+       spansPath := filepath.Join(partPath(tmpPath, 1), "spans.bin")
+       stat, err := os.Stat(spansPath)
+       require.NoError(t, err)
+       require.Greater(t, stat.Size(), int64(0), "test fixture must produce 
non-empty spans.bin")
+       tornAt := stat.Size() - 1
+       require.NoError(t, os.Truncate(spansPath, tornAt))
+
+       // mustOpenFilePart succeeds because metadata.json is intact — the
+       // truncation is only in spans.bin.
+       p1 := mustOpenFilePart(1, tmpPath, fileSystem)
+       p1.partMetadata.ID = 1
+       p2 := mustOpenFilePart(2, tmpPath, fileSystem)
+       p2.partMetadata.ID = 2
+
+       tst := &tsTable{pm: protector.Nop{}, fileSystem: fileSystem, root: 
tmpPath}
+       closeCh := make(chan struct{})
+       defer close(closeCh)
+
+       defer func() {
+               r := recover()
+               require.NotNilf(t, r, "merger must panic on torn spans.bin 
(canonical fail-fast contract)")
+               // Accept any panic from the merger's read path. The exact 
message
+               // depends on which block boundary the truncation hit; valid 
shapes:
+               //   "offset N must be equal to bytesRead M"
+               //   "cannot read full data: ..."
+               //   "cannot read data: ..."
+               //   "cannot decode spans: ..."
+               got := fmt.Sprint(r)
+               t.Logf("captured panic: %s", got)
+               valid := strings.Contains(got, "offset") ||
+                       strings.Contains(got, "cannot read") ||
+                       strings.Contains(got, "cannot decode") ||
+                       strings.Contains(got, "EOF")
+               assert.Truef(t, valid, "panic message did not match canonical 
fail-fast shapes: %s", got)
+       }()
+
+       _, _ = tst.mergeParts(fileSystem, closeCh, []*partWrapper{
+               newPartWrapper(nil, p1),
+               newPartWrapper(nil, p2),
+       }, 99, tmpPath)
+

Review Comment:
   This test intentionally triggers a panic in mergeParts, but p1/p2 are opened 
via mustOpenFilePart (which opens multiple underlying files) and are never 
closed when the panic occurs. To avoid leaking file descriptors across the test 
process, close p1/p2 (or the wrappers via decRef) in the deferred recover() 
block.



##########
pkg/fs/local_file_system.go:
##########
@@ -228,10 +228,105 @@ func (fs *localFileSystem) Write(buffer []byte, name 
string, permission Mode) (i
                        Message: fmt.Sprintf("Flush file return error, file 
name: %s,error message: %s", name, err),
                }
        }
+       if syncErr := file.Sync(); syncErr != nil {
+               return size, &FileSystemError{
+                       Code:    flushError,
+                       Message: fmt.Sprintf("Sync file return error, file 
name: %s, error message: %s", name, syncErr),
+               }
+       }
 
        return size, nil
 }
 
+// invokeTestHookAfterTmpFsync is a no-op in production. The test build
+// overrides it via pkg/fs/local_file_system_testhook_test.go to simulate a
+// crash between WriteAtomic's fsync(tmp) and rename.
+var invokeTestHookAfterTmpFsync = func() {}
+
+// WriteAtomic writes buffer to name+".tmp", fsyncs it, renames over name, and
+// fsyncs the parent directory. See FileSystem.WriteAtomic for semantics.
+//
+// On a re-tried call after a prior crash mid-rename, a stale ".tmp" sibling
+// from the prior attempt is overwritten via O_TRUNC. The "next-boot
+// inspection" semantics in the FileSystem interface docstring still hold:
+// CleanupLeftoverTmp on the next part open will remove the .tmp once a
+// matching final exists, and a .tmp without its final remains until the
+// next successful WriteAtomic to the same name.
+func (fs *localFileSystem) WriteAtomic(buffer []byte, name string, permission 
Mode) (int, error) {
+       tmpName := name + ".tmp"
+       parentDir := filepath.Dir(name)
+       file, err := os.OpenFile(tmpName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 
os.FileMode(permission))
+       if err != nil {
+               if os.IsPermission(err) {
+                       return 0, &FileSystemError{
+                               Code:    permissionError,
+                               Message: fmt.Sprintf("There is not enough 
permission, file name: %s, permission: %d, error message: %s", tmpName, 
permission, err),
+                       }
+               }
+               return 0, &FileSystemError{
+                       Code:    otherError,
+                       Message: fmt.Sprintf("Create tmp file return error, 
file name: %s, error message: %s", tmpName, err),
+               }
+       }
+       size, writeErr := file.Write(buffer)
+       if writeErr != nil {
+               _ = file.Close()
+               _ = os.Remove(tmpName)
+               return size, &FileSystemError{
+                       Code:    flushError,
+                       Message: fmt.Sprintf("Write tmp file return error, file 
name: %s, error message: %s", tmpName, writeErr),
+               }
+       }
+       if syncErr := file.Sync(); syncErr != nil {
+               _ = file.Close()
+               _ = os.Remove(tmpName)
+               return size, &FileSystemError{
+                       Code:    flushError,
+                       Message: fmt.Sprintf("Sync tmp file return error, file 
name: %s, error message: %s", tmpName, syncErr),
+               }
+       }
+       if closeErr := file.Close(); closeErr != nil {
+               _ = os.Remove(tmpName)
+               return size, &FileSystemError{
+                       Code:    closeError,
+                       Message: fmt.Sprintf("Close tmp file return error, file 
name: %s, error message: %s", tmpName, closeErr),
+               }
+       }
+       invokeTestHookAfterTmpFsync()
+       if renameErr := os.Rename(tmpName, name); renameErr != nil {
+               // Do NOT remove the .tmp on rename failure: it is fully written
+               // and fsynced — the only durable record of what we tried to
+               // commit — and an operator may need it for forensic recovery.
+               // The next successful WriteAtomic call to the same name will
+               // overwrite it via O_TRUNC, so no orphan accumulates.
+               return size, &FileSystemError{
+                       Code:    otherError,
+                       Message: fmt.Sprintf("Rename %s -> %s return error: 
%s", tmpName, name, renameErr),
+               }
+       }
+       dir, err := os.Open(parentDir)
+       if err != nil {
+               return size, &FileSystemError{
+                       Code:    otherError,
+                       Message: fmt.Sprintf("Open parent dir for fsync, dir 
name: %s, error message: %s", parentDir, err),
+               }
+       }
+       if syncErr := dir.Sync(); syncErr != nil {
+               _ = dir.Close()
+               return size, &FileSystemError{
+                       Code:    flushError,
+                       Message: fmt.Sprintf("Sync parent dir return error, dir 
name: %s, error message: %s", parentDir, syncErr),
+               }
+       }
+       if closeErr := dir.Close(); closeErr != nil {
+               return size, &FileSystemError{
+                       Code:    closeError,
+                       Message: fmt.Sprintf("Close parent dir return error, 
dir name: %s, error message: %s", parentDir, closeErr),
+               }
+       }

Review Comment:
   WriteAtomic always fsyncs the parent directory via os.Open(parentDir) + 
dir.Sync(). On Windows, the existing SyncPath implementation is a no-op 
(pkg/fs/local_file_system_windows.go), likely because directory fsync isn’t 
supported; calling dir.Sync here may fail and make WriteAtomic unusable on 
Windows builds. Consider routing the directory sync through a platform-aware 
helper (e.g., build-tagged syncDir) or otherwise skipping/handling directory 
fsync on Windows similarly to SyncPath, while still syncing on Linux/Darwin.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to