This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new a7bb50e63e Adapt the continuous profiling task query GraphQL (#10878)
a7bb50e63e is described below
commit a7bb50e63ea9ba3a5b4bbe68511023d10b46b614
Author: mrproliu <[email protected]>
AuthorDate: Fri Jun 2 14:14:40 2023 +0000
Adapt the continuous profiling task query GraphQL (#10878)
---
docs/en/changes/changes.md | 2 +
.../ContinuousProfilingQueryService.java | 153 ++++++++++++++++++++-
.../profiling/ebpf/EBPFProfilingQueryService.java | 10 +-
... => ContinuousProfilingMonitoringInstance.java} | 24 ++--
...a => ContinuousProfilingMonitoringProcess.java} | 19 ++-
.../type/ContinuousProfilingPolicyTarget.java | 2 +
.../core/storage/query/IMetadataQueryDAO.java | 5 +
.../graphql/resolver/ContinuousProfilingQuery.java | 6 +
.../resolver/EBPFProcessProfilingQuery.java | 5 +-
.../src/main/resources/query-protocol | 2 +-
.../handler/ContinuousProfilingServiceHandler.java | 2 +
.../banyandb/measure/BanyanDBMetadataQueryDAO.java | 26 +++-
.../query/EBPFProfilingTaskEsDAO.java | 3 +
.../elasticsearch/query/MetadataQueryEsDAO.java | 16 +++
.../jdbc/common/dao/JDBCMetadataQueryDAO.java | 42 +++++-
15 files changed, 284 insertions(+), 33 deletions(-)
diff --git a/docs/en/changes/changes.md b/docs/en/changes/changes.md
index d5d855fe7d..ade246a373 100644
--- a/docs/en/changes/changes.md
+++ b/docs/en/changes/changes.md
@@ -46,6 +46,8 @@
* Filter out unknown_cluster metric data.
* Support RabbitMQ Monitoring.
* Support Redis slow logs collection.
+* Fix data loss when querying continuous profiling task records.
+* Adapt the continuous profiling task query GraphQL.
#### UI
* Revert: cpm5d function. This feature is cancelled from backend.
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/continuous/ContinuousProfilingQueryService.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/continuous/ContinuousProfilingQueryService.java
index d8765b4c40..781ba979b9 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/continuous/ContinuousProfilingQueryService.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/continuous/ContinuousProfilingQueryService.java
@@ -18,31 +18,55 @@
package org.apache.skywalking.oap.server.core.profiling.continuous;
+import com.google.gson.Gson;
+import lombok.Data;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import
org.apache.skywalking.oap.server.core.profiling.continuous.storage.ContinuousProfilingPolicy;
import
org.apache.skywalking.oap.server.core.profiling.continuous.storage.ContinuousProfilingPolicyConfiguration;
import
org.apache.skywalking.oap.server.core.profiling.continuous.storage.ContinuousProfilingTargetType;
+import
org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTargetType;
+import
org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTaskRecord;
+import
org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTriggerType;
+import
org.apache.skywalking.oap.server.core.query.type.ContinuousProfilingMonitoringInstance;
+import
org.apache.skywalking.oap.server.core.query.type.ContinuousProfilingMonitoringProcess;
import
org.apache.skywalking.oap.server.core.query.type.ContinuousProfilingPolicyItem;
import
org.apache.skywalking.oap.server.core.query.type.ContinuousProfilingPolicyTarget;
+import
org.apache.skywalking.oap.server.core.query.type.EBPFProfilingTaskContinuousProfiling;
+import org.apache.skywalking.oap.server.core.query.type.Process;
+import org.apache.skywalking.oap.server.core.query.type.ServiceInstance;
import org.apache.skywalking.oap.server.core.storage.StorageModule;
import
org.apache.skywalking.oap.server.core.storage.profiling.continuous.IContinuousProfilingPolicyDAO;
+import
org.apache.skywalking.oap.server.core.storage.profiling.ebpf.IEBPFProfilingTaskDAO;
+import org.apache.skywalking.oap.server.core.storage.query.IMetadataQueryDAO;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.Service;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Collector;
import java.util.stream.Collectors;
@Slf4j
@RequiredArgsConstructor
public class ContinuousProfilingQueryService implements Service {
+ private static final Gson GSON = new Gson();
+ private static final int RECENT_TRIGGERED_HOURS = 48;
+
private final ModuleManager moduleManager;
private IContinuousProfilingPolicyDAO policyDAO;
+ private IMetadataQueryDAO metadataQueryDAO;
+ private IEBPFProfilingTaskDAO ebpfProfilingTaskDAO;
public IContinuousProfilingPolicyDAO getPolicyDAO() {
if (policyDAO == null) {
@@ -52,6 +76,22 @@ public class ContinuousProfilingQueryService implements
Service {
return policyDAO;
}
+    /**
+     * Returns the metadata query DAO. The DAO is looked up from the storage module the first
+     * time it is requested and the resolved instance is reused on later calls.
+     */
+    public IMetadataQueryDAO getMetadataQueryDAO() {
+        if (metadataQueryDAO != null) {
+            return metadataQueryDAO;
+        }
+        metadataQueryDAO = moduleManager.find(StorageModule.NAME)
+                                        .provider()
+                                        .getService(IMetadataQueryDAO.class);
+        return metadataQueryDAO;
+    }
+
+    /**
+     * Returns the eBPF profiling task DAO. The DAO is looked up from the storage module the
+     * first time it is requested and the resolved instance is reused on later calls.
+     */
+    public IEBPFProfilingTaskDAO getEbpfProfilingTaskDAO() {
+        if (ebpfProfilingTaskDAO != null) {
+            return ebpfProfilingTaskDAO;
+        }
+        ebpfProfilingTaskDAO = moduleManager.find(StorageModule.NAME)
+                                            .provider()
+                                            .getService(IEBPFProfilingTaskDAO.class);
+        return ebpfProfilingTaskDAO;
+    }
+
public List<ContinuousProfilingPolicyTarget>
queryContinuousProfilingServiceTargets(String serviceId) throws IOException {
final List<ContinuousProfilingPolicy> policies =
getPolicyDAO().queryPolicies(Arrays.asList(serviceId));
if (CollectionUtils.isEmpty(policies)) {
@@ -62,6 +102,9 @@ public class ContinuousProfilingQueryService implements
Service {
final ContinuousProfilingPolicyConfiguration configuration =
ContinuousProfilingPolicyConfiguration.parseFromJSON(policy.getConfigurationJson());
+ final List<EBPFProfilingTaskRecord> records =
queryRecentTriggeredTasks(serviceId,
configuration.getTargetCheckers().keySet());
+ final Map<Integer, EBPFProfilingTaskSummary> summaryMap =
buildSummaryByKey(records, EBPFProfilingTaskRecord::getTargetType);
+
return
configuration.getTargetCheckers().entrySet().stream().map(targetEntry -> {
final ContinuousProfilingTargetType type = targetEntry.getKey();
final List<ContinuousProfilingPolicyItem> items =
targetEntry.getValue().entrySet().stream().map(checker -> {
@@ -76,10 +119,118 @@ public class ContinuousProfilingQueryService implements
Service {
return result;
}).collect(Collectors.toList());
- return ContinuousProfilingPolicyTarget.builder()
+ final ContinuousProfilingPolicyTarget target =
ContinuousProfilingPolicyTarget.builder()
.type(type)
.checkItems(items)
.build();
+
+
Optional.ofNullable(summaryMap.get(EBPFProfilingTargetType.valueOf(type).value()))
+ .ifPresent(summary -> {
+ target.setTriggeredCount(summary.getCount());
+
target.setLastTriggerTimestamp(summary.getLastTriggerTime());
+ });
+ return target;
}).collect(Collectors.toList());
}
+
+ public List<ContinuousProfilingMonitoringInstance>
queryContinuousProfilingMonitoringInstances(String serviceId,
ContinuousProfilingTargetType target) throws IOException {
+ // Query all processes of the given service
+ final List<Process> processes =
getMetadataQueryDAO().listProcesses(serviceId, null, 0, 0);
+ if (CollectionUtils.isEmpty(processes)) {
+ return Collections.emptyList();
+ }
+ // query all triggered tasks
+ final List<EBPFProfilingTaskRecord> records =
queryRecentTriggeredTasks(serviceId, List.of(target));
+
+ // Query the metadata of instances
+ final Map<String, List<Process>> instancesProcesses =
processes.stream().collect(Collectors.groupingBy(Process::getInstanceId));
+ final List<ServiceInstance> instanceIdWithMetadata =
getMetadataQueryDAO().getInstances(Arrays.asList(instancesProcesses.keySet().toArray(new
String[0])));
+
+ // Build instance & process summary
+ final Map<String, EBPFProfilingTaskSummary> instanceSummary =
buildSummaryByKey(records, EBPFProfilingTaskRecord::getInstanceId);
+ final Map<String, EBPFProfilingTaskSummary> processSummary =
buildSummaryByKey(records, r -> {
+ final EBPFProfilingTaskContinuousProfiling continuousProfiling =
GSON.fromJson(r.getContinuousProfilingJson(),
EBPFProfilingTaskContinuousProfiling.class);
+ return continuousProfiling.getProcessId();
+ });
+
+ // build result
+ return instanceIdWithMetadata.stream().map(instance -> {
+ final ContinuousProfilingMonitoringInstance result = new
ContinuousProfilingMonitoringInstance();
+ result.setId(instance.getId());
+ result.setName(instance.getName());
+ result.setAttributes(instance.getAttributes());
+ final EBPFProfilingTaskSummary summary =
instanceSummary.get(instance.getId());
+ if (summary != null) {
+ result.setTriggeredCount(summary.getCount());
+ result.setLastTriggerTimestamp(summary.getLastTriggerTime());
+ }
+
+
result.setProcesses(instancesProcesses.getOrDefault(instance.getId(), List.of())
+ .stream().map(p -> {
+ final ContinuousProfilingMonitoringProcess process = new
ContinuousProfilingMonitoringProcess();
+ process.setId(p.getId());
+ process.setName(p.getName());
+ process.setDetectType(p.getDetectType());
+ process.setLabels(p.getLabels());
+
+ final EBPFProfilingTaskSummary processSummaryItem =
processSummary.get(p.getId());
+ if (processSummaryItem != null) {
+
process.setTriggeredCount(processSummaryItem.getCount());
+
process.setLastTriggerTimestamp(processSummaryItem.getLastTriggerTime());
+ }
+
+ return process;
+ }).collect(Collectors.toList()));
+ return result;
+ }).collect(Collectors.toList());
+ }
+
+ private <T> Map<T, EBPFProfilingTaskSummary>
buildSummaryByKey(List<EBPFProfilingTaskRecord> records,
Function<EBPFProfilingTaskRecord, T> groupBy) {
+ return
records.stream().collect(Collectors.groupingByConcurrent(groupBy,
buildSummaryCollector()));
+ }
+
+    /**
+     * Loads the tasks of the service that were triggered by continuous profiling for the given
+     * targets, starting from {@link #RECENT_TRIGGERED_HOURS} hours before now.
+     */
+    private List<EBPFProfilingTaskRecord> queryRecentTriggeredTasks(String serviceId, Collection<ContinuousProfilingTargetType> targets) throws IOException {
+        // Start of the look-back window, in epoch milliseconds
+        final long windowStart = System.currentTimeMillis() - RECENT_TRIGGERED_HOURS * 60L * 60L * 1000L;
+        final IEBPFProfilingTaskDAO taskDAO = getEbpfProfilingTaskDAO();
+
+        final List<EBPFProfilingTargetType> ebpfTargets = new ArrayList<>();
+        for (ContinuousProfilingTargetType target : targets) {
+            ebpfTargets.add(EBPFProfilingTargetType.valueOf(target));
+        }
+        return taskDAO.queryTasksByTargets(
+            serviceId, null, ebpfTargets,
+            EBPFProfilingTriggerType.CONTINUOUS_PROFILING, windowStart, 0);
+    }
+
+    /**
+     * Creates the collector that folds task records into one summary: it counts the records,
+     * tracks the greatest start time seen and keeps every record.
+     */
+    private Collector<EBPFProfilingTaskRecord, EBPFProfilingTaskSummary, EBPFProfilingTaskSummary> buildSummaryCollector() {
+        return Collector.of(
+            EBPFProfilingTaskSummary::new,
+            // accumulate a single record into the running summary
+            (summary, record) -> {
+                summary.setCount(summary.getCount() + 1);
+                summary.setLastTriggerTime(Math.max(summary.getLastTriggerTime(), record.getStartTime()));
+                summary.getRecords().add(record);
+            },
+            // merge two partial summaries into the left one
+            (left, right) -> {
+                left.setCount(left.getCount() + right.getCount());
+                left.setLastTriggerTime(Math.max(left.getLastTriggerTime(), right.getLastTriggerTime()));
+                left.getRecords().addAll(right.getRecords());
+                return left;
+            });
+    }
+
+    /**
+     * Aggregated view of the triggered tasks that share one grouping key.
+     */
+    @Data
+    private static class EBPFProfilingTaskSummary {
+        /** Count of triggered tasks. */
+        private int count;
+        /** Last trigger time. */
+        private long lastTriggerTime;
+        /** All triggered tasks; starts out as an empty, mutable list. */
+        private List<EBPFProfilingTaskRecord> records = new ArrayList<>();
+    }
+
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/EBPFProfilingQueryService.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/EBPFProfilingQueryService.java
index 41cba60b96..336b317483 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/EBPFProfilingQueryService.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profiling/ebpf/EBPFProfilingQueryService.java
@@ -33,6 +33,7 @@ import
org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilin
import
org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTaskRecord;
import
org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTriggerType;
import
org.apache.skywalking.oap.server.core.query.enumeration.ProfilingSupportStatus;
+import org.apache.skywalking.oap.server.core.query.input.Duration;
import org.apache.skywalking.oap.server.core.query.type.Attribute;
import
org.apache.skywalking.oap.server.core.query.type.EBPFProfilingAnalyzation;
import
org.apache.skywalking.oap.server.core.query.type.EBPFProfilingAnalyzeAggregateType;
@@ -177,11 +178,16 @@ public class EBPFProfilingQueryService implements Service
{
return prepare;
}
- public List<EBPFProfilingTask> queryEBPFProfilingTasks(String serviceId,
String serviceInstanceId, List<EBPFProfilingTargetType> targets,
EBPFProfilingTriggerType triggerType) throws IOException {
+ public List<EBPFProfilingTask> queryEBPFProfilingTasks(String serviceId,
String serviceInstanceId, List<EBPFProfilingTargetType> targets,
EBPFProfilingTriggerType triggerType, Duration duration) throws IOException {
if (CollectionUtils.isEmpty(targets)) {
targets = Arrays.asList(EBPFProfilingTargetType.values());
}
- final List<EBPFProfilingTaskRecord> tasks =
getTaskDAO().queryTasksByTargets(serviceId, serviceInstanceId, targets,
triggerType, 0, 0);
+ long startTime = 0, endTime = 0;
+ if (duration != null) {
+ startTime = duration.getStartTimestamp();
+ endTime = duration.getEndTimestamp();
+ }
+ final List<EBPFProfilingTaskRecord> tasks =
getTaskDAO().queryTasksByTargets(serviceId, serviceInstanceId, targets,
triggerType, startTime, endTime);
// combine same id tasks
final Map<String, EBPFProfilingTaskRecord> records =
tasks.stream().collect(Collectors.toMap(EBPFProfilingTaskRecord::getLogicalId,
Function.identity(), EBPFProfilingTaskRecord::combine));
return
records.values().stream().map(this::parseTask).collect(Collectors.toList());
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/ContinuousProfilingPolicyTarget.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/ContinuousProfilingMonitoringInstance.java
similarity index 69%
copy from
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/ContinuousProfilingPolicyTarget.java
copy to
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/ContinuousProfilingMonitoringInstance.java
index ca642a2c87..4aaa6457bd 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/ContinuousProfilingPolicyTarget.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/ContinuousProfilingMonitoringInstance.java
@@ -18,19 +18,23 @@
package org.apache.skywalking.oap.server.core.query.type;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
import lombok.Data;
-import lombok.NoArgsConstructor;
-import
org.apache.skywalking.oap.server.core.profiling.continuous.storage.ContinuousProfilingTargetType;
+import java.util.ArrayList;
import java.util.List;
@Data
-@Builder
-@NoArgsConstructor
-@AllArgsConstructor
-public class ContinuousProfilingPolicyTarget {
- private ContinuousProfilingTargetType type;
- private List<ContinuousProfilingPolicyItem> checkItems;
+public class ContinuousProfilingMonitoringInstance {
+
+ private String id;
+ private String name;
+ private List<Attribute> attributes;
+ private int triggeredCount;
+ private Long lastTriggerTimestamp;
+
+ private List<ContinuousProfilingMonitoringProcess> processes;
+
+ public ContinuousProfilingMonitoringInstance() {
+ this.processes = new ArrayList<>();
+ }
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/ContinuousProfilingPolicyTarget.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/ContinuousProfilingMonitoringProcess.java
similarity index 69%
copy from
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/ContinuousProfilingPolicyTarget.java
copy to
oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/ContinuousProfilingMonitoringProcess.java
index ca642a2c87..610d7c0441 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/ContinuousProfilingPolicyTarget.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/ContinuousProfilingMonitoringProcess.java
@@ -18,19 +18,18 @@
package org.apache.skywalking.oap.server.core.query.type;
-import lombok.AllArgsConstructor;
-import lombok.Builder;
import lombok.Data;
-import lombok.NoArgsConstructor;
-import
org.apache.skywalking.oap.server.core.profiling.continuous.storage.ContinuousProfilingTargetType;
import java.util.List;
@Data
-@Builder
-@NoArgsConstructor
-@AllArgsConstructor
-public class ContinuousProfilingPolicyTarget {
- private ContinuousProfilingTargetType type;
- private List<ContinuousProfilingPolicyItem> checkItems;
+public class ContinuousProfilingMonitoringProcess {
+
+ private String id;
+ private String name;
+ private String detectType;
+ private List<String> labels;
+ private int triggeredCount;
+ private Long lastTriggerTimestamp;
+
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/ContinuousProfilingPolicyTarget.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/ContinuousProfilingPolicyTarget.java
index ca642a2c87..7a01ad9b6b 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/ContinuousProfilingPolicyTarget.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/type/ContinuousProfilingPolicyTarget.java
@@ -33,4 +33,6 @@ import java.util.List;
public class ContinuousProfilingPolicyTarget {
private ContinuousProfilingTargetType type;
private List<ContinuousProfilingPolicyItem> checkItems;
+ private int triggeredCount;
+ private Long lastTriggerTimestamp;
}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetadataQueryDAO.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetadataQueryDAO.java
index e069a49116..2b0182d44b 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetadataQueryDAO.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/query/IMetadataQueryDAO.java
@@ -52,6 +52,11 @@ public interface IMetadataQueryDAO extends DAO {
ServiceInstance getInstance(final String instanceId) throws IOException;
+ /**
+ * @param instanceIds instance id list
+ */
+ List<ServiceInstance> getInstances(final List<String> instanceIds) throws
IOException;
+
/**
* @param keyword to filter the endpoints
* @param serviceId the owner of the endpoints
diff --git
a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/ContinuousProfilingQuery.java
b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/ContinuousProfilingQuery.java
index cdbf88a7b0..8491ad9bf4 100644
---
a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/ContinuousProfilingQuery.java
+++
b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/ContinuousProfilingQuery.java
@@ -21,6 +21,8 @@ package org.apache.skywalking.oap.query.graphql.resolver;
import graphql.kickstart.tools.GraphQLQueryResolver;
import org.apache.skywalking.oap.server.core.CoreModule;
import
org.apache.skywalking.oap.server.core.profiling.continuous.ContinuousProfilingQueryService;
+import
org.apache.skywalking.oap.server.core.profiling.continuous.storage.ContinuousProfilingTargetType;
+import
org.apache.skywalking.oap.server.core.query.type.ContinuousProfilingMonitoringInstance;
import
org.apache.skywalking.oap.server.core.query.type.ContinuousProfilingPolicyTarget;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
@@ -48,4 +50,8 @@ public class ContinuousProfilingQuery implements
GraphQLQueryResolver {
return
getQueryService().queryContinuousProfilingServiceTargets(serviceId);
}
+    /**
+     * Resolver entry point for {@code queryContinuousProfilingMonitoringInstances}; the work is
+     * delegated to the continuous profiling query service.
+     */
+    public List<ContinuousProfilingMonitoringInstance> queryContinuousProfilingMonitoringInstances(String serviceId, ContinuousProfilingTargetType target) throws IOException {
+        final ContinuousProfilingQueryService queryService = getQueryService();
+        return queryService.queryContinuousProfilingMonitoringInstances(serviceId, target);
+    }
+
}
\ No newline at end of file
diff --git
a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/EBPFProcessProfilingQuery.java
b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/EBPFProcessProfilingQuery.java
index acd2d4979b..e6cbf3f916 100644
---
a/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/EBPFProcessProfilingQuery.java
+++
b/oap-server/server-query-plugin/query-graphql-plugin/src/main/java/org/apache/skywalking/oap/query/graphql/resolver/EBPFProcessProfilingQuery.java
@@ -23,6 +23,7 @@ import org.apache.skywalking.oap.server.core.CoreModule;
import
org.apache.skywalking.oap.server.core.profiling.ebpf.EBPFProfilingQueryService;
import
org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTargetType;
import
org.apache.skywalking.oap.server.core.profiling.ebpf.storage.EBPFProfilingTriggerType;
+import org.apache.skywalking.oap.server.core.query.input.Duration;
import
org.apache.skywalking.oap.server.core.query.type.EBPFProfilingAnalyzation;
import
org.apache.skywalking.oap.server.core.query.type.EBPFProfilingAnalyzeAggregateType;
import
org.apache.skywalking.oap.server.core.query.type.EBPFProfilingAnalyzeTimeRange;
@@ -60,14 +61,14 @@ public class EBPFProcessProfilingQuery implements
GraphQLQueryResolver {
return
getQueryService().queryPrepareCreateEBPFProfilingTaskData(serviceId);
}
- public List<EBPFProfilingTask> queryEBPFProfilingTasks(String serviceId,
String serviceInstanceId, List<EBPFProfilingTargetType> targets,
EBPFProfilingTriggerType triggerType) throws IOException {
+ public List<EBPFProfilingTask> queryEBPFProfilingTasks(String serviceId,
String serviceInstanceId, List<EBPFProfilingTargetType> targets,
EBPFProfilingTriggerType triggerType, Duration duration) throws IOException {
if (StringUtil.isEmpty(serviceId) &&
StringUtil.isEmpty(serviceInstanceId)) {
throw new IllegalArgumentException("please provide the service id
or instance id");
}
if (triggerType == null) {
triggerType = EBPFProfilingTriggerType.FIXED_TIME;
}
- return getQueryService().queryEBPFProfilingTasks(serviceId,
serviceInstanceId, targets, triggerType);
+ return getQueryService().queryEBPFProfilingTasks(serviceId,
serviceInstanceId, targets, triggerType, duration);
}
public List<EBPFProfilingSchedule> queryEBPFProfilingSchedules(String
taskId) throws Exception {
diff --git
a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
index 06789e114b..a711fd9d0f 160000
---
a/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
+++
b/oap-server/server-query-plugin/query-graphql-plugin/src/main/resources/query-protocol
@@ -1 +1 @@
-Subproject commit 06789e114b321b19cd23802ea7cb210732b3dbf3
+Subproject commit a711fd9d0f94a67a933eacf30a12976c6219c416
diff --git
a/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/ContinuousProfilingServiceHandler.java
b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/ContinuousProfilingServiceHandler.java
index b818b54cdb..7e7051bdd7 100644
---
a/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/ContinuousProfilingServiceHandler.java
+++
b/oap-server/server-receiver-plugin/skywalking-ebpf-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/ebpf/provider/handler/ContinuousProfilingServiceHandler.java
@@ -33,6 +33,7 @@ import
org.apache.skywalking.apm.network.ebpf.profiling.v3.ContinuousProfilingSe
import org.apache.skywalking.oap.server.core.Const;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.IDManager;
+import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import
org.apache.skywalking.oap.server.core.analysis.worker.NoneStreamProcessor;
import org.apache.skywalking.oap.server.core.command.CommandService;
import
org.apache.skywalking.oap.server.core.profiling.continuous.storage.ContinuousProfilingMonitorType;
@@ -178,6 +179,7 @@ public class ContinuousProfilingServiceHandler extends
ContinuousProfilingServic
task.setFixedTriggerDuration(request.getDuration());
task.setCreateTime(currentTime);
task.setLastUpdateTime(currentTime);
+ task.setTimeBucket(TimeBucket.getRecordTimeBucket(currentTime));
final EBPFProfilingTaskContinuousProfiling continuousProfiling = new
EBPFProfilingTaskContinuousProfiling();
continuousProfiling.setProcessId(processId);
diff --git
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
index 3231a506fc..752aa71164 100644
---
a/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
+++
b/oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBMetadataQueryDAO.java
@@ -180,6 +180,21 @@ public class BanyanDBMetadataQueryDAO extends
AbstractBanyanDBDAO implements IMe
return resp.size() > 0 ? buildInstance(resp.getDataPoints().get(0),
schema) : null;
}
+    /**
+     * Loads the instances whose ID is contained in {@code instanceIds}.
+     */
+    @Override
+    public List<ServiceInstance> getInstances(List<String> instanceIds) throws IOException {
+        // Restrict the measure query to the requested instance IDs
+        final QueryBuilder<MeasureQuery> idFilter = new QueryBuilder<MeasureQuery>() {
+            @Override
+            protected void apply(MeasureQuery query) {
+                query.and(in(InstanceTraffic.ID, instanceIds));
+            }
+        };
+        final MeasureQueryResponse response = query(
+            InstanceTraffic.INDEX_NAME, INSTANCE_TRAFFIC_COMPACT_TAGS, Collections.emptySet(), idFilter);
+        final MetadataRegistry.Schema schema =
+            MetadataRegistry.INSTANCE.findMetadata(InstanceTraffic.INDEX_NAME, DownSampling.Minute);
+        return response.getDataPoints()
+                       .stream()
+                       .map(dataPoint -> buildInstance(dataPoint, schema))
+                       .collect(Collectors.toList());
+    }
+
@Override
public List<Endpoint> findEndpoint(String keyword, String serviceId, int
limit) throws IOException {
MeasureQueryResponse resp = query(EndpointTraffic.INDEX_NAME,
@@ -215,8 +230,15 @@ public class BanyanDBMetadataQueryDAO extends
AbstractBanyanDBDAO implements IMe
@Override
protected void apply(MeasureQuery query) {
query.and(eq(ProcessTraffic.SERVICE_ID, serviceId));
- query.and(gte(ProcessTraffic.LAST_PING_TIME_BUCKET,
lastPingStartTimeBucket));
- query.and(eq(ProcessTraffic.PROFILING_SUPPORT_STATUS,
supportStatus.value()));
+ if (lastPingStartTimeBucket > 0) {
+
query.and(gte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingStartTimeBucket));
+ }
+ if (lastPingEndTimeBucket > 0) {
+
query.and(lte(ProcessTraffic.LAST_PING_TIME_BUCKET, lastPingEndTimeBucket));
+ }
+ if (supportStatus != null) {
+
query.and(eq(ProcessTraffic.PROFILING_SUPPORT_STATUS, supportStatus.value()));
+ }
query.and(ne(ProcessTraffic.DETECT_TYPE,
ProcessDetectType.VIRTUAL.value()));
}
});
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingTaskEsDAO.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingTaskEsDAO.java
index 8a51f6fad0..60f8f8fa3b 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingTaskEsDAO.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/EBPFProfilingTaskEsDAO.java
@@ -86,6 +86,9 @@ public class EBPFProfilingTaskEsDAO extends EsDAO implements
IEBPFProfilingTaskD
final String index =
IndexController.LogicIndicesRegister.getPhysicalTableName(EBPFProfilingTaskRecord.INDEX_NAME);
final BoolQueryBuilder query = Query.bool();
+ if
(IndexController.LogicIndicesRegister.isMergedTable(EBPFProfilingTaskRecord.INDEX_NAME))
{
+
query.must(Query.term(IndexController.LogicIndicesRegister.RECORD_TABLE_NAME,
EBPFProfilingTaskRecord.INDEX_NAME));
+ }
if (StringUtil.isNotEmpty(serviceId)) {
query.must(Query.term(EBPFProfilingTaskRecord.SERVICE_ID,
serviceId));
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
index a0462d45fb..694d94a1be 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/MetadataQueryEsDAO.java
@@ -196,6 +196,22 @@ public class MetadataQueryEsDAO extends EsDAO implements
IMetadataQueryDAO {
return instances.size() > 0 ? instances.get(0) : null;
}
+    /**
+     * Loads the instances whose document ID is contained in {@code instanceIds}; at most
+     * {@code instanceIds.size()} hits are requested.
+     */
+    @Override
+    public List<ServiceInstance> getInstances(List<String> instanceIds) throws IOException {
+        final String physicalIndex =
+            IndexController.LogicIndicesRegister.getPhysicalTableName(InstanceTraffic.INDEX_NAME);
+
+        final BoolQueryBuilder idQuery = Query.bool();
+        idQuery.must(Query.terms("_id", instanceIds));
+        // A merged physical index holds several logic tables, so narrow it to instance traffic
+        final boolean merged = IndexController.LogicIndicesRegister.isMergedTable(InstanceTraffic.INDEX_NAME);
+        if (merged) {
+            idQuery.must(Query.term(IndexController.LogicIndicesRegister.METRIC_TABLE_NAME, InstanceTraffic.INDEX_NAME));
+        }
+
+        final SearchBuilder searchBuilder = Search.builder().query(idQuery).size(instanceIds.size());
+        return buildInstances(getClient().search(physicalIndex, searchBuilder.build()));
+    }
+
@Override
public List<Endpoint> findEndpoint(String keyword, String serviceId, int
limit)
throws IOException {
diff --git
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetadataQueryDAO.java
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetadataQueryDAO.java
index 9db398be28..0647e586b4 100644
---
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetadataQueryDAO.java
+++
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/common/dao/JDBCMetadataQueryDAO.java
@@ -49,6 +49,7 @@ import
org.apache.skywalking.oap.server.storage.plugin.jdbc.common.JDBCTableInst
import
org.apache.skywalking.oap.server.storage.plugin.jdbc.common.SQLAndParameters;
import org.apache.skywalking.oap.server.storage.plugin.jdbc.common.TableHelper;
+import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
@@ -220,6 +221,32 @@ public class JDBCMetadataQueryDAO implements
IMetadataQueryDAO {
return null;
}
+    /**
+     * Loads the instances whose ID is contained in {@code instanceIds} from every instance
+     * traffic table within TTL.
+     *
+     * <p>The IDs are matched with a single {@code in (...)} predicate. Chaining one
+     * {@code id = ?} per ID with {@code and} can never match more than one distinct ID.
+     * Results of all tables are merged, and an instance found in several tables is reported
+     * once (first occurrence wins).
+     *
+     * @param instanceIds instance id list
+     * @return the matching instances; an empty list (never {@code null}) when nothing matches
+     *         or no IDs are given
+     */
+    @SneakyThrows
+    @Override
+    public List<ServiceInstance> getInstances(List<String> instanceIds) throws IOException {
+        final var instances = new ArrayList<ServiceInstance>();
+        if (instanceIds == null || instanceIds.isEmpty()) {
+            // "in ()" is not valid SQL, and there is nothing to look up anyway
+            return instances;
+        }
+        final var seenIds = new java.util.HashSet<String>();
+        final var tables = tableHelper.getTablesWithinTTL(InstanceTraffic.INDEX_NAME);
+
+        for (String table : tables) {
+            StringBuilder sql = new StringBuilder();
+            List<Object> condition = new ArrayList<>(instanceIds.size() + 1);
+            sql.append("select * from ").append(table).append(" where ")
+               .append(JDBCTableInstaller.TABLE_COLUMN).append(" = ?");
+            condition.add(InstanceTraffic.INDEX_NAME);
+            sql.append(" and ").append(JDBCTableInstaller.ID_COLUMN).append(" in (");
+            for (int i = 0; i < instanceIds.size(); i++) {
+                sql.append(i == 0 ? "?" : ", ?");
+                condition.add(instanceIds.get(i));
+            }
+            sql.append(")");
+            sql.append(" limit ").append(instanceIds.size());
+
+            final var result = jdbcClient.executeQuery(sql.toString(), resultSet -> buildInstances(resultSet), condition.toArray(new Object[0]));
+            if (result == null) {
+                continue;
+            }
+            for (ServiceInstance instance : result) {
+                if (seenIds.add(instance.getId())) {
+                    instances.add(instance);
+                }
+            }
+        }
+
+        return instances;
+    }
+
@Override
@SneakyThrows
public List<Endpoint> findEndpoint(String keyword, String serviceId, int
limit) {
@@ -264,11 +291,16 @@ public class JDBCMetadataQueryDAO implements
IMetadataQueryDAO {
@Override
@SneakyThrows
public List<Process> listProcesses(String serviceId,
ProfilingSupportStatus supportStatus, long lastPingStartTimeBucket, long
lastPingEndTimeBucket) {
- final var tables = tableHelper.getTablesForRead(
- ProcessTraffic.INDEX_NAME,
- lastPingStartTimeBucket,
- lastPingEndTimeBucket
- );
+ List<String> tables;
+ if (lastPingStartTimeBucket > 0 && lastPingEndTimeBucket > 0) {
+ tables = tableHelper.getTablesForRead(
+ ProcessTraffic.INDEX_NAME,
+ lastPingStartTimeBucket,
+ lastPingEndTimeBucket
+ );
+ } else {
+ tables = tableHelper.getTablesWithinTTL(ProcessTraffic.INDEX_NAME);
+ }
final var results = new ArrayList<Process>();
for (String table : tables) {