This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch lifecyc-sync
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 74b2a5c1ed952f6f63cf12dc11648fda81d1aba3
Author: Gao Hongtao <hanahm...@gmail.com>
AuthorDate: Wed Jul 30 09:25:37 2025 +0800

    Update dependencies and refactor segment parsing logic
    
    - Updated the version of `github.com/SkyAPM/bluge` in `go.mod` and `go.sum`.
    - Refactored the segment parsing logic in `segment.go` to improve clarity
      and maintainability by extracting it into a standalone function,
      `parseSegmentTime`, which `segmentController.parse` now delegates to.
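    - Added filesystem visitors for segment and stream components
      (`storage.VisitSegmentsInTimeRange` and `stream.VisitStreamsInTimeRange`),
      together with test cases in `storage/visitor_test.go` and
      `stream/visitor_test.go` that verify the visitor pattern works as intended.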
---
 banyand/internal/storage/segment.go      |   9 +-
 banyand/internal/storage/visitor.go      | 107 +++++++
 banyand/internal/storage/visitor_test.go | 493 +++++++++++++++++++++++++++++++
 banyand/stream/visitor.go                | 103 +++++++
 banyand/stream/visitor_test.go           | 121 ++++++++
 go.mod                                   |   2 +-
 go.sum                                   |   4 +-
 7 files changed, 834 insertions(+), 5 deletions(-)

diff --git a/banyand/internal/storage/segment.go 
b/banyand/internal/storage/segment.go
index fe54b9b8..1c4c1ce0 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -445,8 +445,9 @@ func (sc *segmentController[T, O]) format(tm time.Time) 
string {
        panic("invalid interval unit")
 }
 
-func (sc *segmentController[T, O]) parse(value string) (time.Time, error) {
-       switch sc.getOptions().SegmentInterval.Unit {
+// parseSegmentTime parses a segment suffix into a time based on the interval 
unit.
+func parseSegmentTime(value string, unit IntervalUnit) (time.Time, error) {
+       switch unit {
        case HOUR:
                return time.ParseInLocation(hourFormat, value, time.Local)
        case DAY:
@@ -455,6 +456,10 @@ func (sc *segmentController[T, O]) parse(value string) 
(time.Time, error) {
        panic("invalid interval unit")
 }
 
+func (sc *segmentController[T, O]) parse(value string) (time.Time, error) {
+       return parseSegmentTime(value, sc.getOptions().SegmentInterval.Unit)
+}
+
 func (sc *segmentController[T, O]) open() error {
        sc.Lock()
        defer sc.Unlock()
diff --git a/banyand/internal/storage/visitor.go 
b/banyand/internal/storage/visitor.go
new file mode 100644
index 00000000..bd82036a
--- /dev/null
+++ b/banyand/internal/storage/visitor.go
@@ -0,0 +1,107 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package storage
+
+import (
+       "fmt"
+       "path/filepath"
+       "strconv"
+
+       "github.com/pkg/errors"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+// SegmentVisitor defines the interface for visiting segment components.
+type SegmentVisitor interface {
+       // VisitSeries visits the series index directory for a segment.
+       VisitSeries(segmentTR *timestamp.TimeRange, seriesIndexPath string) 
error
+       // VisitShard visits a shard directory within a segment.
+       VisitShard(segmentTR *timestamp.TimeRange, shardID common.ShardID, 
shardPath string) error
+}
+
+// VisitSegmentsInTimeRange traverses segments within the specified time range
+// and calls the visitor methods for series index and shard directories.
+// This function works directly with the filesystem without requiring a 
database instance.
+func VisitSegmentsInTimeRange(tsdbRootPath string, timeRange 
timestamp.TimeRange, visitor SegmentVisitor, intervalRule IntervalRule) error {
+       // Parse segment directories in the root path
+       var segmentPaths []segmentInfo
+       err := walkDir(tsdbRootPath, segPathPrefix, func(suffix string) error {
+               startTime, err := parseSegmentTime(suffix, intervalRule.Unit)
+               if err != nil {
+                       return err
+               }
+
+               // Calculate end time based on interval rule
+               endTime := intervalRule.NextTime(startTime)
+               segTR := timestamp.NewSectionTimeRange(startTime, endTime)
+
+               // Check if segment overlaps with the requested time range
+               if !segTR.Overlapping(timeRange) {
+                       return nil // Skip segments outside the time range
+               }
+
+               segmentPath := filepath.Join(tsdbRootPath, 
fmt.Sprintf(segTemplate, suffix))
+               segmentPaths = append(segmentPaths, segmentInfo{
+                       path:      segmentPath,
+                       suffix:    suffix,
+                       timeRange: segTR,
+               })
+               return nil
+       })
+       if err != nil {
+               return errors.Wrap(err, "failed to walk segment directories")
+       }
+
+       // Visit each matching segment
+       for _, segInfo := range segmentPaths {
+               // Visit series index directory
+               seriesIndexPath := filepath.Join(segInfo.path, 
seriesIndexDirName)
+               if err := visitor.VisitSeries(&segInfo.timeRange, 
seriesIndexPath); err != nil {
+                       return errors.Wrapf(err, "failed to visit series index 
for segment %s", segInfo.suffix)
+               }
+
+               // Visit shard directories
+               if err := visitSegmentShards(segInfo.path, &segInfo.timeRange, 
visitor); err != nil {
+                       return errors.Wrapf(err, "failed to visit shards for 
segment %s", segInfo.suffix)
+               }
+       }
+
+       return nil
+}
+
+// segmentInfo holds information about a segment directory.
+type segmentInfo struct {
+       path      string
+       suffix    string
+       timeRange timestamp.TimeRange
+}
+
+// visitSegmentShards traverses shard directories within a segment.
+func visitSegmentShards(segmentPath string, segmentTR *timestamp.TimeRange, 
visitor SegmentVisitor) error {
+       return walkDir(segmentPath, shardPathPrefix, func(suffix string) error {
+               shardID, err := strconv.Atoi(suffix)
+               if err != nil {
+                       return errors.Wrapf(err, "invalid shard suffix: %s", 
suffix)
+               }
+
+               shardPath := filepath.Join(segmentPath, 
fmt.Sprintf(shardTemplate, shardID))
+               return visitor.VisitShard(segmentTR, common.ShardID(shardID), 
shardPath)
+       })
+}
diff --git a/banyand/internal/storage/visitor_test.go 
b/banyand/internal/storage/visitor_test.go
new file mode 100644
index 00000000..149bdad2
--- /dev/null
+++ b/banyand/internal/storage/visitor_test.go
@@ -0,0 +1,493 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package storage
+
+import (
+       "context"
+       "path/filepath"
+       "testing"
+       "time"
+
+       "github.com/pkg/errors"
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/pkg/logger"
+       "github.com/apache/skywalking-banyandb/pkg/test"
+       "github.com/apache/skywalking-banyandb/pkg/test/flags"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+// TestVisitor tracks visited segments and shards for testing.
+type TestVisitor struct {
+       seriesErrors  map[string]error
+       shardErrors   map[common.ShardID]error
+       visitedSeries []string
+       visitedShards []struct {
+               path    string
+               shardID common.ShardID
+       }
+}
+
+// NewTestVisitor creates a new test visitor.
+func NewTestVisitor() *TestVisitor {
+       return &TestVisitor{
+               seriesErrors:  make(map[string]error),
+               shardErrors:   make(map[common.ShardID]error),
+               visitedSeries: make([]string, 0),
+               visitedShards: make([]struct {
+                       path    string
+                       shardID common.ShardID
+               }, 0),
+       }
+}
+
+// VisitSeries implements SegmentVisitor.VisitSeries.
+func (v *TestVisitor) VisitSeries(_ *timestamp.TimeRange, seriesIndexPath 
string) error {
+       if err, exists := v.seriesErrors[seriesIndexPath]; exists {
+               return err
+       }
+       v.visitedSeries = append(v.visitedSeries, seriesIndexPath)
+       return nil
+}
+
+// VisitShard implements SegmentVisitor.VisitShard.
+func (v *TestVisitor) VisitShard(_ *timestamp.TimeRange, shardID 
common.ShardID, shardPath string) error {
+       if err, exists := v.shardErrors[shardID]; exists {
+               return err
+       }
+       v.visitedShards = append(v.visitedShards, struct {
+               path    string
+               shardID common.ShardID
+       }{path: shardPath, shardID: shardID})
+       return nil
+}
+
+// SetSeriesError sets an error to be returned when visiting a specific series 
path.
+func (v *TestVisitor) SetSeriesError(path string, err error) {
+       v.seriesErrors[path] = err
+}
+
+// SetShardError sets an error to be returned when visiting a specific shard 
ID.
+func (v *TestVisitor) SetShardError(shardID common.ShardID, err error) {
+       v.shardErrors[shardID] = err
+}
+
+func TestVisitSegmentsInTimeRange(t *testing.T) {
+       logger.Init(logger.Logging{
+               Env:   "dev",
+               Level: flags.LogLevel,
+       })
+
+       t.Run("visit single segment with single shard", func(t *testing.T) {
+               dir, defFn := test.Space(require.New(t))
+               defer defFn()
+
+               // Create TSDB with single shard
+               opts := TSDBOpts[*MockTSTable, any]{
+                       Location:        dir,
+                       SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+                       TTL:             IntervalRule{Unit: DAY, Num: 3},
+                       ShardNum:        1,
+                       TSTableCreator:  MockTSTableCreator,
+               }
+
+               ctx := context.Background()
+               mc := timestamp.NewMockClock()
+               ts, err := time.ParseInLocation("2006-01-02 15:04:05", 
"2024-05-01 00:00:00", time.Local)
+               require.NoError(t, err)
+               mc.Set(ts)
+               ctx = timestamp.SetClock(ctx, mc)
+
+               serviceCache := NewServiceCache()
+               tsdb, err := OpenTSDB(ctx, opts, serviceCache, group)
+               require.NoError(t, err)
+               require.NotNil(t, tsdb)
+
+               // Create segment
+               seg, err := tsdb.CreateSegmentIfNotExist(ts)
+               require.NoError(t, err)
+               defer seg.DecRef()
+
+               // Create shard
+               shard, err := seg.CreateTSTableIfNotExist(common.ShardID(0))
+               require.NoError(t, err)
+               require.NotNil(t, shard)
+
+               tsdb.Close()
+
+               // Test visitor
+               visitor := NewTestVisitor()
+               timeRange := timestamp.NewSectionTimeRange(ts, 
ts.Add(24*time.Hour))
+
+               err = VisitSegmentsInTimeRange(dir, timeRange, visitor, 
opts.SegmentInterval)
+               require.NoError(t, err)
+
+               // Verify series index was visited
+               require.Len(t, visitor.visitedSeries, 1)
+               expectedSeriesPath := filepath.Join(dir, "seg-20240501", 
seriesIndexDirName)
+               require.Equal(t, expectedSeriesPath, visitor.visitedSeries[0])
+
+               // Verify shard was visited
+               require.Len(t, visitor.visitedShards, 1)
+               require.Equal(t, common.ShardID(0), 
visitor.visitedShards[0].shardID)
+               expectedShardPath := filepath.Join(dir, "seg-20240501", 
"shard-0")
+               require.Equal(t, expectedShardPath, 
visitor.visitedShards[0].path)
+       })
+
+       t.Run("visit multiple segments with multiple shards", func(t 
*testing.T) {
+               dir, defFn := test.Space(require.New(t))
+               defer defFn()
+
+               // Create TSDB with multiple shards
+               opts := TSDBOpts[*MockTSTable, any]{
+                       Location:        dir,
+                       SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+                       TTL:             IntervalRule{Unit: DAY, Num: 7},
+                       ShardNum:        3,
+                       TSTableCreator:  MockTSTableCreator,
+               }
+
+               ctx := context.Background()
+               mc := timestamp.NewMockClock()
+               baseDate, err := time.ParseInLocation("2006-01-02 15:04:05", 
"2024-05-01 00:00:00", time.Local)
+               require.NoError(t, err)
+               mc.Set(baseDate)
+               ctx = timestamp.SetClock(ctx, mc)
+
+               serviceCache := NewServiceCache()
+               tsdb, err := OpenTSDB(ctx, opts, serviceCache, group)
+               require.NoError(t, err)
+               require.NotNil(t, tsdb)
+
+               // Create segments for 3 consecutive days
+               segmentDates := []time.Time{
+                       baseDate,
+                       baseDate.AddDate(0, 0, 1), // 2024-05-02
+                       baseDate.AddDate(0, 0, 2), // 2024-05-03
+               }
+
+               for _, date := range segmentDates {
+                       mc.Set(date)
+                       seg, segErr := tsdb.CreateSegmentIfNotExist(date)
+                       require.NoError(t, segErr)
+
+                       // Create all shards for each segment
+                       for shardID := common.ShardID(0); shardID < 
common.ShardID(3); shardID++ {
+                               shard, shardErr := 
seg.CreateTSTableIfNotExist(shardID)
+                               require.NoError(t, shardErr)
+                               require.NotNil(t, shard)
+                       }
+                       seg.DecRef()
+               }
+
+               tsdb.Close()
+
+               // Test visitor with time range covering all segments
+               visitor := NewTestVisitor()
+               timeRange := timestamp.NewSectionTimeRange(baseDate, 
baseDate.AddDate(0, 0, 3))
+
+               err = VisitSegmentsInTimeRange(dir, timeRange, visitor, 
opts.SegmentInterval)
+               require.NoError(t, err)
+
+               // Verify all series indices were visited
+               require.Len(t, visitor.visitedSeries, 3)
+               expectedSeriesPaths := []string{
+                       filepath.Join(dir, "seg-20240501", seriesIndexDirName),
+                       filepath.Join(dir, "seg-20240502", seriesIndexDirName),
+                       filepath.Join(dir, "seg-20240503", seriesIndexDirName),
+               }
+               for _, expectedPath := range expectedSeriesPaths {
+                       require.Contains(t, visitor.visitedSeries, expectedPath)
+               }
+
+               // Verify all shards were visited (3 segments × 3 shards = 9 
total)
+               require.Len(t, visitor.visitedShards, 9)
+
+               // Verify shard IDs are correct
+               shardIDs := make(map[common.ShardID]int)
+               for _, shard := range visitor.visitedShards {
+                       shardIDs[shard.shardID]++
+               }
+               require.Equal(t, 3, shardIDs[0]) // Each shard ID should appear 
3 times (once per segment)
+               require.Equal(t, 3, shardIDs[1])
+               require.Equal(t, 3, shardIDs[2])
+       })
+
+       t.Run("visit segments with time range filtering", func(t *testing.T) {
+               dir, defFn := test.Space(require.New(t))
+               defer defFn()
+
+               // Create TSDB
+               opts := TSDBOpts[*MockTSTable, any]{
+                       Location:        dir,
+                       SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+                       TTL:             IntervalRule{Unit: DAY, Num: 7},
+                       ShardNum:        2,
+                       TSTableCreator:  MockTSTableCreator,
+               }
+
+               ctx := context.Background()
+               mc := timestamp.NewMockClock()
+               baseDate, err := time.ParseInLocation("2006-01-02 15:04:05", 
"2024-05-01 00:00:00", time.Local)
+               require.NoError(t, err)
+               mc.Set(baseDate)
+               ctx = timestamp.SetClock(ctx, mc)
+
+               serviceCache := NewServiceCache()
+               tsdb, err := OpenTSDB(ctx, opts, serviceCache, group)
+               require.NoError(t, err)
+               require.NotNil(t, tsdb)
+
+               // Create segments for 5 consecutive days
+               segmentDates := []time.Time{
+                       baseDate,                  // 2024-05-01
+                       baseDate.AddDate(0, 0, 1), // 2024-05-02
+                       baseDate.AddDate(0, 0, 2), // 2024-05-03
+                       baseDate.AddDate(0, 0, 3), // 2024-05-04
+                       baseDate.AddDate(0, 0, 4), // 2024-05-05
+               }
+
+               for _, date := range segmentDates {
+                       mc.Set(date)
+                       seg, segErr := tsdb.CreateSegmentIfNotExist(date)
+                       require.NoError(t, segErr)
+
+                       // Create shards for each segment
+                       for shardID := common.ShardID(0); shardID < 
common.ShardID(2); shardID++ {
+                               shard, shardErr := 
seg.CreateTSTableIfNotExist(shardID)
+                               require.NoError(t, shardErr)
+                               require.NotNil(t, shard)
+                       }
+                       seg.DecRef()
+               }
+
+               tsdb.Close()
+
+               // Test visitor with time range covering only middle segments 
(2024-05-02 to 2024-05-04)
+               visitor := NewTestVisitor()
+               startTime := baseDate.AddDate(0, 0, 1) // 2024-05-02
+               endTime := baseDate.AddDate(0, 0, 4)   // 2024-05-05 (exclusive)
+               timeRange := timestamp.NewSectionTimeRange(startTime, endTime)
+
+               err = VisitSegmentsInTimeRange(dir, timeRange, visitor, 
opts.SegmentInterval)
+               require.NoError(t, err)
+
+               // Verify only middle segments were visited (3 segments)
+               require.Len(t, visitor.visitedSeries, 3)
+               expectedSeriesPaths := []string{
+                       filepath.Join(dir, "seg-20240502", seriesIndexDirName),
+                       filepath.Join(dir, "seg-20240503", seriesIndexDirName),
+                       filepath.Join(dir, "seg-20240504", seriesIndexDirName),
+               }
+               for _, expectedPath := range expectedSeriesPaths {
+                       require.Contains(t, visitor.visitedSeries, expectedPath)
+               }
+
+               // Verify shards were visited (3 segments × 2 shards = 6 total)
+               require.Len(t, visitor.visitedShards, 6)
+       })
+
+       t.Run("visit empty directory", func(t *testing.T) {
+               dir, defFn := test.Space(require.New(t))
+               defer defFn()
+
+               visitor := NewTestVisitor()
+               timeRange := timestamp.NewSectionTimeRange(time.Now(), 
time.Now().Add(24*time.Hour))
+               intervalRule := IntervalRule{Unit: DAY, Num: 1}
+
+               err := VisitSegmentsInTimeRange(dir, timeRange, visitor, 
intervalRule)
+               require.NoError(t, err)
+
+               // Verify nothing was visited
+               require.Len(t, visitor.visitedSeries, 0)
+               require.Len(t, visitor.visitedShards, 0)
+       })
+
+       t.Run("visit with series error", func(t *testing.T) {
+               dir, defFn := test.Space(require.New(t))
+               defer defFn()
+
+               // Create TSDB
+               opts := TSDBOpts[*MockTSTable, any]{
+                       Location:        dir,
+                       SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+                       TTL:             IntervalRule{Unit: DAY, Num: 3},
+                       ShardNum:        1,
+                       TSTableCreator:  MockTSTableCreator,
+               }
+
+               ctx := context.Background()
+               mc := timestamp.NewMockClock()
+               ts, err := time.ParseInLocation("2006-01-02 15:04:05", 
"2024-05-01 00:00:00", time.Local)
+               require.NoError(t, err)
+               mc.Set(ts)
+               ctx = timestamp.SetClock(ctx, mc)
+
+               serviceCache := NewServiceCache()
+               tsdb, err := OpenTSDB(ctx, opts, serviceCache, group)
+               require.NoError(t, err)
+               require.NotNil(t, tsdb)
+
+               // Create segment
+               seg, err := tsdb.CreateSegmentIfNotExist(ts)
+               require.NoError(t, err)
+               defer seg.DecRef()
+
+               // Create shard
+               shard, err := seg.CreateTSTableIfNotExist(common.ShardID(0))
+               require.NoError(t, err)
+               require.NotNil(t, shard)
+
+               tsdb.Close()
+
+               // Test visitor with series error
+               visitor := NewTestVisitor()
+               expectedSeriesPath := filepath.Join(dir, "seg-20240501", 
seriesIndexDirName)
+               visitor.SetSeriesError(expectedSeriesPath, errors.New("series 
access error"))
+
+               timeRange := timestamp.NewSectionTimeRange(ts, 
ts.Add(24*time.Hour))
+
+               err = VisitSegmentsInTimeRange(dir, timeRange, visitor, 
opts.SegmentInterval)
+               require.Error(t, err)
+               require.Contains(t, err.Error(), "series access error")
+               require.Contains(t, err.Error(), "failed to visit series index 
for segment")
+       })
+
+       t.Run("visit with shard error", func(t *testing.T) {
+               dir, defFn := test.Space(require.New(t))
+               defer defFn()
+
+               // Create TSDB
+               opts := TSDBOpts[*MockTSTable, any]{
+                       Location:        dir,
+                       SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+                       TTL:             IntervalRule{Unit: DAY, Num: 3},
+                       ShardNum:        2,
+                       TSTableCreator:  MockTSTableCreator,
+               }
+
+               ctx := context.Background()
+               mc := timestamp.NewMockClock()
+               ts, err := time.ParseInLocation("2006-01-02 15:04:05", 
"2024-05-01 00:00:00", time.Local)
+               require.NoError(t, err)
+               mc.Set(ts)
+               ctx = timestamp.SetClock(ctx, mc)
+
+               serviceCache := NewServiceCache()
+               tsdb, err := OpenTSDB(ctx, opts, serviceCache, group)
+               require.NoError(t, err)
+               require.NotNil(t, tsdb)
+
+               // Create segment
+               seg, err := tsdb.CreateSegmentIfNotExist(ts)
+               require.NoError(t, err)
+               defer seg.DecRef()
+
+               // Create shards
+               for shardID := common.ShardID(0); shardID < common.ShardID(2); 
shardID++ {
+                       shard, shardErr := seg.CreateTSTableIfNotExist(shardID)
+                       require.NoError(t, shardErr)
+                       require.NotNil(t, shard)
+               }
+
+               tsdb.Close()
+
+               // Test visitor with shard error
+               visitor := NewTestVisitor()
+               visitor.SetShardError(common.ShardID(1), errors.New("shard 
access error"))
+
+               timeRange := timestamp.NewSectionTimeRange(ts, 
ts.Add(24*time.Hour))
+
+               err = VisitSegmentsInTimeRange(dir, timeRange, visitor, 
opts.SegmentInterval)
+               require.Error(t, err)
+               require.Contains(t, err.Error(), "shard access error")
+               require.Contains(t, err.Error(), "failed to visit shards for 
segment")
+       })
+
+       t.Run("visit with hour-based segments", func(t *testing.T) {
+               dir, defFn := test.Space(require.New(t))
+               defer defFn()
+
+               // Create TSDB with hour-based segments
+               opts := TSDBOpts[*MockTSTable, any]{
+                       Location:        dir,
+                       SegmentInterval: IntervalRule{Unit: HOUR, Num: 1},
+                       TTL:             IntervalRule{Unit: DAY, Num: 1},
+                       ShardNum:        1,
+                       TSTableCreator:  MockTSTableCreator,
+               }
+
+               ctx := context.Background()
+               mc := timestamp.NewMockClock()
+               baseTime, err := time.ParseInLocation("2006-01-02 15:04:05", 
"2024-05-01 10:00:00", time.Local)
+               require.NoError(t, err)
+               mc.Set(baseTime)
+               ctx = timestamp.SetClock(ctx, mc)
+
+               serviceCache := NewServiceCache()
+               tsdb, err := OpenTSDB(ctx, opts, serviceCache, group)
+               require.NoError(t, err)
+               require.NotNil(t, tsdb)
+
+               // Create segments for 3 consecutive hours
+               segmentTimes := []time.Time{
+                       baseTime,                    // 2024-05-01 10:00:00
+                       baseTime.Add(1 * time.Hour), // 2024-05-01 11:00:00
+                       baseTime.Add(2 * time.Hour), // 2024-05-01 12:00:00
+               }
+
+               for _, segmentTime := range segmentTimes {
+                       mc.Set(segmentTime)
+                       seg, segErr := tsdb.CreateSegmentIfNotExist(segmentTime)
+                       require.NoError(t, segErr)
+
+                       // Create shard for each segment
+                       shard, shardErr := 
seg.CreateTSTableIfNotExist(common.ShardID(0))
+                       require.NoError(t, shardErr)
+                       require.NotNil(t, shard)
+                       seg.DecRef()
+               }
+
+               tsdb.Close()
+
+               // Test visitor with time range covering all segments
+               visitor := NewTestVisitor()
+               timeRange := timestamp.NewSectionTimeRange(baseTime, 
baseTime.Add(3*time.Hour))
+
+               err = VisitSegmentsInTimeRange(dir, timeRange, visitor, 
opts.SegmentInterval)
+               require.NoError(t, err)
+
+               // Verify all series indices were visited
+               require.Len(t, visitor.visitedSeries, 3)
+               expectedSeriesPaths := []string{
+                       filepath.Join(dir, "seg-2024050110", 
seriesIndexDirName),
+                       filepath.Join(dir, "seg-2024050111", 
seriesIndexDirName),
+                       filepath.Join(dir, "seg-2024050112", 
seriesIndexDirName),
+               }
+               for _, expectedPath := range expectedSeriesPaths {
+                       require.Contains(t, visitor.visitedSeries, expectedPath)
+               }
+
+               // Verify all shards were visited
+               require.Len(t, visitor.visitedShards, 3)
+               for _, shard := range visitor.visitedShards {
+                       require.Equal(t, common.ShardID(0), shard.shardID)
+               }
+       })
+}
diff --git a/banyand/stream/visitor.go b/banyand/stream/visitor.go
new file mode 100644
index 00000000..586742c8
--- /dev/null
+++ b/banyand/stream/visitor.go
@@ -0,0 +1,103 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+       "path/filepath"
+       "strconv"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/pkg/fs"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+// Visitor defines the interface for visiting stream components.
+type Visitor interface {
+       // VisitSeries visits the series index directory for a segment.
+       VisitSeries(segmentTR *timestamp.TimeRange, seriesIndexPath string) 
error
+       // VisitPart visits a part directory within a shard.
+       VisitPart(segmentTR *timestamp.TimeRange, shardID common.ShardID, 
partPath string) error
+       // VisitElementIndex visits the element index directory within a shard.
+       VisitElementIndex(segmentTR *timestamp.TimeRange, shardID 
common.ShardID, indexPath string) error
+}
+
+// streamSegmentVisitor adapts Visitor to work with storage.SegmentVisitor.
+type streamSegmentVisitor struct {
+       visitor Visitor
+}
+
+// VisitSeries implements storage.SegmentVisitor.
+func (sv *streamSegmentVisitor) VisitSeries(segmentTR *timestamp.TimeRange, 
seriesIndexPath string) error {
+       return sv.visitor.VisitSeries(segmentTR, seriesIndexPath)
+}
+
+// VisitShard implements storage.SegmentVisitor.
+func (sv *streamSegmentVisitor) VisitShard(segmentTR *timestamp.TimeRange, 
shardID common.ShardID, shardPath string) error {
+       // Visit parts within the shard
+       if err := sv.visitShardParts(segmentTR, shardID, shardPath); err != nil 
{
+               return err
+       }
+
+       // Visit element index within the shard
+       return sv.visitShardElementIndex(segmentTR, shardID, shardPath)
+}
+
+// visitShardParts visits all part directories within a shard.
+func (sv *streamSegmentVisitor) visitShardParts(segmentTR 
*timestamp.TimeRange, shardID common.ShardID, shardPath string) error {
+       lfs := fs.NewLocalFileSystem()
+       entries := lfs.ReadDir(shardPath)
+
+       for _, entry := range entries {
+               if !entry.IsDir() {
+                       continue
+               }
+
+               name := entry.Name()
+               // Check if this is a part directory (16-character hex string)
+               if len(name) != 16 {
+                       continue // Skip non-part entries
+               }
+
+               // Validate it's a valid hex string (part ID)
+               if _, err := strconv.ParseUint(name, 16, 64); err != nil {
+                       continue // Skip invalid part entries
+               }
+
+               partPath := filepath.Join(shardPath, name)
+               if err := sv.visitor.VisitPart(segmentTR, shardID, partPath); 
err != nil {
+                       return err
+               }
+       }
+
+       return nil
+}
+
+// visitShardElementIndex visits the element index directory within a shard.
+func (sv *streamSegmentVisitor) visitShardElementIndex(segmentTR 
*timestamp.TimeRange, shardID common.ShardID, shardPath string) error {
+       indexPath := filepath.Join(shardPath, elementIndexFilename)
+       return sv.visitor.VisitElementIndex(segmentTR, shardID, indexPath)
+}
+
+// VisitStreamsInTimeRange traverses stream segments within the specified time 
range
+// and calls the visitor methods for series index, parts, and element indexes.
+// This function works directly with the filesystem without requiring a 
database instance.
+func VisitStreamsInTimeRange(tsdbRootPath string, timeRange 
timestamp.TimeRange, visitor Visitor, intervalRule storage.IntervalRule) error {
+       adapter := &streamSegmentVisitor{visitor: visitor}
+       return storage.VisitSegmentsInTimeRange(tsdbRootPath, timeRange, 
adapter, intervalRule)
+}
diff --git a/banyand/stream/visitor_test.go b/banyand/stream/visitor_test.go
new file mode 100644
index 00000000..a1c41d78
--- /dev/null
+++ b/banyand/stream/visitor_test.go
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+       "os"
+       "path/filepath"
+       "testing"
+       "time"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       "github.com/apache/skywalking-banyandb/api/common"
+       "github.com/apache/skywalking-banyandb/banyand/internal/storage"
+       "github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+// TestVisitor tracks visited components for testing.
+type TestVisitor struct {
+       visitedSeries []string
+       visitedParts  []struct {
+               path    string
+               shardID common.ShardID
+       }
+       visitedElementIndex []struct {
+               path    string
+               shardID common.ShardID
+       }
+}
+
+// VisitSeries records the visited series path.
+func (tv *TestVisitor) VisitSeries(_ *timestamp.TimeRange, seriesIndexPath 
string) error {
+       tv.visitedSeries = append(tv.visitedSeries, seriesIndexPath)
+       return nil
+}
+
+// VisitPart records the visited part path and shard ID.
+func (tv *TestVisitor) VisitPart(_ *timestamp.TimeRange, shardID 
common.ShardID, partPath string) error {
+       tv.visitedParts = append(tv.visitedParts, struct {
+               path    string
+               shardID common.ShardID
+       }{partPath, shardID})
+       return nil
+}
+
+// VisitElementIndex records the visited element index path and shard ID.
+func (tv *TestVisitor) VisitElementIndex(_ *timestamp.TimeRange, shardID 
common.ShardID, indexPath string) error {
+       tv.visitedElementIndex = append(tv.visitedElementIndex, struct {
+               path    string
+               shardID common.ShardID
+       }{indexPath, shardID})
+       return nil
+}
+
+func TestVisitStreamsInTimeRange(t *testing.T) {
+       // Create a temporary directory structure
+       tmpDir, err := os.MkdirTemp("", "stream_visitor_test")
+       require.NoError(t, err)
+       defer os.RemoveAll(tmpDir)
+
+       // Create segment directory structure
+       now := time.Now()
+       segmentTime := now.Format("2006010215") // hourly format
+       segmentDir := filepath.Join(tmpDir, "seg-"+segmentTime)
+       require.NoError(t, os.MkdirAll(segmentDir, 0o755))
+
+       // Create series index directory
+       seriesDir := filepath.Join(segmentDir, "sidx")
+       require.NoError(t, os.MkdirAll(seriesDir, 0o755))
+
+       // Create shard directory
+       shardDir := filepath.Join(segmentDir, "shard-0")
+       require.NoError(t, os.MkdirAll(shardDir, 0o755))
+
+       // Create part directory within shard (16-character hex)
+       partDir := filepath.Join(shardDir, "000000000000001a")
+       require.NoError(t, os.MkdirAll(partDir, 0o755))
+
+       // Create element index directory within shard
+       elemIndexDir := filepath.Join(shardDir, elementIndexFilename)
+       require.NoError(t, os.MkdirAll(elemIndexDir, 0o755))
+
+       // Test the visitor
+       visitor := &TestVisitor{}
+       timeRange := timestamp.TimeRange{
+               Start: now.Add(-time.Hour),
+               End:   now.Add(time.Hour),
+       }
+       intervalRule := storage.IntervalRule{Unit: storage.HOUR, Num: 1}
+
+       err = VisitStreamsInTimeRange(tmpDir, timeRange, visitor, intervalRule)
+       require.NoError(t, err)
+
+       // Verify visits occurred
+       assert.Len(t, visitor.visitedSeries, 1)
+       assert.Contains(t, visitor.visitedSeries[0], "sidx")
+
+       assert.Len(t, visitor.visitedParts, 1)
+       assert.Equal(t, common.ShardID(0), visitor.visitedParts[0].shardID)
+       assert.Contains(t, visitor.visitedParts[0].path, "000000000000001a")
+
+       assert.Len(t, visitor.visitedElementIndex, 1)
+       assert.Equal(t, common.ShardID(0), 
visitor.visitedElementIndex[0].shardID)
+       assert.Contains(t, visitor.visitedElementIndex[0].path, 
elementIndexFilename)
+}
diff --git a/go.mod b/go.mod
index c3cb1bf5..4c4ff3dd 100644
--- a/go.mod
+++ b/go.mod
@@ -200,7 +200,7 @@ require (
 
 replace (
        github.com/benbjohnson/clock v1.3.0 => github.com/SkyAPM/clock 
v1.3.1-0.20220809233656-dc7607c94a97
-       github.com/blugelabs/bluge => github.com/SkyAPM/bluge 
v0.0.0-20250619030236-3750bbbf63b9
+       github.com/blugelabs/bluge => github.com/SkyAPM/bluge 
v0.0.0-20250729101336-af60771881c0
        github.com/blugelabs/bluge_segment_api => 
github.com/zinclabs/bluge_segment_api v1.0.0
        github.com/blugelabs/ice => github.com/SkyAPM/ice 
v0.0.0-20250619023539-b5173603b0b3
 )
diff --git a/go.sum b/go.sum
index 1fddda45..36d193fa 100644
--- a/go.sum
+++ b/go.sum
@@ -27,8 +27,8 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod 
h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE
 github.com/RoaringBitmap/roaring v0.9.4/go.mod 
h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA=
 github.com/RoaringBitmap/roaring v1.9.4 
h1:yhEIoH4YezLYT04s1nHehNO64EKFTop/wBhxv2QzDdQ=
 github.com/RoaringBitmap/roaring v1.9.4/go.mod 
h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90=
-github.com/SkyAPM/bluge v0.0.0-20250619030236-3750bbbf63b9 
h1:OpK6hoXecSlCCeOENHs6m84Gs0knLKt7AHoXCgmABk4=
-github.com/SkyAPM/bluge v0.0.0-20250619030236-3750bbbf63b9/go.mod 
h1:rriyHHsTidJ4UYFiVDIZyXCFJxQYs5FGpZmIfuvSqPA=
+github.com/SkyAPM/bluge v0.0.0-20250729101336-af60771881c0 
h1:aP6GyHQMNUugBrsqkiKsoVD2kwbCtVi4vs35oZLX8G0=
+github.com/SkyAPM/bluge v0.0.0-20250729101336-af60771881c0/go.mod 
h1:VfQRtJQEwpooobYuSBadb8QYjTUICH+CX42IJYrfSiM=
 github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97 
h1:FKuhJ+6n/DHspGeLleeNbziWnKr9gHKYN4q7NcoCp4s=
 github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97/go.mod 
h1:2xGRl9H1pllhxTbEGO1W3gDkip8P9GQaHPni/wpdR44=
 github.com/SkyAPM/ice v0.0.0-20250619023539-b5173603b0b3 
h1:bZvLPihpC1q1AJkmR6ere/aJOr3ev+JCytvjKuL+gE8=

Reply via email to