This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new 0f6837aec Snapshot/backup and data inspection no longer reopen
idle-closed segments (#1146)
0f6837aec is described below
commit 0f6837aec79d1f7dfd2d2f08cf5c31a7b217e0c7
Author: mrproliu <[email protected]>
AuthorDate: Tue Jun 2 07:13:11 2026 +0800
Snapshot/backup and data inspection no longer reopen idle-closed segments
(#1146)
---
CHANGES.md | 1 +
banyand/internal/storage/inspect_stats.go | 143 +++++
banyand/internal/storage/inspect_stats_test.go | 96 +++
banyand/internal/storage/segment.go | 160 ++++-
banyand/internal/storage/segment_test.go | 4 +-
banyand/internal/storage/snapshot.go | 16 +
.../storage/snapshot_closed_segment_test.go | 701 +++++++++++++++++++++
banyand/internal/storage/snapshot_test.go | 30 +
banyand/internal/storage/storage.go | 12 +-
banyand/internal/storage/tsdb.go | 63 +-
banyand/measure/cache_benchmark_test.go | 2 +-
banyand/measure/metadata.go | 33 +-
banyand/measure/metadata_internal_test.go | 100 ++-
banyand/measure/query.go | 2 +-
banyand/measure/tstable.go | 11 +-
banyand/stream/metadata.go | 36 +-
banyand/stream/metadata_internal_test.go | 100 ++-
banyand/stream/query.go | 2 +-
banyand/stream/tstable.go | 11 +-
banyand/trace/metadata.go | 36 +-
banyand/trace/metadata_internal_test.go | 100 ++-
banyand/trace/query.go | 2 +-
banyand/trace/tstable.go | 11 +-
pkg/fs/local_file_system.go | 4 +
pkg/index/inverted/inverted.go | 26 +
25 files changed, 1541 insertions(+), 161 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 5151671ff..87296f7cb 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -58,6 +58,7 @@ Release Notes.
- Add end-to-end observability for liaison internal queue pipelines with
per-topic metrics for queue_sub and queue_pub, along with Grafana panels and
troubleshooting docs.
- Introduce measure migration tool.
- Support displaying a measure's indexed tags in the dump tool, resolved per
part so peak memory is bounded by the part rather than a segment-wide series
map.
+- Snapshot/backup and data inspection no longer reopen idle-closed segments,
avoiding cold-segment nil-index panics and index lock-file churn.
### Bug Fixes
diff --git a/banyand/internal/storage/inspect_stats.go
b/banyand/internal/storage/inspect_stats.go
new file mode 100644
index 000000000..33b1a5ec2
--- /dev/null
+++ b/banyand/internal/storage/inspect_stats.go
@@ -0,0 +1,143 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package storage
+
+import (
+ "encoding/json"
+ "os"
+ "path/filepath"
+ "strconv"
+ "strings"
+
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+const partDiskMetadataFilename = "metadata.json" // per-part metadata file,
shared by stream/measure/trace.
+
+const snapshotFileSuffix = ".snp" // suffix of the per-shard snapshot manifest
(a JSON array of part dir names).
+
+// partDiskMetadata captures the fields shared by every data type's part
+// metadata.json (stream/measure/trace use the same JSON keys for these). Only these two keys are decoded; json.Unmarshal ignores any other keys in the file.
+type partDiskMetadata struct {
+ CompressedSizeBytes uint64 `json:"compressedSizeBytes"` // summed into ShardInfo.DataSizeBytes
+ TotalCount uint64 `json:"totalCount"` // summed into ShardInfo.DataCount
+}
+
+// CollectClosedShardInfo reads per-shard stats for a closed segment directly
+// from disk, WITHOUT opening any table or bluge index (which would reacquire
+// the index's exclusive lock). Sub-index stats (inverted index / sidx) are
+// reported empty; callers populate those only for open segments. It returns
the
+// shard list and the total on-disk data size. Shards appear in os.ReadDir order (sorted by directory name, so "shard-10" precedes "shard-2"); an unreadable segLocation yields (nil, 0).
+func CollectClosedShardInfo(segLocation string) ([]*databasev1.ShardInfo,
int64) {
+ shardPrefix := shardPathPrefix + "-"
+ // Use os directly (not the fs abstraction whose ReadDir panics on a
missing
+ // directory): a closed segment may be removed by retention
concurrently, so
+ // missing/unreadable paths must be skipped, not fatal. Segment data is
+ // always on local disk.
+ entries, err := os.ReadDir(segLocation)
+ if err != nil {
+ return nil, 0 // segment directory gone or unreadable: report nothing
+ }
+ var (
+ infos []*databasev1.ShardInfo
+ totalSize int64
+ )
+ for _, entry := range entries {
+ if !entry.IsDir() || !strings.HasPrefix(entry.Name(),
shardPrefix) {
+ continue // not a shard-<id> directory
+ }
+ shardID, parseErr :=
strconv.ParseUint(strings.TrimPrefix(entry.Name(), shardPrefix), 10, 32)
+ if parseErr != nil {
+ continue // suffix is not a decimal uint32
+ }
+ info := readShardInfoFromDisk(uint32(shardID),
filepath.Join(segLocation, entry.Name()))
+ infos = append(infos, info)
+ totalSize += info.DataSizeBytes
+ }
+ return infos, totalSize
+}
+
+// readShardInfoFromDisk aggregates one shard's part statistics into a
ShardInfo
+// from the parts referenced by its latest snapshot manifest -- the same
+// authoritative set the in-memory snapshot exposes for an open segment, so
+// orphan parts not yet garbage-collected are excluded. It never returns nil: a shard with no usable manifest, or whose listed parts lack readable metadata, contributes zero counts, and the sub-index infos are always empty placeholders.
+func readShardInfoFromDisk(shardID uint32, shardDir string)
*databasev1.ShardInfo {
+ info := &databasev1.ShardInfo{
+ ShardId: shardID,
+ InvertedIndexInfo: &databasev1.InvertedIndexInfo{},
+ SidxInfo: &databasev1.SIDXInfo{},
+ }
+ partNames, ok := latestSnapshotParts(shardDir)
+ if !ok {
+ return info // no usable manifest: zero-valued stats
+ }
+ for _, partName := range partNames {
+ raw, err := os.ReadFile(filepath.Join(shardDir, partName,
partDiskMetadataFilename))
+ if err != nil {
+ continue // part listed but its metadata is missing/unreadable: not counted
+ }
+ var pm partDiskMetadata
+ if json.Unmarshal(raw, &pm) != nil {
+ continue // malformed metadata: not counted
+ }
+ info.DataCount += int64(pm.TotalCount) // NOTE(review): int64(uint64) wraps for values above math.MaxInt64
+ info.DataSizeBytes += int64(pm.CompressedSizeBytes)
+ info.PartCount++
+ }
+ info.FilePartCount = info.PartCount // every part counted here was read from an on-disk part directory
+ return info
+}
+
+// latestSnapshotParts returns the part directory names referenced by the
+// highest-epoch ".snp" manifest in shardDir (false if none exists, or if that newest manifest cannot be read or parsed). Manifest names are hexadecimal epochs such as "0000000000000010.snp"; names that do not parse as hex are ignored.
+func latestSnapshotParts(shardDir string) ([]string, bool) {
+ entries, err := os.ReadDir(shardDir)
+ if err != nil {
+ return nil, false
+ }
+ var (
+ latestName string
+ latestEpoch uint64
+ )
+ for _, entry := range entries {
+ name := entry.Name()
+ if entry.IsDir() || !strings.HasSuffix(name,
snapshotFileSuffix) {
+ continue
+ }
+ epoch, parseErr := strconv.ParseUint(strings.TrimSuffix(name,
snapshotFileSuffix), 16, 64)
+ if parseErr != nil {
+ continue // not a hex epoch: not a manifest
+ }
+ // A valid manifest name is never empty, so latestName doubles
as the
+ // "found one" flag.
+ if latestName == "" || epoch > latestEpoch {
+ latestEpoch, latestName = epoch, name
+ }
+ }
+ if latestName == "" {
+ return nil, false
+ }
+ // ReadSnapshotPartNames uses lfs.Read, which (unlike lfs.ReadDir)
returns an
+ // error rather than panicking on a missing file, so it is safe on the
+ // concurrently-deletable closed path.
+ partNames, err := ReadSnapshotPartNames(lfs, filepath.Join(shardDir,
latestName))
+ if err != nil {
+ return nil, false // NOTE(review): no fallback to an older manifest when the newest one is unreadable
+ }
+ return partNames, true
+}
diff --git a/banyand/internal/storage/inspect_stats_test.go
b/banyand/internal/storage/inspect_stats_test.go
new file mode 100644
index 000000000..9216f5ec9
--- /dev/null
+++ b/banyand/internal/storage/inspect_stats_test.go
@@ -0,0 +1,96 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package storage
+
+import (
+ "encoding/json"
+ "fmt"
+ "os"
+ "path/filepath"
+ "testing"
+
+ "github.com/stretchr/testify/require"
+
+ databasev1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/database/v1"
+)
+
+// TestCollectClosedShardInfo verifies that per-shard part statistics are read
+// from the parts referenced by the latest snapshot manifest, that orphan part
+// directories not in the manifest are excluded, that the highest-epoch
manifest
+// wins, that a shard without a manifest reports zero, and that non-shard
+// directories are ignored. This is the read-only path used to inspect a closed
+// segment without reopening it.
+func TestCollectClosedShardInfo(t *testing.T) {
+ dir := t.TempDir()
+
+ writePart := func(shard, partID string, totalCount, compressed uint64) {
+ t.Helper()
+ partDir := filepath.Join(dir, shard, partID)
+ require.NoError(t, os.MkdirAll(partDir, 0o755))
+ meta :=
fmt.Sprintf(`{"compressedSizeBytes":%d,"totalCount":%d}`, compressed,
totalCount)
+ require.NoError(t, os.WriteFile(filepath.Join(partDir,
partDiskMetadataFilename), []byte(meta), 0o600))
+ }
+ writeManifest := func(shard string, epoch uint64, partIDs ...string) {
+ t.Helper()
+ data, err := json.Marshal(partIDs)
+ require.NoError(t, err)
+ name := fmt.Sprintf("%016x%s", epoch, snapshotFileSuffix) // 16 hex digits, matching the base-16 parse in latestSnapshotParts
+ require.NoError(t, os.WriteFile(filepath.Join(dir, shard,
name), data, 0o600))
+ }
+
+ // shard-0: two current parts + one orphan part (on disk but not in the
+ // manifest, must be excluded) + a series index dir (no metadata.json).
+ writePart("shard-0", "0000000000000001", 10, 100)
+ writePart("shard-0", "0000000000000002", 5, 50)
+ writePart("shard-0", "0000000000000099", 999, 9999) // orphan
+ require.NoError(t, os.MkdirAll(filepath.Join(dir, "shard-0",
seriesIndexDirName), 0o755))
+ writeManifest("shard-0", 0x10, "0000000000000001", "0000000000000002")
+
+ // shard-1: a newer manifest supersedes an older one referencing a
stale part.
+ writePart("shard-1", "0000000000000003", 7, 70)
+ writePart("shard-1", "0000000000000004", 4, 40)
+ writeManifest("shard-1", 0x05, "0000000000000004") // older epoch
+ writeManifest("shard-1", 0x10, "0000000000000003") // newer epoch wins
+
+ // shard-2: a part on disk but no manifest -> reports zero current data.
+ writePart("shard-2", "0000000000000005", 3, 30)
+
+ // a non-shard directory must be ignored.
+ require.NoError(t, os.MkdirAll(filepath.Join(dir, "not-a-shard"),
0o755))
+
+ infos, totalSize := CollectClosedShardInfo(dir)
+ byShard := map[uint32]*databasev1.ShardInfo{}
+ for _, s := range infos {
+ byShard[s.ShardId] = s
+ }
+ require.Len(t, byShard, 3) // shard-0..2; "not-a-shard" is ignored
+
+ require.Equal(t, int64(15), byShard[0].DataCount, "orphan part must be
excluded")
+ require.Equal(t, int64(150), byShard[0].DataSizeBytes)
+ require.Equal(t, int64(2), byShard[0].PartCount)
+ require.Equal(t, int64(2), byShard[0].FilePartCount)
+
+ require.Equal(t, int64(7), byShard[1].DataCount, "latest manifest must
win")
+ require.Equal(t, int64(70), byShard[1].DataSizeBytes)
+ require.Equal(t, int64(1), byShard[1].PartCount)
+
+ require.Equal(t, int64(0), byShard[2].DataCount, "shard without
manifest reports zero")
+ require.Equal(t, int64(0), byShard[2].PartCount)
+
+ require.Equal(t, int64(220), totalSize, "total size sums shard data
sizes") // 150 + 70 + 0
+}
diff --git a/banyand/internal/storage/segment.go
b/banyand/internal/storage/segment.go
index 711bb62ac..3ca282f34 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -315,6 +315,132 @@ func (s *segment[T, O]) delete() {
s.DecRef()
}
+// Location returns the on-disk directory of the segment. It returns s.location directly, without taking s.mu.
+func (s *segment[T, O]) Location() string {
+ return s.location
+}
+
+// SeriesIndexStats returns the series index document count and on-disk size.
+// An open segment is reported from its live index; a closed segment is read
+// from disk read-only (via OpenReader + directory walk), so inspecting a cold
+// segment never reopens its writable index. The first result is the document count, the second the size in bytes.
+func (s *segment[T, O]) SeriesIndexStats() (int64, int64) {
+ s.mu.RLock()
+ if s.index != nil {
+ // Hold the read lock while reading live stats so a concurrent
+ // performCleanup cannot close the index out from under us.
+ defer s.mu.RUnlock()
+ return s.index.Stats()
+ }
+ s.mu.RUnlock()
+ // Closed: release the lock (so a concurrent reopen is not blocked) and
read
+ // read-only. Best-effort -- a missing/unflushed/concurrently-removed
index
+ // reports 0.
+ indexPath := filepath.Join(s.location, seriesIndexDirName) // NOTE(review): a reopen may race with this read-only pass; results are point-in-time best effort
+ count, err := inverted.ReadOnlyDocCount(indexPath)
+ if err != nil {
+ s.l.Debug().Err(err).Str("path", indexPath).Msg("closed series
index has no readable doc count")
+ }
+ size, _ := calculatePathSize(indexPath) // best-effort: a size error is deliberately ignored
+ return count, int64(size)
+}
+
+// snapshotInto writes a point-in-time snapshot of this segment under dst.
+//
+// It NEVER reopens a closed segment -- reopening an idle-closed cold segment
is
+// the root cause of the nil-index panic and the bluge "exclusive lock" churn.
+// A closed (quiescent) segment is hard-linked directly from its immutable
+// on-disk files; an open segment is snapshotted through its live series index
+// and shard tables while a reference is held to keep it open.
+//
+// Returns whether anything was written (false when the segment is being
+// deleted, so the caller can skip it). Errors come from the open or closed snapshot path.
+func (s *segment[T, O]) snapshotInto(dst string) (bool, error) {
+ s.mu.Lock()
+ if atomic.LoadUint32(&s.mustBeDeleted) != 0 {
+ s.mu.Unlock()
+ return false, nil // being deleted: nothing written, not an error
+ }
+ idx := s.index
+ if idx != nil {
+ // Open: the bump only pins it (never reopens). Release the
mutex before
+ // the slow backup so other callers are not blocked.
+ atomic.AddInt32(&s.refCount, 1) // taken under s.mu while index != nil, so it cannot resurrect a closed segment
+ s.mu.Unlock()
+ defer s.DecRef() // releases the pin taken above
+ return s.snapshotOpen(dst, idx)
+ }
+ // Closed and quiescent: hold s.mu for the fast hard-link so a
concurrent
+ // reopen cannot write into the directory mid-copy.
+ defer s.mu.Unlock() // NOTE(review): held for the whole directory hard-link walk; a large closed segment blocks any reopen until it finishes
+ return s.snapshotClosed(dst)
+}
+
+// snapshotClosed hard-links the whole quiescent segment directory into dst.
+// Must be called with s.mu held and s.index == nil. The copy lands at dst/<base name of s.location>, filtered by includeInClosedSnapshot; returns (true, nil) on success.
+func (s *segment[T, O]) snapshotClosed(dst string) (bool, error) {
+ segDir := filepath.Base(s.location)
+ segPath := filepath.Join(dst, segDir)
+ if err := s.lfs.CreateHardLink(s.location, segPath,
includeInClosedSnapshot); err != nil {
+ return false, errors.Wrapf(err, "failed to hard-link closed
segment %s", segDir)
+ }
+ return true, nil
+}
+
+// includeInClosedSnapshot reports whether a file or directory under a closed
+// segment should be hard-linked into a snapshot. It excludes the transient and
+// non-current artifacts that the open-path snapshot never copies: the bluge
+// lock file, the failed-parts directory, the external-segment temp directory,
+// and partial ".tmp" atomic-write files. Current part directories and their
+// ".snp" manifests are kept. Matching is by base name only, at any depth under the segment; NOTE(review): orphan part directories not referenced by the latest manifest are not filtered here.
+func includeInClosedSnapshot(p string) bool {
+ switch base := filepath.Base(p); {
+ case base == inverted.LockFilename, base == FailedPartsDirName, base ==
inverted.ExternalSegmentTempDirName:
+ return false
+ default:
+ return filepath.Ext(base) != ".tmp" // applies to directories too: any entry whose name ends in .tmp is skipped
+ }
+}
+
+// snapshotOpen snapshots an open segment through its live state. idx is the
+// series index captured under s.mu; a reference is held by the caller so the
+// segment (and idx) stays open for the duration. It links the segment metadata file, snapshots the series index, then every shard; shards without a current snapshot are skipped, and it returns (true, nil) when all steps succeed.
+func (s *segment[T, O]) snapshotOpen(dst string, idx *seriesIndex) (bool,
error) {
+ segDir := filepath.Base(s.location)
+ segPath := filepath.Join(dst, segDir)
+ s.lfs.MkdirIfNotExist(segPath, DirPerm)
+
+ metadataSrc := filepath.Join(s.location, metadataFilename)
+ metadataDest := filepath.Join(segPath, metadataFilename)
+ if err := s.lfs.CreateHardLink(metadataSrc, metadataDest, nil); err !=
nil {
+ return false, errors.Wrapf(err, "failed to snapshot metadata
for segment %s", segDir)
+ }
+
+ indexPath := filepath.Join(segPath, seriesIndexDirName)
+ s.lfs.MkdirIfNotExist(indexPath, DirPerm)
+ if err := idx.store.TakeFileSnapshot(indexPath); err != nil {
+ return false, errors.Wrapf(err, "failed to snapshot index for
segment %s", segDir)
+ }
+
+ sLst := s.sLst.Load() // presumably nil until the first shard is created -- confirm
+ if sLst != nil {
+ for _, shard := range *sLst {
+ shardDir := filepath.Base(shard.location)
+ shardPath := filepath.Join(segPath, shardDir)
+ s.lfs.MkdirIfNotExist(shardPath, DirPerm) // NOTE(review): created before the snapshot, so an empty shard skipped below leaves an empty directory
+ if _, err := shard.table.TakeFileSnapshot(shardPath);
err != nil {
+ if errors.Is(err, ErrNoCurrentSnapshot) {
+ s.l.Debug().Str("shard",
shardDir).Str("segment", segDir).
+ Msg("skipping empty shard
snapshot")
+ continue // an empty shard is not an error
+ }
+ return false, errors.Wrapf(err, "failed to
snapshot shard %s in segment %s", shardDir, segDir)
+ }
+ }
+ }
+ return true, nil
+}
+
func (s *segment[T, O]) CreateTSTableIfNotExist(id common.ShardID) (T, error) {
if s, ok := s.getShard(id); ok {
return s.table, nil
@@ -428,7 +554,7 @@ func (sc *segmentController[T, O])
updateOptions(resourceOpts *commonv1.Resource
sc.opts.ShardNum = resourceOpts.ShardNum
}
-func (sc *segmentController[T, O]) selectSegments(timeRange
timestamp.TimeRange) (tt []Segment[T, O], err error) {
+func (sc *segmentController[T, O]) selectSegments(timeRange
timestamp.TimeRange, reopenClosed bool) (tt []Segment[T, O], err error) {
sc.RLock()
defer sc.RUnlock()
last := len(sc.lst) - 1
@@ -440,10 +566,24 @@ func (sc *segmentController[T, O])
selectSegments(timeRange timestamp.TimeRange)
break
}
if s.Overlapping(timeRange) {
- if err = s.incRef(ctx); err != nil {
- return nil, err
+ if reopenClosed {
+ // Real read: reopen if closed and mark as
accessed.
+ if err = s.incRef(ctx); err != nil {
+ return nil, err
+ }
+ s.lastAccessed.Store(now)
+ } else {
+ // Stats peek: pin only if already open, never
reopen.
+ for {
+ current := atomic.LoadInt32(&s.refCount)
+ if current <= 0 {
+ break
+ }
+ if
atomic.CompareAndSwapInt32(&s.refCount, current, current+1) {
+ break
+ }
+ }
}
- s.lastAccessed.Store(now)
tt = append(tt, s)
}
}
@@ -482,6 +622,18 @@ func (sc *segmentController[T, O]) segments(ctx
context.Context, reopenClosed bo
return r, nil
}
+// copySegments returns a snapshot of the current segment list WITHOUT touching
+// reference counts or reopening anything. Callers (e.g. TakeFileSnapshot) that
+// must not force a reopen decide per-segment, under the segment's own lock,
+// whether it is open or closed. The returned slice is a fresh copy, but the segments it points to may change state after sc's read lock is released.
+func (sc *segmentController[T, O]) copySegments() []*segment[T, O] {
+ sc.RLock()
+ defer sc.RUnlock()
+ r := make([]*segment[T, O], len(sc.lst))
+ copy(r, sc.lst) // shallow copy: the *segment pointers are shared
+ return r
+}
+
func (sc *segmentController[T, O]) closeIdleSegments() int {
maxIdleTime := sc.idleTimeout
diff --git a/banyand/internal/storage/segment_test.go
b/banyand/internal/storage/segment_test.go
index 41fcfccf5..20175520b 100644
--- a/banyand/internal/storage/segment_test.go
+++ b/banyand/internal/storage/segment_test.go
@@ -366,7 +366,7 @@ func TestCloseIdleAndSelectSegments(t *testing.T) {
// Now select segments using the entire time range
timeRange := timestamp.NewInclusiveTimeRange(day1,
day3.Add(24*time.Hour))
- selectedSegments, err := sc.selectSegments(timeRange)
+ selectedSegments, err := sc.selectSegments(timeRange, true)
require.NoError(t, err)
// Should have selected all 3 segments
@@ -1151,7 +1151,7 @@ func TestSegment_ConcurrentReopenAndClose_NoPanic(t
*testing.T) {
}()
<-start
for c := 0; c < cycles; c++ {
- segs, selErr := sc.selectSegments(timeRange)
+ segs, selErr := sc.selectSegments(timeRange,
true)
if selErr != nil {
t.Errorf("goroutine-%d cycle %d
selectSegments: %v", id, c, selErr)
return
diff --git a/banyand/internal/storage/snapshot.go
b/banyand/internal/storage/snapshot.go
index e5cdd2c65..d3eff63be 100644
--- a/banyand/internal/storage/snapshot.go
+++ b/banyand/internal/storage/snapshot.go
@@ -18,6 +18,7 @@
package storage
import (
+ "encoding/json"
"fmt"
"os"
"path/filepath"
@@ -28,6 +29,21 @@ import (
"github.com/apache/skywalking-banyandb/pkg/logger"
)
+// ReadSnapshotPartNames reads the part directory names recorded in a ".snp"
+// snapshot manifest (a JSON string array) at path. It is the single reader for
+// the part-list manifest written by every data type (stream/measure/trace). Read and parse failures are both wrapped with %w and name the path.
+func ReadSnapshotPartNames(fileSystem fs.FileSystem, path string) ([]string,
error) {
+ data, err := fileSystem.Read(path)
+ if err != nil {
+ return nil, fmt.Errorf("cannot read %s: %w", path, err)
+ }
+ var partNames []string
+ if unmarshalErr := json.Unmarshal(data, &partNames); unmarshalErr !=
nil {
+ return nil, fmt.Errorf("cannot parse %s: %w", path,
unmarshalErr)
+ }
+ return partNames, nil // a manifest containing JSON null yields a nil slice and no error
+}
+
// SnapshotTimeFormat is the Go time layout (yyyyMMddHHmmss) of the timestamp prefix used in snapshot directory names.
const SnapshotTimeFormat = "20060102150405"
diff --git a/banyand/internal/storage/snapshot_closed_segment_test.go
b/banyand/internal/storage/snapshot_closed_segment_test.go
new file mode 100644
index 000000000..11b6be406
--- /dev/null
+++ b/banyand/internal/storage/snapshot_closed_segment_test.go
@@ -0,0 +1,701 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package storage
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "path/filepath"
+ "strconv"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/require"
+
+ modelv1
"github.com/apache/skywalking-banyandb/api/proto/banyandb/model/v1"
+ "github.com/apache/skywalking-banyandb/pkg/index"
+ "github.com/apache/skywalking-banyandb/pkg/index/inverted"
+ "github.com/apache/skywalking-banyandb/pkg/logger"
+ pbv1 "github.com/apache/skywalking-banyandb/pkg/pb/v1"
+ "github.com/apache/skywalking-banyandb/pkg/test"
+ "github.com/apache/skywalking-banyandb/pkg/test/flags"
+ "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+// buildTestSeriesDocs builds n series-index documents with distinct doc IDs. Each series has subject "series_index_stats" and one string entity value "entity_<i>".
+func buildTestSeriesDocs(t *testing.T, n int) index.Documents {
+ t.Helper()
+ var docs index.Documents
+ for i := 0; i < n; i++ {
+ var series pbv1.Series
+ series.Subject = "series_index_stats"
+ series.EntityValues = []*modelv1.TagValue{
+ {Value: &modelv1.TagValue_Str{Str: &modelv1.Str{Value:
fmt.Sprintf("entity_%d", i)}}},
+ }
+ require.NoError(t, series.Marshal())
+ require.Positive(t, series.ID) // Marshal is expected to have assigned a non-zero series ID
+ docs = append(docs, index.Document{
+ DocID: uint64(series.ID),
+ EntityValues: append([]byte(nil), series.Buffer...), // defensive copy of the marshaled bytes
+ })
+ }
+ return docs
+}
+
+// snapshotTestBase is the fixed wall-clock time the snapshot-test TSDB
factories
+// pin the mock clock to.
+var snapshotTestBase = time.Date(2024, 5, 1, 0, 0, 0, 0, time.Local) // midnight local time on 2024-05-01
+
+// snapshotTestDir initializes logging and returns a fresh temp dir whose
cleanup
+// is registered with t.
+func snapshotTestDir(t *testing.T) string {
+ t.Helper()
+ require.NoError(t, logger.Init(logger.Logging{Env: "dev", Level:
flags.LogLevel}))
+ dir, defFn := test.Space(require.New(t))
+ t.Cleanup(defFn) // run test.Space's teardown when the test finishes
+ return dir
+}
+
+// openSnapshotTSDB opens a TSDB (1-day segments, 1-hour idle timeout, ttlDays
+// TTL) with the clock pinned at snapshotTestBase and NO segment created, so a
+// test can drive the real controller flow (CreateSegmentIfNotExist +
+// IndexDB().Insert + closeIdleSegments).
+func openSnapshotTSDB(t *testing.T, dir string, ttlDays int)
(TSDB[*MockTSTable, any], *segmentController[*MockTSTable, any]) {
+ t.Helper()
+ opts := TSDBOpts[*MockTSTable, any]{
+ Location: dir,
+ SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+ TTL: IntervalRule{Unit: DAY, Num: ttlDays},
+ ShardNum: 1,
+ TSTableCreator: MockTSTableCreator,
+ SegmentIdleTimeout: time.Hour,
+ }
+ mc := timestamp.NewMockClock()
+ mc.Set(snapshotTestBase)
+ ctx := timestamp.SetClock(context.Background(), mc)
+ tsdb, err := OpenTSDB(ctx, opts, NewServiceCache(), group) // `group` is presumably a package-level test fixture defined elsewhere
+ require.NoError(t, err)
+ return tsdb, tsdb.(*database[*MockTSTable, any]).segmentController // unchecked type assertion: panics if OpenTSDB returns another implementation
+}
+
+// openSnapshotTestTSDB opens a TSDB with a single day segment created at
+// snapshotTestBase and returns the db, its controller and the (open) segment.
+func openSnapshotTestTSDB(t *testing.T, dir string) (TSDB[*MockTSTable, any],
*segmentController[*MockTSTable, any], *segment[*MockTSTable, any]) {
+ t.Helper()
+ tsdb, sc := openSnapshotTSDB(t, dir, 3) // 3-day TTL
+ seg, err := tsdb.CreateSegmentIfNotExist(snapshotTestBase)
+ require.NoError(t, err)
+ seg.DecRef() // leave it at its open baseline
+ require.Len(t, sc.lst, 1)
+ return tsdb, sc, sc.lst[0] // NOTE(review): reads sc.lst without taking sc's lock
+}
+
+// TestTakeFileSnapshot_ClosedSegmentIsNotReopened is the core backup
+// guarantee: snapshotting an idle-closed segment must NOT reopen it (no
+// OpenWriter, no exclusive-lock churn, no nil-index panic). The closed segment
+// is hard-linked from its quiescent on-disk files instead.
+//
+// "Did not reopen" is asserted by checking the segment stays closed
+// (index == nil) and unreferenced (refCount == 0) after the snapshot.
+func TestTakeFileSnapshot_ClosedSegmentIsNotReopened(t *testing.T) {
+ dir := snapshotTestDir(t)
+
+ tsdb, sc, seg := openSnapshotTestTSDB(t, dir)
+ defer func() { require.NoError(t, tsdb.Close()) }()
+
+ // Drive the segment into the idle-closed state every cold segment
lives in.
+ seg.lastAccessed.Store(time.Now().Add(-2 * time.Hour).UnixNano()) // 2h ago on the real clock: beyond the 1h SegmentIdleTimeout
+ require.Equal(t, 1, sc.closeIdleSegments())
+ require.Nil(t, seg.index, "precondition: segment must be idle-closed")
+ require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount))
+
+ snapshotDir := filepath.Join(dir, "snapshot")
+ created, err := tsdb.TakeFileSnapshot(snapshotDir)
+ require.NoError(t, err, "snapshot of a closed segment must succeed")
+ require.True(t, created)
+
+ // The crux: the closed segment was NOT reopened.
+ require.Nil(t, seg.index, "snapshot must not reopen a closed segment")
+ require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount), "snapshot
must not leave a dangling reference")
+
+ // And the series index was carried into the snapshot via hard-link.
+ segDir := filepath.Join(snapshotDir, filepath.Base(seg.location))
+ require.DirExists(t, segDir)
+ require.DirExists(t, filepath.Join(segDir, seriesIndexDirName))
+}
+
+// TestTakeFileSnapshot_ClosedSegmentExcludesTransientArtifacts verifies the
+// closed-segment hard-link path copies current data and manifests but skips
+// transient / non-current artifacts (the failed-parts directory, the
+// external-segment temp directory, the bluge lock file, and .tmp files), matching what the open
+// path produces.
+func TestTakeFileSnapshot_ClosedSegmentExcludesTransientArtifacts(t
*testing.T) {
+ dir := snapshotTestDir(t)
+
+ tsdb, sc, seg := openSnapshotTestTSDB(t, dir)
+ defer func() { require.NoError(t, tsdb.Close()) }()
+
+ seg.lastAccessed.Store(time.Now().Add(-2 * time.Hour).UnixNano())
+ require.Equal(t, 1, sc.closeIdleSegments())
+ require.Nil(t, seg.index)
+
+ // Lay out a closed segment's on-disk artifacts: a current part (kept)
plus
+ // transient/non-current ones that must NOT be carried into the
snapshot.
+ loc := seg.location
+ writeFile := func(rel string) {
+ p := filepath.Join(loc, rel)
+ require.NoError(t, os.MkdirAll(filepath.Dir(p), DirPerm))
+ require.NoError(t, os.WriteFile(p, []byte("x"), FilePerm))
+ }
+ writeFile(filepath.Join("shard-0", "0000000000000001", "metadata.json")) // kept: current part
+ writeFile(filepath.Join("shard-0", FailedPartsDirName, "junk.bin")) // excluded: failed-parts dir
+ writeFile(filepath.Join("shard-0", "stale.tmp")) // excluded: .tmp file
+ writeFile(filepath.Join(seriesIndexDirName,
inverted.ExternalSegmentTempDirName, "t.bin")) // excluded: external-segment temp dir
+ writeFile(filepath.Join(seriesIndexDirName, inverted.LockFilename)) // excluded: bluge lock file
+
+ snapshotDir := filepath.Join(dir, "snapshot")
+ created, err := tsdb.TakeFileSnapshot(snapshotDir)
+ require.NoError(t, err)
+ require.True(t, created)
+
+ segSnap := filepath.Join(snapshotDir, filepath.Base(loc))
+ require.FileExists(t, filepath.Join(segSnap, "shard-0",
"0000000000000001", "metadata.json"), "current part must be copied")
+ require.DirExists(t, filepath.Join(segSnap, seriesIndexDirName),
"series index must be copied")
+ require.NoDirExists(t, filepath.Join(segSnap, "shard-0",
FailedPartsDirName), "failed-parts must be excluded")
+ require.NoFileExists(t, filepath.Join(segSnap, "shard-0", "stale.tmp"),
".tmp must be excluded")
+ require.NoDirExists(t, filepath.Join(segSnap, seriesIndexDirName,
inverted.ExternalSegmentTempDirName), "external-segment temp must be excluded")
+ require.NoFileExists(t, filepath.Join(segSnap, seriesIndexDirName,
inverted.LockFilename), "bluge lock file must be excluded")
+}
+
+// TestSeriesIndexStats_ClosedSegmentIsNotReopened verifies the inspection
+// primitive: reading a closed segment's series index stats must read from disk
+// read-only and must NOT reopen the writable index.
+func TestSeriesIndexStats_ClosedSegmentIsNotReopened(t *testing.T) {
+ dir := snapshotTestDir(t)
+
+ tsdb, sc, seg := openSnapshotTestTSDB(t, dir)
+ defer func() { require.NoError(t, tsdb.Close()) }()
+
+ // Write a known number of series into the open segment's index, then
drive
+ // it idle-closed (performCleanup flushes the index to disk on close).
+ const seriesCount = 10
+ require.NoError(t, seg.IndexDB().Insert(buildTestSeriesDocs(t,
seriesCount)))
+
+ seg.lastAccessed.Store(time.Now().Add(-2 * time.Hour).UnixNano())
+ require.Equal(t, 1, sc.closeIdleSegments())
+ require.Nil(t, seg.index, "precondition: segment is idle-closed")
+
+ count, size := seg.SeriesIndexStats() // exercises the closed (read-only) branch
+ require.Equal(t, int64(seriesCount), count, "closed series index doc
count must be read from disk read-only")
+ require.Positive(t, size, "closed series index on-disk size must be
reported")
+ require.Nil(t, seg.index, "reading series index stats must not reopen a
closed segment")
+ require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount))
+}
+
+// TestTakeFileSnapshot_OpenSegmentUsesLiveState verifies the other branch: an
+// open segment is snapshotted through its live state and is not closed by the
+// snapshot (the held reference keeps it open).
+func TestTakeFileSnapshot_OpenSegmentUsesLiveState(t *testing.T) {
+ dir := snapshotTestDir(t)
+
+ tsdb, _, seg := openSnapshotTestTSDB(t, dir) // segment is left open by the helper
+ defer func() { require.NoError(t, tsdb.Close()) }()
+
+ require.NotNil(t, seg.index, "precondition: segment is open")
+
+ snapshotDir := filepath.Join(dir, "snapshot")
+ created, err := tsdb.TakeFileSnapshot(snapshotDir)
+ require.NoError(t, err)
+ require.True(t, created)
+
+ require.NotNil(t, seg.index, "open segment must stay open after
snapshot")
+ segDir := filepath.Join(snapshotDir, filepath.Base(seg.location))
+ require.DirExists(t, filepath.Join(segDir, seriesIndexDirName))
+}
+
+// TestTakeFileSnapshot_ConcurrentWithReclaim_ClosedPath_NoPanic exercises the
+// closed-segment hard-link path against a concurrently running idle reclaimer.
+// Because the snapshot never reopens the closed segment, the reclaimer can
+// never steal a reference out from under the snapshot, so the original
+// nil-index panic cannot occur. Run with -race.
+//
+// NOTE: the broader "query-driven reopen racing the idle reclaimer" scenario
+// (and the open-segment reclaim race) is a separate concern in the segment
+// reference-counting model and is intentionally NOT covered here.
+func TestTakeFileSnapshot_ConcurrentWithReclaim_ClosedPath_NoPanic(t
*testing.T) {
+ dir := snapshotTestDir(t)
+
+ tsdb, sc, seg := openSnapshotTestTSDB(t, dir)
+ defer func() { require.NoError(t, tsdb.Close()) }()
+
+ // Put the segment into the idle-closed state and keep it there: nothing
+ // reopens it (the snapshot hard-links it without opening a writer).
+ seg.lastAccessed.Store(time.Now().Add(-2 * time.Hour).UnixNano())
+ require.Equal(t, 1, sc.closeIdleSegments())
+ require.Nil(t, seg.index)
+
+ stop := make(chan struct{})
+ var wg sync.WaitGroup
+
+ // Reclaimer: keep hammering closeIdleSegments (a no-op on an
already-closed
+ // segment, but it must never collide with the snapshot's hard-link).
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for {
+ select {
+ case <-stop:
+ return
+ default:
+ }
+ seg.lastAccessed.Store(time.Now().Add(-2 *
time.Hour).UnixNano())
+ sc.closeIdleSegments()
+ }
+ }()
+
+ // Snapshotter: take snapshots into unique directories.
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for i := 0; ; i++ { // NOTE(review): tight loop; writes one snapshot directory per iteration until stop closes
+ select {
+ case <-stop:
+ return
+ default:
+ }
+ _, _ = tsdb.TakeFileSnapshot(filepath.Join(dir, "snap",
"s"+strconv.Itoa(i))) // errors ignored: this loop only probes for panics/races
+ }
+ }()
+
+ time.Sleep(300 * time.Millisecond) // bounded window for the two goroutines to race
+ close(stop)
+ wg.Wait()
+
+ require.Nil(t, seg.index, "the segment must stay closed: snapshot never
reopens it")
+ _, err := tsdb.TakeFileSnapshot(filepath.Join(dir, "final-snapshot"))
+ require.NoError(t, err)
+}
+
+// newEmptySnapshotTSDB opens a snapshot-test TSDB (1-day segments, 1-hour idle
+// timeout) with no segment created yet and returns it with its segment controller.
+func newEmptySnapshotTSDB(t *testing.T, dir string) (TSDB[*MockTSTable, any],
*segmentController[*MockTSTable, any]) {
+ t.Helper()
+ return openSnapshotTSDB(t, dir, 30) // NOTE(review): meaning of 30 is defined by openSnapshotTSDB (not visible here) — confirm
+}
+
+// snapshotDay returns the i-th distinct day starting at snapshotTestBase.
+func snapshotDay(i int) time.Time {
+ return snapshotTestBase.Add(time.Duration(i) * 24 * time.Hour) // fixed 24h steps; a negative i yields days before the base
+}
+
+// createSegmentWithSeries creates (via the controller) the segment containing
+// ts, inserts n series documents into its series index, and returns the
+// internal segment left at its open baseline (the create reference is
released).
+func createSegmentWithSeries(t *testing.T, tsdb TSDB[*MockTSTable, any], ts
time.Time, n int) *segment[*MockTSTable, any] {
+ t.Helper()
+ s, err := tsdb.CreateSegmentIfNotExist(ts)
+ require.NoError(t, err)
+ if n > 0 {
+ require.NoError(t, s.IndexDB().Insert(buildTestSeriesDocs(t,
n)))
+ }
+ seg := s.(*segment[*MockTSTable, any]) // tests need the concrete type to inspect index/refCount/lastAccessed
+ s.DecRef() // drop the create reference; only the open baseline remains
+ return seg
+}
+
+// snapshotSeriesDocCount reads the series-index doc count of seg's snapshot
copy
+// under snapshotDir, read-only.
+func snapshotSeriesDocCount(t *testing.T, snapshotDir string, seg
*segment[*MockTSTable, any]) int64 {
+ t.Helper()
+ count, err := inverted.ReadOnlyDocCount(filepath.Join(snapshotDir,
filepath.Base(seg.location), seriesIndexDirName)) // layout: <snapshotDir>/<segment base name>/<series index dir>
+ require.NoError(t, err)
+ return count
+}
+
+// assertContainsHardLink asserts at least one regular file under dstDir is the
+// same inode as its counterpart in srcDir (i.e. hard-linked, not byte-copied).
+func assertContainsHardLink(t *testing.T, srcDir, dstDir string) {
+ t.Helper()
+ entries, err := os.ReadDir(srcDir)
+ require.NoError(t, err)
+ sawRegularFile := false
+ for _, e := range entries {
+ if e.IsDir() { // only top-level files are compared; subdirectories are not descended into
+ continue
+ }
+ sawRegularFile = true
+ srcInfo, srcErr := os.Stat(filepath.Join(srcDir, e.Name()))
+ dstInfo, dstErr := os.Stat(filepath.Join(dstDir, e.Name()))
+ if srcErr == nil && dstErr == nil && os.SameFile(srcInfo,
dstInfo) { // a stat error only means this entry cannot prove a link; keep scanning
+ return
+ }
+ }
+ require.True(t, sawRegularFile, "precondition: %s must contain at least
one regular file to compare", srcDir)
+ t.Fatalf("expected at least one hard-linked file between %s and %s",
srcDir, dstDir)
+}
+
+// TestSnapshotInto_OpenSegmentIsRestorable drives the open-segment branch
+// through the real controller: an open segment with live series data is
+// snapshotted via its live state, stays open, leaves no dangling reference,
and
+// the snapshot's series index is independently readable with the same doc
count.
+func TestSnapshotInto_OpenSegmentIsRestorable(t *testing.T) {
+ dir := snapshotTestDir(t)
+
+ tsdb, _ := newEmptySnapshotTSDB(t, dir)
+ defer func() { require.NoError(t, tsdb.Close()) }()
+
+ const seriesCount = 8
+ seg := createSegmentWithSeries(t, tsdb, snapshotDay(0), seriesCount)
+ require.NotNil(t, seg.index, "precondition: segment is open")
+ refBefore := atomic.LoadInt32(&seg.refCount) // open-segment baseline, compared after the snapshot
+
+ snapshotDir := filepath.Join(dir, "snapshot")
+ created, err := tsdb.TakeFileSnapshot(snapshotDir)
+ require.NoError(t, err)
+ require.True(t, created)
+
+ require.NotNil(t, seg.index, "open segment must stay open after
snapshot")
+ require.Equal(t, refBefore, atomic.LoadInt32(&seg.refCount), "open-path
snapshot must balance its reference bump")
+ require.Equal(t, int64(seriesCount), snapshotSeriesDocCount(t,
snapshotDir, seg), "snapshot series index must hold the live docs")
+}
+
+// TestSnapshotInto_ClosedSegmentIsRestorableViaHardLink drives the
+// closed-segment branch: an idle-closed segment is snapshotted by hard-linking
+// its quiescent files (not reopened), stays closed, and the snapshot's series
+// index is readable with the same doc count and shares inodes with the source.
+func TestSnapshotInto_ClosedSegmentIsRestorableViaHardLink(t *testing.T) {
+ dir := snapshotTestDir(t)
+
+ tsdb, sc := newEmptySnapshotTSDB(t, dir)
+ defer func() { require.NoError(t, tsdb.Close()) }()
+
+ const seriesCount = 5
+ seg := createSegmentWithSeries(t, tsdb, snapshotDay(0), seriesCount)
+ seg.lastAccessed.Store(time.Now().Add(-2 * time.Hour).UnixNano())
+ require.Equal(t, 1, sc.closeIdleSegments())
+ require.Nil(t, seg.index, "precondition: segment is idle-closed")
+
+ snapshotDir := filepath.Join(dir, "snapshot")
+ created, err := tsdb.TakeFileSnapshot(snapshotDir)
+ require.NoError(t, err)
+ require.True(t, created)
+
+ require.Nil(t, seg.index, "snapshot must not reopen the closed segment")
+ require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount)) // a closed segment holds no reference after the snapshot
+ require.Equal(t, int64(seriesCount), snapshotSeriesDocCount(t,
snapshotDir, seg))
+
+ srcIndex := filepath.Join(seg.location, seriesIndexDirName)
+ dstIndex := filepath.Join(snapshotDir, filepath.Base(seg.location),
seriesIndexDirName)
+ assertContainsHardLink(t, srcIndex, dstIndex) // proves the files were linked, not byte-copied
+}
+
+// TestTakeFileSnapshot_MixedOpenAndClosedSegments snapshots several segments
at
+// once where some are open and some idle-closed: every segment is captured
with
+// the correct per-segment data, the open/closed states are preserved (the
+// closed ones are not reopened), and references stay balanced.
+func TestTakeFileSnapshot_MixedOpenAndClosedSegments(t *testing.T) {
+ dir := snapshotTestDir(t)
+
+ tsdb, sc := newEmptySnapshotTSDB(t, dir)
+ defer func() { require.NoError(t, tsdb.Close()) }()
+
+ seg0 := createSegmentWithSeries(t, tsdb, snapshotDay(0), 3)
+ seg1 := createSegmentWithSeries(t, tsdb, snapshotDay(1), 5)
+ seg2 := createSegmentWithSeries(t, tsdb, snapshotDay(2), 7)
+
+ // Idle-close the two older segments; keep the newest open.
+ seg0.lastAccessed.Store(time.Now().Add(-2 * time.Hour).UnixNano())
+ seg1.lastAccessed.Store(time.Now().Add(-2 * time.Hour).UnixNano())
+ require.Equal(t, 2, sc.closeIdleSegments())
+ require.Nil(t, seg0.index)
+ require.Nil(t, seg1.index)
+ require.NotNil(t, seg2.index)
+ // Record the open segment's baseline so the snapshot's reference bump
+ // can be shown to be released again afterwards.
+ openRefBefore := atomic.LoadInt32(&seg2.refCount)
+
+ snapshotDir := filepath.Join(dir, "snapshot")
+ created, err := tsdb.TakeFileSnapshot(snapshotDir)
+ require.NoError(t, err)
+ require.True(t, created)
+
+ // States preserved: closed stay closed, open stays open.
+ require.Nil(t, seg0.index, "closed segment must not be reopened by
snapshot")
+ require.Nil(t, seg1.index, "closed segment must not be reopened by
snapshot")
+ require.NotNil(t, seg2.index, "open segment must stay open")
+
+ // References balanced: the closed segments hold none, and the open one
+ // is back at its pre-snapshot count (this is what the doc comment promises).
+ require.Equal(t, int32(0), atomic.LoadInt32(&seg0.refCount), "closed segment must hold no reference after snapshot")
+ require.Equal(t, int32(0), atomic.LoadInt32(&seg1.refCount), "closed segment must hold no reference after snapshot")
+ require.Equal(t, openRefBefore, atomic.LoadInt32(&seg2.refCount), "open-path snapshot must balance its reference bump")
+
+ // Every segment is present and restorable with its own doc count.
+ require.Equal(t, int64(3), snapshotSeriesDocCount(t, snapshotDir, seg0))
+ require.Equal(t, int64(5), snapshotSeriesDocCount(t, snapshotDir, seg1))
+ require.Equal(t, int64(7), snapshotSeriesDocCount(t, snapshotDir, seg2))
+}
+
+// TestTakeFileSnapshot_DeletedSegmentIsSkipped verifies a segment flagged for
+// deletion (the concurrent-retention race window) is skipped by snapshotInto
+// while the surviving segment is still snapshotted.
+func TestTakeFileSnapshot_DeletedSegmentIsSkipped(t *testing.T) {
+ dir := snapshotTestDir(t)
+
+ tsdb, _ := newEmptySnapshotTSDB(t, dir)
+ defer func() { require.NoError(t, tsdb.Close()) }()
+
+ doomed := createSegmentWithSeries(t, tsdb, snapshotDay(0), 3)
+ survivor := createSegmentWithSeries(t, tsdb, snapshotDay(1), 4)
+
+ // Model a segment that retention has flagged for deletion but that is
still
+ // in the controller list when the snapshot copies it.
+ atomic.StoreUint32(&doomed.mustBeDeleted, 1) // set the flag directly rather than running retention
+
+ snapshotDir := filepath.Join(dir, "snapshot")
+ created, err := tsdb.TakeFileSnapshot(snapshotDir)
+ require.NoError(t, err)
+ require.True(t, created, "the surviving segment was written")
+
+ require.NoDirExists(t, filepath.Join(snapshotDir,
filepath.Base(doomed.location)), "a segment flagged for deletion must be
skipped")
+ require.DirExists(t, filepath.Join(snapshotDir,
filepath.Base(survivor.location)), "the surviving segment must be snapshotted")
+}
+
+// TestTakeFileSnapshot_AllSegmentsDeletedReturnsFalse verifies that when every
+// segment is flagged for deletion, the snapshot writes nothing and reports
+// success=false with no error.
+func TestTakeFileSnapshot_AllSegmentsDeletedReturnsFalse(t *testing.T) {
+ dir := snapshotTestDir(t)
+
+ tsdb, _ := newEmptySnapshotTSDB(t, dir)
+ defer func() { require.NoError(t, tsdb.Close()) }()
+
+ seg0 := createSegmentWithSeries(t, tsdb, snapshotDay(0), 3)
+ seg1 := createSegmentWithSeries(t, tsdb, snapshotDay(1), 4)
+ atomic.StoreUint32(&seg0.mustBeDeleted, 1)
+ atomic.StoreUint32(&seg1.mustBeDeleted, 1) // every segment is now flagged, so none should be written
+
+ snapshotDir := filepath.Join(dir, "snapshot")
+ created, err := tsdb.TakeFileSnapshot(snapshotDir)
+ require.NoError(t, err)
+ require.False(t, created, "no segment content was written")
+ require.NoDirExists(t, snapshotDir, "nothing should be written when all
segments are skipped")
+}
+
+// TestSnapshotInto_EmptyOpenSegment verifies an open segment that has no
+// ingested data yet is still snapshotted (metadata + an empty series index)
and
+// reports success.
+func TestSnapshotInto_EmptyOpenSegment(t *testing.T) {
+ dir := snapshotTestDir(t)
+
+ tsdb, _ := newEmptySnapshotTSDB(t, dir)
+ defer func() { require.NoError(t, tsdb.Close()) }()
+
+ seg := createSegmentWithSeries(t, tsdb, snapshotDay(0), 0) // n=0: the segment is created but no series are inserted
+ require.NotNil(t, seg.index, "precondition: segment is open")
+
+ snapshotDir := filepath.Join(dir, "snapshot")
+ created, err := tsdb.TakeFileSnapshot(snapshotDir)
+ require.NoError(t, err)
+ require.True(t, created)
+
+ segSnap := filepath.Join(snapshotDir, filepath.Base(seg.location))
+ require.FileExists(t, filepath.Join(segSnap, SegmentMetadataFilename),
"segment metadata must be snapshotted")
+ require.DirExists(t, filepath.Join(segSnap, seriesIndexDirName),
"series index dir must be snapshotted")
+ require.Equal(t, int64(0), snapshotSeriesDocCount(t, snapshotDir, seg))
+}
+
+// allTimeRange covers every segment the snapshot-test factories create.
+func allTimeRange() timestamp.TimeRange {
+ return timestamp.TimeRange{Start: time.Unix(0, 0), End: time.Unix(0,
timestamp.MaxNanoTime)} // same full range CollectDataInfo selects with
+}
+
+// idleClose drives seg into the idle-closed state and asserts it is closed.
+func idleClose(t *testing.T, sc *segmentController[*MockTSTable, any], seg
*segment[*MockTSTable, any]) {
+ t.Helper()
+ seg.lastAccessed.Store(time.Now().Add(-2 * time.Hour).UnixNano())
+ require.Equal(t, 1, sc.closeIdleSegments()) // assumes seg is the only idle-eligible segment in sc
+ require.Nil(t, seg.index, "precondition: segment must be idle-closed")
+ require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount),
"precondition: closed segment has refCount 0")
+}
+
+// TestSelectSegments_ReopenTrue_ReopensClosedSegment: reopenClosed=true on a
+// CLOSED segment reopens it (incRef -> initialize), restores refCount to 1,
and
+// refreshes lastAccessed (a real read keeps the segment hot).
+func TestSelectSegments_ReopenTrue_ReopensClosedSegment(t *testing.T) {
+ dir := snapshotTestDir(t)
+ tsdb, sc := newEmptySnapshotTSDB(t, dir)
+ defer func() { require.NoError(t, tsdb.Close()) }()
+
+ seg := createSegmentWithSeries(t, tsdb, snapshotDay(0), 5)
+ idleClose(t, sc, seg)
+ closedAccessed := seg.lastAccessed.Load()
+
+ got, err := sc.selectSegments(allTimeRange(), true)
+ require.NoError(t, err)
+ require.Len(t, got, 1)
+ require.NotNil(t, seg.index, "reopenClosed=true must reopen a closed
segment")
+ require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount), "reopen
reinitializes refCount to 1")
+ require.Greater(t, seg.lastAccessed.Load(), closedAccessed, "a real
read refreshes lastAccessed")
+
+ got[0].DecRef() // releasing the only reference closes the segment again
+ require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount), "DecRef
returns the reopened segment to closed")
+ require.Nil(t, seg.index)
+}
+
+// TestSelectSegments_ReopenTrue_OpenSegmentBumpsAndRefreshes:
reopenClosed=true
+// on an OPEN segment pins it (+1) and refreshes lastAccessed without
re-opening.
+func TestSelectSegments_ReopenTrue_OpenSegmentBumpsAndRefreshes(t *testing.T) {
+ dir := snapshotTestDir(t)
+ tsdb, sc := newEmptySnapshotTSDB(t, dir)
+ defer func() { require.NoError(t, tsdb.Close()) }()
+
+ seg := createSegmentWithSeries(t, tsdb, snapshotDay(0), 0)
+ require.NotNil(t, seg.index)
+ require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount), "open
baseline refCount is 1")
+ seg.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano()) // backdate so a refresh is observable
+ before := seg.lastAccessed.Load()
+
+ got, err := sc.selectSegments(allTimeRange(), true)
+ require.NoError(t, err)
+ require.Len(t, got, 1)
+ require.NotNil(t, seg.index, "open segment stays open")
+ require.Equal(t, int32(2), atomic.LoadInt32(&seg.refCount), "incRef
bumps the open segment")
+ require.Greater(t, seg.lastAccessed.Load(), before, "a real read
refreshes lastAccessed")
+
+ got[0].DecRef()
+ require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount))
+}
+
+// TestSelectSegments_ReopenFalse_ClosedSegmentNotReopened: reopenClosed=false
on
+// a CLOSED segment returns it WITHOUT reopening, without bumping refCount, and
+// without refreshing lastAccessed; the caller's DecRef is a safe no-op.
+func TestSelectSegments_ReopenFalse_ClosedSegmentNotReopened(t *testing.T) {
+ dir := snapshotTestDir(t)
+ tsdb, sc := newEmptySnapshotTSDB(t, dir)
+ defer func() { require.NoError(t, tsdb.Close()) }()
+
+ seg := createSegmentWithSeries(t, tsdb, snapshotDay(0), 5)
+ idleClose(t, sc, seg)
+ closedAccessed := seg.lastAccessed.Load() // idle timestamp, must be left untouched by the stats peek
+
+ got, err := sc.selectSegments(allTimeRange(), false)
+ require.NoError(t, err)
+ require.Len(t, got, 1, "closed segment is still returned for stats")
+ require.Nil(t, seg.index, "reopenClosed=false must NOT reopen a closed
segment")
+ require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount), "closed
segment is returned without a ref bump")
+ require.Equal(t, closedAccessed, seg.lastAccessed.Load(), "a stats peek
must not refresh lastAccessed")
+
+ got[0].DecRef() // safe no-op on a refCount==0 segment
+ require.Equal(t, int32(0), atomic.LoadInt32(&seg.refCount))
+ require.Nil(t, seg.index)
+}
+
+// TestSelectSegments_ReopenFalse_OpenSegmentPinnedNoRefresh:
reopenClosed=false
+// on an OPEN segment pins it (+1) but does NOT refresh lastAccessed, so the
+// segment stays eligible for the next idle-close tick.
+func TestSelectSegments_ReopenFalse_OpenSegmentPinnedNoRefresh(t *testing.T) {
+ dir := snapshotTestDir(t)
+ tsdb, sc := newEmptySnapshotTSDB(t, dir)
+ defer func() { require.NoError(t, tsdb.Close()) }()
+
+ seg := createSegmentWithSeries(t, tsdb, snapshotDay(0), 0)
+ require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount))
+ seg.lastAccessed.Store(time.Now().Add(-time.Hour).UnixNano()) // backdate so an unwanted refresh would be observable
+ before := seg.lastAccessed.Load()
+
+ got, err := sc.selectSegments(allTimeRange(), false)
+ require.NoError(t, err)
+ require.Len(t, got, 1)
+ require.NotNil(t, seg.index, "open segment stays open")
+ require.Equal(t, int32(2), atomic.LoadInt32(&seg.refCount), "open
segment is pinned (+1)")
+ require.Equal(t, before, seg.lastAccessed.Load(), "a stats peek must
not refresh lastAccessed")
+
+ got[0].DecRef()
+ require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount))
+}
+
+// TestSelectSegments_ReopenFalse_InUseSegmentPinned: reopenClosed=false on a
+// segment already in active use (refCount>1) pins it (+1), never disturbing
the
+// references other callers hold.
+func TestSelectSegments_ReopenFalse_InUseSegmentPinned(t *testing.T) {
+ dir := snapshotTestDir(t)
+ tsdb, sc := newEmptySnapshotTSDB(t, dir)
+ defer func() { require.NoError(t, tsdb.Close()) }()
+
+ seg := createSegmentWithSeries(t, tsdb, snapshotDay(0), 0)
+ require.NoError(t, seg.incRef(context.Background())) // a concurrent
reader holds a ref
+ require.Equal(t, int32(2), atomic.LoadInt32(&seg.refCount), "in-use:
baseline + one active reader")
+
+ got, err := sc.selectSegments(allTimeRange(), false) // stats-style selection (no reopen)
+ require.NoError(t, err)
+ require.Len(t, got, 1)
+ require.Equal(t, int32(3), atomic.LoadInt32(&seg.refCount), "in-use
open segment is pinned (+1)")
+
+ got[0].DecRef() // release the selectSegments pin
+ seg.DecRef() // release the simulated active reader
+ require.Equal(t, int32(1), atomic.LoadInt32(&seg.refCount), "back to
open baseline")
+}
+
+// TestSelectSegments_TimeRangeFilters: only segments overlapping the requested
+// range are returned; a range that ends before the newest segment exercises
the
+// sorted early-break, and a range covering one segment returns exactly that
one.
+func TestSelectSegments_TimeRangeFilters(t *testing.T) {
+ dir := snapshotTestDir(t)
+ tsdb, sc := newEmptySnapshotTSDB(t, dir)
+ defer func() { require.NoError(t, tsdb.Close()) }()
+
+ _ = createSegmentWithSeries(t, tsdb, snapshotDay(0), 0)
+ seg1 := createSegmentWithSeries(t, tsdb, snapshotDay(1), 0)
+ _ = createSegmentWithSeries(t, tsdb, snapshotDay(2), 0)
+ require.Len(t, sc.lst, 3)
+
+ // Range inside day 1 only -> exactly seg1.
+ only := timestamp.NewInclusiveTimeRange(snapshotDay(1).Add(time.Hour),
snapshotDay(1).Add(2*time.Hour))
+ got, err := sc.selectSegments(only, false)
+ require.NoError(t, err)
+ require.Len(t, got, 1)
+ require.Equal(t, seg1.location, got[0].(*segment[*MockTSTable,
any]).location)
+ got[0].DecRef() // the empty selections below return nothing, so they need no DecRef
+
+ // Range entirely after every segment -> none (early-break path). NOTE(review): if selectSegments iterates segments in ascending start order, the early break fires on the "before" range below, not this one — confirm the label.
+ after := timestamp.NewInclusiveTimeRange(snapshotDay(10),
snapshotDay(11))
+ got, err = sc.selectSegments(after, true)
+ require.NoError(t, err)
+ require.Empty(t, got)
+
+ // Range entirely before every segment -> none.
+ before := timestamp.NewInclusiveTimeRange(snapshotDay(-5),
snapshotDay(-4))
+ got, err = sc.selectSegments(before, true)
+ require.NoError(t, err)
+ require.Empty(t, got)
+}
+
+// TestSelectSegments_PublicWrapper verifies the exported SelectSegments
delegates
+// to the controller and short-circuits to nil once the database is closed.
+func TestSelectSegments_PublicWrapper(t *testing.T) {
+ dir := snapshotTestDir(t)
+ tsdb, _ := newEmptySnapshotTSDB(t, dir) // no deferred Close: the test closes the database itself below
+
+ seg := createSegmentWithSeries(t, tsdb, snapshotDay(0), 0)
+ require.NotNil(t, seg.index)
+
+ got, err := tsdb.SelectSegments(allTimeRange(), false)
+ require.NoError(t, err)
+ require.Len(t, got, 1)
+ got[0].DecRef()
+
+ require.NoError(t, tsdb.Close())
+ got, err = tsdb.SelectSegments(allTimeRange(), true)
+ require.NoError(t, err)
+ require.Nil(t, got, "a closed database returns no segments")
+}
diff --git a/banyand/internal/storage/snapshot_test.go
b/banyand/internal/storage/snapshot_test.go
index e40459035..7f219e07b 100644
--- a/banyand/internal/storage/snapshot_test.go
+++ b/banyand/internal/storage/snapshot_test.go
@@ -220,3 +220,33 @@ func TestDeleteStaleSnapshotsWithMinAge(t *testing.T) {
})
}
}
+
+func TestReadSnapshotPartNames(t *testing.T) {
+ fileSystem := fs.NewLocalFileSystem()
+ tmpPath, deferFn := test.Space(require.New(t)) // scratch directory shared by all subtests
+ defer deferFn()
+
+ t.Run("valid manifest returns the part names verbatim", func(t
*testing.T) {
+ manifest := filepath.Join(tmpPath, "valid.snp")
+ _, writeErr :=
fileSystem.Write([]byte(`["0000000000000001","00000000000000ff"]`), manifest,
FilePerm)
+ require.NoError(t, writeErr)
+ names, err := ReadSnapshotPartNames(fileSystem, manifest)
+ require.NoError(t, err)
+ require.Equal(t, []string{"0000000000000001",
"00000000000000ff"}, names)
+ })
+
+ t.Run("missing file is a read error", func(t *testing.T) {
+ _, err := ReadSnapshotPartNames(fileSystem,
filepath.Join(tmpPath, "absent.snp"))
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "cannot read")
+ })
+
+ t.Run("invalid JSON is a parse error", func(t *testing.T) {
+ manifest := filepath.Join(tmpPath, "invalid.snp")
+ _, writeErr := fileSystem.Write([]byte("{not-an-array}"),
manifest, FilePerm)
+ require.NoError(t, writeErr)
+ _, err := ReadSnapshotPartNames(fileSystem, manifest)
+ require.Error(t, err)
+ require.Contains(t, err.Error(), "cannot parse")
+ })
+}
diff --git a/banyand/internal/storage/storage.go
b/banyand/internal/storage/storage.go
index fc33a1ffa..cb5c8d495 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -111,7 +111,11 @@ type IndexDB interface {
type TSDB[T TSTable, O any] interface {
io.Closer
CreateSegmentIfNotExist(ts time.Time) (Segment[T, O], error)
- SelectSegments(timeRange timestamp.TimeRange) ([]Segment[T, O], error)
+ // SelectSegments returns the segments overlapping timeRange.
reopenClosed=true
+ // (query) reopens closed segments and marks them accessed; false
(read-only
+ // stats) returns them without reopening or refreshing their idle
timer. The
+ // caller must DecRef every returned segment (a no-op for a closed one).
+ SelectSegments(timeRange timestamp.TimeRange, reopenClosed bool)
([]Segment[T, O], error)
// SegmentInterval returns the current segment interval rule.
SegmentInterval() IntervalRule
Tick(ts int64)
@@ -138,6 +142,12 @@ type Segment[T TSTable, O any] interface {
TablesWithShardIDs() ([]T, []common.ShardID, []Cache)
Lookup(ctx context.Context, series []*pbv1.Series) (pbv1.SeriesList,
error)
IndexDB() IndexDB
+ // Location returns the on-disk directory of the segment.
+ Location() string
+ // SeriesIndexStats returns the series index document count and on-disk
size.
+ // It works without reopening a closed segment: an open segment is
reported
+ // from its live index, a closed one is read from disk read-only.
+ SeriesIndexStats() (dataCount int64, dataSizeBytes int64)
}
// TSTable is time series table.
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index d6e0405b4..afe341ea1 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -274,11 +274,11 @@ func (d *database[T, O]) CreateSegmentIfNotExist(ts
time.Time) (Segment[T, O], e
return d.segmentController.createSegment(ts)
}
-func (d *database[T, O]) SelectSegments(timeRange timestamp.TimeRange)
([]Segment[T, O], error) {
+func (d *database[T, O]) SelectSegments(timeRange timestamp.TimeRange,
reopenClosed bool) ([]Segment[T, O], error) {
if d.closed.Load() {
return nil, nil
}
- return d.segmentController.selectSegments(timeRange)
+ return d.segmentController.selectSegments(timeRange, reopenClosed) // reopenClosed=false lets stats/inspection callers avoid reopening cold segments
}
func (d *database[T, O]) SegmentInterval() IntervalRule {
@@ -292,21 +292,22 @@ func (d *database[T, O]) UpdateOptions(resourceOpts
*commonv1.ResourceOpts) {
d.segmentController.updateOptions(resourceOpts)
}
+// TakeFileSnapshot writes a point-in-time snapshot of every segment under dst.
+// success is true only when at least one segment actually wrote content; it is
+// false (with a nil error) when there are no segments, or when every segment
is
+// concurrently being deleted and thus skipped.
func (d *database[T, O]) TakeFileSnapshot(dst string) (success bool, err
error) {
if d.closed.Load() {
return false, errors.New("database is closed")
}
- segments, segErr := d.segmentController.segments(context.Background(),
true)
- if segErr != nil {
- return false, errors.Wrap(segErr, "failed to get segments")
- }
- defer func() {
- for _, seg := range segments {
- seg.DecRef()
- }
- }()
-
+ // The snapshot must NOT reopen a closed segment: reopening an
idle-closed
+ // cold segment is the source of the nil-index panic and the bluge
+ // "exclusive lock" churn. Take the current segment objects WITHOUT
forcing
+ // a reopen. A closed (quiescent) segment is hard-linked directly from
its
+ // immutable on-disk files; an open segment is snapshotted through its
live
+ // series index and shard tables.
+ segments := d.segmentController.copySegments()
if len(segments) == 0 {
return false, nil
}
@@ -320,42 +321,14 @@ func (d *database[T, O]) TakeFileSnapshot(dst string)
(success bool, err error)
log.Info().Int("segment_count", len(segments)).Str("db_location",
d.location).
Msgf("taking file snapshot for %s", dst)
for _, seg := range segments {
- segDir := filepath.Base(seg.location)
- segPath := filepath.Join(dst, segDir)
- d.lfs.MkdirIfNotExist(segPath, DirPerm)
-
- metadataSrc := filepath.Join(seg.location, metadataFilename)
- metadataDest := filepath.Join(segPath, metadataFilename)
- if linkErr := d.lfs.CreateHardLink(metadataSrc, metadataDest,
nil); linkErr != nil {
- return false, errors.Wrapf(linkErr, "failed to snapshot
metadata for segment %s", segDir)
- }
-
- indexPath := filepath.Join(segPath, seriesIndexDirName)
- d.lfs.MkdirIfNotExist(indexPath, DirPerm)
- if indexErr := seg.index.store.TakeFileSnapshot(indexPath);
indexErr != nil {
- return false, errors.Wrapf(indexErr, "failed to
snapshot index for segment %s", segDir)
- }
-
- sLst := seg.sLst.Load()
- if sLst == nil {
- continue
- }
- for _, shard := range *sLst {
- shardDir := filepath.Base(shard.location)
- shardPath := filepath.Join(segPath, shardDir)
- d.lfs.MkdirIfNotExist(shardPath, DirPerm)
- if _, shardErr :=
shard.table.TakeFileSnapshot(shardPath); shardErr != nil {
- if errors.Is(shardErr, ErrNoCurrentSnapshot) {
- log.Debug().Str("shard",
shardDir).Str("segment", segDir).
- Msg("skipping empty shard
snapshot")
- continue
- }
- return false, errors.Wrapf(shardErr, "failed to
snapshot shard %s in segment %s", shardDir, segDir)
- }
+ wrote, segErr := seg.snapshotInto(dst)
+ if segErr != nil {
+ return false, segErr
}
+ success = success || wrote
}
- return true, nil
+ return success, nil
}
func (d *database[T, O]) GetExpiredSegmentsTimeRange() *timestamp.TimeRange {
diff --git a/banyand/measure/cache_benchmark_test.go
b/banyand/measure/cache_benchmark_test.go
index 74e124814..fb1ae7839 100644
--- a/banyand/measure/cache_benchmark_test.go
+++ b/banyand/measure/cache_benchmark_test.go
@@ -305,7 +305,7 @@ func (m *measure) QueryWithCache(ctx context.Context, mqo
model.MeasureQueryOpti
tsdb = db.(storage.TSDB[*tsTable, option])
}
- segments, err := tsdb.SelectSegments(*mqo.TimeRange)
+ segments, err := tsdb.SelectSegments(*mqo.TimeRange, true)
if err != nil {
return nil, err
}
diff --git a/banyand/measure/metadata.go b/banyand/measure/metadata.go
index 93e857a70..9db478549 100644
--- a/banyand/measure/metadata.go
+++ b/banyand/measure/metadata.go
@@ -429,10 +429,13 @@ func (sr *schemaRepo) CollectDataInfo(ctx
context.Context, group string) (*datab
if tsdb == nil {
return nil, nil
}
+ // reopenClosed=false: do NOT reopen closed segments, so inspecting cold
+ // segments never reopens their writable index (no exclusive-lock
churn).
+ // Closed segments are reported from their on-disk files.
segments, segmentsErr := tsdb.SelectSegments(timestamp.TimeRange{
Start: time.Unix(0, 0),
End: time.Unix(0, timestamp.MaxNanoTime),
- })
+ }, false)
if segmentsErr != nil {
return nil, segmentsErr
}
@@ -442,10 +445,19 @@ func (sr *schemaRepo) CollectDataInfo(ctx
context.Context, group string) (*datab
timeRange := segment.GetTimeRange()
tables, _ := segment.Tables()
var shardInfoList []*databasev1.ShardInfo
- for shardIdx, table := range tables {
- shardInfo := sr.collectShardInfo(table,
uint32(shardIdx))
- shardInfoList = append(shardInfoList, shardInfo)
- totalDataSize += shardInfo.DataSizeBytes
+ if len(tables) > 0 {
+ for shardIdx, table := range tables {
+ shardInfo := sr.collectShardInfo(table,
uint32(shardIdx))
+ shardInfoList = append(shardInfoList, shardInfo)
+ totalDataSize += shardInfo.DataSizeBytes
+ }
+ } else {
+ // No live shard tables (a closed segment, or a
brand-new open one
+ // with no data yet): read shard part stats from disk
without
+ // reopening. Either way the result is the on-disk
shard stats.
+ closedShards, closedSize :=
storage.CollectClosedShardInfo(segment.Location())
+ shardInfoList = closedShards
+ totalDataSize += closedSize
}
seriesIndexInfo := sr.collectSeriesIndexInfo(segment)
totalDataSize += seriesIndexInfo.DataSizeBytes
@@ -468,11 +480,10 @@ func (sr *schemaRepo) CollectDataInfo(ctx
context.Context, group string) (*datab
}
func (sr *schemaRepo) collectSeriesIndexInfo(segment storage.Segment[*tsTable,
option]) *databasev1.SeriesIndexInfo {
- indexDB := segment.IndexDB()
- if indexDB == nil {
- return &databasev1.SeriesIndexInfo{}
- }
- dataCount, dataSizeBytes := indexDB.Stats()
+ // SeriesIndexStats reads from the live index when the segment is open
and
+ // from disk (read-only) when it is closed, so inspecting a cold segment
+ // never reopens its writable index.
+ dataCount, dataSizeBytes := segment.SeriesIndexStats()
return &databasev1.SeriesIndexInfo{
DataCount: dataCount,
DataSizeBytes: dataSizeBytes,
@@ -547,7 +558,7 @@ func (sr *schemaRepo) collectPendingWriteInfo(groupName
string) (int64, error) {
segments, segmentsErr := tsdb.SelectSegments(timestamp.TimeRange{
Start: time.Unix(0, 0),
End: time.Unix(0, timestamp.MaxNanoTime),
- })
+ }, false)
if segmentsErr != nil {
return 0, fmt.Errorf("failed to select segments: %w",
segmentsErr)
}
diff --git a/banyand/measure/metadata_internal_test.go
b/banyand/measure/metadata_internal_test.go
index 87fa5db3a..610e97b22 100644
--- a/banyand/measure/metadata_internal_test.go
+++ b/banyand/measure/metadata_internal_test.go
@@ -18,31 +18,103 @@
package measure
import (
+ "path/filepath"
"testing"
"github.com/stretchr/testify/require"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/test"
)
-// nilIndexDBSegment is a stub Segment whose IndexDB() returns an untyped
-// nil. The embedded interface lets us satisfy the Segment[*tsTable, option]
-// signature without implementing every method (only IndexDB is exercised
-// by collectSeriesIndexInfo).
-type nilIndexDBSegment struct {
+// seriesIndexStatsSegment is a stub Segment that returns fixed
SeriesIndexStats.
+// The embedded interface satisfies the Segment[*tsTable, option] signature
+// without implementing every method (only SeriesIndexStats is exercised by
+// collectSeriesIndexInfo).
+type seriesIndexStatsSegment struct {
storage.Segment[*tsTable, option]
}
-func (nilIndexDBSegment) IndexDB() storage.IndexDB { return nil }
+func (seriesIndexStatsSegment) SeriesIndexStats() (int64, int64) { return 12,
345 }
-// TestSchemaRepo_CollectSeriesIndexInfo_NilIndexDB locks in the contract
-// that collectSeriesIndexInfo returns an empty SeriesIndexInfo (no panic)
-// when the segment is in the cold-tier residual state where its underlying
-// series index has been torn down by performCleanup.
-func TestSchemaRepo_CollectSeriesIndexInfo_NilIndexDB(t *testing.T) {
+// TestSchemaRepo_CollectSeriesIndexInfo_ForwardsStats verifies that
+// collectSeriesIndexInfo forwards the segment's SeriesIndexStats (which works
+// for both open and closed segments) into the SeriesIndexInfo fields.
+func TestSchemaRepo_CollectSeriesIndexInfo_ForwardsStats(t *testing.T) {
sr := &schemaRepo{}
- info := sr.collectSeriesIndexInfo(nilIndexDBSegment{})
+ info := sr.collectSeriesIndexInfo(seriesIndexStatsSegment{})
require.NotNil(t, info)
- require.Equal(t, int64(0), info.DataCount)
- require.Equal(t, int64(0), info.DataSizeBytes)
+ require.Equal(t, int64(12), info.DataCount)
+ require.Equal(t, int64(345), info.DataSizeBytes)
+}
+
+// TestCollectClosedShardInfo_RealMeasurePart simulates a closed segment whose
+// shard parts are quiescent on disk: it writes a real measure part exactly as
a
+// flush does (mustFlush + mustWriteSnapshot), then verifies the closed-read
path
+// CollectDataInfo relies on reads the per-shard stats straight from disk
+// (without opening any table), and that the measure part metadata.json keys
the
+// storage layer reads match the totals the part actually wrote.
+func TestCollectClosedShardInfo_RealMeasurePart(t *testing.T) {
+ fileSystem := fs.NewLocalFileSystem()
+ tmpPath, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ // Lay out a closed segment's on-disk shard: <seg>/shard-0/<part>.
+ segDir := filepath.Join(tmpPath, "seg-20240501")
+ shardDir := filepath.Join(segDir, "shard-0")
+
+ mp := generateMemPart()
+ mp.mustInitFromDataPoints(dpsTS1)
+ wantCount := int64(mp.partMetadata.TotalCount)
+ require.Positive(t, wantCount)
+ const epoch = uint64(1)
+ mp.mustFlush(fileSystem, partPath(shardDir, epoch))
+
+ // Write the shard's snapshot manifest referencing the flushed part, as
the
+ // tsTable does after a flush, so the part is "current" (not an orphan).
+ tst := &tsTable{root: shardDir, fileSystem: fileSystem}
+ tst.mustWriteSnapshot(epoch, []string{partName(epoch)})
+
+ // The segment is closed (no live tables): stats are read straight from
disk.
+ infos, totalSize := storage.CollectClosedShardInfo(segDir)
+ require.Len(t, infos, 1)
+ require.Equal(t, uint32(0), infos[0].ShardId)
+ require.Equal(t, wantCount, infos[0].DataCount, "data count must match
the flushed part's totalCount")
+ require.Positive(t, infos[0].DataSizeBytes)
+ require.Equal(t, int64(1), infos[0].PartCount)
+ require.Equal(t, int64(1), infos[0].FilePartCount)
+ require.Equal(t, infos[0].DataSizeBytes, totalSize)
+}
+
+// TestCollectClosedShardInfo_ExcludesOrphanPart verifies the closed-read path
+// counts only parts referenced by the latest snapshot manifest: an orphan part
+// on disk (left by a crashed merge/flush, not in the manifest) is ignored,
just
+// as the open path's in-memory snapshot would ignore it.
+func TestCollectClosedShardInfo_ExcludesOrphanPart(t *testing.T) {
+ fileSystem := fs.NewLocalFileSystem()
+ tmpPath, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ segDir := filepath.Join(tmpPath, "seg-20240501")
+ shardDir := filepath.Join(segDir, "shard-0")
+
+ // Current part (in the manifest).
+ current := generateMemPart()
+ current.mustInitFromDataPoints(dpsTS1)
+ wantCount := int64(current.partMetadata.TotalCount)
+ current.mustFlush(fileSystem, partPath(shardDir, 1))
+
+ // Orphan part on disk but NOT referenced by the manifest.
+ orphan := generateMemPart()
+ orphan.mustInitFromDataPoints(dpsTS2)
+ orphan.mustFlush(fileSystem, partPath(shardDir, 2))
+
+ tst := &tsTable{root: shardDir, fileSystem: fileSystem}
+ tst.mustWriteSnapshot(1, []string{partName(1)})
+
+ infos, _ := storage.CollectClosedShardInfo(segDir)
+ require.Len(t, infos, 1)
+ require.Equal(t, wantCount, infos[0].DataCount, "orphan part must not
be counted")
+ require.Equal(t, int64(1), infos[0].PartCount)
}
diff --git a/banyand/measure/query.go b/banyand/measure/query.go
index 92a397c47..093289bbe 100644
--- a/banyand/measure/query.go
+++ b/banyand/measure/query.go
@@ -116,7 +116,7 @@ func (m *measure) Query(ctx context.Context, mqo
model.MeasureQueryOptions) (mqr
tsdb = db.(storage.TSDB[*tsTable, option])
}
- segments, err := tsdb.SelectSegments(*mqo.TimeRange)
+ segments, err := tsdb.SelectSegments(*mqo.TimeRange, true)
if err != nil {
return nil, err
}
diff --git a/banyand/measure/tstable.go b/banyand/measure/tstable.go
index 7d281ee6b..67f2306e3 100644
--- a/banyand/measure/tstable.go
+++ b/banyand/measure/tstable.go
@@ -291,16 +291,11 @@ func (tst *tsTable) mustWriteSnapshot(snapshot uint64,
partNames []string) {
}
func (tst *tsTable) readSnapshot(snapshot uint64) ([]uint64, error) {
- snapshotPath := filepath.Join(tst.root, snapshotName(snapshot))
- data, err := tst.fileSystem.Read(snapshotPath)
+ partNames, err := storage.ReadSnapshotPartNames(tst.fileSystem,
filepath.Join(tst.root, snapshotName(snapshot)))
if err != nil {
- return nil, fmt.Errorf("cannot read %s: %w", snapshotPath, err)
- }
- var partNames []string
- if err := json.Unmarshal(data, &partNames); err != nil {
- return nil, fmt.Errorf("cannot parse %s: %w", snapshotPath, err)
+ return nil, err
}
- var result []uint64
+ result := make([]uint64, 0, len(partNames))
for i := range partNames {
e, parseErr := parseEpoch(partNames[i])
if parseErr != nil {
diff --git a/banyand/stream/metadata.go b/banyand/stream/metadata.go
index 99db76512..47001983c 100644
--- a/banyand/stream/metadata.go
+++ b/banyand/stream/metadata.go
@@ -296,7 +296,7 @@ func (sr *schemaRepo) CollectDataInfo(ctx context.Context,
group string) (*datab
segments, segmentsErr := tsdb.SelectSegments(timestamp.TimeRange{
Start: time.Unix(0, 0),
End: time.Unix(0, timestamp.MaxNanoTime),
- })
+ }, false)
if segmentsErr != nil {
return nil, segmentsErr
}
@@ -306,10 +306,22 @@ func (sr *schemaRepo) CollectDataInfo(ctx
context.Context, group string) (*datab
timeRange := segment.GetTimeRange()
tables, _ := segment.Tables()
var shardInfoList []*databasev1.ShardInfo
- for shardIdx, table := range tables {
- shardInfo := sr.collectShardInfo(table,
uint32(shardIdx))
- shardInfoList = append(shardInfoList, shardInfo)
- totalDataSize += shardInfo.DataSizeBytes
+ if len(tables) > 0 {
+ for shardIdx, table := range tables {
+ shardInfo := sr.collectShardInfo(table,
uint32(shardIdx))
+ shardInfoList = append(shardInfoList, shardInfo)
+ totalDataSize += shardInfo.DataSizeBytes
+ }
+ } else {
+ // No live shard tables (a closed segment, or a
brand-new open one
+ // with no data yet): read shard part stats from disk
without
+ // reopening. The per-shard InvertedIndexInfo is
reported empty here:
+ // reading it would reopen the shard's bluge index (the
exclusive-lock
+ // churn this change exists to avoid), so it is
populated only for
+ // open segments.
+ closedShards, closedSize :=
storage.CollectClosedShardInfo(segment.Location())
+ shardInfoList = closedShards
+ totalDataSize += closedSize
}
seriesIndexInfo := sr.collectSeriesIndexInfo(segment)
totalDataSize += seriesIndexInfo.DataSizeBytes
@@ -332,14 +344,10 @@ func (sr *schemaRepo) CollectDataInfo(ctx
context.Context, group string) (*datab
}
func (sr *schemaRepo) collectSeriesIndexInfo(segment storage.Segment[*tsTable,
option]) *databasev1.SeriesIndexInfo {
- indexDB := segment.IndexDB()
- if indexDB == nil {
- return &databasev1.SeriesIndexInfo{
- DataCount: 0,
- DataSizeBytes: 0,
- }
- }
- dataCount, dataSizeBytes := indexDB.Stats()
+ // SeriesIndexStats reads from the live index when the segment is open
and
+ // from disk (read-only) when it is closed, so inspecting a cold segment
+ // never reopens its writable index.
+ dataCount, dataSizeBytes := segment.SeriesIndexStats()
return &databasev1.SeriesIndexInfo{
DataCount: dataCount,
DataSizeBytes: dataSizeBytes,
@@ -438,7 +446,7 @@ func (sr *schemaRepo) collectPendingWriteInfo(groupName
string) (int64, error) {
segments, segmentsErr := tsdb.SelectSegments(timestamp.TimeRange{
Start: time.Unix(0, 0),
End: time.Unix(0, timestamp.MaxNanoTime),
- })
+ }, false)
if segmentsErr != nil {
return 0, fmt.Errorf("failed to select segments: %w",
segmentsErr)
}
diff --git a/banyand/stream/metadata_internal_test.go
b/banyand/stream/metadata_internal_test.go
index a292b6704..2a776c5ad 100644
--- a/banyand/stream/metadata_internal_test.go
+++ b/banyand/stream/metadata_internal_test.go
@@ -18,31 +18,103 @@
package stream
import (
+ "path/filepath"
"testing"
"github.com/stretchr/testify/require"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/test"
)
-// nilIndexDBSegment is a stub Segment whose IndexDB() returns an untyped
-// nil. The embedded interface lets us satisfy the Segment[*tsTable, option]
-// signature without implementing every method (only IndexDB is exercised
-// by collectSeriesIndexInfo).
-type nilIndexDBSegment struct {
+// seriesIndexStatsSegment is a stub Segment that returns fixed
SeriesIndexStats.
+// The embedded interface satisfies the Segment[*tsTable, option] signature
+// without implementing every method (only SeriesIndexStats is exercised by
+// collectSeriesIndexInfo).
+type seriesIndexStatsSegment struct {
storage.Segment[*tsTable, option]
}
-func (nilIndexDBSegment) IndexDB() storage.IndexDB { return nil }
+func (seriesIndexStatsSegment) SeriesIndexStats() (int64, int64) { return 12,
345 }
-// TestSchemaRepo_CollectSeriesIndexInfo_NilIndexDB locks in the contract
-// that collectSeriesIndexInfo returns an empty SeriesIndexInfo (no panic)
-// when the segment is in the cold-tier residual state where its underlying
-// series index has been torn down by performCleanup.
-func TestSchemaRepo_CollectSeriesIndexInfo_NilIndexDB(t *testing.T) {
+// TestSchemaRepo_CollectSeriesIndexInfo_ForwardsStats verifies that
+// collectSeriesIndexInfo forwards the segment's SeriesIndexStats (which works
+// for both open and closed segments) into the SeriesIndexInfo fields.
+func TestSchemaRepo_CollectSeriesIndexInfo_ForwardsStats(t *testing.T) {
sr := &schemaRepo{}
- info := sr.collectSeriesIndexInfo(nilIndexDBSegment{})
+ info := sr.collectSeriesIndexInfo(seriesIndexStatsSegment{})
require.NotNil(t, info)
- require.Equal(t, int64(0), info.DataCount)
- require.Equal(t, int64(0), info.DataSizeBytes)
+ require.Equal(t, int64(12), info.DataCount)
+ require.Equal(t, int64(345), info.DataSizeBytes)
+}
+
+// TestCollectClosedShardInfo_RealStreamPart simulates a closed segment whose
+// shard parts are quiescent on disk: it writes a real stream part exactly as a
+// flush does (mustFlush + mustWriteSnapshot), then verifies the closed-read
path
+// CollectDataInfo relies on reads the per-shard stats straight from disk
+// (without opening any table), and that the stream part metadata.json keys the
+// storage layer reads match the totals the part actually wrote.
+func TestCollectClosedShardInfo_RealStreamPart(t *testing.T) {
+ fileSystem := fs.NewLocalFileSystem()
+ tmpPath, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ // Lay out a closed segment's on-disk shard: <seg>/shard-0/<part>.
+ segDir := filepath.Join(tmpPath, "seg-20240501")
+ shardDir := filepath.Join(segDir, "shard-0")
+
+ mp := generateMemPart()
+ mp.mustInitFromElements(es)
+ wantCount := int64(mp.partMetadata.TotalCount)
+ require.Positive(t, wantCount)
+ const epoch = uint64(1)
+ mp.mustFlush(fileSystem, partPath(shardDir, epoch))
+
+ // Write the shard's snapshot manifest referencing the flushed part, as
the
+ // tsTable does after a flush, so the part is "current" (not an orphan).
+ tst := &tsTable{root: shardDir, fileSystem: fileSystem}
+ tst.mustWriteSnapshot(epoch, []string{partName(epoch)})
+
+ // The segment is closed (no live tables): stats are read straight from
disk.
+ infos, totalSize := storage.CollectClosedShardInfo(segDir)
+ require.Len(t, infos, 1)
+ require.Equal(t, uint32(0), infos[0].ShardId)
+ require.Equal(t, wantCount, infos[0].DataCount, "data count must match
the flushed part's totalCount")
+ require.Positive(t, infos[0].DataSizeBytes)
+ require.Equal(t, int64(1), infos[0].PartCount)
+ require.Equal(t, int64(1), infos[0].FilePartCount)
+ require.Equal(t, infos[0].DataSizeBytes, totalSize)
+}
+
+// TestCollectClosedShardInfo_ExcludesOrphanPart verifies the closed-read path
+// counts only parts referenced by the latest snapshot manifest: an orphan part
+// on disk (left by a crashed merge/flush, not in the manifest) is ignored,
just
+// as the open path's in-memory snapshot would ignore it.
+func TestCollectClosedShardInfo_ExcludesOrphanPart(t *testing.T) {
+ fileSystem := fs.NewLocalFileSystem()
+ tmpPath, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ segDir := filepath.Join(tmpPath, "seg-20240501")
+ shardDir := filepath.Join(segDir, "shard-0")
+
+ // Current part (in the manifest).
+ current := generateMemPart()
+ current.mustInitFromElements(es)
+ wantCount := int64(current.partMetadata.TotalCount)
+ current.mustFlush(fileSystem, partPath(shardDir, 1))
+
+ // Orphan part on disk but NOT referenced by the manifest.
+ orphan := generateMemPart()
+ orphan.mustInitFromElements(es)
+ orphan.mustFlush(fileSystem, partPath(shardDir, 2))
+
+ tst := &tsTable{root: shardDir, fileSystem: fileSystem}
+ tst.mustWriteSnapshot(1, []string{partName(1)})
+
+ infos, _ := storage.CollectClosedShardInfo(segDir)
+ require.Len(t, infos, 1)
+ require.Equal(t, wantCount, infos[0].DataCount, "orphan part must not
be counted")
+ require.Equal(t, int64(1), infos[0].PartCount)
}
diff --git a/banyand/stream/query.go b/banyand/stream/query.go
index b082fa289..5df920d54 100644
--- a/banyand/stream/query.go
+++ b/banyand/stream/query.go
@@ -60,7 +60,7 @@ func (s *stream) Query(ctx context.Context, sqo
model.StreamQueryOptions) (sqr m
return nil, err
}
- segments, err := tsdb.SelectSegments(*sqo.TimeRange)
+ segments, err := tsdb.SelectSegments(*sqo.TimeRange, true)
if err != nil {
return nil, err
}
diff --git a/banyand/stream/tstable.go b/banyand/stream/tstable.go
index 2a20cae90..b52c4de48 100644
--- a/banyand/stream/tstable.go
+++ b/banyand/stream/tstable.go
@@ -186,16 +186,11 @@ func (tst *tsTable) mustWriteSnapshot(snapshot uint64,
partNames []string) {
}
func (tst *tsTable) readSnapshot(snapshot uint64) ([]uint64, error) {
- snapshotPath := filepath.Join(tst.root, snapshotName(snapshot))
- data, err := tst.fileSystem.Read(snapshotPath)
+ partNames, err := storage.ReadSnapshotPartNames(tst.fileSystem,
filepath.Join(tst.root, snapshotName(snapshot)))
if err != nil {
- return nil, fmt.Errorf("cannot read %s: %w", snapshotPath, err)
- }
- var partNames []string
- if err := json.Unmarshal(data, &partNames); err != nil {
- return nil, fmt.Errorf("cannot parse %s: %w", snapshotPath, err)
+ return nil, err
}
- var result []uint64
+ result := make([]uint64, 0, len(partNames))
for i := range partNames {
e, parseErr := parseEpoch(partNames[i])
if parseErr != nil {
diff --git a/banyand/trace/metadata.go b/banyand/trace/metadata.go
index d2719e03c..08cf7d4e5 100644
--- a/banyand/trace/metadata.go
+++ b/banyand/trace/metadata.go
@@ -304,7 +304,7 @@ func (sr *schemaRepo) CollectDataInfo(ctx context.Context,
group string) (*datab
segments, segmentsErr := tsdb.SelectSegments(timestamp.TimeRange{
Start: time.Unix(0, 0),
End: time.Unix(0, timestamp.MaxNanoTime),
- })
+ }, false)
if segmentsErr != nil {
return nil, segmentsErr
}
@@ -314,10 +314,22 @@ func (sr *schemaRepo) CollectDataInfo(ctx
context.Context, group string) (*datab
timeRange := segment.GetTimeRange()
tables, _ := segment.Tables()
var shardInfoList []*databasev1.ShardInfo
- for shardIdx, table := range tables {
- shardInfo := sr.collectShardInfo(ctx, table,
uint32(shardIdx))
- shardInfoList = append(shardInfoList, shardInfo)
- totalDataSize += shardInfo.DataSizeBytes
+ if len(tables) > 0 {
+ for shardIdx, table := range tables {
+ shardInfo := sr.collectShardInfo(ctx, table,
uint32(shardIdx))
+ shardInfoList = append(shardInfoList, shardInfo)
+ totalDataSize += shardInfo.DataSizeBytes
+ }
+ } else {
+ // No live shard tables (a closed segment, or a
brand-new open one
+ // with no data yet): read shard part stats from disk
without
+ // reopening. The per-shard SidxInfo is reported empty
here: reading
+ // it would reopen the shard's sidx/bluge index (the
exclusive-lock
+ // churn this change exists to avoid), so it is
populated only for
+ // open segments.
+ closedShards, closedSize :=
storage.CollectClosedShardInfo(segment.Location())
+ shardInfoList = closedShards
+ totalDataSize += closedSize
}
seriesIndexInfo := sr.collectSeriesIndexInfo(segment)
totalDataSize += seriesIndexInfo.DataSizeBytes
@@ -340,14 +352,10 @@ func (sr *schemaRepo) CollectDataInfo(ctx
context.Context, group string) (*datab
}
func (sr *schemaRepo) collectSeriesIndexInfo(segment storage.Segment[*tsTable,
option]) *databasev1.SeriesIndexInfo {
- indexDB := segment.IndexDB()
- if indexDB == nil {
- return &databasev1.SeriesIndexInfo{
- DataCount: 0,
- DataSizeBytes: 0,
- }
- }
- dataCount, dataSizeBytes := indexDB.Stats()
+ // SeriesIndexStats reads from the live index when the segment is open
and
+ // from disk (read-only) when it is closed, so inspecting a cold segment
+ // never reopens its writable index.
+ dataCount, dataSizeBytes := segment.SeriesIndexStats()
return &databasev1.SeriesIndexInfo{
DataCount: dataCount,
DataSizeBytes: dataSizeBytes,
@@ -462,7 +470,7 @@ func (sr *schemaRepo) collectPendingWriteInfo(groupName
string) (int64, error) {
segments, segmentsErr := tsdb.SelectSegments(timestamp.TimeRange{
Start: time.Unix(0, 0),
End: time.Unix(0, timestamp.MaxNanoTime),
- })
+ }, false)
if segmentsErr != nil {
return 0, fmt.Errorf("failed to select segments: %w",
segmentsErr)
}
diff --git a/banyand/trace/metadata_internal_test.go
b/banyand/trace/metadata_internal_test.go
index 378078907..7bd407d19 100644
--- a/banyand/trace/metadata_internal_test.go
+++ b/banyand/trace/metadata_internal_test.go
@@ -18,31 +18,103 @@
package trace
import (
+ "path/filepath"
"testing"
"github.com/stretchr/testify/require"
"github.com/apache/skywalking-banyandb/banyand/internal/storage"
+ "github.com/apache/skywalking-banyandb/pkg/fs"
+ "github.com/apache/skywalking-banyandb/pkg/test"
)
-// nilIndexDBSegment is a stub Segment whose IndexDB() returns an untyped
-// nil. The embedded interface lets us satisfy the Segment[*tsTable, option]
-// signature without implementing every method (only IndexDB is exercised
-// by collectSeriesIndexInfo).
-type nilIndexDBSegment struct {
+// seriesIndexStatsSegment is a stub Segment that returns fixed
SeriesIndexStats.
+// The embedded interface satisfies the Segment[*tsTable, option] signature
+// without implementing every method (only SeriesIndexStats is exercised by
+// collectSeriesIndexInfo).
+type seriesIndexStatsSegment struct {
storage.Segment[*tsTable, option]
}
-func (nilIndexDBSegment) IndexDB() storage.IndexDB { return nil }
+func (seriesIndexStatsSegment) SeriesIndexStats() (int64, int64) { return 12,
345 }
-// TestSchemaRepo_CollectSeriesIndexInfo_NilIndexDB locks in the contract
-// that collectSeriesIndexInfo returns an empty SeriesIndexInfo (no panic)
-// when the segment is in the cold-tier residual state where its underlying
-// series index has been torn down by performCleanup.
-func TestSchemaRepo_CollectSeriesIndexInfo_NilIndexDB(t *testing.T) {
+// TestSchemaRepo_CollectSeriesIndexInfo_ForwardsStats verifies that
+// collectSeriesIndexInfo forwards the segment's SeriesIndexStats (which works
+// for both open and closed segments) into the SeriesIndexInfo fields.
+func TestSchemaRepo_CollectSeriesIndexInfo_ForwardsStats(t *testing.T) {
sr := &schemaRepo{}
- info := sr.collectSeriesIndexInfo(nilIndexDBSegment{})
+ info := sr.collectSeriesIndexInfo(seriesIndexStatsSegment{})
require.NotNil(t, info)
- require.Equal(t, int64(0), info.DataCount)
- require.Equal(t, int64(0), info.DataSizeBytes)
+ require.Equal(t, int64(12), info.DataCount)
+ require.Equal(t, int64(345), info.DataSizeBytes)
+}
+
+// TestCollectClosedShardInfo_RealTracePart simulates a closed segment whose
+// shard parts are quiescent on disk: it writes a real trace part exactly as a
+// flush does (mustFlush + mustWriteSnapshot), then verifies the closed-read
path
+// CollectDataInfo relies on reads the per-shard stats straight from disk
+// (without opening any table), and that the trace part metadata.json keys the
+// storage layer reads match the totals the part actually wrote.
+func TestCollectClosedShardInfo_RealTracePart(t *testing.T) {
+ fileSystem := fs.NewLocalFileSystem()
+ tmpPath, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ // Lay out a closed segment's on-disk shard: <seg>/shard-0/<part>.
+ segDir := filepath.Join(tmpPath, "seg-20240501")
+ shardDir := filepath.Join(segDir, "shard-0")
+
+ mp := generateMemPart()
+ mp.mustInitFromTraces(ts)
+ wantCount := int64(mp.partMetadata.TotalCount)
+ require.Positive(t, wantCount)
+ const epoch = uint64(1)
+ mp.mustFlush(fileSystem, partPath(shardDir, epoch))
+
+ // Write the shard's snapshot manifest referencing the flushed part, as
the
+ // tsTable does after a flush, so the part is "current" (not an orphan).
+ tst := &tsTable{root: shardDir, fileSystem: fileSystem}
+ tst.mustWriteSnapshot(epoch, []string{partName(epoch)})
+
+ // The segment is closed (no live tables): stats are read straight from
disk.
+ infos, totalSize := storage.CollectClosedShardInfo(segDir)
+ require.Len(t, infos, 1)
+ require.Equal(t, uint32(0), infos[0].ShardId)
+ require.Equal(t, wantCount, infos[0].DataCount, "data count must match
the flushed part's totalCount")
+ require.Positive(t, infos[0].DataSizeBytes)
+ require.Equal(t, int64(1), infos[0].PartCount)
+ require.Equal(t, int64(1), infos[0].FilePartCount)
+ require.Equal(t, infos[0].DataSizeBytes, totalSize)
+}
+
+// TestCollectClosedShardInfo_ExcludesOrphanPart verifies the closed-read path
+// counts only parts referenced by the latest snapshot manifest: an orphan part
+// on disk (left by a crashed merge/flush, not in the manifest) is ignored,
just
+// as the open path's in-memory snapshot would ignore it.
+func TestCollectClosedShardInfo_ExcludesOrphanPart(t *testing.T) {
+ fileSystem := fs.NewLocalFileSystem()
+ tmpPath, defFn := test.Space(require.New(t))
+ defer defFn()
+
+ segDir := filepath.Join(tmpPath, "seg-20240501")
+ shardDir := filepath.Join(segDir, "shard-0")
+
+ // Current part (in the manifest).
+ current := generateMemPart()
+ current.mustInitFromTraces(ts)
+ wantCount := int64(current.partMetadata.TotalCount)
+ current.mustFlush(fileSystem, partPath(shardDir, 1))
+
+ // Orphan part on disk but NOT referenced by the manifest.
+ orphan := generateMemPart()
+ orphan.mustInitFromTraces(ts)
+ orphan.mustFlush(fileSystem, partPath(shardDir, 2))
+
+ tst := &tsTable{root: shardDir, fileSystem: fileSystem}
+ tst.mustWriteSnapshot(1, []string{partName(1)})
+
+ infos, _ := storage.CollectClosedShardInfo(segDir)
+ require.Len(t, infos, 1)
+ require.Equal(t, wantCount, infos[0].DataCount, "orphan part must not
be counted")
+ require.Equal(t, int64(1), infos[0].PartCount)
}
diff --git a/banyand/trace/query.go b/banyand/trace/query.go
index 49c14f9aa..78367ba58 100644
--- a/banyand/trace/query.go
+++ b/banyand/trace/query.go
@@ -65,7 +65,7 @@ func (t *trace) Query(ctx context.Context, tqo
model.TraceQueryOptions) (model.T
return nil, err
}
- segments, err := tsdb.SelectSegments(*tqo.TimeRange)
+ segments, err := tsdb.SelectSegments(*tqo.TimeRange, true)
if err != nil {
return nil, err
}
diff --git a/banyand/trace/tstable.go b/banyand/trace/tstable.go
index d6c383ff8..c2386362e 100644
--- a/banyand/trace/tstable.go
+++ b/banyand/trace/tstable.go
@@ -191,16 +191,11 @@ func (tst *tsTable) mustWriteSnapshot(snapshot uint64,
partNames []string) {
}
func (tst *tsTable) readSnapshot(snapshot uint64) ([]uint64, error) {
- snapshotPath := filepath.Join(tst.root, snapshotName(snapshot))
- data, err := tst.fileSystem.Read(snapshotPath)
+ partNames, err := storage.ReadSnapshotPartNames(tst.fileSystem,
filepath.Join(tst.root, snapshotName(snapshot)))
if err != nil {
- return nil, fmt.Errorf("cannot read %s: %w", snapshotPath, err)
- }
- var partNames []string
- if err := json.Unmarshal(data, &partNames); err != nil {
- return nil, fmt.Errorf("cannot parse %s: %w", snapshotPath, err)
+ return nil, err
}
- var result []uint64
+ result := make([]uint64, 0, len(partNames))
for i := range partNames {
e, parseErr := parseEpoch(partNames[i])
if parseErr != nil {
diff --git a/pkg/fs/local_file_system.go b/pkg/fs/local_file_system.go
index 4be28b6c5..61492e530 100644
--- a/pkg/fs/local_file_system.go
+++ b/pkg/fs/local_file_system.go
@@ -472,6 +472,10 @@ func (fs *localFileSystem) CreateHardLink(srcPath,
destPath string, filter func(
destFullPath := filepath.Join(destPath, relPath)
if info.IsDir() {
+ // A filter that rejects a directory skips the whole
subtree.
+ if filter != nil && !filter(path) {
+ return filepath.SkipDir
+ }
if err := os.MkdirAll(destFullPath, info.Mode()); err
!= nil {
return &FileSystemError{
Code: otherError,
diff --git a/pkg/index/inverted/inverted.go b/pkg/index/inverted/inverted.go
index 38779a8cf..d6ec0aaae 100644
--- a/pkg/index/inverted/inverted.go
+++ b/pkg/index/inverted/inverted.go
@@ -54,6 +54,11 @@ const (
// ExternalSegmentTempDirName is the name of the directory used for
temporary external segments.
ExternalSegmentTempDirName = "external-segment-temp"
+ // LockFilename is the bluge exclusive-lock file kept in every index
+ // directory while a writer is open (github.com/SkyAPM/bluge
directory_fs).
+ // It must never be copied into a snapshot.
+ LockFilename = "bluge.pid"
+
docIDField = "_id"
seriesIDField = "_series_id"
timestampField = "_timestamp"
@@ -292,6 +297,27 @@ func (s *store) Reset() {
s.writer.ResetCache()
}
+// ReadOnlyDocCount opens the index directory at path read-only and returns the
+// number of indexed documents. Unlike NewStore it never acquires the exclusive
+// directory lock, so it can inspect a closed (or even concurrently open)
+// segment index without reopening its writable index. A missing or unflushed
+// index (no usable snapshot) returns a count of 0 together with the open
error,
+// which callers may treat as an empty index.
+func ReadOnlyDocCount(path string) (int64, error) {
+ reader, err := bluge.OpenReader(bluge.DefaultConfig(path))
+ if err != nil {
+ return 0, err
+ }
+ defer func() {
+ _ = reader.Close()
+ }()
+ count, err := reader.Count()
+ if err != nil {
+ return 0, err
+ }
+ return int64(count), nil
+}
+
func (s *store) Iterator(ctx context.Context, fieldKey index.FieldKey,
termRange index.RangeOpts, order modelv1.Sort,
preLoadSize int,
) (iter index.FieldIterator[*index.DocumentResult], err error) {