This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch fix/trace-block-writer-out-of-order-timestamps
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 2612144b3a2002f03d540cbd88b16e44c4f0866d Author: Hongtao Gao <[email protected]> AuthorDate: Wed May 6 06:42:01 2026 +0000 fix(trace): accept out-of-order timestamps within the same traceID block_writer panicked when a same-traceID block carried tm.min smaller than the previously-written block's tm.min, discarding one trace-write batch per occurrence. SkyWalking traces are composed of segments from multiple Java agents whose wall clocks drift relative to each other, and trace storage is keyed by traceID rather than timestamp, so per-traceID timestamp monotonicity is not a writer invariant. Drop the check and its now-unused minTimestampLast bookkeeping. Add a regression test that mirrors the production traceID/timestamps from https://github.com/apache/skywalking/issues/13860. --- CHANGES.md | 1 + banyand/trace/block_writer.go | 12 --------- banyand/trace/block_writer_test.go | 50 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 51 insertions(+), 12 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index a0c3dc0a9..c1ae0e3b2 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -64,6 +64,7 @@ Release Notes. - Fix flaky `TestCollectWithPartialClosedSegments` by raising `SegmentIdleTimeout` so wall-clock variance on slow CI does not mark still-open segments as idle. - Fix FODC lifecycle cache poisoning where transient `InspectAll` failures were cached for 10 minutes and masked liaison recovery; raise FODC agent and proxy timeouts from 10s to 40s. - Fix FODC `/cluster/lifecycle` dropping zero-valued group fields (e.g. `replicas=0`, `close=false`) under `encoding/json` + `omitempty`; switch to `protojson` so all fields are emitted (nil nested messages serialize as `null`). +- Fix trace `block_writer` panic on out-of-order timestamps within the same traceID, which dropped one trace-write batch per panic in multi-agent SkyWalking deployments. 
Spans of a single trace originate from independently-clocked services, and trace storage is organized by traceID rather than timestamp, so per-traceID timestamp monotonicity is not a writer invariant. ### Chores diff --git a/banyand/trace/block_writer.go b/banyand/trace/block_writer.go index c4267dd58..aed75b954 100644 --- a/banyand/trace/block_writer.go +++ b/banyand/trace/block_writer.go @@ -152,7 +152,6 @@ type blockWriter struct { minTimestamp int64 totalMinTimestamp int64 totalMaxTimestamp int64 - minTimestampLast int64 maxTimestamp int64 totalBlocksCount uint64 hasWrittenBlocks bool @@ -169,7 +168,6 @@ func (bw *blockWriter) reset() { } else { bw.tagType.reset() } - bw.minTimestampLast = 0 bw.minTimestamp = 0 bw.maxTimestamp = 0 bw.totalUncompressedSpanSizeBytes = 0 @@ -235,7 +233,6 @@ func (bw *blockWriter) mustWriteBlock(tid string, b *block) { bw.tidFirst = tid bw.hasWrittenBlocks = true } - isSeenTid := tid == bw.tidLast bw.tidLast = tid bm := generateBlockMetadata() @@ -257,10 +254,6 @@ func (bw *blockWriter) mustWriteBlock(tid string, b *block) { if !hasWrittenBlocks || tm.max > bw.maxTimestamp { bw.maxTimestamp = tm.max } - if isSeenTid && tm.min < bw.minTimestampLast { - logger.Panicf("the block for tid=%s cannot contain timestamp smaller than %d, but it contains timestamp %d", tid, bw.minTimestampLast, tm.min) - } - bw.minTimestampLast = tm.min bw.totalUncompressedSpanSizeBytes += bm.uncompressedSpanSizeBytes bw.totalCount += bm.count @@ -298,7 +291,6 @@ func (bw *blockWriter) mustWriteRawBlock(r *rawBlock) { bw.tidFirst = bm.traceID bw.hasWrittenBlocks = true } - isSeenTid := bm.traceID == bw.tidLast bw.tidLast = bm.traceID if bw.traceIDFilter != nil && bw.traceIDFilter.filter != nil { bw.traceIDFilter.filter.Add(convert.StringToBytes(bm.traceID)) @@ -318,10 +310,6 @@ func (bw *blockWriter) mustWriteRawBlock(r *rawBlock) { if !hasWrittenBlocks || tm.max > bw.maxTimestamp { bw.maxTimestamp = tm.max } - if isSeenTid && tm.min < bw.minTimestampLast { - 
logger.Panicf("the block for tid=%s cannot contain timestamp smaller than %d, but it contains timestamp %d", bm.traceID, bw.minTimestampLast, tm.min) - } - bw.minTimestampLast = tm.min bw.totalUncompressedSpanSizeBytes += bm.uncompressedSpanSizeBytes bw.totalCount += bm.count diff --git a/banyand/trace/block_writer_test.go b/banyand/trace/block_writer_test.go index 754b6ec79..940132399 100644 --- a/banyand/trace/block_writer_test.go +++ b/banyand/trace/block_writer_test.go @@ -165,6 +165,56 @@ func TestMultiplePrimaryBlockFlushWithPartIter(t *testing.T) { } } +// TestBlockWriter_OutOfOrderTimestampsSameTraceID exercises the scenario from +// https://github.com/apache/skywalking/issues/13860: SkyWalking OAP forwards +// segments produced by multiple agents whose wall clocks drift relative to +// each other, so a later batch for the same traceID can carry timestamps +// earlier than an earlier batch. Spans of one trace have no meaningful order +// here — the writer must accept them without panicking. +func TestBlockWriter_OutOfOrderTimestampsSameTraceID(t *testing.T) { + const tid = "6359c73a002c425785500f958cdc4007.661.17779941290647127" + + mp := &memPart{} + mp.reset() + bw := generateBlockWriter() + bw.MustInitForMemPart(mp, 1) + + tagsFor := func(i int) []*tagValue { + return []*tagValue{ + {tag: "service", valueType: pbv1.ValueTypeStr, value: []byte(fmt.Sprintf("svc-%d", i))}, + } + } + mustWrite := func(timestamps []int64) { + spans := make([][]byte, len(timestamps)) + spanIDs := make([]string, len(timestamps)) + tagSets := make([][]*tagValue, len(timestamps)) + for i, ts := range timestamps { + spans[i] = []byte(fmt.Sprintf("span-%d", ts)) + spanIDs[i] = fmt.Sprintf("span-%d", ts) + tagSets[i] = tagsFor(i) + } + bw.MustWriteTrace(tid, spans, tagSets, timestamps, spanIDs) + } + + // First batch: minTS=1_777_994_129_833_000_000. 
+ mustWrite([]int64{1_777_994_129_833_000_000, 1_777_994_129_900_000_000}) + // Second batch for the same tid carries an earlier minTS — this used to + // trip block_writer's `tm.min < bw.minTimestampLast` panic and discard + // the batch on production traffic. + require.NotPanics(t, func() { + mustWrite([]int64{1_777_994_129_064_000_000}) + }) + + bw.Flush(&mp.partMetadata, &mp.traceIDFilter, &mp.tagType) + releaseBlockWriter(bw) + + assert.Equal(t, uint64(3), mp.partMetadata.TotalCount, "all 3 spans must be persisted") + assert.Equal(t, uint64(2), mp.partMetadata.BlocksCount, "two MustWriteTrace calls produce two blocks") + assert.Equal(t, int64(1_777_994_129_064_000_000), mp.partMetadata.MinTimestamp, + "part-level MinTimestamp aggregates across batches regardless of arrival order") + assert.Equal(t, int64(1_777_994_129_900_000_000), mp.partMetadata.MaxTimestamp) +} + func generateLargeTraceSet() *traces { // maxUncompressedPrimaryBlockSize = 128KB // Block metadata contains: traceID, timestamps, count, uncompressed size, and tag metadata
