Repository: incubator-airflow Updated Branches: refs/heads/master 15ff540ec -> 67ab416db
[AIRFLOW-680] Disable connection pool for commands This is a continuation of Max's PR here: https://github.com/apache/incubator-airflow/pull/1021 We have seen a very substantial DB CPU usage decrease from this PR (~10x). This PR was originally created by plypaul; I am just cherry-picking it onto master for him. Testing Done: - Has been running in Airbnb production for quite some time (off of a different merge base though) Closes #1925 from aoen/ddavydov/reduce_db_sessions Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/67ab416d Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/67ab416d Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/67ab416d Branch: refs/heads/master Commit: 67ab416dbc6f61095e6e102f44b3825fc482b6c5 Parents: 15ff540 Author: Paul Yang <paul.y...@airbnb.com> Authored: Mon Dec 12 15:32:25 2016 -0800 Committer: Dan Davydov <dan.davy...@airbnb.com> Committed: Mon Dec 12 15:32:29 2016 -0800 ---------------------------------------------------------------------- airflow/bin/cli.py | 3 +++ airflow/jobs.py | 22 ++++++++++++++-------- airflow/settings.py | 7 +++++-- 3 files changed, 22 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/67ab416d/airflow/bin/cli.py ---------------------------------------------------------------------- diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index 21e1d23..eab4a30 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -321,6 +321,9 @@ def set_is_paused(is_paused, args, dag=None): def run(args, dag=None): + # Disable connection pooling to reduce the # of connections on the DB + # while it's waiting for the task to finish. 
+ settings.configure_orm(disable_connection_pool=True) db_utils.pessimistic_connection_handling() if dag: args.dag_id = dag.dag_id http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/67ab416d/airflow/jobs.py ---------------------------------------------------------------------- diff --git a/airflow/jobs.py b/airflow/jobs.py index 229424e..a2d94e3 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -165,15 +165,21 @@ class BaseJob(Base, LoggingMixin): if job.state == State.SHUTDOWN: self.kill() + # Figure out how long to sleep for + sleep_for = 0 if job.latest_heartbeat: - sleep_for = self.heartrate - ( - datetime.now() - job.latest_heartbeat).total_seconds() - if sleep_for > 0: - sleep(sleep_for) + sleep_for = max( + 0, + self.heartrate - (datetime.now() - job.latest_heartbeat).total_seconds()) - job.latest_heartbeat = datetime.now() + # Don't keep session open while sleeping as it leaves a connection open + session.close() + sleep(sleep_for) + # Update last heartbeat time session = settings.Session() + job = session.query(BaseJob).filter(BaseJob.id == self.id).first() + job.latest_heartbeat = datetime.now() session.merge(job) session.commit() @@ -182,7 +188,7 @@ class BaseJob(Base, LoggingMixin): self.logger.debug('[heart] Boom.') def run(self): - Stats.incr(self.__class__.__name__.lower()+'_start', 1, 1) + Stats.incr(self.__class__.__name__.lower() + '_start', 1, 1) # Adding an entry in the DB session = settings.Session() self.state = State.RUNNING @@ -202,7 +208,7 @@ class BaseJob(Base, LoggingMixin): session.commit() session.close() - Stats.incr(self.__class__.__name__.lower()+'_end', 1, 1) + Stats.incr(self.__class__.__name__.lower() + '_end', 1, 1) def _execute(self): raise NotImplementedError("This method needs to be overridden") @@ -714,7 +720,7 @@ class SchedulerJob(BaseJob): .filter(or_( DagRun.external_trigger == False, # add % as a wildcard for the like query - DagRun.run_id.like(DagRun.ID_PREFIX+'%') + DagRun.run_id.like(DagRun.ID_PREFIX + 
'%') )) ) last_scheduled_run = qry.scalar() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/67ab416d/airflow/settings.py ---------------------------------------------------------------------- diff --git a/airflow/settings.py b/airflow/settings.py index 60efb4a..e8da674 100644 --- a/airflow/settings.py +++ b/airflow/settings.py @@ -23,6 +23,7 @@ import sys from sqlalchemy import create_engine from sqlalchemy.orm import scoped_session, sessionmaker +from sqlalchemy.pool import NullPool from airflow import configuration as conf @@ -121,11 +122,13 @@ engine = None Session = None -def configure_orm(): +def configure_orm(disable_connection_pool=False): global engine global Session engine_args = {} - if 'sqlite' not in SQL_ALCHEMY_CONN: + if disable_connection_pool: + engine_args['poolclass'] = NullPool + elif 'sqlite' not in SQL_ALCHEMY_CONN: # Engine args not supported by sqlite engine_args['pool_size'] = conf.getint('core', 'SQL_ALCHEMY_POOL_SIZE') engine_args['pool_recycle'] = conf.getint('core',