This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new f5e38b8 Add agent config `PROPERTIES_REPORT_PERIOD_FACTOR `, Avoid
the instanceTraffic record properties is null. (#5688)
f5e38b8 is described below
commit f5e38b88e3b2734fd7c73207e62d38ef54115a04
Author: zifeihan <[email protected]>
AuthorDate: Tue Oct 20 12:02:27 2020 +0800
Add agent config `PROPERTIES_REPORT_PERIOD_FACTOR `, Avoid the
instanceTraffic record properties is null. (#5688)
---
.../skywalking/apm/agent/core/conf/Config.java | 4 +++
.../agent/core/remote/ServiceManagementClient.java | 6 ++--
.../kafka/KafkaServiceManagementServiceClient.java | 40 +++++++++++++---------
docs/en/setup/service-agent/java-agent/README.md | 1 +
4 files changed, 31 insertions(+), 20 deletions(-)
diff --git
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
index 41a86d4..fc51709 100755
---
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
+++
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
@@ -143,6 +143,10 @@ public class Config {
*/
public static long HEARTBEAT_PERIOD = 30;
/**
+ * The agent sends the instance properties to the backend every
`collector.heartbeat_period * collector.properties_report_period_factor` seconds
+ */
+ public static int PROPERTIES_REPORT_PERIOD_FACTOR = 10;
+ /**
* Collector skywalking trace receiver service addresses.
*/
public static String BACKEND_SERVICE = "";
diff --git
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceManagementClient.java
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceManagementClient.java
index e282a63..f271856 100755
---
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceManagementClient.java
+++
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceManagementClient.java
@@ -25,6 +25,7 @@ import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
@@ -52,7 +53,7 @@ public class ServiceManagementClient implements BootService,
Runnable, GRPCChann
private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
private volatile ManagementServiceGrpc.ManagementServiceBlockingStub
managementServiceBlockingStub;
private volatile ScheduledFuture<?> heartbeatFuture;
- private volatile boolean instancePropertiesSubmitted = false;
+ private volatile AtomicInteger sendPropertiesCounter = new
AtomicInteger(0);
@Override
public void statusChanged(GRPCChannelStatus status) {
@@ -112,7 +113,7 @@ public class ServiceManagementClient implements
BootService, Runnable, GRPCChann
if (GRPCChannelStatus.CONNECTED.equals(status)) {
try {
if (managementServiceBlockingStub != null) {
- if (!instancePropertiesSubmitted) {
+ if (Math.abs(sendPropertiesCounter.getAndAdd(1)) %
Config.Collector.PROPERTIES_REPORT_PERIOD_FACTOR == 0) {
managementServiceBlockingStub
.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT,
TimeUnit.SECONDS)
@@ -123,7 +124,6 @@ public class ServiceManagementClient implements
BootService, Runnable, GRPCChann
Config.OsInfo.IPV4_LIST_SIZE))
.addAllProperties(SERVICE_INSTANCE_PROPERTIES)
.build());
- instancePropertiesSubmitted = true;
} else {
final Commands commands =
managementServiceBlockingStub.withDeadlineAfter(
GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
diff --git
a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaServiceManagementServiceClient.java
b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaServiceManagementServiceClient.java
index d07766a..5b66b5d 100644
---
a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaServiceManagementServiceClient.java
+++
b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaServiceManagementServiceClient.java
@@ -24,6 +24,7 @@ import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.utils.Bytes;
@@ -57,6 +58,7 @@ public class KafkaServiceManagementServiceClient implements
BootService, Runnabl
private KafkaProducer<String, Bytes> producer;
private String topic;
+ private AtomicInteger sendPropertiesCounter = new AtomicInteger(0);
@Override
public void prepare() {
@@ -85,28 +87,32 @@ public class KafkaServiceManagementServiceClient implements
BootService, Runnabl
this,
t -> LOGGER.error("unexpected exception.", t)
), 0, Config.Collector.HEARTBEAT_PERIOD, TimeUnit.SECONDS);
-
- InstanceProperties instance = InstanceProperties.newBuilder()
-
.setService(Config.Agent.SERVICE_NAME)
-
.setServiceInstance(Config.Agent.INSTANCE_NAME)
-
.addAllProperties(OSUtil.buildOSInfo(
-
Config.OsInfo.IPV4_LIST_SIZE))
-
.addAllProperties(SERVICE_INSTANCE_PROPERTIES)
- .build();
- producer.send(new ProducerRecord<>(topic, TOPIC_KEY_REGISTER +
instance.getServiceInstance(), Bytes.wrap(instance.toByteArray())));
- producer.flush();
}
@Override
public void run() {
- InstancePingPkg ping = InstancePingPkg.newBuilder()
-
.setService(Config.Agent.SERVICE_NAME)
-
.setServiceInstance(Config.Agent.INSTANCE_NAME)
- .build();
- if (LOGGER.isDebugEnable()) {
- LOGGER.debug("Heartbeat reporting, instance: {}",
ping.getServiceInstance());
+ if (Math.abs(sendPropertiesCounter.getAndAdd(1)) %
Config.Collector.PROPERTIES_REPORT_PERIOD_FACTOR == 0) {
+ InstanceProperties instance = InstanceProperties.newBuilder()
+
.setService(Config.Agent.SERVICE_NAME)
+
.setServiceInstance(Config.Agent.INSTANCE_NAME)
+
.addAllProperties(OSUtil.buildOSInfo(
+
Config.OsInfo.IPV4_LIST_SIZE))
+
.addAllProperties(SERVICE_INSTANCE_PROPERTIES)
+ .build();
+ producer.send(new ProducerRecord<>(topic, TOPIC_KEY_REGISTER +
instance.getServiceInstance(),
+
Bytes.wrap(instance.toByteArray())
+ ));
+ producer.flush();
+ } else {
+ InstancePingPkg ping = InstancePingPkg.newBuilder()
+
.setService(Config.Agent.SERVICE_NAME)
+
.setServiceInstance(Config.Agent.INSTANCE_NAME)
+ .build();
+ if (LOGGER.isDebugEnable()) {
+ LOGGER.debug("Heartbeat reporting, instance: {}",
ping.getServiceInstance());
+ }
+ producer.send(new ProducerRecord<>(topic,
ping.getServiceInstance(), Bytes.wrap(ping.toByteArray())));
}
- producer.send(new ProducerRecord<>(topic, ping.getServiceInstance(),
Bytes.wrap(ping.toByteArray())));
}
@Override
diff --git a/docs/en/setup/service-agent/java-agent/README.md
b/docs/en/setup/service-agent/java-agent/README.md
index 220b776..44750db 100755
--- a/docs/en/setup/service-agent/java-agent/README.md
+++ b/docs/en/setup/service-agent/java-agent/README.md
@@ -89,6 +89,7 @@ property key | Description | Default |
`osinfo.ipv4_list_size`| Limit the length of the ipv4 list size. |`10`|
`collector.grpc_channel_check_interval`|grpc channel status check
interval.|`30`|
`collector.heartbeat_period`|agent heartbeat report period. Unit, second.|`30`|
+`collector.properties_report_period_factor`|The agent sends the instance
properties to the backend every `collector.heartbeat_period *
collector.properties_report_period_factor` seconds |`10`|
`collector.backend_service`|Collector SkyWalking trace receiver service
addresses.|`127.0.0.1:11800`|
`collector.grpc_upstream_timeout`|How long grpc client will timeout in sending
data to upstream. Unit is second.|`30` seconds|
`collector.get_profile_task_interval`|Sniffer get profile task list
interval.|`20`|