Repository: incubator-airflow
Updated Branches:
  refs/heads/master 835bcb623 -> fdb7e9491
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/airflow/utils/dag_processing.py ---------------------------------------------------------------------- diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py new file mode 100644 index 0000000..fc4ca1b --- /dev/null +++ b/airflow/utils/dag_processing.py @@ -0,0 +1,612 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function +from __future__ import unicode_literals + +import logging +import os +import re +import time + +from abc import ABCMeta, abstractmethod +from collections import defaultdict +from datetime import datetime + +from airflow.exceptions import AirflowException +from airflow.dag.base_dag import BaseDag, BaseDagBag +from airflow.utils.logging import LoggingMixin + + +class SimpleDag(BaseDag): + """ + A simplified representation of a DAG that contains all attributes + required for instantiating and scheduling its associated tasks. + """ + + def __init__(self, + dag_id, + task_ids, + full_filepath, + concurrency, + is_paused, + pickle_id): + """ + :param dag_id: ID of the DAG + :type dag_id: unicode + :param task_ids: task IDs associated with the DAG + :type task_ids: list[unicode] + :param full_filepath: path to the file containing the DAG e.g. + /a/b/c.py + :type full_filepath: unicode + :param concurrency: No more than these many tasks from the + dag should run concurrently + :type concurrency: int + :param is_paused: Whether or not this DAG is paused. Tasks from paused + DAGs are not scheduled + :type is_paused: bool + :param pickle_id: ID associated with the pickled version of this DAG. + :type pickle_id: unicode + """ + self._dag_id = dag_id + self._task_ids = task_ids + self._full_filepath = full_filepath + self._is_paused = is_paused + self._concurrency = concurrency + self._pickle_id = pickle_id + + @property + def dag_id(self): + """ + :return: the DAG ID + :rtype: unicode + """ + return self._dag_id + + @property + def task_ids(self): + """ + :return: A list of task IDs that are in this DAG + :rtype: list[unicode] + """ + return self._task_ids + + @property + def full_filepath(self): + """ + :return: The absolute path to the file that contains this DAG's definition + :rtype: unicode + """ + return self._full_filepath + + @property + def concurrency(self): + """ + :return: maximum number of tasks that can run simultaneously from this DAG + :rtype: int + """ + return self._concurrency + + @property + def is_paused(self): + """ + :return: whether this DAG is paused or not + :rtype: bool + """ + return self._is_paused + + @property + def pickle_id(self): + """ + :return: The pickle ID for this DAG, if it has one. Otherwise None. + :rtype: unicode + """ + return self._pickle_id + + +class SimpleDagBag(BaseDagBag): + """ + A collection of SimpleDag objects with some convenience methods. + """ + + def __init__(self, simple_dags): + """ + Constructor. 
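        A minimal usage sketch of SimpleDag together with SimpleDagBag (the
        attribute values below are hypothetical):

            simple_dag = SimpleDag(dag_id='example_dag',
                                   task_ids=['extract', 'load'],
                                   full_filepath='/home/airflow/dags/example_dag.py',
                                   concurrency=16,
                                   is_paused=False,
                                   pickle_id=None)
            simple_dag_bag = SimpleDagBag([simple_dag])
            assert simple_dag_bag.get_dag('example_dag').concurrency == 16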
+ + :param simple_dags: SimpleDag objects that should be in this + :type: list(SimpleDag) + """ + self.simple_dags = simple_dags + self.dag_id_to_simple_dag = {} + + for simple_dag in simple_dags: + self.dag_id_to_simple_dag[simple_dag.dag_id] = simple_dag + + @property + def dag_ids(self): + """ + :return: IDs of all the DAGs in this + :rtype: list[unicode] + """ + return self.dag_id_to_simple_dag.keys() + + def get_dag(self, dag_id): + """ + :param dag_id: DAG ID + :type dag_id: unicode + :return: if the given DAG ID exists in the bag, return the BaseDag + corresponding to that ID. Otherwise, throw an Exception + :rtype: SimpleDag + """ + if dag_id not in self.dag_id_to_simple_dag: + raise AirflowException("Unknown DAG ID {}".format(dag_id)) + return self.dag_id_to_simple_dag[dag_id] + + +def list_py_file_paths(directory, safe_mode=True): + """ + Traverse a directory and look for Python files. + + :param directory: the directory to traverse + :type directory: unicode + :param safe_mode: whether to use a heuristic to determine whether a file + contains Airflow DAG definitions + :return: a list of paths to Python files in the specified directory + :rtype: list[unicode] + """ + file_paths = [] + if directory is None: + return [] + elif os.path.isfile(directory): + return [directory] + elif os.path.isdir(directory): + patterns = [] + for root, dirs, files in os.walk(directory, followlinks=True): + ignore_file = [f for f in files if f == '.airflowignore'] + if ignore_file: + f = open(os.path.join(root, ignore_file[0]), 'r') + patterns += [p for p in f.read().split('\n') if p] + f.close() + for f in files: + try: + file_path = os.path.join(root, f) + if not os.path.isfile(file_path): + continue + mod_name, file_ext = os.path.splitext( + os.path.split(file_path)[-1]) + if file_ext != '.py': + continue + if any([re.findall(p, file_path) for p in patterns]): + continue + + # Heuristic that guesses whether a Python file contains an + # Airflow DAG definition. + might_contain_dag = True + if safe_mode: + with open(file_path, 'rb') as f: + content = f.read() + might_contain_dag = all([s in content + for s in (b'DAG', b'airflow')]) + + if not might_contain_dag: + continue + + file_paths.append(file_path) + except Exception: + logging.exception("Error while examining %s", f) + return file_paths + + +class AbstractDagFileProcessor(object): + """ + Processes a DAG file. See SchedulerJob.process_file() for more details. + """ + __metaclass__ = ABCMeta + + @abstractmethod + def start(self): + """ + Launch the process to process the file + """ + raise NotImplementedError() + + @abstractmethod + def terminate(self, sigkill=False): + """ + Terminate (and then kill) the process launched to process the file + """ + raise NotImplementedError() + + @property + @abstractmethod + def pid(self): + """ + :return: the PID of the process launched to process the given file + """ + raise NotImplementedError() + + @property + @abstractmethod + def exit_code(self): + """ + After the process is finished, this can be called to get the return code + :return: the exit code of the process + :rtype: int + """ + raise NotImplementedError() + + @property + @abstractmethod + def done(self): + """ + Check if the process launched to process this file is done. 
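        The manager drives a concrete processor through the lifecycle sketched
        below (a hedged example; `processor` stands for any subclass instance):

            processor.start()                   # launch work on processor.file_path
            while not processor.done:           # poll until the launched process finishes
                time.sleep(0.1)
            if processor.result is None:        # the process failed; inspect exit_code/log_file
                print(processor.exit_code, processor.log_file)
            else:
                simple_dags = processor.result  # list[SimpleDag] returned by process_file()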
+ :return: whether the process is finished running + :rtype: bool + """ + raise NotImplementedError() + + @property + @abstractmethod + def result(self): + """ + :return: result of running SchedulerJob.process_file() + :rtype: list[SimpleDag] + """ + raise NotImplementedError() + + @property + @abstractmethod + def start_time(self): + """ + :return: When this started to process the file + :rtype: datetime + """ + raise NotImplementedError() + + @property + @abstractmethod + def log_file(self): + """ + :return: the log file associated with this processor + :rtype: unicode + """ + raise NotImplementedError() + + @property + @abstractmethod + def file_path(self): + """ + :return: the path to the file that this is processing + :rtype: unicode + """ + raise NotImplementedError() + + +class DagFileProcessorManager(LoggingMixin): + """ + Given a list of DAG definition files, this kicks off several processors + in parallel to process them. The parallelism is limited and as the + processors finish, more are launched. The files are processed over and + over again, but no more often than the specified interval. + + :type _file_path_queue: list[unicode] + :type _processors: dict[unicode, AbstractDagFileProcessor] + :type _last_runtime: dict[unicode, float] + :type _last_finish_time: dict[unicode, datetime] + """ + def __init__(self, + dag_directory, + file_paths, + parallelism, + process_file_interval, + child_process_log_directory, + max_runs, + processor_factory): + """ + :param dag_directory: Directory where DAG definitions are kept. All + files in file_paths should be under this directory + :type dag_directory: unicode + :param file_paths: list of file paths that contain DAG definitions + :type file_paths: list[unicode] + :param parallelism: maximum number of simultaneous process to run at once + :type parallelism: int + :param process_file_interval: process a file at most once every this + many seconds + :type process_file_interval: float + :param max_runs: The number of times to parse and schedule each file. -1 + for unlimited. + :type max_runs: int + :param child_process_log_directory: Store logs for child processes in + this directory + :type child_process_log_directory: unicode + :type process_file_interval: float + :param processor_factory: function that creates processors for DAG + definition files. 
Arguments are (dag_definition_path, log_file_path) + :type processor_factory: (unicode, unicode) -> (AbstractDagFileProcessor) + + """ + self._file_paths = file_paths + self._file_path_queue = [] + self._parallelism = parallelism + self._dag_directory = dag_directory + self._max_runs = max_runs + self._process_file_interval = process_file_interval + self._child_process_log_directory = child_process_log_directory + self._processor_factory = processor_factory + # Map from file path to the processor + self._processors = {} + # Map from file path to the last runtime + self._last_runtime = {} + # Map from file path to the last finish time + self._last_finish_time = {} + # Map from file path to the number of runs + self._run_count = defaultdict(int) + + @property + def file_paths(self): + return self._file_paths + + def get_pid(self, file_path): + """ + :param file_path: the path to the file that's being processed + :type file_path: unicode + :return: the PID of the process processing the given file or None if + the specified file is not being processed + :rtype: int + """ + if file_path in self._processors: + return self._processors[file_path].pid + return None + + def get_all_pids(self): + """ + :return: a list of the PIDs for the processors that are running + :rtype: List[int] + """ + return [x.pid for x in self._processors.values()] + + def get_runtime(self, file_path): + """ + :param file_path: the path to the file that's being processed + :type file_path: unicode + :return: the current runtime (in seconds) of the process that's + processing the specified file or None if the file is not currently + being processed + """ + if file_path in self._processors: + return (datetime.now() - self._processors[file_path].start_time)\ + .total_seconds() + return None + + def get_last_runtime(self, file_path): + """ + :param file_path: the path to the file that was processed + :type file_path: unicode + :return: the runtime (in seconds) of the process of the last run, or + None if the file was never processed. + :rtype: float + """ + return self._last_runtime.get(file_path) + + def get_last_finish_time(self, file_path): + """ + :param file_path: the path to the file that was processed + :type file_path: unicode + :return: the finish time of the process of the last run, or None if the + file was never processed. + :rtype: datetime + """ + return self._last_finish_time.get(file_path) + + def get_start_time(self, file_path): + """ + :param file_path: the path to the file that's being processed + :type file_path: unicode + :return: the start time of the process that's processing the + specified file or None if the file is not currently being processed + :rtype: datetime + """ + if file_path in self._processors: + return self._processors[file_path].start_time + return None + + def set_file_paths(self, new_file_paths): + """ + Update this with a new set of paths to DAG definition files. 
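        A rough end-to-end sketch of driving the manager (a hedged example: the
        paths and intervals are illustrative, and `my_processor_factory` is a
        hypothetical factory; the scheduler supplies the real one):

            manager = DagFileProcessorManager(
                dag_directory='/home/airflow/dags',
                file_paths=list_py_file_paths('/home/airflow/dags'),
                parallelism=2,
                process_file_interval=30.0,
                child_process_log_directory='/home/airflow/logs/scheduler',
                max_runs=1,
                processor_factory=my_processor_factory)

            while not manager.max_runs_reached():
                # Pick up DAG files added or removed since the last pass
                manager.set_file_paths(list_py_file_paths('/home/airflow/dags'))
                simple_dags = manager.heartbeat()  # reap finished processors, start new ones
                time.sleep(1)
            manager.terminate()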
+ + :param new_file_paths: list of paths to DAG definition files + :type new_file_paths: list[unicode] + :return: None + """ + self._file_paths = new_file_paths + self._file_path_queue = [x for x in self._file_path_queue + if x in new_file_paths] + # Stop processors that are working on deleted files + filtered_processors = {} + for file_path, processor in self._processors.items(): + if file_path in new_file_paths: + filtered_processors[file_path] = processor + else: + self.logger.warn("Stopping processor for {}".format(file_path)) + processor.stop() + self._processors = filtered_processors + + @staticmethod + def _split_path(file_path): + """ + Return the path elements of a path as an array. E.g. /a/b/c -> + ['a', 'b', 'c'] + + :param file_path: the file path to split + :return: a list of the elements of the file path + :rtype: list[unicode] + """ + results = [] + while True: + head, tail = os.path.split(file_path) + if len(tail) != 0: + results.append(tail) + if file_path == head: + break + file_path = head + results.reverse() + return results + + def _get_log_file_path(self, dag_file_path): + """ + Log output from processing the specified file should go to this + location. + + :param dag_file_path: file containing a DAG + :type dag_file_path: unicode + :return: the path to the corresponding log file + :rtype: unicode + """ + # General approach is to put the log file under the same relative path + # under the log directory as the DAG file in the DAG directory + now = datetime.now() + log_directory = os.path.join(self._child_process_log_directory, + now.strftime("%Y-%m-%d")) + relative_dag_file_path = os.path.relpath(dag_file_path, start=self._dag_directory) + path_elements = self._split_path(relative_dag_file_path) + + # Add a .log suffix for the log file + path_elements[-1] += ".log" + + return os.path.join(log_directory, *path_elements) + + def processing_count(self): + """ + :return: the number of files currently being processed + :rtype: int + """ + return len(self._processors) + + def wait_until_finished(self): + """ + Sleeps until all the processors are done. + """ + for file_path, processor in self._processors.items(): + while not processor.done: + time.sleep(0.1) + + def heartbeat(self): + """ + This should be periodically called by the scheduler. This method will + kick of new processes to process DAG definition files and read the + results from the finished processors. + + :return: a list of SimpleDags that were produced by processors that + have finished since the last time this was called + :rtype: list[SimpleDag] + """ + finished_processors = {} + """:type : dict[unicode, AbstractDagFileProcessor]""" + running_processors = {} + """:type : dict[unicode, AbstractDagFileProcessor]""" + + for file_path, processor in self._processors.items(): + if processor.done: + self.logger.info("Processor for {} finished".format(file_path)) + now = datetime.now() + finished_processors[file_path] = processor + self._last_runtime[file_path] = (now - + processor.start_time).total_seconds() + self._last_finish_time[file_path] = now + self._run_count[file_path] += 1 + else: + running_processors[file_path] = processor + self._processors = running_processors + + # Collect all the DAGs that were found in the processed files + simple_dags = [] + for file_path, processor in finished_processors.items(): + if processor.result is None: + self.logger.warn("Processor for {} exited with return code " + "{}. See {} for details." 
+ .format(processor.file_path, + processor.exit_code, + processor.log_file)) + else: + for simple_dag in processor.result: + simple_dags.append(simple_dag) + + # Generate more file paths to process if we processed all the files + # already. + if len(self._file_path_queue) == 0: + # If the file path is already being processed, or if a file was + # processed recently, wait until the next batch + file_paths_in_progress = self._processors.keys() + now = datetime.now() + file_paths_recently_processed = [] + for file_path in self._file_paths: + last_finish_time = self.get_last_finish_time(file_path) + if (last_finish_time is not None and + (now - last_finish_time).total_seconds() < + self._process_file_interval): + file_paths_recently_processed.append(file_path) + + files_paths_at_run_limit = [file_path + for file_path, num_runs in self._run_count.items() + if num_runs == self._max_runs] + + files_paths_to_queue = list(set(self._file_paths) - + set(file_paths_in_progress) - + set(file_paths_recently_processed) - + set(files_paths_at_run_limit)) + + for file_path, processor in self._processors.items(): + self.logger.debug("File path {} is still being processed (started: {})" + .format(processor.file_path, + processor.start_time.isoformat())) + + self.logger.debug("Queuing the following files for processing:\n\t{}" + .format("\n\t".join(files_paths_to_queue))) + + self._file_path_queue.extend(files_paths_to_queue) + + # Start more processors if we have enough slots and files to process + while (self._parallelism - len(self._processors) > 0 and + len(self._file_path_queue) > 0): + file_path = self._file_path_queue.pop(0) + log_file_path = self._get_log_file_path(file_path) + processor = self._processor_factory(file_path, log_file_path) + + processor.start() + self.logger.info("Started a process (PID: {}) to generate " + "tasks for {} - logging into {}" + .format(processor.pid, file_path, log_file_path)) + + self._processors[file_path] = processor + + return simple_dags + + def max_runs_reached(self): + """ + :return: whether all file paths have been processed max_runs times + """ + for file_path in self._file_paths: + if self._run_count[file_path] != self._max_runs: + return False + return True + + def terminate(self): + """ + Stops all running processors + :return: None + """ + for processor in self._processors.values(): + processor.terminate() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/airflow/utils/db.py ---------------------------------------------------------------------- diff --git a/airflow/utils/db.py b/airflow/utils/db.py index fda467d..0d6f3d5 100644 --- a/airflow/utils/db.py +++ b/airflow/utils/db.py @@ -17,6 +17,7 @@ from __future__ import division from __future__ import print_function from __future__ import unicode_literals +from datetime import datetime from functools import wraps import logging import os @@ -252,7 +253,13 @@ def initdb(): session.add(KET(know_event_type='Marketing Campaign')) session.commit() - models.DagBag(sync_to_db=True) + dagbag = models.DagBag() + # Save individual DAGs in the ORM + now = datetime.utcnow() + for dag in dagbag.dags.values(): + models.DAG.sync_to_db(dag, dag.owner, now) + # Deactivate the unknown ones + models.DAG.deactivate_unknown_dags(dagbag.dags.keys()) Chart = models.Chart chart_label = "Airflow task instance by type" http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/scripts/ci/check-license.sh ---------------------------------------------------------------------- diff --git 
a/scripts/ci/check-license.sh b/scripts/ci/check-license.sh index a44fb2d..87063e2 100755 --- a/scripts/ci/check-license.sh +++ b/scripts/ci/check-license.sh @@ -104,4 +104,4 @@ if test ! -z "$ERRORS"; then exit 0 else echo -e "RAT checks passed." -fi \ No newline at end of file +fi http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/scripts/ci/setup_env.sh ---------------------------------------------------------------------- diff --git a/scripts/ci/setup_env.sh b/scripts/ci/setup_env.sh index a5da867..9df4d81 100755 --- a/scripts/ci/setup_env.sh +++ b/scripts/ci/setup_env.sh @@ -126,7 +126,19 @@ tar zxf ${TRAVIS_CACHE}/hive/hive.tar.gz --strip-components 1 -C ${HIVE_HOME} echo "Downloading and unpacking minicluster" curl -z ${TRAVIS_CACHE}/minicluster/minicluster.zip -o ${TRAVIS_CACHE}/minicluster/minicluster.zip -L ${MINICLUSTER_URL} +ls -l ${TRAVIS_CACHE}/minicluster/minicluster.zip unzip ${TRAVIS_CACHE}/minicluster/minicluster.zip -d /tmp +if [ $? != 0 ] ; then + # Try downloading w/o cache if there's a failure + curl -o ${TRAVIS_CACHE}/minicluster/minicluster.zip -L ${MINICLUSTER_URL} + ls -l ${TRAVIS_CACHE}/minicluster/minicluster.zip + unzip ${TRAVIS_CACHE}/minicluster/minicluster.zip -d /tmp + if [ $? != 0 ] ; then + echo "Failed twice in downloading and unpacking minicluster!" >&2 + exit 1 + fi + exit 1 +fi echo "Path = ${PATH}" http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/setup.py ---------------------------------------------------------------------- diff --git a/setup.py b/setup.py index 5b14653..85e5195 100644 --- a/setup.py +++ b/setup.py @@ -191,6 +191,7 @@ def do_setup(): 'jinja2>=2.7.3, <3.0', 'markdown>=2.5.2, <3.0', 'pandas>=0.15.2, <1.0.0', + 'psutil>=4.2.0, <5.0.0', 'pygments>=2.0.1, <3.0', 'python-daemon>=2.1.1, <2.2', 'python-dateutil>=2.3, <3', @@ -198,6 +199,7 @@ def do_setup(): 'requests>=2.5.1, <3', 'setproctitle>=1.1.8, <2', 'sqlalchemy>=0.9.8', + 'tabulate>=0.7.5, <0.8.0', 'thrift>=0.9.2, <0.10', 'zope.deprecation>=4.0, <5.0', ], http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/tests/core.py ---------------------------------------------------------------------- diff --git a/tests/core.py b/tests/core.py index 4b1926c..38652fc 100644 --- a/tests/core.py +++ b/tests/core.py @@ -109,6 +109,12 @@ class OperatorSubclass(BaseOperator): class CoreTest(unittest.TestCase): + + # These defaults make the test faster to run + default_scheduler_args = {"file_process_interval": 0, + "processor_poll_interval": 0.5, + "num_runs": 1} + def setUp(self): configuration.test_mode() self.dagbag = models.DagBag( @@ -131,7 +137,7 @@ class CoreTest(unittest.TestCase): owner='Also fake', start_date=datetime(2015, 1, 2, 0, 0))) - dag_run = jobs.SchedulerJob(test_mode=True).schedule_dag(dag) + dag_run = jobs.SchedulerJob(**self.default_scheduler_args).create_dag_run(dag) assert dag_run is not None assert dag_run.dag_id == dag.dag_id assert dag_run.run_id is not None @@ -156,12 +162,13 @@ class CoreTest(unittest.TestCase): task_id="faketastic", owner='Also fake', start_date=DEFAULT_DATE)) - scheduler = jobs.SchedulerJob(test_mode=True) + + scheduler = jobs.SchedulerJob(**self.default_scheduler_args) dag.create_dagrun(run_id=models.DagRun.id_for_date(DEFAULT_DATE), execution_date=DEFAULT_DATE, state=State.SUCCESS, external_trigger=True) - dag_run = scheduler.schedule_dag(dag) + dag_run = scheduler.create_dag_run(dag) assert dag_run is not None assert dag_run.dag_id == dag.dag_id assert dag_run.run_id is not None @@ -183,8 
+190,8 @@ class CoreTest(unittest.TestCase): task_id="faketastic", owner='Also fake', start_date=datetime(2015, 1, 2, 0, 0))) - dag_run = jobs.SchedulerJob(test_mode=True).schedule_dag(dag) - dag_run2 = jobs.SchedulerJob(test_mode=True).schedule_dag(dag) + dag_run = jobs.SchedulerJob(**self.default_scheduler_args).create_dag_run(dag) + dag_run2 = jobs.SchedulerJob(**self.default_scheduler_args).create_dag_run(dag) assert dag_run is not None assert dag_run2 is None @@ -208,11 +215,11 @@ class CoreTest(unittest.TestCase): # Create and schedule the dag runs dag_runs = [] - scheduler = jobs.SchedulerJob(test_mode=True) + scheduler = jobs.SchedulerJob(**self.default_scheduler_args) for i in range(runs): - dag_runs.append(scheduler.schedule_dag(dag)) + dag_runs.append(scheduler.create_dag_run(dag)) - additional_dag_run = scheduler.schedule_dag(dag) + additional_dag_run = scheduler.create_dag_run(dag) for dag_run in dag_runs: assert dag_run is not None @@ -243,9 +250,9 @@ class CoreTest(unittest.TestCase): owner='Also fake')) dag_runs = [] - scheduler = jobs.SchedulerJob(test_mode=True) + scheduler = jobs.SchedulerJob(**self.default_scheduler_args) for i in range(runs): - dag_run = scheduler.schedule_dag(dag) + dag_run = scheduler.create_dag_run(dag) dag_runs.append(dag_run) # Mark the DagRun as complete @@ -254,7 +261,7 @@ class CoreTest(unittest.TestCase): session.commit() # Attempt to schedule an additional dag run (for 2016-01-01) - additional_dag_run = scheduler.schedule_dag(dag) + additional_dag_run = scheduler.create_dag_run(dag) for dag_run in dag_runs: assert dag_run is not None @@ -595,7 +602,8 @@ class CoreTest(unittest.TestCase): job.run() def test_scheduler_job(self): - job = jobs.SchedulerJob(dag_id='example_bash_operator', test_mode=True) + job = jobs.SchedulerJob(dag_id='example_bash_operator', + **self.default_scheduler_args) job.run() def test_raw_job(self): @@ -798,6 +806,7 @@ class CliTests(unittest.TestCase): self.parser = cli.CLIFactory.get_parser() self.dagbag = models.DagBag( dag_folder=DEV_NULL, include_examples=True) + # Persist DAGs def test_cli_list_dags(self): args = self.parser.parse_args(['list_dags', '--report']) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/tests/dags/no_dags.py ---------------------------------------------------------------------- diff --git a/tests/dags/no_dags.py b/tests/dags/no_dags.py new file mode 100644 index 0000000..a84b6da --- /dev/null +++ b/tests/dags/no_dags.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/tests/dags_with_system_exit/a_system_exit.py ---------------------------------------------------------------------- diff --git a/tests/dags_with_system_exit/a_system_exit.py b/tests/dags_with_system_exit/a_system_exit.py new file mode 100644 index 0000000..7012550 --- /dev/null +++ b/tests/dags_with_system_exit/a_system_exit.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Tests to make sure that a system exit won't cause the scheduler to fail +# Starts with 'a' to get listed first. + +import sys + +from datetime import datetime +from airflow.models import DAG + +DEFAULT_DATE = datetime(2100, 1, 1) + +dag1 = DAG( + dag_id='test_system_exit', + start_date=DEFAULT_DATE) + +sys.exit(-1) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/tests/dags_with_system_exit/b_test_scheduler_dags.py ---------------------------------------------------------------------- diff --git a/tests/dags_with_system_exit/b_test_scheduler_dags.py b/tests/dags_with_system_exit/b_test_scheduler_dags.py new file mode 100644 index 0000000..ed6904f --- /dev/null +++ b/tests/dags_with_system_exit/b_test_scheduler_dags.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from datetime import datetime + +from airflow.models import DAG +from airflow.operators.dummy_operator import DummyOperator + +DEFAULT_DATE = datetime(2000, 1, 1) + +dag1 = DAG( + dag_id='exit_test_dag', + start_date=DEFAULT_DATE) + +dag1_task1 = DummyOperator( + task_id='dummy', + dag=dag1, + owner='airflow') http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/tests/dags_with_system_exit/c_system_exit.py ---------------------------------------------------------------------- diff --git a/tests/dags_with_system_exit/c_system_exit.py b/tests/dags_with_system_exit/c_system_exit.py new file mode 100644 index 0000000..5644304 --- /dev/null +++ b/tests/dags_with_system_exit/c_system_exit.py @@ -0,0 +1,29 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Tests to make sure that a system exit won't cause the scheduler to fail. +# Start with 'z' to get listed last. + +import sys + +from datetime import datetime +from airflow.models import DAG + +DEFAULT_DATE = datetime(2100, 1, 1) + +dag1 = DAG( + dag_id='test_system_exit', + start_date=DEFAULT_DATE) + +sys.exit(-1) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/tests/jobs.py ---------------------------------------------------------------------- diff --git a/tests/jobs.py b/tests/jobs.py index d981945..e86b9da 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -19,9 +19,11 @@ from __future__ import unicode_literals import datetime import logging +import os import unittest from airflow import AirflowException, settings +from airflow import models from airflow.bin import cli from airflow.executors import DEFAULT_EXECUTOR from airflow.jobs import BackfillJob, SchedulerJob @@ -172,6 +174,9 @@ class BackfillJobTest(unittest.TestCase): class SchedulerJobTest(unittest.TestCase): + # These defaults make the test faster to run + default_scheduler_args = {"file_process_interval": 0, + "processor_poll_interval": 0.5} def setUp(self): self.dagbag = DagBag() @@ -193,14 +198,14 @@ class SchedulerJobTest(unittest.TestCase): if run_kwargs is None: run_kwargs = {} - scheduler = SchedulerJob() + scheduler = SchedulerJob(**self.default_scheduler_args) dag = self.dagbag.get_dag(dag_id) dag.clear() - dr = scheduler.schedule_dag(dag) + dr = scheduler.create_dag_run(dag) if advance_execution_date: # run a second time to schedule a dagrun after the start_date - dr = scheduler.schedule_dag(dag) + dr = scheduler.create_dag_run(dag) ex_date = dr.execution_date try: @@ -264,22 +269,6 @@ class SchedulerJobTest(unittest.TestCase): }, dagrun_state=State.FAILED) - def test_dagrun_deadlock(self): - """ - Deadlocked DagRun is marked a failure - - Test that a deadlocked dagrun is marked as a failure by having - depends_on_past and an execution_date after the start_date - """ - self.evaluate_dagrun( - dag_id='test_dagrun_states_deadlock', - expected_task_states={ - 'test_depends_on_past': None, - 'test_depends_on_past_2': None, - }, - dagrun_state=State.FAILED, - advance_execution_date=True) - def test_scheduler_pooled_tasks(self): """ Test that the scheduler handles queued tasks correctly @@ -299,13 +288,17 @@ class SchedulerJobTest(unittest.TestCase): dag = self.dagbag.get_dag(dag_id) dag.clear() - scheduler = SchedulerJob(dag_id, num_runs=1) + scheduler = SchedulerJob(dag_id, + num_runs=1, + executor=TestExecutor(), + **self.default_scheduler_args) scheduler.run() task_1 = dag.tasks[0] logging.info("Trying to find task {}".format(task_1)) ti = TI(task_1, dag.start_date) ti.refresh_from_db() + logging.error("TI is: {}".format(ti)) self.assertEqual(ti.state, State.QUEUED) # now we use a DIFFERENT scheduler and executor @@ -313,7 +306,8 @@ class SchedulerJobTest(unittest.TestCase): scheduler2 = SchedulerJob( dag_id, num_runs=5, - executor=DEFAULT_EXECUTOR.__class__()) + executor=DEFAULT_EXECUTOR.__class__(), + **self.default_scheduler_args) scheduler2.run() ti.refresh_from_db() @@ -364,7 
+358,9 @@ class SchedulerJobTest(unittest.TestCase): dag.clear() self.assertTrue(dag.start_date > DEFAULT_DATE) - scheduler = SchedulerJob(dag_id, num_runs=2) + scheduler = SchedulerJob(dag_id, + num_runs=2, + **self.default_scheduler_args) scheduler.run() # zero tasks ran @@ -387,7 +383,9 @@ class SchedulerJobTest(unittest.TestCase): self.assertEqual( len(session.query(TI).filter(TI.dag_id == dag_id).all()), 1) - scheduler = SchedulerJob(dag_id, num_runs=2) + scheduler = SchedulerJob(dag_id, + num_runs=2, + **self.default_scheduler_args) scheduler.run() # still one task @@ -404,7 +402,10 @@ class SchedulerJobTest(unittest.TestCase): dag = self.dagbag.get_dag(dag_id) dag.clear() - scheduler = SchedulerJob(dag_ids=dag_ids, num_runs=2) + scheduler = SchedulerJob(dag_ids=dag_ids, + file_process_interval=0, + processor_poll_interval=0.5, + num_runs=2) scheduler.run() # zero tasks ran @@ -425,14 +426,15 @@ class SchedulerJobTest(unittest.TestCase): scheduler = SchedulerJob() dag.clear() - dr = scheduler.schedule_dag(dag) + dr = scheduler.create_dag_run(dag) self.assertIsNotNone(dr) - dr = scheduler.schedule_dag(dag) + dr = scheduler.create_dag_run(dag) self.assertIsNone(dr) - def test_scheduler_process_execute_task(self): + def test_scheduler_process_task_instances(self): """ - Test if process dag sends a task to the executor + Test if _process_task_instances puts the right task instances into the + queue. """ dag = DAG( dag_id='test_scheduler_process_execute_task', @@ -450,51 +452,16 @@ class SchedulerJobTest(unittest.TestCase): scheduler = SchedulerJob() dag.clear() - dr = scheduler.schedule_dag(dag) + dr = scheduler.create_dag_run(dag) self.assertIsNotNone(dr) queue = mock.Mock() - scheduler.process_dag(dag, queue=queue) + scheduler._process_task_instances(dag, queue=queue) - queue.put.assert_called_with( - ((dag.dag_id, dag_task1.task_id, DEFAULT_DATE), None) + queue.append.assert_called_with( + (dag.dag_id, dag_task1.task_id, DEFAULT_DATE) ) - tis = dr.get_task_instances(state=State.SCHEDULED) - self.assertIsNotNone(tis) - - def test_scheduler_process_check_heartrate(self): - """ - Test if process dag honors the heartrate - """ - dag = DAG( - dag_id='test_scheduler_process_check_heartrate', - start_date=DEFAULT_DATE) - dag_task1 = DummyOperator( - task_id='dummy', - dag=dag, - owner='airflow') - - session = settings.Session() - orm_dag = DagModel(dag_id=dag.dag_id) - orm_dag.last_scheduler_run = datetime.datetime.now() - session.merge(orm_dag) - session.commit() - session.close() - - scheduler = SchedulerJob() - scheduler.heartrate = 1000 - - dag.clear() - - dr = scheduler.schedule_dag(dag) - self.assertIsNotNone(dr) - - queue = mock.Mock() - scheduler.process_dag(dag, queue=queue) - - queue.put.assert_not_called() - def test_scheduler_do_not_schedule_removed_task(self): dag = DAG( dag_id='test_scheduler_do_not_schedule_removed_task', @@ -513,7 +480,7 @@ class SchedulerJobTest(unittest.TestCase): scheduler = SchedulerJob() dag.clear() - dr = scheduler.schedule_dag(dag) + dr = scheduler.create_dag_run(dag) self.assertIsNotNone(dr) dag = DAG( @@ -521,7 +488,7 @@ class SchedulerJobTest(unittest.TestCase): start_date=DEFAULT_DATE) queue = mock.Mock() - scheduler.process_dag(dag, queue=queue) + scheduler._process_task_instances(dag, queue=queue) queue.put.assert_not_called() @@ -543,11 +510,11 @@ class SchedulerJobTest(unittest.TestCase): scheduler = SchedulerJob() dag.clear() - dr = scheduler.schedule_dag(dag) + dr = scheduler.create_dag_run(dag) self.assertIsNone(dr) queue = mock.Mock() - 
scheduler.process_dag(dag, queue=queue) + scheduler._process_task_instances(dag, queue=queue) queue.put.assert_not_called() @@ -568,7 +535,7 @@ class SchedulerJobTest(unittest.TestCase): scheduler = SchedulerJob() dag.clear() - dr = scheduler.schedule_dag(dag) + dr = scheduler.create_dag_run(dag) self.assertIsNotNone(dr) tis = dr.get_task_instances(session=session) @@ -579,7 +546,7 @@ class SchedulerJobTest(unittest.TestCase): session.close() queue = mock.Mock() - scheduler.process_dag(dag, queue=queue) + scheduler._process_task_instances(dag, queue=queue) queue.put.assert_not_called() @@ -605,7 +572,7 @@ class SchedulerJobTest(unittest.TestCase): scheduler = SchedulerJob() dag.clear() - dr = scheduler.schedule_dag(dag) + dr = scheduler.create_dag_run(dag) self.assertIsNotNone(dr) tis = dr.get_task_instances() @@ -617,7 +584,7 @@ class SchedulerJobTest(unittest.TestCase): owner='airflow') queue = mock.Mock() - scheduler.process_dag(dag, queue=queue) + scheduler._process_task_instances(dag, queue=queue) tis = dr.get_task_instances() self.assertEquals(len(tis), 2) @@ -645,10 +612,10 @@ class SchedulerJobTest(unittest.TestCase): scheduler = SchedulerJob() dag.clear() - dr = scheduler.schedule_dag(dag) + dr = scheduler.create_dag_run(dag) self.assertIsNotNone(dr) - dr = scheduler.schedule_dag(dag) + dr = scheduler.create_dag_run(dag) self.assertIsNone(dr) def test_scheduler_fail_dagrun_timeout(self): @@ -673,13 +640,13 @@ class SchedulerJobTest(unittest.TestCase): scheduler = SchedulerJob() dag.clear() - dr = scheduler.schedule_dag(dag) + dr = scheduler.create_dag_run(dag) self.assertIsNotNone(dr) dr.start_date = datetime.datetime.now() - datetime.timedelta(days=1) session.merge(dr) session.commit() - dr2 = scheduler.schedule_dag(dag) + dr2 = scheduler.create_dag_run(dag) self.assertIsNotNone(dr2) dr.refresh_from_db(session=session) @@ -710,18 +677,18 @@ class SchedulerJobTest(unittest.TestCase): scheduler = SchedulerJob() dag.clear() - dr = scheduler.schedule_dag(dag) + dr = scheduler.create_dag_run(dag) self.assertIsNotNone(dr) # Should not be scheduled as DagRun has not timedout and max_active_runs is reached - new_dr = scheduler.schedule_dag(dag) + new_dr = scheduler.create_dag_run(dag) self.assertIsNone(new_dr) # Should be scheduled as dagrun_timeout has passed dr.start_date = datetime.datetime.now() - datetime.timedelta(days=1) session.merge(dr) session.commit() - new_dr = scheduler.schedule_dag(dag) + new_dr = scheduler.create_dag_run(dag) self.assertIsNotNone(new_dr) def test_scheduler_auto_align(self): @@ -749,7 +716,7 @@ class SchedulerJobTest(unittest.TestCase): scheduler = SchedulerJob() dag.clear() - dr = scheduler.schedule_dag(dag) + dr = scheduler.create_dag_run(dag) self.assertIsNotNone(dr) self.assertEquals(dr.execution_date, datetime.datetime(2016, 1, 2, 5, 4)) @@ -771,7 +738,7 @@ class SchedulerJobTest(unittest.TestCase): scheduler = SchedulerJob() dag.clear() - dr = scheduler.schedule_dag(dag) + dr = scheduler.create_dag_run(dag) self.assertIsNotNone(dr) self.assertEquals(dr.execution_date, datetime.datetime(2016, 1, 1, 10, 10)) @@ -808,7 +775,13 @@ class SchedulerJobTest(unittest.TestCase): @mock.patch('airflow.models.DagBag', return_value=dagbag) @mock.patch('airflow.models.DagBag.collect_dags') def do_schedule(function, function2): - scheduler = SchedulerJob(num_runs=1, executor=executor,) + # Use a empty file since the above mock will return the + # expected DAGs. Also specify only a single file so that it doesn't + # try to schedule the above DAG repeatedly. 
+ scheduler = SchedulerJob(num_runs=1, + executor=executor, + subdir=os.path.join(models.DAGS_FOLDER, + "no_dags.py")) scheduler.heartrate = 0 scheduler.run() @@ -819,3 +792,52 @@ class SchedulerJobTest(unittest.TestCase): do_schedule() self.assertEquals(2, len(executor.queued_tasks)) + def test_scheduler_run_duration(self): + """ + Verifies that the scheduler run duration limit is followed. + """ + dag_id = 'test_start_date_scheduling' + dag = self.dagbag.get_dag(dag_id) + dag.clear() + self.assertTrue(dag.start_date > DEFAULT_DATE) + + expected_run_duration = 5 + start_time = datetime.datetime.now() + scheduler = SchedulerJob(dag_id, + run_duration=expected_run_duration, + **self.default_scheduler_args) + scheduler.run() + end_time = datetime.datetime.now() + + run_duration = (end_time - start_time).total_seconds() + logging.info("Test ran in %.2fs, expected %.2fs", + run_duration, + expected_run_duration) + assert run_duration - expected_run_duration < 5.0 + + def test_dag_with_system_exit(self): + """ + Test to check that a DAG with a system.exit() doesn't break the scheduler. + """ + + dag_id = 'exit_test_dag' + dag_ids = [dag_id] + dag_directory = os.path.join(models.DAGS_FOLDER, + "..", + "dags_with_system_exit") + dag_file = os.path.join(dag_directory, + 'b_test_scheduler_dags.py') + + dagbag = DagBag(dag_folder=dag_file) + for dag_id in dag_ids: + dag = dagbag.get_dag(dag_id) + dag.clear() + + scheduler = SchedulerJob(dag_ids=dag_ids, + subdir= dag_directory, + num_runs=1, + **self.default_scheduler_args) + scheduler.run() + session = settings.Session() + self.assertEqual( + len(session.query(TI).filter(TI.dag_id == dag_id).all()), 1)
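For reference, the scheduler arguments exercised by this test can be combined
along these lines (a hedged sketch drawn from the test code above):

    import os
    from airflow import models
    from airflow.jobs import SchedulerJob

    dag_directory = os.path.join(models.DAGS_FOLDER, "..", "dags_with_system_exit")
    scheduler = SchedulerJob(dag_ids=['exit_test_dag'],
                             subdir=dag_directory,         # scan only this directory
                             num_runs=1,                    # parse/schedule each file once
                             file_process_interval=0,       # do not throttle file re-processing
                             processor_poll_interval=0.5)   # poll DAG-file processors every 0.5s
    scheduler.run()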