Fix corner case with joining processes/queues (#1473)

If a process places items in a queue and the process is joined before the queue is emptied, it can lead to a deadlock under some circumstances. Closes AIRFLOW-61.
See for example:

https://docs.python.org/3/library/multiprocessing.html#all-start-methods ("Joining processes that use queues")
http://stackoverflow.com/questions/31665328/python-3-multiprocessing-queue-deadlock-when-calling-join-before-the-queue-is-em
http://stackoverflow.com/questions/31708646/process-join-and-queue-dont-work-with-large-numbers
http://stackoverflow.com/questions/19071529/python-multiprocessing-125-list-never-finishes

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0e5fb905
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0e5fb905
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0e5fb905

Branch: refs/heads/airbnb_rb1.7.1_3
Commit: 0e5fb905c2c6bb02b8a3aabe36af3189d407ed23
Parents: 0ed36a1
Author: Jeremiah Lowin <jlo...@users.noreply.github.com>
Authored: Fri May 6 12:11:16 2016 -0400
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Mon May 9 16:12:55 2016 -0700

----------------------------------------------------------------------
 airflow/jobs.py | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0e5fb905/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 64d4eb4..ae38298 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -751,16 +751,18 @@ class SchedulerJob(BaseJob):
             self.logger.info("Starting {} scheduler jobs".format(len(jobs)))
             for j in jobs:
                 j.start()
+
+            while any(j.is_alive() for j in jobs):
+                while not tis_q.empty():
+                    ti_key, pickle_id = tis_q.get()
+                    dag = dagbag.dags[ti_key[0]]
+                    task = dag.get_task(ti_key[1])
+                    ti = TI(task, ti_key[2])
+                    self.executor.queue_task_instance(ti, pickle_id=pickle_id)
+
             for j in jobs:
                 j.join()

-            while not tis_q.empty():
-                ti_key, pickle_id = tis_q.get()
-                dag = dagbag.dags[ti_key[0]]
-                task = dag.get_task(ti_key[1])
-                ti = TI(task, ti_key[2])
-                self.executor.queue_task_instance(ti, pickle_id=pickle_id)
-
             self.logger.info("Done queuing tasks, calling the executor's "
                              "heartbeat")
             duration_sec = (datetime.now() - loop_start_dttm).total_seconds()