[jira] [Assigned] (AIRFLOW-2304) Quick start documentation is not quick: should mention scheduler part
[ https://issues.apache.org/jira/browse/AIRFLOW-2304?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tao Feng reassigned AIRFLOW-2304: - Assignee: Tao Feng > Quick start documentation is not quick: should mention scheduler part > - > > Key: AIRFLOW-2304 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2304 > Project: Apache Airflow > Issue Type: Improvement >Reporter: Shawn Xu >Assignee: Tao Feng >Priority: Major > > The [Quick start documentation|https://airflow.apache.org/start.html] part > should offer user a push button style launch to pilot stuff quickly. > > However right now it only mentions web server so none of the DAGs ended up > running. And this is particularly confusing to new users if they don't know > what to expect (flow will be in running state but tasks have no states). > > There are 2 things to get DAGs scheduled: > # Start a scheduler even if the default SequentialExecutor is used > # The DAG must be enabled. > They should be documented in quick start doc. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
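The two missing steps above can be sketched as CLI commands (a minimal sketch assuming the Airflow 1.x CLI; the DAG id below is the bundled example DAG and is illustrative):

```shell
# 1. Start a scheduler in addition to the webserver, even with the
#    default SequentialExecutor -- the webserver alone never runs tasks.
airflow scheduler

# 2. Enable ("unpause") the DAG so the scheduler will actually create runs.
airflow unpause example_bash_operator
```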
[jira] [Created] (AIRFLOW-2307) Airflow should track Celery task ID
John Arnold created AIRFLOW-2307: Summary: Airflow should track Celery task ID Key: AIRFLOW-2307 URL: https://issues.apache.org/jira/browse/AIRFLOW-2307 Project: Apache Airflow Issue Type: Improvement Components: scheduler Affects Versions: 1.9.0 Reporter: John Arnold For celery executor, airflow keeps the AsyncResult in memory for the duration of the task, then deletes when task completes (success or fail). This can make troubleshooting failed tasks hard – trying to figure out which task instance is which "celery task" in Flower, etc. It would be a lot easier if there was a "Task Instance id" that could be populated with the AsyncResult.id for celery, or otherwise a UID for other executors. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
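A minimal sketch of the proposal, with a stand-in for celery's AsyncResult (the class, keys, and storage below are hypothetical, not Airflow's actual schema): persist the executor-assigned id keyed by task instance, instead of holding the AsyncResult only in executor memory where it is lost on completion.

```python
import uuid

class FakeAsyncResult:
    """Stand-in for celery.result.AsyncResult; the real one exposes an .id."""
    def __init__(self):
        self.id = str(uuid.uuid4())

# In practice this would be a column on the task_instance table, so the id
# survives after the executor forgets the AsyncResult when the task finishes.
external_task_ids = {}

def queue_task(ti_key):
    result = FakeAsyncResult()             # celery: task.apply_async(...)
    external_task_ids[ti_key] = result.id  # persist for later troubleshooting
    return result

res = queue_task(("my_dag", "my_task", "2018-04-09T00:00:00"))
```

With the id persisted, a failed task instance can be matched to its "celery task" in Flower after the fact.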
[jira] [Closed] (AIRFLOW-2306) Add Bonnier Broadcasting to README
[ https://issues.apache.org/jira/browse/AIRFLOW-2306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Siddharth Anand closed AIRFLOW-2306. Resolution: Fixed > Add Bonnier Broadcasting to README > -- > > Key: AIRFLOW-2306 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2306 > Project: Apache Airflow > Issue Type: Wish >Reporter: Guillermo Rodríguez Cano >Assignee: Guillermo Rodríguez Cano >Priority: Trivial > > Add Bonnier Broadcasting to current list of Airflow users -- This message was sent by Atlassian JIRA (v7.6.3#76005)
incubator-airflow git commit: [AIRFLOW-2306] Add Bonnier Broadcasting to list of current users
Repository: incubator-airflow Updated Branches: refs/heads/master 44ce9ca71 -> 3beb9bcb5 [AIRFLOW-2306] Add Bonnier Broadcasting to list of current users Closes #3206 from wileeam/add-bbr-to-readme Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3beb9bcb Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3beb9bcb Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3beb9bcb Branch: refs/heads/master Commit: 3beb9bcb51f66e8a3db56f4712e2a846568bedef Parents: 44ce9ca Author: Guillermo Rodríguez Cano Authored: Mon Apr 9 14:50:18 2018 -0700 Committer: r39132 Committed: Mon Apr 9 14:50:25 2018 -0700 -- README.md | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3beb9bcb/README.md -- diff --git a/README.md b/README.md index 62aa27c..8a5b057 100644 --- a/README.md +++ b/README.md @@ -103,6 +103,7 @@ Currently **officially** using Airflow: 1. [Blue Yonder](http://www.blue-yonder.com) [[@blue-yonder](https://github.com/blue-yonder)] 1. [Boda Telecom Suite - CE](https://github.com/bodastage/bts-ce) [[@erssebaggala](https://github.com/erssebaggala), [@bodastage](https://github.com/bodastage)] 1. [Bodastage Solutions](http://bodastage.com) [[@erssebaggala](https://github.com/erssebaggala), [@bodastage](https://github.com/bodastage)] +1. [Bonnier Broadcasting](http://www.bonnierbroadcasting.com) [[@wileeam](https://github.com/wileeam)] 1. [California Data Collaborative](https://github.com/California-Data-Collaborative) powered by [ARGO Labs](http://www.argolabs.org) 1. [Carbonite](https://www.carbonite.com) [[@ajbosco](https://github.com/ajbosco)] 1. [Celect](http://www.celect.com) [[@superdosh](https://github.com/superdosh) & [@chadcelect](https://github.com/chadcelect)]
[jira] [Commented] (AIRFLOW-687) Gracefully halt workers through CLI
[ https://issues.apache.org/jira/browse/AIRFLOW-687?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16431330#comment-16431330 ] John Arnold commented on AIRFLOW-687: - commenting on old Jira for posterity – my solution to this was to just import execute_command task from airflow into a "regular" celery app, and register the task with the app. That way, I can run vanilla celery workers using 'celery multi' which support graceful restarts etc. Also allows sharing workers/config between airflow and our much larger celery installation. > Gracefully halt workers through CLI > --- > > Key: AIRFLOW-687 > URL: https://issues.apache.org/jira/browse/AIRFLOW-687 > Project: Apache Airflow > Issue Type: Bug >Reporter: Paul Zaczkieiwcz >Priority: Minor > > When deploying a new set of airflow DAGs, it is useful to gracefully shut > down all airflow services and restart them. This allows you to pip install > requirements for your DAGs in a virtual environment so that you're sure that > your DAGs don't contain unmet dependencies. > Trouble is, if you kill the celery workers then they'll drop their current > task on the floor. There should be a CLI option to gracefully shut down the > workers so that deploy scripts can restart all services without worrying > about killing the workers. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
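The workaround in the comment can be sketched conceptually. Real code would import `execute_command` from `airflow.executors.celery_executor` and register it on a plain `celery.Celery` app; the tiny stand-in registry below only illustrates the key idea, which is that celery routes messages to workers by *task name*, so re-registering the same callable under the same name on another app is enough (the task name string is an assumption based on celery's default module-path naming):

```python
class MiniApp:
    """Minimal stand-in for celery.Celery's task registry (illustration only)."""
    def __init__(self):
        self.tasks = {}

    def task(self, name):
        def register(fn):
            self.tasks[name] = fn
            return fn
        return register

def execute_command(command):
    # Stand-in for Airflow's celery task, which shells out to `airflow run ...`
    return "ran: " + command

vanilla_app = MiniApp()  # in practice: Celery('shared_app', broker=...)
vanilla_app.task(name="airflow.executors.celery_executor.execute_command")(execute_command)
```

Workers started for `vanilla_app` (e.g. via `celery multi`, which supports graceful restarts) would then consume the messages the Airflow scheduler queues under that task name.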
[jira] [Commented] (AIRFLOW-2156) Parallelize Celery Executor
[ https://issues.apache.org/jira/browse/AIRFLOW-2156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16431308#comment-16431308 ] John Arnold commented on AIRFLOW-2156: -- This seems like a pretty easy candidate for multiprocessing. > Parallelize Celery Executor > --- > > Key: AIRFLOW-2156 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2156 > Project: Apache Airflow > Issue Type: Improvement > Components: celery >Reporter: Dan Davydov >Assignee: Dan Davydov >Priority: Major > > The CeleryExecutor doesn't currently support parallel execution to check task > states since Celery does not support this. This can greatly slow down the > Scheduler loops since each request to check a task's state is a network > request. > > The Celery Executor should parallelize these requests. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
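The suggestion can be sketched with a thread pool (threads rather than multiprocessing here, since each state check is a network round-trip and releases the GIL; `fetch_state` is a hypothetical stand-in for querying the celery result backend):

```python
from concurrent.futures import ThreadPoolExecutor

def fetch_state(task_key):
    # Stand-in for AsyncResult(task_id).state, i.e. one network request
    # to the celery result backend per queued task.
    return task_key, "SUCCESS"

def fetch_states_parallel(task_keys, max_workers=8):
    # Issue all state checks concurrently instead of one blocking
    # request per task inside the scheduler loop.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return dict(pool.map(fetch_state, task_keys))

states = fetch_states_parallel(["t1", "t2", "t3"])
```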
[jira] [Commented] (AIRFLOW-2306) Add Bonnier Broadcasting to README
[ https://issues.apache.org/jira/browse/AIRFLOW-2306?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16431135#comment-16431135 ] Guillermo Rodríguez Cano commented on AIRFLOW-2306: --- Done: https://github.com/apache/incubator-airflow/pull/3206 > Add Bonnier Broadcasting to README > -- > > Key: AIRFLOW-2306 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2306 > Project: Apache Airflow > Issue Type: Wish >Reporter: Guillermo Rodríguez Cano >Assignee: Guillermo Rodríguez Cano >Priority: Trivial > > Add Bonnier Broadcasting to current list of Airflow users -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (AIRFLOW-2306) Add Bonnier Broadcasting to README
[ https://issues.apache.org/jira/browse/AIRFLOW-2306?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guillermo Rodríguez Cano reassigned AIRFLOW-2306: - Assignee: Guillermo Rodríguez Cano > Add Bonnier Broadcasting to README > -- > > Key: AIRFLOW-2306 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2306 > Project: Apache Airflow > Issue Type: Wish >Reporter: Guillermo Rodríguez Cano >Assignee: Guillermo Rodríguez Cano >Priority: Trivial > > Add Bonnier Broadcasting to current list of Airflow users -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (AIRFLOW-2306) Add Bonnier Broadcasting to README
Guillermo Rodríguez Cano created AIRFLOW-2306: - Summary: Add Bonnier Broadcasting to README Key: AIRFLOW-2306 URL: https://issues.apache.org/jira/browse/AIRFLOW-2306 Project: Apache Airflow Issue Type: Wish Reporter: Guillermo Rodríguez Cano Add Bonnier Broadcasting to current list of Airflow users -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-2305) Fix CI failure caused by AIRFLOW-2027
[ https://issues.apache.org/jira/browse/AIRFLOW-2305?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16431075#comment-16431075 ] ASF subversion and git services commented on AIRFLOW-2305: -- Commit 44ce9ca71beb5b0aca1b21cb8c6e9420ce89639f in incubator-airflow's branch refs/heads/master from [~sekikn] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=44ce9ca ] [AIRFLOW-2305][AIRFLOW-2027] Fix CI failure caused by [] Closes #3205 from sekikn/AIRFLOW-2305 > Fix CI failure caused by AIRFLOW-2027 > - > > Key: AIRFLOW-2305 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2305 > Project: Apache Airflow > Issue Type: Bug > Components: ci, tests >Reporter: Kengo Seki >Assignee: Kengo Seki >Priority: Critical > Fix For: 1.10.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-2027) Only trigger sleep in scheduler after all files have parsed
[ https://issues.apache.org/jira/browse/AIRFLOW-2027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16431074#comment-16431074 ] ASF subversion and git services commented on AIRFLOW-2027: -- Commit 44ce9ca71beb5b0aca1b21cb8c6e9420ce89639f in incubator-airflow's branch refs/heads/master from [~sekikn] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=44ce9ca ] [AIRFLOW-2305][AIRFLOW-2027] Fix CI failure caused by [] Closes #3205 from sekikn/AIRFLOW-2305 > Only trigger sleep in scheduler after all files have parsed > --- > > Key: AIRFLOW-2027 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2027 > Project: Apache Airflow > Issue Type: Improvement > Components: scheduler >Reporter: Dan Davydov >Assignee: Dan Davydov >Priority: Major > Fix For: 1.10.0 > > > The scheduler loop sleeps for 1 second every loop unnecessarily. Remove this > sleep to slightly speed up scheduling, and instead do it once all files have > been parsed. It can add up since it runs to every scheduler loop which runs # > of dags to parse/scheduler parallelism times. > Also remove the unnecessary increased file processing interval in tests which > slows them down. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (AIRFLOW-2305) Fix CI failure caused by AIRFLOW-2027
[ https://issues.apache.org/jira/browse/AIRFLOW-2305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bolke de Bruin resolved AIRFLOW-2305. - Resolution: Fixed Fix Version/s: 1.10.0 Issue resolved by pull request #3205 [https://github.com/apache/incubator-airflow/pull/3205] > Fix CI failure caused by AIRFLOW-2027 > - > > Key: AIRFLOW-2305 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2305 > Project: Apache Airflow > Issue Type: Bug > Components: ci, tests >Reporter: Kengo Seki >Assignee: Kengo Seki >Priority: Critical > Fix For: 1.10.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
incubator-airflow git commit: [AIRFLOW-2305][AIRFLOW-2027] Fix CI failure caused by []
Repository: incubator-airflow Updated Branches: refs/heads/master b3dea2f0d -> 44ce9ca71 [AIRFLOW-2305][AIRFLOW-2027] Fix CI failure caused by [] Closes #3205 from sekikn/AIRFLOW-2305 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/44ce9ca7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/44ce9ca7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/44ce9ca7 Branch: refs/heads/master Commit: 44ce9ca71beb5b0aca1b21cb8c6e9420ce89639f Parents: b3dea2f Author: Kengo Seki Authored: Mon Apr 9 21:07:17 2018 +0200 Committer: Bolke de Bruin Committed: Mon Apr 9 21:07:17 2018 +0200 -- tests/jobs.py | 6 ++ 1 file changed, 2 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/44ce9ca7/tests/jobs.py -- diff --git a/tests/jobs.py b/tests/jobs.py index 7c278b7..7df7eb8 100644 --- a/tests/jobs.py +++ b/tests/jobs.py @@ -2438,8 +2438,7 @@ class SchedulerJobTest(unittest.TestCase): execution_date=test_start_date)) # Now call manage_slas and see if the sla_miss callback gets called -scheduler = SchedulerJob(dag_id='test_sla_miss', - **self.default_scheduler_args) +scheduler = SchedulerJob(dag_id='test_sla_miss') with mock.patch('airflow.jobs.SchedulerJob.log', new_callable=PropertyMock) as mock_log: @@ -2481,8 +2480,7 @@ class SchedulerJobTest(unittest.TestCase): execution_date=test_start_date)) scheduler = SchedulerJob(dag_id='test_sla_miss', - num_runs=1, - **self.default_scheduler_args) + num_runs=1) with mock.patch('airflow.jobs.SchedulerJob.log', new_callable=PropertyMock) as mock_log:
[jira] [Assigned] (AIRFLOW-2293) Fix S3FileTransformOperator to work with boto3
[ https://issues.apache.org/jira/browse/AIRFLOW-2293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kengo Seki reassigned AIRFLOW-2293: --- Assignee: Kengo Seki > Fix S3FileTransformOperator to work with boto3 > -- > > Key: AIRFLOW-2293 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2293 > Project: Apache Airflow > Issue Type: Bug > Components: aws, operators >Reporter: Kengo Seki >Assignee: Kengo Seki >Priority: Major > > Since > [boto.s3.key.Key#get_contents_to_file()|http://boto.cloudhackers.com/en/latest/ref/s3.html#boto.s3.key.Key.get_contents_to_file] > is no longer supported by boto3, S3FileTransformOperator fails with the > following error: > {code} > /home/sekikn/dev/incubator-airflow/airflow/operators/s3_file_transform_operator.py > in execute(self, context) > 84 self.source_s3_key, f_source.name > 85 ) > ---> 86 source_s3_key_object.get_contents_to_file(f_source) > 87 f_source.flush() > 88 source_s3.connection.close() > AttributeError: 's3.Object' object has no attribute 'get_contents_to_file' > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
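A sketch of the boto3-era replacement: boto3's S3 Object resource exposes `download_fileobj`/`upload_fileobj`, which supersede boto2's `Key.get_contents_to_file()`. The helper name is illustrative, and boto3 is imported lazily so the sketch loads even where it is not installed:

```python
def s3_transform(source_bucket, source_key, dest_bucket, dest_key, transform):
    """Download an S3 object, run `transform(f_in, f_out)`, upload the result.

    Illustrative replacement for the failing boto2 call in
    S3FileTransformOperator; not the operator's actual implementation.
    """
    import tempfile
    import boto3  # lazy import: only needed when the helper actually runs

    s3 = boto3.resource("s3")
    with tempfile.NamedTemporaryFile() as f_source, \
            tempfile.NamedTemporaryFile() as f_dest:
        # boto3 equivalent of boto2's Key.get_contents_to_file()
        s3.Object(source_bucket, source_key).download_fileobj(f_source)
        f_source.flush()
        f_source.seek(0)
        transform(f_source, f_dest)
        f_dest.flush()
        f_dest.seek(0)
        s3.Object(dest_bucket, dest_key).upload_fileobj(f_dest)
```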
[jira] [Assigned] (AIRFLOW-2305) Fix CI failure caused by AIRFLOW-2027
[ https://issues.apache.org/jira/browse/AIRFLOW-2305?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kengo Seki reassigned AIRFLOW-2305: --- Assignee: Kengo Seki > Fix CI failure caused by AIRFLOW-2027 > - > > Key: AIRFLOW-2305 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2305 > Project: Apache Airflow > Issue Type: Bug > Components: ci, tests >Reporter: Kengo Seki >Assignee: Kengo Seki >Priority: Critical > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (AIRFLOW-2305) Fix CI failure caused by AIRFLOW-2027
Kengo Seki created AIRFLOW-2305: --- Summary: Fix CI failure caused by AIRFLOW-2027 Key: AIRFLOW-2305 URL: https://issues.apache.org/jira/browse/AIRFLOW-2305 Project: Apache Airflow Issue Type: Bug Components: ci, tests Reporter: Kengo Seki -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (AIRFLOW-2304) Quick start documentation is not quick: should mention scheduler part
Shawn Xu created AIRFLOW-2304: - Summary: Quick start documentation is not quick: should mention scheduler part Key: AIRFLOW-2304 URL: https://issues.apache.org/jira/browse/AIRFLOW-2304 Project: Apache Airflow Issue Type: Improvement Reporter: Shawn Xu The [Quick start documentation|https://airflow.apache.org/start.html] should offer the user a push-button launch to try things out quickly. However, right now it only mentions the web server, so none of the DAGs end up running. This is particularly confusing to new users who don't know what to expect (the DAG run will be in the running state but its tasks will have no state). Two things are needed to get DAGs scheduled: # Start a scheduler, even if the default SequentialExecutor is used # The DAG must be enabled (unpaused). Both should be documented in the quick start doc. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
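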
[jira] [Commented] (AIRFLOW-2303) S3 List Operator
[ https://issues.apache.org/jira/browse/AIRFLOW-2303?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16430501#comment-16430501 ] Guillermo Rodríguez Cano commented on AIRFLOW-2303: --- Just submitted the PR: https://github.com/apache/incubator-airflow/pull/3203 > S3 List Operator > > > Key: AIRFLOW-2303 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2303 > Project: Apache Airflow > Issue Type: New Feature > Components: aws, gcp >Reporter: Guillermo Rodríguez Cano >Assignee: Guillermo Rodríguez Cano >Priority: Major > Fix For: 2.0.0 > > > There is no list operator of an S3 bucket (but there is one for Google Cloud > Storage) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (AIRFLOW-2303) S3 List Operator
[ https://issues.apache.org/jira/browse/AIRFLOW-2303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guillermo Rodríguez Cano updated AIRFLOW-2303: -- Fix Version/s: 2.0.0 > S3 List Operator > > > Key: AIRFLOW-2303 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2303 > Project: Apache Airflow > Issue Type: New Feature > Components: aws, gcp >Reporter: Guillermo Rodríguez Cano >Assignee: Guillermo Rodríguez Cano >Priority: Major > Fix For: 2.0.0 > > > There is no list operator of an S3 bucket (but there is one for Google Cloud > Storage) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (AIRFLOW-2303) S3 List Operator
Guillermo Rodríguez Cano created AIRFLOW-2303: - Summary: S3 List Operator Key: AIRFLOW-2303 URL: https://issues.apache.org/jira/browse/AIRFLOW-2303 Project: Apache Airflow Issue Type: New Feature Components: aws, gcp Reporter: Guillermo Rodríguez Cano There is no list operator of an S3 bucket (but there is one for Google Cloud Storage) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (AIRFLOW-2303) S3 List Operator
[ https://issues.apache.org/jira/browse/AIRFLOW-2303?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guillermo Rodríguez Cano reassigned AIRFLOW-2303: - Assignee: Guillermo Rodríguez Cano > S3 List Operator > > > Key: AIRFLOW-2303 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2303 > Project: Apache Airflow > Issue Type: New Feature > Components: aws, gcp >Reporter: Guillermo Rodríguez Cano >Assignee: Guillermo Rodríguez Cano >Priority: Major > > There is no list operator of an S3 bucket (but there is one for Google Cloud > Storage) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (AIRFLOW-2301) S3 to Google Cloud Storage Operator
[ https://issues.apache.org/jira/browse/AIRFLOW-2301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guillermo Rodríguez Cano updated AIRFLOW-2301: -- Fix Version/s: 2.0.0 > S3 to Google Cloud Storage Operator > --- > > Key: AIRFLOW-2301 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2301 > Project: Apache Airflow > Issue Type: New Feature > Components: aws, gcp >Reporter: Guillermo Rodríguez Cano >Assignee: Guillermo Rodríguez Cano >Priority: Major > Fix For: 2.0.0 > > > In lieu of a prior issue AIRFLOW-2284 I think that the reverse operator would > also be useful -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-2128) 'Tall' DAGs scale worse than 'wide' DAGs
[ https://issues.apache.org/jira/browse/AIRFLOW-2128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16430399#comment-16430399 ] Máté Szabó commented on AIRFLOW-2128: - {code:java} min_file_process_interval = 0 {code} I believe this is the default setting. > 'Tall' DAGs scale worse than 'wide' DAGs > > > Key: AIRFLOW-2128 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2128 > Project: Apache Airflow > Issue Type: Bug > Components: DAG, DagRun, scheduler >Affects Versions: 1.9.0 >Reporter: Máté Szabó >Priority: Major > Labels: performance, usability > Attachments: tall_dag.py, wide_dag.py > > > Tall DAG = a DAG with long chains of dependencies, e.g.: 0 -> 1 -> 2 -> ... > -> 998 -> 999 > Wide DAG = a DAG with many short, parallel dependencies e.g. 0 -> 1; 0 -> 2; > ... 0 -> 999 > Take a super simple case where both graphs are of 1000 tasks, and all the > tasks are just "sleep 0.03" bash commands (see the attached files). > With the default SequentialExecutor (without paralellism), I would expect my > 2 example DAGs to take (approximately) the same time to run, but apparently > this is not the case. > For the wide DAG it was about 80 successfully executed tasks in 10 minutes, > for the tall one it was 0. > This anomaly also seem to affect the web UI. Opening up the graph view or the > tree view for the wide DAG takes about 6 seconds on my machine, but for the > tall one it takes significantly longer, in fact currently it does not load at > all. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (AIRFLOW-2291) Ability to specify runtimeVersion, pythonVersion and jobDir in ML Engine operator
[ https://issues.apache.org/jira/browse/AIRFLOW-2291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Volquartz Lebech updated AIRFLOW-2291: Description: For the Google Cloud ML Engine Training Operator (in {{contrib}}), the [current list of arguments|https://github.com/apache/incubator-airflow/blob/b918a4976a641a3c944ef4ad4323fe25bd219ea0/airflow/contrib/operators/mlengine_operator.py#L538-L544] does not seem to include the option for specifying runtime and Python version which limits execution to Python 2.7. It also does not include the useful job directory argument. It seems that simply adding {{runtimeVersion}}, {{pythonVersion}} and {{jobDir}} to the list would be enough to make this work, according to [the documentations on ML engine|https://cloud.google.com/ml-engine/docs/tensorflow/versioning#set-python-version-training]. was: For the Google Cloud ML Engine Training Operator (in {{contrib}}), the [current list of arguments|https://github.com/apache/incubator-airflow/blob/b918a4976a641a3c944ef4ad4323fe25bd219ea0/airflow/contrib/operators/mlengine_operator.py#L538-L544] does not seem to include the option for specifying runtime and Python version which limits execution to Python 2.7. It seems that simply adding {{runtimeVersion}} and {{pythonVersion}} to the list would be enough to make this work, according to [the documentations on ML engine|https://cloud.google.com/ml-engine/docs/tensorflow/versioning#set-python-version-training]. 
Summary: Ability to specify runtimeVersion, pythonVersion and jobDir in ML Engine operator (was: Ability to specify runtimeVersion and pythonVersion in ML Engine operator) > Ability to specify runtimeVersion, pythonVersion and jobDir in ML Engine > operator > - > > Key: AIRFLOW-2291 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2291 > Project: Apache Airflow > Issue Type: Improvement > Components: contrib, gcp >Affects Versions: Airflow 2.0, Airflow 1.8, Airflow 1.9.0 >Reporter: David Volquartz Lebech >Assignee: David Volquartz Lebech >Priority: Minor > > For the Google Cloud ML Engine Training Operator (in {{contrib}}), the > [current list of > arguments|https://github.com/apache/incubator-airflow/blob/b918a4976a641a3c944ef4ad4323fe25bd219ea0/airflow/contrib/operators/mlengine_operator.py#L538-L544] > does not seem to include the option for specifying runtime and Python > version which limits execution to Python 2.7. It also does not include the > useful job directory argument. > It seems that simply adding {{runtimeVersion}}, {{pythonVersion}} and > {{jobDir}} to the list would be enough to make this work, according to [the > documentations on ML > engine|https://cloud.google.com/ml-engine/docs/tensorflow/versioning#set-python-version-training]. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
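The proposed additions can be sketched as extra keys in the job's `trainingInput` request body (field names follow the ML Engine docs linked in the issue; the bucket paths and version numbers below are illustrative):

```python
# Hypothetical trainingInput payload showing the three proposed fields.
training_input = {
    "scaleTier": "BASIC",
    "packageUris": ["gs://my-bucket/trainer-0.1.tar.gz"],
    "pythonModule": "trainer.task",
    "region": "us-central1",
    # The fields this issue asks the operator to expose:
    "jobDir": "gs://my-bucket/job-output",  # where checkpoints/outputs land
    "runtimeVersion": "1.6",                # select a newer TF runtime
    "pythonVersion": "3.5",                 # without this, training runs on Python 2.7
}
```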
[jira] [Updated] (AIRFLOW-2280) Extra argument for comparison with another table in IntervalCheckOperator
[ https://issues.apache.org/jira/browse/AIRFLOW-2280?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuyin Yang updated AIRFLOW-2280: Description: Current IntervalCheckOperator can only check the values of metrics given as SQL expressions are within a certain tolerance of the ones from days_back before for the same table. For example, if I set metrics as COUNT(*), threshold ratio=1.5, and days_back=-7, then I can compare the count of this table at current, and the count of same table 7 days back. However, in practice, we would like to first load tables to a tmp dataset, which has an expiration date. And after validation, we start to load it to production dataset. In this case, it makes more sense to compare the current tmp one, with production dataset days_back, because days_back temporary table may not exist. was: Current IntervalCheckOperator can only check the values of metrics given as SQL expressions are within a certain tolerance of the ones from days_back before for the same table. For example, if I set metrics as COUNT(*), threshold ratio=1.5, and days_back=-7, then I can compare the count of this table at current, and the count of same table 7 days back. However, during practice, we would like to first load tables to a tmp dataset, which has an expiration date. And after validation, we start to load it to production dataset. In this case, it makes more sense to compare the current tmp one, with production dataset days_back, because days_back temporary table may not exist. 
> Extra argument for comparison with another table in IntervalCheckOperator > - > > Key: AIRFLOW-2280 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2280 > Project: Apache Airflow > Issue Type: Improvement > Components: core >Reporter: Yuyin Yang >Assignee: Yuyin Yang >Priority: Minor > > Current IntervalCheckOperator can only check the values of metrics given as > SQL expressions are within a certain tolerance of the ones from days_back > before for the same table. For example, if I set metrics as COUNT(*), > threshold ratio=1.5, and days_back=-7, then I can compare the count of this > table at current, and the count of same table 7 days back. > However, in practice, we would like to first load tables to a tmp dataset, > which has an expiration date. And after validation, we start to load it to > production dataset. In this case, it makes more sense to compare the current > tmp one, with production dataset days_back, because days_back temporary table > may not exist. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
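The requested check can be sketched as a tolerance comparison between a metric from the current (tmp) table and the same metric from a reference (production) table `days_back` ago; the function below is an illustrative stand-in, not the operator's actual code:

```python
def within_tolerance(current_value, reference_value, threshold_ratio):
    """True if the two metric values are within the given ratio of each other."""
    if current_value == 0 or reference_value == 0:
        # Avoid division by zero; only equal-at-zero passes.
        return current_value == reference_value
    ratio = max(current_value, reference_value) / min(current_value, reference_value)
    return ratio <= threshold_ratio

# e.g. COUNT(*) from tmp_dataset.events now vs. production.events 7 days back,
# with the threshold ratio of 1.5 mentioned in the description.
ok = within_tolerance(1050, 1000, threshold_ratio=1.5)
```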
[jira] [Created] (AIRFLOW-2302) Add missing operators/hooks to the docs
Fokko Driesprong created AIRFLOW-2302: - Summary: Add missing operators/hooks to the docs Key: AIRFLOW-2302 URL: https://issues.apache.org/jira/browse/AIRFLOW-2302 Project: Apache Airflow Issue Type: Improvement Reporter: Fokko Driesprong -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-2281) Add support for Sendgrid categories
[ https://issues.apache.org/jira/browse/AIRFLOW-2281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16430279#comment-16430279 ] ASF subversion and git services commented on AIRFLOW-2281: -- Commit b3dea2f0d4873bfce138131f9acd891868b32935 in incubator-airflow's branch refs/heads/master from [~ms32035] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=b3dea2f ] [AIRFLOW-2281] Add support for Sendgrid categories Closes #3188 from ms32035/sendgrid_categories > Add support for Sendgrid categories > --- > > Key: AIRFLOW-2281 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2281 > Project: Apache Airflow > Issue Type: New Feature > Components: contrib >Reporter: Marcin Szymanski >Assignee: Marcin Szymanski >Priority: Minor > Fix For: 2.0.0 > > > Allow adding categories to sendgrid mails > https://github.com/sendgrid/sendgrid-python/blob/master/sendgrid/helpers/mail/category.py -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (AIRFLOW-2281) Add support for Sendgrid categories
[ https://issues.apache.org/jira/browse/AIRFLOW-2281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fokko Driesprong resolved AIRFLOW-2281. --- Resolution: Fixed Fix Version/s: 2.0.0 Issue resolved by pull request #3188 [https://github.com/apache/incubator-airflow/pull/3188] > Add support for Sendgrid categories > --- > > Key: AIRFLOW-2281 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2281 > Project: Apache Airflow > Issue Type: New Feature > Components: contrib >Reporter: Marcin Szymanski >Assignee: Marcin Szymanski >Priority: Minor > Fix For: 2.0.0 > > > Allow adding categories to sendgrid mails > https://github.com/sendgrid/sendgrid-python/blob/master/sendgrid/helpers/mail/category.py -- This message was sent by Atlassian JIRA (v7.6.3#76005)
incubator-airflow git commit: [AIRFLOW-2281] Add support for Sendgrid categories
Repository: incubator-airflow Updated Branches: refs/heads/master 3c4f1fd9e -> b3dea2f0d [AIRFLOW-2281] Add support for Sendgrid categories Closes #3188 from ms32035/sendgrid_categories Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/b3dea2f0 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/b3dea2f0 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/b3dea2f0 Branch: refs/heads/master Commit: b3dea2f0d4873bfce138131f9acd891868b32935 Parents: 3c4f1fd Author: Marcin Szymanski Authored: Mon Apr 9 10:51:25 2018 +0200 Committer: Fokko Driesprong Committed: Mon Apr 9 10:51:25 2018 +0200 -- airflow/contrib/utils/sendgrid.py| 14 ++ tests/contrib/utils/test_sendgrid.py | 13 - 2 files changed, 18 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b3dea2f0/airflow/contrib/utils/sendgrid.py -- diff --git a/airflow/contrib/utils/sendgrid.py b/airflow/contrib/utils/sendgrid.py index 07614a5..ebc32ec 100644 --- a/airflow/contrib/utils/sendgrid.py +++ b/airflow/contrib/utils/sendgrid.py @@ -20,12 +20,13 @@ from __future__ import unicode_literals import base64 import mimetypes import os + import sendgrid +from sendgrid.helpers.mail import Attachment, Content, Email, Mail, \ +Personalization, CustomArg, Category from airflow.utils.email import get_email_address_list from airflow.utils.log.logging_mixin import LoggingMixin -from sendgrid.helpers.mail import Attachment, Content, Email, Mail, \ -Personalization, CustomArg def send_email(to, subject, html_content, files=None, @@ -61,8 +62,6 @@ def send_email(to, subject, html_content, files=None, bcc = get_email_address_list(bcc) for bcc_address in bcc: personalization.add_bcc(Email(bcc_address)) -mail.add_personalization(personalization) -mail.add_content(Content('text/html', html_content)) # Add custom_args to personalization if present pers_custom_args = 
kwargs.get('personalization_custom_args', None) @@ -70,6 +69,13 @@ def send_email(to, subject, html_content, files=None, for key in pers_custom_args.keys(): personalization.add_custom_arg(CustomArg(key, pers_custom_args[key])) +mail.add_personalization(personalization) +mail.add_content(Content('text/html', html_content)) + +categories = kwargs.get('categories', []) +for cat in categories: +mail.add_category(Category(cat)) + # Add email attachment. for fname in files or []: basename = os.path.basename(fname) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/b3dea2f0/tests/contrib/utils/test_sendgrid.py -- diff --git a/tests/contrib/utils/test_sendgrid.py b/tests/contrib/utils/test_sendgrid.py index ba039e7..204aaa3 100644 --- a/tests/contrib/utils/test_sendgrid.py +++ b/tests/contrib/utils/test_sendgrid.py @@ -44,9 +44,11 @@ class SendEmailSendGridTest(unittest.TestCase): 'from': {'email': u'f...@bar.com'}, 'subject': 'sendgrid-send-email unit test'} self.personalization_custom_args = {'arg1': 'val1', 'arg2': 'val2'} -self.expected_mail_data_custom_args = copy.deepcopy(self.expected_mail_data) - self.expected_mail_data_custom_args['personalizations'][0]['custom_args'] = \ +self.categories = ['cat1', 'cat2'] +self.expected_mail_data_extras = copy.deepcopy(self.expected_mail_data) +self.expected_mail_data_extras['personalizations'][0]['custom_args'] = \ self.personalization_custom_args +self.expected_mail_data_extras['categories'] = self.categories # Test the right email is constructed. @@ -60,8 +62,9 @@ class SendEmailSendGridTest(unittest.TestCase): # Test the right email is constructed. 
@mock.patch('os.environ.get') @mock.patch('airflow.contrib.utils.sendgrid._post_sendgrid_mail') -def test_send_email_sendgrid_correct_email_custom_args(self, mock_post, mock_get): +def test_send_email_sendgrid_correct_email_extras(self, mock_post, mock_get): mock_get.return_value = 'f...@bar.com' send_email(self.to, self.subject, self.html_content, cc=self.cc, bcc=self.bcc, - personalization_custom_args=self.personalization_custom_args) -mock_post.assert_called_with(self.expected_mail_data_custom_args) + personalization_custom_args=self.personalization_custom_args, + categories=self.categories) +mock_post.assert_called_with(self.expected_mail_data_extras)
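The kwarg handling added by this patch can be approximated outside SendGrid — a minimal, hypothetical sketch using plain dicts in place of the `sendgrid.helpers.mail` objects, showing how the new `categories` kwarg and the pre-existing `personalization_custom_args` flow into the mail payload (the dict shape mirrors `expected_mail_data_extras` in the test above):

```python
def build_mail_data(html_content, **kwargs):
    """Sketch of the kwarg handling added in AIRFLOW-2281, with plain dicts."""
    mail = {
        "content": [{"type": "text/html", "value": html_content}],
        "personalizations": [],
        "categories": [],
    }

    # Pre-existing behaviour: custom args are attached per-personalization.
    personalization = {}
    pers_custom_args = kwargs.get("personalization_custom_args", None)
    if pers_custom_args:
        personalization["custom_args"] = dict(pers_custom_args)
    mail["personalizations"].append(personalization)

    # New in this commit: an optional list of SendGrid categories on the mail.
    for cat in kwargs.get("categories", []):
        mail["categories"].append(cat)

    return mail
```

In the real helper the same loop wraps each name in `Category(cat)` and calls `mail.add_category(...)` on the SendGrid `Mail` object.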
[jira] [Commented] (AIRFLOW-2027) Only trigger sleep in scheduler after all files have parsed
[ https://issues.apache.org/jira/browse/AIRFLOW-2027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16430253#comment-16430253 ] ASF subversion and git services commented on AIRFLOW-2027: -- Commit 3c4f1fd9e668a71531ec07d1e7bd60523cb31d32 in incubator-airflow's branch refs/heads/master from [~aoen] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=3c4f1fd ] [AIRFLOW-2027] Only trigger sleep in scheduler after all files have parsed Closes #2986 from aoen/ddavydov--open_source_disable_unecessary_sleep_in_scheduler_loop > Only trigger sleep in scheduler after all files have parsed > --- > > Key: AIRFLOW-2027 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2027 > Project: Apache Airflow > Issue Type: Improvement > Components: scheduler >Reporter: Dan Davydov >Assignee: Dan Davydov >Priority: Major > Fix For: 1.10.0 > > > The scheduler loop sleeps for 1 second every loop unnecessarily. Remove this > sleep to slightly speed up scheduling, and instead do it once all files have > been parsed. It can add up since it runs in every scheduler loop, which runs # > of dags to parse/scheduler parallelism times. > Also remove the unnecessary increased file processing interval in tests which > slows them down. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (AIRFLOW-2027) Only trigger sleep in scheduler after all files have parsed
[ https://issues.apache.org/jira/browse/AIRFLOW-2027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bolke de Bruin resolved AIRFLOW-2027. - Resolution: Fixed Fix Version/s: 1.10.0 Issue resolved by pull request #2986 [https://github.com/apache/incubator-airflow/pull/2986] > Only trigger sleep in scheduler after all files have parsed > --- > > Key: AIRFLOW-2027 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2027 > Project: Apache Airflow > Issue Type: Improvement > Components: scheduler >Reporter: Dan Davydov >Assignee: Dan Davydov >Priority: Major > Fix For: 1.10.0 > > > The scheduler loop sleeps for 1 second every loop unnecessarily. Remove this > sleep to slightly speed up scheduling, and instead do it once all files have > been parsed. It can add up since it runs to every scheduler loop which runs # > of dags to parse/scheduler parallelism times. > Also remove the unnecessary increased file processing interval in tests which > slows them down. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
incubator-airflow git commit: [AIRFLOW-2027] Only trigger sleep in scheduler after all files have parsed
Repository: incubator-airflow Updated Branches: refs/heads/master 3ece6f6dc -> 3c4f1fd9e [AIRFLOW-2027] Only trigger sleep in scheduler after all files have parsed Closes #2986 from aoen/ddavydov--open_source_disab le_unecessary_sleep_in_scheduler_loop Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3c4f1fd9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3c4f1fd9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3c4f1fd9 Branch: refs/heads/master Commit: 3c4f1fd9e668a71531ec07d1e7bd60523cb31d32 Parents: 3ece6f6 Author: Dan Davydov Authored: Mon Apr 9 10:22:11 2018 +0200 Committer: Bolke de Bruin Committed: Mon Apr 9 10:22:11 2018 +0200 -- UPDATING.md | 3 + airflow/config_templates/default_airflow.cfg | 3 + airflow/jobs.py | 30 +++--- airflow/utils/dag_processing.py | 25 - tests/core.py| 5 +- tests/jobs.py| 112 ++ tests/utils/test_dag_processing.py | 22 +++-- 7 files changed, 115 insertions(+), 85 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c4f1fd9/UPDATING.md -- diff --git a/UPDATING.md b/UPDATING.md index 7d4ef0b..0abccff 100644 --- a/UPDATING.md +++ b/UPDATING.md @@ -224,6 +224,9 @@ indefinitely. This is only available on the command line. min_file_process_interval After how much time should an updated DAG be picked up from the filesystem. + min_file_parsing_loop_time +How many seconds to wait between file-parsing loops to prevent the logs from being spammed. + dag_dir_list_interval How often the scheduler should relist the contents of the DAG directory. If you experience that while developing your dags are not being picked up, have a look at this number and decrease it when necessary. 
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c4f1fd9/airflow/config_templates/default_airflow.cfg -- diff --git a/airflow/config_templates/default_airflow.cfg b/airflow/config_templates/default_airflow.cfg index eb2981f..e8da82f 100644 --- a/airflow/config_templates/default_airflow.cfg +++ b/airflow/config_templates/default_airflow.cfg @@ -379,6 +379,9 @@ run_duration = -1 # after how much time a new DAGs should be picked up from the filesystem min_file_process_interval = 0 +# How many seconds to wait between file-parsing loops to prevent the logs from being spammed. +min_file_parsing_loop_time = 1 + dag_dir_list_interval = 300 # How often should stats be printed to the logs http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3c4f1fd9/airflow/jobs.py -- diff --git a/airflow/jobs.py b/airflow/jobs.py index 6241717..bcff868 100644 --- a/airflow/jobs.py +++ b/airflow/jobs.py @@ -510,7 +510,8 @@ class SchedulerJob(BaseJob): num_runs=-1, file_process_interval=conf.getint('scheduler', 'min_file_process_interval'), -processor_poll_interval=1.0, +min_file_parsing_loop_time=conf.getint('scheduler', + 'min_file_parsing_loop_time'), run_duration=None, do_pickle=False, log=None, @@ -525,8 +526,6 @@ class SchedulerJob(BaseJob): :type subdir: unicode :param num_runs: The number of times to try to schedule each DAG file. -1 for unlimited within the run_duration. -:param processor_poll_interval: The number of seconds to wait between -polls of running processors :param run_duration: how long to run (in seconds) before exiting :type run_duration: int :param do_pickle: once a DAG object is obtained by executing the Python @@ -543,7 +542,6 @@ class SchedulerJob(BaseJob): self.num_runs = num_runs self.run_duration = run_duration -self._processor_poll_interval = processor_poll_interval self.do_pickle = do_pickle super(SchedulerJob, self).__init__(*args, **kwargs) @@ -572,6 +570,10 @@ class SchedulerJob(BaseJob): # to 3 minutes. 
self.file_process_interval = file_process_interval +# Wait until at least this many seconds have passed before parsing files once all +# files have finished parsing. +self.min_file_parsing_loop_time = min_file_parsing_loop_time + self.max_tis_per_query = conf.getint('scheduler', 'max_tis_per_query')
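Putting the two related settings side by side, the scheduler section of `airflow.cfg` after this commit would contain (values are the shipped defaults from `default_airflow.cfg` above):

```ini
[scheduler]
# After how much time an updated DAG should be picked up from the filesystem
min_file_process_interval = 0

# How many seconds to wait between file-parsing loops to prevent the logs
# from being spammed (new in this commit; replaces processor_poll_interval)
min_file_parsing_loop_time = 1
```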
[jira] [Resolved] (AIRFLOW-2256) SparkOperator supports client standalone mode and retry mechanism
[ https://issues.apache.org/jira/browse/AIRFLOW-2256?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fokko Driesprong resolved AIRFLOW-2256. --- Resolution: Fixed Fix Version/s: 2.0.0 Issue resolved by pull request #3163 [https://github.com/apache/incubator-airflow/pull/3163] > SparkOperator supports client standalone mode and retry mechanism > - > > Key: AIRFLOW-2256 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2256 > Project: Apache Airflow > Issue Type: Improvement >Affects Versions: 1.9.0 >Reporter: Milan van der Meer >Assignee: Milan van der Meer >Priority: Minor > Fix For: 2.0.0 > > > Currently, submitting to a Spark standalone cluster in client mode is not > supported. > When submitting to a Spark standalone cluster, network failures or timeouts > are common; > therefore a retry mechanism to poll for the Spark job status needs to be > implemented. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (AIRFLOW-2284) Google Cloud Storage to S3 Operator
[ https://issues.apache.org/jira/browse/AIRFLOW-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fokko Driesprong resolved AIRFLOW-2284. --- Resolution: Fixed Fix Version/s: 2.0.0 Issue resolved by pull request #3190 [https://github.com/apache/incubator-airflow/pull/3190] > Google Cloud Storage to S3 Operator > --- > > Key: AIRFLOW-2284 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2284 > Project: Apache Airflow > Issue Type: New Feature > Components: aws, gcp >Reporter: Niels Zeilemaker >Assignee: Niels Zeilemaker >Priority: Major > Fix For: 2.0.0 > > > I've created a gcs to s3 operator which copies all files from gcs to s3 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-2256) SparkOperator supports client standalone mode and retry mechanism
[ https://issues.apache.org/jira/browse/AIRFLOW-2256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16430250#comment-16430250 ] ASF subversion and git services commented on AIRFLOW-2256: -- Commit 3ece6f6dcb6ab14249c493c3b1a16ce1c848c29a in incubator-airflow's branch refs/heads/master from milanvdm [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=3ece6f6 ] [AIRFLOW-2256] SparkOperator: Add Client Standalone mode and retry mechanism Closes #3163 from milanvdm/milanvdm/improve-spark- operator > SparkOperator supports client standalone mode and retry mechanism > - > > Key: AIRFLOW-2256 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2256 > Project: Apache Airflow > Issue Type: Improvement >Affects Versions: 1.9.0 >Reporter: Milan van der Meer >Assignee: Milan van der Meer >Priority: Minor > > Currently submitting to a Spark standalone cluster in client mode, is not > supported. > When submitting to a Spark standalone cluster, network failures, or timeouts > are common, > therefore a retry mechanism to poll for the Spark Job status needs to be > implemented. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
incubator-airflow git commit: [AIRFLOW-2256] SparkOperator: Add Client Standalone mode and retry mechanism
Repository: incubator-airflow Updated Branches: refs/heads/master 4e80b5f10 -> 3ece6f6dc [AIRFLOW-2256] SparkOperator: Add Client Standalone mode and retry mechanism Closes #3163 from milanvdm/milanvdm/improve-spark- operator Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/3ece6f6d Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/3ece6f6d Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/3ece6f6d Branch: refs/heads/master Commit: 3ece6f6dcb6ab14249c493c3b1a16ce1c848c29a Parents: 4e80b5f Author: milanvdm Authored: Mon Apr 9 10:20:44 2018 +0200 Committer: Fokko Driesprong Committed: Mon Apr 9 10:20:44 2018 +0200 -- airflow/contrib/hooks/spark_submit_hook.py | 29 - 1 file changed, 23 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/3ece6f6d/airflow/contrib/hooks/spark_submit_hook.py -- diff --git a/airflow/contrib/hooks/spark_submit_hook.py b/airflow/contrib/hooks/spark_submit_hook.py index ae024a9..cc2fee8 100644 --- a/airflow/contrib/hooks/spark_submit_hook.py +++ b/airflow/contrib/hooks/spark_submit_hook.py @@ -334,6 +334,9 @@ class SparkSubmitHook(BaseHook, LoggingMixin): """ Processes the log files and extracts useful information out of it. +If the deploy-mode is 'client', log the output of the submit command as those +are the output logs of the Spark worker directly. 
+ Remark: If the driver needs to be tracked for its status, the log-level of the spark deploy needs to be at least INFO (log4j.logger.org.apache.spark.deploy=INFO) @@ -353,7 +356,7 @@ class SparkSubmitHook(BaseHook, LoggingMixin): # If we run Kubernetes cluster mode, we want to extract the driver pod id # from the logs so we can kill the application when we stop it unexpectedly -if self._is_kubernetes: +elif self._is_kubernetes: match = re.search('\s*pod name: ((.+?)-([a-z0-9]+)-driver)', line) if match: self._kubernetes_driver_pod = match.groups()[0] @@ -368,13 +371,16 @@ class SparkSubmitHook(BaseHook, LoggingMixin): # if we run in standalone cluster mode and we want to track the driver status # we need to extract the driver id from the logs. This allows us to poll for # the status using the driver id. Also, we can kill the driver when needed. -if self._should_track_driver_status and not self._driver_id: +elif self._should_track_driver_status and not self._driver_id: match_driver_id = re.search('(driver-[0-9\-]+)', line) if match_driver_id: self._driver_id = match_driver_id.groups()[0] self.log.info("identified spark driver id: {}" .format(self._driver_id)) +else: +self.log.info(line) + self.log.debug("spark submit log: {}".format(line)) def _process_spark_status_log(self, itr): @@ -413,6 +419,14 @@ class SparkSubmitHook(BaseHook, LoggingMixin): ERROR: Unable to run or restart due to an unrecoverable error (e.g. missing jar file) """ + +# When your Spark Standalone cluster is not performing well +# due to misconfiguration or heavy loads. +# it is possible that the polling request will timeout. +# Therefore we use a simple retry mechanism. 
+missed_job_status_reports = 0 +max_missed_job_status_reports = 10 + # Keep polling as long as the driver is processing while self._driver_status not in ["FINISHED", "UNKNOWN", "KILLED", "FAILED", "ERROR"]: @@ -434,10 +448,13 @@ class SparkSubmitHook(BaseHook, LoggingMixin): returncode = status_process.wait() if returncode: -raise AirflowException( -"Failed to poll for the driver status: returncode = {}" -.format(returncode) -) +if missed_job_status_reports < max_missed_job_status_reports: +missed_job_status_reports = missed_job_status_reports + 1 +else: +raise AirflowException( +"Failed to poll for the driver status {} times: returncode = {}" +.format(max_missed_job_status_reports, returncode) +) def _build_spark_driver_kill_command(self): """
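Stripped of the subprocess handling, the bounded-retry loop added to the hook can be sketched in isolation — `poll_once` is a hypothetical stand-in for one `spark-submit --status` invocation, returning the parsed driver status and the process return code:

```python
class PollError(Exception):
    """Raised when too many consecutive status polls fail (sketch of AirflowException)."""


def poll_driver_status(poll_once, max_missed_job_status_reports=10):
    """Poll until the driver reaches a terminal state, tolerating a bounded
    number of failed polls (non-zero returncode), as in AIRFLOW-2256."""
    missed_job_status_reports = 0
    driver_status = "RUNNING"

    # Keep polling as long as the driver is processing
    while driver_status not in ["FINISHED", "UNKNOWN", "KILLED", "FAILED", "ERROR"]:
        driver_status, returncode = poll_once()
        if returncode:
            # A timed-out or failed poll counts as a miss, up to the limit
            if missed_job_status_reports < max_missed_job_status_reports:
                missed_job_status_reports += 1
            else:
                raise PollError(
                    "Failed to poll for the driver status {} times: returncode = {}"
                    .format(max_missed_job_status_reports, returncode))
    return driver_status
```

The design choice is simply to treat a failed poll as transient until a threshold is crossed, rather than failing the task on the first timeout from an overloaded standalone master.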
[jira] [Created] (AIRFLOW-2301) S3 to Google Cloud Storage Operator
Guillermo Rodríguez Cano created AIRFLOW-2301: - Summary: S3 to Google Cloud Storage Operator Key: AIRFLOW-2301 URL: https://issues.apache.org/jira/browse/AIRFLOW-2301 Project: Apache Airflow Issue Type: New Feature Components: aws, gcp Reporter: Guillermo Rodríguez Cano Following the prior issue AIRFLOW-2284, I think that the reverse operator would also be useful. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (AIRFLOW-2301) S3 to Google Cloud Storage Operator
[ https://issues.apache.org/jira/browse/AIRFLOW-2301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guillermo Rodríguez Cano reassigned AIRFLOW-2301: - Assignee: Guillermo Rodríguez Cano > S3 to Google Cloud Storage Operator > --- > > Key: AIRFLOW-2301 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2301 > Project: Apache Airflow > Issue Type: New Feature > Components: aws, gcp >Reporter: Guillermo Rodríguez Cano >Assignee: Guillermo Rodríguez Cano >Priority: Major > > Following the prior issue AIRFLOW-2284, I think that the reverse operator would > also be useful. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (AIRFLOW-2284) Google Cloud Storage to S3 Operator
[ https://issues.apache.org/jira/browse/AIRFLOW-2284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16430228#comment-16430228 ] ASF subversion and git services commented on AIRFLOW-2284: -- Commit 4e80b5f10ae892cc1a5e11ebf4c29759a9ae38e3 in incubator-airflow's branch refs/heads/master from [~nzeilemaker] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=4e80b5f ] [AIRFLOW-2284] GCS to S3 operator Closes #3190 from NielsZeilemaker/gcp_to_s3 > Google Cloud Storage to S3 Operator > --- > > Key: AIRFLOW-2284 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2284 > Project: Apache Airflow > Issue Type: New Feature > Components: aws, gcp >Reporter: Niels Zeilemaker >Assignee: Niels Zeilemaker >Priority: Major > > I've created a gcs to s3 operator which copies all files from gcs to s3 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
incubator-airflow git commit: [AIRFLOW-2284] GCS to S3 operator
Repository: incubator-airflow Updated Branches: refs/heads/master a30f009ae -> 4e80b5f10 [AIRFLOW-2284] GCS to S3 operator Closes #3190 from NielsZeilemaker/gcp_to_s3 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/4e80b5f1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/4e80b5f1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/4e80b5f1 Branch: refs/heads/master Commit: 4e80b5f10ae892cc1a5e11ebf4c29759a9ae38e3 Parents: a30f009 Author: niels Authored: Mon Apr 9 09:59:35 2018 +0200 Committer: Fokko Driesprong Committed: Mon Apr 9 09:59:35 2018 +0200 -- airflow/contrib/operators/gcs_to_s3.py | 107 +++ airflow/hooks/S3_hook.py| 2 +- .../contrib/operators/test_gcs_list_operator.py | 6 +- .../operators/test_gcs_to_s3_operator.py| 69 4 files changed, 182 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/4e80b5f1/airflow/contrib/operators/gcs_to_s3.py -- diff --git a/airflow/contrib/operators/gcs_to_s3.py b/airflow/contrib/operators/gcs_to_s3.py new file mode 100644 index 000..e596fa1 --- /dev/null +++ b/airflow/contrib/operators/gcs_to_s3.py @@ -0,0 +1,107 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook +from airflow.contrib.operators.gcs_list_operator import GoogleCloudStorageListOperator +from airflow.utils.decorators import apply_defaults +from airflow.hooks.S3_hook import S3Hook + + +class GoogleCloudStorageToS3Operator(GoogleCloudStorageListOperator): +""" +Synchronizes a Google Cloud Storage bucket with an S3 bucket. + +:param bucket: The Google Cloud Storage bucket to find the objects. +:type bucket: string +:param prefix: Prefix string which filters objects whose name begin with this prefix +:type prefix: string +:param delimiter: The delimiter by which you want to filter the objects. +For e.g to lists the CSV files from in a directory in GCS you would use +delimiter='.csv'. +:type delimiter: string +:param google_cloud_storage_conn_id: The connection ID to use when +connecting to Google Cloud Storage. +:type google_cloud_storage_conn_id: string +:param delegate_to: The account to impersonate, if any. +For this to work, the service account making the request must have +domain-wide delegation enabled. 
+:type delegate_to: string +:param dest_aws_conn_id: The destination S3 connection +:type dest_aws_conn_id: str +:param dest_s3_key: The base S3 key to be used to store the files +:type dest_s3_key: str +""" +template_fields = ('bucket', 'prefix', 'delimiter', 'dest_s3_key') +ui_color = '#f0eee4' + +@apply_defaults +def __init__(self, + bucket, + prefix=None, + delimiter=None, + google_cloud_storage_conn_id='google_cloud_storage_default', + delegate_to=None, + dest_aws_conn_id=None, + dest_s3_key=None, + replace=False, + *args, + **kwargs): + +super(GoogleCloudStorageToS3Operator, self).__init__( +bucket=bucket, +prefix=prefix, +delimiter=delimiter, +google_cloud_storage_conn_id=google_cloud_storage_conn_id, +delegate_to=delegate_to, +*args, +**kwargs +) +self.dest_aws_conn_id = dest_aws_conn_id +self.dest_s3_key = dest_s3_key +self.replace = replace + +def execute(self, context): +# use the super to list all files in an Google Cloud Storage bucket +files = super(GoogleCloudStorageToS3Operator, self).execute(context) +s3_hook = S3Hook(aws_conn_id=self.dest_aws_conn_id) + +if not self.replace: +# if we are not replacing -> list all files in the S3 bucket +# and only keep those files which are present in +# Google Cloud Storage and not in S3 +bucket_name, _ = S3Ho
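The `replace` branch of `execute` boils down to a set difference over object keys. A standalone sketch, with plain lists standing in for the `GoogleCloudStorageHook`/`S3Hook` listings (this is an illustration of the logic, not the operator's API):

```python
def files_to_copy(gcs_files, s3_keys, replace=False):
    """Return the GCS object names that should be uploaded to S3."""
    if replace:
        # replace=True: copy everything, overwriting whatever is in S3
        return sorted(gcs_files)
    # replace=False: only copy objects present in GCS but missing from S3
    return sorted(set(gcs_files) - set(s3_keys))
```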
[jira] [Commented] (AIRFLOW-2287) Missing and incorrect license headers
[ https://issues.apache.org/jira/browse/AIRFLOW-2287?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16430201#comment-16430201 ] ASF subversion and git services commented on AIRFLOW-2287: -- Commit a30f009aeb19347524562d3893ab538585183167 in incubator-airflow's branch refs/heads/master from [~bolke] [ https://git-wip-us.apache.org/repos/asf?p=incubator-airflow.git;h=a30f009 ] [AIRFLOW-2287] Update license notices Closes #3195 from bolkedebruin/AIRFLOW-2287 > Missing and incorrect license headers > - > > Key: AIRFLOW-2287 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2287 > Project: Apache Airflow > Issue Type: Improvement >Reporter: Bolke de Bruin >Priority: Blocker > Fix For: 1.10.0 > > > * a few files are missing licenses, like docs/Makefile > * please fix year in notice ("2016 and onwards" makes it a > little hard to work out when copyright would expire) > * LICENSE is OK but some license texts are missing, i.e. > Bootstrap Toggle, normalize.css, parallel.js. Note that in order to comply > with the terms of the licenses the full text of the license MUST be > included. > * also note that ace and d3 are under a BSD 3 clause not BSD > 2 clause > * A large number of files are missing the correct ASF > header. (see below) > ** Re incorrect header, not perfect but shows scope of the > issue: > *** find . -name "*.*" -exec grep "contributor license" {} > \; -print | wc > *** find . -name "*.*" -exec grep > "http://www.apache.org/licenses/LICENSE-2.0" {} \; -print | wc -- This message was sent by Atlassian JIRA (v7.6.3#76005)
incubator-airflow git commit: [AIRFLOW-2287] Update license notices
Repository: incubator-airflow Updated Branches: refs/heads/master f7f1d38ba -> a30f009ae [AIRFLOW-2287] Update license notices Closes #3195 from bolkedebruin/AIRFLOW-2287 Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/a30f009a Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/a30f009a Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/a30f009a Branch: refs/heads/master Commit: a30f009aeb19347524562d3893ab538585183167 Parents: f7f1d38 Author: Bolke de Bruin Authored: Mon Apr 9 00:32:02 2018 -0700 Committer: r39132 Committed: Mon Apr 9 00:32:09 2018 -0700 -- LICENSE | 24 + dev/airflow-license | 71 ++ licenses/LICENSE-bootstrap-toggle.txt | 21 licenses/LICENSE-bootstrap.txt| 25 - licenses/LICENSE-jquery.txt | 33 licenses/LICENSE-jsclockplugin.txt| 7 --- licenses/LICENSE-normalize.txt| 21 licenses/LICENSE-parallel-coordinates.txt | 26 ++ licenses/LICENSE-typeahead.txt| 13 - scripts/ci/check-license.sh | 2 +- 10 files changed, 155 insertions(+), 88 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a30f009a/LICENSE -- diff --git a/LICENSE b/LICENSE index d468094..540289b 100644 --- a/LICENSE +++ b/LICENSE @@ -214,28 +214,34 @@ MIT licenses The following components are provided under the MIT License. See project link for details. -The text of each license is also included at licenses/LICENSE-[project]-[version].txt. +The text of each license is also included at licenses/LICENSE-[project].txt. 
(MIT License) jquery (https://jquery.org/license/) (MIT License) dagre-d3 (https://github.com/cpettitt/dagre-d3) -(MIT License) bootstrap (https://github.com/twbs/bootstrap/blob/v3-dev/LICENSE) +(MIT License) bootstrap (https://github.com/twbs/bootstrap/) (MIT License) d3-tip (https://github.com/Caged/d3-tip) (MIT License) dataTables (https://datatables.net) -(MIT License) Clock Plugin (https://github.com/Lwangaman/jQuery-Clock-Plugin) (MIT License) WebGL-2D (https://github.com/gameclosure/webgl-2d) -(MIT License) Underscore.js (http://underscorejs.org) +(MIT License) Underscorejs (http://underscorejs.org) (MIT License) Bootstrap Toggle (http://www.bootstraptoggle.com) -(MIT License) normalize.css (http://git.io/normalize) +(MIT License) normalize.css (http://necolas.github.io/normalize.css/) BSD 2-Clause licenses The following components are provided under the BSD 2-Clause license. See file headers and project links for details. -The text of each license is also included at licenses/LICENSE-[project]-[version].txt. +The text of each license is also included at licenses/LICENSE-[project].txt. (BSD 2 License) flask-kerberos (https://github.com/mkomitee/flask-kerberos) -(BSD 2 License) Ace (https://github.com/ajaxorg/ace) -(BSD 2 License) d3js (https://d3js.org) -(BSD 3 License) parallel.js (https://parallel.js.org/) + +BSD 3-Clause licenses + +The following components are provided under the BSD 2-Clause license. +See file headers and project links for details. +The text of each license is also included at licenses/LICENSE-[project].txt. 
+ +(BSD 3 License) Ace (https://github.com/ajaxorg/ace) +(BSD 3 License) d3js (https://d3js.org) +(BSD 3 License) parallel-coordinates (http://syntagmatic.github.com/parallel-coordinates/) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/a30f009a/dev/airflow-license -- diff --git a/dev/airflow-license b/dev/airflow-license new file mode 100755 index 000..b7c7ad6 --- /dev/null +++ b/dev/airflow-license @@ -0,0 +1,71 @@ +#!/usr/bin/env python + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License
[jira] [Assigned] (AIRFLOW-2291) Ability to specify runtimeVersion and pythonVersion in ML Engine operator
[ https://issues.apache.org/jira/browse/AIRFLOW-2291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Volquartz Lebech reassigned AIRFLOW-2291: --- Assignee: David Volquartz Lebech > Ability to specify runtimeVersion and pythonVersion in ML Engine operator > - > > Key: AIRFLOW-2291 > URL: https://issues.apache.org/jira/browse/AIRFLOW-2291 > Project: Apache Airflow > Issue Type: Improvement > Components: contrib, gcp >Affects Versions: Airflow 2.0, Airflow 1.8, Airflow 1.9.0 >Reporter: David Volquartz Lebech >Assignee: David Volquartz Lebech >Priority: Minor > > For the Google Cloud ML Engine Training Operator (in {{contrib}}), the > [current list of > arguments|https://github.com/apache/incubator-airflow/blob/b918a4976a641a3c944ef4ad4323fe25bd219ea0/airflow/contrib/operators/mlengine_operator.py#L538-L544] > does not seem to include the option for specifying runtime and Python > version which limits execution to Python 2.7. > It seems that simply adding {{runtimeVersion}} and {{pythonVersion}} to the > list would be enough to make this work, according to [the documentations on > ML > engine|https://cloud.google.com/ml-engine/docs/tensorflow/versioning#set-python-version-training]. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
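For context, the two fields the issue asks the operator to expose live in the `trainingInput` object of the ML Engine jobs API. A hypothetical request body (bucket, package, and module names invented for illustration; key names follow the REST API rather than the operator's eventual Python arguments):

```python
# Illustrative trainingInput payload for the Cloud ML Engine jobs API.
training_input = {
    "scaleTier": "BASIC",
    "packageUris": ["gs://my-bucket/trainer-0.1.tar.gz"],  # assumed bucket/package
    "pythonModule": "trainer.task",                        # assumed module
    "region": "europe-west1",
    "runtimeVersion": "1.6",  # select a TensorFlow runtime instead of the default
    "pythonVersion": "3.5",   # without this, training defaults to Python 2.7
}
```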