This is an automated email from the ASF dual-hosted git repository. hanahmily pushed a commit to branch grpc/deadline in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit 4bcb651d8eaafb1f0e7adff7c4d245542ccb62d3 Author: Gao Hongtao <[email protected]> AuthorDate: Wed Jul 3 07:23:58 2019 +0800 Adding deadline to gRPC client * Set a 10-second deadline on each gRPC client call * The deadline duration covers three segments: connecting, request and response For blocking stubs, I just set the deadline before invoking the service. For bi-streaming stubs, I found that all of them are used in a blocking style: after getting the streaming response, the client stub just closes the current stream. Based on that, I set the deadline the same way as for the blocking stubs. --- .../org/apache/skywalking/apm/agent/core/jvm/JVMService.java | 2 +- .../agent/core/remote/ServiceAndEndpointRegisterClient.java | 12 +++++++----- .../apm/agent/core/remote/TraceSegmentServiceClient.java | 4 +++- .../oap/server/exporter/provider/grpc/GRPCExporter.java | 5 +++-- .../oap/server/core/remote/client/GRPCRemoteClient.java | 3 ++- 5 files changed, 16 insertions(+), 10 deletions(-) diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java index 791375f..6294b8b 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java @@ -138,7 +138,7 @@ public class JVMService implements BootService, Runnable { if (buffer.size() > 0) { builder.addAllMetrics(buffer); builder.setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID); - stub.collect(builder.build()); + stub.withDeadlineAfter(10, TimeUnit.SECONDS).collect(builder.build()); } } catch (Throwable t) { logger.error(t, "send JVM metrics to Collector fail."); diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceAndEndpointRegisterClient.java 
b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceAndEndpointRegisterClient.java index b3e29de..a112b56 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceAndEndpointRegisterClient.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ServiceAndEndpointRegisterClient.java @@ -23,6 +23,7 @@ import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; + import org.apache.skywalking.apm.agent.core.boot.BootService; import org.apache.skywalking.apm.agent.core.boot.DefaultImplementor; import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory; @@ -112,7 +113,7 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable, try { if (RemoteDownstreamConfig.Agent.SERVICE_ID == DictionaryUtil.nullValue()) { if (registerBlockingStub != null) { - ServiceRegisterMapping serviceRegisterMapping = registerBlockingStub.doServiceRegister( + ServiceRegisterMapping serviceRegisterMapping = registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS).doServiceRegister( Services.newBuilder().addServices(Service.newBuilder().setServiceName(Config.Agent.SERVICE_NAME)).build()); if (serviceRegisterMapping != null) { for (KeyIntValuePair registered : serviceRegisterMapping.getServicesList()) { @@ -127,7 +128,8 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable, if (registerBlockingStub != null) { if (RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID == DictionaryUtil.nullValue()) { - ServiceInstanceRegisterMapping instanceMapping = registerBlockingStub.doServiceInstanceRegister(ServiceInstances.newBuilder() + ServiceInstanceRegisterMapping instanceMapping = registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS) + .doServiceInstanceRegister(ServiceInstances.newBuilder() .addInstances( 
ServiceInstance.newBuilder() .setServiceId(RemoteDownstreamConfig.Agent.SERVICE_ID) @@ -144,14 +146,14 @@ public class ServiceAndEndpointRegisterClient implements BootService, Runnable, } } } else { - serviceInstancePingStub.doPing(ServiceInstancePingPkg.newBuilder() + serviceInstancePingStub.withDeadlineAfter(10, TimeUnit.SECONDS).doPing(ServiceInstancePingPkg.newBuilder() .setServiceInstanceId(RemoteDownstreamConfig.Agent.SERVICE_INSTANCE_ID) .setTime(System.currentTimeMillis()) .setServiceInstanceUUID(INSTANCE_UUID) .build()); - NetworkAddressDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub); - EndpointNameDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub); + NetworkAddressDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS)); + EndpointNameDictionary.INSTANCE.syncRemoteDictionary(registerBlockingStub.withDeadlineAfter(10, TimeUnit.SECONDS)); } } } diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java index 221d982..a58f6c8 100644 --- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java +++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java @@ -21,6 +21,8 @@ package org.apache.skywalking.apm.agent.core.remote; import io.grpc.Channel; import io.grpc.stub.StreamObserver; import java.util.List; +import java.util.concurrent.TimeUnit; + import org.apache.skywalking.apm.agent.core.boot.*; import org.apache.skywalking.apm.agent.core.context.*; import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment; @@ -85,7 +87,7 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe public void consume(List<TraceSegment> data) { if 
(CONNECTED.equals(status)) { final GRPCStreamServiceStatus status = new GRPCStreamServiceStatus(false); - StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.collect(new StreamObserver<Commands>() { + StreamObserver<UpstreamSegment> upstreamSegmentStreamObserver = serviceStub.withDeadlineAfter(10, TimeUnit.SECONDS).collect(new StreamObserver<Commands>() { @Override public void onNext(Commands commands) { diff --git a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java index 2e28322..136c2aa 100644 --- a/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java +++ b/oap-server/exporter/src/main/java/org/apache/skywalking/oap/server/exporter/provider/grpc/GRPCExporter.java @@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.exporter.provider.grpc; import io.grpc.ManagedChannel; import io.grpc.stub.StreamObserver; import java.util.*; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import lombok.*; import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; @@ -69,7 +70,7 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS } public void initSubscriptionList() { - SubscriptionsResp subscription = blockingStub.subscription(SubscriptionReq.newBuilder().build()); + SubscriptionsResp subscription = blockingStub.withDeadlineAfter(10, TimeUnit.SECONDS).subscription(SubscriptionReq.newBuilder().build()); subscription.getMetricNamesList().forEach(subscriptionSet::add); logger.debug("Get exporter subscription list, {}", subscriptionSet); } @@ -84,7 +85,7 @@ public class GRPCExporter extends MetricFormatter implements MetricValuesExportS } ExportStatus status = new ExportStatus(); - StreamObserver<ExportMetricValue> streamObserver = 
exportServiceFutureStub.export( + StreamObserver<ExportMetricValue> streamObserver = exportServiceFutureStub.withDeadlineAfter(10, TimeUnit.SECONDS).export( new StreamObserver<ExportResponse>() { @Override public void onNext(ExportResponse response) { diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java index 6a920a5..4927b97 100644 --- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java +++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/remote/client/GRPCRemoteClient.java @@ -21,6 +21,7 @@ package org.apache.skywalking.oap.server.core.remote.client; import io.grpc.ManagedChannel; import io.grpc.stub.StreamObserver; import java.util.*; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.skywalking.apm.commons.datacarrier.DataCarrier; import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy; @@ -183,7 +184,7 @@ public class GRPCRemoteClient implements RemoteClient { } } - return getStub().call(new StreamObserver<Empty>() { + return getStub().withDeadlineAfter(10, TimeUnit.SECONDS).call(new StreamObserver<Empty>() { @Override public void onNext(Empty empty) { }
