This is an automated email from the ASF dual-hosted git repository. ButterBright pushed a commit to branch v0.10.x in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit d16421377cf444efcd0c243c0dd9f4de951e1475 Author: mrproliu <[email protected]> AuthorDate: Mon May 18 16:34:46 2026 +0800 align part sync timestamp to segment grid so part and sidx land in the same segment (#1136) --- banyand/internal/storage/segment.go | 6 + banyand/internal/storage/storage.go | 2 + banyand/internal/storage/tsdb.go | 4 + banyand/measure/write_data.go | 3 +- banyand/measure/write_data_segmentref_test.go | 168 +++++++++++++++++++++++++- banyand/stream/write_data.go | 3 +- banyand/stream/write_data_segmentref_test.go | 136 ++++++++++++++++++++- banyand/trace/write_data.go | 3 +- banyand/trace/write_data_segmentref_test.go | 136 ++++++++++++++++++++- 9 files changed, 455 insertions(+), 6 deletions(-) diff --git a/banyand/internal/storage/segment.go b/banyand/internal/storage/segment.go index bd8c5c705..75a2994b0 100644 --- a/banyand/internal/storage/segment.go +++ b/banyand/internal/storage/segment.go @@ -400,6 +400,12 @@ func (sc *segmentController[T, O]) getOptions() *TSDBOpts[T, O] { return sc.opts } +func (sc *segmentController[T, O]) getSegmentInterval() IntervalRule { + sc.optsMutex.RLock() + defer sc.optsMutex.RUnlock() + return sc.opts.SegmentInterval +} + func (sc *segmentController[T, O]) updateOptions(resourceOpts *commonv1.ResourceOpts) { sc.optsMutex.Lock() defer sc.optsMutex.Unlock() diff --git a/banyand/internal/storage/storage.go b/banyand/internal/storage/storage.go index 5e54012ec..fc33a1ffa 100644 --- a/banyand/internal/storage/storage.go +++ b/banyand/internal/storage/storage.go @@ -112,6 +112,8 @@ type TSDB[T TSTable, O any] interface { io.Closer CreateSegmentIfNotExist(ts time.Time) (Segment[T, O], error) SelectSegments(timeRange timestamp.TimeRange) ([]Segment[T, O], error) + // SegmentInterval returns the current segment interval rule. + SegmentInterval() IntervalRule Tick(ts int64) UpdateOptions(opts *commonv1.ResourceOpts) TakeFileSnapshot(dst string) (bool, error) diff --git a/banyand/internal/storage/tsdb.go b/banyand/internal/storage/tsdb.go index 22f26569e..341db79e6 100644 --- a/banyand/internal/storage/tsdb.go +++ b/banyand/internal/storage/tsdb.go @@ -275,6 +275,10 @@ func (d *database[T, O]) SelectSegments(timeRange timestamp.TimeRange) ([]Segmen return d.segmentController.selectSegments(timeRange) } +func (d *database[T, O]) SegmentInterval() IntervalRule { + return d.segmentController.getSegmentInterval() +} + func (d *database[T, O]) UpdateOptions(resourceOpts *commonv1.ResourceOpts) { if d.closed.Load() { return diff --git a/banyand/measure/write_data.go b/banyand/measure/write_data.go index c37b52e60..c6079f365 100644 --- a/banyand/measure/write_data.go +++ b/banyand/measure/write_data.go @@ -117,7 +117,8 @@ func (s *syncCallback) CreatePartHandler(ctx *queue.ChunkedSyncPartContext) (que s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to load TSDB for group") return nil, err } - segmentTime := time.Unix(0, ctx.MinTimestamp) + // Align to the segment grid so part-chunk sync and sidx land in the same segment. + segmentTime := tsdb.SegmentInterval().Standard(time.Unix(0, ctx.MinTimestamp)) segment, err := tsdb.CreateSegmentIfNotExist(segmentTime) if err != nil { s.l.Error().Err(err).Str("group", ctx.Group).Time("segmentTime", segmentTime).Msg("failed to create segment") diff --git a/banyand/measure/write_data_segmentref_test.go b/banyand/measure/write_data_segmentref_test.go index 0606cc204..b71f5e742 100644 --- a/banyand/measure/write_data_segmentref_test.go +++ b/banyand/measure/write_data_segmentref_test.go @@ -122,7 +122,10 @@ func TestSyncChunkCallback_CreatePartHandler_StoresSegment(t *testing.T) { db := openTestTSDBForRefTest(t, tmpPath, &openShardCount) defer db.Close() - segTime := time.Date(2026, 4, 17, 0, 0, 0, 0, time.UTC) + // CreatePartHandler reinterprets ctx.MinTimestamp via time.Unix in Local + // tz before alignment; keep the warmup segment on the same tz so both + // resolve to the same segment instant. + segTime := time.Date(2026, 4, 17, 0, 0, 0, 0, time.Local) warmup, err := db.CreateSegmentIfNotExist(segTime) require.NoError(t, err) @@ -248,3 +251,166 @@ type fakeGroup struct { func (f *fakeGroup) GetSchema() *commonv1.Group { return nil } func (f *fakeGroup) SupplyTSDB() io.Closer { return f.tsdb } + +// TestSyncChunkCallback_CreatePartHandler_AlignsOffGridMinTimestamp covers the +// regression where the part chunk-sync receiver passed the raw MinTimestamp +// straight into CreateSegmentIfNotExist. Real data points carry a timestamp +// anywhere inside a segment window, while the matching sidx Insert/Update +// arrives from liaison with a grid-aligned start. Without alignment here, +// part and sidx end up in different segments. The fix is to feed every +// raw MinTimestamp through TSDB.SegmentInterval().Standard before creating +// the segment so both paths converge on the same aligned start. +func TestSyncChunkCallback_CreatePartHandler_AlignsOffGridMinTimestamp(t *testing.T) { + t.Run("DAY(1)", func(t *testing.T) { + runOffGridCase(t, storage.IntervalRule{Unit: storage.DAY, Num: 1}) + }) + t.Run("DAY(5)", func(t *testing.T) { + runOffGridCase(t, storage.IntervalRule{Unit: storage.DAY, Num: 5}) + }) +} + +func runOffGridCase(t *testing.T, ir storage.IntervalRule) { + t.Helper() + tmpPath, cleanup := test.Space(require.New(t)) + defer cleanup() + + const groupName = "off-grid-group" + db := openTestTSDBWithInterval(t, tmpPath, groupName, ir) + defer db.Close() + + // Pick a raw timestamp deliberately off-grid: an arbitrary 06:00 in local + // time inside the current grid bucket. Without the alignment fix the + // receiver would build a segment whose Start equals this raw ts (modulo + // rotation to a 0-second wall instant), which never matches the Standard + // grid that liaison-side sidx uses. + rawTS := time.Date(2026, 5, 15, 6, 0, 0, 0, time.Local) + wantStart := ir.Standard(rawTS) + + callback := &syncCallback{ + l: logger.GetLogger("test-offgrid"), + schemaRepo: newTestSchemaRepo(db, groupName), + } + ctx := &queue.ChunkedSyncPartContext{ + ID: 1, + Group: groupName, + ShardID: 0, + MinTimestamp: rawTS.UnixNano(), + MaxTimestamp: rawTS.UnixNano(), + } + handler, err := callback.CreatePartHandler(ctx) + require.NoError(t, err) + defer func() { _ = handler.(*syncPartContext).Close() }() + partCtx := handler.(*syncPartContext) + require.NotNil(t, partCtx.segment) + + gotStart := partCtx.segment.GetTimeRange().Start + require.True(t, gotStart.Equal(wantStart), + "REGRESSION: raw MinTimestamp=%s landed in segment start=%s; expected aligned start=%s (ir=%+v)", + rawTS, gotStart, wantStart, ir) + // Sanity: assert the raw ts itself would NOT have produced this start — + // otherwise the test passes vacuously when raw happens to land on grid. + require.False(t, rawTS.Equal(wantStart), + "test setup is degenerate: rawTS already aligned, cannot detect missing Standard call") +} + +// TestSegmentCreateTS_ConsistencyAcrossPaths verifies that both the part +// chunk-sync receiver (Path A, raw ts straight from a part's MinTimestamp) +// and the sidx series-sync receiver (Path B, ts pre-standardized at the +// liaison) converge on the SAME segment Start. A pre-existing aligned +// segment in an older grid bucket simulates historical data on disk — it +// must NOT be touched, and the two paths' new writes must land in the +// same new bucket as each other. +// +// This is the regression demo for the PR #1120 fault where Path A used +// raw ts while Path B (sidx via liaison) used Standard ts, so the same +// real datapoint produced part data in one segment and sidx data in +// another. +func TestSegmentCreateTS_ConsistencyAcrossPaths(t *testing.T) { + tmpPath, cleanup := test.Space(require.New(t)) + defer cleanup() + + const groupName = "consistency-group" + ir := storage.IntervalRule{Unit: storage.DAY, Num: 5} + db := openTestTSDBWithInterval(t, tmpPath, groupName, ir) + defer db.Close() + + // Historical segment: pre-create an aligned segment in a much older grid + // bucket so the test cluster looks like it has prior data on disk. + historicalRaw := time.Date(2026, 4, 10, 0, 0, 0, 0, time.Local) + historicalStart := ir.Standard(historicalRaw) + histSeg, err := db.CreateSegmentIfNotExist(historicalStart) + require.NoError(t, err) + histSeg.DecRef() + histSeg.DecRef() // refCount->0, idle-closed + + // New raw datapoint with an off-grid (06:00 inside a 5-day bucket) ts. + // liaison-side sidx pre-standardizes it; data-node-side part chunk-sync + // also runs it through Standard now. + rawTS := time.Date(2026, 5, 15, 6, 0, 0, 0, time.Local) + wantNewStart := ir.Standard(rawTS) + require.False(t, wantNewStart.Equal(historicalStart), + "test setup degenerate: new bucket coincides with historical bucket") + require.False(t, wantNewStart.Equal(rawTS), + "test setup degenerate: rawTS already on grid, cannot detect Standard") + + repo := newTestSchemaRepo(db, groupName) + + // Path A: part chunk-sync receiver gets the raw MinTimestamp from the + // transferred part's metadata. Aligned internally by the fix. + chunkCallback := &syncCallback{l: logger.GetLogger("test-pathA"), schemaRepo: repo} + handlerA, err := chunkCallback.CreatePartHandler(&queue.ChunkedSyncPartContext{ + ID: 1, + Group: groupName, + ShardID: 0, + MinTimestamp: rawTS.UnixNano(), + MaxTimestamp: rawTS.UnixNano(), + }) + require.NoError(t, err) + defer func() { _ = handlerA.(*syncPartContext).Close() }() + gotStartA := handlerA.(*syncPartContext).segment.GetTimeRange().Start + + // Path B: sidx series-sync receiver gets a MinTimestamp the liaison has + // already aligned via Standard. The handler uses it as-is. + seriesCallback := &syncSeriesCallback{l: logger.GetLogger("test-pathB"), schemaRepo: repo} + handlerB, err := seriesCallback.CreatePartHandler(&queue.ChunkedSyncPartContext{ + ID: 2, + Group: groupName, + ShardID: 0, + MinTimestamp: wantNewStart.UnixNano(), + MaxTimestamp: wantNewStart.UnixNano(), + }) + require.NoError(t, err) + defer func() { _ = handlerB.(*syncSeriesContext).Close() }() + gotStartB := handlerB.(*syncSeriesContext).segment.GetTimeRange().Start + + require.True(t, gotStartA.Equal(gotStartB), + "REGRESSION: Path A (raw ts %s -> segment %s) and Path B (aligned ts %s -> segment %s) diverged", + rawTS, gotStartA, wantNewStart, gotStartB) + require.True(t, gotStartA.Equal(wantNewStart), + "path A segment start %s != expected aligned %s", gotStartA, wantNewStart) + require.False(t, gotStartA.Equal(historicalStart), + "new write must NOT land in historical segment %s", historicalStart) +} + +func openTestTSDBWithInterval(t *testing.T, tmpPath, groupName string, ir storage.IntervalRule) storage.TSDB[*tsTable, option] { + t.Helper() + opts := storage.TSDBOpts[*tsTable, option]{ + ShardNum: 1, + Location: filepath.Join(tmpPath, "tab"), + TSTableCreator: newTSTable, + SegmentInterval: ir, + TTL: storage.IntervalRule{Unit: ir.Unit, Num: 60}, + Option: option{protector: protector.Nop{}, mergePolicy: newDefaultMergePolicyForTesting()}, + } + require.NoError(t, os.MkdirAll(opts.Location, storage.DirPerm)) + ctx := common.SetPosition( + context.WithValue(context.Background(), logger.ContextKey, logger.GetLogger("test-offgrid")), + func(p common.Position) common.Position { + p.Database = "test-offgrid" + return p + }, + ) + db, err := storage.OpenTSDB[*tsTable, option](ctx, opts, nil, groupName) + require.NoError(t, err) + return db +} diff --git a/banyand/stream/write_data.go b/banyand/stream/write_data.go index 76f4e4ced..8506a36e7 100644 --- a/banyand/stream/write_data.go +++ b/banyand/stream/write_data.go @@ -116,7 +116,8 @@ func (s *syncCallback) CreatePartHandler(ctx *queue.ChunkedSyncPartContext) (que s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to load TSDB for group") return nil, err } - segmentTime := time.Unix(0, ctx.MinTimestamp) + // Align to the segment grid so part-chunk sync and sidx land in the same segment. + segmentTime := tsdb.SegmentInterval().Standard(time.Unix(0, ctx.MinTimestamp)) segment, err := tsdb.CreateSegmentIfNotExist(segmentTime) if err != nil { s.l.Error().Err(err).Str("group", ctx.Group).Time("segmentTime", segmentTime).Msg("failed to create segment") diff --git a/banyand/stream/write_data_segmentref_test.go b/banyand/stream/write_data_segmentref_test.go index 81784ac1c..509d333b9 100644 --- a/banyand/stream/write_data_segmentref_test.go +++ b/banyand/stream/write_data_segmentref_test.go @@ -114,6 +114,12 @@ func TestSyncReceiver_SegmentRefOwnership(t *testing.T) { // syncCallback.CreatePartHandler through a minimal schemaRepo shim and // asserts that segment ownership transfers to partCtx and that Close // actually DecRefs. See trace package for rationale. +// In each StoresSegment test, segTime must use time.Local because the +// callback path runs time.Unix(0,ns) -> Standard -> CreateSegmentIfNotExist +// in Local tz. If the warmup segment were created in UTC, the two calls +// could represent the same instant but format to different segment-suffix +// strings, causing a directory-name mismatch instead of reusing the same +// segment. func TestSyncChunkCallback_CreatePartHandler_StoresSegment(t *testing.T) { tmpPath, cleanup := test.Space(require.New(t)) defer cleanup() @@ -122,7 +128,7 @@ func TestSyncChunkCallback_CreatePartHandler_StoresSegment(t *testing.T) { db := openTestTSDBForRefTest(t, tmpPath, &openShardCount) defer db.Close() - segTime := time.Date(2026, 4, 17, 0, 0, 0, 0, time.UTC) + segTime := time.Date(2026, 4, 17, 0, 0, 0, 0, time.Local) warmup, err := db.CreateSegmentIfNotExist(segTime) require.NoError(t, err) @@ -246,3 +252,131 @@ type fakeGroup struct { func (f *fakeGroup) GetSchema() *commonv1.Group { return nil } func (f *fakeGroup) SupplyTSDB() io.Closer { return f.tsdb } + +// TestSyncChunkCallback_CreatePartHandler_AlignsOffGridMinTimestamp covers +// the regression where the stream part chunk-sync receiver passed the raw +// MinTimestamp straight into CreateSegmentIfNotExist. See the analogous +// measure test for full rationale. +func TestSyncChunkCallback_CreatePartHandler_AlignsOffGridMinTimestamp(t *testing.T) { + t.Run("DAY(1)", func(t *testing.T) { + runStreamOffGridCase(t, storage.IntervalRule{Unit: storage.DAY, Num: 1}) + }) + t.Run("DAY(5)", func(t *testing.T) { + runStreamOffGridCase(t, storage.IntervalRule{Unit: storage.DAY, Num: 5}) + }) +} + +func runStreamOffGridCase(t *testing.T, ir storage.IntervalRule) { + t.Helper() + tmpPath, cleanup := test.Space(require.New(t)) + defer cleanup() + + const groupName = "off-grid-stream" + db := openTestTSDBWithInterval(t, tmpPath, groupName, ir) + defer db.Close() + + rawTS := time.Date(2026, 5, 15, 6, 0, 0, 0, time.Local) + wantStart := ir.Standard(rawTS) + + callback := &syncCallback{ + l: logger.GetLogger("test-offgrid-stream"), + schemaRepo: newTestSchemaRepo(db, groupName), + } + handler, err := callback.CreatePartHandler(&queue.ChunkedSyncPartContext{ + ID: 1, + Group: groupName, + ShardID: 0, + MinTimestamp: rawTS.UnixNano(), + MaxTimestamp: rawTS.UnixNano(), + }) + require.NoError(t, err) + defer func() { _ = handler.(*syncPartContext).Close() }() + gotStart := handler.(*syncPartContext).segment.GetTimeRange().Start + require.True(t, gotStart.Equal(wantStart), + "REGRESSION: raw MinTimestamp=%s landed in segment start=%s; expected aligned %s (ir=%+v)", + rawTS, gotStart, wantStart, ir) + require.False(t, rawTS.Equal(wantStart), + "test setup degenerate: rawTS already aligned") +} + +// TestSegmentCreateTS_ConsistencyAcrossPaths is the stream variant of the +// Path-A vs Path-B convergence demo. A pre-existing aligned historical +// segment must NOT be touched; both new writes must land in the same +// new bucket. +func TestSegmentCreateTS_ConsistencyAcrossPaths(t *testing.T) { + tmpPath, cleanup := test.Space(require.New(t)) + defer cleanup() + + const groupName = "consistency-stream" + ir := storage.IntervalRule{Unit: storage.DAY, Num: 5} + db := openTestTSDBWithInterval(t, tmpPath, groupName, ir) + defer db.Close() + + historicalStart := ir.Standard(time.Date(2026, 4, 10, 0, 0, 0, 0, time.Local)) + histSeg, err := db.CreateSegmentIfNotExist(historicalStart) + require.NoError(t, err) + histSeg.DecRef() + histSeg.DecRef() + + rawTS := time.Date(2026, 5, 15, 6, 0, 0, 0, time.Local) + wantNewStart := ir.Standard(rawTS) + require.False(t, wantNewStart.Equal(historicalStart)) + require.False(t, wantNewStart.Equal(rawTS)) + + repo := newTestSchemaRepo(db, groupName) + + chunkCallback := &syncCallback{l: logger.GetLogger("test-stream-pathA"), schemaRepo: repo} + handlerA, err := chunkCallback.CreatePartHandler(&queue.ChunkedSyncPartContext{ + ID: 1, + Group: groupName, + ShardID: 0, + MinTimestamp: rawTS.UnixNano(), + MaxTimestamp: rawTS.UnixNano(), + }) + require.NoError(t, err) + defer func() { _ = handlerA.(*syncPartContext).Close() }() + gotStartA := handlerA.(*syncPartContext).segment.GetTimeRange().Start + + seriesCallback := &syncSeriesCallback{l: logger.GetLogger("test-stream-pathB"), schemaRepo: repo} + handlerB, err := seriesCallback.CreatePartHandler(&queue.ChunkedSyncPartContext{ + ID: 2, + Group: groupName, + ShardID: 0, + MinTimestamp: wantNewStart.UnixNano(), + MaxTimestamp: wantNewStart.UnixNano(), + }) + require.NoError(t, err) + defer func() { _ = handlerB.(*syncSeriesContext).Close() }() + gotStartB := handlerB.(*syncSeriesContext).segment.GetTimeRange().Start + + require.True(t, gotStartA.Equal(gotStartB), + "REGRESSION: stream Path A (raw %s -> %s) and Path B (aligned %s -> %s) diverged", + rawTS, gotStartA, wantNewStart, gotStartB) + require.True(t, gotStartA.Equal(wantNewStart), + "path A start %s != expected aligned %s", gotStartA, wantNewStart) + require.False(t, gotStartA.Equal(historicalStart), + "new write must NOT land in historical segment %s", historicalStart) +} + +func openTestTSDBWithInterval(t *testing.T, tmpPath, groupName string, ir storage.IntervalRule) storage.TSDB[*tsTable, option] { + t.Helper() + opts := storage.TSDBOpts[*tsTable, option]{ + ShardNum: 1, + Location: filepath.Join(tmpPath, "tab"), + TSTableCreator: newTSTable, + SegmentInterval: ir, + TTL: storage.IntervalRule{Unit: ir.Unit, Num: 60}, + Option: option{protector: protector.Nop{}, mergePolicy: newDefaultMergePolicyForTesting()}, + } + require.NoError(t, os.MkdirAll(opts.Location, storage.DirPerm)) + ctx := common.SetPosition( + context.WithValue(context.Background(), logger.ContextKey, logger.GetLogger("test-offgrid-stream")), + func(p common.Position) common.Position { + p.Database = "test-offgrid-stream" + return p + }, + ) + db, err := storage.OpenTSDB[*tsTable, option](ctx, opts, nil, groupName) + require.NoError(t, err) + return db +} diff --git a/banyand/trace/write_data.go b/banyand/trace/write_data.go index 16c989b52..7a57ba4ce 100644 --- a/banyand/trace/write_data.go +++ b/banyand/trace/write_data.go @@ -288,7 +288,8 @@ func (s *syncChunkCallback) CreatePartHandler(ctx *queue.ChunkedSyncPartContext) s.l.Error().Err(err).Str("group", ctx.Group).Msg("failed to load TSDB for group") return nil, err } - segmentTime := time.Unix(0, ctx.MinTimestamp) + // Align to the segment grid so part-chunk sync and sidx land in the same segment. + segmentTime := tsdb.SegmentInterval().Standard(time.Unix(0, ctx.MinTimestamp)) segment, err := tsdb.CreateSegmentIfNotExist(segmentTime) if err != nil { s.l.Error().Err(err).Str("group", ctx.Group).Time("segmentTime", segmentTime).Msg("failed to create segment") diff --git a/banyand/trace/write_data_segmentref_test.go b/banyand/trace/write_data_segmentref_test.go index f332141d8..d170bb3cd 100644 --- a/banyand/trace/write_data_segmentref_test.go +++ b/banyand/trace/write_data_segmentref_test.go @@ -213,6 +213,12 @@ func openTestTSDBForRefTest(t *testing.T, tmpPath string, openShardCount *atomic // // This complements TestSyncReceiver_SegmentRefOwnership, which builds // syncPartContext manually. Together they cover every line of the fix. +// +// segTime must use time.Local: the callback path runs time.Unix(0,ns) +// -> Standard -> CreateSegmentIfNotExist in Local tz. If the warmup +// segment were created in UTC, the two calls could represent the same +// instant but format to different segment-suffix strings, causing a +// directory-name mismatch instead of reusing the same segment. func TestSyncChunkCallback_CreatePartHandler_StoresSegment(t *testing.T) { tmpPath, cleanup := test.Space(require.New(t)) defer cleanup() @@ -221,7 +227,7 @@ func TestSyncChunkCallback_CreatePartHandler_StoresSegment(t *testing.T) { db := openTestTSDBForRefTest(t, tmpPath, &openShardCount) defer db.Close() - segTime := time.Date(2026, 4, 17, 0, 0, 0, 0, time.UTC) + segTime := time.Date(2026, 4, 17, 0, 0, 0, 0, time.Local) // Drive the segment into the idle-closed state (refCount=0) so that the // probe at the end of this test reliably exercises the `initialize` @@ -352,3 +358,131 @@ type fakeGroup struct { func (f *fakeGroup) GetSchema() *commonv1.Group { return nil } func (f *fakeGroup) SupplyTSDB() io.Closer { return f.tsdb } + +// TestSyncChunkCallback_CreatePartHandler_AlignsOffGridMinTimestamp covers +// the regression where the trace part chunk-sync receiver passed the raw +// MinTimestamp straight into CreateSegmentIfNotExist. See the analogous +// measure test for full rationale. +func TestSyncChunkCallback_CreatePartHandler_AlignsOffGridMinTimestamp(t *testing.T) { + t.Run("DAY(1)", func(t *testing.T) { + runTraceOffGridCase(t, storage.IntervalRule{Unit: storage.DAY, Num: 1}) + }) + t.Run("DAY(5)", func(t *testing.T) { + runTraceOffGridCase(t, storage.IntervalRule{Unit: storage.DAY, Num: 5}) + }) +} + +func runTraceOffGridCase(t *testing.T, ir storage.IntervalRule) { + t.Helper() + tmpPath, cleanup := test.Space(require.New(t)) + defer cleanup() + + const groupName = "off-grid-trace" + db := openTestTSDBWithInterval(t, tmpPath, groupName, ir) + defer db.Close() + + rawTS := time.Date(2026, 5, 15, 6, 0, 0, 0, time.Local) + wantStart := ir.Standard(rawTS) + + callback := &syncChunkCallback{ + l: logger.GetLogger("test-offgrid-trace"), + schemaRepo: newTestSchemaRepo(db, groupName), + } + handler, err := callback.CreatePartHandler(&queue.ChunkedSyncPartContext{ + ID: 1, + Group: groupName, + ShardID: 0, + MinTimestamp: rawTS.UnixNano(), + MaxTimestamp: rawTS.UnixNano(), + }) + require.NoError(t, err) + defer func() { _ = handler.(*syncPartContext).Close() }() + gotStart := handler.(*syncPartContext).segment.GetTimeRange().Start + require.True(t, gotStart.Equal(wantStart), + "REGRESSION: raw MinTimestamp=%s landed in segment start=%s; expected aligned %s (ir=%+v)", + rawTS, gotStart, wantStart, ir) + require.False(t, rawTS.Equal(wantStart), + "test setup degenerate: rawTS already aligned") +} + +// TestSegmentCreateTS_ConsistencyAcrossPaths is the trace variant of the +// Path-A vs Path-B convergence demo. A pre-existing aligned historical +// segment must NOT be touched; both new writes must land in the same +// new bucket. +func TestSegmentCreateTS_ConsistencyAcrossPaths(t *testing.T) { + tmpPath, cleanup := test.Space(require.New(t)) + defer cleanup() + + const groupName = "consistency-trace" + ir := storage.IntervalRule{Unit: storage.DAY, Num: 5} + db := openTestTSDBWithInterval(t, tmpPath, groupName, ir) + defer db.Close() + + historicalStart := ir.Standard(time.Date(2026, 4, 10, 0, 0, 0, 0, time.Local)) + histSeg, err := db.CreateSegmentIfNotExist(historicalStart) + require.NoError(t, err) + histSeg.DecRef() + histSeg.DecRef() + + rawTS := time.Date(2026, 5, 15, 6, 0, 0, 0, time.Local) + wantNewStart := ir.Standard(rawTS) + require.False(t, wantNewStart.Equal(historicalStart)) + require.False(t, wantNewStart.Equal(rawTS)) + + repo := newTestSchemaRepo(db, groupName) + + chunkCallback := &syncChunkCallback{l: logger.GetLogger("test-trace-pathA"), schemaRepo: repo} + handlerA, err := chunkCallback.CreatePartHandler(&queue.ChunkedSyncPartContext{ + ID: 1, + Group: groupName, + ShardID: 0, + MinTimestamp: rawTS.UnixNano(), + MaxTimestamp: rawTS.UnixNano(), + }) + require.NoError(t, err) + defer func() { _ = handlerA.(*syncPartContext).Close() }() + gotStartA := handlerA.(*syncPartContext).segment.GetTimeRange().Start + + seriesCallback := &syncSeriesCallback{l: logger.GetLogger("test-trace-pathB"), schemaRepo: repo} + handlerB, err := seriesCallback.CreatePartHandler(&queue.ChunkedSyncPartContext{ + ID: 2, + Group: groupName, + ShardID: 0, + MinTimestamp: wantNewStart.UnixNano(), + MaxTimestamp: wantNewStart.UnixNano(), + }) + require.NoError(t, err) + defer func() { _ = handlerB.(*syncSeriesContext).Close() }() + gotStartB := handlerB.(*syncSeriesContext).segment.GetTimeRange().Start + + require.True(t, gotStartA.Equal(gotStartB), + "REGRESSION: trace Path A (raw %s -> %s) and Path B (aligned %s -> %s) diverged", + rawTS, gotStartA, wantNewStart, gotStartB) + require.True(t, gotStartA.Equal(wantNewStart), + "path A start %s != expected aligned %s", gotStartA, wantNewStart) + require.False(t, gotStartA.Equal(historicalStart), + "new write must NOT land in historical segment %s", historicalStart) +} + +func openTestTSDBWithInterval(t *testing.T, tmpPath, groupName string, ir storage.IntervalRule) storage.TSDB[*tsTable, option] { + t.Helper() + opts := storage.TSDBOpts[*tsTable, option]{ + ShardNum: 1, + Location: filepath.Join(tmpPath, "tab"), + TSTableCreator: newTSTable, + SegmentInterval: ir, + TTL: storage.IntervalRule{Unit: ir.Unit, Num: 60}, + Option: option{protector: protector.Nop{}, mergePolicy: newDefaultMergePolicyForTesting()}, + } + require.NoError(t, os.MkdirAll(opts.Location, storage.DirPerm)) + ctx := common.SetPosition( + context.WithValue(context.Background(), logger.ContextKey, logger.GetLogger("test-offgrid-trace")), + func(p common.Position) common.Position { + p.Database = "test-offgrid-trace" + return p + }, + ) + db, err := storage.OpenTSDB[*tsTable, option](ctx, opts, nil, groupName) + require.NoError(t, err) + return db +}
