[02/14] incubator-airflow git commit: [AIRFLOW-780] Fix dag import errors no longer working

2017-02-01 Thread bolke
[AIRFLOW-780] Fix dag import errors no longer working

The import errors were no longer working after the
multiprocessor update
(since they are cleared after each DAG directory
is parsed). This change
fixes them, and adds tests to prevent future
regressions.

Also fix a couple of linter errors.

Note that there are a few inefficiencies (e.g.
sometimes we delete then add import errors in the
same place instead of just doing an update), but
this is equivalent to the old behavior.

Testing Done:
- Added missing unit tests for dag imports. Note
that some of them strangely fail for python 3 and
it became too time consuming to debug since I
don't have a copy of the travis environment, I
even ran with the same version of python locally
and couldn't reproduce. I have skipped those 3
tests in python 3 for now.

Closes #2018 from aoen/fix_parse_errors_not_displa


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/67cbb966
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/67cbb966
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/67cbb966

Branch: refs/heads/v1-8-test
Commit: 67cbb966410226c1489bb730af3af45330fc51b9
Parents: dc97bcd
Author: Dan Davydov 
Authored: Fri Jan 27 01:29:00 2017 -0800
Committer: Dan Davydov 
Committed: Fri Jan 27 01:29:04 2017 -0800

--
 airflow/configuration.py|   1 +
 airflow/jobs.py |  48 +
 airflow/models.py   |   3 +
 airflow/settings.py |   3 +-
 airflow/utils/dag_processing.py |   4 +-
 tests/jobs.py   | 193 +--
 6 files changed, 219 insertions(+), 33 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/67cbb966/airflow/configuration.py
--
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 979b071..011f764 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -503,6 +503,7 @@ authenticate = true
 max_threads = 2
 catchup_by_default = True
 scheduler_zombie_task_threshold = 300
+dag_dir_list_interval = 0
 """
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/67cbb966/airflow/jobs.py
--
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 201d87f..1a581e9 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -657,34 +657,44 @@ class SchedulerJob(BaseJob):
 session.close()
 
 @staticmethod
-def record_import_errors(session, dagbag):
+@provide_session
+def clear_nonexistent_import_errors(session, known_file_paths):
 """
-For the DAGs in the given DagBag, record any associated import errors.
-These are usually displayed through the Airflow UI so that users know
-that there are issues parsing DAGs.
+Clears import errors for files that no longer exist.
 
 :param session: session for ORM operations
 :type session: sqlalchemy.orm.session.Session
-:param dagbag: DagBag containing DAGs with import errors
-:type dagbag: models.Dagbag
+:param known_file_paths: The list of existing files that are parsed 
for DAGs
+:type known_file_paths: list[unicode]
 """
-for filename, stacktrace in list(dagbag.import_errors.items()):
-session.query(models.ImportError).filter(
-models.ImportError.filename == filename
-).delete()
-session.add(models.ImportError(
-filename=filename, stacktrace=stacktrace))
+session.query(models.ImportError).filter(
+~models.ImportError.filename.in_(known_file_paths)
+).delete(synchronize_session='fetch')
 session.commit()
 
 @staticmethod
-def clear_import_errors(session):
+def update_import_errors(session, dagbag):
 """
-Remove all the known import errors from the DB.
+For the DAGs in the given DagBag, record any associated import errors 
and clears
+errors for files that no longer have them. These are usually displayed 
through the
+Airflow UI so that users know that there are issues parsing DAGs.
 
 :param session: session for ORM operations
 :type session: sqlalchemy.orm.session.Session
+:param dagbag: DagBag containing DAGs with import errors
+:type dagbag: models.Dagbag
 """
-session.query(models.ImportError).delete()
+# Clear the errors of the processed files
+for dagbag_file in dagbag.file_last_changed:
+session.query(models.ImportError).filter(
+models.ImportError.filename == 

incubator-airflow git commit: [AIRFLOW-780] Fix dag import errors no longer working

2017-01-27 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-8-test 7ce344455 -> b71037dd4


[AIRFLOW-780] Fix dag import errors no longer working

The import errors were no longer working after the
multiprocessor update
(since they are cleared after each DAG directory
is parsed). This change
fixes them, and adds tests to prevent future
regressions.

Also fix a couple of linter errors.

Note that there are a few inefficiencies (e.g.
sometimes we delete then add import errors in the
same place instead of just doing an update), but
this is equivalent to the old behavior.

Testing Done:
- Added missing unit tests for dag imports. Note
that some of them strangely fail for python 3 and
it became too time consuming to debug since I
don't have a copy of the travis environment, I
even ran with the same version of python locally
and couldn't reproduce. I have skipped those 3
tests in python 3 for now.

Closes #2018 from aoen/fix_parse_errors_not_displa


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b71037dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b71037dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b71037dd

Branch: refs/heads/v1-8-test
Commit: b71037dd402f60f5ea4061354c90e39e4d2d8827
Parents: 7ce3444
Author: Dan Davydov 
Authored: Fri Jan 27 01:29:00 2017 -0800
Committer: Dan Davydov 
Committed: Fri Jan 27 01:37:58 2017 -0800

--
 airflow/configuration.py|   1 +
 airflow/jobs.py |  48 +
 airflow/models.py   |   3 +
 airflow/settings.py |   3 +-
 airflow/utils/dag_processing.py |   4 +-
 tests/jobs.py   | 193 +--
 6 files changed, 219 insertions(+), 33 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b71037dd/airflow/configuration.py
--
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 979b071..011f764 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -503,6 +503,7 @@ authenticate = true
 max_threads = 2
 catchup_by_default = True
 scheduler_zombie_task_threshold = 300
+dag_dir_list_interval = 0
 """
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b71037dd/airflow/jobs.py
--
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 201d87f..1a581e9 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -657,34 +657,44 @@ class SchedulerJob(BaseJob):
 session.close()
 
 @staticmethod
-def record_import_errors(session, dagbag):
+@provide_session
+def clear_nonexistent_import_errors(session, known_file_paths):
 """
-For the DAGs in the given DagBag, record any associated import errors.
-These are usually displayed through the Airflow UI so that users know
-that there are issues parsing DAGs.
+Clears import errors for files that no longer exist.
 
 :param session: session for ORM operations
 :type session: sqlalchemy.orm.session.Session
-:param dagbag: DagBag containing DAGs with import errors
-:type dagbag: models.Dagbag
+:param known_file_paths: The list of existing files that are parsed 
for DAGs
+:type known_file_paths: list[unicode]
 """
-for filename, stacktrace in list(dagbag.import_errors.items()):
-session.query(models.ImportError).filter(
-models.ImportError.filename == filename
-).delete()
-session.add(models.ImportError(
-filename=filename, stacktrace=stacktrace))
+session.query(models.ImportError).filter(
+~models.ImportError.filename.in_(known_file_paths)
+).delete(synchronize_session='fetch')
 session.commit()
 
 @staticmethod
-def clear_import_errors(session):
+def update_import_errors(session, dagbag):
 """
-Remove all the known import errors from the DB.
+For the DAGs in the given DagBag, record any associated import errors 
and clears
+errors for files that no longer have them. These are usually displayed 
through the
+Airflow UI so that users know that there are issues parsing DAGs.
 
 :param session: session for ORM operations
 :type session: sqlalchemy.orm.session.Session
+:param dagbag: DagBag containing DAGs with import errors
+:type dagbag: models.Dagbag
 """
-session.query(models.ImportError).delete()
+# Clear the errors of the processed files
+for dagbag_file in dagbag.file_last_changed:
+   

incubator-airflow git commit: [AIRFLOW-780] Fix dag import errors no longer working

2017-01-27 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/apache/v1-8-test [created] b71037dd4


[AIRFLOW-780] Fix dag import errors no longer working

The import errors were no longer working after the
multiprocessor update
(since they are cleared after each DAG directory
is parsed). This change
fixes them, and adds tests to prevent future
regressions.

Also fix a couple of linter errors.

Note that there are a few inefficiencies (e.g.
sometimes we delete then add import errors in the
same place instead of just doing an update), but
this is equivalent to the old behavior.

Testing Done:
- Added missing unit tests for dag imports. Note
that some of them strangely fail for python 3 and
it became too time consuming to debug since I
don't have a copy of the travis environment, I
even ran with the same version of python locally
and couldn't reproduce. I have skipped those 3
tests in python 3 for now.

Closes #2018 from aoen/fix_parse_errors_not_displa


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b71037dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b71037dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b71037dd

Branch: refs/heads/apache/v1-8-test
Commit: b71037dd402f60f5ea4061354c90e39e4d2d8827
Parents: 7ce3444
Author: Dan Davydov 
Authored: Fri Jan 27 01:29:00 2017 -0800
Committer: Dan Davydov 
Committed: Fri Jan 27 01:37:58 2017 -0800

--
 airflow/configuration.py|   1 +
 airflow/jobs.py |  48 +
 airflow/models.py   |   3 +
 airflow/settings.py |   3 +-
 airflow/utils/dag_processing.py |   4 +-
 tests/jobs.py   | 193 +--
 6 files changed, 219 insertions(+), 33 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b71037dd/airflow/configuration.py
--
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 979b071..011f764 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -503,6 +503,7 @@ authenticate = true
 max_threads = 2
 catchup_by_default = True
 scheduler_zombie_task_threshold = 300
+dag_dir_list_interval = 0
 """
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b71037dd/airflow/jobs.py
--
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 201d87f..1a581e9 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -657,34 +657,44 @@ class SchedulerJob(BaseJob):
 session.close()
 
 @staticmethod
-def record_import_errors(session, dagbag):
+@provide_session
+def clear_nonexistent_import_errors(session, known_file_paths):
 """
-For the DAGs in the given DagBag, record any associated import errors.
-These are usually displayed through the Airflow UI so that users know
-that there are issues parsing DAGs.
+Clears import errors for files that no longer exist.
 
 :param session: session for ORM operations
 :type session: sqlalchemy.orm.session.Session
-:param dagbag: DagBag containing DAGs with import errors
-:type dagbag: models.Dagbag
+:param known_file_paths: The list of existing files that are parsed 
for DAGs
+:type known_file_paths: list[unicode]
 """
-for filename, stacktrace in list(dagbag.import_errors.items()):
-session.query(models.ImportError).filter(
-models.ImportError.filename == filename
-).delete()
-session.add(models.ImportError(
-filename=filename, stacktrace=stacktrace))
+session.query(models.ImportError).filter(
+~models.ImportError.filename.in_(known_file_paths)
+).delete(synchronize_session='fetch')
 session.commit()
 
 @staticmethod
-def clear_import_errors(session):
+def update_import_errors(session, dagbag):
 """
-Remove all the known import errors from the DB.
+For the DAGs in the given DagBag, record any associated import errors 
and clears
+errors for files that no longer have them. These are usually displayed 
through the
+Airflow UI so that users know that there are issues parsing DAGs.
 
 :param session: session for ORM operations
 :type session: sqlalchemy.orm.session.Session
+:param dagbag: DagBag containing DAGs with import errors
+:type dagbag: models.Dagbag
 """
-session.query(models.ImportError).delete()
+# Clear the errors of the processed files
+for dagbag_file in 

incubator-airflow git commit: [AIRFLOW-780] Fix dag import errors no longer working

2017-01-27 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master dc97bcd3b -> 67cbb9664


[AIRFLOW-780] Fix dag import errors no longer working

The import errors were no longer working after the
multiprocessor update
(since they are cleared after each DAG directory
is parsed). This change
fixes them, and adds tests to prevent future
regressions.

Also fix a couple of linter errors.

Note that there are a few inefficiencies (e.g.
sometimes we delete then add import errors in the
same place instead of just doing an update), but
this is equivalent to the old behavior.

Testing Done:
- Added missing unit tests for dag imports. Note
that some of them strangely fail for python 3 and
it became too time consuming to debug since I
don't have a copy of the travis environment, I
even ran with the same version of python locally
and couldn't reproduce. I have skipped those 3
tests in python 3 for now.

Closes #2018 from aoen/fix_parse_errors_not_displa


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/67cbb966
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/67cbb966
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/67cbb966

Branch: refs/heads/master
Commit: 67cbb966410226c1489bb730af3af45330fc51b9
Parents: dc97bcd
Author: Dan Davydov 
Authored: Fri Jan 27 01:29:00 2017 -0800
Committer: Dan Davydov 
Committed: Fri Jan 27 01:29:04 2017 -0800

--
 airflow/configuration.py|   1 +
 airflow/jobs.py |  48 +
 airflow/models.py   |   3 +
 airflow/settings.py |   3 +-
 airflow/utils/dag_processing.py |   4 +-
 tests/jobs.py   | 193 +--
 6 files changed, 219 insertions(+), 33 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/67cbb966/airflow/configuration.py
--
diff --git a/airflow/configuration.py b/airflow/configuration.py
index 979b071..011f764 100644
--- a/airflow/configuration.py
+++ b/airflow/configuration.py
@@ -503,6 +503,7 @@ authenticate = true
 max_threads = 2
 catchup_by_default = True
 scheduler_zombie_task_threshold = 300
+dag_dir_list_interval = 0
 """
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/67cbb966/airflow/jobs.py
--
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 201d87f..1a581e9 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -657,34 +657,44 @@ class SchedulerJob(BaseJob):
 session.close()
 
 @staticmethod
-def record_import_errors(session, dagbag):
+@provide_session
+def clear_nonexistent_import_errors(session, known_file_paths):
 """
-For the DAGs in the given DagBag, record any associated import errors.
-These are usually displayed through the Airflow UI so that users know
-that there are issues parsing DAGs.
+Clears import errors for files that no longer exist.
 
 :param session: session for ORM operations
 :type session: sqlalchemy.orm.session.Session
-:param dagbag: DagBag containing DAGs with import errors
-:type dagbag: models.Dagbag
+:param known_file_paths: The list of existing files that are parsed 
for DAGs
+:type known_file_paths: list[unicode]
 """
-for filename, stacktrace in list(dagbag.import_errors.items()):
-session.query(models.ImportError).filter(
-models.ImportError.filename == filename
-).delete()
-session.add(models.ImportError(
-filename=filename, stacktrace=stacktrace))
+session.query(models.ImportError).filter(
+~models.ImportError.filename.in_(known_file_paths)
+).delete(synchronize_session='fetch')
 session.commit()
 
 @staticmethod
-def clear_import_errors(session):
+def update_import_errors(session, dagbag):
 """
-Remove all the known import errors from the DB.
+For the DAGs in the given DagBag, record any associated import errors 
and clears
+errors for files that no longer have them. These are usually displayed 
through the
+Airflow UI so that users know that there are issues parsing DAGs.
 
 :param session: session for ORM operations
 :type session: sqlalchemy.orm.session.Session
+:param dagbag: DagBag containing DAGs with import errors
+:type dagbag: models.Dagbag
 """
-session.query(models.ImportError).delete()
+# Clear the errors of the processed files
+for dagbag_file in dagbag.file_last_changed:
+