[AIRFLOW-160] Parse DAG files through child processes. Instead of parsing the DAG definition files in the same process as the scheduler, this change parses the files in a child process. This helps to isolate the scheduler from bad user code.
Closes #1636 from plypaul/plypaul_schedule_by_file_rebase_master Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/fdb7e949 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/fdb7e949 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/fdb7e949 Branch: refs/heads/master Commit: fdb7e949140b735b8554ae5b22ad752e86f6ebaf Parents: 835bcb6 Author: Paul Yang <paul.y...@airbnb.com> Authored: Sun Jul 31 12:49:30 2016 -0700 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Sun Jul 31 12:49:39 2016 -0700 ---------------------------------------------------------------------- airflow/bin/cli.py | 33 +- airflow/configuration.py | 5 + airflow/dag/__init__.py | 14 + airflow/dag/base_dag.py | 96 ++ airflow/executors/base_executor.py | 2 +- airflow/jobs.py | 1224 ++++++++++++++---- airflow/models.py | 273 +++- airflow/settings.py | 36 +- airflow/utils/dag_processing.py | 612 +++++++++ airflow/utils/db.py | 9 +- scripts/ci/check-license.sh | 2 +- scripts/ci/setup_env.sh | 12 + setup.py | 2 + tests/core.py | 33 +- tests/dags/no_dags.py | 14 + tests/dags_with_system_exit/a_system_exit.py | 29 + .../b_test_scheduler_dags.py | 29 + tests/dags_with_system_exit/c_system_exit.py | 29 + tests/jobs.py | 192 +-- 19 files changed, 2239 insertions(+), 407 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/airflow/bin/cli.py ---------------------------------------------------------------------- diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 2d02f1e..0d35661 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -31,6 +31,8 @@ import daemon from daemon.pidfile import TimeoutPIDLockFile import signal import sys +import threading +import traceback import airflow from airflow import jobs, settings @@ -45,10 +47,28 @@ from airflow.exceptions import 
AirflowException DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER')) -def sigint_handler(signal, frame): +def sigint_handler(sig, frame): sys.exit(0) +def sigquit_handler(sig, frame): + """Helps debug deadlocks by printing stacktraces when this gets a SIGQUIT + e.g. kill -s QUIT <PID> or CTRL+\ + """ + print("Dumping stack traces for all threads in PID {}".format(os.getpid())) + id_to_name = dict([(th.ident, th.name) for th in threading.enumerate()]) + code = [] + for thread_id, stack in sys._current_frames().items(): + code.append("\n# Thread: {}({})" + .format(id_to_name.get(thread_id, ""), thread_id)) + for filename, line_number, name, line in traceback.extract_stack(stack): + code.append('File: "{}", line {}, in {}' + .format((filename, line_number, name))) + if line: + code.append(" {}".format(line.strip())) + print("\n".join(code)) + + def setup_logging(filename): root = logging.getLogger() handler = logging.FileHandler(filename) @@ -538,6 +558,7 @@ def scheduler(args): job = jobs.SchedulerJob( dag_id=args.dag_id, subdir=process_subdir(args.subdir), + run_duration=args.run_duration, num_runs=args.num_runs, do_pickle=args.do_pickle) @@ -561,6 +582,7 @@ def scheduler(args): else: signal.signal(signal.SIGINT, sigint_handler) signal.signal(signal.SIGTERM, sigint_handler) + signal.signal(signal.SIGQUIT, sigquit_handler) job.run() @@ -918,6 +940,10 @@ class CLIFactory(object): default=False), # scheduler 'dag_id_opt': Arg(("-d", "--dag_id"), help="The id of the dag to run"), + 'run_duration': Arg( + ("-r", "--run-duration"), + default=None, type=int, + help="Set number of seconds to execute before exiting"), 'num_runs': Arg( ("-n", "--num_runs"), default=None, type=int, @@ -1056,8 +1082,9 @@ class CLIFactory(object): }, { 'func': scheduler, 'help': "Start a scheduler instance", - 'args': ('dag_id_opt', 'subdir', 'num_runs', 'do_pickle', - 'pid', 'daemon', 'stdout', 'stderr', 'log_file'), + 'args': ('dag_id_opt', 'subdir', 'run_duration', 'num_runs', + 
'do_pickle', 'pid', 'daemon', 'stdout', 'stderr', + 'log_file'), }, { 'func': worker, 'help': "Start a Celery worker node", http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/airflow/configuration.py ---------------------------------------------------------------------- diff --git a/airflow/configuration.py b/airflow/configuration.py index 5a380ae..65b482c 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -141,6 +141,11 @@ defaults = { 'scheduler_heartbeat_sec': 60, 'authenticate': False, 'max_threads': 2, + 'run_duration': 30 * 60, + 'dag_dir_list_interval': 5 * 60, + 'print_stats_interval': 30, + 'min_file_process_interval': 180, + 'child_process_log_directory': '/tmp/airflow/scheduler/logs' }, 'celery': { 'broker_url': 'sqla+mysql://airflow:airflow@localhost:3306/airflow', http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/airflow/dag/__init__.py ---------------------------------------------------------------------- diff --git a/airflow/dag/__init__.py b/airflow/dag/__init__.py new file mode 100644 index 0000000..a84b6da --- /dev/null +++ b/airflow/dag/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/airflow/dag/base_dag.py ---------------------------------------------------------------------- diff --git a/airflow/dag/base_dag.py b/airflow/dag/base_dag.py new file mode 100644 index 0000000..83ecfb9 --- /dev/null +++ b/airflow/dag/base_dag.py @@ -0,0 +1,96 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +from abc import ABCMeta, abstractmethod, abstractproperty + + +class BaseDag(object): + """ + Base DAG object that both the SimpleDag and DAG inherit. 
+ """ + __metaclass__ = ABCMeta + + @abstractproperty + def dag_id(self): + """ + :return: the DAG ID + :rtype: unicode + """ + raise NotImplementedError() + + @abstractproperty + def task_ids(self): + """ + :return: A list of task IDs that are in this DAG + :rtype: List[unicode] + """ + raise NotImplementedError() + + @abstractproperty + def full_filepath(self): + """ + :return: The absolute path to the file that contains this DAG's definition + :rtype: unicode + """ + raise NotImplementedError() + + @abstractmethod + def concurrency(self): + """ + :return: maximum number of tasks that can run simultaneously from this DAG + :rtype: int + """ + raise NotImplementedError() + + @abstractmethod + def is_paused(self): + """ + :return: whether this DAG is paused or not + :rtype: bool + """ + raise NotImplementedError() + + @abstractmethod + def pickle_id(self): + """ + :return: The pickle ID for this DAG, if it has one. Otherwise None. + :rtype: unicode + """ + raise NotImplementedError + + +class BaseDagBag(object): + """ + Base object that both the SimpleDagBag and DagBag inherit. + """ + @abstractproperty + def dag_ids(self): + """ + :return: a list of DAG IDs in this bag + :rtype: List[unicode] + """ + raise NotImplementedError() + + @abstractmethod + def get_dag(self, dag_id): + """ + :return: whether the task exists in this bag + :rtype: BaseDag + """ + raise NotImplementedError() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/airflow/executors/base_executor.py ---------------------------------------------------------------------- diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index ca63443..d6a06d8 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -102,7 +102,7 @@ class BaseExecutor(LoggingMixin): # TODO(jlowin) without a way to know what Job ran which tasks, # there is a danger that another Job started running a task # that was also queued to this executor. 
This is the last chance - # to check if that hapened. The most probable way is that a + # to check if that happened. The most probable way is that a # Scheduler tried to run a task that was originally queued by a # Backfill. This fix reduces the probability of a collision but # does NOT eliminate it. http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index 77f34ee..b391415 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -19,30 +19,45 @@ from __future__ import unicode_literals from past.builtins import basestring from collections import defaultdict, Counter -from datetime import datetime, timedelta + +from datetime import datetime + import getpass import logging import socket import subprocess import multiprocessing -import math +import os +import signal +import sys +import threading +import time from time import sleep +import psutil from sqlalchemy import Column, Integer, String, DateTime, func, Index, or_ from sqlalchemy.orm.session import make_transient +from tabulate import tabulate from airflow import executors, models, settings from airflow import configuration as conf from airflow.exceptions import AirflowException +from airflow.models import DagRun +from airflow.settings import Stats from airflow.utils.state import State from airflow.utils.db import provide_session, pessimistic_connection_handling +from airflow.utils.dag_processing import (AbstractDagFileProcessor, + DagFileProcessorManager, + SimpleDag, + SimpleDagBag, + list_py_file_paths) from airflow.utils.email import send_email from airflow.utils.logging import LoggingMixin from airflow.utils import asciiart -from airflow.settings import Stats -DagRun = models.DagRun + Base = models.Base +DagRun = models.DagRun ID_LEN = models.ID_LEN Stats = settings.Stats @@ -186,27 +201,247 @@ class BaseJob(Base, LoggingMixin): raise NotImplementedError("This 
method needs to be overridden") +class DagFileProcessor(AbstractDagFileProcessor): + """Helps call SchedulerJob.process_file() in a separate process.""" + + # Counter that increments everytime an instance of this class is created + class_creation_counter = 0 + + def __init__(self, file_path, pickle_dags, dag_id_white_list, log_file): + """ + :param file_path: a Python file containing Airflow DAG definitions + :type file_path: unicode + :param pickle_dags: whether to serialize the DAG objects to the DB + :type pickle_dags: bool + :param dag_id_whitelist: If specified, only look at these DAG ID's + :type dag_id_whitelist: list[unicode] + :param log_file: the path to the file where log lines should be output + :type log_file: unicode + """ + self._file_path = file_path + self._log_file = log_file + # Queue that's used to pass results from the child process. + self._result_queue = multiprocessing.Queue() + # The process that was launched to process the given . + self._process = None + self._dag_id_white_list = dag_id_white_list + self._pickle_dags = pickle_dags + # The result of Scheduler.process_file(file_path). + self._result = None + # Whether the process is done running. + self._done = False + # When the process started. + self._start_time = None + # This ID is use to uniquely name the process / thread that's launched + # by this processor instance + self._instance_id = DagFileProcessor.class_creation_counter + DagFileProcessor.class_creation_counter += 1 + + @property + def file_path(self): + return self._file_path + + @property + def log_file(self): + return self._log_file + + @staticmethod + def _launch_process(result_queue, + file_path, + pickle_dags, + dag_id_white_list, + thread_name, + log_file): + """ + Launch a process to process the given file. 
+ + :param result_queue: the queue to use for passing back the result + :type result_queue: multiprocessing.Queue + :param file_path: the file to process + :type file_path: unicode + :param pickle_dags: whether to pickle the DAGs found in the file and + save them to the DB + :type pickle_dags: bool + :param dag_id_white_list: if specified, only examine DAG ID's that are + in this list + :type dag_id_white_list: list[unicode] + :param thread_name: the name to use for the process that is launched + :type thread_name: unicode + :param log_file: the logging output for the process should be directed + to this file + :type log_file: unicode + :return: the process that was launched + :rtype: multiprocessing.Process + """ + def helper(): + # This helper runs in the newly created process + + # Re-direct stdout and stderr to a separate log file. Otherwise, + # the main log becomes too hard to read. No buffering to enable + # responsive file tailing + parent_dir, _ = os.path.split(log_file) + + # Create the parent directory for the log file if necessary. + if not os.path.isdir(parent_dir): + os.makedirs(parent_dir) + + f = open(log_file, "a") + original_stdout = sys.stdout + original_stderr = sys.stderr + + sys.stdout = f + sys.stderr = f + + try: + # Re-configure logging to use the new output streams + log_format = settings.LOG_FORMAT_WITH_THREAD_NAME + settings.configure_logging(log_format=log_format) + # Re-configure the ORM engine as there are issues with multiple processes + settings.configure_orm() + + # Change the thread name to differentiate log lines. This is + # really a separate process, but changing the name of the + # process doesn't work, so changing the thread name instead. 
+ threading.current_thread().name = thread_name + start_time = time.time() + + logging.info("Started process (PID=%s) to work on %s", + os.getpid(), + file_path) + scheduler_job = SchedulerJob(dag_ids=dag_id_white_list) + result = scheduler_job.process_file(file_path, + pickle_dags) + result_queue.put(result) + end_time = time.time() + logging.info("Processing %s took %.3f seconds", + file_path, + end_time - start_time) + except: + # Log exceptions through the logging framework. + logging.exception("Got an exception! Propagating...") + raise + finally: + sys.stdout = original_stdout + sys.stderr = original_stderr + f.close() + + p = multiprocessing.Process(target=helper, + args=(), + name="{}-Process".format(thread_name)) + p.start() + return p + + def start(self): + """ + Launch the process and start processing the DAG. + """ + self._process = DagFileProcessor._launch_process( + self._result_queue, + self.file_path, + self._pickle_dags, + self._dag_id_white_list, + "DagFileProcessor{}".format(self._instance_id), + self.log_file) + self._start_time = datetime.now() + + def terminate(self, sigkill=False): + """ + Terminate (and then kill) the process launched to process the file. + :param sigkill: whether to issue a SIGKILL if SIGTERM doesn't work. 
+ :type sigkill: bool + """ + if self._process is None: + raise AirflowException("Tried to call stop before starting!") + # The queue will likely get corrupted, so remove the reference + self._result_queue = None + self._process.terminate() + # Arbitrarily wait 5s for the process to die + self._process.join(5) + if sigkill and self._process.is_alive(): + logging.warn("Killing PID %s", self._process.pid) + os.kill(self._process.pid, signal.SIGKILL) + + @property + def pid(self): + """ + :return: the PID of the process launched to process the given file + :rtype: int + """ + if self._process is None: + raise AirflowException("Tried to get PID before starting!") + return self._process.pid + + @property + def exit_code(self): + """ + After the process is finished, this can be called to get the return code + :return: the exit code of the process + :rtype: int + """ + if not self._done: + raise AirflowException("Tried to call retcode before process was finished!") + return self._process.exitcode + + @property + def done(self): + """ + Check if the process launched to process this file is done. + :return: whether the process is finished running + :rtype: bool + """ + if self._process is None: + raise AirflowException("Tried to see if it's done before starting!") + + if self._done: + return True + + if not self._result_queue.empty(): + self._result = self._result_queue.get_nowait() + self._done = True + logging.debug("Waiting for %s", self._process) + self._process.join() + return True + + # Potential error case when process dies + if not self._process.is_alive(): + self._done = True + # Get the object from the queue or else join() can hang. 
+ if not self._result_queue.empty(): + self._result = self._result_queue.get_nowait() + logging.debug("Waiting for %s", self._process) + self._process.join() + return True + + return False + + @property + def result(self): + """ + :return: result of running SchedulerJob.process_file() + :rtype: SimpleDag + """ + if not self.done: + raise AirflowException("Tried to get the result before it's done!") + return self._result + + @property + def start_time(self): + """ + :return: when this started to process the file + :rtype: datetime + """ + if self._start_time is None: + raise AirflowException("Tried to get start time before it started!") + return self._start_time + + class SchedulerJob(BaseJob): """ - This SchedulerJob runs indefinitely and constantly schedules the jobs + This SchedulerJob runs for a specific time interval and schedules the jobs that are ready to run. It figures out the latest runs for each - task and see if the dependencies for the next schedules are met. - If so it triggers the task instance. It does this for each task - in each DAG and repeats. - - :param dag_id: to run the scheduler for a single specific DAG - :type dag_id: string - :param subdir: to search for DAG under a certain folder only - :type subdir: string - :param test_mode: used for unit testing this class only, runs a single - schedule run - :type test_mode: bool - :param refresh_dags_every: force refresh the DAG definition every N - runs, as specified here - :type refresh_dags_every: int - :param do_pickle: to pickle the DAG object and send over to workers - for non-local executors - :type do_pickle: bool + task and sees if the dependencies for the next schedules are met. + If so, it creates appropriate TaskInstances and sends run commands to the + executor. It does this for each task in each DAG and repeats. 
""" __mapper_args__ = { @@ -217,13 +452,32 @@ class SchedulerJob(BaseJob): self, dag_id=None, dag_ids=None, - subdir=None, - test_mode=False, - refresh_dags_every=10, - num_runs=None, + subdir=models.DAGS_FOLDER, + num_runs=-1, + file_process_interval=conf.getint('scheduler', + 'min_file_process_interval'), + processor_poll_interval=1.0, + run_duration=None, do_pickle=False, *args, **kwargs): - + """ + :param dag_id: if specified, only schedule tasks with this DAG ID + :type dag_id: unicode + :param dag_ids: if specified, only schedule tasks with these DAG IDs + :type dag_ids: list[unicode] + :param subdir: directory containing Python files with Airflow DAG + definitions, or a specific path to a file + :type subdir: unicode + :param num_runs: The number of times to try to schedule each DAG file. + -1 for unlimited within the run_duration. + :param processor_poll_interval: The number of seconds to wait between + polls of running processors + :param run_duration: how long to run (in seconds) before exiting + :type run_duration: int + :param do_pickle: once a DAG object is obtained by executing the Python + file, whether to serialize the DAG object to the DB + :type do_pickle: bool + """ # for BaseJob compatibility self.dag_id = dag_id self.dag_ids = [dag_id] if dag_id else [] @@ -232,21 +486,38 @@ class SchedulerJob(BaseJob): self.subdir = subdir - if test_mode: - self.num_runs = 1 - else: - self.num_runs = num_runs + self.num_runs = num_runs + self.run_duration = run_duration + self._processor_poll_interval = processor_poll_interval - self.refresh_dags_every = refresh_dags_every self.do_pickle = do_pickle super(SchedulerJob, self).__init__(*args, **kwargs) self.heartrate = conf.getint('scheduler', 'SCHEDULER_HEARTBEAT_SEC') self.max_threads = min(conf.getint('scheduler', 'max_threads'), multiprocessing.cpu_count()) + self.using_sqlite = False if 'sqlite' in conf.get('core', 'sql_alchemy_conn'): if self.max_threads > 1: self.logger.error("Cannot use more than 1 
thread when using sqlite. Setting max_threads to 1") self.max_threads = 1 + self.using_sqlite = True + + # How often to scan the DAGs directory for new files. Default to 5 minutes. + self.dag_dir_list_interval = conf.getint('scheduler', + 'dag_dir_list_interval') + # How often to print out DAG file processing stats to the log. Default to + # 30 seconds. + self.print_stats_interval = conf.getint('scheduler', + 'print_stats_interval') + # Parse and schedule each file no faster than this interval. Default + # to 3 minutes. + self.file_process_interval = file_process_interval + # Directory where log files for the processes that scheduled the DAGs reside + self.child_process_log_directory = conf.get('scheduler', + 'child_process_log_directory') + if run_duration is None: + self.run_duration = conf.getint('scheduler', + 'run_duration') @provide_session def manage_slas(self, dag, session=None): @@ -296,7 +567,8 @@ class SchedulerJob(BaseJob): slas = ( session .query(SlaMiss) - .filter(SlaMiss.email_sent.is_(False) or SlaMiss.notification_sent.is_(False)) + .filter(or_(SlaMiss.email_sent == False, + SlaMiss.notification_sent == False)) .filter(SlaMiss.dag_id == dag.dag_id) .all() ) @@ -370,23 +642,45 @@ class SchedulerJob(BaseJob): session.commit() session.close() - def import_errors(self, dagbag): - session = settings.Session() - session.query(models.ImportError).delete() + @staticmethod + def record_import_errors(session, dagbag): + """ + For the DAGs in the given DagBag, record any associated import errors. + These are usually displayed through the Airflow UI so that users know + that there are issues parsing DAGs. 
+ + :param session: session for ORM operations + :type session: sqlalchemy.orm.session.Session + :param dagbag: DagBag containing DAGs with import errors + :type dagbag: models.Dagbag + """ for filename, stacktrace in list(dagbag.import_errors.items()): + session.query(models.ImportError).filter( + models.ImportError.filename == filename + ).delete() session.add(models.ImportError( filename=filename, stacktrace=stacktrace)) session.commit() + @staticmethod + def clear_import_errors(session): + """ + Remove all the known import errors from the DB. + + :param session: session for ORM operations + :type session: sqlalchemy.orm.session.Session + """ + session.query(models.ImportError).delete() + session.commit() + @provide_session - def schedule_dag(self, dag, session=None): + def create_dag_run(self, dag, session=None): """ This method checks whether a new DagRun needs to be created for a DAG based on scheduling interval Returns DagRun if one is scheduled. Otherwise returns None. """ if dag.schedule_interval: - DagRun = models.DagRun active_runs = DagRun.find( dag_id=dag.dag_id, state=State.RUNNING, @@ -475,52 +769,24 @@ class SchedulerJob(BaseJob): ) return next_run - def process_dag(self, dag, queue): + def _process_task_instances(self, dag, queue): """ - This method schedules a single DAG by looking at the latest - run for each task and attempting to schedule the following run. - - As multiple schedulers may be running for redundancy, this - function takes a lock on the DAG and timestamps the last run - in ``last_scheduler_run``. + This method schedules the tasks for a single DAG by looking at the + active DAG runs and adding task instances that should run to the + queue. 
""" DagModel = models.DagModel session = settings.Session() - # picklin' - pickle_id = None - if self.do_pickle and self.executor.__class__ not in ( - executors.LocalExecutor, executors.SequentialExecutor): - pickle_id = dag.pickle(session).id - - # obtain db lock - db_dag = session.query(DagModel).filter_by( - dag_id=dag.dag_id - ).with_for_update().one() - - last_scheduler_run = db_dag.last_scheduler_run or datetime(2000, 1, 1) - secs_since_last = (datetime.now() - last_scheduler_run).total_seconds() - - if secs_since_last < self.heartrate: - # release db lock - session.commit() - session.close() - return None - - # Release the db lock - # the assumption here is that process_dag will take less - # time than self.heartrate otherwise we might unlock too - # quickly and this should moved below, but that would increase - # the time the record is locked and is blocking for other calls. - db_dag.last_scheduler_run = datetime.now() - session.commit() - # update the state of the previously active dag runs dag_runs = DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, session=session) active_dag_runs = [] for run in dag_runs: + self.logger.info("Examining DAG run {}".format(run)) # do not consider runs that are executed in the future if run.execution_date > datetime.now(): + self.logging.error("Execution date is in future: {}" + .format(run.execution_date)) continue # todo: run.dag is transient but needs to be set @@ -533,6 +799,7 @@ class SchedulerJob(BaseJob): active_dag_runs.append(run) for run in active_dag_runs: + self.logger.debug("Examining active DAG run {}".format(run)) # this needs a fresh session sometimes tis get detached tis = run.get_task_instances(state=(State.NONE, State.UP_FOR_RETRY)) @@ -552,41 +819,122 @@ class SchedulerJob(BaseJob): if ti.is_runnable(flag_upstream_failed=True): self.logger.debug('Queuing task: {}'.format(ti)) - queue.put((ti.key, pickle_id)) + queue.append(ti.key) session.close() @provide_session - def prioritize_queued(self, session, 
executor, dagbag): - # Prioritizing queued task instances + def _change_state_for_tis_without_dagrun(self, + simple_dag_bag, + old_states, + new_state, + session=None): + """ + For all DAG IDs in the SimpleDagBag, look for task instances in the + old_states and set them to new_state if the corresponding DagRun + exists but is not in the running state. This normally should not + happen, but it can if the state of DagRuns are changed manually. + + :param old_states: examine TaskInstances in this state + :type old_state: list[State] + :param new_state: set TaskInstances to this state + :type new_state: State + :param simple_dag_bag: TaskInstances associated with DAGs in the + simple_dag_bag and with states in the old_state will be examined + :type simple_dag_bag: SimpleDagBag + """ - pools = {p.pool: p for p in session.query(models.Pool).all()} + task_instances_to_change = ( + session + .query(models.TaskInstance) + .filter(models.TaskInstance.dag_id.in_(simple_dag_bag.dag_ids)) + .filter(models.TaskInstance.state.in_(old_states)) + .all() + ) + """:type: list[TaskInstance]""" + + for task_instance in task_instances_to_change: + dag_runs = DagRun.find(dag_id=task_instance.dag_id, + execution_date=task_instance.execution_date, + ) + + if len(dag_runs) == 0: + self.logger.warn("DagRun for %s %s does not exist", + task_instance.dag_id, + task_instance.execution_date) + continue + + # There should only be one DAG run. Add some logging info if this + # is not the case for later debugging. 
+ if len(dag_runs) > 1: + self.logger.warn("Multiple DagRuns found for {} {}: {}" + .format(task_instance.dag_id, + task_instance.execution_date, + dag_runs)) + + dag_is_running = True + for dag_run in dag_runs: + if dag_run.state == State.RUNNING: + dag_is_running = True + break + + if not dag_is_running: + self.logger.warn("Setting {} to state={} as it does not have " + "a DagRun in the {} state" + .format(task_instance, + new_state, + State.RUNNING)) + task_instance.state = new_state + session.merge(task_instance) + session.commit() + + @provide_session + def _execute_task_instances(self, + simple_dag_bag, + states, + session=None): + """ + Fetches task instances from ORM in the specified states, figures + out pool limits, and sends them to the executor for execution. + + :param simple_dag_bag: TaskInstances associated with DAGs in the + simple_dag_bag will be fetched from the DB and executed + :type simple_dag_bag: SimpleDagBag + :param executor: the executor that runs task instances + :type executor: BaseExecutor + :param states: Execute TaskInstances in these states + :type states: Tuple[State] + :return: None + """ + # Get all the relevant task instances TI = models.TaskInstance - queued_tis = ( - session.query(TI) - .filter(TI.state == State.QUEUED) + task_instances_to_examine = ( + session + .query(TI) + .filter(TI.dag_id.in_(simple_dag_bag.dag_ids)) + .filter(TI.state.in_(states)) .all() ) - self.logger.info( - "Prioritizing {} queued jobs".format(len(queued_tis))) - session.expunge_all() - d = defaultdict(list) - for ti in queued_tis: - if ti.dag_id not in dagbag.dags: - self.logger.info( - "DAG no longer in dagbag, deleting {}".format(ti)) - session.delete(ti) - session.commit() - elif not dagbag.dags[ti.dag_id].has_task(ti.task_id): - self.logger.info( - "Task no longer exists, deleting {}".format(ti)) - session.delete(ti) - session.commit() - else: - d[ti.pool].append(ti) - dag_blacklist = set(dagbag.paused_dags()) - for pool, tis in list(d.items()): 
+ # Put one task instance on each line + if len(task_instances_to_examine) == 0: + self.logger.info("No tasks to send to the executor") + return + + task_instance_str = "\n\t".join( + ["{}".format(x) for x in task_instances_to_examine]) + self.logger.info("Tasks up for execution:\n\t{}".format(task_instance_str)) + + # Get the pool settings + pools = {p.pool: p for p in session.query(models.Pool).all()} + + pool_to_task_instances = defaultdict(list) + for task_instance in task_instances_to_examine: + pool_to_task_instances[task_instance.pool].append(task_instance) + + # Go through each pool, and queue up a task for execution if there are + # any open slots in the pool. + for pool, task_instances in pool_to_task_instances.items(): if not pool: # Arbitrary: # If queued outside of a pool, trigger no more than @@ -595,78 +943,218 @@ class SchedulerJob(BaseJob): else: open_slots = pools[pool].open_slots(session=session) - queue_size = len(tis) - self.logger.info("Pool {pool} has {open_slots} slots, {queue_size} " + num_queued = len(task_instances) + self.logger.info("Figuring out tasks to run in Pool(name={pool}) " + "with {open_slots} open slots and {num_queued} " "task instances in queue".format(**locals())) + if open_slots <= 0: continue - tis = sorted( - tis, key=lambda ti: (-ti.priority_weight, ti.start_date)) - for ti in tis: - if open_slots <= 0: - continue - task = None - try: - task = dagbag.dags[ti.dag_id].get_task(ti.task_id) - except: - self.logger.error("Queued task {} seems gone".format(ti)) - session.delete(ti) - session.commit() - continue - if not task: - continue + priority_sorted_task_instances = sorted( + task_instances, key=lambda ti: (-ti.priority_weight, ti.execution_date)) - ti.task = task + # DAG IDs with running tasks that equal the concurrency limit of the dag + dag_id_to_running_task_count = {} - # picklin' - dag = dagbag.dags[ti.dag_id] - pickle_id = None - if self.do_pickle and self.executor.__class__ not in ( - executors.LocalExecutor, - 
executors.SequentialExecutor): - self.logger.info("Pickling DAG {}".format(dag)) - pickle_id = dag.pickle(session).id + for task_instance in priority_sorted_task_instances: + if open_slots <= 0: + self.logger.info("No more slots free") + # Can't schedule any more since there are no more open slots. + break - if dag.dag_id in dag_blacklist: - continue - if dag.concurrency_reached: - dag_blacklist.add(dag.dag_id) + if simple_dag_bag.get_dag(task_instance.dag_id).is_paused: + self.logger.info("Not executing queued {} since {} is paused" + .format(task_instance, task_instance.dag_id)) continue - if ti.are_dependencies_met(): - executor.queue_task_instance(ti, pickle_id=pickle_id) - open_slots -= 1 - else: - session.delete(ti) - session.commit() + + # Check to make sure that the task concurrency of the DAG hasn't been + # reached. + dag_id = task_instance.dag_id + + if dag_id not in dag_id_to_running_task_count: + dag_id_to_running_task_count[dag_id] = \ + DagRun.get_running_tasks( + session, + dag_id, + simple_dag_bag.get_dag(dag_id).task_ids) + + current_task_concurrency = dag_id_to_running_task_count[dag_id] + task_concurrency_limit = simple_dag_bag.get_dag(dag_id).concurrency + self.logger.info("DAG {} has {}/{} running tasks" + .format(dag_id, + current_task_concurrency, + task_concurrency_limit)) + if current_task_concurrency > task_concurrency_limit: + self.logger.info("Not executing {} since the number " + "of tasks running from DAG {} is >= to the " + "DAG's task concurrency limit of {}" + .format(task_instance, + dag_id, + task_concurrency_limit)) continue - ti.task = task + command = TI.generate_command( + task_instance.dag_id, + task_instance.task_id, + task_instance.execution_date, + local=True, + mark_success=False, + force=False, + ignore_dependencies=False, + ignore_depends_on_past=False, + pool=task_instance.pool, + file_path=simple_dag_bag.get_dag(task_instance.dag_id).full_filepath, + pickle_id=simple_dag_bag.get_dag(task_instance.dag_id).pickle_id) + 
+ priority = task_instance.priority_weight + queue = task_instance.queue + self.logger.info("Sending to executor {} with priority {} and queue {}" + .format(task_instance.key, priority, queue)) + + # Set the state to queued + self.logger.info("Setting state of {} to {}".format( + task_instance.key, State.QUEUED)) + task_instance.state = State.QUEUED + task_instance.queued_dttm = (datetime.now() + if not task_instance.queued_dttm + else task_instance.queued_dttm) + session.merge(task_instance) session.commit() - def _split(self, items, size): - """ - This function splits a list of items into chunks of int size. - _split([1,2,3,4,5,6], 3) becomes [[1,2,3],[4,5,6]] - """ - size = max(1, size) - return [items[i:i + size] for i in range(0, len(items), size)] + # These attributes will be lost after the object expires, so save them. + task_id_ = task_instance.task_id + dag_id_ = task_instance.dag_id + execution_date_ = task_instance.execution_date + make_transient(task_instance) + task_instance.task_id = task_id_ + task_instance.dag_id = dag_id_ + task_instance.execution_date = execution_date_ + + self.executor.queue_command( + task_instance, + command, + priority=priority, + queue=queue) - def _do_dags(self, dagbag, dags, tis_out): + open_slots -= 1 + + def _process_dags(self, dagbag, dags, tis_out): """ - Iterates over the dags and schedules and processes them + Iterates over the dags and processes them. Processing includes: + + 1. Create appropriate DagRun(s) in the DB. + 2. Create appropriate TaskInstance(s) in the DB. + 3. Send emails for tasks that have missed SLAs. 
+ + :param dagbag: a collection of DAGs to process + :type dagbag: models.DagBag + :param dags: the DAGs from the DagBag to process + :type dags: DAG + :param tis_out: A queue to add generated TaskInstance objects + :type tis_out: multiprocessing.Queue[TaskInstance] + :return: None """ for dag in dags: - self.logger.debug("Scheduling {}".format(dag.dag_id)) dag = dagbag.get_dag(dag.dag_id) + if dag.is_paused: + self.logger.info("Not processing DAG {} since it's paused" + .format(dag.dag_id)) + continue + if not dag: + self.logger.error("DAG ID {} was not found in the DagBag") continue - try: - self.schedule_dag(dag) - self.process_dag(dag, tis_out) - self.manage_slas(dag) - except Exception as e: - self.logger.exception(e) + + self.logger.info("Processing {}".format(dag.dag_id)) + + dag_run = self.create_dag_run(dag) + if dag_run: + self.logger.info("Created {}".format(dag_run)) + self._process_task_instances(dag, tis_out) + self.manage_slas(dag) + + def _process_executor_events(self): + """ + Respond to executor events. + + :param executor: the executor that's running the task instances + :type executor: BaseExecutor + :return: None + """ + for key, executor_state in list(self.executor.get_event_buffer().items()): + dag_id, task_id, execution_date = key + self.logger.info("Executor reports {}.{} execution_date={} as {}" + .format(dag_id, + task_id, + execution_date, + executor_state)) + + def _log_file_processing_stats(self, + known_file_paths, + processor_manager): + """ + Print out stats about how files are getting processed. + + :param known_file_paths: a list of file paths that may contain Airflow + DAG definitions + :type known_file_paths: list[unicode] + :param processor_manager: manager for the file processors + :type stats: DagFileProcessorManager + :return: None + """ + + # File Path: Path to the file containing the DAG definition + # PID: PID associated with the process that's processing the file. May + # be empty. 
+ # Runtime: If the process is currently running, how long it's been + # running for in seconds. + # Last Runtime: If the process ran before, how long did it take to + # finish in seconds + # Last Run: When the file finished processing in the previous run. + headers = ["File Path", + "PID", + "Runtime", + "Last Runtime", + "Last Run"] + + rows = [] + for file_path in known_file_paths: + last_runtime = processor_manager.get_last_runtime(file_path) + processor_pid = processor_manager.get_pid(file_path) + processor_start_time = processor_manager.get_start_time(file_path) + runtime = ((datetime.now() - processor_start_time).total_seconds() + if processor_start_time else None) + last_run = processor_manager.get_last_finish_time(file_path) + + rows.append((file_path, + processor_pid, + runtime, + last_runtime, + last_run)) + + # Sort by longest last runtime. (Can't sort None values in python3) + rows = sorted(rows, key=lambda x: x[3] or 0.0) + + formatted_rows = [] + for file_path, pid, runtime, last_runtime, last_run in rows: + formatted_rows.append((file_path, + pid, + "{:.2f}s".format(runtime) + if runtime else None, + "{:.2f}s".format(last_runtime) + if last_runtime else None, + last_run.strftime("%Y-%m-%dT%H:%M:%S") + if last_run else None)) + log_str = ("\n" + + "=" * 80 + + "\n" + + "DAG File Processing Stats\n\n" + + tabulate(formatted_rows, headers=headers) + + "\n" + + "=" * 80) + + self.logger.info(log_str) @provide_session def _reset_state_for_orphaned_tasks(self, dag_run, session=None): @@ -680,27 +1168,110 @@ class SchedulerJob(BaseJob): # also consider running as the state might not have changed in the db yet running = self.executor.running - tis = dag_run.get_task_instances(state=State.SCHEDULED, session=session) + tis = list() + tis.extend(dag_run.get_task_instances(state=State.SCHEDULED, session=session)) + tis.extend(dag_run.get_task_instances(state=State.QUEUED, session=session)) + for ti in tis: if ti.key not in queued_tis and ti.key not in running: 
- ti.state = State.NONE self.logger.debug("Rescheduling orphaned task {}".format(ti)) - + ti.state = State.NONE session.commit() def _execute(self): - session = settings.Session() - TI = models.TaskInstance - + self.logger.info("Starting the scheduler") pessimistic_connection_handling() logging.basicConfig(level=logging.DEBUG) - self.logger.info("Starting the scheduler") - dagbag = models.DagBag(self.subdir, sync_to_db=True) - executor = self.executor = dagbag.executor - executor.start() + # DAGs can be pickled for easier remote execution by some executors + pickle_dags = False + if self.do_pickle and self.executor.__class__ not in \ + (executors.LocalExecutor, executors.SequentialExecutor): + pickle_dags = True + + # Use multiple processes to parse and generate tasks for the + # DAGs in parallel. By processing them in separate processes, + # we can get parallelism and isolation from potentially harmful + # user code. + self.logger.info("Processing files using up to {} processes at a time " + .format(self.max_threads)) + self.logger.info("Running execute loop for {} seconds" + .format(self.run_duration)) + self.logger.info("Processing each file at most {} times" + .format(self.num_runs)) + self.logger.info("Process each file at most once every {} seconds" + .format(self.file_process_interval)) + self.logger.info("Checking for new files in {} every {} seconds" + .format(self.subdir, self.dag_dir_list_interval)) + + # Build up a list of Python files that could contain DAGs + self.logger.info("Searching for files in {}".format(self.subdir)) + known_file_paths = list_py_file_paths(self.subdir) + self.logger.info("There are {} files in {}" + .format(len(known_file_paths), self.subdir)) + + def processor_factory(file_path, log_file_path): + return DagFileProcessor(file_path, + pickle_dags, + self.dag_ids, + log_file_path) + + processor_manager = DagFileProcessorManager(self.subdir, + known_file_paths, + self.max_threads, + self.file_process_interval, + 
self.child_process_log_directory, + self.num_runs, + processor_factory) + try: + self._execute_helper(processor_manager) + finally: + self.logger.info("Exited execute loop") + + # Kill all child processes on exit since we don't want to leave + # them as orphaned. + pids_to_kill = processor_manager.get_all_pids() + if len(pids_to_kill) > 0: + # First try SIGTERM + this_process = psutil.Process(os.getpid()) + # Only check child processes to ensure that we don't have a case + # where we kill the wrong process because a child process died + # but the PID got reused. + child_processes = [x for x in this_process.children(recursive=True) + if x.is_running() and x.pid in pids_to_kill] + for child in child_processes: + self.logger.info("Terminating child PID: {}".format(child.pid)) + child.terminate() + timeout = 5 + self.logger.info("Waiting up to {}s for processes to exit..." + .format(timeout)) + try: + psutil.wait_procs(child_processes, timeout) + except psutil.TimeoutExpired: + self.logger.debug("Ran out of time while waiting for " + "processes to exit") + + # Then SIGKILL + child_processes = [x for x in this_process.children(recursive=True) + if x.is_running() and x.pid in pids_to_kill] + if len(child_processes) > 0: + for child in child_processes: + self.logger.info("Killing child PID: {}".format(child.pid)) + child.kill() + child.wait() + + def _execute_helper(self, processor_manager): + """ + :param processor_manager: manager to use + :type processor_manager: DagFileProcessorManager + :return: None + """ + self.executor.start() + + session = settings.Session() + self.logger.info("Resetting state for orphaned tasks") # grab orphaned tasks and make sure to reset their state active_runs = DagRun.find( state=State.RUNNING, @@ -708,103 +1279,250 @@ class SchedulerJob(BaseJob): session=session ) for dr in active_runs: + self.logger.info("Resetting {} {}".format(dr.dag_id, + dr.execution_date)) self._reset_state_for_orphaned_tasks(dr, session=session) - self.runs = 0 - 
while not self.num_runs or self.num_runs > self.runs: - try: - loop_start_dttm = datetime.now() - try: - self.prioritize_queued(executor=executor, dagbag=dagbag) - except Exception as e: - self.logger.exception(e) + self.logger.info("Removing old import errors") + self.clear_import_errors(session) + session.close() - self.runs += 1 - try: - if self.runs % self.refresh_dags_every == 0: - dagbag = models.DagBag(self.subdir, sync_to_db=True) - else: - dagbag.collect_dags(only_if_updated=True) - except Exception as e: - self.logger.error("Failed at reloading the dagbag. {}".format(e)) - Stats.incr('dag_refresh_error', 1, 1) - sleep(5) - - if len(self.dag_ids) > 0: - dags = [dag for dag in dagbag.dags.values() if dag.dag_id in self.dag_ids] - else: - dags = [ - dag for dag in dagbag.dags.values() - if not dag.parent_dag] - - paused_dag_ids = dagbag.paused_dags() - dags = [x for x in dags if x.dag_id not in paused_dag_ids] - # dags = filter(lambda x: x.dag_id not in paused_dag_ids, dags) - - self.logger.debug("Total Cores: {} Max Threads: {} DAGs:{}". 
- format(multiprocessing.cpu_count(), - self.max_threads, - len(dags))) - dags = self._split(dags, math.ceil(len(dags) / self.max_threads)) - tis_q = multiprocessing.Queue() - jobs = [multiprocessing.Process(target=self._do_dags, - args=(dagbag, dags[i], tis_q)) - for i in range(len(dags))] - - self.logger.info("Starting {} scheduler jobs".format(len(jobs))) - for j in jobs: - j.start() - - while any(j.is_alive() for j in jobs): - while not tis_q.empty(): - ti_key, pickle_id = tis_q.get() - dag = dagbag.dags[ti_key[0]] - task = dag.get_task(ti_key[1]) - ti = TI(task, ti_key[2]) - ti.refresh_from_db(session=session, lock_for_update=True) - if ti.state == State.SCHEDULED: - session.commit() - self.logger.debug("Task {} was picked up by another scheduler" - .format(ti)) - continue - elif ti.state is State.NONE: - ti.state = State.SCHEDULED - - self.executor.queue_task_instance(ti, pickle_id=pickle_id) + execute_start_time = datetime.now() + + # Last time stats were printed + last_stat_print_time = datetime(2000, 1, 1) + # Last time that self.heartbeat() was called. 
+ last_self_heartbeat_time = datetime.now() + # Last time that the DAG dir was traversed to look for files + last_dag_dir_refresh_time = datetime.now() + + # Use this value initially + known_file_paths = processor_manager.file_paths + + # For the execute duration, parse and schedule DAGs + while (datetime.now() - execute_start_time).total_seconds() < \ + self.run_duration: + self.logger.debug("Starting Loop...") + loop_start_time = time.time() + + # Traverse the DAG directory for Python files containing DAGs + # periodically + elapsed_time_since_refresh = (datetime.now() - + last_dag_dir_refresh_time).total_seconds() + + if elapsed_time_since_refresh > self.dag_dir_list_interval: + # Build up a list of Python files that could contain DAGs + self.logger.info("Searching for files in {}".format(self.subdir)) + known_file_paths = list_py_file_paths(self.subdir) + last_dag_dir_refresh_time = datetime.now() + self.logger.info("There are {} files in {}" + .format(len(known_file_paths), self.subdir)) + processor_manager.set_file_paths(known_file_paths) + + # Kick of new processes and collect results from finished ones + self.logger.info("Heartbeating the process manager") + simple_dags = processor_manager.heartbeat() + + if self.using_sqlite: + # For the sqlite case w/ 1 thread, wait until the processor + # is finished to avoid concurrent access to the DB. + self.logger.debug("Waiting for processors to finish since we're " + "using sqlite") + processor_manager.wait_until_finished() + + # Send tasks for execution if available + if len(simple_dags) > 0: + simple_dag_bag = SimpleDagBag(simple_dags) + + # Handle cases where a DAG run state is set (perhaps manually) to + # a non-running state. Handle task instances that belong to + # DAG runs in those states + + # If a task instance is up for retry but the corresponding DAG run + # isn't running, mark the task instance as FAILED so we don't try + # to re-run it. 
+ self._change_state_for_tis_without_dagrun(simple_dag_bag, + [State.UP_FOR_RETRY], + State.FAILED) + # If a task instance is scheduled or queued, but the corresponding + # DAG run isn't running, set the state to NONE so we don't try to + # re-run it. + self._change_state_for_tis_without_dagrun(simple_dag_bag, + [State.QUEUED, + State.SCHEDULED], + State.NONE) + + self._execute_task_instances(simple_dag_bag, + (State.SCHEDULED, + State.UP_FOR_RETRY)) + + # Call hearbeats + self.logger.info("Heartbeating the executor") + self.executor.heartbeat() + + # Process events from the executor + self._process_executor_events() + + # Heartbeat the scheduler periodically + time_since_last_heartbeat = (datetime.now() - + last_self_heartbeat_time).total_seconds() + if time_since_last_heartbeat > self.heartrate: + self.logger.info("Heartbeating the scheduler") + self.heartbeat() + last_self_heartbeat_time = datetime.now() + + # Occasionally print out stats about how fast the files are getting processed + if ((datetime.now() - last_stat_print_time).total_seconds() > + self.print_stats_interval): + if len(known_file_paths) > 0: + self._log_file_processing_stats(known_file_paths, + processor_manager) + last_stat_print_time = datetime.now() + + loop_end_time = time.time() + self.logger.debug("Ran scheduling loop in {:.2f}s" + .format(loop_end_time - loop_start_time)) + self.logger.debug("Sleeping for {:.2f}s" + .format(self._processor_poll_interval)) + time.sleep(self._processor_poll_interval) + + # Exit early for a test mode + if processor_manager.max_runs_reached(): + self.logger.info("Exiting loop as all files have been processed " + "{} times".format(self.num_runs)) + break + + # Stop any processors + processor_manager.terminate() + + # Verify that all files were processed, and if so, deactivate DAGs that + # haven't been touched by the scheduler as they likely have been + # deleted. 
+ all_files_processed = True + for file_path in known_file_paths: + if processor_manager.get_last_finish_time(file_path) is None: + all_files_processed = False + break + if all_files_processed: + self.logger.info("Deactivating DAGs that haven't been touched since {}" + .format(execute_start_time.isoformat())) + models.DAG.deactivate_stale_dags(execute_start_time) + + self.executor.end() + + settings.Session.remove() - session.merge(ti) - session.commit() + @provide_session + def process_file(self, file_path, pickle_dags=False, session=None): + """ + Process a Python file containing Airflow DAGs. + + This includes: + + 1. Execute the file and look for DAG objects in the namespace. + 2. Pickle the DAG and save it to the DB (if necessary). + 3. For each DAG, see what tasks should run and create appropriate task + instances in the DB. + 4. Record any errors importing the file into ORM + 5. Kill (in ORM) any task instances belonging to the DAGs that haven't + issued a heartbeat in a while. + + Returns a list of SimpleDag objects that represent the DAGs found in + the file + + :param file_path: the path to the Python file that should be executed + :type file_path: unicode + :param pickle_dags: whether serialize the DAGs found in the file and + save them to the db + :type pickle_dags: bool + :return: a list of SimpleDags made from the Dags found in the file + :rtype: list[SimpleDag] + """ + self.logger.info("Processing file {} for tasks to queue".format(file_path)) + # As DAGs are parsed from this file, they will be converted into SimpleDags + simple_dags = [] + + try: + dagbag = models.DagBag(file_path) + except Exception: + self.logger.exception("Failed at reloading the DAG file {}".format(file_path)) + Stats.incr('dag_file_refresh_error', 1, 1) + return [] + + if len(dagbag.dags) > 0: + self.logger.info("DAG(s) {} retrieved from {}" + .format(dagbag.dags.keys(), + file_path)) + else: + self.logger.warn("No viable dags retrieved from {}".format(file_path)) + return [] + 
+ # Save individual DAGs in the ORM and update DagModel.last_scheduled_time + sync_time = datetime.now() + for dag in dagbag.dags.values(): + models.DAG.sync_to_db(dag, dag.owner, sync_time) + + paused_dag_ids = [dag.dag_id for dag in dagbag.dags.values() + if dag.is_paused] + + # Pickle the DAGs (if necessary) and put them into a SimpleDag + for dag_id in dagbag.dags: + dag = dagbag.get_dag(dag_id) + pickle_id = None + if pickle_dags: + pickle_id = dag.pickle(session).id + + task_ids = [task.task_id for task in dag.tasks] + + # Only return DAGs that are not paused + if dag_id not in paused_dag_ids: + simple_dags.append(SimpleDag(dag.dag_id, + task_ids, + dag.full_filepath, + dag.concurrency, + dag.is_paused, + pickle_id)) + + if len(self.dag_ids) > 0: + dags = [dag for dag in dagbag.dags.values() + if dag.dag_id in self.dag_ids and + dag.dag_id not in paused_dag_ids] + else: + dags = [dag for dag in dagbag.dags.values() + if not dag.parent_dag and + dag.dag_id not in paused_dag_ids] + + # Not using multiprocessing.Queue() since it's no longer a separate + # process and due to some unusual behavior. (empty() incorrectly + # returns true?) + ti_keys_to_schedule = [] + + self._process_dags(dagbag, dags, ti_keys_to_schedule) + + for ti_key in ti_keys_to_schedule: + dag = dagbag.dags[ti_key[0]] + task = dag.get_task(ti_key[1]) + ti = models.TaskInstance(task, ti_key[2]) + # Task starts out in the scheduled state. All tasks in the + # scheduled state will be sent to the executor + ti.state = State.SCHEDULED + + # Also save this task instance to the DB. 
+ self.logger.info("Creating / updating {} in ORM".format(ti)) + session.merge(ti) + session.commit() - for j in jobs: - j.join() + # Record import errors into the ORM + try: + self.record_import_errors(session, dagbag) + except Exception: + self.logger.exception("Error logging import errors!") + try: + dagbag.kill_zombies() + except Exception: + self.logger.exception("Error killing zombies!") - self.logger.info("Done queuing tasks, calling the executor's " - "heartbeat") - duration_sec = (datetime.now() - loop_start_dttm).total_seconds() - self.logger.info("Loop took: {} seconds".format(duration_sec)) - Stats.timing("scheduler_loop", duration_sec * 1000) - try: - self.import_errors(dagbag) - except Exception as e: - self.logger.exception(e) - try: - dagbag.kill_zombies() - except Exception as e: - self.logger.exception(e) - try: - # We really just want the scheduler to never ever stop. - executor.heartbeat() - self.heartbeat() - except Exception as e: - self.logger.exception(e) - self.logger.error("Tachycardia!") - except Exception as deep_e: - self.logger.exception(deep_e) - raise - finally: - settings.Session.remove() - executor.end() - session.close() + return simple_dags @provide_session def heartbeat_callback(self, session=None): http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/airflow/models.py ---------------------------------------------------------------------- diff --git a/airflow/models.py b/airflow/models.py index f589b3e..8326a92 100644 --- a/airflow/models.py +++ b/airflow/models.py @@ -59,6 +59,7 @@ from airflow import settings, utils from airflow.executors import DEFAULT_EXECUTOR, LocalExecutor from airflow import configuration from airflow.exceptions import AirflowException, AirflowSkipException +from airflow.dag.base_dag import BaseDag, BaseDagBag from airflow.utils.dates import cron_presets, date_range as utils_date_range from airflow.utils.db import provide_session from airflow.utils.decorators import apply_defaults @@ 
-129,7 +130,7 @@ def clear_task_instances(tis, session, activate_dag_runs=True): dr.start_date = datetime.now() -class DagBag(LoggingMixin): +class DagBag(BaseDagBag, LoggingMixin): """ A dagbag is a collection of dags, parsed out of a folder tree and has high level configuration settings, like what database to use as a backend and @@ -140,7 +141,7 @@ class DagBag(LoggingMixin): independent settings sets. :param dag_folder: the folder to scan to find DAGs - :type dag_folder: str + :type dag_folder: unicode :param executor: the executor to use when executing task instances in this DagBag :param include_examples: whether to include the examples that ship @@ -155,26 +156,23 @@ class DagBag(LoggingMixin): self, dag_folder=None, executor=DEFAULT_EXECUTOR, - include_examples=configuration.getboolean('core', 'LOAD_EXAMPLES'), - sync_to_db=False): + include_examples=configuration.getboolean('core', 'LOAD_EXAMPLES')): dag_folder = dag_folder or DAGS_FOLDER self.logger.info("Filling up the DagBag from {}".format(dag_folder)) self.dag_folder = dag_folder self.dags = {} - self.sync_to_db = sync_to_db # the file's last modified timestamp when we last read it self.file_last_changed = {} self.executor = executor self.import_errors = {} + if include_examples: example_dag_folder = os.path.join( os.path.dirname(__file__), 'example_dags') self.collect_dags(example_dag_folder) self.collect_dags(dag_folder) - if sync_to_db: - self.deactivate_inactive_dags() def size(self): """ @@ -307,7 +305,7 @@ class DagBag(LoggingMixin): return found_dags @provide_session - def kill_zombies(self, session): + def kill_zombies(self, session=None): """ Fails tasks that haven't had a heartbeat in too long """ @@ -355,20 +353,6 @@ class DagBag(LoggingMixin): for task in dag.tasks: settings.policy(task) - if self.sync_to_db: - session = settings.Session() - orm_dag = session.query( - DagModel).filter(DagModel.dag_id == dag.dag_id).first() - if not orm_dag: - orm_dag = DagModel(dag_id=dag.dag_id) - 
orm_dag.fileloc = root_dag.full_filepath - orm_dag.is_subdag = dag.is_subdag - orm_dag.owners = root_dag.owner - orm_dag.is_active = True - session.merge(orm_dag) - session.commit() - session.close() - for subdag in dag.subdags: subdag.full_filepath = dag.full_filepath subdag.parent_dag = dag @@ -756,8 +740,77 @@ class TaskInstance(Base): the orchestrator. """ dag = self.task.dag - iso = self.execution_date.isoformat() - cmd = "airflow run {self.dag_id} {self.task_id} {iso} " + + # Keeping existing logic, but not entirely sure why this is here. + if not pickle_id and dag: + if dag.full_filepath != dag.filepath: + path = "DAGS_FOLDER/{}".format(dag.filepath) + elif dag.full_filepath: + path = dag.full_filepath + + return TaskInstance.generate_command( + self.dag_id, + self.task_id, + self.execution_date, + mark_success=mark_success, + ignore_dependencies=ignore_dependencies, + ignore_depends_on_past=ignore_depends_on_past, + force=force, + local=local, + pickle_id=pickle_id, + file_path=path, + raw=raw, + job_id=job_id, + pool=pool) + + @staticmethod + def generate_command(dag_id, + task_id, + execution_date, + mark_success=False, + ignore_dependencies=False, + ignore_depends_on_past=False, + force=False, + local=False, + pickle_id=None, + file_path=None, + raw=False, + job_id=None, + pool=None + ): + """ + Generates the shell command required to execute this task instance. 
+ + :param dag_id: DAG ID + :type dag_id: unicode + :param task_id: Task ID + :type task_id: unicode + :param execution_date: Execution date for the task + :type execution_date: datetime + :param mark_success: Whether to mark the task as successful + :type mark_success: bool + :param ignore_dependencies: Whether to ignore the dependencies and run + anyway + :type ignore_dependencies: bool + :param ignore_depends_on_past: Whether to ignore the depends on past + setting and run anyway + :type ignore_depends_on_past: bool + :param force: Whether to force running - see TaskInstance.run() + :type force: bool + :param local: Whether to run the task locally + :type local: bool + :param pickle_id: If the DAG was serialized to the DB, the ID + associated with the pickled DAG + :type pickle_id: unicode + :param file_path: path to the file containing the DAG definition + :param raw: raw mode (needs more details) + :param job_id: job ID (needs more details) + :param pool: the Airflow pool that the task should run in + :type pool: unicode + :return: shell command that can be used to run the task instance + """ + iso = execution_date.isoformat() + cmd = "airflow run {dag_id} {task_id} {iso} " cmd += "--mark_success " if mark_success else "" cmd += "--pickle {pickle_id} " if pickle_id else "" cmd += "--job_id {job_id} " if job_id else "" @@ -767,11 +820,7 @@ class TaskInstance(Base): cmd += "--local " if local else "" cmd += "--pool {pool} " if pool else "" cmd += "--raw " if raw else "" - if not pickle_id and dag: - if dag.full_filepath != dag.filepath: - cmd += "-sd DAGS_FOLDER/{dag.filepath} " - elif dag.full_filepath: - cmd += "-sd {dag.full_filepath}" + cmd += "-sd {file_path}" return cmd.format(**locals()) @property @@ -1185,9 +1234,7 @@ class TaskInstance(Base): .first() ) if not pool: - raise ValueError( - "Task specified a pool ({}) but the pool " - "doesn't exist!".format(self.task.pool)) + return False open_slots = pool.open_slots(session=session) return open_slots <= 
0 @@ -2469,7 +2516,7 @@ class DagModel(Base): @functools.total_ordering -class DAG(LoggingMixin): +class DAG(BaseDag, LoggingMixin): """ A dag (directed acyclic graph) is a collection of tasks with directional dependencies. A dag also has a schedule, a start end an end date @@ -2557,8 +2604,14 @@ class DAG(LoggingMixin): del self.default_args['params'] validate_key(dag_id) + + # Properties from BaseDag + self._dag_id = dag_id + self._full_filepath = full_filepath if full_filepath else '' + self._concurrency = concurrency + self._pickle_id = None + self.task_dict = dict() - self.dag_id = dag_id self.start_date = start_date self.end_date = end_date self.schedule_interval = schedule_interval @@ -2568,14 +2621,12 @@ class DAG(LoggingMixin): self._schedule_interval = None else: self._schedule_interval = schedule_interval - self.full_filepath = full_filepath if full_filepath else '' if isinstance(template_searchpath, six.string_types): template_searchpath = [template_searchpath] self.template_searchpath = template_searchpath self.parent_dag = None # Gets set when DAGs are loaded self.last_loaded = datetime.now() self.safe_dag_id = dag_id.replace('.', '__dot__') - self.concurrency = concurrency self.max_active_runs = max_active_runs self.dagrun_timeout = dagrun_timeout self.sla_miss_callback = sla_miss_callback @@ -2597,7 +2648,9 @@ class DAG(LoggingMixin): def __eq__(self, other): return ( type(self) == type(other) and - all(self.__dict__.get(c, None) == other.__dict__.get(c, None) + # Use getattr() instead of __dict__ as __dict__ doesn't return + # correct values for properties. 
+ all(getattr(self, c, None) == getattr(other, c, None) for c in self._comps)) def __ne__(self, other): @@ -2672,6 +2725,38 @@ class DAG(LoggingMixin): return dttm @property + def dag_id(self): + return self._dag_id + + @dag_id.setter + def dag_id(self, value): + self._dag_id = value + + @property + def full_filepath(self): + return self._full_filepath + + @full_filepath.setter + def full_filepath(self, value): + self._full_filepath = value + + @property + def concurrency(self): + return self._concurrency + + @concurrency.setter + def concurrency(self, value): + self._concurrency = value + + @property + def pickle_id(self): + return self._pickle_id + + @pickle_id.setter + def pickle_id(self, value): + self._pickle_id = value + + @property def tasks(self): return list(self.task_dict.values()) @@ -3134,6 +3219,80 @@ class DAG(LoggingMixin): run.refresh_from_db() return run + @staticmethod + @provide_session + def sync_to_db(dag, owner, sync_time, session=None): + """ + Save attributes about this DAG to the DB. Note that this method + can be called for both DAGs and SubDAGs. A SubDag is actually a + SubDagOperator. 
+ + :param dag: the DAG object to save to the DB + :type dag: DAG + :own + :param sync_time: The time that the DAG should be marked as sync'ed + :type sync_time: datetime + :return: None + """ + orm_dag = session.query( + DagModel).filter(DagModel.dag_id == dag.dag_id).first() + if not orm_dag: + orm_dag = DagModel(dag_id=dag.dag_id) + logging.info("Creating ORM DAG for %s", + dag.dag_id) + orm_dag.fileloc = dag.full_filepath + orm_dag.is_subdag = dag.is_subdag + orm_dag.owners = owner + orm_dag.is_active = True + orm_dag.last_scheduler_run = sync_time + session.merge(orm_dag) + session.commit() + + for subdag in dag.subdags: + DAG.sync_to_db(subdag, owner, sync_time, session=session) + + @staticmethod + @provide_session + def deactivate_unknown_dags(active_dag_ids, session=None): + """ + Given a list of known DAGs, deactivate any other DAGs that are + marked as active in the ORM + + :param active_dag_ids: list of DAG IDs that are active + :type active_dag_ids: list[unicode] + :return: None + """ + + if len(active_dag_ids) == 0: + return + for dag in session.query( + DagModel).filter(~DagModel.dag_id.in_(active_dag_ids)).all(): + dag.is_active = False + session.merge(dag) + + @staticmethod + @provide_session + def deactivate_stale_dags(expiration_date, session=None): + """ + Deactivate any DAGs that were last touched by the scheduler before + the expiration date. These DAGs were likely deleted. 
+ + :param expiration_date: set inactive DAGs that were touched before this + time + :type expiration_date: datetime + :return: None + """ + for dag in session.query( + DagModel).filter(DagModel.last_scheduler_run < expiration_date, + DagModel.is_active).all(): + logging.info("Deactivating DAG ID %s since it was last touched " + "by the scheduler at %s", + dag.dag_id, + dag.last_scheduler_run.isoformat()) + dag.is_active = False + session.merge(dag) + session.commit() + class Chart(Base): __tablename__ = "chart" @@ -3567,7 +3726,7 @@ class DagRun(Base): state=State.unfinished(), session=session ) - none_depends_on_past = all(t.task.depends_on_past for t in unfinished_tasks) + none_depends_on_past = all(not t.task.depends_on_past for t in unfinished_tasks) # small speed up if unfinished_tasks and none_depends_on_past: @@ -3642,6 +3801,44 @@ class DagRun(Base): session.commit() + @staticmethod + def get_running_tasks(session, dag_id, task_ids): + """ + Returns the number of tasks running in the given DAG. + + :param session: ORM session + :param dag_id: ID of the DAG to get the task concurrency of + :type dag_id: unicode + :param task_ids: A list of valid task IDs for the given DAG + :type task_ids: list[unicode] + :return: The number of running tasks + :rtype: int + """ + qry = session.query(func.count(TaskInstance.task_id)).filter( + TaskInstance.dag_id == dag_id, + TaskInstance.task_id.in_(task_ids), + TaskInstance.state == State.RUNNING, + ) + return qry.scalar() + + @staticmethod + def get_run(session, dag_id, execution_date): + """ + :param dag_id: DAG ID + :type dag_id: unicode + :param execution_date: execution date + :type execution_date: datetime + :return: DagRun corresponding to the given dag_id and execution date + if one exists. None otherwise. 
+ :rtype: DagRun + """ + qry = session.query(DagRun).filter( + DagRun.dag_id == dag_id, + DagRun.external_trigger == False, + DagRun.execution_date == execution_date, + ) + return qry.first() + class Pool(Base): __tablename__ = "slot_pool" http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/airflow/settings.py ---------------------------------------------------------------------- diff --git a/airflow/settings.py b/airflow/settings.py index 9f9bb14..ccd77ee 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -72,20 +72,11 @@ SQL_ALCHEMY_CONN = conf.get('core', 'SQL_ALCHEMY_CONN') LOGGING_LEVEL = logging.INFO DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER')) -engine_args = {} -if 'sqlite' not in SQL_ALCHEMY_CONN: - # Engine args not supported by sqlite - engine_args['pool_size'] = conf.getint('core', 'SQL_ALCHEMY_POOL_SIZE') - engine_args['pool_recycle'] = conf.getint('core', - 'SQL_ALCHEMY_POOL_RECYCLE') - -engine = create_engine(SQL_ALCHEMY_CONN, **engine_args) -Session = scoped_session( - sessionmaker(autocommit=False, autoflush=False, bind=engine)) - # can't move this to conf due to ConfigParser interpolation LOG_FORMAT = ( '[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s') +LOG_FORMAT_WITH_THREAD_NAME = ( + '[%(asctime)s] {%(filename)s:%(lineno)d} %(threadName)s %(levelname)s - %(message)s') SIMPLE_LOG_FORMAT = '%(asctime)s %(levelname)s - %(message)s' @@ -116,10 +107,28 @@ def policy(task_instance): pass -def configure_logging(): +def configure_logging(log_format=LOG_FORMAT): logging.root.handlers = [] logging.basicConfig( - format=LOG_FORMAT, stream=sys.stdout, level=LOGGING_LEVEL) + format=log_format, stream=sys.stdout, level=LOGGING_LEVEL) + +engine = None +Session = None + + +def configure_orm(): + global engine + global Session + engine_args = {} + if 'sqlite' not in SQL_ALCHEMY_CONN: + # Engine args not supported by sqlite + engine_args['pool_size'] = conf.getint('core', 
'SQL_ALCHEMY_POOL_SIZE') + engine_args['pool_recycle'] = conf.getint('core', + 'SQL_ALCHEMY_POOL_RECYCLE') + + engine = create_engine(SQL_ALCHEMY_CONN, **engine_args) + Session = scoped_session( + sessionmaker(autocommit=False, autoflush=False, bind=engine)) try: from airflow_local_settings import * @@ -128,3 +137,4 @@ except: pass configure_logging() +configure_orm()