[AIRFLOW-160] Parse DAG files through child processes

Instead of parsing the DAG definition files in the same process as the
scheduler, this change parses the files in a child process. This helps
to isolate the scheduler from bad user code.
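
For context, the pattern this commit adopts can be sketched roughly as follows. This is an illustrative, simplified example only (the commit's actual implementation is the new DagFileProcessor class in airflow/jobs.py); the file path and timeout are placeholder values:

    import multiprocessing

    def parse_dag_file(file_path, result_queue):
        # Runs in the child process; an exception or sys.exit() here only
        # takes down this process, never the scheduler that launched it.
        result_queue.put("parsed {}".format(file_path))  # stand-in for real DAG parsing

    if __name__ == "__main__":
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=parse_dag_file,
                                       args=("/path/to/a_dag_file.py", queue))
        proc.start()
        proc.join(60)              # bound how long a bad file can block the loop
        if proc.is_alive():        # still stuck: kill it and move on
            proc.terminate()
        elif not queue.empty():
            print(queue.get_nowait())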

Closes #1636 from plypaul/plypaul_schedule_by_file_rebase_master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/fdb7e949
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/fdb7e949
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/fdb7e949

Branch: refs/heads/master
Commit: fdb7e949140b735b8554ae5b22ad752e86f6ebaf
Parents: 835bcb6
Author: Paul Yang <paul.y...@airbnb.com>
Authored: Sun Jul 31 12:49:30 2016 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Sun Jul 31 12:49:39 2016 -0700

----------------------------------------------------------------------
 airflow/bin/cli.py                              |   33 +-
 airflow/configuration.py                        |    5 +
 airflow/dag/__init__.py                         |   14 +
 airflow/dag/base_dag.py                         |   96 ++
 airflow/executors/base_executor.py              |    2 +-
 airflow/jobs.py                                 | 1224 ++++++++++++++----
 airflow/models.py                               |  273 +++-
 airflow/settings.py                             |   36 +-
 airflow/utils/dag_processing.py                 |  612 +++++++++
 airflow/utils/db.py                             |    9 +-
 scripts/ci/check-license.sh                     |    2 +-
 scripts/ci/setup_env.sh                         |   12 +
 setup.py                                        |    2 +
 tests/core.py                                   |   33 +-
 tests/dags/no_dags.py                           |   14 +
 tests/dags_with_system_exit/a_system_exit.py    |   29 +
 .../b_test_scheduler_dags.py                    |   29 +
 tests/dags_with_system_exit/c_system_exit.py    |   29 +
 tests/jobs.py                                   |  192 +--
 19 files changed, 2239 insertions(+), 407 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 2d02f1e..0d35661 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -31,6 +31,8 @@ import daemon
 from daemon.pidfile import TimeoutPIDLockFile
 import signal
 import sys
+import threading
+import traceback
 
 import airflow
 from airflow import jobs, settings
@@ -45,10 +47,28 @@ from airflow.exceptions import AirflowException
 DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))
 
 
-def sigint_handler(signal, frame):
+def sigint_handler(sig, frame):
     sys.exit(0)
 
 
+def sigquit_handler(sig, frame):
+    """Helps debug deadlocks by printing stacktraces when this gets a SIGQUIT
+    e.g. kill -s QUIT <PID> or CTRL+\
+    """
+    print("Dumping stack traces for all threads in PID {}".format(os.getpid()))
+    id_to_name = dict([(th.ident, th.name) for th in threading.enumerate()])
+    code = []
+    for thread_id, stack in sys._current_frames().items():
+        code.append("\n# Thread: {}({})"
+                    .format(id_to_name.get(thread_id, ""), thread_id))
+        for filename, line_number, name, line in traceback.extract_stack(stack):
+            code.append('File: "{}", line {}, in {}'
+                        .format(filename, line_number, name))
+            if line:
+                code.append("  {}".format(line.strip()))
+    print("\n".join(code))
+
+
 def setup_logging(filename):
     root = logging.getLogger()
     handler = logging.FileHandler(filename)
@@ -538,6 +558,7 @@ def scheduler(args):
     job = jobs.SchedulerJob(
         dag_id=args.dag_id,
         subdir=process_subdir(args.subdir),
+        run_duration=args.run_duration,
         num_runs=args.num_runs,
         do_pickle=args.do_pickle)
 
@@ -561,6 +582,7 @@ def scheduler(args):
     else:
         signal.signal(signal.SIGINT, sigint_handler)
         signal.signal(signal.SIGTERM, sigint_handler)
+        signal.signal(signal.SIGQUIT, sigquit_handler)
         job.run()
 
 
@@ -918,6 +940,10 @@ class CLIFactory(object):
             default=False),
         # scheduler
         'dag_id_opt': Arg(("-d", "--dag_id"), help="The id of the dag to run"),
+        'run_duration': Arg(
+            ("-r", "--run-duration"),
+            default=None, type=int,
+            help="Set number of seconds to execute before exiting"),
         'num_runs': Arg(
             ("-n", "--num_runs"),
             default=None, type=int,
@@ -1056,8 +1082,9 @@ class CLIFactory(object):
         }, {
             'func': scheduler,
             'help': "Start a scheduler instance",
-            'args': ('dag_id_opt', 'subdir', 'num_runs', 'do_pickle',
-                     'pid', 'daemon', 'stdout', 'stderr', 'log_file'),
+            'args': ('dag_id_opt', 'subdir', 'run_duration', 'num_runs',
+                     'do_pickle', 'pid', 'daemon', 'stdout', 'stderr',
+                     'log_file'),
         }, {
             'func': worker,
             'help': "Start a Celery worker node",

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/airflow/configuration.py
----------------------------------------------------------------------
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 5a380ae..65b482c 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -141,6 +141,11 @@ defaults = {
         'scheduler_heartbeat_sec': 60,
         'authenticate': False,
         'max_threads': 2,
+        'run_duration': 30 * 60,
+        'dag_dir_list_interval': 5 * 60,
+        'print_stats_interval': 30,
+        'min_file_process_interval': 180,
+        'child_process_log_directory': '/tmp/airflow/scheduler/logs'
     },
     'celery': {
         'broker_url': 'sqla+mysql://airflow:airflow@localhost:3306/airflow',
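
The new scheduler keys above are read later in this change through the standard configuration accessors. A small sketch of querying them (assumes a configured Airflow environment; the comments only restate the defaults added above):

    from airflow import configuration as conf

    # The scheduler's execute loop runs for this many seconds before exiting
    # (default: 30 minutes).
    run_duration = conf.getint('scheduler', 'run_duration')
    # How often, in seconds, to rescan the DAGs folder for new files.
    dag_dir_list_interval = conf.getint('scheduler', 'dag_dir_list_interval')
    # Where each file-processing child process writes its log output.
    log_dir = conf.get('scheduler', 'child_process_log_directory')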

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/airflow/dag/__init__.py
----------------------------------------------------------------------
diff --git a/airflow/dag/__init__.py b/airflow/dag/__init__.py
new file mode 100644
index 0000000..a84b6da
--- /dev/null
+++ b/airflow/dag/__init__.py
@@ -0,0 +1,14 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/airflow/dag/base_dag.py
----------------------------------------------------------------------
diff --git a/airflow/dag/base_dag.py b/airflow/dag/base_dag.py
new file mode 100644
index 0000000..83ecfb9
--- /dev/null
+++ b/airflow/dag/base_dag.py
@@ -0,0 +1,96 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+from __future__ import unicode_literals
+
+from abc import ABCMeta, abstractmethod, abstractproperty
+
+
+class BaseDag(object):
+    """
+    Base DAG object that both the SimpleDag and DAG inherit.
+    """
+    __metaclass__ = ABCMeta
+
+    @abstractproperty
+    def dag_id(self):
+        """
+        :return: the DAG ID
+        :rtype: unicode
+        """
+        raise NotImplementedError()
+
+    @abstractproperty
+    def task_ids(self):
+        """
+        :return: A list of task IDs that are in this DAG
+        :rtype: List[unicode]
+        """
+        raise NotImplementedError()
+
+    @abstractproperty
+    def full_filepath(self):
+        """
+        :return: The absolute path to the file that contains this DAG's definition
+        :rtype: unicode
+        """
+        raise NotImplementedError()
+
+    @abstractmethod
+    def concurrency(self):
+        """
+        :return: maximum number of tasks that can run simultaneously from this DAG
+        :rtype: int
+        """
+        raise NotImplementedError()
+
+    @abstractmethod
+    def is_paused(self):
+        """
+        :return: whether this DAG is paused or not
+        :rtype: bool
+        """
+        raise NotImplementedError()
+
+    @abstractmethod
+    def pickle_id(self):
+        """
+        :return: The pickle ID for this DAG, if it has one. Otherwise None.
+        :rtype: unicode
+        """
+        raise NotImplementedError
+
+
+class BaseDagBag(object):
+    """
+    Base object that both the SimpleDagBag and DagBag inherit.
+    """
+    @abstractproperty
+    def dag_ids(self):
+        """
+        :return: a list of DAG IDs in this bag
+        :rtype: List[unicode]
+        """
+        raise NotImplementedError()
+
+    @abstractmethod
+    def get_dag(self, dag_id):
+        """
+        :return: the DAG associated with the given dag_id
+        :rtype: BaseDag
+        """
+        raise NotImplementedError()
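
BaseDag and BaseDagBag only define the read-only interface the scheduler needs; in this commit the concrete implementations are SimpleDag/SimpleDagBag (airflow/utils/dag_processing.py) and the existing DAG/DagBag models. Purely for illustration, a hypothetical in-memory implementation of BaseDag might look like this (StaticDag is not part of the commit):

    from airflow.dag.base_dag import BaseDag

    class StaticDag(BaseDag):
        # Hypothetical example class with fixed attributes.
        def __init__(self, dag_id, task_ids, full_filepath):
            self._dag_id = dag_id
            self._task_ids = task_ids
            self._full_filepath = full_filepath

        @property
        def dag_id(self):
            return self._dag_id

        @property
        def task_ids(self):
            return self._task_ids

        @property
        def full_filepath(self):
            return self._full_filepath

        def concurrency(self):
            return 16       # arbitrary example limit

        def is_paused(self):
            return False

        def pickle_id(self):
            return None     # no pickle stored for this example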

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/airflow/executors/base_executor.py
----------------------------------------------------------------------
diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py
index ca63443..d6a06d8 100644
--- a/airflow/executors/base_executor.py
+++ b/airflow/executors/base_executor.py
@@ -102,7 +102,7 @@ class BaseExecutor(LoggingMixin):
             # TODO(jlowin) without a way to know what Job ran which tasks,
             # there is a danger that another Job started running a task
             # that was also queued to this executor. This is the last chance
-            # to check if that hapened. The most probable way is that a
+            # to check if that happened. The most probable way is that a
             # Scheduler tried to run a task that was originally queued by a
             # Backfill. This fix reduces the probability of a collision but
             # does NOT eliminate it.

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 77f34ee..b391415 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -19,30 +19,45 @@ from __future__ import unicode_literals
 
 from past.builtins import basestring
 from collections import defaultdict, Counter
-from datetime import datetime, timedelta
+
+from datetime import datetime
+
 import getpass
 import logging
 import socket
 import subprocess
 import multiprocessing
-import math
+import os
+import signal
+import sys
+import threading
+import time
 from time import sleep
 
+import psutil
 from sqlalchemy import Column, Integer, String, DateTime, func, Index, or_
 from sqlalchemy.orm.session import make_transient
+from tabulate import tabulate
 
 from airflow import executors, models, settings
 from airflow import configuration as conf
 from airflow.exceptions import AirflowException
+from airflow.models import DagRun
+from airflow.settings import Stats
 from airflow.utils.state import State
 from airflow.utils.db import provide_session, pessimistic_connection_handling
+from airflow.utils.dag_processing import (AbstractDagFileProcessor,
+                                          DagFileProcessorManager,
+                                          SimpleDag,
+                                          SimpleDagBag,
+                                          list_py_file_paths)
 from airflow.utils.email import send_email
 from airflow.utils.logging import LoggingMixin
 from airflow.utils import asciiart
-from airflow.settings import Stats
 
-DagRun = models.DagRun
+
 Base = models.Base
+DagRun = models.DagRun
 ID_LEN = models.ID_LEN
 Stats = settings.Stats
 
@@ -186,27 +201,247 @@ class BaseJob(Base, LoggingMixin):
         raise NotImplementedError("This method needs to be overridden")
 
 
+class DagFileProcessor(AbstractDagFileProcessor):
+    """Helps call SchedulerJob.process_file() in a separate process."""
+
+    # Counter that increments every time an instance of this class is created
+    class_creation_counter = 0
+
+    def __init__(self, file_path, pickle_dags, dag_id_white_list, log_file):
+        """
+        :param file_path: a Python file containing Airflow DAG definitions
+        :type file_path: unicode
+        :param pickle_dags: whether to serialize the DAG objects to the DB
+        :type pickle_dags: bool
+        :param dag_id_white_list: If specified, only look at these DAG IDs
+        :type dag_id_white_list: list[unicode]
+        :param log_file: the path to the file where log lines should be output
+        :type log_file: unicode
+        """
+        self._file_path = file_path
+        self._log_file = log_file
+        # Queue that's used to pass results from the child process.
+        self._result_queue = multiprocessing.Queue()
+        # The process that was launched to process the given file path.
+        self._process = None
+        self._dag_id_white_list = dag_id_white_list
+        self._pickle_dags = pickle_dags
+        # The result of Scheduler.process_file(file_path).
+        self._result = None
+        # Whether the process is done running.
+        self._done = False
+        # When the process started.
+        self._start_time = None
+        # This ID is used to uniquely name the process / thread that's launched
+        # by this processor instance
+        self._instance_id = DagFileProcessor.class_creation_counter
+        DagFileProcessor.class_creation_counter += 1
+
+    @property
+    def file_path(self):
+        return self._file_path
+
+    @property
+    def log_file(self):
+        return self._log_file
+
+    @staticmethod
+    def _launch_process(result_queue,
+                        file_path,
+                        pickle_dags,
+                        dag_id_white_list,
+                        thread_name,
+                        log_file):
+        """
+        Launch a process to process the given file.
+
+        :param result_queue: the queue to use for passing back the result
+        :type result_queue: multiprocessing.Queue
+        :param file_path: the file to process
+        :type file_path: unicode
+        :param pickle_dags: whether to pickle the DAGs found in the file and
+        save them to the DB
+        :type pickle_dags: bool
+        :param dag_id_white_list: if specified, only examine DAG ID's that are
+        in this list
+        :type dag_id_white_list: list[unicode]
+        :param thread_name: the name to use for the process that is launched
+        :type thread_name: unicode
+        :param log_file: the logging output for the process should be directed
+        to this file
+        :type log_file: unicode
+        :return: the process that was launched
+        :rtype: multiprocessing.Process
+        """
+        def helper():
+            # This helper runs in the newly created process
+
+            # Re-direct stdout and stderr to a separate log file. Otherwise,
+            # the main log becomes too hard to read. No buffering to enable
+            # responsive file tailing
+            parent_dir, _ = os.path.split(log_file)
+
+            # Create the parent directory for the log file if necessary.
+            if not os.path.isdir(parent_dir):
+                os.makedirs(parent_dir)
+
+            f = open(log_file, "a")
+            original_stdout = sys.stdout
+            original_stderr = sys.stderr
+
+            sys.stdout = f
+            sys.stderr = f
+
+            try:
+                # Re-configure logging to use the new output streams
+                log_format = settings.LOG_FORMAT_WITH_THREAD_NAME
+                settings.configure_logging(log_format=log_format)
+                # Re-configure the ORM engine as there are issues with multiple processes
+                settings.configure_orm()
+
+                # Change the thread name to differentiate log lines. This is
+                # really a separate process, but changing the name of the
+                # process doesn't work, so changing the thread name instead.
+                threading.current_thread().name = thread_name
+                start_time = time.time()
+
+                logging.info("Started process (PID=%s) to work on %s",
+                             os.getpid(),
+                             file_path)
+                scheduler_job = SchedulerJob(dag_ids=dag_id_white_list)
+                result = scheduler_job.process_file(file_path,
+                                                    pickle_dags)
+                result_queue.put(result)
+                end_time = time.time()
+                logging.info("Processing %s took %.3f seconds",
+                             file_path,
+                             end_time - start_time)
+            except:
+                # Log exceptions through the logging framework.
+                logging.exception("Got an exception! Propagating...")
+                raise
+            finally:
+                sys.stdout = original_stdout
+                sys.stderr = original_stderr
+                f.close()
+
+        p = multiprocessing.Process(target=helper,
+                                    args=(),
+                                    name="{}-Process".format(thread_name))
+        p.start()
+        return p
+
+    def start(self):
+        """
+        Launch the process and start processing the DAG.
+        """
+        self._process = DagFileProcessor._launch_process(
+            self._result_queue,
+            self.file_path,
+            self._pickle_dags,
+            self._dag_id_white_list,
+            "DagFileProcessor{}".format(self._instance_id),
+            self.log_file)
+        self._start_time = datetime.now()
+
+    def terminate(self, sigkill=False):
+        """
+        Terminate (and then kill) the process launched to process the file.
+        :param sigkill: whether to issue a SIGKILL if SIGTERM doesn't work.
+        :type sigkill: bool
+        """
+        if self._process is None:
+            raise AirflowException("Tried to call stop before starting!")
+        # The queue will likely get corrupted, so remove the reference
+        self._result_queue = None
+        self._process.terminate()
+        # Arbitrarily wait 5s for the process to die
+        self._process.join(5)
+        if sigkill and self._process.is_alive():
+            logging.warn("Killing PID %s", self._process.pid)
+            os.kill(self._process.pid, signal.SIGKILL)
+
+    @property
+    def pid(self):
+        """
+        :return: the PID of the process launched to process the given file
+        :rtype: int
+        """
+        if self._process is None:
+            raise AirflowException("Tried to get PID before starting!")
+        return self._process.pid
+
+    @property
+    def exit_code(self):
+        """
+        After the process is finished, this can be called to get the return code.
+        :return: the exit code of the process
+        :rtype: int
+        """
+        if not self._done:
+            raise AirflowException("Tried to call retcode before process was 
finished!")
+        return self._process.exitcode
+
+    @property
+    def done(self):
+        """
+        Check if the process launched to process this file is done.
+        :return: whether the process is finished running
+        :rtype: bool
+        """
+        if self._process is None:
+            raise AirflowException("Tried to see if it's done before 
starting!")
+
+        if self._done:
+            return True
+
+        if not self._result_queue.empty():
+            self._result = self._result_queue.get_nowait()
+            self._done = True
+            logging.debug("Waiting for %s", self._process)
+            self._process.join()
+            return True
+
+        # Potential error case when process dies
+        if not self._process.is_alive():
+            self._done = True
+            # Get the object from the queue or else join() can hang.
+            if not self._result_queue.empty():
+                self._result = self._result_queue.get_nowait()
+            logging.debug("Waiting for %s", self._process)
+            self._process.join()
+            return True
+
+        return False
+
+    @property
+    def result(self):
+        """
+        :return: result of running SchedulerJob.process_file()
+        :rtype: SimpleDag
+        """
+        if not self.done:
+            raise AirflowException("Tried to get the result before it's done!")
+        return self._result
+
+    @property
+    def start_time(self):
+        """
+        :return: when this started to process the file
+        :rtype: datetime
+        """
+        if self._start_time is None:
+            raise AirflowException("Tried to get start time before it 
started!")
+        return self._start_time
+
+
 class SchedulerJob(BaseJob):
     """
-    This SchedulerJob runs indefinitely and constantly schedules the jobs
+    This SchedulerJob runs for a specific time interval and schedules the jobs
     that are ready to run. It figures out the latest runs for each
-    task and see if the dependencies for the next schedules are met.
-    If so it triggers the task instance. It does this for each task
-    in each DAG and repeats.
-
-    :param dag_id: to run the scheduler for a single specific DAG
-    :type dag_id: string
-    :param subdir: to search for DAG under a certain folder only
-    :type subdir: string
-    :param test_mode: used for unit testing this class only, runs a single
-        schedule run
-    :type test_mode: bool
-    :param refresh_dags_every: force refresh the DAG definition every N
-        runs, as specified here
-    :type refresh_dags_every: int
-    :param do_pickle: to pickle the DAG object and send over to workers
-        for non-local executors
-    :type do_pickle: bool
+    task and sees if the dependencies for the next schedules are met.
+    If so, it creates appropriate TaskInstances and sends run commands to the
+    executor. It does this for each task in each DAG and repeats.
     """
 
     __mapper_args__ = {
@@ -217,13 +452,32 @@ class SchedulerJob(BaseJob):
             self,
             dag_id=None,
             dag_ids=None,
-            subdir=None,
-            test_mode=False,
-            refresh_dags_every=10,
-            num_runs=None,
+            subdir=models.DAGS_FOLDER,
+            num_runs=-1,
+            file_process_interval=conf.getint('scheduler',
+                                              'min_file_process_interval'),
+            processor_poll_interval=1.0,
+            run_duration=None,
             do_pickle=False,
             *args, **kwargs):
-
+        """
+        :param dag_id: if specified, only schedule tasks with this DAG ID
+        :type dag_id: unicode
+        :param dag_ids: if specified, only schedule tasks with these DAG IDs
+        :type dag_ids: list[unicode]
+        :param subdir: directory containing Python files with Airflow DAG
+        definitions, or a specific path to a file
+        :type subdir: unicode
+        :param num_runs: The number of times to try to schedule each DAG file.
+        -1 for unlimited within the run_duration.
+        :param processor_poll_interval: The number of seconds to wait between
+        polls of running processors
+        :param run_duration: how long to run (in seconds) before exiting
+        :type run_duration: int
+        :param do_pickle: once a DAG object is obtained by executing the Python
+        file, whether to serialize the DAG object to the DB
+        :type do_pickle: bool
+        """
         # for BaseJob compatibility
         self.dag_id = dag_id
         self.dag_ids = [dag_id] if dag_id else []
@@ -232,21 +486,38 @@ class SchedulerJob(BaseJob):
 
         self.subdir = subdir
 
-        if test_mode:
-            self.num_runs = 1
-        else:
-            self.num_runs = num_runs
+        self.num_runs = num_runs
+        self.run_duration = run_duration
+        self._processor_poll_interval = processor_poll_interval
 
-        self.refresh_dags_every = refresh_dags_every
         self.do_pickle = do_pickle
         super(SchedulerJob, self).__init__(*args, **kwargs)
 
         self.heartrate = conf.getint('scheduler', 'SCHEDULER_HEARTBEAT_SEC')
         self.max_threads = min(conf.getint('scheduler', 'max_threads'), multiprocessing.cpu_count())
+        self.using_sqlite = False
         if 'sqlite' in conf.get('core', 'sql_alchemy_conn'):
             if self.max_threads > 1:
                 self.logger.error("Cannot use more than 1 thread when using 
sqlite. Setting max_threads to 1")
             self.max_threads = 1
+            self.using_sqlite = True
+
+        # How often to scan the DAGs directory for new files. Default to 5 minutes.
+        self.dag_dir_list_interval = conf.getint('scheduler',
+                                                 'dag_dir_list_interval')
+        # How often to print out DAG file processing stats to the log. Default to
+        # 30 seconds.
+        self.print_stats_interval = conf.getint('scheduler',
+                                                'print_stats_interval')
+        # Parse and schedule each file no faster than this interval. Default
+        # to 3 minutes.
+        self.file_process_interval = file_process_interval
+        # Directory where log files for the processes that scheduled the DAGs reside
+        self.child_process_log_directory = conf.get('scheduler',
+                                                     'child_process_log_directory')
+        if run_duration is None:
+            self.run_duration = conf.getint('scheduler',
+                                            'run_duration')
 
     @provide_session
     def manage_slas(self, dag, session=None):
@@ -296,7 +567,8 @@ class SchedulerJob(BaseJob):
         slas = (
             session
             .query(SlaMiss)
-            .filter(SlaMiss.email_sent.is_(False) or SlaMiss.notification_sent.is_(False))
+            .filter(or_(SlaMiss.email_sent == False,
+                        SlaMiss.notification_sent == False))
             .filter(SlaMiss.dag_id == dag.dag_id)
             .all()
         )
@@ -370,23 +642,45 @@ class SchedulerJob(BaseJob):
             session.commit()
             session.close()
 
-    def import_errors(self, dagbag):
-        session = settings.Session()
-        session.query(models.ImportError).delete()
+    @staticmethod
+    def record_import_errors(session, dagbag):
+        """
+        For the DAGs in the given DagBag, record any associated import errors.
+        These are usually displayed through the Airflow UI so that users know
+        that there are issues parsing DAGs.
+
+        :param session: session for ORM operations
+        :type session: sqlalchemy.orm.session.Session
+        :param dagbag: DagBag containing DAGs with import errors
+        :type dagbag: models.DagBag
+        """
         for filename, stacktrace in list(dagbag.import_errors.items()):
+            session.query(models.ImportError).filter(
+                models.ImportError.filename == filename
+            ).delete()
             session.add(models.ImportError(
                 filename=filename, stacktrace=stacktrace))
         session.commit()
 
+    @staticmethod
+    def clear_import_errors(session):
+        """
+        Remove all the known import errors from the DB.
+
+        :param session: session for ORM operations
+        :type session: sqlalchemy.orm.session.Session
+        """
+        session.query(models.ImportError).delete()
+        session.commit()
+
     @provide_session
-    def schedule_dag(self, dag, session=None):
+    def create_dag_run(self, dag, session=None):
         """
         This method checks whether a new DagRun needs to be created
         for a DAG based on scheduling interval
         Returns DagRun if one is scheduled. Otherwise returns None.
         """
         if dag.schedule_interval:
-            DagRun = models.DagRun
             active_runs = DagRun.find(
                 dag_id=dag.dag_id,
                 state=State.RUNNING,
@@ -475,52 +769,24 @@ class SchedulerJob(BaseJob):
                 )
                 return next_run
 
-    def process_dag(self, dag, queue):
+    def _process_task_instances(self, dag, queue):
         """
-        This method schedules a single DAG by looking at the latest
-        run for each task and attempting to schedule the following run.
-
-        As multiple schedulers may be running for redundancy, this
-        function takes a lock on the DAG and timestamps the last run
-        in ``last_scheduler_run``.
+        This method schedules the tasks for a single DAG by looking at the
+        active DAG runs and adding task instances that should run to the
+        queue.
         """
         DagModel = models.DagModel
         session = settings.Session()
 
-        # picklin'
-        pickle_id = None
-        if self.do_pickle and self.executor.__class__ not in (
-                executors.LocalExecutor, executors.SequentialExecutor):
-            pickle_id = dag.pickle(session).id
-
-        # obtain db lock
-        db_dag = session.query(DagModel).filter_by(
-            dag_id=dag.dag_id
-        ).with_for_update().one()
-
-        last_scheduler_run = db_dag.last_scheduler_run or datetime(2000, 1, 1)
-        secs_since_last = (datetime.now() - last_scheduler_run).total_seconds()
-
-        if secs_since_last < self.heartrate:
-            # release db lock
-            session.commit()
-            session.close()
-            return None
-
-        # Release the db lock
-        # the assumption here is that process_dag will take less
-        # time than self.heartrate otherwise we might unlock too
-        # quickly and this should moved below, but that would increase
-        # the time the record is locked and is blocking for other calls.
-        db_dag.last_scheduler_run = datetime.now()
-        session.commit()
-
         # update the state of the previously active dag runs
         dag_runs = DagRun.find(dag_id=dag.dag_id, state=State.RUNNING, session=session)
         active_dag_runs = []
         for run in dag_runs:
+            self.logger.info("Examining DAG run {}".format(run))
             # do not consider runs that are executed in the future
             if run.execution_date > datetime.now():
+                self.logger.error("Execution date is in future: {}"
+                                   .format(run.execution_date))
                 continue
 
             # todo: run.dag is transient but needs to be set
@@ -533,6 +799,7 @@ class SchedulerJob(BaseJob):
                 active_dag_runs.append(run)
 
         for run in active_dag_runs:
+            self.logger.debug("Examining active DAG run {}".format(run))
             # this needs a fresh session sometimes tis get detached
             tis = run.get_task_instances(state=(State.NONE,
                                                 State.UP_FOR_RETRY))
@@ -552,41 +819,122 @@ class SchedulerJob(BaseJob):
 
                 if ti.is_runnable(flag_upstream_failed=True):
                     self.logger.debug('Queuing task: {}'.format(ti))
-                    queue.put((ti.key, pickle_id))
+                    queue.append(ti.key)
 
         session.close()
 
     @provide_session
-    def prioritize_queued(self, session, executor, dagbag):
-        # Prioritizing queued task instances
+    def _change_state_for_tis_without_dagrun(self,
+                                             simple_dag_bag,
+                                             old_states,
+                                             new_state,
+                                             session=None):
+        """
+        For all DAG IDs in the SimpleDagBag, look for task instances in the
+        old_states and set them to new_state if the corresponding DagRun
+        exists but is not in the running state. This normally should not
+        happen, but it can if the state of DagRuns are changed manually.
+
+        :param old_states: examine TaskInstances in this state
+        :type old_states: list[State]
+        :param new_state: set TaskInstances to this state
+        :type new_state: State
+        :param simple_dag_bag: TaskInstances associated with DAGs in the
+        simple_dag_bag and with states in the old_state will be examined
+        :type simple_dag_bag: SimpleDagBag
+        """
 
-        pools = {p.pool: p for p in session.query(models.Pool).all()}
+        task_instances_to_change = (
+            session
+            .query(models.TaskInstance)
+            .filter(models.TaskInstance.dag_id.in_(simple_dag_bag.dag_ids))
+            .filter(models.TaskInstance.state.in_(old_states))
+            .all()
+        )
+        """:type: list[TaskInstance]"""
+
+        for task_instance in task_instances_to_change:
+            dag_runs = DagRun.find(dag_id=task_instance.dag_id,
+                                   execution_date=task_instance.execution_date,
+                                   )
+
+            if len(dag_runs) == 0:
+                self.logger.warn("DagRun for %s %s does not exist",
+                                 task_instance.dag_id,
+                                 task_instance.execution_date)
+                continue
+
+            # There should only be one DAG run. Add some logging info if this
+            # is not the case for later debugging.
+            if len(dag_runs) > 1:
+                self.logger.warn("Multiple DagRuns found for {} {}: {}"
+                                 .format(task_instance.dag_id,
+                                         task_instance.execution_date,
+                                         dag_runs))
+
+            dag_is_running = False
+            for dag_run in dag_runs:
+                if dag_run.state == State.RUNNING:
+                    dag_is_running = True
+                    break
+
+            if not dag_is_running:
+                self.logger.warn("Setting {} to state={} as it does not have "
+                                 "a DagRun in the {} state"
+                                 .format(task_instance,
+                                         new_state,
+                                         State.RUNNING))
+                task_instance.state = new_state
+                session.merge(task_instance)
+        session.commit()
+
+    @provide_session
+    def _execute_task_instances(self,
+                                simple_dag_bag,
+                                states,
+                                session=None):
+        """
+        Fetches task instances from ORM in the specified states, figures
+        out pool limits, and sends them to the executor for execution.
+
+        :param simple_dag_bag: TaskInstances associated with DAGs in the
+        simple_dag_bag will be fetched from the DB and executed
+        :type simple_dag_bag: SimpleDagBag
+        :param executor: the executor that runs task instances
+        :type executor: BaseExecutor
+        :param states: Execute TaskInstances in these states
+        :type states: Tuple[State]
+        :return: None
+        """
+        # Get all the relevant task instances
         TI = models.TaskInstance
-        queued_tis = (
-            session.query(TI)
-            .filter(TI.state == State.QUEUED)
+        task_instances_to_examine = (
+            session
+            .query(TI)
+            .filter(TI.dag_id.in_(simple_dag_bag.dag_ids))
+            .filter(TI.state.in_(states))
             .all()
         )
-        self.logger.info(
-            "Prioritizing {} queued jobs".format(len(queued_tis)))
-        session.expunge_all()
-        d = defaultdict(list)
-        for ti in queued_tis:
-            if ti.dag_id not in dagbag.dags:
-                self.logger.info(
-                    "DAG no longer in dagbag, deleting {}".format(ti))
-                session.delete(ti)
-                session.commit()
-            elif not dagbag.dags[ti.dag_id].has_task(ti.task_id):
-                self.logger.info(
-                    "Task no longer exists, deleting {}".format(ti))
-                session.delete(ti)
-                session.commit()
-            else:
-                d[ti.pool].append(ti)
 
-        dag_blacklist = set(dagbag.paused_dags())
-        for pool, tis in list(d.items()):
+        # Put one task instance on each line
+        if len(task_instances_to_examine) == 0:
+            self.logger.info("No tasks to send to the executor")
+            return
+
+        task_instance_str = "\n\t".join(
+            ["{}".format(x) for x in task_instances_to_examine])
+        self.logger.info("Tasks up for 
execution:\n\t{}".format(task_instance_str))
+
+        # Get the pool settings
+        pools = {p.pool: p for p in session.query(models.Pool).all()}
+
+        pool_to_task_instances = defaultdict(list)
+        for task_instance in task_instances_to_examine:
+            pool_to_task_instances[task_instance.pool].append(task_instance)
+
+        # Go through each pool, and queue up a task for execution if there are
+        # any open slots in the pool.
+        for pool, task_instances in pool_to_task_instances.items():
             if not pool:
                 # Arbitrary:
                 # If queued outside of a pool, trigger no more than
@@ -595,78 +943,218 @@ class SchedulerJob(BaseJob):
             else:
                 open_slots = pools[pool].open_slots(session=session)
 
-            queue_size = len(tis)
-            self.logger.info("Pool {pool} has {open_slots} slots, {queue_size} 
"
+            num_queued = len(task_instances)
+            self.logger.info("Figuring out tasks to run in Pool(name={pool}) "
+                             "with {open_slots} open slots and {num_queued} "
                              "task instances in queue".format(**locals()))
+
             if open_slots <= 0:
                 continue
-            tis = sorted(
-                tis, key=lambda ti: (-ti.priority_weight, ti.start_date))
-            for ti in tis:
-                if open_slots <= 0:
-                    continue
-                task = None
-                try:
-                    task = dagbag.dags[ti.dag_id].get_task(ti.task_id)
-                except:
-                    self.logger.error("Queued task {} seems gone".format(ti))
-                    session.delete(ti)
-                    session.commit()
-                    continue
 
-                if not task:
-                    continue
+            priority_sorted_task_instances = sorted(
+                task_instances, key=lambda ti: (-ti.priority_weight, ti.execution_date))
 
-                ti.task = task
+            # DAG IDs with running tasks that equal the concurrency limit of the dag
+            dag_id_to_running_task_count = {}
 
-                # picklin'
-                dag = dagbag.dags[ti.dag_id]
-                pickle_id = None
-                if self.do_pickle and self.executor.__class__ not in (
-                        executors.LocalExecutor,
-                        executors.SequentialExecutor):
-                    self.logger.info("Pickling DAG {}".format(dag))
-                    pickle_id = dag.pickle(session).id
+            for task_instance in priority_sorted_task_instances:
+                if open_slots <= 0:
+                    self.logger.info("No more slots free")
+                    # Can't schedule any more since there are no more open slots.
+                    break
 
-                if dag.dag_id in dag_blacklist:
-                    continue
-                if dag.concurrency_reached:
-                    dag_blacklist.add(dag.dag_id)
+                if simple_dag_bag.get_dag(task_instance.dag_id).is_paused:
+                    self.logger.info("Not executing queued {} since {} is 
paused"
+                                     .format(task_instance, 
task_instance.dag_id))
                     continue
-                if ti.are_dependencies_met():
-                    executor.queue_task_instance(ti, pickle_id=pickle_id)
-                    open_slots -= 1
-                else:
-                    session.delete(ti)
-                    session.commit()
+
+                # Check to make sure that the task concurrency of the DAG hasn't been
+                # reached.
+                dag_id = task_instance.dag_id
+
+                if dag_id not in dag_id_to_running_task_count:
+                    dag_id_to_running_task_count[dag_id] = \
+                        DagRun.get_running_tasks(
+                            session,
+                            dag_id,
+                            simple_dag_bag.get_dag(dag_id).task_ids)
+
+                current_task_concurrency = dag_id_to_running_task_count[dag_id]
+                task_concurrency_limit = simple_dag_bag.get_dag(dag_id).concurrency
+                self.logger.info("DAG {} has {}/{} running tasks"
+                                 .format(dag_id,
+                                         current_task_concurrency,
+                                         task_concurrency_limit))
+                if current_task_concurrency > task_concurrency_limit:
+                    self.logger.info("Not executing {} since the number "
+                                     "of tasks running from DAG {} is >= to 
the "
+                                     "DAG's task concurrency limit of {}"
+                                     .format(task_instance,
+                                             dag_id,
+                                             task_concurrency_limit))
                     continue
-                ti.task = task
 
+                command = TI.generate_command(
+                    task_instance.dag_id,
+                    task_instance.task_id,
+                    task_instance.execution_date,
+                    local=True,
+                    mark_success=False,
+                    force=False,
+                    ignore_dependencies=False,
+                    ignore_depends_on_past=False,
+                    pool=task_instance.pool,
+                    file_path=simple_dag_bag.get_dag(task_instance.dag_id).full_filepath,
+                    pickle_id=simple_dag_bag.get_dag(task_instance.dag_id).pickle_id)
+
+                priority = task_instance.priority_weight
+                queue = task_instance.queue
+                self.logger.info("Sending to executor {} with priority {} and 
queue {}"
+                                 .format(task_instance.key, priority, queue))
+
+                # Set the state to queued
+                self.logger.info("Setting state of {} to {}".format(
+                    task_instance.key, State.QUEUED))
+                task_instance.state = State.QUEUED
+                task_instance.queued_dttm = (datetime.now()
+                                             if not task_instance.queued_dttm
+                                             else task_instance.queued_dttm)
+                session.merge(task_instance)
                 session.commit()
 
-    def _split(self, items, size):
-        """
-        This function splits a list of items into chunks of int size.
-        _split([1,2,3,4,5,6], 3) becomes [[1,2,3],[4,5,6]]
-        """
-        size = max(1, size)
-        return [items[i:i + size] for i in range(0, len(items), size)]
+                # These attributes will be lost after the object expires, so save them.
+                task_id_ = task_instance.task_id
+                dag_id_ = task_instance.dag_id
+                execution_date_ = task_instance.execution_date
+                make_transient(task_instance)
+                task_instance.task_id = task_id_
+                task_instance.dag_id = dag_id_
+                task_instance.execution_date = execution_date_
+
+                self.executor.queue_command(
+                    task_instance,
+                    command,
+                    priority=priority,
+                    queue=queue)
 
-    def _do_dags(self, dagbag, dags, tis_out):
+                open_slots -= 1
+
+    def _process_dags(self, dagbag, dags, tis_out):
         """
-        Iterates over the dags and schedules and processes them
+        Iterates over the dags and processes them. Processing includes:
+
+        1. Create appropriate DagRun(s) in the DB.
+        2. Create appropriate TaskInstance(s) in the DB.
+        3. Send emails for tasks that have missed SLAs.
+
+        :param dagbag: a collection of DAGs to process
+        :type dagbag: models.DagBag
+        :param dags: the DAGs from the DagBag to process
+        :type dags: list[DAG]
+        :param tis_out: A queue to add generated TaskInstance objects
+        :type tis_out: multiprocessing.Queue[TaskInstance]
+        :return: None
         """
         for dag in dags:
-            self.logger.debug("Scheduling {}".format(dag.dag_id))
             dag = dagbag.get_dag(dag.dag_id)
             if not dag:
+                self.logger.error("DAG not found in the DagBag; skipping it")
                 continue
+
+            if dag.is_paused:
+                self.logger.info("Not processing DAG {} since it's paused"
+                                 .format(dag.dag_id))
+                continue
-            try:
-                self.schedule_dag(dag)
-                self.process_dag(dag, tis_out)
-                self.manage_slas(dag)
-            except Exception as e:
-                self.logger.exception(e)
+
+            self.logger.info("Processing {}".format(dag.dag_id))
+
+            dag_run = self.create_dag_run(dag)
+            if dag_run:
+                self.logger.info("Created {}".format(dag_run))
+            self._process_task_instances(dag, tis_out)
+            self.manage_slas(dag)
+
+    def _process_executor_events(self):
+        """
+        Respond to executor events.
+
+        :return: None
+        """
+        for key, executor_state in list(self.executor.get_event_buffer().items()):
+            dag_id, task_id, execution_date = key
+            self.logger.info("Executor reports {}.{} execution_date={} as {}"
+                             .format(dag_id,
+                                     task_id,
+                                     execution_date,
+                                     executor_state))
+
+    def _log_file_processing_stats(self,
+                                   known_file_paths,
+                                   processor_manager):
+        """
+        Print out stats about how files are getting processed.
+
+        :param known_file_paths: a list of file paths that may contain Airflow
+        DAG definitions
+        :type known_file_paths: list[unicode]
+        :param processor_manager: manager for the file processors
+        :type processor_manager: DagFileProcessorManager
+        :return: None
+        """
+
+        # File Path: Path to the file containing the DAG definition
+        # PID: PID associated with the process that's processing the file. May
+        # be empty.
+        # Runtime: If the process is currently running, how long it's been
+        # running for in seconds.
+        # Last Runtime: If the process ran before, how long did it take to
+        # finish in seconds
+        # Last Run: When the file finished processing in the previous run.
+        headers = ["File Path",
+                   "PID",
+                   "Runtime",
+                   "Last Runtime",
+                   "Last Run"]
+
+        rows = []
+        for file_path in known_file_paths:
+            last_runtime = processor_manager.get_last_runtime(file_path)
+            processor_pid = processor_manager.get_pid(file_path)
+            processor_start_time = processor_manager.get_start_time(file_path)
+            runtime = ((datetime.now() - processor_start_time).total_seconds()
+                       if processor_start_time else None)
+            last_run = processor_manager.get_last_finish_time(file_path)
+
+            rows.append((file_path,
+                         processor_pid,
+                         runtime,
+                         last_runtime,
+                         last_run))
+
+        # Sort by longest last runtime. (Can't sort None values in python3)
+        rows = sorted(rows, key=lambda x: x[3] or 0.0)
+
+        formatted_rows = []
+        for file_path, pid, runtime, last_runtime, last_run in rows:
+            formatted_rows.append((file_path,
+                                   pid,
+                                   "{:.2f}s".format(runtime)
+                                   if runtime else None,
+                                   "{:.2f}s".format(last_runtime)
+                                   if last_runtime else None,
+                                   last_run.strftime("%Y-%m-%dT%H:%M:%S")
+                                   if last_run else None))
+        log_str = ("\n" +
+                   "=" * 80 +
+                   "\n" +
+                   "DAG File Processing Stats\n\n" +
+                   tabulate(formatted_rows, headers=headers) +
+                   "\n" +
+                   "=" * 80)
+
+        self.logger.info(log_str)
 
     @provide_session
     def _reset_state_for_orphaned_tasks(self, dag_run, session=None):
@@ -680,27 +1168,110 @@ class SchedulerJob(BaseJob):
 
         # also consider running as the state might not have changed in the db yet
         running = self.executor.running
-        tis = dag_run.get_task_instances(state=State.SCHEDULED, session=session)
+        tis = list()
+        tis.extend(dag_run.get_task_instances(state=State.SCHEDULED, session=session))
+        tis.extend(dag_run.get_task_instances(state=State.QUEUED, session=session))
+
         for ti in tis:
             if ti.key not in queued_tis and ti.key not in running:
-                ti.state = State.NONE
                 self.logger.debug("Rescheduling orphaned task {}".format(ti))
-
+                ti.state = State.NONE
         session.commit()
 
     def _execute(self):
-        session = settings.Session()
-        TI = models.TaskInstance
-
+        self.logger.info("Starting the scheduler")
         pessimistic_connection_handling()
 
         logging.basicConfig(level=logging.DEBUG)
-        self.logger.info("Starting the scheduler")
 
-        dagbag = models.DagBag(self.subdir, sync_to_db=True)
-        executor = self.executor = dagbag.executor
-        executor.start()
+        # DAGs can be pickled for easier remote execution by some executors
+        pickle_dags = False
+        if self.do_pickle and self.executor.__class__ not in \
+                (executors.LocalExecutor, executors.SequentialExecutor):
+            pickle_dags = True
+
+        # Use multiple processes to parse and generate tasks for the
+        # DAGs in parallel. By processing them in separate processes,
+        # we can get parallelism and isolation from potentially harmful
+        # user code.
+        self.logger.info("Processing files using up to {} processes at a time "
+                         .format(self.max_threads))
+        self.logger.info("Running execute loop for {} seconds"
+                         .format(self.run_duration))
+        self.logger.info("Processing each file at most {} times"
+                         .format(self.num_runs))
+        self.logger.info("Process each file at most once every {} seconds"
+                         .format(self.file_process_interval))
+        self.logger.info("Checking for new files in {} every {} seconds"
+                         .format(self.subdir, self.dag_dir_list_interval))
+
+        # Build up a list of Python files that could contain DAGs
+        self.logger.info("Searching for files in {}".format(self.subdir))
+        known_file_paths = list_py_file_paths(self.subdir)
+        self.logger.info("There are {} files in {}"
+                         .format(len(known_file_paths), self.subdir))
+
+        def processor_factory(file_path, log_file_path):
+            return DagFileProcessor(file_path,
+                                    pickle_dags,
+                                    self.dag_ids,
+                                    log_file_path)
+
+        processor_manager = DagFileProcessorManager(self.subdir,
+                                                    known_file_paths,
+                                                    self.max_threads,
+                                                    self.file_process_interval,
+                                                    self.child_process_log_directory,
+                                                    self.num_runs,
+                                                    processor_factory)
 
+        try:
+            self._execute_helper(processor_manager)
+        finally:
+            self.logger.info("Exited execute loop")
+
+            # Kill all child processes on exit since we don't want to leave
+            # them as orphaned.
+            pids_to_kill = processor_manager.get_all_pids()
+            if len(pids_to_kill) > 0:
+                # First try SIGTERM
+                this_process = psutil.Process(os.getpid())
+                # Only check child processes to ensure that we don't have a case
+                # where we kill the wrong process because a child process died
+                # but the PID got reused.
+                child_processes = [x for x in this_process.children(recursive=True)
+                                   if x.is_running() and x.pid in pids_to_kill]
+                for child in child_processes:
+                    self.logger.info("Terminating child PID: 
{}".format(child.pid))
+                    child.terminate()
+                timeout = 5
+                self.logger.info("Waiting up to {}s for processes to exit..."
+                                 .format(timeout))
+                try:
+                    psutil.wait_procs(child_processes, timeout)
+                except psutil.TimeoutExpired:
+                    self.logger.debug("Ran out of time while waiting for "
+                                      "processes to exit")
+
+                # Then SIGKILL
+                child_processes = [x for x in this_process.children(recursive=True)
+                                   if x.is_running() and x.pid in pids_to_kill]
+                if len(child_processes) > 0:
+                    for child in child_processes:
+                        self.logger.info("Killing child PID: 
{}".format(child.pid))
+                        child.kill()
+                        child.wait()
+
+    def _execute_helper(self, processor_manager):
+        """
+        :param processor_manager: manager to use
+        :type processor_manager: DagFileProcessorManager
+        :return: None
+        """
+        self.executor.start()
+
+        session = settings.Session()
+        self.logger.info("Resetting state for orphaned tasks")
         # grab orphaned tasks and make sure to reset their state
         active_runs = DagRun.find(
             state=State.RUNNING,
@@ -708,103 +1279,250 @@ class SchedulerJob(BaseJob):
             session=session
         )
         for dr in active_runs:
+            self.logger.info("Resetting {} {}".format(dr.dag_id,
+                                                      dr.execution_date))
             self._reset_state_for_orphaned_tasks(dr, session=session)
 
-        self.runs = 0
-        while not self.num_runs or self.num_runs > self.runs:
-            try:
-                loop_start_dttm = datetime.now()
-                try:
-                    self.prioritize_queued(executor=executor, dagbag=dagbag)
-                except Exception as e:
-                    self.logger.exception(e)
+        self.logger.info("Removing old import errors")
+        self.clear_import_errors(session)
+        session.close()
 
-                self.runs += 1
-                try:
-                    if self.runs % self.refresh_dags_every == 0:
-                        dagbag = models.DagBag(self.subdir, sync_to_db=True)
-                    else:
-                        dagbag.collect_dags(only_if_updated=True)
-                except Exception as e:
-                    self.logger.error("Failed at reloading the dagbag. {}".format(e))
-                    Stats.incr('dag_refresh_error', 1, 1)
-                    sleep(5)
-
-                if len(self.dag_ids) > 0:
-                    dags = [dag for dag in dagbag.dags.values() if dag.dag_id in self.dag_ids]
-                else:
-                    dags = [
-                        dag for dag in dagbag.dags.values()
-                        if not dag.parent_dag]
-
-                paused_dag_ids = dagbag.paused_dags()
-                dags = [x for x in dags if x.dag_id not in paused_dag_ids]
-                # dags = filter(lambda x: x.dag_id not in paused_dag_ids, dags)
-
-                self.logger.debug("Total Cores: {} Max Threads: {} DAGs:{}".
-                                  format(multiprocessing.cpu_count(),
-                                         self.max_threads,
-                                         len(dags)))
-                dags = self._split(dags, math.ceil(len(dags) / self.max_threads))
-                tis_q = multiprocessing.Queue()
-                jobs = [multiprocessing.Process(target=self._do_dags,
-                                                args=(dagbag, dags[i], tis_q))
-                        for i in range(len(dags))]
-
-                self.logger.info("Starting {} scheduler jobs".format(len(jobs)))
-                for j in jobs:
-                    j.start()
-
-                while any(j.is_alive() for j in jobs):
-                    while not tis_q.empty():
-                        ti_key, pickle_id = tis_q.get()
-                        dag = dagbag.dags[ti_key[0]]
-                        task = dag.get_task(ti_key[1])
-                        ti = TI(task, ti_key[2])
-                        ti.refresh_from_db(session=session, lock_for_update=True)
-                        if ti.state == State.SCHEDULED:
-                            session.commit()
-                            self.logger.debug("Task {} was picked up by another scheduler"
-                                              .format(ti))
-                            continue
-                        elif ti.state is State.NONE:
-                            ti.state = State.SCHEDULED
-
-                        self.executor.queue_task_instance(ti, pickle_id=pickle_id)
+        execute_start_time = datetime.now()
+
+        # Last time stats were printed
+        last_stat_print_time = datetime(2000, 1, 1)
+        # Last time that self.heartbeat() was called.
+        last_self_heartbeat_time = datetime.now()
+        # Last time that the DAG dir was traversed to look for files
+        last_dag_dir_refresh_time = datetime.now()
+
+        # Use this value initially
+        known_file_paths = processor_manager.file_paths
+
+        # For the execute duration, parse and schedule DAGs
+        while (datetime.now() - execute_start_time).total_seconds() < \
+                self.run_duration:
+            self.logger.debug("Starting Loop...")
+            loop_start_time = time.time()
+
+            # Traverse the DAG directory for Python files containing DAGs
+            # periodically
+            elapsed_time_since_refresh = (datetime.now() -
+                                          last_dag_dir_refresh_time).total_seconds()
+
+            if elapsed_time_since_refresh > self.dag_dir_list_interval:
+                # Build up a list of Python files that could contain DAGs
+                self.logger.info("Searching for files in {}".format(self.subdir))
+                known_file_paths = list_py_file_paths(self.subdir)
+                last_dag_dir_refresh_time = datetime.now()
+                self.logger.info("There are {} files in {}"
+                                 .format(len(known_file_paths), self.subdir))
+                processor_manager.set_file_paths(known_file_paths)
+
+            # Kick off new processes and collect results from finished ones
+            self.logger.info("Heartbeating the process manager")
+            simple_dags = processor_manager.heartbeat()
+
+            if self.using_sqlite:
+                # For the sqlite case w/ 1 thread, wait until the processor
+                # is finished to avoid concurrent access to the DB.
+                self.logger.debug("Waiting for processors to finish since we're "
+                                  "using sqlite")
+                processor_manager.wait_until_finished()
+
+            # Send tasks for execution if available
+            if len(simple_dags) > 0:
+                simple_dag_bag = SimpleDagBag(simple_dags)
+
+                # Handle cases where a DAG run state is set (perhaps manually) to
+                # a non-running state. Handle task instances that belong to
+                # DAG runs in those states
+
+                # If a task instance is up for retry but the corresponding DAG run
+                # isn't running, mark the task instance as FAILED so we don't try
+                # to re-run it.
+                self._change_state_for_tis_without_dagrun(simple_dag_bag,
+                                                          [State.UP_FOR_RETRY],
+                                                          State.FAILED)
+                # If a task instance is scheduled or queued, but the corresponding
+                # DAG run isn't running, set the state to NONE so we don't try to
+                # re-run it.
+                self._change_state_for_tis_without_dagrun(simple_dag_bag,
+                                                          [State.QUEUED,
+                                                           State.SCHEDULED],
+                                                          State.NONE)
+
+                self._execute_task_instances(simple_dag_bag,
+                                             (State.SCHEDULED,
+                                              State.UP_FOR_RETRY))
+
+            # Call heartbeats
+            self.logger.info("Heartbeating the executor")
+            self.executor.heartbeat()
+
+            # Process events from the executor
+            self._process_executor_events()
+
+            # Heartbeat the scheduler periodically
+            time_since_last_heartbeat = (datetime.now() -
+                                         last_self_heartbeat_time).total_seconds()
+            if time_since_last_heartbeat > self.heartrate:
+                self.logger.info("Heartbeating the scheduler")
+                self.heartbeat()
+                last_self_heartbeat_time = datetime.now()
+
+            # Occasionally print out stats about how fast the files are getting processed
+            if ((datetime.now() - last_stat_print_time).total_seconds() >
+                    self.print_stats_interval):
+                if len(known_file_paths) > 0:
+                    self._log_file_processing_stats(known_file_paths,
+                                                    processor_manager)
+                last_stat_print_time = datetime.now()
+
+            loop_end_time = time.time()
+            self.logger.debug("Ran scheduling loop in {:.2f}s"
+                              .format(loop_end_time - loop_start_time))
+            self.logger.debug("Sleeping for {:.2f}s"
+                              .format(self._processor_poll_interval))
+            time.sleep(self._processor_poll_interval)
+
+            # Exit early when running in test mode
+            if processor_manager.max_runs_reached():
+                self.logger.info("Exiting loop as all files have been processed "
+                                 "{} times".format(self.num_runs))
+                break
+
+        # Stop any processors
+        processor_manager.terminate()
+
+        # Verify that all files were processed, and if so, deactivate DAGs that
+        # haven't been touched by the scheduler as they likely have been
+        # deleted.
+        all_files_processed = True
+        for file_path in known_file_paths:
+            if processor_manager.get_last_finish_time(file_path) is None:
+                all_files_processed = False
+                break
+        if all_files_processed:
+            self.logger.info("Deactivating DAGs that haven't been touched since {}"
+                             .format(execute_start_time.isoformat()))
+            models.DAG.deactivate_stale_dags(execute_start_time)
+
+        self.executor.end()
+
+        settings.Session.remove()
 
-                        session.merge(ti)
-                        session.commit()
+    @provide_session
+    def process_file(self, file_path, pickle_dags=False, session=None):
+        """
+        Process a Python file containing Airflow DAGs.
+
+        This includes:
+
+        1. Execute the file and look for DAG objects in the namespace.
+        2. Pickle the DAG and save it to the DB (if necessary).
+        3. For each DAG, see what tasks should run and create appropriate task
+        instances in the DB.
+        4. Record any errors importing the file into the ORM.
+        5. Kill (in ORM) any task instances belonging to the DAGs that haven't
+        issued a heartbeat in a while.
+
+        Returns a list of SimpleDag objects that represent the DAGs found in
+        the file
+
+        :param file_path: the path to the Python file that should be executed
+        :type file_path: unicode
+        :param pickle_dags: whether to serialize the DAGs found in the file and
+        save them to the DB
+        :type pickle_dags: bool
+        :return: a list of SimpleDags made from the DAGs found in the file
+        :rtype: list[SimpleDag]
+        """
+        self.logger.info("Processing file {} for tasks to queue".format(file_path))
+        # As DAGs are parsed from this file, they will be converted into SimpleDags
+        simple_dags = []
+
+        try:
+            dagbag = models.DagBag(file_path)
+        except Exception:
+            self.logger.exception("Failed at reloading the DAG file {}".format(file_path))
+            Stats.incr('dag_file_refresh_error', 1, 1)
+            return []
+
+        if len(dagbag.dags) > 0:
+            self.logger.info("DAG(s) {} retrieved from {}"
+                             .format(dagbag.dags.keys(),
+                                     file_path))
+        else:
+            self.logger.warn("No viable dags retrieved from {}".format(file_path))
+            return []
+
+        # Save individual DAGs in the ORM and update DagModel.last_scheduler_run
+        sync_time = datetime.now()
+        for dag in dagbag.dags.values():
+            models.DAG.sync_to_db(dag, dag.owner, sync_time)
+
+        paused_dag_ids = [dag.dag_id for dag in dagbag.dags.values()
+                          if dag.is_paused]
+
+        # Pickle the DAGs (if necessary) and put them into a SimpleDag
+        for dag_id in dagbag.dags:
+            dag = dagbag.get_dag(dag_id)
+            pickle_id = None
+            if pickle_dags:
+                pickle_id = dag.pickle(session).id
+
+            task_ids = [task.task_id for task in dag.tasks]
+
+            # Only return DAGs that are not paused
+            if dag_id not in paused_dag_ids:
+                simple_dags.append(SimpleDag(dag.dag_id,
+                                             task_ids,
+                                             dag.full_filepath,
+                                             dag.concurrency,
+                                             dag.is_paused,
+                                             pickle_id))
+
+        if len(self.dag_ids) > 0:
+            dags = [dag for dag in dagbag.dags.values()
+                    if dag.dag_id in self.dag_ids and
+                    dag.dag_id not in paused_dag_ids]
+        else:
+            dags = [dag for dag in dagbag.dags.values()
+                    if not dag.parent_dag and
+                    dag.dag_id not in paused_dag_ids]
+
+        # Not using multiprocessing.Queue() since this no longer runs in a
+        # separate process, and because of some unusual behavior where
+        # empty() can incorrectly return True.
+        ti_keys_to_schedule = []
+
+        self._process_dags(dagbag, dags, ti_keys_to_schedule)
+
+        for ti_key in ti_keys_to_schedule:
+            dag = dagbag.dags[ti_key[0]]
+            task = dag.get_task(ti_key[1])
+            ti = models.TaskInstance(task, ti_key[2])
+            # Task starts out in the scheduled state. All tasks in the
+            # scheduled state will be sent to the executor
+            ti.state = State.SCHEDULED
+
+            # Also save this task instance to the DB.
+            self.logger.info("Creating / updating {} in ORM".format(ti))
+            session.merge(ti)
+            session.commit()
 
-                for j in jobs:
-                    j.join()
+        # Record import errors into the ORM
+        try:
+            self.record_import_errors(session, dagbag)
+        except Exception:
+            self.logger.exception("Error logging import errors!")
+        try:
+            dagbag.kill_zombies()
+        except Exception:
+            self.logger.exception("Error killing zombies!")
 
-                self.logger.info("Done queuing tasks, calling the executor's "
-                              "heartbeat")
-                duration_sec = (datetime.now() - loop_start_dttm).total_seconds()
-                self.logger.info("Loop took: {} seconds".format(duration_sec))
-                Stats.timing("scheduler_loop", duration_sec * 1000)
-                try:
-                    self.import_errors(dagbag)
-                except Exception as e:
-                    self.logger.exception(e)
-                try:
-                    dagbag.kill_zombies()
-                except Exception as e:
-                    self.logger.exception(e)
-                try:
-                    # We really just want the scheduler to never ever stop.
-                    executor.heartbeat()
-                    self.heartbeat()
-                except Exception as e:
-                    self.logger.exception(e)
-                    self.logger.error("Tachycardia!")
-            except Exception as deep_e:
-                self.logger.exception(deep_e)
-                raise
-            finally:
-                settings.Session.remove()
-        executor.end()
-        session.close()
+        return simple_dags
 
     @provide_session
     def heartbeat_callback(self, session=None):

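[Editor's note: for orientation, a condensed, illustrative sketch of the per-file flow that SchedulerJob.process_file implements above — load the file into a DagBag, sync each DAG to the ORM, and return SimpleDag descriptions for the unpaused DAGs. The summarize_file name is made up; SimpleDag's constructor arguments follow the diff, SimpleDag is assumed importable from the new airflow/utils/dag_processing.py, and an open SQLAlchemy session is assumed.]

    from datetime import datetime

    from airflow import models
    from airflow.utils.dag_processing import SimpleDag


    def summarize_file(file_path, session, pickle_dags=False):
        """Parse one DAG file and return SimpleDags for its unpaused DAGs."""
        dagbag = models.DagBag(file_path)
        sync_time = datetime.now()
        simple_dags = []
        for dag in dagbag.dags.values():
            # Record the DAG (and when it was last seen) in the ORM.
            models.DAG.sync_to_db(dag, dag.owner, sync_time)
            if dag.is_paused:
                continue
            pickle_id = dag.pickle(session).id if pickle_dags else None
            simple_dags.append(SimpleDag(dag.dag_id,
                                         [t.task_id for t in dag.tasks],
                                         dag.full_filepath,
                                         dag.concurrency,
                                         dag.is_paused,
                                         pickle_id))
        return simple_dags
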
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/airflow/models.py
----------------------------------------------------------------------
diff --git a/airflow/models.py b/airflow/models.py
index f589b3e..8326a92 100644
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -59,6 +59,7 @@ from airflow import settings, utils
 from airflow.executors import DEFAULT_EXECUTOR, LocalExecutor
 from airflow import configuration
 from airflow.exceptions import AirflowException, AirflowSkipException
+from airflow.dag.base_dag import BaseDag, BaseDagBag
 from airflow.utils.dates import cron_presets, date_range as utils_date_range
 from airflow.utils.db import provide_session
 from airflow.utils.decorators import apply_defaults
@@ -129,7 +130,7 @@ def clear_task_instances(tis, session, activate_dag_runs=True):
             dr.start_date = datetime.now()
 
 
-class DagBag(LoggingMixin):
+class DagBag(BaseDagBag, LoggingMixin):
     """
     A dagbag is a collection of dags, parsed out of a folder tree and has high
     level configuration settings, like what database to use as a backend and
@@ -140,7 +141,7 @@ class DagBag(LoggingMixin):
     independent settings sets.
 
     :param dag_folder: the folder to scan to find DAGs
-    :type dag_folder: str
+    :type dag_folder: unicode
     :param executor: the executor to use when executing task instances
         in this DagBag
     :param include_examples: whether to include the examples that ship
@@ -155,26 +156,23 @@ class DagBag(LoggingMixin):
             self,
             dag_folder=None,
             executor=DEFAULT_EXECUTOR,
-            include_examples=configuration.getboolean('core', 'LOAD_EXAMPLES'),
-            sync_to_db=False):
+            include_examples=configuration.getboolean('core', 'LOAD_EXAMPLES')):
 
         dag_folder = dag_folder or DAGS_FOLDER
         self.logger.info("Filling up the DagBag from {}".format(dag_folder))
         self.dag_folder = dag_folder
         self.dags = {}
-        self.sync_to_db = sync_to_db
         # the file's last modified timestamp when we last read it
         self.file_last_changed = {}
         self.executor = executor
         self.import_errors = {}
+
         if include_examples:
             example_dag_folder = os.path.join(
                 os.path.dirname(__file__),
                 'example_dags')
             self.collect_dags(example_dag_folder)
         self.collect_dags(dag_folder)
-        if sync_to_db:
-            self.deactivate_inactive_dags()
 
     def size(self):
         """
@@ -307,7 +305,7 @@ class DagBag(LoggingMixin):
         return found_dags
 
     @provide_session
-    def kill_zombies(self, session):
+    def kill_zombies(self, session=None):
         """
         Fails tasks that haven't had a heartbeat in too long
         """
@@ -355,20 +353,6 @@ class DagBag(LoggingMixin):
         for task in dag.tasks:
             settings.policy(task)
 
-        if self.sync_to_db:
-            session = settings.Session()
-            orm_dag = session.query(
-                DagModel).filter(DagModel.dag_id == dag.dag_id).first()
-            if not orm_dag:
-                orm_dag = DagModel(dag_id=dag.dag_id)
-            orm_dag.fileloc = root_dag.full_filepath
-            orm_dag.is_subdag = dag.is_subdag
-            orm_dag.owners = root_dag.owner
-            orm_dag.is_active = True
-            session.merge(orm_dag)
-            session.commit()
-            session.close()
-
         for subdag in dag.subdags:
             subdag.full_filepath = dag.full_filepath
             subdag.parent_dag = dag
@@ -756,8 +740,77 @@ class TaskInstance(Base):
         the orchestrator.
         """
         dag = self.task.dag
-        iso = self.execution_date.isoformat()
-        cmd = "airflow run {self.dag_id} {self.task_id} {iso} "
+
+        # Keeping existing logic, but not entirely sure why this is here.
+        if not pickle_id and dag:
+            if dag.full_filepath != dag.filepath:
+                path = "DAGS_FOLDER/{}".format(dag.filepath)
+            elif dag.full_filepath:
+                path = dag.full_filepath
+
+        return TaskInstance.generate_command(
+            self.dag_id,
+            self.task_id,
+            self.execution_date,
+            mark_success=mark_success,
+            ignore_dependencies=ignore_dependencies,
+            ignore_depends_on_past=ignore_depends_on_past,
+            force=force,
+            local=local,
+            pickle_id=pickle_id,
+            file_path=path,
+            raw=raw,
+            job_id=job_id,
+            pool=pool)
+
+    @staticmethod
+    def generate_command(dag_id,
+                         task_id,
+                         execution_date,
+                         mark_success=False,
+                         ignore_dependencies=False,
+                         ignore_depends_on_past=False,
+                         force=False,
+                         local=False,
+                         pickle_id=None,
+                         file_path=None,
+                         raw=False,
+                         job_id=None,
+                         pool=None
+                         ):
+        """
+        Generates the shell command required to execute this task instance.
+
+        :param dag_id: DAG ID
+        :type dag_id: unicode
+        :param task_id: Task ID
+        :type task_id: unicode
+        :param execution_date: Execution date for the task
+        :type execution_date: datetime
+        :param mark_success: Whether to mark the task as successful
+        :type mark_success: bool
+        :param ignore_dependencies: Whether to ignore the dependencies and run
+        anyway
+        :type ignore_dependencies: bool
+        :param ignore_depends_on_past: Whether to ignore the depends on past
+        setting and run anyway
+        :type ignore_depends_on_past: bool
+        :param force: Whether to force running - see TaskInstance.run()
+        :type force: bool
+        :param local: Whether to run the task locally
+        :type local: bool
+        :param pickle_id: If the DAG was serialized to the DB, the ID
+        associated with the pickled DAG
+        :type pickle_id: unicode
+        :param file_path: path to the file containing the DAG definition
+        :param raw: raw mode (needs more details)
+        :param job_id: job ID (needs more details)
+        :param pool: the Airflow pool that the task should run in
+        :type pool: unicode
+        :return: shell command that can be used to run the task instance
+        """
+        iso = execution_date.isoformat()
+        cmd = "airflow run {dag_id} {task_id} {iso} "
         cmd += "--mark_success " if mark_success else ""
         cmd += "--pickle {pickle_id} " if pickle_id else ""
         cmd += "--job_id {job_id} " if job_id else ""
@@ -767,11 +820,7 @@ class TaskInstance(Base):
         cmd += "--local " if local else ""
         cmd += "--pool {pool} " if pool else ""
         cmd += "--raw " if raw else ""
-        if not pickle_id and dag:
-            if dag.full_filepath != dag.filepath:
-                cmd += "-sd DAGS_FOLDER/{dag.filepath} "
-            elif dag.full_filepath:
-                cmd += "-sd {dag.full_filepath}"
+        cmd += "-sd {file_path}"
         return cmd.format(**locals())
 
     @property
@@ -1185,9 +1234,7 @@ class TaskInstance(Base):
             .first()
         )
         if not pool:
-            raise ValueError(
-                "Task specified a pool ({}) but the pool "
-                "doesn't exist!".format(self.task.pool))
+            return False
         open_slots = pool.open_slots(session=session)
 
         return open_slots <= 0
@@ -2469,7 +2516,7 @@ class DagModel(Base):
 
 
 @functools.total_ordering
-class DAG(LoggingMixin):
+class DAG(BaseDag, LoggingMixin):
     """
     A dag (directed acyclic graph) is a collection of tasks with directional
     dependencies. A dag also has a schedule, a start end an end date
@@ -2557,8 +2604,14 @@ class DAG(LoggingMixin):
             del self.default_args['params']
 
         validate_key(dag_id)
+
+        # Properties from BaseDag
+        self._dag_id = dag_id
+        self._full_filepath = full_filepath if full_filepath else ''
+        self._concurrency = concurrency
+        self._pickle_id = None
+
         self.task_dict = dict()
-        self.dag_id = dag_id
         self.start_date = start_date
         self.end_date = end_date
         self.schedule_interval = schedule_interval
@@ -2568,14 +2621,12 @@ class DAG(LoggingMixin):
             self._schedule_interval = None
         else:
             self._schedule_interval = schedule_interval
-        self.full_filepath = full_filepath if full_filepath else ''
         if isinstance(template_searchpath, six.string_types):
             template_searchpath = [template_searchpath]
         self.template_searchpath = template_searchpath
         self.parent_dag = None  # Gets set when DAGs are loaded
         self.last_loaded = datetime.now()
         self.safe_dag_id = dag_id.replace('.', '__dot__')
-        self.concurrency = concurrency
         self.max_active_runs = max_active_runs
         self.dagrun_timeout = dagrun_timeout
         self.sla_miss_callback = sla_miss_callback
@@ -2597,7 +2648,9 @@ class DAG(LoggingMixin):
     def __eq__(self, other):
         return (
             type(self) == type(other) and
-            all(self.__dict__.get(c, None) == other.__dict__.get(c, None)
+            # Use getattr() instead of __dict__ as __dict__ doesn't return
+            # correct values for properties.
+            all(getattr(self, c, None) == getattr(other, c, None)
                 for c in self._comps))
 
     def __ne__(self, other):
@@ -2672,6 +2725,38 @@ class DAG(LoggingMixin):
         return dttm
 
     @property
+    def dag_id(self):
+        return self._dag_id
+
+    @dag_id.setter
+    def dag_id(self, value):
+        self._dag_id = value
+
+    @property
+    def full_filepath(self):
+        return self._full_filepath
+
+    @full_filepath.setter
+    def full_filepath(self, value):
+        self._full_filepath = value
+
+    @property
+    def concurrency(self):
+        return self._concurrency
+
+    @concurrency.setter
+    def concurrency(self, value):
+        self._concurrency = value
+
+    @property
+    def pickle_id(self):
+        return self._pickle_id
+
+    @pickle_id.setter
+    def pickle_id(self, value):
+        self._pickle_id = value
+
+    @property
     def tasks(self):
         return list(self.task_dict.values())
 
@@ -3134,6 +3219,80 @@ class DAG(LoggingMixin):
         run.refresh_from_db()
         return run
 
+    @staticmethod
+    @provide_session
+    def sync_to_db(dag, owner, sync_time, session=None):
+        """
+        Save attributes about this DAG to the DB. Note that this method
+        can be called for both DAGs and SubDAGs. A SubDag is actually a
+        SubDagOperator.
+
+        :param dag: the DAG object to save to the DB
+        :type dag: DAG
+        :param owner: the owner of the DAG
+        :type owner: unicode
+        :param sync_time: The time that the DAG should be marked as sync'ed
+        :type sync_time: datetime
+        :return: None
+        """
+        orm_dag = session.query(
+            DagModel).filter(DagModel.dag_id == dag.dag_id).first()
+        if not orm_dag:
+            orm_dag = DagModel(dag_id=dag.dag_id)
+            logging.info("Creating ORM DAG for %s",
+                         dag.dag_id)
+        orm_dag.fileloc = dag.full_filepath
+        orm_dag.is_subdag = dag.is_subdag
+        orm_dag.owners = owner
+        orm_dag.is_active = True
+        orm_dag.last_scheduler_run = sync_time
+        session.merge(orm_dag)
+        session.commit()
+
+        for subdag in dag.subdags:
+            DAG.sync_to_db(subdag, owner, sync_time, session=session)
+
+    @staticmethod
+    @provide_session
+    def deactivate_unknown_dags(active_dag_ids, session=None):
+        """
+        Given a list of known DAGs, deactivate any other DAGs that are
+        marked as active in the ORM
+
+        :param active_dag_ids: list of DAG IDs that are active
+        :type active_dag_ids: list[unicode]
+        :return: None
+        """
+
+        if len(active_dag_ids) == 0:
+            return
+        for dag in session.query(
+                DagModel).filter(~DagModel.dag_id.in_(active_dag_ids)).all():
+            dag.is_active = False
+            session.merge(dag)
+
+    @staticmethod
+    @provide_session
+    def deactivate_stale_dags(expiration_date, session=None):
+        """
+        Deactivate any DAGs that were last touched by the scheduler before
+        the expiration date. These DAGs were likely deleted.
+
+        :param expiration_date: set DAGs that were last touched before this time
+        to inactive
+        :type expiration_date: datetime
+        :return: None
+        """
+        for dag in session.query(
+                DagModel).filter(DagModel.last_scheduler_run < expiration_date,
+                                 DagModel.is_active).all():
+            logging.info("Deactivating DAG ID %s since it was last touched "
+                         "by the scheduler at %s",
+                         dag.dag_id,
+                         dag.last_scheduler_run.isoformat())
+            dag.is_active = False
+            session.merge(dag)
+            session.commit()
+
 
 class Chart(Base):
     __tablename__ = "chart"
@@ -3567,7 +3726,7 @@ class DagRun(Base):
             state=State.unfinished(),
             session=session
         )
-        none_depends_on_past = all(t.task.depends_on_past for t in unfinished_tasks)
+        none_depends_on_past = all(not t.task.depends_on_past for t in unfinished_tasks)
 
         # small speed up
         if unfinished_tasks and none_depends_on_past:
@@ -3642,6 +3801,44 @@ class DagRun(Base):
 
         session.commit()
 
+    @staticmethod
+    def get_running_tasks(session, dag_id, task_ids):
+        """
+        Returns the number of tasks running in the given DAG.
+
+        :param session: ORM session
+        :param dag_id: ID of the DAG to get the task concurrency of
+        :type dag_id: unicode
+        :param task_ids: A list of valid task IDs for the given DAG
+        :type task_ids: list[unicode]
+        :return: The number of running tasks
+        :rtype: int
+        """
+        qry = session.query(func.count(TaskInstance.task_id)).filter(
+            TaskInstance.dag_id == dag_id,
+            TaskInstance.task_id.in_(task_ids),
+            TaskInstance.state == State.RUNNING,
+        )
+        return qry.scalar()
+
+    @staticmethod
+    def get_run(session, dag_id, execution_date):
+        """
+        :param dag_id: DAG ID
+        :type dag_id: unicode
+        :param execution_date: execution date
+        :type execution_date: datetime
+        :return: DagRun corresponding to the given dag_id and execution date
+        if one exists. None otherwise.
+        :rtype: DagRun
+        """
+        qry = session.query(DagRun).filter(
+            DagRun.dag_id == dag_id,
+            DagRun.external_trigger == False,
+            DagRun.execution_date == execution_date,
+        )
+        return qry.first()
+
 
 class Pool(Base):
     __tablename__ = "slot_pool"

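[Editor's note: an illustrative usage of the new TaskInstance.generate_command static helper shown in the diff above; the dag_id, task_id, pickle_id and file path values are made up, and the expected output is approximate.]

    from datetime import datetime

    from airflow.models import TaskInstance

    # Build the shell command for a (made-up) task without a TaskInstance
    # object; only the flags whose arguments are set are emitted.
    cmd = TaskInstance.generate_command(
        'example_dag',                          # dag_id (hypothetical)
        'example_task',                         # task_id (hypothetical)
        datetime(2016, 7, 31),                  # execution_date
        local=True,
        pickle_id=42,
        file_path='DAGS_FOLDER/example_dag.py')
    # Roughly: "airflow run example_dag example_task 2016-07-31T00:00:00
    #           --pickle 42 --local -sd DAGS_FOLDER/example_dag.py"
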
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/fdb7e949/airflow/settings.py
----------------------------------------------------------------------
diff --git a/airflow/settings.py b/airflow/settings.py
index 9f9bb14..ccd77ee 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -72,20 +72,11 @@ SQL_ALCHEMY_CONN = conf.get('core', 'SQL_ALCHEMY_CONN')
 LOGGING_LEVEL = logging.INFO
 DAGS_FOLDER = os.path.expanduser(conf.get('core', 'DAGS_FOLDER'))
 
-engine_args = {}
-if 'sqlite' not in SQL_ALCHEMY_CONN:
-    # Engine args not supported by sqlite
-    engine_args['pool_size'] = conf.getint('core', 'SQL_ALCHEMY_POOL_SIZE')
-    engine_args['pool_recycle'] = conf.getint('core',
-                                              'SQL_ALCHEMY_POOL_RECYCLE')
-
-engine = create_engine(SQL_ALCHEMY_CONN, **engine_args)
-Session = scoped_session(
-    sessionmaker(autocommit=False, autoflush=False, bind=engine))
-
 # can't move this to conf due to ConfigParser interpolation
 LOG_FORMAT = (
     '[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s')
+LOG_FORMAT_WITH_THREAD_NAME = (
+    '[%(asctime)s] {%(filename)s:%(lineno)d} %(threadName)s %(levelname)s - %(message)s')
 SIMPLE_LOG_FORMAT = '%(asctime)s %(levelname)s - %(message)s'
 
 
@@ -116,10 +107,28 @@ def policy(task_instance):
     pass
 
 
-def configure_logging():
+def configure_logging(log_format=LOG_FORMAT):
     logging.root.handlers = []
     logging.basicConfig(
-        format=LOG_FORMAT, stream=sys.stdout, level=LOGGING_LEVEL)
+        format=log_format, stream=sys.stdout, level=LOGGING_LEVEL)
+
+engine = None
+Session = None
+
+
+def configure_orm():
+    global engine
+    global Session
+    engine_args = {}
+    if 'sqlite' not in SQL_ALCHEMY_CONN:
+        # Engine args not supported by sqlite
+        engine_args['pool_size'] = conf.getint('core', 'SQL_ALCHEMY_POOL_SIZE')
+        engine_args['pool_recycle'] = conf.getint('core',
+                                                  'SQL_ALCHEMY_POOL_RECYCLE')
+
+    engine = create_engine(SQL_ALCHEMY_CONN, **engine_args)
+    Session = scoped_session(
+        sessionmaker(autocommit=False, autoflush=False, bind=engine))
 
 try:
     from airflow_local_settings import *
@@ -128,3 +137,4 @@ except:
     pass
 
 configure_logging()
+configure_orm()

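[Editor's note: a condensed sketch of the deferred ORM setup pattern that settings.configure_orm introduces above, with a hard-coded sqlite URL and pool settings standing in for Airflow's configuration values.]

    from sqlalchemy import create_engine
    from sqlalchemy.orm import scoped_session, sessionmaker

    engine = None
    Session = None


    def configure_orm(sql_alchemy_conn='sqlite:///example.db'):
        """Create the engine and scoped session factory on demand."""
        global engine, Session
        engine_args = {}
        if 'sqlite' not in sql_alchemy_conn:
            # Connection pooling options are not supported by the sqlite driver.
            engine_args['pool_size'] = 5
            engine_args['pool_recycle'] = 3600
        engine = create_engine(sql_alchemy_conn, **engine_args)
        Session = scoped_session(
            sessionmaker(autocommit=False, autoflush=False, bind=engine))


    configure_orm()
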
