[jira] [Commented] (AIRFLOW-1181) Enable delete and list function for Google Cloud Storage Hook
[ https://issues.apache.org/jira/browse/AIRFLOW-1181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16003164#comment-16003164 ]

ASF subversion and git services commented on AIRFLOW-1181:
----------------------------------------------------------

Commit 24f73c03259bd5a7a699ed5b6c4cd3d559bf9bf8 in incubator-airflow's branch refs/heads/master from [~cheny258]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=24f73c0 ]

[AIRFLOW-1181] Add delete and list functionality to gcs_hook

Closes #2281 from mattuuh7/gcs-delete-list

> Enable delete and list function for Google Cloud Storage Hook
> -------------------------------------------------------------
>
> Key: AIRFLOW-1181
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1181
> Project: Apache Airflow
> Issue Type: New Feature
> Reporter: Matthew Chen
> Assignee: Matthew Chen
> Priority: Minor
> Fix For: 1.9.0
>
> current {{GoogleCloudStorageHook}} does not support delete of a file, nor
> does it support listing of files based on prefix. We would like to have these
> features available for our use-case.
> The delete function should be able to return true if successfully deleted the
> object, based on generation etc.
> The list function should be able to list all objects with the given prefix,
> able to page through large result pages and return a list of all file names
> satisfying the given prefix criteria

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Closed] (AIRFLOW-1181) Enable delete and list function for Google Cloud Storage Hook
[ https://issues.apache.org/jira/browse/AIRFLOW-1181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Riccomini closed AIRFLOW-1181.
------------------------------------
    Resolution: Fixed
    Fix Version/s: 1.9.0

> Enable delete and list function for Google Cloud Storage Hook
> -------------------------------------------------------------
>
> Key: AIRFLOW-1181
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1181
> Project: Apache Airflow
> Issue Type: New Feature
> Reporter: Matthew Chen
> Assignee: Matthew Chen
> Priority: Minor
> Fix For: 1.9.0
>
> current {{GoogleCloudStorageHook}} does not support delete of a file, nor
> does it support listing of files based on prefix. We would like to have these
> features available for our use-case.
> The delete function should be able to return true if successfully deleted the
> object, based on generation etc.
> The list function should be able to list all objects with the given prefix,
> able to page through large result pages and return a list of all file names
> satisfying the given prefix criteria

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
incubator-airflow git commit: [AIRFLOW-1181] Add delete and list functionality to gcs_hook
Repository: incubator-airflow
Updated Branches:
  refs/heads/master ac9ccb151 -> 24f73c032

[AIRFLOW-1181] Add delete and list functionality to gcs_hook

Closes #2281 from mattuuh7/gcs-delete-list

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/24f73c03
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/24f73c03
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/24f73c03

Branch: refs/heads/master
Commit: 24f73c03259bd5a7a699ed5b6c4cd3d559bf9bf8
Parents: ac9ccb1
Author: Matthew Chen
Authored: Tue May 9 10:50:30 2017 -0700
Committer: Chris Riccomini
Committed: Tue May 9 10:50:37 2017 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/gcs_hook.py | 69 ++
 1 file changed, 69 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/24f73c03/airflow/contrib/hooks/gcs_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py
index dd3cd27..d38ceff 100644
--- a/airflow/contrib/hooks/gcs_hook.py
+++ b/airflow/contrib/hooks/gcs_hook.py
@@ -152,3 +152,72 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
                 raise
         return False
+
+    def delete(self, bucket, object, generation=None):
+        """
+        Delete an object if versioning is not enabled for the bucket, or if generation
+        parameter is used.
+
+        :param bucket: name of the bucket, where the object resides
+        :type bucket: string
+        :param object: name of the object to delete
+        :type object: string
+        :param generation: if present, permanently delete the object of this generation
+        :type generation: string
+        :return: True if succeeded
+        """
+        service = self.get_conn()
+
+        try:
+            service \
+                .objects() \
+                .delete(bucket=bucket, object=object, generation=generation) \
+                .execute()
+            return True
+        except errors.HttpError as ex:
+            if ex.resp['status'] == '404':
+                return False
+            raise
+
+    def list(self, bucket, versions=None, maxResults=None, prefix=None):
+        """
+        List all objects from the bucket with the given string prefix in name
+
+        :param bucket: bucket name
+        :type bucket: string
+        :param versions: if true, list all versions of the objects
+        :type versions: boolean
+        :param maxResults: max count of items to return in a single page of responses
+        :type maxResults: integer
+        :param prefix: prefix string which filters objects whose name begin with this prefix
+        :type prefix: string
+        :return: a stream of object names matching the filtering criteria
+        """
+        service = self.get_conn()
+
+        ids = list()
+        pageToken = None
+        while True:
+            response = service.objects().list(
+                bucket=bucket,
+                versions=versions,
+                maxResults=maxResults,
+                pageToken=pageToken,
+                prefix=prefix
+            ).execute()
+
+            if 'items' not in response:
+                logging.info("No items found for prefix: {}".format(prefix))
+                break
+
+            for item in response['items']:
+                if item and 'name' in item:
+                    ids.append(item['name'])
+
+            if 'nextPageToken' not in response:
+                # no further pages of results, so stop the loop
+                break
+
+            pageToken = response['nextPageToken']
+            if not pageToken:
+                # empty next page token
+                break
+        return ids
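The paging loop at the heart of the new `list()` method can be exercised in isolation. Below is a minimal sketch of that pattern against a stubbed API object; `FakeObjects`, `FakeRequest`, and `list_object_names` are illustrative stand-ins, not part of Airflow or the Google API client.

```python
class FakeRequest:
    """Stand-in for the request object returned by objects().list()."""
    def __init__(self, response):
        self._response = response

    def execute(self):
        return self._response


class FakeObjects:
    """Serves two pages of results, linked by nextPageToken, like the GCS JSON API."""
    def __init__(self):
        self._pages = {
            None: {'items': [{'name': 'a.csv'}, {'name': 'b.csv'}],
                   'nextPageToken': 'page2'},
            'page2': {'items': [{'name': 'c.csv'}]},  # no token: last page
        }

    def list(self, bucket=None, versions=None, maxResults=None,
             pageToken=None, prefix=None):
        return FakeRequest(self._pages[pageToken])


def list_object_names(objects_api, bucket, prefix=None):
    """Collect object names across all result pages, as the hook's list() does."""
    ids = []
    page_token = None
    while True:
        response = objects_api.list(bucket=bucket, prefix=prefix,
                                    pageToken=page_token).execute()
        if 'items' not in response:
            break
        for item in response['items']:
            if item and 'name' in item:
                ids.append(item['name'])
        page_token = response.get('nextPageToken')
        if not page_token:
            break
    return ids


print(list_object_names(FakeObjects(), 'my-bucket'))  # ['a.csv', 'b.csv', 'c.csv']
```

The loop terminates on either a missing or an empty `nextPageToken`, mirroring the two break conditions in the committed code.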
[jira] [Commented] (AIRFLOW-1138) Add licenses to files in scripts directory
[ https://issues.apache.org/jira/browse/AIRFLOW-1138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16003135#comment-16003135 ]

ASF subversion and git services commented on AIRFLOW-1138:
----------------------------------------------------------

Commit 4b5c6efd4a450b4a202f87cb12ea1f9eb4daf8fc in incubator-airflow's branch refs/heads/v1-8-stable from [~criccomini]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=4b5c6ef ]

[AIRFLOW-1138] Add missing licenses to files in scripts directory

Closes #2253 from criccomini/AIRFLOW-1138

(cherry picked from commit 94f9822ffd867e559fd71046124626fee6acedf7)

> Add licenses to files in scripts directory
> ------------------------------------------
>
> Key: AIRFLOW-1138
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1138
> Project: Apache Airflow
> Issue Type: Task
> Components: release
> Reporter: Chris Riccomini
> Assignee: Chris Riccomini
> Priority: Blocker
> Fix For: 1.8.1
>
> These two files need license headers:
> modified: scripts/ci/requirements.txt
> modified: scripts/systemd/airflow

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Commented] (AIRFLOW-1142) SubDAG Tasks Not Executed Even Though All Dependencies Met
[ https://issues.apache.org/jira/browse/AIRFLOW-1142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16003137#comment-16003137 ]

ASF subversion and git services commented on AIRFLOW-1142:
----------------------------------------------------------

Commit 5800f565628d11d8ea504468bcc14c4d1c0da10c in incubator-airflow's branch refs/heads/v1-8-stable from [~bolke]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=5800f56 ]

[AIRFLOW-1142] Do not reset orphaned state for backfills

The scheduler could interfere with backfills when it
resets the state of tasks that were considered orphaned.
This patch prevents the scheduler from doing so and adds
a guard in the backfill.

Closes #2260 from bolkedebruin/AIRFLOW-1142

(cherry picked from commit 4e79b830e3261b9d54fdbc7c9dcb510d36565986)
Signed-off-by: Bolke de Bruin

> SubDAG Tasks Not Executed Even Though All Dependencies Met
> ----------------------------------------------------------
>
> Key: AIRFLOW-1142
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1142
> Project: Apache Airflow
> Issue Type: Bug
> Components: subdag
> Affects Versions: 1.8.1
> Environment: 1.8.1rc1+incubating, Celery
> Reporter: Joe Schmid
> Priority: Blocker
> Fix For: 1.8.1
>
> Attachments: 2017-04-24T23-20-38-776547, run3-scheduler-stdout.log,
> run3-task.log, SubDAGOperatorTaskLog-DEBUG.txt, Test_Nested_SubDAG_0.png,
> Test_Nested_SubDAG_1-Zoomed.png, test_nested_subdag.py
>
> Testing on 1.8.1rc1, we noticed that tasks in subdags were not getting
> executed even though all dependencies had been met.
> We were able to create a simple test DAG that re-creates the issue. Attached
> is a test DAG, the log file of the subdag operator that shows it fails to run
> even though dependencies are met, and screenshots of what the UI looks like.
> This is definitely a regression as we have many similarly constructed DAGs
> that have been running successfully on a pre-v1.8 version (a fork of
> 1.7.1.3+master) for some time.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[jira] [Commented] (AIRFLOW-970) Latest runs on homepage should load async and in batch
[ https://issues.apache.org/jira/browse/AIRFLOW-970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16003141#comment-16003141 ]

ASF subversion and git services commented on AIRFLOW-970:
---------------------------------------------------------

Commit af2d0b4b5cb1ef30a065b1af66f90a01a953e2be in incubator-airflow's branch refs/heads/v1-8-stable from [~saguziel]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=af2d0b4 ]

[AIRFLOW-970] Load latest_runs on homepage async

The latest_runs column on the homepage loads
synchronously with an n+1 query. Homepage loads
will be significantly faster if this happens
asynchronously and as a batch.

Closes #2144 from saguziel/aguziel-latest-run-async

(cherry picked from commit 0f7ddbbedb05f2f11500250db4989edcb27bc164)

> Latest runs on homepage should load async and in batch
> ------------------------------------------------------
>
> Key: AIRFLOW-970
> URL: https://issues.apache.org/jira/browse/AIRFLOW-970
> Project: Apache Airflow
> Issue Type: Improvement
> Reporter: Alex Guziel
> Assignee: Alex Guziel
>
> The latest_dag_run column on the homepage makes one query for each dag and
> does it synchronously. We should do the opposite.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
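The n+1 fix described in the commit message — one grouped query for every DAG's latest run instead of one query per DAG — can be sketched with plain SQL. The schema and data below are illustrative, not Airflow's actual models.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE dag_run (dag_id TEXT, execution_date TEXT)")
conn.executemany("INSERT INTO dag_run VALUES (?, ?)", [
    ('dag_a', '2017-05-01'),
    ('dag_a', '2017-05-02'),
    ('dag_b', '2017-05-03'),
])

# n+1 style would issue one "SELECT MAX(...) WHERE dag_id = ?" per DAG.
# Batch style fetches every DAG's latest run in a single GROUP BY query:
rows = conn.execute(
    "SELECT dag_id, MAX(execution_date) FROM dag_run GROUP BY dag_id"
).fetchall()
latest_runs = dict(rows)
print(latest_runs)  # {'dag_a': '2017-05-02', 'dag_b': '2017-05-03'}
```

One round trip regardless of DAG count is what makes the homepage load time independent of the number of DAGs.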
[jira] [Commented] (AIRFLOW-492) Insert into dag_stats table results into failed task while task itself succeeded
[ https://issues.apache.org/jira/browse/AIRFLOW-492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16003139#comment-16003139 ]

ASF subversion and git services commented on AIRFLOW-492:
---------------------------------------------------------

Commit e342d0d223e47ea25f73baaa00a16df414a6e0df in incubator-airflow's branch refs/heads/v1-8-stable from [~bolke]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=e342d0d ]

[AIRFLOW-492] Make sure stat updates cannot fail a task

Previously a failed commit into the db for the statistics
could also fail a task. Secondly, the ui could display
out of date statistics.

This patch reworks DagStat so that failure to update the
statistics does not propagate. Next to that, it makes sure
the ui always displays the latest statistics.

Closes #2254 from bolkedebruin/AIRFLOW-492

(cherry picked from commit c2472ffa124ffc65b8762ea583554494624dbb6a)

> Insert into dag_stats table results into failed task while task itself
> succeeded
> --------------------------------------------------------------------------------
>
> Key: AIRFLOW-492
> URL: https://issues.apache.org/jira/browse/AIRFLOW-492
> Project: Apache Airflow
> Issue Type: Bug
> Reporter: Bolke de Bruin
> Assignee: Siddharth Anand
> Priority: Critical
> Fix For: 1.8.1
>
> Attachments: subdag_test.py
>
> In some occasions there seem to be a duplicate key being inserted in
> dag_stats that results in a task/dag run being marked failed while the task
> itself has succeeded.
>
> [2016-09-07 18:44:16,940] {models.py:3912} INFO - Marking run hanging_subdags_n16_sqe.level_2_14 @ 2016-04-21 00:00:00: backfill_2016-04-21T00:00:00, externally triggered: False> successful
> [2016-09-07 18:44:17,671] {models.py:1450} ERROR -
> (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry
> 'hanging_subdags_n16_sqe.level_2_14-success' for key 'PRIMARY'") [SQL:
> u'INSERT INTO dag_stats (dag_id, state, count, dirty) VALUES (%s, %s, %s,
> %s)'] [parameters: ('hanging_subdags_n16_sqe.level_2_14', 'success', 3L, 0)]
> Traceback (most recent call last):
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py", line 1409, in run
>     result = task_copy.execute(context=context)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/operators/subdag_operator.py", line 88, in execute
>     executor=self.executor)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py", line 3244, in run
>     job.run()
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/jobs.py", line 189, in run
>     self._execute()
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/jobs.py", line 1855, in _execute
>     models.DagStat.clean_dirty([run.dag_id], session=session)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/utils/db.py", line 54, in wrapper
>     result = func(*args, **kwargs)
>   File "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py", line 3695, in clean_dirty
>     session.commit()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 801, in commit
>     self.transaction.commit()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 392, in commit
>     self._prepare_impl()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 372, in _prepare_impl
>     self.session.flush()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2019, in flush
>     self._flush(objects)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2137, in _flush
>     transaction.rollback(_capture_exception=True)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/langhelpers.py", line 60, in __exit__
>     compat.reraise(exc_type, exc_value, exc_tb)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", line 2101, in _flush
>     flush_context.execute()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", line 373, in execute
>     rec.execute(self)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", line 532, in execute
>     uow
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line 174, in save_obj
>     mapper, table, insert)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line 767, in _emit_insert_statements
>     execute(statement, multiparams)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", line 914, in execute
>     return meth(self, multiparams, params)
>   File
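The fix in the commit above makes the statistics update best-effort: a failed commit is rolled back and logged instead of propagating into the task. A minimal sketch of that pattern, with `FlakySession` as a hypothetical stand-in for a SQLAlchemy session whose commit raises the duplicate-key error:

```python
import logging


class FlakySession:
    """Stand-in session whose commit always fails, like the IntegrityError above."""
    rolled_back = False

    def commit(self):
        raise RuntimeError("Duplicate entry for key 'PRIMARY'")

    def rollback(self):
        self.rolled_back = True


def clean_dirty(session):
    """Recompute dag_stats rows; never let a stats failure escape to the caller."""
    try:
        # ... recompute and insert dag_stats rows here ...
        session.commit()
    except Exception:
        # failure to update statistics must not fail the task
        session.rollback()
        logging.warning("Could not update dag_stats; continuing")


session = FlakySession()
clean_dirty(session)        # does not raise
print(session.rolled_back)  # True
```

The broad `except Exception` is deliberate here: any database error in a bookkeeping path is downgraded to a warning, since the task's own result has already been determined.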
[jira] [Commented] (AIRFLOW-1127) Move license notices to LICENSE instead of NOTICE
[ https://issues.apache.org/jira/browse/AIRFLOW-1127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16003134#comment-16003134 ]

ASF subversion and git services commented on AIRFLOW-1127:
----------------------------------------------------------

Commit dc6ebaab94bcc69b36bb97eefba3a01ee149b746 in incubator-airflow's branch refs/heads/v1-8-stable from [~bolke]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=dc6ebaa ]

[AIRFLOW-1127] Move license notices to LICENSE

Closes #2250 from bolkedebruin/AIRFLOW-1127

(cherry picked from commit 659827639e256a668d669d0d229abf49d6010bb8)

> Move license notices to LICENSE instead of NOTICE
> -------------------------------------------------
>
> Key: AIRFLOW-1127
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1127
> Project: Apache Airflow
> Issue Type: Improvement
> Reporter: Bolke de Bruin
> Assignee: Bolke de Bruin
> Priority: Blocker
> Fix For: 1.8.1
>
> For all the bundled files with different licenses (MIT, BSD, etc), the full
> texts of these licenses should be in the source tarball, preferably at the
> end of the LICENSE file.
> webgl-2d needs to be called out as MIT license.
> Are all the entries in the NOTICE file required, or do they
> just need to be in the LICENSE file? Any additions to the NOTICE have
> downstream repercussions as they need to be propagated down by any other
> project using airflow.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
[20/36] incubator-airflow git commit: [AIRFLOW-1004][AIRFLOW-276] Fix `airflow webserver -D` to run in background
[AIRFLOW-1004][AIRFLOW-276] Fix `airflow webserver -D` to run in background

AIRFLOW-276 introduced a monitor process for gunicorn to
find new files in the dag folder, but it also changed
`airflow webserver -D`'s behavior to run in foreground.
This PR fixes that by running the monitor as a daemon
process.

Closes #2208 from sekikn/AIRFLOW-1004

(cherry picked from commit a9b20a04b052e9479dbb79fd46124293085610e9)

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0a5fb785
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0a5fb785
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0a5fb785

Branch: refs/heads/v1-8-stable
Commit: 0a5fb7856b545073516210fcfc369d2072823ae9
Parents: c94b3a0
Author: Kengo Seki
Authored: Tue Apr 4 08:32:44 2017 +0200
Committer: Chris Riccomini
Committed: Mon Apr 10 14:24:31 2017 -0700

----------------------------------------------------------------------
 airflow/bin/cli.py | 64 -
 tests/core.py      | 54 +
 2 files changed, 107 insertions(+), 11 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0a5fb785/airflow/bin/cli.py
----------------------------------------------------------------------
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index e9c54e6..e4755c7 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -753,7 +753,12 @@ def webserver(args):
         app.run(debug=True, port=args.port, host=args.hostname,
                 ssl_context=(ssl_cert, ssl_key) if ssl_cert and ssl_key else None)
     else:
-        pid, stdout, stderr, log_file = setup_locations("webserver", pid=args.pid)
+        pid, stdout, stderr, log_file = setup_locations("webserver", args.pid, args.stdout, args.stderr, args.log_file)
+        if args.daemon:
+            handle = setup_logging(log_file)
+            stdout = open(stdout, 'w+')
+            stderr = open(stderr, 'w+')
+
         print(
             textwrap.dedent('''\
                 Running the Gunicorn Server with:
@@ -771,7 +776,6 @@ def webserver(args):
             '-t', str(worker_timeout),
             '-b', args.hostname + ':' + str(args.port),
             '-n', 'airflow-webserver',
-            '-p', str(pid),
             '-c', 'airflow.www.gunicorn_config'
         ]
@@ -782,28 +786,66 @@ def webserver(args):
             run_args += ['--error-logfile', str(args.error_logfile)]

         if args.daemon:
-            run_args += ["-D"]
+            run_args += ['-D', '-p', str(pid)]
+
         if ssl_cert:
             run_args += ['--certfile', ssl_cert, '--keyfile', ssl_key]

         run_args += ["airflow.www.app:cached_app()"]

-        gunicorn_master_proc = subprocess.Popen(run_args)
+        gunicorn_master_proc = None

         def kill_proc(dummy_signum, dummy_frame):
             gunicorn_master_proc.terminate()
             gunicorn_master_proc.wait()
             sys.exit(0)

-        signal.signal(signal.SIGINT, kill_proc)
-        signal.signal(signal.SIGTERM, kill_proc)
+        def monitor_gunicorn(gunicorn_master_proc):
+            # These run forever until SIG{INT, TERM, KILL, ...} signal is sent
+            if conf.getint('webserver', 'worker_refresh_interval') > 0:
+                restart_workers(gunicorn_master_proc, num_workers)
+            else:
+                while True:
+                    time.sleep(1)

-        # These run forever until SIG{INT, TERM, KILL, ...} signal is sent
-        if conf.getint('webserver', 'worker_refresh_interval') > 0:
-            restart_workers(gunicorn_master_proc, num_workers)
+        if args.daemon:
+            base, ext = os.path.splitext(pid)
+            ctx = daemon.DaemonContext(
+                pidfile=TimeoutPIDLockFile(base + "-monitor" + ext, -1),
+                files_preserve=[handle],
+                stdout=stdout,
+                stderr=stderr,
+                signal_map={
+                    signal.SIGINT: kill_proc,
+                    signal.SIGTERM: kill_proc
+                },
+            )
+            with ctx:
+                subprocess.Popen(run_args)
+
+                # Reading pid file directly, since Popen#pid doesn't
+                # seem to return the right value with DaemonContext.
+                while True:
+                    try:
+                        with open(pid) as f:
+                            gunicorn_master_proc_pid = int(f.read())
+                            break
+                    except IOError:
+                        logging.debug("Waiting for gunicorn's pid file to be created.")
+                        time.sleep(0.1)
+
+                gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid)
+
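The daemonized path above cannot rely on `Popen.pid`, so it polls the pid file gunicorn writes. That wait loop can be sketched self-contained, using a temp file in place of gunicorn's real pid file; `wait_for_pid_file` and its timeout are illustrative, not part of the patch.

```python
import os
import tempfile
import time


def wait_for_pid_file(path, timeout=1.0, interval=0.01):
    """Poll until the pid file exists and parses as an integer, then return the pid."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with open(path) as f:
                return int(f.read())
        except (IOError, ValueError):
            # file not created yet, or written only partially: retry
            time.sleep(interval)
    raise TimeoutError("pid file never appeared: %s" % path)


with tempfile.TemporaryDirectory() as d:
    pid_path = os.path.join(d, 'webserver.pid')
    with open(pid_path, 'w') as f:  # simulate gunicorn writing its pid
        f.write('12345')
    print(wait_for_pid_file(pid_path))  # 12345
```

Catching `ValueError` as well as `IOError` matters: the daemon may observe the file after creation but before the pid has been fully written. The committed code loops indefinitely instead of using a deadline; the timeout here just keeps the sketch from hanging.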
[14/36] incubator-airflow git commit: [AIRFLOW-1033][AIFRLOW-1033] Fix ti_deps for no schedule dags
[AIRFLOW-1033] Fix ti_deps for DAGs with no schedule

DAGs that did not have a schedule (None or @once) made the dependency checker raise an exception, because the previous schedule will not exist. Also activates all ti_deps tests.

Closes #2220 from bolkedebruin/AIRFLOW-1033
(cherry picked from commit fbcbd053a2c5fffb0c95eb55a91cb92fa860e1ab)
Signed-off-by: Bolke de Bruin

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ebfc3ea7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ebfc3ea7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ebfc3ea7
Branch: refs/heads/v1-8-stable
Commit: ebfc3ea73ae1ffe273e4ff532f1ad47441bef518
Parents: 9167411
Author: Bolke de Bruin
Authored: Thu Apr 6 14:03:11 2017 +0200
Committer: Bolke de Bruin
Committed: Thu Apr 6 14:03:24 2017 +0200
--
 airflow/ti_deps/deps/base_ti_dep.py              |  14 +-
 airflow/ti_deps/deps/prev_dagrun_dep.py          |   5 +
 .../ti_deps/deps/dag_ti_slots_available_dep.py   |  41 ---
 tests/ti_deps/deps/dag_unpaused_dep.py           |  41 ---
 tests/ti_deps/deps/dagrun_exists_dep.py          |  41 ---
 tests/ti_deps/deps/not_in_retry_period_dep.py    |  61
 tests/ti_deps/deps/not_running_dep.py            |  39 ---
 tests/ti_deps/deps/not_skipped_dep.py            |  38 ---
 tests/ti_deps/deps/pool_has_space_dep.py         |  37 ---
 tests/ti_deps/deps/prev_dagrun_dep.py            | 143 -
 tests/ti_deps/deps/runnable_exec_date_dep.py     |  92 --
 .../deps/test_dag_ti_slots_available_dep.py      |  42 +++
 tests/ti_deps/deps/test_dag_unpaused_dep.py      |  42 +++
 tests/ti_deps/deps/test_dagrun_exists_dep.py     |  40 +++
 .../deps/test_not_in_retry_period_dep.py         |  59
 tests/ti_deps/deps/test_not_running_dep.py       |  37 +++
 tests/ti_deps/deps/test_not_skipped_dep.py       |  36 +++
 tests/ti_deps/deps/test_prev_dagrun_dep.py       | 123
 .../ti_deps/deps/test_runnable_exec_date_dep.py  |  76 +
 tests/ti_deps/deps/test_trigger_rule_dep.py      | 252
 tests/ti_deps/deps/test_valid_state_dep.py       |  46 +++
 tests/ti_deps/deps/trigger_rule_dep.py           | 295 ---
 tests/ti_deps/deps/valid_state_dep.py            |  49 ---
 23 files changed, 768 insertions(+), 881 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/airflow/ti_deps/deps/base_ti_dep.py
--
diff --git a/airflow/ti_deps/deps/base_ti_dep.py b/airflow/ti_deps/deps/base_ti_dep.py
index 0188043..bad1fa0 100644
--- a/airflow/ti_deps/deps/base_ti_dep.py
+++ b/airflow/ti_deps/deps/base_ti_dep.py
@@ -51,7 +51,7 @@ class BaseTIDep(object):
         """
         return getattr(self, 'NAME', self.__class__.__name__)

-    def _get_dep_statuses(self, ti, session, dep_context):
+    def _get_dep_statuses(self, ti, session, dep_context=None):
         """
         Abstract method that returns an iterable of TIDepStatus objects that describe
         whether the given task instance has this dependency met.
@@ -69,7 +69,7 @@ class BaseTIDep(object):
         raise NotImplementedError

     @provide_session
-    def get_dep_statuses(self, ti, session, dep_context):
+    def get_dep_statuses(self, ti, session, dep_context=None):
         """
         Wrapper around the private _get_dep_statuses method that contains some global
         checks for all dependencies.
@@ -81,6 +81,12 @@ class BaseTIDep(object):
         :param dep_context: the context for which this dependency should be evaluated for
         :type dep_context: DepContext
         """
+        # this avoids a circular dependency
+        from airflow.ti_deps.dep_context import DepContext
+
+        if dep_context is None:
+            dep_context = DepContext()
+
         if self.IGNOREABLE and dep_context.ignore_all_deps:
             yield self._passing_status(
                 reason="Context specified all dependencies should be ignored.")
@@ -95,7 +101,7 @@ class BaseTIDep(object):
             yield dep_status

     @provide_session
-    def is_met(self, ti, session, dep_context):
+    def is_met(self, ti, session, dep_context=None):
         """
         Returns whether or not this dependency is met for a given task instance.
         A dependency is considered met if all of the dependency statuses it reports are
@@ -113,7 +119,7 @@
             self.get_dep_statuses(ti, session, dep_context))

     @provide_session
-    def get_failure_reasons(self, ti, session, dep_context):
+    def get_failure_reasons(self, ti, session, dep_context=None):
         """
         Returns an iterable of strings that
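The dep_context change in this diff follows a standard Python pattern: default the argument to None and construct the real default lazily inside the method (Airflow also moves the DepContext import inside the method to break a circular import). A minimal runnable sketch — the DepContext class below is a simplified stand-in for Airflow's, not the real one:

```python
# Sketch of the optional-context pattern introduced in base_ti_dep.py.
# DepContext here is a hypothetical stand-in for
# airflow.ti_deps.dep_context.DepContext.

class DepContext:
    """Stand-in for Airflow's DepContext."""
    def __init__(self, ignore_all_deps=False):
        self.ignore_all_deps = ignore_all_deps


class BaseTIDep:
    def get_dep_statuses(self, ti, dep_context=None):
        # Build the default lazily: a default argument of DepContext() would
        # be evaluated once at definition time and shared across all calls.
        if dep_context is None:
            dep_context = DepContext()
        if dep_context.ignore_all_deps:
            return ["passing: context ignores all dependencies"]
        return ["evaluated dependency for {}".format(ti)]


dep = BaseTIDep()
print(dep.get_dep_statuses("ti_1"))
print(dep.get_dep_statuses("ti_1", DepContext(ignore_all_deps=True)))
```

Callers that previously passed a DepContext keep working unchanged; callers that omitted it now get a sensible default instead of a TypeError.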
[jira] [Commented] (AIRFLOW-1138) Add licenses to files in scripts directory
[ https://issues.apache.org/jira/browse/AIRFLOW-1138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16003136#comment-16003136 ] ASF subversion and git services commented on AIRFLOW-1138: -- Commit 4b5c6efd4a450b4a202f87cb12ea1f9eb4daf8fc in incubator-airflow's branch refs/heads/v1-8-stable from [~criccomini] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=4b5c6ef ] [AIRFLOW-1138] Add missing licenses to files in scripts directory Closes #2253 from criccomini/AIRFLOW-1138 (cherry picked from commit 94f9822ffd867e559fd71046124626fee6acedf7) > Add licenses to files in scripts directory > -- > > Key: AIRFLOW-1138 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1138 > Project: Apache Airflow > Issue Type: Task > Components: release >Reporter: Chris Riccomini >Assignee: Chris Riccomini >Priority: Blocker > Fix For: 1.8.1 > > > These two files need license headers: > modified: scripts/ci/requirements.txt > modified: scripts/systemd/airflow -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[19/36] incubator-airflow git commit: [AIRFLOW-1001] Fix landing times if there is no following schedule
[AIRFLOW-1001] Fix landing times if there is no following schedule @once does not have a following schedule. This was not checked for and therefore the landing times page could bork. Closes #2213 from bolkedebruin/AIRFLOW-1001 (cherry picked from commit 0371df4f1bd78e220e591d5cb23630d6a062f109) Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c94b3a02 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c94b3a02 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c94b3a02 Branch: refs/heads/v1-8-stable Commit: c94b3a02f430f1a5a86c83d5f7286dcdac31492b Parents: aec9770 Author: Bolke de BruinAuthored: Wed Apr 5 09:57:55 2017 +0200 Committer: Chris Riccomini Committed: Mon Apr 10 14:19:53 2017 -0700 -- airflow/www/views.py | 2 +- tests/core.py| 16 ++-- 2 files changed, 15 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c94b3a02/airflow/www/views.py -- diff --git a/airflow/www/views.py b/airflow/www/views.py index 962c1f0..fec4779 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -1553,7 +1553,7 @@ class Airflow(BaseView): for ti in task.get_task_instances(session, start_date=min_date, end_date=base_date): ts = ti.execution_date -if dag.schedule_interval: +if dag.schedule_interval and dag.following_schedule(ts): ts = dag.following_schedule(ts) if ti.end_date: dttm = wwwutils.epoch(ti.execution_date) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c94b3a02/tests/core.py -- diff --git a/tests/core.py b/tests/core.py index 870a0cb..c55b1e2 100644 --- a/tests/core.py +++ b/tests/core.py @@ -1413,6 +1413,7 @@ class WebUiTests(unittest.TestCase): self.dag_bash2 = self.dagbag.dags['test_example_bash_operator'] self.sub_dag = self.dagbag.dags['example_subdag_operator'] self.runme_0 = self.dag_bash.get_task('runme_0') +self.example_xcom = self.dagbag.dags['example_xcom'] 
        self.dag_bash2.create_dagrun(
            run_id="test_{}".format(models.DagRun.id_for_date(datetime.now())),
@@ -1428,6 +1429,13 @@ class WebUiTests(unittest.TestCase):
             state=State.RUNNING
         )

+        self.example_xcom.create_dagrun(
+            run_id="test_{}".format(models.DagRun.id_for_date(datetime.now())),
+            execution_date=DEFAULT_DATE,
+            start_date=datetime.now(),
+            state=State.RUNNING
+        )
+
     def test_index(self):
         response = self.app.get('/', follow_redirects=True)
         assert "DAGs" in response.data.decode('utf-8')
@@ -1473,8 +1481,12 @@ class WebUiTests(unittest.TestCase):
         assert "example_bash_operator" in response.data.decode('utf-8')
         response = self.app.get(
             '/admin/airflow/landing_times?'
-            'days=30&dag_id=example_bash_operator')
-        assert "example_bash_operator" in response.data.decode('utf-8')
+            'days=30&dag_id=test_example_bash_operator')
+        assert "test_example_bash_operator" in response.data.decode('utf-8')
+        response = self.app.get(
+            '/admin/airflow/landing_times?'
+            'days=30&dag_id=example_xcom')
+        assert "example_xcom" in response.data.decode('utf-8')
         response = self.app.get(
             '/admin/airflow/gantt?dag_id=example_bash_operator')
         assert "example_bash_operator" in response.data.decode('utf-8')
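The one-line fix in views.py guards against DAGs that have no following schedule: `@once` has a schedule_interval, but `following_schedule()` returns nothing for it, so using the result unconditionally could break the landing-times page. A runnable sketch of the behavior, with FakeDag as a hypothetical stand-in for Airflow's DAG class:

```python
from datetime import datetime, timedelta

# FakeDag is a stand-in for Airflow's DAG: for "@once" (or no schedule),
# following_schedule() has no next run to return.

class FakeDag:
    def __init__(self, schedule_interval):
        self.schedule_interval = schedule_interval

    def following_schedule(self, ts):
        if self.schedule_interval in (None, "@once"):
            return None
        return ts + self.schedule_interval


def landing_reference_ts(dag, execution_date):
    ts = execution_date
    # The fix: check that a following schedule actually exists before using it.
    if dag.schedule_interval and dag.following_schedule(ts):
        ts = dag.following_schedule(ts)
    return ts


ed = datetime(2017, 4, 5)
daily = FakeDag(timedelta(days=1))
once = FakeDag("@once")
print(landing_reference_ts(daily, ed))  # advanced by one interval
print(landing_reference_ts(once, ed))   # unchanged, no crash
```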
[33/36] incubator-airflow git commit: [AIRFLOW-XXX] Fix merge issue with test/models.py by adding execution_date
[AIRFLOW-XXX] Fix merge issue with test/models.py by adding execution_date Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d61af623 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d61af623 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d61af623 Branch: refs/heads/v1-8-stable Commit: d61af623178253eb39a1fabd6116a94dca3f33a6 Parents: 0a105ee Author: Chris RiccominiAuthored: Thu Apr 27 13:15:37 2017 -0700 Committer: Chris Riccomini Committed: Thu Apr 27 13:15:37 2017 -0700 -- tests/models.py | 6 -- 1 file changed, 4 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d61af623/tests/models.py -- diff --git a/tests/models.py b/tests/models.py index 981561a..da36d56 100644 --- a/tests/models.py +++ b/tests/models.py @@ -285,11 +285,13 @@ class DagStatTest(unittest.TestCase): class DagRunTest(unittest.TestCase): -def create_dag_run(self, dag, state=State.RUNNING, task_states=None): +def create_dag_run(self, dag, state=State.RUNNING, task_states=None, execution_date=None): now = datetime.datetime.now() +if execution_date is None: +execution_date = now dag_run = dag.create_dagrun( run_id='manual__' + now.isoformat(), -execution_date=now, +execution_date=execution_date, start_date=now, state=state, external_trigger=False,
[12/36] incubator-airflow git commit: Merge pull request #2195 from bolkedebruin/AIRFLOW-719
Merge pull request #2195 from bolkedebruin/AIRFLOW-719 (cherry picked from commit 4a6bef69d1817a5fc3ddd6ffe14c2578eaa49cf0) Signed-off-by: Bolke de BruinProject: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/dff6d21b Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/dff6d21b Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/dff6d21b Branch: refs/heads/v1-8-stable Commit: dff6d21bfd9a2585ca484fc8fd56aa100f640908 Parents: 9070a82 Author: Bolke de Bruin Authored: Tue Apr 4 17:04:12 2017 +0200 Committer: Bolke de Bruin Committed: Wed Apr 5 19:16:22 2017 +0200 -- airflow/operators/latest_only_operator.py | 30 ++- airflow/operators/python_operator.py | 82 +-- airflow/ti_deps/deps/trigger_rule_dep.py | 6 +- scripts/ci/requirements.txt | 1 + tests/dags/test_dagrun_short_circuit_false.py | 38 tests/models.py | 77 +++ tests/operators/__init__.py | 2 + tests/operators/latest_only_operator.py | 12 +- tests/operators/python_operator.py| 244 + 9 files changed, 384 insertions(+), 108 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dff6d21b/airflow/operators/latest_only_operator.py -- diff --git a/airflow/operators/latest_only_operator.py b/airflow/operators/latest_only_operator.py index 8b4e614..9d5defb 100644 --- a/airflow/operators/latest_only_operator.py +++ b/airflow/operators/latest_only_operator.py @@ -34,7 +34,7 @@ class LatestOnlyOperator(BaseOperator): def execute(self, context): # If the DAG Run is externally triggered, then return without # skipping downstream tasks -if context['dag_run'].external_trigger: +if context['dag_run'] and context['dag_run'].external_trigger: logging.info("""Externally triggered DAG_Run: allowing execution to proceed.""") return @@ -46,17 +46,39 @@ class LatestOnlyOperator(BaseOperator): logging.info( 'Checking latest only with left_window: %s right_window: %s ' 'now: %s', left_window, 
right_window, now) + if not left_window < now <= right_window: logging.info('Not latest execution, skipping downstream.') session = settings.Session() -for task in context['task'].downstream_list: -ti = TaskInstance( -task, execution_date=context['ti'].execution_date) + +TI = TaskInstance +tis = session.query(TI).filter( +TI.execution_date == context['ti'].execution_date, +TI.task_id.in_(context['task'].downstream_task_ids) +).with_for_update().all() + +for ti in tis: logging.info('Skipping task: %s', ti.task_id) ti.state = State.SKIPPED ti.start_date = now ti.end_date = now session.merge(ti) + +# this is defensive against dag runs that are not complete +for task in context['task'].downstream_list: +if task.task_id in tis: +continue + +logging.warning("Task {} was not part of a dag run. " +"This should not happen." +.format(task)) +now = datetime.datetime.now() +ti = TaskInstance(task, execution_date=context['ti'].execution_date) +ti.state = State.SKIPPED +ti.start_date = now +ti.end_date = now +session.merge(ti) + session.commit() session.close() logging.info('Done.') http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dff6d21b/airflow/operators/python_operator.py -- diff --git a/airflow/operators/python_operator.py b/airflow/operators/python_operator.py index b5f6386..114bc7e 100644 --- a/airflow/operators/python_operator.py +++ b/airflow/operators/python_operator.py @@ -106,14 +106,36 @@ class BranchPythonOperator(PythonOperator): logging.info("Following branch " + branch) logging.info("Marking other directly downstream tasks as skipped") session = settings.Session() + +TI = TaskInstance +tis = session.query(TI).filter( +TI.execution_date == context['ti'].execution_date, +TI.task_id.in_(context['task'].downstream_task_ids), +TI.task_id !=
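The gate at the top of `LatestOnlyOperator.execute()` — visible in the log line above — checks whether "now" falls inside the half-open interval `(left_window, right_window]`. A deliberately simplified sketch: the real operator derives both window edges from the DAG's schedule, while here the window is assumed to be `[execution_date, execution_date + schedule_interval]` purely for illustration:

```python
from datetime import datetime, timedelta

# Simplified window check: downstream tasks run only when "now" lies in
# (left_window, right_window], i.e. this run covers the latest interval.
# The window arithmetic below is an assumption for illustration; the real
# operator computes the window from the DAG's schedule.

def is_latest_run(execution_date, schedule_interval, now):
    left_window = execution_date
    right_window = execution_date + schedule_interval
    return left_window < now <= right_window


interval = timedelta(days=1)
print(is_latest_run(datetime(2017, 4, 4), interval, datetime(2017, 4, 4, 12)))  # True
print(is_latest_run(datetime(2017, 4, 1), interval, datetime(2017, 4, 4, 12)))  # False
```

When the check fails, the diff above skips the downstream tasks in bulk via a single query rather than instantiating each TaskInstance by hand.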
[08/36] incubator-airflow git commit: [AIRFLOW-1011] Fix bug in BackfillJob._execute() for SubDAGs
[AIRFLOW-1011] Fix bug in BackfillJob._execute() for SubDAGs BackfillJob._execute() checks that the next run date is less than or equal to the end date before creating a DAG run and task instances. For SubDAGs, the next run date is not relevant, i.e. schedule_interval can be anything other than None or '@once' and should be ignored. However, current code calculates the next run date for a SubDAG and the condition check mentioned above always fails for SubDAG triggered manually. This change adds a simple check to determine if this is a SubDAG and, if so, sets next run date to DAG run's start date. Closes #2179 from joeschmid/AIRFLOW-1011-fix-bug- backfill-execute-for-subdags (cherry picked from commit 56501e6062df9456f7ac4efe94e21940734dd5bc) Signed-off-by: Bolke de BruinProject: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2bebeaf9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2bebeaf9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2bebeaf9 Branch: refs/heads/v1-8-stable Commit: 2bebeaf9554d35710de6eb1b4006157e105ac79b Parents: 68b1c98 Author: Joe Schmid Authored: Tue Apr 4 08:27:45 2017 +0200 Committer: Bolke de Bruin Committed: Tue Apr 4 08:28:07 2017 +0200 -- airflow/jobs.py | 7 +-- airflow/models.py | 1 + tests/jobs.py | 28 3 files changed, 34 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2bebeaf9/airflow/jobs.py -- diff --git a/airflow/jobs.py b/airflow/jobs.py index 222d9ba..7db9b9c 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -1734,7 +1734,7 @@ class BackfillJob(BaseJob): # consider max_active_runs but ignore when running subdags # "parent.child" as a dag_id is by convention a subdag -if self.dag.schedule_interval and "." 
not in self.dag.dag_id: +if self.dag.schedule_interval and not self.dag.is_subdag: active_runs = DagRun.find( dag_id=self.dag.dag_id, state=State.RUNNING, @@ -1774,8 +1774,11 @@ class BackfillJob(BaseJob): # create dag runs dr_start_date = start_date or min([t.start_date for t in self.dag.tasks]) -next_run_date = self.dag.normalize_schedule(dr_start_date) end_date = end_date or datetime.now() +# next run date for a subdag isn't relevant (schedule_interval for subdags +# is ignored) so we use the dag run's start date in the case of a subdag +next_run_date = (self.dag.normalize_schedule(dr_start_date) + if not self.dag.is_subdag else dr_start_date) active_dag_runs = [] while next_run_date and next_run_date <= end_date: http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2bebeaf9/airflow/models.py -- diff --git a/airflow/models.py b/airflow/models.py index bdda701..fdff54e 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -2682,6 +2682,7 @@ class DAG(BaseDag, LoggingMixin): self.sla_miss_callback = sla_miss_callback self.orientation = orientation self.catchup = catchup +self.is_subdag = False # DagBag.bag_dag() will set this to True if appropriate self.partial = False http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2bebeaf9/tests/jobs.py -- diff --git a/tests/jobs.py b/tests/jobs.py index aee0e9c..f9ede68 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -348,6 +348,34 @@ class BackfillJobTest(unittest.TestCase): else: self.assertEqual(State.NONE, ti.state) +def test_backfill_execute_subdag(self): +dag = self.dagbag.get_dag('example_subdag_operator') +subdag_op_task = dag.get_task('section-1') + +subdag = subdag_op_task.subdag +subdag.schedule_interval = '@daily' + +start_date = datetime.datetime.now() +executor = TestExecutor(do_update=True) +job = BackfillJob(dag=subdag, + start_date=start_date, + end_date=start_date, + executor=executor, + donot_pickle=True) +job.run() + +history = executor.history +subdag_history = history[0] + +# 
check that all 5 task instances of the subdag 'section-1' were executed +self.assertEqual(5, len(subdag_history)) +for sdh in subdag_history: +ti = sdh[3] +self.assertIn('section-1-task-',
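The core of the fix can be sketched in isolation. `normalize_schedule` below is a hypothetical stand-in for `DAG.normalize_schedule` with a `@daily` interval (it rounds a date up to the next midnight); for subdags the schedule is irrelevant, so the first backfill run date is simply the start date:

```python
from datetime import datetime, timedelta

# normalize_schedule is a hypothetical stand-in mimicking a @daily schedule:
# round the date up to the next midnight.

def normalize_schedule(dt):
    midnight = dt.replace(hour=0, minute=0, second=0, microsecond=0)
    return midnight if dt == midnight else midnight + timedelta(days=1)


def first_run_date(dr_start_date, is_subdag):
    # The fix: bypass schedule normalization for subdags, so a manually
    # triggered subdag still satisfies next_run_date <= end_date.
    return dr_start_date if is_subdag else normalize_schedule(dr_start_date)


start = datetime(2017, 4, 4, 10, 30)
print(first_run_date(start, is_subdag=True))   # start date unchanged
print(first_run_date(start, is_subdag=False))  # rounded up to next midnight
```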
[21/36] incubator-airflow git commit: [AIRFLOW-1030][AIRFLOW-1] Fix hook import for HttpSensor
[AIRFLOW-1030][AIRFLOW-1] Fix hook import for HttpSensor

Closes #2180 from pdambrauskas/fix/http_hook_import
(cherry picked from commit f2dae7d15623e2534e7c0dab3b5a7e02d4cff81d)

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a9e0894b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a9e0894b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a9e0894b
Branch: refs/heads/v1-8-stable
Commit: a9e0894ba0113cf62c7e9006fb0b42085bc5e9f9
Parents: 0a5fb78
Author: pdambrauskas
Authored: Tue Apr 4 08:39:54 2017 +0200
Committer: Chris Riccomini
Committed: Mon Apr 10 14:25:43 2017 -0700
--
--
[05/36] incubator-airflow git commit: [AIRFLOW-832] Let debug server run without SSL
[AIRFLOW-832] Let debug server run without SSL

Closes #2051 from gsakkis/fix-debug-server
(cherry picked from commit b0ae70d3a8e935dc9266b6853683ae5375a7390b)

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/eb12f016
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/eb12f016
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/eb12f016
Branch: refs/heads/v1-8-stable
Commit: eb12f0164fbeedbe2701744c213cc90e6fc805f5
Parents: 46ca569
Author: George Sakkis
Authored: Sun Feb 12 16:09:26 2017 -0500
Committer: Chris Riccomini
Committed: Wed Mar 29 14:12:06 2017 -0700
--
 airflow/bin/cli.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb12f016/airflow/bin/cli.py
--
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 61d8707..e9c54e6 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -751,7 +751,7 @@ def webserver(args):
             "Starting the web server on port {0} and host {1}.".format(
                 args.port, args.hostname))
         app.run(debug=True, port=args.port, host=args.hostname,
-                ssl_context=(ssl_cert, ssl_key))
+                ssl_context=(ssl_cert, ssl_key) if ssl_cert and ssl_key else None)
     else:
         pid, stdout, stderr, log_file = setup_locations("webserver", pid=args.pid)
         print(
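The ssl_context change reduces to a single conditional expression. `webserver_run_kwargs` below is a hypothetical helper for illustration, not Airflow code:

```python
# Illustration of the one-line fix: pass a (cert, key) ssl_context only when
# both values are configured, so the debug server can also serve plain HTTP.
# webserver_run_kwargs is a hypothetical helper, not part of Airflow.

def webserver_run_kwargs(port, hostname, ssl_cert=None, ssl_key=None):
    return {
        "debug": True,
        "port": port,
        "host": hostname,
        # The conditional from the fix: None disables TLS in Flask's app.run.
        "ssl_context": (ssl_cert, ssl_key) if ssl_cert and ssl_key else None,
    }


print(webserver_run_kwargs(8080, "0.0.0.0")["ssl_context"])
print(webserver_run_kwargs(8080, "0.0.0.0", "cert.pem", "key.pem")["ssl_context"])
```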
[02/36] incubator-airflow git commit: [AIRFLOW-989] Do not mark dag run successful if unfinished tasks
[AIRFLOW-989] Do not mark dag run successful if unfinished tasks Dag runs could be marked successful if all root tasks were successful, even if some tasks did not run yet, ie. in case of clearing. Now we consider unfinished_tasks, before marking successful. Closes #2154 from bolkedebruin/AIRFLOW-989 (cherry picked from commit 3d6095ff5cf6eff0444d7e47a2360765f2953daf) Signed-off-by: Bolke de BruinProject: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/15600e42 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/15600e42 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/15600e42 Branch: refs/heads/v1-8-stable Commit: 15600e42c805b222d6147b60376b56c8e708dcde Parents: 3b37cfa Author: Bolke de Bruin Authored: Wed Mar 15 16:39:12 2017 -0700 Committer: Bolke de Bruin Committed: Wed Mar 15 16:39:26 2017 -0700 -- airflow/models.py | 6 +++--- tests/models.py | 51 ++ 2 files changed, 54 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/15600e42/airflow/models.py -- diff --git a/airflow/models.py b/airflow/models.py index 7c6590f..42b8a7f 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -4064,9 +4064,9 @@ class DagRun(Base): logging.info('Marking run {} failed'.format(self)) self.state = State.FAILED -# if all roots succeeded, the run succeeded -elif all(r.state in (State.SUCCESS, State.SKIPPED) - for r in roots): +# if all roots succeeded and no unfinished tasks, the run succeeded +elif not unfinished_tasks and all(r.state in (State.SUCCESS, State.SKIPPED) + for r in roots): logging.info('Marking run {} successful'.format(self)) self.state = State.SUCCESS http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/15600e42/tests/models.py -- diff --git a/tests/models.py b/tests/models.py index ffd1f31..1fbb3e6 100644 --- a/tests/models.py +++ b/tests/models.py @@ -259,6 +259,57 @@ class 
DagRunTest(unittest.TestCase): updated_dag_state = dag_run.update_state() self.assertEqual(State.SUCCESS, updated_dag_state) +def test_dagrun_success_conditions(self): +session = settings.Session() + +dag = DAG( +'test_dagrun_success_conditions', +start_date=DEFAULT_DATE, +default_args={'owner': 'owner1'}) + +# A -> B +# A -> C -> D +# ordered: B, D, C, A or D, B, C, A or D, C, B, A +with dag: +op1 = DummyOperator(task_id='A') +op2 = DummyOperator(task_id='B') +op3 = DummyOperator(task_id='C') +op4 = DummyOperator(task_id='D') +op1.set_upstream([op2, op3]) +op3.set_upstream(op4) + +dag.clear() + +now = datetime.datetime.now() +dr = dag.create_dagrun(run_id='test_dagrun_success_conditions', + state=State.RUNNING, + execution_date=now, + start_date=now) + +# op1 = root +ti_op1 = dr.get_task_instance(task_id=op1.task_id) +ti_op1.set_state(state=State.SUCCESS, session=session) + +ti_op2 = dr.get_task_instance(task_id=op2.task_id) +ti_op3 = dr.get_task_instance(task_id=op3.task_id) +ti_op4 = dr.get_task_instance(task_id=op4.task_id) + +# root is successful, but unfinished tasks +state = dr.update_state() +self.assertEqual(State.RUNNING, state) + +# one has failed, but root is successful +ti_op2.set_state(state=State.FAILED, session=session) +ti_op3.set_state(state=State.SUCCESS, session=session) +ti_op4.set_state(state=State.SUCCESS, session=session) +state = dr.update_state() +self.assertEqual(State.SUCCESS, state) + +# upstream dependency failed, root has not run +ti_op1.set_state(State.NONE, session) +state = dr.update_state() +self.assertEqual(State.FAILED, state) + class DagBagTest(unittest.TestCase):
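The new success condition can be reduced to a small function over task states. This is a deliberately simplified sketch: plain strings stand in for Airflow's State constants, and the real `update_state()` handles additional cases (deadlock detection, for example):

```python
# Simplified sketch of the fixed success condition: a run succeeds only when
# all root tasks succeeded/skipped AND no task is still unfinished. Strings
# stand in for airflow.utils.state.State constants.

SUCCESS, SKIPPED, FAILED, RUNNING = "success", "skipped", "failed", "running"
NONE = None  # a task that has not run yet
UNFINISHED = {RUNNING, NONE, "queued", "up_for_retry"}


def update_state(root_states, all_states):
    unfinished = [s for s in all_states if s in UNFINISHED]
    if not unfinished and any(s == FAILED for s in root_states):
        return FAILED
    # The fix: previously only root states were checked here, so a cleared
    # run could be marked successful while other tasks still had to execute.
    if not unfinished and all(s in (SUCCESS, SKIPPED) for s in root_states):
        return SUCCESS
    return RUNNING


# Root succeeded but two tasks have not run yet: the run must stay RUNNING.
print(update_state([SUCCESS], [SUCCESS, NONE, NONE]))
```

This mirrors the test added above: a successful root with unfinished siblings keeps the run RUNNING instead of prematurely marking it SUCCESS.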
[04/36] incubator-airflow git commit: [AIRFLOW-906] Update Code icon from lightning bolt to file
[AIRFLOW-906] Update Code icon from lightning bolt to file Lightning bolts are not a visual metaphor for code or files. Since Glyphicon doesn't have a code icon (<>, for instance), we should use its file icon. Dear Airflow Maintainers, Please accept this PR that addresses the following issues: AIRFLOW-906 Testing Done: None. Before/After screenshots in AIRFLOW-906 (https://i ssues.apache.org/jira/browse/AIRFLOW-906) Update Code icon from lightning bolt to file Lightning bolts are not a visual metaphor for code or files. Since Glyphicon doesn't have a code icon (<>, for instance), we should use its file icon. Merge pull request #1 from djarratt/djarratt- patch-1 Update Code icon from lightning bolt to file AIRFLOW-906 change glyphicon flash to file Merge pull request #2 from djarratt/djarratt- patch-2 AIRFLOW-906 change glyphicon flash to file Closes #2104 from djarratt/master (cherry picked from commit bc47200711be4d2c0b36b772651dae4f5e01a204) Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/46ca569a Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/46ca569a Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/46ca569a Branch: refs/heads/v1-8-stable Commit: 46ca569a37513f3d13c529786f65c7e443c9837e Parents: 2106ff5 Author: Dan JarrattAuthored: Fri Feb 24 15:00:51 2017 -0800 Committer: Chris Riccomini Committed: Wed Mar 29 14:10:33 2017 -0700 -- airflow/www/static/bootstrap-theme.css | 2 +- airflow/www/templates/airflow/dag.html | 2 +- airflow/www/templates/airflow/dags.html | 2 +- airflow/www/templates/airflow/list_dags.html | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/46ca569a/airflow/www/static/bootstrap-theme.css -- diff --git a/airflow/www/static/bootstrap-theme.css b/airflow/www/static/bootstrap-theme.css index 5b126ae..734f940 100644 --- 
a/airflow/www/static/bootstrap-theme.css +++ b/airflow/www/static/bootstrap-theme.css @@ -3068,7 +3068,7 @@ tbody.collapse.in { .glyphicon-log-in:before { content: "\e161"; } -.glyphicon-flash:before { +.glyphicon-file:before { content: "\e162"; } .glyphicon-log-out:before { http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/46ca569a/airflow/www/templates/airflow/dag.html -- diff --git a/airflow/www/templates/airflow/dag.html b/airflow/www/templates/airflow/dag.html index 8a4793d..c695f04 100644 --- a/airflow/www/templates/airflow/dag.html +++ b/airflow/www/templates/airflow/dag.html @@ -90,7 +90,7 @@ - + Code http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/46ca569a/airflow/www/templates/airflow/dags.html -- diff --git a/airflow/www/templates/airflow/dags.html b/airflow/www/templates/airflow/dags.html index 5792c6a..7c59dea 100644 --- a/airflow/www/templates/airflow/dags.html +++ b/airflow/www/templates/airflow/dags.html @@ -167,7 +167,7 @@ - + http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/46ca569a/airflow/www/templates/airflow/list_dags.html -- diff --git a/airflow/www/templates/airflow/list_dags.html b/airflow/www/templates/airflow/list_dags.html index 2ad9416..9ace2fd 100644 --- a/airflow/www/templates/airflow/list_dags.html +++ b/airflow/www/templates/airflow/list_dags.html @@ -167,7 +167,7 @@ - +
[10/36] incubator-airflow git commit: [AIRFLOW-1030][AIRFLOW-1] Fix hook import for HttpSensor
[AIRFLOW-1030][AIRFLOW-1] Fix hook import for HttpSensor Closes #2180 from pdambrauskas/fix/http_hook_import (cherry picked from commit f2dae7d15623e2534e7c0dab3b5a7e02d4cff81d) Signed-off-by: Bolke de BruinProject: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4db53f39 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4db53f39 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4db53f39 Branch: refs/heads/v1-8-stable Commit: 4db53f39a972cae691dc49687a407dda0ff49aaf Parents: 010b80a Author: pdambrauskas Authored: Tue Apr 4 08:39:54 2017 +0200 Committer: Bolke de Bruin Committed: Tue Apr 4 08:40:16 2017 +0200 -- airflow/operators/sensors.py | 24 +--- 1 file changed, 13 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4db53f39/airflow/operators/sensors.py -- diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py index c0aba27..883d884 100644 --- a/airflow/operators/sensors.py +++ b/airflow/operators/sensors.py @@ -25,12 +25,12 @@ from time import sleep import re import sys -import airflow -from airflow import hooks, settings +from airflow import settings from airflow.exceptions import AirflowException, AirflowSensorTimeout, AirflowSkipException from airflow.models import BaseOperator, TaskInstance from airflow.hooks.base_hook import BaseHook from airflow.hooks.hdfs_hook import HDFSHook +from airflow.hooks.http_hook import HttpHook from airflow.utils.state import State from airflow.utils.decorators import apply_defaults @@ -298,9 +298,9 @@ class NamedHivePartitionSensor(BaseSensorOperator): raise ValueError('Could not parse ' + partition) def poke(self, context): - if not hasattr(self, 'hook'): -self.hook = hooks.HiveMetastoreHook( +from airflow.hooks.hive_hooks import HiveMetastoreHook +self.hook = HiveMetastoreHook( metastore_conn_id=self.metastore_conn_id) def 
poke_partition(partition): @@ -369,7 +369,8 @@ class HivePartitionSensor(BaseSensorOperator): 'Poking for table {self.schema}.{self.table}, ' 'partition {self.partition}'.format(**locals())) if not hasattr(self, 'hook'): -self.hook = hooks.HiveMetastoreHook( +from airflow.hooks.hive_hooks import HiveMetastoreHook +self.hook = HiveMetastoreHook( metastore_conn_id=self.metastore_conn_id) return self.hook.check_for_partition( self.schema, self.table, self.partition) @@ -470,7 +471,8 @@ class WebHdfsSensor(BaseSensorOperator): self.webhdfs_conn_id = webhdfs_conn_id def poke(self, context): -c = airflow.hooks.webhdfs_hook.WebHDFSHook(self.webhdfs_conn_id) +from airflow.hooks.webhdfs_hook import WebHDFSHook +c = WebHDFSHook(self.webhdfs_conn_id) logging.info( 'Poking for file {self.filepath} '.format(**locals())) return c.check_for_path(hdfs_path=self.filepath) @@ -520,8 +522,8 @@ class S3KeySensor(BaseSensorOperator): self.s3_conn_id = s3_conn_id def poke(self, context): -import airflow.hooks.S3_hook -hook = airflow.hooks.S3_hook.S3Hook(s3_conn_id=self.s3_conn_id) +from airflow.hooks.S3_hook import S3Hook +hook = S3Hook(s3_conn_id=self.s3_conn_id) full_url = "s3://" + self.bucket_name + "/" + self.bucket_key logging.info('Poking for key : {full_url}'.format(**locals())) if self.wildcard_match: @@ -567,8 +569,8 @@ class S3PrefixSensor(BaseSensorOperator): def poke(self, context): logging.info('Poking for prefix : {self.prefix}\n' 'in bucket s3://{self.bucket_name}'.format(**locals())) -import airflow.hooks.S3_hook -hook = airflow.hooks.S3_hook.S3Hook(s3_conn_id=self.s3_conn_id) +from airflow.hooks.S3_hook import S3Hook +hook = S3Hook(s3_conn_id=self.s3_conn_id) return hook.check_for_prefix( prefix=self.prefix, delimiter=self.delimiter, @@ -660,7 +662,7 @@ class HttpSensor(BaseSensorOperator): self.extra_options = extra_options or {} self.response_check = response_check -self.hook = hooks.http_hook.HttpHook(method='GET', http_conn_id=http_conn_id) +self.hook = 
HttpHook(method='GET', http_conn_id=http_conn_id) def poke(self, context): logging.info('Poking: ' + self.endpoint)
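All of the sensors.py changes above replace module-level references through the `airflow.hooks` package with imports inside `poke()` (or explicit `from airflow.hooks.X import Y` imports). The pattern in isolation — `json` below is only a stand-in for a hook module, and `make_poke` is a hypothetical illustration; the point is *where* the import statement runs:

```python
# Deferred-import pattern: the "hook" module is imported inside the method
# that uses it rather than at module import time, which breaks circular
# imports and avoids loading hooks a deployment never uses.

def make_poke(bucket, key):
    def poke():
        import json  # deferred: runs on the first poke(), not at module load
        return json.dumps({"bucket": bucket, "key": key})
    return poke


poke = make_poke("my-bucket", "data.csv")
print(poke())
```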
[32/36] incubator-airflow git commit: [AIRFLOW-XXX] Set version to 1.8.1
[AIRFLOW-XXX] Set version to 1.8.1

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0a105eed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0a105eed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0a105eed
Branch: refs/heads/v1-8-stable
Commit: 0a105eed4c14c1f1595c10a6529e3bdb51187a14
Parents: e342d0d
Author: Chris Riccomini
Authored: Thu Apr 27 12:37:14 2017 -0700
Committer: Chris Riccomini
Committed: Thu Apr 27 12:37:14 2017 -0700
--
 airflow/version.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0a105eed/airflow/version.py
--
diff --git a/airflow/version.py b/airflow/version.py
index 6bff40b..eb0bd4a 100644
--- a/airflow/version.py
+++ b/airflow/version.py
@@ -13,4 +13,4 @@
 # limitations under the License.
 #

-version = '1.8.1rc1+incubating'
+version = '1.8.1+incubating'
[23/36] incubator-airflow git commit: [AIRFLOW-XXX] Set 1.8.1 version
[AIRFLOW-XXX] Set 1.8.1 version

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/58a0ee78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/58a0ee78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/58a0ee78
Branch: refs/heads/v1-8-stable
Commit: 58a0ee787ed372034b417e6743175bdfe7f14808
Parents: bc52d09
Author: Chris Riccomini
Authored: Mon Apr 17 11:18:12 2017 -0700
Committer: Chris Riccomini
Committed: Mon Apr 17 11:18:12 2017 -0700
--
 airflow/version.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/58a0ee78/airflow/version.py
--
diff --git a/airflow/version.py b/airflow/version.py
index 8f87df9..50361b9 100644
--- a/airflow/version.py
+++ b/airflow/version.py
@@ -13,4 +13,4 @@
 # limitations under the License.
 #

-version = '1.8.1alpha0'
+version = '1.8.1rc0+apache.incubating'
[17/36] incubator-airflow git commit: [AIRFLOW-1035] Use binary exponential backoff
[AIRFLOW-1035] Use binary exponential backoff Closes #2196 from IvanVergiliev/exponential- backoff (cherry picked from commit 4ec932b551774bb394c5770c4d2660f565a4c592) Signed-off-by: Bolke de BruinProject: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4199cd3d Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4199cd3d Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4199cd3d Branch: refs/heads/v1-8-stable Commit: 4199cd3d23d35183253c5d078e0f9937e87df232 Parents: ceb2ac3 Author: Ivan Vergiliev Authored: Fri Apr 7 19:35:03 2017 +0200 Committer: Bolke de Bruin Committed: Fri Apr 7 19:35:23 2017 +0200 -- airflow/models.py | 10 +- tests/models.py | 17 ++--- 2 files changed, 19 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4199cd3d/airflow/models.py -- diff --git a/airflow/models.py b/airflow/models.py index 47413e0..5db0287 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -1161,7 +1161,15 @@ class TaskInstance(Base): """ delay = self.task.retry_delay if self.task.retry_exponential_backoff: -delay_backoff_in_seconds = delay.total_seconds() ** self.try_number +# timedelta has a maximum representable value. The exponentiation +# here means this value can be exceeded after a certain number +# of tries (around 50 if the initial delay is 1s, even fewer if +# the delay is larger). Cap the value here before creating a +# timedelta object so the operation doesn't fail. 
+delay_backoff_in_seconds = min( +delay.total_seconds() * (2 ** (self.try_number - 1)), +timedelta.max.total_seconds() - 1 +) delay = timedelta(seconds=delay_backoff_in_seconds) if self.task.max_retry_delay: delay = min(self.task.max_retry_delay, delay) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4199cd3d/tests/models.py -- diff --git a/tests/models.py b/tests/models.py index 9478088..8223276 100644 --- a/tests/models.py +++ b/tests/models.py @@ -757,9 +757,8 @@ class TaskInstanceTest(unittest.TestCase): self.assertEqual(ti.try_number, 4) def test_next_retry_datetime(self): -delay = datetime.timedelta(seconds=3) -delay_squared = datetime.timedelta(seconds=9) -max_delay = datetime.timedelta(seconds=10) +delay = datetime.timedelta(seconds=30) +max_delay = datetime.timedelta(minutes=60) dag = models.DAG(dag_id='fail_dag') task = BashOperator( @@ -778,13 +777,17 @@ class TaskInstanceTest(unittest.TestCase): ti.try_number = 1 dt = ti.next_retry_datetime() -self.assertEqual(dt, ti.end_date+delay) +self.assertEqual(dt, ti.end_date + delay) -ti.try_number = 2 +ti.try_number = 6 dt = ti.next_retry_datetime() -self.assertEqual(dt, ti.end_date+delay_squared) +self.assertEqual(dt, ti.end_date + (2 ** 5) * delay) -ti.try_number = 3 +ti.try_number = 8 +dt = ti.next_retry_datetime() +self.assertEqual(dt, ti.end_date+max_delay) + +ti.try_number = 50 dt = ti.next_retry_datetime() self.assertEqual(dt, ti.end_date+max_delay)
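The capped backoff can be sketched as a standalone function mirroring the new `next_retry_datetime()` logic: try N waits `retry_delay * 2**(N-1)`, capped just below timedelta's maximum representable value so huge try numbers cannot overflow, then clamped to an optional `max_retry_delay`:

```python
from datetime import timedelta

# Standalone sketch of the capped binary exponential backoff from this commit.

def next_retry_delay(retry_delay, try_number, max_retry_delay=None):
    seconds = min(
        # Binary exponential backoff: try 1 waits retry_delay, try N waits
        # retry_delay * 2**(N-1).
        retry_delay.total_seconds() * (2 ** (try_number - 1)),
        # Cap below timedelta's maximum so the constructor cannot overflow.
        timedelta.max.total_seconds() - 1,
    )
    delay = timedelta(seconds=seconds)
    if max_retry_delay is not None:
        delay = min(max_retry_delay, delay)
    return delay


base = timedelta(seconds=30)
print(next_retry_delay(base, 1))                          # one base delay
print(next_retry_delay(base, 6))                          # base * 2**5
print(next_retry_delay(base, 50, timedelta(minutes=60)))  # clamped to the max
```

Without the cap, `delay.total_seconds() ** try_number` (the old code) or even a plain doubling overflows timedelta after roughly fifty tries with a 1-second base delay, which is exactly the failure mode the commit message describes.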
[24/36] incubator-airflow git commit: [AIRFLOW-1120] Update version view to include Apache prefix
[AIRFLOW-1120] Update version view to include Apache prefix Closes #2244 from criccomini/AIRFLOW-1120 (cherry picked from commit 6684597d951cb9f2fea24576a3d19534d67c89ea) Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1725c951 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1725c951 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1725c951 Branch: refs/heads/v1-8-stable Commit: 1725c95163cf3a3d3b4c073922e39851e00942bf Parents: 58a0ee7 Author: Chris RiccominiAuthored: Tue Apr 18 13:53:03 2017 -0700 Committer: Chris Riccomini Committed: Tue Apr 18 13:53:55 2017 -0700 -- airflow/www/views.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1725c951/airflow/www/views.py -- diff --git a/airflow/www/views.py b/airflow/www/views.py index fec4779..53c6394 100644 --- a/airflow/www/views.py +++ b/airflow/www/views.py @@ -2513,7 +2513,7 @@ class VersionView(wwwutils.SuperUserMixin, LoggingMixin, BaseView): def version(self): # Look at the version from setup.py try: -airflow_version = pkg_resources.require("airflow")[0].version +airflow_version = pkg_resources.require("apache-airflow")[0].version except Exception as e: airflow_version = None self.logger.error(e)
[09/36] incubator-airflow git commit: [AIRFLOW-1062] Fix DagRun#find to return correct result
[AIRFLOW-1062] Fix DagRun#find to return correct result DagRun#find returns wrong result if external_trigger=False is specified, because adding filter is skipped on that condition. This PR fixes it. Closes #2210 from sekikn/AIRFLOW-1062 (cherry picked from commit e4494f85ed5593c99949b52e1e0044c2a35f097f) Signed-off-by: Bolke de BruinProject: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/010b80aa Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/010b80aa Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/010b80aa Branch: refs/heads/v1-8-stable Commit: 010b80aa8b417091705556a07d5970fe0cc4efb2 Parents: 2bebeaf Author: Kengo Seki Authored: Tue Apr 4 08:30:40 2017 +0200 Committer: Bolke de Bruin Committed: Tue Apr 4 08:31:05 2017 +0200 -- airflow/models.py | 2 +- tests/models.py | 33 + 2 files changed, 34 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/010b80aa/airflow/models.py -- diff --git a/airflow/models.py b/airflow/models.py index fdff54e..6828ab6 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -3925,7 +3925,7 @@ class DagRun(Base): qry = qry.filter(DR.execution_date == execution_date) if state: qry = qry.filter(DR.state == state) -if external_trigger: +if external_trigger is not None: qry = qry.filter(DR.external_trigger == external_trigger) dr = qry.order_by(DR.execution_date).all() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/010b80aa/tests/models.py -- diff --git a/tests/models.py b/tests/models.py index c63c67e..6673c04 100644 --- a/tests/models.py +++ b/tests/models.py @@ -227,6 +227,39 @@ class DagRunTest(unittest.TestCase): 'scheduled__2015-01-02T03:04:05', run_id, 'Generated run_id did not match expectations: {0}'.format(run_id)) +def test_dagrun_find(self): +session = settings.Session() +now = datetime.datetime.now() + +dag_id1 = 
"test_dagrun_find_externally_triggered" +dag_run = models.DagRun( +dag_id=dag_id1, +run_id='manual__' + now.isoformat(), +execution_date=now, +start_date=now, +state=State.RUNNING, +external_trigger=True, +) +session.add(dag_run) + +dag_id2 = "test_dagrun_find_not_externally_triggered" +dag_run = models.DagRun( +dag_id=dag_id2, +run_id='manual__' + now.isoformat(), +execution_date=now, +start_date=now, +state=State.RUNNING, +external_trigger=False, +) +session.add(dag_run) + +session.commit() + +self.assertEqual(1, len(models.DagRun.find(dag_id=dag_id1, external_trigger=True))) +self.assertEqual(0, len(models.DagRun.find(dag_id=dag_id1, external_trigger=False))) +self.assertEqual(0, len(models.DagRun.find(dag_id=dag_id2, external_trigger=True))) +self.assertEqual(1, len(models.DagRun.find(dag_id=dag_id2, external_trigger=False))) + def test_dagrun_running_when_upstream_skipped(self): """ Tests that a DAG run is not failed when an upstream task is skipped
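The bug fixed here is the classic falsy-flag trap: `if external_trigger:` silently drops the filter when the caller passes `False`, so "find runs that were not externally triggered" returned everything. A minimal standalone illustration (hypothetical `find_runs` helper over plain dicts instead of ORM rows):

```python
def find_runs(runs, external_trigger=None):
    # Compare against None, not truthiness: False is a legitimate filter
    # value meaning "only runs that were NOT externally triggered";
    # only None means "do not filter on this column at all".
    if external_trigger is not None:
        runs = [r for r in runs if r["external_trigger"] == external_trigger]
    return runs

runs = [
    {"dag_id": "d1", "external_trigger": True},
    {"dag_id": "d2", "external_trigger": False},
]
```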
[35/36] incubator-airflow git commit: Merge branch 'v1-8-test' into v1-8-stable
Merge branch 'v1-8-test' into v1-8-stable Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2b811c44 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2b811c44 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2b811c44 Branch: refs/heads/v1-8-stable Commit: 2b811c445ef62236d25f37ac72bab94e79827032 Parents: f4760c3 af2d0b4 Author: Chris RiccominiAuthored: Tue May 9 10:29:23 2017 -0700 Committer: Chris Riccomini Committed: Tue May 9 10:29:23 2017 -0700 -- .rat-excludes | 1 + LICENSE | 343 +++ NOTICE | 17 +- airflow/bin/cli.py | 65 +++- airflow/contrib/hooks/spark_submit_hook.py | 32 +- .../contrib/operators/spark_submit_operator.py | 13 +- airflow/hooks/mssql_hook.py | 10 +- airflow/hooks/mysql_hook.py | 15 +- airflow/hooks/postgres_hook.py | 4 +- airflow/jobs.py | 58 +++- airflow/models.py | 235 + airflow/operators/latest_only_operator.py | 30 +- airflow/operators/mssql_operator.py | 11 +- airflow/operators/mysql_operator.py | 8 +- airflow/operators/postgres_operator.py | 7 +- airflow/operators/python_operator.py| 85 +++-- airflow/operators/sensors.py| 24 +- airflow/ti_deps/deps/base_ti_dep.py | 14 +- airflow/ti_deps/deps/prev_dagrun_dep.py | 5 + airflow/ti_deps/deps/trigger_rule_dep.py| 6 +- airflow/utils/file.py | 20 +- airflow/version.py | 2 +- airflow/www/api/experimental/endpoints.py | 23 +- airflow/www/static/bootstrap-theme.css | 2 +- airflow/www/templates/airflow/dag.html | 2 +- airflow/www/templates/airflow/dags.html | 31 +- airflow/www/templates/airflow/list_dags.html| 2 +- airflow/www/views.py| 11 +- dags/test_dag.py| 3 +- scripts/ci/requirements.txt | 14 + scripts/systemd/airflow | 13 + setup.py| 13 +- tests/contrib/hooks/spark_submit_hook.py| 51 ++- .../contrib/operators/spark_submit_operator.py | 8 +- tests/core.py | 120 ++- tests/dags/test_dagrun_short_circuit_false.py | 38 -- tests/dags/test_latest_runs.py | 27 ++ 
tests/jobs.py | 201 +++ tests/models.py | 306 ++--- tests/operators/__init__.py | 2 + tests/operators/latest_only_operator.py | 12 +- tests/operators/operators.py| 43 +++ tests/operators/python_operator.py | 244 + .../ti_deps/deps/dag_ti_slots_available_dep.py | 41 --- tests/ti_deps/deps/dag_unpaused_dep.py | 41 --- tests/ti_deps/deps/dagrun_exists_dep.py | 41 --- tests/ti_deps/deps/not_in_retry_period_dep.py | 61 tests/ti_deps/deps/not_running_dep.py | 39 --- tests/ti_deps/deps/not_skipped_dep.py | 38 -- tests/ti_deps/deps/pool_has_space_dep.py| 37 -- tests/ti_deps/deps/prev_dagrun_dep.py | 143 tests/ti_deps/deps/runnable_exec_date_dep.py| 92 - .../deps/test_dag_ti_slots_available_dep.py | 42 +++ tests/ti_deps/deps/test_dag_unpaused_dep.py | 42 +++ tests/ti_deps/deps/test_dagrun_exists_dep.py| 40 +++ .../deps/test_not_in_retry_period_dep.py| 59 tests/ti_deps/deps/test_not_running_dep.py | 37 ++ tests/ti_deps/deps/test_not_skipped_dep.py | 36 ++ tests/ti_deps/deps/test_prev_dagrun_dep.py | 123 +++ .../ti_deps/deps/test_runnable_exec_date_dep.py | 76 tests/ti_deps/deps/test_trigger_rule_dep.py | 252 ++ tests/ti_deps/deps/test_valid_state_dep.py | 46 +++ tests/ti_deps/deps/trigger_rule_dep.py | 295 tests/ti_deps/deps/valid_state_dep.py | 49 --- 64 files changed, 2606 insertions(+), 1195 deletions(-) --
[22/36] incubator-airflow git commit: [AIRFLOW-1000] Rebrand distribution to Apache Airflow
[AIRFLOW-1000] Rebrand distribution to Apache Airflow Per Apache requirements Airflow should be branded Apache Airflow. It is impossible to provide a forward compatible automatic update path and users will be required to manually upgrade. Closes #2172 from bolkedebruin/AIRFLOW-1000 (cherry picked from commit 4fb05d8cc7a69255c6bff33c7f856eb4a341d5f2) Signed-off-by: Bolke de BruinProject: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/bc52d092 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/bc52d092 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/bc52d092 Branch: refs/heads/v1-8-stable Commit: bc52d092b5194c3a389c19ce45c2c2bdda3bf265 Parents: a9e0894 Author: Bolke de Bruin Authored: Mon Apr 17 10:09:47 2017 +0200 Committer: Bolke de Bruin Committed: Mon Apr 17 11:27:51 2017 +0200 -- .rat-excludes | 1 + setup.py | 13 - 2 files changed, 13 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bc52d092/.rat-excludes -- diff --git a/.rat-excludes b/.rat-excludes index 1363766..1238abb 100644 --- a/.rat-excludes +++ b/.rat-excludes @@ -13,6 +13,7 @@ docs dist build airflow.egg-info +apache_airflow.egg-info .idea metastore_db .*sql http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bc52d092/setup.py -- diff --git a/setup.py b/setup.py index 43b97d3..7426ce9 100644 --- a/setup.py +++ b/setup.py @@ -18,6 +18,7 @@ from setuptools.command.test import test as TestCommand import imp import logging import os +import pip import sys logger = logging.getLogger(__name__) @@ -99,6 +100,15 @@ def write_version(filename=os.path.join(*['airflow', a.write(text) +def check_previous(): +installed_packages = ([package.project_name for package + in pip.get_installed_distributions()]) +if 'airflow' in installed_packages: +print("An earlier non-apache version of Airflow was installed, " + "please uninstall it first. 
Then reinstall.") +sys.exit(1) + + async = [ 'greenlet>=0.4.9', 'eventlet>= 0.9.7', @@ -184,9 +194,10 @@ devel_all = devel + all_dbs + doc + samba + s3 + slack + crypto + oracle + docke def do_setup(): +check_previous() write_version() setup( -name='airflow', +name='apache-airflow', description='Programmatically author, schedule and monitor data pipelines', license='Apache License 2.0', version=version,
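The `check_previous` guard above relies on `pip.get_installed_distributions()`, a private pip API that was later removed. A sketch of the same check with the standard library instead (an assumption on my part, not what the patch ships; requires Python 3.8+ for `importlib.metadata`):

```python
from importlib.metadata import distributions  # Python 3.8+ stdlib

def has_conflicting_install(package_name):
    # Report whether an old, un-renamed distribution (e.g. "airflow")
    # is still installed, so setup can refuse to proceed and ask the
    # user to uninstall it first -- same intent as check_previous().
    installed = {
        (dist.metadata["Name"] or "").lower()
        for dist in distributions()
    }
    return package_name.lower() in installed
```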
[25/36] incubator-airflow git commit: [AIRFLOW-1124] Do not set all tasks to scheduled in backfill
[AIRFLOW-1124] Do not set all tasks to scheduled in backfill Backfill is supposed to fill in the blanks and not to reschedule all tasks. This fixes a regression from 1.8.0. Closes #2247 from bolkedebruin/AIRFLOW-1124 (cherry picked from commit 0406462dc91427793ba40d0f05f321e85dbc6f19) Signed-off-by: Bolke de BruinProject: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f0d072cf Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f0d072cf Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f0d072cf Branch: refs/heads/v1-8-stable Commit: f0d072cfb3b023dd4c80fd4e30e42fef595793c7 Parents: 1725c95 Author: Bolke de Bruin Authored: Wed Apr 19 17:15:46 2017 +0200 Committer: Bolke de Bruin Committed: Wed Apr 19 17:15:59 2017 +0200 -- airflow/jobs.py | 3 ++- tests/jobs.py | 69 2 files changed, 71 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f0d072cf/airflow/jobs.py -- diff --git a/airflow/jobs.py b/airflow/jobs.py index 11ff926..9a6687c 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -1822,7 +1822,8 @@ class BackfillJob(BaseJob): for ti in run.get_task_instances(): # all tasks part of the backfill are scheduled to run -ti.set_state(State.SCHEDULED, session=session) +if ti.state == State.NONE: +ti.set_state(State.SCHEDULED, session=session) tasks_to_run[ti.key] = ti next_run_date = self.dag.following_schedule(next_run_date) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f0d072cf/tests/jobs.py -- diff --git a/tests/jobs.py b/tests/jobs.py index 9b245ae..5db858d 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -348,6 +348,75 @@ class BackfillJobTest(unittest.TestCase): else: self.assertEqual(State.NONE, ti.state) +def test_backfill_fill_blanks(self): +dag = DAG( +'test_backfill_fill_blanks', +start_date=DEFAULT_DATE, +default_args={'owner': 'owner1'}, +) + +with dag: +op1 = 
DummyOperator(task_id='op1') +op2 = DummyOperator(task_id='op2') +op3 = DummyOperator(task_id='op3') +op4 = DummyOperator(task_id='op4') +op5 = DummyOperator(task_id='op5') +op6 = DummyOperator(task_id='op6') + +dag.clear() +dr = dag.create_dagrun(run_id='test', + state=State.SUCCESS, + execution_date=DEFAULT_DATE, + start_date=DEFAULT_DATE) +executor = TestExecutor(do_update=True) + +session = settings.Session() + +tis = dr.get_task_instances() +for ti in tis: +if ti.task_id == op1.task_id: +ti.state = State.UP_FOR_RETRY +ti.end_date = DEFAULT_DATE +elif ti.task_id == op2.task_id: +ti.state = State.FAILED +elif ti.task_id == op3.task_id: +ti.state = State.SKIPPED +elif ti.task_id == op4.task_id: +ti.state = State.SCHEDULED +elif ti.task_id == op5.task_id: +ti.state = State.UPSTREAM_FAILED +# op6 = None +session.merge(ti) +session.commit() +session.close() + +job = BackfillJob(dag=dag, + start_date=DEFAULT_DATE, + end_date=DEFAULT_DATE, + executor=executor) +self.assertRaisesRegexp( +AirflowException, +'Some task instances failed', +job.run) + +self.assertRaises(sqlalchemy.orm.exc.NoResultFound, dr.refresh_from_db) +# the run_id should have changed, so a refresh won't work +drs = DagRun.find(dag_id=dag.dag_id, execution_date=DEFAULT_DATE) +dr = drs[0] + +self.assertEqual(dr.state, State.FAILED) + +tis = dr.get_task_instances() +for ti in tis: +if ti.task_id in (op1.task_id, op4.task_id, op6.task_id): +self.assertEqual(ti.state, State.SUCCESS) +elif ti.task_id == op2.task_id: +self.assertEqual(ti.state, State.FAILED) +elif ti.task_id == op3.task_id: +self.assertEqual(ti.state, State.SKIPPED) +elif ti.task_id == op5.task_id: +self.assertEqual(ti.state, State.UPSTREAM_FAILED) +
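The whole fix is the one-line gate `if ti.state == State.NONE`: backfill now only schedules task instances that have no state yet, instead of resetting everything. The selection logic, as a tiny sketch over plain state strings:

```python
def tasks_to_schedule(task_states):
    # "Fill in the blanks": only task instances with no state yet get
    # moved to SCHEDULED; anything that already has a state (failed,
    # skipped, up_for_retry, upstream_failed, ...) is left alone.
    return [task_id for task_id, state in task_states.items()
            if state is None]
```

With the states used in `test_backfill_fill_blanks`, only the stateless `op6` would be newly scheduled.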
[36/36] incubator-airflow git commit: Merge branch 'v1-8-stable' of https://git-wip-us.apache.org/repos/asf/incubator-airflow into v1-8-stable
Merge branch 'v1-8-stable' of https://git-wip-us.apache.org/repos/asf/incubator-airflow into v1-8-stable Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0d8509e7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0d8509e7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0d8509e7 Branch: refs/heads/v1-8-stable Commit: 0d8509e7ec9894c62664df81b4f76fa37727d84d Parents: 2b811c4 8e7a558 Author: Chris RiccominiAuthored: Tue May 9 10:32:23 2017 -0700 Committer: Chris Riccomini Committed: Tue May 9 10:32:23 2017 -0700 -- --
[15/36] incubator-airflow git commit: [AIRFLOW-1050] Do not count up_for_retry as not ready
[AIRFLOW-1050] Do not count up_for_retry as not ready up_for_retry tasks were incorrectly counted towards not_ready therefore marking a dag run deadlocked instead of retrying. Closes #2225 from bolkedebruin/AIRFLOW-1050 (cherry picked from commit 35e43f5067f4741640278b765c0e54e4fd45ffa3) Signed-off-by: Bolke de BruinProject: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0fa593e3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0fa593e3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0fa593e3 Branch: refs/heads/v1-8-stable Commit: 0fa593e38c7ea88765408af10abad3c3780ba27d Parents: ebfc3ea Author: Bolke de Bruin Authored: Fri Apr 7 08:00:10 2017 +0200 Committer: Bolke de Bruin Committed: Fri Apr 7 08:00:23 2017 +0200 -- airflow/jobs.py | 9 + 1 file changed, 9 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0fa593e3/airflow/jobs.py -- diff --git a/airflow/jobs.py b/airflow/jobs.py index ce45e05..11ff926 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -1925,6 +1925,15 @@ class BackfillJob(BaseJob): started.pop(key) continue +# special case +if ti.state == State.UP_FOR_RETRY: +self.logger.debug("Task instance {} retry period not expired yet" + .format(ti)) +if key in started: +started.pop(key) +tasks_to_run[key] = ti +continue + # all remaining tasks self.logger.debug('Adding {} to not_ready'.format(ti)) not_ready.add(key)
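The special case added above keeps an `up_for_retry` task in the runnable pool rather than letting it fall through to `not_ready`, since a full `not_ready` set is what marks the run deadlocked. A hedged sketch of that triage step (hypothetical function and state strings, not the BackfillJob API):

```python
def triage_task(key, state, retry_period_expired,
                started, tasks_to_run, not_ready):
    # An up_for_retry task whose retry delay has not yet elapsed is
    # still runnable later: put it back in tasks_to_run instead of
    # counting it toward not_ready, which would wrongly declare the
    # dag run deadlocked.
    if state == "up_for_retry" and not retry_period_expired:
        started.pop(key, None)
        tasks_to_run[key] = state
        return
    # all remaining tasks
    not_ready.add(key)
```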
[26/36] incubator-airflow git commit: [AIRFLOW-1121][AIRFLOW-1004] Fix `airflow webserver --pid` to write out pid file
[AIRFLOW-1121][AIRFLOW-1004] Fix `airflow webserver --pid` to write out pid file After AIRFLOW-1004, --pid option is no longer honored and the pid file is not being written out. This PR fixes it. Closes #2249 from sekikn/AIRFLOW-1121 (cherry picked from commit 8d643897cf6171d110e7139fb31c3d4d47c3acca) Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/aef7dd0a Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/aef7dd0a Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/aef7dd0a Branch: refs/heads/v1-8-stable Commit: aef7dd0a53411f3edb2333cb36a457056e5ab652 Parents: f0d072c Author: Kengo SekiAuthored: Wed Apr 19 12:31:10 2017 -0700 Committer: Chris Riccomini Committed: Thu Apr 20 15:22:21 2017 -0700 -- airflow/bin/cli.py | 3 ++- tests/core.py | 36 ++-- 2 files changed, 28 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/aef7dd0a/airflow/bin/cli.py -- diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py index e4755c7..8e92ea1 100755 --- a/airflow/bin/cli.py +++ b/airflow/bin/cli.py @@ -776,6 +776,7 @@ def webserver(args): '-t', str(worker_timeout), '-b', args.hostname + ':' + str(args.port), '-n', 'airflow-webserver', +'-p', str(pid), '-c', 'airflow.www.gunicorn_config' ] @@ -786,7 +787,7 @@ def webserver(args): run_args += ['--error-logfile', str(args.error_logfile)] if args.daemon: -run_args += ['-D', '-p', str(pid)] +run_args += ['-D'] if ssl_cert: run_args += ['--certfile', ssl_cert, '--keyfile', ssl_key] http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/aef7dd0a/tests/core.py -- diff --git a/tests/core.py b/tests/core.py index 4fd2f08..c36c6c2 100644 --- a/tests/core.py +++ b/tests/core.py @@ -1398,6 +1398,14 @@ class CliTests(unittest.TestCase): os.remove('variables1.json') os.remove('variables2.json') +def _wait_pidfile(self, pidfile): +while True: +try: +with open(pidfile) as 
f: +return int(f.read()) +except: +sleep(1) + def test_cli_webserver_foreground(self): import subprocess @@ -1417,18 +1425,26 @@ class CliTests(unittest.TestCase): @unittest.skipIf("TRAVIS" in os.environ and bool(os.environ["TRAVIS"]), "Skipping test due to lack of required file permission") +def test_cli_webserver_foreground_with_pid(self): +import subprocess + +# Run webserver in foreground with --pid option +pidfile = tempfile.mkstemp()[1] +p = subprocess.Popen(["airflow", "webserver", "--pid", pidfile]) + +# Check the file specified by --pid option exists +self._wait_pidfile(pidfile) + +# Terminate webserver +p.terminate() +p.wait() + +@unittest.skipIf("TRAVIS" in os.environ and bool(os.environ["TRAVIS"]), + "Skipping test due to lack of required file permission") def test_cli_webserver_background(self): import subprocess import psutil -def wait_pidfile(pidfile): -while True: -try: -with open(pidfile) as f: -return int(f.read()) -except IOError: -sleep(1) - # Confirm that webserver hasn't been launched. self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "airflow"]).wait()) self.assertEqual(1, subprocess.Popen(["pgrep", "-c", "gunicorn"]).wait()) @@ -1436,7 +1452,7 @@ class CliTests(unittest.TestCase): # Run webserver in background. subprocess.Popen(["airflow", "webserver", "-D"]) pidfile = cli.setup_locations("webserver")[0] -wait_pidfile(pidfile) +self._wait_pidfile(pidfile) # Assert that gunicorn and its monitor are launched. self.assertEqual(0, subprocess.Popen(["pgrep", "-c", "airflow"]).wait()) @@ -1444,7 +1460,7 @@ class CliTests(unittest.TestCase): # Terminate monitor process. pidfile = cli.setup_locations("webserver-monitor")[0] -pid = wait_pidfile(pidfile) +pid = self._wait_pidfile(pidfile) p = psutil.Process(pid) p.terminate() p.wait()
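The `_wait_pidfile` helper above polls forever with a bare `except:`. A variant with a timeout and narrower exception handling (my own hardening, not part of the patch) keeps a broken webserver from hanging the suite:

```python
import os
import tempfile
import time

def wait_pidfile(pidfile, timeout=5.0, poll_interval=0.05):
    # Poll until the pid file exists and contains an integer. ValueError
    # covers a partially written file; OSError covers "not there yet".
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with open(pidfile) as f:
                return int(f.read())
        except (OSError, ValueError):
            time.sleep(poll_interval)
    raise TimeoutError("pid file %r never appeared" % pidfile)
```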
[34/36] incubator-airflow git commit: [AIRFLOW-970] Load latest_runs on homepage async
[AIRFLOW-970] Load latest_runs on homepage async The latest_runs column on the homepage loads synchronously with an n+1 query. Homepage loads will be significantly faster if this happens asynchronously and as a batch. Closes #2144 from saguziel/aguziel-latest-run- async (cherry picked from commit 0f7ddbbedb05f2f11500250db4989edcb27bc164) Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/af2d0b4b Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/af2d0b4b Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/af2d0b4b Branch: refs/heads/v1-8-stable Commit: af2d0b4b5cb1ef30a065b1af66f90a01a953e2be Parents: d61af62 Author: Alex GuzielAuthored: Wed Apr 5 10:02:42 2017 +0200 Committer: Chris Riccomini Committed: Thu Apr 27 13:35:40 2017 -0700 -- airflow/models.py | 23 airflow/www/api/experimental/endpoints.py | 23 +++- airflow/www/templates/airflow/dags.html | 29 ++ tests/dags/test_latest_runs.py| 27 tests/models.py | 2 +- 5 files changed, 89 insertions(+), 15 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af2d0b4b/airflow/models.py -- diff --git a/airflow/models.py b/airflow/models.py index 1ceb821..646f74b 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -4249,6 +4249,29 @@ class DagRun(Base): return False +@classmethod +@provide_session +def get_latest_runs(cls, session): +"""Returns the latest running DagRun for each DAG. 
""" +subquery = ( +session +.query( +cls.dag_id, +func.max(cls.execution_date).label('execution_date')) +.filter(cls.state == State.RUNNING) +.group_by(cls.dag_id) +.subquery() +) +dagruns = ( +session +.query(cls) +.join(subquery, + and_(cls.dag_id == subquery.c.dag_id, + cls.execution_date == subquery.c.execution_date)) +.all() +) +return dagruns + class Pool(Base): __tablename__ = "slot_pool" http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af2d0b4b/airflow/www/api/experimental/endpoints.py -- diff --git a/airflow/www/api/experimental/endpoints.py b/airflow/www/api/experimental/endpoints.py index 56b9d79..63355c7 100644 --- a/airflow/www/api/experimental/endpoints.py +++ b/airflow/www/api/experimental/endpoints.py @@ -20,7 +20,8 @@ from airflow.exceptions import AirflowException from airflow.www.app import csrf from flask import ( -g, Markup, Blueprint, redirect, jsonify, abort, request, current_app, send_file +g, Markup, Blueprint, redirect, jsonify, abort, +request, current_app, send_file, url_for ) from datetime import datetime @@ -110,3 +111,23 @@ def task_info(dag_id, task_id): task = dag.get_task(task_id) fields = {k: str(v) for k, v in vars(task).items() if not k.startswith('_')} return jsonify(fields) + + +@api_experimental.route('/latest_runs', methods=['GET']) +@requires_authentication +def latest_dag_runs(): +"""Returns the latest running DagRun for each DAG formatted for the UI. 
""" +from airflow.models import DagRun +dagruns = DagRun.get_latest_runs() +payload = [] +for dagrun in dagruns: +if dagrun.execution_date: +payload.append({ +'dag_id': dagrun.dag_id, +'execution_date': dagrun.execution_date.strftime("%Y-%m-%d %H:%M"), +'start_date': ((dagrun.start_date or '') and + dagrun.start_date.strftime("%Y-%m-%d %H:%M")), +'dag_run_url': url_for('airflow.graph', dag_id=dagrun.dag_id, + execution_date=dagrun.execution_date) +}) +return jsonify(payload) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af2d0b4b/airflow/www/templates/airflow/dags.html -- diff --git a/airflow/www/templates/airflow/dags.html b/airflow/www/templates/airflow/dags.html index 7c59dea..c0dbc62 100644 --- a/airflow/www/templates/airflow/dags.html +++ b/airflow/www/templates/airflow/dags.html @@ -105,19 +105,7 @@ - -{% if dag %} -{% set last_run =
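The `get_latest_runs` subquery is the standard greatest-n-per-group pattern: group by `dag_id`, take `max(execution_date)` among RUNNING runs, then join back to fetch the full rows. The same selection in plain Python over dicts (a sketch for intuition, not the ORM code):

```python
def get_latest_runs(dag_runs):
    # For each dag_id, keep the RUNNING run with the greatest
    # execution_date -- what the max()-subquery plus join computes
    # in a single SQL round trip instead of one query per DAG.
    latest = {}
    for run in dag_runs:
        if run["state"] != "running":
            continue
        current = latest.get(run["dag_id"])
        if current is None or run["execution_date"] > current["execution_date"]:
            latest[run["dag_id"]] = run
    return list(latest.values())
```

Batching this is exactly what removes the n+1 query from the homepage load.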
[07/36] incubator-airflow git commit: [AIRFLOW-1054] Fix broken import in test_dag
[AIRFLOW-1054] Fix broken import in test_dag Closes #2201 from r39132/fix_broken_import_on_test_dag (cherry picked from commit c64e876bd50eeb6c9e2600ac9d832c05eb5e9640) Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/68b1c982 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/68b1c982 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/68b1c982 Branch: refs/heads/v1-8-stable Commit: 68b1c982e048878ec9dd658072c147e4341bf2c2 Parents: 5eb3335 Author: Siddharth AnandAuthored: Mon Apr 3 13:10:51 2017 -0700 Committer: Chris Riccomini Committed: Mon Apr 3 13:11:42 2017 -0700 -- dags/test_dag.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/68b1c982/dags/test_dag.py -- diff --git a/dags/test_dag.py b/dags/test_dag.py index db0b648..f2a9f6a 100644 --- a/dags/test_dag.py +++ b/dags/test_dag.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from airflow import utils from airflow import DAG from airflow.operators.dummy_operator import DummyOperator from datetime import datetime, timedelta @@ -24,7 +25,7 @@ DAG_NAME = 'test_dag_v1' default_args = { 'owner': 'airflow', 'depends_on_past': True, -'start_date': airflow.utils.dates.days_ago(2) +'start_date': utils.dates.days_ago(2) } dag = DAG(DAG_NAME, schedule_interval='*/10 * * * *', default_args=default_args)
[01/36] incubator-airflow git commit: [AIRFLOW-974] Fix mkdirs race condition
Repository: incubator-airflow Updated Branches: refs/heads/v1-8-stable 8e7a55836 -> 0d8509e7e [AIRFLOW-974] Fix mkdirs race condition mkdirs contained a race condition: if the directory is created between the os.path.exists and os.makedirs calls, os.makedirs will fail with an OSError. This reworks the function to be non-recursive as well, as permission errors were due to umasks being applied. Closes #2147 from bolkedebruin/AIRFLOW-974 (cherry picked from commit c5cc298cf16c9777c90aec1fc8cc24bde62f7b2f) Signed-off-by: Bolke de Bruin Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3b37cfa1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3b37cfa1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3b37cfa1 Branch: refs/heads/v1-8-stable Commit: 3b37cfa1f2642ff90908a3af0a5674637c9518ee Parents: 2a60897 Author: Bolke de Bruin Authored: Mon Mar 13 20:14:07 2017 -0700 Committer: Bolke de Bruin Committed: Mon Mar 13 20:14:30 2017 -0700 -- airflow/utils/file.py | 20 +--- 1 file changed, 9 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3b37cfa1/airflow/utils/file.py -- diff --git a/airflow/utils/file.py b/airflow/utils/file.py index 78ddeaa..352755e 100644 --- a/airflow/utils/file.py +++ b/airflow/utils/file.py @@ -44,16 +44,14 @@ def mkdirs(path, mode): :param path: The directory to create :type path: str -:param mode: The mode to give to the directory e.g. 0o755 +:param mode: The mode to give to the directory e.g.
0o755, ignores umask :type mode: int -:return: A list of directories that were created -:rtype: list[str] """ -if not path or os.path.exists(path): -return [] -(head, _) = os.path.split(path) -res = mkdirs(head, mode) -os.mkdir(path) -os.chmod(path, mode) -res += [path] -return res +try: +o_umask = os.umask(0) +os.makedirs(path, mode) +except OSError: +if not os.path.isdir(path): +raise +finally: +os.umask(o_umask)
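The new `mkdirs` replaces the check-then-act pattern (exists → makedirs, which races) with EAFP: just attempt the creation and only re-raise if the path still isn't a directory. Reassembled from the diff as a runnable function:

```python
import os
import tempfile

def mkdirs(path, mode):
    # EAFP instead of exists()-then-makedirs(): concurrent creation of
    # the same path raises OSError, which is benign as long as the
    # directory exists afterwards. Clearing the umask means `mode` is
    # applied exactly rather than being masked down.
    o_umask = os.umask(0)
    try:
        os.makedirs(path, mode)
    except OSError:
        if not os.path.isdir(path):
            raise
    finally:
        os.umask(o_umask)
```

Calling it twice on the same path is now harmless, which is precisely what the old recursive version could not guarantee under concurrency.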
[11/36] incubator-airflow git commit: [AIRFLOW-111] Include queued tasks in scheduler concurrency check
[AIRFLOW-111] Include queued tasks in scheduler concurrency check The concurrency argument in dags appears to not be obeyed because the scheduler does not check the concurrency properly when checking tasks. The tasks do not run, but this leads to a lot of scheduler churn. Closes #2214 from saguziel/aguziel-fix-concurrency (cherry picked from commit 3ff5abee3f9d29e545e021c2c060e9c9f3045236) Signed-off-by: Bolke de BruinProject: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9070a827 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9070a827 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9070a827 Branch: refs/heads/v1-8-stable Commit: 9070a82775691e08fb1b95c28fbc2cc5ee7b843d Parents: 4db53f3 Author: Alex Guziel Authored: Wed Apr 5 09:59:53 2017 +0200 Committer: Bolke de Bruin Committed: Wed Apr 5 10:00:06 2017 +0200 -- airflow/jobs.py | 25 +++- airflow/models.py | 48 ++ tests/jobs.py | 62 ++ tests/models.py | 38 +++ 4 files changed, 142 insertions(+), 31 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9070a827/airflow/jobs.py -- diff --git a/airflow/jobs.py b/airflow/jobs.py index 7db9b9c..ce45e05 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -43,7 +43,7 @@ from tabulate import tabulate from airflow import executors, models, settings from airflow import configuration as conf from airflow.exceptions import AirflowException -from airflow.models import DagRun +from airflow.models import DAG, DagRun from airflow.settings import Stats from airflow.task_runner import get_task_runner from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, RUN_DEPS @@ -1036,7 +1036,7 @@ class SchedulerJob(BaseJob): task_instances, key=lambda ti: (-ti.priority_weight, ti.execution_date)) # DAG IDs with running tasks that equal the concurrency limit of the dag -dag_id_to_running_task_count = {} 
+dag_id_to_possibly_running_task_count = {} for task_instance in priority_sorted_task_instances: if open_slots <= 0: @@ -1063,22 +1063,24 @@ class SchedulerJob(BaseJob): # reached. dag_id = task_instance.dag_id -if dag_id not in dag_id_to_running_task_count: -dag_id_to_running_task_count[dag_id] = \ -DagRun.get_running_tasks( -session, +if dag_id not in dag_id_to_possibly_running_task_count: +dag_id_to_possibly_running_task_count[dag_id] = \ +DAG.get_num_task_instances( dag_id, -simple_dag_bag.get_dag(dag_id).task_ids) +simple_dag_bag.get_dag(dag_id).task_ids, +states=[State.RUNNING, State.QUEUED], +session=session) -current_task_concurrency = dag_id_to_running_task_count[dag_id] +current_task_concurrency = dag_id_to_possibly_running_task_count[dag_id] task_concurrency_limit = simple_dag_bag.get_dag(dag_id).concurrency -self.logger.info("DAG {} has {}/{} running tasks" +self.logger.info("DAG {} has {}/{} running and queued tasks" .format(dag_id, current_task_concurrency, task_concurrency_limit)) -if current_task_concurrency > task_concurrency_limit: +if current_task_concurrency >= task_concurrency_limit: self.logger.info("Not executing {} since the number " - "of tasks running from DAG {} is >= to the " + "of tasks running or queued from DAG {}" + " is >= to the " "DAG's task concurrency limit of {}" .format(task_instance, dag_id, @@ -1137,6 +1139,7 @@ class SchedulerJob(BaseJob): queue=queue) open_slots -= 1 +dag_id_to_possibly_running_task_count[dag_id] += 1 def _process_dags(self, dagbag, dags, tis_out): """ http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9070a827/airflow/models.py
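Two details carry this fix: the rejection test became `>=` (the old `>` let a DAG exceed its limit by one), and the per-DAG count is incremented locally as soon as a task is queued, so later candidates in the same scheduler pass see it. A hedged sketch of that loop (hypothetical `schedule_pass` helper, not the SchedulerJob code):

```python
def schedule_pass(candidate_dag_ids, concurrency_limits, busy_counts, open_slots):
    # busy_counts holds running+queued task counts per dag_id -- counting
    # queued tasks is the point of the patch; otherwise the scheduler
    # churns on tasks it can never start.
    sent = []
    for dag_id in candidate_dag_ids:
        if open_slots <= 0:
            break
        if busy_counts.get(dag_id, 0) >= concurrency_limits[dag_id]:
            continue  # at (or over) the dag's concurrency limit
        sent.append(dag_id)
        busy_counts[dag_id] = busy_counts.get(dag_id, 0) + 1
        open_slots -= 1
    return sent
```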
[13/36] incubator-airflow git commit: [AIRFLOW-969] Catch bad python_callable argument
[AIRFLOW-969] Catch bad python_callable argument Checks for callable when Operator is created, not when it is run. * added initial PythonOperator unit test, testing run * python_callable must be callable; added unit test Closes #2142 from abloomston/python-callable (cherry picked from commit 12901ddfa9961a11feaa3f17696d19102ff8ecd0) Signed-off-by: Bolke de BruinProject: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/91674117 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/91674117 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/91674117 Branch: refs/heads/v1-8-stable Commit: 916741171cc0c6426dbcbe8a2b5ce2468fce870d Parents: dff6d21 Author: abloomston Authored: Thu Mar 16 19:36:00 2017 -0400 Committer: Bolke de Bruin Committed: Thu Apr 6 09:47:18 2017 +0200 -- airflow/operators/python_operator.py | 3 +++ 1 file changed, 3 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/91674117/airflow/operators/python_operator.py -- diff --git a/airflow/operators/python_operator.py b/airflow/operators/python_operator.py index 114bc7e..cf240f2 100644 --- a/airflow/operators/python_operator.py +++ b/airflow/operators/python_operator.py @@ -16,6 +16,7 @@ from builtins import str from datetime import datetime import logging +from airflow.exceptions import AirflowException from airflow.models import BaseOperator, TaskInstance from airflow.utils.state import State from airflow.utils.decorators import apply_defaults @@ -63,6 +64,8 @@ class PythonOperator(BaseOperator): templates_exts=None, *args, **kwargs): super(PythonOperator, self).__init__(*args, **kwargs) +if not callable(python_callable): +raise AirflowException('`python_callable` param must be callable') self.python_callable = python_callable self.op_args = op_args or [] self.op_kwargs = op_kwargs or {}
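Validating in `__init__` moves the failure from task run time to DAG-definition time, where the traceback points at the offending operator. A minimal stand-in for the guarded constructor (using `TypeError` in place of `AirflowException`, which isn't imported here):

```python
class CallableCheckingOperator:
    # Sketch of PythonOperator's constructor guard: fail fast when the
    # DAG file is parsed, not when the scheduler finally runs the task.
    def __init__(self, python_callable, op_args=None, op_kwargs=None):
        if not callable(python_callable):
            raise TypeError('`python_callable` param must be callable')
        self.python_callable = python_callable
        self.op_args = op_args or []
        self.op_kwargs = op_kwargs or {}

    def execute(self):
        return self.python_callable(*self.op_args, **self.op_kwargs)
```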
[31/36] incubator-airflow git commit: [AIRFLOW-492] Make sure stat updates cannot fail a task
[AIRFLOW-492] Make sure stat updates cannot fail a task Previously a failed commit into the db for the statistics could also fail a task. Secondly, the UI could display out of date statistics. This patch reworks DagStat so that failure to update the statistics does not propagate. Next to that, it makes sure the UI always displays the latest statistics. Closes #2254 from bolkedebruin/AIRFLOW-492 (cherry picked from commit c2472ffa124ffc65b8762ea583554494624dbb6a) Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e342d0d2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e342d0d2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e342d0d2 Branch: refs/heads/v1-8-stable Commit: e342d0d223e47ea25f73baaa00a16df414a6e0df Parents: 5800f56 Author: Bolke de Bruin Authored: Wed Apr 26 20:39:48 2017 +0200 Committer: Chris Riccomini Committed: Thu Apr 27 12:35:46 2017 -0700 -- airflow/jobs.py | 4 +- airflow/models.py| 133 ++ airflow/www/views.py | 7 +-- tests/core.py| 34 +++- tests/models.py | 66 ++- 5 files changed, 190 insertions(+), 54 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e342d0d2/airflow/jobs.py -- diff --git a/airflow/jobs.py b/airflow/jobs.py index 11dbddf..379c96e 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -1177,7 +1177,7 @@ class SchedulerJob(BaseJob): self._process_task_instances(dag, tis_out) self.manage_slas(dag) -models.DagStat.clean_dirty([d.dag_id for d in dags]) +models.DagStat.update([d.dag_id for d in dags]) def _process_executor_events(self): """ @@ -1977,7 +1977,7 @@ class BackfillJob(BaseJob): active_dag_runs.remove(run) if run.dag.is_paused: -models.DagStat.clean_dirty([run.dag_id], session=session) +models.DagStat.update([run.dag_id], session=session) msg = ' | '.join([ "[backfill progress]", http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e342d0d2/airflow/models.py 
-- diff --git a/airflow/models.py b/airflow/models.py index 2de88f6..1ceb821 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -29,6 +29,7 @@ import functools import getpass import imp import importlib +import itertools import inspect import zipfile import jinja2 @@ -719,6 +720,7 @@ class TaskInstance(Base): even while multiple schedulers may be firing task instances. """ + __tablename__ = "task_instance" task_id = Column(String(ID_LEN), primary_key=True) @@ -3089,7 +3091,7 @@ class DAG(BaseDag, LoggingMixin): for dr in drs: dr.state = state dirty_ids.append(dr.dag_id) -DagStat.clean_dirty(dirty_ids, session=session) +DagStat.update(dirty_ids, session=session) def clear( self, start_date=None, end_date=None, @@ -3383,6 +3385,9 @@ class DAG(BaseDag, LoggingMixin): state=state ) session.add(run) + +DagStat.set_dirty(dag_id=self.dag_id, session=session) + session.commit() run.dag = self @@ -3392,12 +3397,7 @@ class DAG(BaseDag, LoggingMixin): run.verify_integrity(session=session) run.refresh_from_db() -DagStat.set_dirty(self.dag_id, session=session) -# add a placeholder row into DagStat table -if not session.query(DagStat).filter(DagStat.dag_id == self.dag_id).first(): -session.add(DagStat(dag_id=self.dag_id, state=state, count=0, dirty=True)) -session.commit() return run @staticmethod @@ -3805,7 +3805,7 @@ class DagStat(Base): count = Column(Integer, default=0) dirty = Column(Boolean, default=False) -def __init__(self, dag_id, state, count, dirty=False): +def __init__(self, dag_id, state, count=0, dirty=False): self.dag_id = dag_id self.state = state self.count = count @@ -3814,42 +3814,104 @@ class DagStat(Base): @staticmethod @provide_session def set_dirty(dag_id, session=None): -for dag in session.query(DagStat).filter(DagStat.dag_id == dag_id): -dag.dirty = True -session.commit() +""" +:param dag_id: the dag_id to mark dirty +:param session: database session +:return: +""" +DagStat.create(dag_id=dag_id, session=session) + +try: +stats =
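The key behavioural change above is that statistics upkeep must never propagate a database error into the task. That pattern can be sketched like this; the hand-rolled `FlakySession` stands in for a SQLAlchemy session, and `update_dag_stats` is illustrative, not the committed `DagStat.update`:

```python
import logging


class FlakySession:
    """Minimal stand-in for a SQLAlchemy session whose commit may fail."""

    def __init__(self, fail=False):
        self.fail = fail
        self.rolled_back = False

    def commit(self):
        if self.fail:
            raise RuntimeError('db commit failed')

    def rollback(self):
        self.rolled_back = True


def update_dag_stats(dag_ids, session):
    try:
        # ... recompute the per-state dag-run counts for dag_ids here ...
        session.commit()
    except Exception:
        # Swallow the error: a stats bookkeeping failure must never
        # bubble up and fail the task that triggered it.
        logging.exception('Could not update stats for %s', dag_ids)
        session.rollback()


update_dag_stats(['example_dag'], FlakySession(fail=True))  # no exception escapes
```

The UI side of the patch then reads the freshly recomputed rows instead of trusting possibly stale "dirty" counts.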
[16/36] incubator-airflow git commit: [AIRFLOW-1085] Enhance the SparkSubmitOperator
[AIRFLOW-1085] Enhance the SparkSubmitOperator - Allow the Spark home to be set on a per-connection basis to obviate the need for the spark-submit to be on the PATH, and allow different versions of Spark to be easily used. - Enable the use of the --driver-memory parameter on the spark-submit by making it a parameter on the operator - Enable the use of the --class parameter on the spark-submit by making it a parameter on the operator Closes #2211 from camshrun/sparkSubmitImprovements (cherry picked from commit 0ade066f44257c5e119b292f4cc2ba105774f4e7) Signed-off-by: Bolke de Bruin Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ceb2ac36 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ceb2ac36 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ceb2ac36 Branch: refs/heads/v1-8-stable Commit: ceb2ac366fce4eac7ca007e6ec15194e71e66409 Parents: 0fa593e Author: Stephan Werges Authored: Fri Apr 7 19:20:46 2017 +0200 Committer: Bolke de Bruin Committed: Fri Apr 7 19:21:38 2017 +0200 -- airflow/contrib/hooks/spark_submit_hook.py | 32 ++-- .../contrib/operators/spark_submit_operator.py | 13 - tests/contrib/hooks/spark_submit_hook.py| 51 +--- .../contrib/operators/spark_submit_operator.py | 8 ++- 4 files changed, 90 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ceb2ac36/airflow/contrib/hooks/spark_submit_hook.py -- diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py index 619cc71..59d28b5 100644 --- a/airflow/contrib/hooks/spark_submit_hook.py +++ b/airflow/contrib/hooks/spark_submit_hook.py @@ -13,6 +13,7 @@ # limitations under the License. 
# import logging +import os import subprocess import re @@ -25,7 +26,8 @@ log = logging.getLogger(__name__) class SparkSubmitHook(BaseHook): """ This hook is a wrapper around the spark-submit binary to kick off a spark-submit job. -It requires that the "spark-submit" binary is in the PATH. +It requires that the "spark-submit" binary is in the PATH or the spark_home to be +supplied. :param conf: Arbitrary Spark configuration properties :type conf: dict :param conn_id: The connection id as configured in Airflow administration. When an @@ -38,10 +40,14 @@ class SparkSubmitHook(BaseHook): :type py_files: str :param jars: Submit additional jars to upload and place them in executor classpath. :type jars: str +:param java_class: the main class of the Java application +:type java_class: str :param executor_cores: Number of cores per executor (Default: 2) :type executor_cores: int :param executor_memory: Memory per executor (e.g. 1000M, 2G) (Default: 1G) :type executor_memory: str +:param driver_memory: Memory allocated to the driver (e.g. 
1000M, 2G) (Default: 1G) +:type driver_memory: str :param keytab: Full path to the file that contains the keytab :type keytab: str :param principal: The name of the kerberos principal used for keytab @@ -60,8 +66,10 @@ class SparkSubmitHook(BaseHook): files=None, py_files=None, jars=None, + java_class=None, executor_cores=None, executor_memory=None, + driver_memory=None, keytab=None, principal=None, name='default-name', @@ -72,8 +80,10 @@ class SparkSubmitHook(BaseHook): self._files = files self._py_files = py_files self._jars = jars +self._java_class = java_class self._executor_cores = executor_cores self._executor_memory = executor_memory +self._driver_memory = driver_memory self._keytab = keytab self._principal = principal self._name = name @@ -82,7 +92,7 @@ class SparkSubmitHook(BaseHook): self._sp = None self._yarn_application_id = None -(self._master, self._queue, self._deploy_mode) = self._resolve_connection() +(self._master, self._queue, self._deploy_mode, self._spark_home) = self._resolve_connection() self._is_yarn = 'yarn' in self._master def _resolve_connection(self): @@ -90,6 +100,7 @@ class SparkSubmitHook(BaseHook): master = 'yarn' queue = None deploy_mode = None +spark_home = None try: # Master can be local, yarn, spark://HOST:PORT or mesos://HOST:PORT @@ -105,6 +116,8 @@ class SparkSubmitHook(BaseHook):
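The `spark_home` addition in the diff above means the hook no longer assumes `spark-submit` is on the PATH. A minimal sketch of that idea; the function name is illustrative, not the hook's real API:

```python
import os


def build_spark_submit_command(master, spark_home=None):
    """Resolve the spark-submit binary, preferring an explicit spark_home.

    Illustrative sketch of the AIRFLOW-1085 change, not the hook itself.
    """
    if spark_home:
        # A per-connection Spark home lets different connections use
        # different Spark installations / versions.
        binary = os.path.join(spark_home, 'bin', 'spark-submit')
    else:
        binary = 'spark-submit'  # fall back to relying on the PATH
    return [binary, '--master', master]


print(build_spark_submit_command('yarn'))
print(build_spark_submit_command('spark://host:7077', spark_home='/opt/spark'))
```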
[incubator-airflow] Git Push Summary
Repository: incubator-airflow Updated Tags: refs/tags/1.8.1 2b811c445 -> 0d8509e7e
[incubator-airflow] Git Push Summary
Repository: incubator-airflow Updated Tags: refs/tags/1.8.1 [created] 2b811c445
[jira] [Closed] (AIRFLOW-1183) How to pass Spark URL for standalone cluster?
[ https://issues.apache.org/jira/browse/AIRFLOW-1183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sam sen closed AIRFLOW-1183. Resolution: Fixed > How to pass Spark URL for standalone cluster? > - > > Key: AIRFLOW-1183 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1183 > Project: Apache Airflow > Issue Type: Bug > Components: operators >Affects Versions: Airflow 1.8 >Reporter: sam sen >Priority: Critical > > How can I pass my Spark URL? When I look in the logs I see `--master` is > pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing > it within the function but I'm getting an error. > *Error* > {code} > PendingDeprecationWarning: Invalid arguments were passed to > SparkSubmitOperator. Support for passing such arguments will be dropped in > Airflow 2.0. Invalid arguments were: > *args: () > **kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'} {code} > *Code* > {code} > testSpark = SparkSubmitOperator( >task_id='test-spark', > deploy_mode='cluster', > > application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar', > java_class='SimpleApp', > deploy_mode='cluster', > dag=dag) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
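For context on why this was closed: per the hook shown earlier in this digest, the master URL and deploy mode are resolved from the Airflow connection referenced by `conn_id`, not from operator keyword arguments. A rough sketch of that resolution logic, using a plain dict in place of a Connection object (field names here are illustrative):

```python
def resolve_connection(conn):
    """Derive (master, deploy_mode) from a connection-like dict.

    Rough sketch of the SparkSubmitHook connection-resolution idea;
    this is not the Airflow Connection model.
    """
    master = conn.get('host') or 'yarn'  # the default mirrors the hook
    if conn.get('port'):
        master = '{}:{}'.format(master, conn['port'])
    deploy_mode = (conn.get('extra') or {}).get('deploy-mode')
    return master, deploy_mode


# A standalone cluster is selected via the connection, not operator kwargs:
print(resolve_connection({'host': 'spark://standalone-master', 'port': 7077,
                          'extra': {'deploy-mode': 'cluster'}}))
# ('spark://standalone-master:7077', 'cluster')
```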
[jira] [Closed] (AIRFLOW-1179) Pandas 0.20 broke Google BigQuery hook
[ https://issues.apache.org/jira/browse/AIRFLOW-1179?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Riccomini closed AIRFLOW-1179. Resolution: Fixed Fix Version/s: 1.9.0 > Pandas 0.20 broke Google BigQuery hook > -- > > Key: AIRFLOW-1179 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1179 > Project: Apache Airflow > Issue Type: Bug >Reporter: Niels Zeilemaker >Assignee: Niels Zeilemaker > Fix For: 1.9.0 > > > Master build is broken due to pandas bigquery support being moved to an > external package -- This message was sent by Atlassian JIRA (v6.3.15#6346)
incubator-airflow git commit: [AIRFLOW-1179] Fix Pandas 0.2x breaking Google BigQuery change
Repository: incubator-airflow Updated Branches: refs/heads/master 4284e6485 -> ac9ccb151 [AIRFLOW-1179] Fix Pandas 0.2x breaking Google BigQuery change Closes #2279 from NielsZeilemaker/AIRFLOW-1179 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ac9ccb15 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ac9ccb15 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ac9ccb15 Branch: refs/heads/master Commit: ac9ccb1518f6a0273d53fcd8e32aba1ac5563fb9 Parents: 4284e64 Author: Niels Zeilemaker Authored: Tue May 9 09:42:32 2017 -0700 Committer: Chris Riccomini Committed: Tue May 9 09:42:32 2017 -0700 -- airflow/contrib/hooks/bigquery_hook.py | 2 +- scripts/ci/requirements.txt| 1 + setup.py | 1 + 3 files changed, 3 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ac9ccb15/airflow/contrib/hooks/bigquery_hook.py -- diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py index 53ca123..06de4e8 100644 --- a/airflow/contrib/hooks/bigquery_hook.py +++ b/airflow/contrib/hooks/bigquery_hook.py @@ -24,7 +24,7 @@ import time from apiclient.discovery import build, HttpError from googleapiclient import errors from builtins import range -from pandas.io.gbq import GbqConnector, \ +from pandas_gbq.gbq import GbqConnector, \ _parse_data as gbq_parse_data, \ _check_google_client_version as gbq_check_google_client_version, \ _test_google_api_imports as gbq_test_google_api_imports http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ac9ccb15/scripts/ci/requirements.txt -- diff --git a/scripts/ci/requirements.txt b/scripts/ci/requirements.txt index 9769dfb..06ad0bb 100644 --- a/scripts/ci/requirements.txt +++ b/scripts/ci/requirements.txt @@ -61,6 +61,7 @@ nose-ignore-docstring==0.2 nose-parameterized nose-timer pandas +pandas-gbq psutil>=4.2.0, <5.0.0 psycopg2 pydruid 
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ac9ccb15/setup.py -- diff --git a/setup.py b/setup.py index 0b98e56..b0b2ddd 100644 --- a/setup.py +++ b/setup.py @@ -142,6 +142,7 @@ gcp_api = [ 'google-api-python-client>=1.5.0, <1.6.0', 'oauth2client>=2.0.2, <2.1.0', 'PyOpenSSL', +'pandas-gbq' ] hdfs = ['snakebite>=2.7.8'] webhdfs = ['hdfs[dataframe,avro,kerberos]>=2.0.4']
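The one-line import change is the whole fix: `GbqConnector` moved from `pandas.io.gbq` to the external `pandas-gbq` package in pandas 0.20. Code that must run against both versions typically tries each module in turn. Sketched generically below: `first_importable` is a made-up helper, and `json` stands in for the module that actually imports in this demo.

```python
import importlib


def first_importable(candidates):
    """Return the first module from candidates that imports cleanly.

    Illustrates the AIRFLOW-1179 situation: pandas >= 0.20 moved
    GbqConnector out to the external pandas-gbq package, so portable
    code would need a fallback such as
        first_importable(['pandas_gbq.gbq', 'pandas.io.gbq'])
    """
    for name in candidates:
        try:
            return importlib.import_module(name)
        except ImportError:
            continue
    raise ImportError('none of {} are installed'.format(candidates))


print(first_importable(['no_such_module_xyz', 'json']).__name__)  # json
```

Note the commit instead pins the new package outright (in `setup.py` and CI requirements), which is simpler when you control the dependency floor.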
[jira] [Commented] (AIRFLOW-1179) Pandas 0.20 broke Google BigQuery hook
[ https://issues.apache.org/jira/browse/AIRFLOW-1179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16003006#comment-16003006 ] ASF subversion and git services commented on AIRFLOW-1179: -- Commit ac9ccb1518f6a0273d53fcd8e32aba1ac5563fb9 in incubator-airflow's branch refs/heads/master from [~nzeilemaker] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=ac9ccb1 ] [AIRFLOW-1179] Fix Pandas 0.2x breaking Google BigQuery change Closes #2279 from NielsZeilemaker/AIRFLOW-1179 > Pandas 0.20 broke Google BigQuery hook > -- > > Key: AIRFLOW-1179 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1179 > Project: Apache Airflow > Issue Type: Bug >Reporter: Niels Zeilemaker >Assignee: Niels Zeilemaker > > Master build is broken due to pandas bigquery support being moved to an > external package -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (AIRFLOW-1184) Contrib Spark Submit Hook does not split argument and argument value
Vianney FOUCAULT created AIRFLOW-1184: - Summary: Contrib Spark Submit Hook does not split argument and argument value Key: AIRFLOW-1184 URL: https://issues.apache.org/jira/browse/AIRFLOW-1184 Project: Apache Airflow Issue Type: Bug Components: contrib, hooks Affects Versions: Airflow 2.0, Airflow 1.8 Reporter: Vianney FOUCAULT Assignee: Vianney FOUCAULT Fix For: Airflow 2.0, Airflow 1.8 Python's Popen expects a list as the command, and so does spark-submit: * ['--option value'] is not the same as * ['--option', 'value'] as far as Spark is concerned. E.g. Spark logs (YARN logs): Error: Unknown option --end 2017-05-08 Error: Unknown option --begin 2017-05-07 Error: Unknown option --db_name mydb Error: Missing option --begin Error: Missing option --end Error: Missing option --db_name -- This message was sent by Atlassian JIRA (v6.3.15#6346)
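The report in miniature: `subprocess.Popen` receives one argv entry per flag and per value, so a fused string like `'--begin 2017-05-07'` reaches spark-submit as a single unknown option. A small sketch of the wrong and right shapes; the plain list here is illustrative, since the real hook builds its command internally:

```python
import shlex

# The fused form: flag and value in one argv entry. spark-submit then reports
# "Error: Unknown option --begin 2017-05-07", exactly as in the YARN logs.
application_args = ['--begin 2017-05-07', '--end 2017-05-08']

wrong = ['spark-submit'] + application_args
# The fix: split each argument so Popen passes flag and value separately.
right = ['spark-submit'] + [part for arg in application_args
                            for part in shlex.split(arg)]

print(wrong)  # ['spark-submit', '--begin 2017-05-07', '--end 2017-05-08']
print(right)  # ['spark-submit', '--begin', '2017-05-07', '--end', '2017-05-08']
```

`shlex.split` rather than `str.split` keeps quoted values with embedded spaces intact.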
[jira] [Updated] (AIRFLOW-1183) How to pass Spark URL for standalone cluster?
[ https://issues.apache.org/jira/browse/AIRFLOW-1183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sam sen updated AIRFLOW-1183: - Description: How can I pass my Spark URL? When I look in the logs I see `--master` is pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it within the function but I'm getting an error. *Error* {code} PendingDeprecationWarning: Invalid arguments were passed to SparkSubmitOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were: *args: () **kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'} {code} *Code* {code} testSpark = SparkSubmitOperator( task_id='test-spark', deploy_mode='cluster', application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar', java_class='SimpleApp' dag=dag) {code} was: How can I pass my Spark URL? When I look in the logs I see `--master` is pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it within the function but I'm getting an error. {quote} PendingDeprecationWarning: Invalid arguments were passed to SparkSubmitOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were: *args: () **kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'} {quote} *Code* {code} testSpark = SparkSubmitOperator( task_id='test-spark', deploy_mode='cluster', application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar', java_class='SimpleApp' dag=dag) {code} > How to pass Spark URL for standalone cluster? > - > > Key: AIRFLOW-1183 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1183 > Project: Apache Airflow > Issue Type: Bug > Components: operators >Affects Versions: Airflow 1.8 >Reporter: sam sen >Priority: Critical > > How can I pass my Spark URL? When I look in the logs I see `--master` is > pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing > it within the function but I'm getting an error. 
> *Error* > {code} > PendingDeprecationWarning: Invalid arguments were passed to > SparkSubmitOperator. Support for passing such arguments will be dropped in > Airflow 2.0. Invalid arguments were: > *args: () > **kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'} {code} > *Code* > {code} > testSpark = SparkSubmitOperator( >task_id='test-spark', > deploy_mode='cluster', > > application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar', > java_class='SimpleApp' >dag=dag) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (AIRFLOW-1183) How to pass Spark URL for standalone cluster?
[ https://issues.apache.org/jira/browse/AIRFLOW-1183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sam sen updated AIRFLOW-1183: - Description: How can I pass my Spark URL? When I look in the logs I see `--master` is pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it within the function but I'm getting an error. *Error* {code} PendingDeprecationWarning: Invalid arguments were passed to SparkSubmitOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were: *args: () **kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'} {code} *Code* {code} testSpark = SparkSubmitOperator( task_id='test-spark', deploy_mode='cluster', application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar', java_class='SimpleApp', deploy_mode='cluster', dag=dag) {code} was: How can I pass my Spark URL? When I look in the logs I see `--master` is pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it within the function but I'm getting an error. *Error* {code} PendingDeprecationWarning: Invalid arguments were passed to SparkSubmitOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were: *args: () **kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'} {code} *Code* {code} testSpark = SparkSubmitOperator( task_id='test-spark', deploy_mode='cluster', application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar', java_class='SimpleApp' dag=dag) {code} > How to pass Spark URL for standalone cluster? > - > > Key: AIRFLOW-1183 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1183 > Project: Apache Airflow > Issue Type: Bug > Components: operators >Affects Versions: Airflow 1.8 >Reporter: sam sen >Priority: Critical > > How can I pass my Spark URL? When I look in the logs I see `--master` is > pointed to "yarn." Also, the same thing for `cluster-mode`. 
I tried passing > it within the function but I'm getting an error. > *Error* > {code} > PendingDeprecationWarning: Invalid arguments were passed to > SparkSubmitOperator. Support for passing such arguments will be dropped in > Airflow 2.0. Invalid arguments were: > *args: () > **kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'} {code} > *Code* > {code} > testSpark = SparkSubmitOperator( >task_id='test-spark', > deploy_mode='cluster', > > application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar', > java_class='SimpleApp', > deploy_mode='cluster', > dag=dag) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (AIRFLOW-1183) How to pass Spark URL for standalone cluster?
[ https://issues.apache.org/jira/browse/AIRFLOW-1183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sam sen updated AIRFLOW-1183: - Description: How can I pass my Spark URL? When I look in the logs I see `--master` is pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it within the function but I'm getting an error. {quote} PendingDeprecationWarning: Invalid arguments were passed to SparkSubmitOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were: *args: () **kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'} {quote} *Code* {code} testSpark = SparkSubmitOperator( task_id='test-spark', deploy_mode='cluster', application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar', java_class='SimpleApp' dag=dag) {code} was: How can I pass my Spark URL? When I look in the logs I see `--master` is pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it within the function but I'm getting an error. {quote} PendingDeprecationWarning: Invalid arguments were passed to SparkSubmitOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were: *args: () **kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'} {quote} *Code* {code} testSpark = SparkSubmitOperator( task_id='test-spark', deploy_mode='cluster', application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar', java_class='SimpleApp' dag=dag) {code} > How to pass Spark URL for standalone cluster? > - > > Key: AIRFLOW-1183 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1183 > Project: Apache Airflow > Issue Type: Bug > Components: operators >Affects Versions: Airflow 1.8 >Reporter: sam sen >Priority: Critical > > How can I pass my Spark URL? When I look in the logs I see `--master` is > pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing > it within the function but I'm getting an error. 
> {quote} > PendingDeprecationWarning: Invalid arguments were passed to > SparkSubmitOperator. Support for passing such arguments will be dropped in > Airflow 2.0. Invalid arguments were: > *args: () > **kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'} {quote} > *Code* > {code} > testSpark = SparkSubmitOperator( >task_id='test-spark', > deploy_mode='cluster', > > application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar', > java_class='SimpleApp' >dag=dag) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (AIRFLOW-1183) How to pass Spark URL for standalone cluster?
[ https://issues.apache.org/jira/browse/AIRFLOW-1183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sam sen updated AIRFLOW-1183: - Description: How can I pass my Spark URL? When I look in the logs I see `--master` is pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it within the function but I'm getting an error. {quote} PendingDeprecationWarning: Invalid arguments were passed to SparkSubmitOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were: *args: () **kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'} {quote} *Code* {code} testSpark = SparkSubmitOperator( task_id='test-spark', deploy_mode='cluster', application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar', java_class='SimpleApp' dag=dag) {code} was: How can I pass my Spark URL? When I look in the logs I see `--master` is pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it within the function but I'm getting an error. {quote} PendingDeprecationWarning: Invalid arguments were passed to SparkSubmitOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were: *args: () **kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'} {quote} #Code {code} testSpark = SparkSubmitOperator( task_id='test-spark', deploy_mode='cluster', application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar', java_class='SimpleApp' dag=dag) {code} > How to pass Spark URL for standalone cluster? > - > > Key: AIRFLOW-1183 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1183 > Project: Apache Airflow > Issue Type: Bug > Components: operators >Affects Versions: Airflow 1.8 >Reporter: sam sen >Priority: Critical > > How can I pass my Spark URL? When I look in the logs I see `--master` is > pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing > it within the function but I'm getting an error. 
> {quote} > PendingDeprecationWarning: Invalid arguments were passed to > SparkSubmitOperator. Support for passing such arguments will be dropped in > Airflow 2.0. Invalid arguments were: > *args: () > **kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'} > {quote} > *Code* > {code} > testSpark = SparkSubmitOperator( >task_id='test-spark', > deploy_mode='cluster', > > application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar', > java_class='SimpleApp' >dag=dag) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (AIRFLOW-1183) How to pass Spark URL for standalone cluster?
[ https://issues.apache.org/jira/browse/AIRFLOW-1183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sam sen updated AIRFLOW-1183: - Description: How can I pass my Spark URL? When I look in the logs I see `--master` is pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it within the function but I'm getting an error. {quote} PendingDeprecationWarning: Invalid arguments were passed to SparkSubmitOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were: *args: () **kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'} {quote} #Code {code} testSpark = SparkSubmitOperator( task_id='test-spark', deploy_mode='cluster', application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar', java_class='SimpleApp' dag=dag) {code} was: How can I pass my Spark URL? When I look in the logs I see `--master` is pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it within the function but I'm getting an error. {quote} PendingDeprecationWarning: Invalid arguments were passed to SparkSubmitOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were: *args: () **kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'} {quote} {code} testSpark = SparkSubmitOperator( task_id='test-spark', deploy_mode='cluster', application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar', java_class='SimpleApp' dag=dag) {code} > How to pass Spark URL for standalone cluster? > - > > Key: AIRFLOW-1183 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1183 > Project: Apache Airflow > Issue Type: Bug > Components: operators >Affects Versions: Airflow 1.8 >Reporter: sam sen >Priority: Critical > > How can I pass my Spark URL? When I look in the logs I see `--master` is > pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing > it within the function but I'm getting an error. 
> {quote} > PendingDeprecationWarning: Invalid arguments were passed to > SparkSubmitOperator. Support for passing such arguments will be dropped in > Airflow 2.0. Invalid arguments were: > *args: () > **kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'} > {quote} > #Code > {code} > testSpark = SparkSubmitOperator( >task_id='test-spark', > deploy_mode='cluster', > > application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar', > java_class='SimpleApp' >dag=dag) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (AIRFLOW-1183) How to pass Spark URL for standalone cluster?
[ https://issues.apache.org/jira/browse/AIRFLOW-1183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sam sen updated AIRFLOW-1183: - Description: How can I pass my Spark URL? When I look in the logs I see `--master` is pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it within the function but I'm getting an error. {quote} PendingDeprecationWarning: Invalid arguments were passed to SparkSubmitOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were: *args: () **kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'} {quote} {code:python} testSpark = SparkSubmitOperator( task_id='test-spark', deploy_mode='cluster', application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar', java_class='SimpleApp' dag=dag) {code} was: How can I pass my Spark URL? When I look in the logs I see `--master` is pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it within the function but I'm getting an error. {quote} PendingDeprecationWarning: Invalid arguments were passed to SparkSubmitOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were: *args: () **kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'} {quote} #Code ``` testSpark = SparkSubmitOperator( task_id='test-spark', deploy_mode='cluster', application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar', java_class='SimpleApp' dag=dag) ``` > How to pass Spark URL for standalone cluster? > - > > Key: AIRFLOW-1183 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1183 > Project: Apache Airflow > Issue Type: Bug > Components: operators >Affects Versions: Airflow 1.8 >Reporter: sam sen >Priority: Critical > > How can I pass my Spark URL? When I look in the logs I see `--master` is > pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing > it within the function but I'm getting an error. 
> {quote} > PendingDeprecationWarning: Invalid arguments were passed to > SparkSubmitOperator. Support for passing such arguments will be dropped in > Airflow 2.0. Invalid arguments were: > *args: () > **kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'} > {quote} > {code:python} > testSpark = SparkSubmitOperator( >task_id='test-spark', > deploy_mode='cluster', > > application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar', > java_class='SimpleApp' >dag=dag) > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (AIRFLOW-1183) How to pass Spark URL for standalone cluster?
[ https://issues.apache.org/jira/browse/AIRFLOW-1183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] sam sen updated AIRFLOW-1183: Description: How can I pass my Spark URL? When I look in the logs I see that `--master` is pointed at "yarn", and likewise for the deploy mode. I tried passing these to the operator, but I get an error:
{quote}
PendingDeprecationWarning: Invalid arguments were passed to SparkSubmitOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'}
{quote}
{code:python}
testSpark = SparkSubmitOperator(
    task_id='test-spark',
    deploy_mode='cluster',
    application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
    java_class='SimpleApp',  # comma after this argument was missing in the original snippet
    dag=dag)
{code}
> How to pass Spark URL for standalone cluster?
>
> Key: AIRFLOW-1183
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1183
> Project: Apache Airflow
> Issue Type: Bug
> Components: operators
> Affects Versions: Airflow 1.8
> Reporter: sam sen
> Priority: Critical
--
This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (AIRFLOW-1183) How to pass Spark URL for standalone cluster?
sam sen created AIRFLOW-1183:
Summary: How to pass Spark URL for standalone cluster?
Key: AIRFLOW-1183
URL: https://issues.apache.org/jira/browse/AIRFLOW-1183
Project: Apache Airflow
Issue Type: Bug
Components: operators
Affects Versions: Airflow 1.8
Reporter: sam sen
Priority: Critical

How can I pass my Spark URL? When I look in the logs I see that `--master` is pointed at "yarn", and likewise for the deploy mode. I tried passing these to the operator, but I get an error:
{quote}
PendingDeprecationWarning: Invalid arguments were passed to SparkSubmitOperator. Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'}
{quote}
{code:python}
testSpark = SparkSubmitOperator(
    task_id='test-spark',
    deploy_mode='cluster',
    application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
    java_class='SimpleApp',  # comma after this argument was missing in the original snippet
    dag=dag)
{code}
--
This message was sent by Atlassian JIRA (v6.3.15#6346)
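The warning arises because `deploy_mode` and `java_class` were not constructor arguments of the operator in this version; the master URL and deploy mode come from the Spark connection instead. As a rough illustration of what ultimately gets executed, here is a minimal stand-in (not Airflow's actual hook code; all names are illustrative) for assembling the `spark-submit` command line:

```python
def build_spark_submit_cmd(master, application, deploy_mode=None,
                           java_class=None, application_args=None):
    """Assemble a spark-submit command line.

    A simplified stand-in for the logic inside Airflow's SparkSubmitHook,
    which derives --master and --deploy-mode from the Airflow connection.
    """
    cmd = ['spark-submit', '--master', master]
    if deploy_mode is not None:
        cmd += ['--deploy-mode', deploy_mode]
    if java_class is not None:
        cmd += ['--class', java_class]
    cmd.append(application)
    cmd += list(application_args or [])
    return cmd

# A standalone master instead of the default "yarn"; host and port are made up.
cmd = build_spark_submit_cmd(
    master='spark://master-host:7077',
    application='simple-project_2.11-1.0.jar',
    deploy_mode='cluster',
    java_class='SimpleApp')
# cmd → ['spark-submit', '--master', 'spark://master-host:7077',
#        '--deploy-mode', 'cluster', '--class', 'SimpleApp',
#        'simple-project_2.11-1.0.jar']
```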
[jira] [Created] (AIRFLOW-1182) Contrib Spark Submit operator should template fields
Vianney FOUCAULT created AIRFLOW-1182:
Summary: Contrib Spark Submit operator should template fields
Key: AIRFLOW-1182
URL: https://issues.apache.org/jira/browse/AIRFLOW-1182
Project: Apache Airflow
Issue Type: Improvement
Components: contrib, operators
Affects Versions: Airflow 2.0, Airflow 1.8
Reporter: Vianney FOUCAULT
Assignee: Vianney FOUCAULT
Fix For: Airflow 2.0, 1.8.1

The Spark submit operator does not template any of its fields, which makes {{ ds }} unusable for Spark apps.
--
This message was sent by Atlassian JIRA (v6.3.15#6346)
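A minimal sketch of the mechanism this issue asks for: in Airflow, an operator opts its attributes into Jinja templating by listing them in `template_fields`. The toy classes below are not Airflow code (the rendering step is simplified to plain string substitution instead of full Jinja), but they make the idea runnable standalone:

```python
class ToyBaseOperator:
    """Imitates Airflow's template_fields mechanism, heavily simplified:
    plain string substitution stands in for Jinja rendering."""
    template_fields = ()

    def render_templates(self, context):
        # Only attributes named in template_fields get rendered; anything
        # not listed is left untouched, which is exactly the AIRFLOW-1182
        # complaint about the contrib Spark submit operator.
        for field in self.template_fields:
            value = getattr(self, field)
            if isinstance(value, str):
                for key, replacement in context.items():
                    value = value.replace('{{ %s }}' % key, replacement)
                setattr(self, field, value)


class ToySparkSubmitOperator(ToyBaseOperator):
    # Declaring the fields is the change the issue requests.
    template_fields = ('application', 'name')

    def __init__(self, application, name):
        self.application = application
        self.name = name


op = ToySparkSubmitOperator(application='app-{{ ds }}.jar', name='daily-job')
op.render_templates({'ds': '2017-05-09'})
# op.application is now 'app-2017-05-09.jar'
```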