kezhenxu94 commented on a change in pull request #8696:
URL: https://github.com/apache/skywalking/pull/8696#discussion_r830482066



##########
File path: 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/analyze/EBPFProfilingAnalyzer.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.profiling.ebpf.analyze;
+
+import lombok.extern.slf4j.Slf4j;
+import 
org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingDataRecord;
+import 
org.apache.skywalking.oap.server.core.query.type.EBPFProfilingAnalyzation;
+import 
org.apache.skywalking.oap.server.core.query.type.EBPFProfilingAnalyzeTimeRange;
+import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingTree;
+import org.apache.skywalking.oap.server.core.storage.StorageModule;
+import 
org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingDataDAO;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * eBPF Profiling Analyzer working on data query and combine them for generate 
the Flame Graph.
+ */
+@Slf4j
+public class EBPFProfilingAnalyzer {
+
+    private static final EBPFProfilingAnalyzeCollector ANALYZE_COLLECTOR = new 
EBPFProfilingAnalyzeCollector();
+    private static final Long FETCH_FETCH_DURATION = 
TimeUnit.MINUTES.toMillis(2);
+
+    private final ModuleManager moduleManager;
+    protected IEBPFProfilingDataDAO dataDAO;
+    private long maxAnalyzeTimeRangeInMillisecond;
+
+    public EBPFProfilingAnalyzer(ModuleManager moduleManager, int 
maxDurationOfAnalysisInMinute) {
+        this.moduleManager = moduleManager;
+        this.maxAnalyzeTimeRangeInMillisecond = 
TimeUnit.MINUTES.toMillis(maxDurationOfAnalysisInMinute);
+    }
+
+    /**
+     * search data and analyze
+     */
+    public EBPFProfilingAnalyzation analyze(String taskId, 
List<EBPFProfilingAnalyzeTimeRange> ranges) throws IOException {
+        EBPFProfilingAnalyzation analyzation = new EBPFProfilingAnalyzation();
+
+        String timeRangeValidate = validateIsOutOfTimeRangeLimit(ranges);
+        if (StringUtil.isNotEmpty(timeRangeValidate)) {
+            analyzation.setTip(timeRangeValidate);
+            return analyzation;
+        }
+
+        // query data
+        final Stream<EBPFProfilingStack> stackStream = 
buildTimeRanges(ranges).parallelStream().map(r -> {
+            try {
+                return getDataDAO().queryData(taskId, r.minTime, r.maxTime);
+            } catch (IOException e) {
+                log.warn(e.getMessage(), e);
+                return Collections.<EBPFProfilingDataRecord>emptyList();
+            }
+        }).flatMap(Collection::stream).map(e -> {
+            try {
+                return EBPFProfilingStack.deserialize(e);
+            } catch (Exception ex) {
+                log.warn("could not deserialize the stack", ex);
+                return null;
+            }
+        }).filter(Objects::nonNull).distinct();
+
+        // analyze tree
+        generateTrees(analyzation, stackStream);
+
+        return analyzation;
+    }
+
+    private String 
validateIsOutOfTimeRangeLimit(List<EBPFProfilingAnalyzeTimeRange> timeRanges) {
+        if (CollectionUtils.isEmpty(timeRanges)) {
+            return "please provide time ranges";
+        }
+
+        long totalDuration = 0;
+        for (EBPFProfilingAnalyzeTimeRange timeRange : timeRanges) {
+            final long duration = timeRange.getEnd() - timeRange.getStart();
+            if (duration <= 0) {
+                return "please validate the time duration data";
+            }
+            totalDuration += duration;
+        }
+
+        if (totalDuration > maxAnalyzeTimeRangeInMillisecond) {
+            return "time range is out of " +
+                    
TimeUnit.MILLISECONDS.toMinutes(this.maxAnalyzeTimeRangeInMillisecond) + " 
minute";
+        }
+        return null;
+    }
+
+    public void generateTrees(EBPFProfilingAnalyzation analyzation, 
Stream<EBPFProfilingStack> stackStream) {
+        Collection<EBPFProfilingTree> stackTrees = stackStream
+                // stack list cannot be empty
+                .filter(s -> CollectionUtils.isNotEmpty(s.getSymbols()))
+                // analyze the symbol and combine as trees
+                .collect(Collectors.groupingBy(s -> s.getSymbols()
+                        .get(0), ANALYZE_COLLECTOR)).values();
+
+        analyzation.getTrees().addAll(stackTrees);
+    }
+
+    protected List<TimeRange> 
buildTimeRanges(List<EBPFProfilingAnalyzeTimeRange> timeRanges) {
+        return timeRanges.parallelStream()
+                .map(r -> buildTimeRanges(r.getStart(), r.getEnd()))
+                .filter(Objects::nonNull)
+                .flatMap(Collection::stream)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Split time ranges to insure the start time and end time is small then 
{@link #FETCH_FETCH_DURATION}
+     */
+    protected List<TimeRange> buildTimeRanges(long start, long end) {
+        if (start >= end) {
+            return null;
+        }
+
+        // include latest millisecond
+        end += 1;
+
+        final List<TimeRange> timeRanges = new ArrayList<>();
+        do {
+            long batchEnd = Math.min(start + FETCH_FETCH_DURATION, end);
+            timeRanges.add(new TimeRange(start, batchEnd));
+            start = batchEnd;
+        }
+        while (start < end);
+
+        return timeRanges;
+    }
+
+    protected IEBPFProfilingDataDAO getDataDAO() {
+        if (dataDAO == null) {
+            dataDAO = moduleManager.find(StorageModule.NAME)
+                                                         .provider()

Review comment:
       This code is not formatted correctly.

##########
File path: 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/analyze/EBPFProfilingAnalyzer.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.profiling.ebpf.analyze;
+
+import lombok.extern.slf4j.Slf4j;
+import 
org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingDataRecord;
+import 
org.apache.skywalking.oap.server.core.query.type.EBPFProfilingAnalyzation;
+import 
org.apache.skywalking.oap.server.core.query.type.EBPFProfilingAnalyzeTimeRange;
+import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingTree;
+import org.apache.skywalking.oap.server.core.storage.StorageModule;
+import 
org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingDataDAO;
+import org.apache.skywalking.oap.server.library.module.ModuleManager;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.library.util.StringUtil;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * eBPF Profiling Analyzer working on data query and combine them for generate 
the Flame Graph.
+ */
+@Slf4j
+public class EBPFProfilingAnalyzer {
+
+    private static final EBPFProfilingAnalyzeCollector ANALYZE_COLLECTOR = new 
EBPFProfilingAnalyzeCollector();
+    private static final Long FETCH_FETCH_DURATION = 
TimeUnit.MINUTES.toMillis(2);
+
+    private final ModuleManager moduleManager;
+    protected IEBPFProfilingDataDAO dataDAO;
+    private long maxAnalyzeTimeRangeInMillisecond;
+
+    public EBPFProfilingAnalyzer(ModuleManager moduleManager, int 
maxDurationOfAnalysisInMinute) {
+        this.moduleManager = moduleManager;
+        this.maxAnalyzeTimeRangeInMillisecond = 
TimeUnit.MINUTES.toMillis(maxDurationOfAnalysisInMinute);
+    }
+
+    /**
+     * search data and analyze
+     */
+    public EBPFProfilingAnalyzation analyze(String taskId, 
List<EBPFProfilingAnalyzeTimeRange> ranges) throws IOException {
+        EBPFProfilingAnalyzation analyzation = new EBPFProfilingAnalyzation();
+
+        String timeRangeValidate = validateIsOutOfTimeRangeLimit(ranges);
+        if (StringUtil.isNotEmpty(timeRangeValidate)) {
+            analyzation.setTip(timeRangeValidate);
+            return analyzation;
+        }
+
+        // query data
+        final Stream<EBPFProfilingStack> stackStream = 
buildTimeRanges(ranges).parallelStream().map(r -> {
+            try {
+                return getDataDAO().queryData(taskId, r.minTime, r.maxTime);
+            } catch (IOException e) {
+                log.warn(e.getMessage(), e);
+                return Collections.<EBPFProfilingDataRecord>emptyList();
+            }
+        }).flatMap(Collection::stream).map(e -> {
+            try {
+                return EBPFProfilingStack.deserialize(e);
+            } catch (Exception ex) {
+                log.warn("could not deserialize the stack", ex);
+                return null;
+            }
+        }).filter(Objects::nonNull).distinct();
+
+        // analyze tree
+        generateTrees(analyzation, stackStream);
+
+        return analyzation;
+    }
+
+    private String 
validateIsOutOfTimeRangeLimit(List<EBPFProfilingAnalyzeTimeRange> timeRanges) {
+        if (CollectionUtils.isEmpty(timeRanges)) {
+            return "please provide time ranges";
+        }
+
+        long totalDuration = 0;
+        for (EBPFProfilingAnalyzeTimeRange timeRange : timeRanges) {
+            final long duration = timeRange.getEnd() - timeRange.getStart();
+            if (duration <= 0) {
+                return "please validate the time duration data";
+            }
+            totalDuration += duration;
+        }
+
+        if (totalDuration > maxAnalyzeTimeRangeInMillisecond) {
+            return "time range is out of " +
+                    
TimeUnit.MILLISECONDS.toMinutes(this.maxAnalyzeTimeRangeInMillisecond) + " 
minute";
+        }
+        return null;
+    }
+
+    public void generateTrees(EBPFProfilingAnalyzation analyzation, 
Stream<EBPFProfilingStack> stackStream) {
+        Collection<EBPFProfilingTree> stackTrees = stackStream
+                // stack list cannot be empty
+                .filter(s -> CollectionUtils.isNotEmpty(s.getSymbols()))
+                // analyze the symbol and combine as trees
+                .collect(Collectors.groupingBy(s -> s.getSymbols()
+                        .get(0), ANALYZE_COLLECTOR)).values();
+
+        analyzation.getTrees().addAll(stackTrees);
+    }
+
+    protected List<TimeRange> 
buildTimeRanges(List<EBPFProfilingAnalyzeTimeRange> timeRanges) {
+        return timeRanges.parallelStream()
+                .map(r -> buildTimeRanges(r.getStart(), r.getEnd()))
+                .filter(Objects::nonNull)
+                .flatMap(Collection::stream)
+                .collect(Collectors.toList());
+    }
+
+    /**
+     * Split time ranges to insure the start time and end time is small then 
{@link #FETCH_FETCH_DURATION}
+     */
+    protected List<TimeRange> buildTimeRanges(long start, long end) {
+        if (start >= end) {
+            return null;
+        }
+
+        // include latest millisecond
+        end += 1;
+
+        final List<TimeRange> timeRanges = new ArrayList<>();
+        do {
+            long batchEnd = Math.min(start + FETCH_FETCH_DURATION, end);
+            timeRanges.add(new TimeRange(start, batchEnd));
+            start = batchEnd;
+        }
+        while (start < end);
+
+        return timeRanges;
+    }
+
+    protected IEBPFProfilingDataDAO getDataDAO() {
+        if (dataDAO == null) {
+            dataDAO = moduleManager.find(StorageModule.NAME)
+                                                         .provider()
+                                                         
.getService(IEBPFProfilingDataDAO.class);
+        }
+        return dataDAO;
+    }
+
+    private static class TimeRange {
+        private long minTime;

Review comment:
       These fields can be `final`, and the class can use `@RequiredArgsConstructor`.

##########
File path: 
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/analyze/EBPFProfilingStackNode.java
##########
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.oap.server.core.profiling.ebpf.analyze;
+
+import com.google.common.base.Objects;
+import 
org.apache.skywalking.oap.server.core.query.type.EBPFProfilingStackElement;
+import org.apache.skywalking.oap.server.core.query.type.EBPFProfilingTree;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.function.Consumer;
+
+/**
+ * EBPF profiling data analyze intermediate state data
+ */
+public class EBPFProfilingStackNode {
+
+    private EBPFProfilingStack.Symbol codeSignature;
+    private List<EBPFProfilingStackNode> children;
+    private long dumpCount;
+
+    /**
+     * create new empty, un-init node
+     */
+    public static EBPFProfilingStackNode newNode() {
+        EBPFProfilingStackNode emptyNode = new EBPFProfilingStackNode();
+        emptyNode.children = new ArrayList<>();
+        return emptyNode;
+    }
+
+    /**
+     * accumulate {@link EBPFProfilingStack} to this tree, it will invoke on 
the tree root node
+     */
+    public void accumulateFrom(EBPFProfilingStack stack) {
+        List<EBPFProfilingStack.Symbol> stackList = stack.getSymbols();
+        if (codeSignature == null) {
+            codeSignature = stackList.get(0);
+        }
+        // add detected stack
+        this.detectedBy(stack);
+
+        // handle stack children
+        EBPFProfilingStackNode parent = this;
+        for (int depth = 1; depth < stackList.size(); depth++) {
+            EBPFProfilingStack.Symbol elementCodeSignature = 
stackList.get(depth);
+
+            // find same code signature children
+            EBPFProfilingStackNode childElement = null;
+            for (EBPFProfilingStackNode child : parent.children) {
+                if (Objects.equal(child.codeSignature, elementCodeSignature)) {
+                    childElement = child;
+                    break;
+                }
+            }
+
+            if (childElement != null) {
+                // add detected stack
+                childElement.detectedBy(stack);
+                parent = childElement;
+            } else {
+                // add children
+                EBPFProfilingStackNode childNode = newNode();
+                childNode.codeSignature = elementCodeSignature;
+                childNode.detectedBy(stack);
+
+                parent.children.add(childNode);
+                parent = childNode;
+            }
+        }
+    }
+
+    /**
+     * combine from other {@link EBPFProfilingStackNode}
+     */
+    public EBPFProfilingStackNode combine(EBPFProfilingStackNode node) {
+        // combine this node
+        this.combineDetectedStacks(node);
+
+        // merge tree using LDR to traversal tree node
+        // using stack to avoid recursion
+        // merge key.children <- value.children
+        LinkedList<Pair<EBPFProfilingStackNode, EBPFProfilingStackNode>> stack 
= new LinkedList<>();
+        stack.add(new Pair<>(this, node));
+        while (!stack.isEmpty()) {
+            Pair<EBPFProfilingStackNode, EBPFProfilingStackNode> 
needCombineNode = stack.pop();
+
+            // merge value children to key
+            // add to stack if need to keep traversal
+            combineChildrenNodes(needCombineNode.key, needCombineNode.value, 
stack::add);
+        }
+
+        return this;
+    }
+
+    /**
+     * merge all children nodes to appoint node
+     */
+    private void combineChildrenNodes(EBPFProfilingStackNode targetNode, 
EBPFProfilingStackNode beingMergedNode,
+                                      Consumer<Pair<EBPFProfilingStackNode, 
EBPFProfilingStackNode>> continueChildrenMerging) {
+        if (beingMergedNode.children.isEmpty()) {
+            return;
+        }
+
+        for (EBPFProfilingStackNode childrenNode : targetNode.children) {
+            // find node from being merged node children
+            for (ListIterator<EBPFProfilingStackNode> it = 
beingMergedNode.children.listIterator(); it.hasNext(); ) {
+                EBPFProfilingStackNode node = it.next();
+                if (node != null && node.matches(childrenNode)) {
+                    childrenNode.combineDetectedStacks(node);
+                    continueChildrenMerging.accept(new Pair<>(childrenNode, 
node));
+
+                    it.set(null);
+                    break;
+                }
+            }
+        }
+
+        for (EBPFProfilingStackNode node : beingMergedNode.children) {
+            if (node != null) {
+                targetNode.children.add(node);
+            }
+        }
+    }
+
+    /**
+     * build GraphQL result, calculate duration and count data using parallels
+     */
+    public EBPFProfilingTree buildAnalyzeResult() {
+        // all nodes add to single-level list (such as flat), work for 
parallel calculating
+        LinkedList<Pair<EBPFProfilingStackElement, EBPFProfilingStackNode>> 
nodeMapping = new LinkedList<>();
+        int idGenerator = 1;
+
+        EBPFProfilingStackElement root = buildElement(idGenerator++);
+        nodeMapping.add(new Pair<>(root, this));
+
+        // same with combine logic
+        LinkedList<Pair<EBPFProfilingStackElement, EBPFProfilingStackNode>> 
stack = new LinkedList<>();
+        stack.add(new Pair<>(root, this));
+        while (!stack.isEmpty()) {
+            Pair<EBPFProfilingStackElement, EBPFProfilingStackNode> 
mergingPair = stack.pop();
+            EBPFProfilingStackElement respElement = mergingPair.key;
+
+            // generate children node and add to stack and all node mapping
+            for (EBPFProfilingStackNode children : mergingPair.value.children) 
{
+                EBPFProfilingStackElement element = 
children.buildElement(idGenerator++);
+                element.setParentId(respElement.getId());
+
+                Pair<EBPFProfilingStackElement, EBPFProfilingStackNode> pair = 
new Pair<>(element, children);
+                stack.add(pair);
+                nodeMapping.add(pair);
+            }
+        }
+
+        EBPFProfilingTree tree = new EBPFProfilingTree();
+        nodeMapping.forEach(n -> tree.getElements().add(n.key));
+
+        return tree;
+    }
+
+    private void detectedBy(EBPFProfilingStack stack) {
+        this.dumpCount += stack.getDumpCount();
+    }
+
+    private void combineDetectedStacks(EBPFProfilingStackNode node) {
+        this.dumpCount += node.dumpCount;
+    }
+
+    private EBPFProfilingStackElement buildElement(int id) {
+        EBPFProfilingStackElement element = new EBPFProfilingStackElement();
+        element.setId(id);
+        element.setSymbol(this.codeSignature.getName());
+        element.setStackType(this.codeSignature.getStackType());
+        element.setDumpCount(this.dumpCount);
+        return element;
+    }
+
+    private boolean matches(EBPFProfilingStackNode node) {
+        return Objects.equal(this.codeSignature, node.codeSignature);
+    }
+
+    private static class Pair<K, V> {

Review comment:
       Consider using an existing pair type instead: `io.vavr.Tuple2`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to