This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new f2fd64bdf8 Adjust the span match rule of the attached event (#10069)
f2fd64bdf8 is described below

commit f2fd64bdf8ae8d739989bbc68eb35df5990e91af
Author: mrproliu <[email protected]>
AuthorDate: Thu Dec 1 21:22:17 2022 +0800

    Adjust the span match rule of the attached event (#10069)
---
 .github/workflows/skywalking.yaml                  |  8 ++-
 .../oap/server/core/query/TraceQueryService.java   | 48 +++++++++++------
 .../query/zipkin/handler/ZipkinQueryHandler.java   | 60 ++++++++++++++--------
 skywalking-ui                                      |  2 +-
 4 files changed, 80 insertions(+), 38 deletions(-)

diff --git a/.github/workflows/skywalking.yaml b/.github/workflows/skywalking.yaml
index 60145b497d..cc3075fff2 100644
--- a/.github/workflows/skywalking.yaml
+++ b/.github/workflows/skywalking.yaml
@@ -312,7 +312,7 @@ jobs:
      ((github.event_name == 'schedule' && github.repository == 'apache/skywalking') || needs.changes.outputs.oap == 'true')
     name: E2E test
     needs: [docker]
-    runs-on: ubuntu-latest
+    runs-on: ${{ matrix.test.runs-on || 'ubuntu-latest' }}
     timeout-minutes: 60
     strategy:
       fail-fast: false
@@ -517,12 +517,15 @@ jobs:
             env: OPENSEARCH_VERSION=2.4.0
           - name: eBPF Profiling Off CPU
             config: test/e2e-v2/cases/profiling/ebpf/offcpu/e2e.yaml
+            runs-on: ubuntu-20.04
           - name: eBPF Profiling Network
             config: test/e2e-v2/cases/profiling/ebpf/network/e2e.yaml
             env: ISTIO_VERSION=1.13.1
+            runs-on: ubuntu-20.04
           - name: eBPF Profiling Network ES Sharding
             config: test/e2e-v2/cases/profiling/ebpf/network/es-sharding/e2e.yaml
             env: ISTIO_VERSION=1.13.1
+            runs-on: ubuntu-20.04
 
           - name: Kafka Basic
             config: test/e2e-v2/cases/kafka/simple-so11y/e2e.yaml
@@ -602,6 +605,7 @@ jobs:
           - name: Rover with Istio Process 1.13.1
             config: test/e2e-v2/cases/rover/process/istio/e2e.yaml
             env: ISTIO_VERSION=1.13.1
+            runs-on: ubuntu-20.04
 
           - name: Zipkin ES
             config: test/e2e-v2/cases/zipkin/es/e2e.yaml
@@ -668,7 +672,7 @@ jobs:
      ((github.event_name == 'schedule' && github.repository == 'apache/skywalking') || needs.changes.outputs.oap == 'true')
     name: E2E test
     needs: [docker]
-    runs-on: ubuntu-latest
+    runs-on: ubuntu-20.04
     timeout-minutes: 60
     strategy:
       fail-fast: false
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java
index f840921f06..16b06ef18d 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/TraceQueryService.java
@@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.core.query;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
@@ -276,29 +277,46 @@ public class TraceQueryService implements Service {
             return;
         }
 
+        // sort by start time
+        events.sort((e1, e2) -> {
+            final int second = Long.compare(e1.getStartTimeSecond(), 
e2.getStartTimeSecond());
+            if (second == 0) {
+                return Long.compare(e1.getStartTimeNanos(), 
e2.getStartTimeNanos());
+            }
+            return second;
+        });
+
+        final HashMap<String, Span> spanMatcher = new HashMap<>();
         for (SpanAttachedEventRecord record : events) {
             if (!StringUtils.isNumeric(record.getTraceSpanId())) {
                 continue;
             }
+            SpanAttachedEvent event = 
SpanAttachedEvent.parseFrom(record.getDataBinary());
+            final String spanMatcherKey = record.getTraceSegmentId() + "_" + 
record.getTraceSpanId();
+            Span span = spanMatcher.get(spanMatcherKey);
+            if (span == null) {
+                // find the matches span
+                final int eventSpanId = 
Integer.parseInt(record.getTraceSpanId());
+                span = spans.stream().filter(s -> 
Objects.equals(s.getSegmentId(), record.getTraceSegmentId()) &&
+                    (s.getSpanId() == eventSpanId)).findFirst().orElse(null);
+                if (span == null) {
+                    continue;
+                }
 
-            // find matches span
-            final int eventSpanId = Integer.parseInt(record.getTraceSpanId());
-            Span matchesSpan = spans.stream().filter(s -> 
Objects.equals(s.getSegmentId(), record.getTraceSegmentId()) &&
-                Objects.equals(s.getSpanId(), 
eventSpanId)).findFirst().orElse(null);
-            if (matchesSpan == null) {
-                continue;
-            }
+                // if the event is server side, then needs to change to the upstream span
+                final String direction = 
getSpanAttachedEventTagValue(event.getTagsList(), "data_direction");
+                final String type = 
getSpanAttachedEventTagValue(event.getTagsList(), "data_type");
 
-            // find the first entry span of upstream if the event from the upstream
-            SpanAttachedEvent event = 
SpanAttachedEvent.parseFrom(record.getDataBinary());
-            final String bindToTheUpstreamEntrySpan = 
getSpanAttachedEventTagValue(event.getTagsList(), "bind to upstream span");
-            if (Objects.equals(bindToTheUpstreamEntrySpan, "true")) {
-                final String parentSpanId = matchesSpan.getSegmentSpanId();
-                matchesSpan = spans.stream().filter(s -> 
s.getSegmentParentSpanId().equals(parentSpanId)
-                    && Objects.equals(s.getType(), 
SpanType.Entry.name())).findFirst().orElse(matchesSpan);
+                if (("request".equals(type) && "inbound".equals(direction)) || 
("response".equals(type) && "outbound".equals(direction))) {
+                    final String parentSpanId = span.getSegmentSpanId();
+                    span = spans.stream().filter(s -> 
s.getSegmentParentSpanId().equals(parentSpanId)
+                        && Objects.equals(s.getType(), 
SpanType.Entry.name())).findFirst().orElse(span);
+                }
+
+                spanMatcher.put(spanMatcherKey, span);
             }
 
-            matchesSpan.getAttachedEvents().add(parseEvent(event));
+            span.getAttachedEvents().add(parseEvent(event));
         }
     }
 
diff --git a/oap-server/server-query-plugin/zipkin-query-plugin/src/main/java/org/apache/skywalking/oap/query/zipkin/handler/ZipkinQueryHandler.java b/oap-server/server-query-plugin/zipkin-query-plugin/src/main/java/org/apache/skywalking/oap/query/zipkin/handler/ZipkinQueryHandler.java
index 3c09389482..ebc188d595 100644
--- a/oap-server/server-query-plugin/zipkin-query-plugin/src/main/java/org/apache/skywalking/oap/query/zipkin/handler/ZipkinQueryHandler.java
+++ b/oap-server/server-query-plugin/zipkin-query-plugin/src/main/java/org/apache/skywalking/oap/query/zipkin/handler/ZipkinQueryHandler.java
@@ -311,34 +311,54 @@ public class ZipkinQueryHandler {
 
         final List<Tuple2<Integer, Span>> spanWithIndex = IntStream.range(0, 
spans.size()).mapToObj(i -> Tuple.of(i, 
spans.get(i))).collect(Collectors.toList());
 
-        final Map<String, List<SpanAttachedEventRecord>> namedEvents = 
events.stream().collect(Collectors.groupingBy(SpanAttachedEventRecord::getEvent,
 Collectors.toList()));
-        final Map<Integer, Span.Builder> spanCache = new HashMap<>();
-        for (Map.Entry<String, List<SpanAttachedEventRecord>> entry : 
namedEvents.entrySet()) {
-            for (int i = 1; i <= entry.getValue().size(); i++) {
-                final SpanAttachedEventRecord record = entry.getValue().get(i 
- 1);
-                String eventName = record.getEvent() + 
(entry.getValue().size() == 1 ? "" : "-" + i);
-                Tuple2<Integer, Span> matchesSpan = 
spanWithIndex.stream().filter(s -> Objects.equals(s._2.id(), 
record.getTraceSpanId())).
-                    findFirst().orElse(null);
-                if (matchesSpan == null) {
-                    continue;
-                }
+        // sort by start time
+        events.sort((e1, e2) -> {
+            final int second = Long.compare(e1.getStartTimeSecond(), 
e2.getStartTimeSecond());
+            if (second == 0) {
+                return Long.compare(e1.getStartTimeNanos(), 
e2.getStartTimeNanos());
+            }
+            return second;
+        });
 
+        final Map<String, List<SpanAttachedEventRecord>> namedEvents = 
events.stream()
+            .collect(Collectors.groupingBy(SpanAttachedEventRecord::getEvent, 
Collectors.toList()));
+
+        final Map<String, Tuple2<Span.Builder, Integer>> spanCache = new 
HashMap<>();
+        for (Map.Entry<String, List<SpanAttachedEventRecord>> namedEntry : 
namedEvents.entrySet()) {
+            for (int i = 1; i <= namedEntry.getValue().size(); i++) {
+                final SpanAttachedEventRecord record = 
namedEntry.getValue().get(i - 1);
+                String eventName = record.getEvent() + 
(namedEntry.getValue().size() == 1 ? "" : "-" + i);
                 final SpanAttachedEvent event = 
SpanAttachedEvent.parseFrom(record.getDataBinary());
-                final String bindToTheUpstreamEntrySpan = 
getSpanAttachedEventTagValue(event.getTagsList(), "bind to upstream span");
-                if (Objects.equals(bindToTheUpstreamEntrySpan, "true")) {
-                    final String parentSpanId = matchesSpan._2.id();
-                    matchesSpan = spanWithIndex.stream().filter(s -> 
s._2.parentId().equals(parentSpanId)
-                        && Objects.equals(s._2.kind(), 
Span.Kind.SERVER)).findFirst().orElse(matchesSpan);
+
+                // find matched span
+                Tuple2<Span.Builder, Integer> spanBuilder = 
spanCache.get(record.getTraceSpanId());
+                if (spanBuilder == null) {
+                    Tuple2<Integer, Span> matchesSpan = 
spanWithIndex.stream().filter(s -> Objects.equals(s._2.id(), 
record.getTraceSpanId())).
+                        findFirst().orElse(null);
+                    if (matchesSpan == null) {
+                        continue;
+                    }
+
+                    // if the event is server side, then needs to change to the upstream span
+                    final String direction = 
getSpanAttachedEventTagValue(event.getTagsList(), "data_direction");
+                    final String type = 
getSpanAttachedEventTagValue(event.getTagsList(), "data_type");
+                    if (("request".equals(type) && 
"inbound".equals(direction)) || ("response".equals(type) && 
"outbound".equals(direction))) {
+                        final String parentSpanId = matchesSpan._2.id();
+                        matchesSpan = spanWithIndex.stream().filter(s -> 
Objects.equals(s._2.parentId(), parentSpanId)
+                            && Objects.equals(s._2.kind(), 
Span.Kind.SERVER)).findFirst().orElse(matchesSpan);
+                    }
+
+                    spanBuilder = Tuple.of(matchesSpan._2.toBuilder(), 
matchesSpan._1);
+                    spanCache.put(record.getTraceSpanId(), spanBuilder);
                 }
 
-                final Span.Builder builder = 
spanCache.computeIfAbsent(matchesSpan._1, idx -> spans.get(idx).toBuilder());
-                appendEvent(builder, eventName, event);
+                appendEvent(spanBuilder._1, eventName, event);
             }
         }
 
         // re-build modified spans
-        for (Map.Entry<Integer, Span.Builder> entry : spanCache.entrySet()) {
-            spans.set(entry.getKey(), entry.getValue().build());
+        for (Map.Entry<String, Tuple2<Span.Builder, Integer>> entry : 
spanCache.entrySet()) {
+            spans.set(entry.getValue()._2, entry.getValue()._1.build());
         }
     }
 
diff --git a/skywalking-ui b/skywalking-ui
index 221751f034..aab44626da 160000
--- a/skywalking-ui
+++ b/skywalking-ui
@@ -1 +1 @@
-Subproject commit 221751f034763d6f56ab7b4cb154abfa890f370c
+Subproject commit aab44626da02576a48b344fb3078f336e8449c30

Reply via email to