This is an automated email from the ASF dual-hosted git repository. wusheng pushed a commit to branch zipkin in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 9b65221ba05318613613202b5611a4af307f9fa8 Author: Wu Sheng <[email protected]> AuthorDate: Tue Jul 14 09:44:13 2020 +0800 Zipkin tracer context implementation draft. --- .../apm/agent/core/context/ContextCarrier.java | 140 +++++++--------- .../apm/agent/core/context/ContextManager.java | 13 +- .../apm/agent/core/context/ContextSnapshot.java | 75 +++++++-- .../apm/agent/core/context/CorrelationContext.java | 4 +- .../apm/agent/core/context/PrimaryContext.java | 96 +++++++++++ ...xtSnapshot.java => PrimaryContextSnapshot.java} | 30 +--- .../apm/agent/core/context/SW8CarrierItem.java | 8 +- ...CarrierItem.java => SW8PrimaryCarrierItem.java} | 15 +- .../apm/agent/core/context/TracingContext.java | 29 ++-- .../agent/core/context/trace/TraceSegmentRef.java | 22 +-- .../core/context/ContextCarrierV3HeaderTest.java | 79 +++++---- .../apm/agent/core/context/ContextManagerTest.java | 22 +-- .../core/context/IgnoredTracerContextTest.java | 6 +- .../EventBusImplDeliverToHandlerInterceptor.java | 2 +- .../vertx3/HandlerRegistrationInterceptor.java | 2 +- apm-sniffer/optional-reporter-plugins/pom.xml | 121 ++++++++++++++ .../zipkin-reporter-plugin/pom.xml | 73 ++++++++ .../reporter/zipkin/GRPCBlockingService.java} | 24 +-- .../reporter/zipkin/ZipkinContextManager.java | 46 ++++++ .../apm/plugin/reporter/zipkin/ZipkinSpan.java | 184 +++++++++++++++++++++ .../reporter/zipkin/ZipkinTraceReporter.java | 67 ++++++++ .../reporter/zipkin/ZipkinTracerContext.java | 159 ++++++++++++++++++ ...ache.skywalking.apm.agent.core.boot.BootService | 30 ++++ 23 files changed, 1031 insertions(+), 216 deletions(-) diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextCarrier.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextCarrier.java index e288c1b..2c7005e 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextCarrier.java +++ 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextCarrier.java @@ -19,116 +19,98 @@ package org.apache.skywalking.apm.agent.core.context; import java.io.Serializable; +import java.util.HashMap; +import java.util.Map; import lombok.Getter; -import lombok.Setter; -import org.apache.skywalking.apm.agent.core.base64.Base64; -import org.apache.skywalking.apm.agent.core.conf.Constants; -import org.apache.skywalking.apm.util.StringUtil; /** * {@link ContextCarrier} is a data carrier of {@link TracingContext}. It holds the snapshot (current state) of {@link * TracingContext}. * <p> */ -@Setter -@Getter public class ContextCarrier implements Serializable { - private String traceId; - private String traceSegmentId; - private int spanId = -1; - private String parentService = Constants.EMPTY_STRING; - private String parentServiceInstance = Constants.EMPTY_STRING; - private String parentEndpoint; - private String addressUsedAtClient; - + @Getter + private PrimaryContext primaryContext = new PrimaryContext(); + @Getter private CorrelationContext correlationContext = new CorrelationContext(); + @Getter private ExtensionContext extensionContext = new ExtensionContext(); - public CarrierItem items() { - SW8ExtensionCarrierItem sw8ExtensionCarrierItem = new SW8ExtensionCarrierItem(extensionContext, null); - SW8CorrelationCarrierItem sw8CorrelationCarrierItem = new SW8CorrelationCarrierItem(correlationContext, sw8ExtensionCarrierItem); - SW8CarrierItem sw8CarrierItem = new SW8CarrierItem(this, sw8CorrelationCarrierItem); - return new CarrierItemHead(sw8CarrierItem); - } - /** - * Serialize this {@link ContextCarrier} to a {@link String}, with '|' split. + * Additional keys for reading propagated context. + * + * <p>These context should never be used on the core and plugin codes.</p> * - * @return the serialization string. 
+ * Only highly customized core extension, such as new tracer or new tracer context should use this to re-use agent + * propagation mechanism. */ - String serialize(HeaderVersion version) { - if (this.isValid(version)) { - return StringUtil.join( - '-', - "1", - Base64.encode(this.getTraceId()), - Base64.encode(this.getTraceSegmentId()), - this.getSpanId() + "", - Base64.encode(this.getParentService()), - Base64.encode(this.getParentServiceInstance()), - Base64.encode(this.getParentEndpoint()), - Base64.encode(this.getAddressUsedAtClient()) - ); - } - return ""; - } - + private String[] customKeys; /** - * Initialize fields with the given text. + * Additional key:value(s) for propagation. + * + * <p>These context should never be used on the core and plugin codes.</p> * - * @param text carries {@link #traceSegmentId} and {@link #spanId}, with '|' split. + * Only highly customized core extension, such as new tracer or new tracer context should use this to re-use agent + * propagation mechanism. */ - ContextCarrier deserialize(String text, HeaderVersion version) { - if (text == null) { - return this; - } - if (HeaderVersion.v3.equals(version)) { - String[] parts = text.split("-", 8); - if (parts.length == 8) { - try { - // parts[0] is sample flag, always trace if header exists. 
- this.traceId = Base64.decode2UTFString(parts[1]); - this.traceSegmentId = Base64.decode2UTFString(parts[2]); - this.spanId = Integer.parseInt(parts[3]); - this.parentService = Base64.decode2UTFString(parts[4]); - this.parentServiceInstance = Base64.decode2UTFString(parts[5]); - this.parentEndpoint = Base64.decode2UTFString(parts[6]); - this.addressUsedAtClient = Base64.decode2UTFString(parts[7]); - } catch (IllegalArgumentException ignored) { + private Map<String, String> customContext; + /** + * @return items required to propagate + */ + public CarrierItem items() { + CarrierItem customItemsHead = null; + if (customContext != null) { + for (final Map.Entry<String, String> keyValuePair : customContext.entrySet()) { + customItemsHead = new CarrierItem(keyValuePair.getKey(), keyValuePair.getValue(), customItemsHead); + } + } else { + if (customKeys != null) { + for (final String customKey : customKeys) { + customItemsHead = new CarrierItem(customKey, "", customItemsHead); } } } - return this; + SW8ExtensionCarrierItem sw8ExtensionCarrierItem = new SW8ExtensionCarrierItem( + extensionContext, customItemsHead); + SW8CorrelationCarrierItem sw8CorrelationCarrierItem = new SW8CorrelationCarrierItem( + correlationContext, sw8ExtensionCarrierItem); + SW8CarrierItem sw8CarrierItem = new SW8CarrierItem(primaryContext, sw8CorrelationCarrierItem); + return new CarrierItemHead(sw8CarrierItem); } - public boolean isValid() { - return isValid(HeaderVersion.v3); + /** + * Add custom key:value pair to propagate. Only work before the injection. + */ + public void addCustomContext(String key, String value) { + if (customContext == null) { + customContext = new HashMap<>(); + } + customContext.put(key, value); } /** - * Make sure this {@link ContextCarrier} has been initialized. - * - * @return true for unbroken {@link ContextCarrier} or no-initialized. Otherwise, false; + * Read propagated context. The key should be set through {@link #setCustomKeys(String...)} before read. 
*/ - boolean isValid(HeaderVersion version) { - if (HeaderVersion.v3 == version) { - return StringUtil.isNotEmpty(traceId) - && StringUtil.isNotEmpty(traceSegmentId) - && getSpanId() > -1 - && StringUtil.isNotEmpty(parentService) - && StringUtil.isNotEmpty(parentServiceInstance) - && StringUtil.isNotEmpty(parentEndpoint) - && StringUtil.isNotEmpty(addressUsedAtClient); + public String readCustomContext(String key) { + if (customContext == null) { + return null; + } else { + return customContext.get(key); } - return false; } - public CorrelationContext getCorrelationContext() { - return correlationContext; + /** + * @return true if SkyWalking primary context is valid. + */ + public boolean isValid() { + return primaryContext.isValid(); } - public enum HeaderVersion { - v3 + /** + * Add custom key(s) to read from propagated context(usually headers or metadata of RPC). + */ + public void setCustomKeys(final String... customKeys) { + this.customKeys = customKeys; } } diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextManager.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextManager.java index 46f7dc0..3117092 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextManager.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextManager.java @@ -83,9 +83,12 @@ public class ContextManager implements BootService { AbstractSpan span; AbstractTracerContext context; operationName = StringUtil.cut(operationName, OPERATION_NAME_THRESHOLD); - if (carrier != null && carrier.isValid()) { - SamplingService samplingService = ServiceManager.INSTANCE.findService(SamplingService.class); - samplingService.forceSampled(); + if (carrier != null) { + if (carrier.isValid()) { + // If primary context exists, force sampling activated. 
+ SamplingService samplingService = ServiceManager.INSTANCE.findService(SamplingService.class); + samplingService.forceSampled(); + } context = getOrCreate(operationName, true); span = context.createEntrySpan(operationName); context.extract(carrier); @@ -127,9 +130,7 @@ public class ContextManager implements BootService { if (carrier == null) { throw new IllegalArgumentException("ContextCarrier can't be null."); } - if (carrier.isValid()) { - get().extract(carrier); - } + get().extract(carrier); } public static ContextSnapshot capture() { diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextSnapshot.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextSnapshot.java index 9db53a1..8031b4e 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextSnapshot.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextSnapshot.java @@ -18,6 +18,8 @@ package org.apache.skywalking.apm.agent.core.context; +import java.util.HashMap; +import java.util.Map; import lombok.Getter; import org.apache.skywalking.apm.agent.core.context.ids.DistributedTraceId; @@ -25,32 +27,48 @@ import org.apache.skywalking.apm.agent.core.context.ids.DistributedTraceId; * The <code>ContextSnapshot</code> is a snapshot for current context. The snapshot carries the info for building * reference between two segments in two thread, but have a causal relationship. 
*/ -@Getter public class ContextSnapshot { - private DistributedTraceId traceId; - private String traceSegmentId; - private int spanId; - private String parentEndpoint; - + @Getter + private PrimaryContextSnapshot primaryContextSnapshot; + @Getter private CorrelationContext correlationContext; + @Getter private ExtensionContext extensionContext; - ContextSnapshot(String traceSegmentId, - int spanId, - DistributedTraceId primaryTraceId, - String parentEndpoint, - CorrelationContext correlationContext, - ExtensionContext extensionContext) { - this.traceSegmentId = traceSegmentId; - this.spanId = spanId; - this.traceId = primaryTraceId; - this.parentEndpoint = parentEndpoint; + /** + * Additional key:value(s) for propagation. + * + * <p>These context should never be used on the core and plugin codes.</p> + * + * Only highly customized core extension, such as new tracer or new tracer context should use this to re-use agent + * propagation mechanism. + */ + private Map<String, Object> customContext; + + /** + * Create standard ContextSnapshot for SkyWalking default core. + */ + public ContextSnapshot(String traceSegmentId, + int spanId, + DistributedTraceId primaryTraceId, + String parentEndpoint, + CorrelationContext correlationContext, + ExtensionContext extensionContext) { + this.primaryContextSnapshot = + new PrimaryContextSnapshot(primaryTraceId, traceSegmentId, spanId, parentEndpoint); this.correlationContext = correlationContext.clone(); this.extensionContext = extensionContext.clone(); } + /** + * Create an empty ContextSnapshot shell, for extension only. 
+ */ + public ContextSnapshot(CorrelationContext correlationContext) { + this.correlationContext = correlationContext.clone(); + } + public boolean isFromCurrent() { - return traceSegmentId != null && traceSegmentId.equals(ContextManager.capture().getTraceSegmentId()); + return primaryContextSnapshot.isFromCurrent(); } public CorrelationContext getCorrelationContext() { @@ -58,6 +76,27 @@ public class ContextSnapshot { } public boolean isValid() { - return traceSegmentId != null && spanId > -1 && traceId != null; + return primaryContextSnapshot.isValid(); + } + + /** + * Add custom key:value pair to propagate. Only work in the capture stage. + */ + public void addCustomContext(String key, Object value) { + if (customContext == null) { + customContext = new HashMap<>(); + } + customContext.put(key, value); + } + + /** + * Read cached propagated context. + */ + public Object readCustomContext(String key) { + if (customContext == null) { + return null; + } else { + return customContext.get(key); + } } } diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/CorrelationContext.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/CorrelationContext.java index 4f860b2..e28072d 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/CorrelationContext.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/CorrelationContext.java @@ -118,14 +118,14 @@ public class CorrelationContext { /** * Prepare for the cross-process propagation. Inject the {@link #data} into {@link ContextCarrier#getCorrelationContext()} */ - void inject(ContextCarrier carrier) { + public void inject(ContextCarrier carrier) { carrier.getCorrelationContext().data.putAll(this.data); } /** * Extra the {@link ContextCarrier#getCorrelationContext()} into this context. 
*/ - void extract(ContextCarrier carrier) { + public void extract(ContextCarrier carrier) { final Map<String, String> carrierCorrelationContext = carrier.getCorrelationContext().data; for (Map.Entry<String, String> entry : carrierCorrelationContext.entrySet()) { // Only data with limited count of elements can be added diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/PrimaryContext.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/PrimaryContext.java new file mode 100644 index 0000000..d34aabb --- /dev/null +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/PrimaryContext.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package org.apache.skywalking.apm.agent.core.context; + +import lombok.Getter; +import lombok.Setter; +import org.apache.skywalking.apm.agent.core.base64.Base64; +import org.apache.skywalking.apm.agent.core.conf.Constants; +import org.apache.skywalking.apm.util.StringUtil; + +@Setter +@Getter +public class PrimaryContext { + private String traceId; + private String traceSegmentId; + private int spanId = -1; + private String parentService = Constants.EMPTY_STRING; + private String parentServiceInstance = Constants.EMPTY_STRING; + private String parentEndpoint; + private String addressUsedAtClient; + + /** + * Serialize this {@link ContextCarrier} to a {@link String}, with '|' split. + * + * @return the serialization string. + */ + String serialize() { + if (this.isValid()) { + return StringUtil.join( + '-', + "1", + Base64.encode(this.getTraceId()), + Base64.encode(this.getTraceSegmentId()), + this.getSpanId() + "", + Base64.encode(this.getParentService()), + Base64.encode(this.getParentServiceInstance()), + Base64.encode(this.getParentEndpoint()), + Base64.encode(this.getAddressUsedAtClient()) + ); + } + return ""; + } + + /** + * Initialize fields with the given text. + * + * @param text carries {@link #traceSegmentId} and {@link #spanId}, with '|' split. + */ + PrimaryContext deserialize(String text) { + if (text == null) { + return this; + } + String[] parts = text.split("-", 8); + if (parts.length == 8) { + try { + // parts[0] is sample flag, always trace if header exists. 
+ this.traceId = Base64.decode2UTFString(parts[1]); + this.traceSegmentId = Base64.decode2UTFString(parts[2]); + this.spanId = Integer.parseInt(parts[3]); + this.parentService = Base64.decode2UTFString(parts[4]); + this.parentServiceInstance = Base64.decode2UTFString(parts[5]); + this.parentEndpoint = Base64.decode2UTFString(parts[6]); + this.addressUsedAtClient = Base64.decode2UTFString(parts[7]); + } catch (IllegalArgumentException ignored) { + + } + } + return this; + } + + public boolean isValid() { + return StringUtil.isNotEmpty(traceId) + && StringUtil.isNotEmpty(traceSegmentId) + && getSpanId() > -1 + && StringUtil.isNotEmpty(parentService) + && StringUtil.isNotEmpty(parentServiceInstance) + && StringUtil.isNotEmpty(parentEndpoint) + && StringUtil.isNotEmpty(addressUsedAtClient); + } +} diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextSnapshot.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/PrimaryContextSnapshot.java similarity index 58% copy from apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextSnapshot.java copy to apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/PrimaryContextSnapshot.java index 9db53a1..eee4291 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextSnapshot.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/PrimaryContextSnapshot.java @@ -21,40 +21,26 @@ package org.apache.skywalking.apm.agent.core.context; import lombok.Getter; import org.apache.skywalking.apm.agent.core.context.ids.DistributedTraceId; -/** - * The <code>ContextSnapshot</code> is a snapshot for current context. The snapshot carries the info for building - * reference between two segments in two thread, but have a causal relationship. 
- */ @Getter -public class ContextSnapshot { +public class PrimaryContextSnapshot { private DistributedTraceId traceId; private String traceSegmentId; private int spanId; private String parentEndpoint; - private CorrelationContext correlationContext; - private ExtensionContext extensionContext; - - ContextSnapshot(String traceSegmentId, - int spanId, - DistributedTraceId primaryTraceId, - String parentEndpoint, - CorrelationContext correlationContext, - ExtensionContext extensionContext) { + public PrimaryContextSnapshot(final DistributedTraceId traceId, + final String traceSegmentId, + final int spanId, + final String parentEndpoint) { + this.traceId = traceId; this.traceSegmentId = traceSegmentId; this.spanId = spanId; - this.traceId = primaryTraceId; this.parentEndpoint = parentEndpoint; - this.correlationContext = correlationContext.clone(); - this.extensionContext = extensionContext.clone(); } public boolean isFromCurrent() { - return traceSegmentId != null && traceSegmentId.equals(ContextManager.capture().getTraceSegmentId()); - } - - public CorrelationContext getCorrelationContext() { - return correlationContext; + return traceSegmentId != null + && traceSegmentId.equals(ContextManager.capture().getPrimaryContextSnapshot().getTraceSegmentId()); } public boolean isValid() { diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/SW8CarrierItem.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/SW8CarrierItem.java index 07f79f2..139b5dd 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/SW8CarrierItem.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/SW8CarrierItem.java @@ -20,15 +20,15 @@ package org.apache.skywalking.apm.agent.core.context; public class SW8CarrierItem extends CarrierItem { public static final String HEADER_NAME = "sw8"; - private ContextCarrier carrier; + private 
PrimaryContext carrier; - public SW8CarrierItem(ContextCarrier carrier, CarrierItem next) { - super(HEADER_NAME, carrier.serialize(ContextCarrier.HeaderVersion.v3), next); + public SW8CarrierItem(PrimaryContext carrier, CarrierItem next) { + super(HEADER_NAME, carrier.serialize(), next); this.carrier = carrier; } @Override public void setHeadValue(String headValue) { - carrier.deserialize(headValue, ContextCarrier.HeaderVersion.v3); + carrier.deserialize(headValue); } } \ No newline at end of file diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/SW8CarrierItem.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/SW8PrimaryCarrierItem.java similarity index 73% copy from apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/SW8CarrierItem.java copy to apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/SW8PrimaryCarrierItem.java index 07f79f2..afe10ab 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/SW8CarrierItem.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/SW8PrimaryCarrierItem.java @@ -18,17 +18,18 @@ package org.apache.skywalking.apm.agent.core.context; -public class SW8CarrierItem extends CarrierItem { +public class SW8PrimaryCarrierItem extends CarrierItem { public static final String HEADER_NAME = "sw8"; - private ContextCarrier carrier; + private final PrimaryContext primaryContext; - public SW8CarrierItem(ContextCarrier carrier, CarrierItem next) { - super(HEADER_NAME, carrier.serialize(ContextCarrier.HeaderVersion.v3), next); - this.carrier = carrier; + public SW8PrimaryCarrierItem(PrimaryContext primaryContext, CarrierItem next) { + super(HEADER_NAME, primaryContext.serialize(), next); + this.primaryContext = primaryContext; } @Override public void setHeadValue(String headValue) { - 
carrier.deserialize(headValue, ContextCarrier.HeaderVersion.v3); + this.primaryContext.deserialize(headValue); } -} \ No newline at end of file + +} diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java index d803084..639e257 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java @@ -170,13 +170,14 @@ public class TracingContext implements AbstractTracerContext { throw new IllegalStateException("Exit span doesn't include meaningful peer information."); } - carrier.setTraceId(getReadablePrimaryTraceId()); - carrier.setTraceSegmentId(this.segment.getTraceSegmentId()); - carrier.setSpanId(exitSpan.getSpanId()); - carrier.setParentService(Config.Agent.SERVICE_NAME); - carrier.setParentServiceInstance(Config.Agent.INSTANCE_NAME); - carrier.setParentEndpoint(first().getOperationName()); - carrier.setAddressUsedAtClient(peer); + final PrimaryContext primaryContext = carrier.getPrimaryContext(); + primaryContext.setTraceId(getReadablePrimaryTraceId()); + primaryContext.setTraceSegmentId(this.segment.getTraceSegmentId()); + primaryContext.setSpanId(exitSpan.getSpanId()); + primaryContext.setParentService(Config.Agent.SERVICE_NAME); + primaryContext.setParentServiceInstance(Config.Agent.INSTANCE_NAME); + primaryContext.setParentEndpoint(first().getOperationName()); + primaryContext.setAddressUsedAtClient(peer); this.correlationContext.inject(carrier); this.extensionContext.inject(carrier); @@ -189,12 +190,14 @@ public class TracingContext implements AbstractTracerContext { */ @Override public void extract(ContextCarrier carrier) { - TraceSegmentRef ref = new TraceSegmentRef(carrier); - this.segment.ref(ref); - 
this.segment.relatedGlobalTraces(new PropagatedTraceId(carrier.getTraceId())); AbstractSpan span = this.activeSpan(); - if (span instanceof EntrySpan) { - span.ref(ref); + if (carrier.isValid()) { + TraceSegmentRef ref = new TraceSegmentRef(carrier); + this.segment.ref(ref); + this.segment.relatedGlobalTraces(new PropagatedTraceId(carrier.getPrimaryContext().getTraceId())); + if (span instanceof EntrySpan) { + span.ref(ref); + } } this.correlationContext.extract(carrier); @@ -232,7 +235,7 @@ public class TracingContext implements AbstractTracerContext { TraceSegmentRef segmentRef = new TraceSegmentRef(snapshot); this.segment.ref(segmentRef); this.activeSpan().ref(segmentRef); - this.segment.relatedGlobalTraces(snapshot.getTraceId()); + this.segment.relatedGlobalTraces(snapshot.getPrimaryContextSnapshot().getTraceId()); this.correlationContext.continued(snapshot); this.extensionContext.continued(snapshot); this.extensionContext.handle(this.activeSpan()); diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/TraceSegmentRef.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/TraceSegmentRef.java index 688dca1..517396a 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/TraceSegmentRef.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/TraceSegmentRef.java @@ -48,23 +48,23 @@ public class TraceSegmentRef { */ public TraceSegmentRef(ContextCarrier carrier) { this.type = SegmentRefType.CROSS_PROCESS; - this.traceId = carrier.getTraceId(); - this.traceSegmentId = carrier.getTraceSegmentId(); - this.spanId = carrier.getSpanId(); - this.parentService = carrier.getParentService(); - this.parentServiceInstance = carrier.getParentServiceInstance(); - this.parentEndpoint = carrier.getParentEndpoint(); - this.addressUsedAtClient = carrier.getAddressUsedAtClient(); + this.traceId 
= carrier.getPrimaryContext().getTraceId(); + this.traceSegmentId = carrier.getPrimaryContext().getTraceSegmentId(); + this.spanId = carrier.getPrimaryContext().getSpanId(); + this.parentService = carrier.getPrimaryContext().getParentService(); + this.parentServiceInstance = carrier.getPrimaryContext().getParentServiceInstance(); + this.parentEndpoint = carrier.getPrimaryContext().getParentEndpoint(); + this.addressUsedAtClient = carrier.getPrimaryContext().getAddressUsedAtClient(); } public TraceSegmentRef(ContextSnapshot snapshot) { this.type = SegmentRefType.CROSS_THREAD; - this.traceId = snapshot.getTraceId().getId(); - this.traceSegmentId = snapshot.getTraceSegmentId(); - this.spanId = snapshot.getSpanId(); + this.traceId = snapshot.getPrimaryContextSnapshot().getTraceId().getId(); + this.traceSegmentId = snapshot.getPrimaryContextSnapshot().getTraceSegmentId(); + this.spanId = snapshot.getPrimaryContextSnapshot().getSpanId(); this.parentService = Config.Agent.SERVICE_NAME; this.parentServiceInstance = Config.Agent.INSTANCE_NAME; - this.parentEndpoint = snapshot.getParentEndpoint(); + this.parentEndpoint = snapshot.getPrimaryContextSnapshot().getParentEndpoint(); } public SegmentReference transform() { diff --git a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/ContextCarrierV3HeaderTest.java b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/ContextCarrierV3HeaderTest.java index f45e093..dc7aa56 100644 --- a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/ContextCarrierV3HeaderTest.java +++ b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/ContextCarrierV3HeaderTest.java @@ -52,14 +52,14 @@ public class ContextCarrierV3HeaderTest { List<DistributedTraceId> distributedTraceIds = new ArrayList<>(); ContextCarrier contextCarrier = new ContextCarrier(); - contextCarrier.setTraceSegmentId("1.2.3"); - 
contextCarrier.setTraceId("3.4.5"); - contextCarrier.setSpanId(4); - contextCarrier.setParentService("service"); - contextCarrier.setParentServiceInstance("instance"); - contextCarrier.setAddressUsedAtClient("127.0.0.1:8080"); - contextCarrier.setParentEndpoint("/portal"); - contextCarrier.setParentEndpoint("/app"); + contextCarrier.getPrimaryContext().setTraceSegmentId("1.2.3"); + contextCarrier.getPrimaryContext().setTraceId("3.4.5"); + contextCarrier.getPrimaryContext().setSpanId(4); + contextCarrier.getPrimaryContext().setParentService("service"); + contextCarrier.getPrimaryContext().setParentServiceInstance("instance"); + contextCarrier.getPrimaryContext().setAddressUsedAtClient("127.0.0.1:8080"); + contextCarrier.getPrimaryContext().setParentEndpoint("/portal"); + contextCarrier.getPrimaryContext().setParentEndpoint("/app"); contextCarrier.getCorrelationContext().put("test", "true"); @@ -69,7 +69,10 @@ public class ContextCarrierV3HeaderTest { while (next.hasNext()) { next = next.next(); if (next.getHeadKey().equals(SW8CarrierItem.HEADER_NAME)) { - Assert.assertEquals("1-My40LjU=-MS4yLjM=-4-c2VydmljZQ==-aW5zdGFuY2U=-L2FwcA==-MTI3LjAuMC4xOjgwODA=", next.getHeadValue()); + Assert.assertEquals( + "1-My40LjU=-MS4yLjM=-4-c2VydmljZQ==-aW5zdGFuY2U=-L2FwcA==-MTI3LjAuMC4xOjgwODA=", + next.getHeadValue() + ); } else if (next.getHeadKey().equals(SW8CorrelationCarrierItem.HEADER_NAME)) { /** * customKey:customValue @@ -88,7 +91,10 @@ public class ContextCarrierV3HeaderTest { while (next.hasNext()) { next = next.next(); if (next.getHeadKey().equals(SW8CarrierItem.HEADER_NAME)) { - Assert.assertEquals("1-My40LjU=-MS4yLjM=-4-c2VydmljZQ==-aW5zdGFuY2U=-L2FwcA==-MTI3LjAuMC4xOjgwODA=", next.getHeadValue()); + Assert.assertEquals( + "1-My40LjU=-MS4yLjM=-4-c2VydmljZQ==-aW5zdGFuY2U=-L2FwcA==-MTI3LjAuMC4xOjgwODA=", + next.getHeadValue() + ); } else if (next.getHeadKey().equals(SW8CorrelationCarrierItem.HEADER_NAME)) { Assert.assertEquals("dGVzdA==:dHJ1ZQ==", next.getHeadValue()); } 
else if (next.getHeadKey().equals(SW8ExtensionCarrierItem.HEADER_NAME)) { @@ -107,26 +113,26 @@ public class ContextCarrierV3HeaderTest { distributedTraceIds.add(new PropagatedTraceId("3.4.5")); ContextCarrier contextCarrier = new ContextCarrier(); - contextCarrier.setTraceSegmentId("1.2.3"); - contextCarrier.setTraceId("3.4.5"); - contextCarrier.setSpanId(4); - contextCarrier.setParentService("service"); - contextCarrier.setParentServiceInstance("instance"); - contextCarrier.setAddressUsedAtClient("127.0.0.1:8080"); - contextCarrier.setParentEndpoint("/portal"); - contextCarrier.setParentEndpoint("/app"); + contextCarrier.getPrimaryContext().setTraceSegmentId("1.2.3"); + contextCarrier.getPrimaryContext().setTraceId("3.4.5"); + contextCarrier.getPrimaryContext().setSpanId(4); + contextCarrier.getPrimaryContext().setParentService("service"); + contextCarrier.getPrimaryContext().setParentServiceInstance("instance"); + contextCarrier.getPrimaryContext().setAddressUsedAtClient("127.0.0.1:8080"); + contextCarrier.getPrimaryContext().setParentEndpoint("/portal"); + contextCarrier.getPrimaryContext().setParentEndpoint("/app"); contextCarrier.getCorrelationContext().put("test", "true"); contextCarrier.getExtensionContext().deserialize("1"); CarrierItem next = contextCarrier.items(); - String sw6HeaderValue = null; + String sw8HeaderValue = null; String correlationHeaderValue = null; String extensionHeaderValue = null; while (next.hasNext()) { next = next.next(); if (next.getHeadKey().equals(SW8CarrierItem.HEADER_NAME)) { - sw6HeaderValue = next.getHeadValue(); + sw8HeaderValue = next.getHeadValue(); } else if (next.getHeadKey().equals(SW8CorrelationCarrierItem.HEADER_NAME)) { correlationHeaderValue = next.getHeadValue(); } else if (next.getHeadKey().equals(SW8ExtensionCarrierItem.HEADER_NAME)) { @@ -141,7 +147,7 @@ public class ContextCarrierV3HeaderTest { while (next.hasNext()) { next = next.next(); if (next.getHeadKey().equals(SW8CarrierItem.HEADER_NAME)) { - 
next.setHeadValue(sw6HeaderValue); + next.setHeadValue(sw8HeaderValue); } else if (next.getHeadKey().equals(SW8CorrelationCarrierItem.HEADER_NAME)) { next.setHeadValue(correlationHeaderValue); } else if (next.getHeadKey().equals(SW8ExtensionCarrierItem.HEADER_NAME)) { @@ -152,13 +158,30 @@ public class ContextCarrierV3HeaderTest { } Assert.assertTrue(contextCarrier2.isValid()); - Assert.assertEquals(contextCarrier.getSpanId(), contextCarrier2.getSpanId()); - Assert.assertEquals(contextCarrier.getAddressUsedAtClient(), contextCarrier2.getAddressUsedAtClient()); - Assert.assertEquals(contextCarrier.getTraceId(), contextCarrier2.getTraceId()); - Assert.assertEquals(contextCarrier.getTraceSegmentId(), contextCarrier2.getTraceSegmentId()); - Assert.assertEquals(contextCarrier.getParentService(), contextCarrier2.getParentService()); - Assert.assertEquals(contextCarrier.getParentServiceInstance(), contextCarrier2.getParentServiceInstance()); - Assert.assertEquals(contextCarrier.getParentEndpoint(), contextCarrier2.getParentEndpoint()); + Assert.assertEquals( + contextCarrier.getPrimaryContext().getSpanId(), contextCarrier2.getPrimaryContext().getSpanId()); + Assert.assertEquals( + contextCarrier.getPrimaryContext().getAddressUsedAtClient(), + contextCarrier2.getPrimaryContext().getAddressUsedAtClient() + ); + Assert.assertEquals( + contextCarrier.getPrimaryContext().getTraceId(), contextCarrier2.getPrimaryContext().getTraceId()); + Assert.assertEquals( + contextCarrier.getPrimaryContext().getTraceSegmentId(), + contextCarrier2.getPrimaryContext().getTraceSegmentId() + ); + Assert.assertEquals( + contextCarrier.getPrimaryContext().getParentService(), + contextCarrier2.getPrimaryContext().getParentService() + ); + Assert.assertEquals( + contextCarrier.getPrimaryContext().getParentServiceInstance(), + contextCarrier2.getPrimaryContext().getParentServiceInstance() + ); + Assert.assertEquals( + contextCarrier.getPrimaryContext().getParentEndpoint(), + 
contextCarrier2.getPrimaryContext().getParentEndpoint() + ); Assert.assertEquals(contextCarrier.getCorrelationContext(), contextCarrier2.getCorrelationContext()); Assert.assertEquals(contextCarrier.getExtensionContext(), contextCarrier2.getExtensionContext()); } diff --git a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/ContextManagerTest.java b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/ContextManagerTest.java index 774da23..fce0ab8 100644 --- a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/ContextManagerTest.java +++ b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/ContextManagerTest.java @@ -103,9 +103,9 @@ public class ContextManagerTest { @Test public void createMultipleEntrySpan() { - ContextCarrier contextCarrier = new ContextCarrier().deserialize( - "1-My40LjU=-MS4yLjM=-4-c2VydmljZQ==-aW5zdGFuY2U=-L2FwcA==-MTI3LjAuMC4xOjgwODA=", - ContextCarrier.HeaderVersion.v3 + ContextCarrier contextCarrier = new ContextCarrier(); + contextCarrier.getPrimaryContext().deserialize( + "1-My40LjU=-MS4yLjM=-4-c2VydmljZQ==-aW5zdGFuY2U=-L2FwcA==-MTI3LjAuMC4xOjgwODA=" ); assertTrue(contextCarrier.isValid()); @@ -159,8 +159,8 @@ public class ContextManagerTest { assertThat(logs.size(), is(1)); assertThat(logs.get(0).getLogs().size(), is(4)); - assertThat(injectContextCarrier.getSpanId(), is(1)); - assertThat(injectContextCarrier.getAddressUsedAtClient(), is("127.0.0.1:12800")); + assertThat(injectContextCarrier.getPrimaryContext().getSpanId(), is(1)); + assertThat(injectContextCarrier.getPrimaryContext().getAddressUsedAtClient(), is("127.0.0.1:12800")); } @Test @@ -210,18 +210,18 @@ public class ContextManagerTest { assertThat(actualEntrySpan.getSpanId(), is(0)); assertThat(AbstractTracingSpanHelper.getParentSpanId(actualEntrySpan), is(-1)); - assertThat(firstExitSpanContextCarrier.getAddressUsedAtClient(), 
is("127.0.0.1:8080")); - assertThat(firstExitSpanContextCarrier.getSpanId(), is(1)); + assertThat(firstExitSpanContextCarrier.getPrimaryContext().getAddressUsedAtClient(), is("127.0.0.1:8080")); + assertThat(firstExitSpanContextCarrier.getPrimaryContext().getSpanId(), is(1)); - assertThat(secondExitSpanContextCarrier.getSpanId(), is(1)); + assertThat(secondExitSpanContextCarrier.getPrimaryContext().getSpanId(), is(1)); } @Test public void testTransform() throws InvalidProtocolBufferException { - ContextCarrier contextCarrier = new ContextCarrier().deserialize( - "1-My40LjU=-MS4yLjM=-3-c2VydmljZQ==-aW5zdGFuY2U=-L2FwcA==-MTI3LjAuMC4xOjgwODA=", - ContextCarrier.HeaderVersion.v3 + ContextCarrier contextCarrier = new ContextCarrier(); + contextCarrier.getPrimaryContext().deserialize( + "1-My40LjU=-MS4yLjM=-3-c2VydmljZQ==-aW5zdGFuY2U=-L2FwcA==-MTI3LjAuMC4xOjgwODA=" ); assertTrue(contextCarrier.isValid()); diff --git a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/IgnoredTracerContextTest.java b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/IgnoredTracerContextTest.java index b46fcdc..3c63847 100644 --- a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/IgnoredTracerContextTest.java +++ b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/IgnoredTracerContextTest.java @@ -95,9 +95,9 @@ public class IgnoredTracerContextTest { ContextManager.stopSpan(); assertThat(abstractSpan.getClass().getName(), is(NoopSpan.class.getName())); - assertNull(contextCarrier.getParentEndpoint()); - assertThat(contextCarrier.getSpanId(), is(-1)); - assertNull(contextCarrier.getAddressUsedAtClient()); + assertNull(contextCarrier.getPrimaryContext().getParentEndpoint()); + assertThat(contextCarrier.getPrimaryContext().getSpanId(), is(-1)); + assertNull(contextCarrier.getPrimaryContext().getAddressUsedAtClient()); LinkedList<IgnoredTracerContext> 
ignoredTracerContexts = storage.getIgnoredTracerContexts(); assertThat(ignoredTracerContexts.size(), is(1)); diff --git a/apm-sniffer/apm-sdk-plugin/vertx-plugins/vertx-core-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/vertx3/EventBusImplDeliverToHandlerInterceptor.java b/apm-sniffer/apm-sdk-plugin/vertx-plugins/vertx-core-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/vertx3/EventBusImplDeliverToHandlerInterceptor.java index 8e32945..9f769fb 100644 --- a/apm-sniffer/apm-sdk-plugin/vertx-plugins/vertx-core-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/vertx3/EventBusImplDeliverToHandlerInterceptor.java +++ b/apm-sniffer/apm-sdk-plugin/vertx-plugins/vertx-core-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/vertx3/EventBusImplDeliverToHandlerInterceptor.java @@ -46,7 +46,7 @@ public class EventBusImplDeliverToHandlerInterceptor implements InstanceMethodsA AbstractSpan span; if (VertxContext.hasContext(message.replyAddress())) { VertxContext context = VertxContext.peekContext(message.replyAddress()); - span = ContextManager.createLocalSpan(context.getContextSnapshot().getParentEndpoint()); + span = ContextManager.createLocalSpan(context.getContextSnapshot().getPrimaryContextSnapshot().getParentEndpoint()); ContextManager.continued(context.getContextSnapshot()); } else { span = ContextManager.createLocalSpan(message.address()); diff --git a/apm-sniffer/apm-sdk-plugin/vertx-plugins/vertx-core-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/vertx3/HandlerRegistrationInterceptor.java b/apm-sniffer/apm-sdk-plugin/vertx-plugins/vertx-core-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/vertx3/HandlerRegistrationInterceptor.java index ff8db51..6662439 100644 --- a/apm-sniffer/apm-sdk-plugin/vertx-plugins/vertx-core-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/vertx3/HandlerRegistrationInterceptor.java +++ 
b/apm-sniffer/apm-sdk-plugin/vertx-plugins/vertx-core-3.x-plugin/src/main/java/org/apache/skywalking/apm/plugin/vertx3/HandlerRegistrationInterceptor.java @@ -59,7 +59,7 @@ public class HandlerRegistrationInterceptor implements InstanceMethodsAroundInte } else { if (VertxContext.hasContext(message.replyAddress())) { VertxContext context = VertxContext.peekContext(message.replyAddress()); - span = ContextManager.createLocalSpan(context.getContextSnapshot().getParentEndpoint()); + span = ContextManager.createLocalSpan(context.getContextSnapshot().getPrimaryContextSnapshot().getParentEndpoint()); ContextManager.continued(context.getContextSnapshot()); } else { span = ContextManager.createLocalSpan(message.address()); diff --git a/apm-sniffer/optional-reporter-plugins/pom.xml b/apm-sniffer/optional-reporter-plugins/pom.xml new file mode 100644 index 0000000..140eac2 --- /dev/null +++ b/apm-sniffer/optional-reporter-plugins/pom.xml @@ -0,0 +1,121 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ ~ + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>apm-sniffer</artifactId> + <groupId>org.apache.skywalking</groupId> + <version>8.1.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>optional-reporter-plugins</artifactId> + <packaging>pom</packaging> + + <modules> + <module>zipkin-reporter-plugin</module> + </modules> + + <properties> + <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> + + <agent.package.dest.dir>${project.build.directory}/../../../../skywalking-agent</agent.package.dest.dir> + <optional.reporter.plugins.dest.dir>${agent.package.dest.dir}/optional-reporter-plugins</optional.reporter.plugins.dest.dir> + + <ant-contrib.version>1.0b3</ant-contrib.version> + <ant-nodeps.version>1.8.1</ant-nodeps.version> + + <kafk-clients.version>2.4.1</kafk-clients.version> + <spring-kafka-test.version>2.4.6.RELEASE</spring-kafka-test.version> + </properties> + + <dependencies> + <dependency> + <groupId>org.apache.skywalking</groupId> + <artifactId>apm-agent-core</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.skywalking</groupId> + <artifactId>apm-util</artifactId> + <version>${project.version}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.apache.skywalking</groupId> + <artifactId>apm-test-tools</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-antrun-plugin</artifactId> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>run</goal> + </goals> + <configuration> + <tasks> + <taskdef resource="net/sf/antcontrib/antcontrib.properties" + classpathref="maven.runtime.classpath"/> 
+ <if> + <equals arg1="${project.packaging}" arg2="jar"/> + <then> + <mkdir dir="${optional.reporter.plugins.dest.dir}"/> + <mkdir dir="${agent.package.dest.dir}/reporter-plugins" /> + <copy + file="${project.build.directory}/${project.artifactId}-${project.version}.jar" + tofile="${optional.reporter.plugins.dest.dir}/${project.artifactId}-${project.version}.jar" + overwrite="true"/> + </then> + </if> + </tasks> + </configuration> + </execution> + </executions> + <dependencies> + <dependency> + <groupId>ant-contrib</groupId> + <artifactId>ant-contrib</artifactId> + <version>${ant-contrib.version}</version> + <exclusions> + <exclusion> + <groupId>ant</groupId> + <artifactId>ant</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.ant</groupId> + <artifactId>ant-nodeps</artifactId> + <version>${ant-nodeps.version}</version> + </dependency> + </dependencies> + </plugin> + </plugins> + </build> + +</project> \ No newline at end of file diff --git a/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/pom.xml b/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/pom.xml new file mode 100644 index 0000000..9ace787 --- /dev/null +++ b/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/pom.xml @@ -0,0 +1,73 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. 
You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + ~ + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.skywalking</groupId> + <artifactId>optional-reporter-plugins</artifactId> + <version>8.1.0-SNAPSHOT</version> + </parent> + + <groupId>org.apache.skywalking</groupId> + <artifactId>zipkin-reporter-plugin</artifactId> + <version>8.1.0-SNAPSHOT</version> + + <properties> + <zipkin.version>5.12.3</zipkin.version> + <zipkin.okhttp3.version>2.15.0</zipkin.okhttp3.version> + <zipkin.reporter.brave.version>2.15.0</zipkin.reporter.brave.version> + </properties> + + <dependencies> + <dependency> + <groupId>io.zipkin.brave</groupId> + <artifactId>brave</artifactId> + <version>${zipkin.version}</version> + </dependency> + <dependency> + <groupId>io.zipkin.brave</groupId> + <artifactId>brave-instrumentation-http</artifactId> + <version>${zipkin.version}</version> + </dependency> + <dependency> + <groupId>io.zipkin.brave</groupId> + <artifactId>brave-instrumentation-rpc</artifactId> + <version>${zipkin.version}</version> + </dependency> + <dependency> + <groupId>io.zipkin.brave</groupId> + <artifactId>brave-instrumentation-messaging</artifactId> + <version>${zipkin.version}</version> + </dependency> + <dependency> + <groupId>io.zipkin.reporter2</groupId> + <artifactId>zipkin-sender-okhttp3</artifactId> + <version>${zipkin.okhttp3.version}</version> + </dependency> + 
<dependency> + <groupId>io.zipkin.reporter2</groupId> + <artifactId>zipkin-reporter-brave</artifactId> + <version>${zipkin.reporter.brave.version}</version> + </dependency> + </dependencies> +</project> \ No newline at end of file diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/SW8CarrierItem.java b/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/java/org/apache/skywalking/apm/plugin/reporter/zipkin/GRPCBlockingService.java similarity index 57% copy from apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/SW8CarrierItem.java copy to apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/java/org/apache/skywalking/apm/plugin/reporter/zipkin/GRPCBlockingService.java index 07f79f2..5613c02 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/SW8CarrierItem.java +++ b/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/java/org/apache/skywalking/apm/plugin/reporter/zipkin/GRPCBlockingService.java @@ -16,19 +16,23 @@ * */ -package org.apache.skywalking.apm.agent.core.context; +package org.apache.skywalking.apm.plugin.reporter.zipkin; -public class SW8CarrierItem extends CarrierItem { - public static final String HEADER_NAME = "sw8"; - private ContextCarrier carrier; +import org.apache.skywalking.apm.agent.core.boot.OverrideImplementor; +import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager; - public SW8CarrierItem(ContextCarrier carrier, CarrierItem next) { - super(HEADER_NAME, carrier.serialize(ContextCarrier.HeaderVersion.v3), next); - this.carrier = carrier; +/** + * The {@link org.apache.skywalking.apm.agent.core.conf.Config.Collector#BACKEND_SERVICE} is used by {@link + * ZipkinTraceReporter}, so, this service prevents the default implementation activate the grpc channel through this + * parameter. 
+ */
+@OverrideImplementor(GRPCChannelManager.class)
+public class GRPCBlockingService extends GRPCChannelManager {
+    @Override
+    public void prepare() {
     }
 
     @Override
-    public void setHeadValue(String headValue) {
-        carrier.deserialize(headValue, ContextCarrier.HeaderVersion.v3);
+    public void boot() {
     }
-}
\ No newline at end of file
+}
diff --git a/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/java/org/apache/skywalking/apm/plugin/reporter/zipkin/ZipkinContextManager.java b/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/java/org/apache/skywalking/apm/plugin/reporter/zipkin/ZipkinContextManager.java
new file mode 100644
index 0000000..eb22db6
--- /dev/null
+++ b/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/java/org/apache/skywalking/apm/plugin/reporter/zipkin/ZipkinContextManager.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.reporter.zipkin;
+
+import org.apache.skywalking.apm.agent.core.boot.OverrideImplementor;
+import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
+import org.apache.skywalking.apm.agent.core.context.AbstractTracerContext;
+import org.apache.skywalking.apm.agent.core.context.ContextManagerExtendService;
+
+/**
+ * ZipkinContextManager uses Brave APIs to manage the Zipkin tracing context, including span start/stop/tag/log,
+ * inject/extract across processes, and capture/continue across threads.
+ *
+ * <p>It is declared as the override implementor of {@link ContextManagerExtendService}; every tracer context it
+ * creates is a {@link ZipkinTracerContext} built from the {@code Tracing} instance held by
+ * {@link ZipkinTraceReporter}.
+ */
+@OverrideImplementor(ContextManagerExtendService.class)
+public class ZipkinContextManager extends ContextManagerExtendService {
+    // Provider of the Brave Tracing instance; assigned in prepare().
+    private ZipkinTraceReporter zipkinTraceReporter;
+
+    /**
+     * Looks up the {@link ZipkinTraceReporter} boot service and keeps it for later context creation.
+     */
+    @Override
+    public void prepare() {
+        // NOTE(review): ZipkinTraceReporter is annotated as an override of TraceSegmentServiceClient. Confirm that
+        // ServiceManager#findService resolves it by its concrete class; if this lookup yields null,
+        // createTraceContext fails with a NullPointerException.
+        zipkinTraceReporter = ServiceManager.INSTANCE.findService(ZipkinTraceReporter.class);
+    }
+
+    /**
+     * Creates an AbstractTracerContext backed by a new Zipkin tracer context.
+     *
+     * @param operationName not used by this implementation.
+     * @param forceSampling not used by this implementation.
+     * @return a new {@link ZipkinTracerContext} wrapping the reporter's {@code Tracing} instance. The reporter
+     * assigns that instance in its {@code boot()}, so it is {@code null} before the reporter has booted.
+     */
+    @Override
+    public AbstractTracerContext createTraceContext(final String operationName, final boolean forceSampling) {
+        return new ZipkinTracerContext(zipkinTraceReporter.getTracing());
+    }
+}
diff --git a/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/java/org/apache/skywalking/apm/plugin/reporter/zipkin/ZipkinSpan.java b/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/java/org/apache/skywalking/apm/plugin/reporter/zipkin/ZipkinSpan.java
new file mode 100644
index 0000000..e207ffd
--- /dev/null
+++ b/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/java/org/apache/skywalking/apm/plugin/reporter/zipkin/ZipkinSpan.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.reporter.zipkin;
+
+import brave.Span;
+import java.util.Map;
+import lombok.Getter;
+import org.apache.skywalking.apm.agent.core.context.tag.AbstractTag;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+import org.apache.skywalking.apm.agent.core.context.trace.SpanLayer;
+import org.apache.skywalking.apm.agent.core.context.trace.TraceSegmentRef;
+import org.apache.skywalking.apm.network.trace.component.Component;
+
+/**
+ * Zipkin span is the API bridge for Zipkin span running in the SkyWalking shell.
+ *
+ * <p>Each SkyWalking span operation is forwarded to the wrapped Brave {@link Span}. Operations without a mapping in
+ * this draft (event logs, the error flag, segment references, profiling, analysis skipping) are no-ops or
+ * unsupported, as documented on the individual methods.
+ */
+@Getter
+public class ZipkinSpan implements AbstractSpan {
+    private final Span span;
+    private boolean isEntry;
+    private boolean isExit;
+    @Getter
+    private boolean isAsync = false;
+
+    /**
+     * @param span the Brave span every call is delegated to.
+     */
+    public ZipkinSpan(final Span span) {
+        this.span = span;
+        this.isEntry = false;
+        this.isExit = false;
+    }
+
+    /**
+     * Flags this span as an entry span.
+     *
+     * @param entry true to mark the span as entry; the Brave kind becomes {@code SERVER} only in that case.
+     * @return this span, for chaining.
+     */
+    public ZipkinSpan setEntry(final boolean entry) {
+        isEntry = entry;
+        // Only a real entry span is a SERVER span; setEntry(false) must not change the kind.
+        if (entry) {
+            span.kind(Span.Kind.SERVER);
+        }
+        return this;
+    }
+
+    /**
+     * Flags this span as an exit span.
+     *
+     * @param exit true to mark the span as exit; the Brave kind becomes {@code CLIENT} only in that case.
+     * @return this span, for chaining.
+     */
+    public ZipkinSpan setExit(final boolean exit) {
+        isExit = exit;
+        // Only a real exit span is a CLIENT span; setExit(false) must not change the kind.
+        if (exit) {
+            span.kind(Span.Kind.CLIENT);
+        }
+        return this;
+    }
+
+    /**
+     * Records the component name as the {@code component} tag.
+     */
+    @Override
+    public AbstractSpan setComponent(final Component component) {
+        span.tag("component", component.getName());
+        return this;
+    }
+
+    /**
+     * Records the layer name as the {@code layer} tag. For the MQ layer the Brave kind is refined to
+     * {@code CONSUMER} (entry span) or {@code PRODUCER} (exit span), so the entry/exit flag has to be set before
+     * this method is called for the refinement to apply.
+     */
+    @Override
+    public AbstractSpan setLayer(final SpanLayer layer) {
+        span.tag("layer", layer.name());
+        if (isEntry && layer.equals(SpanLayer.MQ)) {
+            span.kind(Span.Kind.CONSUMER);
+        } else if (isExit && layer.equals(SpanLayer.MQ)) {
+            span.kind(Span.Kind.PRODUCER);
+        }
+        return this;
+    }
+
+    @Override
+    public AbstractSpan tag(final String key, final String value) {
+        span.tag(key, value);
+        return this;
+    }
+
+    @Override
+    public AbstractSpan tag(final AbstractTag<?> tag, final String value) {
+        span.tag(tag.key(), value);
+        return this;
+    }
+
+    /**
+     * Reports the throwable through the Brave span's error API.
+     */
+    @Override
+    public AbstractSpan log(final Throwable t) {
+        span.error(t);
+        return this;
+    }
+
+    /**
+     * No-op in this draft: nothing is recorded on the Brave span.
+     */
+    @Override
+    public AbstractSpan errorOccurred() {
+        return this;
+    }
+
+    @Override
+    public boolean isEntry() {
+        return isEntry;
+    }
+
+    @Override
+    public boolean isExit() {
+        return isExit;
+    }
+
+    /**
+     * Event logs are not forwarded to Brave in this draft; the event is dropped.
+     *
+     * @return this span (never {@code null}), so chained calls keep working.
+     */
+    @Override
+    public AbstractSpan log(final long timestamp, final Map<String, ?> event) {
+        return this;
+    }
+
+    @Override
+    public AbstractSpan setOperationName(final String operationName) {
+        span.name(operationName);
+        return this;
+    }
+
+    @Override
+    public AbstractSpan start() {
+        span.start();
+        return this;
+    }
+
+    /**
+     * @return 0 always, as span is not readable before finished.
+     */
+    @Override
+    public int getSpanId() {
+        return 0;
+    }
+
+    /**
+     * @return empty string, as span is not readable before finished.
+     */
+    @Override
+    public String getOperationName() {
+        return "";
+    }
+
+    /**
+     * Segment references are a SkyWalking-native concept and are not supported by this bridge.
+     *
+     * @throws UnsupportedOperationException always.
+     */
+    @Override
+    public void ref(final TraceSegmentRef ref) {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Starts the span at the given time.
+     *
+     * @param startTime start time as epoch milliseconds, the unit used by the SkyWalking span API.
+     */
+    @Override
+    public AbstractSpan start(final long startTime) {
+        // Brave's Span#start(long) takes epoch microseconds, so convert from milliseconds.
+        span.start(startTime * 1000L);
+        return this;
+    }
+
+    /**
+     * Records the remote peer as the Brave remote service name.
+     */
+    @Override
+    public AbstractSpan setPeer(final String remotePeer) {
+        span.remoteServiceName(remotePeer);
+        return this;
+    }
+
+    /**
+     * @return false always.
+     */
+    @Override
+    public boolean isProfiling() {
+        return false;
+    }
+
+    /**
+     * No-op in this implementation.
+     */
+    @Override
+    public void skipAnalysis() {
+
+    }
+
+    /**
+     * Marks the span as asynchronous; ZipkinTracerContext#stopSpan checks this flag and leaves an async span
+     * unfinished, so it is finished later through {@link #asyncFinish()}.
+     */
+    @Override
+    public AbstractSpan prepareForAsync() {
+        isAsync = true;
+        return this;
+    }
+
+    /**
+     * Finishes the underlying Brave span.
+     */
+    @Override
+    public AbstractSpan asyncFinish() {
+        span.finish();
+        return this;
+    }
+
+    /**
+     * Finishes the underlying Brave span.
+     */
+    public void stop() {
+        span.finish();
+    }
+}
diff --git a/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/java/org/apache/skywalking/apm/plugin/reporter/zipkin/ZipkinTraceReporter.java b/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/java/org/apache/skywalking/apm/plugin/reporter/zipkin/ZipkinTraceReporter.java
new file mode 100644
index 0000000..7081a17
--- /dev/null
+++ b/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/java/org/apache/skywalking/apm/plugin/reporter/zipkin/ZipkinTraceReporter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.reporter.zipkin;
+
+import brave.Tracing;
+import lombok.Getter;
+import org.apache.skywalking.apm.agent.core.boot.OverrideImplementor;
+import org.apache.skywalking.apm.agent.core.conf.Config;
+import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.skywalking.apm.agent.core.remote.TraceSegmentServiceClient;
+import zipkin2.reporter.brave.AsyncZipkinSpanHandler;
+import zipkin2.reporter.okhttp3.OkHttpSender;
+
+/**
+ * Zipkin traces are reported through the Brave client. This override implementor mainly makes sure the original
+ * memory queue and gRPC client do not start, and sets up the Zipkin client instead.
+ */
+@OverrideImplementor(TraceSegmentServiceClient.class)
+public class ZipkinTraceReporter extends TraceSegmentServiceClient {
+    /**
+     * Brave tracing component; assigned in {@link #boot()}, so it is {@code null} until then.
+     */
+    @Getter
+    private Tracing tracing;
+    private OkHttpSender sender;
+    private AsyncZipkinSpanHandler zipkinSpanHandler;
+
+    /**
+     * Intentionally empty, so the preparation logic of the overridden client does not run.
+     */
+    @Override
+    public void prepare() {
+    }
+
+    /**
+     * Set up the Zipkin reporter, using {@link Config.Collector#BACKEND_SERVICE} as the report URL. Typically, the
+     * URL should look like http://ip:port/api/v2/spans
+     */
+    @Override
+    public void boot() {
+        sender = OkHttpSender.create(Config.Collector.BACKEND_SERVICE);
+        zipkinSpanHandler = AsyncZipkinSpanHandler.create(sender);
+
+        // Create a tracing component with the service name you want to see in Zipkin.
+        tracing = Tracing.newBuilder()
+                         .localServiceName(Config.Agent.SERVICE_NAME)
+                         .addSpanHandler(zipkinSpanHandler)
+                         .build();
+    }
+
+    /**
+     * Closes the tracing component, the span handler and the sender, in that order. Each resource is skipped when
+     * it was never created, so shutting down after a skipped or failed {@link #boot()} does not throw a
+     * NullPointerException.
+     */
+    @Override
+    public void shutdown() {
+        if (tracing != null) {
+            tracing.close();
+        }
+        if (zipkinSpanHandler != null) {
+            zipkinSpanHandler.close();
+        }
+        if (sender != null) {
+            sender.close();
+        }
+    }
+}
diff --git a/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/java/org/apache/skywalking/apm/plugin/reporter/zipkin/ZipkinTracerContext.java b/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/java/org/apache/skywalking/apm/plugin/reporter/zipkin/ZipkinTracerContext.java
new file mode 100644
index 0000000..4b9c01c
--- /dev/null
+++ b/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/java/org/apache/skywalking/apm/plugin/reporter/zipkin/ZipkinTracerContext.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.plugin.reporter.zipkin;
+
+import brave.Span;
+import brave.Tracer;
+import brave.Tracing;
+import brave.propagation.TraceContext;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.skywalking.apm.agent.core.context.AbstractTracerContext;
+import org.apache.skywalking.apm.agent.core.context.AsyncSpan;
+import org.apache.skywalking.apm.agent.core.context.ContextCarrier;
+import org.apache.skywalking.apm.agent.core.context.ContextSnapshot;
+import org.apache.skywalking.apm.agent.core.context.CorrelationContext;
+import org.apache.skywalking.apm.agent.core.context.trace.AbstractSpan;
+
+/**
+ * ZipkinTracerContext is an API wrapper of Zipkin tracer.
+ *
+ * <p>Span creation, propagation and cross-thread hand-over are delegated to the Brave {@link Tracing} component
+ * given at construction time; spans are handed out wrapped as {@link ZipkinSpan}.
+ */
+public class ZipkinTracerContext implements AbstractTracerContext {
+    /**
+     * Name used for the custom carrier key and for the custom snapshot entry holding the Brave trace context.
+     */
+    private static final String B3_NAME = "b3";
+
+    /**
+     * Running span cache of the current Zipkin context, keyed by the Brave span. It decides when this context
+     * should be closed: that is the moment every running span has been stopped.
+     */
+    private final Map<Span, ZipkinSpan> runningSpans;
+    private final Tracing tracing;
+    private final Tracer tracer;
+    private final CorrelationContext correlationContext;
+
+    /**
+     * @param tracing the Brave tracing component used for spans, propagation and the current trace context.
+     */
+    public ZipkinTracerContext(final Tracing tracing) {
+        this.tracing = tracing;
+        this.tracer = tracing.tracer();
+        this.runningSpans = new ConcurrentHashMap<>();
+        this.correlationContext = new CorrelationContext();
+    }
+
+    /**
+     * Writes the current Brave trace context into the carrier as custom context entries, then injects the
+     * correlation context.
+     */
+    @Override
+    public void inject(final ContextCarrier carrier) {
+        // NOTE(review): the current trace context is passed on without a null check; confirm a context is always
+        // in scope when inject is called.
+        tracing.propagation().injector((request, key, value) -> carrier.addCustomContext(key, value))
+               .inject(tracing.currentTraceContext().get(), null);
+
+        this.correlationContext.inject(carrier);
+    }
+
+    /**
+     * Reads the propagated headers from the carrier, then extracts the correlation context.
+     */
+    @Override
+    public void extract(final ContextCarrier carrier) {
+        carrier.setCustomKeys(B3_NAME);
+        // NOTE(review): the extraction result is discarded here, so spans created afterwards are not linked to
+        // the propagated trace. Left as in the draft.
+        tracing.propagation().extractor((request, key) -> carrier.readCustomContext(key)).extract(null);
+        this.correlationContext.extract(carrier);
+    }
+
+    /**
+     * Captures the current Brave trace context and the correlation context into a snapshot.
+     */
+    @Override
+    public ContextSnapshot capture() {
+        final TraceContext traceContext = tracing.currentTraceContext().get();
+        ContextSnapshot contextSnapshot = new ContextSnapshot(correlationContext);
+        contextSnapshot.addCustomContext(B3_NAME, traceContext);
+        return contextSnapshot;
+    }
+
+    /**
+     * Makes the trace context stored in the snapshot the current one.
+     */
+    @Override
+    public void continued(final ContextSnapshot snapshot) {
+        final TraceContext traceContext = (TraceContext) snapshot.readCustomContext(B3_NAME);
+        // NOTE(review): the scope returned by newScope is not retained, so it is never closed. Left as in the
+        // draft; confirm where the scope should be closed.
+        tracing.currentTraceContext().newScope(traceContext);
+    }
+
+    /**
+     * @return the trace id of the current span, or "N/A" when there is no current span.
+     */
+    @Override
+    public String getReadablePrimaryTraceId() {
+        final Span span = tracer.currentSpan();
+        return span == null ? "N/A" : span.context().traceIdString();
+    }
+
+    @Override
+    public AbstractSpan createEntrySpan(final String operationName) {
+        final Span span = tracer.nextSpan().name(operationName);
+        return createOrGet(span).setEntry(true);
+    }
+
+    @Override
+    public AbstractSpan createLocalSpan(final String operationName) {
+        final Span span = tracer.nextSpan().name(operationName);
+        return createOrGet(span);
+    }
+
+    @Override
+    public AbstractSpan createExitSpan(final String operationName, final String remotePeer) {
+        final Span span = tracer.nextSpan().name(operationName);
+        span.remoteServiceName(remotePeer);
+        return createOrGet(span).setExit(true);
+    }
+
+    /**
+     * @return the wrapper of the tracer's current span.
+     * @throws IllegalStateException when the tracer has no current span.
+     */
+    @Override
+    public AbstractSpan activeSpan() {
+        final Span span = tracer.currentSpan();
+        if (span == null) {
+            throw new IllegalStateException("No active span.");
+        }
+        return createOrGet(span);
+    }
+
+    /**
+     * @param span to finish
+     * @return true once no active span.
+     */
+    @Override
+    public boolean stopSpan(final AbstractSpan span) {
+        final ZipkinSpan zipkinSpan = (ZipkinSpan) span;
+        // Async spans are finished later through asyncFinish()/asyncStop().
+        if (!zipkinSpan.isAsync()) {
+            zipkinSpan.stop();
+        }
+        // The cache is keyed by the Brave span, so remove by that key rather than by the wrapper.
+        runningSpans.remove(zipkinSpan.getSpan());
+        return runningSpans.isEmpty();
+    }
+
+    @Override
+    public AbstractTracerContext awaitFinishAsync() {
+        return this;
+    }
+
+    @Override
+    public void asyncStop(final AsyncSpan span) {
+        ((ZipkinSpan) span).stop();
+    }
+
+    /**
+     * @return the correlation context owned by this tracer context.
+     */
+    @Override
+    public CorrelationContext getCorrelationContext() {
+        return correlationContext;
+    }
+
+    /**
+     * Returns the cached wrapper of the given Brave span, creating and caching it on first use.
+     */
+    private ZipkinSpan createOrGet(Span span) {
+        return runningSpans.computeIfAbsent(span, ZipkinSpan::new);
+    }
+}
diff --git a/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService
b/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService new file mode 100644 index 0000000..e28a93a --- /dev/null +++ b/apm-sniffer/optional-reporter-plugins/zipkin-reporter-plugin/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService @@ -0,0 +1,30 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# + +org.apache.skywalking.apm.plugin.reporter.zipkin.ZipkinTraceReporter +org.apache.skywalking.apm.plugin.reporter.zipkin.ZipkinContextManager +org.apache.skywalking.apm.plugin.reporter.zipkin.GRPCBlockingService + +#org.apache.skywalking.apm.agent.core.jvm.JVMService +#org.apache.skywalking.apm.agent.core.remote.ServiceManagementClient +#org.apache.skywalking.apm.agent.core.context.ContextManagerExtendService +#org.apache.skywalking.apm.agent.core.commands.CommandService +#org.apache.skywalking.apm.agent.core.commands.CommandExecutorService +#org.apache.skywalking.apm.agent.core.profile.ProfileTaskChannelService +#org.apache.skywalking.apm.agent.core.profile.ProfileTaskExecutionService +#org.apache.skywalking.apm.agent.core.meter.MeterService \ No newline at end of file
