This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch v0.10.x in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 137dabb24962fec4fa086950994d1803c285fda1 Author: mrproliu <[email protected]> AuthorDate: Sat May 9 12:46:34 2026 +0800 Fix lifecycle migration segment shorter than the configured `SegmentInterval` (#1120) --- CHANGES.md | 1 + banyand/backup/lifecycle/segment_boundary_utils.go | 43 +- .../lifecycle/segment_boundary_utils_test.go | 229 ++++++++++- banyand/backup/lifecycle/steps.go | 73 +++- banyand/backup/lifecycle/steps_test.go | 93 +++++ banyand/internal/storage/rotation_test.go | 30 +- banyand/internal/storage/segment.go | 24 +- banyand/internal/storage/segment_test.go | 453 +++++++++++++++++++++ banyand/internal/storage/storage.go | 39 ++ banyand/internal/wqueue/wqueue.go | 2 +- banyand/internal/wqueue/wqueue_test.go | 25 +- 11 files changed, 930 insertions(+), 82 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index d912f0c04..c893e5740 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -38,6 +38,7 @@ Release Notes. - Fix nil-pointer panic on cold-tier data nodes when FODC `InspectAll` raced with idle-segment cleanup. - Add `GroupLifecycleInfo.errors` to surface per-group collection failures from FODC `InspectAll` instead of silently dropping the affected node entry. - Fix `CollectDataInfo` and `CollectLiaisonInfo` not handling `CATALOG_PROPERTY` groups. +- Fix lifecycle migration where the receiving node could create segments shorter than the configured `SegmentInterval`. 
### Chores diff --git a/banyand/backup/lifecycle/segment_boundary_utils.go b/banyand/backup/lifecycle/segment_boundary_utils.go index c3f0d6a12..1fd32c298 100644 --- a/banyand/backup/lifecycle/segment_boundary_utils.go +++ b/banyand/backup/lifecycle/segment_boundary_utils.go @@ -25,34 +25,18 @@ import ( ) func calculateTargetSegments(partMinTS, partMaxTS int64, targetInterval storage.IntervalRule) []time.Time { - minTime := time.Unix(0, partMinTS).UTC() - maxTime := time.Unix(0, partMaxTS).UTC() + // Use time.Local so sender's bucket math stays on the same per-timezone + // grid as the receiver, which sees ctx.MinTimestamp as time.Local too. + minTime := time.Unix(0, partMinTS) + maxTime := time.Unix(0, partMaxTS) var targetSegments []time.Time - - var segmentStart time.Time - switch targetInterval.Unit { - case storage.DAY: - daysSinceEpoch := minTime.Sub(time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC)).Hours() / 24 - segmentIndex := int(daysSinceEpoch) / targetInterval.Num - segmentStart = time.Date(1970, 1, 1+segmentIndex*targetInterval.Num, 0, 0, 0, 0, time.UTC) - case storage.HOUR: - hoursSinceEpoch := minTime.Sub(time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC)).Hours() - segmentIndex := int(hoursSinceEpoch) / targetInterval.Num - segmentStart = time.Date(1970, 1, 1, segmentIndex*targetInterval.Num, 0, 0, 0, time.UTC) - default: - segmentStart = targetInterval.Unit.Standard(minTime) - } - - current := segmentStart - for !current.After(maxTime) { - segmentEnd := targetInterval.NextTime(current) - if !(segmentEnd.Before(minTime) || current.After(maxTime)) { - targetSegments = append(targetSegments, current) - } - current = segmentEnd + // current starts at the bucket containing minTime, so segmentEnd is + // always > minTime and the loop guard already excludes anything past + // maxTime - every iteration produces a real overlap. 
+ for current := targetInterval.Standard(minTime); !current.After(maxTime); current = targetInterval.NextTime(current) { + targetSegments = append(targetSegments, current) } - return targetSegments } @@ -61,12 +45,11 @@ func getSegmentTimeRange(segmentStart time.Time, interval storage.IntervalRule) return timestamp.NewSectionTimeRange(segmentStart, segmentEnd) } +// getTargetStageInterval returns the segment interval of the migration target +// (the next stage relative to the current node), as populated by parseGroup. func getTargetStageInterval(group *GroupConfig) storage.IntervalRule { - if group.ResourceOpts != nil && len(group.ResourceOpts.Stages) > 0 { - stage := group.ResourceOpts.Stages[0] - if stage.SegmentInterval != nil { - return storage.MustToIntervalRule(stage.SegmentInterval) - } + if group.TargetSegmentInterval != nil { + return storage.MustToIntervalRule(group.TargetSegmentInterval) } if group.ResourceOpts != nil && group.ResourceOpts.SegmentInterval != nil { diff --git a/banyand/backup/lifecycle/segment_boundary_utils_test.go b/banyand/backup/lifecycle/segment_boundary_utils_test.go index 003558667..98777f455 100644 --- a/banyand/backup/lifecycle/segment_boundary_utils_test.go +++ b/banyand/backup/lifecycle/segment_boundary_utils_test.go @@ -18,11 +18,13 @@ package lifecycle import ( + "sync" "testing" "time" "github.com/stretchr/testify/assert" + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" "github.com/apache/skywalking-banyandb/banyand/internal/storage" ) @@ -37,54 +39,54 @@ func TestCalculateTargetSegments(t *testing.T) { }{ { name: "2-day to 3-day segments - part spans multiple segments", - partMinTS: time.Date(2024, 1, 2, 0, 0, 0, 0, time.UTC).UnixNano(), // Day 2 - partMaxTS: time.Date(2024, 1, 4, 0, 0, 0, 0, time.UTC).UnixNano(), // Day 4 + partMinTS: time.Date(2024, 1, 2, 0, 0, 0, 0, time.Local).UnixNano(), // Day 2 + partMaxTS: time.Date(2024, 1, 4, 0, 0, 0, 0, time.Local).UnixNano(), // Day 4 
targetInterval: storage.IntervalRule{ Unit: storage.DAY, Num: 3, }, expected: []time.Time{ - time.Date(2024, 1, 0, 0, 0, 0, 0, time.UTC), // Day 0-3 - time.Date(2024, 1, 3, 0, 0, 0, 0, time.UTC), // Day 3-6 + time.Date(2023, 12, 31, 0, 0, 0, 0, time.Local), // Day 0-3 + time.Date(2024, 1, 3, 0, 0, 0, 0, time.Local), // Day 3-6 }, }, { name: "2-day to 3-day segments - part fits in single segment", - partMinTS: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC).UnixNano(), // Day 1 - partMaxTS: time.Date(2024, 1, 2, 0, 0, 0, 0, time.UTC).UnixNano(), // Day 2 + partMinTS: time.Date(2024, 1, 1, 0, 0, 0, 0, time.Local).UnixNano(), // Day 1 + partMaxTS: time.Date(2024, 1, 2, 0, 0, 0, 0, time.Local).UnixNano(), // Day 2 targetInterval: storage.IntervalRule{ Unit: storage.DAY, Num: 3, }, expected: []time.Time{ - time.Date(2024, 1, 0, 0, 0, 0, 0, time.UTC), // Day 0-3 + time.Date(2023, 12, 31, 0, 0, 0, 0, time.Local), // Day 0-3 }, }, { name: "hour to day segments - part spans multiple segments", - partMinTS: time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC).UnixNano(), // Hour 12 - partMaxTS: time.Date(2024, 1, 1, 18, 0, 0, 0, time.UTC).UnixNano(), // Hour 18 + partMinTS: time.Date(2024, 1, 1, 12, 0, 0, 0, time.Local).UnixNano(), // Hour 12 + partMaxTS: time.Date(2024, 1, 1, 18, 0, 0, 0, time.Local).UnixNano(), // Hour 18 targetInterval: storage.IntervalRule{ Unit: storage.DAY, Num: 1, }, expected: []time.Time{ - time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC), // Day 1 + time.Date(2024, 1, 1, 0, 0, 0, 0, time.Local), // Day 1 }, }, { name: "day to hour segments - part spans multiple segments", - partMinTS: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC).UnixNano(), // Day 1 - partMaxTS: time.Date(2024, 1, 1, 23, 59, 59, 0, time.UTC).UnixNano(), // End of Day 1 + partMinTS: time.Date(2024, 1, 1, 0, 0, 0, 0, time.Local).UnixNano(), // Day 1 + partMaxTS: time.Date(2024, 1, 1, 23, 59, 59, 0, time.Local).UnixNano(), // End of Day 1 targetInterval: storage.IntervalRule{ Unit: storage.HOUR, Num: 6, 
}, expected: []time.Time{ - time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC), // Hour 0-6 - time.Date(2024, 1, 1, 6, 0, 0, 0, time.UTC), // Hour 6-12 - time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC), // Hour 12-18 - time.Date(2024, 1, 1, 18, 0, 0, 0, time.UTC), // Hour 18-24 + time.Date(2024, 1, 1, 0, 0, 0, 0, time.Local), // Hour 0-6 + time.Date(2024, 1, 1, 6, 0, 0, 0, time.Local), // Hour 6-12 + time.Date(2024, 1, 1, 12, 0, 0, 0, time.Local), // Hour 12-18 + time.Date(2024, 1, 1, 18, 0, 0, 0, time.Local), // Hour 18-24 }, }, } @@ -136,3 +138,198 @@ func TestGetSegmentTimeRange(t *testing.T) { }) } } + +const ( + stageWarm = "warm" + stageCold = "cold" + stageFrozen = "frozen" + selectorWarm = "type=warm" + selectorCold = "type=cold" + selectorFrozen = "type=frozen" +) + +func dayInterval(num uint32) *commonv1.IntervalRule { + return &commonv1.IntervalRule{Unit: commonv1.IntervalRule_UNIT_DAY, Num: num} +} + +func stage(name, selector string, segNum, ttlNum uint32) *commonv1.LifecycleStage { + return &commonv1.LifecycleStage{ + Name: name, + SegmentInterval: dayInterval(segNum), + Ttl: dayInterval(ttlNum), + NodeSelector: selector, + } +} + +// TestGetTargetStageInterval pins every branch of getTargetStageInterval: +// the explicit TargetSegmentInterval, the ResourceOpts.SegmentInterval +// fallback, and the final 1d default. 
+func TestGetTargetStageInterval(t *testing.T) { + //nolint:govet // fieldalignment: test struct optimization not critical + type tc struct { + name string + group *GroupConfig + expected storage.IntervalRule + } + + cases := []tc{ + { + name: "3-stage warm->cold returns cold's 15d (sw_metricsHour)", + group: &GroupConfig{ + Group: &commonv1.Group{ + ResourceOpts: &commonv1.ResourceOpts{ + SegmentInterval: dayInterval(5), + Stages: []*commonv1.LifecycleStage{ + stage(stageWarm, selectorWarm, 7, 7), + stage(stageCold, selectorCold, 15, 30), + }, + }, + }, + SegmentInterval: dayInterval(7), + TargetSegmentInterval: dayInterval(15), + }, + expected: storage.IntervalRule{Unit: storage.DAY, Num: 15}, + }, + { + name: "3-stage warm->cold returns cold's 5d (sw_metricsMinute)", + group: &GroupConfig{ + Group: &commonv1.Group{ + ResourceOpts: &commonv1.ResourceOpts{ + SegmentInterval: dayInterval(1), + Stages: []*commonv1.LifecycleStage{ + stage(stageWarm, selectorWarm, 3, 7), + stage(stageCold, selectorCold, 5, 30), + }, + }, + }, + SegmentInterval: dayInterval(3), + TargetSegmentInterval: dayInterval(5), + }, + expected: storage.IntervalRule{Unit: storage.DAY, Num: 5}, + }, + { + name: "2-stage hot->cold returns cold's interval", + group: &GroupConfig{ + Group: &commonv1.Group{ + ResourceOpts: &commonv1.ResourceOpts{ + SegmentInterval: dayInterval(1), + Stages: []*commonv1.LifecycleStage{ + stage(stageCold, selectorCold, 15, 30), + }, + }, + }, + SegmentInterval: dayInterval(1), + TargetSegmentInterval: dayInterval(15), + }, + expected: storage.IntervalRule{Unit: storage.DAY, Num: 15}, + }, + { + name: "4-stage chain warm->cold picks cold, not the last stage", + group: &GroupConfig{ + Group: &commonv1.Group{ + ResourceOpts: &commonv1.ResourceOpts{ + SegmentInterval: dayInterval(1), + Stages: []*commonv1.LifecycleStage{ + stage(stageWarm, selectorWarm, 3, 7), + stage(stageCold, selectorCold, 15, 30), + stage(stageFrozen, selectorFrozen, 30, 365), + }, + }, + }, + 
SegmentInterval: dayInterval(3), + TargetSegmentInterval: dayInterval(15), + }, + expected: storage.IntervalRule{Unit: storage.DAY, Num: 15}, + }, + { + name: "fallback to ResourceOpts.SegmentInterval when TargetSegmentInterval is nil", + group: &GroupConfig{ + Group: &commonv1.Group{ + ResourceOpts: &commonv1.ResourceOpts{ + SegmentInterval: dayInterval(7), + }, + }, + }, + expected: storage.IntervalRule{Unit: storage.DAY, Num: 7}, + }, + { + name: "fallback to default 1d when nothing is set", + group: &GroupConfig{ + Group: &commonv1.Group{ResourceOpts: &commonv1.ResourceOpts{}}, + }, + expected: storage.IntervalRule{Unit: storage.DAY, Num: 1}, + }, + { + name: "fallback handles nil ResourceOpts", + group: &GroupConfig{ + Group: &commonv1.Group{}, + }, + expected: storage.IntervalRule{Unit: storage.DAY, Num: 1}, + }, + { + name: "TargetSegmentInterval wins over ResourceOpts.SegmentInterval", + group: &GroupConfig{ + Group: &commonv1.Group{ + ResourceOpts: &commonv1.ResourceOpts{ + SegmentInterval: dayInterval(1), + Stages: []*commonv1.LifecycleStage{ + stage(stageWarm, selectorWarm, 7, 7), + stage(stageCold, selectorCold, 15, 30), + }, + }, + }, + SegmentInterval: dayInterval(7), + TargetSegmentInterval: dayInterval(15), + }, + expected: storage.IntervalRule{Unit: storage.DAY, Num: 15}, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + got := getTargetStageInterval(c.group) + assert.Equal(t, c.expected, got, + "target stage interval mismatch: expected %d(%s), got %d(%s)", + c.expected.Num, c.expected.Unit.String(), + got.Num, got.Unit.String()) + }) + } +} + +// TestGetTargetStageInterval_ConcurrentReaders pins down that the function is +// safe for concurrent readers - lifecycle migration code calls it from +// multiple goroutines (per-shard / per-group fan-out). 
+func TestGetTargetStageInterval_ConcurrentReaders(t *testing.T) { + group := &GroupConfig{ + Group: &commonv1.Group{ + ResourceOpts: &commonv1.ResourceOpts{ + SegmentInterval: dayInterval(5), + Stages: []*commonv1.LifecycleStage{ + stage(stageWarm, selectorWarm, 7, 7), + stage(stageCold, selectorCold, 15, 30), + }, + }, + }, + SegmentInterval: dayInterval(7), + TargetSegmentInterval: dayInterval(15), + } + expected := storage.IntervalRule{Unit: storage.DAY, Num: 15} + + const goroutines = 64 + const iterations = 1000 + var wg sync.WaitGroup + wg.Add(goroutines) + for g := 0; g < goroutines; g++ { + go func() { + defer wg.Done() + for i := 0; i < iterations; i++ { + got := getTargetStageInterval(group) + if got != expected { + t.Errorf("concurrent read returned %v, expected %v", got, expected) + return + } + } + }() + } + wg.Wait() +} diff --git a/banyand/backup/lifecycle/steps.go b/banyand/backup/lifecycle/steps.go index 700897840..080501883 100644 --- a/banyand/backup/lifecycle/steps.go +++ b/banyand/backup/lifecycle/steps.go @@ -86,12 +86,18 @@ func (l *lifecycleService) getSnapshots(groups []*commonv1.Group, p *Progress) ( // It contains all necessary information for migration and deletion operations. type GroupConfig struct { *commonv1.Group - NodeSelector node.Selector - QueueClient queue.Client - AccumulatedTTL *commonv1.IntervalRule + NodeSelector node.Selector + QueueClient queue.Client + AccumulatedTTL *commonv1.IntervalRule + // SegmentInterval is the current stage's segment interval, used to read + // source segments on this node. SegmentInterval *commonv1.IntervalRule - TargetShardNum uint32 - TargetReplicas uint32 + // TargetSegmentInterval is the next stage's segment interval. It differs + // from SegmentInterval on 3-stage deployments (e.g. warm->cold), so any + // computation against the target tier's segment grid must use this field. 
+ TargetSegmentInterval *commonv1.IntervalRule + TargetShardNum uint32 + TargetReplicas uint32 } // Close releases resources held by the GroupConfig. @@ -101,6 +107,17 @@ func (gc *GroupConfig) Close() { } } +// cloneIntervalRule returns a deep copy of ir, or nil if ir is nil. proto.Clone +// applied to a typed-nil *commonv1.IntervalRule yields a non-nil zero value +// (Num=0, Unit=UNIT_UNSPECIFIED) that downstream MustToIntervalRule rejects; +// short-circuiting on nil keeps the GroupConfig fallback chain intact. +func cloneIntervalRule(ir *commonv1.IntervalRule) *commonv1.IntervalRule { + if ir == nil { + return nil + } + return proto.Clone(ir).(*commonv1.IntervalRule) +} + //nolint:contextcheck // health check goroutine uses context.Background() func parseGroup( g *commonv1.Group, nodeLabels map[string]string, nodes []*databasev1.Node, @@ -113,9 +130,25 @@ func parseGroup( if len(ro.Stages) == 0 { return nil, fmt.Errorf("no stages in group %s", g.Metadata.Name) } + // Validate IntervalRules up-front so later derefs (incl. Stages[i+1]) are safe. 
+ if ro.Ttl == nil { + return nil, fmt.Errorf("group %s: missing ttl", g.Metadata.Name) + } + if ro.SegmentInterval == nil { + return nil, fmt.Errorf("group %s: missing segment_interval", g.Metadata.Name) + } + for _, st := range ro.Stages { + if st.SegmentInterval == nil { + return nil, fmt.Errorf("group %s stage %s: missing segment_interval", g.Metadata.Name, st.Name) + } + if st.Ttl == nil { + return nil, fmt.Errorf("group %s stage %s: missing ttl", g.Metadata.Name, st.Name) + } + } ttlTime := proto.Clone(ro.Ttl).(*commonv1.IntervalRule) - segmentInterval := proto.Clone(ro.SegmentInterval).(*commonv1.IntervalRule) + segmentInterval := cloneIntervalRule(ro.SegmentInterval) var nst *commonv1.LifecycleStage + var targetSegmentInterval *commonv1.IntervalRule for i, st := range ro.Stages { selector, err := pub.ParseLabelSelector(st.NodeSelector) if err != nil { @@ -130,14 +163,21 @@ func parseGroup( return nil, nil } nst = ro.Stages[i+1] - segmentInterval = st.SegmentInterval - l.Info().Msgf("migrating group %s at stage %s to stage %s, segment interval: %d(%s), total ttl needs: %d(%s)", - g.Metadata.Name, st.Name, nst.Name, segmentInterval.Num, segmentInterval.Unit.String(), ttlTime.Num, ttlTime.Unit.String()) + // Clone before exposing through GroupConfig so callers cannot mutate + // the shared proto Stages[*] sub-objects. 
+ segmentInterval = cloneIntervalRule(st.SegmentInterval) + targetSegmentInterval = cloneIntervalRule(nst.SegmentInterval) + l.Info().Msgf("migrating group %s at stage %s to stage %s, source segment interval: %d(%s), target segment interval: %d(%s), total ttl needs: %d(%s)", + g.Metadata.Name, st.Name, nst.Name, + segmentInterval.Num, segmentInterval.Unit.String(), + targetSegmentInterval.Num, targetSegmentInterval.Unit.String(), + ttlTime.Num, ttlTime.Unit.String()) break } if nst == nil { nst = ro.Stages[0] ttlTime = proto.Clone(ro.Ttl).(*commonv1.IntervalRule) + targetSegmentInterval = cloneIntervalRule(nst.SegmentInterval) l.Info().Msgf("no matching stage for group %s, defaulting to first stage %s segment interval: %d(%s), total ttl needs: %d(%s)", g.Metadata.Name, nst.Name, segmentInterval.Num, segmentInterval.Unit.String(), ttlTime.Num, ttlTime.Unit.String()) } @@ -179,13 +219,14 @@ func parseGroup( clusterStateMgr.addRouteTable(t) } return &GroupConfig{ - Group: g, - TargetShardNum: nst.ShardNum, - TargetReplicas: nst.Replicas, - AccumulatedTTL: ttlTime, - SegmentInterval: segmentInterval, - NodeSelector: nodeSel, - QueueClient: client, + Group: g, + TargetShardNum: nst.ShardNum, + TargetReplicas: nst.Replicas, + AccumulatedTTL: ttlTime, + SegmentInterval: segmentInterval, + TargetSegmentInterval: targetSegmentInterval, + NodeSelector: nodeSel, + QueueClient: client, }, nil } diff --git a/banyand/backup/lifecycle/steps_test.go b/banyand/backup/lifecycle/steps_test.go new file mode 100644 index 000000000..c1728b5f2 --- /dev/null +++ b/banyand/backup/lifecycle/steps_test.go @@ -0,0 +1,93 @@ +// Licensed to Apache Software Foundation (ASF) under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. 
Apache Software Foundation (ASF) licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package lifecycle + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + commonv1 "github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1" +) + +// TestParseGroup_RejectsMissingIntervals verifies that parseGroup fails fast +// with a descriptive error when required IntervalRule fields are nil, rather +// than panicking later inside the migration log. parseGroup validates these +// fields before touching metadata.Repo or the cluster state manager, so +// passing nil for those parameters is safe in this test - if a future change +// reorders parseGroup so the deps run first, the tests will panic clearly +// and that contract should be revisited. 
+func TestParseGroup_RejectsMissingIntervals(t *testing.T) { + makeGroup := func(mutate func(ro *commonv1.ResourceOpts)) *commonv1.Group { + ro := &commonv1.ResourceOpts{ + ShardNum: 1, + SegmentInterval: dayInterval(1), + Ttl: dayInterval(7), + Stages: []*commonv1.LifecycleStage{ + stage(stageWarm, selectorWarm, 7, 7), + stage(stageCold, selectorCold, 15, 30), + }, + } + mutate(ro) + return &commonv1.Group{ + Metadata: &commonv1.Metadata{Name: "test-group"}, + ResourceOpts: ro, + } + } + + cases := []struct { + name string + mutate func(*commonv1.ResourceOpts) + errFrag string + }{ + { + name: "top-level ttl missing", + mutate: func(ro *commonv1.ResourceOpts) { ro.Ttl = nil }, + errFrag: "group test-group: missing ttl", + }, + { + name: "top-level segment_interval missing", + mutate: func(ro *commonv1.ResourceOpts) { ro.SegmentInterval = nil }, + errFrag: "group test-group: missing segment_interval", + }, + { + name: "stage segment_interval missing", + mutate: func(ro *commonv1.ResourceOpts) { ro.Stages[0].SegmentInterval = nil }, + errFrag: "group test-group stage warm: missing segment_interval", + }, + { + name: "stage ttl missing", + mutate: func(ro *commonv1.ResourceOpts) { ro.Stages[0].Ttl = nil }, + errFrag: "group test-group stage warm: missing ttl", + }, + { + name: "next stage segment_interval missing", + mutate: func(ro *commonv1.ResourceOpts) { ro.Stages[1].SegmentInterval = nil }, + errFrag: "group test-group stage cold: missing segment_interval", + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + g := makeGroup(c.mutate) + _, err := parseGroup(g, map[string]string{"type": "warm"}, nil, nil, nil, nil) + require.Error(t, err) + assert.Contains(t, err.Error(), c.errFrag) + }) + } +} diff --git a/banyand/internal/storage/rotation_test.go b/banyand/internal/storage/rotation_test.go index 9577c6736..698df3392 100644 --- a/banyand/internal/storage/rotation_test.go +++ b/banyand/internal/storage/rotation_test.go @@ -249,7 +249,11 @@ 
func TestRotationDisabled(t *testing.T) { } ctx := context.Background() mc := timestamp.NewMockClock() - ts, err := time.ParseInLocation("2006-01-02 15:04:05", "2024-05-01 00:00:00", time.Local) + // 2024-05-02 is day 19845 since the Unix epoch which is divisible by 3, + // so it lies on the 3-day epoch-aligned grid produced by + // IntervalRule.Standard. Picking an aligned day keeps the assertions + // simple: 6 consecutive days collapse into exactly two 3-day segments. + ts, err := time.ParseInLocation("2006-01-02 15:04:05", "2024-05-02 00:00:00", time.UTC) require.NoError(t, err) mc.Set(ts) ctx = timestamp.SetClock(ctx, mc) @@ -267,11 +271,11 @@ func TestRotationDisabled(t *testing.T) { db := tsdb.(*database[*MockTSTable, any]) segCtrl := db.segmentController - // Simulate data arriving day by day for 6 days (mimicking lifecycle migration) - // Day 1 (05-01): already in segment [05-01, 05-04) - // Day 2 (05-02): still in [05-01, 05-04) - // Day 3 (05-03): still in [05-01, 05-04) - // Day 4 (05-04): need new segment [05-04, 05-07) + // Simulate data arriving day by day for 6 days (mimicking lifecycle migration). + // Day 1 (05-02): already in segment [05-02, 05-05) + // Day 2 (05-03): still in [05-02, 05-05) + // Day 3 (05-04): still in [05-02, 05-05) + // Day 4 (05-05): need new segment [05-05, 05-08) for day := 0; day < 6; day++ { dayTime := ts.AddDate(0, 0, day) mc.Set(dayTime) @@ -284,7 +288,7 @@ func TestRotationDisabled(t *testing.T) { tsdb.Tick(maxTS.UnixNano()) } - // Verify: only 2 segments created, both with correct 3-day boundaries + // Verify: only 2 segments created, both on the 3-day epoch-aligned grid. 
segments, _ := segCtrl.segments(false) defer func() { for i := range segments { @@ -293,13 +297,13 @@ func TestRotationDisabled(t *testing.T) { }() require.Equal(t, 2, len(segments), "expected 2 segments for 6 days with 3-day interval") - // First segment: [05-01, 05-04) - assert.Equal(t, "2024-05-01 00:00:00", segments[0].Start.Format("2006-01-02 15:04:05")) - assert.Equal(t, "2024-05-04 00:00:00", segments[0].End.Format("2006-01-02 15:04:05")) + // First segment: [05-02, 05-05) + assert.Equal(t, "2024-05-02 00:00:00", segments[0].Start.UTC().Format("2006-01-02 15:04:05")) + assert.Equal(t, "2024-05-05 00:00:00", segments[0].End.UTC().Format("2006-01-02 15:04:05")) - // Second segment: [05-04, 05-07) - assert.Equal(t, "2024-05-04 00:00:00", segments[1].Start.Format("2006-01-02 15:04:05")) - assert.Equal(t, "2024-05-07 00:00:00", segments[1].End.Format("2006-01-02 15:04:05")) + // Second segment: [05-05, 05-08) + assert.Equal(t, "2024-05-05 00:00:00", segments[1].Start.UTC().Format("2006-01-02 15:04:05")) + assert.Equal(t, "2024-05-08 00:00:00", segments[1].End.UTC().Format("2006-01-02 15:04:05")) }) } diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go index 12cc0fed9..bd8c5c705 100644 --- a/banyand/internal/storage/segment.go +++ b/banyand/internal/storage/segment.go @@ -575,19 +575,37 @@ func (sc *segmentController[T, O]) create(start time.Time) (*segment[T, O], erro } } options := sc.getOptions() - start = options.SegmentInterval.Unit.Standard(start) + // Anchor stdEnd to the aligned start before any bump so end stays on the + // global grid even when start is bumped past a legacy off-grid neighbor; + // subsequent segments then self-heal back to the grid. 
+ alignedStart := options.SegmentInterval.Standard(start) + stdEnd := options.SegmentInterval.NextTime(alignedStart) + start = alignedStart + // sc.lst is sorted ascending by start time with non-overlapping ranges; + // a single pass bumps start past every legacy segment that swallows it + // (each next segment.Start >= previous.End). var next *segment[T, O] for _, s := range sc.lst { if s.Contains(start.UnixNano()) { - return s, nil + start = s.End + continue } if next == nil && s.Start.After(start) { next = s } } - stdEnd := options.SegmentInterval.NextTime(start) var end time.Time if next != nil && next.Start.Before(stdEnd) { + // `next` starts inside the current grid bucket - a legacy off-grid + // segment whose TTL hasn't elapsed. Cap end at next.Start to avoid + // overlap; surfacing this at Info level lets operators see the + // abnormal span until the legacy neighbor ages out. + sc.l.Info(). + Stringer("alignedStart", alignedStart). + Stringer("bumpedStart", start). + Stringer("nextStart", next.Start). + Stringer("stdEnd", stdEnd). + Msg("new segment span is shorter than configured SegmentInterval due to an unaligned legacy neighbor") end = next.Start } else { end = stdEnd diff --git a/banyand/internal/storage/segment_test.go b/banyand/internal/storage/segment_test.go index f50e07603..2430de773 100644 --- a/banyand/internal/storage/segment_test.go +++ b/banyand/internal/storage/segment_test.go @@ -23,6 +23,8 @@ import ( "fmt" "os" "path/filepath" + "slices" + "sort" "strconv" "sync" "sync/atomic" @@ -1178,3 +1180,454 @@ func TestSegment_ConcurrentReopenAndClose_NoPanic(t *testing.T) { require.Nil(t, s.index, "segment %s should be cleaned up", s.suffix) } } + +// newAlignmentTestController is a compact helper for tests covering the +// epoch-aligned segment.create() behavior introduced by the lifecycle fix. +// It spins up a real segmentController backed by a temp dir, with the given +// SegmentInterval, and returns the controller plus a cleanup function. 
+func newAlignmentTestController( + t *testing.T, interval IntervalRule, +) (*segmentController[mockTSTable, mockTSTableOpener], string, func()) { + t.Helper() + tempDir, cleanup := setupTestEnvironment(t) + + ctx := context.Background() + l := logger.GetLogger("test-alignment") + ctx = context.WithValue(ctx, logger.ContextKey, l) + ctx = common.SetPosition(ctx, func(_ common.Position) common.Position { + return common.Position{ + Database: "test-db", + Stage: "alignment", + } + }) + + opts := TSDBOpts[mockTSTable, mockTSTableOpener]{ + TSTableCreator: func(_ fs.FileSystem, _ string, _ common.Position, _ *logger.Logger, + _ timestamp.TimeRange, _ mockTSTableOpener, _ any, + ) (mockTSTable, error) { + return mockTSTable{ID: common.ShardID(0)}, nil + }, + ShardNum: 1, + SegmentInterval: interval, + TTL: IntervalRule{Unit: DAY, Num: 60}, + SeriesIndexFlushTimeoutSeconds: 10, + SeriesIndexCacheMaxBytes: 1024 * 1024, + } + + serviceCache := NewServiceCache().(*serviceCache) + sc := newSegmentController[mockTSTable, mockTSTableOpener]( + ctx, + tempDir, + l, + opts, + nil, + nil, + 5*time.Minute, + fs.NewLocalFileSystemWithLoggerAndLimit(logger.GetLogger("storage"), opts.MemoryLimit), + serviceCache, + group, + ) + return sc, tempDir, cleanup +} + +// TestIntervalRule_Standard_AlignsToEpochGrid pins the alignment math itself. +// Two timestamps in the same N*Unit bucket measured from epoch must round +// down to the exact same instant; two timestamps in adjacent buckets must +// round to instants exactly N*Unit apart. 
+func TestIntervalRule_Standard_AlignsToEpochGrid(t *testing.T) { + //nolint:govet // fieldalignment: test struct optimization not critical + cases := []struct { + name string + ir IntervalRule + probes []time.Time + expected []time.Time + }{ + { + name: "DAY Num=15 matches calculateTargetSegments grid", + ir: IntervalRule{Unit: DAY, Num: 15}, + probes: []time.Time{ + time.Date(2026, 4, 7, 0, 0, 0, 0, time.UTC), + time.Date(2026, 4, 16, 6, 0, 0, 0, time.UTC), + time.Date(2026, 4, 19, 23, 59, 59, 0, time.UTC), + time.Date(2026, 4, 22, 0, 0, 0, 0, time.UTC), + time.Date(2026, 5, 7, 12, 30, 0, 0, time.UTC), + }, + expected: []time.Time{ + time.Date(2026, 4, 7, 0, 0, 0, 0, time.UTC), + time.Date(2026, 4, 7, 0, 0, 0, 0, time.UTC), + time.Date(2026, 4, 7, 0, 0, 0, 0, time.UTC), + time.Date(2026, 4, 22, 0, 0, 0, 0, time.UTC), + time.Date(2026, 5, 7, 0, 0, 0, 0, time.UTC), + }, + }, + { + name: "DAY Num=5", + ir: IntervalRule{Unit: DAY, Num: 5}, + // 1970-01-01 + N*5 day boundaries near today: 2026-04-02 (day 20545), + // 2026-04-07 (day 20550), 2026-04-12 (day 20555). 
+ probes: []time.Time{ + time.Date(2026, 4, 2, 0, 0, 0, 0, time.UTC), + time.Date(2026, 4, 3, 6, 0, 0, 0, time.UTC), + time.Date(2026, 4, 6, 23, 0, 0, 0, time.UTC), + time.Date(2026, 4, 7, 0, 0, 0, 0, time.UTC), + }, + expected: []time.Time{ + time.Date(2026, 4, 2, 0, 0, 0, 0, time.UTC), + time.Date(2026, 4, 2, 0, 0, 0, 0, time.UTC), + time.Date(2026, 4, 2, 0, 0, 0, 0, time.UTC), + time.Date(2026, 4, 7, 0, 0, 0, 0, time.UTC), + }, + }, + { + name: "HOUR Num=6", + ir: IntervalRule{Unit: HOUR, Num: 6}, + probes: []time.Time{ + time.Date(2026, 4, 19, 0, 0, 0, 0, time.UTC), + time.Date(2026, 4, 19, 5, 59, 59, 0, time.UTC), + time.Date(2026, 4, 19, 6, 0, 0, 0, time.UTC), + time.Date(2026, 4, 19, 11, 30, 0, 0, time.UTC), + time.Date(2026, 4, 19, 12, 0, 0, 0, time.UTC), + }, + expected: []time.Time{ + time.Date(2026, 4, 19, 0, 0, 0, 0, time.UTC), + time.Date(2026, 4, 19, 0, 0, 0, 0, time.UTC), + time.Date(2026, 4, 19, 6, 0, 0, 0, time.UTC), + time.Date(2026, 4, 19, 6, 0, 0, 0, time.UTC), + time.Date(2026, 4, 19, 12, 0, 0, 0, time.UTC), + }, + }, + { + name: "DAY Num=1 matches old Unit.Standard behavior", + ir: IntervalRule{Unit: DAY, Num: 1}, + probes: []time.Time{time.Date(2026, 4, 19, 6, 30, 0, 0, time.UTC)}, + expected: []time.Time{time.Date(2026, 4, 19, 0, 0, 0, 0, time.UTC)}, + }, + { + // Pre-epoch inputs: floor division puts them in the bucket below + // rather than collapsing to epoch as truncated division would. 
+ name: "DAY Num=15 pre-epoch uses floor division", + ir: IntervalRule{Unit: DAY, Num: 15}, + probes: []time.Time{ + time.Date(1969, 12, 25, 0, 0, 0, 0, time.UTC), + time.Date(1969, 12, 31, 23, 59, 59, 0, time.UTC), + }, + expected: []time.Time{ + time.Date(1969, 12, 17, 0, 0, 0, 0, time.UTC), + time.Date(1969, 12, 17, 0, 0, 0, 0, time.UTC), + }, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, len(tc.probes), len(tc.expected)) + for i, p := range tc.probes { + got := tc.ir.Standard(p) + assert.Equal(t, tc.expected[i], got, + "Standard(%s) under %d %s expected %s got %s", + p.Format(time.RFC3339), tc.ir.Num, tc.ir.Unit.String(), + tc.expected[i].Format(time.RFC3339), got.Format(time.RFC3339)) + } + }) + } +} + +// TestIntervalRule_Standard_PreservesLocation verifies that the returned +// time keeps the input's Location and lands on a local-day midnight (DAY) +// or local-hour boundary (HOUR) anchored in the caller's timezone. Under the +// per-timezone grid, the absolute instant naturally differs across timezones. 
+func TestIntervalRule_Standard_PreservesLocation(t *testing.T) {
+	// UTC plus one fixed-offset zone on each side of it (+8h and -8h).
+	zones := []*time.Location{
+		time.UTC,
+		time.FixedZone("CST", 8*3600),
+		time.FixedZone("PST", -8*3600),
+	}
+	rules := []struct {
+		probe time.Time
+		ir    IntervalRule
+	}{
+		{probe: time.Date(2026, 4, 19, 6, 0, 0, 0, time.UTC), ir: IntervalRule{Unit: DAY, Num: 15}},
+		{probe: time.Date(2026, 4, 19, 11, 30, 0, 0, time.UTC), ir: IntervalRule{Unit: HOUR, Num: 6}},
+	}
+
+	for i := range rules {
+		rule, instant := rules[i].ir, rules[i].probe
+		for _, zone := range zones {
+			// Same instant, re-expressed in the zone under test.
+			aligned := rule.Standard(instant.In(zone))
+			assert.Equal(t, zone, aligned.Location(),
+				"Standard must return time in input.Location(); ir=%+v loc=%s", rule, zone)
+			if rule.Unit == DAY {
+				assert.Equal(t, 0, aligned.Hour(), "DAY bucket must align to local-day midnight")
+				assert.Equal(t, 0, aligned.Minute())
+				assert.Equal(t, 0, aligned.Second())
+				continue
+			}
+			if rule.Unit == HOUR {
+				assert.Equal(t, 0, aligned.Minute(), "HOUR bucket must align to local-hour boundary")
+				assert.Equal(t, 0, aligned.Second())
+			}
+		}
+	}
+}
+
+// TestCreateSegment_OutOfOrderArrival_SameBucket reproduces the production
+// truncation observed on demo-banyandb-data-cold-0 on 2026-04-30 where a part
+// with MinTimestamp ~2026-04-19 created seg-20260419 first and a later-
+// processed part with MinTimestamp ~2026-04-16 produced a 3-day truncated
+// seg-20260416. With Standard() honoring Num, both timestamps resolve to the
+// same epoch-aligned bucket [04/07, 04/22), so the second create returns the
+// same segment instead of creating a truncated neighbor.
+func TestCreateSegment_OutOfOrderArrival_SameBucket(t *testing.T) {
+	const bucketSpan = 15 * 24 * time.Hour
+	var (
+		firstArrival  = time.Date(2026, 4, 19, 6, 0, 0, 0, time.UTC)
+		secondArrival = time.Date(2026, 4, 16, 6, 0, 0, 0, time.UTC)
+		wantStart     = time.Date(2026, 4, 7, 0, 0, 0, 0, time.UTC)
+		wantEnd       = time.Date(2026, 4, 22, 0, 0, 0, 0, time.UTC)
+	)
+
+	sc, _, cleanup := newAlignmentTestController(t, IntervalRule{Unit: DAY, Num: 15})
+	defer cleanup()
+
+	// The later timestamp is created first.
+	opened, openErr := sc.create(firstArrival)
+	require.NoError(t, openErr)
+	require.NotNil(t, opened)
+	defer opened.DecRef()
+
+	// The earlier timestamp follows and must resolve to the same segment.
+	reused, reuseErr := sc.create(secondArrival)
+	require.NoError(t, reuseErr)
+	require.NotNil(t, reused)
+
+	assert.Same(t, opened, reused,
+		"both timestamps fall in the same 15d bucket; create() must reuse the existing segment")
+	assert.Equal(t, wantStart, opened.Start,
+		"segment must start at the epoch-aligned bucket boundary, not at the first arrival's day")
+	assert.Equal(t, wantEnd, opened.End)
+	assert.Equal(t, bucketSpan, opened.End.Sub(opened.Start))
+}
+
+// TestCreateSegment_OutOfOrderArrival_AdjacentBuckets covers the case where
+// the late-arrival timestamp falls in a strictly different epoch bucket from
+// the first-arrival one. The new segment must occupy a clean N*Unit span
+// without being truncated by the existing later segment - the truncation
+// branch in create() must collapse to "stdEnd == next.Start" exactly, leaving
+// span = N*Unit.
+func TestCreateSegment_OutOfOrderArrival_AdjacentBuckets(t *testing.T) {
+	const fullSpan = 15 * 24 * time.Hour
+	// utcAt builds a 2026 UTC timestamp on the hour.
+	utcAt := func(month time.Month, day, hour int) time.Time {
+		return time.Date(2026, month, day, hour, 0, 0, 0, time.UTC)
+	}
+
+	sc, _, cleanup := newAlignmentTestController(t, IntervalRule{Unit: DAY, Num: 15})
+	defer cleanup()
+
+	// First arrival lands in the later bucket [04/22, 05/07).
+	later, laterErr := sc.create(utcAt(time.May, 5, 6))
+	require.NoError(t, laterErr)
+	require.NotNil(t, later)
+	defer later.DecRef()
+	assert.Equal(t, utcAt(time.April, 22, 0), later.Start)
+	assert.Equal(t, utcAt(time.May, 7, 0), later.End)
+	assert.Equal(t, fullSpan, later.End.Sub(later.Start))
+
+	// Second arrival lands in the preceding bucket [04/07, 04/22).
+	earlier, earlierErr := sc.create(utcAt(time.April, 15, 6))
+	require.NoError(t, earlierErr)
+	require.NotNil(t, earlier)
+	defer earlier.DecRef()
+	assert.NotSame(t, later, earlier,
+		"timestamps in different buckets must not collapse to one segment")
+	assert.Equal(t, utcAt(time.April, 7, 0), earlier.Start)
+	assert.Equal(t, utcAt(time.April, 22, 0), earlier.End,
+		"new earlier segment must abut next.Start with stdEnd==next.Start (no internal truncation)")
+	assert.Equal(t, fullSpan, earlier.End.Sub(earlier.Start))
+}
+
+// TestCreateSegment_HourInterval_OutOfOrder verifies the same alignment
+// guarantee for HOUR-based segment intervals (Num=6).
+func TestCreateSegment_HourInterval_OutOfOrder(t *testing.T) {
+	sc, _, cleanup := newAlignmentTestController(t, IntervalRule{Unit: HOUR, Num: 6})
+	defer cleanup()
+
+	t1 := time.Date(2026, 4, 19, 11, 30, 0, 0, time.UTC) // bucket [06:00, 12:00)
+	t2 := time.Date(2026, 4, 19, 8, 15, 0, 0, time.UTC)  // same bucket
+	t3 := time.Date(2026, 4, 19, 13, 0, 0, 0, time.UTC)  // bucket [12:00, 18:00)
+
+	// The latest timestamp of the 06:00 bucket arrives first.
+	seg1, err := sc.create(t1)
+	require.NoError(t, err)
+	defer seg1.DecRef()
+	assert.Equal(t, time.Date(2026, 4, 19, 6, 0, 0, 0, time.UTC), seg1.Start)
+	assert.Equal(t, 6*time.Hour, seg1.End.Sub(seg1.Start))
+
+	// An earlier timestamp in the same bucket must map to the identical segment.
+	seg2, err := sc.create(t2)
+	require.NoError(t, err)
+	assert.Same(t, seg1, seg2, "08:15 must reuse the [06:00, 12:00) segment")
+
+	// A timestamp in the following bucket must get its own full-length segment.
+	seg3, err := sc.create(t3)
+	require.NoError(t, err)
+	defer seg3.DecRef()
+	assert.NotSame(t, seg1, seg3)
+	assert.Equal(t, time.Date(2026, 4, 19, 12, 0, 0, 0, time.UTC), seg3.Start)
+	assert.Equal(t, 6*time.Hour, seg3.End.Sub(seg3.Start))
+}
+
+// TestCreateSegment_ConcurrentCreates_DeterministicAlignment exercises the
+// receiver under contention: many goroutines call create() with scattered
+// (deterministically generated) timestamps spread across a window that covers
+// multiple epoch-aligned buckets. Whatever interleaving the scheduler picks,
+// the resulting segment set must (a) cover every probe timestamp, (b) all
+// start on epoch-aligned boundaries, (c) every span be exactly the configured
+// N*Unit, and (d) be non-overlapping. This pins down the property that
+// motivated the fix: alignment is determined by the global grid, not by
+// arrival order.
+func TestCreateSegment_ConcurrentCreates_DeterministicAlignment(t *testing.T) {
+	sc, _, cleanup := newAlignmentTestController(t, IntervalRule{Unit: DAY, Num: 15})
+	defer cleanup()
+
+	// Probe range covers 4 epoch-aligned 15d buckets:
+	// [03/08, 03/23), [03/23, 04/07), [04/07, 04/22), [04/22, 05/07).
+	rangeStart := time.Date(2026, 3, 10, 0, 0, 0, 0, time.UTC)
+	rangeEndExclusive := time.Date(2026, 5, 5, 0, 0, 0, 0, time.UTC)
+	rangeNanos := rangeEndExclusive.UnixNano() - rangeStart.UnixNano()
+
+	const goroutines = 32
+	const probesPerGoroutine = 64
+	const knuthMul = uint64(0x9E3779B97F4A7C15)
+	probes := make([]time.Time, 0, goroutines*probesPerGoroutine)
+	for i := 0; i < goroutines*probesPerGoroutine; i++ {
+		// Multiplying by a large odd constant scatters consecutive indices;
+		// >>1 clears the top bit so the int64 conversion is non-negative
+		// before the modulo folds it into [0, rangeNanos).
+		offset := int64((uint64(i+1)*knuthMul)>>1) % rangeNanos
+		probes = append(probes, time.Unix(0, rangeStart.UnixNano()+offset).UTC())
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(goroutines)
+	for g := 0; g < goroutines; g++ {
+		go func(idx int) {
+			defer wg.Done()
+			// Each goroutine owns a disjoint, contiguous slice of the probes.
+			start := idx * probesPerGoroutine
+			end := start + probesPerGoroutine
+			for _, ts := range probes[start:end] {
+				seg, err := sc.create(ts)
+				if err != nil {
+					t.Errorf("create(%s) returned error: %v", ts.Format(time.RFC3339), err)
+					return
+				}
+				if seg == nil {
+					t.Errorf("create(%s) returned nil segment", ts.Format(time.RFC3339))
+					return
+				}
+				// Property (a): the returned segment must contain its probe.
+				if !seg.Contains(ts.UnixNano()) {
+					t.Errorf("probe %s landed in segment [%s, %s) which does not contain it",
+						ts.Format(time.RFC3339), seg.Start.Format(time.RFC3339), seg.End.Format(time.RFC3339))
+				}
+				seg.DecRef()
+			}
+		}(g)
+	}
+	wg.Wait()
+
+	// Copy the segment list under the read lock, then inspect the copy.
+	sc.RLock()
+	segs := slices.Clone(sc.lst)
+	sc.RUnlock()
+
+	require.NotEmpty(t, segs, "concurrent creates must produce at least one segment")
+	sort.Slice(segs, func(i, j int) bool { return segs[i].Start.Before(segs[j].Start) })
+
+	epoch := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC)
+	fifteenDays := 15 * 24 * time.Hour
+	for i, seg := range segs {
+		// Property (c): exact span.
+		assert.Equal(t, fifteenDays, seg.End.Sub(seg.Start),
+			"segment[%d] %s span must be exactly 15d, got %v",
+			i, seg.suffix, seg.End.Sub(seg.Start))
+		// Property (b): start is a whole number of 15d steps from the UTC epoch.
+		offset := seg.Start.Sub(epoch)
+		assert.Equal(t, time.Duration(0), offset%fifteenDays,
+			"segment[%d] start %s must be on the epoch-aligned 15d grid",
+			i, seg.Start.Format(time.RFC3339))
+		// Property (d): with segs sorted by Start, neighbours must not overlap.
+		if i > 0 {
+			assert.False(t, segs[i-1].End.After(seg.Start),
+				"segments must not overlap: prev.End=%s next.Start=%s",
+				segs[i-1].End.Format(time.RFC3339), seg.Start.Format(time.RFC3339))
+		}
+	}
+}
+
+// TestCreateSegment_LegacyOffGridNeighbour_TransitionThenGrid: a pre-fix
+// off-grid segment on disk must produce a transition segment that ends on
+// the global grid, so subsequent segments self-heal back to it.
+func TestCreateSegment_LegacyOffGridNeighbour_TransitionThenGrid(t *testing.T) {
+	sc, tempDir, cleanup := newAlignmentTestController(t, IntervalRule{Unit: DAY, Num: 15})
+	defer cleanup()
+
+	// Context carrying a logger and a position, passed to openSegment below.
+	ctx := context.Background()
+	l := logger.GetLogger("test-legacy-transition")
+	ctx = context.WithValue(ctx, logger.ContextKey, l)
+	ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
+		return common.Position{Database: "test-db", Stage: "cold"}
+	})
+
+	// Inject a legacy off-grid segment [2026-05-01, 2026-05-16). The new 15d
+	// epoch grid puts buckets at 04/22, 05/07, 05/22, 06/06, ...; legacy
+	// straddles 05/07 and ends inside [05/07, 05/22).
+	legacyStart := time.Date(2026, 5, 1, 0, 0, 0, 0, time.UTC)
+	legacyEnd := time.Date(2026, 5, 16, 0, 0, 0, 0, time.UTC)
+	suffix := legacyStart.Format(dayFormat)
+	segPath := filepath.Join(tempDir, fmt.Sprintf("seg-%s", suffix))
+	require.NoError(t, os.MkdirAll(segPath, DirPerm))
+	require.NoError(t, os.WriteFile(filepath.Join(segPath, metadataFilename), []byte(currentVersion), FilePerm))
+	legacy, err := sc.openSegment(ctx, legacyStart, legacyEnd, segPath, suffix, sc.groupCache)
+	require.NoError(t, err)
+	// NOTE(review): sc.lst is mutated here without taking the controller
+	// lock; no other goroutine is started by this test at this point.
+	sc.lst = append(sc.lst, legacy)
+	sc.sortLst()
+
+	// First write past legacy.End: aligned start (05/07) is inside legacy,
+	// bump pushes start to 05/16 (legacy.End), but stdEnd was locked to the
+	// original grid bucket end (05/22). So the transition segment is shorter
+	// than 15d but still ends on the grid.
+	probe := time.Date(2026, 5, 17, 6, 0, 0, 0, time.UTC)
+	transition, err := sc.create(probe)
+	require.NoError(t, err)
+	require.NotNil(t, transition)
+	defer transition.DecRef()
+
+	gridBoundary := time.Date(2026, 5, 22, 0, 0, 0, 0, time.UTC)
+	assert.True(t, transition.Contains(probe.UnixNano()),
+		"transition segment must cover the probe; got [%s, %s)",
+		transition.Start.Format(time.RFC3339), transition.End.Format(time.RFC3339))
+	assert.Equal(t, legacyEnd, transition.Start,
+		"start must be bumped to legacy.End (no overlap, no misroute)")
+	assert.Equal(t, gridBoundary, transition.End,
+		"end must land on the next 15d-from-epoch boundary, not legacy.End+15d")
+	assert.Less(t, transition.End.Sub(transition.Start), 15*24*time.Hour,
+		"transition segment span is expected to be shorter than the configured SegmentInterval")
+
+	// Second write past the transition segment: aligned start lands cleanly
+	// on the next grid bucket [05/22, 06/06) and produces a full 15d segment.
+	postProbe := time.Date(2026, 5, 25, 6, 0, 0, 0, time.UTC)
+	gridSeg, err := sc.create(postProbe)
+	require.NoError(t, err)
+	require.NotNil(t, gridSeg)
+	defer gridSeg.DecRef()
+
+	assert.Equal(t, gridBoundary, gridSeg.Start,
+		"first post-transition segment must start on the global grid")
+	assert.Equal(t, time.Date(2026, 6, 6, 0, 0, 0, 0, time.UTC), gridSeg.End,
+		"first post-transition segment must span a full 15d on the grid")
+	assert.Equal(t, 15*24*time.Hour, gridSeg.End.Sub(gridSeg.Start),
+		"transition self-heals: subsequent segments are full Num*Unit")
+}
+
+// TestCreateSegment_PersistedMetadataReflectsAlignedRange writes a segment
+// via create() with an unaligned timestamp, then verifies that the on-disk
+// metadata records the epoch-aligned start/end - the alignment must survive
+// a process restart.
+func TestCreateSegment_PersistedMetadataReflectsAlignedRange(t *testing.T) {
+	sc, tempDir, cleanup := newAlignmentTestController(t, IntervalRule{Unit: DAY, Num: 15})
+	defer cleanup()
+
+	probe := time.Date(2026, 4, 16, 6, 30, 0, 0, time.UTC)
+	expectedStart := time.Date(2026, 4, 7, 0, 0, 0, 0, time.UTC)
+	expectedEnd := time.Date(2026, 4, 22, 0, 0, 0, 0, time.UTC)
+
+	seg, err := sc.create(probe)
+	require.NoError(t, err)
+	require.NotNil(t, seg)
+	assert.Equal(t, expectedStart, seg.Start)
+	assert.Equal(t, expectedEnd, seg.End)
+	seg.DecRef()
+
+	suffix := expectedStart.Format(dayFormat)
+	metadataPath := filepath.Join(tempDir, fmt.Sprintf("seg-%s", suffix), metadataFilename)
+	rawMeta, readErr := os.ReadFile(metadataPath)
+	require.NoError(t, readErr)
+	meta, parseErr := readSegmentMeta(rawMeta)
+	require.NoError(t, parseErr)
+	assert.Equal(t, currentVersion, meta.Version)
+	assert.Equal(t, expectedEnd.Format(time.RFC3339Nano), meta.EndTime,
+		"persisted endTime must reflect the epoch-aligned bucket end, not start+15d from the probe day")
+}
diff --git a/banyand/internal/storage/storage.go b/banyand/internal/storage/storage.go
index 799f42b82..5e54012ec 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -202,6 +202,57 @@ func (ir IntervalRule) NextTime(current time.Time) time.Time {
 	panic("invalid interval unit")
 }
 
+// Standard aligns t down to the start of the Num*Unit bucket that contains it,
+// on a grid anchored at 1970-01-01 00:00 in t.Location(). The result keeps
+// t.Location(), and Num=1 reduces to IntervalUnit.Standard's local alignment.
+//
+// DAY buckets are counted in calendar days and every boundary is built as a
+// local-day midnight. HOUR buckets are counted in elapsed hours from the
+// anchor, so Standard(t) <= t < Standard(t)+Num*time.Hour holds even where
+// t.Location() observes DST; there the wall-clock hour of a boundary moves
+// with the UTC offset.
+//
+// The grid is per-timezone, so two nodes in different geographic timezones may
+// produce different bucket starts for the same instant.
+func (ir IntervalRule) Standard(t time.Time) time.Time {
+	if ir.Num <= 0 {
+		logger.Panicf("interval rule Num must be positive: %+v", ir)
+	}
+	if ir.Num == 1 {
+		return ir.Unit.Standard(t)
+	}
+	epochLocal := time.Date(1970, 1, 1, 0, 0, 0, 0, t.Location())
+	switch ir.Unit {
+	case DAY:
+		todayMidnight := time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location())
+		// +12 absorbs DST-affected days that are 23 or 25 absolute hours;
+		// floorDiv handles pre-epoch (negative) inputs correctly.
+		days := floorDiv(int64(todayMidnight.Sub(epochLocal).Hours()+12), 24)
+		bucketIdx := floorDiv(days, int64(ir.Num))
+		return time.Date(1970, 1, 1+int(bucketIdx)*ir.Num, 0, 0, 0, 0, t.Location())
+	case HOUR:
+		// Measure elapsed hours from the anchor and add them back as a
+		// duration. Passing an elapsed-hour count to time.Date would treat
+		// it as wall-clock hours; whenever the UTC offset at t differs from
+		// the offset at the anchor (DST), the bucket built that way can
+		// exclude t.
+		hours := floorDiv(int64(t.Sub(epochLocal)), int64(time.Hour))
+		bucketIdx := floorDiv(hours, int64(ir.Num))
+		return epochLocal.Add(time.Duration(bucketIdx*int64(ir.Num)) * time.Hour)
+	}
+	panic("invalid interval unit")
+}
+
+// floorDiv returns a/b rounded toward negative infinity, unlike Go's /
+// which truncates toward zero. b must be non-zero.
+func floorDiv(a, b int64) int64 {
+	q := a / b
+	if a%b != 0 && (a < 0) != (b < 0) {
+		q--
+	}
+	return q
+}
+
 func (ir IntervalRule) estimatedDuration() time.Duration {
 	switch ir.Unit {
 	case HOUR:
diff --git a/banyand/internal/wqueue/wqueue.go b/banyand/internal/wqueue/wqueue.go
index 37d82d1cd..cc95f94f7 100644
--- a/banyand/internal/wqueue/wqueue.go
+++ b/banyand/internal/wqueue/wqueue.go
@@ -235,7 +235,7 @@ func (q *Queue[S, O]) getShard(shardID common.ShardID) *Shard[S] {
 // It uses the Queue's SegmentInterval to generate the time range.
func (q *Queue[S, O]) GetTimeRange(ts time.Time) timestamp.TimeRange { opts := q.getOpts() - start := opts.SegmentInterval.Unit.Standard(ts) + start := opts.SegmentInterval.Standard(ts) end := opts.SegmentInterval.NextTime(start) return timestamp.NewSectionTimeRange(start, end) } diff --git a/banyand/internal/wqueue/wqueue_test.go b/banyand/internal/wqueue/wqueue_test.go index 91c14ff4b..2dc6b310f 100644 --- a/banyand/internal/wqueue/wqueue_test.go +++ b/banyand/internal/wqueue/wqueue_test.go @@ -68,6 +68,9 @@ func TestQueue_GetTimeRange(t *testing.T) { expectedEnd: time.Date(2023, 12, 16, 0, 0, 0, 0, time.UTC), }, { + // Hour-since-epoch for 2023-12-15 14:00 UTC is 472958, which is even, + // so under Num=2 the bucket boundary already coincides with 14:00 - + // epoch alignment matches plain hour alignment in this case. name: "multiple hours interval", segmentInterval: storage.IntervalRule{ Unit: storage.HOUR, @@ -78,14 +81,30 @@ func TestQueue_GetTimeRange(t *testing.T) { expectedEnd: time.Date(2023, 12, 15, 16, 0, 0, 0, time.UTC), }, { - name: "multiple days interval", + // Hour-since-epoch for 2023-12-15 15:00 UTC is 472959 (odd), so + // under Num=2 the bucket aligns down to 14:00. + name: "multiple hours interval (odd hour drops to even boundary)", + segmentInterval: storage.IntervalRule{ + Unit: storage.HOUR, + Num: 2, + }, + inputTime: time.Date(2023, 12, 15, 15, 30, 45, 123456789, time.UTC), + expectedStart: time.Date(2023, 12, 15, 14, 0, 0, 0, time.UTC), + expectedEnd: time.Date(2023, 12, 15, 16, 0, 0, 0, time.UTC), + }, + { + // Day-since-epoch for 2023-12-15 is 19706. Under Num=3 the bucket + // aligns down to 19704 -> 2023-12-13 .. 2023-12-16. The previous + // expectation of 12/15..12/18 was written against the historical + // bug that aligned only to the day, not to N*day from epoch. 
+ name: "multiple days interval (aligned to N*day from epoch)", segmentInterval: storage.IntervalRule{ Unit: storage.DAY, Num: 3, }, inputTime: time.Date(2023, 12, 15, 14, 30, 45, 123456789, time.UTC), - expectedStart: time.Date(2023, 12, 15, 0, 0, 0, 0, time.UTC), - expectedEnd: time.Date(2023, 12, 18, 0, 0, 0, 0, time.UTC), + expectedStart: time.Date(2023, 12, 13, 0, 0, 0, 0, time.UTC), + expectedEnd: time.Date(2023, 12, 16, 0, 0, 0, 0, time.UTC), }, }
