Fix corner case with joining processes/queues (#1473)

If a process places items in a queue and is joined before the queue has been
emptied, the join can deadlock: the child will not terminate until its queue
feeder thread has flushed all buffered items to the underlying pipe. Closes
AIRFLOW-61.

See for example: 
https://docs.python.org/3/library/multiprocessing.html#all-start-methods 
("Joining processes that use queues")
http://stackoverflow.com/questions/31665328/python-3-multiprocessing-queue-deadlock-when-calling-join-before-the-queue-is-em
http://stackoverflow.com/questions/31708646/process-join-and-queue-dont-work-with-large-numbers
http://stackoverflow.com/questions/19071529/python-multiprocessing-125-list-never-finishes
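
For illustration only (not part of this commit), a minimal standalone sketch of
the failure mode and of the drain-while-alive ordering the patch switches to;
the producer/workers names and the item counts are invented for the example:

    import multiprocessing as mp

    def producer(q):
        # Each child pushes enough data that its queue feeder thread still has
        # buffered items when the function returns; the process cannot fully
        # exit until those items are flushed to the underlying pipe.
        for i in range(100000):
            q.put(i)

    if __name__ == '__main__':
        q = mp.Queue()
        workers = [mp.Process(target=producer, args=(q,)) for _ in range(2)]
        for w in workers:
            w.start()

        # Joining here, before q has been drained, can hang forever.
        # Instead, drain while the workers are still alive and only then join,
        # which is the ordering the patched scheduler loop uses.
        results = []
        while any(w.is_alive() for w in workers):
            while not q.empty():
                results.append(q.get())
        for w in workers:
            w.join()

        # Pick up anything that arrived after the last empty() check.
        while not q.empty():
            results.append(q.get())

        print(len(results))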

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0e5fb905
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0e5fb905
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0e5fb905

Branch: refs/heads/airbnb_rb1.7.1_3
Commit: 0e5fb905c2c6bb02b8a3aabe36af3189d407ed23
Parents: 0ed36a1
Author: Jeremiah Lowin <jlo...@users.noreply.github.com>
Authored: Fri May 6 12:11:16 2016 -0400
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Mon May 9 16:12:55 2016 -0700

----------------------------------------------------------------------
 airflow/jobs.py | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0e5fb905/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 64d4eb4..ae38298 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -751,16 +751,18 @@ class SchedulerJob(BaseJob):
                 self.logger.info("Starting {} scheduler jobs".format(len(jobs)))
                 for j in jobs:
                     j.start()
+
+                while any(j.is_alive() for j in jobs):
+                    while not tis_q.empty():
+                        ti_key, pickle_id = tis_q.get()
+                        dag = dagbag.dags[ti_key[0]]
+                        task = dag.get_task(ti_key[1])
+                        ti = TI(task, ti_key[2])
+                        self.executor.queue_task_instance(ti, pickle_id=pickle_id)
+
                 for j in jobs:
                     j.join()
 
-                while not tis_q.empty():
-                    ti_key, pickle_id = tis_q.get()
-                    dag = dagbag.dags[ti_key[0]]
-                    task = dag.get_task(ti_key[1])
-                    ti = TI(task, ti_key[2])
-                    self.executor.queue_task_instance(ti, pickle_id=pickle_id)
-
                 self.logger.info("Done queuing tasks, calling the executor's "
                               "heartbeat")
                 duration_sec = (datetime.now() - loop_start_dttm).total_seconds()
