This is an automated email from the ASF dual-hosted git repository.

kezhenxu94 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-python.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ed3043  fix: grpc timeout segment data loss (#116)
6ed3043 is described below

commit 6ed3043344930334077c0d7da667dba6529db5c8
Author: Tomasz Pytel <[email protected]>
AuthorDate: Thu Jan 28 23:59:20 2021 -0300

    fix: grpc timeout segment data loss (#116)
---
 skywalking/agent/__init__.py      |  9 ++++-----
 skywalking/agent/protocol/grpc.py | 18 ++++++++++++++++--
 skywalking/client/grpc.py         |  2 +-
 skywalking/config.py              |  5 +++++
 4 files changed, 26 insertions(+), 8 deletions(-)

diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py
index c5cee3b..356a28a 100644
--- a/skywalking/agent/__init__.py
+++ b/skywalking/agent/__init__.py
@@ -16,7 +16,7 @@
 #
 
 import atexit
-from queue import Queue
+from queue import Queue, Full
 from threading import Thread, Event
 from typing import TYPE_CHECKING
 
@@ -109,8 +109,7 @@ def connected():
 
 
 def archive(segment: 'Segment'):
-    if __queue.full():
+    try:  # unlike checking __queue.full() then inserting, this is atomic
+        __queue.put(segment, block=False)
+    except Full:
         logger.warning('the queue is full, the segment will be abandoned')
-        return
-
-    __queue.put(segment)
diff --git a/skywalking/agent/protocol/grpc.py b/skywalking/agent/protocol/grpc.py
index 4207070..3f2c638 100644
--- a/skywalking/agent/protocol/grpc.py
+++ b/skywalking/agent/protocol/grpc.py
@@ -18,7 +18,8 @@
 import logging
 from skywalking.loggings import logger
 import traceback
-from queue import Queue, Empty
+from queue import Queue, Empty, Full
+from time import time
 
 import grpc
 
@@ -68,10 +69,16 @@ class GrpcProtocol(Protocol):
         self.channel.subscribe(self._cb, try_to_connect=True)
 
     def report(self, queue: Queue, block: bool = True):
+        start = time()
+        segment = None
+
         def generator():
+            nonlocal segment
+
             while True:
                 try:
-                    segment = queue.get(block=block)  # type: Segment
+                    timeout = max(0, config.QUEUE_TIMEOUT - int(time() - start))  # type: int
+                    segment = queue.get(block=block, timeout=timeout)  # type: Segment
                 except Empty:
                     return
 
@@ -120,5 +127,12 @@ class GrpcProtocol(Protocol):
 
         try:
             self.traces_reporter.report(generator())
+
         except grpc.RpcError:
             self.on_error()
+
+            if segment:
+                try:
+                    queue.put(segment, block=False)
+                except Full:
+                    pass
diff --git a/skywalking/client/grpc.py b/skywalking/client/grpc.py
index 734d3bb..ef0f02d 100644
--- a/skywalking/client/grpc.py
+++ b/skywalking/client/grpc.py
@@ -55,4 +55,4 @@ class GrpcTraceSegmentReportService(TraceSegmentReportService):
         self.report_stub = TraceSegmentReportServiceStub(channel)
 
     def report(self, generator):
-        self.report_stub.collect(generator)
+        self.report_stub.collect(generator, timeout=config.GRPC_TIMEOUT)
diff --git a/skywalking/config.py b/skywalking/config.py
index 4ade9a2..d952381 100644
--- a/skywalking/config.py
+++ b/skywalking/config.py
@@ -23,6 +23,11 @@ from typing import TYPE_CHECKING
 if TYPE_CHECKING:
     from typing import List
 
+# In order to prevent timeouts and possible segment loss make sure QUEUE_TIMEOUT is always at least few seconds lower
+# than GRPC_TIMEOUT.
+GRPC_TIMEOUT = 300  # type: int
+QUEUE_TIMEOUT = 240  # type: int
+
 RE_IGNORE_PATH = re.compile('^$')  # type: re.Pattern
 
 service_name = os.getenv('SW_AGENT_NAME') or 'Python Service Name'  # type: str

Reply via email to