On Wed, Mar 26, 2014 at 6:25 PM, Guido van Rossum <[email protected]> wrote:

> Sounds like you're breaking ground and combining things that haven't been
> tested together before. Some comments:
>
> - Setting the default executor to a ProcessPoolExecutor feels like a bad
> idea -- it would mean that every time you connect to a remote host the
> address lookup is done in that executor (that's mainly why there is a
> default executor at all -- the API to set it mostly exists so you can
> configure the number of threads). Instead, I would just pass an explicit
> executor to run_in_executor().
>

OK.

> - Do you know how far ask_exit() made it? I'd like to see more prints there
> too.


It completed.


> - Looks like the signal handler is somehow inherited by the subprocess
> created for the process pool? Otherwise I'm not sure how to explain that
> the sleep(1000) returns immediately but doesn't raise an exception --
> that's what happens with sleep() when a signal handler is called, but not
> when the handler raises an exception or the signal's default action is set
> (SIG_DFL) or it is ignored (SIG_IGN).
>

Yeah, that's what I don't understand either.
It seems the cause of this weird behavior must be something within
loop.add_signal_handler().
I dropped it, rolled my own variant on top of signal.signal(), and managed
to have all workers exit cleanly:

import asyncio
import concurrent.futures
import functools
import multiprocessing
import signal
import sys
import time

loop = asyncio.get_event_loop()
executor = concurrent.futures.ProcessPoolExecutor(max_workers=4)

def long_running_fun():
    for x in range(5):
        print("loop %s" % x)
        time.sleep(1000)

@asyncio.coroutine
def infinite_loop():
    while True:
        fut = loop.run_in_executor(executor, long_running_fun)
        yield from asyncio.wait_for(fut, None)

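def register_sig_handler(signals=(signal.SIGINT, signal.SIGTERM),
                         handler=None):
    # Map signal numbers to names; skip SIG_DFL/SIG_IGN, which are handler
    # constants rather than signals.
    sigmap = dict((num, name) for name, num in signal.__dict__.items()
                  if name.startswith('SIG') and not name.startswith('SIG_'))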
    if handler is None:
        def default_handler(sig, frame):
            this = multiprocessing.current_process()
            print("%r interrupted by %s signal; exiting cleanly now" % (
                  this, sigmap.get(sig, sig)))
            sys.exit(sig)
        handler = default_handler
    for sig in signals:
        signal.signal(sig, handler)

def main():
    asyncio.async(infinite_loop())
    register_sig_handler()
    loop.run_forever()

main()


Apparently the difference is that the handler passed to
loop.add_signal_handler() gets called only in the main process (and it
mysteriously keeps time.sleep() from raising an exception), while this one
is called in all the running workers as well as the main process:

$ python foo.py
loop 0
^C<Process(Process-1, started)> interrupted by SIGINT signal; exiting cleanly now
<Process(Process-3, started)> interrupted by SIGINT signal; exiting cleanly now
<Process(Process-2, started)> interrupted by SIGINT signal; exiting cleanly now
<Process(Process-4, started)> interrupted by SIGINT signal; exiting cleanly now
<Process(Process-1, started)> interrupted by SIGTERM signal; exiting cleanly now
<_MainProcess(MainProcess, started)> interrupted by SIGINT signal; exiting cleanly now
$
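
For what it's worth, the workers running this handler is consistent with
handlers installed through signal.signal() being inherited across fork().
Here is a minimal sketch to check that in isolation. It assumes a Unix
platform and the "fork" start method, and the names exit_handler,
report_handler and child_sigterm_handler_name are made up for illustration:

```python
import multiprocessing
import signal


def exit_handler(sig, frame):
    # Hypothetical handler; only its identity matters for this check.
    raise SystemExit(sig)


def report_handler(conn):
    # Runs in the child: report which handler is installed for SIGTERM.
    current = signal.getsignal(signal.SIGTERM)
    conn.send(getattr(current, "__name__", repr(current)))
    conn.close()


def child_sigterm_handler_name():
    # Assumption: the "fork" start method (Unix only). A spawned child
    # starts a fresh interpreter and would not inherit the handler.
    ctx = multiprocessing.get_context("fork")
    previous = signal.signal(signal.SIGTERM, exit_handler)
    try:
        parent_conn, child_conn = ctx.Pipe()
        proc = ctx.Process(target=report_handler, args=(child_conn,))
        proc.start()
        name = parent_conn.recv()
        proc.join()
        return name
    finally:
        # Restore whatever handler was installed before the check.
        signal.signal(signal.SIGTERM, previous)


if __name__ == "__main__":
    print(child_sigterm_handler_name())
```

If the child reports the parent's handler name, that would match what the
output above shows for the pool workers. It says nothing about what
loop.add_signal_handler() does internally.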

-- 
Giampaolo - http://grodola.blogspot.com
