This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch fix/trace-block-writer-out-of-order-timestamps
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git

commit 2612144b3a2002f03d540cbd88b16e44c4f0866d
Author: Hongtao Gao <[email protected]>
AuthorDate: Wed May 6 06:42:01 2026 +0000

    fix(trace): accept out-of-order timestamps within the same traceID
    
    block_writer panicked when a same-traceID block carried tm.min smaller
    than the previously-written block's tm.min, discarding one trace-write
    batch per occurrence. SkyWalking traces are composed of segments from
    multiple Java agents whose wall clocks drift relative to each other, and
    trace storage is keyed by traceID rather than timestamp, so per-traceID
    timestamp monotonicity is not a writer invariant. Drop the check and
    its now-unused minTimestampLast bookkeeping. Add a regression test that
    mirrors the production traceID/timestamps from
    https://github.com/apache/skywalking/issues/13860.
---
 CHANGES.md                         |  1 +
 banyand/trace/block_writer.go      | 12 ---------
 banyand/trace/block_writer_test.go | 50 ++++++++++++++++++++++++++++++++++++++
 3 files changed, 51 insertions(+), 12 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index a0c3dc0a9..c1ae0e3b2 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -64,6 +64,7 @@ Release Notes.
 - Fix flaky `TestCollectWithPartialClosedSegments` by raising `SegmentIdleTimeout` so wall-clock variance on slow CI does not mark still-open segments as idle.
 - Fix FODC lifecycle cache poisoning where transient `InspectAll` failures were cached for 10 minutes and masked liaison recovery; raise FODC agent and proxy timeouts from 10s to 40s.
 - Fix FODC `/cluster/lifecycle` dropping zero-valued group fields (e.g. `replicas=0`, `close=false`) under `encoding/json` + `omitempty`; switch to `protojson` so all fields are emitted (nil nested messages serialize as `null`).
+- Fix trace `block_writer` panic on out-of-order timestamps within the same traceID, which dropped one trace-write batch per panic in multi-agent SkyWalking deployments. Spans of a single trace originate from independently-clocked services, and trace storage is organized by traceID rather than timestamp, so per-traceID timestamp monotonicity is not a writer invariant.
 
 ### Chores
 
diff --git a/banyand/trace/block_writer.go b/banyand/trace/block_writer.go
index c4267dd58..aed75b954 100644
--- a/banyand/trace/block_writer.go
+++ b/banyand/trace/block_writer.go
@@ -152,7 +152,6 @@ type blockWriter struct {
        minTimestamp                   int64
        totalMinTimestamp              int64
        totalMaxTimestamp              int64
-       minTimestampLast               int64
        maxTimestamp                   int64
        totalBlocksCount               uint64
        hasWrittenBlocks               bool
@@ -169,7 +168,6 @@ func (bw *blockWriter) reset() {
        } else {
                bw.tagType.reset()
        }
-       bw.minTimestampLast = 0
        bw.minTimestamp = 0
        bw.maxTimestamp = 0
        bw.totalUncompressedSpanSizeBytes = 0
@@ -235,7 +233,6 @@ func (bw *blockWriter) mustWriteBlock(tid string, b *block) {
                bw.tidFirst = tid
                bw.hasWrittenBlocks = true
        }
-       isSeenTid := tid == bw.tidLast
        bw.tidLast = tid
 
        bm := generateBlockMetadata()
@@ -257,10 +254,6 @@ func (bw *blockWriter) mustWriteBlock(tid string, b *block) {
        if !hasWrittenBlocks || tm.max > bw.maxTimestamp {
                bw.maxTimestamp = tm.max
        }
-       if isSeenTid && tm.min < bw.minTimestampLast {
-               logger.Panicf("the block for tid=%s cannot contain timestamp 
smaller than %d, but it contains timestamp %d", tid, bw.minTimestampLast, 
tm.min)
-       }
-       bw.minTimestampLast = tm.min
 
        bw.totalUncompressedSpanSizeBytes += bm.uncompressedSpanSizeBytes
        bw.totalCount += bm.count
@@ -298,7 +291,6 @@ func (bw *blockWriter) mustWriteRawBlock(r *rawBlock) {
                bw.tidFirst = bm.traceID
                bw.hasWrittenBlocks = true
        }
-       isSeenTid := bm.traceID == bw.tidLast
        bw.tidLast = bm.traceID
        if bw.traceIDFilter != nil && bw.traceIDFilter.filter != nil {
                bw.traceIDFilter.filter.Add(convert.StringToBytes(bm.traceID))
@@ -318,10 +310,6 @@ func (bw *blockWriter) mustWriteRawBlock(r *rawBlock) {
        if !hasWrittenBlocks || tm.max > bw.maxTimestamp {
                bw.maxTimestamp = tm.max
        }
-       if isSeenTid && tm.min < bw.minTimestampLast {
-               logger.Panicf("the block for tid=%s cannot contain timestamp 
smaller than %d, but it contains timestamp %d", bm.traceID, 
bw.minTimestampLast, tm.min)
-       }
-       bw.minTimestampLast = tm.min
 
        bw.totalUncompressedSpanSizeBytes += bm.uncompressedSpanSizeBytes
        bw.totalCount += bm.count
diff --git a/banyand/trace/block_writer_test.go b/banyand/trace/block_writer_test.go
index 754b6ec79..940132399 100644
--- a/banyand/trace/block_writer_test.go
+++ b/banyand/trace/block_writer_test.go
@@ -165,6 +165,56 @@ func TestMultiplePrimaryBlockFlushWithPartIter(t *testing.T) {
        }
 }
 
+// TestBlockWriter_OutOfOrderTimestampsSameTraceID exercises the scenario from
+// https://github.com/apache/skywalking/issues/13860: SkyWalking OAP forwards
+// segments produced by multiple agents whose wall clocks drift relative to
+// each other, so a later batch for the same traceID can carry timestamps
+// earlier than an earlier batch. Spans of one trace have no meaningful order
+// here — the writer must accept them without panicking.
+func TestBlockWriter_OutOfOrderTimestampsSameTraceID(t *testing.T) {
+       const tid = "6359c73a002c425785500f958cdc4007.661.17779941290647127"
+
+       mp := &memPart{}
+       mp.reset()
+       bw := generateBlockWriter()
+       bw.MustInitForMemPart(mp, 1)
+
+       tagsFor := func(i int) []*tagValue {
+               return []*tagValue{
+                       {tag: "service", valueType: pbv1.ValueTypeStr, value: 
[]byte(fmt.Sprintf("svc-%d", i))},
+               }
+       }
+       mustWrite := func(timestamps []int64) {
+               spans := make([][]byte, len(timestamps))
+               spanIDs := make([]string, len(timestamps))
+               tagSets := make([][]*tagValue, len(timestamps))
+               for i, ts := range timestamps {
+                       spans[i] = []byte(fmt.Sprintf("span-%d", ts))
+                       spanIDs[i] = fmt.Sprintf("span-%d", ts)
+                       tagSets[i] = tagsFor(i)
+               }
+               bw.MustWriteTrace(tid, spans, tagSets, timestamps, spanIDs)
+       }
+
+       // First batch: minTS=1_777_994_129_833_000_000.
+       mustWrite([]int64{1_777_994_129_833_000_000, 1_777_994_129_900_000_000})
+       // Second batch for the same tid carries an earlier minTS — this used to
+       // trip block_writer's `tm.min < bw.minTimestampLast` panic and discard
+       // the batch on production traffic.
+       require.NotPanics(t, func() {
+               mustWrite([]int64{1_777_994_129_064_000_000})
+       })
+
+       bw.Flush(&mp.partMetadata, &mp.traceIDFilter, &mp.tagType)
+       releaseBlockWriter(bw)
+
+       assert.Equal(t, uint64(3), mp.partMetadata.TotalCount, "all 3 spans 
must be persisted")
+       assert.Equal(t, uint64(2), mp.partMetadata.BlocksCount, "two 
MustWriteTrace calls produce two blocks")
+       assert.Equal(t, int64(1_777_994_129_064_000_000), 
mp.partMetadata.MinTimestamp,
+               "part-level MinTimestamp aggregates across batches regardless 
of arrival order")
+       assert.Equal(t, int64(1_777_994_129_900_000_000), 
mp.partMetadata.MaxTimestamp)
+}
+
 func generateLargeTraceSet() *traces {
        // maxUncompressedPrimaryBlockSize = 128KB
        // Block metadata contains: traceID, timestamps, count, uncompressed 
size, and tag metadata

Reply via email to