This is an automated email from the ASF dual-hosted git repository.

ButterBright pushed a commit to branch v0.10.x
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit d16421377cf444efcd0c243c0dd9f4de951e1475
Author: mrproliu <[email protected]>
AuthorDate: Mon May 18 16:34:46 2026 +0800

    align part sync timestamp to segment grid so part and sidx land in the same segment (#1136)
---
 banyand/internal/storage/segment.go           |   6 +
 banyand/internal/storage/storage.go           |   2 +
 banyand/internal/storage/tsdb.go              |   4 +
 banyand/measure/write_data.go                 |   3 +-
 banyand/measure/write_data_segmentref_test.go | 168 +++++++++++++++++++++++++-
 banyand/stream/write_data.go                  |   3 +-
 banyand/stream/write_data_segmentref_test.go  | 136 ++++++++++++++++++++-
 banyand/trace/write_data.go                   |   3 +-
 banyand/trace/write_data_segmentref_test.go   | 136 ++++++++++++++++++++-
 9 files changed, 455 insertions(+), 6 deletions(-)

diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go
index bd8c5c705..75a2994b0 100644
--- a/banyand/internal/storage/segment.go
+++ b/banyand/internal/storage/segment.go
@@ -400,6 +400,12 @@ func (sc *segmentController[T, O]) getOptions() *TSDBOpts[T, O] {
        return sc.opts
 }
 
+func (sc *segmentController[T, O]) getSegmentInterval() IntervalRule {
+       sc.optsMutex.RLock()
+       defer sc.optsMutex.RUnlock()
+       return sc.opts.SegmentInterval
+}
+
 func (sc *segmentController[T, O]) updateOptions(resourceOpts 
*commonv1.ResourceOpts) {
        sc.optsMutex.Lock()
        defer sc.optsMutex.Unlock()
diff --git a/banyand/internal/storage/storage.go b/banyand/internal/storage/storage.go
index 5e54012ec..fc33a1ffa 100644
--- a/banyand/internal/storage/storage.go
+++ b/banyand/internal/storage/storage.go
@@ -112,6 +112,8 @@ type TSDB[T TSTable, O any] interface {
        io.Closer
        CreateSegmentIfNotExist(ts time.Time) (Segment[T, O], error)
        SelectSegments(timeRange timestamp.TimeRange) ([]Segment[T, O], error)
+       // SegmentInterval returns the current segment interval rule.
+       SegmentInterval() IntervalRule
        Tick(ts int64)
        UpdateOptions(opts *commonv1.ResourceOpts)
        TakeFileSnapshot(dst string) (bool, error)
diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go
index 22f26569e..341db79e6 100644
--- a/banyand/internal/storage/tsdb.go
+++ b/banyand/internal/storage/tsdb.go
@@ -275,6 +275,10 @@ func (d *database[T, O]) SelectSegments(timeRange timestamp.TimeRange) ([]Segmen
        return d.segmentController.selectSegments(timeRange)
 }
 
+func (d *database[T, O]) SegmentInterval() IntervalRule {
+       return d.segmentController.getSegmentInterval()
+}
+
 func (d *database[T, O]) UpdateOptions(resourceOpts *commonv1.ResourceOpts) {
        if d.closed.Load() {
                return
diff --git a/banyand/measure/write_data.go b/banyand/measure/write_data.go
index c37b52e60..c6079f365 100644
--- a/banyand/measure/write_data.go
+++ b/banyand/measure/write_data.go
@@ -117,7 +117,8 @@ func (s *syncCallback) CreatePartHandler(ctx *queue.ChunkedSyncPartContext) (que
                s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to 
load TSDB for group")
                return nil, err
        }
-       segmentTime := time.Unix(0, ctx.MinTimestamp)
+       // Align to the segment grid so part-chunk sync and sidx land in the same segment.
+       segmentTime := tsdb.SegmentInterval().Standard(time.Unix(0, 
ctx.MinTimestamp))
        segment, err := tsdb.CreateSegmentIfNotExist(segmentTime)
        if err != nil {
                s.l.Error().Err(err).Str("group", 
ctx.Group).Time("segmentTime", segmentTime).Msg("failed to create segment")
diff --git a/banyand/measure/write_data_segmentref_test.go b/banyand/measure/write_data_segmentref_test.go
index 0606cc204..b71f5e742 100644
--- a/banyand/measure/write_data_segmentref_test.go
+++ b/banyand/measure/write_data_segmentref_test.go
@@ -122,7 +122,10 @@ func TestSyncChunkCallback_CreatePartHandler_StoresSegment(t *testing.T) {
        db := openTestTSDBForRefTest(t, tmpPath, &openShardCount)
        defer db.Close()
 
-       segTime := time.Date(2026, 4, 17, 0, 0, 0, 0, time.UTC)
+       // CreatePartHandler reinterprets ctx.MinTimestamp via time.Unix in Local
+       // tz before alignment; keep the warmup segment on the same tz so both
+       // resolve to the same segment instant.
+       segTime := time.Date(2026, 4, 17, 0, 0, 0, 0, time.Local)
 
        warmup, err := db.CreateSegmentIfNotExist(segTime)
        require.NoError(t, err)
@@ -248,3 +251,166 @@ type fakeGroup struct {
 
 func (f *fakeGroup) GetSchema() *commonv1.Group { return nil }
 func (f *fakeGroup) SupplyTSDB() io.Closer      { return f.tsdb }
+
+// TestSyncChunkCallback_CreatePartHandler_AlignsOffGridMinTimestamp covers the
+// regression where the part chunk-sync receiver passed the raw MinTimestamp
+// straight into CreateSegmentIfNotExist. Real data points carry a timestamp
+// anywhere inside a segment window, while the matching sidx Insert/Update
+// arrives from liaison with a grid-aligned start. Without alignment here,
+// part and sidx end up in different segments. The fix is to feed every
+// raw MinTimestamp through TSDB.SegmentInterval().Standard before creating
+// the segment so both paths converge on the same aligned start.
+func TestSyncChunkCallback_CreatePartHandler_AlignsOffGridMinTimestamp(t 
*testing.T) {
+       t.Run("DAY(1)", func(t *testing.T) {
+               runOffGridCase(t, storage.IntervalRule{Unit: storage.DAY, Num: 
1})
+       })
+       t.Run("DAY(5)", func(t *testing.T) {
+               runOffGridCase(t, storage.IntervalRule{Unit: storage.DAY, Num: 
5})
+       })
+}
+
+func runOffGridCase(t *testing.T, ir storage.IntervalRule) {
+       t.Helper()
+       tmpPath, cleanup := test.Space(require.New(t))
+       defer cleanup()
+
+       const groupName = "off-grid-group"
+       db := openTestTSDBWithInterval(t, tmpPath, groupName, ir)
+       defer db.Close()
+
+       // Pick a raw timestamp deliberately off-grid: an arbitrary 06:00 in local
+       // time inside the current grid bucket. Without the alignment fix the
+       // receiver would build a segment whose Start equals this raw ts (modulo
+       // rotation to a 0-second wall instant), which never matches the Standard
+       // grid that liaison-side sidx uses.
+       rawTS := time.Date(2026, 5, 15, 6, 0, 0, 0, time.Local)
+       wantStart := ir.Standard(rawTS)
+
+       callback := &syncCallback{
+               l:          logger.GetLogger("test-offgrid"),
+               schemaRepo: newTestSchemaRepo(db, groupName),
+       }
+       ctx := &queue.ChunkedSyncPartContext{
+               ID:           1,
+               Group:        groupName,
+               ShardID:      0,
+               MinTimestamp: rawTS.UnixNano(),
+               MaxTimestamp: rawTS.UnixNano(),
+       }
+       handler, err := callback.CreatePartHandler(ctx)
+       require.NoError(t, err)
+       defer func() { _ = handler.(*syncPartContext).Close() }()
+       partCtx := handler.(*syncPartContext)
+       require.NotNil(t, partCtx.segment)
+
+       gotStart := partCtx.segment.GetTimeRange().Start
+       require.True(t, gotStart.Equal(wantStart),
+               "REGRESSION: raw MinTimestamp=%s landed in segment start=%s; 
expected aligned start=%s (ir=%+v)",
+               rawTS, gotStart, wantStart, ir)
+       // Sanity: assert the raw ts itself would NOT have produced this start —
+       // otherwise the test passes vacuously when raw happens to land on grid.
+       require.False(t, rawTS.Equal(wantStart),
+               "test setup is degenerate: rawTS already aligned, cannot detect 
missing Standard call")
+}
+
+// TestSegmentCreateTS_ConsistencyAcrossPaths verifies that both the part
+// chunk-sync receiver (Path A, raw ts straight from a part's MinTimestamp)
+// and the sidx series-sync receiver (Path B, ts pre-standardized at the
+// liaison) converge on the SAME segment Start. A pre-existing aligned
+// segment in an older grid bucket simulates historical data on disk — it
+// must NOT be touched, and the two paths' new writes must land in the
+// same new bucket as each other.
+//
+// This is the regression demo for the PR #1120 fault where Path A used
+// raw ts while Path B (sidx via liaison) used Standard ts, so the same
+// real datapoint produced part data in one segment and sidx data in
+// another.
+func TestSegmentCreateTS_ConsistencyAcrossPaths(t *testing.T) {
+       tmpPath, cleanup := test.Space(require.New(t))
+       defer cleanup()
+
+       const groupName = "consistency-group"
+       ir := storage.IntervalRule{Unit: storage.DAY, Num: 5}
+       db := openTestTSDBWithInterval(t, tmpPath, groupName, ir)
+       defer db.Close()
+
+       // Historical segment: pre-create an aligned segment in a much older grid
+       // bucket so the test cluster looks like it has prior data on disk.
+       historicalRaw := time.Date(2026, 4, 10, 0, 0, 0, 0, time.Local)
+       historicalStart := ir.Standard(historicalRaw)
+       histSeg, err := db.CreateSegmentIfNotExist(historicalStart)
+       require.NoError(t, err)
+       histSeg.DecRef()
+       histSeg.DecRef() // refCount->0, idle-closed
+
+       // New raw datapoint with an off-grid (06:00 inside a 5-day bucket) ts.
+       // liaison-side sidx pre-standardizes it; data-node-side part chunk-sync
+       // also runs it through Standard now.
+       rawTS := time.Date(2026, 5, 15, 6, 0, 0, 0, time.Local)
+       wantNewStart := ir.Standard(rawTS)
+       require.False(t, wantNewStart.Equal(historicalStart),
+               "test setup degenerate: new bucket coincides with historical 
bucket")
+       require.False(t, wantNewStart.Equal(rawTS),
+               "test setup degenerate: rawTS already on grid, cannot detect 
Standard")
+
+       repo := newTestSchemaRepo(db, groupName)
+
+       // Path A: part chunk-sync receiver gets the raw MinTimestamp from the
+       // transferred part's metadata. Aligned internally by the fix.
+       chunkCallback := &syncCallback{l: logger.GetLogger("test-pathA"), 
schemaRepo: repo}
+       handlerA, err := 
chunkCallback.CreatePartHandler(&queue.ChunkedSyncPartContext{
+               ID:           1,
+               Group:        groupName,
+               ShardID:      0,
+               MinTimestamp: rawTS.UnixNano(),
+               MaxTimestamp: rawTS.UnixNano(),
+       })
+       require.NoError(t, err)
+       defer func() { _ = handlerA.(*syncPartContext).Close() }()
+       gotStartA := handlerA.(*syncPartContext).segment.GetTimeRange().Start
+
+       // Path B: sidx series-sync receiver gets a MinTimestamp the liaison has
+       // already aligned via Standard. The handler uses it as-is.
+       seriesCallback := &syncSeriesCallback{l: 
logger.GetLogger("test-pathB"), schemaRepo: repo}
+       handlerB, err := 
seriesCallback.CreatePartHandler(&queue.ChunkedSyncPartContext{
+               ID:           2,
+               Group:        groupName,
+               ShardID:      0,
+               MinTimestamp: wantNewStart.UnixNano(),
+               MaxTimestamp: wantNewStart.UnixNano(),
+       })
+       require.NoError(t, err)
+       defer func() { _ = handlerB.(*syncSeriesContext).Close() }()
+       gotStartB := handlerB.(*syncSeriesContext).segment.GetTimeRange().Start
+
+       require.True(t, gotStartA.Equal(gotStartB),
+               "REGRESSION: Path A (raw ts %s -> segment %s) and Path B 
(aligned ts %s -> segment %s) diverged",
+               rawTS, gotStartA, wantNewStart, gotStartB)
+       require.True(t, gotStartA.Equal(wantNewStart),
+               "path A segment start %s != expected aligned %s", gotStartA, 
wantNewStart)
+       require.False(t, gotStartA.Equal(historicalStart),
+               "new write must NOT land in historical segment %s", 
historicalStart)
+}
+
+func openTestTSDBWithInterval(t *testing.T, tmpPath, groupName string, ir 
storage.IntervalRule) storage.TSDB[*tsTable, option] {
+       t.Helper()
+       opts := storage.TSDBOpts[*tsTable, option]{
+               ShardNum:        1,
+               Location:        filepath.Join(tmpPath, "tab"),
+               TSTableCreator:  newTSTable,
+               SegmentInterval: ir,
+               TTL:             storage.IntervalRule{Unit: ir.Unit, Num: 60},
+               Option:          option{protector: protector.Nop{}, 
mergePolicy: newDefaultMergePolicyForTesting()},
+       }
+       require.NoError(t, os.MkdirAll(opts.Location, storage.DirPerm))
+       ctx := common.SetPosition(
+               context.WithValue(context.Background(), logger.ContextKey, 
logger.GetLogger("test-offgrid")),
+               func(p common.Position) common.Position {
+                       p.Database = "test-offgrid"
+                       return p
+               },
+       )
+       db, err := storage.OpenTSDB[*tsTable, option](ctx, opts, nil, groupName)
+       require.NoError(t, err)
+       return db
+}
diff --git a/banyand/stream/write_data.go b/banyand/stream/write_data.go
index 76f4e4ced..8506a36e7 100644
--- a/banyand/stream/write_data.go
+++ b/banyand/stream/write_data.go
@@ -116,7 +116,8 @@ func (s *syncCallback) CreatePartHandler(ctx *queue.ChunkedSyncPartContext) (que
                s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to 
load TSDB for group")
                return nil, err
        }
-       segmentTime := time.Unix(0, ctx.MinTimestamp)
+       // Align to the segment grid so part-chunk sync and sidx land in the same segment.
+       segmentTime := tsdb.SegmentInterval().Standard(time.Unix(0, 
ctx.MinTimestamp))
        segment, err := tsdb.CreateSegmentIfNotExist(segmentTime)
        if err != nil {
                s.l.Error().Err(err).Str("group", 
ctx.Group).Time("segmentTime", segmentTime).Msg("failed to create segment")
diff --git a/banyand/stream/write_data_segmentref_test.go b/banyand/stream/write_data_segmentref_test.go
index 81784ac1c..509d333b9 100644
--- a/banyand/stream/write_data_segmentref_test.go
+++ b/banyand/stream/write_data_segmentref_test.go
@@ -114,6 +114,12 @@ func TestSyncReceiver_SegmentRefOwnership(t *testing.T) {
 // syncCallback.CreatePartHandler through a minimal schemaRepo shim and
 // asserts that segment ownership transfers to partCtx and that Close
 // actually DecRefs. See trace package for rationale.
+// In each StoresSegment test, segTime must use time.Local because the
+// callback path runs time.Unix(0,ns) -> Standard -> CreateSegmentIfNotExist
+// in Local tz. If the warmup segment were created in UTC, the two calls
+// could represent the same instant but format to different segment-suffix
+// strings, causing a directory-name mismatch instead of reusing the same
+// segment.
 func TestSyncChunkCallback_CreatePartHandler_StoresSegment(t *testing.T) {
        tmpPath, cleanup := test.Space(require.New(t))
        defer cleanup()
@@ -122,7 +128,7 @@ func TestSyncChunkCallback_CreatePartHandler_StoresSegment(t *testing.T) {
        db := openTestTSDBForRefTest(t, tmpPath, &openShardCount)
        defer db.Close()
 
-       segTime := time.Date(2026, 4, 17, 0, 0, 0, 0, time.UTC)
+       segTime := time.Date(2026, 4, 17, 0, 0, 0, 0, time.Local)
 
        warmup, err := db.CreateSegmentIfNotExist(segTime)
        require.NoError(t, err)
@@ -246,3 +252,131 @@ type fakeGroup struct {
 
 func (f *fakeGroup) GetSchema() *commonv1.Group { return nil }
 func (f *fakeGroup) SupplyTSDB() io.Closer      { return f.tsdb }
+
+// TestSyncChunkCallback_CreatePartHandler_AlignsOffGridMinTimestamp covers
+// the regression where the stream part chunk-sync receiver passed the raw
+// MinTimestamp straight into CreateSegmentIfNotExist. See the analogous
+// measure test for full rationale.
+func TestSyncChunkCallback_CreatePartHandler_AlignsOffGridMinTimestamp(t 
*testing.T) {
+       t.Run("DAY(1)", func(t *testing.T) {
+               runStreamOffGridCase(t, storage.IntervalRule{Unit: storage.DAY, 
Num: 1})
+       })
+       t.Run("DAY(5)", func(t *testing.T) {
+               runStreamOffGridCase(t, storage.IntervalRule{Unit: storage.DAY, 
Num: 5})
+       })
+}
+
+func runStreamOffGridCase(t *testing.T, ir storage.IntervalRule) {
+       t.Helper()
+       tmpPath, cleanup := test.Space(require.New(t))
+       defer cleanup()
+
+       const groupName = "off-grid-stream"
+       db := openTestTSDBWithInterval(t, tmpPath, groupName, ir)
+       defer db.Close()
+
+       rawTS := time.Date(2026, 5, 15, 6, 0, 0, 0, time.Local)
+       wantStart := ir.Standard(rawTS)
+
+       callback := &syncCallback{
+               l:          logger.GetLogger("test-offgrid-stream"),
+               schemaRepo: newTestSchemaRepo(db, groupName),
+       }
+       handler, err := 
callback.CreatePartHandler(&queue.ChunkedSyncPartContext{
+               ID:           1,
+               Group:        groupName,
+               ShardID:      0,
+               MinTimestamp: rawTS.UnixNano(),
+               MaxTimestamp: rawTS.UnixNano(),
+       })
+       require.NoError(t, err)
+       defer func() { _ = handler.(*syncPartContext).Close() }()
+       gotStart := handler.(*syncPartContext).segment.GetTimeRange().Start
+       require.True(t, gotStart.Equal(wantStart),
+               "REGRESSION: raw MinTimestamp=%s landed in segment start=%s; 
expected aligned %s (ir=%+v)",
+               rawTS, gotStart, wantStart, ir)
+       require.False(t, rawTS.Equal(wantStart),
+               "test setup degenerate: rawTS already aligned")
+}
+
+// TestSegmentCreateTS_ConsistencyAcrossPaths is the stream variant of the
+// Path-A vs Path-B convergence demo. A pre-existing aligned historical
+// segment must NOT be touched; both new writes must land in the same
+// new bucket.
+func TestSegmentCreateTS_ConsistencyAcrossPaths(t *testing.T) {
+       tmpPath, cleanup := test.Space(require.New(t))
+       defer cleanup()
+
+       const groupName = "consistency-stream"
+       ir := storage.IntervalRule{Unit: storage.DAY, Num: 5}
+       db := openTestTSDBWithInterval(t, tmpPath, groupName, ir)
+       defer db.Close()
+
+       historicalStart := ir.Standard(time.Date(2026, 4, 10, 0, 0, 0, 0, 
time.Local))
+       histSeg, err := db.CreateSegmentIfNotExist(historicalStart)
+       require.NoError(t, err)
+       histSeg.DecRef()
+       histSeg.DecRef()
+
+       rawTS := time.Date(2026, 5, 15, 6, 0, 0, 0, time.Local)
+       wantNewStart := ir.Standard(rawTS)
+       require.False(t, wantNewStart.Equal(historicalStart))
+       require.False(t, wantNewStart.Equal(rawTS))
+
+       repo := newTestSchemaRepo(db, groupName)
+
+       chunkCallback := &syncCallback{l: 
logger.GetLogger("test-stream-pathA"), schemaRepo: repo}
+       handlerA, err := 
chunkCallback.CreatePartHandler(&queue.ChunkedSyncPartContext{
+               ID:           1,
+               Group:        groupName,
+               ShardID:      0,
+               MinTimestamp: rawTS.UnixNano(),
+               MaxTimestamp: rawTS.UnixNano(),
+       })
+       require.NoError(t, err)
+       defer func() { _ = handlerA.(*syncPartContext).Close() }()
+       gotStartA := handlerA.(*syncPartContext).segment.GetTimeRange().Start
+
+       seriesCallback := &syncSeriesCallback{l: 
logger.GetLogger("test-stream-pathB"), schemaRepo: repo}
+       handlerB, err := 
seriesCallback.CreatePartHandler(&queue.ChunkedSyncPartContext{
+               ID:           2,
+               Group:        groupName,
+               ShardID:      0,
+               MinTimestamp: wantNewStart.UnixNano(),
+               MaxTimestamp: wantNewStart.UnixNano(),
+       })
+       require.NoError(t, err)
+       defer func() { _ = handlerB.(*syncSeriesContext).Close() }()
+       gotStartB := handlerB.(*syncSeriesContext).segment.GetTimeRange().Start
+
+       require.True(t, gotStartA.Equal(gotStartB),
+               "REGRESSION: stream Path A (raw %s -> %s) and Path B (aligned 
%s -> %s) diverged",
+               rawTS, gotStartA, wantNewStart, gotStartB)
+       require.True(t, gotStartA.Equal(wantNewStart),
+               "path A start %s != expected aligned %s", gotStartA, 
wantNewStart)
+       require.False(t, gotStartA.Equal(historicalStart),
+               "new write must NOT land in historical segment %s", 
historicalStart)
+}
+
+func openTestTSDBWithInterval(t *testing.T, tmpPath, groupName string, ir 
storage.IntervalRule) storage.TSDB[*tsTable, option] {
+       t.Helper()
+       opts := storage.TSDBOpts[*tsTable, option]{
+               ShardNum:        1,
+               Location:        filepath.Join(tmpPath, "tab"),
+               TSTableCreator:  newTSTable,
+               SegmentInterval: ir,
+               TTL:             storage.IntervalRule{Unit: ir.Unit, Num: 60},
+               Option:          option{protector: protector.Nop{}, 
mergePolicy: newDefaultMergePolicyForTesting()},
+       }
+       require.NoError(t, os.MkdirAll(opts.Location, storage.DirPerm))
+       ctx := common.SetPosition(
+               context.WithValue(context.Background(), logger.ContextKey, 
logger.GetLogger("test-offgrid-stream")),
+               func(p common.Position) common.Position {
+                       p.Database = "test-offgrid-stream"
+                       return p
+               },
+       )
+       db, err := storage.OpenTSDB[*tsTable, option](ctx, opts, nil, groupName)
+       require.NoError(t, err)
+       return db
+}
diff --git a/banyand/trace/write_data.go b/banyand/trace/write_data.go
index 16c989b52..7a57ba4ce 100644
--- a/banyand/trace/write_data.go
+++ b/banyand/trace/write_data.go
@@ -288,7 +288,8 @@ func (s *syncChunkCallback) CreatePartHandler(ctx *queue.ChunkedSyncPartContext)
                s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to 
load TSDB for group")
                return nil, err
        }
-       segmentTime := time.Unix(0, ctx.MinTimestamp)
+       // Align to the segment grid so part-chunk sync and sidx land in the same segment.
+       segmentTime := tsdb.SegmentInterval().Standard(time.Unix(0, 
ctx.MinTimestamp))
        segment, err := tsdb.CreateSegmentIfNotExist(segmentTime)
        if err != nil {
                s.l.Error().Err(err).Str("group", 
ctx.Group).Time("segmentTime", segmentTime).Msg("failed to create segment")
diff --git a/banyand/trace/write_data_segmentref_test.go b/banyand/trace/write_data_segmentref_test.go
index f332141d8..d170bb3cd 100644
--- a/banyand/trace/write_data_segmentref_test.go
+++ b/banyand/trace/write_data_segmentref_test.go
@@ -213,6 +213,12 @@ func openTestTSDBForRefTest(t *testing.T, tmpPath string, openShardCount *atomic
 //
 // This complements TestSyncReceiver_SegmentRefOwnership, which builds
 // syncPartContext manually. Together they cover every line of the fix.
+//
+// segTime must use time.Local: the callback path runs time.Unix(0,ns)
+// -> Standard -> CreateSegmentIfNotExist in Local tz. If the warmup
+// segment were created in UTC, the two calls could represent the same
+// instant but format to different segment-suffix strings, causing a
+// directory-name mismatch instead of reusing the same segment.
 func TestSyncChunkCallback_CreatePartHandler_StoresSegment(t *testing.T) {
        tmpPath, cleanup := test.Space(require.New(t))
        defer cleanup()
@@ -221,7 +227,7 @@ func TestSyncChunkCallback_CreatePartHandler_StoresSegment(t *testing.T) {
        db := openTestTSDBForRefTest(t, tmpPath, &openShardCount)
        defer db.Close()
 
-       segTime := time.Date(2026, 4, 17, 0, 0, 0, 0, time.UTC)
+       segTime := time.Date(2026, 4, 17, 0, 0, 0, 0, time.Local)
 
        // Drive the segment into the idle-closed state (refCount=0) so that the
        // probe at the end of this test reliably exercises the `initialize`
@@ -352,3 +358,131 @@ type fakeGroup struct {
 
 func (f *fakeGroup) GetSchema() *commonv1.Group { return nil }
 func (f *fakeGroup) SupplyTSDB() io.Closer      { return f.tsdb }
+
+// TestSyncChunkCallback_CreatePartHandler_AlignsOffGridMinTimestamp covers
+// the regression where the trace part chunk-sync receiver passed the raw
+// MinTimestamp straight into CreateSegmentIfNotExist. See the analogous
+// measure test for full rationale.
+func TestSyncChunkCallback_CreatePartHandler_AlignsOffGridMinTimestamp(t 
*testing.T) {
+       t.Run("DAY(1)", func(t *testing.T) {
+               runTraceOffGridCase(t, storage.IntervalRule{Unit: storage.DAY, 
Num: 1})
+       })
+       t.Run("DAY(5)", func(t *testing.T) {
+               runTraceOffGridCase(t, storage.IntervalRule{Unit: storage.DAY, 
Num: 5})
+       })
+}
+
+func runTraceOffGridCase(t *testing.T, ir storage.IntervalRule) {
+       t.Helper()
+       tmpPath, cleanup := test.Space(require.New(t))
+       defer cleanup()
+
+       const groupName = "off-grid-trace"
+       db := openTestTSDBWithInterval(t, tmpPath, groupName, ir)
+       defer db.Close()
+
+       rawTS := time.Date(2026, 5, 15, 6, 0, 0, 0, time.Local)
+       wantStart := ir.Standard(rawTS)
+
+       callback := &syncChunkCallback{
+               l:          logger.GetLogger("test-offgrid-trace"),
+               schemaRepo: newTestSchemaRepo(db, groupName),
+       }
+       handler, err := 
callback.CreatePartHandler(&queue.ChunkedSyncPartContext{
+               ID:           1,
+               Group:        groupName,
+               ShardID:      0,
+               MinTimestamp: rawTS.UnixNano(),
+               MaxTimestamp: rawTS.UnixNano(),
+       })
+       require.NoError(t, err)
+       defer func() { _ = handler.(*syncPartContext).Close() }()
+       gotStart := handler.(*syncPartContext).segment.GetTimeRange().Start
+       require.True(t, gotStart.Equal(wantStart),
+               "REGRESSION: raw MinTimestamp=%s landed in segment start=%s; 
expected aligned %s (ir=%+v)",
+               rawTS, gotStart, wantStart, ir)
+       require.False(t, rawTS.Equal(wantStart),
+               "test setup degenerate: rawTS already aligned")
+}
+
+// TestSegmentCreateTS_ConsistencyAcrossPaths is the trace variant of the
+// Path-A vs Path-B convergence demo. A pre-existing aligned historical
+// segment must NOT be touched; both new writes must land in the same
+// new bucket.
+func TestSegmentCreateTS_ConsistencyAcrossPaths(t *testing.T) {
+       tmpPath, cleanup := test.Space(require.New(t))
+       defer cleanup()
+
+       const groupName = "consistency-trace"
+       ir := storage.IntervalRule{Unit: storage.DAY, Num: 5}
+       db := openTestTSDBWithInterval(t, tmpPath, groupName, ir)
+       defer db.Close()
+
+       historicalStart := ir.Standard(time.Date(2026, 4, 10, 0, 0, 0, 0, 
time.Local))
+       histSeg, err := db.CreateSegmentIfNotExist(historicalStart)
+       require.NoError(t, err)
+       histSeg.DecRef()
+       histSeg.DecRef()
+
+       rawTS := time.Date(2026, 5, 15, 6, 0, 0, 0, time.Local)
+       wantNewStart := ir.Standard(rawTS)
+       require.False(t, wantNewStart.Equal(historicalStart))
+       require.False(t, wantNewStart.Equal(rawTS))
+
+       repo := newTestSchemaRepo(db, groupName)
+
+       chunkCallback := &syncChunkCallback{l: 
logger.GetLogger("test-trace-pathA"), schemaRepo: repo}
+       handlerA, err := 
chunkCallback.CreatePartHandler(&queue.ChunkedSyncPartContext{
+               ID:           1,
+               Group:        groupName,
+               ShardID:      0,
+               MinTimestamp: rawTS.UnixNano(),
+               MaxTimestamp: rawTS.UnixNano(),
+       })
+       require.NoError(t, err)
+       defer func() { _ = handlerA.(*syncPartContext).Close() }()
+       gotStartA := handlerA.(*syncPartContext).segment.GetTimeRange().Start
+
+       seriesCallback := &syncSeriesCallback{l: 
logger.GetLogger("test-trace-pathB"), schemaRepo: repo}
+       handlerB, err := 
seriesCallback.CreatePartHandler(&queue.ChunkedSyncPartContext{
+               ID:           2,
+               Group:        groupName,
+               ShardID:      0,
+               MinTimestamp: wantNewStart.UnixNano(),
+               MaxTimestamp: wantNewStart.UnixNano(),
+       })
+       require.NoError(t, err)
+       defer func() { _ = handlerB.(*syncSeriesContext).Close() }()
+       gotStartB := handlerB.(*syncSeriesContext).segment.GetTimeRange().Start
+
+       require.True(t, gotStartA.Equal(gotStartB),
+               "REGRESSION: trace Path A (raw %s -> %s) and Path B (aligned %s 
-> %s) diverged",
+               rawTS, gotStartA, wantNewStart, gotStartB)
+       require.True(t, gotStartA.Equal(wantNewStart),
+               "path A start %s != expected aligned %s", gotStartA, 
wantNewStart)
+       require.False(t, gotStartA.Equal(historicalStart),
+               "new write must NOT land in historical segment %s", 
historicalStart)
+}
+
+func openTestTSDBWithInterval(t *testing.T, tmpPath, groupName string, ir 
storage.IntervalRule) storage.TSDB[*tsTable, option] {
+       t.Helper()
+       opts := storage.TSDBOpts[*tsTable, option]{
+               ShardNum:        1,
+               Location:        filepath.Join(tmpPath, "tab"),
+               TSTableCreator:  newTSTable,
+               SegmentInterval: ir,
+               TTL:             storage.IntervalRule{Unit: ir.Unit, Num: 60},
+               Option:          option{protector: protector.Nop{}, 
mergePolicy: newDefaultMergePolicyForTesting()},
+       }
+       require.NoError(t, os.MkdirAll(opts.Location, storage.DirPerm))
+       ctx := common.SetPosition(
+               context.WithValue(context.Background(), logger.ContextKey, 
logger.GetLogger("test-offgrid-trace")),
+               func(p common.Position) common.Position {
+                       p.Database = "test-offgrid-trace"
+                       return p
+               },
+       )
+       db, err := storage.OpenTSDB[*tsTable, option](ctx, opts, nil, groupName)
+       require.NoError(t, err)
+       return db
+}

Reply via email to