This is an automated email from the ASF dual-hosted git repository.

hanahmily pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-banyandb.git


The following commit(s) were added to refs/heads/main by this push:
     new e81064021 fix(trace): accept out-of-order timestamps within the same traceID (#1114)
e81064021 is described below

commit e8106402114f81371f694314ed02b236eda69094
Author: Gao Hongtao <[email protected]>
AuthorDate: Thu May 7 07:42:27 2026 +0800

    fix(trace): accept out-of-order timestamps within the same traceID (#1114)
---
 CHANGES.md                         |  1 +
 banyand/trace/block_writer.go      | 36 ++-------------------------
 banyand/trace/block_writer_test.go | 50 ++++++++++++++++++++++++++++++++++++++
 3 files changed, 53 insertions(+), 34 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 3195f1d79..1a8a81e63 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -66,6 +66,7 @@ Release Notes.
 - Fix flaky `TestCollectWithPartialClosedSegments` by raising `SegmentIdleTimeout` so wall-clock variance on slow CI does not mark still-open segments as idle.
 - Fix FODC lifecycle cache poisoning where transient `InspectAll` failures were cached for 10 minutes and masked liaison recovery; raise FODC agent and proxy timeouts from 10s to 40s.
 - Fix FODC `/cluster/lifecycle` dropping zero-valued group fields (e.g. `replicas=0`, `close=false`) under `encoding/json` + `omitempty`; switch to `protojson` so all fields are emitted (nil nested messages serialize as `null`).
+- Fix trace `block_writer` panic on out-of-order timestamps within the same traceID, which dropped one trace-write batch per panic in multi-agent SkyWalking deployments. Spans of a single trace originate from independently-clocked services, and trace storage is organized by traceID rather than timestamp, so per-traceID timestamp monotonicity is not a writer invariant.
 
 ### Chores
 
diff --git a/banyand/trace/block_writer.go b/banyand/trace/block_writer.go
index c4267dd58..c14ec5d1e 100644
--- a/banyand/trace/block_writer.go
+++ b/banyand/trace/block_writer.go
@@ -149,11 +149,8 @@ type blockWriter struct {
        primaryBlockMetadata           primaryBlockMetadata
        totalUncompressedSpanSizeBytes uint64
        totalCount                     uint64
-       minTimestamp                   int64
        totalMinTimestamp              int64
        totalMaxTimestamp              int64
-       minTimestampLast               int64
-       maxTimestamp                   int64
        totalBlocksCount               uint64
        hasWrittenBlocks               bool
 }
@@ -169,9 +166,6 @@ func (bw *blockWriter) reset() {
        } else {
                bw.tagType.reset()
        }
-       bw.minTimestampLast = 0
-       bw.minTimestamp = 0
-       bw.maxTimestamp = 0
        bw.totalUncompressedSpanSizeBytes = 0
        bw.totalCount = 0
        bw.totalBlocksCount = 0
@@ -230,12 +224,10 @@ func (bw *blockWriter) mustWriteBlock(tid string, b *block) {
        if tid < bw.tidLast {
                logger.Panicf("the tid=%s cannot be smaller than the previously written tid=%s", tid, bw.tidLast)
        }
-       hasWrittenBlocks := bw.hasWrittenBlocks
-       if !hasWrittenBlocks {
+       if !bw.hasWrittenBlocks {
                bw.tidFirst = tid
                bw.hasWrittenBlocks = true
        }
-       isSeenTid := tid == bw.tidLast
        bw.tidLast = tid
 
        bm := generateBlockMetadata()
@@ -251,16 +243,6 @@ func (bw *blockWriter) mustWriteBlock(tid string, b *block) {
        if bw.totalCount == 0 || tm.max > bw.totalMaxTimestamp {
                bw.totalMaxTimestamp = tm.max
        }
-       if !hasWrittenBlocks || tm.min < bw.minTimestamp {
-               bw.minTimestamp = tm.min
-       }
-       if !hasWrittenBlocks || tm.max > bw.maxTimestamp {
-               bw.maxTimestamp = tm.max
-       }
-       if isSeenTid && tm.min < bw.minTimestampLast {
-               logger.Panicf("the block for tid=%s cannot contain timestamp smaller than %d, but it contains timestamp %d", tid, bw.minTimestampLast, tm.min)
-       }
-       bw.minTimestampLast = tm.min
 
        bw.totalUncompressedSpanSizeBytes += bm.uncompressedSpanSizeBytes
        bw.totalCount += bm.count
@@ -280,8 +262,6 @@ func (bw *blockWriter) mustFlushPrimaryBlock(data []byte) {
                bw.metaData = bw.primaryBlockMetadata.marshal(bw.metaData)
        }
        bw.hasWrittenBlocks = false
-       bw.minTimestamp = 0
-       bw.maxTimestamp = 0
        bw.tidFirst = ""
 }
 
@@ -293,12 +273,10 @@ func (bw *blockWriter) mustWriteRawBlock(r *rawBlock) {
        if bm.traceID < bw.tidLast {
                logger.Panicf("the tid=%s cannot be smaller than the previously written tid=%s", bm.traceID, bw.tidLast)
        }
-       hasWrittenBlocks := bw.hasWrittenBlocks
-       if !hasWrittenBlocks {
+       if !bw.hasWrittenBlocks {
                bw.tidFirst = bm.traceID
                bw.hasWrittenBlocks = true
        }
-       isSeenTid := bm.traceID == bw.tidLast
        bw.tidLast = bm.traceID
        if bw.traceIDFilter != nil && bw.traceIDFilter.filter != nil {
                bw.traceIDFilter.filter.Add(convert.StringToBytes(bm.traceID))
@@ -312,16 +290,6 @@ func (bw *blockWriter) mustWriteRawBlock(r *rawBlock) {
        if bw.totalCount == 0 || tm.max > bw.totalMaxTimestamp {
                bw.totalMaxTimestamp = tm.max
        }
-       if !hasWrittenBlocks || tm.min < bw.minTimestamp {
-               bw.minTimestamp = tm.min
-       }
-       if !hasWrittenBlocks || tm.max > bw.maxTimestamp {
-               bw.maxTimestamp = tm.max
-       }
-       if isSeenTid && tm.min < bw.minTimestampLast {
-               logger.Panicf("the block for tid=%s cannot contain timestamp smaller than %d, but it contains timestamp %d", bm.traceID, bw.minTimestampLast, tm.min)
-       }
-       bw.minTimestampLast = tm.min
 
        bw.totalUncompressedSpanSizeBytes += bm.uncompressedSpanSizeBytes
        bw.totalCount += bm.count
diff --git a/banyand/trace/block_writer_test.go b/banyand/trace/block_writer_test.go
index 754b6ec79..053de756d 100644
--- a/banyand/trace/block_writer_test.go
+++ b/banyand/trace/block_writer_test.go
@@ -165,6 +165,56 @@ func TestMultiplePrimaryBlockFlushWithPartIter(t *testing.T) {
        }
 }
 
+// TestBlockWriter_OutOfOrderTimestampsSameTraceID exercises the scenario from
+// https://github.com/apache/skywalking/issues/13860: SkyWalking OAP forwards
+// segments produced by multiple agents whose wall clocks drift relative to
+// each other, so a later batch for the same traceID can carry timestamps
+// earlier than an earlier batch. Spans of one trace have no meaningful order
+// here — the writer must accept them without panicking.
+func TestBlockWriter_OutOfOrderTimestampsSameTraceID(t *testing.T) {
+       const tid = "6359c73a002c425785500f958cdc4007.661.17779941290647127"
+
+       mp := &memPart{}
+       mp.reset()
+       bw := generateBlockWriter()
+       bw.MustInitForMemPart(mp, 1)
+
+       tagsFor := func(i int) []*tagValue {
+               return []*tagValue{
+                       {tag: "service", valueType: pbv1.ValueTypeStr, value: []byte(fmt.Sprintf("svc-%d", i))},
+               }
+       }
+       mustWrite := func(timestamps []int64) {
+               spans := make([][]byte, len(timestamps))
+               spanIDs := make([]string, len(timestamps))
+               tagSets := make([][]*tagValue, len(timestamps))
+               for i, ts := range timestamps {
+                       spans[i] = []byte(fmt.Sprintf("span-%d", ts))
+                       spanIDs[i] = fmt.Sprintf("span-%d", ts)
+                       tagSets[i] = tagsFor(i)
+               }
+               bw.MustWriteTrace(tid, spans, tagSets, timestamps, spanIDs)
+       }
+
+       // First batch: minTS=1_777_994_129_833_000_000.
+       mustWrite([]int64{1_777_994_129_833_000_000, 1_777_994_129_900_000_000})
+       // Second batch for the same tid carries an earlier minTS — this used to
+       // fail the per-traceID timestamp monotonicity check and discard the batch
+       // on production traffic.
+       require.NotPanics(t, func() {
+               mustWrite([]int64{1_777_994_129_064_000_000})
+       })
+
+       bw.Flush(&mp.partMetadata, &mp.traceIDFilter, &mp.tagType)
+       releaseBlockWriter(bw)
+
+       assert.Equal(t, uint64(3), mp.partMetadata.TotalCount, "all 3 spans must be persisted")
+       assert.Equal(t, uint64(2), mp.partMetadata.BlocksCount, "two MustWriteTrace calls produce two blocks")
+       assert.Equal(t, int64(1_777_994_129_064_000_000), mp.partMetadata.MinTimestamp,
+               "part-level MinTimestamp aggregates across batches regardless of arrival order")
+       assert.Equal(t, int64(1_777_994_129_900_000_000), mp.partMetadata.MaxTimestamp)
+}
+
 func generateLargeTraceSet() *traces {
        // maxUncompressedPrimaryBlockSize = 128KB
        // Block metadata contains: traceID, timestamps, count, uncompressed size, and tag metadata

Reply via email to