Repository: incubator-airflow Updated Branches: refs/heads/master 8c42d03c4 -> 7e762d42d
[AIRFLOW-1235] Fix webserver's odd behaviour In some cases, the gunicorn master shuts down but the webserver monitor process doesn't. This PR adds timeout functionality to shut down all related processes in such cases. Dear Airflow maintainers, Please accept this PR. I understand that it will not be reviewed until I have checked off all the steps below! ### JIRA - [x] My PR addresses the following [Airflow JIRA](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references them in the PR title. For example, "[AIRFLOW-XXX] My Airflow PR" - https://issues.apache.org/jira/browse/AIRFLOW-1235 ### Description - [x] Here are some details about my PR, including screenshots of any UI changes: In some cases, the gunicorn master shuts down but the webserver monitor process doesn't. This PR adds timeout functionality to shut down all related processes in such cases. ### Tests - [x] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason: tests.core:CliTests.test_cli_webserver_shutdown_when_gunicorn_master_is_killed ### Commits - [x] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)": 1. Subject is separated from body by a blank line 2. Subject is limited to 50 characters 3. Subject does not end with a period 4. Subject uses the imperative mood ("add", not "adding") 5. Body wraps at 72 characters 6. 
Body explains "what" and "why", not "how" Closes #2330 from sekikn/AIRFLOW-1235 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/7e762d42 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/7e762d42 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/7e762d42 Branch: refs/heads/master Commit: 7e762d42df50d84e4740e15c24594c50aaab53a2 Parents: 8c42d03 Author: Kengo Seki <sek...@apache.org> Authored: Thu Mar 22 11:50:27 2018 -0700 Committer: Arthur Wiedmer <awied...@netflix.com> Committed: Thu Mar 22 11:50:27 2018 -0700 ---------------------------------------------------------------------- airflow/bin/cli.py | 129 +++++++++++++--------- airflow/config_templates/default_airflow.cfg | 3 + airflow/exceptions.py | 4 + tests/core.py | 10 ++ 4 files changed, 91 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7e762d42/airflow/bin/cli.py ---------------------------------------------------------------------- diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 449d8ca..1801cc7 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -46,7 +46,7 @@ import airflow from airflow import api from airflow import jobs, settings from airflow import configuration as conf -from airflow.exceptions import AirflowException +from airflow.exceptions import AirflowException, AirflowWebServerTimeout from airflow.executors import GetDefaultExecutor from airflow.models import (DagModel, DagBag, TaskInstance, DagPickle, DagRun, Variable, DagStat, @@ -592,7 +592,12 @@ def get_num_ready_workers_running(gunicorn_master_proc): return len(ready_workers) -def restart_workers(gunicorn_master_proc, num_workers_expected): +def get_num_workers_running(gunicorn_master_proc): + workers = psutil.Process(gunicorn_master_proc.pid).children() + return len(workers) + 
+ +def restart_workers(gunicorn_master_proc, num_workers_expected, master_timeout): """ Runs forever, monitoring the child processes of @gunicorn_master_proc and restarting workers occasionally. @@ -618,17 +623,18 @@ def restart_workers(gunicorn_master_proc, num_workers_expected): gracefully and that the oldest worker is terminated. """ - def wait_until_true(fn): + def wait_until_true(fn, timeout=0): """ Sleeps until fn is true """ + t = time.time() while not fn(): + if 0 < timeout and timeout <= time.time() - t: + raise AirflowWebServerTimeout( + "No response from gunicorn master within {0} seconds" + .format(timeout)) time.sleep(0.1) - def get_num_workers_running(gunicorn_master_proc): - workers = psutil.Process(gunicorn_master_proc.pid).children() - return len(workers) - def start_refresh(gunicorn_master_proc): batch_size = conf.getint('webserver', 'worker_refresh_batch_size') log.debug('%s doing a refresh of %s workers', state, batch_size) @@ -640,56 +646,68 @@ def restart_workers(gunicorn_master_proc, num_workers_expected): gunicorn_master_proc.send_signal(signal.SIGTTIN) excess += 1 wait_until_true(lambda: num_workers_expected + excess == - get_num_workers_running(gunicorn_master_proc)) - - wait_until_true(lambda: num_workers_expected == - get_num_workers_running(gunicorn_master_proc)) - - while True: - num_workers_running = get_num_workers_running(gunicorn_master_proc) - num_ready_workers_running = get_num_ready_workers_running(gunicorn_master_proc) - - state = '[{0} / {1}]'.format(num_ready_workers_running, num_workers_running) - - # Whenever some workers are not ready, wait until all workers are ready - if num_ready_workers_running < num_workers_running: - log.debug('%s some workers are starting up, waiting...', state) - sys.stdout.flush() - time.sleep(1) - - # Kill a worker gracefully by asking gunicorn to reduce number of workers - elif num_workers_running > num_workers_expected: - excess = num_workers_running - num_workers_expected - log.debug('%s 
killing %s workers', state, excess) - - for _ in range(excess): - gunicorn_master_proc.send_signal(signal.SIGTTOU) - excess -= 1 - wait_until_true(lambda: num_workers_expected + excess == - get_num_workers_running(gunicorn_master_proc)) - - # Start a new worker by asking gunicorn to increase number of workers - elif num_workers_running == num_workers_expected: - refresh_interval = conf.getint('webserver', 'worker_refresh_interval') - log.debug( - '%s sleeping for %ss starting doing a refresh...', - state, refresh_interval - ) - time.sleep(refresh_interval) - start_refresh(gunicorn_master_proc) + get_num_workers_running(gunicorn_master_proc), + master_timeout) - else: - # num_ready_workers_running == num_workers_running < num_workers_expected - log.error(( - "%s some workers seem to have died and gunicorn" - "did not restart them as expected" - ), state) - time.sleep(10) - if len( - psutil.Process(gunicorn_master_proc.pid).children() - ) < num_workers_expected: + try: + wait_until_true(lambda: num_workers_expected == + get_num_workers_running(gunicorn_master_proc), + master_timeout) + while True: + num_workers_running = get_num_workers_running(gunicorn_master_proc) + num_ready_workers_running = \ + get_num_ready_workers_running(gunicorn_master_proc) + + state = '[{0} / {1}]'.format(num_ready_workers_running, num_workers_running) + + # Whenever some workers are not ready, wait until all workers are ready + if num_ready_workers_running < num_workers_running: + log.debug('%s some workers are starting up, waiting...', state) + sys.stdout.flush() + time.sleep(1) + + # Kill a worker gracefully by asking gunicorn to reduce number of workers + elif num_workers_running > num_workers_expected: + excess = num_workers_running - num_workers_expected + log.debug('%s killing %s workers', state, excess) + + for _ in range(excess): + gunicorn_master_proc.send_signal(signal.SIGTTOU) + excess -= 1 + wait_until_true(lambda: num_workers_expected + excess == + 
get_num_workers_running(gunicorn_master_proc), + master_timeout) + + # Start a new worker by asking gunicorn to increase number of workers + elif num_workers_running == num_workers_expected: + refresh_interval = conf.getint('webserver', 'worker_refresh_interval') + log.debug( + '%s sleeping for %ss starting doing a refresh...', + state, refresh_interval + ) + time.sleep(refresh_interval) start_refresh(gunicorn_master_proc) + else: + # num_ready_workers_running == num_workers_running < num_workers_expected + log.error(( + "%s some workers seem to have died and gunicorn" + "did not restart them as expected" + ), state) + time.sleep(10) + if len( + psutil.Process(gunicorn_master_proc.pid).children() + ) < num_workers_expected: + start_refresh(gunicorn_master_proc) + except (AirflowWebServerTimeout, OSError) as err: + log.error(err) + log.error("Shutting down webserver") + try: + gunicorn_master_proc.terminate() + gunicorn_master_proc.wait() + finally: + sys.exit(1) + def webserver(args): print(settings.HEADER) @@ -769,7 +787,8 @@ def webserver(args): def monitor_gunicorn(gunicorn_master_proc): # These run forever until SIG{INT, TERM, KILL, ...} signal is sent if conf.getint('webserver', 'worker_refresh_interval') > 0: - restart_workers(gunicorn_master_proc, num_workers) + master_timeout = conf.getint('webserver', 'web_server_master_timeout') + restart_workers(gunicorn_master_proc, num_workers, master_timeout) else: while True: time.sleep(1) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7e762d42/airflow/config_templates/default_airflow.cfg ---------------------------------------------------------------------- diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index acfd15e..8f82208 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -198,6 +198,9 @@ web_server_port = 8080 web_server_ssl_cert = web_server_ssl_key = +# Number of seconds the 
webserver waits before killing gunicorn master that doesn't respond +web_server_master_timeout = 120 + # Number of seconds the gunicorn webserver waits before timing out on a worker web_server_worker_timeout = 120 http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7e762d42/airflow/exceptions.py ---------------------------------------------------------------------- diff --git a/airflow/exceptions.py b/airflow/exceptions.py index c1b728c..f4527b2 100644 --- a/airflow/exceptions.py +++ b/airflow/exceptions.py @@ -32,6 +32,10 @@ class AirflowTaskTimeout(AirflowException): pass +class AirflowWebServerTimeout(AirflowException): + pass + + class AirflowSkipException(AirflowException): pass http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/7e762d42/tests/core.py ---------------------------------------------------------------------- diff --git a/tests/core.py b/tests/core.py index ce5fb7a..2ac7b66 100644 --- a/tests/core.py +++ b/tests/core.py @@ -1475,6 +1475,16 @@ class CliTests(unittest.TestCase): self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "airflow"]).wait()) self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "gunicorn"]).wait()) + # Patch for causing webserver timeout + @mock.patch("airflow.bin.cli.get_num_workers_running", return_value=0) + def test_cli_webserver_shutdown_when_gunicorn_master_is_killed(self, _): + # Shorten timeout so that this test doesn't take too long time + configuration.conf.set("webserver", "web_server_master_timeout", "10") + args = self.parser.parse_args(['webserver']) + with self.assertRaises(SystemExit) as e: + cli.webserver(args) + self.assertEqual(e.exception.code, 1) + class SecurityTests(unittest.TestCase): def setUp(self):