This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 8f0033a6e fix(banyand): close merge durability gap (#13862) (#1118)
8f0033a6e is described below

commit 8f0033a6eddce6dcd42c4307f1cc4553cd9cd19b
Author: Gao Hongtao <[email protected]>
AuthorDate: Fri May 8 10:01:55 2026 +0800

    fix(banyand): close merge durability gap (#13862) (#1118)
    
    * fix(fs): propagate fdatasync error in seqWriter.Close
    
    The previous implementation discarded the SyncAndDropCache return value
    (`_ = SyncAndDropCache(...)`), so an fdatasync failure (ENOSPC, EIO,
    EBADF) silently lost data. The merge-write paths used by trace, measure,
    stream, and sidx all flow through seqWriter.Close, so the silent loss
    applied to every part written to disk.
    
    Now the error is checked and returned wrapped in a FileSystemError. The
    existing bw.writers.MustClose() callers in each engine continue to
    panic on error — that turns silent data loss on disk failure into a
    visible panic operators can act on, consistent with BanyanDB's
    fail-fast contract.
    
    The skipFadvise=true branch falls back to file.Sync() with the same
    error propagation, so the no-cache path is now also durable.
    
    Note on ownership: seqWriter borrows the *os.File from LocalFile and
    does not own it; the fd is closed by LocalFile.Close(). seqWriter.Close
    intentionally does NOT close the fd.
    
    Part of apache/skywalking#13862.
    
    * fix(fs): fsync metadata file in localFileSystem.Write
    
    metadata.json, traceID.filter, tag.type, manifest.json (sidx), and
    similar atomic state files all flow through localFileSystem.Write,
    which previously did write+close with no fsync in between. A crash after
    the parent dir was fsynced (so the directory entry was on disk) but
    before the inode's data was flushed left a metadata.json with zero or
    partial content even though the merger thought the write had completed.
    
    Add file.Sync() between file.Write and the deferred file.Close so the
    bytes hit disk before Write returns. The error from Sync is propagated
    as flushError to match the existing failure-mode style.
    
    Part of apache/skywalking#13862.
    
    * feat(fs): add WriteAtomic, MustFlushAtomic, and CleanupLeftoverTmp
    
    WriteAtomic does write-tmp -> file.Sync(tmp) -> rename -> dir.Sync().
    After a successful return the content is durable on a POSIX-compliant
    local filesystem. A crash between fsync(tmp) and rename leaves only the
    .tmp; a crash after rename leaves a healthy final.
    
    MustFlushAtomic mirrors MustFlush's panic-on-error wrapper for callers
    that already use the panic style (sidx).
    
    CleanupLeftoverTmp removes <file>.tmp siblings of healthy <file> finals
    on part open; these are safe post-rename leftovers from a re-tried merge. A .tmp
    without its final is left alone so the engine's existing
    mustReadMetadata panic fires for operator intervention, per BanyanDB's
    canonical fail-fast contract.
    
    testHookAfterTmpFsync lives in pkg/fs/local_file_system_testhook_test.go
    (_test.go suffix) so production binaries do not carry the indirection.
    
    Engines (trace, measure, stream, sidx) adopt these in follow-up commits.
    
    Part of apache/skywalking#13862.
    
    * fix(trace): two-phase commit for part metadata
    
    Trace's three metadata writers — mustWriteMetadata (metadata.json),
    mustWriteTagType (tag.type), and mustWriteTraceIDFilter (traceID.filter)
    — now use fileSystem.WriteAtomic instead of fileSystem.Write. A clean
    kernel crash mid-flush or mid-merge can no longer leave a part dir where
    the metadata files claim bytes the data files do not contain: the
    rename is atomic, so either the final exists (durable) or it does not
    (operator-actionable, the existing mustReadMetadata panic at boot is
    the canonical signal).
    
    mustOpenFilePart calls fs.CleanupLeftoverTmp before mustReadMetadata to
    remove safe post-rename leftovers (stray <file>.tmp next to a healthy
    final from a re-tried merge or filesystem replay). A .tmp without its
    final is left in place so the existing panic-on-missing-metadata fires
    for operator intervention, per BanyanDB's canonical fail-fast pattern.
    
    Affects all three callers of trace's metadata helpers: the flush path
    (memPart.mustFlush), the merge path (mergeParts), and the streaming
    sync path.
    
    Part of apache/skywalking#13862.
    
    * fix(measure): two-phase commit for part metadata
    
    Same recipe as trace (apache/skywalking#13862): mustWriteMetadata at
    banyand/measure/part_metadata.go now uses fileSystem.WriteAtomic, and
    mustOpenFilePart at banyand/measure/part.go calls fs.CleanupLeftoverTmp
    to remove safe post-rename leftovers.
    
    A clean kernel crash mid-merge or mid-flush can no longer leave a part
    dir where metadata.json claims bytes the data files do not contain.
    
    Affects all three callers of measure's mustWriteMetadata: the flush
    path (memPart.mustFlush), the merge path, and the streaming sync path.
    
    * fix(stream): two-phase commit for part metadata
    
    Same recipe as trace and measure (apache/skywalking#13862):
    mustWriteMetadata at banyand/stream/part_metadata.go now uses
    fileSystem.WriteAtomic, and mustOpenFilePart at banyand/stream/part.go
    calls fs.CleanupLeftoverTmp to remove safe post-rename leftovers.
    
    Affects all three callers of stream's mustWriteMetadata: the flush
    path, the merge path, and the streaming sync path.
    
    * fix(sidx): two-phase commit for manifest.json
    
    Same recipe as trace, measure, and stream (apache/skywalking#13862),
    adjusted for sidx's panic-on-error helper style. mustWriteMetadata at
    banyand/internal/sidx/metadata.go now calls fs.MustFlushAtomic instead
    of fs.MustFlush. mustOpenPart at banyand/internal/sidx/part.go calls
    fs.CleanupLeftoverTmp before opening the part's files.
    
    The legacy meta.bin fallback in loadPartMetadata is unaffected — the
    cleanup helper matches <file>.tmp against <file> per-file, so a
    manifest.json.tmp without a manifest.json final stays in place for
    operator inspection (existing panic-on-missing-metadata still fires).
    
    Affects flush, merge, and streaming sync paths — all funnel through
    mustWriteMetadata.
    
    * docs(changes): note merge durability fix under 0.11.0
    
    CHANGES.md entry for the seven-commit merge durability fix landing as
    apache/skywalking#13862 (root cause for #13861).
    
    Skipping a separate cross-package integration test: each engine
    (trace, measure, stream, sidx) already has merger_durability_test.go
    that exercises the same on-disk shape using the engine's native
    fixtures. A cross-package integration test would require exposing
    testHookAfterTmpFsync outside pkg/fs, which would violate the
    test-only-isolation invariant established by Task 3.
    
    * test(trace): verify torn-part fail-fast contract is preserved
    
    Test_merger_tornSpansBin_stillPanics is the reproducer described in
    the body of apache/skywalking#13862. It forges the production-shape
    on-disk state (a part directory whose metadata.json claims more bytes
    than spans.bin actually contains) and feeds it to mergeParts.
    
    Expected outcome: mergeParts panics with one of the canonical
    fail-fast messages from pkg/fs (mustReadFull) — "offset must be
    equal to bytesRead", "cannot read data: unexpected EOF", "cannot
    decode spans", or similar.
    
    Captured panic in this run:
      "cannot read data: unexpected EOF" (from seqReader.mustReadFull)
    
    This guards against accidental panic suppression by future refactors:
    the durability fix prevents torn parts from being CREATED via a
    clean-crash path, but a torn part forged on disk (rsync, manual
    file copy, host disk failure, pre-fix-deployment leftovers) still
    triggers BanyanDB's canonical fail-fast contract — by design.
    
    * polish(fs,trace): address review nits on merge durability fix
    
    Five MINOR findings from code review of the apache/skywalking#13862
    fix:
    
    - WriteAtomic: drop unreachable os.IsExist branch (O_CREATE without
      O_EXCL never returns EEXIST). Hoist filepath.Dir(name) to a local
      parentDir variable. Document the re-tried-after-crash overwrite
      semantics in the godoc.
    - CleanupLeftoverTmp: skip symlinks via os.Lstat — BanyanDB part
      directories never contain symlinks; encountering one is unexpected.
      Log Remove failures at debug level for post-mortem visibility
      (previously errors were silently swallowed).
    - Test_mustOpenFilePart_removesLeftoverTmp: parameterize over all
      three trace .tmp shapes (metadata.json.tmp, traceID.filter.tmp,
      tag.type.tmp) instead of only metadata.json.tmp. Skips per-case
      when the fixture didn't produce the matching final.
    
    Behavior unchanged on the success and panic paths; only edge-case
    defenses and test coverage breadth.
    
    * fix(fs,trace,measure,stream,sidx): address Copilot review findings
    
    Three MAJOR findings from a follow-up review pass:
    
    1. WriteAtomic no longer removes the .tmp on rename failure. At that
       point the .tmp is fully written + fsynced + closed — the only
       durable record of the new content — and an operator may need it
       for forensic recovery. The next successful WriteAtomic to the same
       name overwrites it via O_TRUNC, so no orphan accumulates. Aligns
       with the project's no-deletion policy on potentially-useful
       artifacts.
    
    2. sidx loadPartMetadata no longer silently falls back to legacy
       meta.bin when manifest.json is missing AND manifest.json.tmp is
       present. That state means the most recent atomic commit crashed
       between fsync(tmp) and rename; falling back to a stale meta.bin
       would mask the incomplete-commit state. Now returns an error that
       propagates to mustOpenPart's panic — canonical fail-fast.
    
    3. Removed redundant SyncPath calls after metadata writes that go
       through WriteAtomic / MustFlushAtomic. The atomic helpers already
       fsync the parent directory after rename, and the trailing
       SyncPath was issuing a second dir fsync of the same inode. Trace
       was paying up to 4 dir fsyncs per finalize (3 metadata files +
       trailing SyncPath); measure/stream/sidx paid 2. Now 1 each.
       Affects mergeParts and memPart.mustFlush in trace/measure/stream,
       sidx mergeParts and SyncPartContext.Finish, and the per-engine
       streaming-sync FinishSync paths.
    
    Also switches the trace streaming-sync sidecars (traceID.filter,
    tag.type via write_data.go) from non-atomic fs.MustFlush to
    fs.MustFlushAtomic — same crash-safety as the merger/flush paths
    they parallel.
    
    Tests all pass: pkg/fs (21 specs), trace, measure, stream, sidx.
    
    * fix(trace,measure,stream): atomic write for seriesMetadata sidecar
    
    The seriesMetadata.bin sidecar in each engine's memPart.mustFlush is
    read at part open by mustOpenFilePart, so it has the same crash-safety
    profile as metadata.json: a crash during a non-atomic fs.MustFlush could
    leave a partial file that reads as corrupt and produces a mid-file
    unmarshal panic instead of fail-fast at metadata read time.
    
    Switch to fs.MustFlushAtomic to match the other metadata writers
    (mustWriteMetadata, mustWriteTagType, mustWriteTraceIDFilter,
    streaming-sync sidecars).
    
    Out of scope for the original Copilot review pass but found while
    auditing call sites; closes the last non-atomic metadata write site
    in the part-flush path. Part of apache/skywalking#13862.
    
    * fix(trace,measure,stream): atomic write for snapshot files
    
    mustWriteSnapshot in trace, measure, and stream now uses
    fs.MustFlushAtomic instead of the bespoke CreateLockFile + Write +
    Close + Rename + SyncPath sequence. The previous code was missing
    file.Sync() between Write and Close, so the snapshot's content was
    not durable when the rename made it visible.
    
    Why this matters: the orphan-part cleanup at startup
    (banyand/{trace,measure,stream}/tstable.go in initTSTable) deletes any
    part directory whose ID is NOT referenced by the loaded snapshot. The
    cleanup logic implicitly trusts the snapshot to be authoritative — but
    a non-durable snapshot can land on disk pointing at parts whose data
    isn't actually durable yet, or fail to land and lose references to
    healthy parts that then look orphaned.
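To make the invariant concrete, the startup cleanup can be modeled as a set difference between the part directories on disk and the names the snapshot references. `orphanParts` and the 16-hex-digit part names are hypothetical; the real logic lives in `initTSTable`.

```go
package main

import "sort"

// orphanParts (hypothetical) returns part directory names present on disk but
// absent from the snapshot, i.e. what a snapshot-trusting cleanup would delete.
func orphanParts(onDisk, snapshotParts []string) []string {
	referenced := make(map[string]struct{}, len(snapshotParts))
	for _, name := range snapshotParts {
		referenced[name] = struct{}{}
	}
	var orphans []string
	for _, name := range onDisk {
		if _, ok := referenced[name]; !ok {
			orphans = append(orphans, name)
		}
	}
	sort.Strings(orphans)
	return orphans
}
```

Suppose parts 1, 2, and 3 are on disk and the snapshot lists 1 and 2. Then only 3 is returned. If a non-durable snapshot lost the reference to 2, part 2 would be returned as well, even though its data is healthy.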
    
    WriteAtomic does write-tmp -> file.Sync(tmp) -> rename -> dir.Sync,
    matching the same crash-safety contract now used for metadata.json,
    manifest.json, traceID.filter, tag.type, and seriesMetadata.bin. A
    crash leaves either the previous valid snapshot or only <name>.tmp,
    never a renamed-but-not-durable snapshot.
    
    This closes the last metadata-style write site that could undermine
    the orphan-cleanup invariant. Part of apache/skywalking#13862.
    
    * fix(fs,measure,trace): address PR #1118 review feedback
    
    Three issues from CI lint and remote Copilot review on
    apache/skywalking-banyandb#1118:
    
    1. CI lint failure: govet shadow at banyand/measure/merger_test.go:1460.
       The local "dps" shadowed the package-level "dps" defined in
       part_test.go:130. The shadow was latent until the new
       merger_durability_test.go added another package-level reference,
       which made govet's shadow check fire on the local. Renamed the local
       to partDPs.
    
    2. WriteAtomic's parent-directory fsync was not platform-aware.
       os.File.Sync on a directory is a real operation on ext4/xfs/apfs,
       but Windows does not expose directory fsync the way POSIX does
       (the existing SyncPath on Windows is a no-op for the same reason).
       Extracted the dir-sync into a per-platform syncDir helper:
       - linux/darwin: open the directory, file.Sync, close, return error
       - windows: no-op returning nil
       WriteAtomic now calls syncDir(parentDir) instead of inlining the
       open+sync+close. Cross-compiled clean on both windows and darwin.
    
    3. File-descriptor leak in Test_merger_tornSpansBin_stillPanics.
       p1 and p2 are opened via mustOpenFilePart (each opens many files)
       and were never closed when the panic fired. Added p1.close() and
       p2.close() to the deferred recover so the fds are released even on
       the panic path.
    
    All target tests still green (pkg/fs 21 specs, trace, measure 29.4s);
    cross-compile clean on linux/darwin/windows.
---
 CHANGES.md                                      |   1 +
 banyand/internal/sidx/merge.go                  |   3 +-
 banyand/internal/sidx/merger_durability_test.go |  95 +++++++++++++
 banyand/internal/sidx/metadata.go               |   2 +-
 banyand/internal/sidx/part.go                   |  19 ++-
 banyand/measure/merger.go                       |   3 +-
 banyand/measure/merger_durability_test.go       |  82 +++++++++++
 banyand/measure/merger_test.go                  |   4 +-
 banyand/measure/part.go                         |  13 +-
 banyand/measure/part_metadata.go                |   2 +-
 banyand/measure/tstable.go                      |  27 +---
 banyand/measure/write_data.go                   |   3 +-
 banyand/stream/merger.go                        |   3 +-
 banyand/stream/merger_durability_test.go        |  80 +++++++++++
 banyand/stream/part.go                          |  13 +-
 banyand/stream/part_metadata.go                 |   2 +-
 banyand/stream/tstable.go                       |  27 +---
 banyand/stream/write_data.go                    |   3 +-
 banyand/trace/merger.go                         |   4 +-
 banyand/trace/merger_durability_test.go         | 179 ++++++++++++++++++++++++
 banyand/trace/part.go                           |  13 +-
 banyand/trace/part_metadata.go                  |   6 +-
 banyand/trace/tstable.go                        |  27 +---
 banyand/trace/write_data.go                     |   7 +-
 pkg/fs/file_system.go                           |  59 ++++++++
 pkg/fs/local_file_system.go                     |  99 ++++++++++++-
 pkg/fs/local_file_system_darwin.go              |  17 +++
 pkg/fs/local_file_system_linux.go               |  17 +++
 pkg/fs/local_file_system_test.go                |  62 ++++++++
 pkg/fs/local_file_system_testhook_test.go       |  29 ++++
 pkg/fs/local_file_system_windows.go             |   6 +
 31 files changed, 811 insertions(+), 96 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 6ac94f618..846197836 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -38,6 +38,7 @@ Release Notes.
 
 ### Bug Fixes
 
+- Close BanyanDB merge write-path durability gap that allowed torn parts to be created by a crash between data write and metadata commit. Metadata files (`metadata.json` for trace/measure/stream, `manifest.json` for sidx, plus `traceID.filter` and `tag.type`) now go through a new `WriteAtomic` (write-tmp + fsync + rename + fsync-dir) sequence; data writers (`seqWriter.Close`, `localFileSystem.Write`) now propagate fdatasync errors instead of silently dropping them. `mustOpenFilePart` / ` [...]
 - Fix bydbctl command tests using global stdout capture, which caused race-enabled runs to corrupt captured command output.
 - Use `topic` instead of `session_id` as the Prometheus label on liaison `queue_sub` chunk-ordering counters to avoid unbounded metric cardinality.
 - Fix flaky trace query filtering caused by non-deterministic sidx tag ordering and add consistency checks for integration query cases.
diff --git a/banyand/internal/sidx/merge.go b/banyand/internal/sidx/merge.go
index 9c3c24d35..83c18f3a4 100644
--- a/banyand/internal/sidx/merge.go
+++ b/banyand/internal/sidx/merge.go
@@ -129,7 +129,8 @@ func (s *sidx) mergeParts(fileSystem fs.FileSystem, closeCh <-chan struct{}, par
                pm.MaxTimestamp = &maxVal
        }
        pm.mustWriteMetadata(fileSystem, dstPath)
-       fileSystem.SyncPath(dstPath)
+       // No SyncPath: mustWriteMetadata goes through MustFlushAtomic which
+       // already fsyncs the parent directory after rename.
        p := mustOpenPart(partID, dstPath, fileSystem)
 
        return newPartWrapper(nil, p), nil
diff --git a/banyand/internal/sidx/merger_durability_test.go b/banyand/internal/sidx/merger_durability_test.go
new file mode 100644
index 000000000..ed29ce6e0
--- /dev/null
+++ b/banyand/internal/sidx/merger_durability_test.go
@@ -0,0 +1,95 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package sidx
+
+import (
+       "os"
+       "path/filepath"
+       "strings"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+)
+
+// minimalTestElements builds a tiny elements collection sufficient to flush a
+// non-empty memPart to disk.
+func minimalTestElements(t *testing.T) []testElement {
+       t.Helper()
+       return []testElement{
+               {seriesID: common.SeriesID(1), userKey: 1, data: []byte("a")},
+               {seriesID: common.SeriesID(1), userKey: 2, data: []byte("b")},
+       }
+}
+
+// Test_partFlush_atomicManifest_noTmpLeftover asserts that after a successful
+// part flush, no <file>.tmp sibling survives anywhere under the part directory.
+// Sidx writes manifest.json (not metadata.json), via fs.MustFlushAtomic.
+func Test_partFlush_atomicManifest_noTmpLeftover(t *testing.T) {
+       tempDir := t.TempDir()
+       testFS := fs.NewLocalFileSystem()
+
+       elements := createTestElements(minimalTestElements(t))
+       defer releaseElements(elements)
+
+       mp := GenerateMemPart()
+       defer ReleaseMemPart(mp)
+       mp.mustInitFromElements(elements)
+
+       partDir := filepath.Join(tempDir, "part_1")
+       mp.mustFlush(testFS, partDir)
+
+       require.NoError(t, filepath.Walk(tempDir, func(p string, info os.FileInfo, err error) error {
+               if err != nil {
+                       return err
+               }
+               if info.IsDir() {
+                       return nil
+               }
+               assert.Falsef(t, strings.HasSuffix(p, ".tmp"), "leftover tmp at %s", p)
+               return nil
+       }))
+}
+
+// Test_mustOpenPart_removesLeftoverTmp asserts that mustOpenPart removes a
+// stray <file>.tmp sibling whose matching final exists.
+func Test_mustOpenPart_removesLeftoverTmp(t *testing.T) {
+       tempDir := t.TempDir()
+       testFS := fs.NewLocalFileSystem()
+
+       elements := createTestElements(minimalTestElements(t))
+       defer releaseElements(elements)
+
+       mp := GenerateMemPart()
+       mp.mustInitFromElements(elements)
+       partDir := filepath.Join(tempDir, "part_1")
+       mp.mustFlush(testFS, partDir)
+       ReleaseMemPart(mp)
+
+       stray := filepath.Join(partDir, "manifest.json.tmp")
+       require.NoError(t, os.WriteFile(stray, []byte("stale-leftover"), 0o600))
+
+       p := mustOpenPart(1, partDir, testFS)
+       defer p.close()
+
+       _, statErr := os.Stat(stray)
+       assert.Truef(t, os.IsNotExist(statErr), "expected stray .tmp removed, got err=%v", statErr)
+}
diff --git a/banyand/internal/sidx/metadata.go b/banyand/internal/sidx/metadata.go
index 40b2cf521..726a77188 100644
--- a/banyand/internal/sidx/metadata.go
+++ b/banyand/internal/sidx/metadata.go
@@ -455,5 +455,5 @@ func (pm *partMetadata) mustWriteMetadata(fileSystem fs.FileSystem, partPath str
        if err != nil {
                logger.GetLogger().Panic().Err(err).Str("path", partPath).Msg("failed to marshal part metadata")
        }
-       fs.MustFlush(fileSystem, manifestData, filepath.Join(partPath, manifestFilename), storage.FilePerm)
+       fs.MustFlushAtomic(fileSystem, manifestData, filepath.Join(partPath, manifestFilename), storage.FilePerm)
 }
diff --git a/banyand/internal/sidx/part.go b/banyand/internal/sidx/part.go
index f0372af6a..80a03b4af 100644
--- a/banyand/internal/sidx/part.go
+++ b/banyand/internal/sidx/part.go
@@ -20,6 +20,7 @@ package sidx
 import (
        "encoding/json"
        "fmt"
+       "os"
        "path"
        "path/filepath"
        "sort"
@@ -112,6 +113,12 @@ func mustOpenPart(partID uint64, path string, fileSystem fs.FileSystem) *part {
                fileSystem: fileSystem,
        }
 
+       // Remove safe post-rename leftovers (<file>.tmp next to a healthy <file>)
+       // before opening any of the part's files. A .tmp without its final stays
+       // in place so the existing panic-on-missing-metadata fires for operator
+       // intervention.
+       fs.CleanupLeftoverTmp(fileSystem, path)
+
        // Open standard files.
        p.primary = mustOpenReader(filepath.Join(path, primaryFilename), fileSystem)
        p.data = mustOpenReader(filepath.Join(path, dataFilename), fileSystem)
@@ -149,6 +156,15 @@ func (p *part) loadPartMetadata() error {
                return nil
        }
 
+       // If a stranded manifest.json.tmp exists without its final, the most
+       // recent atomic commit crashed between fsync(tmp) and rename. Falling
+       // back to meta.bin would silently hide the incomplete-commit state
+       // from the operator. Surface it as an error instead — the canonical
+       // fail-fast contract.
+       if _, statErr := os.Stat(manifestPath + ".tmp"); statErr == nil {
+               return fmt.Errorf("found stranded %s.tmp without %s — incomplete atomic commit, manual recovery required", manifestPath, manifestPath)
+       }
+
        // Fallback to meta.bin for backward compatibility
        metaData, err := p.fileSystem.Read(filepath.Join(p.path, metaFilename))
        if err != nil {
@@ -906,7 +922,8 @@ func (spc *SyncPartContext) Finish() string {
        }
        if spc.fileSystem != nil && spc.partPath != "" {
                spc.partMeta.mustWriteMetadata(spc.fileSystem, spc.partPath)
-               spc.fileSystem.SyncPath(spc.partPath)
+               // No SyncPath: mustWriteMetadata goes through MustFlushAtomic which
+               // already fsyncs the parent directory after rename.
        }
        p := spc.partPath
        spc.partPath = ""
diff --git a/banyand/measure/merger.go b/banyand/measure/merger.go
index 206dee25d..9828c10d8 100644
--- a/banyand/measure/merger.go
+++ b/banyand/measure/merger.go
@@ -260,7 +260,8 @@ func (tst *tsTable) mergeParts(fileSystem fs.FileSystem, closeCh <-chan struct{}
                return nil, err
        }
        pm.mustWriteMetadata(fileSystem, dstPath)
-       fileSystem.SyncPath(dstPath)
+       // No SyncPath: mustWriteMetadata goes through WriteAtomic which already
+       // fsyncs the parent directory after rename.
        p := mustOpenFilePart(partID, root, fileSystem)
        return newPartWrapper(nil, p), nil
 }
diff --git a/banyand/measure/merger_durability_test.go b/banyand/measure/merger_durability_test.go
new file mode 100644
index 000000000..7ff12d92a
--- /dev/null
+++ b/banyand/measure/merger_durability_test.go
@@ -0,0 +1,82 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package measure
+
+import (
+       "os"
+       "path/filepath"
+       "strings"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+// Test_partFlush_atomicMetadata_noTmpLeftover asserts that after a successful
+// part flush, no <file>.tmp sibling survives anywhere under the part directory.
+// This is the post-condition of mustWriteMetadata going through
+// fileSystem.WriteAtomic.
+func Test_partFlush_atomicMetadata_noTmpLeftover(t *testing.T) {
+       tmpPath, defFn := test.Space(require.New(t))
+       defer defFn()
+       fileSystem := fs.NewLocalFileSystem()
+       epoch := uint64(1)
+
+       mp := generateMemPart()
+       defer releaseMemPart(mp)
+       mp.mustInitFromDataPoints(dps)
+       mp.mustFlush(fileSystem, partPath(tmpPath, epoch))
+
+       require.NoError(t, filepath.Walk(tmpPath, func(p string, info os.FileInfo, err error) error {
+               if err != nil {
+                       return err
+               }
+               if info.IsDir() {
+                       return nil
+               }
+               assert.Falsef(t, strings.HasSuffix(p, ".tmp"), "leftover tmp at %s", p)
+               return nil
+       }))
+}
+
+// Test_mustOpenFilePart_removesLeftoverTmp asserts that mustOpenFilePart
+// removes a stray <file>.tmp sibling whose matching final exists.
+func Test_mustOpenFilePart_removesLeftoverTmp(t *testing.T) {
+       tmpPath, defFn := test.Space(require.New(t))
+       defer defFn()
+       fileSystem := fs.NewLocalFileSystem()
+       epoch := uint64(1)
+
+       mp := generateMemPart()
+       mp.mustInitFromDataPoints(dps)
+       mp.mustFlush(fileSystem, partPath(tmpPath, epoch))
+       releaseMemPart(mp)
+
+       pp := partPath(tmpPath, epoch)
+       stray := filepath.Join(pp, "metadata.json.tmp")
+       require.NoError(t, os.WriteFile(stray, []byte("stale-leftover"), 0o600))
+
+       p := mustOpenFilePart(epoch, tmpPath, fileSystem)
+       defer p.close()
+
+       _, statErr := os.Stat(stray)
+       assert.Truef(t, os.IsNotExist(statErr), "expected stray .tmp removed, got err=%v", statErr)
+}
diff --git a/banyand/measure/merger_test.go b/banyand/measure/merger_test.go
index 7cdf97b98..a6727781a 100644
--- a/banyand/measure/merger_test.go
+++ b/banyand/measure/merger_test.go
@@ -1457,9 +1457,9 @@ func Test_mergeParts_fileBased(t *testing.T) {
                fileSystem := fs.NewLocalFileSystem()
                for i := 0; i < numParts; i++ {
                        startTS := int64(i)*totalPerPart + 1
-                       dps := generateDatapointsWithMultipleBlocks(startTS, countPerBlock, blocksPerPart)
+                       partDPs := generateDatapointsWithMultipleBlocks(startTS, countPerBlock, blocksPerPart)
                        mp := generateMemPart()
-                       mp.mustInitFromDataPoints(dps)
+                       mp.mustInitFromDataPoints(partDPs)
                        mp.mustFlush(fileSystem, partPath(tmpPath, uint64(i+1)))
                        filePW := newPartWrapper(nil, mustOpenFilePart(uint64(i+1), tmpPath, fileSystem))
                        filePW.p.partMetadata.ID = uint64(i + 1)
diff --git a/banyand/measure/part.go b/banyand/measure/part.go
index 844713fb1..93f5b33c5 100644
--- a/banyand/measure/part.go
+++ b/banyand/measure/part.go
@@ -218,14 +218,18 @@ func (mp *memPart) mustFlush(fileSystem fs.FileSystem, path string) {
                fs.MustFlush(fileSystem, tfh.Buf, filepath.Join(path, name+tagFamiliesMetadataFilenameExt), storage.FilePerm)
        }
 
-       // Flush series metadata if available
+       // Flush series metadata if available. Goes through MustFlushAtomic
+       // because the file is read at part open by mustOpenFilePart and a
+       // crash during a non-atomic write could leave a partial file that
+       // reads as corrupt.
        if len(mp.seriesMetadata.Buf) > 0 {
-               fs.MustFlush(fileSystem, mp.seriesMetadata.Buf, filepath.Join(path, seriesMetadataFilename), storage.FilePerm)
+               fs.MustFlushAtomic(fileSystem, mp.seriesMetadata.Buf, filepath.Join(path, seriesMetadataFilename), storage.FilePerm)
        }
 
        mp.partMetadata.mustWriteMetadata(fileSystem, path)
-
-       fileSystem.SyncPath(path)
+       // No SyncPath: mustWriteMetadata goes through WriteAtomic which already
+       // fsyncs the parent directory after rename, covering the dirent changes
+       // for all data files written above.
 }
 
 func uncompressedDataPointSizeBytes(index int, dps *dataPoints) uint64 {
@@ -310,6 +314,7 @@ func mustOpenFilePart(id uint64, root string, fileSystem fs.FileSystem) *part {
        partPath := partPath(root, id)
        p.path = partPath
        p.fileSystem = fileSystem
+       fs.CleanupLeftoverTmp(fileSystem, partPath)
        p.partMetadata.mustReadMetadata(fileSystem, partPath)
        p.partMetadata.ID = id
 
diff --git a/banyand/measure/part_metadata.go b/banyand/measure/part_metadata.go
index dbd41b5f6..e7458c07d 100644
--- a/banyand/measure/part_metadata.go
+++ b/banyand/measure/part_metadata.go
@@ -97,7 +97,7 @@ func (pm *partMetadata) mustWriteMetadata(fileSystem fs.FileSystem, partPath str
                return
        }
        metadataPath := filepath.Join(partPath, metadataFilename)
-       n, err := fileSystem.Write(metadata, metadataPath, storage.FilePerm)
+       n, err := fileSystem.WriteAtomic(metadata, metadataPath, storage.FilePerm)
        if err != nil {
                logger.Panicf("cannot write metadata: %s", err)
                return
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index 34e989601..4e3210d20 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -259,27 +259,12 @@ func (tst *tsTable) mustWriteSnapshot(snapshot uint64, partNames []string) {
                logger.Panicf("cannot marshal partNames to JSON: %s", err)
        }
        snapshotPath := filepath.Join(tst.root, snapshotName(snapshot))
-       snapshotTempPath := snapshotPath + ".tmp"
-       lf, err := tst.fileSystem.CreateLockFile(snapshotTempPath, storage.FilePerm)
-       if err != nil {
-               logger.Panicf("cannot create lock file %s: %s", snapshotTempPath, err)
-       }
-       n, err := lf.Write(data)
-       if err != nil {
-               _ = lf.Close()
-               logger.Panicf("cannot write snapshot %s: %s", snapshotTempPath, err)
-       }
-       if n != len(data) {
-               _ = lf.Close()
-               logger.Panicf("unexpected number of bytes written to %s; got %d; want %d", snapshotTempPath, n, len(data))
-       }
-       if closeErr := lf.Close(); closeErr != nil {
-               logger.Panicf("cannot close snapshot temp file %s: %s", snapshotTempPath, closeErr)
-       }
-       if renameErr := tst.fileSystem.Rename(snapshotTempPath, snapshotPath); renameErr != nil {
-               logger.Panicf("cannot rename snapshot %s to %s: %s", snapshotTempPath, snapshotPath, renameErr)
-       }
-       tst.fileSystem.SyncPath(tst.root)
+       // WriteAtomic does write-tmp -> file.Sync -> rename -> dir.Sync, so a
+       // crash leaves either the previous valid snapshot or only <name>.tmp,
+       // never a renamed-but-not-durable snapshot. The orphan-cleanup logic
+       // at startup depends on snapshots being authoritative; this closes the
+       // last metadata-style write site that could undermine that assumption.
+       fs.MustFlushAtomic(tst.fileSystem, data, snapshotPath, storage.FilePerm)
 }
 
 func (tst *tsTable) readSnapshot(snapshot uint64) ([]uint64, error) {
diff --git a/banyand/measure/write_data.go b/banyand/measure/write_data.go
index 2ecf535bf..c37b52e60 100644
--- a/banyand/measure/write_data.go
+++ b/banyand/measure/write_data.go
@@ -57,7 +57,8 @@ func (s *syncPartContext) FinishSync() error {
        s.releaseCoreWriters()
 
        s.partMeta.mustWriteMetadata(s.fileSystem, s.partPath)
-       s.fileSystem.SyncPath(s.partPath)
+       // No SyncPath: mustWriteMetadata goes through WriteAtomic which already
+       // fsyncs the parent directory after rename.
 
        s.tsTable.mustAddFilePart(s.partID)
        s.partPath = ""
diff --git a/banyand/stream/merger.go b/banyand/stream/merger.go
index ba7486878..934eb43cb 100644
--- a/banyand/stream/merger.go
+++ b/banyand/stream/merger.go
@@ -259,7 +259,8 @@ func (tst *tsTable) mergeParts(fileSystem fs.FileSystem, closeCh <-chan struct{}
                return nil, err
        }
        pm.mustWriteMetadata(fileSystem, dstPath)
-       fileSystem.SyncPath(dstPath)
+       // No SyncPath: mustWriteMetadata goes through WriteAtomic which already
+       // fsyncs the parent directory after rename.
        p := mustOpenFilePart(partID, root, fileSystem)
 
        return newPartWrapper(nil, p), nil
diff --git a/banyand/stream/merger_durability_test.go b/banyand/stream/merger_durability_test.go
new file mode 100644
index 000000000..e86d67f0a
--- /dev/null
+++ b/banyand/stream/merger_durability_test.go
@@ -0,0 +1,80 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+       "os"
+       "path/filepath"
+       "strings"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+// Test_partFlush_atomicMetadata_noTmpLeftover asserts that after a successful
+// part flush, no <file>.tmp sibling survives anywhere under the part directory.
+func Test_partFlush_atomicMetadata_noTmpLeftover(t *testing.T) {
+       tmpPath, defFn := test.Space(require.New(t))
+       defer defFn()
+       fileSystem := fs.NewLocalFileSystem()
+       epoch := uint64(1)
+
+       mp := generateMemPart()
+       defer releaseMemPart(mp)
+       mp.mustInitFromElements(es)
+       mp.mustFlush(fileSystem, partPath(tmpPath, epoch))
+
+       require.NoError(t, filepath.Walk(tmpPath, func(p string, info os.FileInfo, err error) error {
+               if err != nil {
+                       return err
+               }
+               if info.IsDir() {
+                       return nil
+               }
+               assert.Falsef(t, strings.HasSuffix(p, ".tmp"), "leftover tmp at %s", p)
+               return nil
+       }))
+}
+
+// Test_mustOpenFilePart_removesLeftoverTmp asserts that mustOpenFilePart
+// removes a stray <file>.tmp sibling whose matching final exists.
+func Test_mustOpenFilePart_removesLeftoverTmp(t *testing.T) {
+       tmpPath, defFn := test.Space(require.New(t))
+       defer defFn()
+       fileSystem := fs.NewLocalFileSystem()
+       epoch := uint64(1)
+
+       mp := generateMemPart()
+       mp.mustInitFromElements(es)
+       mp.mustFlush(fileSystem, partPath(tmpPath, epoch))
+       releaseMemPart(mp)
+
+       pp := partPath(tmpPath, epoch)
+       stray := filepath.Join(pp, "metadata.json.tmp")
+       require.NoError(t, os.WriteFile(stray, []byte("stale-leftover"), 0o600))
+
+       p := mustOpenFilePart(epoch, tmpPath, fileSystem)
+       defer p.close()
+
+       _, statErr := os.Stat(stray)
+       assert.Truef(t, os.IsNotExist(statErr), "expected stray .tmp removed, got err=%v", statErr)
+}
diff --git a/banyand/stream/part.go b/banyand/stream/part.go
index b727f5d7a..7ee11504c 100644
--- a/banyand/stream/part.go
+++ b/banyand/stream/part.go
@@ -223,14 +223,18 @@ func (mp *memPart) mustFlush(fileSystem fs.FileSystem, path string) {
                fs.MustFlush(fileSystem, tfh.Buf, filepath.Join(path, name+tagFamiliesFilterFilenameExt), storage.FilePerm)
        }
 
-       // Flush series metadata if available
+       // Flush series metadata if available. Goes through MustFlushAtomic
+       // because the file is read at part open by mustOpenFilePart and a
+       // crash during a non-atomic write could leave a partial file that
+       // reads as corrupt.
        if len(mp.seriesMetadata.Buf) > 0 {
-               fs.MustFlush(fileSystem, mp.seriesMetadata.Buf, filepath.Join(path, seriesMetadataFilename), storage.FilePerm)
+               fs.MustFlushAtomic(fileSystem, mp.seriesMetadata.Buf, filepath.Join(path, seriesMetadataFilename), storage.FilePerm)
        }
 
        mp.partMetadata.mustWriteMetadata(fileSystem, path)
-
-       fileSystem.SyncPath(path)
+       // No SyncPath: mustWriteMetadata goes through WriteAtomic which already
+       // fsyncs the parent directory after rename, covering the dirent changes
+       // for all data files written above.
 }
 
 func uncompressedElementSizeBytes(index int, es *elements) uint64 {
@@ -311,6 +315,7 @@ func mustOpenFilePart(id uint64, root string, fileSystem fs.FileSystem) *part {
        partPath := partPath(root, id)
        p.path = partPath
        p.fileSystem = fileSystem
+       fs.CleanupLeftoverTmp(fileSystem, partPath)
        p.partMetadata.mustReadMetadata(fileSystem, partPath)
        p.partMetadata.ID = id
 
diff --git a/banyand/stream/part_metadata.go b/banyand/stream/part_metadata.go
index 25c0eaa2f..47cddb501 100644
--- a/banyand/stream/part_metadata.go
+++ b/banyand/stream/part_metadata.go
@@ -97,7 +97,7 @@ func (pm *partMetadata) mustWriteMetadata(fileSystem fs.FileSystem, partPath str
                return
        }
        metadataPath := filepath.Join(partPath, metadataFilename)
-       n, err := fileSystem.Write(metadata, metadataPath, storage.FilePerm)
+       n, err := fileSystem.WriteAtomic(metadata, metadataPath, storage.FilePerm)
        if err != nil {
                logger.Panicf("cannot write metadata: %s", err)
                return
diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go
index 40f72581f..aa062a7d4 100644
--- a/banyand/stream/tstable.go
+++ b/banyand/stream/tstable.go
@@ -156,27 +156,12 @@ func (tst *tsTable) mustWriteSnapshot(snapshot uint64, partNames []string) {
                logger.Panicf("cannot marshal partNames to JSON: %s", err)
        }
        snapshotPath := filepath.Join(tst.root, snapshotName(snapshot))
-       snapshotTempPath := snapshotPath + ".tmp"
-       lf, err := tst.fileSystem.CreateLockFile(snapshotTempPath, storage.FilePerm)
-       if err != nil {
-               logger.Panicf("cannot create lock file %s: %s", snapshotTempPath, err)
-       }
-       n, err := lf.Write(data)
-       if err != nil {
-               _ = lf.Close()
-               logger.Panicf("cannot write snapshot %s: %s", snapshotTempPath, err)
-       }
-       if n != len(data) {
-               _ = lf.Close()
-               logger.Panicf("unexpected number of bytes written to %s; got %d; want %d", snapshotTempPath, n, len(data))
-       }
-       if closeErr := lf.Close(); closeErr != nil {
-               logger.Panicf("cannot close snapshot temp file %s: %s", snapshotTempPath, closeErr)
-       }
-       if renameErr := tst.fileSystem.Rename(snapshotTempPath, snapshotPath); renameErr != nil {
-               logger.Panicf("cannot rename snapshot %s to %s: %s", snapshotTempPath, snapshotPath, renameErr)
-       }
-       tst.fileSystem.SyncPath(tst.root)
+       // WriteAtomic does write-tmp -> file.Sync -> rename -> dir.Sync, so a
+       // crash leaves either the previous valid snapshot or only <name>.tmp,
+       // never a renamed-but-not-durable snapshot. The orphan-cleanup logic
+       // at startup depends on snapshots being authoritative; this closes the
+       // last metadata-style write site that could undermine that assumption.
+       fs.MustFlushAtomic(tst.fileSystem, data, snapshotPath, storage.FilePerm)
 }
 
 func (tst *tsTable) readSnapshot(snapshot uint64) ([]uint64, error) {
diff --git a/banyand/stream/write_data.go b/banyand/stream/write_data.go
index 6a6ff24da..76f4e4ced 100644
--- a/banyand/stream/write_data.go
+++ b/banyand/stream/write_data.go
@@ -56,7 +56,8 @@ func (s *syncPartContext) FinishSync() error {
        s.releaseCoreWriters()
 
        s.partMeta.mustWriteMetadata(s.fileSystem, s.partPath)
-       s.fileSystem.SyncPath(s.partPath)
+       // No SyncPath: mustWriteMetadata goes through WriteAtomic which already
+       // fsyncs the parent directory after rename.
 
        s.tsTable.mustAddFilePart(s.partID)
        s.partPath = ""
diff --git a/banyand/trace/merger.go b/banyand/trace/merger.go
index 37418b69d..4261c0174 100644
--- a/banyand/trace/merger.go
+++ b/banyand/trace/merger.go
@@ -519,7 +519,9 @@ func (tst *tsTable) mergeParts(fileSystem fs.FileSystem, closeCh <-chan struct{}
        tf.mustWriteTraceIDFilter(fileSystem, dstPath)
        tf.reset()
        tt.mustWriteTagType(fileSystem, dstPath)
-       fileSystem.SyncPath(dstPath)
+       // No SyncPath here: each mustWrite* helper goes through fileSystem.WriteAtomic
+       // which already fsyncs the parent directory after rename. The last atomic
+       // metadata write covers all prior dirent changes (data file creations).
        p := mustOpenFilePart(partID, root, fileSystem)
        return newPartWrapper(nil, p), nil
 }
diff --git a/banyand/trace/merger_durability_test.go b/banyand/trace/merger_durability_test.go
new file mode 100644
index 000000000..6dfb9dcb2
--- /dev/null
+++ b/banyand/trace/merger_durability_test.go
@@ -0,0 +1,179 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package trace
+
+import (
+       "fmt"
+       "os"
+       "path/filepath"
+       "strings"
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/banyand/protector"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+)
+
+// Test_partFlush_atomicMetadata_noTmpLeftover asserts that after a successful
+// part flush, no <file>.tmp sibling survives anywhere under the part directory.
+// This is the post-condition of mustWriteMetadata, mustWriteTraceIDFilter, and
+// mustWriteTagType all going through fileSystem.WriteAtomic.
+func Test_partFlush_atomicMetadata_noTmpLeftover(t *testing.T) {
+       tmpPath, defFn := test.Space(require.New(t))
+       defer defFn()
+       fileSystem := fs.NewLocalFileSystem()
+       epoch := uint64(1)
+
+       mp := generateMemPart()
+       defer releaseMemPart(mp)
+       mp.mustInitFromTraces(ts)
+       mp.mustFlush(fileSystem, partPath(tmpPath, epoch))
+
+       require.NoError(t, filepath.Walk(tmpPath, func(p string, info os.FileInfo, err error) error {
+               if err != nil {
+                       return err
+               }
+               if info.IsDir() {
+                       return nil
+               }
+               assert.Falsef(t, strings.HasSuffix(p, ".tmp"), "leftover tmp at %s", p)
+               return nil
+       }))
+}
+
+// Test_mustOpenFilePart_removesLeftoverTmp asserts that mustOpenFilePart
+// removes a stray <file>.tmp sibling whose matching final exists. This is the
+// post-rename cleanup hook into fs.CleanupLeftoverTmp. Trace writes three
+// metadata files via WriteAtomic — metadata.json, traceID.filter, tag.type —
+// so all three .tmp shapes must be cleaned up on open.
+func Test_mustOpenFilePart_removesLeftoverTmp(t *testing.T) {
+       cases := []string{"metadata.json.tmp", "traceID.filter.tmp", "tag.type.tmp"}
+       for _, strayName := range cases {
+               t.Run(strayName, func(t *testing.T) {
+                       tmpPath, defFn := test.Space(require.New(t))
+                       defer defFn()
+                       fileSystem := fs.NewLocalFileSystem()
+                       epoch := uint64(1)
+
+                       mp := generateMemPart()
+                       mp.mustInitFromTraces(ts)
+                       mp.mustFlush(fileSystem, partPath(tmpPath, epoch))
+                       releaseMemPart(mp)
+
+                       pp := partPath(tmpPath, epoch)
+                       // Skip the case where the matching final file doesn't exist on
+                       // disk for this fixture — without a healthy <file>, the cleanup
+                       // helper is correct to leave the .tmp in place.
+                       final := strings.TrimSuffix(strayName, ".tmp")
+                       if _, err := os.Stat(filepath.Join(pp, final)); err != nil {
+                               t.Skipf("fixture did not produce %s; cleanup behavior covered by other cases", final)
+                       }
+                       stray := filepath.Join(pp, strayName)
+                       require.NoError(t, os.WriteFile(stray, []byte("stale-leftover"), 0o600))
+
+                       p := mustOpenFilePart(epoch, tmpPath, fileSystem)
+                       defer p.close()
+
+                       _, statErr := os.Stat(stray)
+                       assert.Truef(t, os.IsNotExist(statErr), "expected %s removed, got err=%v", strayName, statErr)
+               })
+       }
+}
+
+// Test_merger_tornSpansBin_stillPanics is the reproducer described in the body
+// of apache/skywalking#13862. It forges the production-shape on-disk state —
+// a part directory whose metadata.json claims N blocks but whose spans.bin is
+// truncated — and feeds it to mergeParts. The expected outcome under the
+// merge-durability fix is *unchanged* from before the fix: the merger panics
+// with "offset must be equal to bytesRead", because that is BanyanDB's
+// canonical fail-fast contract for a corrupt source part.
+//
+// The fix prevents torn parts from being CREATED via a clean-crash path
+// (atomic metadata commit makes that impossible). It does NOT prevent the
+// panic when an externally-corrupt part is forced onto disk (rsync, manual
+// file copy, host disk failure, pre-fix-deployment leftovers). This test
+// guards against accidental panic suppression by future refactors.
+func Test_merger_tornSpansBin_stillPanics(t *testing.T) {
+       tmpPath, defFn := test.Space(require.New(t))
+       defer defFn()
+       fileSystem := fs.NewLocalFileSystem()
+
+       // Build two healthy parts with multiple blocks.
+       mp1 := generateMemPart()
+       mp1.mustInitFromTraces(ts)
+       mp1.mustFlush(fileSystem, partPath(tmpPath, 1))
+       releaseMemPart(mp1)
+
+       mp2 := generateMemPart()
+       mp2.mustInitFromTraces(ts)
+       mp2.mustFlush(fileSystem, partPath(tmpPath, 2))
+       releaseMemPart(mp2)
+
+       // Forge the torn shape: truncate part 1's spans.bin so primary.bin's
+       // block-metadata claims more bytes than spans.bin actually contains.
+       spansPath := filepath.Join(partPath(tmpPath, 1), "spans.bin")
+       stat, err := os.Stat(spansPath)
+       require.NoError(t, err)
+       require.Greater(t, stat.Size(), int64(0), "test fixture must produce non-empty spans.bin")
+       tornAt := stat.Size() - 1
+       require.NoError(t, os.Truncate(spansPath, tornAt))
+
+       // mustOpenFilePart succeeds because metadata.json is intact — the
+       // truncation is only in spans.bin.
+       p1 := mustOpenFilePart(1, tmpPath, fileSystem)
+       p1.partMetadata.ID = 1
+       p2 := mustOpenFilePart(2, tmpPath, fileSystem)
+       p2.partMetadata.ID = 2
+
+       tst := &tsTable{pm: protector.Nop{}, fileSystem: fileSystem, root: tmpPath}
+       closeCh := make(chan struct{})
+       defer close(closeCh)
+
+       defer func() {
+               r := recover()
+               // Release fds even on the panic path. mustOpenFilePart opens
+               // many underlying files; without explicit close here the panic
+               // would leak them for the lifetime of the test process.
+               p1.close()
+               p2.close()
+               require.NotNilf(t, r, "merger must panic on torn spans.bin (canonical fail-fast contract)")
+               // Accept any panic from the merger's read path. The exact message
+               // depends on which block boundary the truncation hit; valid shapes:
+               //   "offset N must be equal to bytesRead M"
+               //   "cannot read full data: ..."
+               //   "cannot read data: ..."
+               //   "cannot decode spans: ..."
+               got := fmt.Sprint(r)
+               t.Logf("captured panic: %s", got)
+               valid := strings.Contains(got, "offset") ||
+                       strings.Contains(got, "cannot read") ||
+                       strings.Contains(got, "cannot decode") ||
+                       strings.Contains(got, "EOF")
+               assert.Truef(t, valid, "panic message did not match canonical fail-fast shapes: %s", got)
+       }()
+
+       _, _ = tst.mergeParts(fileSystem, closeCh, []*partWrapper{
+               newPartWrapper(nil, p1),
+               newPartWrapper(nil, p2),
+       }, 99, tmpPath)
+
+       t.Fatal("mergeParts returned without panicking — torn-part fail-fast contract is broken")
+}
diff --git a/banyand/trace/part.go b/banyand/trace/part.go
index 3ed531967..353700cf0 100644
--- a/banyand/trace/part.go
+++ b/banyand/trace/part.go
@@ -296,16 +296,20 @@ func (mp *memPart) mustFlush(fileSystem fs.FileSystem, path string) {
                fs.MustFlush(fileSystem, tm.Buf, filepath.Join(path, name+tagsMetadataFilenameExt), storage.FilePerm)
        }
 
-       // Flush series metadata if available
+       // Flush series metadata if available. Goes through MustFlushAtomic
+       // because the file is read at part open by mustOpenFilePart and a
+       // crash during a non-atomic write could leave a partial file that
+       // reads as corrupt.
        if len(mp.seriesMetadata.Buf) > 0 {
-               fs.MustFlush(fileSystem, mp.seriesMetadata.Buf, filepath.Join(path, seriesMetadataFilename), storage.FilePerm)
+               fs.MustFlushAtomic(fileSystem, mp.seriesMetadata.Buf, filepath.Join(path, seriesMetadataFilename), storage.FilePerm)
        }
 
        mp.partMetadata.mustWriteMetadata(fileSystem, path)
        mp.tagType.mustWriteTagType(fileSystem, path)
        mp.traceIDFilter.mustWriteTraceIDFilter(fileSystem, path)
-
-       fileSystem.SyncPath(path)
+       // No SyncPath: each mustWrite* helper uses WriteAtomic which fsyncs the
+       // parent directory after rename, covering all prior dirent changes
+       // (data file creations).
 }
 
 func generateMemPart() *memPart {
@@ -373,6 +377,7 @@ func mustOpenFilePart(id uint64, root string, fileSystem fs.FileSystem) *part {
        partPath := partPath(root, id)
        p.path = partPath
        p.fileSystem = fileSystem
+       fs.CleanupLeftoverTmp(fileSystem, partPath)
        p.partMetadata.mustReadMetadata(fileSystem, partPath)
        p.partMetadata.ID = id
 
diff --git a/banyand/trace/part_metadata.go b/banyand/trace/part_metadata.go
index ae1a029a2..62468b444 100644
--- a/banyand/trace/part_metadata.go
+++ b/banyand/trace/part_metadata.go
@@ -102,7 +102,7 @@ func (pm *partMetadata) mustWriteMetadata(fileSystem fs.FileSystem, partPath str
                return
        }
        metadataPath := filepath.Join(partPath, metadataFilename)
-       n, err := fileSystem.Write(metadata, metadataPath, storage.FilePerm)
+       n, err := fileSystem.WriteAtomic(metadata, metadataPath, storage.FilePerm)
        if err != nil {
                logger.Panicf("cannot write metadata: %s", err)
                return
@@ -199,7 +199,7 @@ func (tt tagType) mustWriteTagType(fileSystem fs.FileSystem, partPath string) {
        data = tt.marshal(data)
 
        tagTypePath := filepath.Join(partPath, tagTypeFilename)
-       n, err := fileSystem.Write(data, tagTypePath, storage.FilePerm)
+       n, err := fileSystem.WriteAtomic(data, tagTypePath, storage.FilePerm)
        if err != nil {
                logger.Panicf("cannot write tagType: %s", err)
                return
@@ -249,7 +249,7 @@ func (tf *traceIDFilter) mustWriteTraceIDFilter(fileSystem fs.FileSystem, partPa
        data = encodeBloomFilter(data, tf.filter)
 
        traceIDFilterPath := filepath.Join(partPath, traceIDFilterFilename)
-       n, err := fileSystem.Write(data, traceIDFilterPath, storage.FilePerm)
+       n, err := fileSystem.WriteAtomic(data, traceIDFilterPath, storage.FilePerm)
        if err != nil {
                logger.Panicf("cannot write traceIDFilter: %s", err)
                return
diff --git a/banyand/trace/tstable.go b/banyand/trace/tstable.go
index 7dd0118fd..40cde19fb 100644
--- a/banyand/trace/tstable.go
+++ b/banyand/trace/tstable.go
@@ -159,27 +159,12 @@ func (tst *tsTable) mustWriteSnapshot(snapshot uint64, partNames []string) {
                logger.Panicf("cannot marshal partNames to JSON: %s", err)
        }
        snapshotPath := filepath.Join(tst.root, snapshotName(snapshot))
-       snapshotTempPath := snapshotPath + ".tmp"
-       lf, err := tst.fileSystem.CreateLockFile(snapshotTempPath, storage.FilePerm)
-       if err != nil {
-               logger.Panicf("cannot create lock file %s: %s", snapshotTempPath, err)
-       }
-       n, err := lf.Write(data)
-       if err != nil {
-               _ = lf.Close()
-               logger.Panicf("cannot write snapshot %s: %s", snapshotTempPath, err)
-       }
-       if n != len(data) {
-               _ = lf.Close()
-               logger.Panicf("unexpected number of bytes written to %s; got %d; want %d", snapshotTempPath, n, len(data))
-       }
-       if closeErr := lf.Close(); closeErr != nil {
-               logger.Panicf("cannot close snapshot temp file %s: %s", snapshotTempPath, closeErr)
-       }
-       if renameErr := tst.fileSystem.Rename(snapshotTempPath, snapshotPath); renameErr != nil {
-               logger.Panicf("cannot rename snapshot %s to %s: %s", snapshotTempPath, snapshotPath, renameErr)
-       }
-       tst.fileSystem.SyncPath(tst.root)
+       // WriteAtomic does write-tmp -> file.Sync -> rename -> dir.Sync, so a
+       // crash leaves either the previous valid snapshot or only <name>.tmp,
+       // never a renamed-but-not-durable snapshot. The orphan-cleanup logic
+       // at startup depends on snapshots being authoritative; this closes the
+       // last metadata-style write site that could undermine that assumption.
+       fs.MustFlushAtomic(tst.fileSystem, data, snapshotPath, storage.FilePerm)
 }
 
 func (tst *tsTable) readSnapshot(snapshot uint64) ([]uint64, error) {
diff --git a/banyand/trace/write_data.go b/banyand/trace/write_data.go
index 0d339d71f..16c989b52 100644
--- a/banyand/trace/write_data.go
+++ b/banyand/trace/write_data.go
@@ -102,13 +102,14 @@ func (s *syncPartContext) FinishSync() error {
        s.releaseCoreWriters()
 
        if len(s.traceIDFilterBuffer) > 0 {
-               fs.MustFlush(s.fileSystem, s.traceIDFilterBuffer, filepath.Join(s.partPath, traceIDFilterFilename), storage.FilePerm)
+               fs.MustFlushAtomic(s.fileSystem, s.traceIDFilterBuffer, filepath.Join(s.partPath, traceIDFilterFilename), storage.FilePerm)
        }
        if len(s.tagTypeBuffer) > 0 {
-               fs.MustFlush(s.fileSystem, s.tagTypeBuffer, filepath.Join(s.partPath, tagTypeFilename), storage.FilePerm)
+               fs.MustFlushAtomic(s.fileSystem, s.tagTypeBuffer, filepath.Join(s.partPath, tagTypeFilename), storage.FilePerm)
        }
        s.partMeta.mustWriteMetadata(s.fileSystem, s.partPath)
-       s.fileSystem.SyncPath(s.partPath)
+       // No SyncPath: mustWriteMetadata goes through WriteAtomic which already
+       // fsyncs the parent directory after rename.
 
        // Finish SIDX writers and collect file paths for file-backed parts.
        sidxFilePartsMap := make(map[string]string, len(s.sidxPartContexts))
diff --git a/pkg/fs/file_system.go b/pkg/fs/file_system.go
index 2254fb29b..3cdc57759 100644
--- a/pkg/fs/file_system.go
+++ b/pkg/fs/file_system.go
@@ -20,6 +20,9 @@ package fs
 
 import (
        "io"
+       "os"
+       "path/filepath"
+       "strings"
 
        "github.com/apache/skywalking-banyandb/pkg/logger"
 )
@@ -110,6 +113,17 @@ type FileSystem interface {
        OpenFile(name string) (File, error)
        // Flush mode, which flushes all data to one file.
        Write(buffer []byte, name string, permission Mode) (int, error)
+       // WriteAtomic writes buffer to a fresh ".tmp" sibling, fsyncs it, renames it
+       // over name, and fsyncs the parent directory. After a successful return the
+       // content is durable on a POSIX-compliant local filesystem (ext4, xfs, apfs,
+       // ntfs). On any error the .tmp file may be left behind for next-boot cleanup
+       // (when the matching final file exists) or operator intervention (when only
+       // the .tmp exists, indicating a crash before rename).
+       //
+       // Atomicity caveat: os.Rename is not guaranteed atomic on network-mounted
+       // filesystems (NFS, SMB, FUSE-based S3 gateways). BanyanDB targets local
+       // disks; do not run on a network FS without verifying the mount semantics.
+       WriteAtomic(buffer []byte, name string, permission Mode) (int, error)
        // Read the entire file using streaming read.
        Read(name string) ([]byte, error)
        // Delete the file.
@@ -166,6 +180,51 @@ func MustFlush(fs FileSystem, buffer []byte, name string, permission Mode) {
        }
 }
 
+// MustFlushAtomic writes buffer to name via WriteAtomic and panics on error.
+// Same panic-on-error semantics as MustFlush; use this when the caller needs
+// crash-safety on the file (metadata.json, manifest.json, etc.).
+func MustFlushAtomic(fs FileSystem, buffer []byte, name string, permission Mode) {
+       n, err := fs.WriteAtomic(buffer, name, permission)
+       if err != nil {
+               logger.GetLogger().Panic().Err(err).Str("path", name).Msg("cannot write data atomically")
+       }
+       if n != len(buffer) {
+               logger.GetLogger().Panic().Int("written", n).Int("expected", len(buffer)).Str("path", name).Msg("BUG: WriteAtomic wrote wrong number of bytes")
+       }
+}
+
+// CleanupLeftoverTmp removes any "<file>.tmp" sibling under partPath whose
+// matching final "<file>" exists. A .tmp without its final indicates a crash
+// between WriteAtomic's fsync(tmp) and rename — that case is left alone so
+// the engine's existing panic-on-missing-metadata fires for operator
+// intervention, per BanyanDB's canonical fail-fast contract.
+//
+// Symlinks ending in ".tmp" are skipped: BanyanDB part directories never
+// contain symlinks, so encountering one is unexpected and we do not want to
+// follow it during cleanup.
+func CleanupLeftoverTmp(fs FileSystem, partPath string) {
+       for _, e := range fs.ReadDir(partPath) {
+               if e.IsDir() {
+                       continue
+               }
+               name := e.Name()
+               if !strings.HasSuffix(name, ".tmp") {
+                       continue
+               }
+               tmpFull := filepath.Join(partPath, name)
+               info, lstatErr := os.Lstat(tmpFull)
+               if lstatErr != nil || info.Mode()&os.ModeSymlink != 0 {
+                       continue
+               }
+               final := name[:len(name)-len(".tmp")]
+               if _, err := os.Stat(filepath.Join(partPath, final)); err == nil {
+                       if removeErr := os.Remove(tmpFull); removeErr != nil {
+                               logger.GetLogger().Debug().Err(removeErr).Str("path", tmpFull).Msg("CleanupLeftoverTmp: cannot remove leftover .tmp")
+                       }
+               }
+       }
+}
+
 // MustWriteData writes data to w and panics if it cannot write all data.
 func MustWriteData(w SeqWriter, data []byte) {
        if len(data) == 0 {
diff --git a/pkg/fs/local_file_system.go b/pkg/fs/local_file_system.go
index aa654050c..4be28b6c5 100644
--- a/pkg/fs/local_file_system.go
+++ b/pkg/fs/local_file_system.go
@@ -228,7 +228,88 @@ func (fs *localFileSystem) Write(buffer []byte, name string, permission Mode) (i
                        Message: fmt.Sprintf("Flush file return error, file name: %s,error message: %s", name, err),
                }
        }
+       if syncErr := file.Sync(); syncErr != nil {
+               return size, &FileSystemError{
+                       Code:    flushError,
+                       Message: fmt.Sprintf("Sync file return error, file name: %s, error message: %s", name, syncErr),
+               }
+       }
+
+       return size, nil
+}
+
+// invokeTestHookAfterTmpFsync is a no-op in production. The test build
+// overrides it via pkg/fs/local_file_system_testhook_test.go to simulate a
+// crash between WriteAtomic's fsync(tmp) and rename.
+var invokeTestHookAfterTmpFsync = func() {}
 
+// WriteAtomic writes buffer to name+".tmp", fsyncs it, renames over name, and
+// fsyncs the parent directory. See FileSystem.WriteAtomic for semantics.
+//
+// On a re-tried call after a prior crash mid-rename, a stale ".tmp" sibling
+// from the prior attempt is overwritten via O_TRUNC. The "next-boot
+// inspection" semantics in the FileSystem interface docstring still hold:
+// CleanupLeftoverTmp on the next part open will remove the .tmp once a
+// matching final exists, and a .tmp without its final remains until the
+// next successful WriteAtomic to the same name.
+func (fs *localFileSystem) WriteAtomic(buffer []byte, name string, permission Mode) (int, error) {
+       tmpName := name + ".tmp"
+       parentDir := filepath.Dir(name)
+       file, err := os.OpenFile(tmpName, os.O_RDWR|os.O_CREATE|os.O_TRUNC, os.FileMode(permission))
+       if err != nil {
+               if os.IsPermission(err) {
+                       return 0, &FileSystemError{
+                               Code:    permissionError,
+                               Message: fmt.Sprintf("There is not enough permission, file name: %s, permission: %d, error message: %s", tmpName, permission, err),
+                       }
+               }
+               return 0, &FileSystemError{
+                       Code:    otherError,
+                       Message: fmt.Sprintf("Create tmp file return error, file name: %s, error message: %s", tmpName, err),
+               }
+       }
+       size, writeErr := file.Write(buffer)
+       if writeErr != nil {
+               _ = file.Close()
+               _ = os.Remove(tmpName)
+               return size, &FileSystemError{
+                       Code:    flushError,
+                       Message: fmt.Sprintf("Write tmp file return error, file name: %s, error message: %s", tmpName, writeErr),
+               }
+       }
+       if syncErr := file.Sync(); syncErr != nil {
+               _ = file.Close()
+               _ = os.Remove(tmpName)
+               return size, &FileSystemError{
+                       Code:    flushError,
+                       Message: fmt.Sprintf("Sync tmp file return error, file name: %s, error message: %s", tmpName, syncErr),
+               }
+       }
+       if closeErr := file.Close(); closeErr != nil {
+               _ = os.Remove(tmpName)
+               return size, &FileSystemError{
+                       Code:    closeError,
+                       Message: fmt.Sprintf("Close tmp file return error, file name: %s, error message: %s", tmpName, closeErr),
+               }
+       }
+       invokeTestHookAfterTmpFsync()
+       if renameErr := os.Rename(tmpName, name); renameErr != nil {
+               // Do NOT remove the .tmp on rename failure: it is fully written
+               // and fsynced — the only durable record of what we tried to
+               // commit — and an operator may need it for forensic recovery.
+               // The next successful WriteAtomic call to the same name will
+               // overwrite it via O_TRUNC, so no orphan accumulates.
+               return size, &FileSystemError{
+                       Code:    otherError,
+                       Message: fmt.Sprintf("Rename %s -> %s return error: %s", tmpName, name, renameErr),
+               }
+       }
+       if err := syncDir(parentDir); err != nil {
+               return size, &FileSystemError{
+                       Code:    flushError,
+                       Message: fmt.Sprintf("Sync parent dir return error, dir name: %s, error message: %s", parentDir, err),
+               }
+       }
        return size, nil
 }
 
@@ -661,9 +742,21 @@ func (w *seqWriter) Close() error {
                        Message: fmt.Sprintf("Flush File error, directory name: %s, error message: %s", w.fileName, err),
                }
        }
-
-       if w.file != nil && !w.skipFadvise {
-               _ = SyncAndDropCache(w.file.Fd(), 0, 0)
+       if w.file == nil {
+               return nil
+       }
+       if !w.skipFadvise {
+               if syncErr := SyncAndDropCache(w.file.Fd(), 0, 0); syncErr != nil {
+                       return &FileSystemError{
+                               Code:    flushError,
+                               Message: fmt.Sprintf("Sync File error, directory name: %s, error message: %s", w.fileName, syncErr),
+                       }
+               }
+       } else if syncErr := w.file.Sync(); syncErr != nil {
+               return &FileSystemError{
+                       Code:    flushError,
+                       Message: fmt.Sprintf("Sync File error, directory name: %s, error message: %s", w.fileName, syncErr),
+               }
        }
        return nil
 }
diff --git a/pkg/fs/local_file_system_darwin.go b/pkg/fs/local_file_system_darwin.go
index e0ece7662..b49f9ae60 100644
--- a/pkg/fs/local_file_system_darwin.go
+++ b/pkg/fs/local_file_system_darwin.go
@@ -84,6 +84,23 @@ func syncFile(file *os.File) error {
        return nil
 }
 
+// syncDir opens the directory at path, fsyncs it, and closes it. Used by
+// WriteAtomic to make a rename durable on POSIX filesystems. Returns the
+// underlying error rather than panicking so callers can wrap it in a
+// FileSystemError. The Windows implementation is a no-op (NTFS does not
+// expose directory fsync; os.Rename is atomic on its own).
+func syncDir(path string) error {
+       dir, err := os.Open(path)
+       if err != nil {
+               return err
+       }
+       if syncErr := dir.Sync(); syncErr != nil {
+               _ = dir.Close()
+               return syncErr
+       }
+       return dir.Close()
+}
+
 func mustGetFileStat(path string) (*syscall.Stat_t, error) {
        fi, err := os.Stat(path)
        if err != nil {
diff --git a/pkg/fs/local_file_system_linux.go b/pkg/fs/local_file_system_linux.go
index c660b4915..1ace320ad 100644
--- a/pkg/fs/local_file_system_linux.go
+++ b/pkg/fs/local_file_system_linux.go
@@ -84,6 +84,23 @@ func syncFile(file *os.File) error {
        return nil
 }
 
+// syncDir opens the directory at path, fsyncs it, and closes it. Used by
+// WriteAtomic to make a rename durable on POSIX filesystems. Returns the
+// underlying error rather than panicking so callers can wrap it in a
+// FileSystemError. The Windows implementation is a no-op (NTFS does not
+// expose directory fsync; os.Rename is atomic on its own).
+func syncDir(path string) error {
+       dir, err := os.Open(path)
+       if err != nil {
+               return err
+       }
+       if syncErr := dir.Sync(); syncErr != nil {
+               _ = dir.Close()
+               return syncErr
+       }
+       return dir.Close()
+}
+
 func mustGetFileStat(path string) (*syscall.Stat_t, error) {
        fi, err := os.Stat(path)
        if err != nil {
diff --git a/pkg/fs/local_file_system_test.go b/pkg/fs/local_file_system_test.go
index b96dc9dc4..d97012da1 100644
--- a/pkg/fs/local_file_system_test.go
+++ b/pkg/fs/local_file_system_test.go
@@ -307,6 +307,68 @@ var _ = ginkgo.Describe("Local File System", func() {
                        _, err = os.Stat(fileName)
                        gomega.Expect(err).To(gomega.HaveOccurred())
                })
+
+               ginkgo.It("SeqWriter Close propagates sync error", func() {
+                       sw := file.SequentialWrite()
+                       // Force the underlying *os.File closed before sw.Close() runs so
+                       // SyncAndDropCache (or the bufio Flush) sees a bad fd. The fix
+                       // requires the resulting error to surface from sw.Close() instead
+                       // of being silently dropped.
+                       localFile, ok := file.(*LocalFile)
+                       gomega.Expect(ok).To(gomega.BeTrue())
+                       gomega.Expect(localFile.file.Close()).To(gomega.Succeed())
+                       closeErr := sw.Close()
+                       gomega.Expect(closeErr).To(gomega.HaveOccurred())
+               })
+
+               ginkgo.It("WriteAtomic leaves no .tmp on success", func() {
+                       target := filepath.Join(dirName, "metadata.json")
+                       n, err := fs.WriteAtomic([]byte(`{"v":1}`), target, 0o600)
+                       gomega.Expect(err).ToNot(gomega.HaveOccurred())
+                       gomega.Expect(n).To(gomega.Equal(7))
+                       final, readErr := os.ReadFile(target)
+                       gomega.Expect(readErr).ToNot(gomega.HaveOccurred())
+                       gomega.Expect(string(final)).To(gomega.Equal(`{"v":1}`))
+                       _, statErr := os.Stat(target + ".tmp")
+                       gomega.Expect(os.IsNotExist(statErr)).To(gomega.BeTrue())
+               })
+
+               ginkgo.It("WriteAtomic leaves only .tmp on a forged crash", func() {
+                       target := filepath.Join(dirName, "metadata.json")
+                       prev := testHookAfterTmpFsync
+                       testHookAfterTmpFsync = func() { panic("simulated crash") }
+                       defer func() {
+                               testHookAfterTmpFsync = prev
+                               recovered := recover()
+                               gomega.Expect(recovered).ToNot(gomega.BeNil(), "hook should have caused a panic")
+                               // After the forged crash, .tmp must remain (was fsynced) and
+                               // the final file must not exist (rename never ran).
+                               _, statTmp := os.Stat(target + ".tmp")
+                               gomega.Expect(statTmp).ToNot(gomega.HaveOccurred(), "tmp must remain after crash")
+                               _, statFinal := os.Stat(target)
+                               gomega.Expect(os.IsNotExist(statFinal)).To(gomega.BeTrue(), "final must not exist if rename never ran")
+                       }()
+                       _, _ = fs.WriteAtomic([]byte("data"), target, 0o600)
+               })
+
+               ginkgo.It("CleanupLeftoverTmp removes .tmp when the final file exists", func() {
+                       final := filepath.Join(dirName, "metadata.json")
+                       gomega.Expect(os.WriteFile(final, []byte("ok"), 0o600)).To(gomega.Succeed())
+                       gomega.Expect(os.WriteFile(final+".tmp", []byte("stale"), 0o600)).To(gomega.Succeed())
+                       CleanupLeftoverTmp(fs, dirName)
+                       _, statErr := os.Stat(final + ".tmp")
+                       gomega.Expect(os.IsNotExist(statErr)).To(gomega.BeTrue())
+                       _, statFinal := os.Stat(final)
+                       gomega.Expect(statFinal).ToNot(gomega.HaveOccurred())
+               })
+
+               ginkgo.It("CleanupLeftoverTmp leaves .tmp when no final exists", func() {
+                       final := filepath.Join(dirName, "metadata.json")
+                       gomega.Expect(os.WriteFile(final+".tmp", []byte("stranded"), 0o600)).To(gomega.Succeed())
+                       CleanupLeftoverTmp(fs, dirName)
+                       _, statErr := os.Stat(final + ".tmp")
+                       gomega.Expect(statErr).ToNot(gomega.HaveOccurred())
+               })
        })
 
        ginkgo.Context("Hard Link Operations", func() {
diff --git a/pkg/fs/local_file_system_testhook_test.go b/pkg/fs/local_file_system_testhook_test.go
new file mode 100644
index 000000000..8fe8cfff5
--- /dev/null
+++ b/pkg/fs/local_file_system_testhook_test.go
@@ -0,0 +1,29 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package fs
+
+// testHookAfterTmpFsync is overridden by tests to simulate a crash between
+// WriteAtomic's fsync(tmp) and rename. Because this file ends in _test.go,
+// it is compiled only by go test; production binaries never install the
+// indirection and see only the no-op invokeTestHookAfterTmpFsync defined in
+// local_file_system.go.
+var testHookAfterTmpFsync = func() {}
+
+func init() {
+       invokeTestHookAfterTmpFsync = func() { testHookAfterTmpFsync() }
+}
diff --git a/pkg/fs/local_file_system_windows.go b/pkg/fs/local_file_system_windows.go
index ad7a5131e..80290af28 100644
--- a/pkg/fs/local_file_system_windows.go
+++ b/pkg/fs/local_file_system_windows.go
@@ -66,6 +66,12 @@ func syncFile(_ *os.File) error {
        return nil
 }
 
+// syncDir is a no-op on Windows. NTFS does not expose directory fsync via
+// os.File.Sync the way ext4/xfs/apfs do; os.Rename on NTFS is atomic on its
+// own and the existing SyncPath has the same no-op pattern. Linux/Darwin
+// implementations open the dir and call file.Sync.
+func syncDir(_ string) error { return nil }
+
 func CompareINode(srcPath, destPath string) error {
        return nil
 }
