This is an automated email from the ASF dual-hosted git repository.
wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-python.git
The following commit(s) were added to refs/heads/master by this push:
new 93dd9b3 Fix: no attribute '_SkyWalkingAgent__log_queue' using kafka
plain text (#343)
93dd9b3 is described below
commit 93dd9b331f0598e7f2081ad5d3bf75d13e04df99
Author: Tsonglew <[email protected]>
AuthorDate: Sat Jun 29 21:51:24 2024 +0800
Fix: no attribute '_SkyWalkingAgent__log_queue' using kafka plain text
(#343)
---
CHANGELOG.md | 1 +
skywalking/agent/__init__.py | 30 ++++++++++++++++++++----------
2 files changed, 21 insertions(+), 10 deletions(-)
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 68c0563..8aaedf0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -15,6 +15,7 @@
- **Tentative**: Set upper bound <=5.9.5 for psutil package due to test
failure. (#326)
- Remove `DeprecationWarning` from `pkg_resources` by replace it with
`importlib_metadata` (#329)
- Fix unexpected 'decode' AttributeError when MySQLdb module is mapped by
PyMySQL (#336)
+ - Fix SkyWalking agent failing to start when using the kafka protocol with
sasl_mechanism=PLAIN. (#343)
### 1.0.1
diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py
index 9ee4d92..70b28e9 100644
--- a/skywalking/agent/__init__.py
+++ b/skywalking/agent/__init__.py
@@ -119,6 +119,10 @@ class SkyWalkingAgent(Singleton):
def __bootstrap(self):
# when forking, already instrumented modules must not be instrumented
again
# otherwise it will cause double instrumentation! (we should provide
an un-instrument method)
+
+ # Initialize queues for segment, log, meter and profiling snapshots
+ self.__init_queues()
+
if config.agent_protocol == 'grpc':
from skywalking.agent.protocol.grpc import GrpcProtocol
self.__protocol = GrpcProtocol()
@@ -129,18 +133,29 @@ class SkyWalkingAgent(Singleton):
from skywalking.agent.protocol.kafka import KafkaProtocol
self.__protocol = KafkaProtocol()
- # Initialize queues for segment, log, meter and profiling snapshots
- self.__segment_queue: Optional[Queue] = None
+ # Start reporter threads and register queues
+ self.__init_threading()
+
+ def __init_queues(self) -> None:
+ """
+ This method initializes all the queues for the agent and reporters.
+ """
+ self.__segment_queue =
Queue(maxsize=config.agent_trace_reporter_max_buffer_size)
self.__log_queue: Optional[Queue] = None
self.__meter_queue: Optional[Queue] = None
self.__snapshot_queue: Optional[Queue] = None
- # Start reporter threads and register queues
- self.__init_threading()
+ if config.agent_meter_reporter_active:
+ self.__meter_queue =
Queue(maxsize=config.agent_meter_reporter_max_buffer_size)
+ if config.agent_log_reporter_active:
+ self.__log_queue =
Queue(maxsize=config.agent_log_reporter_max_buffer_size)
+ if config.agent_profile_active:
+ self.__snapshot_queue =
Queue(maxsize=config.agent_profile_snapshot_transport_buffer_size)
+
def __init_threading(self) -> None:
"""
- This method initializes all the queues and threads for the agent and
reporters.
+ This method initializes all the threads for the agent and reporters.
Upon os.fork(), callback will reinitialize threads and queues by
calling this method
Heartbeat thread is started by default.
@@ -152,12 +167,10 @@ class SkyWalkingAgent(Singleton):
__heartbeat_thread = Thread(name='HeartbeatThread',
target=self.__heartbeat, daemon=True)
__heartbeat_thread.start()
- self.__segment_queue =
Queue(maxsize=config.agent_trace_reporter_max_buffer_size)
__segment_report_thread = Thread(name='SegmentReportThread',
target=self.__report_segment, daemon=True)
__segment_report_thread.start()
if config.agent_meter_reporter_active:
- self.__meter_queue =
Queue(maxsize=config.agent_meter_reporter_max_buffer_size)
__meter_report_thread = Thread(name='MeterReportThread',
target=self.__report_meter, daemon=True)
__meter_report_thread.start()
@@ -173,7 +186,6 @@ class SkyWalkingAgent(Singleton):
ThreadDataSource().register()
if config.agent_log_reporter_active:
- self.__log_queue =
Queue(maxsize=config.agent_log_reporter_max_buffer_size)
__log_report_thread = Thread(name='LogReportThread',
target=self.__report_log, daemon=True)
__log_report_thread.start()
@@ -183,8 +195,6 @@ class SkyWalkingAgent(Singleton):
daemon=True)
__command_dispatch_thread.start()
- self.__snapshot_queue =
Queue(maxsize=config.agent_profile_snapshot_transport_buffer_size)
-
__query_profile_thread = Thread(name='QueryProfileCommandThread',
target=self.__query_profile_command,
daemon=True)
__query_profile_thread.start()