tom-pytel commented on a change in pull request #125:
URL: https://github.com/apache/skywalking-python/pull/125#discussion_r657031313



##########
File path: skywalking/agent/__init__.py
##########
@@ -65,14 +76,40 @@ def __init():
         __protocol = KafkaProtocol()
 
     plugins.install()
+    __init_threading()
 
 
 def __fini():
     __protocol.report(__queue, False)
     __queue.join()
+    __finished.set()
+
+
+def __fork_before():
+    if config.protocol != 'http':
+        logger.warning('fork() not currently supported with %s protocol' % config.protocol)

Review comment:
   > I didn't mean to close parent channel and create in child process by reusing the agent in parent. What I meant is to start another independent agent in child process and leave the parent one there because there may be other things that may need to be traced in parent process. Can you take a look at
   
   I tried several things like:
   * Not doing anything before the fork, then creating a new `GrpcServiceManagementClient` and `GrpcTraceSegmentReportService` in the child.
   * The above, but closing the channel in the child before creating the new ones.
   * Closing the channel before the fork, then recreating it in both parent and child (sketched below).
   * Instead of `close()`, using `unsubscribe()`.
   * Both `unsubscribe()` then `close()`, either before the fork or afterwards in the child.
   * I also tried waiting for an empty queue before allowing the fork to proceed, but that was unnecessary as I wasn't even sending anything before the fork, just for form.
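   
   For concreteness, here is roughly what the third attempt looked like as a standalone sketch (the target address and helper names are placeholders, not the agent's real ones):
   ```py
   import os
   
   import grpc
   
   _channel = None
   
   def _create_channel():
       # placeholder target; the agent would use its configured collector address
       global _channel
       _channel = grpc.insecure_channel('127.0.0.1:11800')
   
   def _close_channel():
       # runs in the parent just before fork() so that neither side inherits the
       # channel's sockets and worker threads
       global _channel
       if _channel is not None:
           _channel.close()
           _channel = None
   
   _create_channel()
   os.register_at_fork(before=_close_channel,
                       after_in_parent=_create_channel,
                       after_in_child=_create_channel)
   ```
   In my tests this still produced the errors described here, so treat it as a record of what was tried rather than a fix.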
   
   I also forgot to mention there was a third result I was getting sometimes: a deadlock hang. It is possible I missed some permutations or a different function to call, but in general, researching Python gRPC with multiprocessing on the net, I found basically two answers: either 1. "don't do it", or 2. "grpc must be started only after forking everything, then maybe it will work" (there is a sketch of that pattern after the links below). Here are some links:
   
   https://github.com/googleapis/synthtool/issues/902
   
https://stackoverflow.com/questions/62798507/why-multiprocess-python-grpc-server-do-not-work
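   
   For what answer 2 means in practice, a minimal standalone sketch (not wired into the agent, address is a placeholder): the channel is created only inside the already-forked worker, so no gRPC state is ever inherited across `fork()`:
   ```py
   import multiprocessing as mp
   
   import grpc
   
   def worker(target):
       # the channel (and gRPC's internal threads) exists only in this child;
       # nothing gRPC-related was created before the fork
       channel = grpc.insecure_channel(target)
       try:
           grpc.channel_ready_future(channel).result(timeout=5)
           # ... build stubs on `channel` and do the real work here ...
       except grpc.FutureTimeoutError:
           pass  # no server reachable; fine for a sketch
       finally:
           channel.close()
   
   if __name__ == '__main__':
       procs = [mp.Process(target=worker, args=('127.0.0.1:11800',)) for _ in range(2)]
       for p in procs:
           p.start()
       for p in procs:
           p.join()
   ```
   That pattern doesn't map cleanly onto our agent because the agent (and its channel) is normally started long before the application forks.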
   
   So as I said, it may be possible but I have not hit on how to do it. If you want to give it a shot I will add a simple test script at the end of this message. I also didn't test anything with Kafka, and I assume forking will not work correctly there until someone validates it.
   
   As for the current higher-level flow, keep in mind it can be modified in the future according to which protocol is in use, but for now: nothing special is done before the fork or after it in the parent. In those cases all threads, sockets and locks continue operating as if nothing had happened. In the child, new report and heartbeat threads are started, since threads don't survive into children. Specifically with the http protocol, the duplicated sockets are closed and new ones are opened on the next heartbeat or report.
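   
   A self-contained, POSIX-only toy of that flow, using `os.register_at_fork` with loops standing in for the heartbeat/report threads:
   ```py
   import os
   import threading
   import time
   
   def _loop(name):
       # stand-in for the agent's heartbeat/report loops
       while True:
           time.sleep(1)
   
   def _start_threads():
       # threads never survive fork(), so the child must start fresh copies;
       # the parent's threads keep running untouched
       threading.Thread(target=_loop, args=('heartbeat',), daemon=True).start()
       threading.Thread(target=_loop, args=('report',), daemon=True).start()
   
   os.register_at_fork(after_in_child=_start_threads)
   
   if __name__ == '__main__':
       _start_threads()                     # the parent's own threads
       pid = os.fork()
       if pid == 0:                         # child: the hook above already ran
           print('threads in child:', threading.active_count())
           os._exit(0)
       os.waitpid(pid, 0)
   ```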
   
   There is a potential problem with the `__queue` object: a thread may have been holding one of its internal locks at fork time, and since that thread no longer exists in the child, the queue would remain locked forever. I'm not sure how to resolve this yet, but it should be a very rare event. Even rarer would be the same lock problem with the `__finished` event, but I wouldn't expect that to happen basically ever.
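   
   One possible (completely untested) mitigation would be to hand the child a fresh set of queue locks after the fork. Note this pokes at CPython's `queue.Queue` internals, which is an assumption rather than a public API:
   ```py
   import os
   import queue
   import threading
   
   _queue = queue.Queue(maxsize=10000)  # stand-in for the agent's __queue
   
   def _reset_queue_locks():
       # if some thread held _queue.mutex at fork() time, that thread no longer
       # exists in the child and the lock would stay held forever; swap in a
       # fresh mutex and rebind the conditions that share it (CPython internals)
       _queue.mutex = threading.Lock()
       _queue.not_empty = threading.Condition(_queue.mutex)
       _queue.not_full = threading.Condition(_queue.mutex)
       _queue.all_tasks_done = threading.Condition(_queue.mutex)
   
   os.register_at_fork(after_in_child=_reset_queue_locks)
   ```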
   
   Right now I have other stuff on my plate, but if you have any suggestions on what to try I may revisit this at some point in the future. Or if you want to try it yourself, here is a test script:
   ```py
   import multiprocessing as mp
   import time
   
   from skywalking.trace.context import get_context
   from skywalking import agent, config
   
   config.init(collector='127.0.0.1:11800', service='your awesome service')
   agent.start()
   
   def foo():
       with get_context().new_local_span('child before error'):
           pass
   
       time.sleep(2)  # needed to flush the send queue, because Python doesn't run atexit handlers on exit in forked children
   
       # import atexit
       # atexit._run_exitfuncs()
   
   if __name__ == '__main__':
       p = mp.Process(target=foo, args=())
   
       with get_context().new_local_span('parent before start'):
           pass
   
       p.start()
   
       time.sleep(1)
   
       with get_context().new_local_span('parent after start'):
           pass
   
       p.join()
   
       with get_context().new_local_span('parent after join'):
           pass
   ```
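   
   To reproduce, point `collector` at a reachable OAP (the script assumes `127.0.0.1:11800`) and watch whether the 'child before error' span ever arrives; in my runs the child is where the errors and hangs described above showed up.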




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

