This is an automated email from the ASF dual-hosted git repository.
hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git
The following commit(s) were added to refs/heads/main by this push:
new e81064021 fix(trace): accept out-of-order timestamps within the same
traceID (#1114)
e81064021 is described below
commit e8106402114f81371f694314ed02b236eda69094
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu May 7 07:42:27 2026 +0800
fix(trace): accept out-of-order timestamps within the same traceID (#1114)
---
CHANGES.md | 1 +
banyand/trace/block_writer.go | 36 ++-------------------------
banyand/trace/block_writer_test.go | 50 ++++++++++++++++++++++++++++++++++++++
3 files changed, 53 insertions(+), 34 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 3195f1d79..1a8a81e63 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -66,6 +66,7 @@ Release Notes.
- Fix flaky `TestCollectWithPartialClosedSegments` by raising
`SegmentIdleTimeout` so wall-clock variance on slow CI does not mark still-open
segments as idle.
- Fix FODC lifecycle cache poisoning where transient `InspectAll` failures
were cached for 10 minutes and masked liaison recovery; raise FODC agent and
proxy timeouts from 10s to 40s.
- Fix FODC `/cluster/lifecycle` dropping zero-valued group fields (e.g.
`replicas=0`, `close=false`) under `encoding/json` + `omitempty`; switch to
`protojson` so all fields are emitted (nil nested messages serialize as `null`).
+- Fix trace `block_writer` panic on out-of-order timestamps within the same traceID, which dropped one trace-write batch per panic in multi-agent SkyWalking deployments. Spans of a single trace originate from independently-clocked services, and trace storage is organized by traceID rather than timestamp, so per-traceID timestamp monotonicity is not a writer invariant.
### Chores
diff --git a/banyand/trace/block_writer.go b/banyand/trace/block_writer.go
index c4267dd58..c14ec5d1e 100644
--- a/banyand/trace/block_writer.go
+++ b/banyand/trace/block_writer.go
@@ -149,11 +149,8 @@ type blockWriter struct {
primaryBlockMetadata primaryBlockMetadata
totalUncompressedSpanSizeBytes uint64
totalCount uint64
- minTimestamp int64
totalMinTimestamp int64
totalMaxTimestamp int64
- minTimestampLast int64
- maxTimestamp int64
totalBlocksCount uint64
hasWrittenBlocks bool
}
@@ -169,9 +166,6 @@ func (bw *blockWriter) reset() {
} else {
bw.tagType.reset()
}
- bw.minTimestampLast = 0
- bw.minTimestamp = 0
- bw.maxTimestamp = 0
bw.totalUncompressedSpanSizeBytes = 0
bw.totalCount = 0
bw.totalBlocksCount = 0
@@ -230,12 +224,10 @@ func (bw *blockWriter) mustWriteBlock(tid string, b
*block) {
if tid < bw.tidLast {
logger.Panicf("the tid=%s cannot be smaller than the previously
written tid=%s", tid, bw.tidLast)
}
- hasWrittenBlocks := bw.hasWrittenBlocks
- if !hasWrittenBlocks {
+ if !bw.hasWrittenBlocks {
bw.tidFirst = tid
bw.hasWrittenBlocks = true
}
- isSeenTid := tid == bw.tidLast
bw.tidLast = tid
bm := generateBlockMetadata()
@@ -251,16 +243,6 @@ func (bw *blockWriter) mustWriteBlock(tid string, b
*block) {
if bw.totalCount == 0 || tm.max > bw.totalMaxTimestamp {
bw.totalMaxTimestamp = tm.max
}
- if !hasWrittenBlocks || tm.min < bw.minTimestamp {
- bw.minTimestamp = tm.min
- }
- if !hasWrittenBlocks || tm.max > bw.maxTimestamp {
- bw.maxTimestamp = tm.max
- }
- if isSeenTid && tm.min < bw.minTimestampLast {
- logger.Panicf("the block for tid=%s cannot contain timestamp
smaller than %d, but it contains timestamp %d", tid, bw.minTimestampLast,
tm.min)
- }
- bw.minTimestampLast = tm.min
bw.totalUncompressedSpanSizeBytes += bm.uncompressedSpanSizeBytes
bw.totalCount += bm.count
@@ -280,8 +262,6 @@ func (bw *blockWriter) mustFlushPrimaryBlock(data []byte) {
bw.metaData = bw.primaryBlockMetadata.marshal(bw.metaData)
}
bw.hasWrittenBlocks = false
- bw.minTimestamp = 0
- bw.maxTimestamp = 0
bw.tidFirst = ""
}
@@ -293,12 +273,10 @@ func (bw *blockWriter) mustWriteRawBlock(r *rawBlock) {
if bm.traceID < bw.tidLast {
logger.Panicf("the tid=%s cannot be smaller than the previously
written tid=%s", bm.traceID, bw.tidLast)
}
- hasWrittenBlocks := bw.hasWrittenBlocks
- if !hasWrittenBlocks {
+ if !bw.hasWrittenBlocks {
bw.tidFirst = bm.traceID
bw.hasWrittenBlocks = true
}
- isSeenTid := bm.traceID == bw.tidLast
bw.tidLast = bm.traceID
if bw.traceIDFilter != nil && bw.traceIDFilter.filter != nil {
bw.traceIDFilter.filter.Add(convert.StringToBytes(bm.traceID))
@@ -312,16 +290,6 @@ func (bw *blockWriter) mustWriteRawBlock(r *rawBlock) {
if bw.totalCount == 0 || tm.max > bw.totalMaxTimestamp {
bw.totalMaxTimestamp = tm.max
}
- if !hasWrittenBlocks || tm.min < bw.minTimestamp {
- bw.minTimestamp = tm.min
- }
- if !hasWrittenBlocks || tm.max > bw.maxTimestamp {
- bw.maxTimestamp = tm.max
- }
- if isSeenTid && tm.min < bw.minTimestampLast {
- logger.Panicf("the block for tid=%s cannot contain timestamp
smaller than %d, but it contains timestamp %d", bm.traceID,
bw.minTimestampLast, tm.min)
- }
- bw.minTimestampLast = tm.min
bw.totalUncompressedSpanSizeBytes += bm.uncompressedSpanSizeBytes
bw.totalCount += bm.count
diff --git a/banyand/trace/block_writer_test.go
b/banyand/trace/block_writer_test.go
index 754b6ec79..053de756d 100644
--- a/banyand/trace/block_writer_test.go
+++ b/banyand/trace/block_writer_test.go
@@ -165,6 +165,56 @@ func TestMultiplePrimaryBlockFlushWithPartIter(t
*testing.T) {
}
}
+// TestBlockWriter_OutOfOrderTimestampsSameTraceID exercises the scenario from
+// https://github.com/apache/skywalking/issues/13860: SkyWalking OAP forwards
+// segments produced by multiple agents whose wall clocks drift relative to
+// each other, so a later batch for the same traceID can carry timestamps
+// earlier than an earlier batch. Spans of one trace have no meaningful order
+// here — the writer must accept them without panicking.
+func TestBlockWriter_OutOfOrderTimestampsSameTraceID(t *testing.T) {
+ const tid = "6359c73a002c425785500f958cdc4007.661.17779941290647127"
+
+ mp := &memPart{}
+ mp.reset()
+ bw := generateBlockWriter()
+ bw.MustInitForMemPart(mp, 1)
+
+ tagsFor := func(i int) []*tagValue {
+ return []*tagValue{
+ {tag: "service", valueType: pbv1.ValueTypeStr, value:
[]byte(fmt.Sprintf("svc-%d", i))},
+ }
+ }
+ mustWrite := func(timestamps []int64) {
+ spans := make([][]byte, len(timestamps))
+ spanIDs := make([]string, len(timestamps))
+ tagSets := make([][]*tagValue, len(timestamps))
+ for i, ts := range timestamps {
+ spans[i] = []byte(fmt.Sprintf("span-%d", ts))
+ spanIDs[i] = fmt.Sprintf("span-%d", ts)
+ tagSets[i] = tagsFor(i)
+ }
+ bw.MustWriteTrace(tid, spans, tagSets, timestamps, spanIDs)
+ }
+
+ // First batch: minTS=1_777_994_129_833_000_000.
+ mustWrite([]int64{1_777_994_129_833_000_000, 1_777_994_129_900_000_000})
+ // Second batch for the same tid carries an earlier minTS — this used to
+ // fail the per-traceID timestamp monotonicity check and discard the batch
+ // on production traffic.
+ require.NotPanics(t, func() {
+ mustWrite([]int64{1_777_994_129_064_000_000})
+ })
+
+ bw.Flush(&mp.partMetadata, &mp.traceIDFilter, &mp.tagType)
+ releaseBlockWriter(bw)
+
+ assert.Equal(t, uint64(3), mp.partMetadata.TotalCount, "all 3 spans
must be persisted")
+ assert.Equal(t, uint64(2), mp.partMetadata.BlocksCount, "two
MustWriteTrace calls produce two blocks")
+ assert.Equal(t, int64(1_777_994_129_064_000_000),
mp.partMetadata.MinTimestamp,
+ "part-level MinTimestamp aggregates across batches regardless
of arrival order")
+ assert.Equal(t, int64(1_777_994_129_900_000_000),
mp.partMetadata.MaxTimestamp)
+}
+
func generateLargeTraceSet() *traces {
// maxUncompressedPrimaryBlockSize = 128KB
// Block metadata contains: traceID, timestamps, count, uncompressed
size, and tag metadata