New submission from Steven Barker <blckkn...@gmail.com>:

While investigating a Stack Overflow question (here: 
https://stackoverflow.com/q/46529767/1405065), I found that there may be a race 
condition in the cleanup code for concurrent.futures.ThreadPoolIterator. The 
behavior in normal situations is fairly benign (the executor may run a few more 
jobs than you'd expect, but exits cleanly), but in rare situations it might 
lose track of a running thread and allow the interpreter to shut down while the 
thread is still trying to do work.

Here's some example that concisely demonstrates the situation where the issue 
can come up (it doesn't actually cause the race to go the wrong way on my 
system, but sets up the possibility for it to occur):


from threading import current_thread
from concurrent.futures import ThreadPoolExecutor
from time import sleep

pool = ThreadPoolExecutor(4)

def f(_):
    print(current_thread().name)
    future = pool.submit(sleep, 0.1)
    future.add_done_callback(f)
    
f(None)


The callback from completion of one job schedules another job, indefinitely.

When run in an interactive session, this code will print thread names forever. 
You should get "MainThread" once, followed by a bunch of 
"ThreadPoolExecutor-X_Y" names (often the same name will be repeated most of 
the time, due to the GIL I think, but in theory the work could rotate between 
threads). The main thread will return to the interactive REPL right away, so 
you can type in other stuff while the executor's worker threads are printing 
stuff the background (I suggest running pool.shutdown() to make them stop). 
This is fine.

But if the example code is run as a script, you'll usually get "MainThread", 
followed by exactly four repeats of "ThreadPoolExecutor-0_0" (or fewer in the 
unlikely case that the race condition strikes you). That's the number of 
threads the ThreadPoolExecutor was limited to, but note that the thread name 
that gets printed will usually end with 0 every time (you don't get one output 
from each worker thread, just the same number of outputs as there are threads, 
all from the first thread). Why you get that number of outputs (instead of zero 
or one or an infinite number) was one part of the Stack Overflow question.

The answer turned out to be that after the main thread has queued up the first 
job in the ThreadPoolExecutor, it runs off the end of the script's code, so it 
starts shutting down the interpreter. The cleanup function _python_exit (in 
Lib/concurrent/futures/thread.py) gets run since it is registered with atexit, 
and it tries to signal the worker threads to shut down cleanly. However, the 
shutdown logic interacts oddly with an executor that's still spinning up its 
threads. It will only signal and join the threads that existed when it started 
running, not any new threads.

As it turns out, usually the newly spawned threads will shut themselves down 
immediately after they spawn, but as a side effect, the first worker thread 
carries on longer than expected, doing one additional job for each new thread 
that gets spawned and exiting itself only when the executor has a full set. 
This is why there are four outputs from the worker thread instead of some other 
number. But the exact behavior is dependent on the thread scheduling order, so 
there is a race condition.

You can demonstrate a change in behavior from different timing by putting a 
call to time.sleep at the very top of the _worker() function (delaying how 
quickly the new threads can get to the work queue). You should see the program 
behavior change to only print "ThreadPoolExecutor-0_0" once before exiting.

Lets go through the steps of the process:

1. The main thread runs f() and schedules a job (which adds a work item to the 
executor's work queue). The first worker thread is spawned by the executor to 
run the job, since it doesn't have any threads yet. The main thread also sets a 
callback on the future to run f() again.

2. The main thread exits f() and reaches the end of the script, so it begins 
the interpreter shutdown process, including calling atexit functions. One of 
those is _python_exit, which adds a reference to None to the executor's work 
queue. Note that the None is added *after* the job item from step 1 (since 
they're both done by the same main thread). It then calls join() on the worker 
thread spawned in step 1, waiting for it to exit. It won't try to join any 
other threads that spawn later, since they don't exist yet.

3. The first worker thread spawned by the executor in step 1 begins running and 
pops an item off the work queue. The first item is a real job, so it runs it. 
(The first parts of this step may be running in parallel with step 2, but 
completing job will take much longer than step 2, so the rest of this step runs 
by itself after step 2 has finished.) Eventually the job ends and the callback 
function on the Future is called, which schedules another job (putting a job 
item in the queue after the None), and spawning a second worker thread (since 
the executor doesn't have enough yet).

4. The race condition occurs here. Usually the new worker thread (created in 
step 3) starts running next and pops the None off of the work queue (leaving 
the a real work item still in the queue). It checks and finds the the global 
_shutdown flag is set, so it adds another None to the job queue (at the end 
again) and quits.

5. The other half of the race is here. The first worker finishes the callback 
and is done with the first job, so it goes back to the work queue for another 
one. It usually finds the real job it scheduled in step 3 (since the None was 
already taken by the other thread in step 4). From then on, the code repeats 
the behavior from step 3 (doing the job, calling the callback, queuing a new 
job, and spawning a new thread since the executor still isn't at full capacity).

6. Steps 4 and 5 will repeat a few more times until the executor has as many 
threads as it wants. If no new thread is spawned at the end of step 5, the 
first worker thread finally gets to pop a None from the queue instead of a job, 
so it will shut down. This lets the the main thread, which has been blocked 
since step 2, finally finish it's join() and shut down the rest of the 
interpreter.

The race condition occurs between steps 4 and 5. If the first worker thread 
(that usually runs step 5) reaches the work queue before the other worker 
thread (which usually runs step 4), the first worker thread will get the None 
instead of the new thread. Thus the first worker will shut down earlier that in 
the usual scenario described above. The second thread (or third or fourth, 
depending on which cycle of steps 4 and 5 we're on) could get the job off the 
queue and start working on it while the first thread is exiting. That would be 
fine, but when the first thread exits, it will unblock the main thread and the 
interpreter will continue shutting down. This could cut the ground out from 
under the code running in the remaining worker thread.

One solution would be to avoid creating any new threads when the interpreter is 
in the process of shutting down. We can check for the global _shutdown variable 
inside ThreadPoolExecutor._adjust_thread_count, though I think it needs a lock 
to avoid another race condition (where _shutdown and the contents of 
_thread_queues are accessed out of sync, e.g. a race between steps 2 and 3 
above that could only occur if the jobs were exceedingly fast to complete).

There are other options though. We could make it an error to queue up new work 
to an executor when the interpreter is in the process of shutting down (just 
change the "if self._shutdown" test in ThreadPoolExecutor.submit to also look 
at the global _shutdown variable, and the worker thread will crash with a 
RuntimeError just before things shut down). Or we could change the behavior of 
the workers to shut down as soon as possible rather than finishing all queued 
work items (remove the continue from the inner block in the loop in _worker so 
that it always checks the global _shutdown after completing each job). Or 
another option might be to change the cleanup logic in _python_exit to double 
check for additional threads to join() after it finishes waiting on its first 
set.

----------
components: Library (Lib)
messages: 304364
nosy: Steven.Barker
priority: normal
severity: normal
status: open
title: Race condition in ThreadPoolExecutor when scheduling new jobs while the 
interpreter shuts down
type: behavior
versions: Python 3.6, Python 3.7, Python 3.8

_______________________________________
Python tracker <rep...@bugs.python.org>
<https://bugs.python.org/issue31783>
_______________________________________
_______________________________________________
Python-bugs-list mailing list
Unsubscribe: 
https://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com

Reply via email to