Another thing to investigate is how the executor creates its worker processes, and whether anything happens to signal dispositions there. You could print the pid in the subprocess, send it a signal by hand, and see how it is handled (a sketch of that experiment follows below). Also read up on how signals behave across fork().
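A minimal sketch of that experiment -- the worker function, its name, and the pool size here are illustrative, not from the thread. It prints each worker's pid and its current SIGINT disposition, then sleeps so you can send the pid a signal from another terminal (e.g. "kill -INT <pid>") and observe the result:

import concurrent.futures
import os
import signal
import time

def worker():
    # Report who we are and how SIGINT is currently handled in this
    # process; anything other than Python's default_int_handler here
    # suggests the disposition was inherited from the parent at fork().
    print("worker pid=%s, SIGINT handler=%r"
          % (os.getpid(), signal.getsignal(signal.SIGINT)))
    time.sleep(60)  # long enough to send this process a signal by hand

if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as pool:
        futures = [pool.submit(worker) for _ in range(2)]
        for fut in futures:
            fut.result()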
On Wed, Mar 26, 2014 at 11:30 AM, Giampaolo Rodola' <[email protected]> wrote:

> On Wed, Mar 26, 2014 at 6:25 PM, Guido van Rossum <[email protected]> wrote:
>
>> Sounds like you're breaking ground and combining things that haven't been
>> tested together before. Some comments:
>>
>> - Setting the default executor to a ProcessPoolExecutor feels like a bad
>> idea -- it would mean that every time you connect to a remote host the
>> address lookup is done in that executor (that's mainly why there is a
>> default executor at all -- the API to set it mostly exists so you can
>> configure the number of threads). Instead, I would just pass an explicit
>> executor to run_in_executor().
>
> OK.
>
>> - Do you know how far ask_exit() made it? I'd like to see more prints
>> there too.
>
> It completed.
>
>> - Looks like the signal handler is somehow inherited by the subprocess
>> created for the process pool? Otherwise I'm not sure how to explain that
>> the sleep(1000) returns immediately but doesn't raise an exception --
>> that's what happens with sleep() when a signal handler is called, but not
>> when the handler raises an exception or the signal's default action is set
>> (SIG_DFL) or it is ignored (SIG_IGN).
>
> Yeah, that's what I don't understand either.
> It seems the cause of this weird behavior must be something within
> loop.add_signal_handler().
> I tried to get rid of it, rolled my own variation, and managed to have
> all workers exit cleanly:
>
> import asyncio
> import concurrent.futures
> import functools
> import multiprocessing
> import signal
> import sys
> import time
>
> loop = asyncio.get_event_loop()
> executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)
>
> def long_running_fun():
>     for x in range(5):
>         print("loop %s" % x)
>         time.sleep(1000)
>
> @asyncio.coroutine
> def infinite_loop():
>     while True:
>         fut = loop.run_in_executor(executor, long_running_fun)
>         yield from asyncio.wait_for(fut, None)
>
> def register_sig_handler(signals=[signal.SIGINT, signal.SIGTERM],
>                          handler=None):
>     sigmap = dict((k, v) for v, k in signal.__dict__.items()
>                   if v.startswith('SIG'))
>     if handler is None:
>         def default_handler(sig, frame):
>             this = multiprocessing.current_process()
>             print("%r interrupted by %s signal; exiting cleanly now" % (
>                 this, sigmap.get(sig, sig)))
>             sys.exit(sig)
>         handler = default_handler
>     for sig in signals:
>         signal.signal(sig, handler)
>
> def main():
>     asyncio.async(infinite_loop())
>     register_sig_handler()
>     loop.run_forever()
>
> main()
>
> Apparently the difference is that the handler passed to
> loop.add_signal_handler() gets called only for the main process (and it
> mysteriously suppresses the time.sleep() exception), while this one is
> called for all the running workers plus the main process:
>
> $ python foo.py
> loop 0
> ^C<Process(Process-1, started)> interrupted by SIGINT signal; exiting cleanly now
> <Process(Process-3, started)> interrupted by SIGINT signal; exiting cleanly now
> <Process(Process-2, started)> interrupted by SIGINT signal; exiting cleanly now
> <Process(Process-4, started)> interrupted by SIGINT signal; exiting cleanly now
> <Process(Process-1, started)> interrupted by SIGTERM signal; exiting cleanly now
> <_MainProcess(MainProcess, started)> interrupted by SIGINT signal; exiting cleanly now
> $
>
> --
> Giampaolo - http://grodola.blogspot.com

--
--Guido van Rossum (python.org/~guido)
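For what it's worth, a sketch of another way out, under the assumption that the handler really is inherited across fork() as suspected above: have each worker restore the default dispositions before doing any work, so whatever handler the main process installed never runs inside the pool processes. This reuses Giampaolo's long_running_fun; the two signal.signal() reset lines and the minimal driver are the only additions:

import concurrent.futures
import signal
import time

def long_running_fun():
    # Workers forked by the pool inherit the parent's Python-level
    # signal handlers; restore the default actions first so SIGINT and
    # SIGTERM terminate the worker normally instead of invoking the
    # main process's handler here.
    signal.signal(signal.SIGINT, signal.SIG_DFL)
    signal.signal(signal.SIGTERM, signal.SIG_DFL)
    for x in range(5):
        print("loop %s" % x)
        time.sleep(1000)

if __name__ == '__main__':
    executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)
    executor.submit(long_running_fun).result()

On Python 3.7 and later the same reset could instead go in ProcessPoolExecutor's initializer= argument, so it runs once per worker at startup rather than at the top of every task.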
