This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 3122697e3a Support span attached event concept (#9916)
3122697e3a is described below
commit 3122697e3a4cd2744595e849051d4808ebce64fc
Author: mrproliu <[email protected]>
AuthorDate: Wed Nov 9 11:28:42 2022 +0800
Support span attached event concept (#9916)
---
docs/en/changes/changes.md | 2 +
.../manual/spanattach/SpanAttachedEventRecord.java | 114 +++++++++++++++++++++
.../spanattach/SpanAttachedEventTraceType.java | 54 ++++++++++
.../oap/server/core/query/TraceQueryService.java | 83 +++++++++++++++
.../oap/server/core/query/type/Instant.java | 30 ++++++
.../server/core/query/type/KeyNumericValue.java | 38 +++++++
.../oap/server/core/query/type/Span.java | 2 +
.../type/{Span.java => SpanAttachedEvent.java} | 57 +++--------
.../oap/server/core/source/DefaultScopeDefine.java | 2 +
.../oap/server/core/storage/StorageModule.java | 4 +-
.../storage/query/ISpanAttachedEventQueryDAO.java | 30 ++++++
.../src/main/resources/query-protocol | 2 +-
.../query/zipkin/handler/ZipkinQueryHandler.java | 103 +++++++++++++++++++
.../trace/provider/TraceModuleProvider.java | 2 +
.../SpanAttachedEventReportServiceHandler.java | 85 +++++++++++++++
.../plugin/banyandb/BanyanDBStorageProvider.java | 3 +
.../stream/BanyanDBSpanAttachedEventQueryDAO.java | 71 +++++++++++++
.../StorageModuleElasticsearchProvider.java | 4 +
.../query/SpanAttachedEventEsDAO.java | 102 ++++++++++++++++++
.../plugin/jdbc/common/JDBCStorageProvider.java | 5 +
.../common/dao/JDBCSpanAttachedEventQueryDAO.java | 81 +++++++++++++++
.../exporter/test/ProfileSnapshotExporterTest.java | 2 +
.../exporter/test/SpanAttachedEventQueryDAO.java | 33 ++++++
23 files changed, 864 insertions(+), 45 deletions(-)
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index 14f2ae395d..5164e6874b 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -101,6 +101,8 @@
* Optimize MQ Topology analysis. Use entry span's peer from the consumer side
as source service when no producer instrumentation(no cross-process reference).
* Refactor JDBC storage implementations to reuse logics.
* Fix `ClassCastException` in `LoggingConfigWatcher`.
+* Support span attached event concept in Zipkin and SkyWalking trace query.
+* Support span attached events on Zipkin lens UI.
#### UI
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/spanattach/SpanAttachedEventRecord.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/spanattach/SpanAttachedEventRecord.java
new file mode 100644
index 0000000000..9a04a7a674
--- /dev/null
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/spanattach/SpanAttachedEventRecord.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.spanattach;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.analysis.Stream;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import
org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
+import org.apache.skywalking.oap.server.core.source.ScopeDeclaration;
+import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+import org.apache.skywalking.oap.server.core.storage.annotation.SuperDataset;
+import org.apache.skywalking.oap.server.core.storage.type.Convert2Entity;
+import org.apache.skywalking.oap.server.core.storage.type.Convert2Storage;
+import org.apache.skywalking.oap.server.core.storage.type.StorageBuilder;
+
+import static
org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.SPAN_ATTACHED_EVENT;
+
+@SuperDataset
+@Setter
+@Getter
+@ScopeDeclaration(id = SPAN_ATTACHED_EVENT, name = "SpanAttachedEvent")
+@Stream(name = SpanAttachedEventRecord.INDEX_NAME, scopeId =
SPAN_ATTACHED_EVENT, builder = SpanAttachedEventRecord.Builder.class, processor
= RecordStreamProcessor.class)
+public class SpanAttachedEventRecord extends Record {
+
+ public static final String INDEX_NAME = "span_attached_event_record";
+ public static final String START_TIME_SECOND = "start_time_second";
+ public static final String START_TIME_NANOS = "start_time_nanos";
+ public static final String EVENT = "event";
+ public static final String END_TIME_SECOND = "end_time_second";
+ public static final String END_TIME_NANOS = "end_time_nanos";
+ public static final String TRACE_REF_TYPE = "trace_ref_type";
+ public static final String TRACE_ID = "trace_id";
+ public static final String TRACE_SEGMENT_ID = "trace_segment_id";
+ public static final String TRACE_SPAN_ID = "trace_span_id";
+ public static final String DATA_BINARY = "data_binary";
+
+ @Column(columnName = START_TIME_SECOND)
+ private long startTimeSecond;
+ @Column(columnName = START_TIME_NANOS)
+ private int startTimeNanos;
+ @Column(columnName = EVENT)
+ private String event;
+ @Column(columnName = END_TIME_SECOND)
+ private long endTimeSecond;
+ @Column(columnName = END_TIME_NANOS)
+ private int endTimeNanos;
+ @Column(columnName = TRACE_REF_TYPE)
+ private int traceRefType;
+ @Column(columnName = TRACE_ID)
+ @BanyanDB.ShardingKey(index = 0)
+ private String traceId;
+ @Column(columnName = TRACE_SEGMENT_ID)
+ private String traceSegmentId;
+ @Column(columnName = TRACE_SPAN_ID)
+ private String traceSpanId;
+ @Column(columnName = DATA_BINARY, storageOnly = true)
+ private byte[] dataBinary;
+
+ @Override
+ public String id() {
+ return traceSegmentId + Const.ID_CONNECTOR + startTimeSecond +
Const.ID_CONNECTOR + startTimeNanos + Const.ID_CONNECTOR + event;
+ }
+
+ public static class Builder implements
StorageBuilder<SpanAttachedEventRecord> {
+ @Override
+ public SpanAttachedEventRecord storage2Entity(Convert2Entity
converter) {
+ final SpanAttachedEventRecord record = new
SpanAttachedEventRecord();
+ record.setStartTimeSecond(((Number)
converter.get(START_TIME_SECOND)).longValue());
+ record.setStartTimeNanos(((Number)
converter.get(START_TIME_NANOS)).intValue());
+ record.setEvent((String) converter.get(EVENT));
+ record.setEndTimeSecond(((Number)
converter.get(END_TIME_SECOND)).longValue());
+ record.setEndTimeNanos(((Number)
converter.get(END_TIME_NANOS)).intValue());
+ record.setTraceRefType(((Number)
converter.get(TRACE_REF_TYPE)).intValue());
+ record.setTraceId((String) converter.get(TRACE_ID));
+ record.setTraceSegmentId((String) converter.get(TRACE_SEGMENT_ID));
+ record.setTraceSpanId((String) converter.get(TRACE_SPAN_ID));
+ record.setDataBinary(converter.getBytes(DATA_BINARY));
+ return record;
+ }
+
+ @Override
+ public void entity2Storage(SpanAttachedEventRecord entity,
Convert2Storage converter) {
+ converter.accept(START_TIME_SECOND, entity.getStartTimeSecond());
+ converter.accept(START_TIME_NANOS, entity.getStartTimeNanos());
+ converter.accept(EVENT, entity.getEvent());
+ converter.accept(END_TIME_SECOND, entity.getEndTimeSecond());
+ converter.accept(END_TIME_NANOS, entity.getEndTimeNanos());
+ converter.accept(TRACE_REF_TYPE, entity.getTraceRefType());
+ converter.accept(TRACE_ID, entity.getTraceId());
+ converter.accept(TRACE_SEGMENT_ID, entity.getTraceSegmentId());
+ converter.accept(TRACE_SPAN_ID, entity.getTraceSpanId());
+ converter.accept(DATA_BINARY, entity.getDataBinary());
+ }
+ }
+}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/spanattach/SpanAttachedEventTraceType.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/spanattach/SpanAttachedEventTraceType.java
new file mode 100644
index 0000000000..7bd411307e
--- /dev/null
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/analysis/manual/spanattach/SpanAttachedEventTraceType.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.analysis.manual.spanattach;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * The {@link SpanAttachedEventRecord} tracing context reference type.
+ */
+public enum SpanAttachedEventTraceType {
+
+ SKYWALKING(0),
+
+ ZIPKIN(1);
+
+ private final int code;
+ private static final Map<Integer, SpanAttachedEventTraceType>
CODE_DICTIONARY = new HashMap<>();
+
+ static {
+ for (SpanAttachedEventTraceType val
:SpanAttachedEventTraceType.values()) {
+ CODE_DICTIONARY.put(val.value(), val);
+ }
+ }
+
+ public static SpanAttachedEventTraceType valueOf(Integer code) {
+ return CODE_DICTIONARY.get(code);
+ }
+
+ SpanAttachedEventTraceType(int code) {
+ this.code = code;
+ }
+
+ public int value() {
+ return this.code;
+ }
+
+}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java
index af1ebd5cb6..f840921f06 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java
@@ -23,13 +23,24 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Objects;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.skywalking.apm.network.common.v3.KeyIntValuePair;
+import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair;
import org.apache.skywalking.apm.network.language.agent.v3.SegmentObject;
+import org.apache.skywalking.apm.network.language.agent.v3.SpanAttachedEvent;
+import org.apache.skywalking.apm.network.language.agent.v3.SpanType;
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.Tag;
import
org.apache.skywalking.oap.server.core.analysis.manual.segment.SegmentRecord;
+import
org.apache.skywalking.oap.server.core.analysis.manual.spanattach.SpanAttachedEventRecord;
+import
org.apache.skywalking.oap.server.core.analysis.manual.spanattach.SpanAttachedEventTraceType;
import
org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
import org.apache.skywalking.oap.server.core.query.input.Duration;
+import org.apache.skywalking.oap.server.core.query.type.KeyNumericValue;
import org.apache.skywalking.oap.server.core.query.type.KeyValue;
import org.apache.skywalking.oap.server.core.query.type.LogEntity;
import org.apache.skywalking.oap.server.core.query.type.Pagination;
@@ -41,6 +52,7 @@ import org.apache.skywalking.oap.server.core.query.type.Trace;
import org.apache.skywalking.oap.server.core.query.type.TraceBrief;
import org.apache.skywalking.oap.server.core.query.type.TraceState;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
+import
org.apache.skywalking.oap.server.core.storage.query.ISpanAttachedEventQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.Service;
@@ -52,6 +64,7 @@ public class TraceQueryService implements Service {
private final ModuleManager moduleManager;
private ITraceQueryDAO traceQueryDAO;
+ private ISpanAttachedEventQueryDAO spanAttachedEventQueryDAO;
private IComponentLibraryCatalogService componentLibraryCatalogService;
public TraceQueryService(ModuleManager moduleManager) {
@@ -65,6 +78,13 @@ public class TraceQueryService implements Service {
return traceQueryDAO;
}
+ private ISpanAttachedEventQueryDAO getSpanAttachedEventQueryDAO() {
+ if (spanAttachedEventQueryDAO == null) {
+ this.spanAttachedEventQueryDAO =
moduleManager.find(StorageModule.NAME).provider().getService(ISpanAttachedEventQueryDAO.class);
+ }
+ return spanAttachedEventQueryDAO;
+ }
+
private IComponentLibraryCatalogService
getComponentLibraryCatalogService() {
if (componentLibraryCatalogService == null) {
this.componentLibraryCatalogService =
moduleManager.find(CoreModule.NAME)
@@ -123,6 +143,12 @@ public class TraceQueryService implements Service {
}
}
+ if (CollectionUtils.isNotEmpty(sortedSpans)) {
+ final List<SpanAttachedEventRecord> spanAttachedEvents =
getSpanAttachedEventQueryDAO().
+ querySpanAttachedEvents(SpanAttachedEventTraceType.SKYWALKING,
traceId);
+ appendAttachedEventsToSpan(sortedSpans, spanAttachedEvents);
+ }
+
trace.getSpans().clear();
trace.getSpans().addAll(sortedSpans);
return trace;
@@ -244,4 +270,61 @@ public class TraceQueryService implements Service {
}
});
}
+
+ private void appendAttachedEventsToSpan(List<Span> spans,
List<SpanAttachedEventRecord> events) throws InvalidProtocolBufferException {
+ if (CollectionUtils.isEmpty(events)) {
+ return;
+ }
+
+ for (SpanAttachedEventRecord record : events) {
+ if (!StringUtils.isNumeric(record.getTraceSpanId())) {
+ continue;
+ }
+
+ // find matches span
+ final int eventSpanId = Integer.parseInt(record.getTraceSpanId());
+ Span matchesSpan = spans.stream().filter(s ->
Objects.equals(s.getSegmentId(), record.getTraceSegmentId()) &&
+ Objects.equals(s.getSpanId(),
eventSpanId)).findFirst().orElse(null);
+ if (matchesSpan == null) {
+ continue;
+ }
+
+ // find the first entry span of upstream if the event from the
upstream
+ SpanAttachedEvent event =
SpanAttachedEvent.parseFrom(record.getDataBinary());
+ final String bindToTheUpstreamEntrySpan =
getSpanAttachedEventTagValue(event.getTagsList(), "bind to upstream span");
+ if (Objects.equals(bindToTheUpstreamEntrySpan, "true")) {
+ final String parentSpanId = matchesSpan.getSegmentSpanId();
+ matchesSpan = spans.stream().filter(s ->
s.getSegmentParentSpanId().equals(parentSpanId)
+ && Objects.equals(s.getType(),
SpanType.Entry.name())).findFirst().orElse(matchesSpan);
+ }
+
+ matchesSpan.getAttachedEvents().add(parseEvent(event));
+ }
+ }
+
+ private String getSpanAttachedEventTagValue(List<KeyStringValuePair>
values, String tagKey) {
+ for (KeyStringValuePair pair : values) {
+ if (Objects.equals(pair.getKey(), tagKey)) {
+ return pair.getValue();
+ }
+ }
+ return null;
+ }
+
+ private org.apache.skywalking.oap.server.core.query.type.SpanAttachedEvent
parseEvent(SpanAttachedEvent event) {
+ final
org.apache.skywalking.oap.server.core.query.type.SpanAttachedEvent result =
+ new
org.apache.skywalking.oap.server.core.query.type.SpanAttachedEvent();
+ result.getStartTime().setSeconds(event.getStartTime().getSeconds());
+ result.getStartTime().setNanos(event.getStartTime().getNanos());
+ result.getEndTime().setSeconds(event.getEndTime().getSeconds());
+ result.getEndTime().setNanos(event.getEndTime().getNanos());
+ result.setEvent(event.getEvent());
+ for (KeyStringValuePair tag : event.getTagsList()) {
+ result.getTags().add(new KeyValue(tag.getKey(), tag.getValue()));
+ }
+ for (KeyIntValuePair pair : event.getSummaryList()) {
+ result.getSummary().add(new KeyNumericValue(pair.getKey(),
pair.getValue()));
+ }
+ return result;
+ }
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/Instant.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/Instant.java
new file mode 100644
index 0000000000..18b252d8fa
--- /dev/null
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/Instant.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.query.type;
+
+import lombok.Getter;
+import lombok.Setter;
+
+@Getter
+public class Instant {
+ @Setter
+ private long seconds;
+ @Setter
+ private int nanos;
+}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/KeyNumericValue.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/KeyNumericValue.java
new file mode 100644
index 0000000000..aa56d7e620
--- /dev/null
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/KeyNumericValue.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.query.type;
+
+import lombok.Getter;
+import lombok.Setter;
+
+@Getter
+public class KeyNumericValue {
+ @Setter
+ private String key;
+ @Setter
+ private long value;
+
+ public KeyNumericValue() {
+ }
+
+ public KeyNumericValue(String key, long value) {
+ this.key = key;
+ this.value = value;
+ }
+}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/Span.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/Span.java
index 9ce279e79c..65d12fd0ef 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/Span.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/Span.java
@@ -62,10 +62,12 @@ public class Span {
private String segmentSpanId;
@Setter
private String segmentParentSpanId;
+ private final List<SpanAttachedEvent> attachedEvents;
public Span() {
this.refs = new ArrayList<>();
this.tags = new ArrayList<>();
this.logs = new ArrayList<>();
+ this.attachedEvents = new ArrayList<>();
}
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/Span.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/SpanAttachedEvent.java
similarity index 52%
copy from
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/Span.java
copy to
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/SpanAttachedEvent.java
index 9ce279e79c..5254b35d5f 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/Span.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/SpanAttachedEvent.java
@@ -18,54 +18,25 @@
package org.apache.skywalking.oap.server.core.query.type;
-import java.util.ArrayList;
-import java.util.List;
import lombok.Getter;
import lombok.Setter;
+import java.util.ArrayList;
+import java.util.List;
+
@Getter
-public class Span {
- @Setter
- private String traceId;
- @Setter
- private String segmentId;
- @Setter
- private int spanId;
- @Setter
- private int parentSpanId;
- private final List<Ref> refs;
- @Setter
- private String serviceCode;
- @Setter
- private String serviceInstanceName;
- @Setter
- private long startTime;
- @Setter
- private long endTime;
- @Setter
- private String endpointName;
- @Setter
- private String type;
- @Setter
- private String peer;
- @Setter
- private String component;
- @Setter
- private boolean isError;
- @Setter
- private String layer;
- private final List<KeyValue> tags;
- private final List<LogEntity> logs;
- @Setter
- private boolean isRoot;
- @Setter
- private String segmentSpanId;
+public class SpanAttachedEvent {
+ private Instant startTime;
+ private Instant endTime;
@Setter
- private String segmentParentSpanId;
+ private String event;
+ private List<KeyValue> tags;
+ private List<KeyNumericValue> summary;
- public Span() {
- this.refs = new ArrayList<>();
+ public SpanAttachedEvent() {
+ this.startTime = new Instant();
+ this.endTime = new Instant();
this.tags = new ArrayList<>();
- this.logs = new ArrayList<>();
+ this.summary = new ArrayList<>();
}
-}
+}
\ No newline at end of file
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
index bac4bf180c..a7cacff910 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
@@ -119,6 +119,8 @@ public class DefaultScopeDefine {
public static final int MESSAGE_QUEUE_ACCESS = 63;
public static final int MESSAGE_QUEUE_ENDPOINT_ACCESS = 64;
+ public static final int SPAN_ATTACHED_EVENT = 65;
+
/**
* Catalog of scope, the metrics processor could use this to group all
generated metrics by oal rt.
*/
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
index 534d9b7510..d35c7b8099 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/StorageModule.java
@@ -34,6 +34,7 @@ import
org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
+import
org.apache.skywalking.oap.server.core.storage.query.ISpanAttachedEventQueryDAO;
import
org.apache.skywalking.oap.server.core.storage.query.ITagAutoCompleteQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IRecordsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
@@ -80,7 +81,8 @@ public class StorageModule extends ModuleDefine {
IEBPFProfilingDataDAO.class,
IServiceLabelDAO.class,
ITagAutoCompleteQueryDAO.class,
- IZipkinQueryDAO.class
+ IZipkinQueryDAO.class,
+ ISpanAttachedEventQueryDAO.class
};
}
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ISpanAttachedEventQueryDAO.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ISpanAttachedEventQueryDAO.java
new file mode 100644
index 0000000000..60b0b8f2fc
--- /dev/null
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/ISpanAttachedEventQueryDAO.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.storage.query;
+
+import
org.apache.skywalking.oap.server.core.analysis.manual.spanattach.SpanAttachedEventRecord;
+import
org.apache.skywalking.oap.server.core.analysis.manual.spanattach.SpanAttachedEventTraceType;
+import org.apache.skywalking.oap.server.library.module.Service;
+
+import java.io.IOException;
+import java.util.List;
+
+public interface ISpanAttachedEventQueryDAO extends Service {
+ List<SpanAttachedEventRecord>
querySpanAttachedEvents(SpanAttachedEventTraceType type, String traceId) throws
IOException;
+}
diff --git
a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
index f6ed76588f..6ebb10a411 160000
---
a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
+++
b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
@@ -1 +1 @@
-Subproject commit f6ed76588f2aa147c0b59a3b497526b317ee7fb9
+Subproject commit 6ebb10a411902cfca1b36e9fe6133f0e347433ea
diff --git
a/oap-server/server-query-plugin/zipkin-query-plugin/src/main/java/org/apache/skywalking/oap/query/zipkin/handler/ZipkinQueryHandler.java
b/oap-server/server-query-plugin/zipkin-query-plugin/src/main/java/org/apache/skywalking/oap/query/zipkin/handler/ZipkinQueryHandler.java
index a507f6811d..3c09389482 100644
---
a/oap-server/server-query-plugin/zipkin-query-plugin/src/main/java/org/apache/skywalking/oap/query/zipkin/handler/ZipkinQueryHandler.java
+++
b/oap-server/server-query-plugin/zipkin-query-plugin/src/main/java/org/apache/skywalking/oap/query/zipkin/handler/ZipkinQueryHandler.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.query.zipkin.handler;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.google.gson.Gson;
+import com.google.protobuf.InvalidProtocolBufferException;
import com.linecorp.armeria.common.AggregatedHttpResponse;
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpStatus;
@@ -37,22 +38,41 @@ import java.io.StringWriter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import io.vavr.Tuple;
+import io.vavr.Tuple2;
+import org.apache.skywalking.apm.network.common.v3.KeyIntValuePair;
+import org.apache.skywalking.apm.network.common.v3.KeyStringValuePair;
+import org.apache.skywalking.apm.network.language.agent.v3.SpanAttachedEvent;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.manual.searchtag.TagType;
+import
org.apache.skywalking.oap.server.core.analysis.manual.spanattach.SpanAttachedEventRecord;
+import
org.apache.skywalking.oap.server.core.analysis.manual.spanattach.SpanAttachedEventTraceType;
import org.apache.skywalking.oap.server.core.query.TagAutoCompleteQueryService;
import org.apache.skywalking.oap.server.core.query.enumeration.Step;
import org.apache.skywalking.oap.server.core.query.input.Duration;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
+import
org.apache.skywalking.oap.server.core.storage.query.ISpanAttachedEventQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IZipkinQueryDAO;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.query.zipkin.ZipkinQueryConfig;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import org.apache.skywalking.oap.server.library.util.StringUtil;
import org.joda.time.DateTime;
+import org.yaml.snakeyaml.DumperOptions;
+import org.yaml.snakeyaml.Yaml;
+import org.yaml.snakeyaml.nodes.Tag;
import zipkin2.Span;
import zipkin2.codec.SpanBytesEncoder;
import zipkin2.storage.QueryRequest;
@@ -70,6 +90,7 @@ public class ZipkinQueryHandler {
private final ZipkinQueryConfig config;
private final ModuleManager moduleManager;
private IZipkinQueryDAO zipkinQueryDAO;
+ private ISpanAttachedEventQueryDAO spanAttachedEventQueryDAO;
private TagAutoCompleteQueryService tagQueryService;
private final long defaultLookback;
private final int namesMaxAge;
@@ -99,6 +120,13 @@ public class ZipkinQueryHandler {
return tagQueryService;
}
+ private ISpanAttachedEventQueryDAO getSpanAttachedEventQueryDAO() {
+ if (spanAttachedEventQueryDAO == null) {
+ this.spanAttachedEventQueryDAO =
moduleManager.find(StorageModule.NAME).provider().getService(ISpanAttachedEventQueryDAO.class);
+ }
+ return spanAttachedEventQueryDAO;
+ }
+
@Get("/config.json")
@Blocking
public AggregatedHttpResponse getUIConfig() throws IOException {
@@ -149,6 +177,7 @@ public class ZipkinQueryHandler {
if (CollectionUtils.isEmpty(trace)) {
return AggregatedHttpResponse.of(NOT_FOUND, ANY_TEXT_TYPE, traceId
+ " not found");
}
+ appendEvents(trace,
getSpanAttachedEventQueryDAO().querySpanAttachedEvents(SpanAttachedEventTraceType.ZIPKIN,
traceId));
return response(SpanBytesEncoder.JSON_V2.encodeList(trace));
}
@@ -274,4 +303,78 @@ public class ZipkinQueryHandler {
buff.put((byte) ']');
return buff.array();
}
+
+ private void appendEvents(List<Span> spans, List<SpanAttachedEventRecord>
events) throws InvalidProtocolBufferException {
+ if (CollectionUtils.isEmpty(events)) {
+ return;
+ }
+
+ final List<Tuple2<Integer, Span>> spanWithIndex = IntStream.range(0,
spans.size()).mapToObj(i -> Tuple.of(i,
spans.get(i))).collect(Collectors.toList());
+
+ final Map<String, List<SpanAttachedEventRecord>> namedEvents =
events.stream().collect(Collectors.groupingBy(SpanAttachedEventRecord::getEvent,
Collectors.toList()));
+ final Map<Integer, Span.Builder> spanCache = new HashMap<>();
+ for (Map.Entry<String, List<SpanAttachedEventRecord>> entry :
namedEvents.entrySet()) {
+ for (int i = 1; i <= entry.getValue().size(); i++) {
+ final SpanAttachedEventRecord record = entry.getValue().get(i
- 1);
+ String eventName = record.getEvent() +
(entry.getValue().size() == 1 ? "" : "-" + i);
+ Tuple2<Integer, Span> matchesSpan =
spanWithIndex.stream().filter(s -> Objects.equals(s._2.id(),
record.getTraceSpanId())).
+ findFirst().orElse(null);
+ if (matchesSpan == null) {
+ continue;
+ }
+
+ final SpanAttachedEvent event =
SpanAttachedEvent.parseFrom(record.getDataBinary());
+ final String bindToTheUpstreamEntrySpan =
getSpanAttachedEventTagValue(event.getTagsList(), "bind to upstream span");
+ if (Objects.equals(bindToTheUpstreamEntrySpan, "true")) {
+ final String parentSpanId = matchesSpan._2.id();
+ matchesSpan = spanWithIndex.stream().filter(s ->
s._2.parentId().equals(parentSpanId)
+ && Objects.equals(s._2.kind(),
Span.Kind.SERVER)).findFirst().orElse(matchesSpan);
+ }
+
+ final Span.Builder builder =
spanCache.computeIfAbsent(matchesSpan._1, idx -> spans.get(idx).toBuilder());
+ appendEvent(builder, eventName, event);
+ }
+ }
+
+ // re-build modified spans
+ for (Map.Entry<Integer, Span.Builder> entry : spanCache.entrySet()) {
+ spans.set(entry.getKey(), entry.getValue().build());
+ }
+ }
+
+ private void appendEvent(Span.Builder span, String eventName,
SpanAttachedEvent event) {
+ span.addAnnotation(
+ TimeUnit.SECONDS.toMicros(event.getStartTime().getSeconds()) +
TimeUnit.NANOSECONDS.toMicros(event.getStartTime().getNanos()),
+ "Start " + eventName);
+ span.addAnnotation(
+ TimeUnit.SECONDS.toMicros(event.getEndTime().getSeconds()) +
TimeUnit.NANOSECONDS.toMicros(event.getEndTime().getNanos()),
+ "Finished " + eventName);
+
+ final Yaml yaml = new Yaml();
+ if (event.getSummaryList().size() > 0) {
+ final Map<String, Long> summaries =
event.getSummaryList().stream().collect(Collectors.toMap(
+ KeyIntValuePair::getKey, KeyIntValuePair::getValue, (s1, s2)
-> s1));
+ String summary = yaml.dumpAs(summaries, Tag.MAP,
DumperOptions.FlowStyle.AUTO);
+ span.putTag(formatEventTagKey(eventName + ".summary"), summary);
+ }
+ if (event.getTagsList().size() > 0) {
+ final Map<String, String> tags =
event.getTagsList().stream().collect(Collectors.toMap(
+ KeyStringValuePair::getKey, KeyStringValuePair::getValue, (s1,
s2) -> s1));
+ String summary = yaml.dumpAs(tags, Tag.MAP,
DumperOptions.FlowStyle.AUTO);
+ span.putTag(formatEventTagKey(eventName + ".tags"), summary);
+ }
+ }
+
+ private String formatEventTagKey(String name) {
+ return name.replaceAll(" ", ".").toLowerCase(Locale.ROOT);
+ }
+
+ private String getSpanAttachedEventTagValue(List<KeyStringValuePair>
values, String tagKey) {
+ for (KeyStringValuePair pair : values) {
+ if (Objects.equals(pair.getKey(), tagKey)) {
+ return pair.getValue();
+ }
+ }
+ return null;
+ }
}
diff --git
a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
index f8e72ba100..1b3bfe44f8 100755
---
a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
+++
b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/TraceModuleProvider.java
@@ -30,6 +30,7 @@ import
org.apache.skywalking.oap.server.library.module.ModuleProvider;
import
org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import
org.apache.skywalking.oap.server.receiver.sharing.server.SharingServerModule;
import org.apache.skywalking.oap.server.receiver.trace.module.TraceModule;
+import
org.apache.skywalking.oap.server.receiver.trace.provider.handler.v8.grpc.SpanAttachedEventReportServiceHandler;
import
org.apache.skywalking.oap.server.receiver.trace.provider.handler.v8.grpc.TraceSegmentReportServiceHandler;
import
org.apache.skywalking.oap.server.receiver.trace.provider.handler.v8.grpc.TraceSegmentReportServiceHandlerCompat;
import
org.apache.skywalking.oap.server.receiver.trace.provider.handler.v8.rest.TraceSegmentReportHandler;
@@ -69,6 +70,7 @@ public class TraceModuleProvider extends ModuleProvider {
TraceSegmentReportServiceHandler traceSegmentReportServiceHandler =
new TraceSegmentReportServiceHandler(getManager());
grpcHandlerRegister.addHandler(traceSegmentReportServiceHandler);
grpcHandlerRegister.addHandler(new
TraceSegmentReportServiceHandlerCompat(traceSegmentReportServiceHandler));
+ grpcHandlerRegister.addHandler(new
SpanAttachedEventReportServiceHandler(getManager()));
httpHandlerRegister.addHandler(new
TraceSegmentReportHandler(getManager()),
Collections.singletonList(HttpMethod.POST)
diff --git
a/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v8/grpc/SpanAttachedEventReportServiceHandler.java
b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v8/grpc/SpanAttachedEventReportServiceHandler.java
new file mode 100644
index 0000000000..513061dca7
--- /dev/null
+++
b/oap-server/server-receiver-plugin/skywalking-trace-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/trace/provider/handler/v8/grpc/SpanAttachedEventReportServiceHandler.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package
org.apache.skywalking.oap.server.receiver.trace.provider.handler.v8.grpc;
+
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.skywalking.apm.network.common.v3.Commands;
+import org.apache.skywalking.apm.network.language.agent.v3.SpanAttachedEvent;
+import
org.apache.skywalking.apm.network.language.agent.v3.SpanAttachedEventReportServiceGrpc;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
+import
org.apache.skywalking.oap.server.core.analysis.manual.spanattach.SpanAttachedEventRecord;
+import
org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
+
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+public class SpanAttachedEventReportServiceHandler extends
SpanAttachedEventReportServiceGrpc.SpanAttachedEventReportServiceImplBase
implements GRPCHandler {
+ public SpanAttachedEventReportServiceHandler(ModuleManager moduleManager) {
+ }
+
+ @Override
+ public StreamObserver<SpanAttachedEvent> collect(StreamObserver<Commands>
responseObserver) {
+ return new StreamObserver<SpanAttachedEvent>() {
+ @Override
+ public void onNext(SpanAttachedEvent event) {
+ if (log.isDebugEnabled()) {
+ log.debug("receive span attached event is streaming");
+ }
+
+ final SpanAttachedEventRecord record = new
SpanAttachedEventRecord();
+ record.setStartTimeSecond(event.getStartTime().getSeconds());
+ record.setStartTimeNanos(event.getStartTime().getNanos());
+ record.setEvent(event.getEvent());
+ record.setEndTimeSecond(event.getEndTime().getSeconds());
+ record.setEndTimeNanos(event.getEndTime().getNanos());
+ record.setTraceRefType(event.getTraceContext().getTypeValue());
+ record.setTraceId(event.getTraceContext().getTraceId());
+
record.setTraceSegmentId(event.getTraceContext().getTraceSegmentId());
+ record.setTraceSpanId(event.getTraceContext().getSpanId());
+ record.setDataBinary(event.toByteArray());
+
record.setTimeBucket(TimeBucket.getMinuteTimeBucket(TimeUnit.SECONDS.toMillis(record.getStartTimeSecond())
+ +
TimeUnit.NANOSECONDS.toMillis(record.getStartTimeNanos())));
+
+ RecordStreamProcessor.getInstance().in(record);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ Status status = Status.fromThrowable(throwable);
+ if (Status.CANCELLED.getCode() == status.getCode()) {
+ if (log.isDebugEnabled()) {
+ log.debug(throwable.getMessage(), throwable);
+ }
+ return;
+ }
+ log.error(throwable.getMessage(), throwable);
+ }
+
+ @Override
+ public void onCompleted() {
+ responseObserver.onNext(Commands.newBuilder().build());
+ responseObserver.onCompleted();
+ }
+ };
+ }
+}
\ No newline at end of file
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
index 9d11ec2a45..b0555952cb 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java
@@ -43,6 +43,7 @@ import
org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IRecordsQueryDAO;
+import
org.apache.skywalking.oap.server.core.storage.query.ISpanAttachedEventQueryDAO;
import
org.apache.skywalking.oap.server.core.storage.query.ITagAutoCompleteQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
@@ -68,6 +69,7 @@ import
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBL
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBProfileTaskLogQueryDAO;
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBProfileTaskQueryDAO;
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBProfileThreadSnapshotQueryDAO;
+import
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBSpanAttachedEventQueryDAO;
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBStorageDAO;
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBTraceQueryDAO;
import
org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBZipkinQueryDAO;
@@ -152,6 +154,7 @@ public class BanyanDBStorageProvider extends ModuleProvider
{
this.registerServiceImplementation(IAggregationQueryDAO.class, new
BanyanDBAggregationQueryDAO(client));
this.registerServiceImplementation(IRecordsQueryDAO.class, new
BanyanDBRecordsQueryDAO(client));
this.registerServiceImplementation(IZipkinQueryDAO.class, new
BanyanDBZipkinQueryDAO(client));
+ this.registerServiceImplementation(ISpanAttachedEventQueryDAO.class,
new BanyanDBSpanAttachedEventQueryDAO(client));
}
@Override
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBSpanAttachedEventQueryDAO.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBSpanAttachedEventQueryDAO.java
new file mode 100644
index 0000000000..792b151be9
--- /dev/null
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/BanyanDBSpanAttachedEventQueryDAO.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.skywalking.banyandb.v1.client.AbstractQuery;
+import org.apache.skywalking.banyandb.v1.client.RowEntity;
+import org.apache.skywalking.banyandb.v1.client.StreamQuery;
+import org.apache.skywalking.banyandb.v1.client.StreamQueryResponse;
+import
org.apache.skywalking.oap.server.core.analysis.manual.spanattach.SpanAttachedEventRecord;
+import
org.apache.skywalking.oap.server.core.analysis.manual.spanattach.SpanAttachedEventTraceType;
+import
org.apache.skywalking.oap.server.core.storage.query.ISpanAttachedEventQueryDAO;
+import
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBConverter;
+import
org.apache.skywalking.oap.server.storage.plugin.banyandb.BanyanDBStorageClient;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+public class BanyanDBSpanAttachedEventQueryDAO extends AbstractBanyanDBDAO
implements ISpanAttachedEventQueryDAO {
+ private static final Set<String> TAGS =
ImmutableSet.of(SpanAttachedEventRecord.START_TIME_SECOND,
+ SpanAttachedEventRecord.START_TIME_NANOS,
+ SpanAttachedEventRecord.EVENT,
+ SpanAttachedEventRecord.END_TIME_SECOND,
+ SpanAttachedEventRecord.END_TIME_NANOS,
+ SpanAttachedEventRecord.TRACE_REF_TYPE,
+ SpanAttachedEventRecord.TRACE_ID,
+ SpanAttachedEventRecord.TRACE_SEGMENT_ID,
+ SpanAttachedEventRecord.TRACE_SPAN_ID,
+ SpanAttachedEventRecord.DATA_BINARY);
+
+ public BanyanDBSpanAttachedEventQueryDAO(BanyanDBStorageClient client) {
+ super(client);
+ }
+
+ @Override
+ public List<SpanAttachedEventRecord>
querySpanAttachedEvents(SpanAttachedEventTraceType type, String traceId) throws
IOException {
+ final StreamQueryResponse resp =
query(SpanAttachedEventRecord.INDEX_NAME, TAGS, new QueryBuilder<StreamQuery>()
{
+ @Override
+ protected void apply(StreamQuery query) {
+ query.and(eq(SpanAttachedEventRecord.TRACE_ID, traceId));
+ query.and(eq(SpanAttachedEventRecord.TRACE_REF_TYPE,
type.value()));
+ query.setOrderBy(new
StreamQuery.OrderBy(SpanAttachedEventRecord.START_TIME_SECOND,
AbstractQuery.Sort.ASC));
+ }
+ });
+
+ return
resp.getElements().stream().map(this::buildRecord).collect(Collectors.toList());
+ }
+
+ private SpanAttachedEventRecord buildRecord(RowEntity row) {
+ final SpanAttachedEventRecord.Builder builder = new
SpanAttachedEventRecord.Builder();
+ return builder.storage2Entity(new
BanyanDBConverter.StorageToStream(SpanAttachedEventRecord.INDEX_NAME, row));
+ }
+}
\ No newline at end of file
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
index a824abeb80..da52561c7d 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/StorageModuleElasticsearchProvider.java
@@ -46,6 +46,7 @@ import
org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
+import
org.apache.skywalking.oap.server.core.storage.query.ISpanAttachedEventQueryDAO;
import
org.apache.skywalking.oap.server.core.storage.query.ITagAutoCompleteQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IRecordsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
@@ -79,6 +80,7 @@ import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.Profi
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ProfileTaskQueryEsDAO;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ProfileThreadSnapshotQueryEsDAO;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.ServiceLabelEsDAO;
+import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.SpanAttachedEventEsDAO;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TagAutoCompleteQueryDAO;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.RecordsQueryEsDAO;
import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query.TopologyQueryEsDAO;
@@ -226,6 +228,8 @@ public class StorageModuleElasticsearchProvider extends
ModuleProvider {
ITagAutoCompleteQueryDAO.class, new
TagAutoCompleteQueryDAO(elasticSearchClient));
this.registerServiceImplementation(
IZipkinQueryDAO.class, new ZipkinQueryEsDAO(elasticSearchClient));
+ this.registerServiceImplementation(
+ ISpanAttachedEventQueryDAO.class, new
SpanAttachedEventEsDAO(elasticSearchClient, config));
IndexController.INSTANCE.setLogicSharding(config.isLogicSharding());
}
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/SpanAttachedEventEsDAO.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/SpanAttachedEventEsDAO.java
new file mode 100644
index 0000000000..4860235757
--- /dev/null
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/SpanAttachedEventEsDAO.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.elasticsearch.query;
+
+import
org.apache.skywalking.library.elasticsearch.requests.search.BoolQueryBuilder;
+import org.apache.skywalking.library.elasticsearch.requests.search.Query;
+import org.apache.skywalking.library.elasticsearch.requests.search.Search;
+import
org.apache.skywalking.library.elasticsearch.requests.search.SearchBuilder;
+import
org.apache.skywalking.library.elasticsearch.requests.search.SearchParams;
+import org.apache.skywalking.library.elasticsearch.requests.search.Sort;
+import org.apache.skywalking.library.elasticsearch.response.search.SearchHit;
+import
org.apache.skywalking.library.elasticsearch.response.search.SearchResponse;
+import
org.apache.skywalking.oap.server.core.analysis.manual.spanattach.SpanAttachedEventRecord;
+import
org.apache.skywalking.oap.server.core.analysis.manual.spanattach.SpanAttachedEventTraceType;
+import
org.apache.skywalking.oap.server.core.storage.query.ISpanAttachedEventQueryDAO;
+import
org.apache.skywalking.oap.server.library.client.elasticsearch.ElasticSearchClient;
+import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.StorageModuleElasticsearchConfig;
+import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.ElasticSearchConverter;
+import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.EsDAO;
+import
org.apache.skywalking.oap.server.storage.plugin.elasticsearch.base.IndexController;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class SpanAttachedEventEsDAO extends EsDAO implements
ISpanAttachedEventQueryDAO {
+ private final int scrollingBatchSize;
+
+ public SpanAttachedEventEsDAO(ElasticSearchClient client,
StorageModuleElasticsearchConfig config) {
+ super(client);
+ this.scrollingBatchSize = config.getProfileDataQueryBatchSize();
+ }
+
+ @Override
+ public List<SpanAttachedEventRecord>
querySpanAttachedEvents(SpanAttachedEventTraceType type, String traceId) throws
IOException {
+ final String index =
+
IndexController.LogicIndicesRegister.getPhysicalTableName(SpanAttachedEventRecord.INDEX_NAME);
+ final BoolQueryBuilder query = Query.bool();
+ if
(IndexController.LogicIndicesRegister.isPhysicalTable(SpanAttachedEventRecord.INDEX_NAME))
{
+
query.must(Query.term(IndexController.LogicIndicesRegister.RECORD_TABLE_NAME,
SpanAttachedEventRecord.INDEX_NAME));
+ }
+ final SearchBuilder search =
Search.builder().query(query).size(scrollingBatchSize);
+ query.must(Query.terms(SpanAttachedEventRecord.TRACE_ID, traceId));
+ query.must(Query.terms(SpanAttachedEventRecord.TRACE_REF_TYPE,
type.value()));
+ search.sort(SpanAttachedEventRecord.START_TIME_SECOND, Sort.Order.ASC);
+ search.sort(SpanAttachedEventRecord.START_TIME_NANOS, Sort.Order.ASC);
+
+ final SearchParams params = new
SearchParams().scroll(SCROLL_CONTEXT_RETENTION);
+ final List<SpanAttachedEventRecord> records = new ArrayList<>();
+
+ SearchResponse results = getClient().search(index, search.build(),
params);
+ final Set<String> scrollIds = new HashSet<>();
+ try {
+ while (true) {
+ final String scrollId = results.getScrollId();
+ scrollIds.add(scrollId);
+ if (results.getHits().getTotal() == 0) {
+ break;
+ }
+ final List<SpanAttachedEventRecord> batch =
buildDataList(results);
+ records.addAll(batch);
+ // The last iterate, there is no more data
+ if (batch.size() < scrollingBatchSize) {
+ break;
+ }
+ results = getClient().scroll(SCROLL_CONTEXT_RETENTION,
scrollId);
+ }
+ } finally {
+ scrollIds.forEach(getClient()::deleteScrollContextQuietly);
+ }
+ return records;
+ }
+
+ private List<SpanAttachedEventRecord> buildDataList(SearchResponse
response) {
+ final ArrayList<SpanAttachedEventRecord> records = new ArrayList<>();
+ for (SearchHit hit : response.getHits()) {
+ final Map<String, Object> sourceAsMap = hit.getSource();
+ final SpanAttachedEventRecord.Builder builder = new
SpanAttachedEventRecord.Builder();
+ records.add(builder.storage2Entity(new
ElasticSearchConverter.ToEntity(SpanAttachedEventRecord.INDEX_NAME,
sourceAsMap)));
+ }
+ return records;
+ }
+}
\ No newline at end of file
diff --git
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/JDBCStorageProvider.java
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/JDBCStorageProvider.java
index 9924807974..8a8a10815c 100644
---
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/JDBCStorageProvider.java
+++
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/JDBCStorageProvider.java
@@ -43,6 +43,7 @@ import
org.apache.skywalking.oap.server.core.storage.query.IEventQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ILogQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IMetricsQueryDAO;
+import
org.apache.skywalking.oap.server.core.storage.query.ISpanAttachedEventQueryDAO;
import
org.apache.skywalking.oap.server.core.storage.query.ITagAutoCompleteQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.IRecordsQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITopologyQueryDAO;
@@ -71,6 +72,7 @@ import
org.apache.skywalking.oap.server.storage.plugin.jdbc.common.dao.JDBCProfi
import
org.apache.skywalking.oap.server.storage.plugin.jdbc.common.dao.JDBCProfileThreadSnapshotQueryDAO;
import
org.apache.skywalking.oap.server.storage.plugin.jdbc.common.dao.JDBCRecordsQueryDAO;
import
org.apache.skywalking.oap.server.storage.plugin.jdbc.common.dao.JDBCServiceLabelQueryDAO;
+import
org.apache.skywalking.oap.server.storage.plugin.jdbc.common.dao.JDBCSpanAttachedEventQueryDAO;
import
org.apache.skywalking.oap.server.storage.plugin.jdbc.common.dao.JDBCStorageDAO;
import
org.apache.skywalking.oap.server.storage.plugin.jdbc.common.dao.JDBCTagAutoCompleteQueryDAO;
import
org.apache.skywalking.oap.server.storage.plugin.jdbc.common.dao.JDBCTopologyQueryDAO;
@@ -202,6 +204,9 @@ public abstract class JDBCStorageProvider extends
ModuleProvider {
this.registerServiceImplementation(
IZipkinQueryDAO.class,
new JDBCZipkinQueryDAO(jdbcClient));
+ this.registerServiceImplementation(
+ ISpanAttachedEventQueryDAO.class,
+ new JDBCSpanAttachedEventQueryDAO(jdbcClient));
}
@Override
diff --git
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCSpanAttachedEventQueryDAO.java
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCSpanAttachedEventQueryDAO.java
new file mode 100644
index 0000000000..ec47fcf8cf
--- /dev/null
+++
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCSpanAttachedEventQueryDAO.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.storage.plugin.jdbc.common.dao;
+
+import lombok.RequiredArgsConstructor;
+import
org.apache.skywalking.oap.server.core.analysis.manual.spanattach.SpanAttachedEventRecord;
+import
org.apache.skywalking.oap.server.core.analysis.manual.spanattach.SpanAttachedEventTraceType;
+import
org.apache.skywalking.oap.server.core.storage.query.ISpanAttachedEventQueryDAO;
+import
org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+@RequiredArgsConstructor
+public class JDBCSpanAttachedEventQueryDAO implements
ISpanAttachedEventQueryDAO {
+ private final JDBCHikariCPClient jdbcClient;
+
+ @Override
+ public List<SpanAttachedEventRecord>
querySpanAttachedEvents(SpanAttachedEventTraceType type, String traceId) throws
IOException {
+ StringBuilder sql = new StringBuilder("select * from " +
SpanAttachedEventRecord.INDEX_NAME + " where ");
+ List<Object> parameters = new ArrayList<>(2);
+
+ sql.append(" ").append(SpanAttachedEventRecord.TRACE_ID).append(" =
?");
+ parameters.add(traceId);
+ sql.append(" and
").append(SpanAttachedEventRecord.TRACE_REF_TYPE).append(" = ?");
+ parameters.add(type.value());
+
+ sql.append(" order by
").append(SpanAttachedEventRecord.START_TIME_SECOND)
+
.append(",").append(SpanAttachedEventRecord.START_TIME_NANOS).append(" ASC ");
+
+ List<SpanAttachedEventRecord> results = new ArrayList<>();
+ try (Connection connection = jdbcClient.getConnection()) {
+ try (ResultSet resultSet = jdbcClient.executeQuery(
+ connection, sql.toString(), parameters.toArray(new
Object[0]))) {
+ while (resultSet.next()) {
+ SpanAttachedEventRecord record = new
SpanAttachedEventRecord();
+
record.setStartTimeSecond(resultSet.getLong(SpanAttachedEventRecord.START_TIME_SECOND));
+
record.setStartTimeNanos(resultSet.getInt(SpanAttachedEventRecord.START_TIME_NANOS));
+
record.setEvent(resultSet.getString(SpanAttachedEventRecord.EVENT));
+
record.setEndTimeSecond(resultSet.getLong(SpanAttachedEventRecord.END_TIME_SECOND));
+
record.setEndTimeNanos(resultSet.getInt(SpanAttachedEventRecord.END_TIME_NANOS));
+
record.setTraceRefType(resultSet.getInt(SpanAttachedEventRecord.TRACE_REF_TYPE));
+
record.setTraceId(resultSet.getString(SpanAttachedEventRecord.TRACE_ID));
+
record.setTraceSegmentId(resultSet.getString(SpanAttachedEventRecord.TRACE_SEGMENT_ID));
+
record.setTraceSpanId(resultSet.getString(SpanAttachedEventRecord.TRACE_SPAN_ID));
+ String dataBinaryBase64 =
resultSet.getString(SpanAttachedEventRecord.DATA_BINARY);
+ if (StringUtil.isNotEmpty(dataBinaryBase64)) {
+
record.setDataBinary(Base64.getDecoder().decode(dataBinaryBase64));
+ }
+ results.add(record);
+ }
+ }
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
+
+ return results;
+ }
+}
diff --git
a/oap-server/server-tools/profile-exporter/tool-profile-snapshot-bootstrap/src/test/java/org/apache/skywalking/oap/server/tool/profile/exporter/test/ProfileSnapshotExporterTest.java
b/oap-server/server-tools/profile-exporter/tool-profile-snapshot-bootstrap/src/test/java/org/apache/skywalking/oap/server/tool/profile/exporter/test/ProfileSnapshotExporterTest.java
index 5fff3b2700..e1200cc3ed 100644
---
a/oap-server/server-tools/profile-exporter/tool-profile-snapshot-bootstrap/src/test/java/org/apache/skywalking/oap/server/tool/profile/exporter/test/ProfileSnapshotExporterTest.java
+++
b/oap-server/server-tools/profile-exporter/tool-profile-snapshot-bootstrap/src/test/java/org/apache/skywalking/oap/server/tool/profile/exporter/test/ProfileSnapshotExporterTest.java
@@ -34,6 +34,7 @@ import
org.apache.skywalking.oap.server.core.query.TraceQueryService;
import
org.apache.skywalking.oap.server.core.query.type.ProfileAnalyzeTimeRange;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import
org.apache.skywalking.oap.server.core.storage.profiling.trace.IProfileThreadSnapshotQueryDAO;
+import
org.apache.skywalking.oap.server.core.storage.query.ISpanAttachedEventQueryDAO;
import org.apache.skywalking.oap.server.core.storage.query.ITraceQueryDAO;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.util.ResourceUtils;
@@ -86,6 +87,7 @@ public class ProfileSnapshotExporterTest {
Mockito.when(moduleProvider.getService(IProfileThreadSnapshotQueryDAO.class))
.thenReturn(new ProfileExportSnapshotDAO(exportedData));
Mockito.when(moduleProvider.getService(ITraceQueryDAO.class)).thenReturn(new
ProfileTraceDAO(exportedData));
+
Mockito.when(moduleProvider.getService(ISpanAttachedEventQueryDAO.class)).thenReturn(new
SpanAttachedEventQueryDAO());
}
@Test
diff --git
a/oap-server/server-tools/profile-exporter/tool-profile-snapshot-bootstrap/src/test/java/org/apache/skywalking/oap/server/tool/profile/exporter/test/SpanAttachedEventQueryDAO.java
b/oap-server/server-tools/profile-exporter/tool-profile-snapshot-bootstrap/src/test/java/org/apache/skywalking/oap/server/tool/profile/exporter/test/SpanAttachedEventQueryDAO.java
new file mode 100644
index 0000000000..fa3e4c8dcb
--- /dev/null
+++
b/oap-server/server-tools/profile-exporter/tool-profile-snapshot-bootstrap/src/test/java/org/apache/skywalking/oap/server/tool/profile/exporter/test/SpanAttachedEventQueryDAO.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.tool.profile.exporter.test;
+
+import
org.apache.skywalking.oap.server.core.analysis.manual.spanattach.SpanAttachedEventRecord;
+import
org.apache.skywalking.oap.server.core.analysis.manual.spanattach.SpanAttachedEventTraceType;
+import
org.apache.skywalking.oap.server.core.storage.query.ISpanAttachedEventQueryDAO;
+
+import java.io.IOException;
+import java.util.List;
+
+public class SpanAttachedEventQueryDAO implements ISpanAttachedEventQueryDAO {
+ @Override
+ public List<SpanAttachedEventRecord>
querySpanAttachedEvents(SpanAttachedEventTraceType type, String traceId) throws
IOException {
+ return null;
+ }
+}
\ No newline at end of file