This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking.git


The following commit(s) were added to refs/heads/master by this push:
     new f5e38b8  Add agent config `PROPERTIES_REPORT_PERIOD_FACTOR`; avoid 
null properties in the instanceTraffic record. (#5688)
f5e38b8 is described below

commit f5e38b88e3b2734fd7c73207e62d38ef54115a04
Author: zifeihan <[email protected]>
AuthorDate: Tue Oct 20 12:02:27 2020 +0800

    Add agent config `PROPERTIES_REPORT_PERIOD_FACTOR`; avoid null 
properties in the instanceTraffic record. (#5688)
---
 .../skywalking/apm/agent/core/conf/Config.java     |  4 +++
 .../agent/core/remote/ServiceManagementClient.java |  6 ++--
 .../kafka/KafkaServiceManagementServiceClient.java | 40 +++++++++++++---------
 docs/en/setup/service-agent/java-agent/README.md   |  1 +
 4 files changed, 31 insertions(+), 20 deletions(-)

diff --git 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
index 41a86d4..fc51709 100755
--- 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
+++ 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
@@ -143,6 +143,10 @@ public class Config {
          */
         public static long HEARTBEAT_PERIOD = 30;
         /**
+         * The agent sends the instance properties to the backend every 
`collector.heartbeat_period * collector.properties_report_period_factor` seconds
+         */
+        public static int PROPERTIES_REPORT_PERIOD_FACTOR = 10;
+        /**
          * Collector skywalking trace receiver service addresses.
          */
         public static String BACKEND_SERVICE = "";
diff --git 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceManagementClient.java
 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceManagementClient.java
index e282a63..f271856 100755
--- 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceManagementClient.java
+++ 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceManagementClient.java
@@ -25,6 +25,7 @@ import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.skywalking.apm.agent.core.boot.BootService;
 import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor;
 import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
@@ -52,7 +53,7 @@ public class ServiceManagementClient implements BootService, 
Runnable, GRPCChann
     private volatile GRPCChannelStatus status = GRPCChannelStatus.DISCONNECT;
     private volatile ManagementServiceGrpc.ManagementServiceBlockingStub 
managementServiceBlockingStub;
     private volatile ScheduledFuture<?> heartbeatFuture;
-    private volatile boolean instancePropertiesSubmitted = false;
+    private volatile AtomicInteger sendPropertiesCounter = new 
AtomicInteger(0);
 
     @Override
     public void statusChanged(GRPCChannelStatus status) {
@@ -112,7 +113,7 @@ public class ServiceManagementClient implements 
BootService, Runnable, GRPCChann
         if (GRPCChannelStatus.CONNECTED.equals(status)) {
             try {
                 if (managementServiceBlockingStub != null) {
-                    if (!instancePropertiesSubmitted) {
+                    if (Math.abs(sendPropertiesCounter.getAndAdd(1)) % 
Config.Collector.PROPERTIES_REPORT_PERIOD_FACTOR == 0) {
 
                         managementServiceBlockingStub
                             .withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, 
TimeUnit.SECONDS)
@@ -123,7 +124,6 @@ public class ServiceManagementClient implements 
BootService, Runnable, GRPCChann
                                                                             
Config.OsInfo.IPV4_LIST_SIZE))
                                                                         
.addAllProperties(SERVICE_INSTANCE_PROPERTIES)
                                                                         
.build());
-                        instancePropertiesSubmitted = true;
                     } else {
                         final Commands commands = 
managementServiceBlockingStub.withDeadlineAfter(
                             GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS
diff --git 
a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaServiceManagementServiceClient.java
 
b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaServiceManagementServiceClient.java
index d07766a..5b66b5d 100644
--- 
a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaServiceManagementServiceClient.java
+++ 
b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaServiceManagementServiceClient.java
@@ -24,6 +24,7 @@ import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.utils.Bytes;
@@ -57,6 +58,7 @@ public class KafkaServiceManagementServiceClient implements 
BootService, Runnabl
     private KafkaProducer<String, Bytes> producer;
 
     private String topic;
+    private AtomicInteger sendPropertiesCounter = new AtomicInteger(0);
 
     @Override
     public void prepare() {
@@ -85,28 +87,32 @@ public class KafkaServiceManagementServiceClient implements 
BootService, Runnabl
             this,
             t -> LOGGER.error("unexpected exception.", t)
         ), 0, Config.Collector.HEARTBEAT_PERIOD, TimeUnit.SECONDS);
-
-        InstanceProperties instance = InstanceProperties.newBuilder()
-                                                        
.setService(Config.Agent.SERVICE_NAME)
-                                                        
.setServiceInstance(Config.Agent.INSTANCE_NAME)
-                                                        
.addAllProperties(OSUtil.buildOSInfo(
-                                                            
Config.OsInfo.IPV4_LIST_SIZE))
-                                                        
.addAllProperties(SERVICE_INSTANCE_PROPERTIES)
-                                                        .build();
-        producer.send(new ProducerRecord<>(topic, TOPIC_KEY_REGISTER + 
instance.getServiceInstance(), Bytes.wrap(instance.toByteArray())));
-        producer.flush();
     }
 
     @Override
     public void run() {
-        InstancePingPkg ping = InstancePingPkg.newBuilder()
-                                              
.setService(Config.Agent.SERVICE_NAME)
-                                              
.setServiceInstance(Config.Agent.INSTANCE_NAME)
-                                              .build();
-        if (LOGGER.isDebugEnable()) {
-            LOGGER.debug("Heartbeat reporting, instance: {}", 
ping.getServiceInstance());
+        if (Math.abs(sendPropertiesCounter.getAndAdd(1)) % 
Config.Collector.PROPERTIES_REPORT_PERIOD_FACTOR == 0) {
+            InstanceProperties instance = InstanceProperties.newBuilder()
+                                                            
.setService(Config.Agent.SERVICE_NAME)
+                                                            
.setServiceInstance(Config.Agent.INSTANCE_NAME)
+                                                            
.addAllProperties(OSUtil.buildOSInfo(
+                                                                
Config.OsInfo.IPV4_LIST_SIZE))
+                                                            
.addAllProperties(SERVICE_INSTANCE_PROPERTIES)
+                                                            .build();
+            producer.send(new ProducerRecord<>(topic, TOPIC_KEY_REGISTER + 
instance.getServiceInstance(),
+                                               
Bytes.wrap(instance.toByteArray())
+            ));
+            producer.flush();
+        } else {
+            InstancePingPkg ping = InstancePingPkg.newBuilder()
+                                                  
.setService(Config.Agent.SERVICE_NAME)
+                                                  
.setServiceInstance(Config.Agent.INSTANCE_NAME)
+                                                  .build();
+            if (LOGGER.isDebugEnable()) {
+                LOGGER.debug("Heartbeat reporting, instance: {}", 
ping.getServiceInstance());
+            }
+            producer.send(new ProducerRecord<>(topic, 
ping.getServiceInstance(), Bytes.wrap(ping.toByteArray())));
         }
-        producer.send(new ProducerRecord<>(topic, ping.getServiceInstance(), 
Bytes.wrap(ping.toByteArray())));
     }
 
     @Override
diff --git a/docs/en/setup/service-agent/java-agent/README.md 
b/docs/en/setup/service-agent/java-agent/README.md
index 220b776..44750db 100755
--- a/docs/en/setup/service-agent/java-agent/README.md
+++ b/docs/en/setup/service-agent/java-agent/README.md
@@ -89,6 +89,7 @@ property key | Description | Default |
 `osinfo.ipv4_list_size`| Limit the length of the ipv4 list size. |`10`|
 `collector.grpc_channel_check_interval`|grpc channel status check 
interval.|`30`|
 `collector.heartbeat_period`|agent heartbeat report period. Unit, second.|`30`|
+`collector.properties_report_period_factor`|The agent sends the instance 
properties to the backend every `collector.heartbeat_period * 
collector.properties_report_period_factor` seconds |`10`|
 `collector.backend_service`|Collector SkyWalking trace receiver service 
addresses.|`127.0.0.1:11800`|
 `collector.grpc_upstream_timeout`|How long grpc client will timeout in sending 
data to upstream. Unit is second.|`30` seconds|
 `collector.get_profile_task_interval`|Sniffer get profile task list 
interval.|`20`|

Reply via email to