This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch lifecyc-sync in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 74b2a5c1ed952f6f63cf12dc11648fda81d1aba3 Author: Gao Hongtao <hanahm...@gmail.com> AuthorDate: Wed Jul 30 09:25:37 2025 +0800 Update dependencies and refactor segment parsing logic - Updated the version of `github.com/SkyAPM/bluge` in `go.mod` and `go.sum`. - Refactored the segment parsing logic in `segment.go` to improve clarity and maintainability by introducing a new function `parseSegmentTime`. - Added new test cases in `visitor_test.go` and `stream/visitor_test.go` to ensure proper functionality of the visitor pattern for segment and stream components. --- banyand/internal/storage/segment.go | 9 +- banyand/internal/storage/visitor.go | 107 +++++++ banyand/internal/storage/visitor_test.go | 493 +++++++++++++++++++++++++++++++ banyand/stream/visitor.go | 103 +++++++ banyand/stream/visitor_test.go | 121 ++++++++ go.mod | 2 +- go.sum | 4 +- 7 files changed, 834 insertions(+), 5 deletions(-) diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go index fe54b9b8..1c4c1ce0 100644 --- a/banyand/internal/storage/segment.go +++ b/banyand/internal/storage/segment.go @@ -445,8 +445,9 @@ func (sc *segmentController[T, O]) format(tm time.Time) string { panic("invalid interval unit") } -func (sc *segmentController[T, O]) parse(value string) (time.Time, error) { - switch sc.getOptions().SegmentInterval.Unit { +// parseSegmentTime parses a segment suffix into a time based on the interval unit. 
+func parseSegmentTime(value string, unit IntervalUnit) (time.Time, error) {
+	switch unit {
 	case HOUR:
 		return time.ParseInLocation(hourFormat, value, time.Local)
 	case DAY:
@@ -455,6 +456,10 @@ func (sc *segmentController[T, O]) parse(value string) (time.Time, error) {
 	panic("invalid interval unit")
 }
 
+func (sc *segmentController[T, O]) parse(value string) (time.Time, error) {
+	return parseSegmentTime(value, sc.getOptions().SegmentInterval.Unit)
+}
+
 func (sc *segmentController[T, O]) open() error {
 	sc.Lock()
 	defer sc.Unlock()
diff --git a/banyand/internal/storage/visitor.go b/banyand/internal/storage/visitor.go
new file mode 100644
index 00000000..bd82036a
--- /dev/null
+++ b/banyand/internal/storage/visitor.go
@@ -0,0 +1,107 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package storage
+
+import (
+	"fmt"
+	"path/filepath"
+	"strconv"
+
+	"github.com/pkg/errors"
+
+	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+// SegmentVisitor defines the interface for visiting segment components.
+type SegmentVisitor interface {
+	// VisitSeries visits the series index directory for a segment.
+	VisitSeries(segmentTR *timestamp.TimeRange, seriesIndexPath string) error
+	// VisitShard visits a shard directory within a segment.
+	VisitShard(segmentTR *timestamp.TimeRange, shardID common.ShardID, shardPath string) error
+}
+
+// VisitSegmentsInTimeRange traverses segments within the specified time range
+// and calls the visitor methods for series index and shard directories.
+// This function works directly with the filesystem without requiring a database instance.
+//
+// Segment directories under tsdbRootPath are discovered via segPathPrefix and
+// their suffix is parsed according to intervalRule.Unit. A segment starting at
+// t spans t .. intervalRule.NextTime(t) (built with
+// timestamp.NewSectionTimeRange); only segments overlapping timeRange are
+// visited. For each visited segment, VisitSeries is called once with the
+// series index path and then VisitShard once per shard directory.
+//
+// An unsupported interval unit is reported as an error before any directory
+// is read. A failure during the directory walk is returned wrapped, and the
+// first error returned by the visitor stops the traversal and is returned
+// wrapped with the segment suffix.
+func VisitSegmentsInTimeRange(tsdbRootPath string, timeRange timestamp.TimeRange, visitor SegmentVisitor, intervalRule IntervalRule) error {
+	// parseSegmentTime only handles HOUR and DAY and panics for anything else;
+	// reject other units here so callers of this exported API get an error
+	// instead of a crash.
+	switch intervalRule.Unit {
+	case HOUR, DAY:
+	default:
+		return errors.Errorf("unsupported segment interval unit: %v", intervalRule.Unit)
+	}
+
+	// Collect the matching segments first so the directory walk has finished
+	// before any visitor callback runs.
+	var segments []segmentInfo
+	err := walkDir(tsdbRootPath, segPathPrefix, func(suffix string) error {
+		startTime, parseErr := parseSegmentTime(suffix, intervalRule.Unit)
+		if parseErr != nil {
+			return parseErr
+		}
+
+		// Calculate end time based on interval rule
+		segTR := timestamp.NewSectionTimeRange(startTime, intervalRule.NextTime(startTime))
+		if !segTR.Overlapping(timeRange) {
+			return nil // Skip segments outside the time range
+		}
+
+		segments = append(segments, segmentInfo{
+			path:      filepath.Join(tsdbRootPath, fmt.Sprintf(segTemplate, suffix)),
+			suffix:    suffix,
+			timeRange: segTR,
+		})
+		return nil
+	})
+	if err != nil {
+		return errors.Wrap(err, "failed to walk segment directories")
+	}
+
+	for i := range segments {
+		// Address the slice element rather than a range copy so each callback
+		// receives a pointer to its own segment's time range.
+		seg := &segments[i]
+
+		// Visit series index directory
+		seriesIndexPath := filepath.Join(seg.path, seriesIndexDirName)
+		if err := visitor.VisitSeries(&seg.timeRange, seriesIndexPath); err != nil {
+			return errors.Wrapf(err, "failed to visit series index for segment %s", seg.suffix)
+		}
+
+		// Visit shard directories
+		if err := visitSegmentShards(seg.path, &seg.timeRange, visitor); err != nil {
+			return errors.Wrapf(err, "failed to visit shards for segment %s", seg.suffix)
+		}
+	}
+
+	return nil
+}
+
+// segmentInfo holds information about a segment directory.
+type segmentInfo struct {
+	path      string
+	suffix    string
+	timeRange timestamp.TimeRange
+}
+
+// visitSegmentShards traverses the shard directories (matched by
+// shardPathPrefix) within a segment and calls visitor.VisitShard for each one.
+// A suffix that is not a non-negative decimal integer is reported as an error.
+func visitSegmentShards(segmentPath string, segmentTR *timestamp.TimeRange, visitor SegmentVisitor) error {
+	return walkDir(segmentPath, shardPathPrefix, func(suffix string) error {
+		id, err := strconv.Atoi(suffix)
+		if err != nil {
+			return errors.Wrapf(err, "invalid shard suffix: %s", suffix)
+		}
+		// Atoi accepts a leading '-'; shard directories are numbered from 0,
+		// so a negative value is rejected rather than converted to a ShardID.
+		if id < 0 {
+			return errors.Errorf("invalid shard suffix: %s", suffix)
+		}
+
+		shardID := common.ShardID(id)
+		shardPath := filepath.Join(segmentPath, fmt.Sprintf(shardTemplate, id))
+		return visitor.VisitShard(segmentTR, shardID, shardPath)
+	})
+}
diff --git a/banyand/internal/storage/visitor_test.go b/banyand/internal/storage/visitor_test.go
new file mode 100644
index 00000000..149bdad2
--- /dev/null
+++ b/banyand/internal/storage/visitor_test.go
@@ -0,0 +1,493 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package storage
+
+import (
+	"context"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/pkg/errors"
+	"github.com/stretchr/testify/require"
+
+	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/pkg/logger"
+	"github.com/apache/skywalking-banyandb/pkg/test"
+	"github.com/apache/skywalking-banyandb/pkg/test/flags"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+// visitedShard records one successful VisitShard call observed by TestVisitor.
+type visitedShard struct {
+	path    string
+	shardID common.ShardID
+}
+
+// TestVisitor tracks visited segments and shards for testing. Errors can be
+// injected per series index path or per shard ID; when one is registered, the
+// corresponding Visit method returns it instead of recording the visit.
+type TestVisitor struct {
+	seriesErrors  map[string]error
+	shardErrors   map[common.ShardID]error
+	visitedSeries []string
+	visitedShards []visitedShard
+}
+
+// NewTestVisitor creates a new test visitor with no injected errors. The
+// visited slices start nil; append and len handle nil slices.
+func NewTestVisitor() *TestVisitor {
+	return &TestVisitor{
+		seriesErrors: make(map[string]error),
+		shardErrors:  make(map[common.ShardID]error),
+	}
+}
+
+// VisitSeries implements SegmentVisitor.VisitSeries.
+func (v *TestVisitor) VisitSeries(_ *timestamp.TimeRange, seriesIndexPath string) error {
+	if err, exists := v.seriesErrors[seriesIndexPath]; exists {
+		return err
+	}
+	v.visitedSeries = append(v.visitedSeries, seriesIndexPath)
+	return nil
+}
+
+// VisitShard implements SegmentVisitor.VisitShard.
+func (v *TestVisitor) VisitShard(_ *timestamp.TimeRange, shardID common.ShardID, shardPath string) error {
+	if err, exists := v.shardErrors[shardID]; exists {
+		return err
+	}
+	v.visitedShards = append(v.visitedShards, visitedShard{path: shardPath, shardID: shardID})
+	return nil
+}
+
+// SetSeriesError sets an error to be returned when visiting a specific series path.
+func (v *TestVisitor) SetSeriesError(path string, err error) {
+	v.seriesErrors[path] = err
+}
+
+// SetShardError sets an error to be returned when visiting a specific shard ID.
+func (v *TestVisitor) SetShardError(shardID common.ShardID, err error) {
+	v.shardErrors[shardID] = err
+}
+
+// prepareVisitorTestSegments lays out a TSDB on disk for the visitor tests.
+// It opens a TSDB with opts under a mock clock, creates one segment per entry
+// of segmentStarts (moving the clock to each start first) with opts.ShardNum
+// shards in every segment, then releases each segment reference and closes
+// the TSDB so the tests observe only the resulting directory tree.
+func prepareVisitorTestSegments(t *testing.T, opts TSDBOpts[*MockTSTable, any], segmentStarts []time.Time) {
+	t.Helper()
+	require.NotEmpty(t, segmentStarts)
+
+	mc := timestamp.NewMockClock()
+	mc.Set(segmentStarts[0])
+	ctx := timestamp.SetClock(context.Background(), mc)
+
+	tsdb, err := OpenTSDB(ctx, opts, NewServiceCache(), group)
+	require.NoError(t, err)
+	require.NotNil(t, tsdb)
+
+	for _, start := range segmentStarts {
+		mc.Set(start)
+		seg, segErr := tsdb.CreateSegmentIfNotExist(start)
+		require.NoError(t, segErr)
+
+		for shardID := common.ShardID(0); shardID < common.ShardID(opts.ShardNum); shardID++ {
+			shard, shardErr := seg.CreateTSTableIfNotExist(shardID)
+			require.NoError(t, shardErr)
+			require.NotNil(t, shard)
+		}
+		// Release the reference before Close rather than deferring it past
+		// Close, so every scenario tears the TSDB down in the same order.
+		seg.DecRef()
+	}
+
+	tsdb.Close()
+}
+
+// parseVisitorTestTime parses a "2006-01-02 15:04:05" timestamp in time.Local,
+// the location parseSegmentTime uses for segment suffixes.
+func parseVisitorTestTime(t *testing.T, value string) time.Time {
+	t.Helper()
+	ts, err := time.ParseInLocation("2006-01-02 15:04:05", value, time.Local)
+	require.NoError(t, err)
+	return ts
+}
+
+func TestVisitSegmentsInTimeRange(t *testing.T) {
+	logger.Init(logger.Logging{
+		Env:   "dev",
+		Level: flags.LogLevel,
+	})
+
+	t.Run("visit single segment with single shard", func(t *testing.T) {
+		dir, defFn := test.Space(require.New(t))
+		defer defFn()
+
+		opts := TSDBOpts[*MockTSTable, any]{
+			Location:        dir,
+			SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+			TTL:             IntervalRule{Unit: DAY, Num: 3},
+			ShardNum:        1,
+			TSTableCreator:  MockTSTableCreator,
+		}
+		ts := parseVisitorTestTime(t, "2024-05-01 00:00:00")
+		prepareVisitorTestSegments(t, opts, []time.Time{ts})
+
+		visitor := NewTestVisitor()
+		timeRange := timestamp.NewSectionTimeRange(ts, ts.Add(24*time.Hour))
+
+		err := VisitSegmentsInTimeRange(dir, timeRange, visitor, opts.SegmentInterval)
+		require.NoError(t, err)
+
+		// Verify series index was visited
+		require.Len(t, visitor.visitedSeries, 1)
+		expectedSeriesPath := filepath.Join(dir, "seg-20240501", seriesIndexDirName)
+		require.Equal(t, expectedSeriesPath, visitor.visitedSeries[0])
+
+		// Verify shard was visited
+		require.Len(t, visitor.visitedShards, 1)
+		require.Equal(t, common.ShardID(0), visitor.visitedShards[0].shardID)
+		expectedShardPath := filepath.Join(dir, "seg-20240501", "shard-0")
+		require.Equal(t, expectedShardPath, visitor.visitedShards[0].path)
+	})
+
+	t.Run("visit multiple segments with multiple shards", func(t *testing.T) {
+		dir, defFn := test.Space(require.New(t))
+		defer defFn()
+
+		opts := TSDBOpts[*MockTSTable, any]{
+			Location:        dir,
+			SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+			TTL:             IntervalRule{Unit: DAY, Num: 7},
+			ShardNum:        3,
+			TSTableCreator:  MockTSTableCreator,
+		}
+		baseDate := parseVisitorTestTime(t, "2024-05-01 00:00:00")
+		// Segments for 3 consecutive days, each with all 3 shards.
+		prepareVisitorTestSegments(t, opts, []time.Time{
+			baseDate,
+			baseDate.AddDate(0, 0, 1), // 2024-05-02
+			baseDate.AddDate(0, 0, 2), // 2024-05-03
+		})
+
+		// Test visitor with time range covering all segments
+		visitor := NewTestVisitor()
+		timeRange := timestamp.NewSectionTimeRange(baseDate, baseDate.AddDate(0, 0, 3))
+
+		err := VisitSegmentsInTimeRange(dir, timeRange, visitor, opts.SegmentInterval)
+		require.NoError(t, err)
+
+		// Verify all series indices were visited
+		require.Len(t, visitor.visitedSeries, 3)
+		for _, day := range []string{"20240501", "20240502", "20240503"} {
+			require.Contains(t, visitor.visitedSeries, filepath.Join(dir, "seg-"+day, seriesIndexDirName))
+		}
+
+		// Verify all shards were visited (3 segments × 3 shards = 9 total)
+		require.Len(t, visitor.visitedShards, 9)
+
+		// Each shard ID should appear 3 times (once per segment)
+		shardIDs := make(map[common.ShardID]int)
+		for _, shard := range visitor.visitedShards {
+			shardIDs[shard.shardID]++
+		}
+		require.Equal(t, 3, shardIDs[0])
+		require.Equal(t, 3, shardIDs[1])
+		require.Equal(t, 3, shardIDs[2])
+	})
+
+	t.Run("visit segments with time range filtering", func(t *testing.T) {
+		dir, defFn := test.Space(require.New(t))
+		defer defFn()
+
+		opts := TSDBOpts[*MockTSTable, any]{
+			Location:        dir,
+			SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+			TTL:             IntervalRule{Unit: DAY, Num: 7},
+			ShardNum:        2,
+			TSTableCreator:  MockTSTableCreator,
+		}
+		baseDate := parseVisitorTestTime(t, "2024-05-01 00:00:00")
+		// Segments for 5 consecutive days: 2024-05-01 .. 2024-05-05.
+		segmentStarts := make([]time.Time, 5)
+		for i := range segmentStarts {
+			segmentStarts[i] = baseDate.AddDate(0, 0, i)
+		}
+		prepareVisitorTestSegments(t, opts, segmentStarts)
+
+		// Test visitor with time range covering only middle segments (2024-05-02 to 2024-05-04)
+		visitor := NewTestVisitor()
+		startTime := baseDate.AddDate(0, 0, 1) // 2024-05-02
+		endTime := baseDate.AddDate(0, 0, 4)   // 2024-05-05 (exclusive)
+		timeRange := timestamp.NewSectionTimeRange(startTime, endTime)
+
+		err := VisitSegmentsInTimeRange(dir, timeRange, visitor, opts.SegmentInterval)
+		require.NoError(t, err)
+
+		// Verify only middle segments were visited (3 segments)
+		require.Len(t, visitor.visitedSeries, 3)
+		for _, day := range []string{"20240502", "20240503", "20240504"} {
+			require.Contains(t, visitor.visitedSeries, filepath.Join(dir, "seg-"+day, seriesIndexDirName))
+		}
+
+		// Verify shards were visited (3 segments × 2 shards = 6 total)
+		require.Len(t, visitor.visitedShards, 6)
+	})
+
+	t.Run("visit empty directory", func(t *testing.T) {
+		dir, defFn := test.Space(require.New(t))
+		defer defFn()
+
+		visitor := NewTestVisitor()
+		timeRange := timestamp.NewSectionTimeRange(time.Now(), time.Now().Add(24*time.Hour))
+		intervalRule := IntervalRule{Unit: DAY, Num: 1}
+
+		err := VisitSegmentsInTimeRange(dir, timeRange, visitor, intervalRule)
+		require.NoError(t, err)
+
+		// Verify nothing was visited
+		require.Len(t, visitor.visitedSeries, 0)
+		require.Len(t, visitor.visitedShards, 0)
+	})
+
+	t.Run("visit with series error", func(t *testing.T) {
+		dir, defFn := test.Space(require.New(t))
+		defer defFn()
+
+		opts := TSDBOpts[*MockTSTable, any]{
+			Location:        dir,
+			SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+			TTL:             IntervalRule{Unit: DAY, Num: 3},
+			ShardNum:        1,
+			TSTableCreator:  MockTSTableCreator,
+		}
+		ts := parseVisitorTestTime(t, "2024-05-01 00:00:00")
+		prepareVisitorTestSegments(t, opts, []time.Time{ts})
+
+		// Test visitor with series error
+		visitor := NewTestVisitor()
+		expectedSeriesPath := filepath.Join(dir, "seg-20240501", seriesIndexDirName)
+		visitor.SetSeriesError(expectedSeriesPath, errors.New("series access error"))
+
+		timeRange := timestamp.NewSectionTimeRange(ts, ts.Add(24*time.Hour))
+
+		err := VisitSegmentsInTimeRange(dir, timeRange, visitor, opts.SegmentInterval)
+		require.Error(t, err)
+		require.Contains(t, err.Error(), "series access error")
+		require.Contains(t, err.Error(), "failed to visit series index for segment")
+	})
+
+	t.Run("visit with shard error", func(t *testing.T) {
+		dir, defFn := test.Space(require.New(t))
+		defer defFn()
+
+		opts := TSDBOpts[*MockTSTable, any]{
+			Location:        dir,
+			SegmentInterval: IntervalRule{Unit: DAY, Num: 1},
+			TTL:             IntervalRule{Unit: DAY, Num: 3},
+			ShardNum:        2,
+			TSTableCreator:  MockTSTableCreator,
+		}
+		ts := parseVisitorTestTime(t, "2024-05-01 00:00:00")
+		prepareVisitorTestSegments(t, opts, []time.Time{ts})
+
+		// Test visitor with shard error
+		visitor := NewTestVisitor()
+		visitor.SetShardError(common.ShardID(1), errors.New("shard access error"))
+
+		timeRange := timestamp.NewSectionTimeRange(ts, ts.Add(24*time.Hour))
+
+		err := VisitSegmentsInTimeRange(dir, timeRange, visitor, opts.SegmentInterval)
+		require.Error(t, err)
+		require.Contains(t, err.Error(), "shard access error")
+		require.Contains(t, err.Error(), "failed to visit shards for segment")
+	})
+
+	t.Run("visit with hour-based segments", func(t *testing.T) {
+		dir, defFn := test.Space(require.New(t))
+		defer defFn()
+
+		opts := TSDBOpts[*MockTSTable, any]{
+			Location:        dir,
+			SegmentInterval: IntervalRule{Unit: HOUR, Num: 1},
+			TTL:             IntervalRule{Unit: DAY, Num: 1},
+			ShardNum:        1,
+			TSTableCreator:  MockTSTableCreator,
+		}
+		baseTime := parseVisitorTestTime(t, "2024-05-01 10:00:00")
+		// Segments for 3 consecutive hours: 10:00, 11:00 and 12:00.
+		prepareVisitorTestSegments(t, opts, []time.Time{
+			baseTime,
+			baseTime.Add(1 * time.Hour),
+			baseTime.Add(2 * time.Hour),
+		})
+
+		// Test visitor with time range covering all segments
+		visitor := NewTestVisitor()
+		timeRange := timestamp.NewSectionTimeRange(baseTime, baseTime.Add(3*time.Hour))
+
+		err := VisitSegmentsInTimeRange(dir, timeRange, visitor, opts.SegmentInterval)
+		require.NoError(t, err)
+
+		// Verify all series indices were visited
+		require.Len(t, visitor.visitedSeries, 3)
+		for _, hour := range []string{"2024050110", "2024050111", "2024050112"} {
+			require.Contains(t, visitor.visitedSeries, filepath.Join(dir, "seg-"+hour, seriesIndexDirName))
+		}
+
+		// Verify all shards were visited
+		require.Len(t, visitor.visitedShards, 3)
+		for _, shard := range visitor.visitedShards {
+			require.Equal(t, common.ShardID(0), shard.shardID)
+		}
+	})
+}
diff --git a/banyand/stream/visitor.go b/banyand/stream/visitor.go
new file mode 100644
index 00000000..586742c8
--- /dev/null
+++ b/banyand/stream/visitor.go
@@ -0,0 +1,103 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+	"path/filepath"
+	"strconv"
+
+	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/banyand/internal/storage"
+	"github.com/apache/skywalking-banyandb/pkg/fs"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+// Visitor defines the interface for visiting stream components.
+type Visitor interface {
+	// VisitSeries visits the series index directory for a segment.
+	VisitSeries(segmentTR *timestamp.TimeRange, seriesIndexPath string) error
+	// VisitPart visits a part directory within a shard.
+	VisitPart(segmentTR *timestamp.TimeRange, shardID common.ShardID, partPath string) error
+	// VisitElementIndex visits the element index directory within a shard.
+	VisitElementIndex(segmentTR *timestamp.TimeRange, shardID common.ShardID, indexPath string) error
+}
+
+// streamSegmentVisitor adapts Visitor to work with storage.SegmentVisitor.
+type streamSegmentVisitor struct {
+	visitor Visitor
+}
+
+// Compile-time check that the adapter satisfies storage.SegmentVisitor, so a
+// signature drift in either interface fails the build here.
+var _ storage.SegmentVisitor = (*streamSegmentVisitor)(nil)
+
+// VisitSeries implements storage.SegmentVisitor by forwarding to the wrapped Visitor.
+func (sv *streamSegmentVisitor) VisitSeries(segmentTR *timestamp.TimeRange, seriesIndexPath string) error {
+	return sv.visitor.VisitSeries(segmentTR, seriesIndexPath)
+}
+
+// VisitShard implements storage.SegmentVisitor. It reports every part
+// directory of the shard first and then the shard's element index; the first
+// error returned by the wrapped Visitor is propagated unchanged.
+func (sv *streamSegmentVisitor) VisitShard(segmentTR *timestamp.TimeRange, shardID common.ShardID, shardPath string) error {
+	if err := sv.visitShardParts(segmentTR, shardID, shardPath); err != nil {
+		return err
+	}
+	return sv.visitShardElementIndex(segmentTR, shardID, shardPath)
+}
+
+// visitShardParts calls VisitPart for every part directory directly under
+// shardPath. A part directory is recognized by its name: exactly 16
+// characters that parse as a hexadecimal uint64. Files and other directories
+// are skipped. The first error from the visitor stops the scan.
+func (sv *streamSegmentVisitor) visitShardParts(segmentTR *timestamp.TimeRange, shardID common.ShardID, shardPath string) error {
+	lfs := fs.NewLocalFileSystem()
+	for _, entry := range lfs.ReadDir(shardPath) {
+		name := entry.Name()
+		if !entry.IsDir() || len(name) != 16 {
+			continue // Not a directory, or not a 16-character part name
+		}
+		if _, err := strconv.ParseUint(name, 16, 64); err != nil {
+			continue // Not a valid hexadecimal part ID
+		}
+		if err := sv.visitor.VisitPart(segmentTR, shardID, filepath.Join(shardPath, name)); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+// visitShardElementIndex reports shardPath/elementIndexFilename to
+// VisitElementIndex. The path is reported without checking that it exists.
+func (sv *streamSegmentVisitor) visitShardElementIndex(segmentTR *timestamp.TimeRange, shardID common.ShardID, shardPath string) error {
+	indexPath := filepath.Join(shardPath, elementIndexFilename)
+	return sv.visitor.VisitElementIndex(segmentTR, shardID, indexPath)
+}
+
+// VisitStreamsInTimeRange traverses stream segments within the specified time range
+// and calls the visitor methods for series index, parts, and element indexes.
+// This function works directly with the filesystem without requiring a database instance.
+// Segment discovery, time-range filtering and error wrapping are delegated to
+// storage.VisitSegmentsInTimeRange.
+func VisitStreamsInTimeRange(tsdbRootPath string, timeRange timestamp.TimeRange, visitor Visitor, intervalRule storage.IntervalRule) error {
+	adapter := &streamSegmentVisitor{visitor: visitor}
+	return storage.VisitSegmentsInTimeRange(tsdbRootPath, timeRange, adapter, intervalRule)
+}
diff --git a/banyand/stream/visitor_test.go b/banyand/stream/visitor_test.go
new file mode 100644
index 00000000..a1c41d78
--- /dev/null
+++ b/banyand/stream/visitor_test.go
@@ -0,0 +1,121 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements.
See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package stream
+
+import (
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	"github.com/apache/skywalking-banyandb/api/common"
+	"github.com/apache/skywalking-banyandb/banyand/internal/storage"
+	"github.com/apache/skywalking-banyandb/pkg/timestamp"
+)
+
+// visitedShardPath records one path reported to TestVisitor together with its shard.
+type visitedShardPath struct {
+	path    string
+	shardID common.ShardID
+}
+
+// TestVisitor tracks visited components for testing.
+type TestVisitor struct {
+	visitedSeries       []string
+	visitedParts        []visitedShardPath
+	visitedElementIndex []visitedShardPath
+}
+
+// VisitSeries records the visited series path.
+func (tv *TestVisitor) VisitSeries(_ *timestamp.TimeRange, seriesIndexPath string) error {
+	tv.visitedSeries = append(tv.visitedSeries, seriesIndexPath)
+	return nil
+}
+
+// VisitPart records the visited part path and shard ID.
+func (tv *TestVisitor) VisitPart(_ *timestamp.TimeRange, shardID common.ShardID, partPath string) error {
+	tv.visitedParts = append(tv.visitedParts, visitedShardPath{path: partPath, shardID: shardID})
+	return nil
+}
+
+// VisitElementIndex records the visited element index path and shard ID.
+func (tv *TestVisitor) VisitElementIndex(_ *timestamp.TimeRange, shardID common.ShardID, indexPath string) error {
+	tv.visitedElementIndex = append(tv.visitedElementIndex, visitedShardPath{path: indexPath, shardID: shardID})
+	return nil
+}
+
+func TestVisitStreamsInTimeRange(t *testing.T) {
+	// t.TempDir is removed automatically and reports cleanup failures.
+	tmpDir := t.TempDir()
+
+	// A fixed instant keeps the hourly segment name and the query range
+	// deterministic; segment suffixes are interpreted in time.Local.
+	now := time.Date(2024, 5, 1, 10, 30, 0, 0, time.Local)
+	segmentTime := now.Format("2006010215") // hourly format
+	segmentDir := filepath.Join(tmpDir, "seg-"+segmentTime)
+	require.NoError(t, os.MkdirAll(segmentDir, 0o755))
+
+	// Create series index directory
+	seriesDir := filepath.Join(segmentDir, "sidx")
+	require.NoError(t, os.MkdirAll(seriesDir, 0o755))
+
+	// Create shard directory
+	shardDir := filepath.Join(segmentDir, "shard-0")
+	require.NoError(t, os.MkdirAll(shardDir, 0o755))
+
+	// Create part directory within shard (16-character hex)
+	partDir := filepath.Join(shardDir, "000000000000001a")
+	require.NoError(t, os.MkdirAll(partDir, 0o755))
+
+	// Create element index directory within shard
+	elemIndexDir := filepath.Join(shardDir, elementIndexFilename)
+	require.NoError(t, os.MkdirAll(elemIndexDir, 0o755))
+
+	visitor := &TestVisitor{}
+	timeRange := timestamp.TimeRange{
+		Start: now.Add(-time.Hour),
+		End:   now.Add(time.Hour),
+	}
+	intervalRule := storage.IntervalRule{Unit: storage.HOUR, Num: 1}
+
+	err := VisitStreamsInTimeRange(tmpDir, timeRange, visitor, intervalRule)
+	require.NoError(t, err)
+
+	// require.Len (not assert.Len) so a missing visit fails cleanly instead of
+	// panicking on the [0] index below.
+	require.Len(t, visitor.visitedSeries, 1)
+	assert.Contains(t, visitor.visitedSeries[0], "sidx")
+
+	require.Len(t, visitor.visitedParts, 1)
+	assert.Equal(t, common.ShardID(0), visitor.visitedParts[0].shardID)
+	assert.Equal(t, partDir, visitor.visitedParts[0].path)
+
+	require.Len(t, visitor.visitedElementIndex, 1)
+	assert.Equal(t, common.ShardID(0), visitor.visitedElementIndex[0].shardID)
+	assert.Equal(t, elemIndexDir, visitor.visitedElementIndex[0].path)
+}
diff --git a/go.mod b/go.mod
index c3cb1bf5..4c4ff3dd 100644
--- a/go.mod
+++ b/go.mod
@@ -200,7 +200,7 @@ require (
 
 replace (
 	github.com/benbjohnson/clock v1.3.0 => github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97
-	github.com/blugelabs/bluge => github.com/SkyAPM/bluge v0.0.0-20250619030236-3750bbbf63b9
+	github.com/blugelabs/bluge => github.com/SkyAPM/bluge v0.0.0-20250729101336-af60771881c0
 	github.com/blugelabs/bluge_segment_api => github.com/zinclabs/bluge_segment_api v1.0.0
 	github.com/blugelabs/ice => github.com/SkyAPM/ice v0.0.0-20250619023539-b5173603b0b3
 )
diff --git a/go.sum b/go.sum
index 1fddda45..36d193fa 100644
--- a/go.sum
+++ b/go.sum
@@ -27,8 +27,8 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE
 github.com/RoaringBitmap/roaring v0.9.4/go.mod h1:icnadbWcNyfEHlYdr+tDlOTih1Bf/h+rzPpv4sbomAA=
 github.com/RoaringBitmap/roaring v1.9.4 h1:yhEIoH4YezLYT04s1nHehNO64EKFTop/wBhxv2QzDdQ=
 github.com/RoaringBitmap/roaring v1.9.4/go.mod h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90=
-github.com/SkyAPM/bluge v0.0.0-20250619030236-3750bbbf63b9 h1:OpK6hoXecSlCCeOENHs6m84Gs0knLKt7AHoXCgmABk4=
-github.com/SkyAPM/bluge v0.0.0-20250619030236-3750bbbf63b9/go.mod h1:rriyHHsTidJ4UYFiVDIZyXCFJxQYs5FGpZmIfuvSqPA=
+github.com/SkyAPM/bluge v0.0.0-20250729101336-af60771881c0 h1:aP6GyHQMNUugBrsqkiKsoVD2kwbCtVi4vs35oZLX8G0=
+github.com/SkyAPM/bluge v0.0.0-20250729101336-af60771881c0/go.mod h1:VfQRtJQEwpooobYuSBadb8QYjTUICH+CX42IJYrfSiM=
 github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97 h1:FKuhJ+6n/DHspGeLleeNbziWnKr9gHKYN4q7NcoCp4s=
 github.com/SkyAPM/clock v1.3.1-0.20220809233656-dc7607c94a97/go.mod h1:2xGRl9H1pllhxTbEGO1W3gDkip8P9GQaHPni/wpdR44=
 github.com/SkyAPM/ice v0.0.0-20250619023539-b5173603b0b3 h1:bZvLPihpC1q1AJkmR6ere/aJOr3ev+JCytvjKuL+gE8=