[GitHub] codecov-io edited a comment on issue #4087: [WIP][AIRFLOW-2192] Allow non-latin1 usernames with MySQL back-end
codecov-io edited a comment on issue #4087: [WIP][AIRFLOW-2192] Allow non-latin1 usernames with MySQL back-end
URL: https://github.com/apache/incubator-airflow/pull/4087#issuecomment-434066340

# [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4087?src=pr=h1) Report
> Merging [#4087](https://codecov.io/gh/apache/incubator-airflow/pull/4087?src=pr=desc) into [master](https://codecov.io/gh/apache/incubator-airflow/commit/66cad8d6a0d98b026fb5c27c1f5c60e56cd7b111?src=pr=desc) will **increase** coverage by `<.01%`.
> The diff coverage is `60%`.

[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-airflow/pull/4087/graphs/tree.svg?width=650=WdLKlKHOAU=150=pr)](https://codecov.io/gh/apache/incubator-airflow/pull/4087?src=pr=tree)

```diff
@@            Coverage Diff            @@
##           master    #4087     +/-   ##
=========================================
+ Coverage   76.66%   76.66%   +<.01%
=========================================
  Files         199      199
  Lines       16192    16197       +5
=========================================
+ Hits        12413    12418       +5
  Misses       3779     3779
```

| [Impacted Files](https://codecov.io/gh/apache/incubator-airflow/pull/4087?src=pr=tree) | Coverage Δ | |
|---|---|---|
| [airflow/settings.py](https://codecov.io/gh/apache/incubator-airflow/pull/4087/diff?src=pr=tree#diff-YWlyZmxvdy9zZXR0aW5ncy5weQ==) | `80.41% <60%> (-0.74%)` | :arrow_down: |
| [airflow/jobs.py](https://codecov.io/gh/apache/incubator-airflow/pull/4087/diff?src=pr=tree#diff-YWlyZmxvdy9qb2JzLnB5) | `77.18% <0%> (+0.18%)` | :arrow_up: |

--

[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4087?src=pr=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4087?src=pr=footer). Last update [66cad8d...9a9234e](https://codecov.io/gh/apache/incubator-airflow/pull/4087?src=pr=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards, Apache Git Services
[GitHub] XD-DENG commented on a change in pull request #4118: [AIRFLOW-3271] Airflow RBAC Permissions modification via UI do not persist
XD-DENG commented on a change in pull request #4118: [AIRFLOW-3271] Airflow RBAC Permissions modification via UI do not persist
URL: https://github.com/apache/incubator-airflow/pull/4118#discussion_r229934643

## File path: tests/www_rbac/test_security.py ##
@@ -107,6 +107,20 @@ def test_init_role_modelview(self):
         self.assertIsNotNone(role)
         self.assertEqual(len(role_perms), len(role.permissions))

+    def test_update_and_verify_permission_role(self):
+        self.security_manager.init_role('User', [], [])

Review comment:
Using `'User'` here seems to break `test_log_success_for_user (tests.www_rbac.test_views.TestDagACLView)`. May I suggest using a "fake" role for this test, say `self.security_manager.init_role('Test_Role', [], [])`, and making the same change on line 119 as well? It should help the CI tests pass.
[GitHub] XD-DENG commented on issue #4118: [AIRFLOW-3271] Airflow RBAC Permissions modification via UI do not persist
XD-DENG commented on issue #4118: [AIRFLOW-3271] Airflow RBAC Permissions modification via UI do not persist
URL: https://github.com/apache/incubator-airflow/pull/4118#issuecomment-434923995

Cool. I observed this issue as well.
[jira] [Commented] (AIRFLOW-3118) DAGs not successful on new installation
[ https://issues.apache.org/jira/browse/AIRFLOW-3118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16671023#comment-16671023 ]

Huy Nguyen commented on AIRFLOW-3118:
---

[~kaxilnaik] [~ashb] I'm a first-time contributor and would like to start with this low-impact bug. I was able to reproduce this locally and am testing a fix. Do you guys mind if I grab this JIRA?

> DAGs not successful on new installation
> ---
>
> Key: AIRFLOW-3118
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3118
> Project: Apache Airflow
> Issue Type: Bug
> Components: DAG
> Affects Versions: 1.10.0
> Environment: Ubuntu 18.04, Python 3.6
> Reporter: Brylie Christopher Oxley
> Priority: Blocker
> Attachments: Screenshot_20180926_161837.png, image-2018-09-26-12-39-03-094.png
>
> When trying out Airflow, on localhost, none of the DAG runs are getting to the 'success' state. They are getting stuck in 'running', or I manually label them as failed:
> !image-2018-09-26-12-39-03-094.png!
>
> h2. Steps to reproduce
> # create new conda environment
> ** conda create -n airflow
> ** source activate airflow
> # install airflow
> ** pip install apache-airflow
> # initialize Airflow db
> ** airflow initdb
> # disable default paused setting in airflow.cfg
> ** dags_are_paused_at_creation = False
> # run airflow and airflow scheduler (in separate terminal)
> ** airflow scheduler
> ** airflow webserver
> # unpause example_bash_operator
> ** airflow unpause example_bash_operator
> # log in to Airflow UI
> # turn on example_bash_operator
> # click "Trigger DAG" in `example_bash_operator` row
>
> h2. Observed result
> The `example_bash_operator` never leaves the "running" state.
>
> h2. Expected result
> The `example_bash_operator` would quickly enter the "success" state

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (AIRFLOW-3136) Scheduler Failing the Task retries run while processing Executor Events
[ https://issues.apache.org/jira/browse/AIRFLOW-3136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Vardan Gupta reassigned AIRFLOW-3136:
---
Assignee: Vardan Gupta

> Scheduler Failing the Task retries run while processing Executor Events
> ---
>
> Key: AIRFLOW-3136
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3136
> Project: Apache Airflow
> Issue Type: Bug
> Components: scheduler
> Affects Versions: 1.9.0
> Reporter: raman
> Assignee: Vardan Gupta
> Priority: Major
> Fix For: 2.0.0, 1.10.1
>
> The following behaviour is observed with Airflow 1.9 in LocalExecutor mode.
>
> The Airflow scheduler processes the executor events in the "_process_executor_events(self, simple_dag_bag, session=None)" function of jobs.py.
> The events are identified by a key composed of dag id, task id and execution date, so all retries of a task have the same key.
> If the task retry interval is very small, such as 30 seconds, then the scheduler might schedule the next retry run while the previous run's result is still in the executor event queue.
> The current task run might be in the queued state while the scheduler is processing the executor's previous events, which might cause the scheduler to fail the current run because of the following code in jobs.py:
>
> def _process_executor_events(self, simple_dag_bag, session=None):
>     """
>     Respond to executor events.
>     """
>     # TODO: this shares quite a lot of code with _manage_executor_state
>     TI = models.TaskInstance
>     *for key, state in list(self.executor.get_event_buffer(simple_dag_bag.dag_ids).items()):*
>         dag_id, task_id, execution_date = key
>         self.log.info(
>             "Executor reports %s.%s execution_date=%s as %s",
>             dag_id, task_id, execution_date, state
>         )
>         if state == State.FAILED or state == State.SUCCESS:
>             qry = session.query(TI).filter(TI.dag_id == dag_id,
>                                            TI.task_id == task_id,
>                                            TI.execution_date == execution_date)
>             ti = qry.first()
>             if not ti:
>                 self.log.warning("TaskInstance %s went missing from the database", ti)
>                 continue
>             # TODO: should we fail RUNNING as well, as we do in Backfills?
>             *if ti.state == State.QUEUED:*
>                 msg = ("Executor reports task instance %s finished (%s) "
>                        "although the task says its %s. Was the task "
>                        "killed externally?".format(ti, state, ti.state))
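The key collision described in this issue can be illustrated with a minimal sketch. It assumes the event buffer behaves like a plain dict; `make_key`, `event_buffer`, and the lowercase state strings are hypothetical stand-ins for illustration, not Airflow's actual classes.

```python
from datetime import datetime


# Hypothetical stand-in for illustration; not Airflow's real API.
def make_key(dag_id, task_id, execution_date):
    """Build the event key from dag id, task id and execution date only."""
    return (dag_id, task_id, execution_date)


execution_date = datetime(2018, 10, 1)

# Try 1 fails; the executor buffers the result under the key.
event_buffer = {make_key("my_dag", "my_task", execution_date): "failed"}

# The scheduler queues try 2 before the buffer has been drained.
# The try number is not part of the key, so both tries share it.
retry_key = make_key("my_dag", "my_task", execution_date)
current_ti_state = "queued"

# Processing the stale event pairs a "failed" report with a queued task instance,
# which is the condition the quoted code treats as an externally killed task.
stale_state = event_buffer.get(retry_key)
misattributed = stale_state == "failed" and current_ti_state == "queued"
print(misattributed)
```

Under these assumptions the stale result of the first try is matched against the second try, which is the behaviour the reporter describes.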
[GitHub] codecov-io edited a comment on issue #2825: [AIRFLOW-1117] Change default min_file_process_interval
codecov-io edited a comment on issue #2825: [AIRFLOW-1117] Change default min_file_process_interval
URL: https://github.com/apache/incubator-airflow/pull/2825#issuecomment-348044711

# [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/2825?src=pr=h1) Report
> Merging [#2825](https://codecov.io/gh/apache/incubator-airflow/pull/2825?src=pr=desc) into [master](https://codecov.io/gh/apache/incubator-airflow/commit/e703d6beeb379ee88ef5e7df495e8a785666f8af?src=pr=desc) will **decrease** coverage by `2.82%`.
> The diff coverage is `n/a`.

[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-airflow/pull/2825/graphs/tree.svg?width=650=WdLKlKHOAU=150=pr)](https://codecov.io/gh/apache/incubator-airflow/pull/2825?src=pr=tree)

```diff
@@            Coverage Diff            @@
##           master    #2825     +/-   ##
=========================================
- Coverage   76.67%   73.84%   -2.83%
=========================================
  Files         199      159      -40
  Lines       16186    12076    -4110
=========================================
- Hits        12410     8918    -3492
+ Misses       3776     3158     -618
```

| [Impacted Files](https://codecov.io/gh/apache/incubator-airflow/pull/2825?src=pr=tree) | Coverage Δ | |
|---|---|---|
| [airflow/operators/email\_operator.py](https://codecov.io/gh/apache/incubator-airflow/pull/2825/diff?src=pr=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvZW1haWxfb3BlcmF0b3IucHk=) | `0% <0%> (-100%)` | :arrow_down: |
| [airflow/hooks/pig\_hook.py](https://codecov.io/gh/apache/incubator-airflow/pull/2825/diff?src=pr=tree#diff-YWlyZmxvdy9ob29rcy9waWdfaG9vay5weQ==) | `0% <0%> (-100%)` | :arrow_down: |
| [airflow/operators/slack\_operator.py](https://codecov.io/gh/apache/incubator-airflow/pull/2825/diff?src=pr=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvc2xhY2tfb3BlcmF0b3IucHk=) | `0% <0%> (-97.37%)` | :arrow_down: |
| [airflow/operators/s3\_file\_transform\_operator.py](https://codecov.io/gh/apache/incubator-airflow/pull/2825/diff?src=pr=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvczNfZmlsZV90cmFuc2Zvcm1fb3BlcmF0b3IucHk=) | `0% <0%> (-96.23%)` | :arrow_down: |
| [airflow/operators/redshift\_to\_s3\_operator.py](https://codecov.io/gh/apache/incubator-airflow/pull/2825/diff?src=pr=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvcmVkc2hpZnRfdG9fczNfb3BlcmF0b3IucHk=) | `0% <0%> (-95.46%)` | :arrow_down: |
| [airflow/hooks/mssql\_hook.py](https://codecov.io/gh/apache/incubator-airflow/pull/2825/diff?src=pr=tree#diff-YWlyZmxvdy9ob29rcy9tc3NxbF9ob29rLnB5) | `6.66% <0%> (-66.67%)` | :arrow_down: |
| [airflow/hooks/hdfs\_hook.py](https://codecov.io/gh/apache/incubator-airflow/pull/2825/diff?src=pr=tree#diff-YWlyZmxvdy9ob29rcy9oZGZzX2hvb2sucHk=) | `32.5% <0%> (-60%)` | :arrow_down: |
| [airflow/operators/hive\_operator.py](https://codecov.io/gh/apache/incubator-airflow/pull/2825/diff?src=pr=tree#diff-YWlyZmxvdy9vcGVyYXRvcnMvaGl2ZV9vcGVyYXRvci5weQ==) | `41.02% <0%> (-45.52%)` | :arrow_down: |
| [airflow/hooks/hive\_hooks.py](https://codecov.io/gh/apache/incubator-airflow/pull/2825/diff?src=pr=tree#diff-YWlyZmxvdy9ob29rcy9oaXZlX2hvb2tzLnB5) | `39.52% <0%> (-33.9%)` | :arrow_down: |
| [airflow/utils/helpers.py](https://codecov.io/gh/apache/incubator-airflow/pull/2825/diff?src=pr=tree#diff-YWlyZmxvdy91dGlscy9oZWxwZXJzLnB5) | `53.44% <0%> (-30.93%)` | :arrow_down: |
| ... and [196 more](https://codecov.io/gh/apache/incubator-airflow/pull/2825/diff?src=pr=tree-more) | |

--

[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/2825?src=pr=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/2825?src=pr=footer). Last update [e703d6b...c3d009a](https://codecov.io/gh/apache/incubator-airflow/pull/2825?src=pr=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).
[GitHub] yangaws commented on a change in pull request #4091: [AIRFLOW-2524] Update SageMaker hook, operator and sensor for training, tuning and transform
yangaws commented on a change in pull request #4091: [AIRFLOW-2524] Update SageMaker hook, operator and sensor for training, tuning and transform
URL: https://github.com/apache/incubator-airflow/pull/4091#discussion_r229910341

## File path: tests/hooks/test_s3_hook.py ##
@@ -74,6 +75,13 @@ def test_get_bucket(self):
         b = hook.get_bucket('bucket')
         self.assertIsNotNone(b)

+    @mock_s3
+    def test_create_bucket(self):
+        hook = S3Hook(aws_conn_id=None)
+        hook.create_bucket(bucket_name='new_bucket')
+        b = hook.get_bucket('new_bucket')
+        self.assertIsNotNone(b)

Review comment:
I added a `region_name` argument to `create_bucket` (defaulting to the region of the S3 connection) because I just realized it's not a good idea to force users to always create the bucket in the region where the S3 connection is. I also added two more unit tests: one explicitly tests region us-east-1, since it is special in S3; the other tests us-east-2 as a non-default, customized region.
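Why us-east-1 is "special" can be sketched as follows. This is only an illustration, not the code in the pull request: `build_create_bucket_kwargs` and its `default_region` parameter are hypothetical names, and the sketch only assembles the arguments one would pass to an S3 CreateBucket call. It relies on the S3 behaviour that a bucket in us-east-1 is created without a location constraint, while other regions pass one.

```python
def build_create_bucket_kwargs(bucket_name, region_name=None, default_region="us-east-1"):
    """Assemble keyword arguments for an S3 CreateBucket call (hypothetical helper).

    default_region stands in for "the region of the S3 connection".
    """
    region = region_name or default_region
    kwargs = {"Bucket": bucket_name}
    # us-east-1 is the special case: no LocationConstraint is sent for it.
    if region != "us-east-1":
        kwargs["CreateBucketConfiguration"] = {"LocationConstraint": region}
    return kwargs


print(build_create_bucket_kwargs("new_bucket", "us-east-1"))
print(build_create_bucket_kwargs("new_bucket", "us-east-2"))
```

The two calls mirror the two extra unit tests mentioned in the comment: one for the special region and one for a non-default region.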
[GitHub] yangaws commented on a change in pull request #4091: [AIRFLOW-2524] Update SageMaker hook, operator and sensor for training, tuning and transform
yangaws commented on a change in pull request #4091: [AIRFLOW-2524] Update SageMaker hook, operator and sensor for training, tuning and transform
URL: https://github.com/apache/incubator-airflow/pull/4091#discussion_r22991

## File path: airflow/contrib/operators/sagemaker_tuning_operator.py ##
@@ -0,0 +1,97 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.contrib.hooks.aws_hook import AwsHook
+from airflow.contrib.operators.sagemaker_base_operator import SageMakerBaseOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.exceptions import AirflowException
+
+
+class SageMakerTuningOperator(SageMakerBaseOperator):
+    """
+    Initiate a SageMaker hyper-parameter tuning job.
+
+    This operator returns The ARN of the tuning job created in Amazon SageMaker.
+
+    :param config: The configuration necessary to start a tuning job (templated)
+    :type config: dict
+    :param aws_conn_id: The AWS connection ID to use.
+    :type aws_conn_id: str
+    :param wait_for_completion: if the operator should block until tuning job finishes
+    :type wait_for_completion: bool
+    :param check_interval: if wait is set to be true, this is the time interval
+        in seconds which the operator will check the status of the tuning job
+    :type check_interval: int
+    :param max_ingestion_time: if wait is set to be true, the operator will fail
+        if the tuning job hasn't finish within the max_ingestion_time in seconds
+        (Caution: be careful to set this parameters because tuning can take very long)
+    :type max_ingestion_time: int
+    """
+
+    integer_fields = [
+        ['HyperParameterTuningJobConfig', 'ResourceLimits', 'MaxNumberOfTrainingJobs'],
+        ['HyperParameterTuningJobConfig', 'ResourceLimits', 'MaxParallelTrainingJobs'],
+        ['TrainingJobDefinition', 'ResourceConfig', 'InstanceCount'],
+        ['TrainingJobDefinition', 'ResourceConfig', 'VolumeSizeInGB'],
+        ['TrainingJobDefinition', 'StoppingCondition', 'MaxRuntimeInSeconds']
+    ]
+
+    @apply_defaults
+    def __init__(self,
+                 config,
+                 wait_for_completion=True,
+                 check_interval=30,
+                 max_ingestion_time=None,
+                 *args, **kwargs):
+        super(SageMakerTuningOperator, self).__init__(config=config,
+                                                      *args, **kwargs)
+        self.config = config
+        self.wait_for_completion = wait_for_completion
+        self.check_interval = check_interval
+        self.max_ingestion_time = max_ingestion_time
+
+    def expand_role(self):
+        if 'TrainingJobDefinition' in self.config:
+            config = self.config['TrainingJobDefinition']
+            if 'RoleArn' in config:
+                hook = AwsHook()

Review comment:
Updated.
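Each entry of `integer_fields` in the diff above is a key path into the nested `config` dict. How such paths might be used is sketched below; this is an assumption for illustration, since the base operator's code is not quoted in this message. `parse_integer_fields` is a hypothetical helper, and the idea that values arrive as strings (for example after templating) is likewise assumed.

```python
def parse_integer_fields(config, integer_fields):
    """Cast the value at each key path to int, in place, skipping absent paths."""
    for path in integer_fields:
        node = config
        # Walk down to the dict that holds the final key.
        for key in path[:-1]:
            node = node.get(key) if isinstance(node, dict) else None
            if node is None:
                break
        else:
            last = path[-1]
            if isinstance(node, dict) and last in node:
                node[last] = int(node[last])
    return config


integer_fields = [
    ['ResourceConfig', 'InstanceCount'],
    ['ResourceConfig', 'VolumeSizeInGB'],
    ['StoppingCondition', 'MaxRuntimeInSeconds'],
]
config = {'ResourceConfig': {'InstanceCount': '2', 'VolumeSizeInGB': '30'}}
print(parse_integer_fields(config, integer_fields))
```

Paths that do not exist in the config, such as `StoppingCondition` here, are left alone rather than raising.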
[GitHub] yangaws commented on a change in pull request #4091: [AIRFLOW-2524] Update SageMaker hook, operator and sensor for training, tuning and transform
yangaws commented on a change in pull request #4091: [AIRFLOW-2524] Update SageMaker hook, operator and sensor for training, tuning and transform
URL: https://github.com/apache/incubator-airflow/pull/4091#discussion_r229909963

## File path: airflow/contrib/operators/sagemaker_training_operator.py ##
@@ -0,0 +1,97 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.contrib.hooks.aws_hook import AwsHook
+from airflow.contrib.operators.sagemaker_base_operator import SageMakerBaseOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.exceptions import AirflowException
+
+
+class SageMakerTrainingOperator(SageMakerBaseOperator):
+    """
+    Initiate a SageMaker training job.
+
+    This operator returns The ARN of the training job created in Amazon SageMaker.
+
+    :param config: The configuration necessary to start a training job (templated)
+    :type config: dict
+    :param aws_conn_id: The AWS connection ID to use.
+    :type aws_conn_id: str
+    :param wait_for_completion: if the operator should block until training job finishes
+    :type wait_for_completion: bool
+    :param print_log: if the operator should print the cloudwatch log during training
+    :type print_log: bool
+    :param check_interval: if wait is set to be true, this is the time interval
+        in seconds which the operator will check the status of the training job
+    :type check_interval: int
+    :param max_ingestion_time: if wait is set to be true, the operator will fail
+        if the training job hasn't finish within the max_ingestion_time in seconds
+        (Caution: be careful to set this parameters because training can take very long)
+        Setting it to None implies no timeout.
+    :type max_ingestion_time: int
+    """
+
+    integer_fields = [
+        ['ResourceConfig', 'InstanceCount'],
+        ['ResourceConfig', 'VolumeSizeInGB'],
+        ['StoppingCondition', 'MaxRuntimeInSeconds']
+    ]
+
+    @apply_defaults
+    def __init__(self,
+                 config,
+                 wait_for_completion=True,
+                 print_log=True,
+                 check_interval=30,
+                 max_ingestion_time=None,
+                 *args, **kwargs):
+        super(SageMakerTrainingOperator, self).__init__(config=config,
+                                                        *args, **kwargs)
+
+        self.wait_for_completion = wait_for_completion
+        self.print_log = print_log
+        self.check_interval = check_interval
+        self.max_ingestion_time = max_ingestion_time
+
+    def expand_role(self):
+        if 'RoleArn' in self.config:
+            hook = AwsHook()

Review comment:
Nice catch! Thanks. Updated.
[GitHub] yangaws commented on a change in pull request #4091: [AIRFLOW-2524] Update SageMaker hook, operator and sensor for training, tuning and transform
yangaws commented on a change in pull request #4091: [AIRFLOW-2524] Update SageMaker hook, operator and sensor for training, tuning and transform
URL: https://github.com/apache/incubator-airflow/pull/4091#discussion_r229909989

## File path: airflow/contrib/operators/sagemaker_transform_operator.py ##
@@ -0,0 +1,112 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.contrib.hooks.aws_hook import AwsHook
+from airflow.contrib.operators.sagemaker_base_operator import SageMakerBaseOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.exceptions import AirflowException
+
+
+class SageMakerTransformOperator(SageMakerBaseOperator):
+    """
+    Initiate a SageMaker transform job.
+
+    This operator returns The ARN of the model created in Amazon SageMaker.
+
+    :param config: The configuration necessary to start a transform job (templated)
+    :type config: dict
+    :param model_config:
+        The configuration necessary to create a SageMaker model, the default is none
+        which means the SageMaker model used for the SageMaker transform job already exists.
+        If given, it will be used to create a SageMaker model before creating
+        the SageMaker transform job
+    :type model_config: dict
+    :param aws_conn_id: The AWS connection ID to use.
+    :type aws_conn_id: string
+    :param wait_for_completion: if the program should keep running until job finishes
+    :type wait_for_completion: bool
+    :param check_interval: if wait is set to be true, this is the time interval
+        in seconds which the operator will check the status of the transform job
+    :type check_interval: int
+    :param max_ingestion_time: if wait is set to be true, the operator will fail
+        if the transform job hasn't finish within the max_ingestion_time in seconds
+        (Caution: be careful to set this parameters because transform can take very long)
+    :type max_ingestion_time: int
+    """
+
+    @apply_defaults
+    def __init__(self,
+                 config,
+                 wait_for_completion=True,
+                 check_interval=30,
+                 max_ingestion_time=None,
+                 *args, **kwargs):
+        super(SageMakerTransformOperator, self).__init__(config=config,
+                                                         *args, **kwargs)
+        self.config = config
+        self.wait_for_completion = wait_for_completion
+        self.check_interval = check_interval
+        self.max_ingestion_time = max_ingestion_time
+        self.create_integer_fields()
+
+    def create_integer_fields(self):
+        self.integer_fields = [
+            ['Transform', 'TransformResources', 'InstanceCount'],
+            ['Transform', 'MaxConcurrentTransforms'],
+            ['Transform', 'MaxPayloadInMB']
+        ]
+        if 'Transform' not in self.config:
+            for field in self.integer_fields:
+                field.pop(0)
+
+    def expand_role(self):
+        if 'Model' not in self.config:
+            return
+        config = self.config['Model']
+        if 'ExecutionRoleArn' in config:
+            hook = AwsHook()

Review comment:
Updated.
[GitHub] yangaws commented on a change in pull request #4091: [AIRFLOW-2524] Update SageMaker hook, operator and sensor for training, tuning and transform
yangaws commented on a change in pull request #4091: [AIRFLOW-2524] Update SageMaker hook, operator and sensor for training, tuning and transform
URL: https://github.com/apache/incubator-airflow/pull/4091#discussion_r229909937

## File path: airflow/contrib/hooks/sagemaker_hook.py ##
@@ -16,299 +16,793 @@
 # KIND, either express or implied. See the License for the
 # specific language governing permissions and limitations
 # under the License.
-import copy
+import tarfile
+import tempfile
 import time
+import os
+import collections
+import functools
+from datetime import datetime
+
+import botocore.config
 from botocore.exceptions import ClientError

 from airflow.exceptions import AirflowException
 from airflow.contrib.hooks.aws_hook import AwsHook
 from airflow.hooks.S3_hook import S3Hook


+class LogState(object):
+    STARTING = 1
+    WAIT_IN_PROGRESS = 2
+    TAILING = 3
+    JOB_COMPLETE = 4
+    COMPLETE = 5
+
+
+# Position is a tuple that includes the last read timestamp and the number of items that were read
+# at that time. This is used to figure out which event to start with on the next read.
+Position = collections.namedtuple('Position', ['timestamp', 'skip'])
+
+
+def argmin(arr, f):
+    """Return the index, i, in arr that minimizes f(arr[i])"""
+    m = None
+    i = None
+    for idx, item in enumerate(arr):
+        if item is not None:
+            if m is None or f(item) < m:
+                m = f(item)
+                i = idx
+    return i
+
+
+def some(arr):
+    """Return True iff there is an element, a, of arr such that a is not None"""
+    return functools.reduce(lambda x, y: x or (y is not None), arr, False)
+
+
+def secondary_training_status_changed(current_job_description, prev_job_description):
+    """
+    Returns true if training job's secondary status message has changed.
+
+    :param current_job_description: Current job description, returned from DescribeTrainingJob call.
+    :type current_job_description: dict
+    :param prev_job_description: Previous job description, returned from DescribeTrainingJob call.
+    :type prev_job_description: dict
+
+    :return: Whether the secondary status message of a training job changed or not.
+    """
+    current_secondary_status_transitions = current_job_description.get('SecondaryStatusTransitions')
+    if current_secondary_status_transitions is None or len(current_secondary_status_transitions) == 0:
+        return False
+
+    prev_job_secondary_status_transitions = prev_job_description.get('SecondaryStatusTransitions') \
+        if prev_job_description is not None else None
+
+    last_message = prev_job_secondary_status_transitions[-1]['StatusMessage'] \
+        if prev_job_secondary_status_transitions is not None \
+        and len(prev_job_secondary_status_transitions) > 0 else ''
+
+    message = current_job_description['SecondaryStatusTransitions'][-1]['StatusMessage']
+
+    return message != last_message
+
+
+def secondary_training_status_message(job_description, prev_description):
+    """
+    Returns a string contains start time and the secondary training job status message.
+
+    :param job_description: Returned response from DescribeTrainingJob call
+    :type job_description: dict
+    :param prev_description: Previous job description from DescribeTrainingJob call
+    :type prev_description: dict
+
+    :return: Job status string to be printed.
+    """
+
+    if job_description is None or job_description.get('SecondaryStatusTransitions') is None\
+            or len(job_description.get('SecondaryStatusTransitions')) == 0:
+        return ''
+
+    prev_description_secondary_transitions = prev_description.get('SecondaryStatusTransitions')\
+        if prev_description is not None else None
+    prev_transitions_num = len(prev_description['SecondaryStatusTransitions'])\
+        if prev_description_secondary_transitions is not None else 0
+    current_transitions = job_description['SecondaryStatusTransitions']
+
+    transitions_to_print = current_transitions[-1:] if len(current_transitions) == prev_transitions_num else \
+        current_transitions[prev_transitions_num - len(current_transitions):]
+
+    status_strs = []
+    for transition in transitions_to_print:
+        message = transition['StatusMessage']
+        time_str = datetime.utcfromtimestamp(
+            time.mktime(job_description['LastModifiedTime'].timetuple())).strftime('%Y-%m-%d %H:%M:%S')
+        status_strs.append('{} {} - {}'.format(time_str, transition['Status'], message))
+
+    return '\n'.join(status_strs)
+
+
 class SageMakerHook(AwsHook):
     """
     Interact with Amazon SageMaker.
-    sagemaker_conn_id is required for using
-    the config stored in db for training/tuning
     """
-    non_terminal_states = {'InProgress', 'Stopping', 'Stopped'}
+    non_terminal_states = {'InProgress', 'Stopping'}
+    endpoint_non_terminal_states = {'Creating', 'Updating',
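The two small helpers in the quoted diff, `argmin` and `some`, both skip `None` entries, which is easiest to see on a concrete input. The definitions below are copied from the diff; the sample lists are made up for illustration and say nothing about how the hook itself calls them.

```python
import functools


def argmin(arr, f):
    """Return the index, i, in arr that minimizes f(arr[i])"""
    m = None
    i = None
    for idx, item in enumerate(arr):
        if item is not None:
            if m is None or f(item) < m:
                m = f(item)
                i = idx
    return i


def some(arr):
    """Return True iff there is an element, a, of arr such that a is not None"""
    return functools.reduce(lambda x, y: x or (y is not None), arr, False)


# Made-up sample data: None marks a slot with nothing to read.
timestamps = [1540900030, None, 1540900010]
oldest_index = argmin(timestamps, lambda t: t)   # None entries are ignored
print(oldest_index, some(timestamps), some([None, None]))
```

Note that `argmin` returns `None` rather than raising when every entry is `None`, so a caller would typically check `some(arr)` first.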
[GitHub] yangaws commented on a change in pull request #4091: [AIRFLOW-2524] Update SageMaker hook, operator and sensor for training, tuning and transform
yangaws commented on a change in pull request #4091: [AIRFLOW-2524] Update SageMaker hook, operator and sensor for training, tuning and transform URL: https://github.com/apache/incubator-airflow/pull/4091#discussion_r229909883 ## File path: airflow/contrib/hooks/sagemaker_hook.py ## @@ -16,299 +16,793 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -import copy +import tarfile +import tempfile import time +import os +import collections +import functools +from datetime import datetime + +import botocore.config from botocore.exceptions import ClientError from airflow.exceptions import AirflowException from airflow.contrib.hooks.aws_hook import AwsHook from airflow.hooks.S3_hook import S3Hook +class LogState(object): +STARTING = 1 +WAIT_IN_PROGRESS = 2 +TAILING = 3 +JOB_COMPLETE = 4 +COMPLETE = 5 + + +# Position is a tuple that includes the last read timestamp and the number of items that were read +# at that time. This is used to figure out which event to start with on the next read. +Position = collections.namedtuple('Position', ['timestamp', 'skip']) + + +def argmin(arr, f): +"""Return the index, i, in arr that minimizes f(arr[i])""" +m = None +i = None +for idx, item in enumerate(arr): +if item is not None: +if m is None or f(item) < m: +m = f(item) +i = idx +return i + + +def some(arr): +"""Return True iff there is an element, a, of arr such that a is not None""" +return functools.reduce(lambda x, y: x or (y is not None), arr, False) + + +def secondary_training_status_changed(current_job_description, prev_job_description): +""" +Returns true if training job's secondary status message has changed. + +:param current_job_description: Current job description, returned from DescribeTrainingJob call. +:type current_job_description: dict +:param prev_job_description: Previous job description, returned from DescribeTrainingJob call. 
+:type prev_job_description: dict + +:return: Whether the secondary status message of a training job changed or not. +""" +current_secondary_status_transitions = current_job_description.get('SecondaryStatusTransitions') +if current_secondary_status_transitions is None or len(current_secondary_status_transitions) == 0: +return False + +prev_job_secondary_status_transitions = prev_job_description.get('SecondaryStatusTransitions') \ +if prev_job_description is not None else None + +last_message = prev_job_secondary_status_transitions[-1]['StatusMessage'] \ +if prev_job_secondary_status_transitions is not None \ +and len(prev_job_secondary_status_transitions) > 0 else '' + +message = current_job_description['SecondaryStatusTransitions'][-1]['StatusMessage'] + +return message != last_message + + +def secondary_training_status_message(job_description, prev_description): +""" +Returns a string contains start time and the secondary training job status message. + +:param job_description: Returned response from DescribeTrainingJob call +:type job_description: dict +:param prev_description: Previous job description from DescribeTrainingJob call +:type prev_description: dict + +:return: Job status string to be printed. 
+""" + +if job_description is None or job_description.get('SecondaryStatusTransitions') is None\ +or len(job_description.get('SecondaryStatusTransitions')) == 0: +return '' + +prev_description_secondary_transitions = prev_description.get('SecondaryStatusTransitions')\ +if prev_description is not None else None +prev_transitions_num = len(prev_description['SecondaryStatusTransitions'])\ +if prev_description_secondary_transitions is not None else 0 +current_transitions = job_description['SecondaryStatusTransitions'] + +transitions_to_print = current_transitions[-1:] if len(current_transitions) == prev_transitions_num else \ +current_transitions[prev_transitions_num - len(current_transitions):] + +status_strs = [] +for transition in transitions_to_print: +message = transition['StatusMessage'] +time_str = datetime.utcfromtimestamp( + time.mktime(job_description['LastModifiedTime'].timetuple())).strftime('%Y-%m-%d %H:%M:%S') Review comment: This is so nice! Thanks! I tried it and it worked if the datetime object has a timezone explicitly specified. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
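The review comment above turns on whether `LastModifiedTime` carries an explicit timezone. As a minimal sketch of that point (not necessarily the change suggested in the review), the following formats a timezone-aware datetime as a UTC string without the `time.mktime` round-trip. The helper name `format_utc` and the sample timestamp are hypothetical, and the code assumes Python 3 (`datetime.timezone` is not available in Python 2.7).

```python
from datetime import datetime, timedelta, timezone


def format_utc(dt):
    """Format a timezone-aware datetime as a UTC 'YYYY-MM-DD HH:MM:SS' string.

    Hypothetical helper for illustration; it rejects naive datetimes because
    their offset from UTC is unknown.
    """
    if dt.tzinfo is None or dt.utcoffset() is None:
        raise ValueError('expected a timezone-aware datetime')
    # astimezone() converts using the datetime's own offset, so the result
    # does not depend on the local timezone of the machine running the code.
    return dt.astimezone(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')


# Illustrative value only: 12:00 at UTC-7 corresponds to 19:00 UTC.
utc_minus_7 = timezone(timedelta(hours=-7))
last_modified = datetime(2018, 10, 31, 12, 0, 0, tzinfo=utc_minus_7)
print(format_utc(last_modified))  # 2018-10-31 19:00:00
```

A naive datetime raises `ValueError` here rather than being silently interpreted in the local timezone, which is the case the comment reports as not working.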
[jira] [Updated] (AIRFLOW-3282) Implement a Azure Kubernetes Service Operator
[ https://issues.apache.org/jira/browse/AIRFLOW-3282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aparna updated AIRFLOW-3282: Summary: Implement a Azure Kubernetes Service Operator (was: Implement a Azure Kubernetes Service) > Implement a Azure Kubernetes Service Operator > - > > Key: AIRFLOW-3282 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3282 > Project: Apache Airflow > Issue Type: New Feature >Reporter: Aparna >Assignee: Aparna >Priority: Major > > Add AKS Operator -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (AIRFLOW-3282) Implement a Azure Kubernetes Service
Aparna created AIRFLOW-3282: --- Summary: Implement a Azure Kubernetes Service Key: AIRFLOW-3282 URL: https://issues.apache.org/jira/browse/AIRFLOW-3282 Project: Apache Airflow Issue Type: New Feature Reporter: Aparna Assignee: Aparna Add AKS Operator -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] codecov-io edited a comment on issue #4111: [AIRFLOW-3266] Add AWS Athena Operator and hook
codecov-io edited a comment on issue #4111: [AIRFLOW-3266] Add AWS Athena Operator and hook
URL: https://github.com/apache/incubator-airflow/pull/4111#issuecomment-433705746

# [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4111?src=pr=h1) Report
> Merging [#4111](https://codecov.io/gh/apache/incubator-airflow/pull/4111?src=pr=desc) into [master](https://codecov.io/gh/apache/incubator-airflow/commit/c4e5151bcd095eae1cd6ca1b4e96b302df3a2166?src=pr=desc) will **increase** coverage by `<.01%`.
> The diff coverage is `n/a`.

[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-airflow/pull/4111/graphs/tree.svg?width=650=WdLKlKHOAU=150=pr)](https://codecov.io/gh/apache/incubator-airflow/pull/4111?src=pr=tree)

```diff
@@            Coverage Diff            @@
##           master    #4111     +/-   ##
=========================================
+ Coverage   76.68%   76.68%   +<.01%
=========================================
  Files         199      199
  Lines       16189    16192      +3
=========================================
+ Hits        12414    12417      +3
  Misses       3775     3775
```

| [Impacted Files](https://codecov.io/gh/apache/incubator-airflow/pull/4111?src=pr=tree) | Coverage Δ | |
|---|---|---|
| [airflow/models.py](https://codecov.io/gh/apache/incubator-airflow/pull/4111/diff?src=pr=tree#diff-YWlyZmxvdy9tb2RlbHMucHk=) | `92.04% <0%> (-0.05%)` | :arrow_down: |
| [airflow/jobs.py](https://codecov.io/gh/apache/incubator-airflow/pull/4111/diff?src=pr=tree#diff-YWlyZmxvdy9qb2JzLnB5) | `77.46% <0%> (+0.15%)` | :arrow_up: |

------

[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4111?src=pr=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4111?src=pr=footer). Last update [c4e5151...f853ed5](https://codecov.io/gh/apache/incubator-airflow/pull/4111?src=pr=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] codecov-io edited a comment on issue #4120: [AIRFLOW-XXX] Update Contributing Guide - Git Hooks
codecov-io edited a comment on issue #4120: [AIRFLOW-XXX] Update Contributing Guide - Git Hooks
URL: https://github.com/apache/incubator-airflow/pull/4120#issuecomment-434876896

# [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4120?src=pr=h1) Report
> Merging [#4120](https://codecov.io/gh/apache/incubator-airflow/pull/4120?src=pr=desc) into [master](https://codecov.io/gh/apache/incubator-airflow/commit/215b8c8170bd63f4c449614945bb4b6d90f6a860?src=pr=desc) will **increase** coverage by `0.02%`.
> The diff coverage is `n/a`.

[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-airflow/pull/4120/graphs/tree.svg?width=650=WdLKlKHOAU=150=pr)](https://codecov.io/gh/apache/incubator-airflow/pull/4120?src=pr=tree)

```diff
@@            Coverage Diff            @@
##           master    #4120     +/-   ##
=========================================
+ Coverage   76.64%   76.67%   +0.02%
=========================================
  Files         199      199
  Lines       16192    16192
=========================================
+ Hits        12411    12415      +4
+ Misses       3781     3777      -4
```

| [Impacted Files](https://codecov.io/gh/apache/incubator-airflow/pull/4120?src=pr=tree) | Coverage Δ | |
|---|---|---|
| [airflow/jobs.py](https://codecov.io/gh/apache/incubator-airflow/pull/4120/diff?src=pr=tree#diff-YWlyZmxvdy9qb2JzLnB5) | `77.18% <0%> (-0.28%)` | :arrow_down: |
| [airflow/www/views.py](https://codecov.io/gh/apache/incubator-airflow/pull/4120/diff?src=pr=tree#diff-YWlyZmxvdy93d3cvdmlld3MucHk=) | `68.73% <0%> (+0.12%)` | :arrow_up: |
| [airflow/hooks/dbapi\_hook.py](https://codecov.io/gh/apache/incubator-airflow/pull/4120/diff?src=pr=tree#diff-YWlyZmxvdy9ob29rcy9kYmFwaV9ob29rLnB5) | `79.83% <0%> (+0.8%)` | :arrow_up: |
| [airflow/utils/sqlalchemy.py](https://codecov.io/gh/apache/incubator-airflow/pull/4120/diff?src=pr=tree#diff-YWlyZmxvdy91dGlscy9zcWxhbGNoZW15LnB5) | `81.42% <0%> (+5.71%)` | :arrow_up: |

------

[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4120?src=pr=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta) > `Δ = absolute (impact)`, `ø = not affected`, `? = missing data` > Powered by [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4120?src=pr=footer). Last update [215b8c8...e88dfd4](https://codecov.io/gh/apache/incubator-airflow/pull/4120?src=pr=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Resolved] (AIRFLOW-3236) Create AzureDataLakeStorageListOperator
[ https://issues.apache.org/jira/browse/AIRFLOW-3236?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kaxil Naik resolved AIRFLOW-3236. - Resolution: Fixed Fix Version/s: 2.0.0 Resolved by https://github.com/apache/incubator-airflow/pull/4094 > Create AzureDataLakeStorageListOperator > --- > > Key: AIRFLOW-3236 > URL: https://issues.apache.org/jira/browse/AIRFLOW-3236 > Project: Apache Airflow > Issue Type: Improvement > Components: operators >Reporter: Brandon Kvarda >Assignee: Brandon Kvarda >Priority: Minor > Fix For: 2.0.0 > > > Creates an Operator that is similar to GoogleCloudStorageListOperator and > S3ListOperator that returns a list of files at some specified path. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-3236) Create AzureDataLakeStorageListOperator
[ https://issues.apache.org/jira/browse/AIRFLOW-3236?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16670879#comment-16670879 ] ASF GitHub Bot commented on AIRFLOW-3236: - kaxil closed pull request #4094: [AIRFLOW-3236] Create AzureDataLakeStorageListOperator URL: https://github.com/apache/incubator-airflow/pull/4094 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/airflow/contrib/operators/adls_list_operator.py b/airflow/contrib/operators/adls_list_operator.py new file mode 100644 index 00..7d03e86b17 --- /dev/null +++ b/airflow/contrib/operators/adls_list_operator.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from airflow.contrib.hooks.azure_data_lake_hook import AzureDataLakeHook +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults + + +class AzureDataLakeStorageListOperator(BaseOperator): +""" +List all files from the specified path + +This operator returns a python list with the names of files which can be used by + `xcom` in the downstream tasks. + +:param path: The Azure Data Lake path to find the objects. Supports glob +strings (templated) +:type path: str +:param azure_data_lake_conn_id: The connection ID to use when +connecting to Azure Data Lake Storage. +:type azure_data_lake_conn_id: str + +**Example**: +The following Operator would list all the Parquet files from ``folder/output/`` +folder in the specified ADLS account :: + +adls_files = AzureDataLakeStorageListOperator( +task_id='adls_files', +path='folder/output/*.parquet', +azure_data_lake_conn_id='azure_data_lake_default' +) +""" +template_fields = ('path',) +ui_color = '#901dd2' + +@apply_defaults +def __init__(self, + path, + azure_data_lake_conn_id='azure_data_lake_default', + *args, + **kwargs): +super(AzureDataLakeStorageListOperator, self).__init__(*args, **kwargs) +self.path = path +self.azure_data_lake_conn_id = azure_data_lake_conn_id + +def execute(self, context): + +hook = AzureDataLakeHook( +azure_data_lake_conn_id=self.azure_data_lake_conn_id +) + +self.log.info('Getting list of ADLS files in path: %s', self.path) + +return hook.list(path=self.path) diff --git a/docs/code.rst b/docs/code.rst index 211e1abafe..5b4a494911 100644 --- a/docs/code.rst +++ b/docs/code.rst @@ -112,6 +112,7 @@ Operators ^ .. Alphabetize this list +.. autoclass:: airflow.contrib.operators.adls_list_operator.AzureDataLakeStorageListOperator .. autoclass:: airflow.contrib.operators.awsbatch_operator.AWSBatchOperator .. autoclass:: airflow.contrib.operators.bigquery_check_operator.BigQueryCheckOperator .. 
autoclass:: airflow.contrib.operators.bigquery_check_operator.BigQueryValueCheckOperator diff --git a/docs/integration.rst b/docs/integration.rst index 67298b15b6..d12b94b8b6 100644 --- a/docs/integration.rst +++ b/docs/integration.rst @@ -145,7 +145,7 @@ Airflow can be configured to read and write task logs in Azure Blob Storage. See :ref:`write-logs-azure`. Azure Data Lake -'' +''' AzureDataLakeHook communicates via a REST API compatible with WebHDFS. Make sure that a Airflow connection of type `azure_data_lake` exists. Authorization can be done by supplying a @@ -153,14 +153,22 @@ login (=Client ID), password (=Client Secret) and extra fields tenant (Tenant) a (see connection `azure_data_lake_default` for an example). - :ref:`AzureDataLakeHook`: Interface with Azure Data Lake. +- :ref:`AzureDataLakeStorageListOperator`: Lists the files located in a specified Azure Data Lake path. .. _AzureDataLakeHook: AzureDataLakeHook -"
[GitHub] kaxil closed pull request #4094: [AIRFLOW-3236] Create AzureDataLakeStorageListOperator
kaxil closed pull request #4094: [AIRFLOW-3236] Create AzureDataLakeStorageListOperator URL: https://github.com/apache/incubator-airflow/pull/4094 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/airflow/contrib/operators/adls_list_operator.py b/airflow/contrib/operators/adls_list_operator.py new file mode 100644 index 00..7d03e86b17 --- /dev/null +++ b/airflow/contrib/operators/adls_list_operator.py @@ -0,0 +1,70 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from airflow.contrib.hooks.azure_data_lake_hook import AzureDataLakeHook +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults + + +class AzureDataLakeStorageListOperator(BaseOperator): +""" +List all files from the specified path + +This operator returns a python list with the names of files which can be used by + `xcom` in the downstream tasks. + +:param path: The Azure Data Lake path to find the objects. 
Supports glob +strings (templated) +:type path: str +:param azure_data_lake_conn_id: The connection ID to use when +connecting to Azure Data Lake Storage. +:type azure_data_lake_conn_id: str + +**Example**: +The following Operator would list all the Parquet files from ``folder/output/`` +folder in the specified ADLS account :: + +adls_files = AzureDataLakeStorageListOperator( +task_id='adls_files', +path='folder/output/*.parquet', +azure_data_lake_conn_id='azure_data_lake_default' +) +""" +template_fields = ('path',) +ui_color = '#901dd2' + +@apply_defaults +def __init__(self, + path, + azure_data_lake_conn_id='azure_data_lake_default', + *args, + **kwargs): +super(AzureDataLakeStorageListOperator, self).__init__(*args, **kwargs) +self.path = path +self.azure_data_lake_conn_id = azure_data_lake_conn_id + +def execute(self, context): + +hook = AzureDataLakeHook( +azure_data_lake_conn_id=self.azure_data_lake_conn_id +) + +self.log.info('Getting list of ADLS files in path: %s', self.path) + +return hook.list(path=self.path) diff --git a/docs/code.rst b/docs/code.rst index 211e1abafe..5b4a494911 100644 --- a/docs/code.rst +++ b/docs/code.rst @@ -112,6 +112,7 @@ Operators ^ .. Alphabetize this list +.. autoclass:: airflow.contrib.operators.adls_list_operator.AzureDataLakeStorageListOperator .. autoclass:: airflow.contrib.operators.awsbatch_operator.AWSBatchOperator .. autoclass:: airflow.contrib.operators.bigquery_check_operator.BigQueryCheckOperator .. autoclass:: airflow.contrib.operators.bigquery_check_operator.BigQueryValueCheckOperator diff --git a/docs/integration.rst b/docs/integration.rst index 67298b15b6..d12b94b8b6 100644 --- a/docs/integration.rst +++ b/docs/integration.rst @@ -145,7 +145,7 @@ Airflow can be configured to read and write task logs in Azure Blob Storage. See :ref:`write-logs-azure`. Azure Data Lake -'' +''' AzureDataLakeHook communicates via a REST API compatible with WebHDFS. 
Make sure that a Airflow connection of type `azure_data_lake` exists. Authorization can be done by supplying a @@ -153,14 +153,22 @@ login (=Client ID), password (=Client Secret) and extra fields tenant (Tenant) a (see connection `azure_data_lake_default` for an example). - :ref:`AzureDataLakeHook`: Interface with Azure Data Lake. +- :ref:`AzureDataLakeStorageListOperator`: Lists the files located in a specified Azure Data Lake path. .. _AzureDataLakeHook: AzureDataLakeHook -" +" .. autoclass:: airflow.contrib.hooks.azure_data_lake_hook.AzureDataLakeHook +.. _AzureDataLakeStorageListOperator: + +AzureDataLakeStorageListOperator + + +.. autoclass::
[GitHub] yangaws commented on a change in pull request #4091: [AIRFLOW-2524] Update SageMaker hook, operator and sensor for training, tuning and transform
yangaws commented on a change in pull request #4091: [AIRFLOW-2524] Update SageMaker hook, operator and sensor for training, tuning and transform URL: https://github.com/apache/incubator-airflow/pull/4091#discussion_r229897042 ## File path: airflow/contrib/hooks/sagemaker_hook.py ## @@ -16,299 +16,793 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -import copy +import tarfile +import tempfile import time +import os +import collections +import functools +from datetime import datetime + +import botocore.config from botocore.exceptions import ClientError from airflow.exceptions import AirflowException from airflow.contrib.hooks.aws_hook import AwsHook from airflow.hooks.S3_hook import S3Hook +class LogState(object): +STARTING = 1 +WAIT_IN_PROGRESS = 2 +TAILING = 3 +JOB_COMPLETE = 4 +COMPLETE = 5 + + +# Position is a tuple that includes the last read timestamp and the number of items that were read +# at that time. This is used to figure out which event to start with on the next read. +Position = collections.namedtuple('Position', ['timestamp', 'skip']) + + +def argmin(arr, f): +"""Return the index, i, in arr that minimizes f(arr[i])""" +m = None +i = None +for idx, item in enumerate(arr): +if item is not None: +if m is None or f(item) < m: +m = f(item) +i = idx +return i + + +def some(arr): +"""Return True iff there is an element, a, of arr such that a is not None""" +return functools.reduce(lambda x, y: x or (y is not None), arr, False) + + +def secondary_training_status_changed(current_job_description, prev_job_description): +""" +Returns true if training job's secondary status message has changed. + +:param current_job_description: Current job description, returned from DescribeTrainingJob call. +:type current_job_description: dict +:param prev_job_description: Previous job description, returned from DescribeTrainingJob call. 
+:type prev_job_description: dict + +:return: Whether the secondary status message of a training job changed or not. +""" +current_secondary_status_transitions = current_job_description.get('SecondaryStatusTransitions') +if current_secondary_status_transitions is None or len(current_secondary_status_transitions) == 0: +return False + +prev_job_secondary_status_transitions = prev_job_description.get('SecondaryStatusTransitions') \ +if prev_job_description is not None else None + +last_message = prev_job_secondary_status_transitions[-1]['StatusMessage'] \ +if prev_job_secondary_status_transitions is not None \ +and len(prev_job_secondary_status_transitions) > 0 else '' + +message = current_job_description['SecondaryStatusTransitions'][-1]['StatusMessage'] + +return message != last_message + + +def secondary_training_status_message(job_description, prev_description): +""" +Returns a string contains start time and the secondary training job status message. + +:param job_description: Returned response from DescribeTrainingJob call +:type job_description: dict +:param prev_description: Previous job description from DescribeTrainingJob call +:type prev_description: dict + +:return: Job status string to be printed. 
+""" + +if job_description is None or job_description.get('SecondaryStatusTransitions') is None\ +or len(job_description.get('SecondaryStatusTransitions')) == 0: +return '' + +prev_description_secondary_transitions = prev_description.get('SecondaryStatusTransitions')\ +if prev_description is not None else None +prev_transitions_num = len(prev_description['SecondaryStatusTransitions'])\ +if prev_description_secondary_transitions is not None else 0 +current_transitions = job_description['SecondaryStatusTransitions'] + +transitions_to_print = current_transitions[-1:] if len(current_transitions) == prev_transitions_num else \ +current_transitions[prev_transitions_num - len(current_transitions):] + +status_strs = [] +for transition in transitions_to_print: +message = transition['StatusMessage'] +time_str = datetime.utcfromtimestamp( + time.mktime(job_description['LastModifiedTime'].timetuple())).strftime('%Y-%m-%d %H:%M:%S') +status_strs.append('{} {} - {}'.format(time_str, transition['Status'], message)) + +return '\n'.join(status_strs) + + class SageMakerHook(AwsHook): """ Interact with Amazon SageMaker. -sagemaker_conn_id is required for using -the config stored in db for training/tuning """ -non_terminal_states = {'InProgress', 'Stopping', 'Stopped'} +non_terminal_states = {'InProgress', 'Stopping'} +endpoint_non_terminal_states = {'Creating', 'Updating',
[GitHub] yangaws commented on a change in pull request #4091: [AIRFLOW-2524] Update SageMaker hook, operator and sensor for training, tuning and transform
yangaws commented on a change in pull request #4091: [AIRFLOW-2524] Update SageMaker hook, operator and sensor for training, tuning and transform URL: https://github.com/apache/incubator-airflow/pull/4091#discussion_r229896905 ## File path: airflow/contrib/hooks/sagemaker_hook.py ## @@ -16,299 +16,793 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -import copy +import tarfile +import tempfile import time +import os +import collections +import functools +from datetime import datetime + +import botocore.config from botocore.exceptions import ClientError from airflow.exceptions import AirflowException from airflow.contrib.hooks.aws_hook import AwsHook from airflow.hooks.S3_hook import S3Hook +class LogState(object): +STARTING = 1 +WAIT_IN_PROGRESS = 2 +TAILING = 3 +JOB_COMPLETE = 4 +COMPLETE = 5 + + +# Position is a tuple that includes the last read timestamp and the number of items that were read +# at that time. This is used to figure out which event to start with on the next read. +Position = collections.namedtuple('Position', ['timestamp', 'skip']) + + +def argmin(arr, f): +"""Return the index, i, in arr that minimizes f(arr[i])""" +m = None +i = None +for idx, item in enumerate(arr): +if item is not None: +if m is None or f(item) < m: +m = f(item) +i = idx +return i + + +def some(arr): +"""Return True iff there is an element, a, of arr such that a is not None""" +return functools.reduce(lambda x, y: x or (y is not None), arr, False) + + +def secondary_training_status_changed(current_job_description, prev_job_description): +""" +Returns true if training job's secondary status message has changed. + +:param current_job_description: Current job description, returned from DescribeTrainingJob call. +:type current_job_description: dict +:param prev_job_description: Previous job description, returned from DescribeTrainingJob call. 
+:type prev_job_description: dict + +:return: Whether the secondary status message of a training job changed or not. +""" +current_secondary_status_transitions = current_job_description.get('SecondaryStatusTransitions') +if current_secondary_status_transitions is None or len(current_secondary_status_transitions) == 0: +return False + +prev_job_secondary_status_transitions = prev_job_description.get('SecondaryStatusTransitions') \ +if prev_job_description is not None else None + +last_message = prev_job_secondary_status_transitions[-1]['StatusMessage'] \ +if prev_job_secondary_status_transitions is not None \ +and len(prev_job_secondary_status_transitions) > 0 else '' + +message = current_job_description['SecondaryStatusTransitions'][-1]['StatusMessage'] + +return message != last_message + + +def secondary_training_status_message(job_description, prev_description): +""" +Returns a string contains start time and the secondary training job status message. + +:param job_description: Returned response from DescribeTrainingJob call +:type job_description: dict +:param prev_description: Previous job description from DescribeTrainingJob call +:type prev_description: dict + +:return: Job status string to be printed. 
+""" + +if job_description is None or job_description.get('SecondaryStatusTransitions') is None\ +or len(job_description.get('SecondaryStatusTransitions')) == 0: +return '' + +prev_description_secondary_transitions = prev_description.get('SecondaryStatusTransitions')\ +if prev_description is not None else None +prev_transitions_num = len(prev_description['SecondaryStatusTransitions'])\ +if prev_description_secondary_transitions is not None else 0 +current_transitions = job_description['SecondaryStatusTransitions'] + +transitions_to_print = current_transitions[-1:] if len(current_transitions) == prev_transitions_num else \ +current_transitions[prev_transitions_num - len(current_transitions):] + +status_strs = [] +for transition in transitions_to_print: +message = transition['StatusMessage'] +time_str = datetime.utcfromtimestamp( + time.mktime(job_description['LastModifiedTime'].timetuple())).strftime('%Y-%m-%d %H:%M:%S') +status_strs.append('{} {} - {}'.format(time_str, transition['Status'], message)) + +return '\n'.join(status_strs) + + class SageMakerHook(AwsHook): """ Interact with Amazon SageMaker. -sagemaker_conn_id is required for using -the config stored in db for training/tuning """ -non_terminal_states = {'InProgress', 'Stopping', 'Stopped'} +non_terminal_states = {'InProgress', 'Stopping'} +endpoint_non_terminal_states = {'Creating', 'Updating',
[GitHub] codecov-io edited a comment on issue #4087: [WIP][AIRFLOW-2192] Allow non-latin1 usernames with MySQL back-end
codecov-io edited a comment on issue #4087: [WIP][AIRFLOW-2192] Allow non-latin1 usernames with MySQL back-end
URL: https://github.com/apache/incubator-airflow/pull/4087#issuecomment-434066340

# [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4087?src=pr=h1) Report
> Merging [#4087](https://codecov.io/gh/apache/incubator-airflow/pull/4087?src=pr=desc) into [master](https://codecov.io/gh/apache/incubator-airflow/commit/e703d6beeb379ee88ef5e7df495e8a785666f8af?src=pr=desc) will **decrease** coverage by `<.01%`.
> The diff coverage is `60%`.

[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-airflow/pull/4087/graphs/tree.svg?width=650=WdLKlKHOAU=150=pr)](https://codecov.io/gh/apache/incubator-airflow/pull/4087?src=pr=tree)

```diff
@@            Coverage Diff             @@
##           master    #4087      +/-   ##
==========================================
- Coverage   76.67%   76.66%   -0.01%
==========================================
  Files         199      199
  Lines       16186    16197      +11
==========================================
+ Hits        12410    12418       +8
- Misses       3776     3779       +3
```

| [Impacted Files](https://codecov.io/gh/apache/incubator-airflow/pull/4087?src=pr=tree) | Coverage Δ | |
|---|---|---|
| [airflow/settings.py](https://codecov.io/gh/apache/incubator-airflow/pull/4087/diff?src=pr=tree#diff-YWlyZmxvdy9zZXR0aW5ncy5weQ==) | `80.41% <60%> (-0.74%)` | :arrow_down: |
| [airflow/jobs.py](https://codecov.io/gh/apache/incubator-airflow/pull/4087/diff?src=pr=tree#diff-YWlyZmxvdy9qb2JzLnB5) | `77.18% <0%> (-0.13%)` | :arrow_down: |
| [airflow/models.py](https://codecov.io/gh/apache/incubator-airflow/pull/4087/diff?src=pr=tree#diff-YWlyZmxvdy9tb2RlbHMucHk=) | `92.09% <0%> (+0.04%)` | :arrow_up: |
| [airflow/hooks/mysql\_hook.py](https://codecov.io/gh/apache/incubator-airflow/pull/4087/diff?src=pr=tree#diff-YWlyZmxvdy9ob29rcy9teXNxbF9ob29rLnB5) | `90.38% <0%> (+0.38%)` | :arrow_up: |

------

[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4087?src=pr=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4087?src=pr=footer). Last update [e703d6b...43f8462](https://codecov.io/gh/apache/incubator-airflow/pull/4087?src=pr=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[jira] [Resolved] (AIRFLOW-3231) Basic operators for Google Cloud SQL (deploy / patch / delete)
[ https://issues.apache.org/jira/browse/AIRFLOW-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kaxil Naik resolved AIRFLOW-3231.
--------------------------------
       Resolution: Fixed
    Fix Version/s: 1.10.1

Resolved by https://github.com/apache/incubator-airflow/pull/4097

> Basic operators for Google Cloud SQL (deploy / patch / delete)
> --------------------------------------------------------------
>
>                 Key: AIRFLOW-3231
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3231
>             Project: Apache Airflow
>          Issue Type: New Feature
>          Components: gcp
>            Reporter: Szymon Przedwojski
>            Assignee: Szymon Przedwojski
>            Priority: Trivial
>             Fix For: 1.10.1
>
>
> In order to be able to interact with Google Cloud SQL, we need operators that
> should be able to:
> Deploy Instance (Insert or Update):
> ([https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/insert]
> [https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/update])
> Patch Instance
> ([https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/patch])
> Delete Instance:
> ([https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/delete])

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
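The "Deploy Instance (Insert or Update)" requirement in the issue description amounts to an upsert: insert when the instance does not exist yet, update when it does. The following is a rough sketch of that decision only, assuming an in-memory stand-in for the API; `FakeInstancesApi` and `deploy_instance` are hypothetical names and do not reflect the operators merged in the pull request or the Google client library.

```python
class FakeInstancesApi:
    """In-memory stand-in for an instances API (hypothetical, for illustration)."""

    def __init__(self):
        self.instances = {}

    def get(self, name):
        # Returns None when the instance is unknown.
        return self.instances.get(name)

    def insert(self, name, body):
        self.instances[name] = dict(body)
        return "inserted"

    def update(self, name, body):
        self.instances[name] = dict(body)
        return "updated"


def deploy_instance(api, name, body):
    """Insert the instance if it is missing, otherwise update it."""
    if api.get(name) is None:
        return api.insert(name, body)
    return api.update(name, body)


api = FakeInstancesApi()
print(deploy_instance(api, "my-instance", {"tier": "db-n1-standard-1"}))
print(deploy_instance(api, "my-instance", {"tier": "db-n1-standard-2"}))
```

Checking for existence first keeps the operation idempotent from the caller's point of view, which is usually what a scheduled task wants when it may be retried.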
[jira] [Updated] (AIRFLOW-3231) Basic operators for Google Cloud SQL (deploy / patch / delete)
[ https://issues.apache.org/jira/browse/AIRFLOW-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kaxil Naik updated AIRFLOW-3231:
--------------------------------
    Component/s: gcp

> Basic operators for Google Cloud SQL (deploy / patch / delete)
> --------------------------------------------------------------
>
>                 Key: AIRFLOW-3231
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3231
>             Project: Apache Airflow
>          Issue Type: New Feature
>          Components: gcp
>            Reporter: Szymon Przedwojski
>            Assignee: Szymon Przedwojski
>            Priority: Trivial
>             Fix For: 1.10.1
>
>
> In order to be able to interact with Google Cloud SQL, we need operators that
> should be able to:
> Deploy Instance (Insert or Update):
> ([https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/insert]
> [https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/update])
> Patch Instance
> ([https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/patch])
> Delete Instance:
> ([https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/delete])

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] codecov-io commented on issue #4120: [AIRFLOW-XXX] Update Contributing Guide - Git Hooks
codecov-io commented on issue #4120: [AIRFLOW-XXX] Update Contributing Guide - Git Hooks
URL: https://github.com/apache/incubator-airflow/pull/4120#issuecomment-434876896

# [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4120?src=pr=h1) Report
> Merging [#4120](https://codecov.io/gh/apache/incubator-airflow/pull/4120?src=pr=desc) into [master](https://codecov.io/gh/apache/incubator-airflow/commit/215b8c8170bd63f4c449614945bb4b6d90f6a860?src=pr=desc) will **increase** coverage by `0.02%`.
> The diff coverage is `n/a`.

[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-airflow/pull/4120/graphs/tree.svg?width=650=WdLKlKHOAU=150=pr)](https://codecov.io/gh/apache/incubator-airflow/pull/4120?src=pr=tree)

```diff
@@            Coverage Diff             @@
##           master    #4120      +/-   ##
==========================================
+ Coverage   76.64%   76.67%   +0.02%
==========================================
  Files         199      199
  Lines       16192    16192
==========================================
+ Hits        12411    12415       +4
+ Misses       3781     3777       -4
```

| [Impacted Files](https://codecov.io/gh/apache/incubator-airflow/pull/4120?src=pr=tree) | Coverage Δ | |
|---|---|---|
| [airflow/jobs.py](https://codecov.io/gh/apache/incubator-airflow/pull/4120/diff?src=pr=tree#diff-YWlyZmxvdy9qb2JzLnB5) | `77.18% <0%> (-0.28%)` | :arrow_down: |
| [airflow/www/views.py](https://codecov.io/gh/apache/incubator-airflow/pull/4120/diff?src=pr=tree#diff-YWlyZmxvdy93d3cvdmlld3MucHk=) | `68.73% <0%> (+0.12%)` | :arrow_up: |
| [airflow/hooks/dbapi\_hook.py](https://codecov.io/gh/apache/incubator-airflow/pull/4120/diff?src=pr=tree#diff-YWlyZmxvdy9ob29rcy9kYmFwaV9ob29rLnB5) | `79.83% <0%> (+0.8%)` | :arrow_up: |
| [airflow/utils/sqlalchemy.py](https://codecov.io/gh/apache/incubator-airflow/pull/4120/diff?src=pr=tree#diff-YWlyZmxvdy91dGlscy9zcWxhbGNoZW15LnB5) | `81.42% <0%> (+5.71%)` | :arrow_up: |

------

[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4120?src=pr=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4120?src=pr=footer). Last update [215b8c8...e88dfd4](https://codecov.io/gh/apache/incubator-airflow/pull/4120?src=pr=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
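The percentages in the report for #4120 follow from its Hits and Lines rows (12411 hits on master, 12415 on the pull request, 16192 lines in both). The following is a small sketch of that arithmetic. Truncating to two decimals instead of rounding is an inference from the figures in this report, not documented Codecov behaviour, and the function names are hypothetical.

```python
def coverage_percent(hits, lines):
    """Coverage as a percentage string with two decimals, truncated (assumed)."""
    # Work in hundredths of a percent with integer division to avoid
    # floating-point rounding surprises.
    hundredths = hits * 10000 // lines
    return "{}.{:02d}%".format(hundredths // 100, hundredths % 100)


def misses(hits, lines):
    """Lines that were not executed by the test suite."""
    return lines - hits


print(coverage_percent(12411, 16192))  # master
print(coverage_percent(12415, 16192))  # pull request
print(misses(12411, 16192), misses(12415, 16192))
```

With ordinary rounding the master figure would come out one hundredth higher than the value shown in the report, which is why the sketch truncates.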
[jira] [Commented] (AIRFLOW-3231) Basic operators for Google Cloud SQL (deploy / patch / delete)
[ https://issues.apache.org/jira/browse/AIRFLOW-3231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16670871#comment-16670871 ]

ASF GitHub Bot commented on AIRFLOW-3231:
-----------------------------------------

kaxil closed pull request #4097: [AIRFLOW-3231] Basic operators for Google Cloud SQL
URL: https://github.com/apache/incubator-airflow/pull/4097

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/contrib/example_dags/example_gcp_sql.py b/airflow/contrib/example_dags/example_gcp_sql.py
new file mode 100644
index 00..a484456f6e
--- /dev/null
+++ b/airflow/contrib/example_dags/example_gcp_sql.py
@@ -0,0 +1,134 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that deploys, updates, patches and deletes a Cloud SQL instance
+in Google Cloud Platform.
+
+This DAG relies on the following Airflow variables
+https://airflow.apache.org/concepts.html#variables
+* PROJECT_ID - Google Cloud Platform project for the Cloud SQL instance.
+* INSTANCE_NAME - Name of the Cloud SQL instance.
+"""
+
+import datetime
+
+import airflow
+from airflow import models
+
+from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceCreateOperator, \
+    CloudSqlInstancePatchOperator, CloudSqlInstanceDeleteOperator
+
+# [START howto_operator_cloudsql_arguments]
+PROJECT_ID = models.Variable.get('PROJECT_ID', '')
+INSTANCE_NAME = models.Variable.get('INSTANCE_NAME', '')
+# [END howto_operator_cloudsql_arguments]
+
+# Bodies below represent Cloud SQL instance resources:
+# https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances
+
+# [START howto_operator_cloudsql_create_body]
+body = {
+    "name": INSTANCE_NAME,
+    "settings": {
+        "tier": "db-n1-standard-1",
+        "backupConfiguration": {
+            "binaryLogEnabled": True,
+            "enabled": True,
+            "startTime": "05:00"
+        },
+        "activationPolicy": "ALWAYS",
+        "dataDiskSizeGb": 30,
+        "dataDiskType": "PD_SSD",
+        "databaseFlags": [],
+        "ipConfiguration": {
+            "ipv4Enabled": True,
+            "requireSsl": True,
+        },
+        "locationPreference": {
+            "zone": "europe-west4-a"
+        },
+        "maintenanceWindow": {
+            "hour": 5,
+            "day": 7,
+            "updateTrack": "canary"
+        },
+        "pricingPlan": "PER_USE",
+        "replicationType": "ASYNCHRONOUS",
+        "storageAutoResize": False,
+        "storageAutoResizeLimit": 0,
+        "userLabels": {
+            "my-key": "my-value"
+        }
+    },
+    "databaseVersion": "MYSQL_5_7",
+    "region": "europe-west4",
+}
+# [END howto_operator_cloudsql_create_body]
+# [START howto_operator_cloudsql_patch_body]
+patch_body = {
+    "name": INSTANCE_NAME,
+    "settings": {
+        "dataDiskSizeGb": 35,
+        "maintenanceWindow": {
+            "hour": 3,
+            "day": 6,
+            "updateTrack": "canary"
+        },
+        "userLabels": {
+            "my-key-patch": "my-value-patch"
+        }
+    }
+}
+# [END howto_operator_cloudsql_patch_body]
+
+default_args = {
+    'start_date': airflow.utils.dates.days_ago(1)
+}
+
+with models.DAG(
+    'example_gcp_sql',
+    default_args=default_args,
+    schedule_interval=datetime.timedelta(days=1)
+) as dag:
+    # [START howto_operator_cloudsql_create]
+    sql_instance_create_task = CloudSqlInstanceCreateOperator(
+        project_id=PROJECT_ID,
+        body=body,
+        instance=INSTANCE_NAME,
+        task_id='sql_instance_create_task'
+    )
+    # [END howto_operator_cloudsql_create]
+    # [START howto_operator_cloudsql_patch]
+    sql_instance_patch_task = CloudSqlInstancePatchOperator(
+        project_id=PROJECT_ID,
+        body=patch_body,
+        instance=INSTANCE_NAME,
+        task_id='sql_instance_patch_task'
+    )
+    # [END howto_operator_cloudsql_patch]
+    # [START howto_operator_cloudsql_delete]
+
[GitHub] kaxil closed pull request #4097: [AIRFLOW-3231] Basic operators for Google Cloud SQL
kaxil closed pull request #4097: [AIRFLOW-3231] Basic operators for Google Cloud SQL
URL: https://github.com/apache/incubator-airflow/pull/4097

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/contrib/example_dags/example_gcp_sql.py b/airflow/contrib/example_dags/example_gcp_sql.py
new file mode 100644
index 00..a484456f6e
--- /dev/null
+++ b/airflow/contrib/example_dags/example_gcp_sql.py
@@ -0,0 +1,134 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that deploys, updates, patches and deletes a Cloud SQL instance
+in Google Cloud Platform.
+
+This DAG relies on the following Airflow variables
+https://airflow.apache.org/concepts.html#variables
+* PROJECT_ID - Google Cloud Platform project for the Cloud SQL instance.
+* INSTANCE_NAME - Name of the Cloud SQL instance.
+"""
+
+import datetime
+
+import airflow
+from airflow import models
+
+from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceCreateOperator, \
+    CloudSqlInstancePatchOperator, CloudSqlInstanceDeleteOperator
+
+# [START howto_operator_cloudsql_arguments]
+PROJECT_ID = models.Variable.get('PROJECT_ID', '')
+INSTANCE_NAME = models.Variable.get('INSTANCE_NAME', '')
+# [END howto_operator_cloudsql_arguments]
+
+# Bodies below represent Cloud SQL instance resources:
+# https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances
+
+# [START howto_operator_cloudsql_create_body]
+body = {
+    "name": INSTANCE_NAME,
+    "settings": {
+        "tier": "db-n1-standard-1",
+        "backupConfiguration": {
+            "binaryLogEnabled": True,
+            "enabled": True,
+            "startTime": "05:00"
+        },
+        "activationPolicy": "ALWAYS",
+        "dataDiskSizeGb": 30,
+        "dataDiskType": "PD_SSD",
+        "databaseFlags": [],
+        "ipConfiguration": {
+            "ipv4Enabled": True,
+            "requireSsl": True,
+        },
+        "locationPreference": {
+            "zone": "europe-west4-a"
+        },
+        "maintenanceWindow": {
+            "hour": 5,
+            "day": 7,
+            "updateTrack": "canary"
+        },
+        "pricingPlan": "PER_USE",
+        "replicationType": "ASYNCHRONOUS",
+        "storageAutoResize": False,
+        "storageAutoResizeLimit": 0,
+        "userLabels": {
+            "my-key": "my-value"
+        }
+    },
+    "databaseVersion": "MYSQL_5_7",
+    "region": "europe-west4",
+}
+# [END howto_operator_cloudsql_create_body]
+# [START howto_operator_cloudsql_patch_body]
+patch_body = {
+    "name": INSTANCE_NAME,
+    "settings": {
+        "dataDiskSizeGb": 35,
+        "maintenanceWindow": {
+            "hour": 3,
+            "day": 6,
+            "updateTrack": "canary"
+        },
+        "userLabels": {
+            "my-key-patch": "my-value-patch"
+        }
+    }
+}
+# [END howto_operator_cloudsql_patch_body]
+
+default_args = {
+    'start_date': airflow.utils.dates.days_ago(1)
+}
+
+with models.DAG(
+    'example_gcp_sql',
+    default_args=default_args,
+    schedule_interval=datetime.timedelta(days=1)
+) as dag:
+    # [START howto_operator_cloudsql_create]
+    sql_instance_create_task = CloudSqlInstanceCreateOperator(
+        project_id=PROJECT_ID,
+        body=body,
+        instance=INSTANCE_NAME,
+        task_id='sql_instance_create_task'
+    )
+    # [END howto_operator_cloudsql_create]
+    # [START howto_operator_cloudsql_patch]
+    sql_instance_patch_task = CloudSqlInstancePatchOperator(
+        project_id=PROJECT_ID,
+        body=patch_body,
+        instance=INSTANCE_NAME,
+        task_id='sql_instance_patch_task'
+    )
+    # [END howto_operator_cloudsql_patch]
+    # [START howto_operator_cloudsql_delete]
+    sql_instance_delete_task = CloudSqlInstanceDeleteOperator(
+        project_id=PROJECT_ID,
+        instance=INSTANCE_NAME,
+        task_id='sql_instance_delete_task'
+    )
+    # [END howto_operator_cloudsql_delete]
+
+
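The example DAG in the diff sends a full `body` on create and a much smaller `patch_body` on patch. The following is a rough sketch of what a patch is expected to do to the resource, assuming a recursive merge in which only the supplied fields change. How the real Cloud SQL API treats nested objects such as `userLabels` is not stated in this thread, so the merge rule and the `apply_patch` helper are assumptions for illustration, and the dictionaries are shortened copies of the ones in the diff.

```python
import copy


def apply_patch(resource, patch):
    """Return a copy of `resource` with `patch` merged in (assumed semantics)."""
    merged = copy.deepcopy(resource)
    for key, value in patch.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            # Recurse so that sibling fields not named in the patch survive.
            merged[key] = apply_patch(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


body = {
    "name": "my-instance",
    "settings": {
        "tier": "db-n1-standard-1",
        "dataDiskSizeGb": 30,
        "maintenanceWindow": {"hour": 5, "day": 7, "updateTrack": "canary"},
    },
}
patch_body = {
    "name": "my-instance",
    "settings": {
        "dataDiskSizeGb": 35,
        "maintenanceWindow": {"hour": 3, "day": 6, "updateTrack": "canary"},
    },
}

patched = apply_patch(body, patch_body)
print(patched["settings"]["dataDiskSizeGb"])     # changed by the patch
print(patched["settings"]["tier"])               # untouched by the patch
print(patched["settings"]["maintenanceWindow"])  # replaced field by field
```

Under this reading, the patch body only needs the fields that differ, which is why it is so much shorter than the create body.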
[GitHub] codecov-io edited a comment on issue #4119: [AIRFLOW-2780] Add IMAP Hook to retrieve email attachments
codecov-io edited a comment on issue #4119: [AIRFLOW-2780] Add IMAP Hook to retrieve email attachments
URL: https://github.com/apache/incubator-airflow/pull/4119#issuecomment-434858962

# [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4119?src=pr=h1) Report
> Merging [#4119](https://codecov.io/gh/apache/incubator-airflow/pull/4119?src=pr=desc) into [master](https://codecov.io/gh/apache/incubator-airflow/commit/215b8c8170bd63f4c449614945bb4b6d90f6a860?src=pr=desc) will **increase** coverage by `0.02%`.
> The diff coverage is `n/a`.

[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-airflow/pull/4119/graphs/tree.svg?width=650=WdLKlKHOAU=150=pr)](https://codecov.io/gh/apache/incubator-airflow/pull/4119?src=pr=tree)

```diff
@@            Coverage Diff             @@
##           master    #4119      +/-   ##
==========================================
+ Coverage   76.64%   76.67%   +0.02%
==========================================
  Files         199      199
  Lines       16192    16192
==========================================
+ Hits        12411    12415       +4
+ Misses       3781     3777       -4
```

| [Impacted Files](https://codecov.io/gh/apache/incubator-airflow/pull/4119?src=pr=tree) | Coverage Δ | |
|---|---|---|
| [airflow/jobs.py](https://codecov.io/gh/apache/incubator-airflow/pull/4119/diff?src=pr=tree#diff-YWlyZmxvdy9qb2JzLnB5) | `77.18% <0%> (-0.28%)` | :arrow_down: |
| [airflow/www/views.py](https://codecov.io/gh/apache/incubator-airflow/pull/4119/diff?src=pr=tree#diff-YWlyZmxvdy93d3cvdmlld3MucHk=) | `68.73% <0%> (+0.12%)` | :arrow_up: |
| [airflow/hooks/dbapi\_hook.py](https://codecov.io/gh/apache/incubator-airflow/pull/4119/diff?src=pr=tree#diff-YWlyZmxvdy9ob29rcy9kYmFwaV9ob29rLnB5) | `79.83% <0%> (+0.8%)` | :arrow_up: |
| [airflow/utils/sqlalchemy.py](https://codecov.io/gh/apache/incubator-airflow/pull/4119/diff?src=pr=tree#diff-YWlyZmxvdy91dGlscy9zcWxhbGNoZW15LnB5) | `81.42% <0%> (+5.71%)` | :arrow_up: |

------

[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4119?src=pr=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4119?src=pr=footer). Last update [215b8c8...cc0c555](https://codecov.io/gh/apache/incubator-airflow/pull/4119?src=pr=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] codecov-io edited a comment on issue #4114: [AIRFLOW-3259] Fix internal server error when displaying charts
codecov-io edited a comment on issue #4114: [AIRFLOW-3259] Fix internal server error when displaying charts
URL: https://github.com/apache/incubator-airflow/pull/4114#issuecomment-434860238

# [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4114?src=pr=h1) Report
> Merging [#4114](https://codecov.io/gh/apache/incubator-airflow/pull/4114?src=pr=desc) into [master](https://codecov.io/gh/apache/incubator-airflow/commit/215b8c8170bd63f4c449614945bb4b6d90f6a860?src=pr=desc) will **increase** coverage by `0.02%`.
> The diff coverage is `0%`.

[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-airflow/pull/4114/graphs/tree.svg?width=650=WdLKlKHOAU=150=pr)](https://codecov.io/gh/apache/incubator-airflow/pull/4114?src=pr=tree)

```diff
@@            Coverage Diff             @@
##           master    #4114      +/-   ##
==========================================
+ Coverage   76.64%   76.67%   +0.02%
==========================================
  Files         199      199
  Lines       16192    16192
==========================================
+ Hits        12411    12415       +4
+ Misses       3781     3777       -4
```

| [Impacted Files](https://codecov.io/gh/apache/incubator-airflow/pull/4114?src=pr=tree) | Coverage Δ | |
|---|---|---|
| [airflow/www/views.py](https://codecov.io/gh/apache/incubator-airflow/pull/4114/diff?src=pr=tree#diff-YWlyZmxvdy93d3cvdmlld3MucHk=) | `68.73% <0%> (+0.12%)` | :arrow_up: |
| [airflow/jobs.py](https://codecov.io/gh/apache/incubator-airflow/pull/4114/diff?src=pr=tree#diff-YWlyZmxvdy9qb2JzLnB5) | `77.18% <0%> (-0.28%)` | :arrow_down: |
| [airflow/hooks/dbapi\_hook.py](https://codecov.io/gh/apache/incubator-airflow/pull/4114/diff?src=pr=tree#diff-YWlyZmxvdy9ob29rcy9kYmFwaV9ob29rLnB5) | `79.83% <0%> (+0.8%)` | :arrow_up: |
| [airflow/utils/sqlalchemy.py](https://codecov.io/gh/apache/incubator-airflow/pull/4114/diff?src=pr=tree#diff-YWlyZmxvdy91dGlscy9zcWxhbGNoZW15LnB5) | `81.42% <0%> (+5.71%)` | :arrow_up: |

------

[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4114?src=pr=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4114?src=pr=footer). Last update [215b8c8...26297aa](https://codecov.io/gh/apache/incubator-airflow/pull/4114?src=pr=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] odracci commented on issue #3770: [AIRFLOW-3281] Fix Kubernetes operator with git-sync
odracci commented on issue #3770: [AIRFLOW-3281] Fix Kubernetes operator with git-sync
URL: https://github.com/apache/incubator-airflow/pull/3770#issuecomment-434866757

@ashb ticket created, branch rebase /cc @Fokko @dimberman

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[jira] [Work started] (AIRFLOW-3281) Kubernetes git sync implementation is broken
[ https://issues.apache.org/jira/browse/AIRFLOW-3281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Work on AIRFLOW-3281 started by Riccardo Bini.
----------------------------------------------
> Kubernetes git sync implementation is broken
> --------------------------------------------
>
>                 Key: AIRFLOW-3281
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3281
>             Project: Apache Airflow
>          Issue Type: Bug
>            Reporter: Riccardo Bini
>            Assignee: Riccardo Bini
>            Priority: Major
>
> The current implementation of git-sync when airflow is being used with
> kubernetes is broken.
> The init container doesn't share the volume with the airflow container and
> the path of the dag folder doesn't take into account the fact that git sync
> creates a sym link to the revision

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Created] (AIRFLOW-3281) Kubernetes git sync implementation is broken
Riccardo Bini created AIRFLOW-3281:
--------------------------------------

             Summary: Kubernetes git sync implementation is broken
                 Key: AIRFLOW-3281
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3281
             Project: Apache Airflow
          Issue Type: Bug
            Reporter: Riccardo Bini
            Assignee: Riccardo Bini


The current implementation of git-sync when airflow is being used with
kubernetes is broken.
The init container doesn't share the volume with the airflow container and
the path of the dag folder doesn't take into account the fact that git sync
creates a sym link to the revision

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
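The second problem named in the issue, a DAG folder path that ignores the symlink git-sync creates, can be illustrated without Kubernetes. The following is a minimal sketch, assuming a layout in which the sync root holds a checkout directory named after the revision plus a symlink pointing at it. The directory names, the `dags` subpath and the `resolve_dags_folder` helper are all hypothetical and are not taken from the fix in pull request #3770.

```python
import os
import tempfile


def resolve_dags_folder(sync_root, link_name, dags_subpath=""):
    """Follow the revision symlink and return the real DAG folder path."""
    link_path = os.path.join(sync_root, link_name)
    # realpath resolves the symlink to the checkout it currently points at.
    checkout = os.path.realpath(link_path)
    return os.path.join(checkout, dags_subpath) if dags_subpath else checkout


def demo():
    """Build a throwaway layout and check that the link is followed."""
    with tempfile.TemporaryDirectory() as root:
        root = os.path.realpath(root)
        revision_dir = os.path.join(root, "rev-abc123")  # assumed naming
        os.makedirs(os.path.join(revision_dir, "dags"))
        os.symlink(revision_dir, os.path.join(root, "repo"))
        resolved = resolve_dags_folder(root, "repo", "dags")
        return resolved == os.path.join(revision_dir, "dags")


print(demo())
```

A path built from the sync root alone would miss the extra directory level introduced by the link, which matches the symptom described in the issue.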
[GitHub] codecov-io edited a comment on issue #4114: [AIRFLOW-3259] Fix internal server error when displaying charts
codecov-io edited a comment on issue #4114: [AIRFLOW-3259] Fix internal server error when displaying charts URL: https://github.com/apache/incubator-airflow/pull/4114#issuecomment-434860238 # [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4114?src=pr=h1) Report > Merging [#4114](https://codecov.io/gh/apache/incubator-airflow/pull/4114?src=pr=desc) into [master](https://codecov.io/gh/apache/incubator-airflow/commit/215b8c8170bd63f4c449614945bb4b6d90f6a860?src=pr=desc) will **increase** coverage by `0.02%`. > The diff coverage is `0%`. [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-airflow/pull/4114/graphs/tree.svg?width=650=WdLKlKHOAU=150=pr)](https://codecov.io/gh/apache/incubator-airflow/pull/4114?src=pr=tree) ```diff @@Coverage Diff @@ ## master#4114 +/- ## == + Coverage 76.64% 76.67% +0.02% == Files 199 199 Lines 1619216192 == + Hits1241112415 +4 + Misses 3781 3777 -4 ``` | [Impacted Files](https://codecov.io/gh/apache/incubator-airflow/pull/4114?src=pr=tree) | Coverage Δ | | |---|---|---| | [airflow/www/views.py](https://codecov.io/gh/apache/incubator-airflow/pull/4114/diff?src=pr=tree#diff-YWlyZmxvdy93d3cvdmlld3MucHk=) | `68.73% <0%> (+0.12%)` | :arrow_up: | | [airflow/jobs.py](https://codecov.io/gh/apache/incubator-airflow/pull/4114/diff?src=pr=tree#diff-YWlyZmxvdy9qb2JzLnB5) | `77.18% <0%> (-0.28%)` | :arrow_down: | | [airflow/hooks/dbapi\_hook.py](https://codecov.io/gh/apache/incubator-airflow/pull/4114/diff?src=pr=tree#diff-YWlyZmxvdy9ob29rcy9kYmFwaV9ob29rLnB5) | `79.83% <0%> (+0.8%)` | :arrow_up: | | [airflow/utils/sqlalchemy.py](https://codecov.io/gh/apache/incubator-airflow/pull/4114/diff?src=pr=tree#diff-YWlyZmxvdy91dGlscy9zcWxhbGNoZW15LnB5) | `81.42% <0%> (+5.71%)` | :arrow_up: | -- [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4114?src=pr=continue). 
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta) > `Δ = absolute (impact)`, `ø = not affected`, `? = missing data` > Powered by [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4114?src=pr=footer). Last update [215b8c8...26297aa](https://codecov.io/gh/apache/incubator-airflow/pull/4114?src=pr=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] codecov-io commented on issue #4114: [AIRFLOW-3259] Fix internal server error when displaying charts
codecov-io commented on issue #4114: [AIRFLOW-3259] Fix internal server error when displaying charts URL: https://github.com/apache/incubator-airflow/pull/4114#issuecomment-434860238 # [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4114?src=pr=h1) Report > Merging [#4114](https://codecov.io/gh/apache/incubator-airflow/pull/4114?src=pr=desc) into [master](https://codecov.io/gh/apache/incubator-airflow/commit/215b8c8170bd63f4c449614945bb4b6d90f6a860?src=pr=desc) will **increase** coverage by `0.02%`. > The diff coverage is `0%`. [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-airflow/pull/4114/graphs/tree.svg?width=650=WdLKlKHOAU=150=pr)](https://codecov.io/gh/apache/incubator-airflow/pull/4114?src=pr=tree) ```diff @@Coverage Diff @@ ## master#4114 +/- ## == + Coverage 76.64% 76.67% +0.02% == Files 199 199 Lines 1619216192 == + Hits1241112415 +4 + Misses 3781 3777 -4 ``` | [Impacted Files](https://codecov.io/gh/apache/incubator-airflow/pull/4114?src=pr=tree) | Coverage Δ | | |---|---|---| | [airflow/www/views.py](https://codecov.io/gh/apache/incubator-airflow/pull/4114/diff?src=pr=tree#diff-YWlyZmxvdy93d3cvdmlld3MucHk=) | `68.73% <0%> (+0.12%)` | :arrow_up: | | [airflow/jobs.py](https://codecov.io/gh/apache/incubator-airflow/pull/4114/diff?src=pr=tree#diff-YWlyZmxvdy9qb2JzLnB5) | `77.18% <0%> (-0.28%)` | :arrow_down: | | [airflow/hooks/dbapi\_hook.py](https://codecov.io/gh/apache/incubator-airflow/pull/4114/diff?src=pr=tree#diff-YWlyZmxvdy9ob29rcy9kYmFwaV9ob29rLnB5) | `79.83% <0%> (+0.8%)` | :arrow_up: | | [airflow/utils/sqlalchemy.py](https://codecov.io/gh/apache/incubator-airflow/pull/4114/diff?src=pr=tree#diff-YWlyZmxvdy91dGlscy9zcWxhbGNoZW15LnB5) | `81.42% <0%> (+5.71%)` | :arrow_up: | -- [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4114?src=pr=continue). 
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta) > `Δ = absolute (impact)`, `ø = not affected`, `? = missing data` > Powered by [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4114?src=pr=footer). Last update [215b8c8...26297aa](https://codecov.io/gh/apache/incubator-airflow/pull/4114?src=pr=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] codecov-io commented on issue #4119: [AIRFLOW-2780] Add IMAP Hook to retrieve email attachments
codecov-io commented on issue #4119: [AIRFLOW-2780] Add IMAP Hook to retrieve email attachments URL: https://github.com/apache/incubator-airflow/pull/4119#issuecomment-434858962 # [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4119?src=pr=h1) Report > Merging [#4119](https://codecov.io/gh/apache/incubator-airflow/pull/4119?src=pr=desc) into [master](https://codecov.io/gh/apache/incubator-airflow/commit/215b8c8170bd63f4c449614945bb4b6d90f6a860?src=pr=desc) will **increase** coverage by `0.02%`. > The diff coverage is `n/a`. [![Impacted file tree graph](https://codecov.io/gh/apache/incubator-airflow/pull/4119/graphs/tree.svg?width=650=WdLKlKHOAU=150=pr)](https://codecov.io/gh/apache/incubator-airflow/pull/4119?src=pr=tree) ```diff @@Coverage Diff @@ ## master#4119 +/- ## == + Coverage 76.64% 76.67% +0.02% == Files 199 199 Lines 1619216192 == + Hits1241112415 +4 + Misses 3781 3777 -4 ``` | [Impacted Files](https://codecov.io/gh/apache/incubator-airflow/pull/4119?src=pr=tree) | Coverage Δ | | |---|---|---| | [airflow/jobs.py](https://codecov.io/gh/apache/incubator-airflow/pull/4119/diff?src=pr=tree#diff-YWlyZmxvdy9qb2JzLnB5) | `77.18% <0%> (-0.28%)` | :arrow_down: | | [airflow/www/views.py](https://codecov.io/gh/apache/incubator-airflow/pull/4119/diff?src=pr=tree#diff-YWlyZmxvdy93d3cvdmlld3MucHk=) | `68.73% <0%> (+0.12%)` | :arrow_up: | | [airflow/hooks/dbapi\_hook.py](https://codecov.io/gh/apache/incubator-airflow/pull/4119/diff?src=pr=tree#diff-YWlyZmxvdy9ob29rcy9kYmFwaV9ob29rLnB5) | `79.83% <0%> (+0.8%)` | :arrow_up: | | [airflow/utils/sqlalchemy.py](https://codecov.io/gh/apache/incubator-airflow/pull/4119/diff?src=pr=tree#diff-YWlyZmxvdy91dGlscy9zcWxhbGNoZW15LnB5) | `81.42% <0%> (+5.71%)` | :arrow_up: | -- [Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4119?src=pr=continue). 
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta) > `Δ = absolute (impact)`, `ø = not affected`, `? = missing data` > Powered by [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4119?src=pr=footer). Last update [215b8c8...cc0c555](https://codecov.io/gh/apache/incubator-airflow/pull/4119?src=pr=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments). This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Resolved] (AIRFLOW-3136) Scheduler Failing the Task retries run while processing Executor Events
[ https://issues.apache.org/jira/browse/AIRFLOW-3136?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ash Berlin-Taylor resolved AIRFLOW-3136.
Resolution: Fixed
Fix Version/s: 1.10.1, 2.0.0

> Scheduler Failing the Task retries run while processing Executor Events
> ---
>
> Key: AIRFLOW-3136
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3136
> Project: Apache Airflow
> Issue Type: Bug
> Components: scheduler
> Affects Versions: 1.9.0
> Reporter: raman
> Priority: Major
> Fix For: 2.0.0, 1.10.1
>
> The following behaviour is observed with Airflow 1.9 in LocalExecutor mode.
>
> The Airflow scheduler processes the executor events in the
> "_process_executor_events(self, simple_dag_bag, session=None)" function of
> jobs.py.
> The events are identified by a key composed of dag id, task id and
> execution date, so all retries of a task have the same key.
> If the task retry interval is very small, like 30 seconds, then the scheduler
> might schedule the next retry run while the previous task run result is still
> in the executor event queue.
> The current task run might be in the queued state while the scheduler is
> processing the executor's previous events, which might make the scheduler fail
> the current run because of the following code in the jobs.py file:
>
> def _process_executor_events(self, simple_dag_bag, session=None):
>     """
>     Respond to executor events.
>     """
>     # TODO: this shares quite a lot of code with _manage_executor_state
>     TI = models.TaskInstance
>     *for key, state in list(self.executor.get_event_buffer(simple_dag_bag.dag_ids)*
>     *.items()):*
>         dag_id, task_id, execution_date = key
>         self.log.info(
>             "Executor reports %s.%s execution_date=%s as %s",
>             dag_id, task_id, execution_date, state
>         )
>         if state == State.FAILED or state == State.SUCCESS:
>             qry = session.query(TI).filter(TI.dag_id == dag_id,
>                                            TI.task_id == task_id,
>                                            TI.execution_date == execution_date)
>             ti = qry.first()
>             if not ti:
>                 self.log.warning("TaskInstance %s went missing from the database", ti)
>                 continue
>             # TODO: should we fail RUNNING as well, as we do in Backfills?
>             *if ti.state == State.QUEUED:*
>                 msg = ("Executor reports task instance %s finished (%s) "
>                        "although the task says its %s. Was the task "
>                        "killed externally?".format(ti, state, ti.state))
>
--
This message was sent by Atlassian JIRA (v7.6.3#76005)
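The collision described in the report can be illustrated with a minimal sketch. It is not Airflow code: `Attempt`, `old_key` and the sample values are hypothetical stand-ins, and the event buffer is modelled as a plain dict keyed the way the report describes.

```python
from collections import namedtuple

# Hypothetical stand-in for a task instance; not Airflow's real model.
Attempt = namedtuple("Attempt", "dag_id task_id execution_date try_number state")


def old_key(attempt):
    """Key as described in the report: the try number is not part of it."""
    return (attempt.dag_id, attempt.task_id, attempt.execution_date)


first = Attempt("etl", "load", "2018-10-01", 1, "failed")
retry = Attempt("etl", "load", "2018-10-01", 2, "queued")

# The result of the first try is still waiting in the executor's event buffer.
event_buffer = {old_key(first): first.state}

# The retry has already been queued, and it maps to the very same key.
collides = old_key(retry) in event_buffer
stale_state = event_buffer[old_key(retry)]
```

Because both attempts share one key, the stale "failed" event is indistinguishable from an event about the queued retry.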
[GitHub] fenglu-g commented on issue #4064: AIRFLOW-3149 Support dataproc cluster deletion on ERROR
fenglu-g commented on issue #4064: AIRFLOW-3149 Support dataproc cluster deletion on ERROR URL: https://github.com/apache/incubator-airflow/pull/4064#issuecomment-434850291 Sorry for the late reply. Thank you @dossett for the detailed explanation. The root cause seems to be that DataprocClusterCreateOperator is not idempotent. Similar to what you have described, how about we re-factor the operator based on the following logic? - check existence of dataproc cluster - if yes and in "running" state, no-op. - else, delete the cluster, and attempt create again. So we don't have to export this additional delete_cluster_on_error? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
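The check-then-recreate logic proposed in this comment could look roughly like the sketch below. It assumes a hypothetical in-memory client (`FakeDataproc`) with made-up method names and state strings; it does not use the real Dataproc API or the operator's actual code.

```python
class FakeDataproc:
    """Hypothetical in-memory client, used only to illustrate the control flow."""

    def __init__(self):
        self.clusters = {}  # cluster name -> state string

    def get_state(self, name):
        return self.clusters.get(name)  # None when the cluster does not exist

    def delete(self, name):
        self.clusters.pop(name, None)

    def create(self, name):
        self.clusters[name] = "RUNNING"


def ensure_cluster(client, name):
    """Create the cluster unless a running one already exists."""
    state = client.get_state(name)
    if state == "RUNNING":
        return "noop"  # already there: nothing to do
    if state is not None:
        client.delete(name)  # exists but unusable: remove it first
        client.create(name)
        return "recreated"
    client.create(name)
    return "created"


client = FakeDataproc()
client.clusters["demo"] = "ERROR"
first_call = ensure_cluster(client, "demo")
second_call = ensure_cluster(client, "demo")
```

Calling `ensure_cluster` repeatedly converges on one running cluster, which is the idempotence the comment asks for.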
[GitHub] fenglu-g commented on a change in pull request #4083: [AIRFLOW-3211] Reattach to GCP Dataproc jobs upon Airflow restart
fenglu-g commented on a change in pull request #4083: [AIRFLOW-3211] Reattach to GCP Dataproc jobs upon Airflow restart URL: https://github.com/apache/incubator-airflow/pull/4083#discussion_r229869150 ## File path: airflow/contrib/hooks/gcp_dataproc_hook.py ## @@ -33,12 +33,82 @@ def __init__(self, dataproc_api, project_id, job, region='global', self.dataproc_api = dataproc_api self.project_id = project_id self.region = region + +# Check if the job to submit is already running on the cluster. +# If so, don't resubmit the job. +try: +cluster_name = job['job']['placement']['clusterName'] +except KeyError: +self.log.error('Job to submit is incorrectly configured.') +raise + +jobs_on_cluster_response = dataproc_api.projects().regions().jobs().list( +projectId=self.project_id, +region=self.region, +clusterName=cluster_name).execute() + +UUID_LENGTH = 9 Review comment: Sorry for the late reply, yes but we should also surface this job_dedupe_regex argument to dataproc submit job operators (e.g., [DataProcSparkOperator](https://github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/dataproc_operator.py#L982)), wdyt? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ashb commented on a change in pull request #4061: [AIRFLOW-2799] Fix filtering UI objects by datetime
ashb commented on a change in pull request #4061: [AIRFLOW-2799] Fix filtering UI objects by datetime URL: https://github.com/apache/incubator-airflow/pull/4061#discussion_r229864536 ## File path: airflow/www/utils.py ## @@ -448,7 +448,42 @@ def __call__(self, field, **kwargs): return wtforms.widgets.core.HTMLString(html) -class UtcFilterConverter(FilterConverter): +class UtcDateTimeFilterMixin(object): +def clean(self, value): +return timezone.make_aware(super(UtcDateTimeFilterMixin, self).clean(value)) Review comment: Yes, this is not working when default_timezone is not in UTC. Will fix. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] bolkedebruin commented on a change in pull request #4061: [AIRFLOW-2799] Fix filtering UI objects by datetime
bolkedebruin commented on a change in pull request #4061: [AIRFLOW-2799] Fix filtering UI objects by datetime URL: https://github.com/apache/incubator-airflow/pull/4061#discussion_r229861880 ## File path: airflow/www/utils.py ## @@ -448,7 +448,42 @@ def __call__(self, field, **kwargs): return wtforms.widgets.core.HTMLString(html) -class UtcFilterConverter(FilterConverter): +class UtcDateTimeFilterMixin(object): +def clean(self, value): +return timezone.make_aware(super(UtcDateTimeFilterMixin, self).clean(value)) Review comment: Make aware returns 'default_timezone'; is that what you require? I think UTC is better. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
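Why the choice of timezone matters here can be shown with the standard library alone. The sketch below does not call Airflow's `timezone.make_aware`; the UTC+2 offset is an assumed example of a non-UTC `default_timezone`.

```python
from datetime import datetime, timedelta, timezone

# A naive value such as one parsed from a filter form field.
naive = datetime(2018, 10, 31, 12, 0)

# Assumed example of a non-UTC default timezone (UTC+2).
default_tz = timezone(timedelta(hours=2))

as_default = naive.replace(tzinfo=default_tz)  # read as 12:00 local time
as_utc = naive.replace(tzinfo=timezone.utc)    # read as 12:00 UTC

# The two readings name different instants, two hours apart.
gap = as_utc - as_default
default_hour_in_utc = as_default.astimezone(timezone.utc).hour
```

If stored values are in UTC, interpreting the filter input in the default timezone shifts every comparison by that offset.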
[GitHub] ashb commented on a change in pull request #4120: [AIRFLOW-XXX] Update Contributing Guide - Git Hooks
ashb commented on a change in pull request #4120: [AIRFLOW-XXX] Update Contributing Guide - Git Hooks URL: https://github.com/apache/incubator-airflow/pull/4120#discussion_r229861823 ## File path: CONTRIBUTING.md ## @@ -183,6 +183,24 @@ docker-compose -f scripts/ci/docker-compose.yml run airflow-testing /app/scripts Alternatively can also set up [Travis CI](https://travis-ci.org/) on your repo to automate this. It is free for open source projects. +Another great way of automating linting and testing is to use [Git Hooks](https://git-scm.com/book/uz/v2/Customizing-Git-Git-Hooks). For example you could create a `pre-commit` file based on the Travis CI Pipeline so that before each commit a local pipeline will be executed and if this pipeline failed (returned an exit code other than `0`) the commit does not come through. +This "in theory" has the advantage that you can not commit any code that fails that again reduces the errors in the Travis CI Pipelines. + +Since there are a lot of tests the script would last very long so you propably only should test your new feature locally. + +The following example of a `pre-commit` file allows you.. +- to lint your code +- test your code in a docker container based on python 2 +- test your code in a docker container based on python 3 + +NOTE: Change the `airflow-py2` and `airflow-py3` to your docker containers or remove the `docker exec` if you have set up your environment directly on your host system. +``` Review comment: Could also add `set -e` at the start (which I find cleaner than having `|| exit` on each command personally.) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (AIRFLOW-3136) Scheduler Failing the Task retries run while processing Executor Events
[ https://issues.apache.org/jira/browse/AIRFLOW-3136?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16670690#comment-16670690 ] ASF GitHub Bot commented on AIRFLOW-3136: - ashb closed pull request #3994: [AIRFLOW-3136] Add retry_number to TaskInstance Key property to avoid race condition URL: https://github.com/apache/incubator-airflow/pull/3994 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py index de1f9f4235..cf58169345 100644 --- a/airflow/contrib/executors/kubernetes_executor.py +++ b/airflow/contrib/executors/kubernetes_executor.py @@ -332,7 +332,7 @@ def run_next(self, next_job): """ self.log.info('Kubernetes job is %s', str(next_job)) key, command, kube_executor_config = next_job -dag_id, task_id, execution_date = key +dag_id, task_id, execution_date, try_number = key self.log.debug("Kubernetes running for command %s", command) self.log.debug("Kubernetes launching image %s", self.kube_config.kube_image) pod = self.worker_configuration.make_pod( @@ -453,7 +453,8 @@ def _labels_to_key(self, labels): try: return ( labels['dag_id'], labels['task_id'], - self._label_safe_datestring_to_datetime(labels['execution_date'])) + self._label_safe_datestring_to_datetime(labels['execution_date']), +labels['try_number']) except Exception as e: self.log.warn( 'Error while converting labels to key; labels: %s; exception: %s', @@ -612,7 +613,7 @@ def _change_state(self, key, state, pod_id): self.log.debug('Could not find key: %s', str(key)) pass self.event_buffer[key] = state -(dag_id, task_id, ex_time) = key +(dag_id, task_id, ex_time, try_number) = key item = self._session.query(TaskInstance).filter_by( dag_id=dag_id, 
task_id=task_id, diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index a989dc4408..0979ba07fe 100644 --- a/airflow/executors/base_executor.py +++ b/airflow/executors/base_executor.py @@ -175,7 +175,7 @@ def get_event_buffer(self, dag_ids=None): self.event_buffer = dict() else: for key in list(self.event_buffer.keys()): -dag_id, _, _ = key +dag_id, _, _, _ = key if dag_id in dag_ids: cleared_events[key] = self.event_buffer.pop(key) diff --git a/airflow/jobs.py b/airflow/jobs.py index 48e15f758d..15b9c65b82 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -1442,10 +1442,10 @@ def _process_executor_events(self, simple_dag_bag, session=None): TI = models.TaskInstance for key, state in list(self.executor.get_event_buffer(simple_dag_bag.dag_ids) .items()): -dag_id, task_id, execution_date = key +dag_id, task_id, execution_date, try_number = key self.log.info( -"Executor reports %s.%s execution_date=%s as %s", -dag_id, task_id, execution_date, state +"Executor reports %s.%s execution_date=%s as %s for try_number %s", +dag_id, task_id, execution_date, state, try_number ) if state == State.FAILED or state == State.SUCCESS: qry = session.query(TI).filter(TI.dag_id == dag_id, @@ -1457,7 +1457,7 @@ def _process_executor_events(self, simple_dag_bag, session=None): continue # TODO: should we fail RUNNING as well, as we do in Backfills? -if ti.state == State.QUEUED: +if ti.try_number == try_number and ti.state == State.QUEUED: msg = ("Executor reports task instance {} finished ({}) " "although the task says its {}. 
Was the task " "killed externally?".format(ti, state, ti.state)) diff --git a/airflow/models.py b/airflow/models.py index 97a0fa92fc..0644dcdee5 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -1230,7 +1230,7 @@ def key(self): """ Returns a tuple that identifies the task instance uniquely """ -return self.dag_id, self.task_id, self.execution_date +return self.dag_id, self.task_id, self.execution_date, self.try_number @provide_session def set_state(self, state, session=None): diff --git a/tests/executors/test_base_executor.py
[GitHub] ashb closed pull request #3994: [AIRFLOW-3136] Add retry_number to TaskInstance Key property to avoid race condition
ashb closed pull request #3994: [AIRFLOW-3136] Add retry_number to TaskInstance Key property to avoid race condition URL: https://github.com/apache/incubator-airflow/pull/3994 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/airflow/contrib/executors/kubernetes_executor.py b/airflow/contrib/executors/kubernetes_executor.py index de1f9f4235..cf58169345 100644 --- a/airflow/contrib/executors/kubernetes_executor.py +++ b/airflow/contrib/executors/kubernetes_executor.py @@ -332,7 +332,7 @@ def run_next(self, next_job): """ self.log.info('Kubernetes job is %s', str(next_job)) key, command, kube_executor_config = next_job -dag_id, task_id, execution_date = key +dag_id, task_id, execution_date, try_number = key self.log.debug("Kubernetes running for command %s", command) self.log.debug("Kubernetes launching image %s", self.kube_config.kube_image) pod = self.worker_configuration.make_pod( @@ -453,7 +453,8 @@ def _labels_to_key(self, labels): try: return ( labels['dag_id'], labels['task_id'], - self._label_safe_datestring_to_datetime(labels['execution_date'])) + self._label_safe_datestring_to_datetime(labels['execution_date']), +labels['try_number']) except Exception as e: self.log.warn( 'Error while converting labels to key; labels: %s; exception: %s', @@ -612,7 +613,7 @@ def _change_state(self, key, state, pod_id): self.log.debug('Could not find key: %s', str(key)) pass self.event_buffer[key] = state -(dag_id, task_id, ex_time) = key +(dag_id, task_id, ex_time, try_number) = key item = self._session.query(TaskInstance).filter_by( dag_id=dag_id, task_id=task_id, diff --git a/airflow/executors/base_executor.py b/airflow/executors/base_executor.py index a989dc4408..0979ba07fe 100644 --- a/airflow/executors/base_executor.py +++ 
b/airflow/executors/base_executor.py @@ -175,7 +175,7 @@ def get_event_buffer(self, dag_ids=None): self.event_buffer = dict() else: for key in list(self.event_buffer.keys()): -dag_id, _, _ = key +dag_id, _, _, _ = key if dag_id in dag_ids: cleared_events[key] = self.event_buffer.pop(key) diff --git a/airflow/jobs.py b/airflow/jobs.py index 48e15f758d..15b9c65b82 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -1442,10 +1442,10 @@ def _process_executor_events(self, simple_dag_bag, session=None): TI = models.TaskInstance for key, state in list(self.executor.get_event_buffer(simple_dag_bag.dag_ids) .items()): -dag_id, task_id, execution_date = key +dag_id, task_id, execution_date, try_number = key self.log.info( -"Executor reports %s.%s execution_date=%s as %s", -dag_id, task_id, execution_date, state +"Executor reports %s.%s execution_date=%s as %s for try_number %s", +dag_id, task_id, execution_date, state, try_number ) if state == State.FAILED or state == State.SUCCESS: qry = session.query(TI).filter(TI.dag_id == dag_id, @@ -1457,7 +1457,7 @@ def _process_executor_events(self, simple_dag_bag, session=None): continue # TODO: should we fail RUNNING as well, as we do in Backfills? -if ti.state == State.QUEUED: +if ti.try_number == try_number and ti.state == State.QUEUED: msg = ("Executor reports task instance {} finished ({}) " "although the task says its {}. 
Was the task " "killed externally?".format(ti, state, ti.state)) diff --git a/airflow/models.py b/airflow/models.py index 97a0fa92fc..0644dcdee5 100755 --- a/airflow/models.py +++ b/airflow/models.py @@ -1230,7 +1230,7 @@ def key(self): """ Returns a tuple that identifies the task instance uniquely """ -return self.dag_id, self.task_id, self.execution_date +return self.dag_id, self.task_id, self.execution_date, self.try_number @provide_session def set_state(self, state, session=None): diff --git a/tests/executors/test_base_executor.py b/tests/executors/test_base_executor.py index f640a75e01..29a953ece9 100644 --- a/tests/executors/test_base_executor.py +++ b/tests/executors/test_base_executor.py @@ -30,10 +30,10 @@ def test_get_event_buffer(self): executor = BaseExecutor() date =
[GitHub] ashb commented on issue #3994: [AIRFLOW-3136] Add retry_number to TaskInstance Key property to avoid race condition
ashb commented on issue #3994: [AIRFLOW-3136] Add retry_number to TaskInstance Key property to avoid race condition URL: https://github.com/apache/incubator-airflow/pull/3994#issuecomment-434839763 Thanks, your reproduction steps were enough for me to reproduce this. The fix seemed to make the error go away, and things still get scheduled :D This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
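The guard added by the merged change (`ti.try_number == try_number and ti.state == State.QUEUED`) can be sketched in isolation. The function name and the plain string states below are hypothetical; only the condition itself comes from the diff.

```python
QUEUED = "queued"


def should_fail_queued(ti_try_number, ti_state, event_try_number):
    """True only when the event belongs to the attempt that is currently queued."""
    return ti_try_number == event_try_number and ti_state == QUEUED


# A stale event from try 1 arrives while try 2 is queued: ignored.
stale_event = should_fail_queued(ti_try_number=2, ti_state=QUEUED, event_try_number=1)

# An event for try 2 arrives while try 2 is still queued: treated as a failure.
matching_event = should_fail_queued(ti_try_number=2, ti_state=QUEUED, event_try_number=2)
```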
[GitHub] feluelle commented on a change in pull request #4120: [AIRFLOW-XXX] Update Contributing Guide - Git Hooks
feluelle commented on a change in pull request #4120: [AIRFLOW-XXX] Update Contributing Guide - Git Hooks URL: https://github.com/apache/incubator-airflow/pull/4120#discussion_r229857502 ## File path: CONTRIBUTING.md ## @@ -183,6 +183,24 @@ docker-compose -f scripts/ci/docker-compose.yml run airflow-testing /app/scripts Alternatively can also set up [Travis CI](https://travis-ci.org/) on your repo to automate this. It is free for open source projects. +Another great way of automating linting and testing is to use [Git Hooks](https://git-scm.com/book/uz/v2/Customizing-Git-Git-Hooks). For example you could create a `pre-commit` file based on the Travis CI Pipeline so that before each commit a local pipeline will be executed and if this pipeline failed (returned an exit code other than `0`) the commit does not come through. +This "in theory" has the advantage that you can not commit any code that fails that again reduces the errors in the Travis CI Pipelines. + +Since there are a lot of tests the script would last very long so you propably only should test your new feature locally. + +The following example of a `pre-commit` file allows you.. +- to lint your code +- test your code in a docker container based on python 2 +- test your code in a docker container based on python 3 + +NOTE: Change the `airflow-py2` and `airflow-py3` to your docker containers or remove the `docker exec` if you have set up your environment directly on your host system. +``` Review comment: I am going to add `|| exit "$?"` after each command because it is necessary to return the exit code from the command to the shell script, so that it exits if one of the commands returns a non-zero exit code. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
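The stop-at-first-failure behaviour discussed here (whether via `set -e` or `|| exit "$?"`) can also be sketched in Python, since a Git hook may be any executable. The helper name `run_checks` and the placeholder commands are assumptions for illustration, not part of the PR.

```python
import subprocess
import sys


def run_checks(commands):
    """Run each command in order; return the first non-zero exit code, else 0."""
    for cmd in commands:
        code = subprocess.call(cmd)
        if code != 0:
            return code  # propagate the failing command's exit code
    return 0


# Placeholder commands standing in for the real lint and test steps.
ok = [sys.executable, "-c", "pass"]
bad = [sys.executable, "-c", "import sys; sys.exit(3)"]

result = run_checks([ok, bad, ok])  # the third command is never started
```

A real hook would finish with `sys.exit(run_checks(...))`, so that Git sees the non-zero code and aborts the commit.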
[GitHub] feluelle commented on a change in pull request #4120: [AIRFLOW-XXX] Update Contributing Guide - Git Hooks
feluelle commented on a change in pull request #4120: [AIRFLOW-XXX] Update Contributing Guide - Git Hooks URL: https://github.com/apache/incubator-airflow/pull/4120#discussion_r229851956 ## File path: CONTRIBUTING.md ## @@ -183,6 +183,24 @@ docker-compose -f scripts/ci/docker-compose.yml run airflow-testing /app/scripts Alternatively can also set up [Travis CI](https://travis-ci.org/) on your repo to automate this. It is free for open source projects. +Another great way of automating linting and testing is to use [Git Hooks](https://git-scm.com/book/uz/v2/Customizing-Git-Git-Hooks). For example you could create a `pre-commit` file based on the Travis CI Pipeline so that before each commit a local pipeline will be executed and if this pipeline failed (returned an exit code other than `0`) the commit does not come through. +This "in theory" has the advantage that you can not commit any code that fails that again reduces the errors in the Travis CI Pipelines. + +Since there are a lot of tests the script would last very long so you propably only should test your new feature locally. + +The following example of a `pre-commit` file allows you.. +- to lint your code +- test your code in a docker container based on python 2 +- test your code in a docker container based on python 3 + +NOTE: Change the `airflow-py2` and `airflow-py3` to your docker containers or remove the `docker exec` if you have set up your environment directly on your host system. +``` +git diff upstream/master -u -- "*.py" | flake8 --diff Review comment: That's true. I will change this. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ashb commented on a change in pull request #4120: [AIRFLOW-XXX] Update Contributing Guide - Git Hooks
ashb commented on a change in pull request #4120: [AIRFLOW-XXX] Update Contributing Guide - Git Hooks URL: https://github.com/apache/incubator-airflow/pull/4120#discussion_r229849942 ## File path: CONTRIBUTING.md ## @@ -183,6 +183,24 @@ docker-compose -f scripts/ci/docker-compose.yml run airflow-testing /app/scripts Alternatively can also set up [Travis CI](https://travis-ci.org/) on your repo to automate this. It is free for open source projects. +Another great way of automating linting and testing is to use [Git Hooks](https://git-scm.com/book/uz/v2/Customizing-Git-Git-Hooks). For example you could create a `pre-commit` file based on the Travis CI Pipeline so that before each commit a local pipeline will be executed and if this pipeline failed (returned an exit code other than `0`) the commit does not come through. +This "in theory" has the advantage that you can not commit any code that fails that again reduces the errors in the Travis CI Pipelines. + +Since there are a lot of tests the script would last very long so you propably only should test your new feature locally. + +The following example of a `pre-commit` file allows you.. +- to lint your code +- test your code in a docker container based on python 2 +- test your code in a docker container based on python 3 + +NOTE: Change the `airflow-py2` and `airflow-py3` to your docker containers or remove the `docker exec` if you have set up your environment directly on your host system. +``` +git diff upstream/master -u -- "*.py" | flake8 --diff Review comment: This could just be `flake8` now - we shouldn't have any flake8 failures anymore, and checking the entire code base won't take more than a few seconds(?) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] feluelle opened a new pull request #4120: [AIRFLOW-XXX] Update Contributing Guide - Git Hooks
feluelle opened a new pull request #4120: [AIRFLOW-XXX] Update Contributing Guide - Git Hooks URL: https://github.com/apache/incubator-airflow/pull/4120 Make sure you have checked _all_ steps below. ### Jira - [x] My PR addresses the following [Airflow Jira](https://issues.apache.org/jira/browse/AIRFLOW/) issues and references them in the PR title. For example, "\[AIRFLOW-XXX\] My Airflow PR" - https://issues.apache.org/jira/browse/AIRFLOW-XXX - In case you are fixing a typo in the documentation you can prepend your commit with \[AIRFLOW-XXX\], code changes always need a Jira issue. ### Description - [x] Here are some details about my PR, including screenshots of any UI changes: I thought of how you can help reducing failures in the Travis CI Pipeline and I came up with the idea of introducing Git Hooks. Read the changes for more information. ### Tests - [x] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason: There are no tests since it is only a change to the docs. ### Commits - [x] My commits all reference Jira issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)": 1. Subject is separated from body by a blank line 1. Subject is limited to 50 characters (not including Jira issue reference) 1. Subject does not end with a period 1. Subject uses the imperative mood ("add", not "adding") 1. Body wraps at 72 characters 1. Body explains "what" and "why", not "how" ### Documentation - [x] In case of new functionality, my PR adds documentation that describes how to use it. - When adding new operators/hooks/sensors, the autoclass documentation generation needs to be added. ### Code Quality - [x] Passes `flake8` This is an automated message from the Apache Git Service. 
[GitHub] ashb commented on issue #4091: [AIRFLOW-2524] Update SageMaker hook, operator and sensor for training, tuning and transform
ashb commented on issue #4091: [AIRFLOW-2524] Update SageMaker hook, operator and sensor for training, tuning and transform URL: https://github.com/apache/incubator-airflow/pull/4091#issuecomment-434823353 A few small changes left then this is good to merge! ✨ This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ashb commented on a change in pull request #4091: [AIRFLOW-2524] Update SageMaker hook, operator and sensor for training, tuning and transform
ashb commented on a change in pull request #4091: [AIRFLOW-2524] Update SageMaker hook, operator and sensor for training, tuning and transform URL: https://github.com/apache/incubator-airflow/pull/4091#discussion_r229840916 ## File path: airflow/contrib/operators/sagemaker_tuning_operator.py ## @@ -0,0 +1,97 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from airflow.contrib.hooks.aws_hook import AwsHook +from airflow.contrib.operators.sagemaker_base_operator import SageMakerBaseOperator +from airflow.utils.decorators import apply_defaults +from airflow.exceptions import AirflowException + + +class SageMakerTuningOperator(SageMakerBaseOperator): +""" +Initiate a SageMaker hyper-parameter tuning job. + +This operator returns The ARN of the tuning job created in Amazon SageMaker. + +:param config: The configuration necessary to start a tuning job (templated) +:type config: dict +:param aws_conn_id: The AWS connection ID to use. 
+:type aws_conn_id: str +:param wait_for_completion: if the operator should block until tuning job finishes +:type wait_for_completion: bool +:param check_interval: if wait is set to be true, this is the time interval +in seconds which the operator will check the status of the tuning job +:type check_interval: int +:param max_ingestion_time: if wait is set to be true, the operator will fail +if the tuning job hasn't finish within the max_ingestion_time in seconds +(Caution: be careful to set this parameters because tuning can take very long) +:type max_ingestion_time: int +""" + +integer_fields = [ +['HyperParameterTuningJobConfig', 'ResourceLimits', 'MaxNumberOfTrainingJobs'], +['HyperParameterTuningJobConfig', 'ResourceLimits', 'MaxParallelTrainingJobs'], +['TrainingJobDefinition', 'ResourceConfig', 'InstanceCount'], +['TrainingJobDefinition', 'ResourceConfig', 'VolumeSizeInGB'], +['TrainingJobDefinition', 'StoppingCondition', 'MaxRuntimeInSeconds'] +] + +@apply_defaults +def __init__(self, + config, + wait_for_completion=True, + check_interval=30, + max_ingestion_time=None, + *args, **kwargs): +super(SageMakerTuningOperator, self).__init__(config=config, + *args, **kwargs) +self.config = config +self.wait_for_completion = wait_for_completion +self.check_interval = check_interval +self.max_ingestion_time = max_ingestion_time + +def expand_role(self): +if 'TrainingJobDefinition' in self.config: +config = self.config['TrainingJobDefinition'] +if 'RoleArn' in config: +hook = AwsHook() Review comment: ```suggestion hook = AwsHook(self.aws_conn_id) ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
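The `integer_fields` attribute in the diff above lists key paths into the nested `config` dict. The base operator that consumes these paths is not shown in this message, so the following is only a guess at how such paths might be applied, presumably because templated values arrive as strings. The helper names `parse_integer` and `parse_config_integers` are hypothetical.

```python
def parse_integer(config, path):
    """Convert the value found at the nested key path to int, in place.

    Paths that do not exist in the config are silently skipped.
    """
    node = config
    for key in path[:-1]:
        if not isinstance(node, dict) or key not in node:
            return
        node = node[key]
    last = path[-1]
    if isinstance(node, dict) and last in node:
        node[last] = int(node[last])


def parse_config_integers(config, integer_fields):
    """Apply parse_integer for every key path in integer_fields."""
    for path in integer_fields:
        parse_integer(config, path)
    return config


# Example: a rendered template leaves the instance count as a string.
example = {'TrainingJobDefinition': {'ResourceConfig': {'InstanceCount': '2'}}}
parse_config_integers(example, [
    ['TrainingJobDefinition', 'ResourceConfig', 'InstanceCount'],
    ['TrainingJobDefinition', 'StoppingCondition', 'MaxRuntimeInSeconds'],
])
```

After the call, `InstanceCount` holds the integer `2`, and the missing `StoppingCondition` path is left alone.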
[GitHub] ashb commented on a change in pull request #4091: [AIRFLOW-2524] Update SageMaker hook, operator and sensor for training, tuning and transform
ashb commented on a change in pull request #4091: [AIRFLOW-2524] Update SageMaker hook, operator and sensor for training, tuning and transform URL: https://github.com/apache/incubator-airflow/pull/4091#discussion_r229840727 ## File path: airflow/contrib/operators/sagemaker_transform_operator.py ## @@ -0,0 +1,112 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from airflow.contrib.hooks.aws_hook import AwsHook +from airflow.contrib.operators.sagemaker_base_operator import SageMakerBaseOperator +from airflow.utils.decorators import apply_defaults +from airflow.exceptions import AirflowException + + +class SageMakerTransformOperator(SageMakerBaseOperator): +""" +Initiate a SageMaker transform job. + +This operator returns The ARN of the model created in Amazon SageMaker. + +:param config: The configuration necessary to start a transform job (templated) +:type config: dict +:param model_config: +The configuration necessary to create a SageMaker model, the default is none +which means the SageMaker model used for the SageMaker transform job already exists. 
+If given, it will be used to create a SageMaker model before creating +the SageMaker transform job +:type model_config: dict +:param aws_conn_id: The AWS connection ID to use. +:type aws_conn_id: string +:param wait_for_completion: if the program should keep running until job finishes +:type wait_for_completion: bool +:param check_interval: if wait is set to be true, this is the time interval +in seconds which the operator will check the status of the transform job +:type check_interval: int +:param max_ingestion_time: if wait is set to be true, the operator will fail +if the transform job hasn't finish within the max_ingestion_time in seconds +(Caution: be careful to set this parameters because transform can take very long) +:type max_ingestion_time: int +""" + +@apply_defaults +def __init__(self, + config, + wait_for_completion=True, + check_interval=30, + max_ingestion_time=None, + *args, **kwargs): +super(SageMakerTransformOperator, self).__init__(config=config, + *args, **kwargs) +self.config = config +self.wait_for_completion = wait_for_completion +self.check_interval = check_interval +self.max_ingestion_time = max_ingestion_time +self.create_integer_fields() + +def create_integer_fields(self): +self.integer_fields = [ +['Transform', 'TransformResources', 'InstanceCount'], +['Transform', 'MaxConcurrentTransforms'], +['Transform', 'MaxPayloadInMB'] +] +if 'Transform' not in self.config: +for field in self.integer_fields: +field.pop(0) + +def expand_role(self): +if 'Model' not in self.config: +return +config = self.config['Model'] +if 'ExecutionRoleArn' in config: +hook = AwsHook() Review comment: ```suggestion hook = AwsHook(self.aws_conn_id) ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ashb commented on a change in pull request #4091: [AIRFLOW-2524] Update SageMaker hook, operator and sensor for training, tuning and transform
ashb commented on a change in pull request #4091: [AIRFLOW-2524] Update SageMaker hook, operator and sensor for training, tuning and transform URL: https://github.com/apache/incubator-airflow/pull/4091#discussion_r229840272 ## File path: airflow/contrib/operators/sagemaker_training_operator.py ## @@ -0,0 +1,97 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from airflow.contrib.hooks.aws_hook import AwsHook +from airflow.contrib.operators.sagemaker_base_operator import SageMakerBaseOperator +from airflow.utils.decorators import apply_defaults +from airflow.exceptions import AirflowException + + +class SageMakerTrainingOperator(SageMakerBaseOperator): +""" +Initiate a SageMaker training job. + +This operator returns The ARN of the training job created in Amazon SageMaker. + +:param config: The configuration necessary to start a training job (templated) +:type config: dict +:param aws_conn_id: The AWS connection ID to use. 
+:type aws_conn_id: str +:param wait_for_completion: if the operator should block until training job finishes +:type wait_for_completion: bool +:param print_log: if the operator should print the cloudwatch log during training +:type print_log: bool +:param check_interval: if wait is set to be true, this is the time interval +in seconds which the operator will check the status of the training job +:type check_interval: int +:param max_ingestion_time: if wait is set to be true, the operator will fail +if the training job hasn't finish within the max_ingestion_time in seconds +(Caution: be careful to set this parameters because training can take very long) +Setting it to None implies no timeout. +:type max_ingestion_time: int +""" + +integer_fields = [ +['ResourceConfig', 'InstanceCount'], +['ResourceConfig', 'VolumeSizeInGB'], +['StoppingCondition', 'MaxRuntimeInSeconds'] +] + +@apply_defaults +def __init__(self, + config, + wait_for_completion=True, + print_log=True, + check_interval=30, + max_ingestion_time=None, + *args, **kwargs): +super(SageMakerTrainingOperator, self).__init__(config=config, +*args, **kwargs) + +self.wait_for_completion = wait_for_completion +self.print_log = print_log +self.check_interval = check_interval +self.max_ingestion_time = max_ingestion_time + +def expand_role(self): +if 'RoleArn' in self.config: +hook = AwsHook() Review comment: This should have `self.aws_conn_id` passed to it: ```suggestion hook = AwsHook(self.aws_conn_id) ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
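The review suggestion repeated across the three operators (`AwsHook(self.aws_conn_id)` instead of `AwsHook()`) matters because a hook built without arguments falls back to its default connection. The sketch below uses stand-in classes rather than the real Airflow ones: `FakeAwsHook` and `FakeOperator` are hypothetical, and the `'aws_default'` default is an assumption about `AwsHook`.

```python
class FakeAwsHook(object):
    """Stand-in for AwsHook; it only records the connection ID it was given."""

    def __init__(self, aws_conn_id='aws_default'):
        self.aws_conn_id = aws_conn_id

    def expand_role(self, role):
        # The real hook would resolve the role using this connection's
        # credentials; here the connection ID is simply made visible.
        return '{}:{}'.format(self.aws_conn_id, role)


class FakeOperator(object):
    """Stand-in operator following the pattern suggested in the review."""

    def __init__(self, config, aws_conn_id='aws_default'):
        self.config = config
        self.aws_conn_id = aws_conn_id

    def expand_role(self):
        if 'RoleArn' in self.config:
            # Pass the operator's own connection ID through to the hook.
            hook = FakeAwsHook(self.aws_conn_id)
            self.config['RoleArn'] = hook.expand_role(self.config['RoleArn'])


op = FakeOperator({'RoleArn': 'my-role'}, aws_conn_id='aws_prod')
op.expand_role()
```

With `FakeAwsHook()` in place of `FakeAwsHook(self.aws_conn_id)`, the role would be resolved against `aws_default` no matter which connection the operator was configured with.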
[GitHub] ashb commented on a change in pull request #4091: [AIRFLOW-2524] Update SageMaker hook, operator and sensor for training, tuning and transform
ashb commented on a change in pull request #4091: [AIRFLOW-2524] Update SageMaker hook, operator and sensor for training, tuning and transform URL: https://github.com/apache/incubator-airflow/pull/4091#discussion_r229838425 ## File path: airflow/contrib/hooks/sagemaker_hook.py ## @@ -16,299 +16,793 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -import copy +import tarfile +import tempfile import time +import os +import collections +import functools +from datetime import datetime + +import botocore.config from botocore.exceptions import ClientError from airflow.exceptions import AirflowException from airflow.contrib.hooks.aws_hook import AwsHook from airflow.hooks.S3_hook import S3Hook +class LogState(object): +STARTING = 1 +WAIT_IN_PROGRESS = 2 +TAILING = 3 +JOB_COMPLETE = 4 +COMPLETE = 5 + + +# Position is a tuple that includes the last read timestamp and the number of items that were read +# at that time. This is used to figure out which event to start with on the next read. +Position = collections.namedtuple('Position', ['timestamp', 'skip']) + + +def argmin(arr, f): +"""Return the index, i, in arr that minimizes f(arr[i])""" +m = None +i = None +for idx, item in enumerate(arr): +if item is not None: +if m is None or f(item) < m: +m = f(item) +i = idx +return i + + +def some(arr): +"""Return True iff there is an element, a, of arr such that a is not None""" +return functools.reduce(lambda x, y: x or (y is not None), arr, False) + + +def secondary_training_status_changed(current_job_description, prev_job_description): +""" +Returns true if training job's secondary status message has changed. + +:param current_job_description: Current job description, returned from DescribeTrainingJob call. +:type current_job_description: dict +:param prev_job_description: Previous job description, returned from DescribeTrainingJob call. 
+:type prev_job_description: dict + +:return: Whether the secondary status message of a training job changed or not. +""" +current_secondary_status_transitions = current_job_description.get('SecondaryStatusTransitions') +if current_secondary_status_transitions is None or len(current_secondary_status_transitions) == 0: +return False + +prev_job_secondary_status_transitions = prev_job_description.get('SecondaryStatusTransitions') \ +if prev_job_description is not None else None + +last_message = prev_job_secondary_status_transitions[-1]['StatusMessage'] \ +if prev_job_secondary_status_transitions is not None \ +and len(prev_job_secondary_status_transitions) > 0 else '' + +message = current_job_description['SecondaryStatusTransitions'][-1]['StatusMessage'] + +return message != last_message + + +def secondary_training_status_message(job_description, prev_description): +""" +Returns a string contains start time and the secondary training job status message. + +:param job_description: Returned response from DescribeTrainingJob call +:type job_description: dict +:param prev_description: Previous job description from DescribeTrainingJob call +:type prev_description: dict + +:return: Job status string to be printed. 
+""" + +if job_description is None or job_description.get('SecondaryStatusTransitions') is None\ +or len(job_description.get('SecondaryStatusTransitions')) == 0: +return '' + +prev_description_secondary_transitions = prev_description.get('SecondaryStatusTransitions')\ +if prev_description is not None else None +prev_transitions_num = len(prev_description['SecondaryStatusTransitions'])\ +if prev_description_secondary_transitions is not None else 0 +current_transitions = job_description['SecondaryStatusTransitions'] + +transitions_to_print = current_transitions[-1:] if len(current_transitions) == prev_transitions_num else \ +current_transitions[prev_transitions_num - len(current_transitions):] + +status_strs = [] +for transition in transitions_to_print: +message = transition['StatusMessage'] +time_str = datetime.utcfromtimestamp( + time.mktime(job_description['LastModifiedTime'].timetuple())).strftime('%Y-%m-%d %H:%M:%S') +status_strs.append('{} {} - {}'.format(time_str, transition['Status'], message)) + +return '\n'.join(status_strs) + + class SageMakerHook(AwsHook): """ Interact with Amazon SageMaker. -sagemaker_conn_id is required for using -the config stored in db for training/tuning """ -non_terminal_states = {'InProgress', 'Stopping', 'Stopped'} +non_terminal_states = {'InProgress', 'Stopping'} +endpoint_non_terminal_states = {'Creating', 'Updating', 'SystemUpdating',
[GitHub] ashb commented on a change in pull request #4091: [AIRFLOW-2524] Update SageMaker hook, operator and sensor for training, tuning and transform
ashb commented on a change in pull request #4091: [AIRFLOW-2524] Update SageMaker hook, operator and sensor for training, tuning and transform URL: https://github.com/apache/incubator-airflow/pull/4091#discussion_r229837811 ## File path: airflow/contrib/hooks/sagemaker_hook.py ## @@ -16,299 +16,793 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -import copy +import tarfile +import tempfile import time +import os +import collections +import functools +from datetime import datetime + +import botocore.config from botocore.exceptions import ClientError from airflow.exceptions import AirflowException from airflow.contrib.hooks.aws_hook import AwsHook from airflow.hooks.S3_hook import S3Hook +class LogState(object): +STARTING = 1 +WAIT_IN_PROGRESS = 2 +TAILING = 3 +JOB_COMPLETE = 4 +COMPLETE = 5 + + +# Position is a tuple that includes the last read timestamp and the number of items that were read +# at that time. This is used to figure out which event to start with on the next read. +Position = collections.namedtuple('Position', ['timestamp', 'skip']) + + +def argmin(arr, f): +"""Return the index, i, in arr that minimizes f(arr[i])""" +m = None +i = None +for idx, item in enumerate(arr): +if item is not None: +if m is None or f(item) < m: +m = f(item) +i = idx +return i + + +def some(arr): +"""Return True iff there is an element, a, of arr such that a is not None""" +return functools.reduce(lambda x, y: x or (y is not None), arr, False) + + +def secondary_training_status_changed(current_job_description, prev_job_description): +""" +Returns true if training job's secondary status message has changed. + +:param current_job_description: Current job description, returned from DescribeTrainingJob call. +:type current_job_description: dict +:param prev_job_description: Previous job description, returned from DescribeTrainingJob call. 
+:type prev_job_description: dict + +:return: Whether the secondary status message of a training job changed or not. +""" +current_secondary_status_transitions = current_job_description.get('SecondaryStatusTransitions') +if current_secondary_status_transitions is None or len(current_secondary_status_transitions) == 0: +return False + +prev_job_secondary_status_transitions = prev_job_description.get('SecondaryStatusTransitions') \ +if prev_job_description is not None else None + +last_message = prev_job_secondary_status_transitions[-1]['StatusMessage'] \ +if prev_job_secondary_status_transitions is not None \ +and len(prev_job_secondary_status_transitions) > 0 else '' + +message = current_job_description['SecondaryStatusTransitions'][-1]['StatusMessage'] + +return message != last_message + + +def secondary_training_status_message(job_description, prev_description): +""" +Returns a string contains start time and the secondary training job status message. + +:param job_description: Returned response from DescribeTrainingJob call +:type job_description: dict +:param prev_description: Previous job description from DescribeTrainingJob call +:type prev_description: dict + +:return: Job status string to be printed. 
+""" + +if job_description is None or job_description.get('SecondaryStatusTransitions') is None\ +or len(job_description.get('SecondaryStatusTransitions')) == 0: +return '' + +prev_description_secondary_transitions = prev_description.get('SecondaryStatusTransitions')\ +if prev_description is not None else None +prev_transitions_num = len(prev_description['SecondaryStatusTransitions'])\ +if prev_description_secondary_transitions is not None else 0 +current_transitions = job_description['SecondaryStatusTransitions'] + +transitions_to_print = current_transitions[-1:] if len(current_transitions) == prev_transitions_num else \ +current_transitions[prev_transitions_num - len(current_transitions):] + +status_strs = [] +for transition in transitions_to_print: +message = transition['StatusMessage'] +time_str = datetime.utcfromtimestamp( + time.mktime(job_description['LastModifiedTime'].timetuple())).strftime('%Y-%m-%d %H:%M:%S') +status_strs.append('{} {} - {}'.format(time_str, transition['Status'], message)) + +return '\n'.join(status_strs) + + class SageMakerHook(AwsHook): """ Interact with Amazon SageMaker. -sagemaker_conn_id is required for using -the config stored in db for training/tuning """ -non_terminal_states = {'InProgress', 'Stopping', 'Stopped'} +non_terminal_states = {'InProgress', 'Stopping'} +endpoint_non_terminal_states = {'Creating', 'Updating', 'SystemUpdating',
[GitHub] ashb commented on a change in pull request #4091: [AIRFLOW-2524] Update SageMaker hook, operator and sensor for training, tuning and transform
ashb commented on a change in pull request #4091: [AIRFLOW-2524] Update SageMaker hook, operator and sensor for training, tuning and transform URL: https://github.com/apache/incubator-airflow/pull/4091#discussion_r229836638 ## File path: airflow/contrib/hooks/sagemaker_hook.py ## @@ -16,299 +16,793 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -import copy +import tarfile +import tempfile import time +import os +import collections +import functools +from datetime import datetime + +import botocore.config from botocore.exceptions import ClientError from airflow.exceptions import AirflowException from airflow.contrib.hooks.aws_hook import AwsHook from airflow.hooks.S3_hook import S3Hook +class LogState(object): +STARTING = 1 +WAIT_IN_PROGRESS = 2 +TAILING = 3 +JOB_COMPLETE = 4 +COMPLETE = 5 + + +# Position is a tuple that includes the last read timestamp and the number of items that were read +# at that time. This is used to figure out which event to start with on the next read. +Position = collections.namedtuple('Position', ['timestamp', 'skip']) + + +def argmin(arr, f): +"""Return the index, i, in arr that minimizes f(arr[i])""" +m = None +i = None +for idx, item in enumerate(arr): +if item is not None: +if m is None or f(item) < m: +m = f(item) +i = idx +return i + + +def some(arr): +"""Return True iff there is an element, a, of arr such that a is not None""" +return functools.reduce(lambda x, y: x or (y is not None), arr, False) + + +def secondary_training_status_changed(current_job_description, prev_job_description): +""" +Returns true if training job's secondary status message has changed. + +:param current_job_description: Current job description, returned from DescribeTrainingJob call. +:type current_job_description: dict +:param prev_job_description: Previous job description, returned from DescribeTrainingJob call. 
+:type prev_job_description: dict + +:return: Whether the secondary status message of a training job changed or not. +""" +current_secondary_status_transitions = current_job_description.get('SecondaryStatusTransitions') +if current_secondary_status_transitions is None or len(current_secondary_status_transitions) == 0: +return False + +prev_job_secondary_status_transitions = prev_job_description.get('SecondaryStatusTransitions') \ +if prev_job_description is not None else None + +last_message = prev_job_secondary_status_transitions[-1]['StatusMessage'] \ +if prev_job_secondary_status_transitions is not None \ +and len(prev_job_secondary_status_transitions) > 0 else '' + +message = current_job_description['SecondaryStatusTransitions'][-1]['StatusMessage'] + +return message != last_message + + +def secondary_training_status_message(job_description, prev_description): +""" +Returns a string contains start time and the secondary training job status message. + +:param job_description: Returned response from DescribeTrainingJob call +:type job_description: dict +:param prev_description: Previous job description from DescribeTrainingJob call +:type prev_description: dict + +:return: Job status string to be printed. 
+""" + +if job_description is None or job_description.get('SecondaryStatusTransitions') is None\ +or len(job_description.get('SecondaryStatusTransitions')) == 0: +return '' + +prev_description_secondary_transitions = prev_description.get('SecondaryStatusTransitions')\ +if prev_description is not None else None +prev_transitions_num = len(prev_description['SecondaryStatusTransitions'])\ +if prev_description_secondary_transitions is not None else 0 +current_transitions = job_description['SecondaryStatusTransitions'] + +transitions_to_print = current_transitions[-1:] if len(current_transitions) == prev_transitions_num else \ +current_transitions[prev_transitions_num - len(current_transitions):] + +status_strs = [] +for transition in transitions_to_print: +message = transition['StatusMessage'] +time_str = datetime.utcfromtimestamp( + time.mktime(job_description['LastModifiedTime'].timetuple())).strftime('%Y-%m-%d %H:%M:%S') Review comment: ``` from airflow.utils import timezone time_str = timezone.convert_to_utc(job_description['LastModifiedTime']).strftime('%Y-%m-%d %H:%M:%S') ``` should work I think - but you'll need to check that the datetime is marked correctly. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For
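The review comment above proposes replacing the `time.mktime(...timetuple())` round trip with a direct conversion to UTC, with the caveat that the datetime must be marked with its timezone. The same idea can be sketched with only the Python 3 standard library; this does not use `airflow.utils.timezone`, and `format_utc` is a hypothetical helper name.

```python
from datetime import datetime, timedelta, timezone


def format_utc(dt):
    """Format a timezone-aware datetime as a UTC 'YYYY-MM-DD HH:MM:SS' string."""
    if dt.tzinfo is None or dt.utcoffset() is None:
        # A naive datetime carries no offset, so any conversion would be a guess.
        raise ValueError('expected a timezone-aware datetime')
    return dt.astimezone(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')


# Example: noon at UTC+02:00 is 10:00 in UTC.
stamp = format_utc(datetime(2018, 10, 31, 12, 0, 0,
                            tzinfo=timezone(timedelta(hours=2))))
```

The `mktime` approach interprets the wall-clock fields as local time, so it only gives the right answer when the datetime happens to be expressed in the machine's local zone. Converting the aware datetime directly avoids that dependency.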
[jira] [Created] (AIRFLOW-3280) SubDagOperator marked as failed, even if inner sub dag succeeds
Eran created AIRFLOW-3280: - Summary: SubDagOperator marked as failed, even if inner sub dag succeeds Key: AIRFLOW-3280 URL: https://issues.apache.org/jira/browse/AIRFLOW-3280 Project: Apache Airflow Issue Type: Bug Components: subdag Affects Versions: 1.9.0 Reporter: Eran Running the following dag: {code:java} from airflow.exceptions import AirflowException from airflow.models import DAG from airflow.operators.dummy_operator import DummyOperator from airflow.operators.python_operator import PythonOperator from airflow.operators.subdag_operator import SubDagOperator dag = DAG(dag_id="test_dag", start_date='2018-10-01', schedule_interval=None) inner_dag = DAG(dag_id="test_dag.sub_dag_task", start_date='2018-10-01', schedule_interval=None) sub_dag_operator = SubDagOperator(task_id="sub_dag_task", subdag=inner_dag, dag=dag) def throw_exception(): raise AirflowException() dummy_operator1 = DummyOperator(task_id="dummy1", dag=inner_dag) python_operator = PythonOperator(task_id="python1", dag=inner_dag, python_callable=throw_exception) dummy_operator2 = DummyOperator(task_id="dummy2", dag=inner_dag, trigger_rule="all_failed") dummy_operator3 = DummyOperator(task_id="dummy3", dag=inner_dag) dummy_operator4 = DummyOperator(task_id="dummy4", dag=inner_dag) dummy_operator5 = DummyOperator(task_id="dummy5", dag=inner_dag, trigger_rule="all_failed") dummy_operator1 >> python_operator >> dummy_operator2 python_operator >> dummy_operator3 dummy_operator3 >> dummy_operator2 dummy_operator2 >> dummy_operator4 dummy_operator2 >> dummy_operator5 {code} would result in a sub_dag_task as 'failed', even though the inner dag (when clicking 'zoom into Sub DAG') is marked as 'success', only because there are a few tasks marked as failed (which is ok, if its not the end of the DAG) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] feluelle commented on issue #4119: [AIRFLOW-2780] Add IMAP Hook to retrieve email attachments
feluelle commented on issue #4119: [AIRFLOW-2780] Add IMAP Hook to retrieve email attachments URL: https://github.com/apache/incubator-airflow/pull/4119#issuecomment-434785855 I think the commit https://github.com/apache/incubator-airflow/pull/4119/commits/cc0c5552cc9cccb44a8eb8436675f21a3ecdb053 does not actually fix the Kubernetes error. The Kubernetes error was random, I guess. Sorry for the extra commits/pipeline runs; I thought I had already tested it on Python 2.x. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] codecov-io edited a comment on issue #4097: [AIRFLOW-3231] Basic operators for Google Cloud SQL
codecov-io edited a comment on issue #4097: [AIRFLOW-3231] Basic operators for Google Cloud SQL
URL: https://github.com/apache/incubator-airflow/pull/4097#issuecomment-433374475

# [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4097?src=pr=h1) Report
> Merging [#4097](https://codecov.io/gh/apache/incubator-airflow/pull/4097?src=pr=desc) into [master](https://codecov.io/gh/apache/incubator-airflow/commit/e703d6beeb379ee88ef5e7df495e8a785666f8af?src=pr=desc) will **decrease** coverage by `<.01%`.
> The diff coverage is `n/a`.

[![Impacted file tree graph](https://codecov.io/gh/apache/incubator-airflow/pull/4097/graphs/tree.svg?width=650=WdLKlKHOAU=150=pr)](https://codecov.io/gh/apache/incubator-airflow/pull/4097?src=pr=tree)

```diff
@@            Coverage Diff             @@
##           master    #4097      +/-   ##
==========================================
- Coverage   76.67%   76.66%   -0.01%
==========================================
  Files         199      199
  Lines       16186    16192       +6
==========================================
+ Hits        12410    12414       +4
- Misses       3776     3778       +2
```

| [Impacted Files](https://codecov.io/gh/apache/incubator-airflow/pull/4097?src=pr=tree) | Coverage Δ | |
|---|---|---|
| [airflow/jobs.py](https://codecov.io/gh/apache/incubator-airflow/pull/4097/diff?src=pr=tree#diff-YWlyZmxvdy9qb2JzLnB5) | `77.18% <0%> (-0.13%)` | :arrow_down: |
| [airflow/models.py](https://codecov.io/gh/apache/incubator-airflow/pull/4097/diff?src=pr=tree#diff-YWlyZmxvdy9tb2RlbHMucHk=) | `92.04% <0%> (ø)` | :arrow_up: |
| [airflow/hooks/mysql\_hook.py](https://codecov.io/gh/apache/incubator-airflow/pull/4097/diff?src=pr=tree#diff-YWlyZmxvdy9ob29rcy9teXNxbF9ob29rLnB5) | `90.38% <0%> (+0.38%)` | :arrow_up: |

--

[Continue to review full report at Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4097?src=pr=continue).
> **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta)
> `Δ = absolute (impact)`, `ø = not affected`, `? = missing data`
> Powered by [Codecov](https://codecov.io/gh/apache/incubator-airflow/pull/4097?src=pr=footer). Last update [e703d6b...68ff70b](https://codecov.io/gh/apache/incubator-airflow/pull/4097?src=pr=lastupdated). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments).

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] yegeniy commented on issue #2367: [AIRFLOW-1077] Warn about subdag deadlock case
yegeniy commented on issue #2367: [AIRFLOW-1077] Warn about subdag deadlock case
URL: https://github.com/apache/incubator-airflow/pull/2367#issuecomment-434776246

Great point. I think it's still totally over-writeable.
[GitHub] ashb commented on a change in pull request #4111: [AIRFLOW-3266] Add AWS Athena Operator and hook
ashb commented on a change in pull request #4111: [AIRFLOW-3266] Add AWS Athena Operator and hook
URL: https://github.com/apache/incubator-airflow/pull/4111#discussion_r229757030

## File path: airflow/contrib/operators/aws_athena_operator.py ##
@@ -0,0 +1,97 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.contrib.hooks.aws_athena_hook import AWSAthenaHook
+
+
+class AWSAthenaOperator(BaseOperator):
+    """
+    Airflow operator to run presto queries on athena.
+
+    :param query: Presto to be run on athena. (templated)
+    :type query: str
+    :param database: Database to select. (templated)
+    :type database: str
+    :param output_location: s3 path to write the query results into. (templated)
+    :type output_location: str
+    :param aws_conn_id: aws connection to use.
+    :type aws_conn_id: str
+    """
+
+    ui_color = '#44b5e2'
+    template_fields = ('query', 'database', 'output_location')
+
+    @apply_defaults
+    def __init__(self, query, database, output_location, aws_conn_id='aws_default', *args, **kwargs):
+        super(AWSAthenaOperator, self).__init__(*args, **kwargs)
+        self.query = query
+        self.database = database
+        self.output_location = output_location
+        self.aws_conn_id = aws_conn_id
+        self.client_request_token = kwargs.get('client_request_token')
+        self.query_execution_context = kwargs.get('query_execution_context') or {}
+        self.result_configuration = kwargs.get('result_configuration') or {}
+        self.query_execution_context['Database'] = self.database

Review comment:
Templates are not rendered until just before `execute()` is called, so these two things probably just need to be delayed until then.

Another option might be to just declare `query_execution_context` as templated (then it will look for/resolve templates in the values of the dict) but I think this interface you have is more direct and likely more friendly.
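The first suggestion in the review comment above (delay the dict assignments until `execute()`) could look roughly like the sketch below. It is only an illustration under stated assumptions: the class `DeferredAthenaOperatorSketch` and its `fake_render` method are hypothetical stand-ins that do not import Airflow, the crude `{{ key }}` replacement merely imitates template rendering, and the `OutputLocation` assignment is a guess at the second of the "two things", since that line is not visible in the quoted diff.

```python
class DeferredAthenaOperatorSketch(object):
    """Hypothetical stand-in, not the operator from the pull request."""

    template_fields = ('query', 'database', 'output_location')

    def __init__(self, query, database, output_location,
                 query_execution_context=None, result_configuration=None):
        # Only store the raw (possibly still templated) values here.
        self.query = query
        self.database = database
        self.output_location = output_location
        self.query_execution_context = query_execution_context or {}
        self.result_configuration = result_configuration or {}

    def fake_render(self, context):
        # Crude imitation of template rendering: replaces '{{ key }}' only.
        for field in self.template_fields:
            value = getattr(self, field)
            for key, replacement in context.items():
                value = value.replace('{{ %s }}' % key, replacement)
            setattr(self, field, value)

    def execute(self, context):
        # By this point the templated fields hold their rendered values,
        # so the dicts are filled in here instead of in __init__.
        self.query_execution_context['Database'] = self.database
        # Assumption: the second delayed assignment targets OutputLocation.
        self.result_configuration['OutputLocation'] = self.output_location
        return self.query_execution_context, self.result_configuration


op = DeferredAthenaOperatorSketch(
    query='SELECT 1',
    database='db_{{ ds }}',
    output_location='s3://bucket/{{ ds }}/',
)
op.fake_render({'ds': '2018-10-31'})  # rendering happens before execute()
query_context, result_config = op.execute({})
print(query_context, result_config)
```

Had the assignments stayed in `__init__`, the dicts would have captured the unrendered strings such as `db_{{ ds }}`, which is the problem the review comment points at.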
[jira] [Updated] (AIRFLOW-3279) Documentation for Google Logging unclear
[ https://issues.apache.org/jira/browse/AIRFLOW-3279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Paul Velthuis updated AIRFLOW-3279:
---
Description:
The documentation of how to install logging to a Google Cloud bucket is unclear.

I am now following the tutorial on the airflow page:

[https://airflow.apache.org/howto/write-logs.html]

Here I find it unclear what part of the 'logger' I have to adjust in the `{{airflow/config_templates/airflow_local_settings.py}}`.

The adjustment states:

# Update the airflow.task and airflow.task_runner blocks to be 'gcs.task' instead of 'file.task'.
'loggers': {
    'airflow.task': { 'handlers': ['gcs.task'], ... },
    'airflow.task_runner': { 'handlers': ['gcs.task'], ... },
    'airflow': { 'handlers': ['console'], ... },
}

However what I find in the template is:
|'loggers': {|
|'airflow.processor': {|
|'handlers': ['processor'],|
|'level': LOG_LEVEL,|
|'propagate': False,|
|},|
|'airflow.task': {|
|'handlers': ['task'],|
|'level': LOG_LEVEL,|
|'propagate': False,|
|},|
|'flask_appbuilder': {|
|'handler': ['console'],|
|'level': FAB_LOG_LEVEL,|
|'propagate': True,|
|}|
},

Since for me it is very important to do it right at the first time I hope some clarity can be provided in what has to be adjusted in the logger. Is it only the 'airflow.task' or more?

Furthermore, at step 6 it is a little unclear what remote_log_conn_id means. I would propose to add a little more information to make this more clear.

was:
The documentation of how to install logging to a Google Cloud bucket is unclear.

I am now following the tutorial on the airflow page:

[https://airflow.apache.org/howto/write-logs.html]

Here I find it unclear what part of the 'logger' I have to adjust in the `{{airflow/config_templates/airflow_local_settings.py}}`.

The adjustment states:

# Update the airflow.task and airflow.task_runner blocks to be 'gcs.task' instead of 'file.task'.
'loggers': {
    'airflow.task': { 'handlers': ['gcs.task'], ... },
    'airflow.task_runner': { 'handlers': ['gcs.task'], ... },
    'airflow': { 'handlers': ['console'], ... },
}

However what I find in the template is:
|'loggers': {|
|'airflow.processor': {|
|'handlers': ['processor'],|
|'level': LOG_LEVEL,|
|'propagate': False,|
|},|
|'airflow.task': {|
|'handlers': ['task'],|
|'level': LOG_LEVEL,|
|'propagate': False,|
|},|
|'flask_appbuilder': {|
|'handler': ['console'],|
|'level': FAB_LOG_LEVEL,|
|'propagate': True,|
|}|
},

Since for me it is very important to do it right at the first time I hope some clarity can be provided in what has to be adjusted in the logger. Is it only the 'airflow.task' or more?

> Documentation for Google Logging unclear
>
> Key: AIRFLOW-3279
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3279
> Project: Apache Airflow
> Issue Type: Bug
> Components: configuration, Documentation, logging
> Reporter: Paul Velthuis
> Assignee: Fokko Driesprong
> Priority: Major
>
> The documentation of how to install logging to a Google Cloud bucket is
> unclear.
> I am now following the tutorial on the airflow page:
> [https://airflow.apache.org/howto/write-logs.html]
> Here I find it unclear what part of the 'logger' I have to adjust in the
> `{{airflow/config_templates/airflow_local_settings.py}}`.
>
> The adjustment states:
>
> # Update the airflow.task and airflow.task_runner blocks to be 'gcs.task'
> instead of 'file.task'. 'loggers': { 'airflow.task': { 'handlers':
> ['gcs.task'], ... }, 'airflow.task_runner': { 'handlers': ['gcs.task'], ...
> }, 'airflow': { 'handlers': ['console'], ... }, }
>
> However what I find in the template is:
> |'loggers': {|
> |'airflow.processor': {|
> |'handlers': ['processor'],|
> |'level': LOG_LEVEL,|
> |'propagate': False,|
> |},|
> |'airflow.task': {|
> |'handlers': ['task'],|
> |'level': LOG_LEVEL,|
> |'propagate': False,|
> |},|
> |'flask_appbuilder': {|
> |'handler': ['console'],|
> |'level': FAB_LOG_LEVEL,|
> |'propagate': True,|
> |}|
> },
>
> Since for me it is very important to do it right at the first time I hope
> some clarity can be provided in what has to be adjusted in the logger. Is it
> only the 'airflow.task' or more?
> Furthermore, at step 6 it is a little unclear what remote_log_conn_id means.
> I would propose to add a little more information to make this more clear.
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
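For readers following the issue above, the handler swap that the quoted documentation step describes can be sketched with plain dictionaries. This is a hedged illustration only: the helper `use_remote_task_handler` and the trimmed `template` dict are hypothetical, the sketch does not import Airflow, and it does not settle the reporter's question of whether changing `airflow.task` alone is enough. It simply replaces the handler list on whichever of the named loggers exist.

```python
import copy


def use_remote_task_handler(logging_config, handler_name='gcs.task',
                            logger_names=('airflow.task', 'airflow.task_runner')):
    """Return a copy of logging_config whose named loggers use handler_name.

    Hypothetical helper: loggers missing from the config are skipped.
    """
    updated = copy.deepcopy(logging_config)
    for name in logger_names:
        logger = updated['loggers'].get(name)
        if logger is not None:
            logger['handlers'] = [handler_name]
    return updated


# Trimmed-down stand-in for the 'loggers' block quoted in the issue.
template = {
    'loggers': {
        'airflow.processor': {'handlers': ['processor'], 'propagate': False},
        'airflow.task': {'handlers': ['task'], 'propagate': False},
    },
}

updated = use_remote_task_handler(template)
print(updated['loggers']['airflow.task'])
```

Note that Python's `logging.config.dictConfig` requires every handler named by a logger to be defined under the top-level `'handlers'` key, so a `'gcs.task'` handler entry would still have to exist in the real configuration.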
[jira] [Updated] (AIRFLOW-3279) Documentation for Google Logging unclear
[ https://issues.apache.org/jira/browse/AIRFLOW-3279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Paul Velthuis updated AIRFLOW-3279:
---
Description:
The documentation of how to install logging to a Google Cloud bucket is unclear.

I am now following the tutorial on the airflow page:

[https://airflow.apache.org/howto/write-logs.html]

Here I find it unclear what part of the 'logger' I have to adjust in the `{{airflow/config_templates/airflow_local_settings.py}}`.

The adjustment states:

# Update the airflow.task and airflow.task_runner blocks to be 'gcs.task' instead of 'file.task'.
'loggers': {
    'airflow.task': { 'handlers': ['gcs.task'], ... },
    'airflow.task_runner': { 'handlers': ['gcs.task'], ... },
    'airflow': { 'handlers': ['console'], ... },
}

However what I find in the template is:
|'loggers': {|
|'airflow.processor': {|
|'handlers': ['processor'],|
|'level': LOG_LEVEL,|
|'propagate': False,|
|},|
|'airflow.task': {|
|'handlers': ['task'],|
|'level': LOG_LEVEL,|
|'propagate': False,|
|},|
|'flask_appbuilder': {|
|'handler': ['console'],|
|'level': FAB_LOG_LEVEL,|
|'propagate': True,|
|}|
},

Since for me it is very important to do it right at the first time I hope some clarity can be provided in what has to be adjusted in the logger. Is it only the 'airflow.task' or more?

was:
The documentation of how to install logging to a Google Cloud bucket is unclear.

I am now following the tutorial on the airflow page:

[https://airflow.apache.org/howto/write-logs.html]

Here I find it unclear what part of the 'logger' I have to adjust in the `{{airflow/config_templates/airflow_local_settings.py}}`.

The adjustment states:

# Update the airflow.task and airflow.task_runner blocks to be 'gcs.task' instead of 'file.task'.
'loggers': {
    'airflow.task': { 'handlers': ['gcs.task'], ... },
    'airflow.task_runner': { 'handlers': ['gcs.task'], ... },
    'airflow': { 'handlers': ['console'], ... },
}

However what I find in the template is:
|'loggers': {|
|'airflow.processor': {|
|'handlers': ['processor'],|
|'level': LOG_LEVEL,|
|'propagate': False,|
|},|
|'airflow.task': {|
|'handlers': ['task'],|
|'level': LOG_LEVEL,|
|'propagate': False,|
|},|
|'flask_appbuilder': {|
|'handler': ['console'],|
|'level': FAB_LOG_LEVEL,|
|'propagate': True,|
|}|
},

Since for me it is very important to do it right at the first time I hope some clarity can be provided in what has to be adjusted in the logger. Is it only the 'airflow.task' or more?

Another question I have is regarding step 6. The Google web hook. What kind of information do you have to give at 'remote_log_conn_id =' Is this a self imagined name you can give, or what is behind it?

> Documentation for Google Logging unclear
>
> Key: AIRFLOW-3279
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3279
> Project: Apache Airflow
> Issue Type: Bug
> Components: configuration, Documentation, logging
> Reporter: Paul Velthuis
> Assignee: Fokko Driesprong
> Priority: Major
>
> The documentation of how to install logging to a Google Cloud bucket is
> unclear.
> I am now following the tutorial on the airflow page:
> [https://airflow.apache.org/howto/write-logs.html]
> Here I find it unclear what part of the 'logger' I have to adjust in the
> `{{airflow/config_templates/airflow_local_settings.py}}`.
>
> The adjustment states:
>
> # Update the airflow.task and airflow.task_runner blocks to be 'gcs.task'
> instead of 'file.task'. 'loggers': { 'airflow.task': { 'handlers':
> ['gcs.task'], ... }, 'airflow.task_runner': { 'handlers': ['gcs.task'], ...
> }, 'airflow': { 'handlers': ['console'], ... }, }
>
> However what I find in the template is:
> |'loggers': {|
> |'airflow.processor': {|
> |'handlers': ['processor'],|
> |'level': LOG_LEVEL,|
> |'propagate': False,|
> |},|
> |'airflow.task': {|
> |'handlers': ['task'],|
> |'level': LOG_LEVEL,|
> |'propagate': False,|
> |},|
> |'flask_appbuilder': {|
> |'handler': ['console'],|
> |'level': FAB_LOG_LEVEL,|
> |'propagate': True,|
> |}|
> },
>
> Since for me it is very important to do it right at the first time I hope
> some clarity can be provided in what has to be adjusted in the logger. Is it
> only the 'airflow.task' or more?
>
[GitHub] sprzedwojski commented on a change in pull request #4097: [AIRFLOW-3231] Basic operators for Google Cloud SQL
sprzedwojski commented on a change in pull request #4097: [AIRFLOW-3231] Basic operators for Google Cloud SQL
URL: https://github.com/apache/incubator-airflow/pull/4097#discussion_r229745314

## File path: airflow/contrib/operators/gcp_sql_operator.py ##
@@ -0,0 +1,288 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from googleapiclient.errors import HttpError
+
+from airflow import AirflowException
+from airflow.contrib.hooks.gcp_sql_hook import CloudSqlHook
+from airflow.contrib.utils.gcp_field_validator import GcpBodyFieldValidator
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+
+SETTINGS = 'settings'
+SETTINGS_VERSION = 'settingsVersion'
+
+CLOUD_SQL_VALIDATION = [
+    dict(name="name", allow_empty=False),

Review comment:
I discussed this a lot with others and it seems really hard to simplify at the moment while preserving the full validation functionality.

How about we leave it as is for now, given that such specs (although shorter) have already been merged in the past (for GCF and GCE operators), and we discuss the general approach to validation at a later time on the mailing list / Slack?

If we reach a conclusion I'll be happy to revisit all the merged specs and refactor them.
[jira] [Updated] (AIRFLOW-3279) Documentation for Google Logging unclear
[ https://issues.apache.org/jira/browse/AIRFLOW-3279?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Paul Velthuis updated AIRFLOW-3279:
---
Description:
The documentation of how to install logging to a Google Cloud bucket is unclear.
I am now following the tutorial on the airflow page:
[https://airflow.apache.org/howto/write-logs.html]
Here I find it unclear what part of the 'logger' I have to adjust in the `{{airflow/config_templates/airflow_local_settings.py}}`.

The adjustment states:

# Update the airflow.task and airflow.tas_runner blocks to be 'gcs.task' instead of 'file.task'.
'loggers': {
    'airflow.task': {
        'handlers': ['gcs.task'],
        ...
    },
    'airflow.task_runner': {
        'handlers': ['gcs.task'],
        ...
    },
    'airflow': {
        'handlers': ['console'],
        ...
    },
}

However what I find in the template is:
|'loggers': {|
|'airflow.processor': {|
|'handlers': ['processor'],|
|'level': LOG_LEVEL,|
|'propagate': False,|
|},|
|'airflow.task': {|
|'handlers': ['task'],|
|'level': LOG_LEVEL,|
|'propagate': False,|
|},|
|'flask_appbuilder': {|
|'handler': ['console'],|
|'level': FAB_LOG_LEVEL,|
|'propagate': True,|
|}|
},

Since for me it is very important to do it right at the first time I hope some clarity can be provided in what has to be adjusted in the logger. Is it only the 'airflow.task' or more?

Another question I have is regarding step 6. The Google web hook. What kind of information do you have to give at 'remote_log_conn_id =' Is this a self imagined name you can give, or what is behind it?

was:
The documentation of how to install logging to a Google Cloud bucket is unclear.
I am now following the tutorial on the airflow page:
[https://airflow.apache.org/howto/write-logs.html]
Here I find it unclear what part of the 'logger' I have to adjust in the `{{airflow/config_templates/airflow_local_settings.py}}`.

The adjustment states:

# Update the airflow.task and airflow.tas_runner blocks to be 'gcs.task' instead of 'file.task'.
'loggers': {
    'airflow.task': {
        'handlers': ['gcs.task'],
        ...
    },
    'airflow.task_runner': {
        'handlers': ['gcs.task'],
        ...
    },
    'airflow': {
        'handlers': ['console'],
        ...
    },
}

However what I find in the template is:
|'loggers': {|
|'airflow.processor': {|
|'handlers': ['processor'],|
|'level': LOG_LEVEL,|
|'propagate': False,|
|},|
|'airflow.task': {|
|'handlers': ['task'],|
|'level': LOG_LEVEL,|
|'propagate': False,|
|},|
|'flask_appbuilder': {|
|'handler': ['console'],|
|'level': FAB_LOG_LEVEL,|
|'propagate': True,|
|}|
},

Since for me it is very important to do it right at the first time I hope some clarity can be provided in what has to be adjusted in the logger. Is it only the 'airflow.task' or more?

> Documentation for Google Logging unclear
>
>
> Key: AIRFLOW-3279
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3279
> Project: Apache Airflow
> Issue Type: Bug
> Components: configuration, Documentation, logging
> Reporter: Paul Velthuis
> Assignee: Fokko Driesprong
> Priority: Major
>
> The documentation of how to install logging to a Google Cloud bucket is
> unclear.
> I am now following the tutorial on the airflow page:
> [https://airflow.apache.org/howto/write-logs.html]
> Here I find it unclear what part of the 'logger' I have to adjust in the
> `{{airflow/config_templates/airflow_local_settings.py}}`.
>
> The adjustment states:
>
> # Update the airflow.task and airflow.tas_runner blocks to be 'gcs.task'
> instead of 'file.task'. 'loggers': { 'airflow.task': { 'handlers':
> ['gcs.task'], ... }, 'airflow.task_runner': { 'handlers': ['gcs.task'], ...
> }, 'airflow': { 'handlers': ['console'], ... }, }
>
> However what I find in the template is:
> |'loggers': {|
> |'airflow.processor': {|
> |'handlers': ['processor'],|
> |'level': LOG_LEVEL,|
> |'propagate': False,|
> |},|
> |'airflow.task': {|
> |'handlers': ['task'],|
> |'level': LOG_LEVEL,|
> |'propagate': False,|
> |},|
> |'flask_appbuilder': {|
> |'handler': ['console'],|
> |'level': FAB_LOG_LEVEL,|
> |'propagate': True,|
> |}|
> },
>
> Since for me it is very important to do it right at the first time I hope
> some clarity can be provided in what has to be adjusted in the logger. Is it
> only the 'airflow.task' or more?
> Another question I have is regarding step 6. The Google web hook. What kind
> of information do you have to give at 'remote_log_conn_id =' Is this a self
> imagined name you can give, or what is behind it?
>

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
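For readers following this thread, here is a minimal sketch of the change the how-to appears to describe, applied to the 'loggers' block exactly as quoted in the report. It is an illustration under assumptions, not the project's documented procedure: the helper `use_gcs_task_handler` is hypothetical, the values given to `LOG_LEVEL` and `FAB_LOG_LEVEL` are placeholders, and it presumes a handler named 'gcs.task' has already been defined in the 'handlers' section of the same config.

```python
import copy

# Placeholder values; the real template takes these from the Airflow configuration.
LOG_LEVEL = "INFO"
FAB_LOG_LEVEL = "WARN"

# The 'loggers' block as quoted in the report (including its 'handler' key).
TEMPLATE_LOGGERS = {
    'airflow.processor': {
        'handlers': ['processor'],
        'level': LOG_LEVEL,
        'propagate': False,
    },
    'airflow.task': {
        'handlers': ['task'],
        'level': LOG_LEVEL,
        'propagate': False,
    },
    'flask_appbuilder': {
        'handler': ['console'],
        'level': FAB_LOG_LEVEL,
        'propagate': True,
    },
}


def use_gcs_task_handler(loggers, handler_name='gcs.task'):
    """Hypothetical helper: return a copy with only 'airflow.task' re-pointed."""
    updated = copy.deepcopy(loggers)
    updated['airflow.task']['handlers'] = [handler_name]
    return updated


custom_loggers = use_gcs_task_handler(TEMPLATE_LOGGERS)
```

In this sketch only the 'airflow.task' entry changes, because the quoted template has no 'airflow.task_runner' block to update. On the second question, `remote_log_conn_id` takes the ID of an Airflow connection that has access to the bucket, so the name is free to choose as long as a connection with that ID exists.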
[GitHub] sprzedwojski commented on a change in pull request #4097: [AIRFLOW-3231] Basic operators for Google Cloud SQL
sprzedwojski commented on a change in pull request #4097: [AIRFLOW-3231] Basic operators for Google Cloud SQL
URL: https://github.com/apache/incubator-airflow/pull/4097#discussion_r229742330

## File path: airflow/contrib/hooks/gcp_sql_hook.py

@@ -0,0 +1,173 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import time
+from googleapiclient.discovery import build
+
+from airflow import AirflowException
+from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
+
+# Number of retries - used by googleapiclient method calls to perform retries
+# For requests that are "retriable"
+NUM_RETRIES = 5
+
+# Time to sleep between active checks of the operation results
+TIME_TO_SLEEP_IN_SECONDS = 1
+
+
+class CloudSqlOperationStatus:
+    PENDING = "PENDING"
+    RUNNING = "RUNNING"
+    DONE = "DONE"
+    UNKNOWN = "UNKNOWN"
+
+
+# noinspection PyAbstractClass
+class CloudSqlHook(GoogleCloudBaseHook):
+    """
+    Hook for Google Cloud SQL APIs.
+    """
+    _conn = None
+
+    def __init__(self,
+                 api_version,
+                 gcp_conn_id='google_cloud_default',
+                 delegate_to=None):
+        super(CloudSqlHook, self).__init__(gcp_conn_id, delegate_to)
+        self.api_version = api_version
+
+    def get_conn(self):
+        """
+        Retrieves connection to Cloud SQL.
+
+        :return: Google Cloud SQL services object.
+        :rtype: dict
+        """
+        if not self._conn:
+            http_authorized = self._authorize()
+            self._conn = build('sqladmin', self.api_version,
+                               http=http_authorized, cache_discovery=False)
+        return self._conn
+
+    def get_instance(self, project_id, instance):
+        """
+        Retrieves a resource containing information about a Cloud SQL instance.
+
+        :param project_id: Project ID of the project that contains the instance.
+        :type project_id: str
+        :param instance: Database instance ID. This does not include the project ID.
+        :type instance: str
+        :return: A Cloud SQL instance resource.
+        :rtype: dict
+        """
+        return self.get_conn().instances().get(
+            project=project_id,
+            instance=instance
+        ).execute(num_retries=NUM_RETRIES)
+
+    def insert_instance(self, project_id, body):

Review comment:
Done, I've renamed the operator to `CloudSqlInstanceCreateOperator` and also changed `insert` to `create` in all methods, comments, documentation etc.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] sprzedwojski commented on a change in pull request #4097: [AIRFLOW-3231] Basic operators for Google Cloud SQL
sprzedwojski commented on a change in pull request #4097: [AIRFLOW-3231] Basic operators for Google Cloud SQL URL: https://github.com/apache/incubator-airflow/pull/4097#discussion_r229741920 ## File path: airflow/contrib/operators/gcp_sql_operator.py ## @@ -0,0 +1,288 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from googleapiclient.errors import HttpError + +from airflow import AirflowException +from airflow.contrib.hooks.gcp_sql_hook import CloudSqlHook +from airflow.contrib.utils.gcp_field_validator import GcpBodyFieldValidator +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults + +SETTINGS = 'settings' +SETTINGS_VERSION = 'settingsVersion' + +CLOUD_SQL_VALIDATION = [ +dict(name="name", allow_empty=False), +dict(name="settings", type="dict", fields=[ +dict(name="tier", allow_empty=False), +dict(name="backupConfiguration", type="dict", fields=[ +dict(name="binaryLogEnabled", optional=True), +dict(name="enabled", optional=True), +dict(name="replicationLogArchivingEnabled", optional=True), +dict(name="startTime", allow_empty=False, optional=True) +], optional=True), +dict(name="activationPolicy", allow_empty=False, optional=True), +dict(name="authorizedGaeApplications", type="list", optional=True), +dict(name="crashSafeReplicationEnabled", optional=True), +dict(name="dataDiskSizeGb", optional=True), +dict(name="dataDiskType", allow_empty=False, optional=True), +dict(name="databaseFlags", type="list", optional=True), +dict(name="ipConfiguration", type="dict", fields=[ +dict(name="authorizedNetworks", type="list", fields=[ +dict(name="expirationTime", optional=True), +dict(name="name", allow_empty=False, optional=True), +dict(name="value", allow_empty=False, optional=True) +], optional=True), +dict(name="ipv4Enabled", optional=True), +dict(name="privateNetwork", allow_empty=False, optional=True), +dict(name="requireSsl", optional=True), +], optional=True), +dict(name="locationPreference", type="dict", fields=[ +dict(name="followGaeApplication", allow_empty=False, optional=True), +dict(name="zone", allow_empty=False, optional=True), +], optional=True), +dict(name="maintenanceWindow", type="dict", fields=[ +dict(name="hour", optional=True), +dict(name="day", optional=True), +dict(name="updateTrack", allow_empty=False, optional=True), 
+], optional=True), +dict(name="pricingPlan", allow_empty=False, optional=True), +dict(name="replicationType", allow_empty=False, optional=True), +dict(name="storageAutoResize", optional=True), +dict(name="storageAutoResizeLimit", optional=True), +dict(name="userLabels", type="dict", optional=True), +]), +dict(name="databaseVersion", allow_empty=False, optional=True), +dict(name="failoverReplica", type="dict", fields=[ +dict(name="name", allow_empty=False) +], optional=True), +dict(name="masterInstanceName", allow_empty=False, optional=True), +dict(name="onPremisesConfiguration", type="dict", optional=True), +dict(name="region", allow_empty=False, optional=True), +dict(name="replicaConfiguration", type="dict", fields=[ +dict(name="failoverTarget", optional=True), +dict(name="mysqlReplicaConfiguration", type="dict", fields=[ +dict(name="caCertificate", allow_empty=False, optional=True), +dict(name="clientCertificate", allow_empty=False, optional=True), +dict(name="clientKey", allow_empty=False, optional=True), +dict(name="connectRetryInterval", optional=True), +dict(name="dumpFilePath", allow_empty=False, optional=True), +dict(name="masterHeartbeatPeriod", optional=True), +dict(name="password", allow_empty=False, optional=True), +dict(name="sslCipher", allow_empty=False, optional=True), +
[GitHub] sprzedwojski commented on a change in pull request #4097: [AIRFLOW-3231] Basic operators for Google Cloud SQL
sprzedwojski commented on a change in pull request #4097: [AIRFLOW-3231] Basic operators for Google Cloud SQL URL: https://github.com/apache/incubator-airflow/pull/4097#discussion_r229741779 ## File path: airflow/contrib/operators/gcp_sql_operator.py ## @@ -0,0 +1,288 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+from googleapiclient.errors import HttpError + +from airflow import AirflowException +from airflow.contrib.hooks.gcp_sql_hook import CloudSqlHook +from airflow.contrib.utils.gcp_field_validator import GcpBodyFieldValidator +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults + +SETTINGS = 'settings' +SETTINGS_VERSION = 'settingsVersion' + +CLOUD_SQL_VALIDATION = [ +dict(name="name", allow_empty=False), +dict(name="settings", type="dict", fields=[ +dict(name="tier", allow_empty=False), +dict(name="backupConfiguration", type="dict", fields=[ +dict(name="binaryLogEnabled", optional=True), +dict(name="enabled", optional=True), +dict(name="replicationLogArchivingEnabled", optional=True), +dict(name="startTime", allow_empty=False, optional=True) +], optional=True), +dict(name="activationPolicy", allow_empty=False, optional=True), +dict(name="authorizedGaeApplications", type="list", optional=True), +dict(name="crashSafeReplicationEnabled", optional=True), +dict(name="dataDiskSizeGb", optional=True), +dict(name="dataDiskType", allow_empty=False, optional=True), +dict(name="databaseFlags", type="list", optional=True), +dict(name="ipConfiguration", type="dict", fields=[ +dict(name="authorizedNetworks", type="list", fields=[ +dict(name="expirationTime", optional=True), +dict(name="name", allow_empty=False, optional=True), +dict(name="value", allow_empty=False, optional=True) +], optional=True), +dict(name="ipv4Enabled", optional=True), +dict(name="privateNetwork", allow_empty=False, optional=True), +dict(name="requireSsl", optional=True), +], optional=True), +dict(name="locationPreference", type="dict", fields=[ +dict(name="followGaeApplication", allow_empty=False, optional=True), +dict(name="zone", allow_empty=False, optional=True), +], optional=True), +dict(name="maintenanceWindow", type="dict", fields=[ +dict(name="hour", optional=True), +dict(name="day", optional=True), +dict(name="updateTrack", allow_empty=False, optional=True), 
+], optional=True), +dict(name="pricingPlan", allow_empty=False, optional=True), +dict(name="replicationType", allow_empty=False, optional=True), +dict(name="storageAutoResize", optional=True), +dict(name="storageAutoResizeLimit", optional=True), +dict(name="userLabels", type="dict", optional=True), +]), +dict(name="databaseVersion", allow_empty=False, optional=True), +dict(name="failoverReplica", type="dict", fields=[ +dict(name="name", allow_empty=False) +], optional=True), +dict(name="masterInstanceName", allow_empty=False, optional=True), +dict(name="onPremisesConfiguration", type="dict", optional=True), +dict(name="region", allow_empty=False, optional=True), +dict(name="replicaConfiguration", type="dict", fields=[ +dict(name="failoverTarget", optional=True), +dict(name="mysqlReplicaConfiguration", type="dict", fields=[ +dict(name="caCertificate", allow_empty=False, optional=True), +dict(name="clientCertificate", allow_empty=False, optional=True), +dict(name="clientKey", allow_empty=False, optional=True), +dict(name="connectRetryInterval", optional=True), +dict(name="dumpFilePath", allow_empty=False, optional=True), +dict(name="masterHeartbeatPeriod", optional=True), +dict(name="password", allow_empty=False, optional=True), +dict(name="sslCipher", allow_empty=False, optional=True), +
[GitHub] phani8996 commented on a change in pull request #4111: [AIRFLOW-3266] Add AWS Athena Operator and hook
phani8996 commented on a change in pull request #4111: [AIRFLOW-3266] Add AWS Athena Operator and hook
URL: https://github.com/apache/incubator-airflow/pull/4111#discussion_r229741217

## File path: airflow/contrib/operators/aws_athena_operator.py

@@ -0,0 +1,97 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.contrib.hooks.aws_athena_hook import AWSAthenaHook
+
+
+class AWSAthenaOperator(BaseOperator):
+    """
+    Airflow operator to run presto queries on athena.
+
+    :param query: Presto to be run on athena. (templated)
+    :type query: str
+    :param database: Database to select. (templated)
+    :type database: str
+    :param output_location: s3 path to write the query results into. (templated)
+    :type output_location: str
+    :param aws_conn_id: aws connection to use.
+    :type aws_conn_id: str
+    """
+
+    ui_color = '#44b5e2'
+    template_fields = ('query', 'database', 'output_location')
+
+    @apply_defaults
+    def __init__(self, query, database, output_location, aws_conn_id='aws_default', *args, **kwargs):
+        super(AWSAthenaOperator, self).__init__(*args, **kwargs)
+        self.query = query
+        self.database = database
+        self.output_location = output_location
+        self.aws_conn_id = aws_conn_id
+        self.client_request_token = kwargs.get('client_request_token')
+        self.query_execution_context = kwargs.get('query_execution_context') or {}
+        self.result_configuration = kwargs.get('result_configuration') or {}
+        self.query_execution_context['Database'] = self.database

Review comment:
I am not aware of that. Can you tell me where is the correct place to initialize above variable for the operator.
[GitHub] phani8996 commented on a change in pull request #4111: [AIRFLOW-3266] Add AWS Athena Operator and hook
phani8996 commented on a change in pull request #4111: [AIRFLOW-3266] Add AWS Athena Operator and hook
URL: https://github.com/apache/incubator-airflow/pull/4111#discussion_r229740567

## File path: airflow/contrib/hooks/aws_athena_hook.py

@@ -0,0 +1,146 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from time import sleep
+from airflow.contrib.hooks.aws_hook import AwsHook
+from uuid import uuid4
+
+INTERMEDIATE_STATES = ('QUEUED', 'RUNNING',)
+FAILURE_STATES = ('FAILED', 'CANCELLED',)
+SUCCESS_STATES = ('SUCCEEDED',)
+
+
+class AWSAthenaHook(AwsHook):
+    """
+    Interact with AWS Athena to run, poll queries and return query results
+    """
+
+    def __init__(self, aws_conn_id='aws_default', *args, **kwargs):
+        super(AWSAthenaHook, self).__init__(aws_conn_id, **kwargs)
+        self.sleep_time = kwargs.get('sleep_time') or 30
+        self.conn = None
+
+    def get_conn(self):
+        """
+        check if aws conn exists already or create one and return it
+
+        :return: boto3 session
+        """
+        if not hasattr(self, 'conn'):
+            self.conn = self.get_client_type('athena')
+        return self.conn
+
+    def run_query(self, query, query_context, result_configuration, client_request_token=None):
+        """
+        Run Presto query on athena with provided config and return submitted query_execution_id
+
+        :param query: Presto query to run
+        :type query: str
+        :param query_context: Context in which query need to be run
+        :type query_context: dict
+        :param result_configuration: Dict with path to store results in and config related to encryption
+        :type result_configuration: dict
+        :param client_request_token: Unique token created by user to avoid multiple executions of same query
+        :type client_request_token: str
+        :return: str
+        """
+        if client_request_token is None:
+            client_request_token = str(uuid4())

Review comment:
Got the point. I will move this logic to operator.
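As a rough illustration of the change agreed to in the reply above, the sketch below shows one way the token generation could live in the operator rather than the hook. It assumes nothing about how the PR was eventually written: `FakeAthenaHook` and `SketchAthenaOperator` are hypothetical stand-ins that do not import Airflow or boto3, and only the `run_query` parameter names and the 'Database' key are taken from the quoted diff.

```python
from uuid import uuid4


class FakeAthenaHook:
    """Hypothetical stand-in for the hook: it records the tokens it receives."""

    def __init__(self):
        self.submitted_tokens = []

    def run_query(self, query, query_context, result_configuration,
                  client_request_token):
        # The hook no longer invents a token; it forwards what the caller supplies.
        self.submitted_tokens.append(client_request_token)
        return 'execution-%d' % len(self.submitted_tokens)


class SketchAthenaOperator:
    """Hypothetical operator that owns the client request token."""

    def __init__(self, query, database, output_location,
                 client_request_token=None):
        self.query = query
        self.query_execution_context = {'Database': database}
        self.result_configuration = {'OutputLocation': output_location}
        # Generated once per operator instance, so a repeated execute()
        # submits the same token instead of a fresh one each time.
        self.client_request_token = client_request_token or str(uuid4())

    def execute(self, hook):
        return hook.run_query(self.query,
                              self.query_execution_context,
                              self.result_configuration,
                              self.client_request_token)


hook = FakeAthenaHook()
operator = SketchAthenaOperator('SELECT 1', 'my_database', 's3://my-bucket/results/')
first_id = operator.execute(hook)
second_id = operator.execute(hook)
```

Whether the token should be created at construction time or at execution time is a design choice this sketch does not settle; it only shows that keeping it on the operator lets a caller pass an explicit token or reuse the generated one.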
[GitHub] idavison commented on issue #3138: [AIRFLOW-2221] Create DagFetcher abstraction
idavison commented on issue #3138: [AIRFLOW-2221] Create DagFetcher abstraction URL: https://github.com/apache/incubator-airflow/pull/3138#issuecomment-434718083 Hey guys, I've drafted up a proposal for the dag fetcher. I've tried to keep in mind everything people have discussed here previously, but I'm sure I've missed some stuff. I'd appreciate people's thoughts on this, and I have some questions about different design decisions we'll need to make. https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-5+DagFetcher Thanks!
[GitHub] feluelle edited a comment on issue #4119: [AIRFLOW-2780] Add IMAP Hook to retrieve email attachments
feluelle edited a comment on issue #4119: [AIRFLOW-2780] Add IMAP Hook to retrieve email attachments URL: https://github.com/apache/incubator-airflow/pull/4119#issuecomment-434695580 This is the result of reopening #3661 from my feature branch instead of my master branch and it contains a few changes that @ashb requested. So it is now open for reviewing once again. :) I would love to have a review from maybe @Fokko @ashb @kaxil if possible :)
[jira] [Issue Comment Deleted] (AIRFLOW-2780) Adds IMAP Hook to interact with a mail server
[ https://issues.apache.org/jira/browse/AIRFLOW-2780?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ash Berlin-Taylor updated AIRFLOW-2780: --- Comment: was deleted (was: feluelle closed pull request #3661: [AIRFLOW-2780] Adds IMAP Hook to interact with a mail server URL: https://github.com/apache/incubator-airflow/pull/3661 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/airflow/contrib/hooks/imap_hook.py b/airflow/contrib/hooks/imap_hook.py new file mode 100644 index 00..b9cc44fdaa --- /dev/null +++ b/airflow/contrib/hooks/imap_hook.py @@ -0,0 +1,262 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import email +import imaplib +import re + +from airflow import LoggingMixin +from airflow.hooks.base_hook import BaseHook + + +class ImapHook(BaseHook): +""" +This hook connects to a mail server by using the imap protocol. + +:param imap_conn_id: The connection id that contains the information + used to authenticate the client. + The default value is 'imap_default'. 
+:type imap_conn_id: str +""" + +def __init__(self, imap_conn_id='imap_default'): +super(ImapHook, self).__init__(imap_conn_id) +self.conn = self.get_connection(imap_conn_id) +self.mail_client = imaplib.IMAP4_SSL(self.conn.host) + +def __enter__(self): +self.mail_client.login(self.conn.login, self.conn.password) +return self + +def __exit__(self, exc_type, exc_val, exc_tb): +self.mail_client.logout() + +def has_mail_attachment(self, name, mail_folder='INBOX', check_regex=False): +""" +Checks the mail folder for mails containing attachments with the given name. + +:param name: The name of the attachment that will be searched for. +:type name: str +:param mail_folder: The mail folder where to look at. +The default value is 'INBOX'. +:type mail_folder: str +:param check_regex: Checks the name for a regular expression. +The default value is False. +:type check_regex: bool +:returns: True if there is an attachment with the given name and False if not. +:rtype: bool +""" +mail_attachments = self._retrieve_mails_attachments_by_name(name, mail_folder, + check_regex, + latest_only=True) +return len(mail_attachments) > 0 + +def retrieve_mail_attachments(self, name, mail_folder='INBOX', check_regex=False, + latest_only=False): +""" +Retrieves mail's attachments in the mail folder by its name. + +:param name: The name of the attachment that will be downloaded. +:type name: str +:param mail_folder: The mail folder where to look at. +The default value is 'INBOX'. +:type mail_folder: str +:param check_regex: Checks the name for a regular expression. +The default value is False. +:type check_regex: bool +:param latest_only: If set to True it will only retrieve +the first matched attachment. +The default value is False. +:type latest_only: bool +:returns: a list of tuple each containing the attachment filename and its payload. 
+:rtype: a list of tuple +""" +mail_attachments = self._retrieve_mails_attachments_by_name(name, mail_folder, + check_regex, + latest_only) +return mail_attachments + +def download_mail_attachments(self, name, local_output_directory, mail_folder='INBOX', + check_regex=False, latest_only=False): +""" +Downloads mail's attachments in the mail folder by its name +to the local directory. + +:param name: The
[GitHub] feluelle commented on issue #4119: [AIRFLOW-2780] Add IMAP Hook to retrieve email attachments
feluelle commented on issue #4119: [AIRFLOW-2780] Add IMAP Hook to retrieve email attachments URL: https://github.com/apache/incubator-airflow/pull/4119#issuecomment-434695580 #3661
[jira] [Commented] (AIRFLOW-2780) Adds IMAP Hook to interact with a mail server
[ https://issues.apache.org/jira/browse/AIRFLOW-2780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16670132#comment-16670132 ]

ASF GitHub Bot commented on AIRFLOW-2780:
-

feluelle opened a new pull request #4119: [AIRFLOW-2780] Add IMAP Hook to retrieve email attachments
URL: https://github.com/apache/incubator-airflow/pull/4119

### JIRA

- [x] My PR addresses the following [Airflow JIRA](https://issues.apache.org/jira/browse/AIRFLOW-2780) issues and references them in the PR title.

### Description

- [x] My PR adds the feature to be able to interact with a mail server via the IMAP Protocol. It exposes the following methods:
  - `has_mail_attachment`: Can be used in a `Sensor` to poke the mail server for attachments with the given name and returns True if at least one has been found and False if not.
  - `retrieve_mail_attachments`: Can be used in an `Operator` to do something with the attachments returned as list of tuple (filename, payload).
  - `download_mail_attachments`: Can be used in an `Operator` to download the attachment.

### Tests

- [x] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason:
  - `test_connect_and_disconnect`: this test calls the ImapHook as ContextManager and so it tests for both (on enter/connect and on exit/disconnect)
  - `test_has_mail_attachments_found`
  - `test_has_mail_attachments_not_found`
  - `test_has_mail_attachments_with_regex_found`
  - `test_has_mail_attachments_with_regex_not_found`
  - `test_retrieve_mail_attachments_found`
  - `test_retrieve_mail_attachments_not_found`
  - `test_retrieve_mail_attachments_with_regex_found`
  - `test_retrieve_mail_attachments_with_regex_not_found`
  - `test_retrieve_mail_attachments_latest_only`
  - `test_download_mail_attachments_found`
  - `test_download_mail_attachments_not_found`
  - `test_download_mail_attachments_with_regex_found`
  - `test_download_mail_attachments_with_regex_not_found`
  - `test_download_mail_attachments_with_latest_only`
  - `test_download_mail_attachments_with_escaping_chars`
  - `test_download_mail_attachments_with_symlink`

### Commits

- [x] My commits all reference JIRA issues in their subject lines, and I have squashed multiple commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)":
  1. Subject is separated from body by a blank line
  2. Subject is limited to 50 characters
  3. Subject does not end with a period
  4. Subject uses the imperative mood ("add", not "adding")
  5. Body wraps at 72 characters
  6. Body explains "what" and "why", not "how"

### Documentation

- [x] In case of new functionality, my PR adds documentation that describes how to use it.
  - When adding new operators/hooks/sensors, the autoclass documentation generation needs to be added.

### Code Quality

- [x] Passes `git diff upstream/master -u -- "*.py" | flake8 --diff`

> Adds IMAP Hook to interact with a mail server
> -
>
> Key: AIRFLOW-2780
> URL: https://issues.apache.org/jira/browse/AIRFLOW-2780
> Project: Apache Airflow
> Issue Type: New Feature
> Reporter: Felix Uellendall
> Assignee: Felix Uellendall
> Priority: Major
>
> This Hook connects to a mail server via IMAP to be able to retrieve email
> attachments by using [Python's IMAP
> Library.|https://docs.python.org/3.6/library/imaplib.html]
> Features:
> - `has_mail_attachment`: Can be used in a `Sensor` to check if there is an
> attachment on the mail server with the given name.
> - `retrieve_mail_attachments`: Can be used in an `Operator` to do sth. with
> the attachments returned as list of tuple.
> - `download_mail_attachments`: Can be used in an `Operator` to download the
> attachment.
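To make the attachment-matching behaviour described in this pull request more concrete, here is a minimal sketch using only the standard library `email` package. It is not the code from the PR: the helper names `build_sample_mail` and `find_attachments` are hypothetical, the message is built in memory instead of being fetched over IMAP, and the `check_regex` flag merely mirrors the parameter name used in the hook's docstrings.

```python
import re
from email import message_from_bytes
from email.message import EmailMessage


def build_sample_mail():
    """Build a small message with one attachment, standing in for a fetched mail."""
    msg = EmailMessage()
    msg['Subject'] = 'report'
    msg['From'] = 'sender@example.com'
    msg['To'] = 'receiver@example.com'
    msg.set_content('See attachment.')
    msg.add_attachment(b'a,b\n1,2\n', maintype='application',
                       subtype='octet-stream', filename='report_2018.csv')
    return msg.as_bytes()


def find_attachments(raw_mail, name, check_regex=False):
    """Return (filename, payload) tuples for attachments whose name matches."""
    mail = message_from_bytes(raw_mail)
    found = []
    for part in mail.walk():
        filename = part.get_filename()
        # Skip the body and any part that is not flagged as an attachment.
        if filename is None or part.get_content_disposition() != 'attachment':
            continue
        if check_regex:
            matches = re.match(name, filename) is not None
        else:
            matches = name == filename
        if matches:
            # decode=True undoes the transfer encoding and yields raw bytes.
            found.append((filename, part.get_payload(decode=True)))
    return found


raw = build_sample_mail()
exact = find_attachments(raw, 'report_2018.csv')
by_regex = find_attachments(raw, r'report_\d+\.csv', check_regex=True)
missing = find_attachments(raw, 'missing.csv')
```

A `has_mail_attachment`-style check would then reduce to testing whether the returned list is non-empty, which matches the "returns True if at least one has been found" wording in the description.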
[GitHub] feluelle opened a new pull request #4119: [AIRFLOW-2780] Add IMAP Hook to retrieve email attachments
feluelle opened a new pull request #4119: [AIRFLOW-2780] Add IMAP Hook to retrieve email attachments URL: https://github.com/apache/incubator-airflow/pull/4119 ### JIRA - [x] My PR addresses the following [Airflow JIRA](https://issues.apache.org/jira/browse/AIRFLOW-2780) issues and references them in the PR title. ### Description - [x] My PR adds the feature to be able to interact with a mail server via the IMAP Protocol. It exposes the following methods: - `has_mail_attachment`: Can be used in a `Sensor` to poke the mail server for attachments with the given name and returns True if at least one has been found and False if not. - `retrieve_mail_attachments`: Can be used in an `Operator` to do sth. with the attachments returned as list of tuple (filename, payload). - `download_mail_attachments`: Can be used in an `Operator` to download the attachment. ### Tests - [x] My PR adds the following unit tests __OR__ does not need testing for this extremely good reason: - `test_connect_and_disconnect`: this tests calls the ImapHook as ContextManager and so it tests for both (on enter/connect and on exit/disconnect) - `test_has_mail_attachments_found` - `test_has_mail_attachments_not_found` - `test_has_mail_attachments_with_regex_found` - `test_has_mail_attachments_with_regex_not_found` - `test_retrieve_mail_attachments_found` - `test_retrieve_mail_attachments_not_found` - `test_retrieve_mail_attachments_with_regex_found` - `test_retrieve_mail_attachments_with_regex_not_found` - `test_retrieve_mail_attachments_latest_only` - `test_download_mail_attachments_found` - `test_download_mail_attachments_not_found` - `test_download_mail_attachments_with_regex_found` - `test_download_mail_attachments_with_regex_not_found` - `test_download_mail_attachments_with_latest_only` - `test_download_mail_attachments_with_escaping_chars` - `test_download_mail_attachments_with_symlink` ### Commits - [x] My commits all reference JIRA issues in their subject lines, and I have squashed multiple 
commits if they address the same issue. In addition, my commits follow the guidelines from "[How to write a good git commit message](http://chris.beams.io/posts/git-commit/)": 1. Subject is separated from body by a blank line 2. Subject is limited to 50 characters 3. Subject does not end with a period 4. Subject uses the imperative mood ("add", not "adding") 5. Body wraps at 72 characters 6. Body explains "what" and "why", not "how" ### Documentation - [x] In case of new functionality, my PR adds documentation that describes how to use it. - When adding new operators/hooks/sensors, the autoclass documentation generation needs to be added. ### Code Quality - [x] Passes `git diff upstream/master -u -- "*.py" | flake8 --diff` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
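The attachment lookup described in this PR can be illustrated with the standard library alone. The following is a minimal sketch, not the hook's actual implementation (the diff is truncated in this archive). `find_attachments` and `build_sample_message` are hypothetical names chosen for illustration; only the parameter names `name`, `check_regex` and `latest_only` are taken from the PR. It walks a parsed message and returns `(filename, payload)` tuples, optionally treating the name as a regular expression.

```python
import re
from email.message import EmailMessage


def find_attachments(message, name, check_regex=False, latest_only=False):
    """Return (filename, payload) tuples for attachments matching ``name``.

    Hypothetical helper for illustration only; it is not part of ImapHook.
    """
    matches = []
    for part in message.walk():
        # Only parts whose Content-Disposition is "attachment" are considered.
        if part.get_content_disposition() != 'attachment':
            continue
        filename = part.get_filename()
        if filename is None:
            continue
        if check_regex:
            found = re.match(name, filename) is not None
        else:
            found = filename == name
        if found:
            # decode=True undoes the transfer encoding (base64 here).
            matches.append((filename, part.get_payload(decode=True)))
            if latest_only:
                # Stop at the first match, as the PR's docstring describes.
                break
    return matches


def build_sample_message():
    """Build an in-memory mail with two attachments (made-up sample data)."""
    msg = EmailMessage()
    msg['Subject'] = 'report'
    msg.set_content('See attachments.')
    msg.add_attachment(b'a,b\n1,2\n', maintype='application',
                       subtype='octet-stream', filename='report_1.csv')
    msg.add_attachment(b'c,d\n3,4\n', maintype='application',
                       subtype='octet-stream', filename='report_2.csv')
    return msg
```

A sensor-style check would then reduce to `len(find_attachments(msg, name)) > 0`, which mirrors how `has_mail_attachment` is described above.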
[jira] [Commented] (AIRFLOW-2780) Adds IMAP Hook to interact with a mail server
[ https://issues.apache.org/jira/browse/AIRFLOW-2780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16670127#comment-16670127 ] ASF GitHub Bot commented on AIRFLOW-2780: - feluelle closed pull request #3661: [AIRFLOW-2780] Adds IMAP Hook to interact with a mail server URL: https://github.com/apache/incubator-airflow/pull/3661 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/airflow/contrib/hooks/imap_hook.py b/airflow/contrib/hooks/imap_hook.py new file mode 100644 index 00..b9cc44fdaa --- /dev/null +++ b/airflow/contrib/hooks/imap_hook.py @@ -0,0 +1,262 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import email +import imaplib +import re + +from airflow import LoggingMixin +from airflow.hooks.base_hook import BaseHook + + +class ImapHook(BaseHook): +""" +This hook connects to a mail server by using the imap protocol. + +:param imap_conn_id: The connection id that contains the information + used to authenticate the client. + The default value is 'imap_default'. 
+:type imap_conn_id: str +""" + +def __init__(self, imap_conn_id='imap_default'): +super(ImapHook, self).__init__(imap_conn_id) +self.conn = self.get_connection(imap_conn_id) +self.mail_client = imaplib.IMAP4_SSL(self.conn.host) + +def __enter__(self): +self.mail_client.login(self.conn.login, self.conn.password) +return self + +def __exit__(self, exc_type, exc_val, exc_tb): +self.mail_client.logout() + +def has_mail_attachment(self, name, mail_folder='INBOX', check_regex=False): +""" +Checks the mail folder for mails containing attachments with the given name. + +:param name: The name of the attachment that will be searched for. +:type name: str +:param mail_folder: The mail folder where to look at. +The default value is 'INBOX'. +:type mail_folder: str +:param check_regex: Checks the name for a regular expression. +The default value is False. +:type check_regex: bool +:returns: True if there is an attachment with the given name and False if not. +:rtype: bool +""" +mail_attachments = self._retrieve_mails_attachments_by_name(name, mail_folder, + check_regex, + latest_only=True) +return len(mail_attachments) > 0 + +def retrieve_mail_attachments(self, name, mail_folder='INBOX', check_regex=False, + latest_only=False): +""" +Retrieves mail's attachments in the mail folder by its name. + +:param name: The name of the attachment that will be downloaded. +:type name: str +:param mail_folder: The mail folder where to look at. +The default value is 'INBOX'. +:type mail_folder: str +:param check_regex: Checks the name for a regular expression. +The default value is False. +:type check_regex: bool +:param latest_only: If set to True it will only retrieve +the first matched attachment. +The default value is False. +:type latest_only: bool +:returns: a list of tuple each containing the attachment filename and its payload. 
+:rtype: a list of tuple +""" +mail_attachments = self._retrieve_mails_attachments_by_name(name, mail_folder, + check_regex, + latest_only) +return mail_attachments + +def download_mail_attachments(self, name, local_output_directory, mail_folder='INBOX', + check_regex=False, latest_only=False): +""" +Downloads mail's attachments in the mail folder by its name +to the local directory. + +:param name: The
[GitHub] feluelle edited a comment on issue #3661: [AIRFLOW-2780] Adds IMAP Hook to interact with a mail server
feluelle edited a comment on issue #3661: [AIRFLOW-2780] Adds IMAP Hook to interact with a mail server URL: https://github.com/apache/incubator-airflow/pull/3661#issuecomment-434693822 I am going to close this PR, because I want to open a new one directly from my feature branch instead of my master branch. @ashb @SamWildmo This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] feluelle closed pull request #3661: [AIRFLOW-2780] Adds IMAP Hook to interact with a mail server
feluelle closed pull request #3661: [AIRFLOW-2780] Adds IMAP Hook to interact with a mail server URL: https://github.com/apache/incubator-airflow/pull/3661 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/airflow/contrib/hooks/imap_hook.py b/airflow/contrib/hooks/imap_hook.py new file mode 100644 index 00..b9cc44fdaa --- /dev/null +++ b/airflow/contrib/hooks/imap_hook.py @@ -0,0 +1,262 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import email +import imaplib +import re + +from airflow import LoggingMixin +from airflow.hooks.base_hook import BaseHook + + +class ImapHook(BaseHook): +""" +This hook connects to a mail server by using the imap protocol. + +:param imap_conn_id: The connection id that contains the information + used to authenticate the client. + The default value is 'imap_default'. 
+:type imap_conn_id: str +""" + +def __init__(self, imap_conn_id='imap_default'): +super(ImapHook, self).__init__(imap_conn_id) +self.conn = self.get_connection(imap_conn_id) +self.mail_client = imaplib.IMAP4_SSL(self.conn.host) + +def __enter__(self): +self.mail_client.login(self.conn.login, self.conn.password) +return self + +def __exit__(self, exc_type, exc_val, exc_tb): +self.mail_client.logout() + +def has_mail_attachment(self, name, mail_folder='INBOX', check_regex=False): +""" +Checks the mail folder for mails containing attachments with the given name. + +:param name: The name of the attachment that will be searched for. +:type name: str +:param mail_folder: The mail folder where to look at. +The default value is 'INBOX'. +:type mail_folder: str +:param check_regex: Checks the name for a regular expression. +The default value is False. +:type check_regex: bool +:returns: True if there is an attachment with the given name and False if not. +:rtype: bool +""" +mail_attachments = self._retrieve_mails_attachments_by_name(name, mail_folder, + check_regex, + latest_only=True) +return len(mail_attachments) > 0 + +def retrieve_mail_attachments(self, name, mail_folder='INBOX', check_regex=False, + latest_only=False): +""" +Retrieves mail's attachments in the mail folder by its name. + +:param name: The name of the attachment that will be downloaded. +:type name: str +:param mail_folder: The mail folder where to look at. +The default value is 'INBOX'. +:type mail_folder: str +:param check_regex: Checks the name for a regular expression. +The default value is False. +:type check_regex: bool +:param latest_only: If set to True it will only retrieve +the first matched attachment. +The default value is False. +:type latest_only: bool +:returns: a list of tuple each containing the attachment filename and its payload. 
+:rtype: a list of tuple +""" +mail_attachments = self._retrieve_mails_attachments_by_name(name, mail_folder, + check_regex, + latest_only) +return mail_attachments + +def download_mail_attachments(self, name, local_output_directory, mail_folder='INBOX', + check_regex=False, latest_only=False): +""" +Downloads mail's attachments in the mail folder by its name +to the local directory. + +:param name: The name of the attachment that will be downloaded. +:type name: str +:param local_output_directory: The output directory on the local machine + where the files will be downloaded to. +
[GitHub] feluelle commented on issue #3661: [AIRFLOW-2780] Adds IMAP Hook to interact with a mail server
feluelle commented on issue #3661: [AIRFLOW-2780] Adds IMAP Hook to interact with a mail server URL: https://github.com/apache/incubator-airflow/pull/3661#issuecomment-434693822 I am going to close this PR, because I want to open a new one directly from my feature branch instead of my master branch. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ashb commented on a change in pull request #4111: [AIRFLOW-3266] Add AWS Athena Operator and hook
ashb commented on a change in pull request #4111: [AIRFLOW-3266] Add AWS Athena Operator and hook URL: https://github.com/apache/incubator-airflow/pull/4111#discussion_r229695423 ## File path: airflow/contrib/hooks/aws_athena_hook.py ## @@ -0,0 +1,146 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+ +from time import sleep +from airflow.contrib.hooks.aws_hook import AwsHook +from uuid import uuid4 + +INTERMEDIATE_STATES = ('QUEUED', 'RUNNING',) +FAILURE_STATES = ('FAILED', 'CANCELLED',) +SUCCESS_STATES = ('SUCCEEDED',) + + +class AWSAthenaHook(AwsHook): +""" +Interact with AWS Athena to run, poll queries and return query results +""" + +def __init__(self, aws_conn_id='aws_default', *args, **kwargs): +super(AWSAthenaHook, self).__init__(aws_conn_id, **kwargs) +self.sleep_time = kwargs.get('sleep_time') or 30 +self.conn = None + +def get_conn(self): +""" +check if aws conn exists already or create one and return it + +:return: boto3 session +""" +if not hasattr(self, 'conn'): +self.conn = self.get_client_type('athena') +return self.conn + +def run_query(self, query, query_context, result_configuration, client_request_token=None): +""" +Run Presto query on athena with provided config and return submitted query_execution_id + +:param query: Presto query to run +:type query: str +:param query_context: Context in which query need to be run +:type query_context: dict +:param result_configuration: Dict with path to store results in and config related to encryption +:type result_configuration: dict +:param client_request_token: Unique token created by user to avoid multiple executions of same query +:type client_request_token: str +:return: str +""" +if client_request_token is None: +client_request_token = str(uuid4()) Review comment: I mean generating a random string that is unknown to the caller. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
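The concern in this review comment is that a token generated with `uuid4()` inside the hook is unknown to the caller, so a retried call gets a fresh token and cannot be recognised as a repeat of the same request. One possible alternative, sketched here as an assumption and not as what the PR ended up doing, is to derive the token deterministically from the request itself. `derive_request_token` is a hypothetical helper, not part of `AWSAthenaHook`.

```python
import hashlib
import json


def derive_request_token(query, query_context, result_configuration):
    """Derive a stable token from the request so retries reuse the same value.

    Hypothetical helper for illustration; not part of AWSAthenaHook.
    """
    # sort_keys makes the serialisation independent of dict ordering.
    canonical = json.dumps(
        {'query': query,
         'context': query_context,
         'result': result_configuration},
        sort_keys=True,
    )
    # A SHA-256 hex digest is always 64 characters long.
    return hashlib.sha256(canonical.encode('utf-8')).hexdigest()
```

The trade-off is that two intentionally separate runs of an identical query would share a token; whether that is desirable depends on the caller, which is one argument for letting the caller supply the token explicitly.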
[jira] [Resolved] (AIRFLOW-2703) Scheduler crashes if Mysql Connectivity is lost
[ https://issues.apache.org/jira/browse/AIRFLOW-2703?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ash Berlin-Taylor resolved AIRFLOW-2703. Resolution: Fixed Fix Version/s: 2.0.0 > Scheduler crashes if Mysql Connectivity is lost > --- > > Key: AIRFLOW-2703 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2703 > Project: Apache Airflow > Issue Type: Bug > Components: scheduler >Affects Versions: 1.9.0, 2.0.0 >Reporter: raman >Assignee: Mishika Singh >Priority: Major > Fix For: 2.0.0 > > > Airflow scheduler crashes if connectivity to Mysql is lost. > Below is the stack Trace > Traceback (most recent call last): File > "/usr/src/venv/local/lib/python2.7/site-packages/airflow/jobs.py", line 371, > in helper pickle_dags) File > "/usr/src/venv/local/lib/python2.7/site-packages/airflow/utils/db.py", line > 50, in wrapper result = func(*args, **kwargs) File > "/usr/src/venv/local/lib/python2.7/site-packages/airflow/jobs.py", line 1762, > in process_file dag.sync_to_db() File > "/usr/src/venv/local/lib/python2.7/site-packages/airflow/utils/db.py", line > 50, in wrapper result = func(*args, **kwargs) File > "/usr/src/venv/local/lib/python2.7/site-packages/airflow/models.py", line > 3816, in sync_to_db session.commit() File > "/usr/src/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", > line 943, in commit self.transaction.commit() File > "/usr/src/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", > line 471, in commit t[1].commit() File > "/usr/src/venv/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", > line 1643, in commit self._do_commit() File > "/usr/src/venv/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", > line 1674, in _do_commit self.connection._commit_impl() File > "/usr/src/venv/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", > line 726, in _commit_impl self._handle_dbapi_exception(e, None, None, None, > None) File > 
"/usr/src/venv/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", > line 1413, in _handle_dbapi_exception exc_info File > "/usr/src/venv/local/lib/python2.7/site-packages/sqlalchemy/util/compat.py", > line 203, in raise_from_cause reraise(type(exception), exception, tb=exc_tb, > cause=cause) File > "/usr/src/venv/local/lib/python2.7/site-packages/sqlalchemy/engine/base.py", > line 724, in _commit_impl self.engine.dialect.do_commit(self.connection) File > "/usr/src/venv/local/lib/python2.7/site-packages/sqlalchemy/dialects/mysql/base.py", > line 1784, in do_commit dbapi_connection.commit() OperationalError: > (_mysql_exceptions.OperationalError) (2013, 'Lost connection to MySQL server > during query') (Background on this error at: http://sqlalche.me/e/e3q8) > Process DagFileProcessor141318-Process: -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-2703) Scheduler crashes if Mysql Connectivity is lost
[ https://issues.apache.org/jira/browse/AIRFLOW-2703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16670102#comment-16670102 ] ASF GitHub Bot commented on AIRFLOW-2703: - ashb closed pull request #3650: [AIRFLOW-2703] exceptions from scheduler's heartbeat is handled so that scheduler does not crash URL: https://github.com/apache/incubator-airflow/pull/3650 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/airflow/jobs.py b/airflow/jobs.py index 04d596b9b7..38f96e2247 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -160,33 +160,36 @@ def heartbeat(self): heart rate. If you go over 60 seconds before calling it, it won't sleep at all. """ -with create_session() as session: -job = session.query(BaseJob).filter_by(id=self.id).one() -make_transient(job) -session.commit() +try: +with create_session() as session: +job = session.query(BaseJob).filter_by(id=self.id).one() +make_transient(job) +session.commit() -if job.state == State.SHUTDOWN: -self.kill() +if job.state == State.SHUTDOWN: +self.kill() -# Figure out how long to sleep for -sleep_for = 0 -if job.latest_heartbeat: -sleep_for = max( -0, -self.heartrate - ( -timezone.utcnow() - job.latest_heartbeat).total_seconds()) +# Figure out how long to sleep for +sleep_for = 0 +if job.latest_heartbeat: +sleep_for = max( +0, +self.heartrate - (timezone.utcnow() - + job.latest_heartbeat).total_seconds()) -sleep(sleep_for) +sleep(sleep_for) -# Update last heartbeat time -with create_session() as session: -job = session.query(BaseJob).filter(BaseJob.id == self.id).first() -job.latest_heartbeat = timezone.utcnow() -session.merge(job) -session.commit() +# Update last heartbeat time +with create_session() as session: +job = session.query(BaseJob).filter(BaseJob.id == 
self.id).first() +job.latest_heartbeat = timezone.utcnow() +session.merge(job) +session.commit() -self.heartbeat_callback(session=session) -self.log.debug('[heartbeat]') +self.heartbeat_callback(session=session) +self.log.debug('[heartbeat]') +except OperationalError as e: +self.log.error("Scheduler heartbeat got an exception: %s", str(e)) def run(self): Stats.incr(self.__class__.__name__.lower() + '_start', 1, 1) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Scheduler crashes if Mysql Connectivity is lost > --- > > Key: AIRFLOW-2703 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2703 > Project: Apache Airflow > Issue Type: Bug > Components: scheduler >Affects Versions: 1.9.0, 2.0.0 >Reporter: raman >Assignee: Mishika Singh >Priority: Major > > Airflow scheduler crashes if connectivity to Mysql is lost. > Below is the stack Trace > Traceback (most recent call last): File > "/usr/src/venv/local/lib/python2.7/site-packages/airflow/jobs.py", line 371, > in helper pickle_dags) File > "/usr/src/venv/local/lib/python2.7/site-packages/airflow/utils/db.py", line > 50, in wrapper result = func(*args, **kwargs) File > "/usr/src/venv/local/lib/python2.7/site-packages/airflow/jobs.py", line 1762, > in process_file dag.sync_to_db() File > "/usr/src/venv/local/lib/python2.7/site-packages/airflow/utils/db.py", line > 50, in wrapper result = func(*args, **kwargs) File > "/usr/src/venv/local/lib/python2.7/site-packages/airflow/models.py", line > 3816, in sync_to_db session.commit() File > "/usr/src/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", > line 943, in commit self.transaction.commit() File > "/usr/src/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", > line 471, in commit t[1].commit() File >
[GitHub] ashb closed pull request #3650: [AIRFLOW-2703] exceptions from scheduler's heartbeat is handled so that scheduler does not crash
ashb closed pull request #3650: [AIRFLOW-2703] exceptions from scheduler's heartbeat is handled so that scheduler does not crash URL: https://github.com/apache/incubator-airflow/pull/3650 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/airflow/jobs.py b/airflow/jobs.py index 04d596b9b7..38f96e2247 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -160,33 +160,36 @@ def heartbeat(self): heart rate. If you go over 60 seconds before calling it, it won't sleep at all. """ -with create_session() as session: -job = session.query(BaseJob).filter_by(id=self.id).one() -make_transient(job) -session.commit() +try: +with create_session() as session: +job = session.query(BaseJob).filter_by(id=self.id).one() +make_transient(job) +session.commit() -if job.state == State.SHUTDOWN: -self.kill() +if job.state == State.SHUTDOWN: +self.kill() -# Figure out how long to sleep for -sleep_for = 0 -if job.latest_heartbeat: -sleep_for = max( -0, -self.heartrate - ( -timezone.utcnow() - job.latest_heartbeat).total_seconds()) +# Figure out how long to sleep for +sleep_for = 0 +if job.latest_heartbeat: +sleep_for = max( +0, +self.heartrate - (timezone.utcnow() - + job.latest_heartbeat).total_seconds()) -sleep(sleep_for) +sleep(sleep_for) -# Update last heartbeat time -with create_session() as session: -job = session.query(BaseJob).filter(BaseJob.id == self.id).first() -job.latest_heartbeat = timezone.utcnow() -session.merge(job) -session.commit() +# Update last heartbeat time +with create_session() as session: +job = session.query(BaseJob).filter(BaseJob.id == self.id).first() +job.latest_heartbeat = timezone.utcnow() +session.merge(job) +session.commit() -self.heartbeat_callback(session=session) -self.log.debug('[heartbeat]') 
+self.heartbeat_callback(session=session) +self.log.debug('[heartbeat]') +except OperationalError as e: +self.log.error("Scheduler heartbeat got an exception: %s", str(e)) def run(self): Stats.incr(self.__class__.__name__.lower() + '_start', 1, 1) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
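The pattern in the diff above (wrap the heartbeat in a `try` and log database connectivity errors instead of letting them propagate) can be shown in isolation. This is a minimal sketch under stated assumptions: the `OperationalError` class below is a local stand-in so the example has no dependencies (the diff does not show where the real one is imported from, presumably SQLAlchemy), and `safe_heartbeat` is a hypothetical name.

```python
import logging

log = logging.getLogger(__name__)


class OperationalError(Exception):
    """Local stand-in for the database driver's OperationalError (assumption)."""


def safe_heartbeat(do_heartbeat):
    """Run one heartbeat; log connectivity errors instead of raising them.

    Returns True if the heartbeat completed and False if it was skipped.
    """
    try:
        do_heartbeat()
        return True
    except OperationalError as e:
        # Same log message as in the diff; the caller's loop keeps running.
        log.error("Scheduler heartbeat got an exception: %s", str(e))
        return False
```

Only the connectivity error is swallowed; any other exception still propagates, which keeps unrelated bugs visible.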
[GitHub] ashb commented on a change in pull request #4075: [AIRFLOW-502] BashOperator success/failure conditions not documented
ashb commented on a change in pull request #4075: [AIRFLOW-502] BashOperator success/failure conditions not documented URL: https://github.com/apache/incubator-airflow/pull/4075#discussion_r229668850 ## File path: airflow/operators/bash_operator.py ## @@ -49,6 +49,16 @@ class BashOperator(BaseOperator): :type env: dict :param output_encoding: Output encoding of bash command :type output_encoding: str + +On execution of the operator the task will up for retry when exception is raised. +However if a command exists with non-zero value Airflow will not recognize +it as failure unless explicitly specified in the beggining of the script. +Example: bash_command = python3 script.py '{{ next_execution_date }}' + when executing command exit(1) the task will be marked as success. Review comment: Adding to the instructions is good, but `|| exit(1)` is a bit of a hack; `set -e` should work better (and in more cases). For example: ``` bash_command = "set -e; python3 script.py '{{ next_execution_date }}'" ``` This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
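The effect of `set -e` suggested in this comment can be checked outside Airflow. The sketch below only demonstrates bash semantics and makes no claim about how BashOperator itself interprets the result; it assumes `bash` is available on the PATH, and `bash_exit_code` is a hypothetical helper name.

```python
import subprocess


def bash_exit_code(command):
    """Run ``command`` with ``bash -c`` and return its exit status."""
    return subprocess.call(['bash', '-c', command])


# Without `set -e`, the script's status is that of its last command (`true`),
# so the earlier failure of `false` is hidden.
without_errexit = bash_exit_code("false; true")

# With `set -e`, bash stops at the first failing command and exits with its status.
with_errexit = bash_exit_code("set -e; false; true")
```

This is why a failure in the middle of a multi-command script can go unnoticed unless `set -e` (or an explicit check) is added.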
[GitHub] ashb commented on a change in pull request #4111: [AIRFLOW-3266] Add AWS Athena Operator and hook
ashb commented on a change in pull request #4111: [AIRFLOW-3266] Add AWS Athena Operator and hook URL: https://github.com/apache/incubator-airflow/pull/4111#discussion_r229667790 ## File path: docs/integration.rst ## @@ -169,6 +169,19 @@ AWS: Amazon Web Services Airflow has extensive support for Amazon Web Services. But note that the Hooks, Sensors and Operators are in the contrib section. +AWS Athena + + +- :ref:`AWSAthenaOperator` : Run Athena Query. + +.. _AWSAthenaOperator: + +AWSAthenaOperator +""" + +.. autoclass:: airflow.contrib.operators.aws_athena_operator.AWSAthenaOperator Review comment: Please remove Athena from this section - EMR is out of place so we don't want to add more here. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ron819 commented on a change in pull request #4075: [AIRFLOW-502] BashOperator success/failure conditions not documented
ron819 commented on a change in pull request #4075: [AIRFLOW-502] BashOperator success/failure conditions not documented URL: https://github.com/apache/incubator-airflow/pull/4075#discussion_r229667702 ## File path: airflow/operators/bash_operator.py ## @@ -49,6 +49,16 @@ class BashOperator(BaseOperator): :type env: dict :param output_encoding: Output encoding of bash command :type output_encoding: str + +On execution of the operator the task will up for retry when exception is raised. +However if a command exists with non-zero value Airflow will not recognize +it as failure unless explicitly specified in the beggining of the script. +Example: bash_command = python3 script.py '{{ next_execution_date }}' + when executing command exit(1) the task will be marked as success. Review comment: @ashb Using exit(1) is common in scripting. There are many workarounds to make it work with Airflow, but people need to know that such a modification is required. I hit this problem because I expected Airflow to catch the unsuccessful exit, but it didn't. This PR only adds instructions for those who encounter the same situation. Are we discussing the necessity of such a notice in the docs, or the wording of the notice itself? The problem is hard to catch: if the user runs the script from bash they see it fail, but if it runs from Airflow they won't see the failure. To change the code as you suggest, the user must first know that a change is required. My PR is an attempt to alert the user to the issue. There may be other (maybe better) ways; I'm open to suggestions. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] ashb commented on a change in pull request #4091: [AIRFLOW-2524] Update SageMaker hook, operator and sensor for training, tuning and transform
ashb commented on a change in pull request #4091: [AIRFLOW-2524] Update SageMaker hook, operator and sensor for training, tuning and transform URL: https://github.com/apache/incubator-airflow/pull/4091#discussion_r229663263 ## File path: airflow/contrib/hooks/sagemaker_hook.py ## @@ -16,299 +16,793 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -import copy +import tarfile +import tempfile import time +import os +import collections +import functools +from datetime import datetime + +import botocore.config from botocore.exceptions import ClientError from airflow.exceptions import AirflowException from airflow.contrib.hooks.aws_hook import AwsHook from airflow.hooks.S3_hook import S3Hook +class LogState(object): +STARTING = 1 +WAIT_IN_PROGRESS = 2 +TAILING = 3 +JOB_COMPLETE = 4 +COMPLETE = 5 + + +# Position is a tuple that includes the last read timestamp and the number of items that were read +# at that time. This is used to figure out which event to start with on the next read. +Position = collections.namedtuple('Position', ['timestamp', 'skip']) + + +def argmin(arr, f): +"""Return the index, i, in arr that minimizes f(arr[i])""" +m = None +i = None +for idx, item in enumerate(arr): +if item is not None: +if m is None or f(item) < m: +m = f(item) +i = idx +return i + + +def some(arr): +"""Return True iff there is an element, a, of arr such that a is not None""" +return functools.reduce(lambda x, y: x or (y is not None), arr, False) + + +def secondary_training_status_changed(current_job_description, prev_job_description): +""" +Returns true if training job's secondary status message has changed. + +:param current_job_description: Current job description, returned from DescribeTrainingJob call. +:type current_job_description: dict +:param prev_job_description: Previous job description, returned from DescribeTrainingJob call. 
+:type prev_job_description: dict + +:return: Whether the secondary status message of a training job changed or not. +""" +current_secondary_status_transitions = current_job_description.get('SecondaryStatusTransitions') +if current_secondary_status_transitions is None or len(current_secondary_status_transitions) == 0: +return False + +prev_job_secondary_status_transitions = prev_job_description.get('SecondaryStatusTransitions') \ +if prev_job_description is not None else None + +last_message = prev_job_secondary_status_transitions[-1]['StatusMessage'] \ +if prev_job_secondary_status_transitions is not None \ +and len(prev_job_secondary_status_transitions) > 0 else '' + +message = current_job_description['SecondaryStatusTransitions'][-1]['StatusMessage'] + +return message != last_message + + +def secondary_training_status_message(job_description, prev_description): +""" +Returns a string contains start time and the secondary training job status message. + +:param job_description: Returned response from DescribeTrainingJob call +:type job_description: dict +:param prev_description: Previous job description from DescribeTrainingJob call +:type prev_description: dict + +:return: Job status string to be printed. 
+""" + +if job_description is None or job_description.get('SecondaryStatusTransitions') is None\ +or len(job_description.get('SecondaryStatusTransitions')) == 0: +return '' + +prev_description_secondary_transitions = prev_description.get('SecondaryStatusTransitions')\ +if prev_description is not None else None +prev_transitions_num = len(prev_description['SecondaryStatusTransitions'])\ +if prev_description_secondary_transitions is not None else 0 +current_transitions = job_description['SecondaryStatusTransitions'] + +transitions_to_print = current_transitions[-1:] if len(current_transitions) == prev_transitions_num else \ +current_transitions[prev_transitions_num - len(current_transitions):] + +status_strs = [] +for transition in transitions_to_print: +message = transition['StatusMessage'] +time_str = datetime.utcfromtimestamp( + time.mktime(job_description['LastModifiedTime'].timetuple())).strftime('%Y-%m-%d %H:%M:%S') Review comment: Local as in whatever timezone the system running this code uses? I'll find a better way. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
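The timezone question in this comment arises because `time.mktime` interprets its argument as local time, so the round trip through `timetuple()` and `utcfromtimestamp()` depends on the timezone of the machine running the code. A minimal sketch of one system-independent alternative follows. It assumes `LastModifiedTime` is a timezone-aware `datetime` and uses `datetime.timezone`, which is Python 3 only; `format_utc` is a hypothetical helper and not the fix the PR adopted.

```python
from datetime import datetime, timedelta, timezone


def format_utc(dt):
    """Format a timezone-aware datetime as a UTC 'YYYY-MM-DD HH:MM:SS' string."""
    if dt.tzinfo is None:
        # A naive datetime carries no offset, so converting it would be a guess.
        raise ValueError("expected a timezone-aware datetime")
    return dt.astimezone(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')
```

Because the conversion uses the offset stored on the value itself, the output is the same on every host regardless of its local timezone setting.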
[GitHub] ashb commented on a change in pull request #4075: [AIRFLOW-502] BashOperator success/failure conditions not documented
URL: https://github.com/apache/incubator-airflow/pull/4075#discussion_r229661810

## File path: airflow/operators/bash_operator.py ##
@@ -49,6 +49,16 @@ class BashOperator(BaseOperator):
 :type env: dict
 :param output_encoding: Output encoding of bash command
 :type output_encoding: str
+
+On execution of the operator the task will up for retry when exception is raised.
+However if a command exists with non-zero value Airflow will not recognize
+it as failure unless explicitly specified in the beggining of the script.
+Example: bash_command = python3 script.py '{{ next_execution_date }}'
+ when executing command exit(1) the task will be marked as success.

Review comment:
If you can test, I think that having `set -e` at the start would mean that `exit(1)` is not needed.
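The `set -e` suggestion above can be checked outside Airflow. The sketch below assumes Python 3 and a `bash` (or a POSIX `sh` as fallback) on the PATH. The helper `exit_code` is hypothetical and is not Airflow API. It illustrates that a shell script reports the exit status of its last command, so an earlier failing command goes unnoticed unless `set -e` is in effect.

```python
import shutil
import subprocess

# Prefer bash; fall back to a POSIX sh, where `set -e` behaves the same for this case.
SHELL = shutil.which("bash") or "sh"


def exit_code(script):
    """Run a shell script given as a string and return the shell's exit status."""
    return subprocess.run([SHELL, "-c", script], stdout=subprocess.DEVNULL).returncode


# `false` fails, but the script continues and `echo` succeeds, so the status is 0.
without_errexit = exit_code("false; echo done")
# With `set -e` the shell stops at the first failing command and returns its status.
with_errexit = exit_code("set -e; false; echo done")
print(without_errexit, with_errexit)  # 0 1
```

Any caller that judges success by the process return code will see the first script as successful and the second as failed.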
[GitHub] phani8996 commented on a change in pull request #4111: [AIRFLOW-3266] Add AWS Athena Operator and hook
URL: https://github.com/apache/incubator-airflow/pull/4111#discussion_r229661660

## File path: docs/integration.rst ##
@@ -169,6 +169,19 @@ AWS: Amazon Web Services
 Airflow has extensive support for Amazon Web Services. But note that the Hooks, Sensors and Operators are in the contrib section.
+AWS Athena
+
+
+- :ref:`AWSAthenaOperator` : Run Athena Query.
+
+.. _AWSAthenaOperator:
+
+AWSAthenaOperator
+"""
+
+.. autoclass:: airflow.contrib.operators.aws_athena_operator.AWSAthenaOperator

Review comment:
I'm moving the Athena operator doc to the bottom and adding the same to `docs/code.rst`.