Repository: incubator-airflow
Updated Branches:
  refs/heads/master 15ff540ec -> 67ab416db


[AIRFLOW-680] Disable connection pool for commands

This is a continuation of Max's PR here:
https://github.com/apache/incubator-airflow/pull/1021
We have seen a very substantial (~10x) decrease
in DB CPU usage from this PR.

This PR was originally created by plypaul; I am
just cherry-picking it onto master for him.

Testing Done:
- Has been running in Airbnb production for quite
some time (though off a different merge base)

Closes #1925 from aoen/ddavydov/reduce_db_sessions


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/67ab416d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/67ab416d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/67ab416d

Branch: refs/heads/master
Commit: 67ab416dbc6f61095e6e102f44b3825fc482b6c5
Parents: 15ff540
Author: Paul Yang <paul.y...@airbnb.com>
Authored: Mon Dec 12 15:32:25 2016 -0800
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Mon Dec 12 15:32:29 2016 -0800

----------------------------------------------------------------------
 airflow/bin/cli.py  |  3 +++
 airflow/jobs.py     | 22 ++++++++++++++--------
 airflow/settings.py |  7 +++++--
 3 files changed, 22 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/67ab416d/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 21e1d23..eab4a30 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -321,6 +321,9 @@ def set_is_paused(is_paused, args, dag=None):
 
 
 def run(args, dag=None):
+    # Disable connection pooling to reduce the # of connections on the DB
+    # while it's waiting for the task to finish.
+    settings.configure_orm(disable_connection_pool=True)
     db_utils.pessimistic_connection_handling()
     if dag:
         args.dag_id = dag.dag_id

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/67ab416d/airflow/jobs.py
----------------------------------------------------------------------
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 229424e..a2d94e3 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -165,15 +165,21 @@ class BaseJob(Base, LoggingMixin):
         if job.state == State.SHUTDOWN:
             self.kill()
 
+        # Figure out how long to sleep for
+        sleep_for = 0
         if job.latest_heartbeat:
-            sleep_for = self.heartrate - (
-                datetime.now() - job.latest_heartbeat).total_seconds()
-            if sleep_for > 0:
-                sleep(sleep_for)
+            sleep_for = max(
+                0,
+                self.heartrate - (datetime.now() - 
job.latest_heartbeat).total_seconds())
 
-        job.latest_heartbeat = datetime.now()
+        # Don't keep session open while sleeping as it leaves a connection open
+        session.close()
+        sleep(sleep_for)
 
+        # Update last heartbeat time
         session = settings.Session()
+        job = session.query(BaseJob).filter(BaseJob.id == self.id).first()
+        job.latest_heartbeat = datetime.now()
         session.merge(job)
         session.commit()
 
@@ -182,7 +188,7 @@ class BaseJob(Base, LoggingMixin):
         self.logger.debug('[heart] Boom.')
 
     def run(self):
-        Stats.incr(self.__class__.__name__.lower()+'_start', 1, 1)
+        Stats.incr(self.__class__.__name__.lower() + '_start', 1, 1)
         # Adding an entry in the DB
         session = settings.Session()
         self.state = State.RUNNING
@@ -202,7 +208,7 @@ class BaseJob(Base, LoggingMixin):
         session.commit()
         session.close()
 
-        Stats.incr(self.__class__.__name__.lower()+'_end', 1, 1)
+        Stats.incr(self.__class__.__name__.lower() + '_end', 1, 1)
 
     def _execute(self):
         raise NotImplementedError("This method needs to be overridden")
@@ -714,7 +720,7 @@ class SchedulerJob(BaseJob):
                 .filter(or_(
                     DagRun.external_trigger == False,
                     # add % as a wildcard for the like query
-                    DagRun.run_id.like(DagRun.ID_PREFIX+'%')
+                    DagRun.run_id.like(DagRun.ID_PREFIX + '%')
                 ))
             )
             last_scheduled_run = qry.scalar()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/67ab416d/airflow/settings.py
----------------------------------------------------------------------
diff --git a/airflow/settings.py b/airflow/settings.py
index 60efb4a..e8da674 100644
--- a/airflow/settings.py
+++ b/airflow/settings.py
@@ -23,6 +23,7 @@ import sys
 
 from sqlalchemy import create_engine
 from sqlalchemy.orm import scoped_session, sessionmaker
+from sqlalchemy.pool import NullPool
 
 from airflow import configuration as conf
 
@@ -121,11 +122,13 @@ engine = None
 Session = None
 
 
-def configure_orm():
+def configure_orm(disable_connection_pool=False):
     global engine
     global Session
     engine_args = {}
-    if 'sqlite' not in SQL_ALCHEMY_CONN:
+    if disable_connection_pool:
+        engine_args['poolclass'] = NullPool
+    elif 'sqlite' not in SQL_ALCHEMY_CONN:
         # Engine args not supported by sqlite
         engine_args['pool_size'] = conf.getint('core', 'SQL_ALCHEMY_POOL_SIZE')
         engine_args['pool_recycle'] = conf.getint('core',

Reply via email to