This is an automated email from the ASF dual-hosted git repository.
wu-sheng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/skywalking-go.git
The following commit(s) were added to refs/heads/main by this push:
new 1474cf8 fix data race in span lifecycle (#247)
1474cf8 is described below
commit 1474cf897b518d00be1c02d334c3a7a4aedc20b7
Author: mrproliu <[email protected]>
AuthorDate: Fri Jun 5 22:46:43 2026 +0800
fix data race in span lifecycle (#247)
---
CHANGES.md | 2 +
plugins/core/context.go | 12 +-
plugins/core/correlation.go | 132 +++++++
plugins/core/metrics.go | 19 +-
plugins/core/profile.go | 76 ++--
plugins/core/propagating.go | 24 +-
plugins/core/reporter/grpc/grpc.go | 13 +-
.../reporter/grpc/send_tracing_recover_test.go | 81 ++++
plugins/core/reporter/kafka/kafka.go | 13 +-
.../reporter/kafka/send_tracing_recover_test.go | 81 ++++
plugins/core/segment_datarace_test.go | 232 ++++++++++-
plugins/core/span_crash_e2e_test.go | 134 +++++++
plugins/core/span_default.go | 234 +++++++++--
plugins/core/span_freeze_test.go | 277 +++++++++++++
plugins/core/span_hostile_workload_test.go | 435 +++++++++++++++++++++
plugins/core/span_reuse_test.go | 141 +++++++
plugins/core/span_sync_bench_test.go | 70 ++++
plugins/core/span_tracing.go | 124 ++++--
plugins/core/test_base.go | 22 +-
plugins/core/tracing.go | 56 ++-
20 files changed, 2024 insertions(+), 154 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 401c55c..0fcda9d 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -22,6 +22,8 @@ Release Notes.
* Fix plugin interceptors bypassed on Windows.
* Fix wrong tracing context switch when trace ignore plugin activated.
* Fix data race when sending trace data to reporter.
+* Fix multiple data races in span lifecycle, correlation context and segment collection.
+* Add recover protection for the metrics, profile and segment-transform goroutines.
#### Issues and PR
- All issues are [here](https://github.com/apache/skywalking/milestone/238?closed=1)
diff --git a/plugins/core/context.go b/plugins/core/context.go
index 4f9cefc..6ae1b7e 100644
--- a/plugins/core/context.go
+++ b/plugins/core/context.go
@@ -89,7 +89,12 @@ func NewTracingContext() *TracingContext {
}
func (r *RuntimeContext) clone() *RuntimeContext {
- newData := make(map[string]interface{})
+ if len(r.data) == 0 {
+ // clone runs on every context capture AND continue; skipping
the empty
+ // map allocation matters because most requests never set
runtime values
+ return &RuntimeContext{}
+ }
+ newData := make(map[string]interface{}, len(r.data))
for k, v := range r.data {
newData[k] = v
}
@@ -104,8 +109,11 @@ func (r *RuntimeContext) Get(key string) interface{} {
func (r *RuntimeContext) Set(key string, value interface{}) {
if value == nil {
- delete(r.data, key)
+ delete(r.data, key) // no-op on a nil map
return
}
+ if r.data == nil { // lazily allocated, see clone()
+ r.data = make(map[string]interface{})
+ }
r.data[key] = value
}
diff --git a/plugins/core/correlation.go b/plugins/core/correlation.go
new file mode 100644
index 0000000..8af6a9f
--- /dev/null
+++ b/plugins/core/correlation.go
@@ -0,0 +1,132 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package core
+
+import "sync"
+
+// CorrelationContext is the synchronized correlation key/value storage shared
+// by every span of a segment. It replaces the bare map[string]string that used
+// to live on SegmentContext: that map is legitimately reachable from multiple
+// goroutines (snapshot-continued child spans share it with the segment root),
+// so an unsynchronized write racing the exit-span header encoding or the
+// snapshot copy would crash the process with the unrecoverable
+// "concurrent map iteration and map write" fatal error.
+//
+// SegmentContext is copied by value between the spans of a segment, therefore
+// the lock cannot be embedded there; the pointer wrapper keeps a single lock
+// per logical correlation store. RWMutex is the right tool here (unlike the
+// span opLock): correlation is read on every propagation encode by potentially
+// concurrent goroutines while writes are rare.
+type CorrelationContext struct {
+ mu sync.RWMutex
+ data map[string]string
+}
+
+// newCorrelationContext returns an empty store. The inner map is allocated
+// lazily on the first Set: most spans never touch correlation, and the empty
+// map would be one extra allocation on every segment.
+func newCorrelationContext() *CorrelationContext {
+ return &CorrelationContext{}
+}
+
+// newCorrelationContextFrom builds a store pre-filled with a copy of m
+// (typically the correlation decoded from the inbound propagation headers).
+func newCorrelationContextFrom(m map[string]string) *CorrelationContext {
+ if len(m) == 0 {
+ // keep the lazy inner-map allocation (see
newCorrelationContext):
+ // most inbound requests carry no correlation values
+ return &CorrelationContext{}
+ }
+ c := &CorrelationContext{data: make(map[string]string, len(m))}
+ for k, v := range m {
+ c.data[k] = v
+ }
+ return c
+}
+
+func (c *CorrelationContext) Get(key string) string {
+ if c == nil {
+ return ""
+ }
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+ return c.data[key]
+}
+
+func (c *CorrelationContext) Set(key, value string) {
+ if c == nil {
+ return
+ }
+ c.mu.Lock()
+ defer c.mu.Unlock()
+ if value == "" {
+ delete(c.data, key) // delete on a nil map is a no-op
+ return
+ }
+ if c.data == nil {
+ c.data = make(map[string]string)
+ }
+ c.data[key] = value
+}
+
+func (c *CorrelationContext) Len() int {
+ if c == nil {
+ return 0
+ }
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+ return len(c.data)
+}
+
+// Snapshot returns a copy of the correlation data taken under the lock. The
+// propagation header encoding and every other map iteration must go through it
+// instead of ranging over the live map. It returns nil when empty (reading a
+// nil map is safe and this avoids an allocation per exit span).
+func (c *CorrelationContext) Snapshot() map[string]string {
+ if c == nil {
+ return nil
+ }
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+ if len(c.data) == 0 {
+ return nil
+ }
+ cp := make(map[string]string, len(c.data))
+ for k, v := range c.data {
+ cp[k] = v
+ }
+ return cp
+}
+
+// Clone returns an independent CorrelationContext holding a copy of the data,
+// used when a context snapshot crosses a goroutine boundary.
+func (c *CorrelationContext) Clone() *CorrelationContext {
+ if c == nil {
+ return newCorrelationContext()
+ }
+ c.mu.RLock()
+ defer c.mu.RUnlock()
+ if len(c.data) == 0 {
+ return newCorrelationContext()
+ }
+ cp := &CorrelationContext{data: make(map[string]string, len(c.data))}
+ for k, v := range c.data {
+ cp.data[k] = v
+ }
+ return cp
+}
diff --git a/plugins/core/metrics.go b/plugins/core/metrics.go
index ddac85c..0869889 100644
--- a/plugins/core/metrics.go
+++ b/plugins/core/metrics.go
@@ -19,6 +19,7 @@ package core
import (
"math"
+ "runtime/debug"
"sort"
"strings"
"sync/atomic"
@@ -40,9 +41,21 @@ func (t *Tracer) initMetricsCollect(meterCollectSecond int) {
for {
time.Sleep(collectDuration)
- t.reachNotInitMetrics()
-
- t.sendMetrics()
+ // The recover wraps a single iteration: this goroutine
has no other
+ // protection and the collect path executes
user-registered meter
+ // callbacks, whose panic would otherwise kill the
whole process.
+ func() {
+ defer func() {
+ if err := recover(); err != nil {
+ if t.Log != nil {
+ t.Log.Errorf("metrics
collect panic: %v, stack: %s", err, debug.Stack())
+ }
+ }
+ }()
+ t.reachNotInitMetrics()
+
+ t.sendMetrics()
+ }()
}
}()
}
diff --git a/plugins/core/profile.go b/plugins/core/profile.go
index c8d8f68..87c76dc 100644
--- a/plugins/core/profile.go
+++ b/plugins/core/profile.go
@@ -18,6 +18,7 @@
package core
import (
+ "runtime/debug"
"runtime/pprof"
"strconv"
"sync"
@@ -67,43 +68,56 @@ func (m *ProfileManager) initReportChannel() {
// Original channel for receiving raw data chunks sent by the Writer
rawCh := make(chan profileRawData, maxSendQueueSize)
m.rawCh = rawCh
- var d []byte
// Start a goroutine to supplement each data chunk with business
information
go func() {
for rawResult := range rawCh {
- d = append(d, rawResult.data...)
- m.mu.Lock()
- // Get business information from currentTask
- if m.currentTask == nil {
- m.Log.Info("no task")
- m.mu.Unlock()
- continue // Task has ended, ignore
- }
- task := m.currentTask
- m.mu.Unlock()
-
- if rawResult.isLast {
- m.FinalReportResults <- reporter.ProfileResult{
- TaskID: task.taskID,
- Payload: rawResult.data,
- IsLast: rawResult.isLast,
+ // The recover wraps a single chunk: this goroutine has
no other
+ // protection and a panic here would kill the whole
process. The
+ // locked sections below use deferred unlocks so a
panic can never
+ // leak a held mutex into the next iteration.
+ func() {
+ defer func() {
+ if err := recover(); err != nil {
+ m.Log.Errorf("profile report
panic: %v, stack: %s", err, debug.Stack())
+ }
+ }()
+ // Get business information from currentTask
+ var task *currentTask
+ func() {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ task = m.currentTask
+ }()
+ if task == nil {
+ m.Log.Info("no task")
+ return // Task has ended, ignore
}
- m.mu.Lock()
- if m.TraceProfileTask == nil {
- m.Log.Warn("no TraceProfileTask before
finish profile")
+
+ if rawResult.isLast {
+ m.FinalReportResults <-
reporter.ProfileResult{
+ TaskID: task.taskID,
+ Payload: rawResult.data,
+ IsLast: rawResult.isLast,
+ }
+ func() {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+ if m.TraceProfileTask == nil {
+ m.Log.Warn("no
TraceProfileTask before finish profile")
+ } else {
+
m.TraceProfileTask.Status = reporter.Finished
+ }
+ m.currentTask = nil
+
m.profileEvents.BaseEventStatus[CurTaskExist] = false
+ }()
} else {
- m.TraceProfileTask.Status =
reporter.Finished
+ m.FinalReportResults <-
reporter.ProfileResult{
+ TaskID: task.taskID,
+ Payload: rawResult.data,
+ IsLast: rawResult.isLast,
+ }
}
- m.currentTask = nil
- m.profileEvents.BaseEventStatus[CurTaskExist] =
false
- m.mu.Unlock()
- } else {
- m.FinalReportResults <- reporter.ProfileResult{
- TaskID: task.taskID,
- Payload: rawResult.data,
- IsLast: rawResult.isLast,
- }
- }
+ }()
}
}()
}
diff --git a/plugins/core/propagating.go b/plugins/core/propagating.go
index c47a57c..5756995 100644
--- a/plugins/core/propagating.go
+++ b/plugins/core/propagating.go
@@ -41,16 +41,20 @@ var (
)
type SpanContext struct {
- TraceID string `json:"trace_id"`
- ParentSegmentID string `json:"parent_segment_id"`
- ParentService string `json:"parent_service"`
- ParentServiceInstance string `json:"parent_service_instance"`
- ParentEndpoint string `json:"parent_endpoint"`
- AddressUsedAtClient string `json:"address_used_at_client"`
- ParentSpanID int32 `json:"parent_span_id"`
- Sample int8 `json:"sample"`
- Valid bool `json:"valid"`
- CorrelationContext map[string]string `json:"correlation_context"`
+ TraceID string `json:"trace_id"`
+ ParentSegmentID string `json:"parent_segment_id"`
+ ParentService string `json:"parent_service"`
+ ParentServiceInstance string `json:"parent_service_instance"`
+ ParentEndpoint string `json:"parent_endpoint"`
+ AddressUsedAtClient string `json:"address_used_at_client"`
+ ParentSpanID int32 `json:"parent_span_id"`
+ Sample int8 `json:"sample"`
+ Valid bool `json:"valid"`
+ // CorrelationContext here is intentionally a plain map (NOT the
+ // synchronized core.CorrelationContext): SpanContext is the
short-lived,
+ // single-goroutine wire-format struct holding a decoded inbound header
or
+ // a Snapshot() copy for outbound encoding.
+ CorrelationContext map[string]string `json:"correlation_context"`
}
func (s *SpanContext) GetTraceID() string {
diff --git a/plugins/core/reporter/grpc/grpc.go b/plugins/core/reporter/grpc/grpc.go
index be60b38..32dab07 100644
--- a/plugins/core/reporter/grpc/grpc.go
+++ b/plugins/core/reporter/grpc/grpc.go
@@ -116,16 +116,19 @@ func (r *gRPCReporter) ConnectionStatus() reporter.ConnectionStatus {
}
func (r *gRPCReporter) SendTracing(spans []reporter.ReportedSpan) {
- segmentObject := r.transform.TransformSegmentObject(spans)
- if segmentObject == nil {
- return
- }
+ // The recover must be registered BEFORE the transform call: SendTracing
+ // runs on the segment collector goroutine, so a panic escaping from the
+ // transform (or the channel send below, e.g. on a closed tracingSendCh)
+ // would otherwise kill the whole process.
defer func() {
- // recover the panic caused by close tracingSendCh
if err := recover(); err != nil {
r.logger.Errorf("reporter segment err %v", err)
}
}()
+ segmentObject := r.transform.TransformSegmentObject(spans)
+ if segmentObject == nil {
+ return
+ }
select {
case r.tracingSendCh <- segmentObject:
default:
diff --git a/plugins/core/reporter/grpc/send_tracing_recover_test.go b/plugins/core/reporter/grpc/send_tracing_recover_test.go
new file mode 100644
index 0000000..c8d16ae
--- /dev/null
+++ b/plugins/core/reporter/grpc/send_tracing_recover_test.go
@@ -0,0 +1,81 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package grpc
+
+import (
+ "sync/atomic"
+ "testing"
+
+ "github.com/apache/skywalking-go/plugins/core/reporter"
+
+ commonv3 "github.com/apache/skywalking-go/protocols/collect/common/v3"
+ agentv3
"github.com/apache/skywalking-go/protocols/collect/language/agent/v3"
+)
+
+type capturingLogger struct {
+ errors int32
+}
+
+func (l *capturingLogger) WithField(key string, value interface{}) interface{}
{ return l }
+func (l *capturingLogger) Info(args ...interface{})
{}
+func (l *capturingLogger) Infof(format string, args ...interface{})
{}
+func (l *capturingLogger) Warn(args ...interface{})
{}
+func (l *capturingLogger) Warnf(format string, args ...interface{})
{}
+func (l *capturingLogger) Error(args ...interface{})
{ atomic.AddInt32(&l.errors, 1) }
+func (l *capturingLogger) Errorf(format string, args ...interface{})
{ atomic.AddInt32(&l.errors, 1) }
+
+// panicReportedSpan triggers a panic as soon as the transform touches it,
+// simulating a corrupted span reaching SendTracing.
+type panicReportedSpan struct{}
+
+func (panicReportedSpan) Context() reporter.SegmentContext {
panic("corrupted span") }
+func (panicReportedSpan) Refs() []reporter.SpanContext { return nil }
+func (panicReportedSpan) StartTime() int64 { return 0 }
+func (panicReportedSpan) EndTime() int64 { return 0 }
+func (panicReportedSpan) OperationName() string { return "op" }
+func (panicReportedSpan) Peer() string { return "" }
+func (panicReportedSpan) SpanType() agentv3.SpanType { return
agentv3.SpanType_Exit }
+func (panicReportedSpan) SpanLayer() agentv3.SpanLayer { return
agentv3.SpanLayer_Database }
+func (panicReportedSpan) IsError() bool { return false }
+func (panicReportedSpan) Tags() []*commonv3.KeyStringValuePair { return nil }
+func (panicReportedSpan) Logs() []*agentv3.Log { return nil }
+func (panicReportedSpan) ComponentID() int32 { return 0 }
+
+// TestSendTracingRecoversTransformPanic guards the recover placement in
+// SendTracing: it must be registered BEFORE the transform call, because
+// SendTracing runs on the segment collector goroutine which has no other
+// recover - an escaping panic would kill the whole process.
+func TestSendTracingRecoversTransformPanic(t *testing.T) {
+ logger := &capturingLogger{}
+ r := &gRPCReporter{
+ logger: logger,
+ transform:
reporter.NewTransform(&reporter.Entity{ServiceName: "svc", ServiceInstanceName:
"inst"}),
+ tracingSendCh: make(chan *agentv3.SegmentObject, 1),
+ }
+
+ defer func() {
+ if p := recover(); p != nil {
+ t.Fatalf("panic escaped SendTracing (recover registered
too late?): %v", p)
+ }
+ }()
+ r.SendTracing([]reporter.ReportedSpan{panicReportedSpan{}})
+
+ if atomic.LoadInt32(&logger.errors) == 0 {
+ t.Fatal("recovered transform panic was not logged")
+ }
+}
diff --git a/plugins/core/reporter/kafka/kafka.go b/plugins/core/reporter/kafka/kafka.go
index 0655d29..558f544 100644
--- a/plugins/core/reporter/kafka/kafka.go
+++ b/plugins/core/reporter/kafka/kafka.go
@@ -249,16 +249,19 @@ func (r *kafkaReporter) logSendLoop() {
}
func (r *kafkaReporter) SendTracing(spans []reporter.ReportedSpan) {
- segmentObject := r.transform.TransformSegmentObject(spans)
- if segmentObject == nil {
- return
- }
+ // The recover must be registered BEFORE the transform call: SendTracing
+ // runs on the segment collector goroutine, so a panic escaping from the
+ // transform (or the channel send below, e.g. on a closed tracingSendCh)
+ // would otherwise kill the whole process.
defer func() {
- // recover the panic caused by close tracingSendCh
if err := recover(); err != nil {
r.logger.Errorf("reporter segment err %v", err)
}
}()
+ segmentObject := r.transform.TransformSegmentObject(spans)
+ if segmentObject == nil {
+ return
+ }
select {
case r.tracingSendCh <- segmentObject:
default:
diff --git a/plugins/core/reporter/kafka/send_tracing_recover_test.go b/plugins/core/reporter/kafka/send_tracing_recover_test.go
new file mode 100644
index 0000000..13818d8
--- /dev/null
+++ b/plugins/core/reporter/kafka/send_tracing_recover_test.go
@@ -0,0 +1,81 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package kafka
+
+import (
+ "sync/atomic"
+ "testing"
+
+ "github.com/apache/skywalking-go/plugins/core/reporter"
+
+ commonv3 "github.com/apache/skywalking-go/protocols/collect/common/v3"
+ agentv3
"github.com/apache/skywalking-go/protocols/collect/language/agent/v3"
+)
+
+type capturingLogger struct {
+ errors int32
+}
+
+func (l *capturingLogger) WithField(key string, value interface{}) interface{}
{ return l }
+func (l *capturingLogger) Info(args ...interface{})
{}
+func (l *capturingLogger) Infof(format string, args ...interface{})
{}
+func (l *capturingLogger) Warn(args ...interface{})
{}
+func (l *capturingLogger) Warnf(format string, args ...interface{})
{}
+func (l *capturingLogger) Error(args ...interface{})
{ atomic.AddInt32(&l.errors, 1) }
+func (l *capturingLogger) Errorf(format string, args ...interface{})
{ atomic.AddInt32(&l.errors, 1) }
+
+// panicReportedSpan triggers a panic as soon as the transform touches it,
+// simulating a corrupted span reaching SendTracing.
+type panicReportedSpan struct{}
+
+func (panicReportedSpan) Context() reporter.SegmentContext {
panic("corrupted span") }
+func (panicReportedSpan) Refs() []reporter.SpanContext { return nil }
+func (panicReportedSpan) StartTime() int64 { return 0 }
+func (panicReportedSpan) EndTime() int64 { return 0 }
+func (panicReportedSpan) OperationName() string { return "op" }
+func (panicReportedSpan) Peer() string { return "" }
+func (panicReportedSpan) SpanType() agentv3.SpanType { return
agentv3.SpanType_Exit }
+func (panicReportedSpan) SpanLayer() agentv3.SpanLayer { return
agentv3.SpanLayer_Database }
+func (panicReportedSpan) IsError() bool { return false }
+func (panicReportedSpan) Tags() []*commonv3.KeyStringValuePair { return nil }
+func (panicReportedSpan) Logs() []*agentv3.Log { return nil }
+func (panicReportedSpan) ComponentID() int32 { return 0 }
+
+// TestSendTracingRecoversTransformPanic mirrors the gRPC reporter test: the
+// recover in SendTracing must be registered BEFORE the transform call, because
+// SendTracing runs on the segment collector goroutine which has no other
+// recover - an escaping panic would kill the whole process.
+func TestSendTracingRecoversTransformPanic(t *testing.T) {
+ logger := &capturingLogger{}
+ r := &kafkaReporter{
+ logger: logger,
+ transform:
reporter.NewTransform(&reporter.Entity{ServiceName: "svc", ServiceInstanceName:
"inst"}),
+ tracingSendCh: make(chan *agentv3.SegmentObject, 1),
+ }
+
+ defer func() {
+ if p := recover(); p != nil {
+ t.Fatalf("panic escaped SendTracing (recover registered
too late?): %v", p)
+ }
+ }()
+ r.SendTracing([]reporter.ReportedSpan{panicReportedSpan{}})
+
+ if atomic.LoadInt32(&logger.errors) == 0 {
+ t.Fatal("recovered transform panic was not logged")
+ }
+}
diff --git a/plugins/core/segment_datarace_test.go b/plugins/core/segment_datarace_test.go
index e407101..46c4ad3 100644
--- a/plugins/core/segment_datarace_test.go
+++ b/plugins/core/segment_datarace_test.go
@@ -45,12 +45,14 @@ func buildReportedSpan() *SegmentSpanImpl {
OperationName: "users/SELECT",
Peer: "127.0.0.1:5432",
SpanType: SpanTypeExit,
+ opLock: &sync.Mutex{},
},
SegmentContext: SegmentContext{
- TraceID: "trace-id",
- SegmentID: "segment-id",
- SpanID: 0,
- ParentSpanID: -1,
+ TraceID: "trace-id",
+ SegmentID: "segment-id",
+ SpanID: 0,
+ ParentSpanID: -1,
+ CorrelationContext: newCorrelationContext(),
},
}
}
@@ -189,3 +191,225 @@ func TestRaceReporterNeverPanicsWhileSpanMutated(t *testing.T) {
t.Fatalf("reporter panicked %d time(s) - the #13885 crash
regressed", atomic.LoadInt32(&panicked))
}
}
+
+// wireCollector attaches a working collector harness (collect channel +
+// collectorDone) to the span so End()/AsyncFinish() can run their real end0
+// path inside tests. It returns the channel the span will be delivered on.
+func wireCollector(t *testing.T, span *SegmentSpanImpl) chan
reporter.ReportedSpan {
+ ch := make(chan reporter.ReportedSpan, 8)
+ done := make(chan struct{})
+ span.SegmentContext.collect = ch
+ span.SegmentContext.collectorDone = done
+ span.DefaultSpan.EndTime = time.Time{} // not ended yet
+ span.DefaultSpan.ended = false
+ span.DefaultSpan.tracer = Tracing // so11y bookkeeping in End needs a
tracer
+ t.Cleanup(func() { close(done) }) // release any straggling end0
goroutine
+ return ch
+}
+
+// TestRaceConcurrentMutators reproduces the "crossed span" misuse (e.g. the
gorm
+// plugin handing the same live span to two goroutines through a shared
+// *gorm.DB): several goroutines mutate ONE span concurrently, repeatedly
+// rewriting the same tag key in place - the exact write that used to tear the
+// string header read by the reporter. With the per-span lock this must be
+// race-detector clean; before the fix this test reports races immediately.
+func TestRaceConcurrentMutators(t *testing.T) {
+ span := buildReportedSpan()
+ const workers = 4
+ var stop int32
+ var wg sync.WaitGroup
+ wg.Add(workers)
+ for w := 0; w < workers; w++ {
+ go func(w int) {
+ defer wg.Done()
+ for i := 0; atomic.LoadInt32(&stop) == 0; i++ {
+ span.Tag("db.statement", fmt.Sprintf("select %d
from t%d", i, w)) // same-key in-place rewrite
+ span.Tag(fmt.Sprintf("k-%d-%d", w, i%8), "v")
+ span.Log("event", fmt.Sprintf("w%d-%d", w, i))
+ span.SetOperationName(fmt.Sprintf("op-%d-%d",
w, i))
+ span.SetPeer("10.0.0.1:3306")
+ span.Error("boom")
+ }
+ }(w)
+ }
+ time.Sleep(200 * time.Millisecond)
+ atomic.StoreInt32(&stop, 1)
+ wg.Wait()
+}
+
+// TestRaceMutateAfterFreezeWhileReporting verifies the lock-free reporting
+// guarantee: once endAndFreeze returns, the reporter may transform and marshal
+// the span without locks even though other goroutines are still calling
+// mutators (their writes are dropped under the lock without touching data).
+func TestRaceMutateAfterFreezeWhileReporting(t *testing.T) {
+ span := buildReportedSpan()
+ for i := 0; i < 16; i++ {
+ span.Tag(fmt.Sprintf("key-%d", i), "init")
+ span.Log("event", fmt.Sprintf("log-%d", i))
+ }
+
+ var stop int32
+ var wg sync.WaitGroup
+ wg.Add(3)
+ for w := 0; w < 3; w++ {
+ go func(w int) {
+ defer wg.Done()
+ for i := 0; atomic.LoadInt32(&stop) == 0; i++ {
+ span.Tag(fmt.Sprintf("key-%d", i%16),
fmt.Sprintf("late-%d-%d", w, i))
+ span.Log("late", "v")
+ span.SetOperationName("late-op")
+ span.Error("late")
+ }
+ }(w)
+ }
+
+ time.Sleep(50 * time.Millisecond) // let some pre-freeze writes land
(race builds are slow)
+ if !span.endAndFreeze() {
+ t.Fatal("first endAndFreeze must return true")
+ }
+
+ transform := reporter.NewTransform(&reporter.Entity{
+ ServiceName: "svc",
+ ServiceInstanceName: "inst",
+ })
+ // 50 iterations: the race detector is a binary signal, more iterations
add
+ // wall time (~7s -> ~2s under -race) without adding coverage
+ for i := 0; i < 50; i++ {
+ seg :=
transform.TransformSegmentObject([]reporter.ReportedSpan{span})
+ if seg == nil {
+ t.Fatal("nil segment")
+ }
+ if _, err := proto.Marshal(seg); err != nil {
+ t.Fatalf("marshal segment: %v", err)
+ }
+ }
+
+ atomic.StoreInt32(&stop, 1)
+ wg.Wait()
+}
+
+// TestRaceDoubleEnd races two End() calls on the same span and asserts the
+// segment collector receives the span exactly once (the old IsValid
check-then-
+// act allowed duplicated end0 sends, corrupting the segment accounting).
+func TestRaceDoubleEnd(t *testing.T) {
+ ResetTracingContext()
+ defer ResetTracingContext()
+ span := buildReportedSpan()
+ ch := wireCollector(t, span)
+
+ var wg sync.WaitGroup
+ wg.Add(2)
+ for i := 0; i < 2; i++ {
+ go func() {
+ defer wg.Done()
+ span.End()
+ }()
+ }
+ wg.Wait()
+
+ received := 0
+ select {
+ case <-ch:
+ received++
+ case <-time.After(2 * time.Second):
+ }
+ // grace period for a (buggy) duplicated delivery
+ select {
+ case <-ch:
+ received++
+ case <-time.After(200 * time.Millisecond):
+ }
+ if received != 1 {
+ t.Fatalf("expected the span to be collected exactly once, got
%d", received)
+ }
+}
+
+// TestRaceAsyncFinishVsMutators covers the async span pattern (gRPC streaming,
+// toolkit async API): one goroutine keeps tagging while another finishes the
+// span asynchronously. Both AsyncFinish/End themselves and the mutators must
be
+// fully synchronized; before the fix AsyncFinish/End were unlocked even in
+// async mode.
+func TestRaceAsyncFinishVsMutators(t *testing.T) {
+ ResetTracingContext()
+ defer ResetTracingContext()
+ span := buildReportedSpan()
+ ch := wireCollector(t, span)
+ span.PrepareAsync()
+
+ var stop int32
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for i := 0; atomic.LoadInt32(&stop) == 0; i++ {
+ span.Tag("async-key", fmt.Sprintf("v-%d", i))
+ span.Log("async", "v")
+ }
+ }()
+
+ span.End() // async mode: does not finish the span
+
+ finished := make(chan struct{})
+ go func() {
+ defer close(finished)
+ span.AsyncFinish()
+ }()
+ <-finished
+
+ atomic.StoreInt32(&stop, 1)
+ wg.Wait()
+
+ select {
+ case <-ch:
+ case <-time.After(time.Second):
+ t.Fatal("async finished span was never collected")
+ }
+}
+
+// TestRaceHostileWorkload runs the full end-to-end hostile workload (see
+// span_hostile_workload_test.go) in-process under the race detector: every
+// misuse pattern from the #13885 audit against the real pipeline must be free
+// of data races. The no-panic/no-throw property of the same workload is
+// asserted by TestE2ESpanCrashSafety in a child process.
+func TestRaceHostileWorkload(t *testing.T) {
+ segments, marshals := runHostileSpanWorkload(1500 * time.Millisecond)
+ if segments == 0 || marshals == 0 {
+ t.Fatalf("hostile workload processed no data (segments=%d
marshals=%d)", segments, marshals)
+ }
+}
+
+// TestRaceCorrelationSetVsSnapshot covers the correlation storage: concurrent
+// writers and snapshot/encode readers used to race on a bare map, which is an
+// unrecoverable "concurrent map iteration and map write" fatal error in
+// production.
+func TestRaceCorrelationSetVsSnapshot(t *testing.T) {
+ c := newCorrelationContext()
+ var stop int32
+ var wg sync.WaitGroup
+ wg.Add(4)
+ for w := 0; w < 2; w++ {
+ go func(w int) {
+ defer wg.Done()
+ for i := 0; atomic.LoadInt32(&stop) == 0; i++ {
+ c.Set(fmt.Sprintf("k%d", i%4),
fmt.Sprintf("v-%d-%d", w, i))
+ if i%8 == 0 {
+ c.Set(fmt.Sprintf("k%d", i%4), "") //
delete path
+ }
+ }
+ }(w)
+ }
+ for r := 0; r < 2; r++ {
+ go func() {
+ defer wg.Done()
+ for atomic.LoadInt32(&stop) == 0 {
+ _ = c.Snapshot()
+ _ = c.Get("k1")
+ _ = c.Len()
+ _ = c.Clone()
+ }
+ }()
+ }
+ time.Sleep(200 * time.Millisecond)
+ atomic.StoreInt32(&stop, 1)
+ wg.Wait()
+}
diff --git a/plugins/core/span_crash_e2e_test.go b/plugins/core/span_crash_e2e_test.go
new file mode 100644
index 0000000..885aec2
--- /dev/null
+++ b/plugins/core/span_crash_e2e_test.go
@@ -0,0 +1,134 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package core
+
+import (
+ "fmt"
+ "os"
+ "os/exec"
+ "strconv"
+ "strings"
+ "testing"
+ "time"
+)
+
+const (
+ // e2eChildEnv set to "1" makes TestE2ESpanCrashSafety run the hostile
+ // workload itself (child mode) instead of spawning a child process.
+ e2eChildEnv = "SW_SPAN_E2E_CHILD"
+ // e2eDurationEnv optionally overrides the child's stress duration; the
+ // value is parsed with time.ParseDuration.
+ e2eDurationEnv = "SW_SPAN_E2E_DURATION"
+ // e2eOKMarker is printed by the child after the workload returns; the
+ // parent requires it in the child output.
+ e2eOKMarker = "E2E_OK"
+)
+
+// TestE2ESpanCrashSafety is the end-to-end guarantee for
apache/skywalking#13885:
+// under every concurrency-abuse pattern found in the audit, the agent must
+// produce NO panic and NO runtime fatal error (`runtime.throw`, e.g. "invalid
+// pointer found on stack" / "concurrent map writes").
+//
+// A runtime.throw is unrecoverable and kills the process, so this cannot be
+// asserted in-process: the hostile workload runs in a CHILD test process and
+// the parent asserts a clean exit. Any panic, data-race fatal, GC/stack-scan
+// "bad pointer" throw or deadlock (child test timeout) fails this test.
+//
+// Tune the stress duration with SW_SPAN_E2E_DURATION (default 3s).
+func TestE2ESpanCrashSafety(t *testing.T) {
+ if os.Getenv(e2eChildEnv) == "1" {
+ runE2EChild()
+ return
+ }
+ if testing.Short() {
+ t.Skip("skipping subprocess e2e in -short mode")
+ }
+
+ cmd := exec.Command(os.Args[0],
+ "-test.run", "^TestE2ESpanCrashSafety$", "-test.v",
"-test.timeout", "120s")
+ // Runtime hardening for the child: clobberfree (Go 1.13+, harmlessly
+ // ignored by runtimes without it) makes the GC overwrite freed objects
+ // with junk, so any use-after-free (the silent precursor of the
production
+ // "invalid pointer found on stack" throw) crashes immediately and
+ // deterministically instead of depending on scheduling luck.
+ // invalidptr is on by default and kept explicit for documentation.
+ godebug := "clobberfree=1,invalidptr=1"
+ env := make([]string, 0, len(os.Environ())+2)
+ for _, kv := range os.Environ() {
+ if strings.HasPrefix(kv, "GODEBUG=") {
+ // keep our hardening LAST: runtime parsegodebug
assigns each
+ // key=value as encountered front-to-back, so the last
occurrence
+ // of a duplicated key wins (verified against
runtime1.go)
+ godebug = strings.TrimPrefix(kv, "GODEBUG=") + "," +
godebug
+ continue
+ }
+ env = append(env, kv)
+ }
+ env = append(env, e2eChildEnv+"=1", "GODEBUG="+godebug)
+ cmd.Env = env
+ out, err := cmd.CombinedOutput()
+ output := string(out)
+
+ if err != nil {
+ // non-zero exit = panic, runtime.throw or test timeout in the
child
+ t.Fatalf("hostile-workload child process did not exit cleanly
(panic or runtime fatal): %v\n--- child output ---\n%s", err, output)
+ }
+ for _, fatal := range []string{"fatal error:", "panic:", "DATA RACE"} {
+ if strings.Contains(output, fatal) {
+ t.Fatalf("child output contains %q:\n--- child output
---\n%s", fatal, output)
+ }
+ }
+ if !strings.Contains(output, e2eOKMarker) {
+ t.Fatalf("child never reached the completion marker:\n--- child
output ---\n%s", output)
+ }
+
+ // sanity: the pipeline must have actually transformed and marshaled
work,
+ // otherwise the e2e silently tested nothing
+ segments := parseE2ECounter(t, output, "segments")
+ marshals := parseE2ECounter(t, output, "marshals")
+ if segments == 0 || marshals == 0 {
+ t.Fatalf("e2e processed no data (segments=%d
marshals=%d):\n%s", segments, marshals, output)
+ }
+ t.Logf("e2e clean: segments=%d marshals=%d", segments, marshals)
+}
+
+// runE2EChild is the child-process side of TestE2ESpanCrashSafety: it runs the
+// hostile workload for SW_SPAN_E2E_DURATION (default 3s; unparsable or
+// non-positive values keep the default) and prints the completion marker
+// followed by the processed counters for the parent to verify.
+func runE2EChild() {
+ d := 3 * time.Second
+ if v := os.Getenv(e2eDurationEnv); v != "" {
+ if parsed, err := time.ParseDuration(v); err == nil && parsed > 0 {
+ d = parsed
+ }
+ }
+ segments, marshals := runHostileSpanWorkload(d)
+ fmt.Printf("%s segments=%d marshals=%d\n", e2eOKMarker, segments, marshals)
+}
+
+// parseE2ECounter returns the integer printed as "name=<digits>" on the
+// child's completion line (the text from the last e2eOKMarker up to the next
+// newline). Scoping the search to that line keeps an unrelated "name=" earlier
+// in the verbose child output from being parsed instead; when no marker is
+// present the whole output is searched. The test fails if the counter is
+// missing, has no digits, or does not fit in an int64.
+func parseE2ECounter(t *testing.T, output, name string) int64 {
+ t.Helper()
+ if m := strings.LastIndex(output, e2eOKMarker); m >= 0 {
+ output = output[m:]
+ if nl := strings.IndexByte(output, '\n'); nl >= 0 {
+ output = output[:nl]
+ }
+ }
+ idx := strings.Index(output, name+"=")
+ if idx < 0 {
+ t.Fatalf("counter %q missing from child output", name)
+ }
+ rest := output[idx+len(name)+1:]
+ end := 0
+ for end < len(rest) && rest[end] >= '0' && rest[end] <= '9' {
+ end++
+ }
+ if end == 0 {
+ t.Fatalf("counter %q has no digits after '='", name)
+ }
+ v, err := strconv.ParseInt(rest[:end], 10, 64)
+ if err != nil {
+ t.Fatalf("counter %q unparsable: %v", name, err)
+ }
+ return v
+}
diff --git a/plugins/core/span_default.go b/plugins/core/span_default.go
index 4c6a5aa..a7c1525 100644
--- a/plugins/core/span_default.go
+++ b/plugins/core/span_default.go
@@ -45,7 +45,30 @@ type DefaultSpan struct {
InAsyncMode bool
AsyncModeFinished bool
- AsyncOpLocker *sync.Mutex
+
+ // opLock guards the mutable fields above (OperationName, Peer, Layer,
+ // ComponentID, Tags, Logs, IsError, EndTime, the async flags) together
with
+ // the ended flag. SpanType, Parent, Refs, StartTime and tracer are
+ // write-once during construction - before the span is ever shared -
and are
+ // therefore read without the lock
(IsEntry/IsExit/ParentSpan/StartTime).
+ // It must stay a pointer: DefaultSpan is copied by value when it is
embedded
+ // into SegmentSpanImpl/SnapshotSpan, and an embedded sync.Mutex value
would
+ // trip the go vet copylocks check.
+ opLock *sync.Mutex
+ // ended is set by endAndFreeze right before the span is handed over to
the
+ // segment collector. Once set, every late mutation is dropped, so the
span
+ // data is frozen and the reporting path may read it without locking
+ // (see the comment on the ReportedSpan accessors in span_tracing.go).
+ ended bool
+ // droppedLogged rate-limits logDroppedWrite to one warning per span: a
+ // span leaked across goroutines would otherwise flood the log with one
+ // line per dropped write.
+ droppedLogged bool
+ // reuseCount counts the extra logical owners that the span reuse rule
in
+ // CreateEntrySpan/CreateExitSpan hands this span to. Every owner calls
End
+ // exactly once and only the LAST End freezes and reports the span (see
+ // enterReuse and endSyncAndFreeze). Guarded by opLock.
+ reuseCount int
}
func NewDefaultSpan(tracer *Tracer, parent TracingSpan) *DefaultSpan {
@@ -54,62 +77,81 @@ func NewDefaultSpan(tracer *Tracer, parent TracingSpan) *DefaultSpan {
StartTime: time.Now(),
SpanType: SpanTypeLocal,
Parent: parent,
+ opLock: &sync.Mutex{},
}
}
// For TracingSpan
+// SetOperationName sets the span name while holding opLock. Once the span is
+// frozen (ended) the write is dropped and reported through logDroppedWrite.
func (ds *DefaultSpan) SetOperationName(name string) {
- if ds.InAsyncMode {
- ds.AsyncOpLocker.Lock()
- defer ds.AsyncOpLocker.Unlock()
+ ds.opLock.Lock()
+ defer ds.opLock.Unlock()
+ if ds.ended {
+ ds.logDroppedWrite("operation name", name)
+ return
}
ds.OperationName = name
}
+// GetOperationName returns the span name, read under opLock so it cannot race
+// with SetOperationName.
func (ds *DefaultSpan) GetOperationName() string {
+ ds.opLock.Lock()
+ defer ds.opLock.Unlock()
return ds.OperationName
}
+// SetPeer sets the remote peer address while holding opLock; writes arriving
+// after the span is frozen are dropped via logDroppedWrite.
func (ds *DefaultSpan) SetPeer(peer string) {
- if ds.InAsyncMode {
- ds.AsyncOpLocker.Lock()
- defer ds.AsyncOpLocker.Unlock()
+ ds.opLock.Lock()
+ defer ds.opLock.Unlock()
+ if ds.ended {
+ ds.logDroppedWrite("peer", peer)
+ return
}
ds.Peer = peer
}
+// GetPeer returns the peer address, read under opLock so it cannot race with
+// SetPeer.
func (ds *DefaultSpan) GetPeer() string {
+ ds.opLock.Lock()
+ defer ds.opLock.Unlock()
return ds.Peer
}
+// SetSpanLayer stores layer (converted to agentv3.SpanLayer) while holding
+// opLock; writes arriving after the span is frozen are dropped.
func (ds *DefaultSpan) SetSpanLayer(layer int32) {
- if ds.InAsyncMode {
- ds.AsyncOpLocker.Lock()
- defer ds.AsyncOpLocker.Unlock()
+ ds.opLock.Lock()
+ defer ds.opLock.Unlock()
+ if ds.ended {
+ ds.logDroppedWrite("span layer", "")
+ return
}
ds.Layer = agentv3.SpanLayer(layer)
}
+// GetSpanLayer returns the span layer, read under opLock so it cannot race
+// with SetSpanLayer.
func (ds *DefaultSpan) GetSpanLayer() agentv3.SpanLayer {
+ ds.opLock.Lock()
+ defer ds.opLock.Unlock()
return ds.Layer
}
+// SetComponent stores the component id while holding opLock; writes arriving
+// after the span is frozen are dropped via logDroppedWrite.
func (ds *DefaultSpan) SetComponent(componentID int32) {
- if ds.InAsyncMode {
- ds.AsyncOpLocker.Lock()
- defer ds.AsyncOpLocker.Unlock()
+ ds.opLock.Lock()
+ defer ds.opLock.Unlock()
+ if ds.ended {
+ ds.logDroppedWrite("component", "")
+ return
}
ds.ComponentID = componentID
}
+// GetComponent returns the component id, read under opLock so it cannot race
+// with SetComponent.
func (ds *DefaultSpan) GetComponent() int32 {
+ ds.opLock.Lock()
+ defer ds.opLock.Unlock()
return ds.ComponentID
}
func (ds *DefaultSpan) Tag(key, value string) {
- if ds.InAsyncMode {
- ds.AsyncOpLocker.Lock()
- defer ds.AsyncOpLocker.Unlock()
+ ds.opLock.Lock()
+ defer ds.opLock.Unlock()
+ if ds.ended {
+ ds.logDroppedWrite("tag", key)
+ return
}
for _, tag := range ds.Tags {
if tag.Key == key {
@@ -120,11 +162,9 @@ func (ds *DefaultSpan) Tag(key, value string) {
ds.Tags = append(ds.Tags, &commonv3.KeyStringValuePair{Key: key, Value:
value})
}
-func (ds *DefaultSpan) Log(ll ...string) {
- if ds.InAsyncMode {
- ds.AsyncOpLocker.Lock()
- defer ds.AsyncOpLocker.Unlock()
- }
+// log0 is the lock-free internal implementation of Log: Error reuses it while
+// already holding opLock, avoiding a re-entrant deadlock.
+func (ds *DefaultSpan) log0(ll ...string) {
data := make([]*commonv3.KeyStringValuePair, 0,
int32(math.Ceil(float64(len(ll))/2.0)))
var kvp *commonv3.KeyStringValuePair
for i, l := range ll {
@@ -139,26 +179,62 @@ func (ds *DefaultSpan) Log(ll ...string) {
ds.Logs = append(ds.Logs, &agentv3.Log{Time: Millisecond(time.Now()),
Data: data})
}
+// Log appends one log entry built from ll (formatting is done by log0) while
+// holding opLock; after the span is frozen the entry is dropped instead.
+func (ds *DefaultSpan) Log(ll ...string) {
+ ds.opLock.Lock()
+ defer ds.opLock.Unlock()
+ if ds.ended {
+ ds.logDroppedWrite("log", "")
+ return
+ }
+ ds.log0(ll...)
+}
+
+// Error sets the error flag and appends ll as a log entry in one critical
+// section. It calls log0 rather than Log because Log would lock opLock again,
+// and sync.Mutex is not re-entrant.
func (ds *DefaultSpan) Error(ll ...string) {
- if ds.InAsyncMode {
- ds.AsyncOpLocker.Lock()
- defer ds.AsyncOpLocker.Unlock()
+ ds.opLock.Lock()
+ defer ds.opLock.Unlock()
+ if ds.ended {
+ ds.logDroppedWrite("error", "")
+ return
}
ds.IsError = true
- ds.Log(ll...)
+ ds.log0(ll...)
}
+// ErrorOccured sets the error flag without adding a log entry; after the span
+// is frozen the write is dropped via logDroppedWrite.
func (ds *DefaultSpan) ErrorOccured() {
- if ds.InAsyncMode {
- ds.AsyncOpLocker.Lock()
- defer ds.AsyncOpLocker.Unlock()
+ ds.opLock.Lock()
+ defer ds.opLock.Unlock()
+ if ds.ended {
+ ds.logDroppedWrite("error flag", "")
+ return
}
ds.IsError = true
}
-func (ds *DefaultSpan) End(changeParent bool) {
- ds.EndTime = time.Now()
+// logDroppedWrite reports a mutation that arrived after the span was frozen.
+// The caller must hold opLock (reading OperationName/droppedLogged is
therefore
+// safe). The span name (e.g. "GET:/api/xxx", "MySQL/query") lets users locate
+// which plugin or code path is still writing a finished span, which usually
+// means the span leaked across goroutines. Only the FIRST drop per span is
+// logged so a leaked hot span cannot flood the log.
+func (ds *DefaultSpan) logDroppedWrite(op, detail string) {
+ if ds.droppedLogged {
+ return
+ }
+ ds.droppedLogged = true
+ if ds.tracer != nil && ds.tracer.Log != nil {
+ ds.tracer.Log.Warnf(
+ "span %q already ended, dropping %s %q (and any further
late writes; span shared across goroutines?)",
+ ds.OperationName, op, detail)
+ }
+}
+func (ds *DefaultSpan) End(changeParent bool) {
+ ds.opLock.Lock()
+ if !ds.ended {
+ ds.EndTime = time.Now()
+ }
+ ds.opLock.Unlock()
+ // The remaining work is goroutine-local bookkeeping, not shared span
data.
GetSo11y(ds.tracer).MeasureTracingContextCompletion(false)
if changeParent {
if ctx := getTracingContext(); ctx != nil {
@@ -167,6 +243,78 @@ func (ds *DefaultSpan) End(changeParent bool) {
}
}
+// endAndFreeze marks the span as ended under the lock. It returns true only on
+// the first call, so the caller can ensure end0() runs exactly once even when
+// End is raced from multiple goroutines (no duplicated segment reporting).
+// After it returns, every late mutator observes ended==true and drops its
+// write, so the span data is frozen: the reporting path reads it without locks
+// relying on the channel handoff in end0 for the happens-before edge.
+func (ds *DefaultSpan) endAndFreeze() bool {
+ ds.opLock.Lock()
+ defer ds.opLock.Unlock()
+ // another (earlier or concurrent) caller already froze the span
+ if ds.ended {
+ return false
+ }
+ ds.ended = true
+ return true
+}
+
+// enterReuse registers one more owner of this span. It is called from the span
+// reuse branches of CreateEntrySpan/CreateExitSpan when the active span is
+// handed to a nested plugin; that owner's End then only decrements the counter
+// (see endSyncAndFreeze) instead of freezing the span.
+func (ds *DefaultSpan) enterReuse() {
+ ds.opLock.Lock()
+ defer ds.opLock.Unlock()
+ // a frozen span gets no new owners: its final End has already happened
+ if !ds.ended {
+ ds.reuseCount++
+ }
+}
+
+// endSyncAndFreeze performs the whole synchronous End() path in ONE critical
+// section - the "already ended" fast check, the EndTime write and the freeze
+// decision used to take three separate lock round-trips per span end. It
+// returns true when the caller must hand the span to the collector (exactly
+// once, and never for async spans: AsyncFinish owns the freeze there, and
+// reading InAsyncMode under the same lock keeps even a misused concurrent
+// PrepareAsync/End pair free of data races).
+func (ds *DefaultSpan) endSyncAndFreeze() bool {
+ ds.opLock.Lock()
+ if ds.reuseCount > 0 {
+ // a nested owner of a reused span (see enterReuse) finished:
this is
+ // not the last End, so keep the span open - the outer owner
still
+ // writes to it (e.g. gorm tags db.statement after the sql
driver's
+ // End) and will freeze it with its own End. Restoring the
active span
+ // is kept here because the nested plugin expects its End to
pop the
+ // span, exactly like before the freeze mechanism.
+ ds.reuseCount--
+ ds.opLock.Unlock()
+ if ctx := getTracingContext(); ctx != nil {
+ ctx.SaveActiveSpan(ds.Parent)
+ }
+ return false
+ }
+ if !ds.EndTime.IsZero() { // already ended
+ ds.opLock.Unlock()
+ return false
+ }
+ ds.EndTime = time.Now()
+ frozen := false
+ // EndTime and ended are distinct on purpose: in async mode End() sets
+ // EndTime but leaves ended=false - AsyncFinish owns the freeze there.
+ if !ds.InAsyncMode && !ds.ended {
+ ds.ended = true
+ frozen = true
+ }
+ ds.opLock.Unlock()
+ // goroutine-local bookkeeping stays outside the lock
+ GetSo11y(ds.tracer).MeasureTracingContextCompletion(false)
+ if ctx := getTracingContext(); ctx != nil {
+ ctx.SaveActiveSpan(ds.Parent)
+ }
+ return frozen
+}
+
+// IsEntry reports whether the span is an entry span. SpanType is written once
+// during construction, so it is read without opLock.
func (ds *DefaultSpan) IsEntry() bool {
return ds.SpanType == SpanTypeEntry
}
@@ -176,6 +324,10 @@ func (ds *DefaultSpan) IsExit() bool {
}
func (ds *DefaultSpan) IsValid() bool {
+ // EndTime may be written by another goroutine in async mode, take the
lock
+ // to avoid a torn read of the time.Time value.
+ ds.opLock.Lock()
+ defer ds.opLock.Unlock()
return ds.EndTime.IsZero()
}
@@ -184,15 +336,31 @@ func (ds *DefaultSpan) ParentSpan() TracingSpan {
}
func (ds *DefaultSpan) PrepareAsync() {
+ ds.opLock.Lock()
+ defer ds.opLock.Unlock()
+ if ds.ended {
+ // the span is already frozen and reported; entering async mode
now is
+ // a misuse and would only confuse the lifecycle, so drop it
+ ds.logDroppedWrite("prepare async", "")
+ return
+ }
if ds.InAsyncMode {
panic("already in async mode")
}
ds.InAsyncMode = true
ds.AsyncModeFinished = false
- ds.AsyncOpLocker = &sync.Mutex{}
}
func (ds *DefaultSpan) AsyncFinish() {
+ ds.opLock.Lock()
+ defer ds.opLock.Unlock()
+ if ds.ended {
+ // already frozen and reported (the matching PrepareAsync was
dropped,
+ // or another finisher won the race): drop, mirroring the
mutator
+ // policy, instead of panicking on an already-completed span
+ ds.logDroppedWrite("async finish", "")
+ return
+ }
if !ds.InAsyncMode {
panic("not in async mode")
}
@@ -202,9 +370,11 @@ func (ds *DefaultSpan) AsyncFinish() {
ds.AsyncModeFinished = true
}
+// GetEndPointName returns the operation name for entry spans and "" for every
+// other span type.
+//
+// GetEndPointName must not be called while holding opLock (it locks through
+// GetOperationName, mirroring how Error must use log0 instead of Log).
func (ds *DefaultSpan) GetEndPointName() string {
if ds.SpanType == SpanTypeEntry {
- return ds.OperationName
+ return ds.GetOperationName()
}
return ""
}
diff --git a/plugins/core/span_freeze_test.go b/plugins/core/span_freeze_test.go
new file mode 100644
index 0000000..95d3b6e
--- /dev/null
+++ b/plugins/core/span_freeze_test.go
@@ -0,0 +1,277 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package core
+
+import (
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/apache/skywalking-go/plugins/core/reporter"
+)
+
+// newTestSegmentSpan builds a SegmentSpanImpl the same way the agent does,
+// usable from non-race builds (buildReportedSpan lives behind the race tag).
+// The tracer and the collect/collectorDone channels are left unset; tests that
+// call End assign them first.
+func newTestSegmentSpan() *SegmentSpanImpl {
+ return &SegmentSpanImpl{
+ DefaultSpan: DefaultSpan{
+ StartTime: time.Now(),
+ OperationName: "users/SELECT",
+ Peer: "127.0.0.1:5432",
+ SpanType: SpanTypeExit,
+ opLock: &sync.Mutex{},
+ },
+ SegmentContext: SegmentContext{
+ TraceID: "trace-id",
+ SegmentID: "segment-id",
+ SpanID: 0,
+ ParentSpanID: -1,
+ CorrelationContext: newCorrelationContext(),
+ },
+ }
+}
+
+func TestLateWritesDroppedAfterFreeze(t *testing.T) {
+ span := newTestSegmentSpan()
+ span.Tag("k1", "v1")
+ span.Log("event", "before")
+ span.SetOperationName("op-before")
+
+ if !span.endAndFreeze() {
+ t.Fatal("first endAndFreeze must return true")
+ }
+ if span.endAndFreeze() {
+ t.Fatal("second endAndFreeze must return false")
+ }
+
+ // every late mutation must be silently dropped without panicking
+ span.Tag("k1", "late")
+ span.Tag("k2", "late")
+ span.Log("event", "late")
+ span.SetOperationName("op-late")
+ span.SetPeer("late:1")
+ span.SetSpanLayer(int32(3))
+ span.SetComponent(99)
+ span.Error("late")
+ span.ErrorOccured()
+
+ if got := len(span.Tags()); got != 1 {
+ t.Fatalf("late tag was not dropped, tags=%d", got)
+ }
+ if span.Tags()[0].Value != "v1" {
+ t.Fatalf("in-place tag rewrite after freeze was not dropped:
%s", span.Tags()[0].Value)
+ }
+ if got := len(span.Logs()); got != 1 {
+ t.Fatalf("late log was not dropped, logs=%d", got)
+ }
+ if span.OperationName() != "op-before" {
+ t.Fatalf("late operation name was not dropped: %s",
span.OperationName())
+ }
+ if span.IsError() {
+ t.Fatal("late error flag was not dropped")
+ }
+}
+
+// TestEnd0AfterCollectorExitIsSafe ends a span whose collector has already
+// exited (collectorDone closed, nobody receiving on collect) and expects End
+// to neither panic nor block.
+func TestEnd0AfterCollectorExitIsSafe(t *testing.T) {
+ ResetTracingContext()
+ span := newTestSegmentSpan()
+ span.DefaultSpan.tracer = Tracing
+ // collector already exited: data channel has no receiver and stays open,
+ // only the done channel is closed
+ span.SegmentContext.collect = make(chan reporter.ReportedSpan)
+ span.SegmentContext.collectorDone = make(chan struct{})
+ close(span.SegmentContext.collectorDone)
+
+ span.End() // end0 must neither panic nor block forever
+
+ // the send goroutine selects the done branch; give it a moment and make
+ // sure the test itself completes (a blocked send would hang the test)
+ time.Sleep(50 * time.Millisecond)
+}
+
+// TestDoubleEndCollectsOnce ends the same span twice and expects exactly one
+// delivery on the collect channel.
+func TestDoubleEndCollectsOnce(t *testing.T) {
+ ResetTracingContext()
+ span := newTestSegmentSpan()
+ span.DefaultSpan.tracer = Tracing
+ ch := make(chan reporter.ReportedSpan, 8)
+ span.SegmentContext.collect = ch
+ span.SegmentContext.collectorDone = make(chan struct{})
+
+ span.End()
+ span.End() // second End must be a no-op
+
+ select {
+ case <-ch:
+ case <-time.After(time.Second):
+ t.Fatal("ended span was never collected")
+ }
+ // a second delivery within this window means End reported twice
+ select {
+ case <-ch:
+ t.Fatal("span was collected twice")
+ case <-time.After(100 * time.Millisecond):
+ }
+}
+
+// TestAsyncFlowCollectsOnce checks the async lifecycle: End after PrepareAsync
+// must not report the span, writes between End and AsyncFinish must be kept,
+// and AsyncFinish delivers the span to the collector.
+func TestAsyncFlowCollectsOnce(t *testing.T) {
+ ResetTracingContext()
+ span := newTestSegmentSpan()
+ span.DefaultSpan.tracer = Tracing
+ ch := make(chan reporter.ReportedSpan, 8)
+ span.SegmentContext.collect = ch
+ span.SegmentContext.collectorDone = make(chan struct{})
+
+ span.PrepareAsync()
+ span.End() // async mode: must not collect yet
+ select {
+ case <-ch:
+ t.Fatal("async span collected before AsyncFinish")
+ case <-time.After(200 * time.Millisecond): // generous window so a slow end0 goroutine could not hide a premature delivery
+ }
+
+ span.Tag("after-end", "v") // still allowed between End and AsyncFinish
+ span.AsyncFinish()
+
+ select {
+ case <-ch:
+ case <-time.After(time.Second):
+ t.Fatal("async span was never collected")
+ }
+ found := false
+ for _, tag := range span.Tags() {
+ if tag.Key == "after-end" {
+ found = true
+ }
+ }
+ if !found {
+ t.Fatal("tag written between End and AsyncFinish was lost")
+ }
+}
+
+// TestCorrelationContextBasics covers set/get/len, delete-by-empty-value,
+// snapshot/clone independence, nil-receiver safety and lazy allocation.
+func TestCorrelationContextBasics(t *testing.T) {
+ c := newCorrelationContext()
+ c.Set("a", "1")
+ c.Set("b", "2")
+ if c.Get("a") != "1" || c.Len() != 2 {
+ t.Fatal("set/get/len mismatch")
+ }
+ c.Set("a", "") // empty value deletes
+ if c.Get("a") != "" || c.Len() != 1 {
+ t.Fatal("empty-value delete failed")
+ }
+
+ snap := c.Snapshot()
+ clone := c.Clone()
+ c.Set("b", "changed")
+ if snap["b"] != "2" || clone.Get("b") != "2" {
+ t.Fatal("snapshot/clone must be independent of later writes")
+ }
+
+ var nilCtx *CorrelationContext
+ if nilCtx.Get("x") != "" || nilCtx.Len() != 0 {
+ t.Fatal("nil receiver reads must be safe")
+ }
+ nilCtx.Set("x", "y") // must not panic
+ if nilCtx.Clone() == nil {
+ t.Fatal("nil receiver clone must return a usable value")
+ }
+ // Snapshot returns nil when empty (allocation-free); nil maps are readable
+ if len(nilCtx.Snapshot()) != 0 {
+ t.Fatal("nil receiver snapshot must be empty")
+ }
+
+ // lazy data allocation: a fresh context must support all reads before any Set
+ fresh := newCorrelationContext()
+ if fresh.Get("x") != "" || fresh.Len() != 0 || fresh.Snapshot() != nil {
+ t.Fatal("fresh context reads must be safe and allocation-free")
+ }
+ fresh.Set("x", "1")
+ if fresh.Get("x") != "1" {
+ t.Fatal("set after lazy init failed")
+ }
+}
+
+// TestContinueContextClonesRuntime guards the clone-on-continue behavior: two
+// goroutines continuing the same snapshot must not share one RuntimeContext
map
+// (that sharing was a fatal concurrent-map-access risk, e.g. the send and
+// receive goroutines of a gRPC stream).
+func TestContinueContextClonesRuntime(t *testing.T) {
+ ResetTracingContext()
+ defer ResetTracingContext()
+
+ Tracing.SetRuntimeContextValue("k", "original")
+ snap := Tracing.CaptureContext()
+ if snap == nil {
+ t.Fatal("capture returned nil snapshot")
+ }
+
+ Tracing.ContinueContext(snap)
+ Tracing.SetRuntimeContextValue("k", "changed-after-first-continue")
+
+ Tracing.ContinueContext(snap) // the same snapshot continued again
+ if got := Tracing.GetRuntimeContextValue("k"); got != "original" {
+ t.Fatalf("ContinueContext shared the runtime map across
continues: got %v", got)
+ }
+}
+
+// TestMetricsCollectPanicRecovered proves the metrics collect loop survives a
+// panicking user meter callback (it used to kill the whole process: the
+// goroutine had no recover).
+// The metrics collect goroutine leaks by design (the loop has no shutdown) and
+// keeps reading the MetricsObtain global forever - ANY later write to that
+// global (restore on test exit, or re-install on `-test.count=2` reruns) would
+// race with those leaked readers. So the panicking wrapper is installed exactly
+// once per process, before the first collect goroutine exists (happens-before
+// safe), and reruns only flip the atomic switch.
+var (
+ metricsPanicHookOnce sync.Once // installs the MetricsObtain wrapper once per process
+ metricsPanicArmed atomic.Bool // when true the wrapper panics instead of delegating
+ metricsPanicCalls atomic.Int32 // number of panicking wrapper invocations
+)
+
+func TestMetricsCollectPanicRecovered(t *testing.T) {
+ metricsPanicHookOnce.Do(func() {
+ old := MetricsObtain
+ MetricsObtain = func() ([]interface{}, []func()) {
+ if !metricsPanicArmed.Load() {
+ return old()
+ }
+ metricsPanicCalls.Add(1)
+ panic("meter callback boom")
+ }
+ })
+ metricsPanicCalls.Store(0)
+ metricsPanicArmed.Store(true)
+ defer metricsPanicArmed.Store(false)
+
+ // A dedicated tracer keeps the leaked collect goroutine (the loop has
no
+ // shutdown mechanism) away from the shared Tracing used by other tests.
+ tr := &Tracer{initFlag: 1, Sampler: NewConstSampler(true), Reporter:
NewStoreReporter(),
+ ServiceEntity: NewEntity("metrics-recover-test", "inst"),
meterMap: &sync.Map{}}
+ tr.ProfileManager = NewProfileManager(nil)
+ tr.initMetricsCollect(1)
+
+ deadline := time.After(5 * time.Second)
+ for metricsPanicCalls.Load() < 2 {
+ select {
+ case <-deadline:
+ t.Fatalf("collect loop did not survive the panic,
iterations=%d", metricsPanicCalls.Load())
+ case <-time.After(50 * time.Millisecond):
+ }
+ }
+}
diff --git a/plugins/core/span_hostile_workload_test.go b/plugins/core/span_hostile_workload_test.go
new file mode 100644
index 0000000..dfe4cf8
--- /dev/null
+++ b/plugins/core/span_hostile_workload_test.go
@@ -0,0 +1,435 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// span_hostile_workload_test.go drives the REAL agent pipeline (public tracing
+// API -> spans/segments -> collector goroutines -> Transform -> proto.Marshal)
+// under every concurrency-abuse pattern found in the apache/skywalking#13885
+// investigation, plus aggressive GC and stack growth so that any pointer
+// corruption is caught by the runtime scanners (which is exactly how the
+// production crash manifested as `fatal error: invalid pointer found on
+// stack`).
+//
+// It is shared by two tests:
+// - TestE2ESpanCrashSafety (span_crash_e2e_test.go): runs the workload in a
+// CHILD PROCESS - a runtime.throw is unrecoverable and kills the process,
+// so only a subprocess can assert "no panic AND no runtime fatal at all";
+// - TestRaceHostileWorkload (segment_datarace_test.go, race build): runs it
+// in-process under the race detector to catch any remaining data race.
+//
+// There is deliberately NO recover anywhere in this workload or its pipeline
+// reporter: any panic must surface and fail the test.
+package core
+
+import (
+ "fmt"
+ "runtime"
+ "runtime/debug"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "google.golang.org/protobuf/proto"
+
+ "github.com/apache/skywalking-go/plugins/core/reporter"
+ "github.com/apache/skywalking-go/plugins/core/tracing"
+
+ agentv3
"github.com/apache/skywalking-go/protocols/collect/language/agent/v3"
+ logv3 "github.com/apache/skywalking-go/protocols/collect/logging/v3"
+)
+
+// ---------------------------------------------------------------------------
+// real goroutine-local GLS for tests
+// ---------------------------------------------------------------------------
+
+// goid parses the current goroutine id from the stack header ("goroutine N [").
+// Slow, but faithful: it gives the tests true goroutine-local storage, matching
+// the production GLS that the build toolchain injects into runtime.g.
+func goid() uint64 {
+ var buf [32]byte
+ n := runtime.Stack(buf[:], false)
+ id := uint64(0)
+ // skip the 10-byte "goroutine " prefix, then accumulate decimal digits
+ for _, c := range buf[10:n] {
+ if c < '0' || c > '9' {
+ break
+ }
+ id = id*10 + uint64(c-'0')
+ }
+ return id
+}
+
+// installGoroutineLocalGLS replaces the single-variable test GLS from
+// test_base.go with a real per-goroutine implementation so that context
+// propagation (capture/continue, snapshots) behaves like production.
+// The returned function restores the previous GetGLS/SetGLS hooks.
+func installGoroutineLocalGLS() (restore func()) {
+ oldGet, oldSet := GetGLS, SetGLS
+ // values keyed by the goroutine id parsed in goid
+ var m sync.Map
+ GetGLS = func() interface{} {
+ if v, ok := m.Load(goid()); ok {
+ return v
+ }
+ return nil
+ }
+ SetGLS = func(v interface{}) {
+ // storing nil removes the goroutine's entry rather than keeping a nil value
+ if v == nil {
+ m.Delete(goid())
+ return
+ }
+ m.Store(goid(), v)
+ }
+ return func() {
+ GetGLS = oldGet
+ SetGLS = oldSet
+ }
+}
+
+// ---------------------------------------------------------------------------
+// pipeline reporter: replicates the production gRPC reporter data path
+// ---------------------------------------------------------------------------
+
+// pipelineReporter mirrors what the production reporters do with a finished
+// segment: TransformSegmentObject on the segment-collector goroutine, then
+// proto.Marshal on a dedicated send goroutine. Deliberately NO recover: a
+// panic anywhere in this path must crash the test/child process.
+type pipelineReporter struct {
+ transform *reporter.Transform
+ segCh chan *agentv3.SegmentObject // transformed segments awaiting marshal
+ closed chan struct{} // shutdown signal; the DATA channel is never closed
+ done chan struct{} // closed by the send goroutine when it exits
+ segments int64 // transformed segments, updated with sync/atomic
+ marshals int64 // successfully marshaled segments, updated with sync/atomic
+}
+
+// newPipelineReporter creates the reporter and starts its send goroutine. The
+// goroutine marshals queued segments until closeAndWait signals shutdown, then
+// drains whatever is still buffered and closes done. Marshal errors are not
+// counted; only successful marshals increment the counter.
+func newPipelineReporter() *pipelineReporter {
+ r := &pipelineReporter{
+ transform: reporter.NewTransform(&reporter.Entity{ServiceName: "e2e", ServiceInstanceName: "inst"}),
+ segCh: make(chan *agentv3.SegmentObject, 1024),
+ closed: make(chan struct{}),
+ done: make(chan struct{}),
+ }
+ go func() { // the "send goroutine" of the production pipeline
+ defer close(r.done)
+ for {
+ select {
+ case seg := <-r.segCh:
+ if _, err := proto.Marshal(seg); err == nil {
+ atomic.AddInt64(&r.marshals, 1)
+ }
+ case <-r.closed:
+ for { // drain what is already buffered, then exit
+ select {
+ case seg := <-r.segCh:
+ if _, err := proto.Marshal(seg); err == nil {
+ atomic.AddInt64(&r.marshals, 1)
+ }
+ default:
+ return
+ }
+ }
+ }
+ }
+ }()
+ return r
+}
+
+// SendTracing transforms the finished spans into one segment, counts it, and
+// queues it for the send goroutine, giving up once shutdown has been signaled.
+func (r *pipelineReporter) SendTracing(spans []reporter.ReportedSpan) {
+ // runs on the segment collector goroutine, exactly like production
+ seg := r.transform.TransformSegmentObject(spans)
+ if seg == nil {
+ return
+ }
+ atomic.AddInt64(&r.segments, 1)
+ // Same done-channel pattern as the production fix: straggler collector
+ // goroutines may still deliver after the workload ended, and closing the
+ // DATA channel here would be exactly the send-on-closed-channel bug this
+ // PR eliminates (the race variant of this test caught that mistake in an
+ // earlier revision of this harness).
+ select {
+ case r.segCh <- seg:
+ case <-r.closed:
+ }
+}
+
+// closeAndWait signals shutdown to the send goroutine and blocks until it has
+// drained the buffered segments and exited. It must be called at most once:
+// closing r.closed twice panics.
+func (r *pipelineReporter) closeAndWait() {
+ close(r.closed)
+ <-r.done
+}
+
+// The remaining reporter methods are stubs: the workload only exercises the
+// tracing path, so they do nothing and ConnectionStatus always reports
+// connected.
+func (r *pipelineReporter) Boot(entity *reporter.Entity, cdsWatchers []reporter.AgentConfigChangeWatcher) {
+}
+func (r *pipelineReporter) SendMetrics(metrics []reporter.ReportedMeter) {}
+func (r *pipelineReporter) SendLog(log *logv3.LogData) {}
+func (r *pipelineReporter) ConnectionStatus() reporter.ConnectionStatus {
+ return reporter.ConnectionStatusConnected
+}
+func (r *pipelineReporter) Close() {}
+func (r *pipelineReporter) AddProfileTaskManager(p reporter.ProfileTaskManager) {}
+
+// ---------------------------------------------------------------------------
+// hostile flows: one per misuse pattern found in the audit
+// ---------------------------------------------------------------------------
+
+// growStack forces stack growth (morestack/copystack) with live span data on
+// the stack - the exact runtime path that detected the corrupted pointer in
+// the production crash.
+// depth is the number of recursive frames, each holding a 64-byte local array;
+// the returned sum depends on every frame.
+//
+//go:noinline
+func growStack(depth int) int {
+ if depth == 0 {
+ return 0
+ }
+ var pad [64]byte
+ pad[0] = byte(depth)
+ return int(pad[0]) + growStack(depth-1)
+}
+
+// wellBehavedFlow is the correct usage baseline: a full entry/exit/local span
+// tree flowing through the real pipeline. i only varies the stack depth.
+func wellBehavedFlow(i int) {
+ entry, err := tracing.CreateEntrySpan("GET:/e2e", func(string) (string, error) { return "", nil })
+ if err != nil || entry == nil {
+ return
+ }
+ entry.Tag("http.method", "GET")
+ if exit, err := tracing.CreateExitSpan("e2e/db", "127.0.0.1:3306",
+ func(k, v string) error { return nil }); err == nil && exit != nil {
+ exit.Tag("db.type", "sql")
+ exit.Tag("db.type", "mysql") // same-key in-place rewrite path
+ exit.Log("event", "query")
+ exit.End()
+ }
+ if local, err := tracing.CreateLocalSpan("e2e/biz"); err == nil && local != nil {
+ local.Error("boom")
+ local.End()
+ }
+ growStack(64 + i%64)
+ entry.End()
+ tracing.CleanContext()
+}
+
+// crossedSpanFlow reproduces the gorm "crossed span" bug class: one live span
+// mutated and ended by several goroutines at once.
+func crossedSpanFlow(i int) {
+ span, err := tracing.CreateLocalSpan("e2e/crossed")
+ if err != nil || span == nil {
+ return
+ }
+ var wg sync.WaitGroup
+ for w := 0; w < 2; w++ {
+ wg.Add(1)
+ go func(w int) {
+ defer wg.Done()
+ for k := 0; k < 16; k++ {
+ span.Tag("db.statement", fmt.Sprintf("select %d from t%d_%d", k, w, i))
+ span.Log("event", "x")
+ span.SetOperationName(fmt.Sprintf("op-%d-%d", w, k))
+ span.SetPeer("10.0.0.1:3306")
+ growStack(32)
+ }
+ span.End() // partners race End as well
+ }(w)
+ }
+ // the owner writes and ends concurrently with the partner goroutines
+ span.Tag("db.statement", "owner")
+ span.End()
+ wg.Wait()
+ tracing.CleanContext()
+}
+
+// lateWriteFlow reproduces the rocketmq/pulsar callback bug class: writes
+// arriving after the span was ended and reported. Every worker write follows End.
+func lateWriteFlow(i int) {
+ span, err := tracing.CreateLocalSpan("e2e/late")
+ if err != nil || span == nil {
+ return
+ }
+ span.Tag("k", "v")
+ span.End() // ended before any worker goroutine starts writing
+ var wg sync.WaitGroup
+ for w := 0; w < 2; w++ {
+ wg.Add(1)
+ go func(w int) {
+ defer wg.Done()
+ for k := 0; k < 8; k++ {
+ span.Tag("k", fmt.Sprintf("late-%d-%d-%d", w,
k, i)) // must be dropped
+ span.Error("late") // post-End write
+ span.Log("late", "x") // post-End write
+ }
+ }(w)
+ }
+ wg.Wait()
+ tracing.CleanContext()
+}
+
+// asyncFlow exercises the documented async pattern (gRPC streaming, toolkit)
+// plus late writes after AsyncFinish, which is called from a separate goroutine.
+func asyncFlow(i int) {
+ span, err := tracing.CreateLocalSpan("e2e/async")
+ if err != nil || span == nil {
+ return
+ }
+ span.PrepareAsync()
+ span.End() // called after PrepareAsync; AsyncFinish comes later below
+ var wg sync.WaitGroup
+ for w := 0; w < 2; w++ {
+ wg.Add(1)
+ go func(w int) {
+ defer wg.Done()
+ for k := 0; k < 8; k++ {
+ span.Tag("async-key", fmt.Sprintf("v-%d-%d-%d",
w, k, i)) // concurrent writes between End and AsyncFinish
+ span.Log("async", "x")
+ }
+ }(w)
+ }
+ wg.Wait()
+ done := make(chan struct{})
+ go func() {
+ defer close(done)
+ span.AsyncFinish()
+ span.Tag("after-finish", "must-be-dropped")
+ }()
+ <-done // wait for the finishing goroutine
+ tracing.CleanContext()
+}
+
+// correlationFlow hammers the correlation store from concurrently continued
+// snapshots while exit spans encode propagation headers (the C1/C2 class of
+// unrecoverable concurrent-map fatals).
+func correlationFlow(i int) {
+ root, err := tracing.CreateLocalSpan("e2e/correlation-root")
+ if err != nil || root == nil {
+ return
+ }
+ tracing.SetCorrelationContextValue("seed", fmt.Sprintf("%d", i))
+ snap := tracing.CaptureContext() // one snapshot shared by both workers
+ var wg sync.WaitGroup
+ for w := 0; w < 2; w++ {
+ wg.Add(1)
+ go func(w int) {
+ defer wg.Done()
+ defer tracing.CleanContext()
+ tracing.ContinueContext(snap)
+ for k := 0; k < 8; k++ {
+
tracing.SetCorrelationContextValue(fmt.Sprintf("k%d", k%4),
fmt.Sprintf("v-%d-%d", w, k)) // k%4 keeps both workers writing the same few keys
+ _ = tracing.GetCorrelationContextValue("k1") // concurrent read of a key the other worker also writes
+ if child, err :=
tracing.CreateExitSpan("e2e/exit", "127.0.0.1:80",
+ func(k, v string) error { return nil
}); err == nil && child != nil { // no-op injector; its k parameter shadows the loop k
+ child.Tag("http.method", "GET")
+ child.End()
+ }
+ }
+ }(w)
+ }
+ wg.Wait()
+ root.End() // the root ends only after both workers are done
+ tracing.CleanContext()
+}
+
+// doubleEndFlow races End() on one span from two goroutines; the creating goroutine never Ends it itself.
+func doubleEndFlow(i int) {
+ span, err := tracing.CreateLocalSpan("e2e/double-end")
+ if err != nil || span == nil {
+ return
+ }
+ span.Tag("i", fmt.Sprintf("%d", i))
+ var wg sync.WaitGroup
+ for w := 0; w < 2; w++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ span.End() // both goroutines End the same span
+ }()
+ }
+ wg.Wait()
+ tracing.CleanContext()
+}
+
+// snapshotChaosFlow continues ONE snapshot from several goroutines at once
+// (the shared snapshot + runtime-context class: gRPC stream send/recv,
+// microv4 connections).
+func snapshotChaosFlow(i int) {
+ root, err := tracing.CreateLocalSpan("e2e/snapshot-root")
+ if err != nil || root == nil {
+ return
+ }
+ snap := tracing.CaptureContext() // captured once, continued by all three workers
+ var wg sync.WaitGroup
+ for w := 0; w < 3; w++ {
+ wg.Add(1)
+ go func(w int) {
+ defer wg.Done()
+ defer tracing.CleanContext() // drop the continued context when the worker exits
+ tracing.ContinueContext(snap)
+ if child, err :=
tracing.CreateLocalSpan(fmt.Sprintf("e2e/snap-child-%d", w)); err == nil &&
child != nil {
+ child.Tag("w", fmt.Sprintf("%d-%d", w, i))
+ child.Log("event", "child")
+ child.End()
+ }
+ }(w)
+ }
+ wg.Wait()
+ root.End() // the root ends only after every worker has finished
+ tracing.CleanContext()
+}
+
+// ---------------------------------------------------------------------------
+// the workload driver
+// ---------------------------------------------------------------------------
+
+// runHostileSpanWorkload runs every hostile flow concurrently against the real
+// pipeline for roughly d, under aggressive GC. It returns the number of
+// segments transformed and payloads marshaled so callers can assert the
+// pipeline actually processed work.
+func runHostileSpanWorkload(d time.Duration) (segments, marshals int64) {
+ restore := installGoroutineLocalGLS() // undone by the deferred restore below
+ defer restore()
+ ResetTracingContext()
+ rep := newPipelineReporter()
+ Tracing.Reporter = rep // every segment goes through the counting pipeline reporter
+
+ // aggressive GC maximizes heap/stack scans - the runtime checks that
turn
+ // any pointer corruption into `fatal error: ... bad pointer ...`
+ oldGC := debug.SetGCPercent(10)
+ defer debug.SetGCPercent(oldGC) // restore the previous GC percent on return
+
+ deadline := time.Now().Add(d)
+ var wg sync.WaitGroup
+ launch := func(n int, fn func(i int)) { // n workers, each calling fn(0), fn(1), ... until deadline
+ for w := 0; w < n; w++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ defer tracing.CleanContext()
+ for i := 0; time.Now().Before(deadline); i++ {
+ fn(i)
+ }
+ }()
+ }
+ }
+
+ launch(2, wellBehavedFlow)
+ launch(2, crossedSpanFlow)
+ launch(1, lateWriteFlow)
+ launch(1, asyncFlow)
+ launch(1, correlationFlow)
+ launch(1, doubleEndFlow)
+ launch(1, snapshotChaosFlow)
+
+ wg.Wait()
+ time.Sleep(200 * time.Millisecond) // let collector goroutines flush
+ rep.closeAndWait()
+ ResetTracingContext() // leave a fresh tracer for subsequent tests
+ return atomic.LoadInt64(&rep.segments), atomic.LoadInt64(&rep.marshals)
+}
diff --git a/plugins/core/span_reuse_test.go b/plugins/core/span_reuse_test.go
new file mode 100644
index 0000000..407ab87
--- /dev/null
+++ b/plugins/core/span_reuse_test.go
@@ -0,0 +1,141 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package core
+
+import (
+ "testing"
+ "time"
+
+ "github.com/apache/skywalking-go/plugins/core/reporter"
+ "github.com/apache/skywalking-go/plugins/core/tracing"
+)
+
+// The span reuse rule in CreateEntrySpan/CreateExitSpan returns the existing
+// active span when the span types match, so one span can have several logical
+// owners that each call End once. These tests pin the required semantics: the
+// span is frozen and reported only by the LAST End, so writes from the outer
+// owner that happen after the inner owner's End still land (e.g. gorm tags
+// db.statement after the sql driver plugin already ended the reused span).
+
+func waitReportedSpans(t *testing.T, want int) []reporter.ReportedSpan {
+ t.Helper()
+ deadline := time.Now().Add(2 * time.Second)
+ for { // poll every 20ms until at least want spans are reported; fail after 2s
+ spans := GetReportedSpans()
+ if len(spans) >= want {
+ return spans // may hold more than want; callers check the exact count
+ }
+ if time.Now().After(deadline) {
+ t.Fatalf("expected %d reported spans, got %d", want,
len(spans))
+ }
+ time.Sleep(20 * time.Millisecond)
+ }
+}
+
+func findReportedSpan(spans []reporter.ReportedSpan, name string)
reporter.ReportedSpan { // first span whose OperationName equals name
+ for _, s := range spans {
+ if s.OperationName() == name {
+ return s
+ }
+ }
+ return nil // no span with that operation name
+}
+
+func reportedTagValue(s reporter.ReportedSpan, key string) (string, bool) {
+ for _, tag := range s.Tags() { // the first tag with a matching key wins
+ if tag.Key == key {
+ return tag.Value, true
+ }
+ }
+ return "", false // key not present on the span
+}
+
+// TestExitSpanReuseFreezesOnLastEnd replicates the gorm + sql driver timeline
+// from the gorm-postgres plugin scenario: gorm creates the exit span, the sql
+// driver's CreateExitSpan reuses it and Ends it first, and only afterwards
+// gorm tags db.statement and Ends. The tag must not be dropped and the span
+// must be reported exactly once.
+func TestExitSpanReuseFreezesOnLastEnd(t *testing.T) {
+ ResetTracingContext()
+ defer ResetTracingContext()
+
+ entry, err := tracing.CreateEntrySpan("GET:/execute", func(string)
(string, error) { return "", nil })
+ if err != nil {
+ t.Fatal(err)
+ }
+ outer, err := tracing.CreateExitSpan("users/create", "db:5432", func(k,
v string) error { return nil })
+ if err != nil {
+ t.Fatal(err)
+ }
+ inner, err := tracing.CreateExitSpan("PostgreSQL/Exec", "db:5432",
func(k, v string) error { return nil })
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ inner.End() // inner
owner: must NOT freeze the reused span
+ outer.Tag("db.statement", "INSERT INTO users VALUES ($1,$2)") // late
outer write must land
+ outer.End() // last
owner: freezes and reports
+ entry.End()
+
+ spans := waitReportedSpans(t, 2) // the entry span plus the single reused exit span
+ if len(spans) != 2 {
+ t.Fatalf("reuse must not create or report an extra span, got
%d", len(spans))
+ }
+ exitSpan := findReportedSpan(spans, "users/create")
+ if exitSpan == nil {
+ t.Fatal("exit span was not reported (or was renamed by the
reuse)")
+ }
+ if v, ok := reportedTagValue(exitSpan, "db.statement"); !ok || v !=
"INSERT INTO users VALUES ($1,$2)" {
+ t.Fatalf("tag written after the inner owner's End was dropped,
tags=%v", exitSpan.Tags())
+ }
+ if exitSpan.EndTime() <= 0 {
+ t.Fatal("reported reused span has no end time")
+ }
+}
+
+// TestEntrySpanReuseFreezesOnLastEnd is the entry-side twin (e.g. an http
+// framework plugin reusing the net/http entry span): the inner owner renames
+// and Ends first, the outer owner then tags the status code and Ends.
+func TestEntrySpanReuseFreezesOnLastEnd(t *testing.T) {
+ ResetTracingContext()
+ defer ResetTracingContext()
+
+ outer, err := tracing.CreateEntrySpan("GET:/raw", func(string) (string,
error) { return "", nil })
+ if err != nil {
+ t.Fatal(err)
+ }
+ inner, err := tracing.CreateEntrySpan("GET:/renamed", func(string)
(string, error) { return "", nil })
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ inner.End() // inner owner: must NOT freeze the
reused span
+ outer.Tag("status_code", "200") // late outer write must land
+ outer.End() // last owner: freezes and reports the
root segment
+
+ spans := waitReportedSpans(t, 1) // the reused entry span is the only span in this segment
+ if len(spans) != 1 {
+ t.Fatalf("entry reuse must report exactly one span, got %d",
len(spans))
+ }
+ if spans[0].OperationName() != "GET:/renamed" {
+ t.Fatalf("reuse must keep the inner owner's rename, got %s",
spans[0].OperationName())
+ }
+ if v, ok := reportedTagValue(spans[0], "status_code"); !ok || v !=
"200" {
+ t.Fatalf("tag written after the inner owner's End was dropped,
tags=%v", spans[0].Tags())
+ }
+}
diff --git a/plugins/core/span_sync_bench_test.go
b/plugins/core/span_sync_bench_test.go
new file mode 100644
index 0000000..22db991
--- /dev/null
+++ b/plugins/core/span_sync_bench_test.go
@@ -0,0 +1,70 @@
+// Licensed to Apache Software Foundation (ASF) under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Apache Software Foundation (ASF) licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package core
+
+import (
+ "testing"
+
+ "github.com/apache/skywalking-go/plugins/core/reporter"
+
+ logv3 "github.com/apache/skywalking-go/protocols/collect/logging/v3"
+)
+
+// benchNopReporter discards everything so the benchmark measures the span
+// machinery itself rather than a reporter implementation.
+type benchNopReporter struct{}
+
+func (benchNopReporter) Boot(entity *reporter.Entity, cdsWatchers
[]reporter.AgentConfigChangeWatcher) {
+} // no-op
+func (benchNopReporter) SendTracing(spans []reporter.ReportedSpan) {} // spans are dropped
+func (benchNopReporter) SendMetrics(metrics []reporter.ReportedMeter) {} // metrics are dropped
+func (benchNopReporter) SendLog(log *logv3.LogData) {} // logs are dropped
+func (benchNopReporter) ConnectionStatus() reporter.ConnectionStatus {
+ return reporter.ConnectionStatusConnected // always reports connected
+}
+func (benchNopReporter) Close() {} // no-op
+func (benchNopReporter) AddProfileTaskManager(p reporter.ProfileTaskManager) {} // no-op: the manager is ignored
+
+// BenchmarkSpanLifecycleRealAgent measures the genuine span hot path
+// (CreateLocalSpan -> rename -> tags incl. a same-key rewrite -> log -> error
+// -> End, through the real sampler/segment/GLS/collector machinery) and serves
+// as the performance regression guard for the per-span locking introduced for
+// apache/skywalking#13885.
+func BenchmarkSpanLifecycleRealAgent(b *testing.B) {
+ ResetTracingContext()
+ Tracing.Reporter = benchNopReporter{} // discard output so only the span path is timed
+ b.ReportAllocs()
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ s, err := Tracing.CreateLocalSpan("bench/op")
+ if err != nil {
+ b.Fatal(err)
+ }
+ span := s.(TracingSpan)
+ span.SetOperationName("bench/op/renamed")
+ span.Tag("db.type", "sql")
+ span.Tag("db.instance", "benchdb")
+ span.Tag("db.type", "mysql") // same-key rewrite of db.type
+ span.Log("event", "cache-miss")
+ span.Error("error", "boom")
+ span.End()
+ }
+ b.StopTimer()
+ // restore clean state for other tests in the package
+ ResetTracingContext()
+}
diff --git a/plugins/core/span_tracing.go b/plugins/core/span_tracing.go
index 8aaeab6..5ee4bf3 100644
--- a/plugins/core/span_tracing.go
+++ b/plugins/core/span_tracing.go
@@ -19,6 +19,8 @@ package core
import (
"fmt"
+ "runtime/debug"
+ "sync"
"sync/atomic"
"github.com/apache/skywalking-go/plugins/core/reporter"
@@ -50,16 +52,20 @@ func NewSegmentSpan(ctx *TracingContext, defaultSpan
*DefaultSpan, parentSpan Se
// SegmentContext is the context in a segment
type SegmentContext struct {
- TraceID string
- SegmentID string
- SpanID int32
- ParentSpanID int32
- ParentSegmentID string
- collect chan<- reporter.ReportedSpan
+ TraceID string
+ SegmentID string
+ SpanID int32
+ ParentSpanID int32
+ ParentSegmentID string
+ collect chan<- reporter.ReportedSpan
+ // collectorDone is closed when the segment collector goroutine exits.
Late
+ // senders select on it instead of risking a send on a closed data
channel
+ // (receiving from a closed channel is always safe, sending never is).
+ collectorDone chan struct{}
refNum *int32
spanIDGenerator *int32
FirstSpan TracingSpan `json:"-"`
- CorrelationContext map[string]string
+ CorrelationContext *CorrelationContext
}
func (c *SegmentContext) GetTraceID() string {
@@ -83,14 +89,11 @@ func (c *SegmentContext) GetParentSegmentID() string {
}
func (c *SegmentContext) GetCorrelationContextValue(key string) string {
- return c.CorrelationContext[key]
+ return c.CorrelationContext.Get(key)
}
func (c *SegmentContext) SetCorrelationContextValue(key, value string) {
- c.CorrelationContext[key] = value
- if value == "" {
- delete(c.CorrelationContext, key)
- }
+ c.CorrelationContext.Set(key, value)
}
type SegmentSpan interface {
@@ -108,11 +111,7 @@ type SegmentSpanImpl struct {
// For TracingSpan
func (s *SegmentSpanImpl) End() {
- if !s.IsValid() {
- return
- }
- s.DefaultSpan.End(true)
- if !s.DefaultSpan.InAsyncMode {
+ if s.DefaultSpan.endSyncAndFreeze() {
s.end0()
}
}
@@ -120,19 +119,39 @@ func (s *SegmentSpanImpl) End() {
func (s *SegmentSpanImpl) AsyncFinish() {
s.DefaultSpan.AsyncFinish()
s.DefaultSpan.End(false)
- s.end0()
+ if s.DefaultSpan.endAndFreeze() {
+ s.end0()
+ }
}
func (s *SegmentSpanImpl) end0() {
go func() {
- s.SegmentContext.collect <- s
+ select {
+ case s.SegmentContext.collect <- s:
+ case <-s.SegmentContext.collectorDone:
+ // The collector already exited (unreachable once the
freeze
+ // functions - endSyncAndFreeze/endAndFreeze -
guarantee a single
+ // end0 per span, kept as defense in depth): drop the
span instead
+ // of panicking on a closed channel send or blocking
forever.
+ }
}()
}
+
func (s *SegmentSpanImpl) GetDefaultSpan() *DefaultSpan {
return &s.DefaultSpan
}
// For Reported TracingSpan
+//
+// The ReportedSpan accessors below are intentionally NOT locked. Their safety
+// rests on three guarantees that must be preserved together (re-review all of
+// them before changing any one):
+// 1. every mutator holds opLock and drops the write once ended==true;
+// 2. endAndFreeze sets ended=true under opLock, so the span data can no
+// longer change after it returns;
+// 3. the span reaches the collector goroutine through the end0 channel send,
+// whose happens-before edge publishes all pre-freeze writes to the reader.
+// Therefore reporter.Transform always observes immutable data here.
func (s *SegmentSpanImpl) Context() reporter.SegmentContext {
return &s.SegmentContext
@@ -207,13 +226,13 @@ func (s *SegmentSpanImpl) createSegmentContext(ctx
*TracingContext, parent Segme
s.SegmentContext = SegmentContext{}
if len(s.DefaultSpan.Refs) > 0 {
s.TraceID = s.DefaultSpan.Refs[0].GetTraceID()
- s.CorrelationContext =
s.DefaultSpan.Refs[0].(*SpanContext).CorrelationContext
+ s.CorrelationContext =
newCorrelationContextFrom(s.DefaultSpan.Refs[0].(*SpanContext).CorrelationContext)
} else {
s.TraceID, err = GenerateGlobalID(ctx)
if err != nil {
return err
}
- s.CorrelationContext = make(map[string]string)
+ s.CorrelationContext = newCorrelationContext()
}
} else {
s.SegmentContext = parent.GetSegmentContext()
@@ -226,7 +245,7 @@ func (s *SegmentSpanImpl) createSegmentContext(ctx
*TracingContext, parent Segme
s.SegmentContext.FirstSpan = s
}
if s.CorrelationContext == nil {
- s.CorrelationContext = make(map[string]string)
+ s.CorrelationContext = newCorrelationContext()
}
return
}
@@ -243,11 +262,7 @@ type RootSegmentSpan struct {
}
func (rs *RootSegmentSpan) End() {
- if !rs.IsValid() {
- return
- }
- rs.DefaultSpan.End(true)
- if !rs.InAsyncMode {
+ if rs.DefaultSpan.endSyncAndFreeze() {
rs.end0()
}
}
@@ -255,15 +270,19 @@ func (rs *RootSegmentSpan) End() {
func (rs *RootSegmentSpan) AsyncFinish() {
rs.DefaultSpan.AsyncFinish()
rs.DefaultSpan.End(false)
- rs.end0()
+ if rs.DefaultSpan.endAndFreeze() {
+ rs.end0()
+ }
}
func (rs *RootSegmentSpan) end0() {
- defer func() {
- _ = recover()
- }()
- if rs != nil && rs.doneCh != nil && rs.SegmentContext.refNum != nil {
- rs.doneCh <- atomic.SwapInt32(rs.SegmentContext.refNum, -1)
+ if rs == nil || rs.doneCh == nil || rs.SegmentContext.refNum == nil {
+ return
+ }
+ select {
+ case rs.doneCh <- atomic.SwapInt32(rs.SegmentContext.refNum, -1):
+ case <-rs.SegmentContext.collectorDone:
+ // see SegmentSpanImpl.end0
}
}
@@ -363,10 +382,34 @@ func newSegmentRoot(segmentSpan *SegmentSpanImpl)
*RootSegmentSpan {
s.notify = ch
s.segment = make([]reporter.ReportedSpan, 0, 10)
s.doneCh = make(chan int32)
+ s.collectorDone = make(chan struct{})
go func() {
total := -1
- defer close(ch)
- defer close(s.doneCh)
+ // Closing collectorDone (instead of the data channels) lets
late
+ // senders exit safely through their select; the unclosed
channels are
+ // reclaimed by the GC. It is closed right after the collect
loop stops
+ // receiving (so a late sender never blocks behind a slow
+ // Reporter.SendTracing) and kept in a defer as well so that
even a
+ // panic below cannot leave senders blocked forever. Both call
sites
+ // run on this goroutine, so the plain bool needs no
synchronization.
+ doneClosed := false
+ closeDone := func() {
+ if !doneClosed {
+ doneClosed = true
+ close(s.collectorDone)
+ }
+ }
+ defer closeDone()
+ defer func() {
+ // Defense in depth: a panic here would kill the
process since this
+ // goroutine has no other recover.
+ if err := recover(); err != nil {
+ defer func() { _ = recover() }() // a panicking
logger must not re-kill us
+ if tr := s.tracer(); tr != nil && tr.Log != nil
{
+ tr.Log.Errorf("segment collector panic:
%v, stack: %s", err, debug.Stack())
+ }
+ }
+ }()
for {
select {
case span := <-s.notify:
@@ -378,6 +421,9 @@ func newSegmentRoot(segmentSpan *SegmentSpanImpl)
*RootSegmentSpan {
break
}
}
+ // the loop above is the only receiver: unblock late senders
before the
+ // (possibly slow) reporter call
+ closeDone()
s.tracer().Reporter.SendTracing(append(s.segment, s))
}()
return s
@@ -396,26 +442,24 @@ func newSnapshotSpan(current TracingSpan) TracingSpan {
}
segCtx := segmentSpan.GetSegmentContext()
- copiedCorrelation := make(map[string]string)
- for k, v := range segCtx.CorrelationContext {
- copiedCorrelation[k] = v
- }
s := &SnapshotSpan{
DefaultSpan: DefaultSpan{
OperationName: segmentSpan.GetOperationName(),
Refs: nil,
tracer: segmentSpan.tracer(),
Peer: segmentSpan.GetPeer(),
+ opLock: &sync.Mutex{}, // keep the "opLock is
never nil" invariant
},
SegmentContext: SegmentContext{
TraceID: segCtx.GetTraceID(),
SegmentID: segCtx.SegmentID,
SpanID: segCtx.SpanID,
collect: segCtx.collect,
+ collectorDone: segCtx.collectorDone,
refNum: segCtx.refNum,
spanIDGenerator: segCtx.spanIDGenerator,
FirstSpan: segCtx.FirstSpan,
- CorrelationContext: copiedCorrelation,
+ CorrelationContext: segCtx.CorrelationContext.Clone(),
},
}
diff --git a/plugins/core/test_base.go b/plugins/core/test_base.go
index 8cd197c..2aa9004 100644
--- a/plugins/core/test_base.go
+++ b/plugins/core/test_base.go
@@ -46,7 +46,11 @@ func init() {
func ResetTracingContext() {
SetGLS(nil)
Tracing = &Tracer{initFlag: 1, Sampler: NewConstSampler(true),
Reporter: &StoreReporter{},
- ServiceEntity: NewEntity("test", "test-instance"), meterMap:
&sync.Map{}}
+ ServiceEntity: NewEntity("test", "test-instance"), meterMap:
&sync.Map{},
+ // production Boot always sets the correlation config; the
tests must
+ // too, otherwise correlation APIs nil-dereference (found by the
+ // hostile-workload e2e). Values mirror the agent defaults.
+ correlation: &CorrelationConfig{MaxKeyCount: 3, MaxValueSize:
128}}
// Initialize ProfileManager to avoid nil pointer dereference
Tracing.ProfileManager = NewProfileManager(nil)
Tracing.Reporter.AddProfileTaskManager(Tracing.ProfileManager)
@@ -65,10 +69,18 @@ func SetAsNewGoroutine() {
}
func GetReportedSpans() []reporter.ReportedSpan {
- return Tracing.Reporter.(*StoreReporter).Spans
+ sr := Tracing.Reporter.(*StoreReporter)
+ sr.mu.Lock()
+ defer sr.mu.Unlock()
+ return append([]reporter.ReportedSpan(nil), sr.Spans...)
}
+// StoreReporter is the in-memory test reporter. SendTracing is invoked from
+// the per-segment collector goroutines while tests read the results, so the
+// storage must be synchronized (this used to be the test-harness data race
+// that kept the full suite from running under -race).
type StoreReporter struct {
+ mu sync.Mutex
Spans []reporter.ReportedSpan
Metrics []reporter.ReportedMeter
Logs []*logv3.LogData
@@ -82,14 +94,20 @@ func (r *StoreReporter) Boot(entity *reporter.Entity,
cdsWatchers []reporter.Age
}
func (r *StoreReporter) SendTracing(spans []reporter.ReportedSpan) {
+ r.mu.Lock()
+ defer r.mu.Unlock()
r.Spans = append(r.Spans, spans...)
}
func (r *StoreReporter) SendMetrics(metrics []reporter.ReportedMeter) {
+ r.mu.Lock()
+ defer r.mu.Unlock()
r.Metrics = append(r.Metrics, metrics...)
}
func (r *StoreReporter) SendLog(log *logv3.LogData) {
+ r.mu.Lock()
+ defer r.mu.Unlock()
r.Logs = append(r.Logs, log)
}
diff --git a/plugins/core/tracing.go b/plugins/core/tracing.go
index 3cddf2c..7c96fa9 100644
--- a/plugins/core/tracing.go
+++ b/plugins/core/tracing.go
@@ -57,6 +57,11 @@ func (t *Tracer) CreateEntrySpan(operationName string,
extractor interface{}, op
// if parent span is entry span, then use parent span as result
if tracingSpan != nil && tracingSpan.IsEntry() &&
reflect.ValueOf(tracingSpan).Type() != snapshotType {
tracingSpan.SetOperationName(operationName)
+ // the caller becomes one more owner of the reused span and
will call
+ // End on it: count the reuse so only the last End freezes the
span
+ if segmentSpan, ok := tracingSpan.(SegmentSpan); ok {
+ segmentSpan.GetDefaultSpan().enterReuse()
+ }
return tracingSpan, nil
}
var ref = &SpanContext{}
@@ -123,6 +128,11 @@ func (t *Tracer) CreateExitSpan(operationName, peer
string, injector interface{}
// if parent span is exit span, then use parent span as result
if tracingSpan != nil && tracingSpan.IsExit() &&
reflect.ValueOf(tracingSpan).Type() != snapshotType {
+ // the caller becomes one more owner of the reused span and
will call
+ // End on it: count the reuse so only the last End freezes the
span
+ if segmentSpan, ok := tracingSpan.(SegmentSpan); ok {
+ segmentSpan.GetDefaultSpan().enterReuse()
+ }
return tracingSpan, nil
}
span, noop, err := t.createSpan0(ctx, tracingSpan, opts,
withSpanType(SpanTypeExit), withOperationName(operationName), withPeer(peer))
@@ -147,7 +157,10 @@ func (t *Tracer) CreateExitSpan(operationName, peer
string, injector interface{}
spanContext.ParentServiceInstance = t.ServiceEntity.ServiceInstanceName
spanContext.ParentEndpoint = firstSpan.GetOperationName()
spanContext.AddressUsedAtClient = peer
- spanContext.CorrelationContext =
reportedSpan.GetSegmentContext().CorrelationContext
+ // Snapshot, not the live map: the propagation header encoding iterates
this
+ // map while other goroutines of the segment may concurrently set
correlation
+ // values, which would be a fatal concurrent map iteration and map
write.
+ spanContext.CorrelationContext =
reportedSpan.GetSegmentContext().CorrelationContext.Snapshot()
err = spanContext.Encode(injector.(tracing.InjectorWrapper).Fun())
if err != nil {
@@ -207,7 +220,11 @@ func (t *Tracer) ContinueContext(snapshot interface{}) {
ctx.activeSpanLock.Lock()
defer ctx.activeSpanLock.Unlock()
ctx.activeSpan = snap.activeSpan
- ctx.Runtime = snap.runtime
+ // Clone on continue as well (capture already clones): the same
snapshot
+ // may be continued by multiple goroutines (e.g. the send and
receive
+ // goroutines of one gRPC stream), and sharing one
RuntimeContext map
+ // between them would be a fatal concurrent map read/write.
+ ctx.Runtime = snap.runtime.clone()
}
}
@@ -220,11 +237,10 @@ func (t *Tracer) GetCorrelationContextValue(key string)
string {
if span == nil {
return ""
}
- switch reportedSpan := span.(type) {
- case *SegmentSpanImpl:
- return reportedSpan.Context().GetCorrelationContextValue(key)
- case *RootSegmentSpan:
- return reportedSpan.Context().GetCorrelationContextValue(key)
+ switch span.(type) {
+ case *SegmentSpanImpl, *RootSegmentSpan:
+ segCtx := span.(SegmentSpan).GetSegmentContext()
+ return segCtx.GetCorrelationContextValue(key)
default:
return ""
}
@@ -235,30 +251,30 @@ func (t *Tracer) SetCorrelationContextValue(key, value
string) {
if span == nil {
return
}
- switch reportedSpan := span.(type) {
- case *SegmentSpanImpl:
- if len(value) > t.correlation.MaxValueSize {
- return
- }
- if len(reportedSpan.GetSegmentContext().CorrelationContext) >=
t.correlation.MaxKeyCount {
- return
- }
- reportedSpan.Context().SetCorrelationContextValue(key, value)
- case *RootSegmentSpan:
+ switch span.(type) {
+ case *SegmentSpanImpl, *RootSegmentSpan:
if len(value) > t.correlation.MaxValueSize {
return
}
- if len(reportedSpan.GetSegmentContext().CorrelationContext) >=
t.correlation.MaxKeyCount {
+ segCtx := span.(SegmentSpan).GetSegmentContext()
+ // Len/Set are two separate lock acquisitions, so concurrent
writers can
+ // exceed MaxKeyCount by a few entries. The limit is a soft
bound (same
+ // behavior as the pre-synchronization bare map) - keeping the
two calls
+ // separate avoids a combined check-and-set API for a non-issue.
+ if segCtx.CorrelationContext.Len() >= t.correlation.MaxKeyCount
{
return
}
- reportedSpan.Context().SetCorrelationContextValue(key, value)
+ segCtx.SetCorrelationContextValue(key, value)
default:
}
}
type ContextSnapshot struct {
activeSpan TracingSpan
- runtime *RuntimeContext
+ // runtime is cloned at capture time and treated as IMMUTABLE
afterwards:
+ // ContinueContext may read it concurrently from several goroutines (it
+ // clones again per continue), so never mutate it through the snapshot.
+ runtime *RuntimeContext
}
func (s *ContextSnapshot) IsValid() bool {