This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch v0.10.x in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
commit 468cb5d44349d9d2a09bd8054048ba4c6b6c891e Author: Gao Hongtao <[email protected]> AuthorDate: Thu May 7 07:42:27 2026 +0800 fix(trace): accept out-of-order timestamps within the same traceID (#1114) --- CHANGES.md | 1 + banyand/trace/block_writer.go | 36 ++------------------------- banyand/trace/block_writer_test.go | 50 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 53 insertions(+), 34 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 6356ba45e..c4b2c5cb7 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -31,6 +31,7 @@ Release Notes. - Fix flaky `TestCollectWithPartialClosedSegments` by raising `SegmentIdleTimeout` so wall-clock variance on slow CI does not mark still-open segments as idle. - Fix FODC lifecycle cache poisoning where transient `InspectAll` failures were cached for 10 minutes and masked liaison recovery; raise FODC agent and proxy timeouts from 10s to 40s. - Fix FODC `/cluster/lifecycle` dropping zero-valued group fields (e.g. `replicas=0`, `close=false`) under `encoding/json` + `omitempty`; switch to `protojson` so all fields are emitted (nil nested messages serialize as `null`). +- Fix trace `block_writer` panic on out-of-order timestamps within the same traceID, which dropped one trace-write batch per panic in multi-agent SkyWalking deployments. Spans of a single trace originate from independently-clocked services, and trace storage is organized by traceID rather than timestamp, so per-traceID timestamp monotonicity is not a writer invariant. ### Chores diff --git a/banyand/trace/block_writer.go b/banyand/trace/block_writer.go index c4267dd58..c14ec5d1e 100644 --- a/banyand/trace/block_writer.go +++ b/banyand/trace/block_writer.go @@ -149,11 +149,8 @@ type blockWriter struct { primaryBlockMetadata primaryBlockMetadata totalUncompressedSpanSizeBytes uint64 totalCount uint64 - minTimestamp int64 totalMinTimestamp int64 totalMaxTimestamp int64 - minTimestampLast int64 - maxTimestamp int64 totalBlocksCount uint64 hasWrittenBlocks bool } @@ -169,9 +166,6 @@ func (bw *blockWriter) reset() { } else { bw.tagType.reset() } - bw.minTimestampLast = 0 - bw.minTimestamp = 0 - bw.maxTimestamp = 0 bw.totalUncompressedSpanSizeBytes = 0 bw.totalCount = 0 bw.totalBlocksCount = 0 @@ -230,12 +224,10 @@ func (bw *blockWriter) mustWriteBlock(tid string, b *block) { if tid < bw.tidLast { logger.Panicf("the tid=%s cannot be smaller than the previously written tid=%s", tid, bw.tidLast) } - hasWrittenBlocks := bw.hasWrittenBlocks - if !hasWrittenBlocks { + if !bw.hasWrittenBlocks { bw.tidFirst = tid bw.hasWrittenBlocks = true } - isSeenTid := tid == bw.tidLast bw.tidLast = tid bm := generateBlockMetadata() @@ -251,16 +243,6 @@ func (bw *blockWriter) mustWriteBlock(tid string, b *block) { if bw.totalCount == 0 || tm.max > bw.totalMaxTimestamp { bw.totalMaxTimestamp = tm.max } - if !hasWrittenBlocks || tm.min < bw.minTimestamp { - bw.minTimestamp = tm.min - } - if !hasWrittenBlocks || tm.max > bw.maxTimestamp { - bw.maxTimestamp = tm.max - } - if isSeenTid && tm.min < bw.minTimestampLast { - logger.Panicf("the block for tid=%s cannot contain timestamp smaller than %d, but it contains timestamp %d", tid, bw.minTimestampLast, tm.min) - } - bw.minTimestampLast = tm.min bw.totalUncompressedSpanSizeBytes += bm.uncompressedSpanSizeBytes bw.totalCount += bm.count @@ -280,8 +262,6 @@ func (bw *blockWriter) mustFlushPrimaryBlock(data []byte) { bw.metaData = bw.primaryBlockMetadata.marshal(bw.metaData) } bw.hasWrittenBlocks = false - bw.minTimestamp = 0 - bw.maxTimestamp = 0 bw.tidFirst = "" } @@ -293,12 +273,10 @@ func (bw *blockWriter) mustWriteRawBlock(r *rawBlock) { if bm.traceID < bw.tidLast { logger.Panicf("the tid=%s cannot be smaller than the previously written tid=%s", bm.traceID, bw.tidLast) } - hasWrittenBlocks := bw.hasWrittenBlocks - if !hasWrittenBlocks { + if !bw.hasWrittenBlocks { bw.tidFirst = bm.traceID bw.hasWrittenBlocks = true } - isSeenTid := bm.traceID == bw.tidLast bw.tidLast = bm.traceID if bw.traceIDFilter != nil && bw.traceIDFilter.filter != nil { bw.traceIDFilter.filter.Add(convert.StringToBytes(bm.traceID)) @@ -312,16 +290,6 @@ func (bw *blockWriter) mustWriteRawBlock(r *rawBlock) { if bw.totalCount == 0 || tm.max > bw.totalMaxTimestamp { bw.totalMaxTimestamp = tm.max } - if !hasWrittenBlocks || tm.min < bw.minTimestamp { - bw.minTimestamp = tm.min - } - if !hasWrittenBlocks || tm.max > bw.maxTimestamp { - bw.maxTimestamp = tm.max - } - if isSeenTid && tm.min < bw.minTimestampLast { - logger.Panicf("the block for tid=%s cannot contain timestamp smaller than %d, but it contains timestamp %d", bm.traceID, bw.minTimestampLast, tm.min) - } - bw.minTimestampLast = tm.min bw.totalUncompressedSpanSizeBytes += bm.uncompressedSpanSizeBytes bw.totalCount += bm.count diff --git a/banyand/trace/block_writer_test.go b/banyand/trace/block_writer_test.go index 754b6ec79..053de756d 100644 --- a/banyand/trace/block_writer_test.go +++ b/banyand/trace/block_writer_test.go @@ -165,6 +165,56 @@ func TestMultiplePrimaryBlockFlushWithPartIter(t *testing.T) { } } +// TestBlockWriter_OutOfOrderTimestampsSameTraceID exercises the scenario from +// https://github.com/apache/skywalking/issues/13860: SkyWalking OAP forwards +// segments produced by multiple agents whose wall clocks drift relative to +// each other, so a later batch for the same traceID can carry timestamps +// earlier than an earlier batch. Spans of one trace have no meaningful order +// here — the writer must accept them without panicking. +func TestBlockWriter_OutOfOrderTimestampsSameTraceID(t *testing.T) { + const tid = "6359c73a002c425785500f958cdc4007.661.17779941290647127" + + mp := &memPart{} + mp.reset() + bw := generateBlockWriter() + bw.MustInitForMemPart(mp, 1) + + tagsFor := func(i int) []*tagValue { + return []*tagValue{ + {tag: "service", valueType: pbv1.ValueTypeStr, value: []byte(fmt.Sprintf("svc-%d", i))}, + } + } + mustWrite := func(timestamps []int64) { + spans := make([][]byte, len(timestamps)) + spanIDs := make([]string, len(timestamps)) + tagSets := make([][]*tagValue, len(timestamps)) + for i, ts := range timestamps { + spans[i] = []byte(fmt.Sprintf("span-%d", ts)) + spanIDs[i] = fmt.Sprintf("span-%d", ts) + tagSets[i] = tagsFor(i) + } + bw.MustWriteTrace(tid, spans, tagSets, timestamps, spanIDs) + } + + // First batch: minTS=1_777_994_129_833_000_000. + mustWrite([]int64{1_777_994_129_833_000_000, 1_777_994_129_900_000_000}) + // Second batch for the same tid carries an earlier minTS — this used to + // fail the per-traceID timestamp monotonicity check and discard the batch + // on production traffic. + require.NotPanics(t, func() { + mustWrite([]int64{1_777_994_129_064_000_000}) + }) + + bw.Flush(&mp.partMetadata, &mp.traceIDFilter, &mp.tagType) + releaseBlockWriter(bw) + + assert.Equal(t, uint64(3), mp.partMetadata.TotalCount, "all 3 spans must be persisted") + assert.Equal(t, uint64(2), mp.partMetadata.BlocksCount, "two MustWriteTrace calls produce two blocks") + assert.Equal(t, int64(1_777_994_129_064_000_000), mp.partMetadata.MinTimestamp, + "part-level MinTimestamp aggregates across batches regardless of arrival order") + assert.Equal(t, int64(1_777_994_129_900_000_000), mp.partMetadata.MaxTimestamp) +} + func generateLargeTraceSet() *traces { // maxUncompressedPrimaryBlockSize = 128KB // Block metadata contains: traceID, timestamps, count, uncompressed size, and tag metadata
