This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/skywalking-python.git


The following commit(s) were added to refs/heads/master by this push:
     new 93dd9b3  Fix: no attribute '_SkyWalkingAgent__log_queue' using kafka 
plain text (#343)
93dd9b3 is described below

commit 93dd9b331f0598e7f2081ad5d3bf75d13e04df99
Author: Tsonglew <[email protected]>
AuthorDate: Sat Jun 29 21:51:24 2024 +0800

    Fix: no attribute '_SkyWalkingAgent__log_queue' using kafka plain text 
(#343)
---
 CHANGELOG.md                 |  1 +
 skywalking/agent/__init__.py | 30 ++++++++++++++++++++----------
 2 files changed, 21 insertions(+), 10 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 68c0563..8aaedf0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -15,6 +15,7 @@
   - **Tentative**: Set upper bound <=5.9.5 for psutil package due to test 
failure. (#326)
   - Remove `DeprecationWarning` from `pkg_resources` by replace it with 
`importlib_metadata` (#329)
   - Fix unexpected 'decode' AttributeError when MySQLdb module is mapped by 
PyMySQL (#336)
+  - Fix SkyWalking agent failed to start if using kafka protocol with 
sasl_mechanism=PLAIN. (#343)
 
 ### 1.0.1
 
diff --git a/skywalking/agent/__init__.py b/skywalking/agent/__init__.py
index 9ee4d92..70b28e9 100644
--- a/skywalking/agent/__init__.py
+++ b/skywalking/agent/__init__.py
@@ -119,6 +119,10 @@ class SkyWalkingAgent(Singleton):
     def __bootstrap(self):
         # when forking, already instrumented modules must not be instrumented 
again
         # otherwise it will cause double instrumentation! (we should provide 
an un-instrument method)
+
+        # Initialize queues for segment, log, meter and profiling snapshots
+        self.__init_queues()
+
         if config.agent_protocol == 'grpc':
             from skywalking.agent.protocol.grpc import GrpcProtocol
             self.__protocol = GrpcProtocol()
@@ -129,18 +133,29 @@ class SkyWalkingAgent(Singleton):
             from skywalking.agent.protocol.kafka import KafkaProtocol
             self.__protocol = KafkaProtocol()
 
-        # Initialize queues for segment, log, meter and profiling snapshots
-        self.__segment_queue: Optional[Queue] = None
+        # Start reporter threads and register queues
+        self.__init_threading()
+
+    def __init_queues(self) -> None:
+        """
+        This method initializes all the queues for the agent and reporters.
+        """
+        self.__segment_queue = 
Queue(maxsize=config.agent_trace_reporter_max_buffer_size)
         self.__log_queue: Optional[Queue] = None
         self.__meter_queue: Optional[Queue] = None
         self.__snapshot_queue: Optional[Queue] = None
 
-        # Start reporter threads and register queues
-        self.__init_threading()
+        if config.agent_meter_reporter_active:
+            self.__meter_queue = 
Queue(maxsize=config.agent_meter_reporter_max_buffer_size)
+        if config.agent_log_reporter_active:
+            self.__log_queue = 
Queue(maxsize=config.agent_log_reporter_max_buffer_size)
+        if config.agent_profile_active:
+            self.__snapshot_queue = 
Queue(maxsize=config.agent_profile_snapshot_transport_buffer_size)
+
 
     def __init_threading(self) -> None:
         """
-        This method initializes all the queues and threads for the agent and 
reporters.
+        This method initializes all the threads for the agent and reporters.
         Upon os.fork(), callback will reinitialize threads and queues by 
calling this method
 
         Heartbeat thread is started by default.
@@ -152,12 +167,10 @@ class SkyWalkingAgent(Singleton):
         __heartbeat_thread = Thread(name='HeartbeatThread', 
target=self.__heartbeat, daemon=True)
         __heartbeat_thread.start()
 
-        self.__segment_queue = 
Queue(maxsize=config.agent_trace_reporter_max_buffer_size)
         __segment_report_thread = Thread(name='SegmentReportThread', 
target=self.__report_segment, daemon=True)
         __segment_report_thread.start()
 
         if config.agent_meter_reporter_active:
-            self.__meter_queue = 
Queue(maxsize=config.agent_meter_reporter_max_buffer_size)
             __meter_report_thread = Thread(name='MeterReportThread', 
target=self.__report_meter, daemon=True)
             __meter_report_thread.start()
 
@@ -173,7 +186,6 @@ class SkyWalkingAgent(Singleton):
                 ThreadDataSource().register()
 
         if config.agent_log_reporter_active:
-            self.__log_queue = 
Queue(maxsize=config.agent_log_reporter_max_buffer_size)
             __log_report_thread = Thread(name='LogReportThread', 
target=self.__report_log, daemon=True)
             __log_report_thread.start()
 
@@ -183,8 +195,6 @@ class SkyWalkingAgent(Singleton):
                                                daemon=True)
             __command_dispatch_thread.start()
 
-            self.__snapshot_queue = 
Queue(maxsize=config.agent_profile_snapshot_transport_buffer_size)
-
             __query_profile_thread = Thread(name='QueryProfileCommandThread', 
target=self.__query_profile_command,
                                             daemon=True)
             __query_profile_thread.start()

Reply via email to