Repository: incubator-airflow
Updated Branches:
  refs/heads/master 835bcb623 -> fdb7e9491


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/airflow/utils/dag_processing.py
----------------------------------------------------------------------
diff --git a/airflow/utils/dag_processing.py b/airflow/utils/dag_processing.py
new file mode 100644
index 0000000..fc4ca1b
--- /dev/null
+++ b/airflow/utils/dag_processing.py
@@ -0,0 +1,612 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+from __future__ import unicode_literals
+
+import logging
+import os
+import re
+import time
+
+from abc import ABCMeta, abstractmethod
+from collections import defaultdict
+from datetime import datetime
+
+from airflow.exceptions import AirflowException
+from airflow.dag.base_dag import BaseDag, BaseDagBag
+from airflow.utils.logging import LoggingMixin
+
+
+class SimpleDag(BaseDag):
+    """
+    A simplified representation of a DAG that contains all attributes
+    required for instantiating and scheduling its associated tasks.
+    """
+
+    def __init__(self,
+                 dag_id,
+                 task_ids,
+                 full_filepath,
+                 concurrency,
+                 is_paused,
+                 pickle_id):
+        """
+        :param dag_id: ID of the DAG
+        :type dag_id: unicode
+        :param task_ids: task IDs associated with the DAG
+        :type task_ids: list[unicode]
+        :param full_filepath: path to the file containing the DAG e.g.
+        /a/b/c.py
+        :type full_filepath: unicode
+        :param concurrency: No more than this many tasks from the
+        DAG should run concurrently
+        :type concurrency: int
+        :param is_paused: Whether or not this DAG is paused. Tasks from paused
+        DAGs are not scheduled
+        :type is_paused: bool
+        :param pickle_id: ID associated with the pickled version of this DAG.
+        :type pickle_id: unicode
+        """
+        self._dag_id = dag_id
+        self._task_ids = task_ids
+        self._full_filepath = full_filepath
+        self._is_paused = is_paused
+        self._concurrency = concurrency
+        self._pickle_id = pickle_id
+
+    @property
+    def dag_id(self):
+        """
+        :return: the DAG ID
+        :rtype: unicode
+        """
+        return self._dag_id
+
+    @property
+    def task_ids(self):
+        """
+        :return: A list of task IDs that are in this DAG
+        :rtype: list[unicode]
+        """
+        return self._task_ids
+
+    @property
+    def full_filepath(self):
+        """
+        :return: The absolute path to the file that contains this DAG's definition
+        :rtype: unicode
+        """
+        return self._full_filepath
+
+    @property
+    def concurrency(self):
+        """
+        :return: maximum number of tasks that can run simultaneously from this DAG
+        :rtype: int
+        """
+        return self._concurrency
+
+    @property
+    def is_paused(self):
+        """
+        :return: whether this DAG is paused or not
+        :rtype: bool
+        """
+        return self._is_paused
+
+    @property
+    def pickle_id(self):
+        """
+        :return: The pickle ID for this DAG, if it has one. Otherwise None.
+        :rtype: unicode
+        """
+        return self._pickle_id
+
+
+class SimpleDagBag(BaseDagBag):
+    """
+    A collection of SimpleDag objects with some convenience methods.
+    """
+
+    def __init__(self, simple_dags):
+        """
+        Constructor.
+
+        :param simple_dags: SimpleDag objects that should be in this bag
+        :type simple_dags: list[SimpleDag]
+        """
+        self.simple_dags = simple_dags
+        self.dag_id_to_simple_dag = {}
+
+        for simple_dag in simple_dags:
+            self.dag_id_to_simple_dag[simple_dag.dag_id] = simple_dag
+
+    @property
+    def dag_ids(self):
+        """
+        :return: IDs of all the DAGs in this bag
+        :rtype: list[unicode]
+        """
+        return self.dag_id_to_simple_dag.keys()
+
+    def get_dag(self, dag_id):
+        """
+        :param dag_id: DAG ID
+        :type dag_id: unicode
+        :return: if the given DAG ID exists in the bag, return the BaseDag
+        corresponding to that ID. Otherwise, throw an Exception
+        :rtype: SimpleDag
+        """
+        if dag_id not in self.dag_id_to_simple_dag:
+            raise AirflowException("Unknown DAG ID {}".format(dag_id))
+        return self.dag_id_to_simple_dag[dag_id]
+
+
+def list_py_file_paths(directory, safe_mode=True):
+    """
+    Traverse a directory and look for Python files.
+
+    :param directory: the directory to traverse
+    :type directory: unicode
+    :param safe_mode: whether to use a heuristic to determine whether a file
+    contains Airflow DAG definitions
+    :return: a list of paths to Python files in the specified directory
+    :rtype: list[unicode]
+    """
+    file_paths = []
+    if directory is None:
+        return []
+    elif os.path.isfile(directory):
+        return [directory]
+    elif os.path.isdir(directory):
+        patterns = []
+        for root, dirs, files in os.walk(directory, followlinks=True):
+            ignore_file = [f for f in files if f == '.airflowignore']
+            if ignore_file:
+                f = open(os.path.join(root, ignore_file[0]), 'r')
+                patterns += [p for p in f.read().split('\n') if p]
+                f.close()
+            for f in files:
+                try:
+                    file_path = os.path.join(root, f)
+                    if not os.path.isfile(file_path):
+                        continue
+                    mod_name, file_ext = os.path.splitext(
+                        os.path.split(file_path)[-1])
+                    if file_ext != '.py':
+                        continue
+                    if any([re.findall(p, file_path) for p in patterns]):
+                        continue
+
+                    # Heuristic that guesses whether a Python file contains an
+                    # Airflow DAG definition.
+                    might_contain_dag = True
+                    if safe_mode:
+                        with open(file_path, 'rb') as f:
+                            content = f.read()
+                            might_contain_dag = all([s in content
+                                                     for s in (b'DAG', b'airflow')])
+
+                    if not might_contain_dag:
+                        continue
+
+                    file_paths.append(file_path)
+                except Exception:
+                    logging.exception("Error while examining %s", f)
+    return file_paths
+
+
+class AbstractDagFileProcessor(object):
+    """
+    Processes a DAG file. See SchedulerJob.process_file() for more details.
+    """
+    __metaclass__ = ABCMeta
+
+    @abstractmethod
+    def start(self):
+        """
+        Launch the process to process the file
+        """
+        raise NotImplementedError()
+
+    @abstractmethod
+    def terminate(self, sigkill=False):
+        """
+        Terminate (and then kill) the process launched to process the file
+        """
+        raise NotImplementedError()
+
+    @property
+    @abstractmethod
+    def pid(self):
+        """
+        :return: the PID of the process launched to process the given file
+        """
+        raise NotImplementedError()
+
+    @property
+    @abstractmethod
+    def exit_code(self):
+        """
+        After the process is finished, this can be called to get the return code
+        :return: the exit code of the process
+        :rtype: int
+        """
+        raise NotImplementedError()
+
+    @property
+    @abstractmethod
+    def done(self):
+        """
+        Check if the process launched to process this file is done.
+        :return: whether the process is finished running
+        :rtype: bool
+        """
+        raise NotImplementedError()
+
+    @property
+    @abstractmethod
+    def result(self):
+        """
+        :return: result of running SchedulerJob.process_file()
+        :rtype: list[SimpleDag]
+        """
+        raise NotImplementedError()
+
+    @property
+    @abstractmethod
+    def start_time(self):
+        """
+        :return: When this started to process the file
+        :rtype: datetime
+        """
+        raise NotImplementedError()
+
+    @property
+    @abstractmethod
+    def log_file(self):
+        """
+        :return: the log file associated with this processor
+        :rtype: unicode
+        """
+        raise NotImplementedError()
+
+    @property
+    @abstractmethod
+    def file_path(self):
+        """
+        :return: the path to the file that this is processing
+        :rtype: unicode
+        """
+        raise NotImplementedError()
+
+
+class DagFileProcessorManager(LoggingMixin):
+    """
+    Given a list of DAG definition files, this kicks off several processors
+    in parallel to process them. The parallelism is limited and as the
+    processors finish, more are launched. The files are processed over and
+    over again, but no more often than the specified interval.
+
+    :type _file_path_queue: list[unicode]
+    :type _processors: dict[unicode, AbstractDagFileProcessor]
+    :type _last_runtime: dict[unicode, float]
+    :type _last_finish_time: dict[unicode, datetime]
+    """
+    def __init__(self,
+                 dag_directory,
+                 file_paths,
+                 parallelism,
+                 process_file_interval,
+                 child_process_log_directory,
+                 max_runs,
+                 processor_factory):
+        """
+        :param dag_directory: Directory where DAG definitions are kept. All
+        files in file_paths should be under this directory
+        :type dag_directory: unicode
+        :param file_paths: list of file paths that contain DAG definitions
+        :type file_paths: list[unicode]
+        :param parallelism: maximum number of simultaneous processes to run at once
+        :type parallelism: int
+        :param process_file_interval: process a file at most once every this
+        many seconds
+        :type process_file_interval: float
+        :param max_runs: The number of times to parse and schedule each file. -1
+        for unlimited.
+        :type max_runs: int
+        :param child_process_log_directory: Store logs for child processes in
+        this directory
+        :type child_process_log_directory: unicode
+        :type process_file_interval: float
+        :param processor_factory: function that creates processors for DAG
+        definition files. Arguments are (dag_definition_path, log_file_path)
+        :type processor_factory: (unicode, unicode) -> (AbstractDagFileProcessor)
+
+        """
+        self._file_paths = file_paths
+        self._file_path_queue = []
+        self._parallelism = parallelism
+        self._dag_directory = dag_directory
+        self._max_runs = max_runs
+        self._process_file_interval = process_file_interval
+        self._child_process_log_directory = child_process_log_directory
+        self._processor_factory = processor_factory
+        # Map from file path to the processor
+        self._processors = {}
+        # Map from file path to the last runtime
+        self._last_runtime = {}
+        # Map from file path to the last finish time
+        self._last_finish_time = {}
+        # Map from file path to the number of runs
+        self._run_count = defaultdict(int)
+
+    @property
+    def file_paths(self):
+        return self._file_paths
+
+    def get_pid(self, file_path):
+        """
+        :param file_path: the path to the file that's being processed
+        :type file_path: unicode
+        :return: the PID of the process processing the given file or None if
+        the specified file is not being processed
+        :rtype: int
+        """
+        if file_path in self._processors:
+            return self._processors[file_path].pid
+        return None
+
+    def get_all_pids(self):
+        """
+        :return: a list of the PIDs for the processors that are running
+        :rtype: List[int]
+        """
+        return [x.pid for x in self._processors.values()]
+
+    def get_runtime(self, file_path):
+        """
+        :param file_path: the path to the file that's being processed
+        :type file_path: unicode
+        :return: the current runtime (in seconds) of the process that's
+        processing the specified file or None if the file is not currently
+        being processed
+        """
+        if file_path in self._processors:
+            return (datetime.now() - self._processors[file_path].start_time)\
+                .total_seconds()
+        return None
+
+    def get_last_runtime(self, file_path):
+        """
+        :param file_path: the path to the file that was processed
+        :type file_path: unicode
+        :return: the runtime (in seconds) of the process of the last run, or
+        None if the file was never processed.
+        :rtype: float
+        """
+        return self._last_runtime.get(file_path)
+
+    def get_last_finish_time(self, file_path):
+        """
+        :param file_path: the path to the file that was processed
+        :type file_path: unicode
+        :return: the finish time of the process of the last run, or None if the
+        file was never processed.
+        :rtype: datetime
+        """
+        return self._last_finish_time.get(file_path)
+
+    def get_start_time(self, file_path):
+        """
+        :param file_path: the path to the file that's being processed
+        :type file_path: unicode
+        :return: the start time of the process that's processing the
+        specified file or None if the file is not currently being processed
+        :rtype: datetime
+        """
+        if file_path in self._processors:
+            return self._processors[file_path].start_time
+        return None
+
+    def set_file_paths(self, new_file_paths):
+        """
+        Update this with a new set of paths to DAG definition files.
+
+        :param new_file_paths: list of paths to DAG definition files
+        :type new_file_paths: list[unicode]
+        :return: None
+        """
+        self._file_paths = new_file_paths
+        self._file_path_queue = [x for x in self._file_path_queue
+                                 if x in new_file_paths]
+        # Stop processors that are working on deleted files
+        filtered_processors = {}
+        for file_path, processor in self._processors.items():
+            if file_path in new_file_paths:
+                filtered_processors[file_path] = processor
+            else:
+                self.logger.warn("Stopping processor for {}".format(file_path))
+                processor.stop()
+        self._processors = filtered_processors
+
+    @staticmethod
+    def _split_path(file_path):
+        """
+        Return the path elements of a path as an array. E.g. /a/b/c ->
+        ['a', 'b', 'c']
+
+        :param file_path: the file path to split
+        :return: a list of the elements of the file path
+        :rtype: list[unicode]
+        """
+        results = []
+        while True:
+            head, tail = os.path.split(file_path)
+            if len(tail) != 0:
+                results.append(tail)
+            if file_path == head:
+                break
+            file_path = head
+        results.reverse()
+        return results
+
+    def _get_log_file_path(self, dag_file_path):
+        """
+        Log output from processing the specified file should go to this
+        location.
+
+        :param dag_file_path: file containing a DAG
+        :type dag_file_path: unicode
+        :return: the path to the corresponding log file
+        :rtype: unicode
+        """
+        # General approach is to put the log file under the same relative path
+        # under the log directory as the DAG file in the DAG directory
+        now = datetime.now()
+        log_directory = os.path.join(self._child_process_log_directory,
+                                     now.strftime("%Y-%m-%d"))
+        relative_dag_file_path = os.path.relpath(dag_file_path, start=self._dag_directory)
+        path_elements = self._split_path(relative_dag_file_path)
+
+        # Add a .log suffix for the log file
+        path_elements[-1] += ".log"
+
+        return os.path.join(log_directory, *path_elements)
+
+    def processing_count(self):
+        """
+        :return: the number of files currently being processed
+        :rtype: int
+        """
+        return len(self._processors)
+
+    def wait_until_finished(self):
+        """
+        Sleeps until all the processors are done.
+        """
+        for file_path, processor in self._processors.items():
+            while not processor.done:
+                time.sleep(0.1)
+
+    def heartbeat(self):
+        """
+        This should be periodically called by the scheduler. This method will
+        kick off new processes to process DAG definition files and read the
+        results from the finished processors.
+
+        :return: a list of SimpleDags that were produced by processors that
+        have finished since the last time this was called
+        :rtype: list[SimpleDag]
+        """
+        finished_processors = {}
+        """:type : dict[unicode, AbstractDagFileProcessor]"""
+        running_processors = {}
+        """:type : dict[unicode, AbstractDagFileProcessor]"""
+
+        for file_path, processor in self._processors.items():
+            if processor.done:
+                self.logger.info("Processor for {} finished".format(file_path))
+                now = datetime.now()
+                finished_processors[file_path] = processor
+                self._last_runtime[file_path] = (now -
+                                                 processor.start_time).total_seconds()
+                self._last_finish_time[file_path] = now
+                self._run_count[file_path] += 1
+            else:
+                running_processors[file_path] = processor
+        self._processors = running_processors
+
+        # Collect all the DAGs that were found in the processed files
+        simple_dags = []
+        for file_path, processor in finished_processors.items():
+            if processor.result is None:
+                self.logger.warn("Processor for {} exited with return code "
+                                 "{}. See {} for details."
+                                 .format(processor.file_path,
+                                         processor.exit_code,
+                                         processor.log_file))
+            else:
+                for simple_dag in processor.result:
+                    simple_dags.append(simple_dag)
+
+        # Generate more file paths to process if we processed all the files
+        # already.
+        if len(self._file_path_queue) == 0:
+            # If the file path is already being processed, or if a file was
+            # processed recently, wait until the next batch
+            file_paths_in_progress = self._processors.keys()
+            now = datetime.now()
+            file_paths_recently_processed = []
+            for file_path in self._file_paths:
+                last_finish_time = self.get_last_finish_time(file_path)
+                if (last_finish_time is not None and
+                    (now - last_finish_time).total_seconds() <
+                        self._process_file_interval):
+                    file_paths_recently_processed.append(file_path)
+
+            files_paths_at_run_limit = [file_path
+                                        for file_path, num_runs in self._run_count.items()
+                                        if num_runs == self._max_runs]
+
+            files_paths_to_queue = list(set(self._file_paths) -
+                                        set(file_paths_in_progress) -
+                                        set(file_paths_recently_processed) -
+                                        set(files_paths_at_run_limit))
+
+            for file_path, processor in self._processors.items():
+                self.logger.debug("File path {} is still being processed (started: {})"
+                                  .format(processor.file_path,
+                                          processor.start_time.isoformat()))
+
+            self.logger.debug("Queuing the following files for processing:\n\t{}"
+                              .format("\n\t".join(files_paths_to_queue)))
+
+            self._file_path_queue.extend(files_paths_to_queue)
+
+        # Start more processors if we have enough slots and files to process
+        while (self._parallelism - len(self._processors) > 0 and
+               len(self._file_path_queue) > 0):
+            file_path = self._file_path_queue.pop(0)
+            log_file_path = self._get_log_file_path(file_path)
+            processor = self._processor_factory(file_path, log_file_path)
+
+            processor.start()
+            self.logger.info("Started a process (PID: {}) to generate "
+                             "tasks for {} - logging into {}"
+                             .format(processor.pid, file_path, log_file_path))
+
+            self._processors[file_path] = processor
+
+        return simple_dags
+
+    def max_runs_reached(self):
+        """
+        :return: whether all file paths have been processed max_runs times
+        """
+        for file_path in self._file_paths:
+            if self._run_count[file_path] != self._max_runs:
+                return False
+        return True
+
+    def terminate(self):
+        """
+        Stops all running processors
+        :return: None
+        """
+        for processor in self._processors.values():
+            processor.terminate()
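
For orientation, here is a minimal sketch of how the pieces in this new module are meant to fit together. Only list_py_file_paths, the DagFileProcessorManager constructor arguments, heartbeat() and max_runs_reached() come from the file above; ToyProcessor, the directory paths and the numeric settings are made-up placeholders, and a real processor would parse each file in a separate process and return SimpleDag objects.

    import time
    from datetime import datetime

    from airflow.utils.dag_processing import (AbstractDagFileProcessor,
                                              DagFileProcessorManager,
                                              list_py_file_paths)

    class ToyProcessor(AbstractDagFileProcessor):
        """Hypothetical stand-in; a real processor would fork a child process."""
        def __init__(self, file_path, log_file):
            self._file_path, self._log_file = file_path, log_file
            self._start_time = None
        def start(self):
            self._start_time = datetime.now()
        def terminate(self, sigkill=False):
            pass
        @property
        def pid(self): return 0
        @property
        def exit_code(self): return 0
        @property
        def done(self): return True      # pretend parsing finishes immediately
        @property
        def result(self): return []      # a real processor returns SimpleDag objects
        @property
        def start_time(self): return self._start_time
        @property
        def log_file(self): return self._log_file
        @property
        def file_path(self): return self._file_path

    file_paths = list_py_file_paths('/path/to/dags')   # applies the DAG/airflow heuristic
    manager = DagFileProcessorManager(
        dag_directory='/path/to/dags',
        file_paths=file_paths,
        parallelism=2,
        process_file_interval=30,
        child_process_log_directory='/path/to/logs',
        max_runs=1,
        processor_factory=lambda dag_file, log_file: ToyProcessor(dag_file, log_file))

    # The scheduler calls heartbeat() periodically; each call launches processors
    # (up to parallelism) and returns SimpleDags from the ones that finished.
    while not manager.max_runs_reached():
        simple_dags = manager.heartbeat()
        time.sleep(1)
    manager.terminate()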

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/airflow/utils/db.py
----------------------------------------------------------------------
diff --git a/airflow/utils/db.py b/airflow/utils/db.py
index fda467d..0d6f3d5 100644
--- a/airflow/utils/db.py
+++ b/airflow/utils/db.py
@@ -17,6 +17,7 @@ from __future__ import division
 from __future__ import print_function
 from __future__ import unicode_literals
 
+from datetime import datetime
 from functools import wraps
 import logging
 import os
@@ -252,7 +253,13 @@ def initdb():
         session.add(KET(know_event_type='Marketing Campaign'))
     session.commit()
 
-    models.DagBag(sync_to_db=True)
+    dagbag = models.DagBag()
+    # Save individual DAGs in the ORM
+    now = datetime.utcnow()
+    for dag in dagbag.dags.values():
+        models.DAG.sync_to_db(dag, dag.owner, now)
+    # Deactivate the unknown ones
+    models.DAG.deactivate_unknown_dags(dagbag.dags.keys())
 
     Chart = models.Chart
     chart_label = "Airflow task instance by type"
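
The initdb change above swaps DagBag(sync_to_db=True) for an explicit loop: every parsed DAG is written to the ORM and anything no longer present is deactivated. A standalone sketch of the same pattern, assuming a configured Airflow environment (the only names used are the ones the diff itself calls):

    from datetime import datetime
    from airflow import models

    dagbag = models.DagBag()          # parse DAG files from the configured DAGs folder
    now = datetime.utcnow()
    for dag in dagbag.dags.values():
        models.DAG.sync_to_db(dag, dag.owner, now)           # upsert one ORM row per DAG
    models.DAG.deactivate_unknown_dags(dagbag.dags.keys())   # mark vanished DAGs inactive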

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/scripts/ci/check-license.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/check-license.sh b/scripts/ci/check-license.sh
index a44fb2d..87063e2 100755
--- a/scripts/ci/check-license.sh
+++ b/scripts/ci/check-license.sh
@@ -104,4 +104,4 @@ if test ! -z "$ERRORS"; then
     exit 0
 else
     echo -e "RAT checks passed."
-fi
\ No newline at end of file
+fi

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/scripts/ci/setup_env.sh
----------------------------------------------------------------------
diff --git a/scripts/ci/setup_env.sh b/scripts/ci/setup_env.sh
index a5da867..9df4d81 100755
--- a/scripts/ci/setup_env.sh
+++ b/scripts/ci/setup_env.sh
@@ -126,7 +126,19 @@ tar zxf ${TRAVIS_CACHE}/hive/hive.tar.gz --strip-components 1 -C ${HIVE_HOME}
 
 echo "Downloading and unpacking minicluster"
 curl -z ${TRAVIS_CACHE}/minicluster/minicluster.zip -o ${TRAVIS_CACHE}/minicluster/minicluster.zip -L ${MINICLUSTER_URL}
+ls -l ${TRAVIS_CACHE}/minicluster/minicluster.zip
 unzip ${TRAVIS_CACHE}/minicluster/minicluster.zip -d /tmp
+if [ $? != 0 ] ; then
+    # Try downloading w/o cache if there's a failure
+    curl -o ${TRAVIS_CACHE}/minicluster/minicluster.zip -L ${MINICLUSTER_URL}
+    ls -l ${TRAVIS_CACHE}/minicluster/minicluster.zip
+    unzip ${TRAVIS_CACHE}/minicluster/minicluster.zip -d /tmp
+    if [ $? != 0 ] ; then
+        echo "Failed twice in downloading and unpacking minicluster!" >&2
+        exit 1
+    fi
+    exit 1
+fi
 
 echo "Path = ${PATH}"
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index 5b14653..85e5195 100644
--- a/setup.py
+++ b/setup.py
@@ -191,6 +191,7 @@ def do_setup():
             'jinja2>=2.7.3, <3.0',
             'markdown>=2.5.2, <3.0',
             'pandas>=0.15.2, <1.0.0',
+            'psutil>=4.2.0, <5.0.0',
             'pygments>=2.0.1, <3.0',
             'python-daemon>=2.1.1, <2.2',
             'python-dateutil>=2.3, <3',
@@ -198,6 +199,7 @@ def do_setup():
             'requests>=2.5.1, <3',
             'setproctitle>=1.1.8, <2',
             'sqlalchemy>=0.9.8',
+            'tabulate>=0.7.5, <0.8.0',
             'thrift>=0.9.2, <0.10',
             'zope.deprecation>=4.0, <5.0',
         ],
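
setup.py gains two runtime dependencies: psutil, presumably for managing the scheduler's child DAG-processing processes, and tabulate for table-formatted output. As a hedged illustration of the kind of cleanup psutil makes straightforward (this helper is not part of the commit; only the psutil calls themselves are real API):

    import psutil

    def kill_process_tree(pid, timeout=5):
        # Terminate a process and all of its descendants, escalating to SIGKILL.
        parent = psutil.Process(pid)
        procs = parent.children(recursive=True) + [parent]
        for proc in procs:
            proc.terminate()                       # polite SIGTERM first
        gone, alive = psutil.wait_procs(procs, timeout=timeout)
        for proc in alive:
            proc.kill()                            # force-kill any stragglers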

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/tests/core.py
----------------------------------------------------------------------
diff --git a/tests/core.py b/tests/core.py
index 4b1926c..38652fc 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -109,6 +109,12 @@ class OperatorSubclass(BaseOperator):
 
 
 class CoreTest(unittest.TestCase):
+
+    # These defaults make the test faster to run
+    default_scheduler_args = {"file_process_interval": 0,
+                              "processor_poll_interval": 0.5,
+                              "num_runs": 1}
+
     def setUp(self):
         configuration.test_mode()
         self.dagbag = models.DagBag(
@@ -131,7 +137,7 @@ class CoreTest(unittest.TestCase):
             owner='Also fake',
             start_date=datetime(2015, 1, 2, 0, 0)))
 
-        dag_run = jobs.SchedulerJob(test_mode=True).schedule_dag(dag)
+        dag_run = jobs.SchedulerJob(**self.default_scheduler_args).create_dag_run(dag)
         assert dag_run is not None
         assert dag_run.dag_id == dag.dag_id
         assert dag_run.run_id is not None
@@ -156,12 +162,13 @@ class CoreTest(unittest.TestCase):
             task_id="faketastic",
             owner='Also fake',
             start_date=DEFAULT_DATE))
-        scheduler = jobs.SchedulerJob(test_mode=True)
+
+        scheduler = jobs.SchedulerJob(**self.default_scheduler_args)
         dag.create_dagrun(run_id=models.DagRun.id_for_date(DEFAULT_DATE),
                           execution_date=DEFAULT_DATE,
                           state=State.SUCCESS,
                           external_trigger=True)
-        dag_run = scheduler.schedule_dag(dag)
+        dag_run = scheduler.create_dag_run(dag)
         assert dag_run is not None
         assert dag_run.dag_id == dag.dag_id
         assert dag_run.run_id is not None
@@ -183,8 +190,8 @@ class CoreTest(unittest.TestCase):
             task_id="faketastic",
             owner='Also fake',
             start_date=datetime(2015, 1, 2, 0, 0)))
-        dag_run = jobs.SchedulerJob(test_mode=True).schedule_dag(dag)
-        dag_run2 = jobs.SchedulerJob(test_mode=True).schedule_dag(dag)
+        dag_run = jobs.SchedulerJob(**self.default_scheduler_args).create_dag_run(dag)
+        dag_run2 = jobs.SchedulerJob(**self.default_scheduler_args).create_dag_run(dag)
 
         assert dag_run is not None
         assert dag_run2 is None
@@ -208,11 +215,11 @@ class CoreTest(unittest.TestCase):
 
         # Create and schedule the dag runs
         dag_runs = []
-        scheduler = jobs.SchedulerJob(test_mode=True)
+        scheduler = jobs.SchedulerJob(**self.default_scheduler_args)
         for i in range(runs):
-            dag_runs.append(scheduler.schedule_dag(dag))
+            dag_runs.append(scheduler.create_dag_run(dag))
 
-        additional_dag_run = scheduler.schedule_dag(dag)
+        additional_dag_run = scheduler.create_dag_run(dag)
 
         for dag_run in dag_runs:
             assert dag_run is not None
@@ -243,9 +250,9 @@ class CoreTest(unittest.TestCase):
                                          owner='Also fake'))
 
         dag_runs = []
-        scheduler = jobs.SchedulerJob(test_mode=True)
+        scheduler = jobs.SchedulerJob(**self.default_scheduler_args)
         for i in range(runs):
-            dag_run = scheduler.schedule_dag(dag)
+            dag_run = scheduler.create_dag_run(dag)
             dag_runs.append(dag_run)
 
             # Mark the DagRun as complete
@@ -254,7 +261,7 @@ class CoreTest(unittest.TestCase):
             session.commit()
 
         # Attempt to schedule an additional dag run (for 2016-01-01)
-        additional_dag_run = scheduler.schedule_dag(dag)
+        additional_dag_run = scheduler.create_dag_run(dag)
 
         for dag_run in dag_runs:
             assert dag_run is not None
@@ -595,7 +602,8 @@ class CoreTest(unittest.TestCase):
         job.run()
 
     def test_scheduler_job(self):
-        job = jobs.SchedulerJob(dag_id='example_bash_operator', test_mode=True)
+        job = jobs.SchedulerJob(dag_id='example_bash_operator',
+                                **self.default_scheduler_args)
         job.run()
 
     def test_raw_job(self):
@@ -798,6 +806,7 @@ class CliTests(unittest.TestCase):
         self.parser = cli.CLIFactory.get_parser()
         self.dagbag = models.DagBag(
             dag_folder=DEV_NULL, include_examples=True)
+        # Persist DAGs
 
     def test_cli_list_dags(self):
         args = self.parser.parse_args(['list_dags', '--report'])
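
The tests above drive the scheduler through its new constructor knobs rather than the removed test_mode flag. A sketch of a direct invocation with those knobs; the values are illustrative, only the keyword names come from the tests:

    from airflow import jobs

    scheduler = jobs.SchedulerJob(
        dag_id='example_bash_operator',
        file_process_interval=0,       # re-parse DAG files as often as possible
        processor_poll_interval=0.5,   # check on child processors twice a second
        num_runs=1)                    # parse and schedule each file once, then exit
    scheduler.run()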

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/tests/dags/no_dags.py
----------------------------------------------------------------------
diff --git a/tests/dags/no_dags.py b/tests/dags/no_dags.py
new file mode 100644
index 0000000..a84b6da
--- /dev/null
+++ b/tests/dags/no_dags.py
@@ -0,0 +1,14 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/tests/dags_with_system_exit/a_system_exit.py
----------------------------------------------------------------------
diff --git a/tests/dags_with_system_exit/a_system_exit.py b/tests/dags_with_system_exit/a_system_exit.py
new file mode 100644
index 0000000..7012550
--- /dev/null
+++ b/tests/dags_with_system_exit/a_system_exit.py
@@ -0,0 +1,29 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Tests to make sure that a system exit won't cause the scheduler to fail
+# Starts with 'a' to get listed first.
+
+import sys
+
+from datetime import datetime
+from airflow.models import DAG
+
+DEFAULT_DATE = datetime(2100, 1, 1)
+
+dag1 = DAG(
+    dag_id='test_system_exit',
+    start_date=DEFAULT_DATE)
+
+sys.exit(-1)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/tests/dags_with_system_exit/b_test_scheduler_dags.py
----------------------------------------------------------------------
diff --git a/tests/dags_with_system_exit/b_test_scheduler_dags.py b/tests/dags_with_system_exit/b_test_scheduler_dags.py
new file mode 100644
index 0000000..ed6904f
--- /dev/null
+++ b/tests/dags_with_system_exit/b_test_scheduler_dags.py
@@ -0,0 +1,29 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from datetime import datetime
+
+from airflow.models import DAG
+from airflow.operators.dummy_operator import DummyOperator
+
+DEFAULT_DATE = datetime(2000, 1, 1)
+
+dag1 = DAG(
+    dag_id='exit_test_dag',
+    start_date=DEFAULT_DATE)
+
+dag1_task1 = DummyOperator(
+    task_id='dummy',
+    dag=dag1,
+    owner='airflow')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/tests/dags_with_system_exit/c_system_exit.py
----------------------------------------------------------------------
diff --git a/tests/dags_with_system_exit/c_system_exit.py b/tests/dags_with_system_exit/c_system_exit.py
new file mode 100644
index 0000000..5644304
--- /dev/null
+++ b/tests/dags_with_system_exit/c_system_exit.py
@@ -0,0 +1,29 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Tests to make sure that a system exit won't cause the scheduler to fail.
+# Starts with 'c' to get listed last.
+
+import sys
+
+from datetime import datetime
+from airflow.models import DAG
+
+DEFAULT_DATE = datetime(2100, 1, 1)
+
+dag1 = DAG(
+    dag_id='test_system_exit',
+    start_date=DEFAULT_DATE)
+
+sys.exit(-1)
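
The three files above set up the failure mode under test: sys.exit() raises SystemExit while the DAG file is being imported, so a scheduler that parsed files in its own process would die along with the DAG. Parsing each file in a child process contains the damage. A standard-library illustration of that idea (not the scheduler's actual mechanism; the file path is just the fixture added above, and it assumes airflow is importable in the child):

    from __future__ import print_function
    import multiprocessing

    def parse_dag_file(path):
        # Executing a file that calls sys.exit() raises SystemExit right here.
        with open(path) as f:
            code = compile(f.read(), path, 'exec')
        exec(code, {'__name__': '<dag_file>'})

    if __name__ == '__main__':
        child = multiprocessing.Process(
            target=parse_dag_file,
            args=('tests/dags_with_system_exit/a_system_exit.py',))
        child.start()
        child.join()
        # The child dies with a non-zero exit code; this (scheduler-like)
        # parent process keeps running.
        print("child exit code:", child.exitcode)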

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/tests/jobs.py
----------------------------------------------------------------------
diff --git a/tests/jobs.py b/tests/jobs.py
index d981945..e86b9da 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -19,9 +19,11 @@ from __future__ import unicode_literals
 
 import datetime
 import logging
+import os
 import unittest
 
 from airflow import AirflowException, settings
+from airflow import models
 from airflow.bin import cli
 from airflow.executors import DEFAULT_EXECUTOR
 from airflow.jobs import BackfillJob, SchedulerJob
@@ -172,6 +174,9 @@ class BackfillJobTest(unittest.TestCase):
 
 
 class SchedulerJobTest(unittest.TestCase):
+    # These defaults make the test faster to run
+    default_scheduler_args = {"file_process_interval": 0,
+                              "processor_poll_interval": 0.5}
 
     def setUp(self):
         self.dagbag = DagBag()
@@ -193,14 +198,14 @@ class SchedulerJobTest(unittest.TestCase):
         if run_kwargs is None:
             run_kwargs = {}
 
-        scheduler = SchedulerJob()
+        scheduler = SchedulerJob(**self.default_scheduler_args)
         dag = self.dagbag.get_dag(dag_id)
         dag.clear()
-        dr = scheduler.schedule_dag(dag)
+        dr = scheduler.create_dag_run(dag)
 
         if advance_execution_date:
             # run a second time to schedule a dagrun after the start_date
-            dr = scheduler.schedule_dag(dag)
+            dr = scheduler.create_dag_run(dag)
         ex_date = dr.execution_date
 
         try:
@@ -264,22 +269,6 @@ class SchedulerJobTest(unittest.TestCase):
             },
             dagrun_state=State.FAILED)
 
-    def test_dagrun_deadlock(self):
-        """
-        Deadlocked DagRun is marked a failure
-
-        Test that a deadlocked dagrun is marked as a failure by having
-        depends_on_past and an execution_date after the start_date
-        """
-        self.evaluate_dagrun(
-            dag_id='test_dagrun_states_deadlock',
-            expected_task_states={
-                'test_depends_on_past': None,
-                'test_depends_on_past_2': None,
-            },
-            dagrun_state=State.FAILED,
-            advance_execution_date=True)
-
     def test_scheduler_pooled_tasks(self):
         """
         Test that the scheduler handles queued tasks correctly
@@ -299,13 +288,17 @@ class SchedulerJobTest(unittest.TestCase):
         dag = self.dagbag.get_dag(dag_id)
         dag.clear()
 
-        scheduler = SchedulerJob(dag_id, num_runs=1)
+        scheduler = SchedulerJob(dag_id,
+                                 num_runs=1,
+                                 executor=TestExecutor(),
+                                 **self.default_scheduler_args)
         scheduler.run()
 
         task_1 = dag.tasks[0]
         logging.info("Trying to find task {}".format(task_1))
         ti = TI(task_1, dag.start_date)
         ti.refresh_from_db()
+        logging.error("TI is: {}".format(ti))
         self.assertEqual(ti.state, State.QUEUED)
 
         # now we use a DIFFERENT scheduler and executor
@@ -313,7 +306,8 @@ class SchedulerJobTest(unittest.TestCase):
         scheduler2 = SchedulerJob(
             dag_id,
             num_runs=5,
-            executor=DEFAULT_EXECUTOR.__class__())
+            executor=DEFAULT_EXECUTOR.__class__(),
+            **self.default_scheduler_args)
         scheduler2.run()
 
         ti.refresh_from_db()
@@ -364,7 +358,9 @@ class SchedulerJobTest(unittest.TestCase):
         dag.clear()
         self.assertTrue(dag.start_date > DEFAULT_DATE)
 
-        scheduler = SchedulerJob(dag_id, num_runs=2)
+        scheduler = SchedulerJob(dag_id,
+                                 num_runs=2,
+                                 **self.default_scheduler_args)
         scheduler.run()
 
         # zero tasks ran
@@ -387,7 +383,9 @@ class SchedulerJobTest(unittest.TestCase):
         self.assertEqual(
             len(session.query(TI).filter(TI.dag_id == dag_id).all()), 1)
 
-        scheduler = SchedulerJob(dag_id, num_runs=2)
+        scheduler = SchedulerJob(dag_id,
+                                 num_runs=2,
+                                 **self.default_scheduler_args)
         scheduler.run()
 
         # still one task
@@ -404,7 +402,10 @@ class SchedulerJobTest(unittest.TestCase):
             dag = self.dagbag.get_dag(dag_id)
             dag.clear()
 
-        scheduler = SchedulerJob(dag_ids=dag_ids, num_runs=2)
+        scheduler = SchedulerJob(dag_ids=dag_ids,
+                                 file_process_interval=0,
+                                 processor_poll_interval=0.5,
+                                 num_runs=2)
         scheduler.run()
 
         # zero tasks ran
@@ -425,14 +426,15 @@ class SchedulerJobTest(unittest.TestCase):
 
         scheduler = SchedulerJob()
         dag.clear()
-        dr = scheduler.schedule_dag(dag)
+        dr = scheduler.create_dag_run(dag)
         self.assertIsNotNone(dr)
-        dr = scheduler.schedule_dag(dag)
+        dr = scheduler.create_dag_run(dag)
         self.assertIsNone(dr)
 
-    def test_scheduler_process_execute_task(self):
+    def test_scheduler_process_task_instances(self):
         """
-        Test if process dag sends a task to the executor
+        Test if _process_task_instances puts the right task instances into the
+        queue.
         """
         dag = DAG(
             dag_id='test_scheduler_process_execute_task',
@@ -450,51 +452,16 @@ class SchedulerJobTest(unittest.TestCase):
 
         scheduler = SchedulerJob()
         dag.clear()
-        dr = scheduler.schedule_dag(dag)
+        dr = scheduler.create_dag_run(dag)
         self.assertIsNotNone(dr)
 
         queue = mock.Mock()
-        scheduler.process_dag(dag, queue=queue)
+        scheduler._process_task_instances(dag, queue=queue)
 
-        queue.put.assert_called_with(
-            ((dag.dag_id, dag_task1.task_id, DEFAULT_DATE), None)
+        queue.append.assert_called_with(
+            (dag.dag_id, dag_task1.task_id, DEFAULT_DATE)
         )
 
-        tis = dr.get_task_instances(state=State.SCHEDULED)
-        self.assertIsNotNone(tis)
-
-    def test_scheduler_process_check_heartrate(self):
-        """
-        Test if process dag honors the heartrate
-        """
-        dag = DAG(
-            dag_id='test_scheduler_process_check_heartrate',
-            start_date=DEFAULT_DATE)
-        dag_task1 = DummyOperator(
-            task_id='dummy',
-            dag=dag,
-            owner='airflow')
-
-        session = settings.Session()
-        orm_dag = DagModel(dag_id=dag.dag_id)
-        orm_dag.last_scheduler_run = datetime.datetime.now()
-        session.merge(orm_dag)
-        session.commit()
-        session.close()
-
-        scheduler = SchedulerJob()
-        scheduler.heartrate = 1000
-
-        dag.clear()
-
-        dr = scheduler.schedule_dag(dag)
-        self.assertIsNotNone(dr)
-
-        queue = mock.Mock()
-        scheduler.process_dag(dag, queue=queue)
-
-        queue.put.assert_not_called()
-
     def test_scheduler_do_not_schedule_removed_task(self):
         dag = DAG(
             dag_id='test_scheduler_do_not_schedule_removed_task',
@@ -513,7 +480,7 @@ class SchedulerJobTest(unittest.TestCase):
         scheduler = SchedulerJob()
         dag.clear()
 
-        dr = scheduler.schedule_dag(dag)
+        dr = scheduler.create_dag_run(dag)
         self.assertIsNotNone(dr)
 
         dag = DAG(
@@ -521,7 +488,7 @@ class SchedulerJobTest(unittest.TestCase):
             start_date=DEFAULT_DATE)
 
         queue = mock.Mock()
-        scheduler.process_dag(dag, queue=queue)
+        scheduler._process_task_instances(dag, queue=queue)
 
         queue.put.assert_not_called()
 
@@ -543,11 +510,11 @@ class SchedulerJobTest(unittest.TestCase):
         scheduler = SchedulerJob()
         dag.clear()
 
-        dr = scheduler.schedule_dag(dag)
+        dr = scheduler.create_dag_run(dag)
         self.assertIsNone(dr)
 
         queue = mock.Mock()
-        scheduler.process_dag(dag, queue=queue)
+        scheduler._process_task_instances(dag, queue=queue)
 
         queue.put.assert_not_called()
 
@@ -568,7 +535,7 @@ class SchedulerJobTest(unittest.TestCase):
         scheduler = SchedulerJob()
         dag.clear()
 
-        dr = scheduler.schedule_dag(dag)
+        dr = scheduler.create_dag_run(dag)
         self.assertIsNotNone(dr)
 
         tis = dr.get_task_instances(session=session)
@@ -579,7 +546,7 @@ class SchedulerJobTest(unittest.TestCase):
         session.close()
 
         queue = mock.Mock()
-        scheduler.process_dag(dag, queue=queue)
+        scheduler._process_task_instances(dag, queue=queue)
 
         queue.put.assert_not_called()
 
@@ -605,7 +572,7 @@ class SchedulerJobTest(unittest.TestCase):
         scheduler = SchedulerJob()
         dag.clear()
 
-        dr = scheduler.schedule_dag(dag)
+        dr = scheduler.create_dag_run(dag)
         self.assertIsNotNone(dr)
 
         tis = dr.get_task_instances()
@@ -617,7 +584,7 @@ class SchedulerJobTest(unittest.TestCase):
             owner='airflow')
 
         queue = mock.Mock()
-        scheduler.process_dag(dag, queue=queue)
+        scheduler._process_task_instances(dag, queue=queue)
 
         tis = dr.get_task_instances()
         self.assertEquals(len(tis), 2)
@@ -645,10 +612,10 @@ class SchedulerJobTest(unittest.TestCase):
         scheduler = SchedulerJob()
         dag.clear()
 
-        dr = scheduler.schedule_dag(dag)
+        dr = scheduler.create_dag_run(dag)
         self.assertIsNotNone(dr)
 
-        dr = scheduler.schedule_dag(dag)
+        dr = scheduler.create_dag_run(dag)
         self.assertIsNone(dr)
 
     def test_scheduler_fail_dagrun_timeout(self):
@@ -673,13 +640,13 @@ class SchedulerJobTest(unittest.TestCase):
         scheduler = SchedulerJob()
         dag.clear()
 
-        dr = scheduler.schedule_dag(dag)
+        dr = scheduler.create_dag_run(dag)
         self.assertIsNotNone(dr)
         dr.start_date = datetime.datetime.now() - datetime.timedelta(days=1)
         session.merge(dr)
         session.commit()
 
-        dr2 = scheduler.schedule_dag(dag)
+        dr2 = scheduler.create_dag_run(dag)
         self.assertIsNotNone(dr2)
 
         dr.refresh_from_db(session=session)
@@ -710,18 +677,18 @@ class SchedulerJobTest(unittest.TestCase):
         scheduler = SchedulerJob()
         dag.clear()
 
-        dr = scheduler.schedule_dag(dag)
+        dr = scheduler.create_dag_run(dag)
         self.assertIsNotNone(dr)
 
         # Should not be scheduled as DagRun has not timedout and max_active_runs is reached
-        new_dr = scheduler.schedule_dag(dag)
+        new_dr = scheduler.create_dag_run(dag)
         self.assertIsNone(new_dr)
 
         # Should be scheduled as dagrun_timeout has passed
         dr.start_date = datetime.datetime.now() - datetime.timedelta(days=1)
         session.merge(dr)
         session.commit()
-        new_dr = scheduler.schedule_dag(dag)
+        new_dr = scheduler.create_dag_run(dag)
         self.assertIsNotNone(new_dr)
 
     def test_scheduler_auto_align(self):
@@ -749,7 +716,7 @@ class SchedulerJobTest(unittest.TestCase):
         scheduler = SchedulerJob()
         dag.clear()
 
-        dr = scheduler.schedule_dag(dag)
+        dr = scheduler.create_dag_run(dag)
         self.assertIsNotNone(dr)
         self.assertEquals(dr.execution_date, datetime.datetime(2016, 1, 2, 5, 4))
 
@@ -771,7 +738,7 @@ class SchedulerJobTest(unittest.TestCase):
         scheduler = SchedulerJob()
         dag.clear()
 
-        dr = scheduler.schedule_dag(dag)
+        dr = scheduler.create_dag_run(dag)
         self.assertIsNotNone(dr)
         self.assertEquals(dr.execution_date, datetime.datetime(2016, 1, 1, 10, 10))
 
@@ -808,7 +775,13 @@ class SchedulerJobTest(unittest.TestCase):
         @mock.patch('airflow.models.DagBag', return_value=dagbag)
         @mock.patch('airflow.models.DagBag.collect_dags')
         def do_schedule(function, function2):
-            scheduler = SchedulerJob(num_runs=1, executor=executor,)
+            # Use an empty file since the above mock will return the
+            # expected DAGs. Also specify only a single file so that it doesn't
+            # try to schedule the above DAG repeatedly.
+            scheduler = SchedulerJob(num_runs=1,
+                                     executor=executor,
+                                     subdir=os.path.join(models.DAGS_FOLDER,
+                                                         "no_dags.py"))
             scheduler.heartrate = 0
             scheduler.run()
 
@@ -819,3 +792,52 @@ class SchedulerJobTest(unittest.TestCase):
         do_schedule()
         self.assertEquals(2, len(executor.queued_tasks))
 
+    def test_scheduler_run_duration(self):
+        """
+        Verifies that the scheduler run duration limit is followed.
+        """
+        dag_id = 'test_start_date_scheduling'
+        dag = self.dagbag.get_dag(dag_id)
+        dag.clear()
+        self.assertTrue(dag.start_date > DEFAULT_DATE)
+
+        expected_run_duration = 5
+        start_time = datetime.datetime.now()
+        scheduler = SchedulerJob(dag_id,
+                                 run_duration=expected_run_duration,
+                                 **self.default_scheduler_args)
+        scheduler.run()
+        end_time = datetime.datetime.now()
+
+        run_duration = (end_time - start_time).total_seconds()
+        logging.info("Test ran in %.2fs, expected %.2fs",
+                     run_duration,
+                     expected_run_duration)
+        assert run_duration - expected_run_duration < 5.0
+
+    def test_dag_with_system_exit(self):
+        """
+        Test to check that a DAG with a sys.exit() doesn't break the scheduler.
+        """
+
+        dag_id = 'exit_test_dag'
+        dag_ids = [dag_id]
+        dag_directory = os.path.join(models.DAGS_FOLDER,
+                                     "..",
+                                     "dags_with_system_exit")
+        dag_file = os.path.join(dag_directory,
+                                'b_test_scheduler_dags.py')
+
+        dagbag = DagBag(dag_folder=dag_file)
+        for dag_id in dag_ids:
+            dag = dagbag.get_dag(dag_id)
+            dag.clear()
+
+        scheduler = SchedulerJob(dag_ids=dag_ids,
+                                 subdir=dag_directory,
+                                 num_runs=1,
+                                 **self.default_scheduler_args)
+        scheduler.run()
+        session = settings.Session()
+        self.assertEqual(
+            len(session.query(TI).filter(TI.dag_id == dag_id).all()), 1)
