This is an automated email from the ASF dual-hosted git repository.

wu-sheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-go.git


The following commit(s) were added to refs/heads/main by this push:
     new 1474cf8  fix data race in span lifecycle (#247)
1474cf8 is described below

commit 1474cf897b518d00be1c02d334c3a7a4aedc20b7
Author: mrproliu <[email protected]>
AuthorDate: Fri Jun 5 22:46:43 2026 +0800

    fix data race in span lifecycle (#247)
---
 CHANGES.md                                         |   2 +
 plugins/core/context.go                            |  12 +-
 plugins/core/correlation.go                        | 132 +++++++
 plugins/core/metrics.go                            |  19 +-
 plugins/core/profile.go                            |  76 ++--
 plugins/core/propagating.go                        |  24 +-
 plugins/core/reporter/grpc/grpc.go                 |  13 +-
 .../reporter/grpc/send_tracing_recover_test.go     |  81 ++++
 plugins/core/reporter/kafka/kafka.go               |  13 +-
 .../reporter/kafka/send_tracing_recover_test.go    |  81 ++++
 plugins/core/segment_datarace_test.go              | 232 ++++++++++-
 plugins/core/span_crash_e2e_test.go                | 134 +++++++
 plugins/core/span_default.go                       | 234 +++++++++--
 plugins/core/span_freeze_test.go                   | 277 +++++++++++++
 plugins/core/span_hostile_workload_test.go         | 435 +++++++++++++++++++++
 plugins/core/span_reuse_test.go                    | 141 +++++++
 plugins/core/span_sync_bench_test.go               |  70 ++++
 plugins/core/span_tracing.go                       | 124 ++++--
 plugins/core/test_base.go                          |  22 +-
 plugins/core/tracing.go                            |  56 ++-
 20 files changed, 2024 insertions(+), 154 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 401c55c..0fcda9d 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -22,6 +22,8 @@ Release Notes.
 * Fix plugin interceptors bypassed on Windows.
 * Fix wrong tracing context switch when trace ignore plugin activated.
 * Fix data race when sending trace data to reporter.
+* Fix multiple data races in span lifecycle, correlation context and segment collection.
+* Add recover protection for the metrics, profile and segment-transform goroutines.
 
 #### Issues and PR
 - All issues are [here](https://github.com/apache/skywalking/milestone/238?closed=1)
diff --git a/plugins/core/context.go b/plugins/core/context.go
index 4f9cefc..6ae1b7e 100644
--- a/plugins/core/context.go
+++ b/plugins/core/context.go
@@ -89,7 +89,12 @@ func NewTracingContext() *TracingContext {
 }
 
 func (r *RuntimeContext) clone() *RuntimeContext {
-       newData := make(map[string]interface{})
+       if len(r.data) == 0 {
+               // clone runs on every context capture AND continue; skipping 
the empty
+               // map allocation matters because most requests never set 
runtime values
+               return &RuntimeContext{}
+       }
+       newData := make(map[string]interface{}, len(r.data))
        for k, v := range r.data {
                newData[k] = v
        }
@@ -104,8 +109,11 @@ func (r *RuntimeContext) Get(key string) interface{} {
 
 func (r *RuntimeContext) Set(key string, value interface{}) {
        if value == nil {
-               delete(r.data, key)
+               delete(r.data, key) // no-op on a nil map
                return
        }
+       if r.data == nil { // lazily allocated, see clone()
+               r.data = make(map[string]interface{})
+       }
        r.data[key] = value
 }
diff --git a/plugins/core/correlation.go b/plugins/core/correlation.go
new file mode 100644
index 0000000..8af6a9f
--- /dev/null
+++ b/plugins/core/correlation.go
@@ -0,0 +1,132 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package core
+
+import "sync"
+
+// CorrelationContext is the synchronized correlation key/value storage shared
+// by every span of a segment. It replaces the bare map[string]string that used
+// to live on SegmentContext: that map is legitimately reachable from multiple
+// goroutines (snapshot-continued child spans share it with the segment root),
+// so an unsynchronized write racing the exit-span header encoding or the
+// snapshot copy would crash the process with the unrecoverable
+// "concurrent map iteration and map write" fatal error.
+//
+// SegmentContext is copied by value between the spans of a segment, therefore
+// the lock cannot be embedded there; the pointer wrapper keeps a single lock
+// per logical correlation store. RWMutex is the right tool here (unlike the
+// span opLock): correlation is read on every propagation encode by potentially
+// concurrent goroutines while writes are rare.
+type CorrelationContext struct {
+       mu   sync.RWMutex
+       data map[string]string
+}
+
+// newCorrelationContext returns an empty store. The inner map is allocated
+// lazily on the first Set: most spans never touch correlation, and the empty
+// map would be one extra allocation on every segment.
+func newCorrelationContext() *CorrelationContext {
+       return &CorrelationContext{}
+}
+
+// newCorrelationContextFrom builds a store pre-filled with a copy of m
+// (typically the correlation decoded from the inbound propagation headers).
+func newCorrelationContextFrom(m map[string]string) *CorrelationContext {
+       if len(m) == 0 {
+               // keep the lazy inner-map allocation (see 
newCorrelationContext):
+               // most inbound requests carry no correlation values
+               return &CorrelationContext{}
+       }
+       c := &CorrelationContext{data: make(map[string]string, len(m))}
+       for k, v := range m {
+               c.data[k] = v
+       }
+       return c
+}
+
+func (c *CorrelationContext) Get(key string) string {
+       if c == nil {
+               return ""
+       }
+       c.mu.RLock()
+       defer c.mu.RUnlock()
+       return c.data[key]
+}
+
+func (c *CorrelationContext) Set(key, value string) {
+       if c == nil {
+               return
+       }
+       c.mu.Lock()
+       defer c.mu.Unlock()
+       if value == "" {
+               delete(c.data, key) // delete on a nil map is a no-op
+               return
+       }
+       if c.data == nil {
+               c.data = make(map[string]string)
+       }
+       c.data[key] = value
+}
+
+func (c *CorrelationContext) Len() int {
+       if c == nil {
+               return 0
+       }
+       c.mu.RLock()
+       defer c.mu.RUnlock()
+       return len(c.data)
+}
+
+// Snapshot returns a copy of the correlation data taken under the lock. The
+// propagation header encoding and every other map iteration must go through it
+// instead of ranging over the live map. It returns nil when empty (reading a
+// nil map is safe and this avoids an allocation per exit span).
+func (c *CorrelationContext) Snapshot() map[string]string {
+       if c == nil {
+               return nil
+       }
+       c.mu.RLock()
+       defer c.mu.RUnlock()
+       if len(c.data) == 0 {
+               return nil
+       }
+       cp := make(map[string]string, len(c.data))
+       for k, v := range c.data {
+               cp[k] = v
+       }
+       return cp
+}
+
+// Clone returns an independent CorrelationContext holding a copy of the data,
+// used when a context snapshot crosses a goroutine boundary.
+func (c *CorrelationContext) Clone() *CorrelationContext {
+       if c == nil {
+               return newCorrelationContext()
+       }
+       c.mu.RLock()
+       defer c.mu.RUnlock()
+       if len(c.data) == 0 {
+               return newCorrelationContext()
+       }
+       cp := &CorrelationContext{data: make(map[string]string, len(c.data))}
+       for k, v := range c.data {
+               cp.data[k] = v
+       }
+       return cp
+}
diff --git a/plugins/core/metrics.go b/plugins/core/metrics.go
index ddac85c..0869889 100644
--- a/plugins/core/metrics.go
+++ b/plugins/core/metrics.go
@@ -19,6 +19,7 @@ package core
 
 import (
        "math"
+       "runtime/debug"
        "sort"
        "strings"
        "sync/atomic"
@@ -40,9 +41,21 @@ func (t *Tracer) initMetricsCollect(meterCollectSecond int) {
                for {
                        time.Sleep(collectDuration)
 
-                       t.reachNotInitMetrics()
-
-                       t.sendMetrics()
+                       // The recover wraps a single iteration: this goroutine 
has no other
+                       // protection and the collect path executes 
user-registered meter
+                       // callbacks, whose panic would otherwise kill the 
whole process.
+                       func() {
+                               defer func() {
+                                       if err := recover(); err != nil {
+                                               if t.Log != nil {
+                                                       t.Log.Errorf("metrics 
collect panic: %v, stack: %s", err, debug.Stack())
+                                               }
+                                       }
+                               }()
+                               t.reachNotInitMetrics()
+
+                               t.sendMetrics()
+                       }()
                }
        }()
 }
diff --git a/plugins/core/profile.go b/plugins/core/profile.go
index c8d8f68..87c76dc 100644
--- a/plugins/core/profile.go
+++ b/plugins/core/profile.go
@@ -18,6 +18,7 @@
 package core
 
 import (
+       "runtime/debug"
        "runtime/pprof"
        "strconv"
        "sync"
@@ -67,43 +68,56 @@ func (m *ProfileManager) initReportChannel() {
        // Original channel for receiving raw data chunks sent by the Writer
        rawCh := make(chan profileRawData, maxSendQueueSize)
        m.rawCh = rawCh
-       var d []byte
        // Start a goroutine to supplement each data chunk with business 
information
        go func() {
                for rawResult := range rawCh {
-                       d = append(d, rawResult.data...)
-                       m.mu.Lock()
-                       // Get business information from currentTask
-                       if m.currentTask == nil {
-                               m.Log.Info("no task")
-                               m.mu.Unlock()
-                               continue // Task has ended, ignore
-                       }
-                       task := m.currentTask
-                       m.mu.Unlock()
-
-                       if rawResult.isLast {
-                               m.FinalReportResults <- reporter.ProfileResult{
-                                       TaskID:  task.taskID,
-                                       Payload: rawResult.data,
-                                       IsLast:  rawResult.isLast,
+                       // The recover wraps a single chunk: this goroutine has 
no other
+                       // protection and a panic here would kill the whole 
process. The
+                       // locked sections below use deferred unlocks so a 
panic can never
+                       // leak a held mutex into the next iteration.
+                       func() {
+                               defer func() {
+                                       if err := recover(); err != nil {
+                                               m.Log.Errorf("profile report 
panic: %v, stack: %s", err, debug.Stack())
+                                       }
+                               }()
+                               // Get business information from currentTask
+                               var task *currentTask
+                               func() {
+                                       m.mu.Lock()
+                                       defer m.mu.Unlock()
+                                       task = m.currentTask
+                               }()
+                               if task == nil {
+                                       m.Log.Info("no task")
+                                       return // Task has ended, ignore
                                }
-                               m.mu.Lock()
-                               if m.TraceProfileTask == nil {
-                                       m.Log.Warn("no TraceProfileTask before 
finish profile")
+
+                               if rawResult.isLast {
+                                       m.FinalReportResults <- 
reporter.ProfileResult{
+                                               TaskID:  task.taskID,
+                                               Payload: rawResult.data,
+                                               IsLast:  rawResult.isLast,
+                                       }
+                                       func() {
+                                               m.mu.Lock()
+                                               defer m.mu.Unlock()
+                                               if m.TraceProfileTask == nil {
+                                                       m.Log.Warn("no 
TraceProfileTask before finish profile")
+                                               } else {
+                                                       
m.TraceProfileTask.Status = reporter.Finished
+                                               }
+                                               m.currentTask = nil
+                                               
m.profileEvents.BaseEventStatus[CurTaskExist] = false
+                                       }()
                                } else {
-                                       m.TraceProfileTask.Status = 
reporter.Finished
+                                       m.FinalReportResults <- 
reporter.ProfileResult{
+                                               TaskID:  task.taskID,
+                                               Payload: rawResult.data,
+                                               IsLast:  rawResult.isLast,
+                                       }
                                }
-                               m.currentTask = nil
-                               m.profileEvents.BaseEventStatus[CurTaskExist] = 
false
-                               m.mu.Unlock()
-                       } else {
-                               m.FinalReportResults <- reporter.ProfileResult{
-                                       TaskID:  task.taskID,
-                                       Payload: rawResult.data,
-                                       IsLast:  rawResult.isLast,
-                               }
-                       }
+                       }()
                }
        }()
 }
diff --git a/plugins/core/propagating.go b/plugins/core/propagating.go
index c47a57c..5756995 100644
--- a/plugins/core/propagating.go
+++ b/plugins/core/propagating.go
@@ -41,16 +41,20 @@ var (
 )
 
 type SpanContext struct {
-       TraceID               string            `json:"trace_id"`
-       ParentSegmentID       string            `json:"parent_segment_id"`
-       ParentService         string            `json:"parent_service"`
-       ParentServiceInstance string            `json:"parent_service_instance"`
-       ParentEndpoint        string            `json:"parent_endpoint"`
-       AddressUsedAtClient   string            `json:"address_used_at_client"`
-       ParentSpanID          int32             `json:"parent_span_id"`
-       Sample                int8              `json:"sample"`
-       Valid                 bool              `json:"valid"`
-       CorrelationContext    map[string]string `json:"correlation_context"`
+       TraceID               string `json:"trace_id"`
+       ParentSegmentID       string `json:"parent_segment_id"`
+       ParentService         string `json:"parent_service"`
+       ParentServiceInstance string `json:"parent_service_instance"`
+       ParentEndpoint        string `json:"parent_endpoint"`
+       AddressUsedAtClient   string `json:"address_used_at_client"`
+       ParentSpanID          int32  `json:"parent_span_id"`
+       Sample                int8   `json:"sample"`
+       Valid                 bool   `json:"valid"`
+       // CorrelationContext here is intentionally a plain map (NOT the
+       // synchronized core.CorrelationContext): SpanContext is the 
short-lived,
+       // single-goroutine wire-format struct holding a decoded inbound header 
or
+       // a Snapshot() copy for outbound encoding.
+       CorrelationContext map[string]string `json:"correlation_context"`
 }
 
 func (s *SpanContext) GetTraceID() string {
diff --git a/plugins/core/reporter/grpc/grpc.go b/plugins/core/reporter/grpc/grpc.go
index be60b38..32dab07 100644
--- a/plugins/core/reporter/grpc/grpc.go
+++ b/plugins/core/reporter/grpc/grpc.go
@@ -116,16 +116,19 @@ func (r *gRPCReporter) ConnectionStatus() reporter.ConnectionStatus {
 }
 
 func (r *gRPCReporter) SendTracing(spans []reporter.ReportedSpan) {
-       segmentObject := r.transform.TransformSegmentObject(spans)
-       if segmentObject == nil {
-               return
-       }
+       // The recover must be registered BEFORE the transform call: SendTracing
+       // runs on the segment collector goroutine, so a panic escaping from the
+       // transform (or the channel send below, e.g. on a closed tracingSendCh)
+       // would otherwise kill the whole process.
        defer func() {
-               // recover the panic caused by close tracingSendCh
                if err := recover(); err != nil {
                        r.logger.Errorf("reporter segment err %v", err)
                }
        }()
+       segmentObject := r.transform.TransformSegmentObject(spans)
+       if segmentObject == nil {
+               return
+       }
        select {
        case r.tracingSendCh <- segmentObject:
        default:
diff --git a/plugins/core/reporter/grpc/send_tracing_recover_test.go b/plugins/core/reporter/grpc/send_tracing_recover_test.go
new file mode 100644
index 0000000..c8d16ae
--- /dev/null
+++ b/plugins/core/reporter/grpc/send_tracing_recover_test.go
@@ -0,0 +1,81 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+       "sync/atomic"
+       "testing"
+
+       "github.com/apache/skywalking-go/plugins/core/reporter"
+
+       commonv3 "github.com/apache/skywalking-go/protocols/collect/common/v3"
+       agentv3 
"github.com/apache/skywalking-go/protocols/collect/language/agent/v3"
+)
+
+type capturingLogger struct {
+       errors int32
+}
+
+func (l *capturingLogger) WithField(key string, value interface{}) interface{} 
{ return l }
+func (l *capturingLogger) Info(args ...interface{})                            
{}
+func (l *capturingLogger) Infof(format string, args ...interface{})            
{}
+func (l *capturingLogger) Warn(args ...interface{})                            
{}
+func (l *capturingLogger) Warnf(format string, args ...interface{})            
{}
+func (l *capturingLogger) Error(args ...interface{})                           
{ atomic.AddInt32(&l.errors, 1) }
+func (l *capturingLogger) Errorf(format string, args ...interface{})           
{ atomic.AddInt32(&l.errors, 1) }
+
+// panicReportedSpan triggers a panic as soon as the transform touches it,
+// simulating a corrupted span reaching SendTracing.
+type panicReportedSpan struct{}
+
+func (panicReportedSpan) Context() reporter.SegmentContext     { 
panic("corrupted span") }
+func (panicReportedSpan) Refs() []reporter.SpanContext         { return nil }
+func (panicReportedSpan) StartTime() int64                     { return 0 }
+func (panicReportedSpan) EndTime() int64                       { return 0 }
+func (panicReportedSpan) OperationName() string                { return "op" }
+func (panicReportedSpan) Peer() string                         { return "" }
+func (panicReportedSpan) SpanType() agentv3.SpanType           { return 
agentv3.SpanType_Exit }
+func (panicReportedSpan) SpanLayer() agentv3.SpanLayer         { return 
agentv3.SpanLayer_Database }
+func (panicReportedSpan) IsError() bool                        { return false }
+func (panicReportedSpan) Tags() []*commonv3.KeyStringValuePair { return nil }
+func (panicReportedSpan) Logs() []*agentv3.Log                 { return nil }
+func (panicReportedSpan) ComponentID() int32                   { return 0 }
+
+// TestSendTracingRecoversTransformPanic guards the recover placement in
+// SendTracing: it must be registered BEFORE the transform call, because
+// SendTracing runs on the segment collector goroutine which has no other
+// recover - an escaping panic would kill the whole process.
+func TestSendTracingRecoversTransformPanic(t *testing.T) {
+       logger := &capturingLogger{}
+       r := &gRPCReporter{
+               logger:        logger,
+               transform:     
reporter.NewTransform(&reporter.Entity{ServiceName: "svc", ServiceInstanceName: 
"inst"}),
+               tracingSendCh: make(chan *agentv3.SegmentObject, 1),
+       }
+
+       defer func() {
+               if p := recover(); p != nil {
+                       t.Fatalf("panic escaped SendTracing (recover registered 
too late?): %v", p)
+               }
+       }()
+       r.SendTracing([]reporter.ReportedSpan{panicReportedSpan{}})
+
+       if atomic.LoadInt32(&logger.errors) == 0 {
+               t.Fatal("recovered transform panic was not logged")
+       }
+}
diff --git a/plugins/core/reporter/kafka/kafka.go b/plugins/core/reporter/kafka/kafka.go
index 0655d29..558f544 100644
--- a/plugins/core/reporter/kafka/kafka.go
+++ b/plugins/core/reporter/kafka/kafka.go
@@ -249,16 +249,19 @@ func (r *kafkaReporter) logSendLoop() {
 }
 
 func (r *kafkaReporter) SendTracing(spans []reporter.ReportedSpan) {
-       segmentObject := r.transform.TransformSegmentObject(spans)
-       if segmentObject == nil {
-               return
-       }
+       // The recover must be registered BEFORE the transform call: SendTracing
+       // runs on the segment collector goroutine, so a panic escaping from the
+       // transform (or the channel send below, e.g. on a closed tracingSendCh)
+       // would otherwise kill the whole process.
        defer func() {
-               // recover the panic caused by close tracingSendCh
                if err := recover(); err != nil {
                        r.logger.Errorf("reporter segment err %v", err)
                }
        }()
+       segmentObject := r.transform.TransformSegmentObject(spans)
+       if segmentObject == nil {
+               return
+       }
        select {
        case r.tracingSendCh <- segmentObject:
        default:
diff --git a/plugins/core/reporter/kafka/send_tracing_recover_test.go b/plugins/core/reporter/kafka/send_tracing_recover_test.go
new file mode 100644
index 0000000..13818d8
--- /dev/null
+++ b/plugins/core/reporter/kafka/send_tracing_recover_test.go
@@ -0,0 +1,81 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package kafka
+
+import (
+       "sync/atomic"
+       "testing"
+
+       "github.com/apache/skywalking-go/plugins/core/reporter"
+
+       commonv3 "github.com/apache/skywalking-go/protocols/collect/common/v3"
+       agentv3 
"github.com/apache/skywalking-go/protocols/collect/language/agent/v3"
+)
+
+type capturingLogger struct {
+       errors int32
+}
+
+func (l *capturingLogger) WithField(key string, value interface{}) interface{} 
{ return l }
+func (l *capturingLogger) Info(args ...interface{})                            
{}
+func (l *capturingLogger) Infof(format string, args ...interface{})            
{}
+func (l *capturingLogger) Warn(args ...interface{})                            
{}
+func (l *capturingLogger) Warnf(format string, args ...interface{})            
{}
+func (l *capturingLogger) Error(args ...interface{})                           
{ atomic.AddInt32(&l.errors, 1) }
+func (l *capturingLogger) Errorf(format string, args ...interface{})           
{ atomic.AddInt32(&l.errors, 1) }
+
+// panicReportedSpan triggers a panic as soon as the transform touches it,
+// simulating a corrupted span reaching SendTracing.
+type panicReportedSpan struct{}
+
+func (panicReportedSpan) Context() reporter.SegmentContext     { 
panic("corrupted span") }
+func (panicReportedSpan) Refs() []reporter.SpanContext         { return nil }
+func (panicReportedSpan) StartTime() int64                     { return 0 }
+func (panicReportedSpan) EndTime() int64                       { return 0 }
+func (panicReportedSpan) OperationName() string                { return "op" }
+func (panicReportedSpan) Peer() string                         { return "" }
+func (panicReportedSpan) SpanType() agentv3.SpanType           { return 
agentv3.SpanType_Exit }
+func (panicReportedSpan) SpanLayer() agentv3.SpanLayer         { return 
agentv3.SpanLayer_Database }
+func (panicReportedSpan) IsError() bool                        { return false }
+func (panicReportedSpan) Tags() []*commonv3.KeyStringValuePair { return nil }
+func (panicReportedSpan) Logs() []*agentv3.Log                 { return nil }
+func (panicReportedSpan) ComponentID() int32                   { return 0 }
+
+// TestSendTracingRecoversTransformPanic mirrors the gRPC reporter test: the
+// recover in SendTracing must be registered BEFORE the transform call, because
+// SendTracing runs on the segment collector goroutine which has no other
+// recover - an escaping panic would kill the whole process.
+func TestSendTracingRecoversTransformPanic(t *testing.T) {
+       logger := &capturingLogger{}
+       r := &kafkaReporter{
+               logger:        logger,
+               transform:     
reporter.NewTransform(&reporter.Entity{ServiceName: "svc", ServiceInstanceName: 
"inst"}),
+               tracingSendCh: make(chan *agentv3.SegmentObject, 1),
+       }
+
+       defer func() {
+               if p := recover(); p != nil {
+                       t.Fatalf("panic escaped SendTracing (recover registered 
too late?): %v", p)
+               }
+       }()
+       r.SendTracing([]reporter.ReportedSpan{panicReportedSpan{}})
+
+       if atomic.LoadInt32(&logger.errors) == 0 {
+               t.Fatal("recovered transform panic was not logged")
+       }
+}
diff --git a/plugins/core/segment_datarace_test.go b/plugins/core/segment_datarace_test.go
index e407101..46c4ad3 100644
--- a/plugins/core/segment_datarace_test.go
+++ b/plugins/core/segment_datarace_test.go
@@ -45,12 +45,14 @@ func buildReportedSpan() *SegmentSpanImpl {
                        OperationName: "users/SELECT",
                        Peer:          "127.0.0.1:5432",
                        SpanType:      SpanTypeExit,
+                       opLock:        &sync.Mutex{},
                },
                SegmentContext: SegmentContext{
-                       TraceID:      "trace-id",
-                       SegmentID:    "segment-id",
-                       SpanID:       0,
-                       ParentSpanID: -1,
+                       TraceID:            "trace-id",
+                       SegmentID:          "segment-id",
+                       SpanID:             0,
+                       ParentSpanID:       -1,
+                       CorrelationContext: newCorrelationContext(),
                },
        }
 }
@@ -189,3 +191,225 @@ func TestRaceReporterNeverPanicsWhileSpanMutated(t *testing.T) {
                t.Fatalf("reporter panicked %d time(s) - the #13885 crash 
regressed", atomic.LoadInt32(&panicked))
        }
 }
+
+// wireCollector attaches a working collector harness (collect channel +
+// collectorDone) to the span so End()/AsyncFinish() can run their real end0
+// path inside tests. It returns the channel the span will be delivered on.
+func wireCollector(t *testing.T, span *SegmentSpanImpl) chan 
reporter.ReportedSpan {
+       ch := make(chan reporter.ReportedSpan, 8)
+       done := make(chan struct{})
+       span.SegmentContext.collect = ch
+       span.SegmentContext.collectorDone = done
+       span.DefaultSpan.EndTime = time.Time{} // not ended yet
+       span.DefaultSpan.ended = false
+       span.DefaultSpan.tracer = Tracing // so11y bookkeeping in End needs a 
tracer
+       t.Cleanup(func() { close(done) }) // release any straggling end0 
goroutine
+       return ch
+}
+
+// TestRaceConcurrentMutators reproduces the "crossed span" misuse (e.g. the gorm
+// plugin handing the same live span to two goroutines through a shared
+// *gorm.DB): several goroutines mutate ONE span concurrently, repeatedly
+// rewriting the same tag key in place - the exact write that used to tear the
+// string header read by the reporter. With the per-span lock this must be
+// race-detector clean; before the fix this test reports races immediately.
+func TestRaceConcurrentMutators(t *testing.T) {
+       span := buildReportedSpan()
+       const workers = 4
+       var stop int32
+       var wg sync.WaitGroup
+       wg.Add(workers)
+       for w := 0; w < workers; w++ {
+               go func(w int) {
+                       defer wg.Done()
+                       for i := 0; atomic.LoadInt32(&stop) == 0; i++ {
+                               span.Tag("db.statement", fmt.Sprintf("select %d from t%d", i, w)) // same-key in-place rewrite
+                               span.Tag(fmt.Sprintf("k-%d-%d", w, i%8), "v")
+                               span.Log("event", fmt.Sprintf("w%d-%d", w, i))
+                               span.SetOperationName(fmt.Sprintf("op-%d-%d", w, i))
+                               span.SetPeer("10.0.0.1:3306")
+                               span.Error("boom")
+                       }
+               }(w)
+       }
+       time.Sleep(200 * time.Millisecond)
+       atomic.StoreInt32(&stop, 1)
+       wg.Wait()
+}
+
+// TestRaceMutateAfterFreezeWhileReporting verifies the lock-free reporting
+// guarantee: once endAndFreeze returns, the reporter may transform and marshal
+// the span without locks even though other goroutines are still calling
+// mutators (their writes are dropped under the lock without touching data).
+func TestRaceMutateAfterFreezeWhileReporting(t *testing.T) {
+       span := buildReportedSpan()
+       for i := 0; i < 16; i++ {
+               span.Tag(fmt.Sprintf("key-%d", i), "init")
+               span.Log("event", fmt.Sprintf("log-%d", i))
+       }
+
+       var stop int32
+       var wg sync.WaitGroup
+       wg.Add(3)
+       for w := 0; w < 3; w++ {
+               go func(w int) {
+                       defer wg.Done()
+                       for i := 0; atomic.LoadInt32(&stop) == 0; i++ {
+                               span.Tag(fmt.Sprintf("key-%d", i%16), fmt.Sprintf("late-%d-%d", w, i))
+                               span.Log("late", "v")
+                               span.SetOperationName("late-op")
+                               span.Error("late")
+                       }
+               }(w)
+       }
+
+       time.Sleep(50 * time.Millisecond) // let some pre-freeze writes land (race builds are slow)
+       if !span.endAndFreeze() {
+               t.Fatal("first endAndFreeze must return true")
+       }
+
+       transform := reporter.NewTransform(&reporter.Entity{
+               ServiceName:         "svc",
+               ServiceInstanceName: "inst",
+       })
+       // 50 iterations: the race detector is a binary signal, more iterations add
+       // wall time (~7s -> ~2s under -race) without adding coverage
+       for i := 0; i < 50; i++ {
+               seg := transform.TransformSegmentObject([]reporter.ReportedSpan{span})
+               if seg == nil {
+                       t.Fatal("nil segment")
+               }
+               if _, err := proto.Marshal(seg); err != nil {
+                       t.Fatalf("marshal segment: %v", err)
+               }
+       }
+
+       atomic.StoreInt32(&stop, 1)
+       wg.Wait()
+}
+
+// TestRaceDoubleEnd races two End() calls on the same span and asserts the
+// segment collector receives the span exactly once (the old IsValid check-then-
+// act allowed duplicated end0 sends, corrupting the segment accounting).
+func TestRaceDoubleEnd(t *testing.T) {
+       ResetTracingContext()
+       defer ResetTracingContext()
+       span := buildReportedSpan()
+       ch := wireCollector(t, span)
+
+       var wg sync.WaitGroup
+       wg.Add(2)
+       for i := 0; i < 2; i++ {
+               go func() {
+                       defer wg.Done()
+                       span.End()
+               }()
+       }
+       wg.Wait()
+
+       received := 0
+       select {
+       case <-ch:
+               received++
+       case <-time.After(2 * time.Second):
+       }
+       // grace period for a (buggy) duplicated delivery
+       select {
+       case <-ch:
+               received++
+       case <-time.After(200 * time.Millisecond):
+       }
+       if received != 1 {
+               t.Fatalf("expected the span to be collected exactly once, got %d", received)
+       }
+}
+
+// TestRaceAsyncFinishVsMutators covers the async span pattern (gRPC streaming,
+// toolkit async API): one goroutine keeps tagging while another finishes the
+// span asynchronously. Both AsyncFinish/End themselves and the mutators must be
+// fully synchronized; before the fix AsyncFinish/End were unlocked even in
+// async mode.
+func TestRaceAsyncFinishVsMutators(t *testing.T) {
+       ResetTracingContext()
+       defer ResetTracingContext()
+       span := buildReportedSpan()
+       ch := wireCollector(t, span)
+       span.PrepareAsync()
+
+       var stop int32
+       var wg sync.WaitGroup
+       wg.Add(1)
+       go func() {
+               defer wg.Done()
+               for i := 0; atomic.LoadInt32(&stop) == 0; i++ {
+                       span.Tag("async-key", fmt.Sprintf("v-%d", i))
+                       span.Log("async", "v")
+               }
+       }()
+
+       span.End() // async mode: does not finish the span
+
+       finished := make(chan struct{})
+       go func() {
+               defer close(finished)
+               span.AsyncFinish()
+       }()
+       <-finished
+
+       atomic.StoreInt32(&stop, 1)
+       wg.Wait()
+
+       select {
+       case <-ch:
+       case <-time.After(time.Second):
+               t.Fatal("async finished span was never collected")
+       }
+}
+
+// TestRaceHostileWorkload runs the full end-to-end hostile workload (see
+// span_hostile_workload_test.go) in-process under the race detector: every
+// misuse pattern from the #13885 audit against the real pipeline must be free
+// of data races. The no-panic/no-throw property of the same workload is
+// asserted by TestE2ESpanCrashSafety in a child process.
+func TestRaceHostileWorkload(t *testing.T) {
+       segments, marshals := runHostileSpanWorkload(1500 * time.Millisecond)
+       if segments == 0 || marshals == 0 {
+               t.Fatalf("hostile workload processed no data (segments=%d marshals=%d)", segments, marshals)
+       }
+}
+
+// TestRaceCorrelationSetVsSnapshot covers the correlation storage: concurrent
+// writers and snapshot/encode readers used to race on a bare map, which is an
+// unrecoverable "concurrent map iteration and map write" fatal error in
+// production.
+func TestRaceCorrelationSetVsSnapshot(t *testing.T) {
+       c := newCorrelationContext()
+       var stop int32
+       var wg sync.WaitGroup
+       wg.Add(4)
+       for w := 0; w < 2; w++ {
+               go func(w int) {
+                       defer wg.Done()
+                       for i := 0; atomic.LoadInt32(&stop) == 0; i++ {
+                               c.Set(fmt.Sprintf("k%d", i%4), fmt.Sprintf("v-%d-%d", w, i))
+                               if i%8 == 0 {
+                                       c.Set(fmt.Sprintf("k%d", i%4), "") // delete path
+                               }
+                       }
+               }(w)
+       }
+       for r := 0; r < 2; r++ {
+               go func() {
+                       defer wg.Done()
+                       for atomic.LoadInt32(&stop) == 0 {
+                               _ = c.Snapshot()
+                               _ = c.Get("k1")
+                               _ = c.Len()
+                               _ = c.Clone()
+                       }
+               }()
+       }
+       time.Sleep(200 * time.Millisecond)
+       atomic.StoreInt32(&stop, 1)
+       wg.Wait()
+}
diff --git a/plugins/core/span_crash_e2e_test.go b/plugins/core/span_crash_e2e_test.go
new file mode 100644
index 0000000..885aec2
--- /dev/null
+++ b/plugins/core/span_crash_e2e_test.go
@@ -0,0 +1,134 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package core
+
+import (
+       "fmt"
+       "os"
+       "os/exec"
+       "strconv"
+       "strings"
+       "testing"
+       "time"
+)
+
+const (
+       e2eChildEnv    = "SW_SPAN_E2E_CHILD"
+       e2eDurationEnv = "SW_SPAN_E2E_DURATION"
+       e2eOKMarker    = "E2E_OK"
+)
+
+// TestE2ESpanCrashSafety is the end-to-end guarantee for apache/skywalking#13885:
+// under every concurrency-abuse pattern found in the audit, the agent must
+// produce NO panic and NO runtime fatal error (`runtime.throw`, e.g. "invalid
+// pointer found on stack" / "concurrent map writes").
+//
+// A runtime.throw is unrecoverable and kills the process, so this cannot be
+// asserted in-process: the hostile workload runs in a CHILD test process and
+// the parent asserts a clean exit. Any panic, data-race fatal, GC/stack-scan
+// "bad pointer" throw or deadlock (child test timeout) fails this test.
+//
+// Tune the stress duration with SW_SPAN_E2E_DURATION (default 3s).
+func TestE2ESpanCrashSafety(t *testing.T) {
+       if os.Getenv(e2eChildEnv) == "1" {
+               runE2EChild()
+               return
+       }
+       if testing.Short() {
+               t.Skip("skipping subprocess e2e in -short mode")
+       }
+
+       cmd := exec.Command(os.Args[0],
+               "-test.run", "^TestE2ESpanCrashSafety$", "-test.v", "-test.timeout", "120s")
+       // Runtime hardening for the child: clobberfree (Go 1.13+, harmlessly
+       // ignored by runtimes without it) makes the GC overwrite freed objects
+       // with junk, so any use-after-free (the silent precursor of the production
+       // "invalid pointer found on stack" throw) crashes immediately and
+       // deterministically instead of depending on scheduling luck.
+       // invalidptr is on by default and kept explicit for documentation.
+       godebug := "clobberfree=1,invalidptr=1"
+       env := make([]string, 0, len(os.Environ())+2)
+       for _, kv := range os.Environ() {
+               if strings.HasPrefix(kv, "GODEBUG=") {
+                       // keep our hardening LAST: runtime parsegodebug assigns each
+                       // key=value as encountered front-to-back, so the last occurrence
+                       // of a duplicated key wins (verified against runtime1.go)
+                       godebug = strings.TrimPrefix(kv, "GODEBUG=") + "," + godebug
+                       continue
+               }
+               env = append(env, kv)
+       }
+       env = append(env, e2eChildEnv+"=1", "GODEBUG="+godebug)
+       cmd.Env = env
+       out, err := cmd.CombinedOutput()
+       output := string(out)
+
+       if err != nil {
+               // non-zero exit = panic, runtime.throw or test timeout in the child
+               t.Fatalf("hostile-workload child process did not exit cleanly (panic or runtime fatal): %v\n--- child output ---\n%s", err, output)
+       }
+       for _, fatal := range []string{"fatal error:", "panic:", "DATA RACE"} {
+               if strings.Contains(output, fatal) {
+                       t.Fatalf("child output contains %q:\n--- child output ---\n%s", fatal, output)
+               }
+       }
+       if !strings.Contains(output, e2eOKMarker) {
+               t.Fatalf("child never reached the completion marker:\n--- child output ---\n%s", output)
+       }
+
+       // sanity: the pipeline must have actually transformed and marshaled work,
+       // otherwise the e2e silently tested nothing
+       segments := parseE2ECounter(t, output, "segments")
+       marshals := parseE2ECounter(t, output, "marshals")
+       if segments == 0 || marshals == 0 {
+               t.Fatalf("e2e processed no data (segments=%d marshals=%d):\n%s", segments, marshals, output)
+       }
+       t.Logf("e2e clean: segments=%d marshals=%d", segments, marshals)
+}
+
+func runE2EChild() {
+       d := 3 * time.Second
+       if v := os.Getenv(e2eDurationEnv); v != "" {
+               if parsed, err := time.ParseDuration(v); err == nil && parsed > 0 {
+                       d = parsed
+               }
+       }
+       segments, marshals := runHostileSpanWorkload(d)
+       fmt.Printf("%s segments=%d marshals=%d\n", e2eOKMarker, segments, marshals)
+}
+
+func parseE2ECounter(t *testing.T, output, name string) int64 {
+       t.Helper()
+       idx := strings.Index(output, name+"=")
+       if idx < 0 {
+               t.Fatalf("counter %q missing from child output", name)
+       }
+       rest := output[idx+len(name)+1:]
+       end := 0
+       for end < len(rest) && rest[end] >= '0' && rest[end] <= '9' {
+               end++
+       }
+       if end == 0 {
+               t.Fatalf("counter %q has no digits after '='", name)
+       }
+       v, err := strconv.ParseInt(rest[:end], 10, 64)
+       if err != nil {
+               t.Fatalf("counter %q unparsable: %v", name, err)
+       }
+       return v
+}
diff --git a/plugins/core/span_default.go b/plugins/core/span_default.go
index 4c6a5aa..a7c1525 100644
--- a/plugins/core/span_default.go
+++ b/plugins/core/span_default.go
@@ -45,7 +45,30 @@ type DefaultSpan struct {
 
        InAsyncMode       bool
        AsyncModeFinished bool
-       AsyncOpLocker     *sync.Mutex
+
+       // opLock guards the mutable fields above (OperationName, Peer, Layer,
+       // ComponentID, Tags, Logs, IsError, EndTime, the async flags) together with
+       // the ended flag. SpanType, Parent, Refs, StartTime and tracer are
+       // write-once during construction - before the span is ever shared - and are
+       // therefore read without the lock (IsEntry/IsExit/ParentSpan/StartTime).
+       // It must stay a pointer: DefaultSpan is copied by value when it is embedded
+       // into SegmentSpanImpl/SnapshotSpan, and an embedded sync.Mutex value would
+       // trip the go vet copylocks check.
+       opLock *sync.Mutex
+       // ended is set by endAndFreeze right before the span is handed over to the
+       // segment collector. Once set, every late mutation is dropped, so the span
+       // data is frozen and the reporting path may read it without locking
+       // (see the comment on the ReportedSpan accessors in span_tracing.go).
+       ended bool
+       // droppedLogged rate-limits logDroppedWrite to one warning per span: a
+       // span leaked across goroutines would otherwise flood the log with one
+       // line per dropped write.
+       droppedLogged bool
+       // reuseCount counts the extra logical owners that the span reuse rule in
+       // CreateEntrySpan/CreateExitSpan hands this span to. Every owner calls End
+       // exactly once and only the LAST End freezes and reports the span (see
+       // enterReuse and endSyncAndFreeze). Guarded by opLock.
+       reuseCount int
 }
 
 func NewDefaultSpan(tracer *Tracer, parent TracingSpan) *DefaultSpan {
@@ -54,62 +77,81 @@ func NewDefaultSpan(tracer *Tracer, parent TracingSpan) *DefaultSpan {
                StartTime: time.Now(),
                SpanType:  SpanTypeLocal,
                Parent:    parent,
+               opLock:    &sync.Mutex{},
        }
 }
 
 // For TracingSpan
 func (ds *DefaultSpan) SetOperationName(name string) {
-       if ds.InAsyncMode {
-               ds.AsyncOpLocker.Lock()
-               defer ds.AsyncOpLocker.Unlock()
+       ds.opLock.Lock()
+       defer ds.opLock.Unlock()
+       if ds.ended {
+               ds.logDroppedWrite("operation name", name)
+               return
        }
        ds.OperationName = name
 }
 
 func (ds *DefaultSpan) GetOperationName() string {
+       ds.opLock.Lock()
+       defer ds.opLock.Unlock()
        return ds.OperationName
 }
 
 func (ds *DefaultSpan) SetPeer(peer string) {
-       if ds.InAsyncMode {
-               ds.AsyncOpLocker.Lock()
-               defer ds.AsyncOpLocker.Unlock()
+       ds.opLock.Lock()
+       defer ds.opLock.Unlock()
+       if ds.ended {
+               ds.logDroppedWrite("peer", peer)
+               return
        }
        ds.Peer = peer
 }
 
 func (ds *DefaultSpan) GetPeer() string {
+       ds.opLock.Lock()
+       defer ds.opLock.Unlock()
        return ds.Peer
 }
 
 func (ds *DefaultSpan) SetSpanLayer(layer int32) {
-       if ds.InAsyncMode {
-               ds.AsyncOpLocker.Lock()
-               defer ds.AsyncOpLocker.Unlock()
+       ds.opLock.Lock()
+       defer ds.opLock.Unlock()
+       if ds.ended {
+               ds.logDroppedWrite("span layer", "")
+               return
        }
        ds.Layer = agentv3.SpanLayer(layer)
 }
 
 func (ds *DefaultSpan) GetSpanLayer() agentv3.SpanLayer {
+       ds.opLock.Lock()
+       defer ds.opLock.Unlock()
        return ds.Layer
 }
 
 func (ds *DefaultSpan) SetComponent(componentID int32) {
-       if ds.InAsyncMode {
-               ds.AsyncOpLocker.Lock()
-               defer ds.AsyncOpLocker.Unlock()
+       ds.opLock.Lock()
+       defer ds.opLock.Unlock()
+       if ds.ended {
+               ds.logDroppedWrite("component", "")
+               return
        }
        ds.ComponentID = componentID
 }
 
 func (ds *DefaultSpan) GetComponent() int32 {
+       ds.opLock.Lock()
+       defer ds.opLock.Unlock()
        return ds.ComponentID
 }
 
 func (ds *DefaultSpan) Tag(key, value string) {
-       if ds.InAsyncMode {
-               ds.AsyncOpLocker.Lock()
-               defer ds.AsyncOpLocker.Unlock()
+       ds.opLock.Lock()
+       defer ds.opLock.Unlock()
+       if ds.ended {
+               ds.logDroppedWrite("tag", key)
+               return
        }
        for _, tag := range ds.Tags {
                if tag.Key == key {
@@ -120,11 +162,9 @@ func (ds *DefaultSpan) Tag(key, value string) {
        ds.Tags = append(ds.Tags, &commonv3.KeyStringValuePair{Key: key, Value: value})
 }
 
-func (ds *DefaultSpan) Log(ll ...string) {
-       if ds.InAsyncMode {
-               ds.AsyncOpLocker.Lock()
-               defer ds.AsyncOpLocker.Unlock()
-       }
+// log0 is the lock-free internal implementation of Log: Error reuses it while
+// already holding opLock, avoiding a re-entrant deadlock.
+func (ds *DefaultSpan) log0(ll ...string) {
        data := make([]*commonv3.KeyStringValuePair, 0, int32(math.Ceil(float64(len(ll))/2.0)))
        var kvp *commonv3.KeyStringValuePair
        for i, l := range ll {
@@ -139,26 +179,62 @@ func (ds *DefaultSpan) Log(ll ...string) {
        ds.Logs = append(ds.Logs, &agentv3.Log{Time: Millisecond(time.Now()), Data: data})
 }
 
+func (ds *DefaultSpan) Log(ll ...string) {
+       ds.opLock.Lock()
+       defer ds.opLock.Unlock()
+       if ds.ended {
+               ds.logDroppedWrite("log", "")
+               return
+       }
+       ds.log0(ll...)
+}
+
 func (ds *DefaultSpan) Error(ll ...string) {
-       if ds.InAsyncMode {
-               ds.AsyncOpLocker.Lock()
-               defer ds.AsyncOpLocker.Unlock()
+       ds.opLock.Lock()
+       defer ds.opLock.Unlock()
+       if ds.ended {
+               ds.logDroppedWrite("error", "")
+               return
        }
        ds.IsError = true
-       ds.Log(ll...)
+       ds.log0(ll...)
 }
 
 func (ds *DefaultSpan) ErrorOccured() {
-       if ds.InAsyncMode {
-               ds.AsyncOpLocker.Lock()
-               defer ds.AsyncOpLocker.Unlock()
+       ds.opLock.Lock()
+       defer ds.opLock.Unlock()
+       if ds.ended {
+               ds.logDroppedWrite("error flag", "")
+               return
        }
        ds.IsError = true
 }
 
-func (ds *DefaultSpan) End(changeParent bool) {
-       ds.EndTime = time.Now()
+// logDroppedWrite reports a mutation that arrived after the span was frozen.
+// The caller must hold opLock (reading OperationName/droppedLogged is therefore
+// safe). The span name (e.g. "GET:/api/xxx", "MySQL/query") lets users locate
+// which plugin or code path is still writing a finished span, which usually
+// means the span leaked across goroutines. Only the FIRST drop per span is
+// logged so a leaked hot span cannot flood the log.
+func (ds *DefaultSpan) logDroppedWrite(op, detail string) {
+       if ds.droppedLogged {
+               return
+       }
+       ds.droppedLogged = true
+       if ds.tracer != nil && ds.tracer.Log != nil {
+               ds.tracer.Log.Warnf(
+                       "span %q already ended, dropping %s %q (and any further late writes; span shared across goroutines?)",
+                       ds.OperationName, op, detail)
+       }
+}
 
+func (ds *DefaultSpan) End(changeParent bool) {
+       ds.opLock.Lock()
+       if !ds.ended {
+               ds.EndTime = time.Now()
+       }
+       ds.opLock.Unlock()
+       // The remaining work is goroutine-local bookkeeping, not shared span data.
        GetSo11y(ds.tracer).MeasureTracingContextCompletion(false)
        if changeParent {
                if ctx := getTracingContext(); ctx != nil {
@@ -167,6 +243,78 @@ func (ds *DefaultSpan) End(changeParent bool) {
        }
 }
 
+// endAndFreeze marks the span as ended under the lock. It returns true only on
+// the first call, so the caller can ensure end0() runs exactly once even when
+// End is raced from multiple goroutines (no duplicated segment reporting).
+// After it returns, every late mutator observes ended==true and drops its
+// write, so the span data is frozen: the reporting path reads it without locks
+// relying on the channel handoff in end0 for the happens-before edge.
+func (ds *DefaultSpan) endAndFreeze() bool {
+       ds.opLock.Lock()
+       defer ds.opLock.Unlock()
+       if ds.ended {
+               return false
+       }
+       ds.ended = true
+       return true
+}
+
+// enterReuse registers one more owner of this span. It is called from the span
+// reuse branches of CreateEntrySpan/CreateExitSpan when the active span is
+// handed to a nested plugin; that owner's End then only decrements the counter
+// (see endSyncAndFreeze) instead of freezing the span.
+func (ds *DefaultSpan) enterReuse() {
+       ds.opLock.Lock()
+       defer ds.opLock.Unlock()
+       if !ds.ended {
+               ds.reuseCount++
+       }
+}
+
+// endSyncAndFreeze performs the whole synchronous End() path in ONE critical
+// section - the "already ended" fast check, the EndTime write and the freeze
+// decision used to take three separate lock round-trips per span end. It
+// returns true when the caller must hand the span to the collector (exactly
+// once, and never for async spans: AsyncFinish owns the freeze there, and
+// reading InAsyncMode under the same lock keeps even a misused concurrent
+// PrepareAsync/End pair free of data races).
+func (ds *DefaultSpan) endSyncAndFreeze() bool {
+       ds.opLock.Lock()
+       if ds.reuseCount > 0 {
+               // a nested owner of a reused span (see enterReuse) finished: this is
+               // not the last End, so keep the span open - the outer owner still
+               // writes to it (e.g. gorm tags db.statement after the sql driver's
+               // End) and will freeze it with its own End. Restoring the active span
+               // is kept here because the nested plugin expects its End to pop the
+               // span, exactly like before the freeze mechanism.
+               ds.reuseCount--
+               ds.opLock.Unlock()
+               if ctx := getTracingContext(); ctx != nil {
+                       ctx.SaveActiveSpan(ds.Parent)
+               }
+               return false
+       }
+       if !ds.EndTime.IsZero() { // already ended
+               ds.opLock.Unlock()
+               return false
+       }
+       ds.EndTime = time.Now()
+       frozen := false
+       // EndTime and ended are distinct on purpose: in async mode End() sets
+       // EndTime but leaves ended=false - AsyncFinish owns the freeze there.
+       if !ds.InAsyncMode && !ds.ended {
+               ds.ended = true
+               frozen = true
+       }
+       ds.opLock.Unlock()
+       // goroutine-local bookkeeping stays outside the lock
+       GetSo11y(ds.tracer).MeasureTracingContextCompletion(false)
+       if ctx := getTracingContext(); ctx != nil {
+               ctx.SaveActiveSpan(ds.Parent)
+       }
+       return frozen
+}
+
 func (ds *DefaultSpan) IsEntry() bool {
        return ds.SpanType == SpanTypeEntry
 }
@@ -176,6 +324,10 @@ func (ds *DefaultSpan) IsExit() bool {
 }
 
 func (ds *DefaultSpan) IsValid() bool {
+       // EndTime may be written by another goroutine in async mode, take the lock
+       // to avoid a torn read of the time.Time value.
+       ds.opLock.Lock()
+       defer ds.opLock.Unlock()
        return ds.EndTime.IsZero()
 }
 
@@ -184,15 +336,31 @@ func (ds *DefaultSpan) ParentSpan() TracingSpan {
 }
 
 func (ds *DefaultSpan) PrepareAsync() {
+       ds.opLock.Lock()
+       defer ds.opLock.Unlock()
+       if ds.ended {
+               // the span is already frozen and reported; entering async mode now is
+               // a misuse and would only confuse the lifecycle, so drop it
+               ds.logDroppedWrite("prepare async", "")
+               return
+       }
        if ds.InAsyncMode {
                panic("already in async mode")
        }
        ds.InAsyncMode = true
        ds.AsyncModeFinished = false
-       ds.AsyncOpLocker = &sync.Mutex{}
 }
 
 func (ds *DefaultSpan) AsyncFinish() {
+       ds.opLock.Lock()
+       defer ds.opLock.Unlock()
+       if ds.ended {
+               // already frozen and reported (the matching PrepareAsync was dropped,
+               // or another finisher won the race): drop, mirroring the mutator
+               // policy, instead of panicking on an already-completed span
+               ds.logDroppedWrite("async finish", "")
+               return
+       }
        if !ds.InAsyncMode {
                panic("not in async mode")
        }
@@ -202,9 +370,11 @@ func (ds *DefaultSpan) AsyncFinish() {
        ds.AsyncModeFinished = true
 }
 
+// GetEndPointName must not be called while holding opLock (it locks through
+// GetOperationName, mirroring how Error must use log0 instead of Log).
 func (ds *DefaultSpan) GetEndPointName() string {
        if ds.SpanType == SpanTypeEntry {
-               return ds.OperationName
+               return ds.GetOperationName()
        }
        return ""
 }
diff --git a/plugins/core/span_freeze_test.go b/plugins/core/span_freeze_test.go
new file mode 100644
index 0000000..95d3b6e
--- /dev/null
+++ b/plugins/core/span_freeze_test.go
@@ -0,0 +1,277 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package core
+
+import (
+       "sync"
+       "sync/atomic"
+       "testing"
+       "time"
+
+       "github.com/apache/skywalking-go/plugins/core/reporter"
+)
+
+// newTestSegmentSpan returns a fresh exit span together with its segment
+// context, wired like the agent wires one. Unlike buildReportedSpan (compiled
+// only under the race tag) it is available in every build.
+func newTestSegmentSpan() *SegmentSpanImpl {
+       span := &SegmentSpanImpl{}
+
+       base := &span.DefaultSpan
+       base.StartTime = time.Now()
+       base.OperationName = "users/SELECT"
+       base.Peer = "127.0.0.1:5432"
+       base.SpanType = SpanTypeExit
+       base.opLock = &sync.Mutex{}
+
+       seg := &span.SegmentContext
+       seg.TraceID = "trace-id"
+       seg.SegmentID = "segment-id"
+       seg.SpanID = 0
+       seg.ParentSpanID = -1
+       seg.CorrelationContext = newCorrelationContext()
+
+       return span
+}
+
+func TestLateWritesDroppedAfterFreeze(t *testing.T) {
+       span := newTestSegmentSpan()
+       span.Tag("k1", "v1")
+       span.Log("event", "before")
+       span.SetOperationName("op-before")
+
+       if !span.endAndFreeze() {
+               t.Fatal("first endAndFreeze must return true")
+       }
+       if span.endAndFreeze() {
+               t.Fatal("second endAndFreeze must return false")
+       }
+
+       // every late mutation must be silently dropped without panicking
+       span.Tag("k1", "late")
+       span.Tag("k2", "late")
+       span.Log("event", "late")
+       span.SetOperationName("op-late")
+       span.SetPeer("late:1")
+       span.SetSpanLayer(int32(3))
+       span.SetComponent(99)
+       span.Error("late")
+       span.ErrorOccured()
+
+       if got := len(span.Tags()); got != 1 {
+               t.Fatalf("late tag was not dropped, tags=%d", got)
+       }
+       if span.Tags()[0].Value != "v1" {
+               t.Fatalf("in-place tag rewrite after freeze was not dropped: 
%s", span.Tags()[0].Value)
+       }
+       if got := len(span.Logs()); got != 1 {
+               t.Fatalf("late log was not dropped, logs=%d", got)
+       }
+       if span.OperationName() != "op-before" {
+               t.Fatalf("late operation name was not dropped: %s", 
span.OperationName())
+       }
+       if span.IsError() {
+               t.Fatal("late error flag was not dropped")
+       }
+}
+
+// TestEnd0AfterCollectorExitIsSafe ends a span whose segment collector has
+// already exited (collectorDone closed, nobody receiving on collect) and
+// requires End to return promptly without panicking.
+func TestEnd0AfterCollectorExitIsSafe(t *testing.T) {
+       ResetTracingContext()
+       span := newTestSegmentSpan()
+       span.DefaultSpan.tracer = Tracing
+       // collector already exited: data channel has no receiver and stays open,
+       // only the done channel is closed
+       span.SegmentContext.collect = make(chan reporter.ReportedSpan)
+       span.SegmentContext.collectorDone = make(chan struct{})
+       close(span.SegmentContext.collectorDone)
+
+       // End runs on its own goroutine so a blocking end0 fails this test after a
+       // bounded wait instead of hanging the binary until the global -timeout.
+       ended := make(chan struct{})
+       go func() {
+               defer close(ended)
+               span.End() // end0 must neither panic nor block forever
+       }()
+       select {
+       case <-ended:
+       case <-time.After(time.Second):
+               t.Fatal("End blocked after the segment collector exited")
+       }
+
+       // NOTE(review): if end0 hands the send to a background goroutine, a send
+       // blocked there only leaks that goroutine and is not observable here; this
+       // pause merely lets it run (and surface any panic) while this test is
+       // still the active one.
+       time.Sleep(50 * time.Millisecond)
+}
+
+// TestDoubleEndCollectsOnce ends the same span twice and expects exactly one
+// delivery on the segment's collect channel.
+func TestDoubleEndCollectsOnce(t *testing.T) {
+       ResetTracingContext()
+       span := newTestSegmentSpan()
+       span.DefaultSpan.tracer = Tracing
+       collected := make(chan reporter.ReportedSpan, 8)
+       span.SegmentContext.collect = collected
+       span.SegmentContext.collectorDone = make(chan struct{})
+
+       for attempt := 0; attempt < 2; attempt++ {
+               span.End() // only the first call may deliver; the second is a no-op
+       }
+
+       // receivedWithin reports whether a span arrives on collected in time.
+       receivedWithin := func(wait time.Duration) bool {
+               select {
+               case <-collected:
+                       return true
+               case <-time.After(wait):
+                       return false
+               }
+       }
+       if !receivedWithin(time.Second) {
+               t.Fatal("ended span was never collected")
+       }
+       if receivedWithin(100 * time.Millisecond) {
+               t.Fatal("span was collected twice")
+       }
+}
+
+// TestAsyncFlowCollectsOnce drives PrepareAsync -> End -> Tag -> AsyncFinish:
+// nothing may be collected before AsyncFinish, the span is collected exactly
+// once after it, and a tag written between End and AsyncFinish is kept.
+func TestAsyncFlowCollectsOnce(t *testing.T) {
+       ResetTracingContext()
+       span := newTestSegmentSpan()
+       span.DefaultSpan.tracer = Tracing
+       ch := make(chan reporter.ReportedSpan, 8)
+       span.SegmentContext.collect = ch
+       span.SegmentContext.collectorDone = make(chan struct{})
+
+       span.PrepareAsync()
+       span.End() // async mode: must not collect yet
+       select {
+       case <-ch:
+               t.Fatal("async span collected before AsyncFinish")
+       // generous window so a slow end0 goroutine could not hide a premature delivery
+       case <-time.After(200 * time.Millisecond):
+       }
+
+       span.Tag("after-end", "v") // still allowed between End and AsyncFinish
+       span.AsyncFinish()
+
+       select {
+       case <-ch:
+       case <-time.After(time.Second):
+               t.Fatal("async span was never collected")
+       }
+       // the test name promises a single delivery, so assert there is no second one
+       select {
+       case <-ch:
+               t.Fatal("async span was collected twice")
+       case <-time.After(100 * time.Millisecond):
+       }
+
+       found := false
+       for _, tag := range span.Tags() {
+               if tag.Key == "after-end" {
+                       found = true
+                       break
+               }
+       }
+       if !found {
+               t.Fatal("tag written between End and AsyncFinish was lost")
+       }
+}
+
+// TestCorrelationContextBasics covers Set/Get/Len, delete-by-empty-value,
+// snapshot/clone isolation, nil-receiver safety and lazy allocation.
+func TestCorrelationContextBasics(t *testing.T) {
+       c := newCorrelationContext()
+       c.Set("a", "1")
+       c.Set("b", "2")
+       if c.Get("a") != "1" || c.Len() != 2 {
+               t.Fatal("set/get/len mismatch")
+       }
+       c.Set("a", "") // empty value deletes
+       if c.Get("a") != "" || c.Len() != 1 {
+               t.Fatal("empty-value delete failed")
+       }
+
+       snap := c.Snapshot()
+       clone := c.Clone()
+       c.Set("b", "changed")
+       if snap["b"] != "2" || clone.Get("b") != "2" {
+               t.Fatal("snapshot/clone must be independent of later writes")
+       }
+
+       var nilCtx *CorrelationContext
+       if nilCtx.Get("x") != "" || nilCtx.Len() != 0 {
+               t.Fatal("nil receiver reads must be safe")
+       }
+       nilCtx.Set("x", "y") // must not panic
+       if nilCtx.Clone() == nil {
+               t.Fatal("nil receiver clone must return a usable value")
+       }
+       // Snapshot returns nil when empty (allocation-free); nil maps are readable
+       if len(nilCtx.Snapshot()) != 0 {
+               t.Fatal("nil receiver snapshot must be empty")
+       }
+
+       // lazy data allocation: a fresh context must support all reads before any Set
+       fresh := newCorrelationContext()
+       if fresh.Get("x") != "" || fresh.Len() != 0 || fresh.Snapshot() != nil {
+               t.Fatal("fresh context reads must be safe and allocation-free")
+       }
+       fresh.Set("x", "1")
+       if fresh.Get("x") != "1" {
+               t.Fatal("set after lazy init failed")
+       }
+}
+
+// TestContinueContextClonesRuntime guards the clone-on-continue behavior: two
+// goroutines continuing the same snapshot must not share one RuntimeContext 
map
+// (that sharing was a fatal concurrent-map-access risk, e.g. the send and
+// receive goroutines of a gRPC stream).
+func TestContinueContextClonesRuntime(t *testing.T) {
+       ResetTracingContext()
+       defer ResetTracingContext()
+
+       Tracing.SetRuntimeContextValue("k", "original")
+       snap := Tracing.CaptureContext()
+       if snap == nil {
+               t.Fatal("capture returned nil snapshot")
+       }
+
+       Tracing.ContinueContext(snap)
+       Tracing.SetRuntimeContextValue("k", "changed-after-first-continue")
+
+       Tracing.ContinueContext(snap) // the same snapshot continued again
+       if got := Tracing.GetRuntimeContextValue("k"); got != "original" {
+               t.Fatalf("ContinueContext shared the runtime map across 
continues: got %v", got)
+       }
+}
+
+// TestMetricsCollectPanicRecovered proves the metrics collect loop survives a
+// panicking user meter callback (it used to kill the whole process: the
+// goroutine had no recover).
+// The metrics collect goroutine leaks by design (the loop has no shutdown) and
+// keeps reading the MetricsObtain global forever - ANY later write to that
+// global (restore on test exit, or re-install on `-test.count=2` reruns) would
+// race with those leaked readers. So the panicking wrapper is installed exactly
+// once per process, before the first collect goroutine exists (happens-before
+// safe), and reruns only flip the atomic switch.
+var (
+       metricsPanicHookOnce sync.Once    // installs the MetricsObtain wrapper once per process
+       metricsPanicArmed    atomic.Bool  // true: the wrapper panics; false: it delegates to the original
+       metricsPanicCalls    atomic.Int32 // panics raised by the armed wrapper
+)
+
+func TestMetricsCollectPanicRecovered(t *testing.T) {
+       metricsPanicHookOnce.Do(func() {
+               old := MetricsObtain
+               MetricsObtain = func() ([]interface{}, []func()) {
+                       if !metricsPanicArmed.Load() {
+                               return old()
+                       }
+                       metricsPanicCalls.Add(1)
+                       panic("meter callback boom")
+               }
+       })
+       metricsPanicCalls.Store(0)
+       metricsPanicArmed.Store(true)
+       defer metricsPanicArmed.Store(false)
+
+       // A dedicated tracer keeps the leaked collect goroutine (the loop has 
no
+       // shutdown mechanism) away from the shared Tracing used by other tests.
+       tr := &Tracer{initFlag: 1, Sampler: NewConstSampler(true), Reporter: 
NewStoreReporter(),
+               ServiceEntity: NewEntity("metrics-recover-test", "inst"), 
meterMap: &sync.Map{}}
+       tr.ProfileManager = NewProfileManager(nil)
+       tr.initMetricsCollect(1)
+
+       deadline := time.After(5 * time.Second)
+       for metricsPanicCalls.Load() < 2 {
+               select {
+               case <-deadline:
+                       t.Fatalf("collect loop did not survive the panic, 
iterations=%d", metricsPanicCalls.Load())
+               case <-time.After(50 * time.Millisecond):
+               }
+       }
+}
diff --git a/plugins/core/span_hostile_workload_test.go 
b/plugins/core/span_hostile_workload_test.go
new file mode 100644
index 0000000..dfe4cf8
--- /dev/null
+++ b/plugins/core/span_hostile_workload_test.go
@@ -0,0 +1,435 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// span_hostile_workload_test.go drives the REAL agent pipeline (public tracing
+// API -> spans/segments -> collector goroutines -> Transform -> proto.Marshal)
+// under every concurrency-abuse pattern found in the apache/skywalking#13885
+// investigation, plus aggressive GC and stack growth so that any pointer
+// corruption is caught by the runtime scanners (which is exactly how the
+// production crash manifested as `fatal error: invalid pointer found on
+// stack`).
+//
+// It is shared by two tests:
+//   - TestE2ESpanCrashSafety (span_crash_e2e_test.go): runs the workload in a
+//     CHILD PROCESS - a runtime.throw is unrecoverable and kills the process,
+//     so only a subprocess can assert "no panic AND no runtime fatal at all";
+//   - TestRaceHostileWorkload (segment_datarace_test.go, race build): runs it
+//     in-process under the race detector to catch any remaining data race.
+//
+// There is deliberately NO recover anywhere in this workload or its pipeline
+// reporter: any panic must surface and fail the test.
+package core
+
+import (
+       "fmt"
+       "runtime"
+       "runtime/debug"
+       "sync"
+       "sync/atomic"
+       "time"
+
+       "google.golang.org/protobuf/proto"
+
+       "github.com/apache/skywalking-go/plugins/core/reporter"
+       "github.com/apache/skywalking-go/plugins/core/tracing"
+
+       agentv3 
"github.com/apache/skywalking-go/protocols/collect/language/agent/v3"
+       logv3 "github.com/apache/skywalking-go/protocols/collect/logging/v3"
+)
+
+// ---------------------------------------------------------------------------
+// real goroutine-local GLS for tests
+// ---------------------------------------------------------------------------
+
+// goid parses the current goroutine id from the stack header ("goroutine N [").
+// Slow, but faithful: it gives the tests true goroutine-local storage, matching
+// the production GLS that the build toolchain injects into runtime.g.
+func goid() uint64 {
+       // 32 bytes hold the 10-byte "goroutine " prefix plus any uint64 id (at
+       // most 20 digits); only the header is needed, so truncation is fine.
+       var buf [32]byte
+       n := runtime.Stack(buf[:], false)
+       id := uint64(0)
+       for _, c := range buf[10:n] { // skip "goroutine "
+               if c < '0' || c > '9' {
+                       break
+               }
+               id = id*10 + uint64(c-'0')
+       }
+       return id
+}
+
+// installGoroutineLocalGLS swaps the single-variable GLS hooks of test_base.go
+// for ones keyed by goroutine id, so capture/continue and snapshots propagate
+// context the way production does. Call the returned restore func to put the
+// previous GetGLS/SetGLS hooks back.
+func installGoroutineLocalGLS() (restore func()) {
+       prevGet, prevSet := GetGLS, SetGLS
+       var perGoroutine sync.Map
+
+       GetGLS = func() interface{} {
+               // Load yields a nil value when the key is absent
+               value, _ := perGoroutine.Load(goid())
+               return value
+       }
+       SetGLS = func(v interface{}) {
+               id := goid()
+               if v != nil {
+                       perGoroutine.Store(id, v)
+               } else {
+                       perGoroutine.Delete(id)
+               }
+       }
+
+       restore = func() {
+               GetGLS, SetGLS = prevGet, prevSet
+       }
+       return restore
+}
+
+// ---------------------------------------------------------------------------
+// pipeline reporter: replicates the production gRPC reporter data path
+// ---------------------------------------------------------------------------
+
+// pipelineReporter mirrors what the production reporters do with a finished
+// segment: TransformSegmentObject on the segment-collector goroutine, then
+// proto.Marshal on a dedicated send goroutine. Deliberately NO recover: a
+// panic anywhere in this path must crash the test/child process.
+type pipelineReporter struct {
+       transform *reporter.Transform
+       segCh     chan *agentv3.SegmentObject // transformed segments for the send goroutine
+       closed    chan struct{}               // shutdown signal; the DATA channel is never closed
+       done      chan struct{}               // closed by the send goroutine when it exits
+       segments  int64                       // segments transformed (atomic)
+       marshals  int64                       // successful proto.Marshal calls (atomic)
+}
+
+// newPipelineReporter builds the reporter and starts its send goroutine, which
+// marshals every segment handed over on segCh until closed is signaled, then
+// drains whatever is still buffered and closes done.
+func newPipelineReporter() *pipelineReporter {
+       r := &pipelineReporter{
+               transform: reporter.NewTransform(&reporter.Entity{ServiceName: "e2e", ServiceInstanceName: "inst"}),
+               segCh:     make(chan *agentv3.SegmentObject, 1024),
+               closed:    make(chan struct{}),
+               done:      make(chan struct{}),
+       }
+       // marshal is the production send step; only successful marshals count.
+       marshal := func(seg *agentv3.SegmentObject) {
+               if _, err := proto.Marshal(seg); err == nil {
+                       atomic.AddInt64(&r.marshals, 1)
+               }
+       }
+       go func() { // the "send goroutine" of the production pipeline
+               defer close(r.done)
+               for {
+                       select {
+                       case seg := <-r.segCh:
+                               marshal(seg)
+                       case <-r.closed:
+                               for { // drain what is already buffered, then exit
+                                       select {
+                                       case seg := <-r.segCh:
+                                               marshal(seg)
+                                       default:
+                                               return
+                                       }
+                               }
+                       }
+               }
+       }()
+       return r
+}
+
+// SendTracing transforms the finished segment and hands it to the send
+// goroutine, or drops it once shutdown has been signaled.
+func (r *pipelineReporter) SendTracing(spans []reporter.ReportedSpan) {
+       // runs on the segment collector goroutine, exactly like production
+       seg := r.transform.TransformSegmentObject(spans)
+       if seg == nil {
+               return
+       }
+       atomic.AddInt64(&r.segments, 1)
+       // Same done-channel pattern as the production fix: straggler collector
+       // goroutines may still deliver after the workload ended, and closing the
+       // DATA channel here would be exactly the send-on-closed-channel bug this
+       // PR eliminates (the race variant of this test caught that mistake in an
+       // earlier revision of this harness).
+       // If closed is already signaled while segCh has room, select may still
+       // pick the send; such a segment is counted above but never marshaled.
+       select {
+       case r.segCh <- seg:
+       case <-r.closed:
+       }
+}
+
+// closeAndWait signals shutdown and blocks until the send goroutine has
+// drained the buffered segments and exited. Call it at most once: closing
+// r.closed a second time panics.
+func (r *pipelineReporter) closeAndWait() {
+       close(r.closed)
+       <-r.done
+}
+
+// The methods below complete the reporter interface as no-ops; only
+// SendTracing matters to the workload.
+func (r *pipelineReporter) Boot(entity *reporter.Entity, cdsWatchers []reporter.AgentConfigChangeWatcher) {
+}
+func (r *pipelineReporter) SendMetrics(metrics []reporter.ReportedMeter) {}
+func (r *pipelineReporter) SendLog(log *logv3.LogData)                   {}
+func (r *pipelineReporter) ConnectionStatus() reporter.ConnectionStatus {
+       return reporter.ConnectionStatusConnected
+}
+func (r *pipelineReporter) Close()                                              {}
+func (r *pipelineReporter) AddProfileTaskManager(p reporter.ProfileTaskManager) {}
+
+// ---------------------------------------------------------------------------
+// hostile flows: one per misuse pattern found in the audit
+// ---------------------------------------------------------------------------
+
+// growStack forces stack growth (morestack/copystack) with live span data on
+// the stack - the exact runtime path that detected the corrupted pointer in
+// the production crash.
+//
+// It recurses depth times and every frame carries a 64-byte pad, so the
+// goroutine stack has to grow while the caller's frame (holding live spans)
+// is still on it. The returned sum only gives pad a use; callers discard it.
+//
+//go:noinline
+func growStack(depth int) int {
+       if depth == 0 {
+               return 0
+       }
+       var pad [64]byte // enlarges each recursive frame
+       pad[0] = byte(depth)
+       return int(pad[0]) + growStack(depth-1)
+}
+
+// wellBehavedFlow is the correct usage baseline: a full entry/exit/local span
+// tree flowing through the real pipeline.
+func wellBehavedFlow(i int) {
+       entry, err := tracing.CreateEntrySpan("GET:/e2e", func(string) (string, error) { return "", nil })
+       if err != nil || entry == nil {
+               return
+       }
+       entry.Tag("http.method", "GET")
+       if exit, err := tracing.CreateExitSpan("e2e/db", "127.0.0.1:3306",
+               func(k, v string) error { return nil }); err == nil && exit != nil {
+               exit.Tag("db.type", "sql")
+               exit.Tag("db.type", "mysql") // same-key in-place rewrite path
+               exit.Log("event", "query")
+               exit.End()
+       }
+       if local, err := tracing.CreateLocalSpan("e2e/biz"); err == nil && local != nil {
+               local.Error("boom")
+               local.End()
+       }
+       growStack(64 + i%64) // grow the stack while entry is still open
+       entry.End()
+       tracing.CleanContext()
+}
+
+// crossedSpanFlow reproduces the gorm "crossed span" bug class: one live span
+// mutated and ended by several goroutines at once.
+func crossedSpanFlow(i int) {
+       span, err := tracing.CreateLocalSpan("e2e/crossed")
+       if err != nil || span == nil {
+               return
+       }
+       var wg sync.WaitGroup
+       for w := 0; w < 2; w++ {
+               wg.Add(1)
+               go func(w int) {
+                       defer wg.Done()
+                       for k := 0; k < 16; k++ {
+                               span.Tag("db.statement", fmt.Sprintf("select %d from t%d_%d", k, w, i))
+                               span.Log("event", "x")
+                               span.SetOperationName(fmt.Sprintf("op-%d-%d", w, k))
+                               span.SetPeer("10.0.0.1:3306")
+                               growStack(32)
+                       }
+                       span.End() // partners race End as well
+               }(w)
+       }
+       // the owner writes and ends concurrently with both partners
+       span.Tag("db.statement", "owner")
+       span.End()
+       wg.Wait()
+       tracing.CleanContext()
+}
+
+// lateWriteFlow reproduces the rocketmq/pulsar callback bug class: writes
+// arriving after the span was ended and reported.
+func lateWriteFlow(i int) {
+       span, err := tracing.CreateLocalSpan("e2e/late")
+       if err != nil || span == nil {
+               return
+       }
+       span.Tag("k", "v")
+       span.End()
+       var wg sync.WaitGroup
+       for w := 0; w < 2; w++ {
+               wg.Add(1)
+               go func(w int) {
+                       defer wg.Done()
+                       for k := 0; k < 8; k++ {
+                               span.Tag("k", fmt.Sprintf("late-%d-%d-%d", w, k, i)) // must be dropped
+                               span.Error("late")
+                               span.Log("late", "x")
+                       }
+               }(w)
+       }
+       wg.Wait()
+       tracing.CleanContext()
+}
+
+// asyncFlow exercises the documented async pattern (gRPC streaming, toolkit)
+// plus late writes after AsyncFinish.
+func asyncFlow(i int) {
+       span, err := tracing.CreateLocalSpan("e2e/async")
+       if err != nil || span == nil {
+               return
+       }
+       span.PrepareAsync()
+       span.End()
+       var wg sync.WaitGroup
+       for w := 0; w < 2; w++ {
+               wg.Add(1)
+               go func(w int) {
+                       defer wg.Done()
+                       for k := 0; k < 8; k++ {
+                               span.Tag("async-key", fmt.Sprintf("v-%d-%d-%d", w, k, i))
+                               span.Log("async", "x")
+                       }
+               }(w)
+       }
+       wg.Wait()
+       // finish from a different goroutine than the one that ended the span
+       done := make(chan struct{})
+       go func() {
+               defer close(done)
+               span.AsyncFinish()
+               span.Tag("after-finish", "must-be-dropped")
+       }()
+       <-done
+       tracing.CleanContext()
+}
+
+// correlationFlow hammers the correlation store from concurrently continued
+// snapshots while exit spans encode propagation headers (the C1/C2 class of
+// unrecoverable concurrent-map fatals).
+func correlationFlow(i int) {
+       root, err := tracing.CreateLocalSpan("e2e/correlation-root")
+       if err != nil || root == nil {
+               return
+       }
+       tracing.SetCorrelationContextValue("seed", fmt.Sprintf("%d", i))
+       snap := tracing.CaptureContext()
+       var wg sync.WaitGroup
+       for w := 0; w < 2; w++ {
+               wg.Add(1)
+               go func(w int) {
+                       defer wg.Done()
+                       defer tracing.CleanContext()
+                       tracing.ContinueContext(snap)
+                       for k := 0; k < 8; k++ {
+                               tracing.SetCorrelationContextValue(fmt.Sprintf("k%d", k%4), fmt.Sprintf("v-%d-%d", w, k))
+                               _ = tracing.GetCorrelationContextValue("k1")
+                               if child, err := tracing.CreateExitSpan("e2e/exit", "127.0.0.1:80",
+                                       func(k, v string) error { return nil }); err == nil && child != nil {
+                                       child.Tag("http.method", "GET")
+                                       child.End()
+                               }
+                       }
+               }(w)
+       }
+       wg.Wait()
+       root.End()
+       tracing.CleanContext()
+}
+
+// doubleEndFlow creates one local span and lets two goroutines call End on it
+// at the same time.
+func doubleEndFlow(i int) {
+       span, err := tracing.CreateLocalSpan("e2e/double-end")
+       if span == nil || err != nil {
+               return
+       }
+       span.Tag("i", fmt.Sprintf("%d", i))
+
+       const enders = 2
+       var wg sync.WaitGroup
+       wg.Add(enders)
+       for n := 0; n < enders; n++ {
+               go func() {
+                       defer wg.Done()
+                       span.End()
+               }()
+       }
+       wg.Wait()
+       tracing.CleanContext()
+}
+
+// snapshotChaosFlow continues ONE snapshot from several goroutines at once
+// (the shared snapshot + runtime-context class: gRPC stream send/recv,
+// microv4 connections).
+func snapshotChaosFlow(i int) {
+       root, err := tracing.CreateLocalSpan("e2e/snapshot-root")
+       if err != nil || root == nil {
+               return
+       }
+       snap := tracing.CaptureContext()
+       var wg sync.WaitGroup
+       for w := 0; w < 3; w++ {
+               wg.Add(1)
+               go func(w int) {
+                       defer wg.Done()
+                       defer tracing.CleanContext()
+                       tracing.ContinueContext(snap)
+                       if child, err := tracing.CreateLocalSpan(fmt.Sprintf("e2e/snap-child-%d", w)); err == nil && child != nil {
+                               child.Tag("w", fmt.Sprintf("%d-%d", w, i))
+                               child.Log("event", "child")
+                               child.End()
+                       }
+               }(w)
+       }
+       wg.Wait()
+       root.End()
+       tracing.CleanContext()
+}
+
+// ---------------------------------------------------------------------------
+// the workload driver
+// ---------------------------------------------------------------------------
+
+// runHostileSpanWorkload runs every hostile flow concurrently against the real
+// pipeline for roughly d, under aggressive GC. It returns the number of
+// segments transformed and payloads marshaled so callers can assert the
+// pipeline actually processed work.
+func runHostileSpanWorkload(d time.Duration) (segments, marshals int64) {
+       restore := installGoroutineLocalGLS()
+       defer restore()
+       ResetTracingContext()
+       rep := newPipelineReporter()
+       Tracing.Reporter = rep
+
+       // aggressive GC maximizes heap/stack scans - the runtime checks that turn
+       // any pointer corruption into `fatal error: ... bad pointer ...`
+       oldGC := debug.SetGCPercent(10)
+       defer debug.SetGCPercent(oldGC)
+
+       deadline := time.Now().Add(d)
+       var wg sync.WaitGroup
+       // launch starts n workers that call fn(0), fn(1), ... until the deadline.
+       launch := func(n int, fn func(i int)) {
+               for w := 0; w < n; w++ {
+                       wg.Add(1)
+                       go func() {
+                               defer wg.Done()
+                               defer tracing.CleanContext()
+                               for i := 0; time.Now().Before(deadline); i++ {
+                                       fn(i)
+                               }
+                       }()
+               }
+       }
+
+       launch(2, wellBehavedFlow)
+       launch(2, crossedSpanFlow)
+       launch(1, lateWriteFlow)
+       launch(1, asyncFlow)
+       launch(1, correlationFlow)
+       launch(1, doubleEndFlow)
+       launch(1, snapshotChaosFlow)
+
+       wg.Wait()
+       // NOTE(review): a fixed grace period, not a flush barrier - segments that
+       // collector goroutines deliver after it may be missing from the counts.
+       time.Sleep(200 * time.Millisecond) // let collector goroutines flush
+       rep.closeAndWait()
+       ResetTracingContext()
+       return atomic.LoadInt64(&rep.segments), atomic.LoadInt64(&rep.marshals)
+}
diff --git a/plugins/core/span_reuse_test.go b/plugins/core/span_reuse_test.go
new file mode 100644
index 0000000..407ab87
--- /dev/null
+++ b/plugins/core/span_reuse_test.go
@@ -0,0 +1,141 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package core
+
+import (
+       "testing"
+       "time"
+
+       "github.com/apache/skywalking-go/plugins/core/reporter"
+       "github.com/apache/skywalking-go/plugins/core/tracing"
+)
+
+// The span reuse rule in CreateEntrySpan/CreateExitSpan returns the existing
+// active span when the span types match, so one span can have several logical
+// owners that each call End once. These tests pin the required semantics: the
+// span is frozen and reported only by the LAST End, so writes from the outer
+// owner that happen after the inner owner's End still land (e.g. gorm tags
+// db.statement after the sql driver plugin already ended the reused span).
+
+// waitReportedSpans polls GetReportedSpans every 20ms until at least want
+// spans are present and returns them, failing the test after 2 seconds.
+func waitReportedSpans(t *testing.T, want int) []reporter.ReportedSpan {
+       t.Helper()
+       deadline := time.Now().Add(2 * time.Second)
+       for {
+               spans := GetReportedSpans()
+               if len(spans) >= want {
+                       return spans
+               }
+               if time.Now().After(deadline) {
+                       t.Fatalf("expected %d reported spans, got %d", want, len(spans))
+               }
+               time.Sleep(20 * time.Millisecond)
+       }
+}
+
+// findReportedSpan returns the first span whose operation name equals name,
+// or nil when none matches.
+func findReportedSpan(spans []reporter.ReportedSpan, name string) reporter.ReportedSpan {
+       for _, s := range spans {
+               if s.OperationName() == name {
+                       return s
+               }
+       }
+       return nil
+}
+
+// reportedTagValue looks up key among the span's tags and returns the value
+// of the first match plus true, or "" and false when the key is absent.
+func reportedTagValue(s reporter.ReportedSpan, key string) (string, bool) {
+       tags := s.Tags()
+       for idx := range tags {
+               if tags[idx].Key != key {
+                       continue
+               }
+               return tags[idx].Value, true
+       }
+       return "", false
+}
+
+// TestExitSpanReuseFreezesOnLastEnd replicates the gorm + sql driver timeline
+// from the gorm-postgres plugin scenario: gorm creates the exit span, the sql
+// driver's CreateExitSpan reuses it and Ends it first, and only afterwards
+// gorm tags db.statement and Ends. The tag must not be dropped and the span
+// must be reported exactly once.
+func TestExitSpanReuseFreezesOnLastEnd(t *testing.T) {
+       ResetTracingContext()
+       defer ResetTracingContext()
+
+       entry, err := tracing.CreateEntrySpan("GET:/execute", func(string) 
(string, error) { return "", nil })
+       if err != nil {
+               t.Fatal(err)
+       }
+       outer, err := tracing.CreateExitSpan("users/create", "db:5432", func(k, 
v string) error { return nil })
+       if err != nil {
+               t.Fatal(err)
+       }
+       inner, err := tracing.CreateExitSpan("PostgreSQL/Exec", "db:5432", 
func(k, v string) error { return nil })
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       inner.End()                                                   // inner 
owner: must NOT freeze the reused span
+       outer.Tag("db.statement", "INSERT INTO users VALUES ($1,$2)") // late 
outer write must land
+       outer.End()                                                   // last 
owner: freezes and reports
+       entry.End()
+
+       spans := waitReportedSpans(t, 2)
+       if len(spans) != 2 {
+               t.Fatalf("reuse must not create or report an extra span, got 
%d", len(spans))
+       }
+       exitSpan := findReportedSpan(spans, "users/create")
+       if exitSpan == nil {
+               t.Fatal("exit span was not reported (or was renamed by the 
reuse)")
+       }
+       if v, ok := reportedTagValue(exitSpan, "db.statement"); !ok || v != 
"INSERT INTO users VALUES ($1,$2)" {
+               t.Fatalf("tag written after the inner owner's End was dropped, 
tags=%v", exitSpan.Tags())
+       }
+       if exitSpan.EndTime() <= 0 {
+               t.Fatal("reported reused span has no end time")
+       }
+}
+
+// TestEntrySpanReuseFreezesOnLastEnd is the entry-side twin (e.g. an http
+// framework plugin reusing the net/http entry span): the inner owner renames
+// and Ends first, the outer owner then tags the status code and Ends.
+func TestEntrySpanReuseFreezesOnLastEnd(t *testing.T) {
+       ResetTracingContext()
+       defer ResetTracingContext()
+
+       outer, err := tracing.CreateEntrySpan("GET:/raw", func(string) (string, 
error) { return "", nil })
+       if err != nil {
+               t.Fatal(err)
+       }
+       inner, err := tracing.CreateEntrySpan("GET:/renamed", func(string) 
(string, error) { return "", nil })
+       if err != nil {
+               t.Fatal(err)
+       }
+
+       inner.End()                     // inner owner: must NOT freeze the 
reused span
+       outer.Tag("status_code", "200") // late outer write must land
+       outer.End()                     // last owner: freezes and reports the 
root segment
+
+       spans := waitReportedSpans(t, 1)
+       if len(spans) != 1 {
+               t.Fatalf("entry reuse must report exactly one span, got %d", 
len(spans))
+       }
+       if spans[0].OperationName() != "GET:/renamed" {
+               t.Fatalf("reuse must keep the inner owner's rename, got %s", 
spans[0].OperationName())
+       }
+       if v, ok := reportedTagValue(spans[0], "status_code"); !ok || v != 
"200" {
+               t.Fatalf("tag written after the inner owner's End was dropped, 
tags=%v", spans[0].Tags())
+       }
+}
diff --git a/plugins/core/span_sync_bench_test.go 
b/plugins/core/span_sync_bench_test.go
new file mode 100644
index 0000000..22db991
--- /dev/null
+++ b/plugins/core/span_sync_bench_test.go
@@ -0,0 +1,70 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package core
+
+import (
+       "testing"
+
+       "github.com/apache/skywalking-go/plugins/core/reporter"
+
+       logv3 "github.com/apache/skywalking-go/protocols/collect/logging/v3"
+)
+
+// benchNopReporter discards everything so the benchmark measures the span
+// machinery itself rather than a reporter implementation.
+type benchNopReporter struct{}
+
+func (benchNopReporter) Boot(entity *reporter.Entity, cdsWatchers 
[]reporter.AgentConfigChangeWatcher) {
+}
+func (benchNopReporter) SendTracing(spans []reporter.ReportedSpan)    {}
+func (benchNopReporter) SendMetrics(metrics []reporter.ReportedMeter) {}
+func (benchNopReporter) SendLog(log *logv3.LogData)                   {}
+func (benchNopReporter) ConnectionStatus() reporter.ConnectionStatus {
+       return reporter.ConnectionStatusConnected
+}
+func (benchNopReporter) Close()                                              {}
+func (benchNopReporter) AddProfileTaskManager(p reporter.ProfileTaskManager) {}
+
+// BenchmarkSpanLifecycleRealAgent measures the genuine span hot path
+// (CreateLocalSpan -> rename -> tags incl. a same-key rewrite -> log -> error
+// -> End, through the real sampler/segment/GLS/collector machinery) and serves
+// as the performance regression guard for the per-span locking introduced for
+// apache/skywalking#13885.
+func BenchmarkSpanLifecycleRealAgent(b *testing.B) {
+       ResetTracingContext()
+       Tracing.Reporter = benchNopReporter{}
+       b.ReportAllocs()
+       b.ResetTimer()
+       for i := 0; i < b.N; i++ {
+               s, err := Tracing.CreateLocalSpan("bench/op")
+               if err != nil {
+                       b.Fatal(err)
+               }
+               span := s.(TracingSpan)
+               span.SetOperationName("bench/op/renamed")
+               span.Tag("db.type", "sql")
+               span.Tag("db.instance", "benchdb")
+               span.Tag("db.type", "mysql")
+               span.Log("event", "cache-miss")
+               span.Error("error", "boom")
+               span.End()
+       }
+       b.StopTimer()
+       // restore clean state for other tests in the package
+       ResetTracingContext()
+}
diff --git a/plugins/core/span_tracing.go b/plugins/core/span_tracing.go
index 8aaeab6..5ee4bf3 100644
--- a/plugins/core/span_tracing.go
+++ b/plugins/core/span_tracing.go
@@ -19,6 +19,8 @@ package core
 
 import (
        "fmt"
+       "runtime/debug"
+       "sync"
        "sync/atomic"
 
        "github.com/apache/skywalking-go/plugins/core/reporter"
@@ -50,16 +52,20 @@ func NewSegmentSpan(ctx *TracingContext, defaultSpan 
*DefaultSpan, parentSpan Se
 
 // SegmentContext is the context in a segment
 type SegmentContext struct {
-       TraceID            string
-       SegmentID          string
-       SpanID             int32
-       ParentSpanID       int32
-       ParentSegmentID    string
-       collect            chan<- reporter.ReportedSpan
+       TraceID         string
+       SegmentID       string
+       SpanID          int32
+       ParentSpanID    int32
+       ParentSegmentID string
+       collect         chan<- reporter.ReportedSpan
+       // collectorDone is closed once the segment collector stops receiving.
+       // The data channels are never closed: late senders select on this one
+       // instead (receiving from a closed channel is safe; sending on one panics).
+       collectorDone      chan struct{}
        refNum             *int32
        spanIDGenerator    *int32
        FirstSpan          TracingSpan `json:"-"`
-       CorrelationContext map[string]string
+       CorrelationContext *CorrelationContext
 }
 
 func (c *SegmentContext) GetTraceID() string {
@@ -83,14 +89,11 @@ func (c *SegmentContext) GetParentSegmentID() string {
 }
 
 func (c *SegmentContext) GetCorrelationContextValue(key string) string {
-       return c.CorrelationContext[key]
+       return c.CorrelationContext.Get(key)
 }
 
 func (c *SegmentContext) SetCorrelationContextValue(key, value string) {
-       c.CorrelationContext[key] = value
-       if value == "" {
-               delete(c.CorrelationContext, key)
-       }
+       c.CorrelationContext.Set(key, value)
 }
 
 type SegmentSpan interface {
@@ -108,11 +111,7 @@ type SegmentSpanImpl struct {
 
 // For TracingSpan
 func (s *SegmentSpanImpl) End() {
-       if !s.IsValid() {
-               return
-       }
-       s.DefaultSpan.End(true)
-       if !s.DefaultSpan.InAsyncMode {
+       if s.DefaultSpan.endSyncAndFreeze() {
                s.end0()
        }
 }
@@ -120,19 +119,39 @@ func (s *SegmentSpanImpl) End() {
 func (s *SegmentSpanImpl) AsyncFinish() {
        s.DefaultSpan.AsyncFinish()
        s.DefaultSpan.End(false)
-       s.end0()
+       if s.DefaultSpan.endAndFreeze() {
+               s.end0()
+       }
 }
 
 func (s *SegmentSpanImpl) end0() {
        go func() {
-               s.SegmentContext.collect <- s
+               select {
+               case s.SegmentContext.collect <- s:
+               case <-s.SegmentContext.collectorDone:
+                       // The collector has already stopped receiving. This is expected
+                       // to be unreachable since endSyncAndFreeze/endAndFreeze allow a
+                       // single end0 per span; kept as defense in depth so the span is
+                       // dropped instead of this goroutine blocking forever.
+               }
        }()
 }
+
 func (s *SegmentSpanImpl) GetDefaultSpan() *DefaultSpan {
        return &s.DefaultSpan
 }
 
 // For Reported TracingSpan
+//
+// The ReportedSpan accessors below are intentionally NOT locked. Their safety
+// rests on three guarantees that must be preserved together (re-review all of
+// them before changing any one):
+//  1. every mutator holds opLock and drops the write once ended==true;
+//  2. endAndFreeze sets ended=true under opLock, so the span data can no
+//     longer change after it returns;
+//  3. the span reaches the collector goroutine through the end0 channel send,
+//     whose happens-before edge publishes all pre-freeze writes to the reader.
+// Therefore reporter.Transform always observes immutable data here.
 
 func (s *SegmentSpanImpl) Context() reporter.SegmentContext {
        return &s.SegmentContext
@@ -207,13 +226,13 @@ func (s *SegmentSpanImpl) createSegmentContext(ctx 
*TracingContext, parent Segme
                s.SegmentContext = SegmentContext{}
                if len(s.DefaultSpan.Refs) > 0 {
                        s.TraceID = s.DefaultSpan.Refs[0].GetTraceID()
-                       s.CorrelationContext = 
s.DefaultSpan.Refs[0].(*SpanContext).CorrelationContext
+                       s.CorrelationContext = 
newCorrelationContextFrom(s.DefaultSpan.Refs[0].(*SpanContext).CorrelationContext)
                } else {
                        s.TraceID, err = GenerateGlobalID(ctx)
                        if err != nil {
                                return err
                        }
-                       s.CorrelationContext = make(map[string]string)
+                       s.CorrelationContext = newCorrelationContext()
                }
        } else {
                s.SegmentContext = parent.GetSegmentContext()
@@ -226,7 +245,7 @@ func (s *SegmentSpanImpl) createSegmentContext(ctx 
*TracingContext, parent Segme
                s.SegmentContext.FirstSpan = s
        }
        if s.CorrelationContext == nil {
-               s.CorrelationContext = make(map[string]string)
+               s.CorrelationContext = newCorrelationContext()
        }
        return
 }
@@ -243,11 +262,7 @@ type RootSegmentSpan struct {
 }
 
 func (rs *RootSegmentSpan) End() {
-       if !rs.IsValid() {
-               return
-       }
-       rs.DefaultSpan.End(true)
-       if !rs.InAsyncMode {
+       if rs.DefaultSpan.endSyncAndFreeze() {
                rs.end0()
        }
 }
@@ -255,15 +270,19 @@ func (rs *RootSegmentSpan) End() {
 func (rs *RootSegmentSpan) AsyncFinish() {
        rs.DefaultSpan.AsyncFinish()
        rs.DefaultSpan.End(false)
-       rs.end0()
+       if rs.DefaultSpan.endAndFreeze() {
+               rs.end0()
+       }
 }
 
 func (rs *RootSegmentSpan) end0() {
-       defer func() {
-               _ = recover()
-       }()
-       if rs != nil && rs.doneCh != nil && rs.SegmentContext.refNum != nil {
-               rs.doneCh <- atomic.SwapInt32(rs.SegmentContext.refNum, -1)
+       if rs == nil || rs.doneCh == nil || rs.SegmentContext.refNum == nil {
+               return
+       }
+       select {
+       case rs.doneCh <- atomic.SwapInt32(rs.SegmentContext.refNum, -1):
+       case <-rs.SegmentContext.collectorDone:
+               // see SegmentSpanImpl.end0
        }
 }
 
@@ -363,10 +382,34 @@ func newSegmentRoot(segmentSpan *SegmentSpanImpl) 
*RootSegmentSpan {
        s.notify = ch
        s.segment = make([]reporter.ReportedSpan, 0, 10)
        s.doneCh = make(chan int32)
+       s.collectorDone = make(chan struct{})
        go func() {
                total := -1
-               defer close(ch)
-               defer close(s.doneCh)
+               // Closing collectorDone (instead of the data channels) lets 
late
+               // senders exit safely through their select; the unclosed 
channels are
+               // reclaimed by the GC. It is closed right after the collect 
loop stops
+               // receiving (so a late sender never blocks behind a slow
+               // Reporter.SendTracing) and kept in a defer as well so that 
even a
+               // panic below cannot leave senders blocked forever. Both call 
sites
+               // run on this goroutine, so the plain bool needs no 
synchronization.
+               doneClosed := false
+               closeDone := func() {
+                       if !doneClosed {
+                               doneClosed = true
+                               close(s.collectorDone)
+                       }
+               }
+               defer closeDone()
+               defer func() {
+                       // Defense in depth: a panic here would kill the 
process since this
+                       // goroutine has no other recover.
+                       if err := recover(); err != nil {
+                               defer func() { _ = recover() }() // a panicking 
logger must not re-kill us
+                               if tr := s.tracer(); tr != nil && tr.Log != nil 
{
+                                       tr.Log.Errorf("segment collector panic: 
%v, stack: %s", err, debug.Stack())
+                               }
+                       }
+               }()
                for {
                        select {
                        case span := <-s.notify:
@@ -378,6 +421,9 @@ func newSegmentRoot(segmentSpan *SegmentSpanImpl) 
*RootSegmentSpan {
                                break
                        }
                }
+               // the loop above is the only receiver: unblock late senders 
before the
+               // (possibly slow) reporter call
+               closeDone()
                s.tracer().Reporter.SendTracing(append(s.segment, s))
        }()
        return s
@@ -396,26 +442,24 @@ func newSnapshotSpan(current TracingSpan) TracingSpan {
        }
 
        segCtx := segmentSpan.GetSegmentContext()
-       copiedCorrelation := make(map[string]string)
-       for k, v := range segCtx.CorrelationContext {
-               copiedCorrelation[k] = v
-       }
        s := &SnapshotSpan{
                DefaultSpan: DefaultSpan{
                        OperationName: segmentSpan.GetOperationName(),
                        Refs:          nil,
                        tracer:        segmentSpan.tracer(),
                        Peer:          segmentSpan.GetPeer(),
+                       opLock:        &sync.Mutex{}, // keep the "opLock is 
never nil" invariant
                },
                SegmentContext: SegmentContext{
                        TraceID:            segCtx.GetTraceID(),
                        SegmentID:          segCtx.SegmentID,
                        SpanID:             segCtx.SpanID,
                        collect:            segCtx.collect,
+                       collectorDone:      segCtx.collectorDone,
                        refNum:             segCtx.refNum,
                        spanIDGenerator:    segCtx.spanIDGenerator,
                        FirstSpan:          segCtx.FirstSpan,
-                       CorrelationContext: copiedCorrelation,
+                       CorrelationContext: segCtx.CorrelationContext.Clone(),
                },
        }
 
diff --git a/plugins/core/test_base.go b/plugins/core/test_base.go
index 8cd197c..2aa9004 100644
--- a/plugins/core/test_base.go
+++ b/plugins/core/test_base.go
@@ -46,7 +46,11 @@ func init() {
 func ResetTracingContext() {
        SetGLS(nil)
        Tracing = &Tracer{initFlag: 1, Sampler: NewConstSampler(true), 
Reporter: &StoreReporter{},
-               ServiceEntity: NewEntity("test", "test-instance"), meterMap: 
&sync.Map{}}
+               ServiceEntity: NewEntity("test", "test-instance"), meterMap: 
&sync.Map{},
+               // production Boot always sets the correlation config; the 
tests must
+               // too, otherwise correlation APIs nil-dereference (found by the
+               // hostile-workload e2e). Values mirror the agent defaults.
+               correlation: &CorrelationConfig{MaxKeyCount: 3, MaxValueSize: 
128}}
        // Initialize ProfileManager to avoid nil pointer dereference
        Tracing.ProfileManager = NewProfileManager(nil)
        Tracing.Reporter.AddProfileTaskManager(Tracing.ProfileManager)
@@ -65,10 +69,18 @@ func SetAsNewGoroutine() {
 }
 
 func GetReportedSpans() []reporter.ReportedSpan {
-       return Tracing.Reporter.(*StoreReporter).Spans
+       sr := Tracing.Reporter.(*StoreReporter)
+       sr.mu.Lock()
+       defer sr.mu.Unlock()
+       return append([]reporter.ReportedSpan(nil), sr.Spans...)
 }
 
+// StoreReporter is the in-memory test reporter. SendTracing is invoked from
+// the per-segment collector goroutines while tests read the results, so the
+// storage must be synchronized (this used to be the test-harness data race
+// that kept the full suite from running under -race).
 type StoreReporter struct {
+       mu      sync.Mutex
        Spans   []reporter.ReportedSpan
        Metrics []reporter.ReportedMeter
        Logs    []*logv3.LogData
@@ -82,14 +94,20 @@ func (r *StoreReporter) Boot(entity *reporter.Entity, 
cdsWatchers []reporter.Age
 }
 
 func (r *StoreReporter) SendTracing(spans []reporter.ReportedSpan) {
+       r.mu.Lock()
+       defer r.mu.Unlock()
        r.Spans = append(r.Spans, spans...)
 }
 
 func (r *StoreReporter) SendMetrics(metrics []reporter.ReportedMeter) {
+       r.mu.Lock()
+       defer r.mu.Unlock()
        r.Metrics = append(r.Metrics, metrics...)
 }
 
 func (r *StoreReporter) SendLog(log *logv3.LogData) {
+       r.mu.Lock()
+       defer r.mu.Unlock()
        r.Logs = append(r.Logs, log)
 }
 
diff --git a/plugins/core/tracing.go b/plugins/core/tracing.go
index 3cddf2c..7c96fa9 100644
--- a/plugins/core/tracing.go
+++ b/plugins/core/tracing.go
@@ -57,6 +57,11 @@ func (t *Tracer) CreateEntrySpan(operationName string, 
extractor interface{}, op
        // if parent span is entry span, then use parent span as result
        if tracingSpan != nil && tracingSpan.IsEntry() && 
reflect.ValueOf(tracingSpan).Type() != snapshotType {
                tracingSpan.SetOperationName(operationName)
+               // the caller becomes one more owner of the reused span and 
will call
+               // End on it: count the reuse so only the last End freezes the 
span
+               if segmentSpan, ok := tracingSpan.(SegmentSpan); ok {
+                       segmentSpan.GetDefaultSpan().enterReuse()
+               }
                return tracingSpan, nil
        }
        var ref = &SpanContext{}
@@ -123,6 +128,11 @@ func (t *Tracer) CreateExitSpan(operationName, peer 
string, injector interface{}
 
        // if parent span is exit span, then use parent span as result
        if tracingSpan != nil && tracingSpan.IsExit() && 
reflect.ValueOf(tracingSpan).Type() != snapshotType {
+               // the caller becomes one more owner of the reused span and 
will call
+               // End on it: count the reuse so only the last End freezes the 
span
+               if segmentSpan, ok := tracingSpan.(SegmentSpan); ok {
+                       segmentSpan.GetDefaultSpan().enterReuse()
+               }
                return tracingSpan, nil
        }
        span, noop, err := t.createSpan0(ctx, tracingSpan, opts, 
withSpanType(SpanTypeExit), withOperationName(operationName), withPeer(peer))
@@ -147,7 +157,10 @@ func (t *Tracer) CreateExitSpan(operationName, peer 
string, injector interface{}
        spanContext.ParentServiceInstance = t.ServiceEntity.ServiceInstanceName
        spanContext.ParentEndpoint = firstSpan.GetOperationName()
        spanContext.AddressUsedAtClient = peer
-       spanContext.CorrelationContext = 
reportedSpan.GetSegmentContext().CorrelationContext
+       // Snapshot, not the live map: the propagation header encoding iterates this
+       // map while other goroutines of the segment may concurrently set correlation
+       // values, and the runtime aborts on "concurrent map iteration and map write".
+       spanContext.CorrelationContext = 
reportedSpan.GetSegmentContext().CorrelationContext.Snapshot()
 
        err = spanContext.Encode(injector.(tracing.InjectorWrapper).Fun())
        if err != nil {
@@ -207,7 +220,11 @@ func (t *Tracer) ContinueContext(snapshot interface{}) {
                ctx.activeSpanLock.Lock()
                defer ctx.activeSpanLock.Unlock()
                ctx.activeSpan = snap.activeSpan
-               ctx.Runtime = snap.runtime
+               // Clone on continue as well (capture already clones): the same 
snapshot
+               // may be continued by multiple goroutines (e.g. the send and 
receive
+               // goroutines of one gRPC stream), and sharing one 
RuntimeContext map
+               // between them would be a fatal concurrent map read/write.
+               ctx.Runtime = snap.runtime.clone()
        }
 }
 
@@ -220,11 +237,10 @@ func (t *Tracer) GetCorrelationContextValue(key string) 
string {
        if span == nil {
                return ""
        }
-       switch reportedSpan := span.(type) {
-       case *SegmentSpanImpl:
-               return reportedSpan.Context().GetCorrelationContextValue(key)
-       case *RootSegmentSpan:
-               return reportedSpan.Context().GetCorrelationContextValue(key)
+       switch span.(type) {
+       case *SegmentSpanImpl, *RootSegmentSpan:
+               segCtx := span.(SegmentSpan).GetSegmentContext()
+               return segCtx.GetCorrelationContextValue(key)
        default:
                return ""
        }
@@ -235,30 +251,30 @@ func (t *Tracer) SetCorrelationContextValue(key, value 
string) {
        if span == nil {
                return
        }
-       switch reportedSpan := span.(type) {
-       case *SegmentSpanImpl:
-               if len(value) > t.correlation.MaxValueSize {
-                       return
-               }
-               if len(reportedSpan.GetSegmentContext().CorrelationContext) >= 
t.correlation.MaxKeyCount {
-                       return
-               }
-               reportedSpan.Context().SetCorrelationContextValue(key, value)
-       case *RootSegmentSpan:
+       switch span.(type) {
+       case *SegmentSpanImpl, *RootSegmentSpan:
                if len(value) > t.correlation.MaxValueSize {
                        return
                }
-               if len(reportedSpan.GetSegmentContext().CorrelationContext) >= 
t.correlation.MaxKeyCount {
+               segCtx := span.(SegmentSpan).GetSegmentContext()
+               // Len/Set are two separate lock acquisitions, so concurrent 
writers can
+               // exceed MaxKeyCount by a few entries. The limit is a soft 
bound (same
+               // behavior as the pre-synchronization bare map) - keeping the 
two calls
+               // separate avoids a combined check-and-set API for a non-issue.
+               if segCtx.CorrelationContext.Len() >= t.correlation.MaxKeyCount 
{
                        return
                }
-               reportedSpan.Context().SetCorrelationContextValue(key, value)
+               segCtx.SetCorrelationContextValue(key, value)
        default:
        }
 }
 
 type ContextSnapshot struct {
        activeSpan TracingSpan
-       runtime    *RuntimeContext
+       // runtime is cloned at capture time and treated as IMMUTABLE 
afterwards:
+       // ContinueContext may read it concurrently from several goroutines (it
+       // clones again per continue), so never mutate it through the snapshot.
+       runtime *RuntimeContext
 }
 
 func (s *ContextSnapshot) IsValid() bool {


Reply via email to