This is an automated email from the ASF dual-hosted git repository.
kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 90f3c4d Sniffer processing profile task and report status and
snapshot (#4220)
90f3c4d is described below
commit 90f3c4de557b427426dc33a86f3223be6a831794
Author: mrproliu <[email protected]>
AuthorDate: Sat Jan 18 18:38:03 2020 +0800
Sniffer processing profile task and report status and snapshot (#4220)
* sniffer processing profile task and report status and snapshot
* resolve testServiceDependencies test case error, use same register with
`TraceSegmentServiceClient`
* resolve names
* change profile to single one thread run.
* 1. change to the ArrayList, because known the max size
2. rename issue resolved
* add profiling status enum
* change sniffer use full name issue
* 1. remove `prepareProfiling` method, build profiling status when
construct `TracingContext`
2. add `TracingThreadListenerManager`, notify when tracing main thread
finish
3. change ProfileThread start when process new profile task
* remove unnecessary getter
* add test assert error message
* adding `AgentServiceRule`
* revert original assert
* remove unnecessary getter
* resolve issues
* reduce findService invoke
* resolve style error
* recheck profiling when change first span operatin name
* resolve issues
1. remove `ContextManager#profilingRecheck`, only check on `TracingContext`
2. rename comments
3. resolve volatile array setting
* remove article link
* add `ProfileTask#maxSamplingCount` check
* resolve conflict (Downstream -> Commands)
* 1. change profilingSegmentSlots init on construct
2. if is profiling, recheck dont need to stop
3. total profiling count increment on first dump
* remove unused return val
* remove some `@param` and `@return`
* add profile task check result data bean
* change profiler slot to `AtomicReferenceArray`
* resolved java doc error
* fix doc error, remove meaningless descriptions
* resolve missed profile receiver on oap starter
* resolve method invoke error
Co-authored-by: 吴晟 Wu Sheng <[email protected]>
Co-authored-by: kezhenxu94 <[email protected]>
---
.../component/command/ProfileTaskCommand.java | 18 +-
.../executor/ProfileTaskCommandExecutor.java | 3 +-
.../skywalking/apm/agent/core/conf/Config.java | 20 ++
.../agent/core/context/AbstractTracerContext.java | 1 +
.../core/context/ContextManagerExtendService.java | 2 +-
.../apm/agent/core/context/TracingContext.java | 100 ++++++++-
.../agent/core/context/TracingThreadListener.java | 24 +-
.../core/context/trace/AbstractTracingSpan.java | 19 +-
.../apm/agent/core/context/trace/EntrySpan.java | 9 +-
.../apm/agent/core/context/trace/ExitSpan.java | 17 +-
.../apm/agent/core/context/trace/LocalSpan.java | 10 +-
.../core/context/trace/StackBasedTracingSpan.java | 25 ++-
.../apm/agent/core/profile/ProfileTask.java | 28 ++-
.../core/profile/ProfileTaskChannelService.java | 243 +++++++++++++++++++++
.../core/profile/ProfileTaskExecutionContext.java | 119 +++++++++-
.../core/profile/ProfileTaskExecutionService.java | 124 ++++++++---
.../core/profile/ProfileTaskQueryService.java | 129 -----------
.../apm/agent/core/profile/ProfileThread.java | 115 ++++++++++
.../apm/agent/core/profile/ProfilingStatus.java | 24 +-
.../apm/agent/core/profile/ThreadProfiler.java | 153 +++++++++++++
.../agent/core/profile/TracingThreadSnapshot.java | 73 +++++++
...ache.skywalking.apm.agent.core.boot.BootService | 2 +-
.../apm/agent/core/boot/ServiceManagerTest.java | 18 +-
.../apm/agent/core/context/TracingContextTest.java | 2 +-
.../agent/core/test/tools/AgentServiceRule.java | 3 +
apm-sniffer/config/agent.config | 12 +
docs/en/setup/service-agent/java-agent/README.md | 4 +
.../src/main/resources/application.yml | 3 +
.../oap/server/core/cache/ProfileTaskCache.java | 45 +++-
.../oap/server/core/command/CommandService.java | 2 +-
.../profile/ProfileTaskSegmentSnapshotRecord.java | 101 +++++++++
.../oap/server/core/source/DefaultScopeDefine.java | 1 +
.../core/storage/profile/IProfileTaskQueryDAO.java | 7 +
.../handler/ProfileTaskServiceHandler.java | 77 ++++++-
.../elasticsearch/query/ProfileTaskQueryEsDAO.java | 19 ++
.../plugin/jdbc/h2/dao/H2ProfileTaskQueryDAO.java | 24 ++
.../skywalking/e2e/ProfileVerificationITCase.java | 40 +++-
.../skywalking/e2e/ProfileVerificationITCase.java | 40 +++-
.../skywalking/e2e/ProfileVerificationITCase.java | 40 +++-
.../{TestController.java => CreateUser.java} | 36 +--
.../skywalking/e2e/profile/TestController.java | 14 +-
...leVerificationITCase.profileTasks.finished.yml} | 5 +
...leVerificationITCase.profileTasks.notified.yml} | 0
43 files changed, 1419 insertions(+), 332 deletions(-)
diff --git
a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/ProfileTaskCommand.java
b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/ProfileTaskCommand.java
index fecb876..f595fda 100644
---
a/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/ProfileTaskCommand.java
+++
b/apm-protocol/apm-network/src/main/java/org/apache/skywalking/apm/network/trace/component/command/ProfileTaskCommand.java
@@ -27,10 +27,11 @@ import java.util.List;
* @author MrPro
*/
public class ProfileTaskCommand extends BaseCommand implements Serializable,
Deserializable<ProfileTaskCommand> {
- public static final Deserializable<ProfileTaskCommand> DESERIALIZER = new
ProfileTaskCommand("", "", 0, 0, 0, 0, 0, 0);
+ public static final Deserializable<ProfileTaskCommand> DESERIALIZER = new
ProfileTaskCommand("", "", "", 0, 0, 0, 0, 0, 0);
public static final String NAME = "ProfileTaskQuery";
// profile task data
+ private String taskId;
private String endpointName;
private int duration;
private int minDurationThreshold;
@@ -39,8 +40,9 @@ public class ProfileTaskCommand extends BaseCommand
implements Serializable, Des
private long startTime;
private long createTime;
- public ProfileTaskCommand(String serialNumber, String endpointName, int
duration, int minDurationThreshold, int dumpPeriod, int maxSamplingCount, long
startTime, long createTime) {
+ public ProfileTaskCommand(String serialNumber, String taskId, String
endpointName, int duration, int minDurationThreshold, int dumpPeriod, int
maxSamplingCount, long startTime, long createTime) {
super(NAME, serialNumber);
+ this.taskId = taskId;
this.endpointName = endpointName;
this.duration = duration;
this.minDurationThreshold = minDurationThreshold;
@@ -54,6 +56,7 @@ public class ProfileTaskCommand extends BaseCommand
implements Serializable, Des
public ProfileTaskCommand deserialize(Command command) {
final List<KeyStringValuePair> argsList = command.getArgsList();
String serialNumber = null;
+ String taskId = null;
String endpointName = null;
int duration = 0;
int minDurationThreshold = 0;
@@ -67,6 +70,8 @@ public class ProfileTaskCommand extends BaseCommand
implements Serializable, Des
serialNumber = pair.getValue();
} else if ("EndpointName".equals(pair.getKey())) {
endpointName = pair.getValue();
+ } else if ("TaskId".equals(pair.getKey())) {
+ taskId = pair.getValue();
} else if ("Duration".equals(pair.getKey())) {
duration = Integer.parseInt(pair.getValue());
} else if ("MinDurationThreshold".equals(pair.getKey())) {
@@ -82,13 +87,14 @@ public class ProfileTaskCommand extends BaseCommand
implements Serializable, Des
}
}
- return new ProfileTaskCommand(serialNumber, endpointName, duration,
minDurationThreshold, dumpPeriod, maxSamplingCount, startTime, createTime);
+ return new ProfileTaskCommand(serialNumber, taskId, endpointName,
duration, minDurationThreshold, dumpPeriod, maxSamplingCount, startTime,
createTime);
}
@Override
public Command.Builder serialize() {
final Command.Builder builder = commandBuilder();
-
builder.addArgs(KeyStringValuePair.newBuilder().setKey("EndpointName").setValue(endpointName))
+
builder.addArgs(KeyStringValuePair.newBuilder().setKey("TaskId").setValue(taskId))
+
.addArgs(KeyStringValuePair.newBuilder().setKey("EndpointName").setValue(endpointName))
.addArgs(KeyStringValuePair.newBuilder().setKey("Duration").setValue(String.valueOf(duration)))
.addArgs(KeyStringValuePair.newBuilder().setKey("MinDurationThreshold").setValue(String.valueOf(minDurationThreshold)))
.addArgs(KeyStringValuePair.newBuilder().setKey("DumpPeriod").setValue(String.valueOf(dumpPeriod)))
@@ -125,4 +131,8 @@ public class ProfileTaskCommand extends BaseCommand
implements Serializable, Des
public long getCreateTime() {
return createTime;
}
+
+ public String getTaskId() {
+ return taskId;
+ }
}
diff --git
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/executor/ProfileTaskCommandExecutor.java
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/executor/ProfileTaskCommandExecutor.java
index c6c22d2..eb6212c 100644
---
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/executor/ProfileTaskCommandExecutor.java
+++
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/commands/executor/ProfileTaskCommandExecutor.java
@@ -39,7 +39,8 @@ public class ProfileTaskCommandExecutor implements
CommandExecutor {
// build profile task
final ProfileTask profileTask = new ProfileTask();
- profileTask.setEndpointName(profileTaskCommand.getEndpointName());
+ profileTask.setTaskId(profileTaskCommand.getTaskId());
+ profileTask.setFistSpanOPName(profileTaskCommand.getEndpointName());
profileTask.setDuration(profileTaskCommand.getDuration());
profileTask.setMinDurationThreshold(profileTaskCommand.getMinDurationThreshold());
profileTask.setThreadDumpPeriod(profileTaskCommand.getDumpPeriod());
diff --git
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
index 084e819..d356e2b 100755
---
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
+++
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
@@ -138,6 +138,26 @@ public class Config {
* If true, skywalking agent will enable profile when user create a
new profile task. Otherwise disable profile.
*/
public static boolean ACTIVE = true;
+
+ /**
+ * Parallel monitor segment count
+ */
+ public static int MAX_PARALLEL = 5;
+
+ /**
+ * Max monitor segment time(minutes), if current segment monitor time
out of limit, then stop it.
+ */
+ public static int MAX_DURATION = 10;
+
+ /**
+ * Max dump thread stack depth
+ */
+ public static int DUMP_MAX_STACK_DEPTH = 500;
+
+ /**
+ * Snapshot transport to backend buffer size
+ */
+ public static int SNAPSHOT_TRANSPORT_BUFFER_SIZE = 500;
}
public static class Jvm {
diff --git
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/AbstractTracerContext.java
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/AbstractTracerContext.java
index 4b3cd2d..6c76873 100644
---
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/AbstractTracerContext.java
+++
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/AbstractTracerContext.java
@@ -115,4 +115,5 @@ public interface AbstractTracerContext {
* @param span to be stopped.
*/
void asyncStop(AsyncSpan span);
+
}
diff --git
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextManagerExtendService.java
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextManagerExtendService.java
index d25e915..1331f62 100644
---
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextManagerExtendService.java
+++
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/ContextManagerExtendService.java
@@ -51,7 +51,7 @@ public class ContextManagerExtendService implements
BootService {
} else {
SamplingService samplingService =
ServiceManager.INSTANCE.findService(SamplingService.class);
if (forceSampling || samplingService.trySampling()) {
- context = new TracingContext();
+ context = new TracingContext(operationName);
} else {
context = new IgnoredTracerContext();
}
diff --git
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java
index ccb9aae..9a9ccd5 100644
---
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java
+++
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingContext.java
@@ -40,6 +40,7 @@ import
org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.apache.skywalking.apm.agent.core.dictionary.PossibleFound;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
+import
org.apache.skywalking.apm.agent.core.profile.ProfileTaskExecutionService;
import org.apache.skywalking.apm.agent.core.sampling.SamplingService;
import org.apache.skywalking.apm.util.StringUtil;
@@ -62,9 +63,14 @@ public class TracingContext implements AbstractTracerContext
{
private long lastWarningTimestamp = 0;
/**
+ * @see {@link ProfileTaskExecutionService}
+ */
+ private static ProfileTaskExecutionService PROFILE_TASK_EXECUTION_SERVICE;
+
+ /**
* @see {@link SamplingService}
*/
- private SamplingService samplingService;
+ private static SamplingService SAMPLING_SERVICE;
/**
* The final {@link TraceSegment}, which includes all finished spans.
@@ -92,15 +98,32 @@ public class TracingContext implements
AbstractTracerContext {
private volatile boolean running;
+ private final long createTime;
+
+ /**
+ * profiling status
+ */
+ private volatile boolean profiling;
+
/**
* Initialize all fields with default value.
*/
- TracingContext() {
+ TracingContext(String firstOPName) {
this.segment = new TraceSegment();
this.spanIdGenerator = 0;
- samplingService =
ServiceManager.INSTANCE.findService(SamplingService.class);
isRunningInAsyncMode = false;
+ createTime = System.currentTimeMillis();
running = true;
+
+ if (SAMPLING_SERVICE == null) {
+ SAMPLING_SERVICE =
ServiceManager.INSTANCE.findService(SamplingService.class);
+ }
+
+ // profiling status
+ if (PROFILE_TASK_EXECUTION_SERVICE == null) {
+ PROFILE_TASK_EXECUTION_SERVICE =
ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class);
+ }
+ this.profiling = PROFILE_TASK_EXECUTION_SERVICE.addProfiling(this,
segment.getTraceSegmentId(), firstOPName);
}
/**
@@ -308,6 +331,7 @@ public class TracingContext implements
AbstractTracerContext {
return push(span);
}
AbstractSpan entrySpan;
+ TracingContext owner = this;
final AbstractSpan parentSpan = peek();
final int parentSpanId = parentSpan == null ? -1 :
parentSpan.getSpanId();
if (parentSpan != null && parentSpan.isEntry()) {
@@ -328,11 +352,11 @@ public class TracingContext implements
AbstractTracerContext {
.findOnly(segment.getServiceId(), operationName)
.doInCondition(new PossibleFound.FoundAndObtain() {
@Override public Object doProcess(int operationId) {
- return new EntrySpan(spanIdGenerator++, parentSpanId,
operationId);
+ return new EntrySpan(spanIdGenerator++, parentSpanId,
operationId, owner);
}
}, new PossibleFound.NotFoundAndObtain() {
@Override public Object doProcess() {
- return new EntrySpan(spanIdGenerator++, parentSpanId,
operationName);
+ return new EntrySpan(spanIdGenerator++, parentSpanId,
operationName, owner);
}
});
entrySpan.start();
@@ -358,7 +382,7 @@ public class TracingContext implements
AbstractTracerContext {
* From v6.0.0-beta, local span doesn't do op name register.
* All op name register is related to entry and exit spans only.
*/
- AbstractTracingSpan span = new LocalSpan(spanIdGenerator++,
parentSpanId, operationName);
+ AbstractTracingSpan span = new LocalSpan(spanIdGenerator++,
parentSpanId, operationName, this);
span.start();
return push(span);
}
@@ -380,6 +404,7 @@ public class TracingContext implements
AbstractTracerContext {
AbstractSpan exitSpan;
AbstractSpan parentSpan = peek();
+ TracingContext owner = this;
if (parentSpan != null && parentSpan.isExit()) {
exitSpan = parentSpan;
} else {
@@ -389,13 +414,13 @@ public class TracingContext implements
AbstractTracerContext {
new PossibleFound.FoundAndObtain() {
@Override
public Object doProcess(final int peerId) {
- return new ExitSpan(spanIdGenerator++,
parentSpanId, operationName, peerId);
+ return new ExitSpan(spanIdGenerator++,
parentSpanId, operationName, peerId, owner);
}
},
new PossibleFound.NotFoundAndObtain() {
@Override
public Object doProcess() {
- return new ExitSpan(spanIdGenerator++,
parentSpanId, operationName, remotePeer);
+ return new ExitSpan(spanIdGenerator++,
parentSpanId, operationName, remotePeer, owner);
}
});
push(exitSpan);
@@ -463,15 +488,38 @@ public class TracingContext implements
AbstractTracerContext {
}
/**
+ * Re-check current trace need profiling, encase third part plugin change
the operation name.
+ *
+ * @param span current modify span
+ * @param operationName change to operation name
+ */
+ public void profilingRecheck(AbstractSpan span, String operationName) {
+ // only recheck first span
+ if (span.getSpanId() != 0) {
+ return;
+ }
+
+ profiling = PROFILE_TASK_EXECUTION_SERVICE.profilingRecheck(this,
segment.getTraceSegmentId(), operationName);
+ }
+
+ /**
* Finish this context, and notify all {@link TracingContextListener}s,
managed by {@link
- * TracingContext.ListenerManager}
+ * TracingContext.ListenerManager} and {@link
TracingContext.TracingThreadListenerManager}
*/
private void finish() {
if (isRunningInAsyncMode) {
asyncFinishLock.lock();
}
try {
- if (activeSpanStack.isEmpty() && running && (!isRunningInAsyncMode
|| asyncSpanCounter.get() == 0)) {
+ boolean isFinishedInMainThread = activeSpanStack.isEmpty() &&
running;
+ if (isFinishedInMainThread) {
+ /**
+ * Notify after tracing finished in the main thread.
+ */
+ TracingThreadListenerManager.notifyFinish(this);
+ }
+
+ if (isFinishedInMainThread && (!isRunningInAsyncMode ||
asyncSpanCounter.get() == 0)) {
TraceSegment finishedSegment =
segment.finish(isLimitMechanismWorking());
/*
* Recheck the segment if the segment contains only one span.
@@ -480,7 +528,7 @@ public class TracingContext implements
AbstractTracerContext {
* @see {@link #createSpan(String, long, boolean)}
*/
if (!segment.hasRef() && segment.isSingleSpanSegment()) {
- if (!samplingService.trySampling()) {
+ if (!SAMPLING_SERVICE.trySampling()) {
finishedSegment.setIgnore(true);
}
}
@@ -544,6 +592,27 @@ public class TracingContext implements
AbstractTracerContext {
}
/**
+ * The <code>ListenerManager</code> represents an event notify for every
registered listener, which are notified
+ */
+ public static class TracingThreadListenerManager {
+ private static List<TracingThreadListener> LISTENERS = new
LinkedList<>();
+
+ public static synchronized void add(TracingThreadListener listener) {
+ LISTENERS.add(listener);
+ }
+
+ static void notifyFinish(TracingContext finishedContext) {
+ for (TracingThreadListener listener : LISTENERS) {
+ listener.afterMainThreadFinish(finishedContext);
+ }
+ }
+
+ public static synchronized void remove(TracingThreadListener listener)
{
+ LISTENERS.remove(listener);
+ }
+ }
+
+ /**
* @return the top element of 'ActiveSpanStack', and remove it.
*/
private AbstractSpan pop() {
@@ -587,4 +656,13 @@ public class TracingContext implements
AbstractTracerContext {
return false;
}
}
+
+ public long createTime() {
+ return this.createTime;
+ }
+
+ public boolean isProfiling() {
+ return this.profiling;
+ }
+
}
diff --git
a/test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/TestController.java
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingThreadListener.java
similarity index 59%
copy from
test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/TestController.java
copy to
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingThreadListener.java
index c300fc3..c175454 100644
---
a/test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/TestController.java
+++
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/TracingThreadListener.java
@@ -16,30 +16,12 @@
*
*/
-package org.apache.skywalking.e2e.profile;
-
-import org.springframework.web.bind.annotation.*;
+package org.apache.skywalking.apm.agent.core.context;
/**
* @author MrPro
*/
-@RestController
-@RequestMapping("/e2e")
-public class TestController {
- private final UserRepo userRepo;
-
- public TestController(final UserRepo userRepo) {
- this.userRepo = userRepo;
- }
-
- @GetMapping("/health-check")
- public String hello() {
- return "healthy";
- }
+public interface TracingThreadListener {
- @PostMapping("/users")
- public User createAuthor(@RequestBody final User user) throws
InterruptedException {
- Thread.sleep(1000L);
- return userRepo.save(user);
- }
+ void afterMainThreadFinish(TracingContext tracingContext);
}
diff --git
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/AbstractTracingSpan.java
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/AbstractTracingSpan.java
index b5ece98..4eec067 100644
---
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/AbstractTracingSpan.java
+++
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/AbstractTracingSpan.java
@@ -48,7 +48,11 @@ public abstract class AbstractTracingSpan implements
AbstractSpan {
* The flag represents whether the span has been async stopped
*/
private volatile boolean isAsyncStopped = false;
- protected volatile AbstractTracerContext context;
+
+ /**
+ * The context to which the span belongs
+ */
+ protected final TracingContext owner;
/**
* The start time of this Span.
@@ -79,18 +83,20 @@ public abstract class AbstractTracingSpan implements
AbstractSpan {
*/
protected List<TraceSegmentRef> refs;
- protected AbstractTracingSpan(int spanId, int parentSpanId, String
operationName) {
+ protected AbstractTracingSpan(int spanId, int parentSpanId, String
operationName, TracingContext owner) {
this.operationName = operationName;
this.operationId = DictionaryUtil.nullValue();
this.spanId = spanId;
this.parentSpanId = parentSpanId;
+ this.owner = owner;
}
- protected AbstractTracingSpan(int spanId, int parentSpanId, int
operationId) {
+ protected AbstractTracingSpan(int spanId, int parentSpanId, int
operationId, TracingContext owner) {
this.operationName = null;
this.operationId = operationId;
this.spanId = spanId;
this.parentSpanId = parentSpanId;
+ this.owner = owner;
}
/**
@@ -203,6 +209,9 @@ public abstract class AbstractTracingSpan implements
AbstractSpan {
public AbstractTracingSpan setOperationName(String operationName) {
this.operationName = operationName;
this.operationId = DictionaryUtil.nullValue();
+
+ // recheck profiling status
+ owner.profilingRecheck(this, operationName);
return this;
}
@@ -332,7 +341,7 @@ public abstract class AbstractTracingSpan implements
AbstractSpan {
if (isInAsyncMode) {
throw new RuntimeException("Prepare for async repeatedly. Span is
already in async mode.");
}
- context = ContextManager.awaitFinishAsync(this);
+ ContextManager.awaitFinishAsync(this);
isInAsyncMode = true;
return this;
}
@@ -345,7 +354,7 @@ public abstract class AbstractTracingSpan implements
AbstractSpan {
throw new RuntimeException("Can not do async finish for the span
repeately.");
}
this.endTime = System.currentTimeMillis();
- context.asyncStop(this);
+ owner.asyncStop(this);
isAsyncStopped = true;
return this;
}
diff --git
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/EntrySpan.java
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/EntrySpan.java
index bef43e4..7d83af9 100644
---
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/EntrySpan.java
+++
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/EntrySpan.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.apm.agent.core.context.trace;
+import org.apache.skywalking.apm.agent.core.context.TracingContext;
import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.apache.skywalking.apm.network.trace.component.Component;
@@ -37,13 +38,13 @@ public class EntrySpan extends StackBasedTracingSpan {
private int currentMaxDepth;
- public EntrySpan(int spanId, int parentSpanId, String operationName) {
- super(spanId, parentSpanId, operationName);
+ public EntrySpan(int spanId, int parentSpanId, String operationName,
TracingContext owner) {
+ super(spanId, parentSpanId, operationName, owner);
this.currentMaxDepth = 0;
}
- public EntrySpan(int spanId, int parentSpanId, int operationId) {
- super(spanId, parentSpanId, operationId);
+ public EntrySpan(int spanId, int parentSpanId, int operationId,
TracingContext owner) {
+ super(spanId, parentSpanId, operationId, owner);
this.currentMaxDepth = 0;
}
diff --git
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/ExitSpan.java
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/ExitSpan.java
index aa0089a..d081a78 100644
---
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/ExitSpan.java
+++
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/ExitSpan.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.apm.agent.core.context.trace;
+import org.apache.skywalking.apm.agent.core.context.TracingContext;
import org.apache.skywalking.apm.agent.core.context.tag.AbstractTag;
import org.apache.skywalking.apm.network.trace.component.Component;
@@ -37,20 +38,20 @@ import
org.apache.skywalking.apm.network.trace.component.Component;
*/
public class ExitSpan extends StackBasedTracingSpan implements WithPeerInfo {
- public ExitSpan(int spanId, int parentSpanId, String operationName, String
peer) {
- super(spanId, parentSpanId, operationName, peer);
+ public ExitSpan(int spanId, int parentSpanId, String operationName, String
peer, TracingContext owner) {
+ super(spanId, parentSpanId, operationName, peer, owner);
}
- public ExitSpan(int spanId, int parentSpanId, int operationId, int peerId)
{
- super(spanId, parentSpanId, operationId, peerId);
+ public ExitSpan(int spanId, int parentSpanId, int operationId, int peerId,
TracingContext owner) {
+ super(spanId, parentSpanId, operationId, peerId, owner);
}
- public ExitSpan(int spanId, int parentSpanId, int operationId, String
peer) {
- super(spanId, parentSpanId, operationId, peer);
+ public ExitSpan(int spanId, int parentSpanId, int operationId, String
peer, TracingContext owner) {
+ super(spanId, parentSpanId, operationId, peer, owner);
}
- public ExitSpan(int spanId, int parentSpanId, String operationName, int
peerId) {
- super(spanId, parentSpanId, operationName, peerId);
+ public ExitSpan(int spanId, int parentSpanId, String operationName, int
peerId, TracingContext owner) {
+ super(spanId, parentSpanId, operationName, peerId, owner);
}
/**
diff --git
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/LocalSpan.java
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/LocalSpan.java
index 1bd4a77..be4c4cc 100644
---
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/LocalSpan.java
+++
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/LocalSpan.java
@@ -19,6 +19,8 @@
package org.apache.skywalking.apm.agent.core.context.trace;
+import org.apache.skywalking.apm.agent.core.context.TracingContext;
+
/**
* The <code>LocalSpan</code> represents a normal tracing point, such as a
local method.
*
@@ -26,12 +28,12 @@ package org.apache.skywalking.apm.agent.core.context.trace;
*/
public class LocalSpan extends AbstractTracingSpan {
- public LocalSpan(int spanId, int parentSpanId, int operationId) {
- super(spanId, parentSpanId, operationId);
+ public LocalSpan(int spanId, int parentSpanId, int operationId,
TracingContext owner) {
+ super(spanId, parentSpanId, operationId, owner);
}
- public LocalSpan(int spanId, int parentSpanId, String operationName) {
- super(spanId, parentSpanId, operationName);
+ public LocalSpan(int spanId, int parentSpanId, String operationName,
TracingContext owner) {
+ super(spanId, parentSpanId, operationName, owner);
}
@Override
diff --git
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/StackBasedTracingSpan.java
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/StackBasedTracingSpan.java
index cdf26a8..19000fb 100644
---
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/StackBasedTracingSpan.java
+++
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/context/trace/StackBasedTracingSpan.java
@@ -18,6 +18,7 @@
package org.apache.skywalking.apm.agent.core.context.trace;
+import org.apache.skywalking.apm.agent.core.context.TracingContext;
import org.apache.skywalking.apm.agent.core.dictionary.DictionaryManager;
import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
import org.apache.skywalking.apm.agent.core.dictionary.PossibleFound;
@@ -35,40 +36,40 @@ public abstract class StackBasedTracingSpan extends
AbstractTracingSpan {
protected String peer;
protected int peerId;
- protected StackBasedTracingSpan(int spanId, int parentSpanId, String
operationName) {
- super(spanId, parentSpanId, operationName);
+ protected StackBasedTracingSpan(int spanId, int parentSpanId, String
operationName, TracingContext owner) {
+ super(spanId, parentSpanId, operationName, owner);
this.stackDepth = 0;
this.peer = null;
this.peerId = DictionaryUtil.nullValue();
}
- protected StackBasedTracingSpan(int spanId, int parentSpanId, int
operationId) {
- super(spanId, parentSpanId, operationId);
+ protected StackBasedTracingSpan(int spanId, int parentSpanId, int
operationId, TracingContext owner) {
+ super(spanId, parentSpanId, operationId, owner);
this.stackDepth = 0;
this.peer = null;
this.peerId = DictionaryUtil.nullValue();
}
- public StackBasedTracingSpan(int spanId, int parentSpanId, int
operationId, int peerId) {
- super(spanId, parentSpanId, operationId);
+ public StackBasedTracingSpan(int spanId, int parentSpanId, int
operationId, int peerId, TracingContext owner) {
+ super(spanId, parentSpanId, operationId, owner);
this.peer = null;
this.peerId = peerId;
}
- public StackBasedTracingSpan(int spanId, int parentSpanId, int
operationId, String peer) {
- super(spanId, parentSpanId, operationId);
+ public StackBasedTracingSpan(int spanId, int parentSpanId, int
operationId, String peer, TracingContext owner) {
+ super(spanId, parentSpanId, operationId, owner);
this.peer = peer;
this.peerId = DictionaryUtil.nullValue();
}
- protected StackBasedTracingSpan(int spanId, int parentSpanId, String
operationName, String peer) {
- super(spanId, parentSpanId, operationName);
+ protected StackBasedTracingSpan(int spanId, int parentSpanId, String
operationName, String peer, TracingContext owner) {
+ super(spanId, parentSpanId, operationName, owner);
this.peer = peer;
this.peerId = DictionaryUtil.nullValue();
}
- protected StackBasedTracingSpan(int spanId, int parentSpanId, String
operationName, int peerId) {
- super(spanId, parentSpanId, operationName);
+ protected StackBasedTracingSpan(int spanId, int parentSpanId, String
operationName, int peerId, TracingContext owner) {
+ super(spanId, parentSpanId, operationName, owner);
this.peer = null;
this.peerId = peerId;
}
diff --git
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTask.java
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTask.java
index 8669c7d..2c4c6c1 100644
---
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTask.java
+++
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTask.java
@@ -27,8 +27,11 @@ import java.util.Objects;
*/
public class ProfileTask {
- // monitor endpoint name
- private String endpointName;
+ // task id
+ private String taskId;
+
+ // monitor first span operation name
+ private String fistSpanOPName;
// task duration (minute)
private int duration;
@@ -48,12 +51,12 @@ public class ProfileTask {
// task create time
private long createTime;
- public String getEndpointName() {
- return endpointName;
+ public String getFistSpanOPName() {
+ return fistSpanOPName;
}
- public void setEndpointName(String endpointName) {
- this.endpointName = endpointName;
+ public void setFistSpanOPName(String fistSpanOPName) {
+ this.fistSpanOPName = fistSpanOPName;
}
public int getDuration() {
@@ -104,6 +107,14 @@ public class ProfileTask {
this.createTime = createTime;
}
+ public String getTaskId() {
+ return taskId;
+ }
+
+ public void setTaskId(String taskId) {
+ this.taskId = taskId;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -115,11 +126,12 @@ public class ProfileTask {
maxSamplingCount == that.maxSamplingCount &&
startTime == that.startTime &&
createTime == that.createTime &&
- endpointName.equals(that.endpointName);
+ taskId.equals(that.taskId) &&
+ fistSpanOPName.equals(that.fistSpanOPName);
}
@Override
public int hashCode() {
- return Objects.hash(endpointName, duration, minDurationThreshold,
threadDumpPeriod, maxSamplingCount, startTime, createTime);
+ return Objects.hash(taskId, fistSpanOPName, duration,
minDurationThreshold, threadDumpPeriod, maxSamplingCount, startTime,
createTime);
}
}
diff --git
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskChannelService.java
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskChannelService.java
new file mode 100644
index 0000000..9a4b839
--- /dev/null
+++
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskChannelService.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.agent.core.profile;
+
+import io.grpc.Channel;
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.stub.StreamObserver;
+import org.apache.skywalking.apm.agent.core.boot.BootService;
+import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
+import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
+import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
+import org.apache.skywalking.apm.agent.core.commands.CommandService;
+import org.apache.skywalking.apm.agent.core.conf.Config;
+import org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
+import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
+import org.apache.skywalking.apm.agent.core.logging.api.ILog;
+import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
+import org.apache.skywalking.apm.agent.core.remote.*;
+import org.apache.skywalking.apm.network.common.Commands;
+import
org.apache.skywalking.apm.network.language.profile.ProfileTaskCommandQuery;
+import
org.apache.skywalking.apm.network.language.profile.ProfileTaskFinishReport;
+import org.apache.skywalking.apm.network.language.profile.ProfileTaskGrpc;
+import org.apache.skywalking.apm.network.language.profile.ThreadSnapshot;
+import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
+
+import java.util.ArrayList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT;
+
+/**
+ * Sniffer and backend, about the communication service of profile task
protocol.
+ * 1. Sniffer will check has new profile task list every {@link
Config.Collector#GET_PROFILE_TASK_INTERVAL} second.
+ * 2. When there is a new profile task snapshot, the data is transferred to
the back end. use {@link LinkedBlockingQueue}
+ * 3. When profiling task finish, it will send task finish status to backend
+ *
+ * @author MrPro
+ */
+@DefaultImplementor
+public class ProfileTaskChannelService implements BootService, Runnable,
GRPCChannelListener {
+ private static final ILog logger =
LogManager.getLogger(ProfileTaskChannelService.class);
+
+ // channel status
+ private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
+
+ // gRPC stub
+ private volatile ProfileTaskGrpc.ProfileTaskBlockingStub
profileTaskBlockingStub;
+ private volatile ProfileTaskGrpc.ProfileTaskStub profileTaskStub;
+
+ // segment snapshot sender
+ private final LinkedBlockingQueue<TracingThreadSnapshot> snapshotQueue =
new LinkedBlockingQueue<>(Config.Profile.SNAPSHOT_TRANSPORT_BUFFER_SIZE);
+ private volatile ScheduledFuture<?> sendSnapshotFuture;
+
+ // query task list schedule
+ private volatile ScheduledFuture<?> getTaskListFuture;
+
+ @Override
+ public void run() {
+ if (RemoteDownstreamConfig.Agent.SERVICE_ID !=
DictionaryUtil.nullValue()
+ && RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID !=
DictionaryUtil.nullValue()
+ ) {
+ if (status == GRPCChannelStatus.CONNECTED) {
+ try {
+ ProfileTaskCommandQuery.Builder builder =
ProfileTaskCommandQuery.newBuilder();
+
+ // sniffer info
+
builder.setServiceId(RemoteDownstreamConfig.Agent.SERVICE_ID).setInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID);
+
+ // last command create time
+
builder.setLastCommandTime(ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class).getLastCommandCreateTime());
+
+ Commands commands =
profileTaskBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT,
TimeUnit.SECONDS).getProfileTaskCommands(builder.build());
+
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
+ } catch (Throwable t) {
+ if (!(t instanceof StatusRuntimeException)) {
+ logger.error(t, "Query profile task from backend
fail.");
+ return;
+ }
+ final StatusRuntimeException statusRuntimeException =
(StatusRuntimeException) t;
+ if (statusRuntimeException.getStatus().getCode() ==
Status.Code.UNIMPLEMENTED) {
+ logger.warn("Backend doesn't support profiling,
profiling will be disabled");
+ if (getTaskListFuture != null) {
+ getTaskListFuture.cancel(true);
+ }
+
+ // stop snapshot sender
+ if (sendSnapshotFuture != null) {
+ sendSnapshotFuture.cancel(true);
+ }
+ }
+ }
+ }
+ }
+
+ }
+
+ @Override
+ public void prepare() throws Throwable {
+
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
+ }
+
+ @Override
+ public void boot() throws Throwable {
+ if (Config.Profile.ACTIVE) {
+ // query task list
+ getTaskListFuture = Executors.newSingleThreadScheduledExecutor(new
DefaultNamedThreadFactory("ProfileGetTaskService"))
+ .scheduleWithFixedDelay(new
RunnableWithExceptionProtection(this, new
RunnableWithExceptionProtection.CallbackWhenException() {
+ @Override
+ public void handle(Throwable t) {
+ logger.error("Query profile task list failure.",
t);
+ }
+ }), 0, Config.Collector.GET_PROFILE_TASK_INTERVAL,
TimeUnit.SECONDS);
+
+ sendSnapshotFuture =
Executors.newSingleThreadScheduledExecutor(new
DefaultNamedThreadFactory("ProfileSendSnapshotService"))
+ .scheduleWithFixedDelay(new
RunnableWithExceptionProtection(new SnapshotSender(), new
RunnableWithExceptionProtection.CallbackWhenException() {
+ @Override public void handle(Throwable t) {
+ logger.error("Profile segment snapshot upload
failure.", t);
+ }
+ }), 0, 500, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ @Override
+ public void onComplete() throws Throwable {
+ }
+
+ @Override
+ public void shutdown() throws Throwable {
+ if (getTaskListFuture != null) {
+ getTaskListFuture.cancel(true);
+ }
+
+ if (sendSnapshotFuture != null) {
+ sendSnapshotFuture.cancel(true);
+ }
+ }
+
+ @Override
+ public void statusChanged(GRPCChannelStatus status) {
+ if (GRPCChannelStatus.CONNECTED.equals(status)) {
+ Channel channel =
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
+ profileTaskBlockingStub = ProfileTaskGrpc.newBlockingStub(channel);
+ profileTaskStub = ProfileTaskGrpc.newStub(channel);
+ } else {
+ profileTaskBlockingStub = null;
+ profileTaskStub = null;
+ }
+ this.status = status;
+ }
+
+ /**
+ * add a new profiling snapshot, send to {@link #snapshotQueue}
+ */
+ public void addProfilingSnapshot(TracingThreadSnapshot snapshot) {
+ snapshotQueue.add(snapshot);
+ }
+
+ /**
+ * notify backend profile task has finish
+ */
+ public void notifyProfileTaskFinish(ProfileTask task) {
+ try {
+ final ProfileTaskFinishReport.Builder reportBuilder =
ProfileTaskFinishReport.newBuilder();
+ // sniffer info
+
reportBuilder.setServiceId(RemoteDownstreamConfig.Agent.SERVICE_ID).setInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID);
+ // task info
+ reportBuilder.setTaskId(task.getTaskId());
+
+ // send data
+ profileTaskBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT,
TimeUnit.SECONDS).reportTaskFinish(reportBuilder.build());
+ } catch (Throwable e) {
+ logger.error(e, "Notify profile task finish to backend fail.");
+ }
+ }
+
+ /**
+ * send segment snapshot
+ */
+ private class SnapshotSender implements Runnable {
+
+ @Override
+ public void run() {
+ if (status == GRPCChannelStatus.CONNECTED) {
+ try {
+ ArrayList<TracingThreadSnapshot> buffer = new
ArrayList<>(Config.Profile.SNAPSHOT_TRANSPORT_BUFFER_SIZE);
+ snapshotQueue.drainTo(buffer);
+ if (buffer.size() > 0) {
+ final GRPCStreamServiceStatus status = new
GRPCStreamServiceStatus(false);
+ StreamObserver<ThreadSnapshot> snapshotStreamObserver
= profileTaskStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT,
TimeUnit.SECONDS).collectSnapshot(new StreamObserver<Commands>() {
+ @Override
+ public void onNext(Commands commands) {
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ status.finished();
+ if (logger.isErrorEnable()) {
+ logger.error(throwable, "Send profile
segment snapshot to collector fail with a grpc internal exception.");
+ }
+
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(throwable);
+ }
+
+ @Override
+ public void onCompleted() {
+ status.finished();
+ }
+ });
+ for (TracingThreadSnapshot snapshot : buffer) {
+ final ThreadSnapshot transformSnapshot =
snapshot.transform();
+ snapshotStreamObserver.onNext(transformSnapshot);
+ }
+
+ snapshotStreamObserver.onCompleted();
+ status.wait4Finish();
+ }
+ } catch (Throwable t) {
+ logger.error(t, "Send profile segment snapshot to backend
fail.");
+ }
+ }
+ }
+
+ }
+}
diff --git
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskExecutionContext.java
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskExecutionContext.java
index 20b9eec..b17414c 100644
---
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskExecutionContext.java
+++
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskExecutionContext.java
@@ -18,7 +18,15 @@
package org.apache.skywalking.apm.agent.core.profile;
+import org.apache.skywalking.apm.agent.core.conf.Config;
+import org.apache.skywalking.apm.agent.core.context.TracingContext;
+import org.apache.skywalking.apm.agent.core.context.ids.ID;
+
import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReferenceArray;
/**
* profile task execution context, it will create on process this profile task
@@ -30,20 +38,119 @@ public class ProfileTaskExecutionContext {
// task data
private final ProfileTask task;
- // task real start time
- private final long startTime;
+ // record current profiling count, use this to check has available profile
slot
+ private final AtomicInteger currentProfilingCount = new AtomicInteger(0);
+
+ // profiling segment slot
+ private volatile AtomicReferenceArray<ThreadProfiler>
profilingSegmentSlots;
+
+ // current profiling execution future
+ private volatile Future profilingFuture;
- public ProfileTaskExecutionContext(ProfileTask task, long startTime) {
+ // total started profiling tracing context count
+ private final AtomicInteger totalStartedProfilingCount = new
AtomicInteger(0);
+
+ public ProfileTaskExecutionContext(ProfileTask task) {
this.task = task;
- this.startTime = startTime;
+ profilingSegmentSlots = new
AtomicReferenceArray<>(Config.Profile.MAX_PARALLEL);
+ }
+
+ /**
+ * start profiling this task
+ */
+ public void startProfiling(ExecutorService executorService) {
+ profilingFuture = executorService.submit(new ProfileThread(this));
+ }
+
+ /**
+ * stop profiling
+ */
+ public void stopProfiling() {
+ if (profilingFuture != null) {
+ profilingFuture.cancel(true);
+ }
+ }
+
+ /**
+ * check have available slot to profile and add it
+ *
+ * @return is add profile success
+ */
+ public boolean attemptProfiling(TracingContext tracingContext, ID
traceSegmentId, String firstSpanOPName) {
+ // check has available slot
+ final int usingSlotCount = currentProfilingCount.get();
+ if (usingSlotCount >= Config.Profile.MAX_PARALLEL) {
+ return false;
+ }
+
+ // check first operation name matches
+ if (!Objects.equals(task.getFistSpanOPName(), firstSpanOPName)) {
+ return false;
+ }
+
+ // if out limit started profiling count then stop add profiling
+ if (totalStartedProfilingCount.get() > task.getMaxSamplingCount()) {
+ return false;
+ }
+
+ // try to occupy slot
+ if (!currentProfilingCount.compareAndSet(usingSlotCount,
usingSlotCount + 1)) {
+ return false;
+ }
+
+ final ThreadProfiler threadProfiler = new
ThreadProfiler(tracingContext, traceSegmentId, Thread.currentThread(), this);
+ int slotLength = profilingSegmentSlots.length();
+ for (int slot = 0; slot < slotLength; slot++) {
+ if (profilingSegmentSlots.compareAndSet(slot, null,
threadProfiler)) {
+ break;
+ }
+ }
+ return true;
+ }
+
+
+ /**
+ * profiling recheck
+ */
+ public boolean profilingRecheck(TracingContext tracingContext, ID
traceSegmentId, String firstSpanOPName) {
+ // if started, keep profiling
+ if (tracingContext.isProfiling()) {
+ return true;
+ }
+
+ return attemptProfiling(tracingContext, traceSegmentId,
firstSpanOPName);
+ }
+
+ /**
+ * find tracing context and clear on slot
+ */
+ public void stopTracingProfile(TracingContext tracingContext) {
+ // find current tracingContext and clear it
+ int slotLength = profilingSegmentSlots.length();
+ for (int slot = 0; slot < slotLength; slot++) {
+ ThreadProfiler currentProfiler = profilingSegmentSlots.get(slot);
+ if (currentProfiler != null &&
currentProfiler.matches(tracingContext)) {
+ profilingSegmentSlots.set(slot, null);
+
+ // setting stop running
+ currentProfiler.stopProfiling();
+ currentProfilingCount.addAndGet(-1);
+ break;
+ }
+ }
}
public ProfileTask getTask() {
return task;
}
- public long getStartTime() {
- return startTime;
+ public AtomicReferenceArray<ThreadProfiler> threadProfilerSlots() {
+ return profilingSegmentSlots;
+ }
+
+ public boolean isStartProfileable() {
+ // check is out of max sampling count check
+ return totalStartedProfilingCount.incrementAndGet() >
task.getMaxSamplingCount();
}
@Override
diff --git
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskExecutionService.java
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskExecutionService.java
index 0bf0f50..9a3f270 100644
---
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskExecutionService.java
+++
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskExecutionService.java
@@ -21,6 +21,10 @@ package org.apache.skywalking.apm.agent.core.profile;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
+import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
+import org.apache.skywalking.apm.agent.core.context.TracingContext;
+import org.apache.skywalking.apm.agent.core.context.TracingThreadListener;
+import org.apache.skywalking.apm.agent.core.context.ids.ID;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.network.constants.ProfileConstants;
@@ -38,7 +42,7 @@ import java.util.concurrent.atomic.AtomicReference;
* @author MrPro
*/
@DefaultImplementor
-public class ProfileTaskExecutionService implements BootService {
+public class ProfileTaskExecutionService implements BootService,
TracingThreadListener {
private static final ILog logger =
LogManager.getLogger(ProfileTaskExecutionService.class);
@@ -51,12 +55,14 @@ public class ProfileTaskExecutionService implements
BootService {
// current processing profile task context
private final AtomicReference<ProfileTaskExecutionContext>
taskExecutionContext = new AtomicReference<>();
+ // profile executor thread pool, only running one thread
+ private final static ExecutorService PROFILE_EXECUTOR =
Executors.newSingleThreadExecutor(new
DefaultNamedThreadFactory("PROFILING-TASK"));
+
// profile task list, include running and waiting running tasks
private final List<ProfileTask> profileTaskList =
Collections.synchronizedList(new LinkedList<>());
/**
- * get profile task from OAP
- * @param task
+ * add profile task from OAP
*/
public void addProfileTask(ProfileTask task) {
// update last command create time
@@ -65,9 +71,9 @@ public class ProfileTaskExecutionService implements
BootService {
}
// check profile task limit
- final String dataError = checkProfileTaskSuccess(task);
- if (dataError != null) {
- logger.warn("check command error, cannot process this profile
task. reason: {}", dataError);
+ final CheckResult dataError = checkProfileTaskSuccess(task);
+ if (!dataError.isSuccess()) {
+ logger.warn("check command error, cannot process this profile
task. reason: {}", dataError.getErrorReason());
return;
}
@@ -85,18 +91,45 @@ public class ProfileTaskExecutionService implements
BootService {
}
/**
+ * check and add {@link TracingContext} profiling
+ */
+ public boolean addProfiling(TracingContext tracingContext, ID
traceSegmentId, String firstSpanOPName) {
+ // get current profiling task, check need profiling
+ final ProfileTaskExecutionContext executionContext =
taskExecutionContext.get();
+ if (executionContext == null) {
+ return false;
+ }
+
+ return executionContext.attemptProfiling(tracingContext,
traceSegmentId, firstSpanOPName);
+ }
+
+ /**
+ * Re-check current trace need profiling, in case that third-party plugins
change the operation name.
+ */
+ public boolean profilingRecheck(TracingContext tracingContext, ID
traceSegmentId, String firstSpanOPName) {
+ // get current profiling task, check need profiling
+ final ProfileTaskExecutionContext executionContext =
taskExecutionContext.get();
+ if (executionContext == null) {
+ return false;
+ }
+
+ return executionContext.profilingRecheck(tracingContext,
traceSegmentId, firstSpanOPName);
+ }
+
+ /**
* active the selected profile task to execution task, and start a removal
task for it.
- * @param task
*/
private synchronized void processProfileTask(ProfileTask task) {
// make sure prev profile task already stopped
stopCurrentProfileTask(taskExecutionContext.get());
// make stop task schedule and task context
- // TODO process task on next step
- final ProfileTaskExecutionContext currentStartedTaskContext = new
ProfileTaskExecutionContext(task, System.currentTimeMillis());
+ final ProfileTaskExecutionContext currentStartedTaskContext = new
ProfileTaskExecutionContext(task);
taskExecutionContext.set(currentStartedTaskContext);
+ // start profiling this task
+ currentStartedTaskContext.startProfiling(PROFILE_EXECUTOR);
+
PROFILE_TASK_SCHEDULE.schedule(new Runnable() {
@Override
public void run() {
@@ -108,36 +141,44 @@ public class ProfileTaskExecutionService implements
BootService {
/**
* stop profile task, remove context data
*/
- private synchronized void
stopCurrentProfileTask(ProfileTaskExecutionContext needToStop) {
+ synchronized void stopCurrentProfileTask(ProfileTaskExecutionContext
needToStop) {
// stop same context only
if (needToStop == null ||
!taskExecutionContext.compareAndSet(needToStop, null)) {
return;
}
+ // current execution stop running
+ needToStop.stopProfiling();
+
// remove task
profileTaskList.remove(needToStop.getTask());
- // TODO notify OAP current profile task execute finish
+ // notify profiling task has finished
+
ServiceManager.INSTANCE.findService(ProfileTaskChannelService.class).notifyProfileTaskFinish(needToStop.getTask());
}
@Override
public void prepare() throws Throwable {
-
}
@Override
public void boot() throws Throwable {
-
}
@Override
public void onComplete() throws Throwable {
-
+ // add trace finish notification
+ TracingContext.TracingThreadListenerManager.add(this);
}
@Override
public void shutdown() throws Throwable {
+ // remove trace listener
+ TracingContext.TracingThreadListenerManager.remove(this);
+
PROFILE_TASK_SCHEDULE.shutdown();
+
+ PROFILE_EXECUTOR.shutdown();
}
public long getLastCommandCreateTime() {
@@ -146,39 +187,37 @@ public class ProfileTaskExecutionService implements
BootService {
/**
* check profile task data success, make the re-check, prevent receiving
wrong data from database or OAP
- * @param task
- * @return
*/
- private String checkProfileTaskSuccess(ProfileTask task) {
+ private CheckResult checkProfileTaskSuccess(ProfileTask task) {
// endpoint name
- if (StringUtil.isEmpty(task.getEndpointName())) {
- return "endpoint name cannot be empty";
+ if (StringUtil.isEmpty(task.getFistSpanOPName())) {
+ return new CheckResult(false, "endpoint name cannot be empty");
}
// duration
if (task.getDuration() < ProfileConstants.TASK_DURATION_MIN_MINUTE) {
- return "monitor duration must greater than " +
ProfileConstants.TASK_DURATION_MIN_MINUTE + " minutes";
+ return new CheckResult(false, "monitor duration must greater than
" + ProfileConstants.TASK_DURATION_MIN_MINUTE + " minutes");
}
if (task.getDuration() > ProfileConstants.TASK_DURATION_MAX_MINUTE) {
- return "The duration of the monitoring task cannot be greater than
" + ProfileConstants.TASK_DURATION_MAX_MINUTE + " minutes";
+ return new CheckResult(false, "The duration of the monitoring task
cannot be greater than " + ProfileConstants.TASK_DURATION_MAX_MINUTE + "
minutes");
}
// min duration threshold
if (task.getMinDurationThreshold() < 0) {
- return "min duration threshold must greater than or equals zero";
+ return new CheckResult(false, "min duration threshold must greater
than or equals zero");
}
// dump period
if (task.getThreadDumpPeriod() <
ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS) {
- return "dump period must be greater than or equals " +
ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS + " milliseconds";
+ return new CheckResult(false, "dump period must be greater than or
equals " + ProfileConstants.TASK_DUMP_PERIOD_MIN_MILLIS + " milliseconds");
}
// max sampling count
if (task.getMaxSamplingCount() <= 0) {
- return "max sampling count must greater than zero";
+ return new CheckResult(false, "max sampling count must greater
than zero");
}
if (task.getMaxSamplingCount() >=
ProfileConstants.TASK_MAX_SAMPLING_COUNT) {
- return "max sampling count must less than " +
ProfileConstants.TASK_MAX_SAMPLING_COUNT;
+ return new CheckResult(false, "max sampling count must less than "
+ ProfileConstants.TASK_MAX_SAMPLING_COUNT);
}
// check task queue, check only one task in a certain time
@@ -187,15 +226,46 @@ public class ProfileTaskExecutionService implements
BootService {
// if the end time of the task to be added is during the execution
of any data, means is a error data
if (taskProcessFinishTime >= profileTask.getStartTime() &&
taskProcessFinishTime <= calcProfileTaskFinishTime(profileTask)) {
- return "there already have processing task in time range,
could not add a new task again. processing task monitor endpoint name: " +
profileTask.getEndpointName();
+ return new CheckResult(false, "there already have processing
task in time range, could not add a new task again. processing task monitor
endpoint name: " + profileTask.getFistSpanOPName());
}
}
- return null;
+ return new CheckResult(true, null);
}
private long calcProfileTaskFinishTime(ProfileTask task) {
return task.getStartTime() +
TimeUnit.MINUTES.toMillis(task.getDuration());
}
+ @Override
+ public void afterMainThreadFinish(TracingContext tracingContext) {
+ if (tracingContext.isProfiling()) {
+ // stop profiling tracing context
+ ProfileTaskExecutionContext currentExecutionContext =
taskExecutionContext.get();
+ if (currentExecutionContext != null) {
+ currentExecutionContext.stopTracingProfile(tracingContext);
+ }
+ }
+ }
+
+ /**
+ * check profile task is processable
+ */
+ private static class CheckResult {
+ private boolean success;
+ private String errorReason;
+
+ public CheckResult(boolean success, String errorReason) {
+ this.success = success;
+ this.errorReason = errorReason;
+ }
+
+ public boolean isSuccess() {
+ return success;
+ }
+
+ public String getErrorReason() {
+ return errorReason;
+ }
+ }
}
diff --git
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskQueryService.java
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskQueryService.java
deleted file mode 100644
index 5cca040..0000000
---
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileTaskQueryService.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.skywalking.apm.agent.core.profile;
-
-import io.grpc.Channel;
-import io.grpc.Status;
-import io.grpc.StatusRuntimeException;
-import org.apache.skywalking.apm.agent.core.boot.BootService;
-import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
-import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
-import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
-import org.apache.skywalking.apm.agent.core.commands.CommandService;
-import org.apache.skywalking.apm.agent.core.conf.Config;
-import org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
-import org.apache.skywalking.apm.agent.core.dictionary.DictionaryUtil;
-import org.apache.skywalking.apm.agent.core.logging.api.ILog;
-import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
-import org.apache.skywalking.apm.agent.core.remote.GRPCChannelListener;
-import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager;
-import org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus;
-import org.apache.skywalking.apm.network.common.Commands;
-import
org.apache.skywalking.apm.network.language.profile.ProfileTaskCommandQuery;
-import org.apache.skywalking.apm.network.language.profile.ProfileTaskGrpc;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-
-import static
org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT;
-
-/**
- * sniffer will check has new profile task list every {@link
Config.Collector#GET_PROFILE_TASK_INTERVAL} second.
- *
- * @author MrPro
- */
-@DefaultImplementor
-public class ProfileTaskQueryService implements BootService, Runnable,
GRPCChannelListener {
- private static final ILog logger =
LogManager.getLogger(ProfileTaskQueryService.class);
-
- private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
- private volatile ProfileTaskGrpc.ProfileTaskBlockingStub
profileTaskBlockingStub;
- private volatile ScheduledFuture<?> getTaskListFuture;
-
- @Override
- public void run() {
- if (RemoteDownstreamConfig.Agent.SERVICE_ID !=
DictionaryUtil.nullValue()
- && RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID !=
DictionaryUtil.nullValue()
- ) {
- if (status == GRPCChannelStatus.CONNECTED) {
- try {
- ProfileTaskCommandQuery.Builder builder =
ProfileTaskCommandQuery.newBuilder();
-
- // sniffer info
-
builder.setServiceId(RemoteDownstreamConfig.Agent.SERVICE_ID).setInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID);
-
- // last command create time
-
builder.setLastCommandTime(ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class).getLastCommandCreateTime());
-
- Commands commands =
profileTaskBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT,
TimeUnit.SECONDS).getProfileTaskCommands(builder.build());
-
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
- } catch (Throwable t) {
- if (!(t instanceof StatusRuntimeException)) {
- logger.error(t, "query profile task from Collector
fail.", t);
- return;
- }
- final StatusRuntimeException statusRuntimeException =
(StatusRuntimeException) t;
- if (statusRuntimeException.getStatus().getCode() ==
Status.Code.UNIMPLEMENTED) {
- logger.warn("Backend doesn't support profiling,
profiling will be disabled");
- if (getTaskListFuture != null) {
- getTaskListFuture.cancel(true);
- }
- }
- }
- }
- }
-
- }
-
- @Override
- public void prepare() throws Throwable {
-
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
- }
-
- @Override
- public void boot() throws Throwable {
- if (Config.Profile.ACTIVE) {
- getTaskListFuture = Executors.newSingleThreadScheduledExecutor(new
DefaultNamedThreadFactory("ProfileGetTaskService"))
- .scheduleWithFixedDelay(this, 0,
Config.Collector.GET_PROFILE_TASK_INTERVAL, TimeUnit.SECONDS);
- }
- }
-
- @Override
- public void onComplete() throws Throwable {
- }
-
- @Override
- public void shutdown() throws Throwable {
- if (getTaskListFuture != null) {
- getTaskListFuture.cancel(true);
- }
- }
-
- @Override
- public void statusChanged(GRPCChannelStatus status) {
- if (GRPCChannelStatus.CONNECTED.equals(status)) {
- Channel channel =
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
- profileTaskBlockingStub = ProfileTaskGrpc.newBlockingStub(channel);
- } else {
- profileTaskBlockingStub = null;
- }
- this.status = status;
- }
-}
diff --git
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileThread.java
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileThread.java
new file mode 100644
index 0000000..086126f
--- /dev/null
+++
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfileThread.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.agent.core.profile;
+
+import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
+import org.apache.skywalking.apm.agent.core.logging.api.ILog;
+import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
+
+import java.util.concurrent.atomic.AtomicReferenceArray;
+
+/**
+ * Profile task process thread, dump the executing thread stack.
+ *
+ * @author MrPro
+ */
+public class ProfileThread implements Runnable {
+
+ private static final ILog logger =
LogManager.getLogger(ProfileThread.class);
+
+ // profiling task context
+ private final ProfileTaskExecutionContext taskExecutionContext;
+
+ private final ProfileTaskExecutionService profileTaskExecutionService;
+ private final ProfileTaskChannelService profileTaskChannelService;
+
+ public ProfileThread(ProfileTaskExecutionContext taskExecutionContext) {
+ this.taskExecutionContext = taskExecutionContext;
+ profileTaskExecutionService =
ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class);
+ profileTaskChannelService =
ServiceManager.INSTANCE.findService(ProfileTaskChannelService.class);
+ }
+
+ @Override
+ public void run() {
+
+ try {
+ profiling(taskExecutionContext);
+ } catch (InterruptedException e) {
+ // ignore interrupted
+ // means current task has stopped
+ } catch (Exception e) {
+ logger.error(e, "Profiling task fail. taskId:{}",
taskExecutionContext.getTask().getTaskId());
+ } finally {
+ // finally stop current profiling task, tell execution service
task has stop
+
profileTaskExecutionService.stopCurrentProfileTask(taskExecutionContext);
+ }
+
+ }
+
+ /**
+ * start profiling
+ */
+ private void profiling(ProfileTaskExecutionContext executionContext)
throws InterruptedException {
+
+ int maxSleepPeriod = executionContext.getTask().getThreadDumpPeriod();
+
+ // run loop when current thread still running
+ long currentLoopStartTime = -1;
+ while (!Thread.currentThread().isInterrupted()) {
+ currentLoopStartTime = System.currentTimeMillis();
+
+ // each all slot
+ AtomicReferenceArray<ThreadProfiler> profilers =
executionContext.threadProfilerSlots();
+ int profilerCount = profilers.length();
+ for (int slot = 0; slot < profilerCount; slot++) {
+ ThreadProfiler currentProfiler = profilers.get(slot);
+ if (currentProfiler == null) {
+ continue;
+ }
+
+ switch (currentProfiler.profilingStatus()) {
+
+ case READY:
+ // check tracing context running time
+ currentProfiler.startProfilingIfNeed();
+ break;
+
+ case PROFILING:
+ // dump stack
+ TracingThreadSnapshot snapshot =
currentProfiler.buildSnapshot();
+ if (snapshot != null) {
+
profileTaskChannelService.addProfilingSnapshot(snapshot);
+ } else {
+ // tell execution context current tracing thread
dump failed, stop it
+
executionContext.stopTracingProfile(currentProfiler.tracingContext());
+ }
+ break;
+
+ }
+ }
+
+ // sleep to next period
+ // if out of period, sleep one period
+ long needToSleep = (currentLoopStartTime + maxSleepPeriod) -
System.currentTimeMillis();
+ needToSleep = needToSleep > 0 ? needToSleep : maxSleepPeriod;
+ Thread.sleep(needToSleep);
+ }
+ }
+
+}
diff --git
a/test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/TestController.java
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfilingStatus.java
similarity index 59%
copy from
test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/TestController.java
copy to
apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfilingStatus.java
index c300fc3..5159c0c 100644
---
a/test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/TestController.java
+++
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ProfilingStatus.java
@@ -16,30 +16,16 @@
*
*/
-package org.apache.skywalking.e2e.profile;
-
-import org.springframework.web.bind.annotation.*;
+package org.apache.skywalking.apm.agent.core.profile;
/**
* @author MrPro
*/
-@RestController
-@RequestMapping("/e2e")
-public class TestController {
- private final UserRepo userRepo;
+public enum ProfilingStatus {
- public TestController(final UserRepo userRepo) {
- this.userRepo = userRepo;
- }
+ READY,
- @GetMapping("/health-check")
- public String hello() {
- return "healthy";
- }
+ PROFILING,
- @PostMapping("/users")
- public User createAuthor(@RequestBody final User user) throws
InterruptedException {
- Thread.sleep(1000L);
- return userRepo.save(user);
- }
+ STOPPED
}
diff --git
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ThreadProfiler.java
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ThreadProfiler.java
new file mode 100644
index 0000000..59b5e99
--- /dev/null
+++
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/ThreadProfiler.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.agent.core.profile;
+
+import com.google.common.base.Objects;
+import org.apache.skywalking.apm.agent.core.conf.Config;
+import org.apache.skywalking.apm.agent.core.context.TracingContext;
+import org.apache.skywalking.apm.agent.core.context.ids.ID;
+
+import java.util.ArrayList;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author MrPro
+ */
+public class ThreadProfiler {
+
+ // current tracing context
+ private final TracingContext tracingContext;
+ // current tracing segment id
+ private final ID traceSegmentId;
+ // need to profiling thread
+ private final Thread profilingThread;
+ // profiling execution context
+ private final ProfileTaskExecutionContext executionContext;
+
+ // profiling time
+ private long profilingStartTime;
+ private long profilingMaxTimeMills;
+
+ // after min duration threshold check, it will start dump
+ private ProfilingStatus profilingStatus = ProfilingStatus.READY;
+ // thread dump sequence
+ private int dumpSequence = 0;
+
+ public ThreadProfiler(TracingContext tracingContext, ID traceSegmentId,
Thread profilingThread, ProfileTaskExecutionContext executionContext) {
+ this.tracingContext = tracingContext;
+ this.traceSegmentId = traceSegmentId;
+ this.profilingThread = profilingThread;
+ this.executionContext = executionContext;
+ this.profilingMaxTimeMills =
TimeUnit.MINUTES.toMillis(Config.Profile.MAX_DURATION);
+ }
+
+ /**
+ * If tracing start time greater than {@link
ProfileTask#getMinDurationThreshold()}, then start to profiling trace
+ */
+ public void startProfilingIfNeed() {
+ if (System.currentTimeMillis() - tracingContext.createTime() >
executionContext.getTask().getMinDurationThreshold()) {
+ this.profilingStartTime = System.currentTimeMillis();
+ this.profilingStatus = ProfilingStatus.PROFILING;
+ }
+ }
+
+ /**
+ * Stop profiling status
+ */
+ public void stopProfiling() {
+ this.profilingStatus = ProfilingStatus.STOPPED;
+ }
+
+ /**
+ * dump tracing thread and build thread snapshot
+ *
+ * @return snapshot, if null means dump snapshot error, should stop it
+ */
+ public TracingThreadSnapshot buildSnapshot() {
+ if (!isProfilingContinuable()) {
+ return null;
+ }
+
+ long currentTime = System.currentTimeMillis();
+ // dump thread
+ StackTraceElement[] stackTrace;
+ try {
+ stackTrace = profilingThread.getStackTrace();
+
+ // stack depth is zero, means thread is already run finished
+ if (stackTrace.length == 0) {
+ return null;
+ }
+ } catch (Exception e) {
+ // dump error ignore and make this profiler stop
+ return null;
+ }
+
+ // if is first dump, check is can start profiling
+ if (dumpSequence == 0 && (!executionContext.isStartProfileable())) {
+ return null;
+ }
+
+ int dumpElementCount = Math.min(stackTrace.length,
Config.Profile.DUMP_MAX_STACK_DEPTH);
+
+ // use inverted order, because thread dump is start with bottom
+ final ArrayList<String> stackList = new ArrayList<>(dumpElementCount);
+ for (int i = dumpElementCount - 1; i >= 0; i--) {
+ stackList.add(buildStackElementCodeSignature(stackTrace[i]));
+ }
+
+ String taskId = executionContext.getTask().getTaskId();
+ return new TracingThreadSnapshot(taskId, traceSegmentId,
dumpSequence++, currentTime, stackList);
+ }
+
+ /**
+ * build thread stack element code signature
+ *
+ * @return code sign: className.methodName:lineNumber
+ */
+ private String buildStackElementCodeSignature(StackTraceElement element) {
+ return element.getClassName() + "." + element.getMethodName() + ":" +
element.getLineNumber();
+ }
+
+ /**
+ * matches profiling tracing context
+ */
+ public boolean matches(TracingContext context) {
+ // match trace id
+ return Objects.equal(context.getReadableGlobalTraceId(),
tracingContext.getReadableGlobalTraceId());
+ }
+
+ /**
+ * check whether profiling should continue
+ *
+ * @return if true means this thread profiling is continuable
+ */
+ private boolean isProfilingContinuable() {
+ return System.currentTimeMillis() - profilingStartTime <
profilingMaxTimeMills;
+ }
+
+ public TracingContext tracingContext() {
+ return tracingContext;
+ }
+
+ public ProfilingStatus profilingStatus() {
+ return profilingStatus;
+ }
+
+}
diff --git
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/TracingThreadSnapshot.java
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/TracingThreadSnapshot.java
new file mode 100644
index 0000000..dcfcf05
--- /dev/null
+++
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/profile/TracingThreadSnapshot.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.agent.core.profile;
+
+import org.apache.skywalking.apm.agent.core.context.ids.ID;
+import org.apache.skywalking.apm.network.language.profile.ThreadSnapshot;
+import org.apache.skywalking.apm.network.language.profile.ThreadStack;
+
+import java.util.List;
+
+/**
+ * @author MrPro
+ */
+public class TracingThreadSnapshot {
+
+ // thread profiler
+ private final String taskId;
+ private final ID traceSegmentId;
+
+ // dump info
+ private final int sequence;
+ private final long time;
+ private final List<String> stackList;
+
+ public TracingThreadSnapshot(String taskId, ID traceSegmentId, int
sequence, long time, List<String> stackList) {
+ this.taskId = taskId;
+ this.traceSegmentId = traceSegmentId;
+ this.sequence = sequence;
+ this.time = time;
+ this.stackList = stackList;
+ }
+
+ /**
+ * transform to gRPC data
+ */
+ public ThreadSnapshot transform() {
+ final ThreadSnapshot.Builder builder = ThreadSnapshot.newBuilder();
+ // task id
+ builder.setTaskId(taskId);
+ // dumped segment id
+ builder.setTraceSegmentId(traceSegmentId.transform());
+ // dump time
+ builder.setTime(time);
+ // snapshot dump sequence
+ builder.setSequence(sequence);
+ // snapshot stack
+ final ThreadStack.Builder stackBuilder = ThreadStack.newBuilder();
+ for (String codeSign : stackList) {
+ stackBuilder.addCodeSignatures(codeSign);
+ }
+ builder.setStack(stackBuilder);
+
+ return builder.build();
+ }
+
+
+}
diff --git
a/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService
b/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService
index 826b032..392600c 100644
---
a/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService
+++
b/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService
@@ -26,5 +26,5 @@
org.apache.skywalking.apm.agent.core.context.ContextManagerExtendService
org.apache.skywalking.apm.agent.core.commands.CommandService
org.apache.skywalking.apm.agent.core.commands.CommandExecutorService
org.apache.skywalking.apm.agent.core.context.OperationNameFormatService
-org.apache.skywalking.apm.agent.core.profile.ProfileTaskQueryService
+org.apache.skywalking.apm.agent.core.profile.ProfileTaskChannelService
org.apache.skywalking.apm.agent.core.profile.ProfileTaskExecutionService
diff --git
a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java
b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java
index 0390dcc..53b4d4d 100644
---
a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java
+++
b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java
@@ -22,13 +22,11 @@ package org.apache.skywalking.apm.agent.core.boot;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.List;
-import org.apache.skywalking.apm.agent.core.context.ContextManager;
-import org.apache.skywalking.apm.agent.core.context.IgnoredTracerContext;
-import org.apache.skywalking.apm.agent.core.context.TracingContext;
-import org.apache.skywalking.apm.agent.core.context.TracingContextListener;
+
+import org.apache.skywalking.apm.agent.core.context.*;
import org.apache.skywalking.apm.agent.core.jvm.JVMService;
import
org.apache.skywalking.apm.agent.core.profile.ProfileTaskExecutionService;
-import org.apache.skywalking.apm.agent.core.profile.ProfileTaskQueryService;
+import org.apache.skywalking.apm.agent.core.profile.ProfileTaskChannelService;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelListener;
import org.apache.skywalking.apm.agent.core.remote.GRPCChannelManager;
import org.apache.skywalking.apm.agent.core.remote.TraceSegmentServiceClient;
@@ -64,11 +62,17 @@ public class ServiceManagerTest {
assertGRPCChannelManager(ServiceManager.INSTANCE.findService(GRPCChannelManager.class));
assertSamplingService(ServiceManager.INSTANCE.findService(SamplingService.class));
assertJVMService(ServiceManager.INSTANCE.findService(JVMService.class));
-
assertProfileTaskQueryService(ServiceManager.INSTANCE.findService(ProfileTaskQueryService.class));
+
assertProfileTaskQueryService(ServiceManager.INSTANCE.findService(ProfileTaskChannelService.class));
assertProfileTaskExecuteService(ServiceManager.INSTANCE.findService(ProfileTaskExecutionService.class));
assertTracingContextListener();
assertIgnoreTracingContextListener();
+ assertTracingThreadContextListener();
+ }
+
+ private void assertTracingThreadContextListener() throws Exception {
+ List<TracingThreadListener> listeners =
getFieldValue(TracingContext.TracingThreadListenerManager.class, "LISTENERS");
+ assertThat(listeners.size(), is(1));
}
private void assertIgnoreTracingContextListener() throws Exception {
@@ -87,7 +91,7 @@ public class ServiceManagerTest {
assertNotNull(service);
}
- private void assertProfileTaskQueryService(ProfileTaskQueryService
service) {
+ private void assertProfileTaskQueryService(ProfileTaskChannelService
service) {
assertNotNull(service);
}
diff --git
a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/TracingContextTest.java
b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/TracingContextTest.java
index 0943107..9686324 100644
---
a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/TracingContextTest.java
+++
b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/context/TracingContextTest.java
@@ -46,7 +46,7 @@ public class TracingContextTest {
};
TracingContext.ListenerManager.add(listener);
try {
- TracingContext tracingContext = new TracingContext();
+ TracingContext tracingContext = new TracingContext("/url");
AbstractSpan span = tracingContext.createEntrySpan("/url");
for (int i = 0; i < 10; i++) {
diff --git
a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/test/tools/AgentServiceRule.java
b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/test/tools/AgentServiceRule.java
index 1e4a987..bd2b368 100644
---
a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/test/tools/AgentServiceRule.java
+++
b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/test/tools/AgentServiceRule.java
@@ -21,6 +21,8 @@ package org.apache.skywalking.apm.agent.core.test.tools;
import java.util.HashMap;
import java.util.LinkedList;
+
+import org.apache.skywalking.apm.agent.core.context.TracingThreadListener;
import org.junit.rules.ExternalResource;
import org.powermock.reflect.Whitebox;
import org.apache.skywalking.apm.agent.core.boot.BootService;
@@ -37,6 +39,7 @@ public class AgentServiceRule extends ExternalResource {
Whitebox.setInternalState(ServiceManager.INSTANCE, "bootedServices",
new HashMap<Class, BootService>());
Whitebox.setInternalState(TracingContext.ListenerManager.class,
"LISTENERS", new LinkedList<TracingContextListener>());
Whitebox.setInternalState(IgnoredTracerContext.ListenerManager.class,
"LISTENERS", new LinkedList<TracingContextListener>());
+
Whitebox.setInternalState(TracingContext.TracingThreadListenerManager.class,
"LISTENERS", new LinkedList<TracingThreadListener>());
}
@Override
diff --git a/apm-sniffer/config/agent.config b/apm-sniffer/config/agent.config
index acd369b..ee6dc5a 100644
--- a/apm-sniffer/config/agent.config
+++ b/apm-sniffer/config/agent.config
@@ -44,6 +44,18 @@ agent.service_name=${SW_AGENT_NAME:Your_ApplicationName}
# If true, skywalking agent will enable profile when user create a new profile
task. Otherwise disable profile.
# profile.active=${SW_AGENT_PROFILE_ACTIVE:true}
+# Parallel monitor segment count
+# profile.max_parallel=${SW_AGENT_PROFILE_MAX_PARALLEL:5}
+
+# Max monitor segment time(minutes), if current segment monitor time out of
limit, then stop it.
+# profile.duration=${SW_AGENT_PROFILE_DURATION:10}
+
+# Max dump thread stack depth
+# profile.dump_max_stack_depth=${SW_AGENT_PROFILE_DUMP_MAX_STACK_DEPTH:500}
+
+# Snapshot transport to backend buffer size
+#
profile.snapshot_transport_buffer_size=${SW_AGENT_PROFILE_SNAPSHOT_TRANSPORT_BUFFER_SIZE:50}
+
# Backend service addresses.
collector.backend_service=${SW_AGENT_COLLECTOR_BACKEND_SERVICES:127.0.0.1:11800}
diff --git a/docs/en/setup/service-agent/java-agent/README.md
b/docs/en/setup/service-agent/java-agent/README.md
index f38d67f..fc9946b 100755
--- a/docs/en/setup/service-agent/java-agent/README.md
+++ b/docs/en/setup/service-agent/java-agent/README.md
@@ -102,6 +102,10 @@ property key | Description | Default |
`dictionary.service_code_buffer_size`|The buffer size of application codes and
peer|`10 * 10000`|
`dictionary.endpoint_name_buffer_size`|The buffer size of endpoint names and
peer|`1000 * 10000`|
`profile.active`|If true, skywalking agent will enable profile when user
create a new profile task. Otherwise disable profile.|`true`|
+`profile.max_parallel`|Parallel monitor segment count|`5`|
+`profile.duration`|Max monitor segment time(minutes), if current segment
monitor time out of limit, then stop it.|`10`|
+`profile.dump_max_stack_depth`|Max dump thread stack depth|`500`|
+`profile.snapshot_transport_buffer_size`|Snapshot transport to backend buffer
size|`50`|
`plugin.peer_max_length `|Peer maximum description limit.|`200`|
`plugin.mongodb.trace_param`|If true, trace all the parameters in MongoDB
access, default is false. Only trace the operation, not include
parameters.|`false`|
`plugin.mongodb.filter_length_limit`|If set to positive number, the
`WriteRequest.params` would be truncated to this length, otherwise it would be
completely saved, which may cause performance problem.|`256`|
diff --git a/oap-server/server-bootstrap/src/main/resources/application.yml
b/oap-server/server-bootstrap/src/main/resources/application.yml
index 3f79711..faaa7e3 100755
--- a/oap-server/server-bootstrap/src/main/resources/application.yml
+++ b/oap-server/server-bootstrap/src/main/resources/application.yml
@@ -164,6 +164,8 @@ istio-telemetry:
default:
envoy-metric:
default:
+receiver-profile:
+ default:
# alsHTTPAnalysis: ${SW_ENVOY_METRIC_ALS_HTTP_ANALYSIS:k8s-mesh}
#receiver_zipkin:
# default:
@@ -233,3 +235,4 @@ configuration:
# grpc:
# targetHost: ${SW_EXPORTER_GRPC_HOST:127.0.0.1}
# targetPort: ${SW_EXPORTER_GRPC_PORT:9870}
+
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/ProfileTaskCache.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/ProfileTaskCache.java
index b790fbc..22f6172 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/ProfileTaskCache.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/cache/ProfileTaskCache.java
@@ -23,14 +23,18 @@ import com.google.common.cache.CacheBuilder;
import org.apache.skywalking.oap.server.core.CoreModuleConfig;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import org.apache.skywalking.oap.server.core.query.entity.ProfileTask;
+import org.apache.skywalking.oap.server.core.storage.StorageModule;
+import
org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.module.Service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
@@ -42,9 +46,11 @@ public class ProfileTaskCache implements Service {
private static final Logger LOGGER =
LoggerFactory.getLogger(ProfileTaskCache.class);
- private final Cache<Integer, List<ProfileTask>> profileTaskCache;
+ private final Cache<Integer, List<ProfileTask>> profileTaskDownstreamCache;
+ private final Cache<String, ProfileTask> profileTaskIdCache;
private final ModuleManager moduleManager;
+ private IProfileTaskQueryDAO profileTaskQueryDAO;
public ProfileTaskCache(ModuleManager moduleManager, CoreModuleConfig
moduleConfig) {
this.moduleManager = moduleManager;
@@ -52,9 +58,18 @@ public class ProfileTaskCache implements Service {
long initialSize = moduleConfig.getMaxSizeOfProfileTask() / 10L;
int initialCapacitySize = (int)(initialSize > Integer.MAX_VALUE ?
Integer.MAX_VALUE : initialSize);
- profileTaskCache =
CacheBuilder.newBuilder().initialCapacity(initialCapacitySize).maximumSize(moduleConfig.getMaxSizeOfProfileTask())
+ profileTaskDownstreamCache =
CacheBuilder.newBuilder().initialCapacity(initialCapacitySize).maximumSize(moduleConfig.getMaxSizeOfProfileTask())
// remove old profile task data
.expireAfterWrite(Duration.ofMinutes(1)).build();
+
+ profileTaskIdCache =
CacheBuilder.newBuilder().initialCapacity(initialCapacitySize).maximumSize(moduleConfig.getMaxSizeOfProfileTask()).build();
+ }
+
+ private IProfileTaskQueryDAO getProfileTaskQueryDAO() {
+ if (Objects.isNull(profileTaskQueryDAO)) {
+ profileTaskQueryDAO =
moduleManager.find(StorageModule.NAME).provider().getService(IProfileTaskQueryDAO.class);
+ }
+ return profileTaskQueryDAO;
}
/**
@@ -64,11 +79,33 @@ public class ProfileTaskCache implements Service {
*/
public List<ProfileTask> getProfileTaskList(int serviceId) {
// read profile task list from cache only, use cache update timer
mechanism
- List<ProfileTask> profileTaskList =
profileTaskCache.getIfPresent(serviceId);
+ List<ProfileTask> profileTaskList =
profileTaskDownstreamCache.getIfPresent(serviceId);
return profileTaskList;
}
/**
+ * query profile task by id
+ * @param id
+ * @return
+ */
+ public ProfileTask getProfileTaskById(String id) {
+ ProfileTask profile = profileTaskIdCache.getIfPresent(id);
+
+ if (profile == null) {
+ try {
+ profile = getProfileTaskQueryDAO().getById(id);
+ } catch (IOException e) {
+ LOGGER.error(e.getMessage(), e);
+ }
+ if (profile != null) {
+ profileTaskIdCache.put(id, profile);
+ }
+ }
+
+ return profile;
+ }
+
+ /**
* save service task list
* @param serviceId
* @param taskList
@@ -78,7 +115,7 @@ public class ProfileTaskCache implements Service {
taskList = Collections.emptyList();
}
- profileTaskCache.put(serviceId, taskList);
+ profileTaskDownstreamCache.put(serviceId, taskList);
}
/**
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/command/CommandService.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/command/CommandService.java
index cb6fa43..c152d83 100755
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/command/CommandService.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/command/CommandService.java
@@ -43,7 +43,7 @@ public class CommandService implements Service {
public ProfileTaskCommand newProfileTaskCommand(ProfileTask task) {
final String serialNumber = UUID.randomUUID().toString();
- return new ProfileTaskCommand(serialNumber, task.getEndpointName(),
task.getDuration(), task.getMinDurationThreshold(), task.getDumpPeriod(),
task.getMaxSamplingCount(), task.getStartTime(), task.getCreateTime());
+ return new ProfileTaskCommand(serialNumber, task.getId(),
task.getEndpointName(), task.getDuration(), task.getMinDurationThreshold(),
task.getDumpPeriod(), task.getMaxSamplingCount(), task.getStartTime(),
task.getCreateTime());
}
private String generateSerialNumber(final int serviceInstanceId, final
long time, final String serviceInstanceUUID) {
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profile/ProfileTaskSegmentSnapshotRecord.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profile/ProfileTaskSegmentSnapshotRecord.java
new file mode 100644
index 0000000..36a4cba
--- /dev/null
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/profile/ProfileTaskSegmentSnapshotRecord.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.skywalking.oap.server.core.profile;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.apache.skywalking.apm.util.StringUtil;
+import org.apache.skywalking.oap.server.core.Const;
+import org.apache.skywalking.oap.server.core.analysis.Stream;
+import org.apache.skywalking.oap.server.core.analysis.record.Record;
+import
org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
+import org.apache.skywalking.oap.server.core.source.ScopeDeclaration;
+import org.apache.skywalking.oap.server.core.storage.StorageBuilder;
+import org.apache.skywalking.oap.server.core.storage.annotation.Column;
+import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+import static
org.apache.skywalking.oap.server.core.source.DefaultScopeDefine.PROFILE_TASK_SEGMENT_SNAPSHOT;
+
+/**
+ * Profiling segment snapshot database bean, use record
+ *
+ * @author MrPro
+ */
+@Getter
+@Setter
+@ScopeDeclaration(id = PROFILE_TASK_SEGMENT_SNAPSHOT, name =
"ProfileTaskSegmentSnapshot")
+@Stream(name = ProfileTaskSegmentSnapshotRecord.INDEX_NAME, scopeId =
PROFILE_TASK_SEGMENT_SNAPSHOT, builder =
ProfileTaskSegmentSnapshotRecord.Builder.class, processor =
RecordStreamProcessor.class)
+public class ProfileTaskSegmentSnapshotRecord extends Record {
+
+ public static final String INDEX_NAME = "profile_task_segment_snapshot";
+ public static final String TASK_ID = "task_id";
+ public static final String SEGMENT_ID = "segment_id";
+ public static final String DUMP_TIME = "dump_time";
+ public static final String SEQUENCE = "sequence";
+ public static final String STACK_BINARY = "stack_binary";
+
+ @Column(columnName = TASK_ID) private String taskId;
+ @Column(columnName = SEGMENT_ID) private String segmentId;
+ @Column(columnName = DUMP_TIME) private long dumpTime;
+ @Column(columnName = SEQUENCE) private int sequence;
+ @Column(columnName = STACK_BINARY) private byte[] stackBinary;
+
+ @Override
+ public String id() {
+ return getTaskId() + Const.ID_SPLIT + getSegmentId() + Const.ID_SPLIT
+ getSequence() + Const.ID_SPLIT;
+ }
+
+ public static class Builder implements
StorageBuilder<ProfileTaskSegmentSnapshotRecord> {
+
+ @Override
+ public ProfileTaskSegmentSnapshotRecord map2Data(Map<String, Object>
dbMap) {
+ final ProfileTaskSegmentSnapshotRecord snapshot = new
ProfileTaskSegmentSnapshotRecord();
+ snapshot.setTaskId((String)dbMap.get(TASK_ID));
+ snapshot.setSegmentId((String)dbMap.get(SEGMENT_ID));
+ snapshot.setDumpTime(((Number)dbMap.get(DUMP_TIME)).longValue());
+ snapshot.setSequence(((Number)dbMap.get(SEQUENCE)).intValue());
+
snapshot.setTimeBucket(((Number)dbMap.get(TIME_BUCKET)).intValue());
+ if (StringUtil.isEmpty((String)dbMap.get(STACK_BINARY))) {
+ snapshot.setStackBinary(new byte[] {});
+ } else {
+
snapshot.setStackBinary(Base64.getDecoder().decode((String)dbMap.get(STACK_BINARY)));
+ }
+ return snapshot;
+ }
+
+ @Override
+ public Map<String, Object> data2Map(ProfileTaskSegmentSnapshotRecord
storageData) {
+ final HashMap<String, Object> map = new HashMap<>();
+ map.put(TASK_ID, storageData.getTaskId());
+ map.put(SEGMENT_ID, storageData.getSegmentId());
+ map.put(DUMP_TIME, storageData.getDumpTime());
+ map.put(SEQUENCE, storageData.getSequence());
+ map.put(TIME_BUCKET, storageData.getTimeBucket());
+ if (CollectionUtils.isEmpty(storageData.getStackBinary())) {
+ map.put(STACK_BINARY, Const.EMPTY_STRING);
+ } else {
+ map.put(STACK_BINARY, new
String(Base64.getEncoder().encode(storageData.getStackBinary())));
+ }
+ return map;
+ }
+ }
+}
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
index 54f9dfc..c6741b5 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/source/DefaultScopeDefine.java
@@ -69,6 +69,7 @@ public class DefaultScopeDefine {
public static final int HTTP_ACCESS_LOG = 25;
public static final int PROFILE_TASK = 26;
public static final int PROFILE_TASK_LOG = 27;
+ public static final int PROFILE_TASK_SEGMENT_SNAPSHOT = 28;
/**
* Catalog of scope, the metrics processor could use this to group all
generated metrics by oal rt.
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profile/IProfileTaskQueryDAO.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profile/IProfileTaskQueryDAO.java
index 5e6fa09..7ca639d 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profile/IProfileTaskQueryDAO.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/profile/IProfileTaskQueryDAO.java
@@ -41,4 +41,11 @@ public interface IProfileTaskQueryDAO extends DAO {
*/
List<ProfileTask> getTaskList(final Integer serviceId, final String
endpointName, final Long startTimeBucket, final Long endTimeBucket, final
Integer limit) throws IOException;
+ /**
+ * query profile task by id
+ * @param id
+ * @return
+ */
+ ProfileTask getById(final String id) throws IOException;
+
}
diff --git
a/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/profile/provider/handler/ProfileTaskServiceHandler.java
b/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/profile/provider/handler/ProfileTaskServiceHandler.java
index 0511808..b181fd3 100644
---
a/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/profile/provider/handler/ProfileTaskServiceHandler.java
+++
b/oap-server/server-receiver-plugin/skywalking-profile-receiver-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/profile/provider/handler/ProfileTaskServiceHandler.java
@@ -20,19 +20,25 @@ package
org.apache.skywalking.oap.server.receiver.profile.provider.handler;
import io.grpc.stub.StreamObserver;
import org.apache.skywalking.apm.network.common.Commands;
+import org.apache.skywalking.apm.network.language.agent.UniqueId;
import
org.apache.skywalking.apm.network.language.profile.ProfileTaskCommandQuery;
+import
org.apache.skywalking.apm.network.language.profile.ProfileTaskFinishReport;
import org.apache.skywalking.apm.network.language.profile.ProfileTaskGrpc;
+import org.apache.skywalking.apm.network.language.profile.ThreadSnapshot;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.analysis.TimeBucket;
import
org.apache.skywalking.oap.server.core.analysis.worker.RecordStreamProcessor;
import org.apache.skywalking.oap.server.core.cache.ProfileTaskCache;
import org.apache.skywalking.oap.server.core.command.CommandService;
import org.apache.skywalking.oap.server.core.profile.ProfileTaskLogRecord;
+import
org.apache.skywalking.oap.server.core.profile.ProfileTaskSegmentSnapshotRecord;
import org.apache.skywalking.oap.server.core.query.entity.ProfileTask;
import
org.apache.skywalking.oap.server.core.query.entity.ProfileTaskLogOperationType;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.library.server.grpc.GRPCHandler;
import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -42,6 +48,8 @@ import java.util.concurrent.TimeUnit;
*/
public class ProfileTaskServiceHandler extends
ProfileTaskGrpc.ProfileTaskImplBase implements GRPCHandler {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ProfileTaskServiceHandler.class);
+
private ProfileTaskCache profileTaskCache;
private final CommandService commandService;
@@ -71,7 +79,7 @@ public class ProfileTaskServiceHandler extends
ProfileTaskGrpc.ProfileTaskImplBa
}
// record profile task log
- recordProfileTaskLog(profileTask, request);
+ recordProfileTaskLog(profileTask, request.getInstanceId(),
ProfileTaskLogOperationType.NOTIFIED);
// add command
commandsBuilder.addCommands(commandService.newProfileTaskCommand(profileTask).serialize().build());
@@ -81,11 +89,72 @@ public class ProfileTaskServiceHandler extends
ProfileTaskGrpc.ProfileTaskImplBa
responseObserver.onCompleted();
}
- private void recordProfileTaskLog(ProfileTask task,
ProfileTaskCommandQuery query) {
+ @Override
+ public StreamObserver<ThreadSnapshot>
collectSnapshot(StreamObserver<Commands> responseObserver) {
+ return new StreamObserver<ThreadSnapshot>() {
+ @Override
+ public void onNext(ThreadSnapshot snapshot) {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("receive profile segment snapshot");
+ }
+
+ // parse segment id
+ UniqueId uniqueId = snapshot.getTraceSegmentId();
+ StringBuilder segmentIdBuilder = new StringBuilder();
+ for (int i = 0; i < uniqueId.getIdPartsList().size(); i++) {
+ if (i == 0) {
+
segmentIdBuilder.append(uniqueId.getIdPartsList().get(i));
+ } else {
+
segmentIdBuilder.append(".").append(uniqueId.getIdPartsList().get(i));
+ }
+ }
+
+ // build database data
+ final ProfileTaskSegmentSnapshotRecord record = new
ProfileTaskSegmentSnapshotRecord();
+ record.setTaskId(snapshot.getTaskId());
+ record.setSegmentId(segmentIdBuilder.toString());
+ record.setDumpTime(snapshot.getTime());
+ record.setSequence(snapshot.getSequence());
+ record.setStackBinary(snapshot.getStack().toByteArray());
+
record.setTimeBucket(TimeBucket.getRecordTimeBucket(snapshot.getTime()));
+
+ // async storage
+ RecordStreamProcessor.getInstance().in(record);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ LOGGER.error(throwable.getMessage(), throwable);
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void onCompleted() {
+ responseObserver.onNext(Commands.newBuilder().build());
+ responseObserver.onCompleted();
+ }
+ };
+ }
+
+ @Override
+ public void reportTaskFinish(ProfileTaskFinishReport request,
StreamObserver<Commands> responseObserver) {
+ // query task from cache, set log time bucket need it
+ final ProfileTask profileTask =
profileTaskCache.getProfileTaskById(request.getTaskId());
+
+ // record finish log
+ if (profileTask != null) {
+ recordProfileTaskLog(profileTask, request.getInstanceId(),
ProfileTaskLogOperationType.EXECUTION_FINISHED);
+ }
+
+ responseObserver.onNext(Commands.newBuilder().build());
+ responseObserver.onCompleted();
+ }
+
+ private void recordProfileTaskLog(ProfileTask task, int instanceId,
ProfileTaskLogOperationType operationType) {
final ProfileTaskLogRecord logRecord = new ProfileTaskLogRecord();
logRecord.setTaskId(task.getId());
- logRecord.setInstanceId(query.getInstanceId());
-
logRecord.setOperationType(ProfileTaskLogOperationType.NOTIFIED.getCode());
+ logRecord.setInstanceId(instanceId);
+ logRecord.setOperationType(operationType.getCode());
logRecord.setOperationTime(System.currentTimeMillis());
// same with task time bucket, ensure record will ttl same with
profile task
logRecord.setTimeBucket(TimeBucket.getRecordTimeBucket(task.getStartTime() +
TimeUnit.MINUTES.toMillis(task.getDuration())));
diff --git
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ProfileTaskQueryEsDAO.java
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ProfileTaskQueryEsDAO.java
index d7598cf..dbd2dee 100644
---
a/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ProfileTaskQueryEsDAO.java
+++
b/oap-server/server-storage-plugin/storage-elasticsearch-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/elasticsearch/query/ProfileTaskQueryEsDAO.java
@@ -88,6 +88,25 @@ public class ProfileTaskQueryEsDAO extends EsDAO implements
IProfileTaskQueryDAO
return tasks;
}
+ @Override
+ public ProfileTask getById(String id) throws IOException {
+ if (StringUtil.isEmpty(id)) {
+ return null;
+ }
+
+ SearchSourceBuilder sourceBuilder = SearchSourceBuilder.searchSource();
+ sourceBuilder.query(QueryBuilders.idsQuery().addIds(id));
+ sourceBuilder.size(1);
+
+ final SearchResponse response =
getClient().search(ProfileTaskNoneStream.INDEX_NAME, sourceBuilder);
+
+ if (response.getHits().getHits().length > 0) {
+ return parseTask(response.getHits().getHits()[0]);
+ }
+
+ return null;
+ }
+
private ProfileTask parseTask(SearchHit data) {
return ProfileTask.builder()
.id(data.getId())
diff --git
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2ProfileTaskQueryDAO.java
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2ProfileTaskQueryDAO.java
index a526b49..248ca70 100644
---
a/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2ProfileTaskQueryDAO.java
+++
b/oap-server/server-storage-plugin/storage-jdbc-hikaricp-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/jdbc/h2/dao/H2ProfileTaskQueryDAO.java
@@ -22,6 +22,7 @@ import org.apache.skywalking.apm.util.StringUtil;
import org.apache.skywalking.oap.server.core.profile.ProfileTaskNoneStream;
import org.apache.skywalking.oap.server.core.query.entity.ProfileTask;
import
org.apache.skywalking.oap.server.core.storage.profile.IProfileTaskQueryDAO;
+import
org.apache.skywalking.oap.server.library.client.jdbc.JDBCClientException;
import
org.apache.skywalking.oap.server.library.client.jdbc.hikaricp.JDBCHikariCPClient;
import java.io.IOException;
@@ -87,6 +88,29 @@ public class H2ProfileTaskQueryDAO implements
IProfileTaskQueryDAO {
}
}
+ @Override
+ public ProfileTask getById(String id) throws IOException {
+ if (StringUtil.isEmpty(id)) {
+ return null;
+ }
+
+ final StringBuilder sql = new StringBuilder();
+ final ArrayList<Object> condition = new ArrayList<>(1);
+ sql.append("select * from
").append(ProfileTaskNoneStream.INDEX_NAME).append(" where id=? LIMIT 1");
+ condition.add(id);
+
+ try (Connection connection = h2Client.getConnection()) {
+ try (ResultSet resultSet = h2Client.executeQuery(connection,
sql.toString(), condition.toArray(new Object[0]))) {
+ if (resultSet.next()) {
+ return parseTask(resultSet);
+ }
+ }
+ } catch (SQLException | JDBCClientException e) {
+ throw new IOException(e);
+ }
+ return null;
+ }
+
/**
* parse profile task data
* @param data
diff --git
a/test/e2e/e2e-profile/e2e-profile-es-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java
b/test/e2e/e2e-profile/e2e-profile-es-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java
index 528c490..b998f4c 100644
---
a/test/e2e/e2e-profile/e2e-profile-es-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java
+++
b/test/e2e/e2e-profile/e2e-profile-es-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java
@@ -91,13 +91,7 @@ public class ProfileVerificationITCase {
while (true) {
try {
- final Map<String, String> user = new HashMap<>();
- user.put("name", "SkyWalking");
- final ResponseEntity<String> responseEntity =
restTemplate.postForEntity(
- instrumentedServiceUrl + "/e2e/users",
- user,
- String.class
- );
+ final ResponseEntity<String> responseEntity =
sendRequest(false);
LOGGER.info("responseEntity: {}", responseEntity);
assertThat(responseEntity.getStatusCode()).isEqualTo(HttpStatus.OK);
final List<Trace> traces = profileClient.traces(
@@ -123,6 +117,17 @@ public class ProfileVerificationITCase {
}
+ private ResponseEntity<String> sendRequest(boolean needProfiling) {
+ final Map<String, String> user = new HashMap<>();
+ user.put("name", "SkyWalking");
+ user.put("enableProfiling", String.valueOf(needProfiling));
+ return restTemplate.postForEntity(
+ instrumentedServiceUrl + "/e2e/users",
+ user,
+ String.class
+ );
+ }
+
/**
* verify create profile task
* @param minutesAgo
@@ -134,10 +139,10 @@ public class ProfileVerificationITCase {
final ProfileTaskCreationRequest creationRequest =
ProfileTaskCreationRequest.builder()
.serviceId(2)
.endpointName("/e2e/users")
- .duration(5)
+ .duration(1)
.startTime(-1)
- .minDurationThreshold(10)
- .dumpPeriod(10)
+ .minDurationThreshold(1000)
+ .dumpPeriod(50)
.maxSamplingCount(5).build();
// verify create task
@@ -147,18 +152,29 @@ public class ProfileVerificationITCase {
ProfileTaskCreationResultMatcher creationResultMatcher = new
ProfileTaskCreationResultMatcher();
creationResultMatcher.verify(creationResult);
+ // verify get task list and sniffer get task logs
+ verifyProfileTask(creationRequest.getServiceId(),
"expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.notified.yml");
+
+ // send a profile request
+ sendRequest(true);
+
+ // verify task execution finish
+ verifyProfileTask(creationRequest.getServiceId(),
"expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.finished.yml");
+ }
+
+ private void verifyProfileTask(int serviceId, String verifyResources)
throws InterruptedException {
// verify get task list and logs
for (int i = 0; i < 10; i++) {
try {
final ProfileTasks tasks = profileClient.getProfileTaskList(
new ProfileTaskQuery()
- .serviceId(creationRequest.getServiceId())
+ .serviceId(serviceId)
.endpointName("")
);
LOGGER.info("get profile task list: {}", tasks);
InputStream expectedInputStream =
- new
ClassPathResource("expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml").getInputStream();
+ new
ClassPathResource(verifyResources).getInputStream();
final ProfilesTasksMatcher servicesMatcher = new
Yaml().loadAs(expectedInputStream, ProfilesTasksMatcher.class);
servicesMatcher.verify(tasks);
diff --git
a/test/e2e/e2e-profile/e2e-profile-h2-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java
b/test/e2e/e2e-profile/e2e-profile-h2-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java
index cab6dff..2289ae8 100644
---
a/test/e2e/e2e-profile/e2e-profile-h2-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java
+++
b/test/e2e/e2e-profile/e2e-profile-h2-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java
@@ -91,13 +91,7 @@ public class ProfileVerificationITCase {
while (true) {
try {
- final Map<String, String> user = new HashMap<>();
- user.put("name", "SkyWalking");
- final ResponseEntity<String> responseEntity =
restTemplate.postForEntity(
- instrumentedServiceUrl + "/e2e/users",
- user,
- String.class
- );
+ final ResponseEntity<String> responseEntity =
sendRequest(false);
LOGGER.info("responseEntity: {}", responseEntity);
assertThat(responseEntity.getStatusCode()).isEqualTo(HttpStatus.OK);
final List<Trace> traces = profileClient.traces(
@@ -122,6 +116,17 @@ public class ProfileVerificationITCase {
}
+ private ResponseEntity<String> sendRequest(boolean needProfiling) {
+ final Map<String, String> user = new HashMap<>();
+ user.put("name", "SkyWalking");
+ user.put("enableProfiling", String.valueOf(needProfiling));
+ return restTemplate.postForEntity(
+ instrumentedServiceUrl + "/e2e/users",
+ user,
+ String.class
+ );
+ }
+
/**
* verify create profile task
* @param minutesAgo
@@ -133,10 +138,10 @@ public class ProfileVerificationITCase {
final ProfileTaskCreationRequest creationRequest =
ProfileTaskCreationRequest.builder()
.serviceId(2)
.endpointName("/e2e/users")
- .duration(5)
+ .duration(1)
.startTime(-1)
- .minDurationThreshold(10)
- .dumpPeriod(10)
+ .minDurationThreshold(1000)
+ .dumpPeriod(50)
.maxSamplingCount(5).build();
// verify create task
@@ -146,18 +151,29 @@ public class ProfileVerificationITCase {
ProfileTaskCreationResultMatcher creationResultMatcher = new
ProfileTaskCreationResultMatcher();
creationResultMatcher.verify(creationResult);
+ // verify get task list and sniffer get task logs
+ verifyProfileTask(creationRequest.getServiceId(),
"expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.notified.yml");
+
+ // send a profile request
+ sendRequest(true);
+
+ // verify task execution finish
+ verifyProfileTask(creationRequest.getServiceId(),
"expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.finished.yml");
+ }
+
+ private void verifyProfileTask(int serviceId, String verifyResources)
throws InterruptedException {
// verify get task list and logs
for (int i = 0; i < 10; i++) {
try {
final ProfileTasks tasks = profileClient.getProfileTaskList(
new ProfileTaskQuery()
- .serviceId(creationRequest.getServiceId())
+ .serviceId(serviceId)
.endpointName("")
);
LOGGER.info("get profile task list: {}", tasks);
InputStream expectedInputStream =
- new
ClassPathResource("expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml").getInputStream();
+ new
ClassPathResource(verifyResources).getInputStream();
final ProfilesTasksMatcher servicesMatcher = new
Yaml().loadAs(expectedInputStream, ProfilesTasksMatcher.class);
servicesMatcher.verify(tasks);
diff --git
a/test/e2e/e2e-profile/e2e-profile-mysql-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java
b/test/e2e/e2e-profile/e2e-profile-mysql-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java
index cab6dff..2289ae8 100644
---
a/test/e2e/e2e-profile/e2e-profile-mysql-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java
+++
b/test/e2e/e2e-profile/e2e-profile-mysql-test-runner/src/test/java/org/apache/skywalking/e2e/ProfileVerificationITCase.java
@@ -91,13 +91,7 @@ public class ProfileVerificationITCase {
while (true) {
try {
- final Map<String, String> user = new HashMap<>();
- user.put("name", "SkyWalking");
- final ResponseEntity<String> responseEntity =
restTemplate.postForEntity(
- instrumentedServiceUrl + "/e2e/users",
- user,
- String.class
- );
+ final ResponseEntity<String> responseEntity =
sendRequest(false);
LOGGER.info("responseEntity: {}", responseEntity);
assertThat(responseEntity.getStatusCode()).isEqualTo(HttpStatus.OK);
final List<Trace> traces = profileClient.traces(
@@ -122,6 +116,17 @@ public class ProfileVerificationITCase {
}
+ private ResponseEntity<String> sendRequest(boolean needProfiling) {
+ final Map<String, String> user = new HashMap<>();
+ user.put("name", "SkyWalking");
+ user.put("enableProfiling", String.valueOf(needProfiling));
+ return restTemplate.postForEntity(
+ instrumentedServiceUrl + "/e2e/users",
+ user,
+ String.class
+ );
+ }
+
/**
* verify create profile task
* @param minutesAgo
@@ -133,10 +138,10 @@ public class ProfileVerificationITCase {
final ProfileTaskCreationRequest creationRequest =
ProfileTaskCreationRequest.builder()
.serviceId(2)
.endpointName("/e2e/users")
- .duration(5)
+ .duration(1)
.startTime(-1)
- .minDurationThreshold(10)
- .dumpPeriod(10)
+ .minDurationThreshold(1000)
+ .dumpPeriod(50)
.maxSamplingCount(5).build();
// verify create task
@@ -146,18 +151,29 @@ public class ProfileVerificationITCase {
ProfileTaskCreationResultMatcher creationResultMatcher = new
ProfileTaskCreationResultMatcher();
creationResultMatcher.verify(creationResult);
+ // verify get task list and sniffer get task logs
+ verifyProfileTask(creationRequest.getServiceId(),
"expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.notified.yml");
+
+ // send a profile request
+ sendRequest(true);
+
+ // verify task execution finish
+ verifyProfileTask(creationRequest.getServiceId(),
"expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.finished.yml");
+ }
+
+ private void verifyProfileTask(int serviceId, String verifyResources)
throws InterruptedException {
// verify get task list and logs
for (int i = 0; i < 10; i++) {
try {
final ProfileTasks tasks = profileClient.getProfileTaskList(
new ProfileTaskQuery()
- .serviceId(creationRequest.getServiceId())
+ .serviceId(serviceId)
.endpointName("")
);
LOGGER.info("get profile task list: {}", tasks);
InputStream expectedInputStream =
- new
ClassPathResource("expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml").getInputStream();
+ new
ClassPathResource(verifyResources).getInputStream();
final ProfilesTasksMatcher servicesMatcher = new
Yaml().loadAs(expectedInputStream, ProfilesTasksMatcher.class);
servicesMatcher.verify(tasks);
diff --git
a/test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/TestController.java
b/test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/CreateUser.java
similarity index 63%
copy from
test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/TestController.java
copy to
test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/CreateUser.java
index c300fc3..158f013 100644
---
a/test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/TestController.java
+++
b/test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/CreateUser.java
@@ -18,28 +18,34 @@
package org.apache.skywalking.e2e.profile;
-import org.springframework.web.bind.annotation.*;
-
/**
* @author MrPro
*/
-@RestController
-@RequestMapping("/e2e")
-public class TestController {
- private final UserRepo userRepo;
+public class CreateUser {
+
+ private String name;
+
+ private boolean enableProfiling;
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
- public TestController(final UserRepo userRepo) {
- this.userRepo = userRepo;
+ public boolean getEnableProfiling() {
+ return enableProfiling;
}
- @GetMapping("/health-check")
- public String hello() {
- return "healthy";
+ public void setEnableProfiling(boolean enableProfiling) {
+ this.enableProfiling = enableProfiling;
}
- @PostMapping("/users")
- public User createAuthor(@RequestBody final User user) throws
InterruptedException {
- Thread.sleep(1000L);
- return userRepo.save(user);
+ public User toUser() {
+ final User user = new User();
+ user.setName(name);
+ return user;
}
}
diff --git
a/test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/TestController.java
b/test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/TestController.java
index c300fc3..7cb6fac 100644
---
a/test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/TestController.java
+++
b/test/e2e/e2e-profile/e2e-profile-service/src/main/java/org/apache/skywalking/e2e/profile/TestController.java
@@ -20,6 +20,8 @@ package org.apache.skywalking.e2e.profile;
import org.springframework.web.bind.annotation.*;
+import java.util.concurrent.TimeUnit;
+
/**
* @author MrPro
*/
@@ -38,8 +40,14 @@ public class TestController {
}
@PostMapping("/users")
- public User createAuthor(@RequestBody final User user) throws
InterruptedException {
- Thread.sleep(1000L);
- return userRepo.save(user);
+ public User createAuthor(@RequestBody final CreateUser createUser) throws
InterruptedException {
+ final User user = userRepo.save(createUser.toUser());
+ if (!createUser.getEnableProfiling()) {
+ return user;
+ } else {
+ // sleep 10 second
+ TimeUnit.SECONDS.sleep(10);
+ return user;
+ }
}
}
diff --git
a/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml
b/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.finished.yml
similarity index 90%
copy from
test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml
copy to
test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.finished.yml
index 2aac5b2..03274e3 100644
---
a/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml
+++
b/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.finished.yml
@@ -26,5 +26,10 @@ tasks:
logs:
- id: not null
instanceId: gt 0
+ operationType: EXECUTION_FINISHED
+ operationTime: gt 0
+ - id: not null
+ instanceId: gt 0
operationType: NOTIFIED
operationTime: gt 0
+
diff --git
a/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml
b/test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.notified.yml
similarity index 100%
rename from
test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.yml
rename to
test/e2e/e2e-profile/e2e-profile-test-runner/src/main/resources/expected-data/org.apache.skywalking.e2e.ProfileVerificationITCase.profileTasks.notified.yml