[jira] [Updated] (AIRFLOW-1704) If the subdag is defined as a local variable, its variable is_subdag is False.
[ https://issues.apache.org/jira/browse/AIRFLOW-1704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shintaro Murakami updated AIRFLOW-1704:
---
Attachment: dags.png
            tree.png

> If the subdag is defined as a local variable, its variable is_subdag is False.
> --
>
> Key: AIRFLOW-1704
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1704
> Project: Apache Airflow
> Issue Type: Bug
> Components: models
> Reporter: Shintaro Murakami
> Attachments: dags.png, test_subdag_local_variable.py, tree.png
>
> If the subdag is defined as a local variable in DAG definition code, its variable is_subdag is False.

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (AIRFLOW-1704) If the subdag is defined as a local variable, its variable is_subdag is False.
[ https://issues.apache.org/jira/browse/AIRFLOW-1704?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Shintaro Murakami updated AIRFLOW-1704:
---
Attachment: test_subdag_local_variable.py
            DAG definition for reproducing

> If the subdag is defined as a local variable, its variable is_subdag is False.
> --
>
> Key: AIRFLOW-1704
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1704
> Project: Apache Airflow
> Issue Type: Bug
> Components: models
> Reporter: Shintaro Murakami
> Attachments: test_subdag_local_variable.py
>
> If the subdag is defined as a local variable in DAG definition code, its variable is_subdag is False.
[jira] [Created] (AIRFLOW-1704) If the subdag is defined as a local variable, its variable is_subdag is False.
Shintaro Murakami created AIRFLOW-1704:
---
Summary: If the subdag is defined as a local variable, its variable is_subdag is False.
Key: AIRFLOW-1704
URL: https://issues.apache.org/jira/browse/AIRFLOW-1704
Project: Apache Airflow
Issue Type: Bug
Components: models
Reporter: Shintaro Murakami

If the subdag is defined as a local variable in DAG definition code, its variable is_subdag is False.
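The pattern that triggers this can be sketched without Airflow at all. DagBag-style discovery inspects only a module's top-level names, so a subdag held only in a local variable never passes through the code path that would flag it as a subdag. All class and function names below are illustrative stand-ins, not Airflow's actual API:

```python
class Dag:
    """Toy stand-in for airflow.models.DAG (illustrative, not the real class)."""
    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.is_subdag = False  # default, as observed in the bug report

def collect_dags(module_globals):
    """Mimic DagBag discovery: only a module's top-level names are inspected."""
    return [v for v in module_globals.values() if isinstance(v, Dag)]

def build_parent():
    # 'child' is bound only to a local variable inside this factory
    # function, so discovery never visits it and nothing ever flips
    # child.is_subdag to True
    child = Dag("parent.child")
    parent = Dag("parent")
    return parent, child

parent, child = build_parent()
discovered = collect_dags({"parent": parent})  # only the global DAG is found
```

The attached test_subdag_local_variable.py presumably demonstrates the same shape with a real SubDagOperator.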
[jira] [Commented] (AIRFLOW-1681) Create way to batch retry task instances in the CRUD
[ https://issues.apache.org/jira/browse/AIRFLOW-1681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200963#comment-16200963 ]

ASF subversion and git services commented on AIRFLOW-1681:
--

Commit 9f2c16a0ac261888fe2ee4671538201c273f82d5 in incubator-airflow's branch refs/heads/master from [~kevinyang]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=9f2c16a ]

[AIRFLOW-1681] Add batch clear in task instance view

Allow users to batch clear selected task instance(s) in task instance view. Only state(s) of selected task instance(s) will be cleared--no upstream nor downstream task instance will be affected. DAG(s) involved will be set to "RUNNING" state, same as existing "clear" operation. Keeping both "Delete" and "Clear" operations for more smooth user habit transition--informing DAG state change in pop-up (check screenshots).

Closes #2681 from yrqls21/add-batch-clear-in-task-instance-view

> Create way to batch retry task instances in the CRUD
> --
>
> Key: AIRFLOW-1681
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1681
> Project: Apache Airflow
> Issue Type: Bug
> Components: webserver
> Reporter: Dan Davydov
>
> The old way to batch retry tasks was to select them on the Task Instances page on the webserver and do a With Selected -> Delete.
> This no longer works, as you will get overlapping task instance logs (e.g. the first retry log will be placed in the same location as the first try log). We need an option in the CRUD called With Selected -> Retry that does the same thing as With Selected -> Delete but follows the logic for task clearing (sets state to none, increases max_tries). Once this feature is stable, With Selected -> Delete should probably be removed, as it leads to bad states with the logs.
incubator-airflow git commit: [AIRFLOW-1681] Add batch clear in task instance view
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 98b4df945 -> 9f2c16a0a

[AIRFLOW-1681] Add batch clear in task instance view

Allow users to batch clear selected task instance(s) in task instance view. Only state(s) of selected task instance(s) will be cleared--no upstream nor downstream task instance will be affected. DAG(s) involved will be set to "RUNNING" state, same as existing "clear" operation. Keeping both "Delete" and "Clear" operations for more smooth user habit transition--informing DAG state change in pop-up (check screenshots).

Closes #2681 from yrqls21/add-batch-clear-in-task-instance-view

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/9f2c16a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/9f2c16a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/9f2c16a0
Branch: refs/heads/master
Commit: 9f2c16a0ac261888fe2ee4671538201c273f82d5
Parents: 98b4df9
Author: Kevin Yang
Authored: Wed Oct 11 14:05:33 2017 -0700
Committer: Dan Davydov
Committed: Wed Oct 11 14:05:35 2017 -0700
--
 airflow/www/views.py | 62 +++
 1 file changed, 31 insertions(+), 31 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/9f2c16a0/airflow/www/views.py
--
diff --git a/airflow/www/views.py b/airflow/www/views.py
index bc63b5b..81ee61f 100644
--- a/airflow/www/views.py
+++ b/airflow/www/views.py
@@ -37,7 +37,7 @@ import sqlalchemy as sqla
 from sqlalchemy import or_, desc, and_, union_all
 from flask import (
-    abort, redirect, url_for, request, Markup, Response, current_app, render_template,
+    abort, redirect, url_for, request, Markup, Response, current_app, render_template,
     make_response)
 from flask_admin import BaseView, expose, AdminIndexView
 from flask_admin.contrib.sqla import ModelView
@@ -2488,7 +2488,6 @@ class TaskInstanceModelView(ModelViewOnly):
         'start_date', 'end_date', 'duration', 'job_id', 'hostname',
         'unixname', 'priority_weight', 'queue', 'queued_dttm', 'try_number', 'pool', 'log_url')
-    can_delete = True
     page_size = PAGE_SIZE

     @action('set_running', "Set state to 'running'", None)
@@ -2507,58 +2506,59 @@ class TaskInstanceModelView(ModelViewOnly):
     def action_set_retry(self, ids):
         self.set_task_instance_state(ids, State.UP_FOR_RETRY)

-    @action('delete',
-            lazy_gettext('Delete'),
-            lazy_gettext('Are you sure you want to delete selected records?'))
-    def action_delete(self, ids):
-        """
-        As a workaround for AIRFLOW-277, this method overrides Flask-Admin's ModelView.action_delete().
-
-        TODO: this method should be removed once the below bug is fixed on Flask-Admin side.
-        https://github.com/flask-admin/flask-admin/issues/1226
-        """
-        if 'sqlite' in conf.get('core', 'sql_alchemy_conn'):
-            self.delete_task_instances(ids)
-        else:
-            super(TaskInstanceModelView, self).action_delete(ids)
-
     @provide_session
-    def set_task_instance_state(self, ids, target_state, session=None):
+    @action('clear',
+            lazy_gettext('Clear'),
+            lazy_gettext(
+                'Are you sure you want to clear the state of the selected task instance(s)'
+                ' and set their dagruns to the running state?'))
+    def action_clear(self, ids, session=None):
         try:
             TI = models.TaskInstance
-            count = len(ids)
+
+            dag_to_tis = {}
+
             for id in ids:
                 task_id, dag_id, execution_date = id.split(',')
-                execution_date = datetime.strptime(execution_date, '%Y-%m-%d %H:%M:%S')
+
                 ti = session.query(TI).filter(TI.task_id == task_id,
                                               TI.dag_id == dag_id,
                                               TI.execution_date == execution_date).one()
-                ti.state = target_state
+
+                dag = dagbag.get_dag(dag_id)
+                tis = dag_to_tis.setdefault(dag, [])
+                tis.append(ti)
+
+            for dag, tis in dag_to_tis.items():
+                models.clear_task_instances(tis, session, dag=dag)
+
             session.commit()
-            flash(
-                "{count} task instances were set to '{target_state}'".format(**locals()))
+            flash("{0} task instances have been cleared".format(len(ids)))
+
         except Exception as ex:
             if not self.handle_view_exception(ex):
                 raise Exception("Ooops")
-            flash('Failed to set state',
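The core of the new action_clear is the grouping step: selected task instances are bucketed per DAG so that models.clear_task_instances runs once per DAG. A minimal sketch of that bucketing, with plain strings standing in for TaskInstance and DAG objects (the id values are illustrative, not real rows):

```python
# Each selected id encodes "task_id,dag_id,execution_date", as in the view code.
selected_ids = [
    "t1,dag_a,2017-10-11 00:00:00",
    "t2,dag_b,2017-10-11 00:00:00",
    "t3,dag_a,2017-10-11 00:00:00",
]

dag_to_tis = {}
for id_ in selected_ids:
    task_id, dag_id, execution_date = id_.split(',', 2)
    # setdefault creates the per-DAG bucket on first sight, as in the patch
    dag_to_tis.setdefault(dag_id, []).append(task_id)

# one clear_task_instances call per DAG would then follow:
#   for dag, tis in dag_to_tis.items():
#       models.clear_task_instances(tis, session, dag=dag)
```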
[jira] [Comment Edited] (AIRFLOW-1702) access to the count of the happened retries in a python method
[ https://issues.apache.org/jira/browse/AIRFLOW-1702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200914#comment-16200914 ]

Joy Gao edited comment on AIRFLOW-1702 at 10/11/17 8:31 PM:

The TaskInstance has an attribute `try_number`; you can access it via the Python operator, i.e.:

{code:python}
def foo(**context):
    ti = context['ti']
    retry = ti.try_number - 1
    # do something with retry count

op = PythonOperator(
    task_id='task',
    provide_context=True,
    python_callable=foo,
    dag=dag)
{code}

Hope this helps!

> access to the count of the happened retries in a python method
> --
>
> Key: AIRFLOW-1702
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1702
> Project: Apache Airflow
> Issue Type: Wish
> Reporter: Igor Cherepanov
>
> hello,
> is it possible to access to the count of the happened retries in a python method
> thanks!
[jira] [Commented] (AIRFLOW-1702) access to the count of the happened retries in a python method
[ https://issues.apache.org/jira/browse/AIRFLOW-1702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200914#comment-16200914 ]

Joy Gao commented on AIRFLOW-1702:
--

The TaskInstance has an attribute `try_number`; you can access it via the Python operator, i.e.:

{code:python}
def foo(**context):
    ti = context['ti']
    retry = ti.try_number - 1
    # do something with retry count

op = PythonOperator(
    task_id='task',
    provide_context=True,
    python_callable=foo,
    dag=dag)
{code}

Hope this helps!

> access to the count of the happened retries in a python method
> --
>
> Key: AIRFLOW-1702
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1702
> Project: Apache Airflow
> Issue Type: Wish
> Reporter: Igor Cherepanov
>
> hello,
> is it possible to access to the count of the happened retries in a python method
> thanks!
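Outside a live Airflow deployment, the same access pattern can be exercised with a stubbed task instance. FakeTI below is a hypothetical stand-in; only the try_number attribute mirrors the real TaskInstance:

```python
class FakeTI:
    # Airflow's try_number counts the attempt about to run, so the number
    # of retries that have already happened is try_number - 1
    try_number = 3

def foo(**context):
    ti = context['ti']  # note the quotes: context is a dict keyed by strings
    return ti.try_number - 1

retries_so_far = foo(ti=FakeTI())
```

The quotes around 'ti' matter: `context[ti]` (without quotes) would raise a NameError, since context is an ordinary dict keyed by strings.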
[jira] [Commented] (AIRFLOW-1696) GCP Dataproc Operator fails due to '+' in Airflow version
[ https://issues.apache.org/jira/browse/AIRFLOW-1696?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200908#comment-16200908 ] ASF subversion and git services commented on AIRFLOW-1696: -- Commit 98b4df945dec5dfe766c8a9a15cf4b3932bf1a5c in incubator-airflow's branch refs/heads/master from [~TrevorEdwards] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=98b4df9 ] [AIRFLOW-1696] Fix dataproc version label error Closes #2676 from TrevorEdwards/airflow-1696 > GCP Dataproc Operator fails due to '+' in Airflow version > - > > Key: AIRFLOW-1696 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1696 > Project: Apache Airflow > Issue Type: Bug >Reporter: Trevor Edwards >Assignee: Trevor Edwards > Fix For: 1.10.0 > > > Since Dataproc operator attaches the Airflow version as one of its labels, > and dataproc labels cannot include the character '+', dataproc operator > currently fails with the following error: > {code:none} > [2017-10-09 19:28:48,035] {base_task_runner.py:115} INFO - Running: ['bash', > '-c', u'airflow run smokey-dataproc start_cluster 2017-10-08T00:00:00 > --job_id 6 --raw -sd DAGS_FOLDER/dataa.py'] > [2017-10-09 19:28:49,041] {base_task_runner.py:98} INFO - Subtask: > [2017-10-09 19:28:49,041] {__init__.py:45} INFO - Using executor LocalExecutor > [2017-10-09 19:28:49,139] {base_task_runner.py:98} INFO - Subtask: > [2017-10-09 19:28:49,139] {models.py:187} INFO - Filling up the DagBag from > /home/airflow/dags/dataa.py > [2017-10-09 19:28:49,258] {base_task_runner.py:98} INFO - Subtask: Cluster > name: smoke-cluster-aa05845a-1b60-4543-94d0-f7dfddb90ee0 > [2017-10-09 19:28:49,258] {base_task_runner.py:98} INFO - Subtask: > [2017-10-09 19:28:49,257] {dataproc_operator.py:267} INFO - Creating cluster: > smoke-cluster-aa05845a-1b60-4543-94d0-f7dfddb90ee0 > [2017-10-09 19:28:49,265] {base_task_runner.py:98} INFO - Subtask: > [2017-10-09 19:28:49,265] {gcp_api_base_hook.py:82} INFO - Getting connection > using a JSON 
key file. > [2017-10-09 19:28:59,909] {base_task_runner.py:98} INFO - Subtask: > [2017-10-09 19:28:59,906] {models.py:1564} ERROR - requesting > https://dataproc.googleapis.com/v1/projects/cloud-airflow-test/regions/global/clusters?alt=json > returned "Multiple validation errors: > [2017-10-09 19:28:59,910] {base_task_runner.py:98} INFO - Subtask: - Not a > valid value: "v1-10-0dev0+incubating". Only lowercase letters, numbers, and > dashes are allowed. The value must start with lowercase letter or number and > end with a lowercase letter or number. > [2017-10-09 19:28:59,910] {base_task_runner.py:98} INFO - Subtask: - User > label value must conform to '[\p{Ll}\p{Lo}\p{N}_-]{0,63}' pattern"> > [2017-10-09 19:28:59,911] {base_task_runner.py:98} INFO - Subtask: Traceback > (most recent call last): > [2017-10-09 19:28:59,911] {base_task_runner.py:98} INFO - Subtask: File > "/home/airflow/incubator-airflow/airflow/models.py", line 1462, in > _run_raw_task > [2017-10-09 19:28:59,911] {base_task_runner.py:98} INFO - Subtask: result > = task_copy.execute(context=context) > [2017-10-09 19:28:59,912] {base_task_runner.py:98} INFO - Subtask: File > "/home/airflow/incubator-airflow/airflow/contrib/operators/dataproc_operator.py", > line 300, in execute > [2017-10-09 19:28:59,912] {base_task_runner.py:98} INFO - Subtask: raise e > [2017-10-09 19:28:59,913] {base_task_runner.py:98} INFO - Subtask: HttpError: > https://dataproc.googleapis.com/v1/projects/cloud-airflow-test/regions/global/clusters?alt=json > returned "Multiple validation errors: > [2017-10-09 19:28:59,914] {base_task_runner.py:98} INFO - Subtask: - Not a > valid value: "v1-10-0dev0+incubating". Only lowercase letters, numbers, and > dashes are allowed. The value must start with lowercase letter or number and > end with a lowercase letter or number. 
> [2017-10-09 19:28:59,914] {base_task_runner.py:98} INFO - Subtask: - User > label value must conform to '[\p{Ll}\p{Lo}\p{N}_-]{0,63}' pattern"> > [2017-10-09 19:28:59,915] {base_task_runner.py:98} INFO - Subtask: > [2017-10-09 19:28:59,909] {models.py:1585} INFO - Marking task as UP_FOR_RETRY > [2017-10-09 19:28:59,978] {base_task_runner.py:98} INFO - Subtask: > [2017-10-09 19:28:59,977] {models.py:1613} ERROR - requesting > https://dataproc.googleapis.com/v1/projects/cloud-airflow-test/regions/global/clusters?alt=json > returned "Multiple validation errors: > [2017-10-09 19:28:59,978] {base_task_runner.py:98} INFO - Subtask: - Not a > valid value: "v1-10-0dev0+incubating". Only lowercase letters, numbers, and > dashes are allowed. The value must start with lowercase letter or number and > end with a lowercase letter or number. > [2017-10-09 19:28:59,979]
[jira] [Resolved] (AIRFLOW-1696) GCP Dataproc Operator fails due to '+' in Airflow version
[ https://issues.apache.org/jira/browse/AIRFLOW-1696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Riccomini resolved AIRFLOW-1696. -- Resolution: Fixed > GCP Dataproc Operator fails due to '+' in Airflow version > - > > Key: AIRFLOW-1696 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1696 > Project: Apache Airflow > Issue Type: Bug >Reporter: Trevor Edwards >Assignee: Trevor Edwards > Fix For: 1.10.0 > > > Since Dataproc operator attaches the Airflow version as one of its labels, > and dataproc labels cannot include the character '+', dataproc operator > currently fails with the following error: > {code:none} > [2017-10-09 19:28:48,035] {base_task_runner.py:115} INFO - Running: ['bash', > '-c', u'airflow run smokey-dataproc start_cluster 2017-10-08T00:00:00 > --job_id 6 --raw -sd DAGS_FOLDER/dataa.py'] > [2017-10-09 19:28:49,041] {base_task_runner.py:98} INFO - Subtask: > [2017-10-09 19:28:49,041] {__init__.py:45} INFO - Using executor LocalExecutor > [2017-10-09 19:28:49,139] {base_task_runner.py:98} INFO - Subtask: > [2017-10-09 19:28:49,139] {models.py:187} INFO - Filling up the DagBag from > /home/airflow/dags/dataa.py > [2017-10-09 19:28:49,258] {base_task_runner.py:98} INFO - Subtask: Cluster > name: smoke-cluster-aa05845a-1b60-4543-94d0-f7dfddb90ee0 > [2017-10-09 19:28:49,258] {base_task_runner.py:98} INFO - Subtask: > [2017-10-09 19:28:49,257] {dataproc_operator.py:267} INFO - Creating cluster: > smoke-cluster-aa05845a-1b60-4543-94d0-f7dfddb90ee0 > [2017-10-09 19:28:49,265] {base_task_runner.py:98} INFO - Subtask: > [2017-10-09 19:28:49,265] {gcp_api_base_hook.py:82} INFO - Getting connection > using a JSON key file. 
> [2017-10-09 19:28:59,909] {base_task_runner.py:98} INFO - Subtask: > [2017-10-09 19:28:59,906] {models.py:1564} ERROR - requesting > https://dataproc.googleapis.com/v1/projects/cloud-airflow-test/regions/global/clusters?alt=json > returned "Multiple validation errors: > [2017-10-09 19:28:59,910] {base_task_runner.py:98} INFO - Subtask: - Not a > valid value: "v1-10-0dev0+incubating". Only lowercase letters, numbers, and > dashes are allowed. The value must start with lowercase letter or number and > end with a lowercase letter or number. > [2017-10-09 19:28:59,910] {base_task_runner.py:98} INFO - Subtask: - User > label value must conform to '[\p{Ll}\p{Lo}\p{N}_-]{0,63}' pattern"> > [2017-10-09 19:28:59,911] {base_task_runner.py:98} INFO - Subtask: Traceback > (most recent call last): > [2017-10-09 19:28:59,911] {base_task_runner.py:98} INFO - Subtask: File > "/home/airflow/incubator-airflow/airflow/models.py", line 1462, in > _run_raw_task > [2017-10-09 19:28:59,911] {base_task_runner.py:98} INFO - Subtask: result > = task_copy.execute(context=context) > [2017-10-09 19:28:59,912] {base_task_runner.py:98} INFO - Subtask: File > "/home/airflow/incubator-airflow/airflow/contrib/operators/dataproc_operator.py", > line 300, in execute > [2017-10-09 19:28:59,912] {base_task_runner.py:98} INFO - Subtask: raise e > [2017-10-09 19:28:59,913] {base_task_runner.py:98} INFO - Subtask: HttpError: > https://dataproc.googleapis.com/v1/projects/cloud-airflow-test/regions/global/clusters?alt=json > returned "Multiple validation errors: > [2017-10-09 19:28:59,914] {base_task_runner.py:98} INFO - Subtask: - Not a > valid value: "v1-10-0dev0+incubating". Only lowercase letters, numbers, and > dashes are allowed. The value must start with lowercase letter or number and > end with a lowercase letter or number. 
> [2017-10-09 19:28:59,914] {base_task_runner.py:98} INFO - Subtask: - User > label value must conform to '[\p{Ll}\p{Lo}\p{N}_-]{0,63}' pattern"> > [2017-10-09 19:28:59,915] {base_task_runner.py:98} INFO - Subtask: > [2017-10-09 19:28:59,909] {models.py:1585} INFO - Marking task as UP_FOR_RETRY > [2017-10-09 19:28:59,978] {base_task_runner.py:98} INFO - Subtask: > [2017-10-09 19:28:59,977] {models.py:1613} ERROR - requesting > https://dataproc.googleapis.com/v1/projects/cloud-airflow-test/regions/global/clusters?alt=json > returned "Multiple validation errors: > [2017-10-09 19:28:59,978] {base_task_runner.py:98} INFO - Subtask: - Not a > valid value: "v1-10-0dev0+incubating". Only lowercase letters, numbers, and > dashes are allowed. The value must start with lowercase letter or number and > end with a lowercase letter or number. > [2017-10-09 19:28:59,979] {base_task_runner.py:98} INFO - Subtask: - User > label value must conform to '[\p{Ll}\p{Lo}\p{N}_-]{0,63}' pattern"> > [2017-10-09 19:28:59,979] {base_task_runner.py:98} INFO - Subtask: Traceback > (most recent call last): > [2017-10-09 19:28:59,979] {base_task_runner.py:98} INFO - Subtask: File > "/usr/local/bin/airflow", line 6, in > [2017-10-09
incubator-airflow git commit: [AIRFLOW-1696] Fix dataproc version label error
Repository: incubator-airflow
Updated Branches:
  refs/heads/master d578b292e -> 98b4df945

[AIRFLOW-1696] Fix dataproc version label error

Closes #2676 from TrevorEdwards/airflow-1696

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/98b4df94
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/98b4df94
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/98b4df94
Branch: refs/heads/master
Commit: 98b4df945dec5dfe766c8a9a15cf4b3932bf1a5c
Parents: d578b29
Author: Trevor Edwards
Authored: Wed Oct 11 13:28:13 2017 -0700
Committer: Chris Riccomini
Committed: Wed Oct 11 13:28:17 2017 -0700
--
 airflow/contrib/operators/dataproc_operator.py    | 2 +-
 tests/contrib/operators/test_dataproc_operator.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/98b4df94/airflow/contrib/operators/dataproc_operator.py
--
diff --git a/airflow/contrib/operators/dataproc_operator.py b/airflow/contrib/operators/dataproc_operator.py
index 0823ed8..99e4a0d 100644
--- a/airflow/contrib/operators/dataproc_operator.py
+++ b/airflow/contrib/operators/dataproc_operator.py
@@ -241,7 +241,7 @@ class DataprocClusterCreateOperator(BaseOperator):
         # [a-z]([-a-z0-9]*[a-z0-9])? (current airflow version string follows
         # semantic versioning spec: x.y.z).
         cluster_data['labels'].update({'airflow-version':
-                                       'v' + version.replace('.', '-')})
+                                       'v' + version.replace('.', '-').replace('+','-')})
         if self.storage_bucket:
             cluster_data['config']['configBucket'] = self.storage_bucket
         if self.metadata:

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/98b4df94/tests/contrib/operators/test_dataproc_operator.py
--
diff --git a/tests/contrib/operators/test_dataproc_operator.py b/tests/contrib/operators/test_dataproc_operator.py
index d206fba..ad78a8d 100644
--- a/tests/contrib/operators/test_dataproc_operator.py
+++ b/tests/contrib/operators/test_dataproc_operator.py
@@ -129,7 +129,7 @@ class DataprocClusterCreateOperatorTest(unittest.TestCase):
         # set to the dataproc operator.
         merged_labels = {}
         merged_labels.update(self.labels[suffix])
-        merged_labels.update({'airflow-version': 'v' + version.replace('.', '-')})
+        merged_labels.update({'airflow-version': 'v' + version.replace('.', '-').replace('+','-')})
         self.assertTrue(re.match(r'[a-z]([-a-z0-9]*[a-z0-9])?',
                                  cluster_data['labels']['airflow-version']))
         self.assertEqual(cluster_data['labels'], merged_labels)
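The one-character fix can be checked in isolation: chaining a second replace maps the '+' of a local version label onto a dash, which satisfies the Dataproc label pattern quoted in the error log. The version string below is an example, not necessarily the exact string Airflow reports:

```python
import re

version = "1.10.0.dev0+incubating"  # example Airflow version string containing '+'

# before the fix: the '+' survives and Dataproc rejects the label
bad_label = 'v' + version.replace('.', '-')

# after the fix: '+' is also mapped to '-'
good_label = 'v' + version.replace('.', '-').replace('+', '-')

# Dataproc's documented label-value pattern (lowercase letters, digits, dashes)
dataproc_ok = re.compile(r'^[a-z]([-a-z0-9]*[a-z0-9])?$')
```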
[jira] [Updated] (AIRFLOW-1696) GCP Dataproc Operator fails due to '+' in Airflow version
[ https://issues.apache.org/jira/browse/AIRFLOW-1696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Riccomini updated AIRFLOW-1696: - Fix Version/s: 1.10.0 > GCP Dataproc Operator fails due to '+' in Airflow version > - > > Key: AIRFLOW-1696 > URL: https://issues.apache.org/jira/browse/AIRFLOW-1696 > Project: Apache Airflow > Issue Type: Bug >Reporter: Trevor Edwards >Assignee: Trevor Edwards > Fix For: 1.10.0 > > > Since Dataproc operator attaches the Airflow version as one of its labels, > and dataproc labels cannot include the character '+', dataproc operator > currently fails with the following error: > {code:none} > [2017-10-09 19:28:48,035] {base_task_runner.py:115} INFO - Running: ['bash', > '-c', u'airflow run smokey-dataproc start_cluster 2017-10-08T00:00:00 > --job_id 6 --raw -sd DAGS_FOLDER/dataa.py'] > [2017-10-09 19:28:49,041] {base_task_runner.py:98} INFO - Subtask: > [2017-10-09 19:28:49,041] {__init__.py:45} INFO - Using executor LocalExecutor > [2017-10-09 19:28:49,139] {base_task_runner.py:98} INFO - Subtask: > [2017-10-09 19:28:49,139] {models.py:187} INFO - Filling up the DagBag from > /home/airflow/dags/dataa.py > [2017-10-09 19:28:49,258] {base_task_runner.py:98} INFO - Subtask: Cluster > name: smoke-cluster-aa05845a-1b60-4543-94d0-f7dfddb90ee0 > [2017-10-09 19:28:49,258] {base_task_runner.py:98} INFO - Subtask: > [2017-10-09 19:28:49,257] {dataproc_operator.py:267} INFO - Creating cluster: > smoke-cluster-aa05845a-1b60-4543-94d0-f7dfddb90ee0 > [2017-10-09 19:28:49,265] {base_task_runner.py:98} INFO - Subtask: > [2017-10-09 19:28:49,265] {gcp_api_base_hook.py:82} INFO - Getting connection > using a JSON key file. 
> [2017-10-09 19:28:59,909] {base_task_runner.py:98} INFO - Subtask: > [2017-10-09 19:28:59,906] {models.py:1564} ERROR - requesting > https://dataproc.googleapis.com/v1/projects/cloud-airflow-test/regions/global/clusters?alt=json > returned "Multiple validation errors: > [2017-10-09 19:28:59,910] {base_task_runner.py:98} INFO - Subtask: - Not a > valid value: "v1-10-0dev0+incubating". Only lowercase letters, numbers, and > dashes are allowed. The value must start with lowercase letter or number and > end with a lowercase letter or number. > [2017-10-09 19:28:59,910] {base_task_runner.py:98} INFO - Subtask: - User > label value must conform to '[\p{Ll}\p{Lo}\p{N}_-]{0,63}' pattern"> > [2017-10-09 19:28:59,911] {base_task_runner.py:98} INFO - Subtask: Traceback > (most recent call last): > [2017-10-09 19:28:59,911] {base_task_runner.py:98} INFO - Subtask: File > "/home/airflow/incubator-airflow/airflow/models.py", line 1462, in > _run_raw_task > [2017-10-09 19:28:59,911] {base_task_runner.py:98} INFO - Subtask: result > = task_copy.execute(context=context) > [2017-10-09 19:28:59,912] {base_task_runner.py:98} INFO - Subtask: File > "/home/airflow/incubator-airflow/airflow/contrib/operators/dataproc_operator.py", > line 300, in execute > [2017-10-09 19:28:59,912] {base_task_runner.py:98} INFO - Subtask: raise e > [2017-10-09 19:28:59,913] {base_task_runner.py:98} INFO - Subtask: HttpError: > https://dataproc.googleapis.com/v1/projects/cloud-airflow-test/regions/global/clusters?alt=json > returned "Multiple validation errors: > [2017-10-09 19:28:59,914] {base_task_runner.py:98} INFO - Subtask: - Not a > valid value: "v1-10-0dev0+incubating". Only lowercase letters, numbers, and > dashes are allowed. The value must start with lowercase letter or number and > end with a lowercase letter or number. 
> [2017-10-09 19:28:59,914] {base_task_runner.py:98} INFO - Subtask: - User > label value must conform to '[\p{Ll}\p{Lo}\p{N}_-]{0,63}' pattern"> > [2017-10-09 19:28:59,915] {base_task_runner.py:98} INFO - Subtask: > [2017-10-09 19:28:59,909] {models.py:1585} INFO - Marking task as UP_FOR_RETRY > [2017-10-09 19:28:59,978] {base_task_runner.py:98} INFO - Subtask: > [2017-10-09 19:28:59,977] {models.py:1613} ERROR - requesting > https://dataproc.googleapis.com/v1/projects/cloud-airflow-test/regions/global/clusters?alt=json > returned "Multiple validation errors: > [2017-10-09 19:28:59,978] {base_task_runner.py:98} INFO - Subtask: - Not a > valid value: "v1-10-0dev0+incubating". Only lowercase letters, numbers, and > dashes are allowed. The value must start with lowercase letter or number and > end with a lowercase letter or number. > [2017-10-09 19:28:59,979] {base_task_runner.py:98} INFO - Subtask: - User > label value must conform to '[\p{Ll}\p{Lo}\p{N}_-]{0,63}' pattern"> > [2017-10-09 19:28:59,979] {base_task_runner.py:98} INFO - Subtask: Traceback > (most recent call last): > [2017-10-09 19:28:59,979] {base_task_runner.py:98} INFO - Subtask: File > "/usr/local/bin/airflow", line 6, in > [2017-10-09
[jira] [Resolved] (AIRFLOW-1613) Make MySqlToGoogleCloudStorageOperator compaitible with python3
[ https://issues.apache.org/jira/browse/AIRFLOW-1613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Chris Riccomini resolved AIRFLOW-1613.
--
Resolution: Fixed

> Make MySqlToGoogleCloudStorageOperator compaitible with python3
> ---
>
> Key: AIRFLOW-1613
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1613
> Project: Apache Airflow
> Issue Type: Bug
> Components: contrib
> Reporter: Joy Gao
> Assignee: Joy Gao
> Fix For: 1.9.0
>
> 1. In Python 3, map(...) returns an iterator, which can only be iterated over once.
> Therefore the current implementation will return an empty list after the first iteration of schema:
> {code}
> schema = map(lambda schema_tuple: schema_tuple[0], cursor.description)
> file_no = 0
> tmp_file_handle = NamedTemporaryFile(delete=True)
> tmp_file_handles = {self.filename.format(file_no): tmp_file_handle}
> for row in cursor:
>     # Convert datetime objects to utc seconds, and decimals to floats
>     row = map(self.convert_types, row)
>     row_dict = dict(zip(schema, row))
> {code}
> 2. The file is opened as binary, but strings are written to it, giving the error `a bytes-like object is required, not 'str'`. Use mode='w' instead.
> 3. The operator currently does not support binary columns in MySQL. We should support uploading binary columns from MySQL to cloud storage, as it's a pretty common use-case.
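Point 1 above is easy to reproduce in isolation: in Python 3, a map object is a one-shot iterator, so zipping against it inside the row loop drains it after the first row. Materializing it with list(), as the fix does, makes it reusable:

```python
# (name, type_code) tuples, as a DB-API cursor.description would return
description = [("id", 3), ("name", 253)]

schema = map(lambda schema_tuple: schema_tuple[0], description)
first_pass = list(schema)   # consumes the iterator
second_pass = list(schema)  # already exhausted: empty

# the fix: materialize once, then iterate as often as needed
schema_fixed = list(map(lambda schema_tuple: schema_tuple[0], description))
```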
[jira] [Commented] (AIRFLOW-1613) Make MySqlToGoogleCloudStorageOperator compaitible with python3
[ https://issues.apache.org/jira/browse/AIRFLOW-1613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200873#comment-16200873 ]

ASF subversion and git services commented on AIRFLOW-1613:
--

Commit afcdd097d765c8e88b4858436bb01585bc04ba7f in incubator-airflow's branch refs/heads/v1-9-test from [~joy.gao54]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=afcdd09 ]

[AIRFLOW-1613] Handle binary field in MySqlToGoogleCloudStorageOperator

Closes #2680 from jgao54/write-binary

> Make MySqlToGoogleCloudStorageOperator compaitible with python3
> ---
>
> Key: AIRFLOW-1613
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1613
> Project: Apache Airflow
> Issue Type: Bug
> Components: contrib
> Reporter: Joy Gao
> Assignee: Joy Gao
> Fix For: 1.9.0
>
> 1. In Python 3, map(...) returns an iterator, which can only be iterated over once.
> Therefore the current implementation will return an empty list after the first iteration of schema:
> {code}
> schema = map(lambda schema_tuple: schema_tuple[0], cursor.description)
> file_no = 0
> tmp_file_handle = NamedTemporaryFile(delete=True)
> tmp_file_handles = {self.filename.format(file_no): tmp_file_handle}
> for row in cursor:
>     # Convert datetime objects to utc seconds, and decimals to floats
>     row = map(self.convert_types, row)
>     row_dict = dict(zip(schema, row))
> {code}
> 2. The file is opened as binary, but strings are written to it, giving the error `a bytes-like object is required, not 'str'`. Use mode='w' instead.
> 3. The operator currently does not support binary columns in MySQL. We should support uploading binary columns from MySQL to cloud storage, as it's a pretty common use-case.
incubator-airflow git commit: [AIRFLOW-1613] Handle binary field in MySqlToGoogleCloudStorageOperator
Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-9-test ace2b1d24 -> afcdd097d

[AIRFLOW-1613] Handle binary field in MySqlToGoogleCloudStorageOperator

Closes #2680 from jgao54/write-binary

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/afcdd097
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/afcdd097
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/afcdd097

Branch: refs/heads/v1-9-test
Commit: afcdd097d765c8e88b4858436bb01585bc04ba7f
Parents: ace2b1d
Author: Joy Gao
Authored: Wed Oct 11 13:06:17 2017 -0700
Committer: Chris Riccomini
Committed: Wed Oct 11 13:07:32 2017 -0700

--
 airflow/contrib/hooks/bigquery_hook.py    |  4 ++-
 airflow/contrib/operators/mysql_to_gcs.py | 48 +++---
 2 files changed, 38 insertions(+), 14 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/afcdd097/airflow/contrib/hooks/bigquery_hook.py
--
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index 5fc7e22..fab2a43 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -971,7 +971,9 @@ def _bq_cast(string_field, bq_type):
     if string_field is None:
         return None
     elif bq_type == 'INTEGER' or bq_type == 'TIMESTAMP':
-        return int(string_field)
+        # convert to float first to handle cases where string_field is
+        # represented in scientific notation
+        return int(float(string_field))
     elif bq_type == 'FLOAT':
         return float(string_field)
     elif bq_type == 'BOOLEAN':

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/afcdd097/airflow/contrib/operators/mysql_to_gcs.py
--
diff --git a/airflow/contrib/operators/mysql_to_gcs.py b/airflow/contrib/operators/mysql_to_gcs.py
index f94bc24..47b7ac9 100644
--- a/airflow/contrib/operators/mysql_to_gcs.py
+++ b/airflow/contrib/operators/mysql_to_gcs.py
@@ -14,6 +14,7 @@

 import json
 import time
+import base64

 from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
 from airflow.hooks.mysql_hook import MySqlHook
@@ -21,7 +22,7 @@ from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
 from datetime import date, datetime
 from decimal import Decimal
-from MySQLdb.constants import FIELD_TYPE
+from MySQLdb.constants import FIELD_TYPE, FLAG
 from tempfile import NamedTemporaryFile
@@ -120,15 +121,20 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
         names in GCS, and values are file handles to local files that
         contain the data for the GCS objects.
         """
-        schema = list(map(lambda schema_tuple: schema_tuple[0], cursor.description))
+        field_names = list(map(lambda schema_tuple: schema_tuple[0], cursor.description))
+        mysql_types = list(map(lambda schema_tuple: schema_tuple[1], cursor.description))
+        byte_fields = [self.is_binary(t, f) for t, f in zip(mysql_types, cursor.description_flags)]
+
         file_no = 0
-        tmp_file_handle = NamedTemporaryFile(delete=True)
+        tmp_file_handle = NamedTemporaryFile(mode='w', delete=True)
         tmp_file_handles = {self.filename.format(file_no): tmp_file_handle}

         for row in cursor:
-            # Convert datetime objects to utc seconds, and decimals to floats
-            row = map(self.convert_types, row)
-            row_dict = dict(zip(schema, row))
+            # Convert datetime objects to utc seconds, decimals to floats, and binaries
+            # to base64-encoded strings
+            row_dict = {}
+            for name, value, is_binary in zip(field_names, row, byte_fields):
+                row_dict[name] = self.convert_types(value, is_binary)

             # TODO validate that row isn't > 2MB. BQ enforces a hard row size of 2MB.
             json.dump(row_dict, tmp_file_handle)
@@ -139,7 +145,7 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
             # Stop if the file exceeds the file size limit.
             if tmp_file_handle.tell() >= self.approx_max_file_size_bytes:
                 file_no += 1
-                tmp_file_handle = NamedTemporaryFile(delete=True)
+                tmp_file_handle = NamedTemporaryFile(mode='w', delete=True)
                 tmp_file_handles[self.filename.format(file_no)] = tmp_file_handle

         return tmp_file_handles
@@ -154,10 +160,12 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
         contains the BigQuery schema fields in .json format.
         """
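[Editorial note] The two changes in the commit above can be sketched as standalone functions. The names `bq_cast_integer` and `convert_types` below are illustrative stand-ins; the real logic lives in `_bq_cast` and on the operator class.

```python
import base64
import time
from datetime import date, datetime
from decimal import Decimal


def bq_cast_integer(string_field):
    # Going through float first handles values BigQuery returns in
    # scientific notation, e.g. '1.2E5', which int() alone rejects.
    return int(float(string_field))


def convert_types(value, is_binary=False):
    # Per-value conversion as described in the commit: datetimes to epoch
    # seconds, Decimals to floats, and (new in this commit) binary values
    # to base64-encoded strings so json.dump() accepts them.
    if isinstance(value, (datetime, date)):
        return time.mktime(value.timetuple())
    if isinstance(value, Decimal):
        return float(value)
    if is_binary:
        return base64.standard_b64encode(value).decode('ascii')
    return value
```

Base64 keeps the row JSON-serializable while remaining losslessly reversible on the BigQuery side.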
[jira] [Commented] (AIRFLOW-1613) Make MySqlToGoogleCloudStorageOperator compatible with Python 3
[ https://issues.apache.org/jira/browse/AIRFLOW-1613?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200871#comment-16200871 ]

ASF subversion and git services commented on AIRFLOW-1613:
--

Commit d578b292e96d5fdd87b5168508005cd73edc4f96 in incubator-airflow's branch refs/heads/master from [~joy.gao54]
[ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=d578b29 ]

[AIRFLOW-1613] Handle binary field in MySqlToGoogleCloudStorageOperator

Closes #2680 from jgao54/write-binary

> Make MySqlToGoogleCloudStorageOperator compatible with Python 3
> ---
>
> Key: AIRFLOW-1613
> URL: https://issues.apache.org/jira/browse/AIRFLOW-1613
> Project: Apache Airflow
> Issue Type: Bug
> Components: contrib
> Reporter: Joy Gao
> Assignee: Joy Gao
> Fix For: 1.9.0
>
> 1. In Python 3, map(...) returns an iterator, which can only be iterated over once. The current implementation therefore yields an empty list after the first pass over schema:
> {code}
> schema = map(lambda schema_tuple: schema_tuple[0], cursor.description)
> file_no = 0
> tmp_file_handle = NamedTemporaryFile(delete=True)
> tmp_file_handles = {self.filename.format(file_no): tmp_file_handle}
> for row in cursor:
>     # Convert datetime objects to utc seconds, and decimals to floats
>     row = map(self.convert_types, row)
>     row_dict = dict(zip(schema, row))
> {code}
> 2. The file is opened as binary, but strings are written to it, which raises "a bytes-like object is required, not 'str'". Use mode='w' instead.
> 3. The operator currently does not support binary columns in MySQL. We should support uploading binary columns from MySQL to cloud storage, as it is a pretty common use case.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
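[Editorial note] The first point in the quoted issue is easy to demonstrate with a minimal Python 3 snippet (illustrative; `cursor_description` is a stand-in for a DB-API `cursor.description`):

```python
# Stand-in for cursor.description: (name, type_code) tuples.
cursor_description = [('id', 3), ('name', 253)]

schema = map(lambda t: t[0], cursor_description)
first_pass = list(schema)   # consumes the iterator
second_pass = list(schema)  # already exhausted in Python 3: empty

# The fix from the commit: materialize the map once with list(...)
schema = list(map(lambda t: t[0], cursor_description))
```

In Python 2, `map` returned a list and both passes would have produced the same result, which is why the bug only surfaced under Python 3.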
[jira] [Created] (AIRFLOW-1703) Airflow LocalExecutor crashes after 3 hours of work. Database is locked
Kirill Dubovikov created AIRFLOW-1703:
--

Summary: Airflow LocalExecutor crashes after 3 hours of work. Database is locked
Key: AIRFLOW-1703
URL: https://issues.apache.org/jira/browse/AIRFLOW-1703
Project: Apache Airflow
Issue Type: Bug
Components: db, worker
Affects Versions: Airflow 1.8
Environment: Single CentOS virtual server
Reporter: Kirill Dubovikov
Attachments: nohup.out

Airflow consistently crashes after running for several hours on a single node when using a SQLite DB. Our DAG is scheduled to run {{@daily}}. We launch Airflow using the following commands:

{code:sh}
airflow scheduler
airflow webserver -p 8080
{code}

After a while, the worker and webserver crash with the following error:

{{sqlalchemy.exc.OperationalError: (sqlite3.OperationalError) database is locked [SQL: 'SELECT connection.conn_id AS connection_conn_id \nFROM connection GROUP BY connection.conn_id']}}

I've attached the full logs for further investigation.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
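[Editorial note, not part of the report] SQLite allows only one writer at a time, so an Airflow scheduler, webserver, and LocalExecutor workers sharing one SQLite file contend for the single write lock. A minimal standalone reproduction of the failure mode:

```python
import os
import sqlite3
import tempfile

# Two connections to the same SQLite file. timeout=0 makes the busy
# handler fail immediately instead of retrying, and isolation_level=None
# gives us manual transaction control.
path = os.path.join(tempfile.mkdtemp(), 'example.db')
writer = sqlite3.connect(path, timeout=0, isolation_level=None)
other = sqlite3.connect(path, timeout=0, isolation_level=None)

writer.execute('CREATE TABLE t (x INTEGER)')
writer.execute('BEGIN IMMEDIATE')  # takes the single write lock

try:
    other.execute('BEGIN IMMEDIATE')  # second writer cannot acquire it
    locked = False
except sqlite3.OperationalError as exc:
    locked = 'locked' in str(exc)

writer.execute('COMMIT')
```

The usual remedy is to point `sql_alchemy_conn` at a server database such as PostgreSQL or MySQL, which is what concurrent executors like LocalExecutor expect.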
[jira] [Created] (AIRFLOW-1702) access to the count of the happened retries in a python method
Igor Cherepanov created AIRFLOW-1702:
--

Summary: access to the count of the happened retries in a python method
Key: AIRFLOW-1702
URL: https://issues.apache.org/jira/browse/AIRFLOW-1702
Project: Apache Airflow
Issue Type: Wish
Reporter: Igor Cherepanov

Hello, is it possible to access the count of retries that have already happened from within a Python method? Thanks!

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
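[Editorial note] One hedged answer, not confirmed by the ticket: a PythonOperator callable run with provide_context=True receives the TaskInstance as context['task_instance'], and its try_number attribute counts the current attempt (1 on the first run), so previous retries = try_number - 1. A sketch with a stand-in task instance so it runs without an Airflow install:

```python
def count_previous_retries(context):
    # try_number is 1 on the first attempt, 2 on the first retry, etc.,
    # so attempts before the current one are try_number - 1.
    return context['task_instance'].try_number - 1


class FakeTaskInstance:
    # Stand-in for airflow.models.TaskInstance, which supplies try_number
    # in the real context dict passed to the callable.
    try_number = 3
```

For example, `count_previous_retries({'task_instance': FakeTaskInstance()})` evaluates to 2, meaning the task is on its second retry.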