This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch v0.10.x
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 137dabb24962fec4fa086950994d1803c285fda1
Author: mrproliu <[email protected]>
AuthorDate: Sat May 9 12:46:34 2026 +0800

    Fix lifecycle migration creating segments shorter than the configured 
`SegmentInterval` (#1120)
---
 CHANGES.md                                         |   1 +
 banyand/backup/lifecycle/segment_boundary_utils.go |  43 +-
 .../lifecycle/segment_boundary_utils_test.go       | 229 ++++++++++-
 banyand/backup/lifecycle/steps.go                  |  73 +++-
 banyand/backup/lifecycle/steps_test.go             |  93 +++++
 banyand/internal/storage/rotation_test.go          |  30 +-
 banyand/internal/storage/segment.go                |  24 +-
 banyand/internal/storage/segment_test.go           | 453 +++++++++++++++++++++
 banyand/internal/storage/storage.go                |  39 ++
 banyand/internal/wqueue/wqueue.go                  |   2 +-
 banyand/internal/wqueue/wqueue_test.go             |  25 +-
 11 files changed, 930 insertions(+), 82 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index d912f0c04..c893e5740 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -38,6 +38,7 @@ Release Notes.
 - Fix nil-pointer panic on cold-tier data nodes when FODC `InspectAll` raced 
with idle-segment cleanup.
 - Add `GroupLifecycleInfo.errors` to surface per-group collection failures 
from FODC `InspectAll` instead of silently dropping the affected node entry.
 - Fix `CollectDataInfo` and `CollectLiaisonInfo` not handling 
`CATALOG_PROPERTY` groups.
+- Fix lifecycle migration where the receiving node could create segments 
shorter than the configured `SegmentInterval`.
 
 ### Chores
 
diff --git a/banyand/backup/lifecycle/segment_boundary_utils.go 
b/banyand/backup/lifecycle/segment_boundary_utils.go
index c3f0d6a12..1fd32c298 100644
--- a/banyand/backup/lifecycle/segment_boundary_utils.go
+++ b/banyand/backup/lifecycle/segment_boundary_utils.go
@@ -25,34 +25,18 @@ import (
 )
 
 func calculateTargetSegments(partMinTS, partMaxTS int64, targetInterval 
storage.IntervalRule) []time.Time {
-       minTime := time.Unix(0, partMinTS).UTC()
-       maxTime := time.Unix(0, partMaxTS).UTC()
+       // Use time.Local so sender's bucket math stays on the same per-timezone
+       // grid as the receiver, which sees ctx.MinTimestamp as time.Local too.
+       minTime := time.Unix(0, partMinTS)
+       maxTime := time.Unix(0, partMaxTS)
 
        var targetSegments []time.Time
-
-       var segmentStart time.Time
-       switch targetInterval.Unit {
-       case storage.DAY:
-               daysSinceEpoch := minTime.Sub(time.Date(1970, 1, 1, 0, 0, 0, 0, 
time.UTC)).Hours() / 24
-               segmentIndex := int(daysSinceEpoch) / targetInterval.Num
-               segmentStart = time.Date(1970, 1, 
1+segmentIndex*targetInterval.Num, 0, 0, 0, 0, time.UTC)
-       case storage.HOUR:
-               hoursSinceEpoch := minTime.Sub(time.Date(1970, 1, 1, 0, 0, 0, 
0, time.UTC)).Hours()
-               segmentIndex := int(hoursSinceEpoch) / targetInterval.Num
-               segmentStart = time.Date(1970, 1, 1, 
segmentIndex*targetInterval.Num, 0, 0, 0, time.UTC)
-       default:
-               segmentStart = targetInterval.Unit.Standard(minTime)
-       }
-
-       current := segmentStart
-       for !current.After(maxTime) {
-               segmentEnd := targetInterval.NextTime(current)
-               if !(segmentEnd.Before(minTime) || current.After(maxTime)) {
-                       targetSegments = append(targetSegments, current)
-               }
-               current = segmentEnd
+       // current starts at the bucket containing minTime, so segmentEnd is
+       // always > minTime and the loop guard already excludes anything past
+       // maxTime - every iteration produces a real overlap.
+       for current := targetInterval.Standard(minTime); 
!current.After(maxTime); current = targetInterval.NextTime(current) {
+               targetSegments = append(targetSegments, current)
        }
-
        return targetSegments
 }
 
@@ -61,12 +45,11 @@ func getSegmentTimeRange(segmentStart time.Time, interval 
storage.IntervalRule)
        return timestamp.NewSectionTimeRange(segmentStart, segmentEnd)
 }
 
+// getTargetStageInterval returns the segment interval of the migration target
+// (the next stage relative to the current node), as populated by parseGroup.
 func getTargetStageInterval(group *GroupConfig) storage.IntervalRule {
-       if group.ResourceOpts != nil && len(group.ResourceOpts.Stages) > 0 {
-               stage := group.ResourceOpts.Stages[0]
-               if stage.SegmentInterval != nil {
-                       return storage.MustToIntervalRule(stage.SegmentInterval)
-               }
+       if group.TargetSegmentInterval != nil {
+               return storage.MustToIntervalRule(group.TargetSegmentInterval)
        }
 
        if group.ResourceOpts != nil && group.ResourceOpts.SegmentInterval != 
nil {
diff --git a/banyand/backup/lifecycle/segment_boundary_utils_test.go 
b/banyand/backup/lifecycle/segment_boundary_utils_test.go
index 003558667..98777f455 100644
--- a/banyand/backup/lifecycle/segment_boundary_utils_test.go
+++ b/banyand/backup/lifecycle/segment_boundary_utils_test.go
@@ -18,11 +18,13 @@
 package lifecycle
 
 import (
+       "sync"
        "testing"
        "time"
 
        "github.com/stretchr/testify/assert"
 
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
        "github.com/apache/skywalking-banyandb/banyand/internal/storage"
 )
 
@@ -37,54 +39,54 @@ func TestCalculateTargetSegments(t *testing.T) {
        }{
                {
                        name:      "2-day to 3-day segments - part spans 
multiple segments",
-                       partMinTS: time.Date(2024, 1, 2, 0, 0, 0, 0, 
time.UTC).UnixNano(), // Day 2
-                       partMaxTS: time.Date(2024, 1, 4, 0, 0, 0, 0, 
time.UTC).UnixNano(), // Day 4
+                       partMinTS: time.Date(2024, 1, 2, 0, 0, 0, 0, 
time.Local).UnixNano(), // Day 2
+                       partMaxTS: time.Date(2024, 1, 4, 0, 0, 0, 0, 
time.Local).UnixNano(), // Day 4
                        targetInterval: storage.IntervalRule{
                                Unit: storage.DAY,
                                Num:  3,
                        },
                        expected: []time.Time{
-                               time.Date(2024, 1, 0, 0, 0, 0, 0, time.UTC), // 
Day 0-3
-                               time.Date(2024, 1, 3, 0, 0, 0, 0, time.UTC), // 
Day 3-6
+                               time.Date(2023, 12, 31, 0, 0, 0, 0, 
time.Local), // Day 0-3
+                               time.Date(2024, 1, 3, 0, 0, 0, 0, time.Local),  
 // Day 3-6
                        },
                },
                {
                        name:      "2-day to 3-day segments - part fits in 
single segment",
-                       partMinTS: time.Date(2024, 1, 1, 0, 0, 0, 0, 
time.UTC).UnixNano(), // Day 1
-                       partMaxTS: time.Date(2024, 1, 2, 0, 0, 0, 0, 
time.UTC).UnixNano(), // Day 2
+                       partMinTS: time.Date(2024, 1, 1, 0, 0, 0, 0, 
time.Local).UnixNano(), // Day 1
+                       partMaxTS: time.Date(2024, 1, 2, 0, 0, 0, 0, 
time.Local).UnixNano(), // Day 2
                        targetInterval: storage.IntervalRule{
                                Unit: storage.DAY,
                                Num:  3,
                        },
                        expected: []time.Time{
-                               time.Date(2024, 1, 0, 0, 0, 0, 0, time.UTC), // 
Day 0-3
+                               time.Date(2023, 12, 31, 0, 0, 0, 0, 
time.Local), // Day 0-3
                        },
                },
                {
                        name:      "hour to day segments - part spans multiple 
segments",
-                       partMinTS: time.Date(2024, 1, 1, 12, 0, 0, 0, 
time.UTC).UnixNano(), // Hour 12
-                       partMaxTS: time.Date(2024, 1, 1, 18, 0, 0, 0, 
time.UTC).UnixNano(), // Hour 18
+                       partMinTS: time.Date(2024, 1, 1, 12, 0, 0, 0, 
time.Local).UnixNano(), // Hour 12
+                       partMaxTS: time.Date(2024, 1, 1, 18, 0, 0, 0, 
time.Local).UnixNano(), // Hour 18
                        targetInterval: storage.IntervalRule{
                                Unit: storage.DAY,
                                Num:  1,
                        },
                        expected: []time.Time{
-                               time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC), // 
Day 1
+                               time.Date(2024, 1, 1, 0, 0, 0, 0, time.Local), 
// Day 1
                        },
                },
                {
                        name:      "day to hour segments - part spans multiple 
segments",
-                       partMinTS: time.Date(2024, 1, 1, 0, 0, 0, 0, 
time.UTC).UnixNano(),    // Day 1
-                       partMaxTS: time.Date(2024, 1, 1, 23, 59, 59, 0, 
time.UTC).UnixNano(), // End of Day 1
+                       partMinTS: time.Date(2024, 1, 1, 0, 0, 0, 0, 
time.Local).UnixNano(),    // Day 1
+                       partMaxTS: time.Date(2024, 1, 1, 23, 59, 59, 0, 
time.Local).UnixNano(), // End of Day 1
                        targetInterval: storage.IntervalRule{
                                Unit: storage.HOUR,
                                Num:  6,
                        },
                        expected: []time.Time{
-                               time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC),  
// Hour 0-6
-                               time.Date(2024, 1, 1, 6, 0, 0, 0, time.UTC),  
// Hour 6-12
-                               time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC), 
// Hour 12-18
-                               time.Date(2024, 1, 1, 18, 0, 0, 0, time.UTC), 
// Hour 18-24
+                               time.Date(2024, 1, 1, 0, 0, 0, 0, time.Local),  
// Hour 0-6
+                               time.Date(2024, 1, 1, 6, 0, 0, 0, time.Local),  
// Hour 6-12
+                               time.Date(2024, 1, 1, 12, 0, 0, 0, time.Local), 
// Hour 12-18
+                               time.Date(2024, 1, 1, 18, 0, 0, 0, time.Local), 
// Hour 18-24
                        },
                },
        }
@@ -136,3 +138,198 @@ func TestGetSegmentTimeRange(t *testing.T) {
                })
        }
 }
+
+const (
+       stageWarm      = "warm"
+       stageCold      = "cold"
+       stageFrozen    = "frozen"
+       selectorWarm   = "type=warm"
+       selectorCold   = "type=cold"
+       selectorFrozen = "type=frozen"
+)
+
+func dayInterval(num uint32) *commonv1.IntervalRule {
+       return &commonv1.IntervalRule{Unit: commonv1.IntervalRule_UNIT_DAY, 
Num: num}
+}
+
+func stage(name, selector string, segNum, ttlNum uint32) 
*commonv1.LifecycleStage {
+       return &commonv1.LifecycleStage{
+               Name:            name,
+               SegmentInterval: dayInterval(segNum),
+               Ttl:             dayInterval(ttlNum),
+               NodeSelector:    selector,
+       }
+}
+
+// TestGetTargetStageInterval pins every branch of getTargetStageInterval:
+// the explicit TargetSegmentInterval, the ResourceOpts.SegmentInterval
+// fallback, and the final 1d default.
+func TestGetTargetStageInterval(t *testing.T) {
+       //nolint:govet // fieldalignment: test struct optimization not critical
+       type tc struct {
+               name     string
+               group    *GroupConfig
+               expected storage.IntervalRule
+       }
+
+       cases := []tc{
+               {
+                       name: "3-stage warm->cold returns cold's 15d 
(sw_metricsHour)",
+                       group: &GroupConfig{
+                               Group: &commonv1.Group{
+                                       ResourceOpts: &commonv1.ResourceOpts{
+                                               SegmentInterval: dayInterval(5),
+                                               Stages: 
[]*commonv1.LifecycleStage{
+                                                       stage(stageWarm, 
selectorWarm, 7, 7),
+                                                       stage(stageCold, 
selectorCold, 15, 30),
+                                               },
+                                       },
+                               },
+                               SegmentInterval:       dayInterval(7),
+                               TargetSegmentInterval: dayInterval(15),
+                       },
+                       expected: storage.IntervalRule{Unit: storage.DAY, Num: 
15},
+               },
+               {
+                       name: "3-stage warm->cold returns cold's 5d 
(sw_metricsMinute)",
+                       group: &GroupConfig{
+                               Group: &commonv1.Group{
+                                       ResourceOpts: &commonv1.ResourceOpts{
+                                               SegmentInterval: dayInterval(1),
+                                               Stages: 
[]*commonv1.LifecycleStage{
+                                                       stage(stageWarm, 
selectorWarm, 3, 7),
+                                                       stage(stageCold, 
selectorCold, 5, 30),
+                                               },
+                                       },
+                               },
+                               SegmentInterval:       dayInterval(3),
+                               TargetSegmentInterval: dayInterval(5),
+                       },
+                       expected: storage.IntervalRule{Unit: storage.DAY, Num: 
5},
+               },
+               {
+                       name: "2-stage hot->cold returns cold's interval",
+                       group: &GroupConfig{
+                               Group: &commonv1.Group{
+                                       ResourceOpts: &commonv1.ResourceOpts{
+                                               SegmentInterval: dayInterval(1),
+                                               Stages: 
[]*commonv1.LifecycleStage{
+                                                       stage(stageCold, 
selectorCold, 15, 30),
+                                               },
+                                       },
+                               },
+                               SegmentInterval:       dayInterval(1),
+                               TargetSegmentInterval: dayInterval(15),
+                       },
+                       expected: storage.IntervalRule{Unit: storage.DAY, Num: 
15},
+               },
+               {
+                       name: "4-stage chain warm->cold picks cold, not the 
last stage",
+                       group: &GroupConfig{
+                               Group: &commonv1.Group{
+                                       ResourceOpts: &commonv1.ResourceOpts{
+                                               SegmentInterval: dayInterval(1),
+                                               Stages: 
[]*commonv1.LifecycleStage{
+                                                       stage(stageWarm, 
selectorWarm, 3, 7),
+                                                       stage(stageCold, 
selectorCold, 15, 30),
+                                                       stage(stageFrozen, 
selectorFrozen, 30, 365),
+                                               },
+                                       },
+                               },
+                               SegmentInterval:       dayInterval(3),
+                               TargetSegmentInterval: dayInterval(15),
+                       },
+                       expected: storage.IntervalRule{Unit: storage.DAY, Num: 
15},
+               },
+               {
+                       name: "fallback to ResourceOpts.SegmentInterval when 
TargetSegmentInterval is nil",
+                       group: &GroupConfig{
+                               Group: &commonv1.Group{
+                                       ResourceOpts: &commonv1.ResourceOpts{
+                                               SegmentInterval: dayInterval(7),
+                                       },
+                               },
+                       },
+                       expected: storage.IntervalRule{Unit: storage.DAY, Num: 
7},
+               },
+               {
+                       name: "fallback to default 1d when nothing is set",
+                       group: &GroupConfig{
+                               Group: &commonv1.Group{ResourceOpts: 
&commonv1.ResourceOpts{}},
+                       },
+                       expected: storage.IntervalRule{Unit: storage.DAY, Num: 
1},
+               },
+               {
+                       name: "fallback handles nil ResourceOpts",
+                       group: &GroupConfig{
+                               Group: &commonv1.Group{},
+                       },
+                       expected: storage.IntervalRule{Unit: storage.DAY, Num: 
1},
+               },
+               {
+                       name: "TargetSegmentInterval wins over 
ResourceOpts.SegmentInterval",
+                       group: &GroupConfig{
+                               Group: &commonv1.Group{
+                                       ResourceOpts: &commonv1.ResourceOpts{
+                                               SegmentInterval: dayInterval(1),
+                                               Stages: 
[]*commonv1.LifecycleStage{
+                                                       stage(stageWarm, 
selectorWarm, 7, 7),
+                                                       stage(stageCold, 
selectorCold, 15, 30),
+                                               },
+                                       },
+                               },
+                               SegmentInterval:       dayInterval(7),
+                               TargetSegmentInterval: dayInterval(15),
+                       },
+                       expected: storage.IntervalRule{Unit: storage.DAY, Num: 
15},
+               },
+       }
+
+       for _, c := range cases {
+               t.Run(c.name, func(t *testing.T) {
+                       got := getTargetStageInterval(c.group)
+                       assert.Equal(t, c.expected, got,
+                               "target stage interval mismatch: expected 
%d(%s), got %d(%s)",
+                               c.expected.Num, c.expected.Unit.String(),
+                               got.Num, got.Unit.String())
+               })
+       }
+}
+
+// TestGetTargetStageInterval_ConcurrentReaders pins down that the function is
+// safe for concurrent readers - lifecycle migration code calls it from
+// multiple goroutines (per-shard / per-group fan-out).
+func TestGetTargetStageInterval_ConcurrentReaders(t *testing.T) {
+       group := &GroupConfig{
+               Group: &commonv1.Group{
+                       ResourceOpts: &commonv1.ResourceOpts{
+                               SegmentInterval: dayInterval(5),
+                               Stages: []*commonv1.LifecycleStage{
+                                       stage(stageWarm, selectorWarm, 7, 7),
+                                       stage(stageCold, selectorCold, 15, 30),
+                               },
+                       },
+               },
+               SegmentInterval:       dayInterval(7),
+               TargetSegmentInterval: dayInterval(15),
+       }
+       expected := storage.IntervalRule{Unit: storage.DAY, Num: 15}
+
+       const goroutines = 64
+       const iterations = 1000
+       var wg sync.WaitGroup
+       wg.Add(goroutines)
+       for g := 0; g < goroutines; g++ {
+               go func() {
+                       defer wg.Done()
+                       for i := 0; i < iterations; i++ {
+                               got := getTargetStageInterval(group)
+                               if got != expected {
+                                       t.Errorf("concurrent read returned %v, 
expected %v", got, expected)
+                                       return
+                               }
+                       }
+               }()
+       }
+       wg.Wait()
+}
diff --git a/banyand/backup/lifecycle/steps.go 
b/banyand/backup/lifecycle/steps.go
index 700897840..080501883 100644
--- a/banyand/backup/lifecycle/steps.go
+++ b/banyand/backup/lifecycle/steps.go
@@ -86,12 +86,18 @@ func (l *lifecycleService) getSnapshots(groups 
[]*commonv1.Group, p *Progress) (
 // It contains all necessary information for migration and deletion operations.
 type GroupConfig struct {
        *commonv1.Group
-       NodeSelector    node.Selector
-       QueueClient     queue.Client
-       AccumulatedTTL  *commonv1.IntervalRule
+       NodeSelector   node.Selector
+       QueueClient    queue.Client
+       AccumulatedTTL *commonv1.IntervalRule
+       // SegmentInterval is the current stage's segment interval, used to read
+       // source segments on this node.
        SegmentInterval *commonv1.IntervalRule
-       TargetShardNum  uint32
-       TargetReplicas  uint32
+       // TargetSegmentInterval is the next stage's segment interval. It 
differs
+       // from SegmentInterval on 3-stage deployments (e.g. warm->cold), so any
+       // computation against the target tier's segment grid must use this 
field.
+       TargetSegmentInterval *commonv1.IntervalRule
+       TargetShardNum        uint32
+       TargetReplicas        uint32
 }
 
 // Close releases resources held by the GroupConfig.
@@ -101,6 +107,17 @@ func (gc *GroupConfig) Close() {
        }
 }
 
+// cloneIntervalRule returns a deep copy of ir, or nil if ir is nil. 
proto.Clone
+// applied to a typed-nil *commonv1.IntervalRule yields a non-nil zero value
+// (Num=0, Unit=UNIT_UNSPECIFIED) that downstream MustToIntervalRule rejects;
+// short-circuiting on nil keeps the GroupConfig fallback chain intact.
+func cloneIntervalRule(ir *commonv1.IntervalRule) *commonv1.IntervalRule {
+       if ir == nil {
+               return nil
+       }
+       return proto.Clone(ir).(*commonv1.IntervalRule)
+}
+
 //nolint:contextcheck // health check goroutine uses context.Background()
 func parseGroup(
        g *commonv1.Group, nodeLabels map[string]string, nodes 
[]*databasev1.Node,
@@ -113,9 +130,25 @@ func parseGroup(
        if len(ro.Stages) == 0 {
                return nil, fmt.Errorf("no stages in group %s", g.Metadata.Name)
        }
+       // Validate IntervalRules up-front so later derefs (incl. Stages[i+1]) 
are safe.
+       if ro.Ttl == nil {
+               return nil, fmt.Errorf("group %s: missing ttl", g.Metadata.Name)
+       }
+       if ro.SegmentInterval == nil {
+               return nil, fmt.Errorf("group %s: missing segment_interval", 
g.Metadata.Name)
+       }
+       for _, st := range ro.Stages {
+               if st.SegmentInterval == nil {
+                       return nil, fmt.Errorf("group %s stage %s: missing 
segment_interval", g.Metadata.Name, st.Name)
+               }
+               if st.Ttl == nil {
+                       return nil, fmt.Errorf("group %s stage %s: missing 
ttl", g.Metadata.Name, st.Name)
+               }
+       }
        ttlTime := proto.Clone(ro.Ttl).(*commonv1.IntervalRule)
-       segmentInterval := 
proto.Clone(ro.SegmentInterval).(*commonv1.IntervalRule)
+       segmentInterval := cloneIntervalRule(ro.SegmentInterval)
        var nst *commonv1.LifecycleStage
+       var targetSegmentInterval *commonv1.IntervalRule
        for i, st := range ro.Stages {
                selector, err := pub.ParseLabelSelector(st.NodeSelector)
                if err != nil {
@@ -130,14 +163,21 @@ func parseGroup(
                        return nil, nil
                }
                nst = ro.Stages[i+1]
-               segmentInterval = st.SegmentInterval
-               l.Info().Msgf("migrating group %s at stage %s to stage %s, 
segment interval: %d(%s), total ttl needs: %d(%s)",
-                       g.Metadata.Name, st.Name, nst.Name, 
segmentInterval.Num, segmentInterval.Unit.String(), ttlTime.Num, 
ttlTime.Unit.String())
+               // Clone before exposing through GroupConfig so callers cannot 
mutate
+               // the shared proto Stages[*] sub-objects.
+               segmentInterval = cloneIntervalRule(st.SegmentInterval)
+               targetSegmentInterval = cloneIntervalRule(nst.SegmentInterval)
+               l.Info().Msgf("migrating group %s at stage %s to stage %s, 
source segment interval: %d(%s), target segment interval: %d(%s), total ttl 
needs: %d(%s)",
+                       g.Metadata.Name, st.Name, nst.Name,
+                       segmentInterval.Num, segmentInterval.Unit.String(),
+                       targetSegmentInterval.Num, 
targetSegmentInterval.Unit.String(),
+                       ttlTime.Num, ttlTime.Unit.String())
                break
        }
        if nst == nil {
                nst = ro.Stages[0]
                ttlTime = proto.Clone(ro.Ttl).(*commonv1.IntervalRule)
+               targetSegmentInterval = cloneIntervalRule(nst.SegmentInterval)
                l.Info().Msgf("no matching stage for group %s, defaulting to 
first stage %s segment interval: %d(%s), total ttl needs: %d(%s)",
                        g.Metadata.Name, nst.Name, segmentInterval.Num, 
segmentInterval.Unit.String(), ttlTime.Num, ttlTime.Unit.String())
        }
@@ -179,13 +219,14 @@ func parseGroup(
                clusterStateMgr.addRouteTable(t)
        }
        return &GroupConfig{
-               Group:           g,
-               TargetShardNum:  nst.ShardNum,
-               TargetReplicas:  nst.Replicas,
-               AccumulatedTTL:  ttlTime,
-               SegmentInterval: segmentInterval,
-               NodeSelector:    nodeSel,
-               QueueClient:     client,
+               Group:                 g,
+               TargetShardNum:        nst.ShardNum,
+               TargetReplicas:        nst.Replicas,
+               AccumulatedTTL:        ttlTime,
+               SegmentInterval:       segmentInterval,
+               TargetSegmentInterval: targetSegmentInterval,
+               NodeSelector:          nodeSel,
+               QueueClient:           client,
        }, nil
 }
 
diff --git a/banyand/backup/lifecycle/steps_test.go 
b/banyand/backup/lifecycle/steps_test.go
new file mode 100644
index 000000000..c1728b5f2
--- /dev/null
+++ b/banyand/backup/lifecycle/steps_test.go
@@ -0,0 +1,93 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package lifecycle
+
+import (
+       "testing"
+
+       "github.com/stretchr/testify/assert"
+       "github.com/stretchr/testify/require"
+
+       commonv1 
"github.com/apache/skywalking-banyandb/api/proto/banyandb/common/v1"
+)
+
+// TestParseGroup_RejectsMissingIntervals verifies that parseGroup fails fast
+// with a descriptive error when required IntervalRule fields are nil, rather
+// than panicking later inside the migration log. parseGroup validates these
+// fields before touching metadata.Repo or the cluster state manager, so
+// passing nil for those parameters is safe in this test - if a future change
+// reorders parseGroup so the deps run first, the tests will panic clearly
+// and that contract should be revisited.
+func TestParseGroup_RejectsMissingIntervals(t *testing.T) {
+       makeGroup := func(mutate func(ro *commonv1.ResourceOpts)) 
*commonv1.Group {
+               ro := &commonv1.ResourceOpts{
+                       ShardNum:        1,
+                       SegmentInterval: dayInterval(1),
+                       Ttl:             dayInterval(7),
+                       Stages: []*commonv1.LifecycleStage{
+                               stage(stageWarm, selectorWarm, 7, 7),
+                               stage(stageCold, selectorCold, 15, 30),
+                       },
+               }
+               mutate(ro)
+               return &commonv1.Group{
+                       Metadata:     &commonv1.Metadata{Name: "test-group"},
+                       ResourceOpts: ro,
+               }
+       }
+
+       cases := []struct {
+               name    string
+               mutate  func(*commonv1.ResourceOpts)
+               errFrag string
+       }{
+               {
+                       name:    "top-level ttl missing",
+                       mutate:  func(ro *commonv1.ResourceOpts) { ro.Ttl = nil 
},
+                       errFrag: "group test-group: missing ttl",
+               },
+               {
+                       name:    "top-level segment_interval missing",
+                       mutate:  func(ro *commonv1.ResourceOpts) { 
ro.SegmentInterval = nil },
+                       errFrag: "group test-group: missing segment_interval",
+               },
+               {
+                       name:    "stage segment_interval missing",
+                       mutate:  func(ro *commonv1.ResourceOpts) { 
ro.Stages[0].SegmentInterval = nil },
+                       errFrag: "group test-group stage warm: missing 
segment_interval",
+               },
+               {
+                       name:    "stage ttl missing",
+                       mutate:  func(ro *commonv1.ResourceOpts) { 
ro.Stages[0].Ttl = nil },
+                       errFrag: "group test-group stage warm: missing ttl",
+               },
+               {
+                       name:    "next stage segment_interval missing",
+                       mutate:  func(ro *commonv1.ResourceOpts) { 
ro.Stages[1].SegmentInterval = nil },
+                       errFrag: "group test-group stage cold: missing 
segment_interval",
+               },
+       }
+       for _, c := range cases {
+               t.Run(c.name, func(t *testing.T) {
+                       g := makeGroup(c.mutate)
+                       _, err := parseGroup(g, map[string]string{"type": 
"warm"}, nil, nil, nil, nil)
+                       require.Error(t, err)
+                       assert.Contains(t, err.Error(), c.errFrag)
+               })
+       }
+}
diff --git a/banyand/internal/storage/rotation_test.go 
b/banyand/internal/storage/rotation_test.go
index 9577c6736..698df3392 100644
--- a/banyand/internal/storage/rotation_test.go
+++ b/banyand/internal/storage/rotation_test.go
@@ -249,7 +249,11 @@ func TestRotationDisabled(t *testing.T) {
                }
                ctx := context.Background()
                mc := timestamp.NewMockClock()
-               ts, err := time.ParseInLocation("2006-01-02 15:04:05", 
"2024-05-01 00:00:00", time.Local)
+               // 2024-05-02 is day 19845 since the Unix epoch which is 
divisible by 3,
+               // so it lies on the 3-day epoch-aligned grid produced by
+               // IntervalRule.Standard. Picking an aligned day keeps the 
assertions
+               // simple: 6 consecutive days collapse into exactly two 3-day 
segments.
+               ts, err := time.ParseInLocation("2006-01-02 15:04:05", 
"2024-05-02 00:00:00", time.UTC)
                require.NoError(t, err)
                mc.Set(ts)
                ctx = timestamp.SetClock(ctx, mc)
@@ -267,11 +271,11 @@ func TestRotationDisabled(t *testing.T) {
                db := tsdb.(*database[*MockTSTable, any])
                segCtrl := db.segmentController
 
-               // Simulate data arriving day by day for 6 days (mimicking 
lifecycle migration)
-               // Day 1 (05-01): already in segment [05-01, 05-04)
-               // Day 2 (05-02): still in [05-01, 05-04)
-               // Day 3 (05-03): still in [05-01, 05-04)
-               // Day 4 (05-04): need new segment [05-04, 05-07)
+               // Simulate data arriving day by day for 6 days (mimicking 
lifecycle migration).
+               // Day 1 (05-02): already in segment [05-02, 05-05)
+               // Day 2 (05-03): still in [05-02, 05-05)
+               // Day 3 (05-04): still in [05-02, 05-05)
+               // Day 4 (05-05): need new segment [05-05, 05-08)
                for day := 0; day < 6; day++ {
                        dayTime := ts.AddDate(0, 0, day)
                        mc.Set(dayTime)
@@ -284,7 +288,7 @@ func TestRotationDisabled(t *testing.T) {
                        tsdb.Tick(maxTS.UnixNano())
                }
 
-               // Verify: only 2 segments created, both with correct 3-day 
boundaries
+               // Verify: only 2 segments created, both on the 3-day 
epoch-aligned grid.
                segments, _ := segCtrl.segments(false)
                defer func() {
                        for i := range segments {
@@ -293,13 +297,13 @@ func TestRotationDisabled(t *testing.T) {
                }()
                require.Equal(t, 2, len(segments), "expected 2 segments for 6 
days with 3-day interval")
 
-               // First segment: [05-01, 05-04)
-               assert.Equal(t, "2024-05-01 00:00:00", 
segments[0].Start.Format("2006-01-02 15:04:05"))
-               assert.Equal(t, "2024-05-04 00:00:00", 
segments[0].End.Format("2006-01-02 15:04:05"))
+               // First segment: [05-02, 05-05)
+               assert.Equal(t, "2024-05-02 00:00:00", 
segments[0].Start.UTC().Format("2006-01-02 15:04:05"))
+               assert.Equal(t, "2024-05-05 00:00:00", 
segments[0].End.UTC().Format("2006-01-02 15:04:05"))
 
-               // Second segment: [05-04, 05-07)
-               assert.Equal(t, "2024-05-04 00:00:00", 
segments[1].Start.Format("2006-01-02 15:04:05"))
-               assert.Equal(t, "2024-05-07 00:00:00", 
segments[1].End.Format("2006-01-02 15:04:05"))
+               // Second segment: [05-05, 05-08)
+               assert.Equal(t, "2024-05-05 00:00:00", 
segments[1].Start.UTC().Format("2006-01-02 15:04:05"))
+               assert.Equal(t, "2024-05-08 00:00:00", 
segments[1].End.UTC().Format("2006-01-02 15:04:05"))
        })
 }
 
diff --git a/banyand/internal/storage/segment.go 
b/banyand/internal/storage/segment.go
index 12cc0fed9..bd8c5c705 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -575,19 +575,37 @@ func (sc *segmentController[T, O]) create(start 
time.Time) (*segment[T, O], erro
                }
        }
        options := sc.getOptions()
-       start = options.SegmentInterval.Unit.Standard(start)
+       // Anchor stdEnd to the aligned start before any bump so end stays on 
the
+       // global grid even when start is bumped past a legacy off-grid 
neighbor;
+       // subsequent segments then self-heal back to the grid.
+       alignedStart := options.SegmentInterval.Standard(start)
+       stdEnd := options.SegmentInterval.NextTime(alignedStart)
+       start = alignedStart
+       // sc.lst is sorted ascending by start time with non-overlapping ranges;
+       // a single pass bumps start past every legacy segment that swallows it
+       // (each next segment.Start >= previous.End).
        var next *segment[T, O]
        for _, s := range sc.lst {
                if s.Contains(start.UnixNano()) {
-                       return s, nil
+                       start = s.End
+                       continue
                }
                if next == nil && s.Start.After(start) {
                        next = s
                }
        }
-       stdEnd := options.SegmentInterval.NextTime(start)
        var end time.Time
        if next != nil && next.Start.Before(stdEnd) {
+               // `next` starts inside the current grid bucket - a legacy 
off-grid
+               // segment whose TTL hasn't elapsed. Cap end at next.Start to 
avoid
+               // overlap; surfacing this at Info level lets operators see the
+               // abnormal span until the legacy neighbor ages out.
+               sc.l.Info().
+                       Stringer("alignedStart", alignedStart).
+                       Stringer("bumpedStart", start).
+                       Stringer("nextStart", next.Start).
+                       Stringer("stdEnd", stdEnd).
+                       Msg("new segment span is shorter than configured 
SegmentInterval due to an unaligned legacy neighbor")
                end = next.Start
        } else {
                end = stdEnd
diff --git a/banyand/internal/storage/segment_test.go 
b/banyand/internal/storage/segment_test.go
index f50e07603..2430de773 100644
--- a/banyand/internal/storage/segment_test.go
+++ b/banyand/internal/storage/segment_test.go
@@ -23,6 +23,8 @@ import (
        "fmt"
        "os"
        "path/filepath"
+       "slices"
+       "sort"
        "strconv"
        "sync"
        "sync/atomic"
@@ -1178,3 +1180,454 @@ func TestSegment_ConcurrentReopenAndClose_NoPanic(t 
*testing.T) {
                require.Nil(t, s.index, "segment %s should be cleaned up", 
s.suffix)
        }
 }
+
+// newAlignmentTestController is a compact helper for tests covering the
+// epoch-aligned segment.create() behavior introduced by the lifecycle fix.
+// It spins up a real segmentController backed by a temp dir, with the given
+// SegmentInterval, and returns the controller, its temp dir, and a cleanup func.
+func newAlignmentTestController(
+       t *testing.T, interval IntervalRule,
+) (*segmentController[mockTSTable, mockTSTableOpener], string, func()) {
+       t.Helper()
+       tempDir, cleanup := setupTestEnvironment(t)
+
+       ctx := context.Background()
+       l := logger.GetLogger("test-alignment")
+       ctx = context.WithValue(ctx, logger.ContextKey, l)
+       ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
+               return common.Position{
+                       Database: "test-db",
+                       Stage:    "alignment",
+               }
+       })
+
+       opts := TSDBOpts[mockTSTable, mockTSTableOpener]{
+               TSTableCreator: func(_ fs.FileSystem, _ string, _ 
common.Position, _ *logger.Logger,
+                       _ timestamp.TimeRange, _ mockTSTableOpener, _ any,
+               ) (mockTSTable, error) {
+                       return mockTSTable{ID: common.ShardID(0)}, nil
+               },
+               ShardNum:                       1,
+               SegmentInterval:                interval,
+               TTL:                            IntervalRule{Unit: DAY, Num: 
60},
+               SeriesIndexFlushTimeoutSeconds: 10,
+               SeriesIndexCacheMaxBytes:       1024 * 1024,
+       }
+
+       serviceCache := NewServiceCache().(*serviceCache)
+       sc := newSegmentController[mockTSTable, mockTSTableOpener](
+               ctx,
+               tempDir,
+               l,
+               opts,
+               nil,
+               nil,
+               5*time.Minute,
+               
fs.NewLocalFileSystemWithLoggerAndLimit(logger.GetLogger("storage"), 
opts.MemoryLimit),
+               serviceCache,
+               group,
+       )
+       return sc, tempDir, cleanup
+}
+
+// TestIntervalRule_Standard_AlignsToEpochGrid pins the alignment math itself.
+// Two timestamps in the same N*Unit bucket measured from epoch must round
+// down to the exact same instant; two timestamps in adjacent buckets must
+// round to instants exactly N*Unit apart.
+func TestIntervalRule_Standard_AlignsToEpochGrid(t *testing.T) {
+       //nolint:govet // fieldalignment: test struct optimization not critical
+       cases := []struct {
+               name     string
+               ir       IntervalRule
+               probes   []time.Time
+               expected []time.Time
+       }{
+               {
+                       name: "DAY Num=15 matches calculateTargetSegments grid",
+                       ir:   IntervalRule{Unit: DAY, Num: 15},
+                       probes: []time.Time{
+                               time.Date(2026, 4, 7, 0, 0, 0, 0, time.UTC),
+                               time.Date(2026, 4, 16, 6, 0, 0, 0, time.UTC),
+                               time.Date(2026, 4, 19, 23, 59, 59, 0, time.UTC),
+                               time.Date(2026, 4, 22, 0, 0, 0, 0, time.UTC),
+                               time.Date(2026, 5, 7, 12, 30, 0, 0, time.UTC),
+                       },
+                       expected: []time.Time{
+                               time.Date(2026, 4, 7, 0, 0, 0, 0, time.UTC),
+                               time.Date(2026, 4, 7, 0, 0, 0, 0, time.UTC),
+                               time.Date(2026, 4, 7, 0, 0, 0, 0, time.UTC),
+                               time.Date(2026, 4, 22, 0, 0, 0, 0, time.UTC),
+                               time.Date(2026, 5, 7, 0, 0, 0, 0, time.UTC),
+                       },
+               },
+               {
+                       name: "DAY Num=5",
+                       ir:   IntervalRule{Unit: DAY, Num: 5},
+                       // Epoch-aligned 5-day boundaries (day % 5 == 0) around the probes: 
2026-04-02 (day 20545),
+                       // 2026-04-07 (day 20550), 2026-04-12 (day 20555).
+                       probes: []time.Time{
+                               time.Date(2026, 4, 2, 0, 0, 0, 0, time.UTC),
+                               time.Date(2026, 4, 3, 6, 0, 0, 0, time.UTC),
+                               time.Date(2026, 4, 6, 23, 0, 0, 0, time.UTC),
+                               time.Date(2026, 4, 7, 0, 0, 0, 0, time.UTC),
+                       },
+                       expected: []time.Time{
+                               time.Date(2026, 4, 2, 0, 0, 0, 0, time.UTC),
+                               time.Date(2026, 4, 2, 0, 0, 0, 0, time.UTC),
+                               time.Date(2026, 4, 2, 0, 0, 0, 0, time.UTC),
+                               time.Date(2026, 4, 7, 0, 0, 0, 0, time.UTC),
+                       },
+               },
+               {
+                       name: "HOUR Num=6",
+                       ir:   IntervalRule{Unit: HOUR, Num: 6},
+                       probes: []time.Time{
+                               time.Date(2026, 4, 19, 0, 0, 0, 0, time.UTC),
+                               time.Date(2026, 4, 19, 5, 59, 59, 0, time.UTC),
+                               time.Date(2026, 4, 19, 6, 0, 0, 0, time.UTC),
+                               time.Date(2026, 4, 19, 11, 30, 0, 0, time.UTC),
+                               time.Date(2026, 4, 19, 12, 0, 0, 0, time.UTC),
+                       },
+                       expected: []time.Time{
+                               time.Date(2026, 4, 19, 0, 0, 0, 0, time.UTC),
+                               time.Date(2026, 4, 19, 0, 0, 0, 0, time.UTC),
+                               time.Date(2026, 4, 19, 6, 0, 0, 0, time.UTC),
+                               time.Date(2026, 4, 19, 6, 0, 0, 0, time.UTC),
+                               time.Date(2026, 4, 19, 12, 0, 0, 0, time.UTC),
+                       },
+               },
+               {
+                       name:     "DAY Num=1 matches old Unit.Standard 
behavior",
+                       ir:       IntervalRule{Unit: DAY, Num: 1},
+                       probes:   []time.Time{time.Date(2026, 4, 19, 6, 30, 0, 
0, time.UTC)},
+                       expected: []time.Time{time.Date(2026, 4, 19, 0, 0, 0, 
0, time.UTC)},
+               },
+               {
+                       // Pre-epoch inputs: floor division puts them in the 
bucket below
+                       // rather than collapsing to epoch as truncated 
division would.
+                       name: "DAY Num=15 pre-epoch uses floor division",
+                       ir:   IntervalRule{Unit: DAY, Num: 15},
+                       probes: []time.Time{
+                               time.Date(1969, 12, 25, 0, 0, 0, 0, time.UTC),
+                               time.Date(1969, 12, 31, 23, 59, 59, 0, 
time.UTC),
+                       },
+                       expected: []time.Time{
+                               time.Date(1969, 12, 17, 0, 0, 0, 0, time.UTC),
+                               time.Date(1969, 12, 17, 0, 0, 0, 0, time.UTC),
+                       },
+               },
+       }
+       for _, tc := range cases {
+               t.Run(tc.name, func(t *testing.T) {
+                       require.Equal(t, len(tc.probes), len(tc.expected))
+                       for i, p := range tc.probes {
+                               got := tc.ir.Standard(p)
+                               assert.Equal(t, tc.expected[i], got,
+                                       "Standard(%s) under %d %s expected %s 
got %s",
+                                       p.Format(time.RFC3339), tc.ir.Num, 
tc.ir.Unit.String(),
+                                       tc.expected[i].Format(time.RFC3339), 
got.Format(time.RFC3339))
+                       }
+               })
+       }
+}
+
+// TestIntervalRule_Standard_PreservesLocation verifies that the returned
+// time keeps the input's Location and lands on a local-day midnight (DAY)
+// or local-hour boundary (HOUR) anchored in the caller's timezone. Under the
+// per-timezone grid, the absolute instant naturally differs across timezones.
+func TestIntervalRule_Standard_PreservesLocation(t *testing.T) {
+       shanghai := time.FixedZone("CST", 8*3600)
+       losAngeles := time.FixedZone("PST", -8*3600) // fixed offsets only: DST transitions are not exercised here
+
+       cases := []struct {
+               probe time.Time
+               ir    IntervalRule
+       }{
+               {time.Date(2026, 4, 19, 6, 0, 0, 0, time.UTC), 
IntervalRule{Unit: DAY, Num: 15}},
+               {time.Date(2026, 4, 19, 11, 30, 0, 0, time.UTC), 
IntervalRule{Unit: HOUR, Num: 6}},
+       }
+       for _, c := range cases {
+               for _, loc := range []*time.Location{time.UTC, shanghai, 
losAngeles} {
+                       got := c.ir.Standard(c.probe.In(loc))
+                       assert.Equal(t, loc, got.Location(),
+                               "Standard must return time in input.Location(); 
ir=%+v loc=%s", c.ir, loc)
+                       switch c.ir.Unit {
+                       case DAY:
+                               assert.Equal(t, 0, got.Hour(), "DAY bucket must 
align to local-day midnight")
+                               assert.Equal(t, 0, got.Minute())
+                               assert.Equal(t, 0, got.Second())
+                       case HOUR:
+                               assert.Equal(t, 0, got.Minute(), "HOUR bucket 
must align to local-hour boundary")
+                               assert.Equal(t, 0, got.Second())
+                       }
+               }
+       }
+}
+
+// TestCreateSegment_OutOfOrderArrival_SameBucket reproduces the production
+// truncation observed on demo-banyandb-data-cold-0 on 2026-04-30 where a part
+// with MinTimestamp ~2026-04-19 created seg-20260419 first and a later-
+// processed part with MinTimestamp ~2026-04-16 produced a 3-day truncated
+// seg-20260416. With Standard() honoring Num, both timestamps resolve to the
+// same epoch-aligned bucket [04/07, 04/22), so the second create returns the
+// same segment instead of creating a truncated neighbor.
+func TestCreateSegment_OutOfOrderArrival_SameBucket(t *testing.T) {
+       sc, _, cleanup := newAlignmentTestController(t, IntervalRule{Unit: DAY, 
Num: 15})
+       defer cleanup()
+
+       laterFirst := time.Date(2026, 4, 19, 6, 0, 0, 0, time.UTC)
+       earlierAfter := time.Date(2026, 4, 16, 6, 0, 0, 0, time.UTC)
+       expectedBucketStart := time.Date(2026, 4, 7, 0, 0, 0, 0, time.UTC)
+       expectedBucketEnd := time.Date(2026, 4, 22, 0, 0, 0, 0, time.UTC)
+
+       seg1, err := sc.create(laterFirst)
+       require.NoError(t, err)
+       require.NotNil(t, seg1)
+       defer seg1.DecRef()
+
+       seg2, err := sc.create(earlierAfter)
+       require.NoError(t, err)
+       require.NotNil(t, seg2)
+       // NOTE(review): seg2 is never DecRef'd (seg1 is); if create() hands out one reference per call, this leaks one - confirm.
+       assert.Same(t, seg1, seg2,
+               "both timestamps fall in the same 15d bucket; create() must 
reuse the existing segment")
+       assert.Equal(t, expectedBucketStart, seg1.Start,
+               "segment must start at the epoch-aligned bucket boundary, not 
at the first arrival's day")
+       assert.Equal(t, expectedBucketEnd, seg1.End)
+       assert.Equal(t, 15*24*time.Hour, seg1.End.Sub(seg1.Start))
+}
+
+// TestCreateSegment_OutOfOrderArrival_AdjacentBuckets covers the case where
+// the late-arrival timestamp falls in a strictly different epoch bucket from
+// the first-arrival one. The new segment must occupy a clean N*Unit span
+// without being truncated by the existing later segment: its stdEnd equals
+// next.Start exactly, so create()'s "next.Start.Before(stdEnd)" truncation
+// branch must not fire and the span stays N*Unit.
+func TestCreateSegment_OutOfOrderArrival_AdjacentBuckets(t *testing.T) {
+       sc, _, cleanup := newAlignmentTestController(t, IntervalRule{Unit: DAY, 
Num: 15})
+       defer cleanup()
+
+       laterFirst := time.Date(2026, 5, 5, 6, 0, 0, 0, time.UTC)    // bucket 
[04/22, 05/07)
+       earlierAfter := time.Date(2026, 4, 15, 6, 0, 0, 0, time.UTC) // bucket 
[04/07, 04/22)
+
+       seg1, err := sc.create(laterFirst)
+       require.NoError(t, err)
+       require.NotNil(t, seg1)
+       defer seg1.DecRef()
+       assert.Equal(t, time.Date(2026, 4, 22, 0, 0, 0, 0, time.UTC), 
seg1.Start)
+       assert.Equal(t, time.Date(2026, 5, 7, 0, 0, 0, 0, time.UTC), seg1.End)
+       assert.Equal(t, 15*24*time.Hour, seg1.End.Sub(seg1.Start))
+
+       seg2, err := sc.create(earlierAfter)
+       require.NoError(t, err)
+       require.NotNil(t, seg2)
+       defer seg2.DecRef()
+       assert.NotSame(t, seg1, seg2,
+               "timestamps in different buckets must not collapse to one 
segment")
+       assert.Equal(t, time.Date(2026, 4, 7, 0, 0, 0, 0, time.UTC), seg2.Start)
+       assert.Equal(t, time.Date(2026, 4, 22, 0, 0, 0, 0, time.UTC), seg2.End,
+               "new earlier segment must abut next.Start with 
stdEnd==next.Start (no internal truncation)")
+       assert.Equal(t, 15*24*time.Hour, seg2.End.Sub(seg2.Start))
+}
+
+// TestCreateSegment_HourInterval_OutOfOrder verifies the same alignment
+// guarantee for HOUR-based segment intervals (Num=6), using UTC probes.
+func TestCreateSegment_HourInterval_OutOfOrder(t *testing.T) {
+       sc, _, cleanup := newAlignmentTestController(t, IntervalRule{Unit: 
HOUR, Num: 6})
+       defer cleanup()
+
+       t1 := time.Date(2026, 4, 19, 11, 30, 0, 0, time.UTC) // bucket [06:00, 
12:00)
+       t2 := time.Date(2026, 4, 19, 8, 15, 0, 0, time.UTC)  // same bucket
+       t3 := time.Date(2026, 4, 19, 13, 0, 0, 0, time.UTC)  // bucket [12:00, 
18:00)
+
+       seg1, err := sc.create(t1)
+       require.NoError(t, err)
+       defer seg1.DecRef()
+       assert.Equal(t, time.Date(2026, 4, 19, 6, 0, 0, 0, time.UTC), 
seg1.Start)
+       assert.Equal(t, 6*time.Hour, seg1.End.Sub(seg1.Start))
+
+       seg2, err := sc.create(t2)
+       require.NoError(t, err)
+       assert.Same(t, seg1, seg2, "08:15 must reuse the [06:00, 12:00) 
segment")
+       // NOTE(review): seg2 is never DecRef'd; if create() hands out one reference per call, this leaks one - confirm.
+       seg3, err := sc.create(t3)
+       require.NoError(t, err)
+       defer seg3.DecRef()
+       assert.NotSame(t, seg1, seg3)
+       assert.Equal(t, time.Date(2026, 4, 19, 12, 0, 0, 0, time.UTC), 
seg3.Start)
+       assert.Equal(t, 6*time.Hour, seg3.End.Sub(seg3.Start))
+}
+
+// TestCreateSegment_ConcurrentCreates_DeterministicAlignment exercises the
+// receiver under contention: many goroutines call create() with scrambled but
+// deterministic timestamps (Knuth multiplicative hashing) spread across a
+// window covering multiple epoch-aligned buckets. Whatever interleaving the
+// scheduler picks, the resulting segment set must (a) cover every probe,
+// (b) start on epoch-aligned boundaries, (c) span exactly N*Unit, and (d) be
+// non-overlapping. This pins down the property that motivated the fix:
+// alignment is determined by the global grid, not by arrival order.
+func TestCreateSegment_ConcurrentCreates_DeterministicAlignment(t *testing.T) {
+       sc, _, cleanup := newAlignmentTestController(t, IntervalRule{Unit: DAY, 
Num: 15})
+       defer cleanup()
+
+       // Probe range covers 4 epoch-aligned 15d buckets:
+       // [03/08, 03/23), [03/23, 04/07), [04/07, 04/22), [04/22, 05/07).
+       rangeStart := time.Date(2026, 3, 10, 0, 0, 0, 0, time.UTC)
+       rangeEndExclusive := time.Date(2026, 5, 5, 0, 0, 0, 0, time.UTC)
+       rangeNanos := rangeEndExclusive.UnixNano() - rangeStart.UnixNano()
+
+       const goroutines = 32
+       const probesPerGoroutine = 64
+       const knuthMul = uint64(0x9E3779B97F4A7C15)
+       probes := make([]time.Time, 0, goroutines*probesPerGoroutine)
+       for i := 0; i < goroutines*probesPerGoroutine; i++ {
+               offset := int64((uint64(i+1)*knuthMul)>>1) % rangeNanos // >>1 keeps the int64 conversion non-negative
+               probes = append(probes, time.Unix(0, 
rangeStart.UnixNano()+offset).UTC())
+       }
+
+       var wg sync.WaitGroup
+       wg.Add(goroutines)
+       for g := 0; g < goroutines; g++ {
+               go func(idx int) {
+                       defer wg.Done()
+                       start := idx * probesPerGoroutine
+                       end := start + probesPerGoroutine
+                       for _, ts := range probes[start:end] {
+                               seg, err := sc.create(ts)
+                               if err != nil {
+                                       t.Errorf("create(%s) returned error: 
%v", ts.Format(time.RFC3339), err)
+                                       return
+                               }
+                               if seg == nil {
+                                       t.Errorf("create(%s) returned nil 
segment", ts.Format(time.RFC3339))
+                                       return
+                               }
+                               if !seg.Contains(ts.UnixNano()) {
+                                       t.Errorf("probe %s landed in segment 
[%s, %s) which does not contain it",
+                                               ts.Format(time.RFC3339), 
seg.Start.Format(time.RFC3339), seg.End.Format(time.RFC3339))
+                               }
+                               seg.DecRef()
+                       }
+               }(g)
+       }
+       wg.Wait()
+
+       sc.RLock()
+       segs := slices.Clone(sc.lst)
+       sc.RUnlock()
+
+       require.NotEmpty(t, segs, "concurrent creates must produce at least one 
segment")
+       sort.Slice(segs, func(i, j int) bool { return 
segs[i].Start.Before(segs[j].Start) })
+
+       epoch := time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC)
+       fifteenDays := 15 * 24 * time.Hour
+       for i, seg := range segs {
+               assert.Equal(t, fifteenDays, seg.End.Sub(seg.Start),
+                       "segment[%d] %s span must be exactly 15d, got %v",
+                       i, seg.suffix, seg.End.Sub(seg.Start))
+               offset := seg.Start.Sub(epoch)
+               assert.Equal(t, time.Duration(0), offset%fifteenDays,
+                       "segment[%d] start %s must be on the epoch-aligned 15d 
grid",
+                       i, seg.Start.Format(time.RFC3339))
+               if i > 0 {
+                       assert.False(t, segs[i-1].End.After(seg.Start),
+                               "segments must not overlap: prev.End=%s 
next.Start=%s",
+                               segs[i-1].End.Format(time.RFC3339), 
seg.Start.Format(time.RFC3339))
+               }
+       }
+}
+
+// TestCreateSegment_LegacyOffGridNeighbour_TransitionThenGrid: next to a pre-fix
+// off-grid segment, create() must emit a transition segment that starts at the
+// legacy End and ends on the global grid, so later segments self-heal onto it.
+func TestCreateSegment_LegacyOffGridNeighbour_TransitionThenGrid(t *testing.T) 
{
+       sc, tempDir, cleanup := newAlignmentTestController(t, 
IntervalRule{Unit: DAY, Num: 15})
+       defer cleanup()
+
+       ctx := context.Background()
+       l := logger.GetLogger("test-legacy-transition")
+       ctx = context.WithValue(ctx, logger.ContextKey, l)
+       ctx = common.SetPosition(ctx, func(_ common.Position) common.Position {
+               return common.Position{Database: "test-db", Stage: "cold"}
+       })
+
+       // Inject a legacy off-grid segment [2026-05-01, 2026-05-16). The new 
15d
+       // epoch grid puts buckets at 04/22, 05/07, 05/22, 06/06, ...; legacy
+       // straddles 05/07 and ends inside [05/07, 05/22).
+       legacyStart := time.Date(2026, 5, 1, 0, 0, 0, 0, time.UTC)
+       legacyEnd := time.Date(2026, 5, 16, 0, 0, 0, 0, time.UTC)
+       suffix := legacyStart.Format(dayFormat)
+       segPath := filepath.Join(tempDir, fmt.Sprintf("seg-%s", suffix))
+       require.NoError(t, os.MkdirAll(segPath, DirPerm))
+       require.NoError(t, os.WriteFile(filepath.Join(segPath, 
metadataFilename), []byte(currentVersion), FilePerm))
+       legacy, err := sc.openSegment(ctx, legacyStart, legacyEnd, segPath, 
suffix, sc.groupCache)
+       require.NoError(t, err)
+       sc.lst = append(sc.lst, legacy) // inject directly, bypassing create(); single goroutine, so no lock is taken
+       sc.sortLst()
+
+       // First write past legacy.End: aligned start (05/07) is inside legacy,
+       // bump pushes start to 05/16 (legacy.End), but stdEnd was locked to the
+       // original grid bucket end (05/22). So the transition segment is 
shorter
+       // than 15d but still ends on the grid.
+       probe := time.Date(2026, 5, 17, 6, 0, 0, 0, time.UTC)
+       transition, err := sc.create(probe)
+       require.NoError(t, err)
+       require.NotNil(t, transition)
+       defer transition.DecRef()
+
+       gridBoundary := time.Date(2026, 5, 22, 0, 0, 0, 0, time.UTC)
+       assert.True(t, transition.Contains(probe.UnixNano()),
+               "transition segment must cover the probe; got [%s, %s)",
+               transition.Start.Format(time.RFC3339), 
transition.End.Format(time.RFC3339))
+       assert.Equal(t, legacyEnd, transition.Start,
+               "start must be bumped to legacy.End (no overlap, no misroute)")
+       assert.Equal(t, gridBoundary, transition.End,
+               "end must land on the next 15d-from-epoch boundary, not 
legacy.End+15d")
+       assert.Less(t, transition.End.Sub(transition.Start), 15*24*time.Hour,
+               "transition segment span is expected to be shorter than the 
configured SegmentInterval")
+
+       // Second write past the transition segment: aligned start lands cleanly
+       // on the next grid bucket [05/22, 06/06) and produces a full 15d 
segment.
+       postProbe := time.Date(2026, 5, 25, 6, 0, 0, 0, time.UTC)
+       gridSeg, err := sc.create(postProbe)
+       require.NoError(t, err)
+       require.NotNil(t, gridSeg)
+       defer gridSeg.DecRef()
+
+       assert.Equal(t, gridBoundary, gridSeg.Start,
+               "first post-transition segment must start on the global grid")
+       assert.Equal(t, time.Date(2026, 6, 6, 0, 0, 0, 0, time.UTC), 
gridSeg.End,
+               "first post-transition segment must span a full 15d on the 
grid")
+       assert.Equal(t, 15*24*time.Hour, gridSeg.End.Sub(gridSeg.Start),
+               "transition self-heals: subsequent segments are full Num*Unit")
+}
+
+// TestCreateSegment_PersistedMetadataReflectsAlignedRange writes a segment
+// via create() with an unaligned timestamp, then verifies the on-disk layout
+// records the epoch-aligned range (start in the seg-<suffix> directory name,
+// end in the metadata endTime) - the alignment must survive a process restart.
+func TestCreateSegment_PersistedMetadataReflectsAlignedRange(t *testing.T) {
+       sc, tempDir, cleanup := newAlignmentTestController(t, 
IntervalRule{Unit: DAY, Num: 15})
+       defer cleanup()
+
+       probe := time.Date(2026, 4, 16, 6, 30, 0, 0, time.UTC)
+       expectedStart := time.Date(2026, 4, 7, 0, 0, 0, 0, time.UTC)
+       expectedEnd := time.Date(2026, 4, 22, 0, 0, 0, 0, time.UTC)
+
+       seg, err := sc.create(probe)
+       require.NoError(t, err)
+       require.NotNil(t, seg)
+       assert.Equal(t, expectedStart, seg.Start)
+       assert.Equal(t, expectedEnd, seg.End)
+       seg.DecRef()
+
+       suffix := expectedStart.Format(dayFormat)
+       metadataPath := filepath.Join(tempDir, fmt.Sprintf("seg-%s", suffix), 
metadataFilename)
+       rawMeta, readErr := os.ReadFile(metadataPath)
+       require.NoError(t, readErr)
+       meta, parseErr := readSegmentMeta(rawMeta)
+       require.NoError(t, parseErr)
+       assert.Equal(t, currentVersion, meta.Version)
+       assert.Equal(t, expectedEnd.Format(time.RFC3339Nano), meta.EndTime,
+               "persisted endTime must reflect the epoch-aligned bucket end, 
not start+15d from the probe day")
+}
diff --git a/banyand/internal/storage/storage.go 
b/banyand/internal/storage/storage.go
index 799f42b82..5e54012ec 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -202,6 +202,45 @@ func (ir IntervalRule) NextTime(current time.Time) 
time.Time {
        panic("invalid interval unit")
 }
 
+// Standard aligns t down to the nearest Num*Unit boundary on a grid anchored
+// at 1970-01-01 in t.Location(). Bucket indexes are derived from t's wall-clock
+// date and hour, so boundaries always fall on local-day midnight (DAY) or
+// local-hour boundaries (HOUR) even across DST transitions, the result
+// formats/parses cleanly in t.Location(), and Num=1 reduces to
+// IntervalUnit.Standard's local alignment. The grid is per-timezone, so two
+// nodes in different timezones may produce different bucket starts.
+func (ir IntervalRule) Standard(t time.Time) time.Time {
+       if ir.Num <= 0 {
+               logger.Panicf("interval rule Num must be positive: %+v", ir)
+       }
+       if ir.Num == 1 {
+               return ir.Unit.Standard(t)
+       }
+       loc := t.Location()
+       // Civil days since 1970-01-01, computed in UTC from t's local calendar date
+       // so the count is exact under DST/offset changes; floorDiv handles pre-epoch.
+       civilDays := floorDiv(time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, time.UTC).Unix(), 24*60*60)
+       switch ir.Unit {
+       case DAY:
+               bucketIdx := floorDiv(civilDays, int64(ir.Num))
+               return time.Date(1970, 1, 1+int(bucketIdx)*ir.Num, 0, 0, 0, 0, loc)
+       case HOUR:
+               // Wall-clock hours, matching how time.Date normalizes the hour field;
+               // elapsed hours drift by the DST shift and misplace bucket starts.
+               hours := civilDays*24 + int64(t.Hour())
+               bucketIdx := floorDiv(hours, int64(ir.Num))
+               return time.Date(1970, 1, 1, int(bucketIdx)*ir.Num, 0, 0, 0, loc)
+       }
+       panic("invalid interval unit")
+}
+
+func floorDiv(a, b int64) int64 { // floor(a/b): rounds toward -inf, unlike Go's truncating "/"
+       q, r := a/b, a%b
+       if r != 0 && (r < 0) != (b < 0) {
+               q--
+       }
+       return q
+}
+
 func (ir IntervalRule) estimatedDuration() time.Duration {
        switch ir.Unit {
        case HOUR:
diff --git a/banyand/internal/wqueue/wqueue.go 
b/banyand/internal/wqueue/wqueue.go
index 37d82d1cd..cc95f94f7 100644
--- a/banyand/internal/wqueue/wqueue.go
+++ b/banyand/internal/wqueue/wqueue.go
@@ -235,7 +235,7 @@ func (q *Queue[S, O]) getShard(shardID common.ShardID) 
*Shard[S] {
 // It uses the Queue's SegmentInterval to generate the time range.
 func (q *Queue[S, O]) GetTimeRange(ts time.Time) timestamp.TimeRange {
        opts := q.getOpts()
        // Align ts down to the start of its segment bucket. IntervalRule.Standard
        // honors SegmentInterval.Num (N-unit buckets); the Unit-only Standard
        // aligned to a single unit, so with Num > 1 the range was off the N-unit grid.
-       start := opts.SegmentInterval.Unit.Standard(ts)
+       start := opts.SegmentInterval.Standard(ts)
        // NOTE(review): NextTime is defined outside this view; the test table
        // expects end to be start advanced by Num units (e.g. 12/13 -> 12/16 for 3 days).
        end := opts.SegmentInterval.NextTime(start)
        return timestamp.NewSectionTimeRange(start, end)
 }
diff --git a/banyand/internal/wqueue/wqueue_test.go 
b/banyand/internal/wqueue/wqueue_test.go
index 91c14ff4b..2dc6b310f 100644
--- a/banyand/internal/wqueue/wqueue_test.go
+++ b/banyand/internal/wqueue/wqueue_test.go
@@ -68,6 +68,9 @@ func TestQueue_GetTimeRange(t *testing.T) {
                        expectedEnd:   time.Date(2023, 12, 16, 0, 0, 0, 0, 
time.UTC),
                },
                {
+                       // Hour-since-epoch for 2023-12-15 14:00 UTC is 472958, 
which is even,
+                       // so under Num=2 the bucket boundary already coincides 
with 14:00 -
+                       // epoch alignment matches plain hour alignment in this 
case.
                        name: "multiple hours interval",
                        segmentInterval: storage.IntervalRule{
                                Unit: storage.HOUR,
@@ -78,14 +81,30 @@ func TestQueue_GetTimeRange(t *testing.T) {
                        expectedEnd:   time.Date(2023, 12, 15, 16, 0, 0, 0, 
time.UTC),
                },
                {
-                       name: "multiple days interval",
+                       // Hour-since-epoch for 2023-12-15 15:00 UTC is 472959 
(odd), so
+                       // under Num=2 the bucket aligns down to 14:00.
+                       name: "multiple hours interval (odd hour drops to even 
boundary)",
+                       segmentInterval: storage.IntervalRule{
+                               Unit: storage.HOUR,
+                               Num:  2,
+                       },
+                       inputTime:     time.Date(2023, 12, 15, 15, 30, 45, 
123456789, time.UTC),
+                       expectedStart: time.Date(2023, 12, 15, 14, 0, 0, 0, 
time.UTC),
+                       expectedEnd:   time.Date(2023, 12, 15, 16, 0, 0, 0, 
time.UTC),
+               },
+               {
+                       // Day-since-epoch for 2023-12-15 is 19706. Under Num=3 
the bucket
+                       // aligns down to 19704 -> 2023-12-13 .. 2023-12-16. 
The previous
+                       // expectation of 12/15..12/18 was written against the 
historical
+                       // bug that aligned only to the day, not to N*day from 
epoch.
+                       name: "multiple days interval (aligned to N*day from 
epoch)",
                        segmentInterval: storage.IntervalRule{
                                Unit: storage.DAY,
                                Num:  3,
                        },
                        inputTime:     time.Date(2023, 12, 15, 14, 30, 45, 
123456789, time.UTC),
-                       expectedStart: time.Date(2023, 12, 15, 0, 0, 0, 0, 
time.UTC),
-                       expectedEnd:   time.Date(2023, 12, 18, 0, 0, 0, 0, 
time.UTC),
+                       expectedStart: time.Date(2023, 12, 13, 0, 0, 0, 0, 
time.UTC),
+                       expectedEnd:   time.Date(2023, 12, 16, 0, 0, 0, 0, 
time.UTC),
                },
        }
 

Reply via email to