[jira] [Commented] (AIRFLOW-1181) Enable delete and list function for Google Cloud Storage Hook

2017-05-09 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1181?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16003164#comment-16003164
 ] 

ASF subversion and git services commented on AIRFLOW-1181:
--

Commit 24f73c03259bd5a7a699ed5b6c4cd3d559bf9bf8 in incubator-airflow's branch 
refs/heads/master from [~cheny258]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=24f73c0 ]

[AIRFLOW-1181] Add delete and list functionality to gcs_hook

Closes #2281 from mattuuh7/gcs-delete-list


> Enable delete and list function for Google Cloud Storage Hook
> -
>
> Key: AIRFLOW-1181
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1181
> Project: Apache Airflow
>  Issue Type: New Feature
>Reporter: Matthew Chen
>Assignee: Matthew Chen
>Priority: Minor
> Fix For: 1.9.0
>
>
> The current {{GoogleCloudStorageHook}} does not support deleting an 
> object, nor does it support listing objects by prefix. We would like to 
> have these features available for our use case. 
> The delete function should return true if the object was successfully 
> deleted, optionally based on its generation.
> The list function should list all objects with the given prefix, page 
> through large result sets, and return a list of all file names 
> satisfying the given prefix criteria.
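
A minimal usage sketch of the requested API (the connection id, bucket, and
prefix here are illustrative; the method signatures follow the patch merged
below):

from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook

# Illustrative connection id; any configured GCS connection works here.
hook = GoogleCloudStorageHook(
    google_cloud_storage_conn_id='google_cloud_storage_default')

# list() pages through results internally and returns all matching names.
for name in hook.list('my-bucket', prefix='logs/2017/05/'):
    # delete() returns True on success, False if the object was
    # already gone (HTTP 404).
    if not hook.delete('my-bucket', name):
        print('already deleted: {}'.format(name))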



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (AIRFLOW-1181) Enable delete and list function for Google Cloud Storage Hook

2017-05-09 Thread Chris Riccomini (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Riccomini closed AIRFLOW-1181.

   Resolution: Fixed
Fix Version/s: 1.9.0

> Enable delete and list function for Google Cloud Storage Hook
> -
>
> Key: AIRFLOW-1181
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1181
> Project: Apache Airflow
>  Issue Type: New Feature
>Reporter: Matthew Chen
>Assignee: Matthew Chen
>Priority: Minor
> Fix For: 1.9.0
>
>
> The current {{GoogleCloudStorageHook}} does not support deleting an 
> object, nor does it support listing objects by prefix. We would like to 
> have these features available for our use case. 
> The delete function should return true if the object was successfully 
> deleted, optionally based on its generation.
> The list function should list all objects with the given prefix, page 
> through large result sets, and return a list of all file names 
> satisfying the given prefix criteria.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


incubator-airflow git commit: [AIRFLOW-1181] Add delete and list functionality to gcs_hook

2017-05-09 Thread criccomini
Repository: incubator-airflow
Updated Branches:
  refs/heads/master ac9ccb151 -> 24f73c032


[AIRFLOW-1181] Add delete and list functionality to gcs_hook

Closes #2281 from mattuuh7/gcs-delete-list


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/24f73c03
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/24f73c03
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/24f73c03

Branch: refs/heads/master
Commit: 24f73c03259bd5a7a699ed5b6c4cd3d559bf9bf8
Parents: ac9ccb1
Author: Matthew Chen 
Authored: Tue May 9 10:50:30 2017 -0700
Committer: Chris Riccomini 
Committed: Tue May 9 10:50:37 2017 -0700

--
 airflow/contrib/hooks/gcs_hook.py | 69 ++
 1 file changed, 69 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/24f73c03/airflow/contrib/hooks/gcs_hook.py
--
diff --git a/airflow/contrib/hooks/gcs_hook.py 
b/airflow/contrib/hooks/gcs_hook.py
index dd3cd27..d38ceff 100644
--- a/airflow/contrib/hooks/gcs_hook.py
+++ b/airflow/contrib/hooks/gcs_hook.py
@@ -152,3 +152,72 @@ class GoogleCloudStorageHook(GoogleCloudBaseHook):
             raise
 
         return False
+
+    def delete(self, bucket, object, generation=None):
+        """
+        Delete an object if versioning is not enabled for the bucket, or if
+        the generation parameter is used.
+
+        :param bucket: name of the bucket, where the object resides
+        :type bucket: string
+        :param object: name of the object to delete
+        :type object: string
+        :param generation: if present, permanently delete the object of this generation
+        :type generation: string
+        :return: True if succeeded
+        """
+        service = self.get_conn()
+
+        try:
+            service \
+                .objects() \
+                .delete(bucket=bucket, object=object, generation=generation) \
+                .execute()
+            return True
+        except errors.HttpError as ex:
+            if ex.resp['status'] == '404':
+                return False
+            raise
+
+    def list(self, bucket, versions=None, maxResults=None, prefix=None):
+        """
+        List all objects from the bucket with the given string prefix in name
+
+        :param bucket: bucket name
+        :type bucket: string
+        :param versions: if true, list all versions of the objects
+        :type versions: boolean
+        :param maxResults: max count of items to return in a single page of responses
+        :type maxResults: integer
+        :param prefix: prefix string which filters objects whose name begin with this prefix
+        :type prefix: string
+        :return: a stream of object names matching the filtering criteria
+        """
+        service = self.get_conn()
+
+        ids = list()
+        pageToken = None
+        while True:
+            response = service.objects().list(
+                bucket=bucket,
+                versions=versions,
+                maxResults=maxResults,
+                pageToken=pageToken,
+                prefix=prefix
+            ).execute()
+
+            if 'items' not in response:
+                logging.info("No items found for prefix: {}".format(prefix))
+                break
+
+            for item in response['items']:
+                if item and 'name' in item:
+                    ids.append(item['name'])
+
+            if 'nextPageToken' not in response:
+                # no further pages of results, so stop the loop
+                break
+
+            pageToken = response['nextPageToken']
+            if not pageToken:
+                # empty next page token
+                break
+
+        return ids



[jira] [Commented] (AIRFLOW-1138) Add licenses to files in scripts directory

2017-05-09 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16003135#comment-16003135
 ] 

ASF subversion and git services commented on AIRFLOW-1138:
--

Commit 4b5c6efd4a450b4a202f87cb12ea1f9eb4daf8fc in incubator-airflow's branch 
refs/heads/v1-8-stable from [~criccomini]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=4b5c6ef ]

[AIRFLOW-1138] Add missing licenses to files in scripts directory

Closes #2253 from criccomini/AIRFLOW-1138

(cherry picked from commit 94f9822ffd867e559fd71046124626fee6acedf7)


> Add licenses to files in scripts directory
> --
>
> Key: AIRFLOW-1138
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1138
> Project: Apache Airflow
>  Issue Type: Task
>  Components: release
>Reporter: Chris Riccomini
>Assignee: Chris Riccomini
>Priority: Blocker
> Fix For: 1.8.1
>
>
> These two files need license headers:
>   modified:   scripts/ci/requirements.txt
>   modified:   scripts/systemd/airflow



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (AIRFLOW-1142) SubDAG Tasks Not Executed Even Though All Dependencies Met

2017-05-09 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16003137#comment-16003137
 ] 

ASF subversion and git services commented on AIRFLOW-1142:
--

Commit 5800f565628d11d8ea504468bcc14c4d1c0da10c in incubator-airflow's branch 
refs/heads/v1-8-stable from [~bolke]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=5800f56 ]

[AIRFLOW-1142] Do not reset orphaned state for backfills

The scheduler could interfere with backfills when it resets
the state of tasks that were considered orphaned. This patch
prevents the scheduler from doing so and adds a guard in the
backfill.

Closes #2260 from bolkedebruin/AIRFLOW-1142

(cherry picked from commit 4e79b830e3261b9d54fdbc7c9dcb510d36565986)
Signed-off-by: Bolke de Bruin 


> SubDAG Tasks Not Executed Even Though All Dependencies Met
> --
>
> Key: AIRFLOW-1142
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1142
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: subdag
>Affects Versions: 1.8.1
> Environment: 1.8.1rc1+incubating, Celery
>Reporter: Joe Schmid
>Priority: Blocker
> Fix For: 1.8.1
>
> Attachments: 2017-04-24T23-20-38-776547, run3-scheduler-stdout.log, 
> run3-task.log, SubDAGOperatorTaskLog-DEBUG.txt, Test_Nested_SubDAG_0.png, 
> Test_Nested_SubDAG_1-Zoomed.png, test_nested_subdag.py
>
>
> Testing on 1.8.1rc1, we noticed that tasks in subdags were not getting 
> executed even though all dependencies had been met.
> We were able to create a simple test DAG that re-creates the issue. Attached 
> is a test DAG, the log file of the subdag operator that shows it fails to run 
> even though dependencies are met, and screenshots of what the UI looks like.
> This is definitely a regression as we have many similarly constructed DAGs 
> that have been running successfully on a pre-v1.8 version (a fork of 
> 1.7.1.3+master) for some time.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (AIRFLOW-970) Latest runs on homepage should load async and in batch

2017-05-09 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-970?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16003141#comment-16003141
 ] 

ASF subversion and git services commented on AIRFLOW-970:
-

Commit af2d0b4b5cb1ef30a065b1af66f90a01a953e2be in incubator-airflow's branch 
refs/heads/v1-8-stable from [~saguziel]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=af2d0b4 ]

[AIRFLOW-970] Load latest_runs on homepage async

The latest_runs column on the homepage loads synchronously
with an n+1 query. Homepage loads will be significantly
faster if this happens asynchronously and as a batch.

Closes #2144 from saguziel/aguziel-latest-run-async

(cherry picked from commit 0f7ddbbedb05f2f11500250db4989edcb27bc164)


> Latest runs on homepage should load async and in batch
> --
>
> Key: AIRFLOW-970
> URL: https://issues.apache.org/jira/browse/AIRFLOW-970
> Project: Apache Airflow
>  Issue Type: Improvement
>Reporter: Alex Guziel
>Assignee: Alex Guziel
>
> The latest_dag_run column on the homepage makes one query for each dag and 
> does it synchronously. We should do the opposite.
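
A hedged sketch of the batched alternative (not the committed
implementation; it assumes only the DagRun model and a plain SQLAlchemy
session):

from sqlalchemy import func

from airflow import settings
from airflow.models import DagRun

session = settings.Session()

# One GROUP BY over dag_run replaces the n synchronous per-dag queries;
# the UI can then fetch this result asynchronously as a single batch.
latest_runs = (
    session.query(DagRun.dag_id, func.max(DagRun.execution_date))
    .group_by(DagRun.dag_id)
    .all()
)
session.close()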



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (AIRFLOW-1142) SubDAG Tasks Not Executed Even Though All Dependencies Met

2017-05-09 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1142?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16003138#comment-16003138
 ] 

ASF subversion and git services commented on AIRFLOW-1142:
--

Commit 5800f565628d11d8ea504468bcc14c4d1c0da10c in incubator-airflow's branch 
refs/heads/v1-8-stable from [~bolke]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=5800f56 ]

[AIRFLOW-1142] Do not reset orphaned state for backfills

The scheduler could interfere with backfills when it resets
the state of tasks that were considered orphaned. This patch
prevents the scheduler from doing so and adds a guard in the
backfill.

Closes #2260 from bolkedebruin/AIRFLOW-1142

(cherry picked from commit 4e79b830e3261b9d54fdbc7c9dcb510d36565986)
Signed-off-by: Bolke de Bruin 


> SubDAG Tasks Not Executed Even Though All Dependencies Met
> --
>
> Key: AIRFLOW-1142
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1142
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: subdag
>Affects Versions: 1.8.1
> Environment: 1.8.1rc1+incubating, Celery
>Reporter: Joe Schmid
>Priority: Blocker
> Fix For: 1.8.1
>
> Attachments: 2017-04-24T23-20-38-776547, run3-scheduler-stdout.log, 
> run3-task.log, SubDAGOperatorTaskLog-DEBUG.txt, Test_Nested_SubDAG_0.png, 
> Test_Nested_SubDAG_1-Zoomed.png, test_nested_subdag.py
>
>
> Testing on 1.8.1rc1, we noticed that tasks in subdags were not getting 
> executed even though all dependencies had been met.
> We were able to create a simple test DAG that re-creates the issue. Attached 
> is a test DAG, the log file of the subdag operator that shows it fails to run 
> even though dependencies are met, and screenshots of what the UI looks like.
> This is definitely a regression as we have many similarly constructed DAGs 
> that have been running successfully on a pre-v1.8 version (a fork of 
> 1.7.1.3+master) for some time.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (AIRFLOW-492) Insert into dag_stats table results into failed task while task itself succeeded

2017-05-09 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16003139#comment-16003139
 ] 

ASF subversion and git services commented on AIRFLOW-492:
-

Commit e342d0d223e47ea25f73baaa00a16df414a6e0df in incubator-airflow's branch 
refs/heads/v1-8-stable from [~bolke]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=e342d0d ]

[AIRFLOW-492] Make sure stat updates cannot fail a task

Previously a failed commit into the db for the statistics
could also fail a task. Secondly, the UI could display
out-of-date statistics.

This patch reworks DagStat so that a failure to update the
statistics does not propagate. Next to that, it makes sure
the UI always displays the latest statistics.

Closes #2254 from bolkedebruin/AIRFLOW-492

(cherry picked from commit c2472ffa124ffc65b8762ea583554494624dbb6a)
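
The guard in miniature (a hedged sketch, not the DagStat code itself; it
assumes a SQLAlchemy session and uses illustrative names):

import logging

def update_stats_safely(session, stat_rows):
    # Statistics are best-effort: a failed commit (e.g. a duplicate key)
    # must never bubble up and fail the task that triggered the update.
    try:
        session.add_all(stat_rows)
        session.commit()
    except Exception:
        session.rollback()
        logging.warning("Could not update dag stats", exc_info=True)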


> Insert into dag_stats table results into failed task while task itself 
> succeeded
> 
>
> Key: AIRFLOW-492
> URL: https://issues.apache.org/jira/browse/AIRFLOW-492
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Bolke de Bruin
>Assignee: Siddharth Anand
>Priority: Critical
> Fix For: 1.8.1
>
> Attachments: subdag_test.py
>
>
> On some occasions a duplicate key seems to be inserted into dag_stats, 
> which results in a task/dag run being marked failed while the task 
> itself has succeeded.
> [2016-09-07 18:44:16,940] {models.py:3912} INFO - Marking run <DagRun 
> hanging_subdags_n16_sqe.level_2_14 @ 2016-04-21 00:00:00: 
> backfill_2016-04-21T00:00:00, externally triggered: False> successful
> [2016-09-07 18:44:17,671] {models.py:1450} ERROR - 
> (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 
> 'hanging_subdags_n16_sqe.level_2_14-success' for key 'PRIMARY'") [SQL: 
> u'INSERT INTO dag_stats (dag_id, state, count, dirty) VALUES (%s, %s, %s, 
> %s)'] [parameters: ('hanging_subdags_n16_sqe.level_2_14', 'success', 3L, 0)]
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py",
>  line 1409, in run
> result = task_copy.execute(context=context)
>   File 
> "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/operators/subdag_operator.py",
>  line 88, in execute
> executor=self.executor)
>   File 
> "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py",
>  line 3244, in run
> job.run()
>   File 
> "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/jobs.py",
>  line 189, in run
> self._execute()
>   File 
> "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/jobs.py",
>  line 1855, in _execute
> models.DagStat.clean_dirty([run.dag_id], session=session)
>   File 
> "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/utils/db.py",
>  line 54, in wrapper
> result = func(*args, **kwargs)
>   File 
> "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py",
>  line 3695, in clean_dirty
> session.commit()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", 
> line 801, in commit
> self.transaction.commit()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", 
> line 392, in commit
> self._prepare_impl()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", 
> line 372, in _prepare_impl
> self.session.flush()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", 
> line 2019, in flush
> self._flush(objects)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", 
> line 2137, in _flush
> transaction.rollback(_capture_exception=True)
>   File 
> "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/langhelpers.py", line 
> 60, in __exit__
> compat.reraise(exc_type, exc_value, exc_tb)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", 
> line 2101, in _flush
> flush_context.execute()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", 
> line 373, in execute
> rec.execute(self)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", 
> line 532, in execute
> uow
>   File 
> "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line 
> 174, in save_obj
> mapper, table, insert)
>   File 
> "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line 
> 767, in _emit_insert_statements
> execute(statement, multiparams)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", 
> line 914, in execute
> return meth(self, multiparams, params)
>   File 

[jira] [Commented] (AIRFLOW-1127) Move license notices to LICENSE instead of NOTICE

2017-05-09 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16003134#comment-16003134
 ] 

ASF subversion and git services commented on AIRFLOW-1127:
--

Commit dc6ebaab94bcc69b36bb97eefba3a01ee149b746 in incubator-airflow's branch 
refs/heads/v1-8-stable from [~bolke]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=dc6ebaa ]

[AIRFLOW-1127] Move license notices to LICENSE

Closes #2250 from bolkedebruin/AIRFLOW-1127

(cherry picked from commit 659827639e256a668d669d0d229abf49d6010bb8)


> Move license notices to LICENSE instead of NOTICE
> -
>
> Key: AIRFLOW-1127
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1127
> Project: Apache Airflow
>  Issue Type: Improvement
>Reporter: Bolke de Bruin
>Assignee: Bolke de Bruin
>Priority: Blocker
> Fix For: 1.8.1
>
>
> For all the bundled files with different licenses (MIT, BSD, etc.), the full
> texts of these licenses should be in the source tarball, preferably at the
> end of the LICENSE file.
> webgl-2d needs to be called out as MIT license.
> Are all the entries in the NOTICE file required or do they
> just need to be in the LICENSE file? Any additions to the NOTICE have
> downstream repercussions as they need to be propagated down by any other
> project using airflow.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (AIRFLOW-492) Insert into dag_stats table results into failed task while task itself succeeded

2017-05-09 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-492?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16003140#comment-16003140
 ] 

ASF subversion and git services commented on AIRFLOW-492:
-

Commit e342d0d223e47ea25f73baaa00a16df414a6e0df in incubator-airflow's branch 
refs/heads/v1-8-stable from [~bolke]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=e342d0d ]

[AIRFLOW-492] Make sure stat updates cannot fail a task

Previously a failed commit into the db for the statistics
could also fail a task. Secondly, the UI could display
out-of-date statistics.

This patch reworks DagStat so that a failure to update the
statistics does not propagate. Next to that, it makes sure
the UI always displays the latest statistics.

Closes #2254 from bolkedebruin/AIRFLOW-492

(cherry picked from commit c2472ffa124ffc65b8762ea583554494624dbb6a)


> Insert into dag_stats table results into failed task while task itself 
> succeeded
> 
>
> Key: AIRFLOW-492
> URL: https://issues.apache.org/jira/browse/AIRFLOW-492
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Bolke de Bruin
>Assignee: Siddharth Anand
>Priority: Critical
> Fix For: 1.8.1
>
> Attachments: subdag_test.py
>
>
> On some occasions a duplicate key seems to be inserted into dag_stats, 
> which results in a task/dag run being marked failed while the task 
> itself has succeeded.
> [2016-09-07 18:44:17,671] {models.py:1450} ERROR - Marking run <DagRun 
> hanging_subdags_n16_sqe.level_2_14 @ 2016-04-21 00:00:00: 
> backfill_2016-04-21T00:00:00, externally triggered: False> successful
> [2016-09-07 18:44:17,671] {models.py:1450} ERROR - 
> (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 
> 'hanging_subdags_n16_sqe.level_2_14-success' for key 'PRIMARY'") [SQL: 
> u'INSERT INTO dag_stats (dag_id, state, count, dirty) VALUES (%s, %s, %s, 
> %s)'] [parameters: ('hanging_subdags_n16_sqe.level_2_14', 'success', 3L, 0)]
> Traceback (most recent call last):
>   File 
> "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py",
>  line 1409, in run
> result = task_copy.execute(context=context)
>   File 
> "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/operators/subdag_operator.py",
>  line 88, in execute
> executor=self.executor)
>   File 
> "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py",
>  line 3244, in run
> job.run()
>   File 
> "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/jobs.py",
>  line 189, in run
> self._execute()
>   File 
> "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/jobs.py",
>  line 1855, in _execute
> models.DagStat.clean_dirty([run.dag_id], session=session)
>   File 
> "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/utils/db.py",
>  line 54, in wrapper
> result = func(*args, **kwargs)
>   File 
> "/usr/local/lib/python2.7/dist-packages/airflow-1.7.1.3-py2.7.egg/airflow/models.py",
>  line 3695, in clean_dirty
> session.commit()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", 
> line 801, in commit
> self.transaction.commit()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", 
> line 392, in commit
> self._prepare_impl()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", 
> line 372, in _prepare_impl
> self.session.flush()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", 
> line 2019, in flush
> self._flush(objects)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", 
> line 2137, in _flush
> transaction.rollback(_capture_exception=True)
>   File 
> "/usr/local/lib/python2.7/dist-packages/sqlalchemy/util/langhelpers.py", line 
> 60, in __exit__
> compat.reraise(exc_type, exc_value, exc_tb)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/session.py", 
> line 2101, in _flush
> flush_context.execute()
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", 
> line 373, in execute
> rec.execute(self)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/unitofwork.py", 
> line 532, in execute
> uow
>   File 
> "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line 
> 174, in save_obj
> mapper, table, insert)
>   File 
> "/usr/local/lib/python2.7/dist-packages/sqlalchemy/orm/persistence.py", line 
> 767, in _emit_insert_statements
> execute(statement, multiparams)
>   File "/usr/local/lib/python2.7/dist-packages/sqlalchemy/engine/base.py", 
> line 914, in execute
> return meth(self, multiparams, params)
>   File 

[20/36] incubator-airflow git commit: [AIRFLOW-1004][AIRFLOW-276] Fix `airflow webserver -D` to run in background

2017-05-09 Thread criccomini
[AIRFLOW-1004][AIRFLOW-276] Fix `airflow webserver -D` to run in background

AIRFLOW-276 introduced a monitor process for gunicorn
to find new files in the dag folder, but it also changed
`airflow webserver -D`'s behavior to run in foreground.
This PR fixes that by running the monitor as a daemon
process.

Closes #2208 from sekikn/AIRFLOW-1004

(cherry picked from commit a9b20a04b052e9479dbb79fd46124293085610e9)
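
The daemonization pattern in miniature (a hedged sketch with python-daemon
and an illustrative pid file path; the real change is in the diff below):

import daemon
from daemon.pidfile import TimeoutPIDLockFile

def monitor():
    pass  # watch the gunicorn master, refreshing workers as needed

# Running the monitor inside a DaemonContext detaches it from the
# terminal, so `airflow webserver -D` returns to the shell again.
ctx = daemon.DaemonContext(
    pidfile=TimeoutPIDLockFile('/tmp/airflow-webserver-monitor.pid', -1),
)
with ctx:
    monitor()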


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0a5fb785
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0a5fb785
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0a5fb785

Branch: refs/heads/v1-8-stable
Commit: 0a5fb7856b545073516210fcfc369d2072823ae9
Parents: c94b3a0
Author: Kengo Seki 
Authored: Tue Apr 4 08:32:44 2017 +0200
Committer: Chris Riccomini 
Committed: Mon Apr 10 14:24:31 2017 -0700

--
 airflow/bin/cli.py | 64 -
 tests/core.py  | 54 +
 2 files changed, 107 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0a5fb785/airflow/bin/cli.py
--
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index e9c54e6..e4755c7 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -753,7 +753,12 @@ def webserver(args):
         app.run(debug=True, port=args.port, host=args.hostname,
                 ssl_context=(ssl_cert, ssl_key) if ssl_cert and ssl_key else None)
     else:
-        pid, stdout, stderr, log_file = setup_locations("webserver", pid=args.pid)
+        pid, stdout, stderr, log_file = setup_locations("webserver", args.pid, args.stdout, args.stderr, args.log_file)
+        if args.daemon:
+            handle = setup_logging(log_file)
+            stdout = open(stdout, 'w+')
+            stderr = open(stderr, 'w+')
+
         print(
             textwrap.dedent('''\
                 Running the Gunicorn Server with:
@@ -771,7 +776,6 @@ def webserver(args):
             '-t', str(worker_timeout),
             '-b', args.hostname + ':' + str(args.port),
             '-n', 'airflow-webserver',
-            '-p', str(pid),
             '-c', 'airflow.www.gunicorn_config'
         ]
 
@@ -782,28 +786,66 @@ def webserver(args):
             run_args += ['--error-logfile', str(args.error_logfile)]
 
         if args.daemon:
-            run_args += ["-D"]
+            run_args += ['-D', '-p', str(pid)]
+
         if ssl_cert:
             run_args += ['--certfile', ssl_cert, '--keyfile', ssl_key]
 
         run_args += ["airflow.www.app:cached_app()"]
 
-        gunicorn_master_proc = subprocess.Popen(run_args)
+        gunicorn_master_proc = None
 
         def kill_proc(dummy_signum, dummy_frame):
             gunicorn_master_proc.terminate()
             gunicorn_master_proc.wait()
             sys.exit(0)
 
-        signal.signal(signal.SIGINT, kill_proc)
-        signal.signal(signal.SIGTERM, kill_proc)
+        def monitor_gunicorn(gunicorn_master_proc):
+            # These run forever until SIG{INT, TERM, KILL, ...} signal is sent
+            if conf.getint('webserver', 'worker_refresh_interval') > 0:
+                restart_workers(gunicorn_master_proc, num_workers)
+            else:
+                while True:
+                    time.sleep(1)
 
-        # These run forever until SIG{INT, TERM, KILL, ...} signal is sent
-        if conf.getint('webserver', 'worker_refresh_interval') > 0:
-            restart_workers(gunicorn_master_proc, num_workers)
+        if args.daemon:
+            base, ext = os.path.splitext(pid)
+            ctx = daemon.DaemonContext(
+                pidfile=TimeoutPIDLockFile(base + "-monitor" + ext, -1),
+                files_preserve=[handle],
+                stdout=stdout,
+                stderr=stderr,
+                signal_map={
+                    signal.SIGINT: kill_proc,
+                    signal.SIGTERM: kill_proc
+                },
+            )
+            with ctx:
+                subprocess.Popen(run_args)
+
+                # Reading pid file directly, since Popen#pid doesn't
+                # seem to return the right value with DaemonContext.
+                while True:
+                    try:
+                        with open(pid) as f:
+                            gunicorn_master_proc_pid = int(f.read())
+                            break
+                    except IOError:
+                        logging.debug("Waiting for gunicorn's pid file to be created.")
+                        time.sleep(0.1)
+
+                gunicorn_master_proc = psutil.Process(gunicorn_master_proc_pid)

[jira] [Commented] (AIRFLOW-1127) Move license notices to LICENSE instead of NOTICE

2017-05-09 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1127?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16003133#comment-16003133
 ] 

ASF subversion and git services commented on AIRFLOW-1127:
--

Commit dc6ebaab94bcc69b36bb97eefba3a01ee149b746 in incubator-airflow's branch 
refs/heads/v1-8-stable from [~bolke]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=dc6ebaa ]

[AIRFLOW-1127] Move license notices to LICENSE

Closes #2250 from bolkedebruin/AIRFLOW-1127

(cherry picked from commit 659827639e256a668d669d0d229abf49d6010bb8)


> Move license notices to LICENSE instead of NOTICE
> -
>
> Key: AIRFLOW-1127
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1127
> Project: Apache Airflow
>  Issue Type: Improvement
>Reporter: Bolke de Bruin
>Assignee: Bolke de Bruin
>Priority: Blocker
> Fix For: 1.8.1
>
>
> For all the bundled files with different licenses (MIT, BSD, etc.), the full
> texts of these licenses should be in the source tarball, preferably at the
> end of the LICENSE file.
> webgl-2d needs to be called out as MIT license.
> Are all the entries in the NOTICE file required or do they
> just need to be in the LICENSE file? Any additions to the NOTICE have
> downstream repercussions as they need to be propagated down by any other
> project using airflow.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[14/36] incubator-airflow git commit: [AIRFLOW-1033][AIFRLOW-1033] Fix ti_deps for no schedule dags

2017-05-09 Thread criccomini
[AIRFLOW-1033][AIFRLOW-1033] Fix ti_deps for no schedule dags

DAGs that did not have a schedule (None or @once) make the
dependency checker raise an exception as the previous schedule
will not exist.

Also activates all ti_deps tests.

Closes #2220 from bolkedebruin/AIRFLOW-1033

(cherry picked from commit fbcbd053a2c5fffb0c95eb55a91cb92fa860e1ab)
Signed-off-by: Bolke de Bruin 


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ebfc3ea7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ebfc3ea7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ebfc3ea7

Branch: refs/heads/v1-8-stable
Commit: ebfc3ea73ae1ffe273e4ff532f1ad47441bef518
Parents: 9167411
Author: Bolke de Bruin 
Authored: Thu Apr 6 14:03:11 2017 +0200
Committer: Bolke de Bruin 
Committed: Thu Apr 6 14:03:24 2017 +0200

--
 airflow/ti_deps/deps/base_ti_dep.py |  14 +-
 airflow/ti_deps/deps/prev_dagrun_dep.py |   5 +
 .../ti_deps/deps/dag_ti_slots_available_dep.py  |  41 ---
 tests/ti_deps/deps/dag_unpaused_dep.py  |  41 ---
 tests/ti_deps/deps/dagrun_exists_dep.py |  41 ---
 tests/ti_deps/deps/not_in_retry_period_dep.py   |  61 
 tests/ti_deps/deps/not_running_dep.py   |  39 ---
 tests/ti_deps/deps/not_skipped_dep.py   |  38 ---
 tests/ti_deps/deps/pool_has_space_dep.py|  37 ---
 tests/ti_deps/deps/prev_dagrun_dep.py   | 143 -
 tests/ti_deps/deps/runnable_exec_date_dep.py|  92 --
 .../deps/test_dag_ti_slots_available_dep.py |  42 +++
 tests/ti_deps/deps/test_dag_unpaused_dep.py |  42 +++
 tests/ti_deps/deps/test_dagrun_exists_dep.py|  40 +++
 .../deps/test_not_in_retry_period_dep.py|  59 
 tests/ti_deps/deps/test_not_running_dep.py  |  37 +++
 tests/ti_deps/deps/test_not_skipped_dep.py  |  36 +++
 tests/ti_deps/deps/test_prev_dagrun_dep.py  | 123 
 .../ti_deps/deps/test_runnable_exec_date_dep.py |  76 +
 tests/ti_deps/deps/test_trigger_rule_dep.py | 252 
 tests/ti_deps/deps/test_valid_state_dep.py  |  46 +++
 tests/ti_deps/deps/trigger_rule_dep.py  | 295 ---
 tests/ti_deps/deps/valid_state_dep.py   |  49 ---
 23 files changed, 768 insertions(+), 881 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ebfc3ea7/airflow/ti_deps/deps/base_ti_dep.py
--
diff --git a/airflow/ti_deps/deps/base_ti_dep.py b/airflow/ti_deps/deps/base_ti_dep.py
index 0188043..bad1fa0 100644
--- a/airflow/ti_deps/deps/base_ti_dep.py
+++ b/airflow/ti_deps/deps/base_ti_dep.py
@@ -51,7 +51,7 @@ class BaseTIDep(object):
         """
         return getattr(self, 'NAME', self.__class__.__name__)
 
-    def _get_dep_statuses(self, ti, session, dep_context):
+    def _get_dep_statuses(self, ti, session, dep_context=None):
         """
         Abstract method that returns an iterable of TIDepStatus objects that describe
         whether the given task instance has this dependency met.
@@ -69,7 +69,7 @@ class BaseTIDep(object):
         raise NotImplementedError
 
     @provide_session
-    def get_dep_statuses(self, ti, session, dep_context):
+    def get_dep_statuses(self, ti, session, dep_context=None):
         """
         Wrapper around the private _get_dep_statuses method that contains some global
         checks for all dependencies.
@@ -81,6 +81,12 @@ class BaseTIDep(object):
         :param dep_context: the context for which this dependency should be evaluated for
         :type dep_context: DepContext
         """
+        # this avoids a circular dependency
+        from airflow.ti_deps.dep_context import DepContext
+
+        if dep_context is None:
+            dep_context = DepContext()
+
         if self.IGNOREABLE and dep_context.ignore_all_deps:
             yield self._passing_status(
                 reason="Context specified all dependencies should be ignored.")
@@ -95,7 +101,7 @@ class BaseTIDep(object):
             yield dep_status
 
     @provide_session
-    def is_met(self, ti, session, dep_context):
+    def is_met(self, ti, session, dep_context=None):
         """
         Returns whether or not this dependency is met for a given task instance. A
         dependency is considered met if all of the dependency statuses it reports are
@@ -113,7 +119,7 @@ class BaseTIDep(object):
                    self.get_dep_statuses(ti, session, dep_context))
 
     @provide_session
-    def get_failure_reasons(self, ti, session, dep_context):
+    def get_failure_reasons(self, ti, session, dep_context=None):
         """
         Returns an iterable of strings that 

[jira] [Commented] (AIRFLOW-1138) Add licenses to files in scripts directory

2017-05-09 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1138?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16003136#comment-16003136
 ] 

ASF subversion and git services commented on AIRFLOW-1138:
--

Commit 4b5c6efd4a450b4a202f87cb12ea1f9eb4daf8fc in incubator-airflow's branch 
refs/heads/v1-8-stable from [~criccomini]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=4b5c6ef ]

[AIRFLOW-1138] Add missing licenses to files in scripts directory

Closes #2253 from criccomini/AIRFLOW-1138

(cherry picked from commit 94f9822ffd867e559fd71046124626fee6acedf7)


> Add licenses to files in scripts directory
> --
>
> Key: AIRFLOW-1138
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1138
> Project: Apache Airflow
>  Issue Type: Task
>  Components: release
>Reporter: Chris Riccomini
>Assignee: Chris Riccomini
>Priority: Blocker
> Fix For: 1.8.1
>
>
> These two files need license headers:
>   modified:   scripts/ci/requirements.txt
>   modified:   scripts/systemd/airflow



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[19/36] incubator-airflow git commit: [AIRFLOW-1001] Fix landing times if there is no following schedule

2017-05-09 Thread criccomini
[AIRFLOW-1001] Fix landing times if there is no following schedule

@once does not have a following schedule. This was not
checked for and therefore the landing times page could bork.

Closes #2213 from bolkedebruin/AIRFLOW-1001

(cherry picked from commit 0371df4f1bd78e220e591d5cb23630d6a062f109)


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/c94b3a02
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/c94b3a02
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/c94b3a02

Branch: refs/heads/v1-8-stable
Commit: c94b3a02f430f1a5a86c83d5f7286dcdac31492b
Parents: aec9770
Author: Bolke de Bruin 
Authored: Wed Apr 5 09:57:55 2017 +0200
Committer: Chris Riccomini 
Committed: Mon Apr 10 14:19:53 2017 -0700

--
 airflow/www/views.py |  2 +-
 tests/core.py| 16 ++--
 2 files changed, 15 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c94b3a02/airflow/www/views.py
--
diff --git a/airflow/www/views.py b/airflow/www/views.py
index 962c1f0..fec4779 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -1553,7 +1553,7 @@ class Airflow(BaseView):
             for ti in task.get_task_instances(session, start_date=min_date,
                                               end_date=base_date):
                 ts = ti.execution_date
-                if dag.schedule_interval:
+                if dag.schedule_interval and dag.following_schedule(ts):
                     ts = dag.following_schedule(ts)
                 if ti.end_date:
                     dttm = wwwutils.epoch(ti.execution_date)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/c94b3a02/tests/core.py
--
diff --git a/tests/core.py b/tests/core.py
index 870a0cb..c55b1e2 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -1413,6 +1413,7 @@ class WebUiTests(unittest.TestCase):
         self.dag_bash2 = self.dagbag.dags['test_example_bash_operator']
         self.sub_dag = self.dagbag.dags['example_subdag_operator']
         self.runme_0 = self.dag_bash.get_task('runme_0')
+        self.example_xcom = self.dagbag.dags['example_xcom']
 
         self.dag_bash2.create_dagrun(
             run_id="test_{}".format(models.DagRun.id_for_date(datetime.now())),
@@ -1428,6 +1429,13 @@
             state=State.RUNNING
         )
 
+        self.example_xcom.create_dagrun(
+            run_id="test_{}".format(models.DagRun.id_for_date(datetime.now())),
+            execution_date=DEFAULT_DATE,
+            start_date=datetime.now(),
+            state=State.RUNNING
+        )
+
     def test_index(self):
         response = self.app.get('/', follow_redirects=True)
         assert "DAGs" in response.data.decode('utf-8')
@@ -1473,8 +1481,12 @@
         assert "example_bash_operator" in response.data.decode('utf-8')
         response = self.app.get(
             '/admin/airflow/landing_times?'
-            'days=30&dag_id=example_bash_operator')
-        assert "example_bash_operator" in response.data.decode('utf-8')
+            'days=30&dag_id=test_example_bash_operator')
+        assert "test_example_bash_operator" in response.data.decode('utf-8')
+        response = self.app.get(
+            '/admin/airflow/landing_times?'
+            'days=30&dag_id=example_xcom')
+        assert "example_xcom" in response.data.decode('utf-8')
         response = self.app.get(
             '/admin/airflow/gantt?dag_id=example_bash_operator')
         assert "example_bash_operator" in response.data.decode('utf-8')



[33/36] incubator-airflow git commit: [AIRFLOW-XXX] Fix merge issue with test/models.py by adding execution_date

2017-05-09 Thread criccomini
[AIRFLOW-XXX] Fix merge issue with test/models.py by adding execution_date


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d61af623
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d61af623
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d61af623

Branch: refs/heads/v1-8-stable
Commit: d61af623178253eb39a1fabd6116a94dca3f33a6
Parents: 0a105ee
Author: Chris Riccomini 
Authored: Thu Apr 27 13:15:37 2017 -0700
Committer: Chris Riccomini 
Committed: Thu Apr 27 13:15:37 2017 -0700

--
 tests/models.py | 6 --
 1 file changed, 4 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d61af623/tests/models.py
--
diff --git a/tests/models.py b/tests/models.py
index 981561a..da36d56 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -285,11 +285,13 @@ class DagStatTest(unittest.TestCase):
 
 class DagRunTest(unittest.TestCase):
 
-    def create_dag_run(self, dag, state=State.RUNNING, task_states=None):
+    def create_dag_run(self, dag, state=State.RUNNING, task_states=None, execution_date=None):
         now = datetime.datetime.now()
+        if execution_date is None:
+            execution_date = now
         dag_run = dag.create_dagrun(
             run_id='manual__' + now.isoformat(),
-            execution_date=now,
+            execution_date=execution_date,
             start_date=now,
             state=state,
             external_trigger=False,



[12/36] incubator-airflow git commit: Merge pull request #2195 from bolkedebruin/AIRFLOW-719

2017-05-09 Thread criccomini
Merge pull request #2195 from bolkedebruin/AIRFLOW-719

(cherry picked from commit 4a6bef69d1817a5fc3ddd6ffe14c2578eaa49cf0)
Signed-off-by: Bolke de Bruin 


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/dff6d21b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/dff6d21b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/dff6d21b

Branch: refs/heads/v1-8-stable
Commit: dff6d21bfd9a2585ca484fc8fd56aa100f640908
Parents: 9070a82
Author: Bolke de Bruin 
Authored: Tue Apr 4 17:04:12 2017 +0200
Committer: Bolke de Bruin 
Committed: Wed Apr 5 19:16:22 2017 +0200

--
 airflow/operators/latest_only_operator.py |  30 ++-
 airflow/operators/python_operator.py  |  82 +--
 airflow/ti_deps/deps/trigger_rule_dep.py  |   6 +-
 scripts/ci/requirements.txt   |   1 +
 tests/dags/test_dagrun_short_circuit_false.py |  38 
 tests/models.py   |  77 +++
 tests/operators/__init__.py   |   2 +
 tests/operators/latest_only_operator.py   |  12 +-
 tests/operators/python_operator.py| 244 +
 9 files changed, 384 insertions(+), 108 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dff6d21b/airflow/operators/latest_only_operator.py
--
diff --git a/airflow/operators/latest_only_operator.py b/airflow/operators/latest_only_operator.py
index 8b4e614..9d5defb 100644
--- a/airflow/operators/latest_only_operator.py
+++ b/airflow/operators/latest_only_operator.py
@@ -34,7 +34,7 @@ class LatestOnlyOperator(BaseOperator):
     def execute(self, context):
         # If the DAG Run is externally triggered, then return without
         # skipping downstream tasks
-        if context['dag_run'].external_trigger:
+        if context['dag_run'] and context['dag_run'].external_trigger:
             logging.info("""Externally triggered DAG_Run:
                          allowing execution to proceed.""")
             return
@@ -46,17 +46,39 @@ class LatestOnlyOperator(BaseOperator):
         logging.info(
             'Checking latest only with left_window: %s right_window: %s '
             'now: %s', left_window, right_window, now)
+
         if not left_window < now <= right_window:
             logging.info('Not latest execution, skipping downstream.')
             session = settings.Session()
-            for task in context['task'].downstream_list:
-                ti = TaskInstance(
-                    task, execution_date=context['ti'].execution_date)
+
+            TI = TaskInstance
+            tis = session.query(TI).filter(
+                TI.execution_date == context['ti'].execution_date,
+                TI.task_id.in_(context['task'].downstream_task_ids)
+            ).with_for_update().all()
+
+            for ti in tis:
                 logging.info('Skipping task: %s', ti.task_id)
                 ti.state = State.SKIPPED
                 ti.start_date = now
                 ti.end_date = now
                 session.merge(ti)
+
+            # this is defensive against dag runs that are not complete
+            for task in context['task'].downstream_list:
+                if task.task_id in tis:
+                    continue
+
+                logging.warning("Task {} was not part of a dag run. "
+                                "This should not happen."
+                                .format(task))
+                now = datetime.datetime.now()
+                ti = TaskInstance(task, execution_date=context['ti'].execution_date)
+                ti.state = State.SKIPPED
+                ti.start_date = now
+                ti.end_date = now
+                session.merge(ti)
+
             session.commit()
             session.close()
             logging.info('Done.')

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/dff6d21b/airflow/operators/python_operator.py
--
diff --git a/airflow/operators/python_operator.py b/airflow/operators/python_operator.py
index b5f6386..114bc7e 100644
--- a/airflow/operators/python_operator.py
+++ b/airflow/operators/python_operator.py
@@ -106,14 +106,36 @@ class BranchPythonOperator(PythonOperator):
         logging.info("Following branch " + branch)
         logging.info("Marking other directly downstream tasks as skipped")
         session = settings.Session()
+
+        TI = TaskInstance
+        tis = session.query(TI).filter(
+            TI.execution_date == context['ti'].execution_date,
+            TI.task_id.in_(context['task'].downstream_task_ids),
+            TI.task_id != 

[08/36] incubator-airflow git commit: [AIRFLOW-1011] Fix bug in BackfillJob._execute() for SubDAGs

2017-05-09 Thread criccomini
[AIRFLOW-1011] Fix bug in BackfillJob._execute() for SubDAGs

BackfillJob._execute() checks that the next run date is less than
or equal to the end date before creating a DAG run and task
instances. For SubDAGs, the next run date is not relevant, i.e.
schedule_interval can be anything other than None or '@once' and
should be ignored. However, the current code calculates the next
run date for a SubDAG, and the condition check mentioned above
always fails for a SubDAG triggered manually.

This change adds a simple check to determine if this is a SubDAG
and, if so, sets the next run date to the DAG run's start date.

Closes #2179 from joeschmid/AIRFLOW-1011-fix-bug-backfill-execute-for-subdags

(cherry picked from commit 56501e6062df9456f7ac4efe94e21940734dd5bc)
Signed-off-by: Bolke de Bruin 


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2bebeaf9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2bebeaf9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2bebeaf9

Branch: refs/heads/v1-8-stable
Commit: 2bebeaf9554d35710de6eb1b4006157e105ac79b
Parents: 68b1c98
Author: Joe Schmid 
Authored: Tue Apr 4 08:27:45 2017 +0200
Committer: Bolke de Bruin 
Committed: Tue Apr 4 08:28:07 2017 +0200

--
 airflow/jobs.py   |  7 +--
 airflow/models.py |  1 +
 tests/jobs.py | 28 
 3 files changed, 34 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2bebeaf9/airflow/jobs.py
--
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 222d9ba..7db9b9c 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1734,7 +1734,7 @@ class BackfillJob(BaseJob):
 
         # consider max_active_runs but ignore when running subdags
         # "parent.child" as a dag_id is by convention a subdag
-        if self.dag.schedule_interval and "." not in self.dag.dag_id:
+        if self.dag.schedule_interval and not self.dag.is_subdag:
             active_runs = DagRun.find(
                 dag_id=self.dag.dag_id,
                 state=State.RUNNING,
@@ -1774,8 +1774,11 @@
 
         # create dag runs
         dr_start_date = start_date or min([t.start_date for t in self.dag.tasks])
-        next_run_date = self.dag.normalize_schedule(dr_start_date)
         end_date = end_date or datetime.now()
+        # next run date for a subdag isn't relevant (schedule_interval for subdags
+        # is ignored) so we use the dag run's start date in the case of a subdag
+        next_run_date = (self.dag.normalize_schedule(dr_start_date)
+                         if not self.dag.is_subdag else dr_start_date)
 
         active_dag_runs = []
         while next_run_date and next_run_date <= end_date:
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2bebeaf9/airflow/models.py
--
diff --git a/airflow/models.py b/airflow/models.py
index bdda701..fdff54e 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -2682,6 +2682,7 @@ class DAG(BaseDag, LoggingMixin):
         self.sla_miss_callback = sla_miss_callback
         self.orientation = orientation
         self.catchup = catchup
+        self.is_subdag = False  # DagBag.bag_dag() will set this to True if appropriate
 
         self.partial = False

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2bebeaf9/tests/jobs.py
--
diff --git a/tests/jobs.py b/tests/jobs.py
index aee0e9c..f9ede68 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -348,6 +348,34 @@ class BackfillJobTest(unittest.TestCase):
         else:
             self.assertEqual(State.NONE, ti.state)
 
+    def test_backfill_execute_subdag(self):
+        dag = self.dagbag.get_dag('example_subdag_operator')
+        subdag_op_task = dag.get_task('section-1')
+
+        subdag = subdag_op_task.subdag
+        subdag.schedule_interval = '@daily'
+
+        start_date = datetime.datetime.now()
+        executor = TestExecutor(do_update=True)
+        job = BackfillJob(dag=subdag,
+                          start_date=start_date,
+                          end_date=start_date,
+                          executor=executor,
+                          donot_pickle=True)
+        job.run()
+
+        history = executor.history
+        subdag_history = history[0]
+
+        # check that all 5 task instances of the subdag 'section-1' were executed
+        self.assertEqual(5, len(subdag_history))
+        for sdh in subdag_history:
+            ti = sdh[3]
+            self.assertIn('section-1-task-', 

[21/36] incubator-airflow git commit: [AIRFLOW-1030][AIRFLOW-1] Fix hook import for HttpSensor

2017-05-09 Thread criccomini
[AIRFLOW-1030][AIRFLOW-1] Fix hook import for HttpSensor

Closes #2180 from pdambrauskas/fix/http_hook_import

(cherry picked from commit f2dae7d15623e2534e7c0dab3b5a7e02d4cff81d)


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a9e0894b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a9e0894b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a9e0894b

Branch: refs/heads/v1-8-stable
Commit: a9e0894ba0113cf62c7e9006fb0b42085bc5e9f9
Parents: 0a5fb78
Author: pdambrauskas 
Authored: Tue Apr 4 08:39:54 2017 +0200
Committer: Chris Riccomini 
Committed: Mon Apr 10 14:25:43 2017 -0700

--

--




[05/36] incubator-airflow git commit: [AIRFLOW-832] Let debug server run without SSL

2017-05-09 Thread criccomini
[AIRFLOW-832] Let debug server run without SSL

Closes #2051 from gsakkis/fix-debug-server

(cherry picked from commit b0ae70d3a8e935dc9266b6853683ae5375a7390b)


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/eb12f016
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/eb12f016
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/eb12f016

Branch: refs/heads/v1-8-stable
Commit: eb12f0164fbeedbe2701744c213cc90e6fc805f5
Parents: 46ca569
Author: George Sakkis 
Authored: Sun Feb 12 16:09:26 2017 -0500
Committer: Chris Riccomini 
Committed: Wed Mar 29 14:12:06 2017 -0700

--
 airflow/bin/cli.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/eb12f016/airflow/bin/cli.py
--
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index 61d8707..e9c54e6 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -751,7 +751,7 @@ def webserver(args):
             "Starting the web server on port {0} and host {1}.".format(
                 args.port, args.hostname))
         app.run(debug=True, port=args.port, host=args.hostname,
-                ssl_context=(ssl_cert, ssl_key))
+                ssl_context=(ssl_cert, ssl_key) if ssl_cert and ssl_key else None)
     else:
         pid, stdout, stderr, log_file = setup_locations("webserver", pid=args.pid)
 print(



[02/36] incubator-airflow git commit: [AIRFLOW-989] Do not mark dag run successful if unfinished tasks

2017-05-09 Thread criccomini
[AIRFLOW-989] Do not mark dag run successful if unfinished tasks

Dag runs could be marked successful if all root tasks were
successful, even if some tasks did not run yet, i.e. in case
of clearing. Now we consider unfinished_tasks before marking
a run successful.

Closes #2154 from bolkedebruin/AIRFLOW-989

(cherry picked from commit 3d6095ff5cf6eff0444d7e47a2360765f2953daf)
Signed-off-by: Bolke de Bruin 


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/15600e42
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/15600e42
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/15600e42

Branch: refs/heads/v1-8-stable
Commit: 15600e42c805b222d6147b60376b56c8e708dcde
Parents: 3b37cfa
Author: Bolke de Bruin 
Authored: Wed Mar 15 16:39:12 2017 -0700
Committer: Bolke de Bruin 
Committed: Wed Mar 15 16:39:26 2017 -0700

--
 airflow/models.py |  6 +++---
 tests/models.py   | 51 ++
 2 files changed, 54 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/15600e42/airflow/models.py
--
diff --git a/airflow/models.py b/airflow/models.py
index 7c6590f..42b8a7f 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -4064,9 +4064,9 @@ class DagRun(Base):
             logging.info('Marking run {} failed'.format(self))
             self.state = State.FAILED
 
-        # if all roots succeeded, the run succeeded
-        elif all(r.state in (State.SUCCESS, State.SKIPPED)
-                 for r in roots):
+        # if all roots succeeded and no unfinished tasks, the run succeeded
+        elif not unfinished_tasks and all(r.state in (State.SUCCESS, State.SKIPPED)
+                                          for r in roots):
             logging.info('Marking run {} successful'.format(self))
             self.state = State.SUCCESS

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/15600e42/tests/models.py
--
diff --git a/tests/models.py b/tests/models.py
index ffd1f31..1fbb3e6 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -259,6 +259,57 @@ class DagRunTest(unittest.TestCase):
         updated_dag_state = dag_run.update_state()
         self.assertEqual(State.SUCCESS, updated_dag_state)
 
+    def test_dagrun_success_conditions(self):
+        session = settings.Session()
+
+        dag = DAG(
+            'test_dagrun_success_conditions',
+            start_date=DEFAULT_DATE,
+            default_args={'owner': 'owner1'})
+
+        # A -> B
+        # A -> C -> D
+        # ordered: B, D, C, A or D, B, C, A or D, C, B, A
+        with dag:
+            op1 = DummyOperator(task_id='A')
+            op2 = DummyOperator(task_id='B')
+            op3 = DummyOperator(task_id='C')
+            op4 = DummyOperator(task_id='D')
+            op1.set_upstream([op2, op3])
+            op3.set_upstream(op4)
+
+        dag.clear()
+
+        now = datetime.datetime.now()
+        dr = dag.create_dagrun(run_id='test_dagrun_success_conditions',
+                               state=State.RUNNING,
+                               execution_date=now,
+                               start_date=now)
+
+        # op1 = root
+        ti_op1 = dr.get_task_instance(task_id=op1.task_id)
+        ti_op1.set_state(state=State.SUCCESS, session=session)
+
+        ti_op2 = dr.get_task_instance(task_id=op2.task_id)
+        ti_op3 = dr.get_task_instance(task_id=op3.task_id)
+        ti_op4 = dr.get_task_instance(task_id=op4.task_id)
+
+        # root is successful, but unfinished tasks
+        state = dr.update_state()
+        self.assertEqual(State.RUNNING, state)
+
+        # one has failed, but root is successful
+        ti_op2.set_state(state=State.FAILED, session=session)
+        ti_op3.set_state(state=State.SUCCESS, session=session)
+        ti_op4.set_state(state=State.SUCCESS, session=session)
+        state = dr.update_state()
+        self.assertEqual(State.SUCCESS, state)
+
+        # upstream dependency failed, root has not run
+        ti_op1.set_state(State.NONE, session)
+        state = dr.update_state()
+        self.assertEqual(State.FAILED, state)
+
 
 class DagBagTest(unittest.TestCase):
 



[04/36] incubator-airflow git commit: [AIRFLOW-906] Update Code icon from lightning bolt to file

2017-05-09 Thread criccomini
[AIRFLOW-906] Update Code icon from lightning bolt to file

Lightning bolts are not a visual metaphor for code or files.
Since Glyphicon doesn't have a code icon (<>, for instance),
we should use its file icon.

Dear Airflow Maintainers,

Please accept this PR that addresses the following issues:
AIRFLOW-906

Testing Done:
None.

Before/After screenshots in AIRFLOW-906
(https://issues.apache.org/jira/browse/AIRFLOW-906)

Update Code icon from lightning bolt to file

Lightning bolts are not a visual metaphor for code or files.
Since Glyphicon doesn't have a code icon (<>, for instance),
we should use its file icon.

Merge pull request #1 from djarratt/djarratt-patch-1

Update Code icon from lightning bolt to file

AIRFLOW-906 change glyphicon flash to file

Merge pull request #2 from djarratt/djarratt-patch-2

AIRFLOW-906 change glyphicon flash to file

Closes #2104 from djarratt/master

(cherry picked from commit bc47200711be4d2c0b36b772651dae4f5e01a204)


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/46ca569a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/46ca569a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/46ca569a

Branch: refs/heads/v1-8-stable
Commit: 46ca569a37513f3d13c529786f65c7e443c9837e
Parents: 2106ff5
Author: Dan Jarratt 
Authored: Fri Feb 24 15:00:51 2017 -0800
Committer: Chris Riccomini 
Committed: Wed Mar 29 14:10:33 2017 -0700

--
 airflow/www/static/bootstrap-theme.css   | 2 +-
 airflow/www/templates/airflow/dag.html   | 2 +-
 airflow/www/templates/airflow/dags.html  | 2 +-
 airflow/www/templates/airflow/list_dags.html | 2 +-
 4 files changed, 4 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/46ca569a/airflow/www/static/bootstrap-theme.css
--
diff --git a/airflow/www/static/bootstrap-theme.css b/airflow/www/static/bootstrap-theme.css
index 5b126ae..734f940 100644
--- a/airflow/www/static/bootstrap-theme.css
+++ b/airflow/www/static/bootstrap-theme.css
@@ -3068,7 +3068,7 @@ tbody.collapse.in {
 .glyphicon-log-in:before {
   content: "\e161";
 }
-.glyphicon-flash:before {
+.glyphicon-file:before {
   content: "\e162";
 }
 .glyphicon-log-out:before {

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/46ca569a/airflow/www/templates/airflow/dag.html
--
diff --git a/airflow/www/templates/airflow/dag.html 
b/airflow/www/templates/airflow/dag.html
index 8a4793d..c695f04 100644
--- a/airflow/www/templates/airflow/dag.html
+++ b/airflow/www/templates/airflow/dag.html
@@ -90,7 +90,7 @@
   
   
 
-  
+  
   Code
 
   

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/46ca569a/airflow/www/templates/airflow/dags.html
--
diff --git a/airflow/www/templates/airflow/dags.html 
b/airflow/www/templates/airflow/dags.html
index 5792c6a..7c59dea 100644
--- a/airflow/www/templates/airflow/dags.html
+++ b/airflow/www/templates/airflow/dags.html
@@ -167,7 +167,7 @@
 
 
 
-
+
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/46ca569a/airflow/www/templates/airflow/list_dags.html
--
diff --git a/airflow/www/templates/airflow/list_dags.html 
b/airflow/www/templates/airflow/list_dags.html
index 2ad9416..9ace2fd 100644
--- a/airflow/www/templates/airflow/list_dags.html
+++ b/airflow/www/templates/airflow/list_dags.html
@@ -167,7 +167,7 @@
 
   
   
-
+
   
   
 



[10/36] incubator-airflow git commit: [AIRFLOW-1030][AIRFLOW-1] Fix hook import for HttpSensor

2017-05-09 Thread criccomini
[AIRFLOW-1030][AIRFLOW-1] Fix hook import for HttpSensor

Closes #2180 from
pdambrauskas/fix/http_hook_import

(cherry picked from commit f2dae7d15623e2534e7c0dab3b5a7e02d4cff81d)
Signed-off-by: Bolke de Bruin 


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4db53f39
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4db53f39
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4db53f39

Branch: refs/heads/v1-8-stable
Commit: 4db53f39a972cae691dc49687a407dda0ff49aaf
Parents: 010b80a
Author: pdambrauskas 
Authored: Tue Apr 4 08:39:54 2017 +0200
Committer: Bolke de Bruin 
Committed: Tue Apr 4 08:40:16 2017 +0200

--
 airflow/operators/sensors.py | 24 +---
 1 file changed, 13 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4db53f39/airflow/operators/sensors.py
--
diff --git a/airflow/operators/sensors.py b/airflow/operators/sensors.py
index c0aba27..883d884 100644
--- a/airflow/operators/sensors.py
+++ b/airflow/operators/sensors.py
@@ -25,12 +25,12 @@ from time import sleep
 import re
 import sys
 
-import airflow
-from airflow import hooks, settings
+from airflow import settings
 from airflow.exceptions import AirflowException, AirflowSensorTimeout, 
AirflowSkipException
 from airflow.models import BaseOperator, TaskInstance
 from airflow.hooks.base_hook import BaseHook
 from airflow.hooks.hdfs_hook import HDFSHook
+from airflow.hooks.http_hook import HttpHook
 from airflow.utils.state import State
 from airflow.utils.decorators import apply_defaults
 
@@ -298,9 +298,9 @@ class NamedHivePartitionSensor(BaseSensorOperator):
 raise ValueError('Could not parse ' + partition)
 
 def poke(self, context):
-
 if not hasattr(self, 'hook'):
-self.hook = hooks.HiveMetastoreHook(
+from airflow.hooks.hive_hooks import HiveMetastoreHook
+self.hook = HiveMetastoreHook(
 metastore_conn_id=self.metastore_conn_id)
 
 def poke_partition(partition):
@@ -369,7 +369,8 @@ class HivePartitionSensor(BaseSensorOperator):
 'Poking for table {self.schema}.{self.table}, '
 'partition {self.partition}'.format(**locals()))
 if not hasattr(self, 'hook'):
-self.hook = hooks.HiveMetastoreHook(
+from airflow.hooks.hive_hooks import HiveMetastoreHook
+self.hook = HiveMetastoreHook(
 metastore_conn_id=self.metastore_conn_id)
 return self.hook.check_for_partition(
 self.schema, self.table, self.partition)
@@ -470,7 +471,8 @@ class WebHdfsSensor(BaseSensorOperator):
 self.webhdfs_conn_id = webhdfs_conn_id
 
 def poke(self, context):
-c = airflow.hooks.webhdfs_hook.WebHDFSHook(self.webhdfs_conn_id)
+from airflow.hooks.webhdfs_hook import WebHDFSHook
+c = WebHDFSHook(self.webhdfs_conn_id)
 logging.info(
 'Poking for file {self.filepath} '.format(**locals()))
 return c.check_for_path(hdfs_path=self.filepath)
@@ -520,8 +522,8 @@ class S3KeySensor(BaseSensorOperator):
 self.s3_conn_id = s3_conn_id
 
 def poke(self, context):
-import airflow.hooks.S3_hook
-hook = airflow.hooks.S3_hook.S3Hook(s3_conn_id=self.s3_conn_id)
+from airflow.hooks.S3_hook import S3Hook
+hook = S3Hook(s3_conn_id=self.s3_conn_id)
 full_url = "s3://" + self.bucket_name + "/" + self.bucket_key
 logging.info('Poking for key : {full_url}'.format(**locals()))
 if self.wildcard_match:
@@ -567,8 +569,8 @@ class S3PrefixSensor(BaseSensorOperator):
 def poke(self, context):
 logging.info('Poking for prefix : {self.prefix}\n'
  'in bucket s3://{self.bucket_name}'.format(**locals()))
-import airflow.hooks.S3_hook
-hook = airflow.hooks.S3_hook.S3Hook(s3_conn_id=self.s3_conn_id)
+from airflow.hooks.S3_hook import S3Hook
+hook = S3Hook(s3_conn_id=self.s3_conn_id)
 return hook.check_for_prefix(
 prefix=self.prefix,
 delimiter=self.delimiter,
@@ -660,7 +662,7 @@ class HttpSensor(BaseSensorOperator):
 self.extra_options = extra_options or {}
 self.response_check = response_check
 
-self.hook = hooks.http_hook.HttpHook(method='GET', 
http_conn_id=http_conn_id)
+self.hook = HttpHook(method='GET', http_conn_id=http_conn_id)
 
 def poke(self, context):
 logging.info('Poking: ' + self.endpoint)
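The broader pattern this patch applies, sketched below (an illustration of intent, not committed code): optional hook dependencies are imported inside poke(), so importing the sensors module itself never fails when an optional extra is missing. The connection id, key and bucket names are placeholders.

{code}
class ExampleSensor(object):
    def __init__(self, conn_id):
        self.conn_id = conn_id

    def poke(self, context):
        # Deferred import: the optional dependency is only needed when
        # the sensor actually runs, not when this module is imported.
        from airflow.hooks.S3_hook import S3Hook
        hook = S3Hook(s3_conn_id=self.conn_id)
        return hook.check_for_key('some/key', bucket_name='some-bucket')
{code}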



[32/36] incubator-airflow git commit: [AIRFLOW-XXX] Set version to 1.8.1

2017-05-09 Thread criccomini
[AIRFLOW-XXX] Set version to 1.8.1


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0a105eed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0a105eed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0a105eed

Branch: refs/heads/v1-8-stable
Commit: 0a105eed4c14c1f1595c10a6529e3bdb51187a14
Parents: e342d0d
Author: Chris Riccomini 
Authored: Thu Apr 27 12:37:14 2017 -0700
Committer: Chris Riccomini 
Committed: Thu Apr 27 12:37:14 2017 -0700

--
 airflow/version.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0a105eed/airflow/version.py
--
diff --git a/airflow/version.py b/airflow/version.py
index 6bff40b..eb0bd4a 100644
--- a/airflow/version.py
+++ b/airflow/version.py
@@ -13,4 +13,4 @@
 # limitations under the License.
 #
 
-version = '1.8.1rc1+incubating'
+version = '1.8.1+incubating'



[23/36] incubator-airflow git commit: [AIRFLOW-XXX] Set 1.8.1 version

2017-05-09 Thread criccomini
[AIRFLOW-XXX] Set 1.8.1 version


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/58a0ee78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/58a0ee78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/58a0ee78

Branch: refs/heads/v1-8-stable
Commit: 58a0ee787ed372034b417e6743175bdfe7f14808
Parents: bc52d09
Author: Chris Riccomini 
Authored: Mon Apr 17 11:18:12 2017 -0700
Committer: Chris Riccomini 
Committed: Mon Apr 17 11:18:12 2017 -0700

--
 airflow/version.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/58a0ee78/airflow/version.py
--
diff --git a/airflow/version.py b/airflow/version.py
index 8f87df9..50361b9 100644
--- a/airflow/version.py
+++ b/airflow/version.py
@@ -13,4 +13,4 @@
 # limitations under the License.
 #
 
-version = '1.8.1alpha0'
+version = '1.8.1rc0+apache.incubating'



[17/36] incubator-airflow git commit: [AIRFLOW-1035] Use binary exponential backoff

2017-05-09 Thread criccomini
[AIRFLOW-1035] Use binary exponential backoff

Closes #2196 from IvanVergiliev/exponential-
backoff

(cherry picked from commit 4ec932b551774bb394c5770c4d2660f565a4c592)
Signed-off-by: Bolke de Bruin 


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4199cd3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4199cd3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4199cd3d

Branch: refs/heads/v1-8-stable
Commit: 4199cd3d23d35183253c5d078e0f9937e87df232
Parents: ceb2ac3
Author: Ivan Vergiliev 
Authored: Fri Apr 7 19:35:03 2017 +0200
Committer: Bolke de Bruin 
Committed: Fri Apr 7 19:35:23 2017 +0200

--
 airflow/models.py | 10 +-
 tests/models.py   | 17 ++---
 2 files changed, 19 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4199cd3d/airflow/models.py
--
diff --git a/airflow/models.py b/airflow/models.py
index 47413e0..5db0287 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -1161,7 +1161,15 @@ class TaskInstance(Base):
 """
 delay = self.task.retry_delay
 if self.task.retry_exponential_backoff:
-delay_backoff_in_seconds = delay.total_seconds() ** self.try_number
+# timedelta has a maximum representable value. The exponentiation
+# here means this value can be exceeded after a certain number
+# of tries (around 50 if the initial delay is 1s, even fewer if
+# the delay is larger). Cap the value here before creating a
+# timedelta object so the operation doesn't fail.
+delay_backoff_in_seconds = min(
+delay.total_seconds() * (2 ** (self.try_number - 1)),
+timedelta.max.total_seconds() - 1
+)
 delay = timedelta(seconds=delay_backoff_in_seconds)
 if self.task.max_retry_delay:
 delay = min(self.task.max_retry_delay, delay)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4199cd3d/tests/models.py
--
diff --git a/tests/models.py b/tests/models.py
index 9478088..8223276 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -757,9 +757,8 @@ class TaskInstanceTest(unittest.TestCase):
 self.assertEqual(ti.try_number, 4)
 
 def test_next_retry_datetime(self):
-delay = datetime.timedelta(seconds=3)
-delay_squared = datetime.timedelta(seconds=9)
-max_delay = datetime.timedelta(seconds=10)
+delay = datetime.timedelta(seconds=30)
+max_delay = datetime.timedelta(minutes=60)
 
 dag = models.DAG(dag_id='fail_dag')
 task = BashOperator(
@@ -778,13 +777,17 @@ class TaskInstanceTest(unittest.TestCase):
 
 ti.try_number = 1
 dt = ti.next_retry_datetime()
-self.assertEqual(dt, ti.end_date+delay)
+self.assertEqual(dt, ti.end_date + delay)
 
-ti.try_number = 2
+ti.try_number = 6
 dt = ti.next_retry_datetime()
-self.assertEqual(dt, ti.end_date+delay_squared)
+self.assertEqual(dt, ti.end_date + (2 ** 5) * delay)
 
-ti.try_number = 3
+ti.try_number = 8
+dt = ti.next_retry_datetime()
+self.assertEqual(dt, ti.end_date+max_delay)
+
+ti.try_number = 50
 dt = ti.next_retry_datetime()
 self.assertEqual(dt, ti.end_date+max_delay)
 
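For reference, a minimal standalone sketch of the capped binary exponential backoff this patch implements (the 30s/60min values are taken from the updated test above; this is not the committed code):

{code}
from datetime import timedelta

retry_delay = timedelta(seconds=30)
max_retry_delay = timedelta(minutes=60)

def next_delay(try_number):
    # Double the delay on each retry; the min() against timedelta.max
    # keeps the exponentiation from overflowing timedelta's range.
    seconds = min(
        retry_delay.total_seconds() * (2 ** (try_number - 1)),
        timedelta.max.total_seconds() - 1,
    )
    return min(timedelta(seconds=seconds), max_retry_delay)

for n in (1, 6, 8, 50):
    print(n, next_delay(n))  # 0:00:30, 0:16:00, then capped at 1:00:00
{code}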



[24/36] incubator-airflow git commit: [AIRFLOW-1120] Update version view to include Apache prefix

2017-05-09 Thread criccomini
[AIRFLOW-1120] Update version view to include Apache prefix

Closes #2244 from criccomini/AIRFLOW-1120

(cherry picked from commit 6684597d951cb9f2fea24576a3d19534d67c89ea)


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/1725c951
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/1725c951
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/1725c951

Branch: refs/heads/v1-8-stable
Commit: 1725c95163cf3a3d3b4c073922e39851e00942bf
Parents: 58a0ee7
Author: Chris Riccomini 
Authored: Tue Apr 18 13:53:03 2017 -0700
Committer: Chris Riccomini 
Committed: Tue Apr 18 13:53:55 2017 -0700

--
 airflow/www/views.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/1725c951/airflow/www/views.py
--
diff --git a/airflow/www/views.py b/airflow/www/views.py
index fec4779..53c6394 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -2513,7 +2513,7 @@ class VersionView(wwwutils.SuperUserMixin, LoggingMixin, 
BaseView):
 def version(self):
 # Look at the version from setup.py
 try:
-airflow_version = pkg_resources.require("airflow")[0].version
+airflow_version = 
pkg_resources.require("apache-airflow")[0].version
 except Exception as e:
 airflow_version = None
 self.logger.error(e)



[09/36] incubator-airflow git commit: [AIRFLOW-1062] Fix DagRun#find to return correct result

2017-05-09 Thread criccomini
[AIRFLOW-1062] Fix DagRun#find to return correct result

DagRun#find returns a wrong result if
external_trigger=False is specified,
because adding the filter is skipped on that
condition. This PR fixes it.

Closes #2210 from sekikn/AIRFLOW-1062

(cherry picked from commit e4494f85ed5593c99949b52e1e0044c2a35f097f)
Signed-off-by: Bolke de Bruin 


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/010b80aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/010b80aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/010b80aa

Branch: refs/heads/v1-8-stable
Commit: 010b80aa8b417091705556a07d5970fe0cc4efb2
Parents: 2bebeaf
Author: Kengo Seki 
Authored: Tue Apr 4 08:30:40 2017 +0200
Committer: Bolke de Bruin 
Committed: Tue Apr 4 08:31:05 2017 +0200

--
 airflow/models.py |  2 +-
 tests/models.py   | 33 +
 2 files changed, 34 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/010b80aa/airflow/models.py
--
diff --git a/airflow/models.py b/airflow/models.py
index fdff54e..6828ab6 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -3925,7 +3925,7 @@ class DagRun(Base):
 qry = qry.filter(DR.execution_date == execution_date)
 if state:
 qry = qry.filter(DR.state == state)
-if external_trigger:
+if external_trigger is not None:
 qry = qry.filter(DR.external_trigger == external_trigger)
 
 dr = qry.order_by(DR.execution_date).all()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/010b80aa/tests/models.py
--
diff --git a/tests/models.py b/tests/models.py
index c63c67e..6673c04 100644
--- a/tests/models.py
+++ b/tests/models.py
@@ -227,6 +227,39 @@ class DagRunTest(unittest.TestCase):
 'scheduled__2015-01-02T03:04:05', run_id,
 'Generated run_id did not match expectations: {0}'.format(run_id))
 
+def test_dagrun_find(self):
+session = settings.Session()
+now = datetime.datetime.now()
+
+dag_id1 = "test_dagrun_find_externally_triggered"
+dag_run = models.DagRun(
+dag_id=dag_id1,
+run_id='manual__' + now.isoformat(),
+execution_date=now,
+start_date=now,
+state=State.RUNNING,
+external_trigger=True,
+)
+session.add(dag_run)
+
+dag_id2 = "test_dagrun_find_not_externally_triggered"
+dag_run = models.DagRun(
+dag_id=dag_id2,
+run_id='manual__' + now.isoformat(),
+execution_date=now,
+start_date=now,
+state=State.RUNNING,
+external_trigger=False,
+)
+session.add(dag_run)
+
+session.commit()
+
+self.assertEqual(1, len(models.DagRun.find(dag_id=dag_id1, 
external_trigger=True)))
+self.assertEqual(0, len(models.DagRun.find(dag_id=dag_id1, 
external_trigger=False)))
+self.assertEqual(0, len(models.DagRun.find(dag_id=dag_id2, 
external_trigger=True)))
+self.assertEqual(1, len(models.DagRun.find(dag_id=dag_id2, 
external_trigger=False)))
+
 def test_dagrun_running_when_upstream_skipped(self):
 """
 Tests that a DAG run is not failed when an upstream task is skipped
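The root cause is the classic Python truthiness pitfall; a minimal sketch (not Airflow code):

{code}
def find(external_trigger=None):
    filters = []
    if external_trigger:              # buggy: False is skipped like None
        filters.append(external_trigger)
    return filters

def find_fixed(external_trigger=None):
    filters = []
    if external_trigger is not None:  # fixed: only None means "no filter"
        filters.append(external_trigger)
    return filters

print(find(False))        # [] -- the filter is silently dropped
print(find_fixed(False))  # [False]
{code}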



[35/36] incubator-airflow git commit: Merge branch 'v1-8-test' into v1-8-stable

2017-05-09 Thread criccomini
Merge branch 'v1-8-test' into v1-8-stable


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2b811c44
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2b811c44
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2b811c44

Branch: refs/heads/v1-8-stable
Commit: 2b811c445ef62236d25f37ac72bab94e79827032
Parents: f4760c3 af2d0b4
Author: Chris Riccomini 
Authored: Tue May 9 10:29:23 2017 -0700
Committer: Chris Riccomini 
Committed: Tue May 9 10:29:23 2017 -0700

--
 .rat-excludes   |   1 +
 LICENSE | 343 +++
 NOTICE  |  17 +-
 airflow/bin/cli.py  |  65 +++-
 airflow/contrib/hooks/spark_submit_hook.py  |  32 +-
 .../contrib/operators/spark_submit_operator.py  |  13 +-
 airflow/hooks/mssql_hook.py |  10 +-
 airflow/hooks/mysql_hook.py |  15 +-
 airflow/hooks/postgres_hook.py  |   4 +-
 airflow/jobs.py |  58 +++-
 airflow/models.py   | 235 +
 airflow/operators/latest_only_operator.py   |  30 +-
 airflow/operators/mssql_operator.py |  11 +-
 airflow/operators/mysql_operator.py |   8 +-
 airflow/operators/postgres_operator.py  |   7 +-
 airflow/operators/python_operator.py|  85 +++--
 airflow/operators/sensors.py|  24 +-
 airflow/ti_deps/deps/base_ti_dep.py |  14 +-
 airflow/ti_deps/deps/prev_dagrun_dep.py |   5 +
 airflow/ti_deps/deps/trigger_rule_dep.py|   6 +-
 airflow/utils/file.py   |  20 +-
 airflow/version.py  |   2 +-
 airflow/www/api/experimental/endpoints.py   |  23 +-
 airflow/www/static/bootstrap-theme.css  |   2 +-
 airflow/www/templates/airflow/dag.html  |   2 +-
 airflow/www/templates/airflow/dags.html |  31 +-
 airflow/www/templates/airflow/list_dags.html|   2 +-
 airflow/www/views.py|  11 +-
 dags/test_dag.py|   3 +-
 scripts/ci/requirements.txt |  14 +
 scripts/systemd/airflow |  13 +
 setup.py|  13 +-
 tests/contrib/hooks/spark_submit_hook.py|  51 ++-
 .../contrib/operators/spark_submit_operator.py  |   8 +-
 tests/core.py   | 120 ++-
 tests/dags/test_dagrun_short_circuit_false.py   |  38 --
 tests/dags/test_latest_runs.py  |  27 ++
 tests/jobs.py   | 201 +++
 tests/models.py | 306 ++---
 tests/operators/__init__.py |   2 +
 tests/operators/latest_only_operator.py |  12 +-
 tests/operators/operators.py|  43 +++
 tests/operators/python_operator.py  | 244 +
 .../ti_deps/deps/dag_ti_slots_available_dep.py  |  41 ---
 tests/ti_deps/deps/dag_unpaused_dep.py  |  41 ---
 tests/ti_deps/deps/dagrun_exists_dep.py |  41 ---
 tests/ti_deps/deps/not_in_retry_period_dep.py   |  61 
 tests/ti_deps/deps/not_running_dep.py   |  39 ---
 tests/ti_deps/deps/not_skipped_dep.py   |  38 --
 tests/ti_deps/deps/pool_has_space_dep.py|  37 --
 tests/ti_deps/deps/prev_dagrun_dep.py   | 143 
 tests/ti_deps/deps/runnable_exec_date_dep.py|  92 -
 .../deps/test_dag_ti_slots_available_dep.py |  42 +++
 tests/ti_deps/deps/test_dag_unpaused_dep.py |  42 +++
 tests/ti_deps/deps/test_dagrun_exists_dep.py|  40 +++
 .../deps/test_not_in_retry_period_dep.py|  59 
 tests/ti_deps/deps/test_not_running_dep.py  |  37 ++
 tests/ti_deps/deps/test_not_skipped_dep.py  |  36 ++
 tests/ti_deps/deps/test_prev_dagrun_dep.py  | 123 +++
 .../ti_deps/deps/test_runnable_exec_date_dep.py |  76 
 tests/ti_deps/deps/test_trigger_rule_dep.py | 252 ++
 tests/ti_deps/deps/test_valid_state_dep.py  |  46 +++
 tests/ti_deps/deps/trigger_rule_dep.py  | 295 
 tests/ti_deps/deps/valid_state_dep.py   |  49 ---
 64 files changed, 2606 insertions(+), 1195 deletions(-)
--




[22/36] incubator-airflow git commit: [AIRFLOW-1000] Rebrand distribution to Apache Airflow

2017-05-09 Thread criccomini
[AIRFLOW-1000] Rebrand distribution to Apache Airflow

Per Apache requirements Airflow should be branded
Apache Airflow.
It is impossible to provide a forward compatible
automatic update
path and users will be required to manually
upgrade.

Closes #2172 from bolkedebruin/AIRFLOW-1000

(cherry picked from commit 4fb05d8cc7a69255c6bff33c7f856eb4a341d5f2)
Signed-off-by: Bolke de Bruin 


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/bc52d092
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/bc52d092
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/bc52d092

Branch: refs/heads/v1-8-stable
Commit: bc52d092b5194c3a389c19ce45c2c2bdda3bf265
Parents: a9e0894
Author: Bolke de Bruin 
Authored: Mon Apr 17 10:09:47 2017 +0200
Committer: Bolke de Bruin 
Committed: Mon Apr 17 11:27:51 2017 +0200

--
 .rat-excludes |  1 +
 setup.py  | 13 -
 2 files changed, 13 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bc52d092/.rat-excludes
--
diff --git a/.rat-excludes b/.rat-excludes
index 1363766..1238abb 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -13,6 +13,7 @@ docs
 dist
 build
 airflow.egg-info
+apache_airflow.egg-info
 .idea
 metastore_db
 .*sql

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/bc52d092/setup.py
--
diff --git a/setup.py b/setup.py
index 43b97d3..7426ce9 100644
--- a/setup.py
+++ b/setup.py
@@ -18,6 +18,7 @@ from setuptools.command.test import test as TestCommand
 import imp
 import logging
 import os
+import pip
 import sys
 
 logger = logging.getLogger(__name__)
@@ -99,6 +100,15 @@ def write_version(filename=os.path.join(*['airflow',
 a.write(text)
 
 
+def check_previous():
+installed_packages = ([package.project_name for package
+   in pip.get_installed_distributions()])
+if 'airflow' in installed_packages:
+print("An earlier non-apache version of Airflow was installed, "
+  "please uninstall it first. Then reinstall.")
+sys.exit(1)
+
+
 async = [
 'greenlet>=0.4.9',
 'eventlet>= 0.9.7',
@@ -184,9 +194,10 @@ devel_all = devel + all_dbs + doc + samba + s3 + slack + 
crypto + oracle + docke
 
 
 def do_setup():
+check_previous()
 write_version()
 setup(
-name='airflow',
+name='apache-airflow',
 description='Programmatically author, schedule and monitor data 
pipelines',
 license='Apache License 2.0',
 version=version,



[25/36] incubator-airflow git commit: [AIRFLOW-1124] Do not set all tasks to scheduled in backfill

2017-05-09 Thread criccomini
[AIRFLOW-1124] Do not set all tasks to scheduled in backfill

Backfill is supposed to fill in the blanks and not
to reschedule
all tasks. This fixes a regression from 1.8.0.

Closes #2247 from bolkedebruin/AIRFLOW-1124

(cherry picked from commit 0406462dc91427793ba40d0f05f321e85dbc6f19)
Signed-off-by: Bolke de Bruin 


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/f0d072cf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/f0d072cf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/f0d072cf

Branch: refs/heads/v1-8-stable
Commit: f0d072cfb3b023dd4c80fd4e30e42fef595793c7
Parents: 1725c95
Author: Bolke de Bruin 
Authored: Wed Apr 19 17:15:46 2017 +0200
Committer: Bolke de Bruin 
Committed: Wed Apr 19 17:15:59 2017 +0200

--
 airflow/jobs.py |  3 ++-
 tests/jobs.py   | 69 
 2 files changed, 71 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f0d072cf/airflow/jobs.py
--
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 11ff926..9a6687c 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1822,7 +1822,8 @@ class BackfillJob(BaseJob):
 
 for ti in run.get_task_instances():
 # all tasks part of the backfill are scheduled to run
-ti.set_state(State.SCHEDULED, session=session)
+if ti.state == State.NONE:
+ti.set_state(State.SCHEDULED, session=session)
 tasks_to_run[ti.key] = ti
 
 next_run_date = self.dag.following_schedule(next_run_date)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/f0d072cf/tests/jobs.py
--
diff --git a/tests/jobs.py b/tests/jobs.py
index 9b245ae..5db858d 100644
--- a/tests/jobs.py
+++ b/tests/jobs.py
@@ -348,6 +348,75 @@ class BackfillJobTest(unittest.TestCase):
 else:
 self.assertEqual(State.NONE, ti.state)
 
+def test_backfill_fill_blanks(self):
+dag = DAG(
+'test_backfill_fill_blanks',
+start_date=DEFAULT_DATE,
+default_args={'owner': 'owner1'},
+)
+
+with dag:
+op1 = DummyOperator(task_id='op1')
+op2 = DummyOperator(task_id='op2')
+op3 = DummyOperator(task_id='op3')
+op4 = DummyOperator(task_id='op4')
+op5 = DummyOperator(task_id='op5')
+op6 = DummyOperator(task_id='op6')
+
+dag.clear()
+dr = dag.create_dagrun(run_id='test',
+   state=State.SUCCESS,
+   execution_date=DEFAULT_DATE,
+   start_date=DEFAULT_DATE)
+executor = TestExecutor(do_update=True)
+
+session = settings.Session()
+
+tis = dr.get_task_instances()
+for ti in tis:
+if ti.task_id == op1.task_id:
+ti.state = State.UP_FOR_RETRY
+ti.end_date = DEFAULT_DATE
+elif ti.task_id == op2.task_id:
+ti.state = State.FAILED
+elif ti.task_id == op3.task_id:
+ti.state = State.SKIPPED
+elif ti.task_id == op4.task_id:
+ti.state = State.SCHEDULED
+elif ti.task_id == op5.task_id:
+ti.state = State.UPSTREAM_FAILED
+# op6 = None
+session.merge(ti)
+session.commit()
+session.close()
+
+job = BackfillJob(dag=dag,
+  start_date=DEFAULT_DATE,
+  end_date=DEFAULT_DATE,
+  executor=executor)
+self.assertRaisesRegexp(
+AirflowException,
+'Some task instances failed',
+job.run)
+
+self.assertRaises(sqlalchemy.orm.exc.NoResultFound, dr.refresh_from_db)
+# the run_id should have changed, so a refresh won't work
+drs = DagRun.find(dag_id=dag.dag_id, execution_date=DEFAULT_DATE)
+dr = drs[0]
+
+self.assertEqual(dr.state, State.FAILED)
+
+tis = dr.get_task_instances()
+for ti in tis:
+if ti.task_id in (op1.task_id, op4.task_id, op6.task_id):
+self.assertEqual(ti.state, State.SUCCESS)
+elif ti.task_id == op2.task_id:
+self.assertEqual(ti.state, State.FAILED)
+elif ti.task_id == op3.task_id:
+self.assertEqual(ti.state, State.SKIPPED)
+elif ti.task_id == op5.task_id:
+self.assertEqual(ti.state, State.UPSTREAM_FAILED)
+
 

[36/36] incubator-airflow git commit: Merge branch 'v1-8-stable' of https://git-wip-us.apache.org/repos/asf/incubator-airflow into v1-8-stable

2017-05-09 Thread criccomini
Merge branch 'v1-8-stable' of 
https://git-wip-us.apache.org/repos/asf/incubator-airflow into v1-8-stable


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0d8509e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0d8509e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0d8509e7

Branch: refs/heads/v1-8-stable
Commit: 0d8509e7ec9894c62664df81b4f76fa37727d84d
Parents: 2b811c4 8e7a558
Author: Chris Riccomini 
Authored: Tue May 9 10:32:23 2017 -0700
Committer: Chris Riccomini 
Committed: Tue May 9 10:32:23 2017 -0700

--

--




[15/36] incubator-airflow git commit: [AIRFLOW-1050] Do not count up_for_retry as not ready

2017-05-09 Thread criccomini
[AIRFLOW-1050] Do not count up_for_retry as not ready

up_for_retry tasks were incorrectly counted
towards not_ready,
therefore marking a dag run deadlocked instead of
retrying.

Closes #2225 from bolkedebruin/AIRFLOW-1050

(cherry picked from commit 35e43f5067f4741640278b765c0e54e4fd45ffa3)
Signed-off-by: Bolke de Bruin 


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/0fa593e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/0fa593e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/0fa593e3

Branch: refs/heads/v1-8-stable
Commit: 0fa593e38c7ea88765408af10abad3c3780ba27d
Parents: ebfc3ea
Author: Bolke de Bruin 
Authored: Fri Apr 7 08:00:10 2017 +0200
Committer: Bolke de Bruin 
Committed: Fri Apr 7 08:00:23 2017 +0200

--
 airflow/jobs.py | 9 +
 1 file changed, 9 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/0fa593e3/airflow/jobs.py
--
diff --git a/airflow/jobs.py b/airflow/jobs.py
index ce45e05..11ff926 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1925,6 +1925,15 @@ class BackfillJob(BaseJob):
 started.pop(key)
 continue
 
+# special case
+if ti.state == State.UP_FOR_RETRY:
+self.logger.debug("Task instance {} retry period not 
expired yet"
+  .format(ti))
+if key in started:
+started.pop(key)
+tasks_to_run[key] = ti
+continue
+
 # all remaining tasks
 self.logger.debug('Adding {} to not_ready'.format(ti))
 not_ready.add(key)
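For context, a condensed sketch of the backfill deadlock check that the stray counting was tripping (an approximation, not the committed code):

{code}
def looks_deadlocked(tasks_to_run, started, not_ready):
    # If every remaining task is "not ready" and nothing is in flight,
    # the backfill declares a deadlock. Counting UP_FOR_RETRY tasks in
    # not_ready made runs that were merely waiting out a retry delay
    # look deadlocked.
    return bool(not_ready) and not_ready == set(tasks_to_run) and not started
{code}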



[26/36] incubator-airflow git commit: [AIRFLOW-1121][AIRFLOW-1004] Fix `airflow webserver --pid` to write out pid file

2017-05-09 Thread criccomini
[AIRFLOW-1121][AIRFLOW-1004] Fix `airflow webserver --pid` to write out pid file

After AIRFLOW-1004, the --pid option is no longer
honored and
the pid file is not being written out. This PR
fixes it.

Closes #2249 from sekikn/AIRFLOW-1121

(cherry picked from commit 8d643897cf6171d110e7139fb31c3d4d47c3acca)


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/aef7dd0a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/aef7dd0a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/aef7dd0a

Branch: refs/heads/v1-8-stable
Commit: aef7dd0a53411f3edb2333cb36a457056e5ab652
Parents: f0d072c
Author: Kengo Seki 
Authored: Wed Apr 19 12:31:10 2017 -0700
Committer: Chris Riccomini 
Committed: Thu Apr 20 15:22:21 2017 -0700

--
 airflow/bin/cli.py |  3 ++-
 tests/core.py  | 36 ++--
 2 files changed, 28 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/aef7dd0a/airflow/bin/cli.py
--
diff --git a/airflow/bin/cli.py b/airflow/bin/cli.py
index e4755c7..8e92ea1 100755
--- a/airflow/bin/cli.py
+++ b/airflow/bin/cli.py
@@ -776,6 +776,7 @@ def webserver(args):
 '-t', str(worker_timeout),
 '-b', args.hostname + ':' + str(args.port),
 '-n', 'airflow-webserver',
+'-p', str(pid),
 '-c', 'airflow.www.gunicorn_config'
 ]
 
@@ -786,7 +787,7 @@ def webserver(args):
 run_args += ['--error-logfile', str(args.error_logfile)]
 
 if args.daemon:
-run_args += ['-D', '-p', str(pid)]
+run_args += ['-D']
 
 if ssl_cert:
 run_args += ['--certfile', ssl_cert, '--keyfile', ssl_key]

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/aef7dd0a/tests/core.py
--
diff --git a/tests/core.py b/tests/core.py
index 4fd2f08..c36c6c2 100644
--- a/tests/core.py
+++ b/tests/core.py
@@ -1398,6 +1398,14 @@ class CliTests(unittest.TestCase):
 os.remove('variables1.json')
 os.remove('variables2.json')
 
+def _wait_pidfile(self, pidfile):
+while True:
+try:
+with open(pidfile) as f:
+return int(f.read())
+except:
+sleep(1)
+
 def test_cli_webserver_foreground(self):
 import subprocess
 
@@ -1417,18 +1425,26 @@ class CliTests(unittest.TestCase):
 
 @unittest.skipIf("TRAVIS" in os.environ and bool(os.environ["TRAVIS"]),
  "Skipping test due to lack of required file permission")
+def test_cli_webserver_foreground_with_pid(self):
+import subprocess
+
+# Run webserver in foreground with --pid option
+pidfile = tempfile.mkstemp()[1]
+p = subprocess.Popen(["airflow", "webserver", "--pid", pidfile])
+
+# Check the file specified by --pid option exists
+self._wait_pidfile(pidfile)
+
+# Terminate webserver
+p.terminate()
+p.wait()
+
+@unittest.skipIf("TRAVIS" in os.environ and bool(os.environ["TRAVIS"]),
+ "Skipping test due to lack of required file permission")
 def test_cli_webserver_background(self):
 import subprocess
 import psutil
 
-def wait_pidfile(pidfile):
-while True:
-try:
-with open(pidfile) as f:
-return int(f.read())
-except IOError:
-sleep(1)
-
 # Confirm that webserver hasn't been launched.
 self.assertEqual(1, subprocess.Popen(["pgrep", "-c", 
"airflow"]).wait())
 self.assertEqual(1, subprocess.Popen(["pgrep", "-c", 
"gunicorn"]).wait())
@@ -1436,7 +1452,7 @@ class CliTests(unittest.TestCase):
 # Run webserver in background.
 subprocess.Popen(["airflow", "webserver", "-D"])
 pidfile = cli.setup_locations("webserver")[0]
-wait_pidfile(pidfile)
+self._wait_pidfile(pidfile)
 
 # Assert that gunicorn and its monitor are launched.
 self.assertEqual(0, subprocess.Popen(["pgrep", "-c", 
"airflow"]).wait())
@@ -1444,7 +1460,7 @@ class CliTests(unittest.TestCase):
 
 # Terminate monitor process.
 pidfile = cli.setup_locations("webserver-monitor")[0]
-pid = wait_pidfile(pidfile)
+pid = self._wait_pidfile(pidfile)
 p = psutil.Process(pid)
 p.terminate()
 p.wait()
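A minimal standalone check of the restored behaviour (a sketch assuming airflow is on the PATH; the pid file path is a placeholder):

{code}
import subprocess
import tempfile
import time

pidfile = tempfile.mkstemp()[1]
proc = subprocess.Popen(['airflow', 'webserver', '--pid', pidfile])

# Poll until the webserver writes its pid file (the bug left it empty).
pid = None
while pid is None:
    try:
        with open(pidfile) as f:
            pid = int(f.read())
    except (IOError, ValueError):
        time.sleep(1)
print('webserver pid:', pid)

proc.terminate()
proc.wait()
{code}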



[34/36] incubator-airflow git commit: [AIRFLOW-970] Load latest_runs on homepage async

2017-05-09 Thread criccomini
[AIRFLOW-970] Load latest_runs on homepage async

The latest_runs column on the homepage loads
synchronously with an n+1
query. Homepage loads will be significantly faster
if this happens
asynchronously and as a batch.

Closes #2144 from saguziel/aguziel-latest-run-
async

(cherry picked from commit 0f7ddbbedb05f2f11500250db4989edcb27bc164)


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/af2d0b4b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/af2d0b4b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/af2d0b4b

Branch: refs/heads/v1-8-stable
Commit: af2d0b4b5cb1ef30a065b1af66f90a01a953e2be
Parents: d61af62
Author: Alex Guziel 
Authored: Wed Apr 5 10:02:42 2017 +0200
Committer: Chris Riccomini 
Committed: Thu Apr 27 13:35:40 2017 -0700

--
 airflow/models.py | 23 
 airflow/www/api/experimental/endpoints.py | 23 +++-
 airflow/www/templates/airflow/dags.html   | 29 ++
 tests/dags/test_latest_runs.py| 27 
 tests/models.py   |  2 +-
 5 files changed, 89 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af2d0b4b/airflow/models.py
--
diff --git a/airflow/models.py b/airflow/models.py
index 1ceb821..646f74b 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -4249,6 +4249,29 @@ class DagRun(Base):
 
 return False
 
+@classmethod
+@provide_session
+def get_latest_runs(cls, session):
+"""Returns the latest running DagRun for each DAG. """
+subquery = (
+session
+.query(
+cls.dag_id,
+func.max(cls.execution_date).label('execution_date'))
+.filter(cls.state == State.RUNNING)
+.group_by(cls.dag_id)
+.subquery()
+)
+dagruns = (
+session
+.query(cls)
+.join(subquery,
+  and_(cls.dag_id == subquery.c.dag_id,
+   cls.execution_date == subquery.c.execution_date))
+.all()
+)
+return dagruns
+
 
 class Pool(Base):
 __tablename__ = "slot_pool"

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af2d0b4b/airflow/www/api/experimental/endpoints.py
--
diff --git a/airflow/www/api/experimental/endpoints.py 
b/airflow/www/api/experimental/endpoints.py
index 56b9d79..63355c7 100644
--- a/airflow/www/api/experimental/endpoints.py
+++ b/airflow/www/api/experimental/endpoints.py
@@ -20,7 +20,8 @@ from airflow.exceptions import AirflowException
 from airflow.www.app import csrf
 
 from flask import (
-g, Markup, Blueprint, redirect, jsonify, abort, request, current_app, 
send_file
+g, Markup, Blueprint, redirect, jsonify, abort,
+request, current_app, send_file, url_for
 )
 from datetime import datetime
 
@@ -110,3 +111,23 @@ def task_info(dag_id, task_id):
 task = dag.get_task(task_id)
 fields = {k: str(v) for k, v in vars(task).items() if not 
k.startswith('_')}
 return jsonify(fields)
+
+
+@api_experimental.route('/latest_runs', methods=['GET'])
+@requires_authentication
+def latest_dag_runs():
+"""Returns the latest running DagRun for each DAG formatted for the UI. """
+from airflow.models import DagRun
+dagruns = DagRun.get_latest_runs()
+payload = []
+for dagrun in dagruns:
+if dagrun.execution_date:
+payload.append({
+'dag_id': dagrun.dag_id,
+'execution_date': dagrun.execution_date.strftime("%Y-%m-%d 
%H:%M"),
+'start_date': ((dagrun.start_date or '') and
+   dagrun.start_date.strftime("%Y-%m-%d %H:%M")),
+'dag_run_url': url_for('airflow.graph', dag_id=dagrun.dag_id,
+   execution_date=dagrun.execution_date)
+})
+return jsonify(payload)

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/af2d0b4b/airflow/www/templates/airflow/dags.html
--
diff --git a/airflow/www/templates/airflow/dags.html 
b/airflow/www/templates/airflow/dags.html
index 7c59dea..c0dbc62 100644
--- a/airflow/www/templates/airflow/dags.html
+++ b/airflow/www/templates/airflow/dags.html
@@ -105,19 +105,7 @@
 
 
 
-
-{% if dag %}
-{% set last_run = 

[07/36] incubator-airflow git commit: [AIRFLOW-1054] Fix broken import in test_dag

2017-05-09 Thread criccomini
[AIRFLOW-1054] Fix broken import in test_dag

Closes #2201 from
r39132/fix_broken_import_on_test_dag

(cherry picked from commit c64e876bd50eeb6c9e2600ac9d832c05eb5e9640)


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/68b1c982
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/68b1c982
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/68b1c982

Branch: refs/heads/v1-8-stable
Commit: 68b1c982e048878ec9dd658072c147e4341bf2c2
Parents: 5eb3335
Author: Siddharth Anand 
Authored: Mon Apr 3 13:10:51 2017 -0700
Committer: Chris Riccomini 
Committed: Mon Apr 3 13:11:42 2017 -0700

--
 dags/test_dag.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/68b1c982/dags/test_dag.py
--
diff --git a/dags/test_dag.py b/dags/test_dag.py
index db0b648..f2a9f6a 100644
--- a/dags/test_dag.py
+++ b/dags/test_dag.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from airflow import utils
 from airflow import DAG
 from airflow.operators.dummy_operator import DummyOperator
 from datetime import datetime, timedelta
@@ -24,7 +25,7 @@ DAG_NAME = 'test_dag_v1'
 default_args = {
 'owner': 'airflow',
 'depends_on_past': True,
-'start_date': airflow.utils.dates.days_ago(2)
+'start_date': utils.dates.days_ago(2)
 }
 dag = DAG(DAG_NAME, schedule_interval='*/10 * * * *', 
default_args=default_args)
 



[01/36] incubator-airflow git commit: [AIRFLOW-974] Fix mkdirs race condition

2017-05-09 Thread criccomini
Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-8-stable 8e7a55836 -> 0d8509e7e


[AIRFLOW-974] Fix mkdirs race condition

mkdirs contained a race condition: if the
directory is
created between the os.path.exists and the
os.makedirs calls,
the os.makedirs call will fail with an OSError.

This also reworks the function to be
non-recursive, as
the permission errors were due to umasks being
applied.

Closes #2147 from bolkedebruin/AIRFLOW-974

(cherry picked from commit c5cc298cf16c9777c90aec1fc8cc24bde62f7b2f)
Signed-off-by: Bolke de Bruin 


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3b37cfa1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3b37cfa1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3b37cfa1

Branch: refs/heads/v1-8-stable
Commit: 3b37cfa1f2642ff90908a3af0a5674637c9518ee
Parents: 2a60897
Author: Bolke de Bruin 
Authored: Mon Mar 13 20:14:07 2017 -0700
Committer: Bolke de Bruin 
Committed: Mon Mar 13 20:14:30 2017 -0700

--
 airflow/utils/file.py | 20 +---
 1 file changed, 9 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3b37cfa1/airflow/utils/file.py
--
diff --git a/airflow/utils/file.py b/airflow/utils/file.py
index 78ddeaa..352755e 100644
--- a/airflow/utils/file.py
+++ b/airflow/utils/file.py
@@ -44,16 +44,14 @@ def mkdirs(path, mode):
 
 :param path: The directory to create
 :type path: str
-:param mode: The mode to give to the directory e.g. 0o755
+:param mode: The mode to give to the directory e.g. 0o755, ignores umask
 :type mode: int
-:return: A list of directories that were created
-:rtype: list[str]
 """
-if not path or os.path.exists(path):
-return []
-(head, _) = os.path.split(path)
-res = mkdirs(head, mode)
-os.mkdir(path)
-os.chmod(path, mode)
-res += [path]
-return res
+try:
+o_umask = os.umask(0)
+os.makedirs(path, mode)
+except OSError:
+if not os.path.isdir(path):
+raise
+finally:
+os.umask(o_umask)
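The check-then-act window the patch closes, as a minimal sketch (not the committed code):

{code}
import os

def mkdirs_racy(path, mode):
    if not os.path.exists(path):  # TOCTOU window opens here...
        os.makedirs(path, mode)   # ...another process can win the race
                                  # and this raises OSError

def mkdirs_safe(path, mode):
    try:
        os.makedirs(path, mode)   # just try; EAFP instead of LBYL
    except OSError:
        if not os.path.isdir(path):
            raise                 # a real failure, not a lost race
{code}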



[11/36] incubator-airflow git commit: [AIRFLOW-111] Include queued tasks in scheduler concurrency check

2017-05-09 Thread criccomini
[AIRFLOW-111] Include queued tasks in scheduler concurrency check

The concurrency argument on dags is not being
obeyed because the
scheduler does not check the concurrency properly
when examining tasks.
The excess tasks do not run, but they cause a lot
of scheduler churn.

Closes #2214 from saguziel/aguziel-fix-concurrency

(cherry picked from commit 3ff5abee3f9d29e545e021c2c060e9c9f3045236)
Signed-off-by: Bolke de Bruin 


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9070a827
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9070a827
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9070a827

Branch: refs/heads/v1-8-stable
Commit: 9070a82775691e08fb1b95c28fbc2cc5ee7b843d
Parents: 4db53f3
Author: Alex Guziel 
Authored: Wed Apr 5 09:59:53 2017 +0200
Committer: Bolke de Bruin 
Committed: Wed Apr 5 10:00:06 2017 +0200

--
 airflow/jobs.py   | 25 +++-
 airflow/models.py | 48 ++
 tests/jobs.py | 62 ++
 tests/models.py   | 38 +++
 4 files changed, 142 insertions(+), 31 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9070a827/airflow/jobs.py
--
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 7db9b9c..ce45e05 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -43,7 +43,7 @@ from tabulate import tabulate
 from airflow import executors, models, settings
 from airflow import configuration as conf
 from airflow.exceptions import AirflowException
-from airflow.models import DagRun
+from airflow.models import DAG, DagRun
 from airflow.settings import Stats
 from airflow.task_runner import get_task_runner
 from airflow.ti_deps.dep_context import DepContext, QUEUE_DEPS, RUN_DEPS
@@ -1036,7 +1036,7 @@ class SchedulerJob(BaseJob):
 task_instances, key=lambda ti: (-ti.priority_weight, 
ti.execution_date))
 
 # DAG IDs with running tasks that equal the concurrency limit of 
the dag
-dag_id_to_running_task_count = {}
+dag_id_to_possibly_running_task_count = {}
 
 for task_instance in priority_sorted_task_instances:
 if open_slots <= 0:
@@ -1063,22 +1063,24 @@ class SchedulerJob(BaseJob):
 # reached.
 dag_id = task_instance.dag_id
 
-if dag_id not in dag_id_to_running_task_count:
-dag_id_to_running_task_count[dag_id] = \
-DagRun.get_running_tasks(
-session,
+if dag_id not in dag_id_to_possibly_running_task_count:
+dag_id_to_possibly_running_task_count[dag_id] = \
+DAG.get_num_task_instances(
 dag_id,
-simple_dag_bag.get_dag(dag_id).task_ids)
+simple_dag_bag.get_dag(dag_id).task_ids,
+states=[State.RUNNING, State.QUEUED],
+session=session)
 
-current_task_concurrency = dag_id_to_running_task_count[dag_id]
+current_task_concurrency = 
dag_id_to_possibly_running_task_count[dag_id]
 task_concurrency_limit = 
simple_dag_bag.get_dag(dag_id).concurrency
-self.logger.info("DAG {} has {}/{} running tasks"
+self.logger.info("DAG {} has {}/{} running and queued tasks"
  .format(dag_id,
  current_task_concurrency,
  task_concurrency_limit))
-if current_task_concurrency > task_concurrency_limit:
+if current_task_concurrency >= task_concurrency_limit:
 self.logger.info("Not executing {} since the number "
- "of tasks running from DAG {} is >= to 
the "
+ "of tasks running or queued from DAG {}"
+ " is >= to the "
  "DAG's task concurrency limit of {}"
  .format(task_instance,
  dag_id,
@@ -1137,6 +1139,7 @@ class SchedulerJob(BaseJob):
 queue=queue)
 
 open_slots -= 1
+dag_id_to_possibly_running_task_count[dag_id] += 1
 
 def _process_dags(self, dagbag, dags, tis_out):
 """

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9070a827/airflow/models.py
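A condensed sketch of the adjusted scheduler bookkeeping (an approximation, not the committed code): seed per-DAG counts with running and queued tasks, refuse new work at >= the limit, and bump the local count as tasks are queued.

{code}
def queue_tasks(task_instances, concurrency_limit, counts):
    # counts: dag_id -> number of RUNNING + QUEUED task instances,
    # as fetched up front from the metadata db.
    queued = []
    for ti in task_instances:
        if counts.get(ti.dag_id, 0) >= concurrency_limit[ti.dag_id]:
            continue  # at the limit: skip instead of churning
        queued.append(ti)
        counts[ti.dag_id] = counts.get(ti.dag_id, 0) + 1
    return queued
{code}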

[13/36] incubator-airflow git commit: [AIRFLOW-969] Catch bad python_callable argument

2017-05-09 Thread criccomini
[AIRFLOW-969] Catch bad python_callable argument

Checks for callable when Operator is
created, not when it is run.

* added initial PythonOperator unit test, testing
run
* python_callable must be callable; added unit
test

Closes #2142 from abloomston/python-callable

(cherry picked from commit 12901ddfa9961a11feaa3f17696d19102ff8ecd0)
Signed-off-by: Bolke de Bruin 


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/91674117
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/91674117
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/91674117

Branch: refs/heads/v1-8-stable
Commit: 916741171cc0c6426dbcbe8a2b5ce2468fce870d
Parents: dff6d21
Author: abloomston 
Authored: Thu Mar 16 19:36:00 2017 -0400
Committer: Bolke de Bruin 
Committed: Thu Apr 6 09:47:18 2017 +0200

--
 airflow/operators/python_operator.py | 3 +++
 1 file changed, 3 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/91674117/airflow/operators/python_operator.py
--
diff --git a/airflow/operators/python_operator.py 
b/airflow/operators/python_operator.py
index 114bc7e..cf240f2 100644
--- a/airflow/operators/python_operator.py
+++ b/airflow/operators/python_operator.py
@@ -16,6 +16,7 @@ from builtins import str
 from datetime import datetime
 import logging
 
+from airflow.exceptions import AirflowException
 from airflow.models import BaseOperator, TaskInstance
 from airflow.utils.state import State
 from airflow.utils.decorators import apply_defaults
@@ -63,6 +64,8 @@ class PythonOperator(BaseOperator):
 templates_exts=None,
 *args, **kwargs):
 super(PythonOperator, self).__init__(*args, **kwargs)
+if not callable(python_callable):
+raise AirflowException('`python_callable` param must be callable')
 self.python_callable = python_callable
 self.op_args = op_args or []
 self.op_kwargs = op_kwargs or {}
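Effect in practice — with this check a bad argument now fails at DAG definition time rather than at run time (a sketch; the dag id and start date are placeholders):

{code}
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG('callable_check_example', start_date=datetime(2017, 1, 1))

def greet():
    print('hello')

ok = PythonOperator(task_id='ok', python_callable=greet, dag=dag)

# Previously this only blew up when the task ran; now it raises
# AirflowException('`python_callable` param must be callable') here.
bad = PythonOperator(task_id='bad', python_callable='greet', dag=dag)
{code}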



[31/36] incubator-airflow git commit: [AIRFLOW-492] Make sure stat updates cannot fail a task

2017-05-09 Thread criccomini
[AIRFLOW-492] Make sure stat updates cannot fail a task

Previously a failed commit into the db for the statistics
could also fail a task. Secondly, the ui could display
out-of-date statistics.

This patch reworks DagStat so that a failure to update the
statistics does not propagate. Next to that, it makes sure
the ui always displays the latest statistics.

Closes #2254 from bolkedebruin/AIRFLOW-492

(cherry picked from commit c2472ffa124ffc65b8762ea583554494624dbb6a)


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/e342d0d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/e342d0d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/e342d0d2

Branch: refs/heads/v1-8-stable
Commit: e342d0d223e47ea25f73baaa00a16df414a6e0df
Parents: 5800f56
Author: Bolke de Bruin 
Authored: Wed Apr 26 20:39:48 2017 +0200
Committer: Chris Riccomini 
Committed: Thu Apr 27 12:35:46 2017 -0700

--
 airflow/jobs.py  |   4 +-
 airflow/models.py| 133 ++
 airflow/www/views.py |   7 +--
 tests/core.py|  34 +++-
 tests/models.py  |  66 ++-
 5 files changed, 190 insertions(+), 54 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e342d0d2/airflow/jobs.py
--
diff --git a/airflow/jobs.py b/airflow/jobs.py
index 11dbddf..379c96e 100644
--- a/airflow/jobs.py
+++ b/airflow/jobs.py
@@ -1177,7 +1177,7 @@ class SchedulerJob(BaseJob):
 self._process_task_instances(dag, tis_out)
 self.manage_slas(dag)
 
-models.DagStat.clean_dirty([d.dag_id for d in dags])
+models.DagStat.update([d.dag_id for d in dags])
 
 def _process_executor_events(self):
 """
@@ -1977,7 +1977,7 @@ class BackfillJob(BaseJob):
 active_dag_runs.remove(run)
 
 if run.dag.is_paused:
-models.DagStat.clean_dirty([run.dag_id], session=session)
+models.DagStat.update([run.dag_id], session=session)
 
 msg = ' | '.join([
 "[backfill progress]",

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/e342d0d2/airflow/models.py
--
diff --git a/airflow/models.py b/airflow/models.py
index 2de88f6..1ceb821 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -29,6 +29,7 @@ import functools
 import getpass
 import imp
 import importlib
+import itertools
 import inspect
 import zipfile
 import jinja2
@@ -719,6 +720,7 @@ class TaskInstance(Base):
 even while multiple schedulers may be firing task instances.
 """
 
+
 __tablename__ = "task_instance"
 
 task_id = Column(String(ID_LEN), primary_key=True)
@@ -3089,7 +3091,7 @@ class DAG(BaseDag, LoggingMixin):
 for dr in drs:
 dr.state = state
 dirty_ids.append(dr.dag_id)
-DagStat.clean_dirty(dirty_ids, session=session)
+DagStat.update(dirty_ids, session=session)
 
 def clear(
 self, start_date=None, end_date=None,
@@ -3383,6 +3385,9 @@ class DAG(BaseDag, LoggingMixin):
 state=state
 )
 session.add(run)
+
+DagStat.set_dirty(dag_id=self.dag_id, session=session)
+
 session.commit()
 
 run.dag = self
@@ -3392,12 +3397,7 @@ class DAG(BaseDag, LoggingMixin):
 run.verify_integrity(session=session)
 
 run.refresh_from_db()
-DagStat.set_dirty(self.dag_id, session=session)
 
-# add a placeholder row into DagStat table
-if not session.query(DagStat).filter(DagStat.dag_id == 
self.dag_id).first():
-session.add(DagStat(dag_id=self.dag_id, state=state, count=0, 
dirty=True))
-session.commit()
 return run
 
 @staticmethod
@@ -3805,7 +3805,7 @@ class DagStat(Base):
 count = Column(Integer, default=0)
 dirty = Column(Boolean, default=False)
 
-def __init__(self, dag_id, state, count, dirty=False):
+def __init__(self, dag_id, state, count=0, dirty=False):
 self.dag_id = dag_id
 self.state = state
 self.count = count
@@ -3814,42 +3814,104 @@ class DagStat(Base):
 @staticmethod
 @provide_session
 def set_dirty(dag_id, session=None):
-for dag in session.query(DagStat).filter(DagStat.dag_id == dag_id):
-dag.dirty = True
-session.commit()
+"""
+:param dag_id: the dag_id to mark dirty
+:param session: database session
+:return: 
+"""
+DagStat.create(dag_id=dag_id, session=session)
+
+try:
+stats = 
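The guarding pattern the patch applies, condensed (an approximation, not the committed code):

{code}
def update_dag_stats(session, logger):
    try:
        # ... recompute and merge DagStat rows here ...
        session.commit()
    except Exception as e:
        # Swallow and roll back: a stats bookkeeping failure must not
        # propagate into (and fail) the task that triggered it.
        logger.warning('Could not update dag stats: %s', e)
        session.rollback()
{code}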

[16/36] incubator-airflow git commit: [AIRFLOW-1085] Enhance the SparkSubmitOperator

2017-05-09 Thread criccomini
[AIRFLOW-1085] Enhance the SparkSubmitOperator

- Allow the Spark home to be set on a per-connection
basis to obviate
  the need for spark-submit to be on the PATH,
and to allow different
  versions of Spark to be used easily.
- Enable the use of the --driver-memory parameter
on the spark-submit
  by making it parameter on the operator
- Enable the use of the --class parameter on the
spark-submit by making
  it a parameter on the operator

Closes #2211 from camshrun/sparkSubmitImprovements

(cherry picked from commit 0ade066f44257c5e119b292f4cc2ba105774f4e7)
Signed-off-by: Bolke de Bruin 


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ceb2ac36
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ceb2ac36
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ceb2ac36

Branch: refs/heads/v1-8-stable
Commit: ceb2ac366fce4eac7ca007e6ec15194e71e66409
Parents: 0fa593e
Author: Stephan Werges 
Authored: Fri Apr 7 19:20:46 2017 +0200
Committer: Bolke de Bruin 
Committed: Fri Apr 7 19:21:38 2017 +0200

--
 airflow/contrib/hooks/spark_submit_hook.py  | 32 ++--
 .../contrib/operators/spark_submit_operator.py  | 13 -
 tests/contrib/hooks/spark_submit_hook.py| 51 +---
 .../contrib/operators/spark_submit_operator.py  |  8 ++-
 4 files changed, 90 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ceb2ac36/airflow/contrib/hooks/spark_submit_hook.py
--
diff --git a/airflow/contrib/hooks/spark_submit_hook.py 
b/airflow/contrib/hooks/spark_submit_hook.py
index 619cc71..59d28b5 100644
--- a/airflow/contrib/hooks/spark_submit_hook.py
+++ b/airflow/contrib/hooks/spark_submit_hook.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 #
 import logging
+import os
 import subprocess
 import re
 
@@ -25,7 +26,8 @@ log = logging.getLogger(__name__)
 class SparkSubmitHook(BaseHook):
 """
 This hook is a wrapper around the spark-submit binary to kick off a 
spark-submit job.
-It requires that the "spark-submit" binary is in the PATH.
+It requires that the "spark-submit" binary is in the PATH or the 
spark_home to be 
+supplied.
 :param conf: Arbitrary Spark configuration properties
 :type conf: dict
 :param conn_id: The connection id as configured in Airflow administration. 
When an
@@ -38,10 +40,14 @@ class SparkSubmitHook(BaseHook):
 :type py_files: str
 :param jars: Submit additional jars to upload and place them in executor 
classpath.
 :type jars: str
+:param java_class: the main class of the Java application
+:type java_class: str
 :param executor_cores: Number of cores per executor (Default: 2)
 :type executor_cores: int
 :param executor_memory: Memory per executor (e.g. 1000M, 2G) (Default: 1G)
 :type executor_memory: str
+:param driver_memory: Memory allocated to the driver (e.g. 1000M, 2G) 
(Default: 1G)
+:type driver_memory: str
 :param keytab: Full path to the file that contains the keytab
 :type keytab: str
 :param principal: The name of the kerberos principal used for keytab
@@ -60,8 +66,10 @@ class SparkSubmitHook(BaseHook):
  files=None,
  py_files=None,
  jars=None,
+ java_class=None,
  executor_cores=None,
  executor_memory=None,
+ driver_memory=None,
  keytab=None,
  principal=None,
  name='default-name',
@@ -72,8 +80,10 @@ class SparkSubmitHook(BaseHook):
 self._files = files
 self._py_files = py_files
 self._jars = jars
+self._java_class = java_class
 self._executor_cores = executor_cores
 self._executor_memory = executor_memory
+self._driver_memory = driver_memory
 self._keytab = keytab
 self._principal = principal
 self._name = name
@@ -82,7 +92,7 @@ class SparkSubmitHook(BaseHook):
 self._sp = None
 self._yarn_application_id = None
 
-(self._master, self._queue, self._deploy_mode) = 
self._resolve_connection()
+(self._master, self._queue, self._deploy_mode, self._spark_home) = 
self._resolve_connection()
 self._is_yarn = 'yarn' in self._master
 
 def _resolve_connection(self):
@@ -90,6 +100,7 @@ class SparkSubmitHook(BaseHook):
 master = 'yarn'
 queue = None
 deploy_mode = None
+spark_home = None
 
 try:
 # Master can be local, yarn, spark://HOST:PORT or mesos://HOST:PORT
@@ -105,6 +116,8 @@ class SparkSubmitHook(BaseHook):
   
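A hypothetical DAG snippet using the new knobs (the connection id, paths, class name and dates are placeholders):

{code}
from datetime import datetime
from airflow import DAG
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

dag = DAG('spark_submit_example', start_date=datetime(2017, 1, 1))

submit = SparkSubmitOperator(
    task_id='run_spark_job',
    application='/path/to/app.jar',
    java_class='com.example.Main',  # new: passed as --class
    driver_memory='2g',             # new: passed as --driver-memory
    conn_id='spark_default',        # spark_home can now live on the connection
    dag=dag,
)
{code}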

[incubator-airflow] Git Push Summary

2017-05-09 Thread criccomini
Repository: incubator-airflow
Updated Tags:  refs/tags/1.8.1 2b811c445 -> 0d8509e7e


[incubator-airflow] Git Push Summary

2017-05-09 Thread criccomini
Repository: incubator-airflow
Updated Tags:  refs/tags/1.8.1 [created] 2b811c445


[jira] [Closed] (AIRFLOW-1183) How to pass Spark URL for standalone cluster?

2017-05-09 Thread sam sen (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sam sen closed AIRFLOW-1183.

Resolution: Fixed

> How to pass Spark URL for standalone cluster?
> -
>
> Key: AIRFLOW-1183
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1183
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: operators
>Affects Versions: Airflow 1.8
>Reporter: sam sen
>Priority: Critical
>
> How can I pass my Spark URL? When I look in the logs I see `--master` is 
> pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing 
> it within the function but I'm getting an error.
> *Error*
> {code}
> PendingDeprecationWarning: Invalid arguments were passed to 
> SparkSubmitOperator. Support for passing such arguments will be dropped in 
> Airflow 2.0. Invalid arguments were:
> *args: ()
> **kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'} {code}
> *Code*
> {code}
> testSpark = SparkSubmitOperator(
>    task_id='test-spark',
> deploy_mode='cluster',
> 
> application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
> java_class='SimpleApp',
> dag=dag)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Closed] (AIRFLOW-1179) Pandas 0.20 broke Google BigQuery hook

2017-05-09 Thread Chris Riccomini (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1179?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Riccomini closed AIRFLOW-1179.

   Resolution: Fixed
Fix Version/s: 1.9.0

> Pandas 0.20 broke Google BigQuery hook
> --
>
> Key: AIRFLOW-1179
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1179
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Niels Zeilemaker
>Assignee: Niels Zeilemaker
> Fix For: 1.9.0
>
>
> Master build is broken due to pandas bigquery support being moved to an 
> external package



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


incubator-airflow git commit: [AIRFLOW-1179] Fix Pandas 0.2x breaking Google BigQuery change

2017-05-09 Thread criccomini
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 4284e6485 -> ac9ccb151


[AIRFLOW-1179] Fix Pandas 0.2x breaking Google BigQuery change

Closes #2279 from NielsZeilemaker/AIRFLOW-1179


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ac9ccb15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ac9ccb15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ac9ccb15

Branch: refs/heads/master
Commit: ac9ccb1518f6a0273d53fcd8e32aba1ac5563fb9
Parents: 4284e64
Author: Niels Zeilemaker 
Authored: Tue May 9 09:42:32 2017 -0700
Committer: Chris Riccomini 
Committed: Tue May 9 09:42:32 2017 -0700

--
 airflow/contrib/hooks/bigquery_hook.py | 2 +-
 scripts/ci/requirements.txt| 1 +
 setup.py   | 1 +
 3 files changed, 3 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ac9ccb15/airflow/contrib/hooks/bigquery_hook.py
--
diff --git a/airflow/contrib/hooks/bigquery_hook.py 
b/airflow/contrib/hooks/bigquery_hook.py
index 53ca123..06de4e8 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -24,7 +24,7 @@ import time
 from apiclient.discovery import build, HttpError
 from googleapiclient import errors
 from builtins import range
-from pandas.io.gbq import GbqConnector, \
+from pandas_gbq.gbq import GbqConnector, \
 _parse_data as gbq_parse_data, \
 _check_google_client_version as gbq_check_google_client_version, \
 _test_google_api_imports as gbq_test_google_api_imports

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ac9ccb15/scripts/ci/requirements.txt
--
diff --git a/scripts/ci/requirements.txt b/scripts/ci/requirements.txt
index 9769dfb..06ad0bb 100644
--- a/scripts/ci/requirements.txt
+++ b/scripts/ci/requirements.txt
@@ -61,6 +61,7 @@ nose-ignore-docstring==0.2
 nose-parameterized
 nose-timer
 pandas
+pandas-gbq
 psutil>=4.2.0, <5.0.0
 psycopg2
 pydruid

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ac9ccb15/setup.py
--
diff --git a/setup.py b/setup.py
index 0b98e56..b0b2ddd 100644
--- a/setup.py
+++ b/setup.py
@@ -142,6 +142,7 @@ gcp_api = [
 'google-api-python-client>=1.5.0, <1.6.0',
 'oauth2client>=2.0.2, <2.1.0',
 'PyOpenSSL',
+'pandas-gbq'
 ]
 hdfs = ['snakebite>=2.7.8']
 webhdfs = ['hdfs[dataframe,avro,kerberos]>=2.0.4']



[jira] [Commented] (AIRFLOW-1179) Pandas 0.20 broke Google BigQuery hook

2017-05-09 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16003006#comment-16003006
 ] 

ASF subversion and git services commented on AIRFLOW-1179:
--

Commit ac9ccb1518f6a0273d53fcd8e32aba1ac5563fb9 in incubator-airflow's branch 
refs/heads/master from [~nzeilemaker]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=ac9ccb1 ]

[AIRFLOW-1179] Fix Pandas 0.2x breaking Google BigQuery change

Closes #2279 from NielsZeilemaker/AIRFLOW-1179


> Pandas 0.20 broke Google BigQuery hook
> --
>
> Key: AIRFLOW-1179
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1179
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Niels Zeilemaker
>Assignee: Niels Zeilemaker
>
> Master build is broken due to pandas bigquery support being moved to an 
> external package



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Commented] (AIRFLOW-1179) Pandas 0.20 broke Google BigQuery hook

2017-05-09 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16003005#comment-16003005
 ] 

ASF subversion and git services commented on AIRFLOW-1179:
--

Commit ac9ccb1518f6a0273d53fcd8e32aba1ac5563fb9 in incubator-airflow's branch 
refs/heads/master from [~nzeilemaker]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=ac9ccb1 ]

[AIRFLOW-1179] Fix Pandas 0.2x breaking Google BigQuery change

Closes #2279 from NielsZeilemaker/AIRFLOW-1179


> Pandas 0.20 broke Google BigQuery hook
> --
>
> Key: AIRFLOW-1179
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1179
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Niels Zeilemaker
>Assignee: Niels Zeilemaker
>
> Master build is broken due to pandas bigquery support being moved to an 
> external package



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (AIRFLOW-1184) Contrib Spark Submit Hook does not split argument and argument value

2017-05-09 Thread Vianney FOUCAULT (JIRA)
Vianney FOUCAULT created AIRFLOW-1184:
-

 Summary: Contrib Spark Submit Hook does not split argument and 
argument value
 Key: AIRFLOW-1184
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1184
 Project: Apache Airflow
  Issue Type: Bug
  Components: contrib, hooks
Affects Versions: Airflow 2.0, Airflow 1.8
Reporter: Vianney FOUCAULT
Assignee: Vianney FOUCAULT
 Fix For: Airflow 2.0, Airflow 1.8


Python's Popen expects the command as a list, and so does spark-submit:
* ['--option value']
is not the same as
* ['--option', 'value']

as far as Spark is concerned, e.g. in the Spark (YARN) logs:

Error: Unknown option --end 2017-05-08
Error: Unknown option --begin 2017-05-07
Error: Unknown option --db_name mydb
Error: Missing option --begin
Error: Missing option --end
Error: Missing option --db_name




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
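
The remedy the report points at is to split each option from its value before the command list reaches Popen. A minimal sketch (the values are taken from the logs above; the surrounding names are hypothetical):

{code}
import shlex

# '--begin 2017-05-07' as a single list element reaches spark-submit as one
# argv entry; splitting yields the two entries Spark expects.
raw_args = ['--begin 2017-05-07', '--end 2017-05-08', '--db_name mydb']
argv = ['spark-submit']
for arg in raw_args:
    argv.extend(shlex.split(arg))  # shlex also handles quoted values

print(argv)
# ['spark-submit', '--begin', '2017-05-07', '--end', '2017-05-08',
#  '--db_name', 'mydb'] -- subprocess.Popen(argv) now sees separate entries.
{code}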


[jira] [Updated] (AIRFLOW-1183) How to pass Spark URL for standalone cluster?

2017-05-09 Thread sam sen (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sam sen updated AIRFLOW-1183:
-
Description: 
How can I pass my Spark URL? When I look in the logs I see `--master` is 
pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it 
within the function but I'm getting an error.

*Error*
{code}
PendingDeprecationWarning: Invalid arguments were passed to 
SparkSubmitOperator. Support for passing such arguments will be dropped in 
Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'} {code}

*Code*
{code}
testSpark = SparkSubmitOperator(
   task_id='test-spark',
deploy_mode='cluster',
application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
java_class='SimpleApp'
   dag=dag)
{code}

  was:
How can I pass my Spark URL? When I look in the logs I see `--master` is 
pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it 
within the function but I'm getting an error.

{quote}
PendingDeprecationWarning: Invalid arguments were passed to 
SparkSubmitOperator. Support for passing such arguments will be dropped in 
Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'} {quote}

*Code*
{code}
testSpark = SparkSubmitOperator(
   task_id='test-spark',
deploy_mode='cluster',
application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
java_class='SimpleApp'
   dag=dag)
{code}


> How to pass Spark URL for standalone cluster?
> -
>
> Key: AIRFLOW-1183
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1183
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: operators
>Affects Versions: Airflow 1.8
>Reporter: sam sen
>Priority: Critical
>
> How can I pass my Spark URL? When I look in the logs I see `--master` is 
> pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing 
> it within the function but I'm getting an error.
> *Error*
> {code}
> PendingDeprecationWarning: Invalid arguments were passed to 
> SparkSubmitOperator. Support for passing such arguments will be dropped in 
> Airflow 2.0. Invalid arguments were:
> *args: ()
> **kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'} {code}
> *Code*
> {code}
> testSpark = SparkSubmitOperator(
>task_id='test-spark',
> deploy_mode='cluster',
> 
> application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
> java_class='SimpleApp'
>dag=dag)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (AIRFLOW-1183) How to pass Spark URL for standalone cluster?

2017-05-09 Thread sam sen (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sam sen updated AIRFLOW-1183:
-
Description: 
How can I pass my Spark URL? When I look in the logs I see `--master` is 
pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it 
within the function but I'm getting an error.

*Error*
{code}
PendingDeprecationWarning: Invalid arguments were passed to 
SparkSubmitOperator. Support for passing such arguments will be dropped in 
Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'} {code}

*Code*
{code}
testSpark = SparkSubmitOperator(
   task_id='test-spark',
deploy_mode='cluster',
application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
java_class='SimpleApp',
deploy_mode='cluster',
dag=dag)
{code}

  was:
How can I pass my Spark URL? When I look in the logs I see `--master` is 
pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it 
within the function but I'm getting an error.

*Error*
{code}
PendingDeprecationWarning: Invalid arguments were passed to 
SparkSubmitOperator. Support for passing such arguments will be dropped in 
Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'} {code}

*Code*
{code}
testSpark = SparkSubmitOperator(
   task_id='test-spark',
deploy_mode='cluster',
application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
java_class='SimpleApp'
   dag=dag)
{code}


> How to pass Spark URL for standalone cluster?
> -
>
> Key: AIRFLOW-1183
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1183
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: operators
>Affects Versions: Airflow 1.8
>Reporter: sam sen
>Priority: Critical
>
> How can I pass my Spark URL? When I look in the logs I see `--master` is 
> pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing 
> it within the function but I'm getting an error.
> *Error*
> {code}
> PendingDeprecationWarning: Invalid arguments were passed to 
> SparkSubmitOperator. Support for passing such arguments will be dropped in 
> Airflow 2.0. Invalid arguments were:
> *args: ()
> **kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'} {code}
> *Code*
> {code}
> testSpark = SparkSubmitOperator(
>task_id='test-spark',
> deploy_mode='cluster',
> 
> application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
> java_class='SimpleApp',
> deploy_mode='cluster',
> dag=dag)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (AIRFLOW-1183) How to pass Spark URL for standalone cluster?

2017-05-09 Thread sam sen (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sam sen updated AIRFLOW-1183:
-
Description: 
How can I pass my Spark URL? When I look in the logs I see `--master` is 
pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it 
within the function but I'm getting an error.

{quote}
PendingDeprecationWarning: Invalid arguments were passed to 
SparkSubmitOperator. Support for passing such arguments will be dropped in 
Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'} {quote}

*Code*
{code}
testSpark = SparkSubmitOperator(
   task_id='test-spark',
deploy_mode='cluster',
application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
java_class='SimpleApp'
   dag=dag)
{code}

  was:
How can I pass my Spark URL? When I look in the logs I see `--master` is 
pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it 
within the function but I'm getting an error.

{quote}
PendingDeprecationWarning: Invalid arguments were passed to 
SparkSubmitOperator. Support for passing such arguments will be dropped in 
Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'}
{quote}

*Code*
{code}
testSpark = SparkSubmitOperator(
   task_id='test-spark',
deploy_mode='cluster',
application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
java_class='SimpleApp'
   dag=dag)
{code}


> How to pass Spark URL for standalone cluster?
> -
>
> Key: AIRFLOW-1183
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1183
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: operators
>Affects Versions: Airflow 1.8
>Reporter: sam sen
>Priority: Critical
>
> How can I pass my Spark URL? When I look in the logs I see `--master` is 
> pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing 
> it within the function but I'm getting an error.
> {quote}
> PendingDeprecationWarning: Invalid arguments were passed to 
> SparkSubmitOperator. Support for passing such arguments will be dropped in 
> Airflow 2.0. Invalid arguments were:
> *args: ()
> **kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'} {quote}
> *Code*
> {code}
> testSpark = SparkSubmitOperator(
>task_id='test-spark',
> deploy_mode='cluster',
> 
> application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
> java_class='SimpleApp'
>dag=dag)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (AIRFLOW-1183) How to pass Spark URL for standalone cluster?

2017-05-09 Thread sam sen (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sam sen updated AIRFLOW-1183:
-
Description: 
How can I pass my Spark URL? When I look in the logs I see `--master` is 
pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it 
within the function but I'm getting an error.

{quote}
PendingDeprecationWarning: Invalid arguments were passed to 
SparkSubmitOperator. Support for passing such arguments will be dropped in 
Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'}
{quote}

*Code*
{code}
testSpark = SparkSubmitOperator(
   task_id='test-spark',
deploy_mode='cluster',
application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
java_class='SimpleApp'
   dag=dag)
{code}

  was:
How can I pass my Spark URL? When I look in the logs I see `--master` is 
pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it 
within the function but I'm getting an error.

{quote}
PendingDeprecationWarning: Invalid arguments were passed to 
SparkSubmitOperator. Support for passing such arguments will be dropped in 
Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'}
{quote}

#Code
{code}
testSpark = SparkSubmitOperator(
   task_id='test-spark',
deploy_mode='cluster',
application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
java_class='SimpleApp'
   dag=dag)
{code}


> How to pass Spark URL for standalone cluster?
> -
>
> Key: AIRFLOW-1183
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1183
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: operators
>Affects Versions: Airflow 1.8
>Reporter: sam sen
>Priority: Critical
>
> How can I pass my Spark URL? When I look in the logs I see `--master` is 
> pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing 
> it within the function but I'm getting an error.
> {quote}
> PendingDeprecationWarning: Invalid arguments were passed to 
> SparkSubmitOperator. Support for passing such arguments will be dropped in 
> Airflow 2.0. Invalid arguments were:
> *args: ()
> **kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'}
> {quote}
> *Code*
> {code}
> testSpark = SparkSubmitOperator(
>task_id='test-spark',
> deploy_mode='cluster',
> 
> application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
> java_class='SimpleApp'
>dag=dag)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (AIRFLOW-1183) How to pass Spark URL for standalone cluster?

2017-05-09 Thread sam sen (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sam sen updated AIRFLOW-1183:
-
Description: 
How can I pass my Spark URL? When I look in the logs I see `--master` is 
pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it 
within the function but I'm getting an error.

{quote}
PendingDeprecationWarning: Invalid arguments were passed to 
SparkSubmitOperator. Support for passing such arguments will be dropped in 
Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'}
{quote}

#Code
{code}
testSpark = SparkSubmitOperator(
   task_id='test-spark',
deploy_mode='cluster',
application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
java_class='SimpleApp'
   dag=dag)
{code}

  was:
How can I pass my Spark URL? When I look in the logs I see `--master` is 
pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it 
within the function but I'm getting an error.

{quote}
PendingDeprecationWarning: Invalid arguments were passed to 
SparkSubmitOperator. Support for passing such arguments will be dropped in 
Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'}
{quote}

{code}
testSpark = SparkSubmitOperator(
   task_id='test-spark',
deploy_mode='cluster',
application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
java_class='SimpleApp'
   dag=dag)
{code}


> How to pass Spark URL for standalone cluster?
> -
>
> Key: AIRFLOW-1183
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1183
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: operators
>Affects Versions: Airflow 1.8
>Reporter: sam sen
>Priority: Critical
>
> How can I pass my Spark URL? When I look in the logs I see `--master` is 
> pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing 
> it within the function but I'm getting an error.
> {quote}
> PendingDeprecationWarning: Invalid arguments were passed to 
> SparkSubmitOperator. Support for passing such arguments will be dropped in 
> Airflow 2.0. Invalid arguments were:
> *args: ()
> **kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'}
> {quote}
> #Code
> {code}
> testSpark = SparkSubmitOperator(
>task_id='test-spark',
> deploy_mode='cluster',
> 
> application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
> java_class='SimpleApp'
>dag=dag)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (AIRFLOW-1183) How to pass Spark URL for standalone cluster?

2017-05-09 Thread sam sen (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sam sen updated AIRFLOW-1183:
-
Description: 
How can I pass my Spark URL? When I look in the logs I see `--master` is 
pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it 
within the function but I'm getting an error.

{quote}
PendingDeprecationWarning: Invalid arguments were passed to 
SparkSubmitOperator. Support for passing such arguments will be dropped in 
Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'}
{quote}

{code:python}
testSpark = SparkSubmitOperator(
   task_id='test-spark',
deploy_mode='cluster',
application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
java_class='SimpleApp'
   dag=dag)
{code}

  was:
How can I pass my Spark URL? When I look in the logs I see `--master` is 
pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it 
within the function but I'm getting an error.

{quote}
PendingDeprecationWarning: Invalid arguments were passed to 
SparkSubmitOperator. Support for passing such arguments will be dropped in 
Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'}
{quote}

#Code
```
testSpark = SparkSubmitOperator(
   task_id='test-spark',
deploy_mode='cluster',
application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
java_class='SimpleApp'
   dag=dag)
```


> How to pass Spark URL for standalone cluster?
> -
>
> Key: AIRFLOW-1183
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1183
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: operators
>Affects Versions: Airflow 1.8
>Reporter: sam sen
>Priority: Critical
>
> How can I pass my Spark URL? When I look in the logs I see `--master` is 
> pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing 
> it within the function but I'm getting an error.
> {quote}
> PendingDeprecationWarning: Invalid arguments were passed to 
> SparkSubmitOperator. Support for passing such arguments will be dropped in 
> Airflow 2.0. Invalid arguments were:
> *args: ()
> **kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'}
> {quote}
> {code:python}
> testSpark = SparkSubmitOperator(
>task_id='test-spark',
> deploy_mode='cluster',
> 
> application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
> java_class='SimpleApp'
>dag=dag)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (AIRFLOW-1183) How to pass Spark URL for standalone cluster?

2017-05-09 Thread sam sen (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sam sen updated AIRFLOW-1183:
-
Description: 
How can I pass my Spark URL? When I look in the logs I see `--master` is 
pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it 
within the function but I'm getting an error.

{quote}
PendingDeprecationWarning: Invalid arguments were passed to 
SparkSubmitOperator. Support for passing such arguments will be dropped in 
Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'}
{quote}

{code}
testSpark = SparkSubmitOperator(
   task_id='test-spark',
deploy_mode='cluster',
application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
java_class='SimpleApp'
   dag=dag)
{code}

  was:
How can I pass my Spark URL? When I look in the logs I see `--master` is 
pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it 
within the function but I'm getting an error.

{quote}
PendingDeprecationWarning: Invalid arguments were passed to 
SparkSubmitOperator. Support for passing such arguments will be dropped in 
Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'}
{quote}

{code:python}
testSpark = SparkSubmitOperator(
   task_id='test-spark',
deploy_mode='cluster',
application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
java_class='SimpleApp'
   dag=dag)
{code}


> How to pass Spark URL for standalone cluster?
> -
>
> Key: AIRFLOW-1183
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1183
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: operators
>Affects Versions: Airflow 1.8
>Reporter: sam sen
>Priority: Critical
>
> How can I pass my Spark URL? When I look in the logs I see `--master` is 
> pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing 
> it within the function but I'm getting an error.
> {quote}
> PendingDeprecationWarning: Invalid arguments were passed to 
> SparkSubmitOperator. Support for passing such arguments will be dropped in 
> Airflow 2.0. Invalid arguments were:
> *args: ()
> **kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'}
> {quote}
> {code}
> testSpark = SparkSubmitOperator(
>task_id='test-spark',
> deploy_mode='cluster',
> 
> application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
> java_class='SimpleApp'
>dag=dag)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (AIRFLOW-1183) How to pass Spark URL for standalone cluster?

2017-05-09 Thread sam sen (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sam sen updated AIRFLOW-1183:
-
Description: 
How can I pass my Spark URL? When I look in the logs I see `--master` is 
pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it 
within the function but I'm getting an error.


bq. PendingDeprecationWarning: Invalid arguments were passed to 
SparkSubmitOperator. Support for passing such arguments will be dropped in 
Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'}


#Code
```
testSpark = SparkSubmitOperator(
   task_id='test-spark',
deploy_mode='cluster',
application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
java_class='SimpleApp'
   dag=dag)
```

  was:
How can I pass my Spark URL? When I look in the logs I see `--master` is 
pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it 
within the function but I'm getting an error.


bq. PendingDeprecationWarning: Invalid arguments were passed to 
SparkSubmitOperator. Support for passing such arguments will be dropped in 
Airflow 2.0. Invalid arguments were:
bq. *args: ()
bq. **kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'}


#Code
```
testSpark = SparkSubmitOperator(
   task_id='test-spark',
deploy_mode='cluster',
application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
java_class='SimpleApp'
   dag=dag)
```


> How to pass Spark URL for standalone cluster?
> -
>
> Key: AIRFLOW-1183
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1183
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: operators
>Affects Versions: Airflow 1.8
>Reporter: sam sen
>Priority: Critical
>
> How can I pass my Spark URL? When I look in the logs I see `--master` is 
> pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing 
> it within the function but I'm getting an error.
> bq. PendingDeprecationWarning: Invalid arguments were passed to 
> SparkSubmitOperator. Support for passing such arguments will be dropped in 
> Airflow 2.0. Invalid arguments were:
> *args: ()
> **kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'}
> #Code
> ```
> testSpark = SparkSubmitOperator(
>task_id='test-spark',
> deploy_mode='cluster',
> 
> application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
> java_class='SimpleApp'
>dag=dag)
> ```



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (AIRFLOW-1183) How to pass Spark URL for standalone cluster?

2017-05-09 Thread sam sen (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sam sen updated AIRFLOW-1183:
-
Description: 
How can I pass my Spark URL? When I look in the logs I see `--master` is 
pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it 
within the function but I'm getting an error.

{quote}
PendingDeprecationWarning: Invalid arguments were passed to 
SparkSubmitOperator. Support for passing such arguments will be dropped in 
Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'}
{quote}

#Code
```
testSpark = SparkSubmitOperator(
   task_id='test-spark',
deploy_mode='cluster',
application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
java_class='SimpleApp'
   dag=dag)
```

  was:
How can I pass my Spark URL? When I look in the logs I see `--master` is 
pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it 
within the function but I'm getting an error.


bq. PendingDeprecationWarning: Invalid arguments were passed to 
SparkSubmitOperator. Support for passing such arguments will be dropped in 
Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'}


#Code
```
testSpark = SparkSubmitOperator(
   task_id='test-spark',
deploy_mode='cluster',
application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
java_class='SimpleApp'
   dag=dag)
```


> How to pass Spark URL for standalone cluster?
> -
>
> Key: AIRFLOW-1183
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1183
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: operators
>Affects Versions: Airflow 1.8
>Reporter: sam sen
>Priority: Critical
>
> How can I pass my Spark URL? When I look in the logs I see `--master` is 
> pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing 
> it within the function but I'm getting an error.
> {quote}
> PendingDeprecationWarning: Invalid arguments were passed to 
> SparkSubmitOperator. Support for passing such arguments will be dropped in 
> Airflow 2.0. Invalid arguments were:
> *args: ()
> **kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'}
> {quote}
> #Code
> ```
> testSpark = SparkSubmitOperator(
>task_id='test-spark',
> deploy_mode='cluster',
> 
> application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
> java_class='SimpleApp'
>dag=dag)
> ```



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (AIRFLOW-1183) How to pass Spark URL for standalone cluster?

2017-05-09 Thread sam sen (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sam sen updated AIRFLOW-1183:
-
Description: 
How can I pass my Spark URL? When I look in the logs I see `--master` is 
pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it 
within the function but I'm getting an error.


bq. PendingDeprecationWarning: Invalid arguments were passed to 
SparkSubmitOperator. Support for passing such arguments will be dropped in 
Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'}


#Code
```
testSpark = SparkSubmitOperator(
   task_id='test-spark',
deploy_mode='cluster',
application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
java_class='SimpleApp'
   dag=dag)
```

  was:
How can I pass my Spark URL? When I look in the logs I see `--master` is 
pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it 
within the function but I'm getting an error.

```
PendingDeprecationWarning: Invalid arguments were passed to 
SparkSubmitOperator. Support for passing such arguments will be dropped in 
Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'}```


#Code
```
testSpark = SparkSubmitOperator(
   task_id='test-spark',
deploy_mode='cluster',
application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
java_class='SimpleApp'
   dag=dag)
```


> How to pass Spark URL for standalone cluster?
> -
>
> Key: AIRFLOW-1183
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1183
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: operators
>Affects Versions: Airflow 1.8
>Reporter: sam sen
>Priority: Critical
>
> How can I pass my Spark URL? When I look in the logs I see `--master` is 
> pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing 
> it within the function but I'm getting an error.
> bq. PendingDeprecationWarning: Invalid arguments were passed to 
> SparkSubmitOperator. Support for passing such arguments will be dropped in 
> Airflow 2.0. Invalid arguments were:
> *args: ()
> **kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'}
> #Code
> ```
> testSpark = SparkSubmitOperator(
>task_id='test-spark',
> deploy_mode='cluster',
> 
> application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
> java_class='SimpleApp'
>dag=dag)
> ```



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Updated] (AIRFLOW-1183) How to pass Spark URL for standalone cluster?

2017-05-09 Thread sam sen (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1183?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sam sen updated AIRFLOW-1183:
-
Description: 
How can I pass my Spark URL? When I look in the logs I see `--master` is 
pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it 
within the function but I'm getting an error.


bq. PendingDeprecationWarning: Invalid arguments were passed to 
SparkSubmitOperator. Support for passing such arguments will be dropped in 
Airflow 2.0. Invalid arguments were:
bq. *args: ()
bq. **kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'}


#Code
```
testSpark = SparkSubmitOperator(
   task_id='test-spark',
deploy_mode='cluster',
application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
java_class='SimpleApp'
   dag=dag)
```

  was:
How can I pass my Spark URL? When I look in the logs I see `--master` is 
pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it 
within the function but I'm getting an error.


bq. PendingDeprecationWarning: Invalid arguments were passed to 
SparkSubmitOperator. Support for passing such arguments will be dropped in 
Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'}


#Code
```
testSpark = SparkSubmitOperator(
   task_id='test-spark',
deploy_mode='cluster',
application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
java_class='SimpleApp'
   dag=dag)
```


> How to pass Spark URL for standalone cluster?
> -
>
> Key: AIRFLOW-1183
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1183
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: operators
>Affects Versions: Airflow 1.8
>Reporter: sam sen
>Priority: Critical
>
> How can I pass my Spark URL? When I look in the logs I see `--master` is 
> pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing 
> it within the function but I'm getting an error.
> bq. PendingDeprecationWarning: Invalid arguments were passed to 
> SparkSubmitOperator. Support for passing such arguments will be dropped in 
> Airflow 2.0. Invalid arguments were:
> bq. *args: ()
> bq. **kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'}
> #Code
> ```
> testSpark = SparkSubmitOperator(
>task_id='test-spark',
> deploy_mode='cluster',
> 
> application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
> java_class='SimpleApp'
>dag=dag)
> ```



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (AIRFLOW-1183) How to pass Spark URL for standalone cluster?

2017-05-09 Thread sam sen (JIRA)
sam sen created AIRFLOW-1183:


 Summary: How to pass Spark URL for standalone cluster?
 Key: AIRFLOW-1183
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1183
 Project: Apache Airflow
  Issue Type: Bug
  Components: operators
Affects Versions: Airflow 1.8
Reporter: sam sen
Priority: Critical


How can I pass my Spark URL? When I look in the logs I see `--master` is 
pointed to "yarn." Also, the same thing for `cluster-mode`. I tried passing it 
within the function but I'm getting an error.

```
PendingDeprecationWarning: Invalid arguments were passed to 
SparkSubmitOperator. Support for passing such arguments will be dropped in 
Airflow 2.0. Invalid arguments were:
*args: ()
**kwargs: {'deploy_mode': 'cluster', 'java_class': 'SimpleApp'}```


#Code
```
testSpark = SparkSubmitOperator(
   task_id='test-spark',
deploy_mode='cluster',
application='src/main/scala/target/scala-2.11/simple-project_2.11-1.0.jar',
java_class='SimpleApp'
   dag=dag)
```



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)


[jira] [Created] (AIRFLOW-1182) Contrib Spark Submit operator should template fields

2017-05-09 Thread Vianney FOUCAULT (JIRA)
Vianney FOUCAULT created AIRFLOW-1182:
-

 Summary: Contrib Spark Submit operator should template fields
 Key: AIRFLOW-1182
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1182
 Project: Apache Airflow
  Issue Type: Improvement
  Components: contrib, operators
Affects Versions: Airflow 2.0, Airflow 1.8
Reporter: Vianney FOUCAULT
Assignee: Vianney FOUCAULT
 Fix For: Airflow 2.0, 1.8.1


The Spark submit operator does not template any fields, making {{ ds }} unusable 
for Spark applications.





--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
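
Making operator fields templatable in Airflow generally means listing the relevant instance attributes in template_fields, which Jinja then renders at run time. A sketch of the idea (the attribute names templated by the eventual patch are an assumption):

{code}
from airflow.contrib.operators.spark_submit_operator import SparkSubmitOperator

class TemplatedSparkSubmitOperator(SparkSubmitOperator):
    # Hypothetical subset: render the application path and its arguments,
    # so values like {{ ds }} are substituted when the task runs.
    template_fields = ('_application', '_application_args')
{code}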