This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new 0f6837aec Snapshot/backup and data inspection no longer reopen idle-closed segments (#1146)
0f6837aec is described below

commit 0f6837aec79d1f7dfd2d2f08cf5c31a7b217e0c7
Author: mrproliu <[email protected]>
AuthorDate: Tue Jun 2 07:13:11 2026 +0800

    Snapshot/backup and data inspection no longer reopen idle-closed segments (#1146)
---
 CHANGES.md                                         |   1 +
 banyand/internal/storage/inspect_stats.go          | 143 +++++
 banyand/internal/storage/inspect_stats_test.go     |  96 +++
 banyand/internal/storage/segment.go                | 160 ++++-
 banyand/internal/storage/segment_test.go           |   4 +-
 banyand/internal/storage/snapshot.go               |  16 +
 .../storage/snapshot_closed_segment_test.go        | 701 +++++++++++++++++++++
 banyand/internal/storage/snapshot_test.go          |  30 +
 banyand/internal/storage/storage.go                |  12 +-
 banyand/internal/storage/tsdb.go                   |  63 +-
 banyand/measure/cache_benchmark_test.go            |   2 +-
 banyand/measure/metadata.go                        |  33 +-
 banyand/measure/metadata_internal_test.go          | 100 ++-
 banyand/measure/query.go                           |   2 +-
 banyand/measure/tstable.go                         |  11 +-
 banyand/stream/metadata.go                         |  36 +-
 banyand/stream/metadata_internal_test.go           | 100 ++-
 banyand/stream/query.go                            |   2 +-
 banyand/stream/tstable.go                          |  11 +-
 banyand/trace/metadata.go                          |  36 +-
 banyand/trace/metadata_internal_test.go            | 100 ++-
 banyand/trace/query.go                             |   2 +-
 banyand/trace/tstable.go                           |  11 +-
 pkg/fs/local_file_system.go                        |   4 +
 pkg/index/inverted/inverted.go                     |  26 +
 25 files changed, 1541 insertions(+), 161 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 5151671ff..87296f7cb 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -58,6 +58,7 @@ Release Notes.
   - Add end-to-end observability for liaison internal queue pipelines with 
per-topic metrics for queue_sub and queue_pub, along with Grafana panels and 
troubleshooting docs.
   - Introduce measure migration tool.
 - Support displaying a measure's indexed tags in the dump tool, resolved per 
part so peak memory is bounded by the part rather than a segment-wide series 
map.
+- Snapshot/backup and data inspection no longer reopen idle-closed segments, avoiding cold-segment nil-index panics and index lock-file churn.
 
 ### Bug Fixes
 
diff --git a/banyand/internal/storage/inspect_stats.go 
b/banyand/internal/storage/inspect_stats.go
new file mode 100644
index 000000000..33b1a5ec2
--- /dev/null
+++ b/banyand/internal/storage/inspect_stats.go
@@ -0,0 +1,143 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package storage
+
+import (
+       "encoding/json"
+       "os"
+       "path/filepath"
+       "strconv"
+       "strings"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+// partDiskMetadataFilename is the name of the per-part metadata file, shared
+// by stream/measure/trace parts.
+const partDiskMetadataFilename = "metadata.json"
+
+// snapshotFileSuffix is the suffix of the per-shard snapshot manifest, a JSON
+// array of part directory names (decoded by ReadSnapshotPartNames). The
+// manifest's base name is its epoch written in hexadecimal.
+const snapshotFileSuffix = ".snp"
+
+// partDiskMetadata captures the fields shared by every data type's part
+// metadata.json (stream/measure/trace use the same JSON keys for these).
+// Only these two keys are decoded; any other keys in the file are ignored.
+// readShardInfoFromDisk sums CompressedSizeBytes into ShardInfo.DataSizeBytes
+// and TotalCount into ShardInfo.DataCount.
+type partDiskMetadata struct {
+       CompressedSizeBytes uint64 `json:"compressedSizeBytes"`
+       TotalCount          uint64 `json:"totalCount"`
+}
+
+// CollectClosedShardInfo reads per-shard stats for a closed segment directly
+// from disk, WITHOUT opening any table or bluge index (which would reacquire
+// the index's exclusive lock). Sub-index stats (inverted index / sidx) are
+// reported empty; callers populate those only for open segments.
+//
+// Every directory under segLocation named "<shardPathPrefix>-<id>" (id being a
+// base-10 uint32) yields one ShardInfo, in lexical directory-name order
+// (os.ReadDir sorts its entries). A shard without a usable manifest still
+// yields a zero-valued entry. The second result is the sum of the shards'
+// DataSizeBytes, i.e. the compressedSizeBytes recorded in the referenced
+// parts' metadata.json. It is not a walk of the actual on-disk footprint.
+// An unreadable segLocation yields (nil, 0).
+func CollectClosedShardInfo(segLocation string) ([]*databasev1.ShardInfo, 
int64) {
+       shardPrefix := shardPathPrefix + "-"
+       // Use os directly (not the fs abstraction, whose ReadDir panics on a
+       // missing directory): a closed segment may be removed by retention
+       // concurrently, so missing/unreadable paths must be skipped, not fatal.
+       // Segment data is always on local disk.
+       entries, err := os.ReadDir(segLocation)
+       if err != nil {
+               return nil, 0
+       }
+       var (
+               infos     []*databasev1.ShardInfo
+               totalSize int64
+       )
+       for _, entry := range entries {
+               // Skip plain files and non-shard directories such as the
+               // segment-level series index.
+               if !entry.IsDir() || !strings.HasPrefix(entry.Name(), 
shardPrefix) {
+                       continue
+               }
+               // bitSize 32 guarantees the uint32 conversion below cannot
+               // truncate; names with a non-numeric suffix are skipped.
+               shardID, parseErr := 
strconv.ParseUint(strings.TrimPrefix(entry.Name(), shardPrefix), 10, 32)
+               if parseErr != nil {
+                       continue
+               }
+               info := readShardInfoFromDisk(uint32(shardID), 
filepath.Join(segLocation, entry.Name()))
+               infos = append(infos, info)
+               totalSize += info.DataSizeBytes
+       }
+       return infos, totalSize
+}
+
+// readShardInfoFromDisk aggregates one shard's part statistics into a
+// ShardInfo from the parts referenced by its latest snapshot manifest. That
+// set is intended to match what the in-memory snapshot exposes for an open
+// segment, so orphan parts not yet garbage-collected are excluded.
+//
+// It never fails. A shard with no usable manifest reports zero counts. A
+// listed part whose metadata.json is missing or malformed is skipped and is
+// not counted in PartCount. InvertedIndexInfo and SidxInfo are always non-nil
+// but empty, so callers may dereference them.
+func readShardInfoFromDisk(shardID uint32, shardDir string) 
*databasev1.ShardInfo {
+       info := &databasev1.ShardInfo{
+               ShardId:           shardID,
+               InvertedIndexInfo: &databasev1.InvertedIndexInfo{},
+               SidxInfo:          &databasev1.SIDXInfo{},
+       }
+       partNames, ok := latestSnapshotParts(shardDir)
+       if !ok {
+               return info
+       }
+       for _, partName := range partNames {
+               // Best-effort, like the rest of the closed-segment path:
+               // unreadable or undecodable metadata is skipped, not reported.
+               raw, err := os.ReadFile(filepath.Join(shardDir, partName, 
partDiskMetadataFilename))
+               if err != nil {
+                       continue
+               }
+               var pm partDiskMetadata
+               if json.Unmarshal(raw, &pm) != nil {
+                       continue
+               }
+               info.DataCount += int64(pm.TotalCount)
+               info.DataSizeBytes += int64(pm.CompressedSizeBytes)
+               info.PartCount++
+       }
+       // Every part counted above was read from disk, so the file-part count
+       // equals the part count.
+       info.FilePartCount = info.PartCount
+       return info
+}
+
+// latestSnapshotParts returns the part directory names referenced by the
+// highest-epoch ".snp" manifest in shardDir. A manifest's epoch is its base
+// name parsed as hexadecimal; directories and files whose name does not parse
+// are ignored. It reports false in three cases:
+//   - shardDir is unreadable;
+//   - shardDir holds no manifest;
+//   - the newest manifest cannot be read or decoded.
+// In the last case it does not fall back to an older manifest.
+func latestSnapshotParts(shardDir string) ([]string, bool) {
+       entries, err := os.ReadDir(shardDir)
+       if err != nil {
+               return nil, false
+       }
+       var (
+               latestName  string
+               latestEpoch uint64
+       )
+       for _, entry := range entries {
+               name := entry.Name()
+               if entry.IsDir() || !strings.HasSuffix(name, 
snapshotFileSuffix) {
+                       continue
+               }
+               epoch, parseErr := strconv.ParseUint(strings.TrimSuffix(name, 
snapshotFileSuffix), 16, 64)
+               if parseErr != nil {
+                       continue
+               }
+               // A valid manifest name is never empty, so latestName doubles as
+               // the "found one" flag (which also lets epoch 0 be selected).
+               if latestName == "" || epoch > latestEpoch {
+                       latestEpoch, latestName = epoch, name
+               }
+       }
+       if latestName == "" {
+               return nil, false
+       }
+       // ReadSnapshotPartNames uses lfs.Read, which (unlike lfs.ReadDir)
+       // returns an error rather than panicking on a missing file, so it is
+       // safe on the concurrently-deletable closed path.
+       partNames, err := ReadSnapshotPartNames(lfs, filepath.Join(shardDir, 
latestName))
+       if err != nil {
+               return nil, false
+       }
+       return partNames, true
+}
diff --git a/banyand/internal/storage/inspect_stats_test.go 
b/banyand/internal/storage/inspect_stats_test.go
new file mode 100644
index 000000000..9216f5ec9
--- /dev/null
+++ b/banyand/internal/storage/inspect_stats_test.go
@@ -0,0 +1,96 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package storage
+
+import (
+       "encoding/json"
+       "fmt"
+       "os"
+       "path/filepath"
+       "testing"
+
+       "github.com/stretchr/testify/require"
+
+       databasev1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+// TestCollectClosedShardInfo verifies the following for the read-only path
+// used to inspect a closed segment without reopening it:
+//   - per-shard part statistics are read from the parts referenced by the
+//     latest snapshot manifest;
+//   - orphan part directories not in the manifest are excluded;
+//   - the highest-epoch manifest wins;
+//   - a shard without a manifest reports zero;
+//   - non-shard directories are ignored.
+func TestCollectClosedShardInfo(t *testing.T) {
+       dir := t.TempDir()
+
+       // writePart creates <dir>/<shard>/<partID>/metadata.json carrying the
+       // two fields CollectClosedShardInfo aggregates.
+       writePart := func(shard, partID string, totalCount, compressed uint64) {
+               t.Helper()
+               partDir := filepath.Join(dir, shard, partID)
+               require.NoError(t, os.MkdirAll(partDir, 0o755))
+               meta := 
fmt.Sprintf(`{"compressedSizeBytes":%d,"totalCount":%d}`, compressed, 
totalCount)
+               require.NoError(t, os.WriteFile(filepath.Join(partDir, 
partDiskMetadataFilename), []byte(meta), 0o600))
+       }
+       // writeManifest writes a "<epoch as 16 hex digits>.snp" manifest in an
+       // existing shard directory, listing partIDs as a JSON array.
+       writeManifest := func(shard string, epoch uint64, partIDs ...string) {
+               t.Helper()
+               data, err := json.Marshal(partIDs)
+               require.NoError(t, err)
+               name := fmt.Sprintf("%016x%s", epoch, snapshotFileSuffix)
+               require.NoError(t, os.WriteFile(filepath.Join(dir, shard, 
name), data, 0o600))
+       }
+
+       // shard-0: two current parts + one orphan part (on disk but not in the
+       // manifest, must be excluded) + a series index dir (no metadata.json).
+       writePart("shard-0", "0000000000000001", 10, 100)
+       writePart("shard-0", "0000000000000002", 5, 50)
+       writePart("shard-0", "0000000000000099", 999, 9999) // orphan
+       require.NoError(t, os.MkdirAll(filepath.Join(dir, "shard-0", 
seriesIndexDirName), 0o755))
+       writeManifest("shard-0", 0x10, "0000000000000001", "0000000000000002")
+
+       // shard-1: a newer manifest supersedes an older one referencing a
+       // stale part.
+       writePart("shard-1", "0000000000000003", 7, 70)
+       writePart("shard-1", "0000000000000004", 4, 40)
+       writeManifest("shard-1", 0x05, "0000000000000004") // older epoch
+       writeManifest("shard-1", 0x10, "0000000000000003") // newer epoch wins
+
+       // shard-2: a part on disk but no manifest -> reports zero current data.
+       writePart("shard-2", "0000000000000005", 3, 30)
+
+       // a non-shard directory must be ignored.
+       require.NoError(t, os.MkdirAll(filepath.Join(dir, "not-a-shard"), 
0o755))
+
+       infos, totalSize := CollectClosedShardInfo(dir)
+       byShard := map[uint32]*databasev1.ShardInfo{}
+       for _, s := range infos {
+               byShard[s.ShardId] = s
+       }
+       require.Len(t, byShard, 3)
+
+       require.Equal(t, int64(15), byShard[0].DataCount, "orphan part must be 
excluded")
+       require.Equal(t, int64(150), byShard[0].DataSizeBytes)
+       require.Equal(t, int64(2), byShard[0].PartCount)
+       require.Equal(t, int64(2), byShard[0].FilePartCount)
+
+       require.Equal(t, int64(7), byShard[1].DataCount, "latest manifest must 
win")
+       require.Equal(t, int64(70), byShard[1].DataSizeBytes)
+       require.Equal(t, int64(1), byShard[1].PartCount)
+
+       require.Equal(t, int64(0), byShard[2].DataCount, "shard without 
manifest reports zero")
+       require.Equal(t, int64(0), byShard[2].PartCount)
+
+       // 150 (shard-0) + 70 (shard-1) + 0 (shard-2). The orphan part, the
+       // stale part and the unmanifested part contribute nothing.
+       require.Equal(t, int64(220), totalSize, "total size sums shard data 
sizes")
+}
diff --git a/banyand/internal/storage/segment.go 
b/banyand/internal/storage/segment.go
index 711bb62ac..3ca282f34 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -315,6 +315,132 @@ func (s *segment[T, O]) delete() {
        s.DecRef()
 }
 
+// Location returns the on-disk directory of the segment. The series index
+// directory and the segment metadata file live directly under it, and a
+// closed-segment snapshot hard-links this directory as a whole. The field is
+// read without taking s.mu.
+func (s *segment[T, O]) Location() string {
+       return s.location
+}
+
+// SeriesIndexStats returns the series index document count and on-disk size,
+// in that order.
+//
+// An open segment is reported from its live index while s.mu is read-held.
+// A closed segment is read from disk read-only, via inverted.ReadOnlyDocCount
+// and calculatePathSize, so inspecting a cold segment never reopens its
+// writable index. The closed path is best-effort: failures are logged at debug
+// level or ignored, never returned.
+func (s *segment[T, O]) SeriesIndexStats() (int64, int64) {
+       s.mu.RLock()
+       if s.index != nil {
+               // Hold the read lock while reading live stats so a concurrent
+               // performCleanup cannot close the index out from under us.
+               defer s.mu.RUnlock()
+               return s.index.Stats()
+       }
+       s.mu.RUnlock()
+       // Closed: release the lock (so a concurrent reopen is not blocked) and
+       // read read-only. The trade-off is that a reopen racing with this read
+       // may make the figures slightly stale. Best-effort: a
+       // missing/unflushed/concurrently-removed index is expected to report 0.
+       // NOTE(review): this assumes ReadOnlyDocCount and calculatePathSize
+       // return 0 on error -- confirm.
+       indexPath := filepath.Join(s.location, seriesIndexDirName)
+       count, err := inverted.ReadOnlyDocCount(indexPath)
+       if err != nil {
+               s.l.Debug().Err(err).Str("path", indexPath).Msg("closed series 
index has no readable doc count")
+       }
+       // The size error is deliberately ignored; size is informational only.
+       size, _ := calculatePathSize(indexPath)
+       return count, int64(size)
+}
+
+// snapshotInto writes a point-in-time snapshot of this segment under
+// dst/<base name of the segment location>.
+//
+// It NEVER reopens a closed segment: reopening an idle-closed cold segment
+// is the root cause of the nil-index panic and the bluge "exclusive lock"
+// churn. The two cases are handled as follows:
+//   - Closed (quiescent): hard-linked directly from its on-disk files while
+//     s.mu is held.
+//   - Open: snapshotted through its live series index and shard tables while
+//     a reference is held to keep it open.
+//
+// It returns (false, nil) when the segment is marked for deletion, so the
+// caller can skip it. It returns (true, nil) on success and (false, err) on
+// failure.
+func (s *segment[T, O]) snapshotInto(dst string) (bool, error) {
+       s.mu.Lock()
+       if atomic.LoadUint32(&s.mustBeDeleted) != 0 {
+               s.mu.Unlock()
+               return false, nil
+       }
+       idx := s.index
+       if idx != nil {
+               // Open: the bump only pins it (never reopens, unlike incRef).
+               // Release the mutex before the slow backup so other callers are
+               // not blocked.
+               // NOTE(review): this relies on refCount > 0 whenever s.index !=
+               // nil, so the plain increment cannot revive a closing segment --
+               // confirm.
+               atomic.AddInt32(&s.refCount, 1)
+               s.mu.Unlock()
+               defer s.DecRef()
+               return s.snapshotOpen(dst, idx)
+       }
+       // Closed and quiescent: hold s.mu for the fast hard-link so a
+       // concurrent reopen cannot write into the directory mid-copy.
+       defer s.mu.Unlock()
+       return s.snapshotClosed(dst)
+}
+
+// snapshotClosed hard-links the whole quiescent segment directory into
+// dst/<base name of s.location>, keeping only paths accepted by
+// includeInClosedSnapshot. Must be called with s.mu held and s.index == nil.
+// Returns (true, nil) on success and (false, wrapped error) if the hard-link
+// fails.
+func (s *segment[T, O]) snapshotClosed(dst string) (bool, error) {
+       segDir := filepath.Base(s.location)
+       segPath := filepath.Join(dst, segDir)
+       if err := s.lfs.CreateHardLink(s.location, segPath, 
includeInClosedSnapshot); err != nil {
+               return false, errors.Wrapf(err, "failed to hard-link closed 
segment %s", segDir)
+       }
+       return true, nil
+}
+
+// includeInClosedSnapshot reports whether a file or directory under a closed
+// segment should be hard-linked into a snapshot. It excludes the transient
+// artifacts that the open-path snapshot never copies:
+//   - the bluge lock file;
+//   - the failed-parts directory;
+//   - the external-segment temp directory;
+//   - partial ".tmp" atomic-write files.
+// Everything else is kept, including ".snp" manifests.
+//
+// Matching uses only the base name of p. No manifest is consulted, so orphan
+// part directories still present on disk are linked too. This differs from
+// CollectClosedShardInfo, which counts only manifest-listed parts.
+func includeInClosedSnapshot(p string) bool {
+       switch base := filepath.Base(p); {
+       case base == inverted.LockFilename, base == FailedPartsDirName, base == 
inverted.ExternalSegmentTempDirName:
+               return false
+       default:
+               // Any name whose extension is ".tmp" is treated as a partial write.
+               return filepath.Ext(base) != ".tmp"
+       }
+}
+
+// snapshotOpen snapshots an open segment through its live state into
+// dst/<base name of s.location>. It hard-links the segment metadata file, has
+// the series index (idx) write a file snapshot, then has each shard table do
+// the same. idx is the series index captured under s.mu. The caller holds a
+// reference, so the segment (and idx) stays open for the duration.
+//
+// A shard with no current snapshot (ErrNoCurrentSnapshot) is skipped with a
+// debug log. Its directory has already been created and is left behind empty.
+// Any other failure returns (false, wrapped error) and leaves whatever was
+// already written under dst in place.
+func (s *segment[T, O]) snapshotOpen(dst string, idx *seriesIndex) (bool, 
error) {
+       segDir := filepath.Base(s.location)
+       segPath := filepath.Join(dst, segDir)
+       s.lfs.MkdirIfNotExist(segPath, DirPerm)
+
+       metadataSrc := filepath.Join(s.location, metadataFilename)
+       metadataDest := filepath.Join(segPath, metadataFilename)
+       if err := s.lfs.CreateHardLink(metadataSrc, metadataDest, nil); err != 
nil {
+               return false, errors.Wrapf(err, "failed to snapshot metadata 
for segment %s", segDir)
+       }
+
+       indexPath := filepath.Join(segPath, seriesIndexDirName)
+       s.lfs.MkdirIfNotExist(indexPath, DirPerm)
+       if err := idx.store.TakeFileSnapshot(indexPath); err != nil {
+               return false, errors.Wrapf(err, "failed to snapshot index for 
segment %s", segDir)
+       }
+
+       // sLst is nil when no shard list has been stored; there is then nothing
+       // more to copy.
+       sLst := s.sLst.Load()
+       if sLst != nil {
+               for _, shard := range *sLst {
+                       shardDir := filepath.Base(shard.location)
+                       shardPath := filepath.Join(segPath, shardDir)
+                       s.lfs.MkdirIfNotExist(shardPath, DirPerm)
+                       if _, err := shard.table.TakeFileSnapshot(shardPath); 
err != nil {
+                               // An empty shard does not fail the whole snapshot.
+                               if errors.Is(err, ErrNoCurrentSnapshot) {
+                                       s.l.Debug().Str("shard", 
shardDir).Str("segment", segDir).
+                                               Msg("skipping empty shard 
snapshot")
+                                       continue
+                               }
+                               return false, errors.Wrapf(err, "failed to 
snapshot shard %s in segment %s", shardDir, segDir)
+                       }
+               }
+       }
+       return true, nil
+}
+
 func (s *segment[T, O]) CreateTSTableIfNotExist(id common.ShardID) (T, error) {
        if s, ok := s.getShard(id); ok {
                return s.table, nil
@@ -428,7 +554,7 @@ func (sc *segmentController[T, O]) 
updateOptions(resourceOpts *commonv1.Resource
        sc.opts.ShardNum = resourceOpts.ShardNum
 }
 
-func (sc *segmentController[T, O]) selectSegments(timeRange 
timestamp.TimeRange) (tt []Segment[T, O], err error) {
+func (sc *segmentController[T, O]) selectSegments(timeRange 
timestamp.TimeRange, reopenClosed bool) (tt []Segment[T, O], err error) {
        sc.RLock()
        defer sc.RUnlock()
        last := len(sc.lst) - 1
@@ -440,10 +566,24 @@ func (sc *segmentController[T, O]) 
selectSegments(timeRange timestamp.TimeRange)
                        break
                }
                if s.Overlapping(timeRange) {
-                       if err = s.incRef(ctx); err != nil {
-                               return nil, err
+                       if reopenClosed {
+                               // Real read: reopen if closed and mark as 
accessed.
+                               if err = s.incRef(ctx); err != nil {
+                                       return nil, err
+                               }
+                               s.lastAccessed.Store(now)
+                       } else {
+                               // Stats peek: pin only if already open, never 
reopen.
+                               for {
+                                       current := atomic.LoadInt32(&s.refCount)
+                                       if current <= 0 {
+                                               break
+                                       }
+                                       if 
atomic.CompareAndSwapInt32(&s.refCount, current, current+1) {
+                                               break
+                                       }
+                               }
                        }
-                       s.lastAccessed.Store(now)
                        tt = append(tt, s)
                }
        }
@@ -482,6 +622,18 @@ func (sc *segmentController[T, O]) segments(ctx 
context.Context, reopenClosed bo
        return r, nil
 }
 
+// copySegments returns a snapshot of the current segment list WITHOUT touching
+// reference counts or reopening anything. Callers (e.g. TakeFileSnapshot) that
+// must not force a reopen decide per segment, under the segment's own lock,
+// whether it is open or closed.
+//
+// The copy is shallow: the segments themselves are shared and unpinned, so
+// they may be closed or marked for deletion concurrently. Callers must
+// re-check that state, as snapshotInto does under s.mu. sc's read lock is held
+// only while copying.
+func (sc *segmentController[T, O]) copySegments() []*segment[T, O] {
+       sc.RLock()
+       defer sc.RUnlock()
+       r := make([]*segment[T, O], len(sc.lst))
+       copy(r, sc.lst)
+       return r
+}
+
 func (sc *segmentController[T, O]) closeIdleSegments() int {
        maxIdleTime := sc.idleTimeout
 
diff --git a/banyand/internal/storage/segment_test.go 
b/banyand/internal/storage/segment_test.go
index 41fcfccf5..20175520b 100644
--- a/banyand/internal/storage/segment_test.go
+++ b/banyand/internal/storage/segment_test.go
@@ -366,7 +366,7 @@ func TestCloseIdleAndSelectSegments(t *testing.T) {
 
        // Now select segments using the entire time range
        timeRange := timestamp.NewInclusiveTimeRange(day1, 
day3.Add(24*time.Hour))
-       selectedSegments, err := sc.selectSegments(timeRange)
+       selectedSegments, err := sc.selectSegments(timeRange, true)
        require.NoError(t, err)
 
        // Should have selected all 3 segments
@@ -1151,7 +1151,7 @@ func TestSegment_ConcurrentReopenAndClose_NoPanic(t 
*testing.T) {
                        }()
                        <-start
                        for c := 0; c < cycles; c++ {
-                               segs, selErr := sc.selectSegments(timeRange)
+                               segs, selErr := sc.selectSegments(timeRange, 
true)
                                if selErr != nil {
                                        t.Errorf("goroutine-%d cycle %d 
selectSegments: %v", id, c, selErr)
                                        return
diff --git a/banyand/internal/storage/snapshot.go 
b/banyand/internal/storage/snapshot.go
index e5cdd2c65..d3eff63be 100644
--- a/banyand/internal/storage/snapshot.go
+++ b/banyand/internal/storage/snapshot.go
@@ -18,6 +18,7 @@
 package storage
 
 import (
+       "encoding/json"
        "fmt"
        "os"
        "path/filepath"
@@ -28,6 +29,21 @@ import (
        "github.com/apache/skywalking-banyandb/pkg/logger"
 )
 
+// ReadSnapshotPartNames reads the part directory names recorded in the ".snp"
+// snapshot manifest (a JSON string array) at path, reading it through
+// fileSystem. It is intended as the single reader for the part-list manifest
+// written by every data type (stream/measure/trace).
+//
+// Read and decode failures are returned wrapped with %w, so callers can
+// inspect the cause with errors.Is/errors.As. A manifest containing JSON null
+// decodes to a nil slice with a nil error.
+func ReadSnapshotPartNames(fileSystem fs.FileSystem, path string) ([]string, 
error) {
+       data, err := fileSystem.Read(path)
+       if err != nil {
+               return nil, fmt.Errorf("cannot read %s: %w", path, err)
+       }
+       var partNames []string
+       if unmarshalErr := json.Unmarshal(data, &partNames); unmarshalErr != 
nil {
+               return nil, fmt.Errorf("cannot parse %s: %w", path, 
unmarshalErr)
+       }
+       return partNames, nil
+}
+
 // SnapshotTimeFormat is the timestamp snapshot directory prefix.
 const SnapshotTimeFormat = "20060102150405"
 
diff --git a/banyand/internal/storage/snapshot_closed_segment_test.go 
b/banyand/internal/storage/snapshot_closed_segment_test.go
new file mode 100644
index 000000000..11b6be406
--- /dev/null
+++ b/banyand/internal/storage/snapshot_closed_segment_test.go
@@ -0,0 +1,701 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package storage
+
+import (
+       "context"
+       "fmt"
+       "os"
+       "path/filepath"
+       "strconv"
+       "sync"
+       "sync/atomic"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/require"
+
+       modelv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+       "github.com/apache/skywalking-banyandb/pkg/index"
+       "github.com/apache/skywalking-banyandb/pkg/index/inverted"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+// buildTestSeriesDocs returns n series-index documents, one per synthetic
+// entity "entity_<i>", each keyed by its marshaled series ID so doc IDs differ.
+func buildTestSeriesDocs(t *testing.T, n int) index.Documents {
+       t.Helper()
+       var docs index.Documents
+       for i := 0; i < n; i++ {
+               entity := &modelv1.TagValue{
+                       Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value: fmt.Sprintf("entity_%d", i)}},
+               }
+               var series pbv1.Series
+               series.Subject = "series_index_stats"
+               series.EntityValues = []*modelv1.TagValue{entity}
+               require.NoError(t, series.Marshal())
+               require.Positive(t, series.ID)
+               doc := index.Document{
+                       DocID:        uint64(series.ID),
+                       EntityValues: append([]byte(nil), series.Buffer...),
+               }
+               docs = append(docs, doc)
+       }
+       return docs
+}
+
+// snapshotTestBase is the instant (local midnight, 1 May 2024) that the
+// snapshot-test TSDB factories pin their mock clock to.
+var snapshotTestBase = time.Date(2024, time.May, 1, 0, 0, 0, 0, time.Local)
+
+// snapshotTestDir initializes dev-mode logging at the configured test level and
+// returns a fresh temporary directory whose cleanup is registered via t.Cleanup.
+func snapshotTestDir(t *testing.T) string {
+       t.Helper()
+       logCfg := logger.Logging{Env: "dev", Level: flags.LogLevel}
+       require.NoError(t, logger.Init(logCfg))
+       space, cleanup := test.Space(require.New(t))
+       t.Cleanup(cleanup)
+       return space
+}
+
+// openSnapshotTSDB opens a single-shard TSDB under dir with 1-day segments, a
+// 1-hour segment idle timeout and a ttlDays-day TTL. Its clock is a mock pinned
+// at snapshotTestBase and no segment is created, so a test can drive the real
+// controller flow (CreateSegmentIfNotExist + IndexDB().Insert +
+// closeIdleSegments). It returns the database and its segment controller.
+func openSnapshotTSDB(t *testing.T, dir string, ttlDays int) (TSDB[*MockTSTable, any], *segmentController[*MockTSTable, any]) {
+       t.Helper()
+       clock := timestamp.NewMockClock()
+       clock.Set(snapshotTestBase)
+       db, err := OpenTSDB(
+               timestamp.SetClock(context.Background(), clock),
+               TSDBOpts[*MockTSTable, any]{
+                       Location:           dir,
+                       SegmentInterval:    IntervalRule{Unit: DAY, Num: 1},
+                       TTL:                IntervalRule{Unit: DAY, Num: ttlDays},
+                       ShardNum:           1,
+                       TSTableCreator:     MockTSTableCreator,
+                       SegmentIdleTimeout: time.Hour,
+               },
+               NewServiceCache(),
+               group,
+       )
+       require.NoError(t, err)
+       impl := db.(*database[*MockTSTable, any])
+       return db, impl.segmentController
+}
+
+// openSnapshotTestTSDB opens a 3-day-TTL snapshot-test TSDB, creates the single
+// day segment covering snapshotTestBase, and returns the db, its controller and
+// that (still open) segment.
+func openSnapshotTestTSDB(t *testing.T, dir string) (TSDB[*MockTSTable, any], *segmentController[*MockTSTable, any], *segment[*MockTSTable, any]) {
+       t.Helper()
+       db, ctrl := openSnapshotTSDB(t, dir, 3)
+       created, err := db.CreateSegmentIfNotExist(snapshotTestBase)
+       require.NoError(t, err)
+       // Drop the creation reference so the segment sits at its open baseline.
+       created.DecRef()
+       require.Len(t, ctrl.lst, 1)
+       return db, ctrl, ctrl.lst[0]
+}
+
+// TestTakeFileSnapshot_ClosedSegmentIsNotReopened is the core backup
+// guarantee: snapshotting an idle-closed segment must NOT reopen it (no
+// OpenWriter, no exclusive-lock churn, no nil-index panic). The closed segment
+// is hard-linked from its quiescent on-disk files instead.
+//
+// "Did not reopen" is asserted by checking the segment stays closed
+// (index == nil) and unreferenced (refCount == 0) after the snapshot.
+func TestTakeFileSnapshot_ClosedSegmentIsNotReopened(t *testing.T) {
+       dir := snapshotTestDir(t)
+
+       tsdb, sc, seg := openSnapshotTestTSDB(t, dir)
+       defer func() { require.NoError(t, tsdb.Close()) }()
+
+       // Drive the segment into the idle-closed state every cold segment 
lives in.
+       seg.lastAccessed.Store(time.Now().Add(-2 * time.Hour).UnixNano())
+       require.Equal(t, 1, sc.closeIdleSegments())
+       require.Nil(t, seg.index, "precondition: segment must be idle-closed")
+       require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount))
+
+       snapshotDir := filepath.Join(dir, "snapshot")
+       created, err := tsdb.TakeFileSnapshot(snapshotDir)
+       require.NoError(t, err, "snapshot of a closed segment must succeed")
+       require.True(t, created)
+
+       // The crux: the closed segment was NOT reopened.
+       require.Nil(t, seg.index, "snapshot must not reopen a closed segment")
+       require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount), "snapshot 
must not leave a dangling reference")
+
+       // And the series index was carried into the snapshot via hard-link.
+       segDir := filepath.Join(snapshotDir, filepath.Base(seg.location))
+       require.DirExists(t, segDir)
+       require.DirExists(t, filepath.Join(segDir, seriesIndexDirName))
+}
+
+// TestTakeFileSnapshot_ClosedSegmentExcludesTransientArtifacts checks that the
+// closed-segment hard-link path carries current parts and the series index into
+// the snapshot, but leaves out transient or non-current artifacts: the
+// failed-parts directory, .tmp files, the external-segment temp directory and
+// the bluge lock file. This matches what the open path produces.
+func TestTakeFileSnapshot_ClosedSegmentExcludesTransientArtifacts(t *testing.T) {
+       dir := snapshotTestDir(t)
+       tsdb, sc, seg := openSnapshotTestTSDB(t, dir)
+       defer func() { require.NoError(t, tsdb.Close()) }()
+
+       seg.lastAccessed.Store(time.Now().Add(-2 * time.Hour).UnixNano())
+       require.Equal(t, 1, sc.closeIdleSegments())
+       require.Nil(t, seg.index)
+
+       // Populate the closed segment on disk: one current part that must survive,
+       // plus artifacts that must not be carried into the snapshot.
+       segLoc := seg.location
+       for _, rel := range []string{
+               filepath.Join("shard-0", "0000000000000001", "metadata.json"),
+               filepath.Join("shard-0", FailedPartsDirName, "junk.bin"),
+               filepath.Join("shard-0", "stale.tmp"),
+               filepath.Join(seriesIndexDirName, inverted.ExternalSegmentTempDirName, "t.bin"),
+               filepath.Join(seriesIndexDirName, inverted.LockFilename),
+       } {
+               target := filepath.Join(segLoc, rel)
+               require.NoError(t, os.MkdirAll(filepath.Dir(target), DirPerm))
+               require.NoError(t, os.WriteFile(target, []byte("x"), FilePerm))
+       }
+
+       snapshotDir := filepath.Join(dir, "snapshot")
+       created, err := tsdb.TakeFileSnapshot(snapshotDir)
+       require.NoError(t, err)
+       require.True(t, created)
+
+       segSnap := filepath.Join(snapshotDir, filepath.Base(segLoc))
+       shardSnap := filepath.Join(segSnap, "shard-0")
+       indexSnap := filepath.Join(segSnap, seriesIndexDirName)
+       require.FileExists(t, filepath.Join(shardSnap, "0000000000000001", "metadata.json"), "current part must be copied")
+       require.DirExists(t, indexSnap, "series index must be copied")
+       require.NoDirExists(t, filepath.Join(shardSnap, FailedPartsDirName), "failed-parts must be excluded")
+       require.NoFileExists(t, filepath.Join(shardSnap, "stale.tmp"), ".tmp must be excluded")
+       require.NoDirExists(t, filepath.Join(indexSnap, inverted.ExternalSegmentTempDirName), "external-segment temp must be excluded")
+       require.NoFileExists(t, filepath.Join(indexSnap, inverted.LockFilename), "bluge lock file must be excluded")
+}
+
+// TestSeriesIndexStats_ClosedSegmentIsNotReopened checks the inspection
+// primitive: the series index stats of a closed segment are read from disk in
+// read-only mode, without reopening the writable index.
+func TestSeriesIndexStats_ClosedSegmentIsNotReopened(t *testing.T) {
+       const seriesCount = 10
+
+       dir := snapshotTestDir(t)
+       tsdb, sc, seg := openSnapshotTestTSDB(t, dir)
+       defer func() { require.NoError(t, tsdb.Close()) }()
+
+       // Seed the open segment's index with a known number of series, then let it
+       // go idle-closed (performCleanup flushes the index to disk on close).
+       require.NoError(t, seg.IndexDB().Insert(buildTestSeriesDocs(t, seriesCount)))
+       seg.lastAccessed.Store(time.Now().Add(-2 * time.Hour).UnixNano())
+       require.Equal(t, 1, sc.closeIdleSegments())
+       require.Nil(t, seg.index, "precondition: segment is idle-closed")
+
+       docCount, diskSize := seg.SeriesIndexStats()
+       require.Equal(t, int64(seriesCount), docCount, "closed series index doc count must be read from disk read-only")
+       require.Positive(t, diskSize, "closed series index on-disk size must be reported")
+       require.Nil(t, seg.index, "reading series index stats must not reopen a closed segment")
+       require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount))
+}
+
+// TestTakeFileSnapshot_OpenSegmentUsesLiveState verifies the other branch: an
+// open segment is snapshotted through its live state and is not closed by the
+// snapshot (the held reference keeps it open).
+func TestTakeFileSnapshot_OpenSegmentUsesLiveState(t *testing.T) {
+       dir := snapshotTestDir(t)
+
+       tsdb, _, seg := openSnapshotTestTSDB(t, dir)
+       defer func() { require.NoError(t, tsdb.Close()) }()
+
+       require.NotNil(t, seg.index, "precondition: segment is open")
+
+       snapshotDir := filepath.Join(dir, "snapshot")
+       created, err := tsdb.TakeFileSnapshot(snapshotDir)
+       require.NoError(t, err)
+       require.True(t, created)
+
+       require.NotNil(t, seg.index, "open segment must stay open after 
snapshot")
+       segDir := filepath.Join(snapshotDir, filepath.Base(seg.location))
+       require.DirExists(t, filepath.Join(segDir, seriesIndexDirName))
+}
+
+// TestTakeFileSnapshot_ConcurrentWithReclaim_ClosedPath_NoPanic runs the
+// closed-segment hard-link snapshot path while an idle reclaimer loops
+// concurrently. Because the snapshot never reopens the closed segment, the
+// reclaimer cannot take a reference out from under the snapshot, so the
+// original nil-index panic cannot occur. Run with -race.
+//
+// NOTE: the broader "query-driven reopen racing the idle reclaimer" scenario
+// (and the open-segment reclaim race) belongs to the segment reference-counting
+// model and is intentionally NOT covered here.
+func TestTakeFileSnapshot_ConcurrentWithReclaim_ClosedPath_NoPanic(t *testing.T) {
+       dir := snapshotTestDir(t)
+       tsdb, sc, seg := openSnapshotTestTSDB(t, dir)
+       defer func() { require.NoError(t, tsdb.Close()) }()
+
+       // Idle-close the segment and keep it closed: nothing below reopens it.
+       seg.lastAccessed.Store(time.Now().Add(-2 * time.Hour).UnixNano())
+       require.Equal(t, 1, sc.closeIdleSegments())
+       require.Nil(t, seg.index)
+
+       stop := make(chan struct{})
+       stopped := func() bool {
+               select {
+               case <-stop:
+                       return true
+               default:
+                       return false
+               }
+       }
+
+       var wg sync.WaitGroup
+       wg.Add(2)
+
+       // Reclaimer: keep running closeIdleSegments. It is a no-op on the
+       // already-closed segment, but it must never collide with the hard-link copy.
+       go func() {
+               defer wg.Done()
+               for !stopped() {
+                       seg.lastAccessed.Store(time.Now().Add(-2 * time.Hour).UnixNano())
+                       sc.closeIdleSegments()
+               }
+       }()
+
+       // Snapshotter: write each snapshot into its own directory. Per-iteration
+       // results are deliberately ignored. This loop only has to avoid panics and
+       // races; the final snapshot below asserts success.
+       go func() {
+               defer wg.Done()
+               for i := 0; !stopped(); i++ {
+                       _, _ = tsdb.TakeFileSnapshot(filepath.Join(dir, "snap", "s"+strconv.Itoa(i)))
+               }
+       }()
+
+       time.Sleep(300 * time.Millisecond)
+       close(stop)
+       wg.Wait()
+
+       require.Nil(t, seg.index, "the segment must stay closed: snapshot never reopens it")
+       _, err := tsdb.TakeFileSnapshot(filepath.Join(dir, "final-snapshot"))
+       require.NoError(t, err)
+}
+
+// newEmptySnapshotTSDB opens a snapshot-test TSDB (1-day segments, 1-hour idle
+// timeout, 30-day TTL) with no segment created yet, and returns it together
+// with its segment controller.
+func newEmptySnapshotTSDB(t *testing.T, dir string) (TSDB[*MockTSTable, any], *segmentController[*MockTSTable, any]) {
+       t.Helper()
+       return openSnapshotTSDB(t, dir, 30)
+}
+
+// snapshotDay returns the i-th distinct day starting at snapshotTestBase.
+func snapshotDay(i int) time.Time {
+       return snapshotTestBase.Add(time.Duration(i) * 24 * time.Hour)
+}
+
+// createSegmentWithSeries asks the TSDB for the segment covering ts (creating it
+// through the controller if needed), inserts n series documents into its series
+// index when n > 0, releases the creation reference, and returns the internal
+// segment left at its open baseline.
+func createSegmentWithSeries(t *testing.T, tsdb TSDB[*MockTSTable, any], ts time.Time, n int) *segment[*MockTSTable, any] {
+       t.Helper()
+       created, err := tsdb.CreateSegmentIfNotExist(ts)
+       require.NoError(t, err)
+       if n > 0 {
+               docs := buildTestSeriesDocs(t, n)
+               require.NoError(t, created.IndexDB().Insert(docs))
+       }
+       internal := created.(*segment[*MockTSTable, any])
+       created.DecRef() // drop the creation reference; the segment stays open
+       return internal
+}
+
+// snapshotSeriesDocCount opens the snapshot copy of seg's series index under
+// snapshotDir read-only and returns how many documents it holds.
+func snapshotSeriesDocCount(t *testing.T, snapshotDir string, seg *segment[*MockTSTable, any]) int64 {
+       t.Helper()
+       indexDir := filepath.Join(snapshotDir, filepath.Base(seg.location), seriesIndexDirName)
+       docs, err := inverted.ReadOnlyDocCount(indexDir)
+       require.NoError(t, err)
+       return docs
+}
+
+// assertContainsHardLink asserts at least one regular file under dstDir is the
+// same inode as its counterpart in srcDir (i.e. hard-linked, not byte-copied).
+func assertContainsHardLink(t *testing.T, srcDir, dstDir string) {
+       t.Helper()
+       entries, err := os.ReadDir(srcDir)
+       require.NoError(t, err)
+       sawRegularFile := false
+       for _, e := range entries {
+               if e.IsDir() {
+                       continue
+               }
+               sawRegularFile = true
+               srcInfo, srcErr := os.Stat(filepath.Join(srcDir, e.Name()))
+               dstInfo, dstErr := os.Stat(filepath.Join(dstDir, e.Name()))
+               if srcErr == nil && dstErr == nil && os.SameFile(srcInfo, 
dstInfo) {
+                       return
+               }
+       }
+       require.True(t, sawRegularFile, "precondition: %s must contain at least 
one regular file to compare", srcDir)
+       t.Fatalf("expected at least one hard-linked file between %s and %s", 
srcDir, dstDir)
+}
+
+// TestSnapshotInto_OpenSegmentIsRestorable exercises the open-segment branch
+// through the real controller. An open segment holding live series is
+// snapshotted from its live state. Afterwards it is still open, its reference
+// count is unchanged, and the snapshot's series index can be read on its own
+// with the same document count.
+func TestSnapshotInto_OpenSegmentIsRestorable(t *testing.T) {
+       const seriesCount = 8
+
+       dir := snapshotTestDir(t)
+       tsdb, _ := newEmptySnapshotTSDB(t, dir)
+       defer func() { require.NoError(t, tsdb.Close()) }()
+
+       seg := createSegmentWithSeries(t, tsdb, snapshotDay(0), seriesCount)
+       require.NotNil(t, seg.index, "precondition: segment is open")
+       refsBefore := atomic.LoadInt32(&seg.refCount)
+
+       snapshotDir := filepath.Join(dir, "snapshot")
+       created, err := tsdb.TakeFileSnapshot(snapshotDir)
+       require.NoError(t, err)
+       require.True(t, created)
+
+       require.NotNil(t, seg.index, "open segment must stay open after snapshot")
+       require.Equal(t, refsBefore, atomic.LoadInt32(&seg.refCount), "open-path snapshot must balance its reference bump")
+       require.Equal(t, int64(seriesCount), snapshotSeriesDocCount(t, snapshotDir, seg), "snapshot series index must hold the live docs")
+}
+
+// TestSnapshotInto_ClosedSegmentIsRestorableViaHardLink exercises the
+// closed-segment branch. An idle-closed segment is snapshotted by hard-linking
+// its quiescent files instead of being reopened, and it stays closed. The
+// snapshot's series index has the same document count and shares inodes with
+// the source.
+func TestSnapshotInto_ClosedSegmentIsRestorableViaHardLink(t *testing.T) {
+       const seriesCount = 5
+
+       dir := snapshotTestDir(t)
+       tsdb, sc := newEmptySnapshotTSDB(t, dir)
+       defer func() { require.NoError(t, tsdb.Close()) }()
+
+       seg := createSegmentWithSeries(t, tsdb, snapshotDay(0), seriesCount)
+       seg.lastAccessed.Store(time.Now().Add(-2 * time.Hour).UnixNano())
+       require.Equal(t, 1, sc.closeIdleSegments())
+       require.Nil(t, seg.index, "precondition: segment is idle-closed")
+
+       snapshotDir := filepath.Join(dir, "snapshot")
+       created, err := tsdb.TakeFileSnapshot(snapshotDir)
+       require.NoError(t, err)
+       require.True(t, created)
+
+       require.Nil(t, seg.index, "snapshot must not reopen the closed segment")
+       require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount))
+       require.Equal(t, int64(seriesCount), snapshotSeriesDocCount(t, snapshotDir, seg))
+
+       segName := filepath.Base(seg.location)
+       assertContainsHardLink(t,
+               filepath.Join(seg.location, seriesIndexDirName),
+               filepath.Join(snapshotDir, segName, seriesIndexDirName))
+}
+
+// TestTakeFileSnapshot_MixedOpenAndClosedSegments snapshots several segments 
at
+// once where some are open and some idle-closed: every segment is captured 
with
+// the correct per-segment data, the open/closed states are preserved (the
+// closed ones are not reopened), and references stay balanced.
+func TestTakeFileSnapshot_MixedOpenAndClosedSegments(t *testing.T) {
+       dir := snapshotTestDir(t)
+
+       tsdb, sc := newEmptySnapshotTSDB(t, dir)
+       defer func() { require.NoError(t, tsdb.Close()) }()
+
+       seg0 := createSegmentWithSeries(t, tsdb, snapshotDay(0), 3)
+       seg1 := createSegmentWithSeries(t, tsdb, snapshotDay(1), 5)
+       seg2 := createSegmentWithSeries(t, tsdb, snapshotDay(2), 7)
+
+       // Idle-close the two older segments; keep the newest open.
+       seg0.lastAccessed.Store(time.Now().Add(-2 * time.Hour).UnixNano())
+       seg1.lastAccessed.Store(time.Now().Add(-2 * time.Hour).UnixNano())
+       require.Equal(t, 2, sc.closeIdleSegments())
+       require.Nil(t, seg0.index)
+       require.Nil(t, seg1.index)
+       require.NotNil(t, seg2.index)
+
+       snapshotDir := filepath.Join(dir, "snapshot")
+       created, err := tsdb.TakeFileSnapshot(snapshotDir)
+       require.NoError(t, err)
+       require.True(t, created)
+
+       // States preserved: closed stay closed, open stays open.
+       require.Nil(t, seg0.index, "closed segment must not be reopened by 
snapshot")
+       require.Nil(t, seg1.index, "closed segment must not be reopened by 
snapshot")
+       require.NotNil(t, seg2.index, "open segment must stay open")
+
+       // Every segment is present and restorable with its own doc count.
+       require.Equal(t, int64(3), snapshotSeriesDocCount(t, snapshotDir, seg0))
+       require.Equal(t, int64(5), snapshotSeriesDocCount(t, snapshotDir, seg1))
+       require.Equal(t, int64(7), snapshotSeriesDocCount(t, snapshotDir, seg2))
+}
+
+// TestTakeFileSnapshot_DeletedSegmentIsSkipped verifies a segment flagged for
+// deletion (the concurrent-retention race window) is skipped by snapshotInto
+// while the surviving segment is still snapshotted.
+func TestTakeFileSnapshot_DeletedSegmentIsSkipped(t *testing.T) {
+       dir := snapshotTestDir(t)
+
+       tsdb, _ := newEmptySnapshotTSDB(t, dir)
+       defer func() { require.NoError(t, tsdb.Close()) }()
+
+       doomed := createSegmentWithSeries(t, tsdb, snapshotDay(0), 3)
+       survivor := createSegmentWithSeries(t, tsdb, snapshotDay(1), 4)
+
+       // Model a segment that retention has flagged for deletion but that is 
still
+       // in the controller list when the snapshot copies it.
+       atomic.StoreUint32(&doomed.mustBeDeleted, 1)
+
+       snapshotDir := filepath.Join(dir, "snapshot")
+       created, err := tsdb.TakeFileSnapshot(snapshotDir)
+       require.NoError(t, err)
+       require.True(t, created, "the surviving segment was written")
+
+       require.NoDirExists(t, filepath.Join(snapshotDir, 
filepath.Base(doomed.location)), "a segment flagged for deletion must be 
skipped")
+       require.DirExists(t, filepath.Join(snapshotDir, 
filepath.Base(survivor.location)), "the surviving segment must be snapshotted")
+}
+
+// TestTakeFileSnapshot_AllSegmentsDeletedReturnsFalse verifies that when every
+// segment is flagged for deletion, the snapshot writes nothing and reports
+// success=false with no error.
+func TestTakeFileSnapshot_AllSegmentsDeletedReturnsFalse(t *testing.T) {
+       dir := snapshotTestDir(t)
+
+       tsdb, _ := newEmptySnapshotTSDB(t, dir)
+       defer func() { require.NoError(t, tsdb.Close()) }()
+
+       seg0 := createSegmentWithSeries(t, tsdb, snapshotDay(0), 3)
+       seg1 := createSegmentWithSeries(t, tsdb, snapshotDay(1), 4)
+       atomic.StoreUint32(&seg0.mustBeDeleted, 1)
+       atomic.StoreUint32(&seg1.mustBeDeleted, 1)
+
+       snapshotDir := filepath.Join(dir, "snapshot")
+       created, err := tsdb.TakeFileSnapshot(snapshotDir)
+       require.NoError(t, err)
+       require.False(t, created, "no segment content was written")
+       require.NoDirExists(t, snapshotDir, "nothing should be written when all 
segments are skipped")
+}
+
+// TestSnapshotInto_EmptyOpenSegment checks that an open segment with no ingested
+// data is still snapshotted (segment metadata plus an empty series index) and
+// that the snapshot reports success.
+func TestSnapshotInto_EmptyOpenSegment(t *testing.T) {
+       dir := snapshotTestDir(t)
+       tsdb, _ := newEmptySnapshotTSDB(t, dir)
+       defer func() { require.NoError(t, tsdb.Close()) }()
+
+       seg := createSegmentWithSeries(t, tsdb, snapshotDay(0), 0)
+       require.NotNil(t, seg.index, "precondition: segment is open")
+
+       snapshotDir := filepath.Join(dir, "snapshot")
+       created, err := tsdb.TakeFileSnapshot(snapshotDir)
+       require.NoError(t, err)
+       require.True(t, created)
+
+       segSnap := filepath.Join(snapshotDir, filepath.Base(seg.location))
+       require.FileExists(t, filepath.Join(segSnap, SegmentMetadataFilename), "segment metadata must be snapshotted")
+       require.DirExists(t, filepath.Join(segSnap, seriesIndexDirName), "series index dir must be snapshotted")
+       require.Zero(t, snapshotSeriesDocCount(t, snapshotDir, seg))
+}
+
+// allTimeRange returns the range from the Unix epoch to timestamp.MaxNanoTime,
+// wide enough to cover every segment the snapshot-test factories create.
+func allTimeRange() timestamp.TimeRange {
+       start := time.Unix(0, 0)
+       end := time.Unix(0, timestamp.MaxNanoTime)
+       return timestamp.TimeRange{Start: start, End: end}
+}
+
+// idleClose backdates seg's last access by two hours, runs one idle-close pass
+// (expecting exactly one segment to close), and asserts seg ended up closed and
+// unreferenced.
+func idleClose(t *testing.T, sc *segmentController[*MockTSTable, any], seg *segment[*MockTSTable, any]) {
+       t.Helper()
+       twoHoursAgo := time.Now().Add(-2 * time.Hour)
+       seg.lastAccessed.Store(twoHoursAgo.UnixNano())
+       require.Equal(t, 1, sc.closeIdleSegments())
+       require.Nil(t, seg.index, "precondition: segment must be idle-closed")
+       require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount), "precondition: closed segment has refCount 0")
+}
+
+// TestSelectSegments_ReopenTrue_ReopensClosedSegment: reopenClosed=true on a
+// CLOSED segment reopens it (incRef -> initialize), restores refCount to 1, 
and
+// refreshes lastAccessed (a real read keeps the segment hot).
+func TestSelectSegments_ReopenTrue_ReopensClosedSegment(t *testing.T) {
+       dir := snapshotTestDir(t)
+       tsdb, sc := newEmptySnapshotTSDB(t, dir)
+       defer func() { require.NoError(t, tsdb.Close()) }()
+
+       seg := createSegmentWithSeries(t, tsdb, snapshotDay(0), 5)
+       idleClose(t, sc, seg)
+       closedAccessed := seg.lastAccessed.Load()
+
+       got, err := sc.selectSegments(allTimeRange(), true)
+       require.NoError(t, err)
+       require.Len(t, got, 1)
+       require.NotNil(t, seg.index, "reopenClosed=true must reopen a closed 
segment")
+       require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount), "reopen 
reinitializes refCount to 1")
+       require.Greater(t, seg.lastAccessed.Load(), closedAccessed, "a real 
read refreshes lastAccessed")
+
+       got[0].DecRef()
+       require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount), "DecRef 
returns the reopened segment to closed")
+       require.Nil(t, seg.index)
+}
+
+// TestSelectSegments_ReopenTrue_OpenSegmentBumpsAndRefreshes: 
reopenClosed=true
+// on an OPEN segment pins it (+1) and refreshes lastAccessed without 
re-opening.
+func TestSelectSegments_ReopenTrue_OpenSegmentBumpsAndRefreshes(t *testing.T) {
+       dir := snapshotTestDir(t)
+       tsdb, sc := newEmptySnapshotTSDB(t, dir)
+       defer func() { require.NoError(t, tsdb.Close()) }()
+
+       seg := createSegmentWithSeries(t, tsdb, snapshotDay(0), 0)
+       require.NotNil(t, seg.index)
+       require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount), "open 
baseline refCount is 1")
+       seg.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano())
+       before := seg.lastAccessed.Load()
+
+       got, err := sc.selectSegments(allTimeRange(), true)
+       require.NoError(t, err)
+       require.Len(t, got, 1)
+       require.NotNil(t, seg.index, "open segment stays open")
+       require.Equal(t, int32(2), atomic.LoadInt32(&seg.refCount), "incRef 
bumps the open segment")
+       require.Greater(t, seg.lastAccessed.Load(), before, "a real read 
refreshes lastAccessed")
+
+       got[0].DecRef()
+       require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount))
+}
+
+// TestSelectSegments_ReopenFalse_ClosedSegmentNotReopened: reopenClosed=false 
on
+// a CLOSED segment returns it WITHOUT reopening, without bumping refCount, and
+// without refreshing lastAccessed; the caller's DecRef is a safe no-op.
+func TestSelectSegments_ReopenFalse_ClosedSegmentNotReopened(t *testing.T) {
+       dir := snapshotTestDir(t)
+       tsdb, sc := newEmptySnapshotTSDB(t, dir)
+       defer func() { require.NoError(t, tsdb.Close()) }()
+
+       seg := createSegmentWithSeries(t, tsdb, snapshotDay(0), 5)
+       idleClose(t, sc, seg)
+       closedAccessed := seg.lastAccessed.Load()
+
+       got, err := sc.selectSegments(allTimeRange(), false)
+       require.NoError(t, err)
+       require.Len(t, got, 1, "closed segment is still returned for stats")
+       require.Nil(t, seg.index, "reopenClosed=false must NOT reopen a closed 
segment")
+       require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount), "closed 
segment is returned without a ref bump")
+       require.Equal(t, closedAccessed, seg.lastAccessed.Load(), "a stats peek 
must not refresh lastAccessed")
+
+       got[0].DecRef() // safe no-op on a refCount==0 segment
+       require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount))
+       require.Nil(t, seg.index)
+}
+
+// TestSelectSegments_ReopenFalse_OpenSegmentPinnedNoRefresh: 
reopenClosed=false
+// on an OPEN segment pins it (+1) but does NOT refresh lastAccessed, so the
+// segment stays eligible for the next idle-close tick.
+func TestSelectSegments_ReopenFalse_OpenSegmentPinnedNoRefresh(t *testing.T) {
+       dir := snapshotTestDir(t)
+       tsdb, sc := newEmptySnapshotTSDB(t, dir)
+       defer func() { require.NoError(t, tsdb.Close()) }()
+
+       seg := createSegmentWithSeries(t, tsdb, snapshotDay(0), 0)
+       require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount))
+       seg.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano())
+       before := seg.lastAccessed.Load()
+
+       got, err := sc.selectSegments(allTimeRange(), false)
+       require.NoError(t, err)
+       require.Len(t, got, 1)
+       require.NotNil(t, seg.index, "open segment stays open")
+       require.Equal(t, int32(2), atomic.LoadInt32(&seg.refCount), "open 
segment is pinned (+1)")
+       require.Equal(t, before, seg.lastAccessed.Load(), "a stats peek must 
not refresh lastAccessed")
+
+       got[0].DecRef()
+       require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount))
+}
+
+// TestSelectSegments_ReopenFalse_InUseSegmentPinned: reopenClosed=false on a
+// segment already in active use (refCount>1) pins it (+1), never disturbing 
the
+// references other callers hold.
+func TestSelectSegments_ReopenFalse_InUseSegmentPinned(t *testing.T) {
+       dir := snapshotTestDir(t)
+       tsdb, sc := newEmptySnapshotTSDB(t, dir)
+       defer func() { require.NoError(t, tsdb.Close()) }()
+
+       seg := createSegmentWithSeries(t, tsdb, snapshotDay(0), 0)
+       require.NoError(t, seg.incRef(context.Background())) // a concurrent 
reader holds a ref
+       require.Equal(t, int32(2), atomic.LoadInt32(&seg.refCount), "in-use: 
baseline + one active reader")
+
+       got, err := sc.selectSegments(allTimeRange(), false)
+       require.NoError(t, err)
+       require.Len(t, got, 1)
+       require.Equal(t, int32(3), atomic.LoadInt32(&seg.refCount), "in-use 
open segment is pinned (+1)")
+
+       got[0].DecRef() // release the selectSegments pin
+       seg.DecRef()    // release the simulated active reader
+       require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount), "back to 
open baseline")
+}
+
+// TestSelectSegments_TimeRangeFilters: only segments overlapping the requested
+// range are returned; a range that ends before the newest segment exercises 
the
+// sorted early-break, and a range covering one segment returns exactly that 
one.
+func TestSelectSegments_TimeRangeFilters(t *testing.T) {
+       dir := snapshotTestDir(t)
+       tsdb, sc := newEmptySnapshotTSDB(t, dir)
+       defer func() { require.NoError(t, tsdb.Close()) }()
+
+       _ = createSegmentWithSeries(t, tsdb, snapshotDay(0), 0)
+       seg1 := createSegmentWithSeries(t, tsdb, snapshotDay(1), 0)
+       _ = createSegmentWithSeries(t, tsdb, snapshotDay(2), 0)
+       require.Len(t, sc.lst, 3)
+
+       // Range inside day 1 only -> exactly seg1.
+       only := timestamp.NewInclusiveTimeRange(snapshotDay(1).Add(time.Hour), 
snapshotDay(1).Add(2*time.Hour))
+       got, err := sc.selectSegments(only, false)
+       require.NoError(t, err)
+       require.Len(t, got, 1)
+       require.Equal(t, seg1.location, got[0].(*segment[*MockTSTable, 
any]).location)
+       got[0].DecRef()
+
+       // Range entirely after every segment -> none (early-break path).
+       after := timestamp.NewInclusiveTimeRange(snapshotDay(10), 
snapshotDay(11))
+       got, err = sc.selectSegments(after, true)
+       require.NoError(t, err)
+       require.Empty(t, got)
+
+       // Range entirely before every segment -> none.
+       before := timestamp.NewInclusiveTimeRange(snapshotDay(-5), 
snapshotDay(-4))
+       got, err = sc.selectSegments(before, true)
+       require.NoError(t, err)
+       require.Empty(t, got)
+}
+
+// TestSelectSegments_PublicWrapper checks that the exported SelectSegments
+// delegates to the segment controller and returns nil once the database has
+// been closed.
+func TestSelectSegments_PublicWrapper(t *testing.T) {
+       dir := snapshotTestDir(t)
+       tsdb, _ := newEmptySnapshotTSDB(t, dir)
+       // Close the database even if an assertion fails before the explicit Close
+       // below; otherwise an early failure leaks the open TSDB while its temp dir
+       // is torn down. The close error is ignored on that path because the test
+       // is already failing.
+       closed := false
+       defer func() {
+               if !closed {
+                       _ = tsdb.Close()
+               }
+       }()
+
+       seg := createSegmentWithSeries(t, tsdb, snapshotDay(0), 0)
+       require.NotNil(t, seg.index)
+
+       got, err := tsdb.SelectSegments(allTimeRange(), false)
+       require.NoError(t, err)
+       require.Len(t, got, 1)
+       got[0].DecRef()
+
+       // Mark closed before calling Close so a failing Close is not retried.
+       closed = true
+       require.NoError(t, tsdb.Close())
+       got, err = tsdb.SelectSegments(allTimeRange(), true)
+       require.NoError(t, err)
+       require.Nil(t, got, "a closed database returns no segments")
+}
diff --git a/banyand/internal/storage/snapshot_test.go 
b/banyand/internal/storage/snapshot_test.go
index e40459035..7f219e07b 100644
--- a/banyand/internal/storage/snapshot_test.go
+++ b/banyand/internal/storage/snapshot_test.go
@@ -220,3 +220,33 @@ func TestDeleteStaleSnapshotsWithMinAge(t *testing.T) {
                })
        }
 }
+
+func TestReadSnapshotPartNames(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, deferFn := test.Space(require.New(t))
+       defer deferFn()
+
+       t.Run("valid manifest returns the part names verbatim", func(t 
*testing.T) {
+               manifest := filepath.Join(tmpPath, "valid.snp")
+               _, writeErr := 
fileSystem.Write([]byte(`["0000000000000001","00000000000000ff"]`), manifest, 
FilePerm)
+               require.NoError(t, writeErr)
+               names, err := ReadSnapshotPartNames(fileSystem, manifest)
+               require.NoError(t, err)
+               require.Equal(t, []string{"0000000000000001", 
"00000000000000ff"}, names)
+       })
+
+       t.Run("missing file is a read error", func(t *testing.T) {
+               _, err := ReadSnapshotPartNames(fileSystem, 
filepath.Join(tmpPath, "absent.snp"))
+               require.Error(t, err)
+               require.Contains(t, err.Error(), "cannot read")
+       })
+
+       t.Run("invalid JSON is a parse error", func(t *testing.T) {
+               manifest := filepath.Join(tmpPath, "invalid.snp")
+               _, writeErr := fileSystem.Write([]byte("{not-an-array}"), 
manifest, FilePerm)
+               require.NoError(t, writeErr)
+               _, err := ReadSnapshotPartNames(fileSystem, manifest)
+               require.Error(t, err)
+               require.Contains(t, err.Error(), "cannot parse")
+       })
+}
diff --git a/banyand/internal/storage/storage.go 
b/banyand/internal/storage/storage.go
index fc33a1ffa..cb5c8d495 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -111,7 +111,11 @@ type IndexDB interface {
 type TSDB[T TSTable, O any] interface {
        io.Closer
        CreateSegmentIfNotExist(ts time.Time) (Segment[T, O], error)
-       SelectSegments(timeRange timestamp.TimeRange) ([]Segment[T, O], error)
+       // SelectSegments returns the segments overlapping timeRange. 
reopenClosed=true
+       // (query) reopens closed segments and marks them accessed; false 
(read-only
+       // stats) returns them without reopening or refreshing their idle 
timer. The
+       // caller must DecRef every returned segment (a no-op for a closed one).
+       SelectSegments(timeRange timestamp.TimeRange, reopenClosed bool) 
([]Segment[T, O], error)
        // SegmentInterval returns the current segment interval rule.
        SegmentInterval() IntervalRule
        Tick(ts int64)
@@ -138,6 +142,12 @@ type Segment[T TSTable, O any] interface {
        TablesWithShardIDs() ([]T, []common.ShardID, []Cache)
        Lookup(ctx context.Context, series []*pbv1.Series) (pbv1.SeriesList, 
error)
        IndexDB() IndexDB
+       // Location returns the on-disk directory of the segment.
+       Location() string
+       // SeriesIndexStats returns the series index document count and on-disk 
size.
+       // It works without reopening a closed segment: an open segment is 
reported
+       // from its live index, a closed one is read from disk read-only.
+       SeriesIndexStats() (dataCount int64, dataSizeBytes int64)
 }
 
 // TSTable is time series table.
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index d6e0405b4..afe341ea1 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -274,11 +274,11 @@ func (d *database[T, O]) CreateSegmentIfNotExist(ts 
time.Time) (Segment[T, O], e
        return d.segmentController.createSegment(ts)
 }
 
-func (d *database[T, O]) SelectSegments(timeRange timestamp.TimeRange) 
([]Segment[T, O], error) {
+func (d *database[T, O]) SelectSegments(timeRange timestamp.TimeRange, 
reopenClosed bool) ([]Segment[T, O], error) {
        if d.closed.Load() {
                return nil, nil
        }
-       return d.segmentController.selectSegments(timeRange)
+       return d.segmentController.selectSegments(timeRange, reopenClosed)
 }
 
 func (d *database[T, O]) SegmentInterval() IntervalRule {
@@ -292,21 +292,22 @@ func (d *database[T, O]) UpdateOptions(resourceOpts 
*commonv1.ResourceOpts) {
        d.segmentController.updateOptions(resourceOpts)
 }
 
+// TakeFileSnapshot writes a point-in-time snapshot of every segment under dst.
+// success is true only when at least one segment actually wrote content; it is
+// false (with a nil error) when there are no segments, or when every segment 
is
+// concurrently being deleted and thus skipped.
 func (d *database[T, O]) TakeFileSnapshot(dst string) (success bool, err 
error) {
        if d.closed.Load() {
                return false, errors.New("database is closed")
        }
 
-       segments, segErr := d.segmentController.segments(context.Background(), 
true)
-       if segErr != nil {
-               return false, errors.Wrap(segErr, "failed to get segments")
-       }
-       defer func() {
-               for _, seg := range segments {
-                       seg.DecRef()
-               }
-       }()
-
+       // The snapshot must NOT reopen a closed segment: reopening an 
idle-closed
+       // cold segment is the source of the nil-index panic and the bluge
+       // "exclusive lock" churn. Take the current segment objects WITHOUT 
forcing
+       // a reopen. A closed (quiescent) segment is hard-linked directly from 
its
+       // immutable on-disk files; an open segment is snapshotted through its 
live
+       // series index and shard tables.
+       segments := d.segmentController.copySegments()
        if len(segments) == 0 {
                return false, nil
        }
@@ -320,42 +321,14 @@ func (d *database[T, O]) TakeFileSnapshot(dst string) 
(success bool, err error)
        log.Info().Int("segment_count", len(segments)).Str("db_location", 
d.location).
                Msgf("taking file snapshot for %s", dst)
        for _, seg := range segments {
-               segDir := filepath.Base(seg.location)
-               segPath := filepath.Join(dst, segDir)
-               d.lfs.MkdirIfNotExist(segPath, DirPerm)
-
-               metadataSrc := filepath.Join(seg.location, metadataFilename)
-               metadataDest := filepath.Join(segPath, metadataFilename)
-               if linkErr := d.lfs.CreateHardLink(metadataSrc, metadataDest, 
nil); linkErr != nil {
-                       return false, errors.Wrapf(linkErr, "failed to snapshot 
metadata for segment %s", segDir)
-               }
-
-               indexPath := filepath.Join(segPath, seriesIndexDirName)
-               d.lfs.MkdirIfNotExist(indexPath, DirPerm)
-               if indexErr := seg.index.store.TakeFileSnapshot(indexPath); 
indexErr != nil {
-                       return false, errors.Wrapf(indexErr, "failed to 
snapshot index for segment %s", segDir)
-               }
-
-               sLst := seg.sLst.Load()
-               if sLst == nil {
-                       continue
-               }
-               for _, shard := range *sLst {
-                       shardDir := filepath.Base(shard.location)
-                       shardPath := filepath.Join(segPath, shardDir)
-                       d.lfs.MkdirIfNotExist(shardPath, DirPerm)
-                       if _, shardErr := 
shard.table.TakeFileSnapshot(shardPath); shardErr != nil {
-                               if errors.Is(shardErr, ErrNoCurrentSnapshot) {
-                                       log.Debug().Str("shard", 
shardDir).Str("segment", segDir).
-                                               Msg("skipping empty shard 
snapshot")
-                                       continue
-                               }
-                               return false, errors.Wrapf(shardErr, "failed to 
snapshot shard %s in segment %s", shardDir, segDir)
-                       }
+               wrote, segErr := seg.snapshotInto(dst)
+               if segErr != nil {
+                       return false, segErr
                }
+               success = success || wrote
        }
 
-       return true, nil
+       return success, nil
 }
 
 func (d *database[T, O]) GetExpiredSegmentsTimeRange() *timestamp.TimeRange {
diff --git a/banyand/measure/cache_benchmark_test.go 
b/banyand/measure/cache_benchmark_test.go
index 74e124814..fb1ae7839 100644
--- a/banyand/measure/cache_benchmark_test.go
+++ b/banyand/measure/cache_benchmark_test.go
@@ -305,7 +305,7 @@ func (m *measure) QueryWithCache(ctx context.Context, mqo 
model.MeasureQueryOpti
                tsdb = db.(storage.TSDB[*tsTable, option])
        }
 
-       segments, err := tsdb.SelectSegments(*mqo.TimeRange)
+       segments, err := tsdb.SelectSegments(*mqo.TimeRange, true)
        if err != nil {
                return nil, err
        }
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 93e857a70..9db478549 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -429,10 +429,13 @@ func (sr *schemaRepo) CollectDataInfo(ctx 
context.Context, group string) (*datab
        if tsdb == nil {
                return nil, nil
        }
+       // reopenClosed=false: do NOT reopen closed segments, so inspecting cold
+       // segments never reopens their writable index (no exclusive-lock 
churn).
+       // Closed segments are reported from their on-disk files.
        segments, segmentsErr := tsdb.SelectSegments(timestamp.TimeRange{
                Start: time.Unix(0, 0),
                End:   time.Unix(0, timestamp.MaxNanoTime),
-       })
+       }, false)
        if segmentsErr != nil {
                return nil, segmentsErr
        }
@@ -442,10 +445,19 @@ func (sr *schemaRepo) CollectDataInfo(ctx 
context.Context, group string) (*datab
                timeRange := segment.GetTimeRange()
                tables, _ := segment.Tables()
                var shardInfoList []*databasev1.ShardInfo
-               for shardIdx, table := range tables {
-                       shardInfo := sr.collectShardInfo(table, 
uint32(shardIdx))
-                       shardInfoList = append(shardInfoList, shardInfo)
-                       totalDataSize += shardInfo.DataSizeBytes
+               if len(tables) > 0 {
+                       for shardIdx, table := range tables {
+                               shardInfo := sr.collectShardInfo(table, 
uint32(shardIdx))
+                               shardInfoList = append(shardInfoList, shardInfo)
+                               totalDataSize += shardInfo.DataSizeBytes
+                       }
+               } else {
+                       // No live shard tables (a closed segment, or a 
brand-new open one
+                       // with no data yet): read shard part stats from disk 
without
+                       // reopening. Either way the result is the on-disk 
shard stats.
+                       closedShards, closedSize := 
storage.CollectClosedShardInfo(segment.Location())
+                       shardInfoList = closedShards
+                       totalDataSize += closedSize
                }
                seriesIndexInfo := sr.collectSeriesIndexInfo(segment)
                totalDataSize += seriesIndexInfo.DataSizeBytes
@@ -468,11 +480,10 @@ func (sr *schemaRepo) CollectDataInfo(ctx 
context.Context, group string) (*datab
 }
 
 func (sr *schemaRepo) collectSeriesIndexInfo(segment storage.Segment[*tsTable, 
option]) *databasev1.SeriesIndexInfo {
-       indexDB := segment.IndexDB()
-       if indexDB == nil {
-               return &databasev1.SeriesIndexInfo{}
-       }
-       dataCount, dataSizeBytes := indexDB.Stats()
+       // SeriesIndexStats reads from the live index when the segment is open 
and
+       // from disk (read-only) when it is closed, so inspecting a cold segment
+       // never reopens its writable index.
+       dataCount, dataSizeBytes := segment.SeriesIndexStats()
        return &databasev1.SeriesIndexInfo{
                DataCount:     dataCount,
                DataSizeBytes: dataSizeBytes,
@@ -547,7 +558,7 @@ func (sr *schemaRepo) collectPendingWriteInfo(groupName 
string) (int64, error) {
        segments, segmentsErr := tsdb.SelectSegments(timestamp.TimeRange{
                Start: time.Unix(0, 0),
                End:   time.Unix(0, timestamp.MaxNanoTime),
-       })
+       }, false)
        if segmentsErr != nil {
                return 0, fmt.Errorf("failed to select segments: %w", 
segmentsErr)
        }
diff --git a/banyand/measure/metadata_internal_test.go 
b/banyand/measure/metadata_internal_test.go
index 87fa5db3a..610e97b22 100644
--- a/banyand/measure/metadata_internal_test.go
+++ b/banyand/measure/metadata_internal_test.go
@@ -18,31 +18,103 @@
 package measure
 
 import (
+       "path/filepath"
        "testing"
 
        "github.com/stretchr/testify/require"
 
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/test"
 )
 
-// nilIndexDBSegment is a stub Segment whose IndexDB() returns an untyped
-// nil. The embedded interface lets us satisfy the Segment[*tsTable, option]
-// signature without implementing every method (only IndexDB is exercised
-// by collectSeriesIndexInfo).
-type nilIndexDBSegment struct {
+// seriesIndexStatsSegment is a stub Segment that returns fixed 
SeriesIndexStats.
+// The embedded interface satisfies the Segment[*tsTable, option] signature
+// without implementing every method (only SeriesIndexStats is exercised by
+// collectSeriesIndexInfo).
+type seriesIndexStatsSegment struct {
        storage.Segment[*tsTable, option]
 }
 
-func (nilIndexDBSegment) IndexDB() storage.IndexDB { return nil }
+func (seriesIndexStatsSegment) SeriesIndexStats() (int64, int64) { return 12, 
345 }
 
-// TestSchemaRepo_CollectSeriesIndexInfo_NilIndexDB locks in the contract
-// that collectSeriesIndexInfo returns an empty SeriesIndexInfo (no panic)
-// when the segment is in the cold-tier residual state where its underlying
-// series index has been torn down by performCleanup.
-func TestSchemaRepo_CollectSeriesIndexInfo_NilIndexDB(t *testing.T) {
+// TestSchemaRepo_CollectSeriesIndexInfo_ForwardsStats verifies that
+// collectSeriesIndexInfo forwards the segment's SeriesIndexStats (which works
+// for both open and closed segments) into the SeriesIndexInfo fields.
+func TestSchemaRepo_CollectSeriesIndexInfo_ForwardsStats(t *testing.T) {
        sr := &schemaRepo{}
-       info := sr.collectSeriesIndexInfo(nilIndexDBSegment{})
+       info := sr.collectSeriesIndexInfo(seriesIndexStatsSegment{})
        require.NotNil(t, info)
-       require.Equal(t, int64(0), info.DataCount)
-       require.Equal(t, int64(0), info.DataSizeBytes)
+       require.Equal(t, int64(12), info.DataCount)
+       require.Equal(t, int64(345), info.DataSizeBytes)
+}
+
+// TestCollectClosedShardInfo_RealMeasurePart simulates a closed segment whose
+// shard parts are quiescent on disk: it writes a real measure part exactly as 
a
+// flush does (mustFlush + mustWriteSnapshot), then verifies the closed-read 
path
+// CollectDataInfo relies on reads the per-shard stats straight from disk
+// (without opening any table), and that the measure part metadata.json keys 
the
+// storage layer reads match the totals the part actually wrote.
+func TestCollectClosedShardInfo_RealMeasurePart(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       // Lay out a closed segment's on-disk shard: <seg>/shard-0/<part>.
+       segDir := filepath.Join(tmpPath, "seg-20240501")
+       shardDir := filepath.Join(segDir, "shard-0")
+
+       mp := generateMemPart()
+       mp.mustInitFromDataPoints(dpsTS1)
+       wantCount := int64(mp.partMetadata.TotalCount)
+       require.Positive(t, wantCount)
+       const epoch = uint64(1)
+       mp.mustFlush(fileSystem, partPath(shardDir, epoch))
+
+       // Write the shard's snapshot manifest referencing the flushed part, as 
the
+       // tsTable does after a flush, so the part is "current" (not an orphan).
+       tst := &tsTable{root: shardDir, fileSystem: fileSystem}
+       tst.mustWriteSnapshot(epoch, []string{partName(epoch)})
+
+       // The segment is closed (no live tables): stats are read straight from 
disk.
+       infos, totalSize := storage.CollectClosedShardInfo(segDir)
+       require.Len(t, infos, 1)
+       require.Equal(t, uint32(0), infos[0].ShardId)
+       require.Equal(t, wantCount, infos[0].DataCount, "data count must match 
the flushed part's totalCount")
+       require.Positive(t, infos[0].DataSizeBytes)
+       require.Equal(t, int64(1), infos[0].PartCount)
+       require.Equal(t, int64(1), infos[0].FilePartCount)
+       require.Equal(t, infos[0].DataSizeBytes, totalSize)
+}
+
+// TestCollectClosedShardInfo_ExcludesOrphanPart verifies the closed-read path
+// counts only parts referenced by the latest snapshot manifest: an orphan part
+// on disk (left by a crashed merge/flush, not in the manifest) is ignored, 
just
+// as the open path's in-memory snapshot would ignore it.
+func TestCollectClosedShardInfo_ExcludesOrphanPart(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       segDir := filepath.Join(tmpPath, "seg-20240501")
+       shardDir := filepath.Join(segDir, "shard-0")
+
+       // Current part (in the manifest).
+       current := generateMemPart()
+       current.mustInitFromDataPoints(dpsTS1)
+       wantCount := int64(current.partMetadata.TotalCount)
+       current.mustFlush(fileSystem, partPath(shardDir, 1))
+
+       // Orphan part on disk but NOT referenced by the manifest.
+       orphan := generateMemPart()
+       orphan.mustInitFromDataPoints(dpsTS2)
+       orphan.mustFlush(fileSystem, partPath(shardDir, 2))
+
+       tst := &tsTable{root: shardDir, fileSystem: fileSystem}
+       tst.mustWriteSnapshot(1, []string{partName(1)})
+
+       infos, _ := storage.CollectClosedShardInfo(segDir)
+       require.Len(t, infos, 1)
+       require.Equal(t, wantCount, infos[0].DataCount, "orphan part must not 
be counted")
+       require.Equal(t, int64(1), infos[0].PartCount)
 }
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index 92a397c47..093289bbe 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -116,7 +116,7 @@ func (m *measure) Query(ctx context.Context, mqo 
model.MeasureQueryOptions) (mqr
                tsdb = db.(storage.TSDB[*tsTable, option])
        }
 
-       segments, err := tsdb.SelectSegments(*mqo.TimeRange)
+       segments, err := tsdb.SelectSegments(*mqo.TimeRange, true)
        if err != nil {
                return nil, err
        }
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index 7d281ee6b..67f2306e3 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -291,16 +291,11 @@ func (tst *tsTable) mustWriteSnapshot(snapshot uint64, 
partNames []string) {
 }
 
 func (tst *tsTable) readSnapshot(snapshot uint64) ([]uint64, error) {
-       snapshotPath := filepath.Join(tst.root, snapshotName(snapshot))
-       data, err := tst.fileSystem.Read(snapshotPath)
+       partNames, err := storage.ReadSnapshotPartNames(tst.fileSystem, 
filepath.Join(tst.root, snapshotName(snapshot)))
        if err != nil {
-               return nil, fmt.Errorf("cannot read %s: %w", snapshotPath, err)
-       }
-       var partNames []string
-       if err := json.Unmarshal(data, &partNames); err != nil {
-               return nil, fmt.Errorf("cannot parse %s: %w", snapshotPath, err)
+               return nil, err
        }
-       var result []uint64
+       result := make([]uint64, 0, len(partNames))
        for i := range partNames {
                e, parseErr := parseEpoch(partNames[i])
                if parseErr != nil {
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 99db76512..47001983c 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -296,7 +296,7 @@ func (sr *schemaRepo) CollectDataInfo(ctx context.Context, 
group string) (*datab
        segments, segmentsErr := tsdb.SelectSegments(timestamp.TimeRange{
                Start: time.Unix(0, 0),
                End:   time.Unix(0, timestamp.MaxNanoTime),
-       })
+       }, false)
        if segmentsErr != nil {
                return nil, segmentsErr
        }
@@ -306,10 +306,22 @@ func (sr *schemaRepo) CollectDataInfo(ctx 
context.Context, group string) (*datab
                timeRange := segment.GetTimeRange()
                tables, _ := segment.Tables()
                var shardInfoList []*databasev1.ShardInfo
-               for shardIdx, table := range tables {
-                       shardInfo := sr.collectShardInfo(table, 
uint32(shardIdx))
-                       shardInfoList = append(shardInfoList, shardInfo)
-                       totalDataSize += shardInfo.DataSizeBytes
+               if len(tables) > 0 {
+                       for shardIdx, table := range tables {
+                               shardInfo := sr.collectShardInfo(table, 
uint32(shardIdx))
+                               shardInfoList = append(shardInfoList, shardInfo)
+                               totalDataSize += shardInfo.DataSizeBytes
+                       }
+               } else {
+                       // No live shard tables (a closed segment, or a 
brand-new open one
+                       // with no data yet): read shard part stats from disk 
without
+                       // reopening. The per-shard InvertedIndexInfo is 
reported empty here:
+                       // reading it would reopen the shard's bluge index (the 
exclusive-lock
+                       // churn this change exists to avoid), so it is 
populated only for
+                       // open segments.
+                       closedShards, closedSize := 
storage.CollectClosedShardInfo(segment.Location())
+                       shardInfoList = closedShards
+                       totalDataSize += closedSize
                }
                seriesIndexInfo := sr.collectSeriesIndexInfo(segment)
                totalDataSize += seriesIndexInfo.DataSizeBytes
@@ -332,14 +344,10 @@ func (sr *schemaRepo) CollectDataInfo(ctx 
context.Context, group string) (*datab
 }
 
 func (sr *schemaRepo) collectSeriesIndexInfo(segment storage.Segment[*tsTable, 
option]) *databasev1.SeriesIndexInfo {
-       indexDB := segment.IndexDB()
-       if indexDB == nil {
-               return &databasev1.SeriesIndexInfo{
-                       DataCount:     0,
-                       DataSizeBytes: 0,
-               }
-       }
-       dataCount, dataSizeBytes := indexDB.Stats()
+       // SeriesIndexStats reads from the live index when the segment is open 
and
+       // from disk (read-only) when it is closed, so inspecting a cold segment
+       // never reopens its writable index.
+       dataCount, dataSizeBytes := segment.SeriesIndexStats()
        return &databasev1.SeriesIndexInfo{
                DataCount:     dataCount,
                DataSizeBytes: dataSizeBytes,
@@ -438,7 +446,7 @@ func (sr *schemaRepo) collectPendingWriteInfo(groupName 
string) (int64, error) {
        segments, segmentsErr := tsdb.SelectSegments(timestamp.TimeRange{
                Start: time.Unix(0, 0),
                End:   time.Unix(0, timestamp.MaxNanoTime),
-       })
+       }, false)
        if segmentsErr != nil {
                return 0, fmt.Errorf("failed to select segments: %w", 
segmentsErr)
        }
diff --git a/banyand/stream/metadata_internal_test.go 
b/banyand/stream/metadata_internal_test.go
index a292b6704..2a776c5ad 100644
--- a/banyand/stream/metadata_internal_test.go
+++ b/banyand/stream/metadata_internal_test.go
@@ -18,31 +18,103 @@
 package stream
 
 import (
+       "path/filepath"
        "testing"
 
        "github.com/stretchr/testify/require"
 
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/test"
 )
 
-// nilIndexDBSegment is a stub Segment whose IndexDB() returns an untyped
-// nil. The embedded interface lets us satisfy the Segment[*tsTable, option]
-// signature without implementing every method (only IndexDB is exercised
-// by collectSeriesIndexInfo).
-type nilIndexDBSegment struct {
+// seriesIndexStatsSegment is a stub Segment that returns fixed 
SeriesIndexStats.
+// The embedded interface satisfies the Segment[*tsTable, option] signature
+// without implementing every method (only SeriesIndexStats is exercised by
+// collectSeriesIndexInfo).
+type seriesIndexStatsSegment struct {
        storage.Segment[*tsTable, option]
 }
 
-func (nilIndexDBSegment) IndexDB() storage.IndexDB { return nil }
+func (seriesIndexStatsSegment) SeriesIndexStats() (int64, int64) { return 12, 
345 }
 
-// TestSchemaRepo_CollectSeriesIndexInfo_NilIndexDB locks in the contract
-// that collectSeriesIndexInfo returns an empty SeriesIndexInfo (no panic)
-// when the segment is in the cold-tier residual state where its underlying
-// series index has been torn down by performCleanup.
-func TestSchemaRepo_CollectSeriesIndexInfo_NilIndexDB(t *testing.T) {
+// TestSchemaRepo_CollectSeriesIndexInfo_ForwardsStats verifies that
+// collectSeriesIndexInfo forwards the segment's SeriesIndexStats (which works
+// for both open and closed segments) into the SeriesIndexInfo fields.
+func TestSchemaRepo_CollectSeriesIndexInfo_ForwardsStats(t *testing.T) {
        sr := &schemaRepo{}
-       info := sr.collectSeriesIndexInfo(nilIndexDBSegment{})
+       info := sr.collectSeriesIndexInfo(seriesIndexStatsSegment{})
        require.NotNil(t, info)
-       require.Equal(t, int64(0), info.DataCount)
-       require.Equal(t, int64(0), info.DataSizeBytes)
+       require.Equal(t, int64(12), info.DataCount)
+       require.Equal(t, int64(345), info.DataSizeBytes)
+}
+
+// TestCollectClosedShardInfo_RealStreamPart simulates a closed segment whose
+// shard parts are quiescent on disk: it writes a real stream part exactly as a
+// flush does (mustFlush + mustWriteSnapshot), then verifies the closed-read 
path
+// CollectDataInfo relies on reads the per-shard stats straight from disk
+// (without opening any table), and that the stream part metadata.json keys the
+// storage layer reads match the totals the part actually wrote.
+func TestCollectClosedShardInfo_RealStreamPart(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       // Lay out a closed segment's on-disk shard: <seg>/shard-0/<part>.
+       segDir := filepath.Join(tmpPath, "seg-20240501")
+       shardDir := filepath.Join(segDir, "shard-0")
+
+       mp := generateMemPart()
+       mp.mustInitFromElements(es)
+       wantCount := int64(mp.partMetadata.TotalCount)
+       require.Positive(t, wantCount)
+       const epoch = uint64(1)
+       mp.mustFlush(fileSystem, partPath(shardDir, epoch))
+
+       // Write the shard's snapshot manifest referencing the flushed part, as 
the
+       // tsTable does after a flush, so the part is "current" (not an orphan).
+       tst := &tsTable{root: shardDir, fileSystem: fileSystem}
+       tst.mustWriteSnapshot(epoch, []string{partName(epoch)})
+
+       // The segment is closed (no live tables): stats are read straight from 
disk.
+       infos, totalSize := storage.CollectClosedShardInfo(segDir)
+       require.Len(t, infos, 1)
+       require.Equal(t, uint32(0), infos[0].ShardId)
+       require.Equal(t, wantCount, infos[0].DataCount, "data count must match 
the flushed part's totalCount")
+       require.Positive(t, infos[0].DataSizeBytes)
+       require.Equal(t, int64(1), infos[0].PartCount)
+       require.Equal(t, int64(1), infos[0].FilePartCount)
+       require.Equal(t, infos[0].DataSizeBytes, totalSize)
+}
+
+// TestCollectClosedShardInfo_ExcludesOrphanPart verifies the closed-read path
+// counts only parts referenced by the latest snapshot manifest: an orphan part
+// on disk (left by a crashed merge/flush, not in the manifest) is ignored, 
just
+// as the open path's in-memory snapshot would ignore it.
+func TestCollectClosedShardInfo_ExcludesOrphanPart(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       segDir := filepath.Join(tmpPath, "seg-20240501")
+       shardDir := filepath.Join(segDir, "shard-0")
+
+       // Current part (in the manifest).
+       current := generateMemPart()
+       current.mustInitFromElements(es)
+       wantCount := int64(current.partMetadata.TotalCount)
+       current.mustFlush(fileSystem, partPath(shardDir, 1))
+
+       // Orphan part on disk but NOT referenced by the manifest.
+       orphan := generateMemPart()
+       orphan.mustInitFromElements(es)
+       orphan.mustFlush(fileSystem, partPath(shardDir, 2))
+
+       tst := &tsTable{root: shardDir, fileSystem: fileSystem}
+       tst.mustWriteSnapshot(1, []string{partName(1)})
+
+       infos, _ := storage.CollectClosedShardInfo(segDir)
+       require.Len(t, infos, 1)
+       require.Equal(t, wantCount, infos[0].DataCount, "orphan part must not 
be counted")
+       require.Equal(t, int64(1), infos[0].PartCount)
 }
diff --git a/banyand/stream/query.go b/banyand/stream/query.go
index b082fa289..5df920d54 100644
--- a/banyand/stream/query.go
+++ b/banyand/stream/query.go
@@ -60,7 +60,7 @@ func (s *stream) Query(ctx context.Context, sqo 
model.StreamQueryOptions) (sqr m
                return nil, err
        }
 
-       segments, err := tsdb.SelectSegments(*sqo.TimeRange)
+       segments, err := tsdb.SelectSegments(*sqo.TimeRange, true)
        if err != nil {
                return nil, err
        }
diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go
index 2a20cae90..b52c4de48 100644
--- a/banyand/stream/tstable.go
+++ b/banyand/stream/tstable.go
@@ -186,16 +186,11 @@ func (tst *tsTable) mustWriteSnapshot(snapshot uint64, 
partNames []string) {
 }
 
 func (tst *tsTable) readSnapshot(snapshot uint64) ([]uint64, error) {
-       snapshotPath := filepath.Join(tst.root, snapshotName(snapshot))
-       data, err := tst.fileSystem.Read(snapshotPath)
+       partNames, err := storage.ReadSnapshotPartNames(tst.fileSystem, 
filepath.Join(tst.root, snapshotName(snapshot)))
        if err != nil {
-               return nil, fmt.Errorf("cannot read %s: %w", snapshotPath, err)
-       }
-       var partNames []string
-       if err := json.Unmarshal(data, &partNames); err != nil {
-               return nil, fmt.Errorf("cannot parse %s: %w", snapshotPath, err)
+               return nil, err
        }
-       var result []uint64
+       result := make([]uint64, 0, len(partNames))
        for i := range partNames {
                e, parseErr := parseEpoch(partNames[i])
                if parseErr != nil {
diff --git a/banyand/trace/metadata.go b/banyand/trace/metadata.go
index d2719e03c..08cf7d4e5 100644
--- a/banyand/trace/metadata.go
+++ b/banyand/trace/metadata.go
@@ -304,7 +304,7 @@ func (sr *schemaRepo) CollectDataInfo(ctx context.Context, 
group string) (*datab
        segments, segmentsErr := tsdb.SelectSegments(timestamp.TimeRange{
                Start: time.Unix(0, 0),
                End:   time.Unix(0, timestamp.MaxNanoTime),
-       })
+       }, false)
        if segmentsErr != nil {
                return nil, segmentsErr
        }
@@ -314,10 +314,22 @@ func (sr *schemaRepo) CollectDataInfo(ctx 
context.Context, group string) (*datab
                timeRange := segment.GetTimeRange()
                tables, _ := segment.Tables()
                var shardInfoList []*databasev1.ShardInfo
-               for shardIdx, table := range tables {
-                       shardInfo := sr.collectShardInfo(ctx, table, 
uint32(shardIdx))
-                       shardInfoList = append(shardInfoList, shardInfo)
-                       totalDataSize += shardInfo.DataSizeBytes
+               if len(tables) > 0 {
+                       for shardIdx, table := range tables {
+                               shardInfo := sr.collectShardInfo(ctx, table, 
uint32(shardIdx))
+                               shardInfoList = append(shardInfoList, shardInfo)
+                               totalDataSize += shardInfo.DataSizeBytes
+                       }
+               } else {
+                       // No live shard tables (a closed segment, or a 
brand-new open one
+                       // with no data yet): read shard part stats from disk 
without
+                       // reopening. The per-shard SidxInfo is reported empty 
here: reading
+                       // it would reopen the shard's sidx/bluge index (the 
exclusive-lock
+                       // churn this change exists to avoid), so it is 
populated only for
+                       // open segments.
+                       closedShards, closedSize := 
storage.CollectClosedShardInfo(segment.Location())
+                       shardInfoList = closedShards
+                       totalDataSize += closedSize
                }
                seriesIndexInfo := sr.collectSeriesIndexInfo(segment)
                totalDataSize += seriesIndexInfo.DataSizeBytes
@@ -340,14 +352,10 @@ func (sr *schemaRepo) CollectDataInfo(ctx 
context.Context, group string) (*datab
 }
 
 func (sr *schemaRepo) collectSeriesIndexInfo(segment storage.Segment[*tsTable, 
option]) *databasev1.SeriesIndexInfo {
-       indexDB := segment.IndexDB()
-       if indexDB == nil {
-               return &databasev1.SeriesIndexInfo{
-                       DataCount:     0,
-                       DataSizeBytes: 0,
-               }
-       }
-       dataCount, dataSizeBytes := indexDB.Stats()
+       // SeriesIndexStats reads from the live index when the segment is open 
and
+       // from disk (read-only) when it is closed, so inspecting a cold segment
+       // never reopens its writable index.
+       dataCount, dataSizeBytes := segment.SeriesIndexStats()
        return &databasev1.SeriesIndexInfo{
                DataCount:     dataCount,
                DataSizeBytes: dataSizeBytes,
@@ -462,7 +470,7 @@ func (sr *schemaRepo) collectPendingWriteInfo(groupName 
string) (int64, error) {
        segments, segmentsErr := tsdb.SelectSegments(timestamp.TimeRange{
                Start: time.Unix(0, 0),
                End:   time.Unix(0, timestamp.MaxNanoTime),
-       })
+       }, false)
        if segmentsErr != nil {
                return 0, fmt.Errorf("failed to select segments: %w", 
segmentsErr)
        }
diff --git a/banyand/trace/metadata_internal_test.go 
b/banyand/trace/metadata_internal_test.go
index 378078907..7bd407d19 100644
--- a/banyand/trace/metadata_internal_test.go
+++ b/banyand/trace/metadata_internal_test.go
@@ -18,31 +18,103 @@
 package trace
 
 import (
+       "path/filepath"
        "testing"
 
        "github.com/stretchr/testify/require"
 
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/test"
 )
 
-// nilIndexDBSegment is a stub Segment whose IndexDB() returns an untyped
-// nil. The embedded interface lets us satisfy the Segment[*tsTable, option]
-// signature without implementing every method (only IndexDB is exercised
-// by collectSeriesIndexInfo).
-type nilIndexDBSegment struct {
+// seriesIndexStatsSegment is a stub Segment that returns fixed 
SeriesIndexStats.
+// The embedded interface satisfies the Segment[*tsTable, option] signature
+// without implementing every method (only SeriesIndexStats is exercised by
+// collectSeriesIndexInfo).
+type seriesIndexStatsSegment struct {
        storage.Segment[*tsTable, option]
 }
 
-func (nilIndexDBSegment) IndexDB() storage.IndexDB { return nil }
+func (seriesIndexStatsSegment) SeriesIndexStats() (int64, int64) { return 12, 
345 }
 
-// TestSchemaRepo_CollectSeriesIndexInfo_NilIndexDB locks in the contract
-// that collectSeriesIndexInfo returns an empty SeriesIndexInfo (no panic)
-// when the segment is in the cold-tier residual state where its underlying
-// series index has been torn down by performCleanup.
-func TestSchemaRepo_CollectSeriesIndexInfo_NilIndexDB(t *testing.T) {
+// TestSchemaRepo_CollectSeriesIndexInfo_ForwardsStats verifies that
+// collectSeriesIndexInfo forwards the segment's SeriesIndexStats (which works
+// for both open and closed segments) into the SeriesIndexInfo fields.
+func TestSchemaRepo_CollectSeriesIndexInfo_ForwardsStats(t *testing.T) {
        sr := &schemaRepo{}
-       info := sr.collectSeriesIndexInfo(nilIndexDBSegment{})
+       info := sr.collectSeriesIndexInfo(seriesIndexStatsSegment{})
        require.NotNil(t, info)
-       require.Equal(t, int64(0), info.DataCount)
-       require.Equal(t, int64(0), info.DataSizeBytes)
+       require.Equal(t, int64(12), info.DataCount)
+       require.Equal(t, int64(345), info.DataSizeBytes)
+}
+
+// TestCollectClosedShardInfo_RealTracePart simulates a closed segment whose
+// shard parts are quiescent on disk: it writes a real trace part exactly as a
+// flush does (mustFlush + mustWriteSnapshot), then verifies the closed-read 
path
+// CollectDataInfo relies on reads the per-shard stats straight from disk
+// (without opening any table), and that the trace part metadata.json keys the
+// storage layer reads match the totals the part actually wrote.
+func TestCollectClosedShardInfo_RealTracePart(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       // Lay out a closed segment's on-disk shard: <seg>/shard-0/<part>.
+       segDir := filepath.Join(tmpPath, "seg-20240501")
+       shardDir := filepath.Join(segDir, "shard-0")
+
+       mp := generateMemPart()
+       mp.mustInitFromTraces(ts)
+       wantCount := int64(mp.partMetadata.TotalCount)
+       require.Positive(t, wantCount)
+       const epoch = uint64(1)
+       mp.mustFlush(fileSystem, partPath(shardDir, epoch))
+
+       // Write the shard's snapshot manifest referencing the flushed part, as 
the
+       // tsTable does after a flush, so the part is "current" (not an orphan).
+       tst := &tsTable{root: shardDir, fileSystem: fileSystem}
+       tst.mustWriteSnapshot(epoch, []string{partName(epoch)})
+
+       // The segment is closed (no live tables): stats are read straight from 
disk.
+       infos, totalSize := storage.CollectClosedShardInfo(segDir)
+       require.Len(t, infos, 1)
+       require.Equal(t, uint32(0), infos[0].ShardId)
+       require.Equal(t, wantCount, infos[0].DataCount, "data count must match 
the flushed part's totalCount")
+       require.Positive(t, infos[0].DataSizeBytes)
+       require.Equal(t, int64(1), infos[0].PartCount)
+       require.Equal(t, int64(1), infos[0].FilePartCount)
+       require.Equal(t, infos[0].DataSizeBytes, totalSize)
+}
+
+// TestCollectClosedShardInfo_ExcludesOrphanPart verifies the closed-read path
+// counts only parts referenced by the latest snapshot manifest: an orphan part
+// on disk (left by a crashed merge/flush, not in the manifest) is ignored, 
just
+// as the open path's in-memory snapshot would ignore it.
+func TestCollectClosedShardInfo_ExcludesOrphanPart(t *testing.T) {
+       fileSystem := fs.NewLocalFileSystem()
+       tmpPath, defFn := test.Space(require.New(t))
+       defer defFn()
+
+       segDir := filepath.Join(tmpPath, "seg-20240501")
+       shardDir := filepath.Join(segDir, "shard-0")
+
+       // Current part (in the manifest).
+       current := generateMemPart()
+       current.mustInitFromTraces(ts)
+       wantCount := int64(current.partMetadata.TotalCount)
+       current.mustFlush(fileSystem, partPath(shardDir, 1))
+
+       // Orphan part on disk but NOT referenced by the manifest.
+       orphan := generateMemPart()
+       orphan.mustInitFromTraces(ts)
+       orphan.mustFlush(fileSystem, partPath(shardDir, 2))
+
+       tst := &tsTable{root: shardDir, fileSystem: fileSystem}
+       tst.mustWriteSnapshot(1, []string{partName(1)})
+
+       infos, _ := storage.CollectClosedShardInfo(segDir)
+       require.Len(t, infos, 1)
+       require.Equal(t, wantCount, infos[0].DataCount, "orphan part must not 
be counted")
+       require.Equal(t, int64(1), infos[0].PartCount)
 }
diff --git a/banyand/trace/query.go b/banyand/trace/query.go
index 49c14f9aa..78367ba58 100644
--- a/banyand/trace/query.go
+++ b/banyand/trace/query.go
@@ -65,7 +65,7 @@ func (t *trace) Query(ctx context.Context, tqo 
model.TraceQueryOptions) (model.T
                return nil, err
        }
 
-       segments, err := tsdb.SelectSegments(*tqo.TimeRange)
+       segments, err := tsdb.SelectSegments(*tqo.TimeRange, true)
        if err != nil {
                return nil, err
        }
diff --git a/banyand/trace/tstable.go b/banyand/trace/tstable.go
index d6c383ff8..c2386362e 100644
--- a/banyand/trace/tstable.go
+++ b/banyand/trace/tstable.go
@@ -191,16 +191,11 @@ func (tst *tsTable) mustWriteSnapshot(snapshot uint64, 
partNames []string) {
 }
 
 func (tst *tsTable) readSnapshot(snapshot uint64) ([]uint64, error) {
-       snapshotPath := filepath.Join(tst.root, snapshotName(snapshot))
-       data, err := tst.fileSystem.Read(snapshotPath)
+       partNames, err := storage.ReadSnapshotPartNames(tst.fileSystem, 
filepath.Join(tst.root, snapshotName(snapshot)))
        if err != nil {
-               return nil, fmt.Errorf("cannot read %s: %w", snapshotPath, err)
-       }
-       var partNames []string
-       if err := json.Unmarshal(data, &partNames); err != nil {
-               return nil, fmt.Errorf("cannot parse %s: %w", snapshotPath, err)
+               return nil, err
        }
-       var result []uint64
+       result := make([]uint64, 0, len(partNames))
        for i := range partNames {
                e, parseErr := parseEpoch(partNames[i])
                if parseErr != nil {
diff --git a/pkg/fs/local_file_system.go b/pkg/fs/local_file_system.go
index 4be28b6c5..61492e530 100644
--- a/pkg/fs/local_file_system.go
+++ b/pkg/fs/local_file_system.go
@@ -472,6 +472,10 @@ func (fs *localFileSystem) CreateHardLink(srcPath, 
destPath string, filter func(
                destFullPath := filepath.Join(destPath, relPath)
 
                if info.IsDir() {
+                       // A filter that rejects a directory skips the whole 
subtree.
+                       if filter != nil && !filter(path) {
+                               return filepath.SkipDir
+                       }
                        if err := os.MkdirAll(destFullPath, info.Mode()); err 
!= nil {
                                return &FileSystemError{
                                        Code:    otherError,
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 38779a8cf..d6ec0aaae 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -54,6 +54,11 @@ const (
        // ExternalSegmentTempDirName is the name of the directory used for 
temporary external segments.
        ExternalSegmentTempDirName = "external-segment-temp"
 
+       // LockFilename is the bluge exclusive-lock file kept in every index
+       // directory while a writer is open (github.com/SkyAPM/bluge 
directory_fs).
+       // It must never be copied into a snapshot.
+       LockFilename = "bluge.pid"
+
        docIDField     = "_id"
        seriesIDField  = "_series_id"
        timestampField = "_timestamp"
@@ -292,6 +297,27 @@ func (s *store) Reset() {
        s.writer.ResetCache()
 }
 
+// ReadOnlyDocCount opens the index directory at path read-only and returns the
+// number of indexed documents. Unlike NewStore it never acquires the exclusive
+// directory lock, so it can inspect a closed (or even concurrently open)
+// segment index without reopening its writable index. A missing or unflushed
+// index (no usable snapshot) returns a count of 0 together with the open 
error,
+// which callers may treat as an empty index.
+func ReadOnlyDocCount(path string) (int64, error) {
+       reader, err := bluge.OpenReader(bluge.DefaultConfig(path))
+       if err != nil {
+               return 0, err
+       }
+       defer func() {
+               _ = reader.Close()
+       }()
+       count, err := reader.Count()
+       if err != nil {
+               return 0, err
+       }
+       return int64(count), nil
+}
+
 func (s *store) Iterator(ctx context.Context, fieldKey index.FieldKey, 
termRange index.RangeOpts, order modelv1.Sort,
        preLoadSize int,
 ) (iter index.FieldIterator[*index.DocumentResult], err error) {


Reply via email to