This is an automated email from the ASF dual-hosted git repository.
kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new 25214e9 Collect and report agent starting / shutdown events (#6559)
25214e9 is described below
commit 25214e910a193fbf22511ce6048964f4ce4d3048
Author: Zhenxu Ke <[email protected]>
AuthorDate: Thu Mar 18 15:25:06 2021 +0800
Collect and report agent starting / shutdown events (#6559)
---
.github/workflows/codeql.yaml | 2 +
CHANGES.md | 1 +
.../apm/agent/core/ServiceInstanceGenerator.java | 59 +++++++
.../apm/agent/core/boot/BootService.java | 9 +
.../apm/agent/core/boot/ServiceManager.java | 13 +-
.../core/remote/EventReportServiceClient.java | 182 +++++++++++++++++++++
.../apm/agent/core/remote/GRPCChannelManager.java | 5 +
.../agent/core/remote/ServiceManagementClient.java | 6 -
...ache.skywalking.apm.agent.core.boot.BootService | 4 +-
.../apm/agent/core/boot/ServiceManagerTest.java | 4 +-
.../skywalking/oap/server/core/event/Event.java | 2 +-
11 files changed, 271 insertions(+), 16 deletions(-)
diff --git a/.github/workflows/codeql.yaml b/.github/workflows/codeql.yaml
index 8b9796a..769db83 100644
--- a/.github/workflows/codeql.yaml
+++ b/.github/workflows/codeql.yaml
@@ -17,6 +17,8 @@
name: "CodeQL"
on:
+ push:
+ branches: [ 'master' ]
pull_request:
branches: [ 'master' ]
schedule:
diff --git a/CHANGES.md b/CHANGES.md
index 58d36b8..249b992 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -21,6 +21,7 @@ Release Notes.
* Fix ClassCastException by making CallbackAdapterInterceptor to implement
EnhancedInstance interface in the spring-kafka plugin.
* Fix NullPointerException with KafkaProducer.send(record).
* Support config `agent.span_limit_per_segment` can be changed in the runtime.
+* Collect and report agent starting / shutdown events.
#### OAP-Backend
* Allow user-defined `JAVA_OPTS` in the startup script.
diff --git
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/ServiceInstanceGenerator.java
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/ServiceInstanceGenerator.java
new file mode 100644
index 0000000..9cec984
--- /dev/null
+++
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/ServiceInstanceGenerator.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.agent.core;
+
+import java.util.UUID;
+import lombok.Getter;
+import org.apache.skywalking.apm.agent.core.boot.BootService;
+import org.apache.skywalking.apm.agent.core.conf.Config;
+import org.apache.skywalking.apm.agent.core.os.OSUtil;
+
+import static org.apache.skywalking.apm.util.StringUtil.isEmpty;
+
+@Getter
+public class ServiceInstanceGenerator implements BootService {
+ @Override
+ public void prepare() throws Throwable {
+ if (!isEmpty(Config.Agent.INSTANCE_NAME)) {
+ return;
+ }
+
+ Config.Agent.INSTANCE_NAME =
UUID.randomUUID().toString().replaceAll("-", "") + "@" + OSUtil.getIPV4();
+ }
+
+ @Override
+ public void boot() throws Throwable {
+
+ }
+
+ @Override
+ public void onComplete() throws Throwable {
+
+ }
+
+ @Override
+ public void shutdown() throws Throwable {
+
+ }
+
+ @Override
+ public int priority() {
+ return Integer.MAX_VALUE;
+ }
+}
diff --git
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/boot/BootService.java
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/boot/BootService.java
index eae8a5c..4b39d02 100644
---
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/boot/BootService.java
+++
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/boot/BootService.java
@@ -30,4 +30,13 @@ public interface BootService {
void onComplete() throws Throwable;
void shutdown() throws Throwable;
+
+ /**
+ * {@code BootService}s with higher priorities will be started earlier,
and shut down later than those {@code BootService}s with lower priorities.
+ *
+ * @return the priority of this {@code BootService}.
+ */
+ default int priority() {
+ return 0;
+ }
}
diff --git
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/boot/ServiceManager.java
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/boot/ServiceManager.java
index 46158cd..ab30426 100644
---
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/boot/ServiceManager.java
+++
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/boot/ServiceManager.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.apm.agent.core.boot;
import java.util.Collections;
+import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
@@ -46,13 +47,13 @@ public enum ServiceManager {
}
public void shutdown() {
- for (BootService service : bootedServices.values()) {
+
bootedServices.values().stream().sorted(Comparator.comparingInt(BootService::priority).reversed()).forEach(service
-> {
try {
service.shutdown();
} catch (Throwable e) {
LOGGER.error(e, "ServiceManager try to shutdown [{}] fail.",
service.getClass().getName());
}
- }
+ });
}
private Map<Class, BootService> loadAllServices() {
@@ -99,23 +100,23 @@ public enum ServiceManager {
}
private void prepare() {
- for (BootService service : bootedServices.values()) {
+
bootedServices.values().stream().sorted(Comparator.comparingInt(BootService::priority)).forEach(service
-> {
try {
service.prepare();
} catch (Throwable e) {
LOGGER.error(e, "ServiceManager try to pre-start [{}] fail.",
service.getClass().getName());
}
- }
+ });
}
private void startup() {
- for (BootService service : bootedServices.values()) {
+
bootedServices.values().stream().sorted(Comparator.comparingInt(BootService::priority)).forEach(service
-> {
try {
service.boot();
} catch (Throwable e) {
LOGGER.error(e, "ServiceManager try to start [{}] fail.",
service.getClass().getName());
}
- }
+ });
}
private void onComplete() {
diff --git
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/EventReportServiceClient.java
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/EventReportServiceClient.java
new file mode 100644
index 0000000..208cff8
--- /dev/null
+++
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/EventReportServiceClient.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.agent.core.remote;
+
+import io.grpc.Channel;
+import io.grpc.stub.StreamObserver;
+import java.lang.management.ManagementFactory;
+import java.lang.management.RuntimeMXBean;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import org.apache.skywalking.apm.agent.core.boot.BootService;
+import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
+import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
+import org.apache.skywalking.apm.agent.core.commands.CommandService;
+import org.apache.skywalking.apm.agent.core.conf.Config;
+import org.apache.skywalking.apm.agent.core.logging.api.ILog;
+import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
+import org.apache.skywalking.apm.network.common.v3.Commands;
+import org.apache.skywalking.apm.network.event.v3.Event;
+import org.apache.skywalking.apm.network.event.v3.EventServiceGrpc;
+import org.apache.skywalking.apm.network.event.v3.Source;
+import org.apache.skywalking.apm.network.event.v3.Type;
+
+import static
org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT;
+import static
org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus.CONNECTED;
+
+@DefaultImplementor
+public class EventReportServiceClient implements BootService,
GRPCChannelListener {
+ private static final ILog LOGGER =
LogManager.getLogger(EventReportServiceClient.class);
+
+ private final AtomicBoolean reported = new AtomicBoolean();
+
+ private Event.Builder startingEvent;
+
+ private EventServiceGrpc.EventServiceStub eventServiceStub;
+
+ private GRPCChannelStatus status;
+
+ @Override
+ public void prepare() throws Throwable {
+
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).addChannelListener(this);
+
+ final RuntimeMXBean runtimeMxBean =
ManagementFactory.getRuntimeMXBean();
+ startingEvent = Event.newBuilder()
+ .setUuid(UUID.randomUUID().toString())
+ .setName("Start")
+ .setStartTime(runtimeMxBean.getStartTime())
+ .setMessage("Start Java Application")
+ .setType(Type.Normal)
+ .setSource(
+ Source.newBuilder()
+ .setService(Config.Agent.SERVICE_NAME)
+
.setServiceInstance(Config.Agent.INSTANCE_NAME)
+ .build()
+ )
+ .putParameters(
+ "OPTS",
+ runtimeMxBean.getInputArguments()
+ .stream()
+ .sorted()
+ .collect(Collectors.joining(" "))
+ );
+ }
+
+ @Override
+ public void boot() throws Throwable {
+
+ }
+
+ @Override
+ public void onComplete() throws Throwable {
+ startingEvent.setEndTime(System.currentTimeMillis());
+
+ reportStartingEvent();
+ }
+
+ @Override
+ public void shutdown() throws Throwable {
+ if (!CONNECTED.equals(status)) {
+ return;
+ }
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ final Event.Builder shutdownEvent = Event.newBuilder()
+
.setUuid(UUID.randomUUID().toString())
+ .setName("Shutdown")
+
.setStartTime(System.currentTimeMillis())
+
.setEndTime(System.currentTimeMillis())
+ .setMessage("Shutting down
Java Application")
+ .setType(Type.Normal)
+ .setSource(
+ Source.newBuilder()
+
.setService(Config.Agent.SERVICE_NAME)
+
.setServiceInstance(Config.Agent.INSTANCE_NAME)
+ .build()
+ );
+
+ final StreamObserver<Event> collector = eventServiceStub.collect(new
StreamObserver<Commands>() {
+ @Override
+ public void onNext(final Commands commands) {
+
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
+ }
+
+ @Override
+ public void onError(final Throwable t) {
+ LOGGER.error("Failed to report shutdown event.", t);
+ // Ignore status change at shutting down stage.
+ latch.countDown();
+ }
+
+ @Override
+ public void onCompleted() {
+ latch.countDown();
+ }
+ });
+
+ collector.onNext(shutdownEvent.build());
+ collector.onCompleted();
+ latch.await();
+ }
+
+ @Override
+ public void statusChanged(final GRPCChannelStatus status) {
+ this.status = status;
+
+ if (!CONNECTED.equals(status)) {
+ return;
+ }
+
+ final Channel channel =
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
+ eventServiceStub = EventServiceGrpc.newStub(channel);
+ eventServiceStub =
eventServiceStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS);
+
+ reportStartingEvent();
+ }
+
+ private void reportStartingEvent() {
+ if (reported.compareAndSet(false, true)) {
+ return;
+ }
+
+ final StreamObserver<Event> collector = eventServiceStub.collect(new
StreamObserver<Commands>() {
+ @Override
+ public void onNext(final Commands commands) {
+
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
+ }
+
+ @Override
+ public void onError(final Throwable t) {
+ LOGGER.error("Failed to report starting event.", t);
+
ServiceManager.INSTANCE.findService(GRPCChannelManager.class).reportError(t);
+ reported.set(false);
+ }
+
+ @Override
+ public void onCompleted() {
+ }
+ });
+
+ collector.onNext(startingEvent.build());
+ collector.onCompleted();
+ }
+}
diff --git
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java
index 4822c75..503bbf1 100755
---
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java
+++
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java
@@ -208,4 +208,9 @@ public class GRPCChannelManager implements BootService,
Runnable {
}
return false;
}
+
+ @Override
+ public int priority() {
+ return Integer.MAX_VALUE;
+ }
}
diff --git
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceManagementClient.java
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceManagementClient.java
index f271856..31399fa 100755
---
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceManagementClient.java
+++
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceManagementClient.java
@@ -21,7 +21,6 @@ package org.apache.skywalking.apm.agent.core.remote;
import io.grpc.Channel;
import java.util.ArrayList;
import java.util.List;
-import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
@@ -41,7 +40,6 @@ import
org.apache.skywalking.apm.network.management.v3.InstancePingPkg;
import org.apache.skywalking.apm.network.management.v3.InstanceProperties;
import org.apache.skywalking.apm.network.management.v3.ManagementServiceGrpc;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
-import org.apache.skywalking.apm.util.StringUtil;
import static
org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT;
@@ -78,10 +76,6 @@ public class ServiceManagementClient implements BootService,
Runnable, GRPCChann
.setValue(Config.Agent.INSTANCE_PROPERTIES.get(key))
.build());
}
-
- Config.Agent.INSTANCE_NAME =
StringUtil.isEmpty(Config.Agent.INSTANCE_NAME)
- ? UUID.randomUUID().toString().replaceAll("-", "") + "@" +
OSUtil.getIPV4()
- : Config.Agent.INSTANCE_NAME;
}
@Override
diff --git
a/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService
b/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService
index bfa58a8..cfda935 100644
---
a/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService
+++
b/apm-sniffer/apm-agent-core/src/main/resources/META-INF/services/org.apache.skywalking.apm.agent.core.boot.BootService
@@ -33,4 +33,6 @@ org.apache.skywalking.apm.agent.core.meter.MeterService
org.apache.skywalking.apm.agent.core.meter.MeterSender
org.apache.skywalking.apm.agent.core.context.status.StatusCheckService
org.apache.skywalking.apm.agent.core.remote.LogReportServiceClient
-org.apache.skywalking.apm.agent.core.conf.dynamic.ConfigurationDiscoveryService
\ No newline at end of file
+org.apache.skywalking.apm.agent.core.conf.dynamic.ConfigurationDiscoveryService
+org.apache.skywalking.apm.agent.core.remote.EventReportServiceClient
+org.apache.skywalking.apm.agent.core.ServiceInstanceGenerator
diff --git
a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java
b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java
index 67b79a9..b51ce2d 100644
---
a/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java
+++
b/apm-sniffer/apm-agent-core/src/test/java/org/apache/skywalking/apm/agent/core/boot/ServiceManagerTest.java
@@ -58,7 +58,7 @@ public class ServiceManagerTest {
public void testServiceDependencies() throws Exception {
HashMap<Class, BootService> registryService =
getFieldValue(ServiceManager.INSTANCE, "bootedServices");
- assertThat(registryService.size(), is(18));
+ assertThat(registryService.size(), is(20));
assertTraceSegmentServiceClient(ServiceManager.INSTANCE.findService(TraceSegmentServiceClient.class));
assertContextManager(ServiceManager.INSTANCE.findService(ContextManager.class));
@@ -107,7 +107,7 @@ public class ServiceManagerTest {
assertNotNull(service);
List<GRPCChannelListener> listeners = getFieldValue(service,
"listeners");
- assertEquals(listeners.size(), 8);
+ assertEquals(listeners.size(), 9);
}
private void assertSamplingService(SamplingService service) {
diff --git
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/event/Event.java
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/event/Event.java
index 8fb26d0..7065589 100644
---
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/event/Event.java
+++
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/event/Event.java
@@ -95,7 +95,7 @@ public class Event extends Metrics {
@Column(columnName = MESSAGE)
private String message;
- @Column(columnName = PARAMETERS, storageOnly = true)
+ @Column(columnName = PARAMETERS, storageOnly = true, length = 1024)
private String parameters;
@Column(columnName = START_TIME)