On Wed, Mar 26, 2014 at 6:25 PM, Guido van Rossum <[email protected]> wrote:
> Sounds like you're breaking ground and combining things that haven't been
> tested together before. Some comments:
>
> - Setting the default executor to a ProcessPoolExecutor feels like a bad
> idea -- it would mean that every time you connect to a remote host the
> address lookup is done in that executor (that's mainly why there is a
> default executor at all -- the API to set it mostly exists so you can
> configure the number of threads). Instead, I would just pass an explicit
> executor to run_in_executor().
>
OK.
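
For reference, a minimal sketch of that suggestion, written against current Python (async/await and asyncio.run(), both of which postdate this thread). blocking_square, compute and run_example are hypothetical names used only for illustration. A thread pool keeps the sketch light; a ProcessPoolExecutor could be passed the same way, provided the callable and its arguments are picklable.

```python
import asyncio
import concurrent.futures


def blocking_square(n):
    # Stand-in for a blocking or CPU-bound call (hypothetical helper).
    return n * n


async def compute(executor, n):
    loop = asyncio.get_running_loop()
    # The executor is passed explicitly, so the loop's default executor
    # (the one used for address lookups) is left untouched.
    return await loop.run_in_executor(executor, blocking_square, n)


def run_example():
    # The pool is created and shut down by the caller, not by the loop.
    with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
        return asyncio.run(compute(executor, 7))
```

Calling run_example() runs the blocking function in the explicit pool and returns its result to the coroutine.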

> - Do you know how far ask_exit() made it? I'd like to see more prints there
> too.

It completed.

> - Looks like the signal handler is somehow inherited by the subprocess
> created for the process pool? Otherwise I'm not sure how to explain that
> the sleep(1000) returns immediately but doesn't raise an exception --
> that's what happens with sleep() when a signal handler is called, but not
> when the handler raises an exception or the signal's default action is set
> (SIG_DFL) or it is ignored (SIG_IGN).
>
Yeah, that's what I don't understand either.
It seems the cause of this weird behavior must be something within
loop.add_signal_handler().
I got rid of it, rolled my own variation, and managed to have all workers
exit cleanly:

import asyncio
import concurrent.futures
import functools
import multiprocessing
import signal
import sys
import time

loop = asyncio.get_event_loop()
executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)

def long_running_fun():
    for x in range(5):
        print("loop %s" % x)
        time.sleep(1000)

@asyncio.coroutine
def infinite_loop():
    while True:
        fut = loop.run_in_executor(executor, long_running_fun)
        yield from asyncio.wait_for(fut, None)

def register_sig_handler(signals=[signal.SIGINT, signal.SIGTERM],
                         handler=None):
    sigmap = dict((k, v) for v, k in signal.__dict__.items()
                  if v.startswith('SIG'))
    if handler is None:
        def default_handler(sig, frame):
            this = multiprocessing.current_process()
            print("%r interrupted by %s signal; exiting cleanly now" % (
                this, sigmap.get(sig, sig)))
            sys.exit(sig)
        handler = default_handler
    for sig in signals:
        signal.signal(sig, handler)

def main():
    asyncio.async(infinite_loop())
    register_sig_handler()
    loop.run_forever()

main()

Apparently the difference is that the handler passed to
loop.add_signal_handler() gets called only in the main process (and
mysteriously keeps time.sleep() from raising an exception), while this one
is called in all the running workers plus the main process:

$ python foo.py
loop 0
^C<Process(Process-1, started)> interrupted by SIGINT signal; exiting cleanly now
<Process(Process-3, started)> interrupted by SIGINT signal; exiting cleanly now
<Process(Process-2, started)> interrupted by SIGINT signal; exiting cleanly now
<Process(Process-4, started)> interrupted by SIGINT signal; exiting cleanly now
<Process(Process-1, started)> interrupted by SIGTERM signal; exiting cleanly now
<_MainProcess(MainProcess, started)> interrupted by SIGINT signal; exiting cleanly now
$
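
One possible reading of that output, offered as a sketch and not as a diagnosis: a handler installed with signal.signal() before the workers are forked is inherited by them, and ^C in a terminal delivers SIGINT to every process in the foreground process group. The snippet below demonstrates only the first part, using a bare os.fork() (Unix only). signal_handled_in_forked_child is a hypothetical helper, and the sketch assumes the pool workers are created by fork.

```python
import os
import signal
import time


def signal_handled_in_forked_child(sig=signal.SIGTERM):
    """Return True if a handler installed before fork() runs in the child."""
    read_fd, write_fd = os.pipe()
    received = []

    def handler(signum, frame):
        # Runs in whichever process receives the signal.
        received.append(signum)
        os.write(write_fd, str(os.getpid()).encode())

    previous = signal.signal(sig, handler)
    try:
        pid = os.fork()
        if pid == 0:
            # Child: it never calls signal.signal() itself.
            try:
                os.close(read_fd)
                os.write(write_fd, b"R")  # tell the parent we are running
                deadline = time.monotonic() + 5
                while not received and time.monotonic() < deadline:
                    time.sleep(0.05)
            finally:
                os._exit(0)
        # Parent: wait until the child is ready, then signal it.
        os.close(write_fd)
        os.read(read_fd, 1)
        os.kill(pid, sig)
        data = os.read(read_fd, 32)  # pid written by the child's handler
        os.close(read_fd)
        os.waitpid(pid, 0)
        return data.decode() == str(pid)
    finally:
        # Restore whatever disposition was in place before the sketch ran.
        signal.signal(sig, previous if previous is not None else signal.SIG_DFL)
```

If the function returns True, the pid written to the pipe came from the child's copy of the handler, even though only the parent registered it.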
--
Giampaolo - http://grodola.blogspot.com