This is an automated email from the ASF dual-hosted git repository.
kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-python.git
The following commit(s) were added to refs/heads/master by this push:
new 6ed3043 fix: grpc timeout segment data loss (#116)
6ed3043 is described below
commit 6ed3043344930334077c0d7da667dba6529db5c8
Author: Tomasz Pytel <[email protected]>
AuthorDate: Thu Jan 28 23:59:20 2021 -0300
fix: grpc timeout segment data loss (#116)
---
skywalking/agent/__init__.py | 9 ++++-----
skywalking/agent/protocol/grpc.py | 18 ++++++++++++++++--
skywalking/client/grpc.py | 2 +-
skywalking/config.py | 5 +++++
4 files changed, 26 insertions(+), 8 deletions(-)
diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py
index c5cee3b..356a28a 100644
--- a/skywalking/agent/__init__.py
+++ b/skywalking/agent/__init__.py
@@ -16,7 +16,7 @@
#
import atexit
-from queue import Queue
+from queue import Queue, Full
from threading import Thread, Event
from typing import TYPE_CHECKING
@@ -109,8 +109,7 @@ def connected():
def archive(segment: 'Segment'):
- if __queue.full():
+ try: # unlike checking __queue.full() then inserting, this is atomic
+ __queue.put(segment, block=False)
+ except Full:
logger.warning('the queue is full, the segment will be abandoned')
- return
-
- __queue.put(segment)
diff --git a/skywalking/agent/protocol/grpc.py b/skywalking/agent/protocol/grpc.py
index 4207070..3f2c638 100644
--- a/skywalking/agent/protocol/grpc.py
+++ b/skywalking/agent/protocol/grpc.py
@@ -18,7 +18,8 @@
import logging
from skywalking.loggings import logger
import traceback
-from queue import Queue, Empty
+from queue import Queue, Empty, Full
+from time import time
import grpc
@@ -68,10 +69,16 @@ class GrpcProtocol(Protocol):
self.channel.subscribe(self._cb, try_to_connect=True)
def report(self, queue: Queue, block: bool = True):
+ start = time()
+ segment = None
+
def generator():
+ nonlocal segment
+
while True:
try:
- segment = queue.get(block=block) # type: Segment
+ timeout = max(0, config.QUEUE_TIMEOUT - int(time() - start)) # type: int
+ segment = queue.get(block=block, timeout=timeout) # type: Segment
except Empty:
return
@@ -120,5 +127,12 @@ class GrpcProtocol(Protocol):
try:
self.traces_reporter.report(generator())
+
except grpc.RpcError:
self.on_error()
+
+ if segment:
+ try:
+ queue.put(segment, block=False)
+ except Full:
+ pass
diff --git a/skywalking/client/grpc.py b/skywalking/client/grpc.py
index 734d3bb..ef0f02d 100644
--- a/skywalking/client/grpc.py
+++ b/skywalking/client/grpc.py
@@ -55,4 +55,4 @@ class GrpcTraceSegmentReportService(TraceSegmentReportService):
self.report_stub = TraceSegmentReportServiceStub(channel)
def report(self, generator):
- self.report_stub.collect(generator)
+ self.report_stub.collect(generator, timeout=config.GRPC_TIMEOUT)
diff --git a/skywalking/config.py b/skywalking/config.py
index 4ade9a2..d952381 100644
--- a/skywalking/config.py
+++ b/skywalking/config.py
@@ -23,6 +23,11 @@ from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import List
+# In order to prevent timeouts and possible segment loss make sure QUEUE_TIMEOUT is always at least few seconds lower
+# than GRPC_TIMEOUT.
+GRPC_TIMEOUT = 300 # type: int
+QUEUE_TIMEOUT = 240 # type: int
+
RE_IGNORE_PATH = re.compile('^$') # type: re.Pattern
service_name = os.getenv('SW_AGENT_NAME') or 'Python Service Name' # type: str