[jira] [Updated] (AIRFLOW-1704) If the subdag is defined as a local variable, its is_subdag attribute is False.

2017-10-11 Thread Shintaro Murakami (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shintaro Murakami updated AIRFLOW-1704:
---
Attachment: dags.png
tree.png

> If the subdag is defined as a local variable, its is_subdag attribute is False.
> --
>
> Key: AIRFLOW-1704
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1704
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: models
>Reporter: Shintaro Murakami
> Attachments: dags.png, test_subdag_local_variable.py, tree.png
>
>
> If the subdag is defined as a local variable in DAG definition code, its 
> is_subdag attribute is False.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (AIRFLOW-1704) If the subdag is defined as a local variable, its is_subdag attribute is False.

2017-10-11 Thread Shintaro Murakami (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shintaro Murakami updated AIRFLOW-1704:
---
Attachment: test_subdag_local_variable.py

DAG definition for reproducing the issue

> If the subdag is defined as a local variable, its is_subdag attribute is False.
> --
>
> Key: AIRFLOW-1704
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1704
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: models
>Reporter: Shintaro Murakami
> Attachments: test_subdag_local_variable.py
>
>
> If the subdag is defined as a local variable in DAG definition code, its 
> is_subdag attribute is False.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (AIRFLOW-1704) If the subdag is defined as a local variable, its is_subdag attribute is False.

2017-10-11 Thread Shintaro Murakami (JIRA)
Shintaro Murakami created AIRFLOW-1704:
--

 Summary: If the subdag is defined as a local variable, its 
is_subdag attribute is False.
 Key: AIRFLOW-1704
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1704
 Project: Apache Airflow
  Issue Type: Bug
  Components: models
Reporter: Shintaro Murakami


If the subdag is defined as a local variable in DAG definition code, its 
is_subdag attribute is False.
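
For context, a minimal sketch of the shape of DAG file being described
(hypothetical; the attached test_subdag_local_variable.py is the
authoritative reproduction):

{code:python}
from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator

args = {'owner': 'airflow', 'start_date': datetime(2017, 10, 1)}

dag = DAG('parent', default_args=args, schedule_interval='@daily')


def build_subdag():
    # The subdag exists only as a local variable inside this function; it
    # is never bound to a module-level name, which is the situation the
    # issue reports as leaving is_subdag == False.
    subdag = DAG('parent.child', default_args=args,
                 schedule_interval='@daily')
    DummyOperator(task_id='child_task', dag=subdag)
    return subdag


SubDagOperator(task_id='child', subdag=build_subdag(), dag=dag)
{code}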




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (AIRFLOW-1681) Create way to batch retry task instances in the CRUD

2017-10-11 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200963#comment-16200963
 ] 

ASF subversion and git services commented on AIRFLOW-1681:
--

Commit 9f2c16a0ac261888fe2ee4671538201c273f82d5 in incubator-airflow's branch 
refs/heads/master from [~kevinyang]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=9f2c16a ]

[AIRFLOW-1681] Add batch clear in task instance view

Allow users to batch clear selected task
instance(s) in the task instance view. Only the
state(s) of the selected task instance(s) will be
cleared--no upstream or downstream task instances
will be affected.

The DAG(s) involved will be set to the "RUNNING"
state, the same as the existing "clear" operation.

Keeping both "Delete" and "Clear" operations for a
smoother user-habit transition--the DAG state
change is announced in a pop-up (see screenshots).

Closes #2681 from yrqls21/add-batch-clear-in-task-
instance-view


> Create way to batch retry task instances in the CRUD
> 
>
> Key: AIRFLOW-1681
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1681
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: webserver
>Reporter: Dan Davydov
>
> The old way to batch retry tasks was to select them on the Task Instances 
> page on the webserver and do a With Selected -> Delete.
> This no longer works as you will get overlapping task instance logs (e.g. the 
> first retry log will be placed in the same location as the first try log). We 
> need an option in the CRUD called With Selected -> Retry that does the same 
> thing as With Selected -> Delete but follows the logic for task clearing 
> (sets state to none, increases max_tries). Once this feature is stable, With 
> Selected -> Delete should probably be removed, as it leads to bad states 
> with the logs.
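
A rough sketch of the proposed retry semantics (this mirrors the
task-clearing logic named above; the helper is hypothetical, not actual
view code):

{code:python}
from airflow.utils.state import State


def retry_task_instances(task_instances, session):
    """Hypothetical 'With Selected -> Retry' action: follow the clearing
    logic instead of deleting rows, so log try numbers keep increasing."""
    for ti in task_instances:
        # Carrying forward the tries already used lets the task run again
        # without reusing the first try's log location (hypothetical
        # accounting; the real clearing logic also considers retries).
        ti.max_tries += ti.try_number
        ti.state = State.NONE
    session.commit()
{code}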



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


incubator-airflow git commit: [AIRFLOW-1681] Add batch clear in task instance view

2017-10-11 Thread davydov
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 98b4df945 -> 9f2c16a0a


[AIRFLOW-1681] Add batch clear in task instance view

Allow users to batch clear selected task
instance(s) in the task instance view. Only the
state(s) of the selected task instance(s) will be
cleared--no upstream or downstream task instances
will be affected.

The DAG(s) involved will be set to the "RUNNING"
state, the same as the existing "clear" operation.

Keeping both "Delete" and "Clear" operations for a
smoother user-habit transition--the DAG state
change is announced in a pop-up (see screenshots).

Closes #2681 from yrqls21/add-batch-clear-in-task-
instance-view


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9f2c16a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9f2c16a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9f2c16a0

Branch: refs/heads/master
Commit: 9f2c16a0ac261888fe2ee4671538201c273f82d5
Parents: 98b4df9
Author: Kevin Yang 
Authored: Wed Oct 11 14:05:33 2017 -0700
Committer: Dan Davydov 
Committed: Wed Oct 11 14:05:35 2017 -0700

--
 airflow/www/views.py | 62 +++
 1 file changed, 31 insertions(+), 31 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9f2c16a0/airflow/www/views.py
--
diff --git a/airflow/www/views.py b/airflow/www/views.py
index bc63b5b..81ee61f 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -37,7 +37,7 @@ import sqlalchemy as sqla
 from sqlalchemy import or_, desc, and_, union_all
 
 from flask import (
-abort, redirect, url_for, request, Markup, Response, current_app, render_template, 
+abort, redirect, url_for, request, Markup, Response, current_app, render_template,
 make_response)
 from flask_admin import BaseView, expose, AdminIndexView
 from flask_admin.contrib.sqla import ModelView
@@ -2488,7 +2488,6 @@ class TaskInstanceModelView(ModelViewOnly):
 'start_date', 'end_date', 'duration', 'job_id', 'hostname',
 'unixname', 'priority_weight', 'queue', 'queued_dttm', 'try_number',
 'pool', 'log_url')
-can_delete = True
 page_size = PAGE_SIZE
 
 @action('set_running', "Set state to 'running'", None)
@@ -2507,58 +2506,59 @@ class TaskInstanceModelView(ModelViewOnly):
 def action_set_retry(self, ids):
 self.set_task_instance_state(ids, State.UP_FOR_RETRY)
 
-@action('delete',
-lazy_gettext('Delete'),
-lazy_gettext('Are you sure you want to delete selected records?'))
-def action_delete(self, ids):
-"""
-As a workaround for AIRFLOW-277, this method overrides Flask-Admin's ModelView.action_delete().
-
-TODO: this method should be removed once the below bug is fixed on Flask-Admin side.
-https://github.com/flask-admin/flask-admin/issues/1226
-"""
-if 'sqlite' in conf.get('core', 'sql_alchemy_conn'):
-self.delete_task_instances(ids)
-else:
-super(TaskInstanceModelView, self).action_delete(ids)
-
 @provide_session
-def set_task_instance_state(self, ids, target_state, session=None):
+@action('clear',
+lazy_gettext('Clear'),
+lazy_gettext(
+'Are you sure you want to clear the state of the selected task instance(s)'
+' and set their dagruns to the running state?'))
+def action_clear(self, ids, session=None):
 try:
 TI = models.TaskInstance
-count = len(ids)
+
+dag_to_tis = {}
+
 for id in ids:
 task_id, dag_id, execution_date = id.split(',')
-execution_date = datetime.strptime(execution_date, '%Y-%m-%d %H:%M:%S')
+
 ti = session.query(TI).filter(TI.task_id == task_id,
   TI.dag_id == dag_id,
  TI.execution_date == execution_date).one()
-ti.state = target_state
+
+dag = dagbag.get_dag(dag_id)
+tis = dag_to_tis.setdefault(dag, [])
+tis.append(ti)
+
+for dag, tis in dag_to_tis.items():
+models.clear_task_instances(tis, session, dag=dag)
+
 session.commit()
-flash(
-"{count} task instances were set to 
'{target_state}'".format(**locals()))
+flash("{0} task instances have been cleared".format(len(ids)))
+
 except Exception as ex:
 if not self.handle_view_exception(ex):
 raise Exception("Ooops")
-flash('Failed to set state', 

[jira] [Comment Edited] (AIRFLOW-1702) access the count of retries that have happened in a python method

2017-10-11 Thread Joy Gao (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200914#comment-16200914
 ] 

Joy Gao edited comment on AIRFLOW-1702 at 10/11/17 8:31 PM:


The TaskInstance has a `try_number` attribute; you can access it via the 
context passed to the PythonOperator.

e.g.

{code:python}
def foo(**context):
    ti = context['ti']
    retry = ti.try_number - 1
    # do something with retry count

op = PythonOperator(
    task_id='task',
    provide_context=True,
    python_callable=foo,
    dag=dag)
{code}


Hope this helps!


was (Author: joy.gao54):
The TaskInstance has a `try_number` attribute; you can access it via the 
context passed to the PythonOperator.

e.g.

def foo(**context):
    ti = context['ti']
    retry = ti.try_number - 1
    # do something with retry count

op = PythonOperator(
    task_id='task',
    provide_context=True,
    python_callable=foo,
    dag=dag)

Hope this helps!

> access the count of retries that have happened in a python method
> --
>
> Key: AIRFLOW-1702
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1702
> Project: Apache Airflow
>  Issue Type: Wish
>Reporter: Igor Cherepanov
>
> hello, 
> is it possible to access the count of retries that have happened in a python 
> method?
> thanks!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (AIRFLOW-1702) access the count of retries that have happened in a python method

2017-10-11 Thread Joy Gao (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200914#comment-16200914
 ] 

Joy Gao commented on AIRFLOW-1702:
--

The TaskInstance has a `try_number` attribute; you can access it via the 
context passed to the PythonOperator.

e.g.

def foo(**context):
    ti = context['ti']
    retry = ti.try_number - 1
    # do something with retry count

op = PythonOperator(
    task_id='task',
    provide_context=True,
    python_callable=foo,
    dag=dag)

Hope this helps!

> access the count of retries that have happened in a python method
> --
>
> Key: AIRFLOW-1702
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1702
> Project: Apache Airflow
>  Issue Type: Wish
>Reporter: Igor Cherepanov
>
> hello, 
> is it possible to access the count of retries that have happened in a python 
> method?
> thanks!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (AIRFLOW-1696) GCP Dataproc Operator fails due to '+' in Airflow version

2017-10-11 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200908#comment-16200908
 ] 

ASF subversion and git services commented on AIRFLOW-1696:
--

Commit 98b4df945dec5dfe766c8a9a15cf4b3932bf1a5c in incubator-airflow's branch 
refs/heads/master from [~TrevorEdwards]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=98b4df9 ]

[AIRFLOW-1696] Fix dataproc version label error

Closes #2676 from TrevorEdwards/airflow-1696


> GCP Dataproc Operator fails due to '+' in Airflow version
> -
>
> Key: AIRFLOW-1696
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1696
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Trevor Edwards
>Assignee: Trevor Edwards
> Fix For: 1.10.0
>
>
> Since the Dataproc operator attaches the Airflow version as one of its 
> labels, and Dataproc labels cannot include the character '+', the operator 
> currently fails with the following error:
> {code:none}
> [2017-10-09 19:28:48,035] {base_task_runner.py:115} INFO - Running: ['bash', 
> '-c', u'airflow run smokey-dataproc start_cluster 2017-10-08T00:00:00 
> --job_id 6 --raw -sd DAGS_FOLDER/dataa.py']
> [2017-10-09 19:28:49,041] {base_task_runner.py:98} INFO - Subtask: 
> [2017-10-09 19:28:49,041] {__init__.py:45} INFO - Using executor LocalExecutor
> [2017-10-09 19:28:49,139] {base_task_runner.py:98} INFO - Subtask: 
> [2017-10-09 19:28:49,139] {models.py:187} INFO - Filling up the DagBag from 
> /home/airflow/dags/dataa.py
> [2017-10-09 19:28:49,258] {base_task_runner.py:98} INFO - Subtask: Cluster 
> name: smoke-cluster-aa05845a-1b60-4543-94d0-f7dfddb90ee0
> [2017-10-09 19:28:49,258] {base_task_runner.py:98} INFO - Subtask: 
> [2017-10-09 19:28:49,257] {dataproc_operator.py:267} INFO - Creating cluster: 
> smoke-cluster-aa05845a-1b60-4543-94d0-f7dfddb90ee0
> [2017-10-09 19:28:49,265] {base_task_runner.py:98} INFO - Subtask: 
> [2017-10-09 19:28:49,265] {gcp_api_base_hook.py:82} INFO - Getting connection 
> using a JSON key file.
> [2017-10-09 19:28:59,909] {base_task_runner.py:98} INFO - Subtask: 
> [2017-10-09 19:28:59,906] {models.py:1564} ERROR -  requesting 
> https://dataproc.googleapis.com/v1/projects/cloud-airflow-test/regions/global/clusters?alt=json
>  returned "Multiple validation errors:
> [2017-10-09 19:28:59,910] {base_task_runner.py:98} INFO - Subtask:  - Not a 
> valid value: "v1-10-0dev0+incubating". Only lowercase letters, numbers, and 
> dashes are allowed. The value must start with lowercase letter or number and 
> end with a lowercase letter or number.
> [2017-10-09 19:28:59,910] {base_task_runner.py:98} INFO - Subtask:  - User 
> label value must conform to '[\p{Ll}\p{Lo}\p{N}_-]{0,63}' pattern">
> [2017-10-09 19:28:59,911] {base_task_runner.py:98} INFO - Subtask: Traceback 
> (most recent call last):
> [2017-10-09 19:28:59,911] {base_task_runner.py:98} INFO - Subtask:   File 
> "/home/airflow/incubator-airflow/airflow/models.py", line 1462, in 
> _run_raw_task
> [2017-10-09 19:28:59,911] {base_task_runner.py:98} INFO - Subtask: result 
> = task_copy.execute(context=context)
> [2017-10-09 19:28:59,912] {base_task_runner.py:98} INFO - Subtask:   File 
> "/home/airflow/incubator-airflow/airflow/contrib/operators/dataproc_operator.py",
>  line 300, in execute
> [2017-10-09 19:28:59,912] {base_task_runner.py:98} INFO - Subtask: raise e
> [2017-10-09 19:28:59,913] {base_task_runner.py:98} INFO - Subtask: HttpError: 
>  https://dataproc.googleapis.com/v1/projects/cloud-airflow-test/regions/global/clusters?alt=json
>  returned "Multiple validation errors:
> [2017-10-09 19:28:59,914] {base_task_runner.py:98} INFO - Subtask:  - Not a 
> valid value: "v1-10-0dev0+incubating". Only lowercase letters, numbers, and 
> dashes are allowed. The value must start with lowercase letter or number and 
> end with a lowercase letter or number.
> [2017-10-09 19:28:59,914] {base_task_runner.py:98} INFO - Subtask:  - User 
> label value must conform to '[\p{Ll}\p{Lo}\p{N}_-]{0,63}' pattern">
> [2017-10-09 19:28:59,915] {base_task_runner.py:98} INFO - Subtask: 
> [2017-10-09 19:28:59,909] {models.py:1585} INFO - Marking task as UP_FOR_RETRY
> [2017-10-09 19:28:59,978] {base_task_runner.py:98} INFO - Subtask: 
> [2017-10-09 19:28:59,977] {models.py:1613} ERROR -  requesting 
> https://dataproc.googleapis.com/v1/projects/cloud-airflow-test/regions/global/clusters?alt=json
>  returned "Multiple validation errors:
> [2017-10-09 19:28:59,978] {base_task_runner.py:98} INFO - Subtask:  - Not a 
> valid value: "v1-10-0dev0+incubating". Only lowercase letters, numbers, and 
> dashes are allowed. The value must start with lowercase letter or number and 
> end with a lowercase letter or number.
> [2017-10-09 19:28:59,979] 

[jira] [Resolved] (AIRFLOW-1696) GCP Dataproc Operator fails due to '+' in Airflow version

2017-10-11 Thread Chris Riccomini (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Riccomini resolved AIRFLOW-1696.
--
Resolution: Fixed

> GCP Dataproc Operator fails due to '+' in Airflow version
> -
>
> Key: AIRFLOW-1696
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1696
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Trevor Edwards
>Assignee: Trevor Edwards
> Fix For: 1.10.0
>
>
> Since the Dataproc operator attaches the Airflow version as one of its 
> labels, and Dataproc labels cannot include the character '+', the operator 
> currently fails with the following error:
> {code:none}
> [2017-10-09 19:28:48,035] {base_task_runner.py:115} INFO - Running: ['bash', 
> '-c', u'airflow run smokey-dataproc start_cluster 2017-10-08T00:00:00 
> --job_id 6 --raw -sd DAGS_FOLDER/dataa.py']
> [2017-10-09 19:28:49,041] {base_task_runner.py:98} INFO - Subtask: 
> [2017-10-09 19:28:49,041] {__init__.py:45} INFO - Using executor LocalExecutor
> [2017-10-09 19:28:49,139] {base_task_runner.py:98} INFO - Subtask: 
> [2017-10-09 19:28:49,139] {models.py:187} INFO - Filling up the DagBag from 
> /home/airflow/dags/dataa.py
> [2017-10-09 19:28:49,258] {base_task_runner.py:98} INFO - Subtask: Cluster 
> name: smoke-cluster-aa05845a-1b60-4543-94d0-f7dfddb90ee0
> [2017-10-09 19:28:49,258] {base_task_runner.py:98} INFO - Subtask: 
> [2017-10-09 19:28:49,257] {dataproc_operator.py:267} INFO - Creating cluster: 
> smoke-cluster-aa05845a-1b60-4543-94d0-f7dfddb90ee0
> [2017-10-09 19:28:49,265] {base_task_runner.py:98} INFO - Subtask: 
> [2017-10-09 19:28:49,265] {gcp_api_base_hook.py:82} INFO - Getting connection 
> using a JSON key file.
> [2017-10-09 19:28:59,909] {base_task_runner.py:98} INFO - Subtask: 
> [2017-10-09 19:28:59,906] {models.py:1564} ERROR -  requesting 
> https://dataproc.googleapis.com/v1/projects/cloud-airflow-test/regions/global/clusters?alt=json
>  returned "Multiple validation errors:
> [2017-10-09 19:28:59,910] {base_task_runner.py:98} INFO - Subtask:  - Not a 
> valid value: "v1-10-0dev0+incubating". Only lowercase letters, numbers, and 
> dashes are allowed. The value must start with lowercase letter or number and 
> end with a lowercase letter or number.
> [2017-10-09 19:28:59,910] {base_task_runner.py:98} INFO - Subtask:  - User 
> label value must conform to '[\p{Ll}\p{Lo}\p{N}_-]{0,63}' pattern">
> [2017-10-09 19:28:59,911] {base_task_runner.py:98} INFO - Subtask: Traceback 
> (most recent call last):
> [2017-10-09 19:28:59,911] {base_task_runner.py:98} INFO - Subtask:   File 
> "/home/airflow/incubator-airflow/airflow/models.py", line 1462, in 
> _run_raw_task
> [2017-10-09 19:28:59,911] {base_task_runner.py:98} INFO - Subtask: result 
> = task_copy.execute(context=context)
> [2017-10-09 19:28:59,912] {base_task_runner.py:98} INFO - Subtask:   File 
> "/home/airflow/incubator-airflow/airflow/contrib/operators/dataproc_operator.py",
>  line 300, in execute
> [2017-10-09 19:28:59,912] {base_task_runner.py:98} INFO - Subtask: raise e
> [2017-10-09 19:28:59,913] {base_task_runner.py:98} INFO - Subtask: HttpError: 
>  https://dataproc.googleapis.com/v1/projects/cloud-airflow-test/regions/global/clusters?alt=json
>  returned "Multiple validation errors:
> [2017-10-09 19:28:59,914] {base_task_runner.py:98} INFO - Subtask:  - Not a 
> valid value: "v1-10-0dev0+incubating". Only lowercase letters, numbers, and 
> dashes are allowed. The value must start with lowercase letter or number and 
> end with a lowercase letter or number.
> [2017-10-09 19:28:59,914] {base_task_runner.py:98} INFO - Subtask:  - User 
> label value must conform to '[\p{Ll}\p{Lo}\p{N}_-]{0,63}' pattern">
> [2017-10-09 19:28:59,915] {base_task_runner.py:98} INFO - Subtask: 
> [2017-10-09 19:28:59,909] {models.py:1585} INFO - Marking task as UP_FOR_RETRY
> [2017-10-09 19:28:59,978] {base_task_runner.py:98} INFO - Subtask: 
> [2017-10-09 19:28:59,977] {models.py:1613} ERROR -  requesting 
> https://dataproc.googleapis.com/v1/projects/cloud-airflow-test/regions/global/clusters?alt=json
>  returned "Multiple validation errors:
> [2017-10-09 19:28:59,978] {base_task_runner.py:98} INFO - Subtask:  - Not a 
> valid value: "v1-10-0dev0+incubating". Only lowercase letters, numbers, and 
> dashes are allowed. The value must start with lowercase letter or number and 
> end with a lowercase letter or number.
> [2017-10-09 19:28:59,979] {base_task_runner.py:98} INFO - Subtask:  - User 
> label value must conform to '[\p{Ll}\p{Lo}\p{N}_-]{0,63}' pattern">
> [2017-10-09 19:28:59,979] {base_task_runner.py:98} INFO - Subtask: Traceback 
> (most recent call last):
> [2017-10-09 19:28:59,979] {base_task_runner.py:98} INFO - Subtask:   File 
> "/usr/local/bin/airflow", line 6, in 
> [2017-10-09 

incubator-airflow git commit: [AIRFLOW-1696] Fix dataproc version label error

2017-10-11 Thread criccomini
Repository: incubator-airflow
Updated Branches:
  refs/heads/master d578b292e -> 98b4df945


[AIRFLOW-1696] Fix dataproc version label error

Closes #2676 from TrevorEdwards/airflow-1696


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/98b4df94
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/98b4df94
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/98b4df94

Branch: refs/heads/master
Commit: 98b4df945dec5dfe766c8a9a15cf4b3932bf1a5c
Parents: d578b29
Author: Trevor Edwards 
Authored: Wed Oct 11 13:28:13 2017 -0700
Committer: Chris Riccomini 
Committed: Wed Oct 11 13:28:17 2017 -0700

--
 airflow/contrib/operators/dataproc_operator.py| 2 +-
 tests/contrib/operators/test_dataproc_operator.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/98b4df94/airflow/contrib/operators/dataproc_operator.py
--
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 0823ed8..99e4a0d 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -241,7 +241,7 @@ class DataprocClusterCreateOperator(BaseOperator):
 # [a-z]([-a-z0-9]*[a-z0-9])? (current airflow version string follows
 # semantic versioning spec: x.y.z).
 cluster_data['labels'].update({'airflow-version':
-   'v' + version.replace('.', '-')})
+   'v' + version.replace('.', '-').replace('+','-')})
 if self.storage_bucket:
 cluster_data['config']['configBucket'] = self.storage_bucket
 if self.metadata:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/98b4df94/tests/contrib/operators/test_dataproc_operator.py
--
diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py
index d206fba..ad78a8d 100644
--- a/tests/contrib/operators/test_dataproc_operator.py
+++ b/tests/contrib/operators/test_dataproc_operator.py
@@ -129,7 +129,7 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
 # set to the dataproc operator.
 merged_labels = {}
 merged_labels.update(self.labels[suffix])
-merged_labels.update({'airflow-version': 'v' + version.replace('.', '-')})
+merged_labels.update({'airflow-version': 'v' + version.replace('.', '-').replace('+','-')})
 self.assertTrue(re.match(r'[a-z]([-a-z0-9]*[a-z0-9])?',
                          cluster_data['labels']['airflow-version']))
 self.assertEqual(cluster_data['labels'], merged_labels)
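
For reference, a standalone sketch of the sanitization this patch applies;
the regex is the Dataproc label rule quoted in the code comment, and the
sample version string comes from the error log in the issue:

{code:python}
import re

# '.' and '+' are both illegal in Dataproc label values, so each becomes
# a dash before the version is attached as the 'airflow-version' label.
version = '1.10.0dev0+incubating'  # sample value from the error log
label = 'v' + version.replace('.', '-').replace('+', '-')
assert re.match(r'[a-z]([-a-z0-9]*[a-z0-9])?$', label)
print(label)  # v1-10-0dev0-incubating
{code}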



[jira] [Updated] (AIRFLOW-1696) GCP Dataproc Operator fails due to '+' in Airflow version

2017-10-11 Thread Chris Riccomini (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Riccomini updated AIRFLOW-1696:
-
Fix Version/s: 1.10.0

> GCP Dataproc Operator fails due to '+' in Airflow version
> -
>
> Key: AIRFLOW-1696
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1696
> Project: Apache Airflow
>  Issue Type: Bug
>Reporter: Trevor Edwards
>Assignee: Trevor Edwards
> Fix For: 1.10.0
>
>
> Since the Dataproc operator attaches the Airflow version as one of its 
> labels, and Dataproc labels cannot include the character '+', the operator 
> currently fails with the following error:
> {code:none}
> [2017-10-09 19:28:48,035] {base_task_runner.py:115} INFO - Running: ['bash', 
> '-c', u'airflow run smokey-dataproc start_cluster 2017-10-08T00:00:00 
> --job_id 6 --raw -sd DAGS_FOLDER/dataa.py']
> [2017-10-09 19:28:49,041] {base_task_runner.py:98} INFO - Subtask: 
> [2017-10-09 19:28:49,041] {__init__.py:45} INFO - Using executor LocalExecutor
> [2017-10-09 19:28:49,139] {base_task_runner.py:98} INFO - Subtask: 
> [2017-10-09 19:28:49,139] {models.py:187} INFO - Filling up the DagBag from 
> /home/airflow/dags/dataa.py
> [2017-10-09 19:28:49,258] {base_task_runner.py:98} INFO - Subtask: Cluster 
> name: smoke-cluster-aa05845a-1b60-4543-94d0-f7dfddb90ee0
> [2017-10-09 19:28:49,258] {base_task_runner.py:98} INFO - Subtask: 
> [2017-10-09 19:28:49,257] {dataproc_operator.py:267} INFO - Creating cluster: 
> smoke-cluster-aa05845a-1b60-4543-94d0-f7dfddb90ee0
> [2017-10-09 19:28:49,265] {base_task_runner.py:98} INFO - Subtask: 
> [2017-10-09 19:28:49,265] {gcp_api_base_hook.py:82} INFO - Getting connection 
> using a JSON key file.
> [2017-10-09 19:28:59,909] {base_task_runner.py:98} INFO - Subtask: 
> [2017-10-09 19:28:59,906] {models.py:1564} ERROR -  requesting 
> https://dataproc.googleapis.com/v1/projects/cloud-airflow-test/regions/global/clusters?alt=json
>  returned "Multiple validation errors:
> [2017-10-09 19:28:59,910] {base_task_runner.py:98} INFO - Subtask:  - Not a 
> valid value: "v1-10-0dev0+incubating". Only lowercase letters, numbers, and 
> dashes are allowed. The value must start with lowercase letter or number and 
> end with a lowercase letter or number.
> [2017-10-09 19:28:59,910] {base_task_runner.py:98} INFO - Subtask:  - User 
> label value must conform to '[\p{Ll}\p{Lo}\p{N}_-]{0,63}' pattern">
> [2017-10-09 19:28:59,911] {base_task_runner.py:98} INFO - Subtask: Traceback 
> (most recent call last):
> [2017-10-09 19:28:59,911] {base_task_runner.py:98} INFO - Subtask:   File 
> "/home/airflow/incubator-airflow/airflow/models.py", line 1462, in 
> _run_raw_task
> [2017-10-09 19:28:59,911] {base_task_runner.py:98} INFO - Subtask: result 
> = task_copy.execute(context=context)
> [2017-10-09 19:28:59,912] {base_task_runner.py:98} INFO - Subtask:   File 
> "/home/airflow/incubator-airflow/airflow/contrib/operators/dataproc_operator.py",
>  line 300, in execute
> [2017-10-09 19:28:59,912] {base_task_runner.py:98} INFO - Subtask: raise e
> [2017-10-09 19:28:59,913] {base_task_runner.py:98} INFO - Subtask: HttpError: 
>  https://dataproc.googleapis.com/v1/projects/cloud-airflow-test/regions/global/clusters?alt=json
>  returned "Multiple validation errors:
> [2017-10-09 19:28:59,914] {base_task_runner.py:98} INFO - Subtask:  - Not a 
> valid value: "v1-10-0dev0+incubating". Only lowercase letters, numbers, and 
> dashes are allowed. The value must start with lowercase letter or number and 
> end with a lowercase letter or number.
> [2017-10-09 19:28:59,914] {base_task_runner.py:98} INFO - Subtask:  - User 
> label value must conform to '[\p{Ll}\p{Lo}\p{N}_-]{0,63}' pattern">
> [2017-10-09 19:28:59,915] {base_task_runner.py:98} INFO - Subtask: 
> [2017-10-09 19:28:59,909] {models.py:1585} INFO - Marking task as UP_FOR_RETRY
> [2017-10-09 19:28:59,978] {base_task_runner.py:98} INFO - Subtask: 
> [2017-10-09 19:28:59,977] {models.py:1613} ERROR -  requesting 
> https://dataproc.googleapis.com/v1/projects/cloud-airflow-test/regions/global/clusters?alt=json
>  returned "Multiple validation errors:
> [2017-10-09 19:28:59,978] {base_task_runner.py:98} INFO - Subtask:  - Not a 
> valid value: "v1-10-0dev0+incubating". Only lowercase letters, numbers, and 
> dashes are allowed. The value must start with lowercase letter or number and 
> end with a lowercase letter or number.
> [2017-10-09 19:28:59,979] {base_task_runner.py:98} INFO - Subtask:  - User 
> label value must conform to '[\p{Ll}\p{Lo}\p{N}_-]{0,63}' pattern">
> [2017-10-09 19:28:59,979] {base_task_runner.py:98} INFO - Subtask: Traceback 
> (most recent call last):
> [2017-10-09 19:28:59,979] {base_task_runner.py:98} INFO - Subtask:   File 
> "/usr/local/bin/airflow", line 6, in 
> [2017-10-09 

[jira] [Resolved] (AIRFLOW-1613) Make MySqlToGoogleCloudStorageOperator compatible with python3

2017-10-11 Thread Chris Riccomini (JIRA)

 [ 
https://issues.apache.org/jira/browse/AIRFLOW-1613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Riccomini resolved AIRFLOW-1613.
--
Resolution: Fixed

> Make MySqlToGoogleCloudStorageOperator compatible with python3
> ---
>
> Key: AIRFLOW-1613
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1613
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: contrib
>Reporter: Joy Gao
>Assignee: Joy Gao
> Fix For: 1.9.0
>
>
> 1. 
> In Python 3, map(...) returns an iterator, which can only be iterated over 
> once. 
> Therefore, in the current implementation, schema is exhausted after the 
> first row and yields nothing for subsequent rows:
> {code}
> schema = map(lambda schema_tuple: schema_tuple[0], cursor.description)
> file_no = 0
> tmp_file_handle = NamedTemporaryFile(delete=True)
> tmp_file_handles = {self.filename.format(file_no): tmp_file_handle}
> for row in cursor:
> # Convert datetime objects to utc seconds, and decimals to floats
> row = map(self.convert_types, row)
> row_dict = dict(zip(schema, row))
> {code}
> 2.
> The file is opened in binary mode, but strings are written to it, raising 
> `a bytes-like object is required, not 'str'`. Use mode='w' instead.
> 3.
> The operator currently does not support binary columns in MySQL. We should 
> support uploading binary columns from MySQL to cloud storage, as it's a 
> pretty common use case.
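
A quick plain-Python illustration of point 1 (the names are stand-ins, not
the operator's actual variables):

{code:python}
description = [('id',), ('name',)]         # stand-in for cursor.description
rows = [(1, 'a'), (2, 'b')]

schema = map(lambda t: t[0], description)  # an iterator in Python 3
for row in rows:
    print(dict(zip(schema, row)))
# First row: {'id': 1, 'name': 'a'}; second row: {} -- the iterator is
# exhausted. Wrapping the map() in list(), as the fix does, avoids this.
{code}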



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (AIRFLOW-1613) Make MySqlToGoogleCloudStorageOperator compatible with python3

2017-10-11 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200873#comment-16200873
 ] 

ASF subversion and git services commented on AIRFLOW-1613:
--

Commit afcdd097d765c8e88b4858436bb01585bc04ba7f in incubator-airflow's branch 
refs/heads/v1-9-test from [~joy.gao54]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=afcdd09 ]

[AIRFLOW-1613] Handle binary field in MySqlToGoogleCloudStorageOperator

Closes #2680 from jgao54/write-binary


> Make MySqlToGoogleCloudStorageOperator compatible with python3
> ---
>
> Key: AIRFLOW-1613
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1613
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: contrib
>Reporter: Joy Gao
>Assignee: Joy Gao
> Fix For: 1.9.0
>
>
> 1. 
> In Python 3, map(...) returns an iterator, which can only be iterated over 
> once. 
> Therefore, in the current implementation, schema is exhausted after the 
> first row and yields nothing for subsequent rows:
> {code}
> schema = map(lambda schema_tuple: schema_tuple[0], cursor.description)
> file_no = 0
> tmp_file_handle = NamedTemporaryFile(delete=True)
> tmp_file_handles = {self.filename.format(file_no): tmp_file_handle}
> for row in cursor:
> # Convert datetime objects to utc seconds, and decimals to floats
> row = map(self.convert_types, row)
> row_dict = dict(zip(schema, row))
> {code}
> 2.
> The file is opened in binary mode, but strings are written to it, raising 
> `a bytes-like object is required, not 'str'`. Use mode='w' instead.
> 3.
> The operator currently does not support binary columns in MySQL. We should 
> support uploading binary columns from MySQL to cloud storage, as it's a 
> pretty common use case.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


incubator-airflow git commit: [AIRFLOW-1613] Handle binary field in MySqlToGoogleCloudStorageOperator

2017-10-11 Thread criccomini
Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-9-test ace2b1d24 -> afcdd097d


[AIRFLOW-1613] Handle binary field in MySqlToGoogleCloudStorageOperator

Closes #2680 from jgao54/write-binary


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/afcdd097
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/afcdd097
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/afcdd097

Branch: refs/heads/v1-9-test
Commit: afcdd097d765c8e88b4858436bb01585bc04ba7f
Parents: ace2b1d
Author: Joy Gao 
Authored: Wed Oct 11 13:06:17 2017 -0700
Committer: Chris Riccomini 
Committed: Wed Oct 11 13:07:32 2017 -0700

--
 airflow/contrib/hooks/bigquery_hook.py|  4 ++-
 airflow/contrib/operators/mysql_to_gcs.py | 48 +++---
 2 files changed, 38 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/afcdd097/airflow/contrib/hooks/bigquery_hook.py
--
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index 5fc7e22..fab2a43 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -971,7 +971,9 @@ def _bq_cast(string_field, bq_type):
 if string_field is None:
 return None
 elif bq_type == 'INTEGER' or bq_type == 'TIMESTAMP':
-return int(string_field)
+# convert to float first to handle cases where string_field is
+# represented in scientific notation
+return int(float(string_field))
 elif bq_type == 'FLOAT':
 return float(string_field)
 elif bq_type == 'BOOLEAN':
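
A side note on the _bq_cast change above: int() cannot parse a
scientific-notation string directly, so the patch routes through float()
first. A plain-Python illustration (the sample value is hypothetical):

{code:python}
s = '1.507754e9'         # hypothetical TIMESTAMP string from BigQuery
# int(s) raises ValueError: invalid literal for int() with base 10
print(int(float(s)))     # 1507754000
{code}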

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/afcdd097/airflow/contrib/operators/mysql_to_gcs.py
--
diff --git a/airflow/contrib/operators/mysql_to_gcs.py b/airflow/contrib/operators/mysql_to_gcs.py
index f94bc24..47b7ac9 100644
--- a/airflow/contrib/operators/mysql_to_gcs.py
+++ b/airflow/contrib/operators/mysql_to_gcs.py
@@ -14,6 +14,7 @@
 
 import json
 import time
+import base64
 
 from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
 from airflow.hooks.mysql_hook import MySqlHook
@@ -21,7 +22,7 @@ from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
 from datetime import date, datetime
 from decimal import Decimal
-from MySQLdb.constants import FIELD_TYPE
+from MySQLdb.constants import FIELD_TYPE, FLAG
 from tempfile import NamedTemporaryFile
 
 
@@ -120,15 +121,20 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
 names in GCS, and values are file handles to local files that
 contain the data for the GCS objects.
 """
-schema = list(map(lambda schema_tuple: schema_tuple[0], cursor.description))
+field_names = list(map(lambda schema_tuple: schema_tuple[0], cursor.description))
+mysql_types = list(map(lambda schema_tuple: schema_tuple[1], cursor.description))
+byte_fields = [self.is_binary(t, f) for t, f in zip(mysql_types, cursor.description_flags)]
+
 file_no = 0
-tmp_file_handle = NamedTemporaryFile(delete=True)
+tmp_file_handle = NamedTemporaryFile(mode='w', delete=True)
 tmp_file_handles = {self.filename.format(file_no): tmp_file_handle}
 
 for row in cursor:
-# Convert datetime objects to utc seconds, and decimals to floats
-row = map(self.convert_types, row)
-row_dict = dict(zip(schema, row))
+# Convert datetime objects to utc seconds, decimals to floats, and binaries
+# to base64-encoded strings
+row_dict = {}
+for name, value, is_binary in zip(field_names, row, byte_fields):
+row_dict[name] = self.convert_types(value, is_binary)
 
 # TODO validate that row isn't > 2MB. BQ enforces a hard row size of 2MB.
 json.dump(row_dict, tmp_file_handle)
@@ -139,7 +145,7 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
 # Stop if the file exceeds the file size limit.
 if tmp_file_handle.tell() >= self.approx_max_file_size_bytes:
 file_no += 1
-tmp_file_handle = NamedTemporaryFile(delete=True)
+tmp_file_handle = NamedTemporaryFile(mode='w', delete=True)
+tmp_file_handles[self.filename.format(file_no)] = tmp_file_handle
 
 return tmp_file_handles
@@ -154,10 +160,12 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
 contains the BigQuery schema fields in .json format.
 """
 

[jira] [Commented] (AIRFLOW-1613) Make MySqlToGoogleCloudStorageOperator compatible with python3

2017-10-11 Thread ASF subversion and git services (JIRA)

[ 
https://issues.apache.org/jira/browse/AIRFLOW-1613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200871#comment-16200871
 ] 

ASF subversion and git services commented on AIRFLOW-1613:
--

Commit d578b292e96d5fdd87b5168508005cd73edc4f96 in incubator-airflow's branch 
refs/heads/master from [~joy.gao54]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=d578b29 ]

[AIRFLOW-1613] Handle binary field in MySqlToGoogleCloudStorageOperator

Closes #2680 from jgao54/write-binary


> Make MySqlToGoogleCloudStorageOperator compatible with python3
> ---
>
> Key: AIRFLOW-1613
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1613
> Project: Apache Airflow
>  Issue Type: Bug
>  Components: contrib
>Reporter: Joy Gao
>Assignee: Joy Gao
> Fix For: 1.9.0
>
>
> 1. 
> In Python 3, map(...) returns an iterator, which can only be iterated over 
> once. 
> Therefore, in the current implementation, schema is exhausted after the 
> first row and yields nothing for subsequent rows:
> {code}
> schema = map(lambda schema_tuple: schema_tuple[0], cursor.description)
> file_no = 0
> tmp_file_handle = NamedTemporaryFile(delete=True)
> tmp_file_handles = {self.filename.format(file_no): tmp_file_handle}
> for row in cursor:
> # Convert datetime objects to utc seconds, and decimals to floats
> row = map(self.convert_types, row)
> row_dict = dict(zip(schema, row))
> {code}
> 2.
> The file is opened in binary mode, but strings are written to it, raising 
> `a bytes-like object is required, not 'str'`. Use mode='w' instead.
> 3.
> The operator currently does not support binary columns in MySQL. We should 
> support uploading binary columns from MySQL to cloud storage, as it's a 
> pretty common use case.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (AIRFLOW-1703) Airflow LocalExecutor crashes after 3 hours of work. Database is locked

2017-10-11 Thread Kirill Dubovikov (JIRA)
Kirill Dubovikov created AIRFLOW-1703:
-

 Summary: Airflow LocalExecutor crashes after 3 hours of work. 
Database is locked
 Key: AIRFLOW-1703
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1703
 Project: Apache Airflow
  Issue Type: Bug
  Components: db, worker
Affects Versions: Airflow 1.8
 Environment: Single CentOS virtual server
Reporter: Kirill Dubovikov
 Attachments: nohup.out

Airflow consistently crashes after running for several hours on a single node 
when using a SQLite DB. Our DAG is scheduled to run {{@daily}}. We launch 
Airflow using the following commands:

{code:sh}
airflow scheduler
airflow webserver -p 8080
{code}

After a while, the worker and webserver crash with the following error: 
{{sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) database is 
locked [SQL: 'SELECT connection.conn_id AS connection_conn_id \nFROM connection 
GROUP BY connection.conn_id']}}

I've attached the full logs for further investigation.
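
For context, "database is locked" is characteristic of SQLite's
single-writer locking, which is why SQLite is generally paired only with
the SequentialExecutor. A minimal standalone sketch (plain Python,
independent of the attached logs) that provokes the same error:

{code:python}
import sqlite3

# SQLite allows a single writer at a time. Under a LocalExecutor several
# processes write to the metadata DB concurrently, so one of them can hit
# "database is locked" once another holds the write lock past the busy
# timeout.
a = sqlite3.connect('demo.db', timeout=0.1, isolation_level=None)
b = sqlite3.connect('demo.db', timeout=0.1, isolation_level=None)

a.execute('CREATE TABLE IF NOT EXISTS t (x)')
a.execute('BEGIN IMMEDIATE')        # connection a takes the write lock
a.execute('INSERT INTO t VALUES (1)')

try:
    b.execute('BEGIN IMMEDIATE')    # connection b cannot acquire it
except sqlite3.OperationalError as e:
    print(e)                        # "database is locked"
finally:
    a.execute('ROLLBACK')
{code}

Pointing sql_alchemy_conn at MySQL or Postgres is the usual way to run the
LocalExecutor reliably.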



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (AIRFLOW-1702) access the count of retries that have happened in a python method

2017-10-11 Thread Igor Cherepanov (JIRA)
Igor Cherepanov created AIRFLOW-1702:


 Summary: access the count of retries that have happened in a python 
method
 Key: AIRFLOW-1702
 URL: https://issues.apache.org/jira/browse/AIRFLOW-1702
 Project: Apache Airflow
  Issue Type: Wish
Reporter: Igor Cherepanov


hello, 

is it possible to access the count of retries that have happened in a python method?

thanks!



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)