Repository: incubator-airflow
Updated Branches:
  refs/heads/master b0669b532 -> 0d0cc62f4


[AIRFLOW-1452] workaround lock on method

Workaround lock on method "has_table" in case
mssql is used
as storage engine.

Closes #2514 from patsak/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0d0cc62f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0d0cc62f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0d0cc62f

Branch: refs/heads/master
Commit: 0d0cc62f49525166bc877606affa5a623ba52c4d
Parents: b0669b5
Author: k.privezentsev <konstantin.privezent...@kaspersky.com>
Authored: Fri Aug 11 11:47:35 2017 -0700
Committer: Dan Davydov <dan.davy...@airbnb.com>
Committed: Fri Aug 11 11:47:42 2017 -0700

----------------------------------------------------------------------
 .../cc1e65623dc7_add_max_tries_column_to_task_instance.py   | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0d0cc62f/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py
----------------------------------------------------------------------
diff --git 
a/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py
 
b/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py
index 2d5ffc2..b151e0c 100644
--- 
a/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py
+++ 
b/airflow/migrations/versions/cc1e65623dc7_add_max_tries_column_to_task_instance.py
@@ -29,6 +29,7 @@ from alembic import op
 import sqlalchemy as sa
 from airflow import settings
 from airflow.models import DagBag, TaskInstance
+from sqlalchemy.engine.reflection import Inspector
 
 BATCH_SIZE = 5000
 
@@ -39,10 +40,12 @@ def upgrade():
     # needed for database that does not create table until migration finishes.
     # Checking task_instance table exists prevent the error of querying
     # non-existing task_instance table.
-    engine = settings.engine
-    if engine.dialect.has_table(engine, 'task_instance'):
+    connection = op.get_bind()
+    inspector = Inspector.from_engine(connection)
+    tables = inspector.get_table_names()
+
+    if 'task_instance' in tables:
         # Get current session
-        connection = op.get_bind()
         sessionmaker = sa.orm.sessionmaker()
         session = sessionmaker(bind=connection)
         dagbag = DagBag(settings.DAGS_FOLDER)

Reply via email to