This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch timeout-config
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit 85075a62b343475050242b5382b0e200cd2665ae
Author: Wu Sheng <wu.sh...@foxmail.com>
AuthorDate: Fri Sep 20 10:14:37 2019 +0800

    Support timeout configuration in agent and backend.
---
 .../org/apache/skywalking/apm/agent/core/conf/Config.java    |  4 ++++
 .../org/apache/skywalking/apm/agent/core/jvm/JVMService.java |  4 +++-
 .../agent/core/remote/ServiceAndEndpointRegisterClient.java  | 12 +++++++-----
 .../apm/agent/core/remote/TraceSegmentServiceClient.java     |  3 ++-
 docs/en/setup/service-agent/java-agent/README.md             |  1 +
 .../apache/skywalking/oap/server/core/CoreModuleConfig.java  |  4 ++++
 .../skywalking/oap/server/core/CoreModuleProvider.java       |  2 +-
 .../oap/server/core/remote/client/GRPCRemoteClient.java      |  6 ++++--
 .../oap/server/core/remote/client/RemoteClientManager.java   | 11 +++++++++--
 .../core/remote/client/RemoteClientManagerTestCase.java      |  2 +-
 10 files changed, 36 insertions(+), 13 deletions(-)

diff --git 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
index 5a8eca4..def4cd5 100755
--- 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
+++ 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
@@ -132,6 +132,10 @@ public class Config {
          * Collector skywalking trace receiver service addresses.
          */
         public static String BACKEND_SERVICE = "";
+        /**
+         * How long grpc client will timeout in sending data to upstream.
+         */
+        public static int GRPC_UPSTREAM_TIMEOUT = 30;
     }
 
     public static class Jvm {
diff --git 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java
 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java
index 040d733..598cd36 100644
--- 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java
+++ 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java
@@ -47,6 +47,8 @@ import 
org.apache.skywalking.apm.network.language.agent.v2.JVMMetricCollection;
 import 
org.apache.skywalking.apm.network.language.agent.v2.JVMMetricReportServiceGrpc;
 import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
 
+import static 
org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT;
+
 /**
  * The <code>JVMService</code> represents a timer, which collectors JVM cpu, 
memory, memorypool and gc info, and send
  * the collected info to Collector through the channel provided by {@link 
GRPCChannelManager}
@@ -140,7 +142,7 @@ public class JVMService implements BootService, Runnable {
                         if (buffer.size() > 0) {
                             builder.addAllMetrics(buffer);
                             
builder.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID);
-                            Commands commands = stub.withDeadlineAfter(10, 
TimeUnit.SECONDS).collect(builder.build());
+                            Commands commands = 
stub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, 
TimeUnit.SECONDS).collect(builder.build());
                             
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
                         }
                     } catch (Throwable t) {
diff --git 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceAndEndpointRegisterClient.java
 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceAndEndpointRegisterClient.java
index 92c22a5..9c68ac3 100755
--- 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceAndEndpointRegisterClient.java
+++ 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceAndEndpointRegisterClient.java
@@ -53,6 +53,8 @@ import org.apache.skywalking.apm.network.register.v2.Services;
 import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
 import org.apache.skywalking.apm.util.StringUtil;
 
+import static 
org.apache.skywalking.apm.agent.core.conf.Config.Collector.GRPC_UPSTREAM_TIMEOUT;
+
 /**
  * @author wusheng
  */
@@ -138,7 +140,7 @@ public class ServiceAndEndpointRegisterClient implements 
BootService, Runnable,
             try {
                 if (RemoteDownstreamConfig.Agent.SERVICE_ID == 
DictionaryUtil.nullValue()) {
                     if (registerBlockingStub != null) {
-                        ServiceRegisterMapping serviceRegisterMapping = 
registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS).doServiceRegister(
+                        ServiceRegisterMapping serviceRegisterMapping = 
registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, 
TimeUnit.SECONDS).doServiceRegister(
                             
Services.newBuilder().addServices(Service.newBuilder().setServiceName(Config.Agent.SERVICE_NAME)).build());
                         if (serviceRegisterMapping != null) {
                             for (KeyIntValuePair registered : 
serviceRegisterMapping.getServicesList()) {
@@ -153,7 +155,7 @@ public class ServiceAndEndpointRegisterClient implements 
BootService, Runnable,
                     if (registerBlockingStub != null) {
                         if (RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID 
== DictionaryUtil.nullValue()) {
 
-                            ServiceInstanceRegisterMapping instanceMapping = 
registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS)
+                            ServiceInstanceRegisterMapping instanceMapping = 
registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, TimeUnit.SECONDS)
                                     
.doServiceInstanceRegister(ServiceInstances.newBuilder()
                                 .addInstances(
                                     ServiceInstance.newBuilder()
@@ -173,15 +175,15 @@ public class ServiceAndEndpointRegisterClient implements 
BootService, Runnable,
                                 }
                             }
                         } else {
-                            final Commands commands = 
serviceInstancePingStub.withDeadlineAfter(10, TimeUnit.SECONDS)
+                            final Commands commands = 
serviceInstancePingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT, 
TimeUnit.SECONDS)
                                 .doPing(ServiceInstancePingPkg.newBuilder()
                                 
.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID)
                                 .setTime(System.currentTimeMillis())
                                 .setServiceInstanceUUID(INSTANCE_UUID)
                                 .build());
 
-                            
NetworkAddressDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(10,
 TimeUnit.SECONDS));
-                            
EndpointNameDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(10,
 TimeUnit.SECONDS));
+                            
NetworkAddressDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT,
 TimeUnit.SECONDS));
+                            
EndpointNameDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(GRPC_UPSTREAM_TIMEOUT,
 TimeUnit.SECONDS));
                             
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
                         }
                     }
diff --git 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
index 3ffacff..edea03b 100644
--- 
a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
+++ 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.skywalking.apm.agent.core.boot.*;
 import org.apache.skywalking.apm.agent.core.commands.CommandService;
+import org.apache.skywalking.apm.agent.core.conf.Config;
 import org.apache.skywalking.apm.agent.core.context.*;
 import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
 import org.apache.skywalking.apm.agent.core.logging.api.*;
@@ -88,7 +89,7 @@ public class TraceSegmentServiceClient implements 
BootService, IConsumer<TraceSe
     public void consume(List<TraceSegment> data) {
         if (CONNECTED.equals(status)) {
             final GRPCStreamServiceStatus status = new 
GRPCStreamServiceStatus(false);
-            StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = 
serviceStub.withDeadlineAfter(10, TimeUnit.SECONDS).collect(new 
StreamObserver<Commands>() {
+            StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = 
serviceStub.withDeadlineAfter(Config.Collector.GRPC_UPSTREAM_TIMEOUT, 
TimeUnit.SECONDS).collect(new StreamObserver<Commands>() {
                 @Override
                 public void onNext(Commands commands) {
                     
ServiceManager.INSTANCE.findService(CommandService.class).receiveCommand(commands);
diff --git a/docs/en/setup/service-agent/java-agent/README.md 
b/docs/en/setup/service-agent/java-agent/README.md
index e3fa37d..bfa95e4 100755
--- a/docs/en/setup/service-agent/java-agent/README.md
+++ b/docs/en/setup/service-agent/java-agent/README.md
@@ -82,6 +82,7 @@ property key | Description | Default |
 `collector.grpc_channel_check_interval`|grpc channel status check 
interval.|`30`|
 `collector.app_and_service_register_check_interval`|application and service 
registry check interval.|`3`|
 `collector.backend_service`|Collector SkyWalking trace receiver service 
addresses.|`127.0.0.1:11800`|
+`collector.grpc_upstream_timeout`|How long grpc client will timeout in sending 
data to upstream. Unit is second.|`30` seconds|
 `logging.level`|The log level. Default is debug.|`DEBUG`|
 `logging.file_name`|Log file name.|`skywalking-api.log`|
 `logging.output`| Log output. Default is FILE. Use CONSOLE means output to 
stdout. |`FILE`|
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
index c7c26e1..1858116 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
@@ -53,6 +53,10 @@ public class CoreModuleConfig extends ModuleConfig {
     @Setter private int monthMetricsDataTTL;
     @Setter private int gRPCThreadPoolSize;
     @Setter private int gRPCThreadPoolQueueSize;
+    /**
+     * Timeout for cluster internal communication, in seconds.
+     */
+    @Setter private int remoteTimeout = 20;
 
     CoreModuleConfig() {
         this.downsampling = new ArrayList<>();
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 86c611f..3bc4e9a 100755
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -166,7 +166,7 @@ public class CoreModuleProvider extends ModuleProvider {
 
         annotationScan.registerListener(streamAnnotationListener);
 
-        this.remoteClientManager = new RemoteClientManager(getManager());
+        this.remoteClientManager = new RemoteClientManager(getManager(), 
moduleConfig.getRemoteTimeout());
         this.registerServiceImplementation(RemoteClientManager.class, 
remoteClientManager);
 
         
MetricsStreamProcessor.getInstance().setEnableDatabaseSession(moduleConfig.isEnableDatabaseSession());
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
index fd80c9c..e671ef4 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java
@@ -59,12 +59,14 @@ public class GRPCRemoteClient implements RemoteClient {
     private boolean isConnect;
     private CounterMetrics remoteOutCounter;
     private CounterMetrics remoteOutErrorCounter;
+    private int remoteTimeout;
 
     public GRPCRemoteClient(ModuleDefineHolder moduleDefineHolder, Address 
address, int channelSize,
-        int bufferSize) {
+        int bufferSize, int remoteTimeout) {
         this.address = address;
         this.channelSize = channelSize;
         this.bufferSize = bufferSize;
+        this.remoteTimeout = remoteTimeout;
 
         remoteOutCounter = 
moduleDefineHolder.find(TelemetryModule.NAME).provider().getService(MetricsCreator.class)
             .createCounter("remote_out_count", "The number(client side) of 
inside remote inside aggregate rpc.",
@@ -183,7 +185,7 @@ public class GRPCRemoteClient implements RemoteClient {
             }
         }
 
-        return getStub().withDeadlineAfter(10, TimeUnit.SECONDS).call(new 
StreamObserver<Empty>() {
+        return getStub().withDeadlineAfter(remoteTimeout, 
TimeUnit.SECONDS).call(new StreamObserver<Empty>() {
             @Override public void onNext(Empty empty) {
             }
 
diff --git 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
index 06a963f..e749c66 100644
--- 
a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
+++ 
b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManager.java
@@ -57,12 +57,19 @@ public class RemoteClientManager implements Service {
     private final List<RemoteClient> clientsB;
     private volatile List<RemoteClient> usingClients;
     private GaugeMetrics gauge;
+    private int remoteTimeout;
 
-    public RemoteClientManager(ModuleDefineHolder moduleDefineHolder) {
+    /**
+     * Initial the manager for all remote communication clients.
+     * @param moduleDefineHolder for looking up other modules
+     * @param remoteTimeout for cluster internal communication, in seconds.
+     */
+    public RemoteClientManager(ModuleDefineHolder moduleDefineHolder, int 
remoteTimeout) {
         this.moduleDefineHolder = moduleDefineHolder;
         this.clientsA = new LinkedList<>();
         this.clientsB = new LinkedList<>();
         this.usingClients = clientsA;
+        this.remoteTimeout = remoteTimeout;
     }
 
     public void start() {
@@ -203,7 +210,7 @@ public class RemoteClientManager implements Service {
                         RemoteClient client = new 
SelfRemoteClient(moduleDefineHolder, address);
                         getFreeClients().add(client);
                     } else {
-                        RemoteClient client = new 
GRPCRemoteClient(moduleDefineHolder, address, 1, 3000);
+                        RemoteClient client = new 
GRPCRemoteClient(moduleDefineHolder, address, 1, 3000, remoteTimeout);
                         client.connect();
                         getFreeClients().add(client);
                     }
diff --git 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
index e83aa63..c561273 100644
--- 
a/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
+++ 
b/oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/remote/client/RemoteClientManagerTestCase.java
@@ -80,7 +80,7 @@ public class RemoteClientManagerTestCase {
         moduleManager.put(TelemetryModule.NAME, telemetryModuleDefine);
         
telemetryModuleDefine.provider().registerServiceImplementation(MetricsCreator.class,
 metricsCreator);
 
-        RemoteClientManager clientManager = new 
RemoteClientManager(moduleManager);
+        RemoteClientManager clientManager = new 
RemoteClientManager(moduleManager, 10);
 
         
when(clusterNodesQuery.queryRemoteNodes()).thenReturn(groupOneInstances());
         clientManager.refresh();

Reply via email to